1
import { MESSAGE_CANCEL_FLAT } from '@/const/message';
2
import { LOBE_CHAT_OBSERVATION_ID, LOBE_CHAT_TRACE_ID } from '@/const/trace';
3
import { ChatErrorType } from '@/types/fetch';
4
import { SmoothingParams } from '@/types/llm';
10
} from '@/types/message';
12
import { fetchEventSource } from './fetchEventSource';
13
import { getMessageError } from './parseError';
14
import { parseToolCalls } from './parseToolCalls';
16
/**
 * How a streamed request ended. `fetchSSE` starts from 'done', switches to
 * 'abort' or 'error' inside its error handling, and reports the final value
 * to `onFinish` as `type`.
 */
type SSEFinishType = 'done' | 'error' | 'abort';
18
/**
 * Callback run once a streamed request has finished. The visible call site in
 * `fetchSSE` passes the output text followed by a context object carrying the
 * fields below plus a `type` field holding the finish type.
 * NOTE(review): the remaining lines of this declaration are not visible in
 * this view, so the full signature should be confirmed against the real file.
 */
export type OnFinishHandler = (
21
// Read from the LOBE_CHAT_OBSERVATION_ID response header; null when the header is absent.
observationId?: string | null;
22
// Tool calls accumulated while streaming; undefined when no tool-call data was seen.
toolCalls?: MessageToolCall[];
23
// Read from the LOBE_CHAT_TRACE_ID response header; null when the header is absent.
traceId?: string | null;
28
/**
 * Text chunk handed to `onMessageHandle`. Call sites in this file build it as
 * `{ text, type: 'text' }`.
 * NOTE(review): the member declarations are not visible in this view.
 */
export interface MessageTextChunk {
33
/**
 * Tool-call chunk handed to `onMessageHandle`. Call sites in this file also
 * set `type: 'tool_calls'` on it (that member's declaration is not visible in
 * this view).
 */
interface MessageToolCallsChunk {
34
// Per-tool-call animation flags; only supplied on the smoothed delivery path.
isAnimationActives?: boolean[];
35
// The tool calls as known at the time the chunk is emitted.
tool_calls: MessageToolCall[];
39
/**
 * Extra options understood by `fetchSSE` on top of the standard `RequestInit`.
 */
export interface FetchSSEOptions {
40
// Custom fetch implementation; forwarded to fetchEventSource as its `fetch` option.
fetcher?: typeof fetch;
41
// Called with the output text when the request fails with the cancel flag or an AbortError.
onAbort?: (text: string) => Promise<void>;
42
// Called for request errors that are not aborts, for chunks whose JSON cannot
// be parsed, and with the chunk data on a further error path inside the
// message handler.
onErrorHandle?: (error: ChatMessageError) => void;
43
// Called once at the end of fetchSSE with the output text and finish context.
onFinish?: OnFinishHandler;
44
// Receives every text or tool-call chunk, either smoothed or as it arrives.
onMessageHandle?: (chunk: MessageTextChunk | MessageToolCallsChunk) => void;
45
// `true`/`false` toggles smoothing for both text and tool calls. The object
// form is read for `text`, `toolsCalling` (treated as true when omitted) and
// `speed` (used as the starting animation speed).
smoothing?: SmoothingParams | boolean;
48
// Default number of queued characters released per animation frame while a
// response is still streaming (used as the default `startSpeed`).
const START_ANIMATION_SPEED = 4;
50
// Characters released per animation frame when flushing whatever is still
// queued after the stream has ended.
const END_ANIMATION_SPEED = 15;
52
/**
 * Builds a controller that releases queued text a few characters per animation
 * frame and reports each increment through `params.onTextUpdate`.
 *
 * NOTE(review): several lines of this function (closing braces, the
 * declaration of `buffer`, branch bodies, the returned object) are not visible
 * in this view; the comments below describe only what the visible lines show.
 */
const createSmoothMessage = (params: {
53
// Receives the characters released this frame and the `buffer` value.
onTextUpdate: (delta: string, text: string) => void;
56
// Characters per frame; falls back to START_ANIMATION_SPEED when not provided.
const { startSpeed = START_ANIMATION_SPEED } = params;
60
// Characters waiting to be released, one array element per UTF-16 code unit.
let outputQueue: string[] = [];
61
let isAnimationActive = false;
62
// Handle of the pending requestAnimationFrame callback, or null when none is scheduled.
let animationFrameId: number | null = null;
65
// Deactivates the animation and cancels the pending frame, if any.
const stopAnimation = () => {
66
isAnimationActive = false;
67
if (animationFrameId !== null) {
68
cancelAnimationFrame(animationFrameId);
69
animationFrameId = null;
75
// Starts draining the queue at `speed` characters per frame; returns a promise
// (its resolution points are on lines not visible here).
const startAnimation = (speed = startSpeed) =>
76
new Promise<void>((resolve) => {
77
// Guard for an animation that is already running (branch body not visible here).
if (isAnimationActive) {
82
isAnimationActive = true;
84
// Per-frame step.
const updateText = () => {
86
// The animation was stopped from outside: drop the pending frame.
if (!isAnimationActive) {
87
cancelAnimationFrame(animationFrameId!);
88
animationFrameId = null;
95
if (outputQueue.length > 0) {
97
// Dequeue up to `speed` characters for this frame.
const charsToAdd = outputQueue.splice(0, speed).join('');
101
// `buffer` is declared/updated on lines not visible here — presumably the text released so far; TODO confirm.
params.onTextUpdate(charsToAdd, buffer);
104
// Presumably the queue-drained branch (enclosing lines not visible): mark the animation finished.
isAnimationActive = false;
105
animationFrameId = null;
110
animationFrameId = requestAnimationFrame(updateText);
113
animationFrameId = requestAnimationFrame(updateText);
116
// Appends text to the queue.
const pushToQueue = (text: string) => {
117
// NOTE(review): split('') cuts by UTF-16 code unit, so a surrogate pair
// (e.g. an emoji) can be divided between two frames, and a delta may then
// contain a lone surrogate.
outputQueue.push(...text.split(''));
122
// True while characters are still waiting in the queue.
isTokenRemain: () => outputQueue.length > 0,
129
/**
 * Builds a controller that releases queued tool-call `arguments` text a few
 * characters per animation frame, with one independent queue per tool-call
 * index, and reports progress through `params.onToolCallsUpdate`.
 *
 * NOTE(review): several lines of this function (closing braces, branch bodies,
 * the returned object) are not visible in this view; the comments below
 * describe only what the visible lines show.
 */
const createSmoothToolCalls = (params: {
130
// Receives the tool-call buffer and a copy of the per-index animation flags.
onToolCallsUpdate: (toolCalls: MessageToolCall[], isAnimationActives: boolean[]) => void;
133
// Characters per frame; falls back to START_ANIMATION_SPEED when not provided.
const { startSpeed = START_ANIMATION_SPEED } = params;
134
// Tool calls as released so far, indexed by the chunk's `index`.
let toolCallsBuffer: MessageToolCall[] = [];
138
// The three arrays below are parallel to toolCallsBuffer (same index).
const outputQueues: string[][] = [];
139
const isAnimationActives: boolean[] = [];
140
const animationFrameIds: (number | null)[] = [];
142
// Deactivates the animation for one tool call and cancels its pending frame, if any.
const stopAnimation = (index: number) => {
143
isAnimationActives[index] = false;
144
if (animationFrameIds[index] !== null) {
145
cancelAnimationFrame(animationFrameIds[index]!);
146
animationFrameIds[index] = null;
150
// Starts draining the queue of one tool call at `speed` characters per frame.
const startAnimation = (index: number, speed = startSpeed) =>
151
new Promise<void>((resolve) => {
152
// Guard for an animation that is already running (branch body not visible here).
if (isAnimationActives[index]) {
157
isAnimationActives[index] = true;
159
// Per-frame step for this tool call.
const updateToolCall = () => {
160
if (!isAnimationActives[index]) {
165
if (outputQueues[index].length > 0) {
166
// Dequeue up to `speed` characters for this frame.
const charsToAdd = outputQueues[index].splice(0, speed).join('');
168
const toolCallToUpdate = toolCallsBuffer[index];
170
if (toolCallToUpdate) {
171
// Mutates the buffered tool call in place.
toolCallToUpdate.function.arguments += charsToAdd;
174
// The flags are copied so the callback cannot mutate internal state through them.
params.onToolCallsUpdate(toolCallsBuffer, [...isAnimationActives]);
177
animationFrameIds[index] = requestAnimationFrame(() => updateToolCall());
179
// Presumably the queue-drained branch (enclosing lines not visible): mark this animation finished.
isAnimationActives[index] = false;
180
animationFrameIds[index] = null;
185
animationFrameIds[index] = requestAnimationFrame(() => updateToolCall());
188
// Registers incoming chunks and queues their argument text.
const pushToQueue = (toolCallChunks: MessageToolCallChunk[]) => {
189
toolCallChunks.forEach((chunk) => {
191
// First chunk seen for this index: create the buffered tool call from it.
if (!toolCallsBuffer[chunk.index]) {
192
toolCallsBuffer[chunk.index] = MessageToolCallSchema.parse(chunk);
195
// First chunk seen for this index: initialise its queue and animation state.
if (!outputQueues[chunk.index]) {
196
outputQueues[chunk.index] = [];
197
isAnimationActives[chunk.index] = false;
198
animationFrameIds[chunk.index] = null;
201
// Missing arguments are treated as an empty string; text is queued per UTF-16 code unit.
outputQueues[chunk.index].push(...(chunk.function?.arguments || '').split(''));
205
// Starts every tool-call animation that has queued text and is not already
// running, and waits for all of the started ones.
const startAnimations = async (speed = startSpeed) => {
206
const pools = toolCallsBuffer.map(async (_, index) => {
207
if (outputQueues[index].length > 0 && !isAnimationActives[index]) {
208
await startAnimation(index, speed);
212
await Promise.all(pools);
214
// Stops the animation of every buffered tool call.
const stopAnimations = () => {
215
toolCallsBuffer.forEach((_, index) => {
216
stopAnimation(index);
222
// True while any tool call still has queued characters.
isTokenRemain: () => outputQueues.some((token) => token.length > 0),
233
/**
 * Performs a streaming request through `fetchEventSource` and relays text and
 * tool-call chunks to the callbacks in `options`, optionally smoothing their
 * delivery via the two animation controllers. Calls `options.onFinish` at the
 * end with the output text and finish context.
 *
 * NOTE(review): many lines of this function (the declaration of `output`,
 * branch structure, closing braces) are not visible in this view; the comments
 * below describe only what the visible lines show.
 *
 * @param url - Endpoint passed to `fetchEventSource`.
 * @param options - Standard `RequestInit` fields plus `FetchSSEOptions`.
 */
export const fetchSSE = async (url: string, options: RequestInit & FetchSSEOptions = {}) => {
235
let toolCalls: undefined | MessageToolCall[];
236
// Set once the message handler has run; when it never runs, the raw response
// body is used as the output near the end of this function.
let triggerOnMessageHandler = false;
238
let finishedType: SSEFinishType = 'done';
239
// Assigned in `onopen`.
// NOTE(review): if `onopen` never runs, later reads of `response` would hit
// undefined — confirm how fetchEventSource behaves on connection failure.
let response!: Response;
241
const { smoothing } = options;
243
// A boolean `smoothing` applies to both kinds; the object form is read per kind.
const textSmoothing = typeof smoothing === 'boolean' ? smoothing : smoothing?.text;
244
const toolsCallingSmoothing =
245
// Tool-call smoothing is on unless explicitly disabled.
typeof smoothing === 'boolean' ? smoothing : (smoothing?.toolsCalling ?? true);
246
const smoothingSpeed = typeof smoothing === 'object' ? smoothing.speed : undefined;
248
const textController = createSmoothMessage({
249
onTextUpdate: (delta, text) => {
251
// Only the newly released delta is forwarded to the consumer.
options.onMessageHandle?.({ text: delta, type: 'text' });
253
startSpeed: smoothingSpeed,
256
const toolCallsController = createSmoothToolCalls({
257
onToolCallsUpdate: (toolCalls, isAnimationActives) => {
258
options.onMessageHandle?.({ isAnimationActives, tool_calls: toolCalls, type: 'tool_calls' });
260
startSpeed: smoothingSpeed,
263
await fetchEventSource(url, {
265
fetch: options?.fetcher,
266
headers: options.headers as Record<string, string>,
267
method: options.method,
268
onerror: (error) => {
269
// The cancel flag and AbortError are both treated as a user abort.
if (error === MESSAGE_CANCEL_FLAT || (error as TypeError).name === 'AbortError') {
270
finishedType = 'abort';
271
// NOTE(review): onAbort returns a Promise that is not awaited here, so a
// rejection would go unhandled — confirm this is intended.
options?.onAbort?.(output);
272
textController.stopAnimation();
274
finishedType = 'error';
276
options.onErrorHandle?.(
281
message: error.message,
285
message: error.message,
286
type: ChatErrorType.UnknownChatFetchError,
293
triggerOnMessageHandler = true;
296
data = JSON.parse(ev.data);
298
// A chunk that is not valid JSON is logged and reported as a StreamChunkError.
console.warn('parse error:', e);
299
options.onErrorHandle?.({
303
error: { message: (e as Error).message, name: (e as Error).name },
306
'chat response streaming chunk parse error, please contact your API Provider to fix it.',
308
message: 'parse error',
309
type: 'StreamChunkError',
317
finishedType = 'error';
318
options.onErrorHandle?.(data);
324
// Smoothed text path (the enclosing condition is not visible here): queue the
// text and let the animation release it.
textController.pushToQueue(data);
326
// NOTE(review): the controller's returned object is not visible here; if
// `isAnimationActive` is exposed as a plain value rather than a getter, this
// read is a stale snapshot — confirm.
if (!textController.isAnimationActive) textController.startAnimation();
329
// Unsmoothed text path (presumably): forward the chunk as received.
options.onMessageHandle?.({ text: data, type: 'text' });
338
if (!toolCalls) toolCalls = [];
339
// Merge the incoming chunk data into the accumulated tool calls.
toolCalls = parseToolCalls(toolCalls, data);
341
if (toolsCallingSmoothing) {
345
toolCallsController.pushToQueue(data);
347
// Start animations whenever at least one tool call is not currently animating.
if (toolCallsController.isAnimationActives.some((value) => !value)) {
348
toolCallsController.startAnimations();
351
options.onMessageHandle?.({ tool_calls: toolCalls, type: 'tool_calls' });
356
onopen: async (res) => {
357
// Keep a clone so headers and body can still be read after streaming.
response = res.clone();
360
throw await getMessageError(res);
363
signal: options.signal,
369
// Streaming is over: halt any in-flight animations before the final flush below.
textController.stopAnimation();
370
toolCallsController.stopAnimations();
374
// The message handler never ran: fall back to the raw response body as the output.
if (!triggerOnMessageHandler) {
375
output = await response.clone().text();
376
options.onMessageHandle?.({ text: output, type: 'text' });
379
const traceId = response.headers.get(LOBE_CHAT_TRACE_ID);
380
const observationId = response.headers.get(LOBE_CHAT_OBSERVATION_ID);
382
// Flush whatever is still queued at the faster end speed before finishing.
if (textController.isTokenRemain()) {
383
await textController.startAnimation(END_ANIMATION_SPEED);
386
if (toolCallsController.isTokenRemain()) {
387
await toolCallsController.startAnimations(END_ANIMATION_SPEED);
390
await options?.onFinish?.(output, { observationId, toolCalls, traceId, type: finishedType });