openai-node
272 строки · 8.4 Кб
1import { Response } from 'node-fetch';2import { PassThrough } from 'stream';3import assert from 'assert';4import { _iterSSEMessages, _decodeChunks as decodeChunks } from 'openai/streaming';5
6describe('line decoder', () => {7test('basic', () => {8// baz is not included because the line hasn't ended yet9expect(decodeChunks(['foo', ' bar\nbaz'])).toEqual(['foo bar']);10});11
12test('basic with \\r', () => {13// baz is not included because the line hasn't ended yet14expect(decodeChunks(['foo', ' bar\r\nbaz'])).toEqual(['foo bar']);15});16
17test('trailing new lines', () => {18expect(decodeChunks(['foo', ' bar', 'baz\n', 'thing\n'])).toEqual(['foo barbaz', 'thing']);19});20
21test('trailing new lines with \\r', () => {22expect(decodeChunks(['foo', ' bar', 'baz\r\n', 'thing\r\n'])).toEqual(['foo barbaz', 'thing']);23});24
25test('escaped new lines', () => {26expect(decodeChunks(['foo', ' bar\\nbaz\n'])).toEqual(['foo bar\\nbaz']);27});28
29test('escaped new lines with \\r', () => {30expect(decodeChunks(['foo', ' bar\\r\\nbaz\n'])).toEqual(['foo bar\\r\\nbaz']);31});32});33
34describe('streaming decoding', () => {35test('basic', async () => {36async function* body(): AsyncGenerator<Buffer> {37yield Buffer.from('event: completion\n');38yield Buffer.from('data: {"foo":true}\n');39yield Buffer.from('\n');40}41
42const stream = _iterSSEMessages(new Response(await iteratorToStream(body())), new AbortController())[43Symbol.asyncIterator44]();45
46let event = await stream.next();47assert(event.value);48expect(JSON.parse(event.value.data)).toEqual({ foo: true });49
50event = await stream.next();51expect(event.done).toBeTruthy();52});53
54test('data without event', async () => {55async function* body(): AsyncGenerator<Buffer> {56yield Buffer.from('data: {"foo":true}\n');57yield Buffer.from('\n');58}59
60const stream = _iterSSEMessages(new Response(await iteratorToStream(body())), new AbortController())[61Symbol.asyncIterator62]();63
64let event = await stream.next();65assert(event.value);66expect(event.value.event).toBeNull();67expect(JSON.parse(event.value.data)).toEqual({ foo: true });68
69event = await stream.next();70expect(event.done).toBeTruthy();71});72
73test('event without data', async () => {74async function* body(): AsyncGenerator<Buffer> {75yield Buffer.from('event: foo\n');76yield Buffer.from('\n');77}78
79const stream = _iterSSEMessages(new Response(await iteratorToStream(body())), new AbortController())[80Symbol.asyncIterator81]();82
83let event = await stream.next();84assert(event.value);85expect(event.value.event).toEqual('foo');86expect(event.value.data).toEqual('');87
88event = await stream.next();89expect(event.done).toBeTruthy();90});91
92test('multiple events', async () => {93async function* body(): AsyncGenerator<Buffer> {94yield Buffer.from('event: foo\n');95yield Buffer.from('\n');96yield Buffer.from('event: ping\n');97yield Buffer.from('\n');98}99
100const stream = _iterSSEMessages(new Response(await iteratorToStream(body())), new AbortController())[101Symbol.asyncIterator102]();103
104let event = await stream.next();105assert(event.value);106expect(event.value.event).toEqual('foo');107expect(event.value.data).toEqual('');108
109event = await stream.next();110assert(event.value);111expect(event.value.event).toEqual('ping');112expect(event.value.data).toEqual('');113
114event = await stream.next();115expect(event.done).toBeTruthy();116});117
118test('multiple events with data', async () => {119async function* body(): AsyncGenerator<Buffer> {120yield Buffer.from('event: foo\n');121yield Buffer.from('data: {"foo":true}\n');122yield Buffer.from('\n');123yield Buffer.from('event: ping\n');124yield Buffer.from('data: {"bar":false}\n');125yield Buffer.from('\n');126}127
128const stream = _iterSSEMessages(new Response(await iteratorToStream(body())), new AbortController())[129Symbol.asyncIterator130]();131
132let event = await stream.next();133assert(event.value);134expect(event.value.event).toEqual('foo');135expect(JSON.parse(event.value.data)).toEqual({ foo: true });136
137event = await stream.next();138assert(event.value);139expect(event.value.event).toEqual('ping');140expect(JSON.parse(event.value.data)).toEqual({ bar: false });141
142event = await stream.next();143expect(event.done).toBeTruthy();144});145
146test('multiple data lines with empty line', async () => {147async function* body(): AsyncGenerator<Buffer> {148yield Buffer.from('event: ping\n');149yield Buffer.from('data: {\n');150yield Buffer.from('data: "foo":\n');151yield Buffer.from('data: \n');152yield Buffer.from('data:\n');153yield Buffer.from('data: true}\n');154yield Buffer.from('\n\n');155}156
157const stream = _iterSSEMessages(new Response(await iteratorToStream(body())), new AbortController())[158Symbol.asyncIterator159]();160
161let event = await stream.next();162assert(event.value);163expect(event.value.event).toEqual('ping');164expect(JSON.parse(event.value.data)).toEqual({ foo: true });165expect(event.value.data).toEqual('{\n"foo":\n\n\ntrue}');166
167event = await stream.next();168expect(event.done).toBeTruthy();169});170
171test('data json escaped double new line', async () => {172async function* body(): AsyncGenerator<Buffer> {173yield Buffer.from('event: ping\n');174yield Buffer.from('data: {"foo": "my long\\n\\ncontent"}');175yield Buffer.from('\n\n');176}177
178const stream = _iterSSEMessages(new Response(await iteratorToStream(body())), new AbortController())[179Symbol.asyncIterator180]();181
182let event = await stream.next();183assert(event.value);184expect(event.value.event).toEqual('ping');185expect(JSON.parse(event.value.data)).toEqual({ foo: 'my long\n\ncontent' });186
187event = await stream.next();188expect(event.done).toBeTruthy();189});190
191test('special new line characters', async () => {192async function* body(): AsyncGenerator<Buffer> {193yield Buffer.from('data: {"content": "culpa "}\n');194yield Buffer.from('\n');195yield Buffer.from('data: {"content": "');196yield Buffer.from([0xe2, 0x80, 0xa8]);197yield Buffer.from('"}\n');198yield Buffer.from('\n');199yield Buffer.from('data: {"content": "foo"}\n');200yield Buffer.from('\n');201}202
203const stream = _iterSSEMessages(new Response(await iteratorToStream(body())), new AbortController())[204Symbol.asyncIterator205]();206
207let event = await stream.next();208assert(event.value);209expect(JSON.parse(event.value.data)).toEqual({ content: 'culpa ' });210
211event = await stream.next();212assert(event.value);213expect(JSON.parse(event.value.data)).toEqual({ content: Buffer.from([0xe2, 0x80, 0xa8]).toString() });214
215event = await stream.next();216assert(event.value);217expect(JSON.parse(event.value.data)).toEqual({ content: 'foo' });218
219event = await stream.next();220expect(event.done).toBeTruthy();221});222
223test('multi-byte characters across chunks', async () => {224async function* body(): AsyncGenerator<Buffer> {225yield Buffer.from('event: completion\n');226yield Buffer.from('data: {"content": "');227// bytes taken from the string 'известни' and arbitrarily split228// so that some multi-byte characters span multiple chunks229yield Buffer.from([0xd0]);230yield Buffer.from([0xb8, 0xd0, 0xb7, 0xd0]);231yield Buffer.from([0xb2, 0xd0, 0xb5, 0xd1, 0x81, 0xd1, 0x82, 0xd0, 0xbd, 0xd0, 0xb8]);232yield Buffer.from('"}\n');233yield Buffer.from('\n');234}235
236const stream = _iterSSEMessages(new Response(await iteratorToStream(body())), new AbortController())[237Symbol.asyncIterator238]();239
240let event = await stream.next();241assert(event.value);242expect(event.value.event).toEqual('completion');243expect(JSON.parse(event.value.data)).toEqual({ content: 'известни' });244
245event = await stream.next();246expect(event.done).toBeTruthy();247});248});249
250async function iteratorToStream(iterator: AsyncGenerator<any>): Promise<PassThrough> {251const parts: unknown[] = [];252
253for await (const chunk of iterator) {254parts.push(chunk);255}256
257let index = 0;258
259const stream = new PassThrough({260read() {261const value = parts[index];262if (value === undefined) {263stream.end();264} else {265index += 1;266stream.write(value);267}268},269});270
271return stream;272}
273