lobe-chat

Форк
0
/
fetchSSE.ts 
395 строк · 11.4 Кб
1
import { MESSAGE_CANCEL_FLAT } from '@/const/message';
2
import { LOBE_CHAT_OBSERVATION_ID, LOBE_CHAT_TRACE_ID } from '@/const/trace';
3
import { ChatErrorType } from '@/types/fetch';
4
import { SmoothingParams } from '@/types/llm';
5
import {
6
  ChatMessageError,
7
  MessageToolCall,
8
  MessageToolCallChunk,
9
  MessageToolCallSchema,
10
} from '@/types/message';
11

12
import { fetchEventSource } from './fetchEventSource';
13
import { getMessageError } from './parseError';
14
import { parseToolCalls } from './parseToolCalls';
15

16
/**
 * How a streamed request ended: `'done'` is the initial/default outcome,
 * `'error'` is set when an error is reported, `'abort'` when the request is cancelled.
 */
type SSEFinishType = 'done' | 'error' | 'abort';
17

18
/**
 * Callback invoked by `fetchSSE` after a successful (`response.ok`) response has been
 * consumed and any remaining smoothing animation has been flushed.
 *
 * @param text - The text accumulated from the stream.
 * @param context - Additional data collected while streaming.
 */
export type OnFinishHandler = (
  text: string,
  context: {
    /** Value of the `LOBE_CHAT_OBSERVATION_ID` response header; `null` when the header is absent. */
    observationId?: string | null;
    /** Tool calls merged from the received `tool_calls` events; `undefined` when none arrived. */
    toolCalls?: MessageToolCall[];
    /** Value of the `LOBE_CHAT_TRACE_ID` response header; `null` when the header is absent. */
    traceId?: string | null;
    /** How the stream ended. */
    type?: SSEFinishType;
  },
) => Promise<void>;
27

28
/**
 * Text payload handed to `onMessageHandle`.
 */
export interface MessageTextChunk {
  /**
   * The newly available text: the animated delta when text smoothing is on,
   * otherwise the raw event payload (or the whole response body when no SSE event was received).
   */
  text: string;
  /** Discriminator for the chunk union. */
  type: 'text';
}
32

33
/**
 * Tool-call payload handed to `onMessageHandle`.
 */
interface MessageToolCallsChunk {
  /**
   * Per-tool-call animation flags, indexed like `tool_calls`.
   * Only provided when tool-call smoothing is enabled.
   */
  isAnimationActives?: boolean[];
  /** The tool calls as known so far (not just the latest delta). */
  tool_calls: MessageToolCall[];
  /** Discriminator for the chunk union. */
  type: 'tool_calls';
}
38

39
/**
 * Extra options accepted by `fetchSSE` on top of the standard `RequestInit` fields.
 */
export interface FetchSSEOptions {
  /** Custom `fetch` implementation forwarded to `fetchEventSource`. */
  fetcher?: typeof fetch;
  /** Called with the text produced so far when the request is cancelled. */
  onAbort?: (text: string) => Promise<void>;
  /** Called for request errors, `error` events and unparsable stream chunks. */
  onErrorHandle?: (error: ChatMessageError) => void;
  /** Called once a successful response has been fully processed. */
  onFinish?: OnFinishHandler;
  /** Called for every text or tool-call update. */
  onMessageHandle?: (chunk: MessageTextChunk | MessageToolCallsChunk) => void;
  /**
   * Output smoothing. A boolean toggles smoothing for both text and tool calls;
   * an object configures them individually (`text`, `toolsCalling` — enabled unless
   * explicitly disabled) and may set the animation `speed`.
   */
  smoothing?: SmoothingParams | boolean;
}
47

48
/** Default number of queued characters emitted per animation frame while the stream is still running. */
const START_ANIMATION_SPEED = 4;
49

50
/** Characters emitted per animation frame when flushing what is still queued after the stream has ended. */
const END_ANIMATION_SPEED = 15;
51

52
/**
 * Create a controller that reveals streamed text gradually, a few characters per
 * animation frame, instead of in the bursts in which it arrives.
 *
 * @param params.onTextUpdate - Called on every frame that emits text with the newly
 *   revealed `delta` and the full `text` revealed so far.
 * @param params.startSpeed - Characters emitted per frame when `startAnimation` is
 *   called without an explicit speed. Defaults to `START_ANIMATION_SPEED`.
 * @returns The controller: `pushToQueue`, `startAnimation`, `stopAnimation`,
 *   `isTokenRemain` and the live `isAnimationActive` flag.
 */
const createSmoothMessage = (params: {
  onTextUpdate: (delta: string, text: string) => void;
  startSpeed?: number;
}) => {
  const { startSpeed = START_ANIMATION_SPEED } = params;

  // text that has already been revealed
  let buffer = '';
  // why use queue: https://shareg.pt/GLBrjpK
  const outputQueue: string[] = [];
  let isAnimationActive = false;
  let animationFrameId: number | null = null;
  // resolver of the promise returned by the currently running `startAnimation`
  let resolvePending: (() => void) | null = null;

  // reset the animation state and settle the promise of the running animation, if any
  const finishAnimation = () => {
    isAnimationActive = false;
    animationFrameId = null;

    const resolve = resolvePending;
    resolvePending = null;
    resolve?.();
  };

  // when you need to stop the animation, call this function
  const stopAnimation = () => {
    if (animationFrameId !== null) cancelAnimationFrame(animationFrameId);

    // the cancelled frame will never run, so the pending promise has to be settled here
    finishAnimation();
  };

  // reveal the queued text smoothly, `speed` characters per frame;
  // the returned promise resolves once the queue is drained or the animation is stopped
  const startAnimation = (speed = startSpeed) =>
    new Promise<void>((resolve) => {
      // an animation is already running and will pick up the queued text
      if (isAnimationActive) {
        resolve();
        return;
      }

      isAnimationActive = true;
      resolvePending = resolve;

      const updateText = () => {
        // the animation is no longer active: stop updating the text
        if (!isAnimationActive) {
          animationFrameId = null;
          resolve();
          return;
        }

        // everything has been displayed: clear the animation state
        if (outputQueue.length === 0) {
          finishAnimation();
          return;
        }

        // take the next `speed` characters from the queue (or fewer if not available)
        const charsToAdd = outputQueue.splice(0, speed).join('');
        buffer += charsToAdd;
        params.onTextUpdate(charsToAdd, buffer);

        animationFrameId = requestAnimationFrame(updateText);
      };

      animationFrameId = requestAnimationFrame(updateText);
    });

  const pushToQueue = (text: string) => {
    // iterate by code point so surrogate pairs (e.g. emoji) are never split across frames;
    // a plain loop also avoids spreading a huge array into `push` arguments
    for (const char of text) {
      outputQueue.push(char);
    }
  };

  return {
    // exposed as a getter so callers observe the current state, not a snapshot
    get isAnimationActive() {
      return isAnimationActive;
    },
    isTokenRemain: () => outputQueue.length > 0,
    pushToQueue,
    startAnimation,
    stopAnimation,
  };
};
128

129
/**
 * Create a controller that reveals the streamed `arguments` of each tool call gradually,
 * a few characters per animation frame. Every tool call (addressed by its chunk `index`)
 * has its own queue and its own animation.
 *
 * @param params.onToolCallsUpdate - Called on every frame that emits text with the
 *   tool-call buffer and a copy of the per-index animation flags.
 * @param params.startSpeed - Characters emitted per frame when no explicit speed is
 *   given. Defaults to `START_ANIMATION_SPEED`.
 * @returns The controller: `pushToQueue`, `startAnimations`, `stopAnimations`,
 *   `isTokenRemain` and the live `isAnimationActives` array.
 */
const createSmoothToolCalls = (params: {
  onToolCallsUpdate: (toolCalls: MessageToolCall[], isAnimationActives: boolean[]) => void;
  startSpeed?: number;
}) => {
  const { startSpeed = START_ANIMATION_SPEED } = params;
  const toolCallsBuffer: MessageToolCall[] = [];

  // one output queue and one animation state per tool call
  const outputQueues: string[][] = [];
  const isAnimationActives: boolean[] = [];
  const animationFrameIds: (number | null)[] = [];
  // resolver of the promise returned by the running `startAnimation` of each index
  const pendingResolvers: ((() => void) | null)[] = [];

  // reset the animation state of one tool call and settle its running animation, if any
  const finishAnimation = (index: number) => {
    isAnimationActives[index] = false;
    animationFrameIds[index] = null;

    const resolve = pendingResolvers[index];
    pendingResolvers[index] = null;
    resolve?.();
  };

  const stopAnimation = (index: number) => {
    const frameId = animationFrameIds[index];
    if (typeof frameId === 'number') cancelAnimationFrame(frameId);

    // the cancelled frame will never run, so the pending promise has to be settled here
    finishAnimation(index);
  };

  // animate a single tool call; resolves once its queue is drained or it is stopped
  const startAnimation = (index: number, speed = startSpeed) =>
    new Promise<void>((resolve) => {
      if (isAnimationActives[index]) {
        resolve();
        return;
      }

      isAnimationActives[index] = true;
      pendingResolvers[index] = resolve;

      const updateToolCall = () => {
        if (!isAnimationActives[index]) {
          resolve();
          return;
        }

        const queue = outputQueues[index];

        if (queue.length === 0) {
          finishAnimation(index);
          return;
        }

        const charsToAdd = queue.splice(0, speed).join('');
        const toolCallToUpdate = toolCallsBuffer[index];

        if (toolCallToUpdate) {
          toolCallToUpdate.function.arguments += charsToAdd;

          // trigger the ui update
          params.onToolCallsUpdate(toolCallsBuffer, [...isAnimationActives]);
        }

        animationFrameIds[index] = requestAnimationFrame(updateToolCall);
      };

      animationFrameIds[index] = requestAnimationFrame(updateToolCall);
    });

  const pushToQueue = (toolCallChunks: MessageToolCallChunk[]) => {
    toolCallChunks.forEach((chunk) => {
      // init the tool call buffer and output queue
      if (!toolCallsBuffer[chunk.index]) {
        toolCallsBuffer[chunk.index] = MessageToolCallSchema.parse(chunk);
      }

      if (!outputQueues[chunk.index]) {
        outputQueues[chunk.index] = [];
        isAnimationActives[chunk.index] = false;
        animationFrameIds[chunk.index] = null;
      }

      // iterate by code point so surrogate pairs (e.g. emoji) are never split across frames;
      // a plain loop also avoids spreading a huge array into `push` arguments
      const queue = outputQueues[chunk.index];
      for (const char of chunk.function?.arguments ?? '') {
        queue.push(char);
      }
    });
  };

  // start every idle tool call that still has queued text and wait for all of them
  const startAnimations = async (speed = startSpeed) => {
    const pools = toolCallsBuffer.map(async (_, index) => {
      if (outputQueues[index].length > 0 && !isAnimationActives[index]) {
        await startAnimation(index, speed);
      }
    });

    await Promise.all(pools);
  };

  const stopAnimations = () => {
    toolCallsBuffer.forEach((_, index) => {
      stopAnimation(index);
    });
  };

  return {
    isAnimationActives,
    isTokenRemain: () => outputQueues.some((token) => token.length > 0),
    pushToQueue,
    startAnimations,
    stopAnimations,
  };
};
228

229
/**
 * Fetch data using stream method
 *
 * Opens the request through `fetchEventSource` and dispatches the received events:
 * `text` events are appended to the output (optionally through the smoothing animation),
 * `tool_calls` events are merged with `parseToolCalls`, and `error` events are forwarded
 * to `onErrorHandle`. When the stream is over and the response is ok, anything still
 * queued for animation is flushed at `END_ANIMATION_SPEED` before `onFinish` is called.
 *
 * @param url - The endpoint to request.
 * @param options - Standard `RequestInit` fields (`body`, `headers`, `method`, `signal`
 *   are forwarded) plus the `FetchSSEOptions` callbacks and smoothing settings.
 * @returns A clone of the response captured when the connection was opened. It is left
 *   unassigned if `fetchEventSource` returns without ever calling `onopen`.
 */
// eslint-disable-next-line no-undef
export const fetchSSE = async (url: string, options: RequestInit & FetchSSEOptions = {}) => {
  let output = '';
  let toolCalls: undefined | MessageToolCall[];
  // whether at least one SSE event has been received
  let triggerOnMessageHandler = false;

  let finishedType: SSEFinishType = 'done';
  let response!: Response;

  const { smoothing } = options;

  // a boolean applies to both kinds of output; an object configures them separately
  const textSmoothing = typeof smoothing === 'boolean' ? smoothing : smoothing?.text;
  const toolsCallingSmoothing =
    typeof smoothing === 'boolean' ? smoothing : (smoothing?.toolsCalling ?? true);
  const smoothingSpeed = typeof smoothing === 'object' ? smoothing.speed : undefined;

  const textController = createSmoothMessage({
    onTextUpdate: (delta, text) => {
      output = text;
      options.onMessageHandle?.({ text: delta, type: 'text' });
    },
    startSpeed: smoothingSpeed,
  });

  const toolCallsController = createSmoothToolCalls({
    onToolCallsUpdate: (toolCalls, isAnimationActives) => {
      options.onMessageHandle?.({ isAnimationActives, tool_calls: toolCalls, type: 'tool_calls' });
    },
    startSpeed: smoothingSpeed,
  });

  await fetchEventSource(url, {
    body: options.body,
    fetch: options?.fetcher,
    headers: options.headers as Record<string, string>,
    method: options.method,
    onerror: (error) => {
      if (error === MESSAGE_CANCEL_FLAT || (error as TypeError).name === 'AbortError') {
        finishedType = 'abort';
        options?.onAbort?.(output);
        // stop every running animation so nothing more is emitted after the abort
        textController.stopAnimation();
        toolCallsController.stopAnimations();
      } else {
        finishedType = 'error';

        // errors that already carry a `type` are forwarded as is, anything else is wrapped
        options.onErrorHandle?.(
          error.type
            ? error
            : {
                body: {
                  message: error.message,
                  name: error.name,
                  stack: error.stack,
                },
                message: error.message,
                type: ChatErrorType.UnknownChatFetchError,
              },
        );
      }
    },
    onmessage: (ev) => {
      triggerOnMessageHandler = true;
      let data;
      try {
        data = JSON.parse(ev.data);
      } catch (e) {
        console.warn('parse error:', e);
        options.onErrorHandle?.({
          body: {
            context: {
              chunk: ev.data,
              error: { message: (e as Error).message, name: (e as Error).name },
            },
            message:
              'chat response streaming chunk parse error, please contact your API Provider to fix it.',
          },
          message: 'parse error',
          type: 'StreamChunkError',
        });

        return;
      }

      switch (ev.event) {
        case 'error': {
          finishedType = 'error';
          options.onErrorHandle?.(data);
          break;
        }

        case 'text': {
          if (textSmoothing) {
            textController.pushToQueue(data);

            if (!textController.isAnimationActive) textController.startAnimation();
          } else {
            output += data;
            options.onMessageHandle?.({ text: data, type: 'text' });
          }

          break;
        }

        case 'tool_calls': {
          // keep the merged, final tool calls up to date;
          // if there is no tool calls yet, initialize them first
          if (!toolCalls) toolCalls = [];
          toolCalls = parseToolCalls(toolCalls, data);

          if (toolsCallingSmoothing) {
            // push the tool calls to the smooth queue
            toolCallsController.pushToQueue(data);
            // if any tool call is not being animated, start the animations
            if (toolCallsController.isAnimationActives.some((value) => !value)) {
              toolCallsController.startAnimations();
            }
          } else {
            options.onMessageHandle?.({ tool_calls: toolCalls, type: 'tool_calls' });
          }

          break;
        }
      }
    },
    onopen: async (res) => {
      response = res.clone();
      // a non-ok response means the request itself failed
      if (!response.ok) {
        throw await getMessageError(res);
      }
    },
    signal: options.signal,
  });

  // `response` is only assigned once the connection has been opened,
  // so without it there is nothing to finish
  if (response) {
    textController.stopAnimation();
    toolCallsController.stopAnimations();

    if (response.ok) {
      // no SSE event was received: treat the whole body as plain text
      if (!triggerOnMessageHandler) {
        output = await response.clone().text();
        options.onMessageHandle?.({ text: output, type: 'text' });
      }

      const traceId = response.headers.get(LOBE_CHAT_TRACE_ID);
      const observationId = response.headers.get(LOBE_CHAT_OBSERVATION_ID);

      // flush whatever is still queued, at the faster end-of-stream speed
      if (textController.isTokenRemain()) {
        await textController.startAnimation(END_ANIMATION_SPEED);
      }

      if (toolCallsController.isTokenRemain()) {
        await toolCallsController.startAnimations(END_ANIMATION_SPEED);
      }

      await options?.onFinish?.(output, { observationId, toolCalls, traceId, type: finishedType });
    }
  }

  return response;
};
396

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.