openai-node

Форк
0
/
streaming.test.ts 
272 строки · 8.4 Кб
1
import { Response } from 'node-fetch';
2
import { PassThrough } from 'stream';
3
import assert from 'assert';
4
import { _iterSSEMessages, _decodeChunks as decodeChunks } from 'openai/streaming';
5

6
describe('line decoder', () => {
  // Table of scenarios: the chunks handed to the decoder and the lines it is
  // expected to return for them.
  const scenarios: Array<{ title: string; chunks: string[]; lines: string[] }> = [
    // baz is not included because the line hasn't ended yet
    { title: 'basic', chunks: ['foo', ' bar\nbaz'], lines: ['foo bar'] },
    // baz is not included because the line hasn't ended yet
    { title: 'basic with \\r', chunks: ['foo', ' bar\r\nbaz'], lines: ['foo bar'] },
    {
      title: 'trailing new lines',
      chunks: ['foo', ' bar', 'baz\n', 'thing\n'],
      lines: ['foo barbaz', 'thing'],
    },
    {
      title: 'trailing new lines with \\r',
      chunks: ['foo', ' bar', 'baz\r\n', 'thing\r\n'],
      lines: ['foo barbaz', 'thing'],
    },
    // A backslash followed by "n" is ordinary text, not a line terminator.
    { title: 'escaped new lines', chunks: ['foo', ' bar\\nbaz\n'], lines: ['foo bar\\nbaz'] },
    {
      title: 'escaped new lines with \\r',
      chunks: ['foo', ' bar\\r\\nbaz\n'],
      lines: ['foo bar\\r\\nbaz'],
    },
  ];

  for (const { title, chunks, lines } of scenarios) {
    test(title, () => {
      const decoded = decodeChunks(chunks);
      expect(decoded).toEqual(lines);
    });
  }
});
33

34
describe('streaming decoding', () => {
  /**
   * Replays `chunks` as the body of a node-fetch `Response` and returns the
   * async iterator that `_iterSSEMessages` produces for that response.
   */
  const openMessages = async (chunks: Buffer[]) => {
    async function* source(): AsyncGenerator<Buffer> {
      for (const chunk of chunks) {
        yield chunk;
      }
    }

    const response = new Response(await iteratorToStream(source()));
    const messages = _iterSSEMessages(response, new AbortController());
    return messages[Symbol.asyncIterator]();
  };

  test('basic', async () => {
    const messages = await openMessages([
      Buffer.from('event: completion\n'),
      Buffer.from('data: {"foo":true}\n'),
      Buffer.from('\n'),
    ]);

    const first = await messages.next();
    assert(first.value);
    expect(JSON.parse(first.value.data)).toEqual({ foo: true });

    const end = await messages.next();
    expect(end.done).toBeTruthy();
  });

  test('data without event', async () => {
    const messages = await openMessages([Buffer.from('data: {"foo":true}\n'), Buffer.from('\n')]);

    const first = await messages.next();
    assert(first.value);
    expect(first.value.event).toBeNull();
    expect(JSON.parse(first.value.data)).toEqual({ foo: true });

    const end = await messages.next();
    expect(end.done).toBeTruthy();
  });

  test('event without data', async () => {
    const messages = await openMessages([Buffer.from('event: foo\n'), Buffer.from('\n')]);

    const first = await messages.next();
    assert(first.value);
    expect(first.value.event).toEqual('foo');
    expect(first.value.data).toEqual('');

    const end = await messages.next();
    expect(end.done).toBeTruthy();
  });

  test('multiple events', async () => {
    const messages = await openMessages([
      Buffer.from('event: foo\n'),
      Buffer.from('\n'),
      Buffer.from('event: ping\n'),
      Buffer.from('\n'),
    ]);

    const first = await messages.next();
    assert(first.value);
    expect(first.value.event).toEqual('foo');
    expect(first.value.data).toEqual('');

    const second = await messages.next();
    assert(second.value);
    expect(second.value.event).toEqual('ping');
    expect(second.value.data).toEqual('');

    const end = await messages.next();
    expect(end.done).toBeTruthy();
  });

  test('multiple events with data', async () => {
    const messages = await openMessages([
      Buffer.from('event: foo\n'),
      Buffer.from('data: {"foo":true}\n'),
      Buffer.from('\n'),
      Buffer.from('event: ping\n'),
      Buffer.from('data: {"bar":false}\n'),
      Buffer.from('\n'),
    ]);

    const first = await messages.next();
    assert(first.value);
    expect(first.value.event).toEqual('foo');
    expect(JSON.parse(first.value.data)).toEqual({ foo: true });

    const second = await messages.next();
    assert(second.value);
    expect(second.value.event).toEqual('ping');
    expect(JSON.parse(second.value.data)).toEqual({ bar: false });

    const end = await messages.next();
    expect(end.done).toBeTruthy();
  });

  test('multiple data lines with empty line', async () => {
    const messages = await openMessages([
      Buffer.from('event: ping\n'),
      Buffer.from('data: {\n'),
      Buffer.from('data: "foo":\n'),
      Buffer.from('data: \n'),
      Buffer.from('data:\n'),
      Buffer.from('data: true}\n'),
      Buffer.from('\n\n'),
    ]);

    const first = await messages.next();
    assert(first.value);
    expect(first.value.event).toEqual('ping');
    expect(JSON.parse(first.value.data)).toEqual({ foo: true });
    // The data lines are joined with "\n"; the two empty data lines show up
    // as consecutive newlines in the payload.
    expect(first.value.data).toEqual('{\n"foo":\n\n\ntrue}');

    const end = await messages.next();
    expect(end.done).toBeTruthy();
  });

  test('data json escaped double new line', async () => {
    const messages = await openMessages([
      Buffer.from('event: ping\n'),
      Buffer.from('data: {"foo": "my long\\n\\ncontent"}'),
      Buffer.from('\n\n'),
    ]);

    const first = await messages.next();
    assert(first.value);
    expect(first.value.event).toEqual('ping');
    expect(JSON.parse(first.value.data)).toEqual({ foo: 'my long\n\ncontent' });

    const end = await messages.next();
    expect(end.done).toBeTruthy();
  });

  test('special new line characters', async () => {
    // The bytes e2 80 a8 are sent as the JSON string content of the second event.
    const separatorBytes = [0xe2, 0x80, 0xa8];

    const messages = await openMessages([
      Buffer.from('data: {"content": "culpa "}\n'),
      Buffer.from('\n'),
      Buffer.from('data: {"content": "'),
      Buffer.from(separatorBytes),
      Buffer.from('"}\n'),
      Buffer.from('\n'),
      Buffer.from('data: {"content": "foo"}\n'),
      Buffer.from('\n'),
    ]);

    const first = await messages.next();
    assert(first.value);
    expect(JSON.parse(first.value.data)).toEqual({ content: 'culpa ' });

    const second = await messages.next();
    assert(second.value);
    expect(JSON.parse(second.value.data)).toEqual({ content: Buffer.from(separatorBytes).toString() });

    const third = await messages.next();
    assert(third.value);
    expect(JSON.parse(third.value.data)).toEqual({ content: 'foo' });

    const end = await messages.next();
    expect(end.done).toBeTruthy();
  });

  test('multi-byte characters across chunks', async () => {
    const messages = await openMessages([
      Buffer.from('event: completion\n'),
      Buffer.from('data: {"content": "'),
      // bytes taken from the string 'известни' and arbitrarily split
      // so that some multi-byte characters span multiple chunks
      Buffer.from([0xd0]),
      Buffer.from([0xb8, 0xd0, 0xb7, 0xd0]),
      Buffer.from([0xb2, 0xd0, 0xb5, 0xd1, 0x81, 0xd1, 0x82, 0xd0, 0xbd, 0xd0, 0xb8]),
      Buffer.from('"}\n'),
      Buffer.from('\n'),
    ]);

    const first = await messages.next();
    assert(first.value);
    expect(first.value.event).toEqual('completion');
    expect(JSON.parse(first.value.data)).toEqual({ content: 'известни' });

    const end = await messages.next();
    expect(end.done).toBeTruthy();
  });
});
249

250
/**
 * Drains `iterator` and returns a `PassThrough` stream that yields the
 * collected chunks in their original order and then ends.
 *
 * The chunks are written through the stream's regular writable side rather
 * than from a custom `read()` implementation: passing `read` to the
 * constructor replaces the `_read` that Transform streams use to resume
 * writes paused by backpressure, which can stall the stream once a chunk
 * fills the readable buffer.
 *
 * @param iterator - async source of chunks; it is fully consumed before the
 *   returned promise resolves.
 * @returns a stream that already holds every chunk and has been ended.
 */
async function iteratorToStream(
  iterator: AsyncIterable<Buffer | Uint8Array | string>,
): Promise<PassThrough> {
  const stream = new PassThrough();

  for await (const chunk of iterator) {
    // The return value of write() is deliberately ignored: everything is
    // buffered up front and released as the consumer reads.
    stream.write(chunk);
  }

  stream.end();
  return stream;
}
273

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.