lobe-chat

Форк
0
321 строка · 9.3 Кб
1
import Debug from 'debug';
2
import { throttle, uniqBy } from 'lodash-es';
3
import type { WebrtcProvider } from 'y-webrtc';
4
import type { Doc, Transaction } from 'yjs';
5

6
import {
7
  OnAwarenessChange,
8
  OnSyncEvent,
9
  OnSyncStatusChange,
10
  PeerSyncStatus,
11
  StartDataSyncParams,
12
} from '@/types/sync';
13

14
import { LobeDBSchemaMap, browserDB } from './db';
15

16
const LOG_NAME_SPACE = 'DataSync';
17

18
/**
 * Synchronises the local browser database with remote peers.
 *
 * Local table rows are mirrored into one Yjs map per table; the Yjs document
 * is shared with other peers through a y-webrtc provider. Remote changes
 * observed on the Yjs maps are written back into the local database.
 */
class DataSync {
  private _ydoc: Doc | null = null;
  private provider: WebrtcProvider | null = null;

  private syncParams!: StartDataSyncParams;
  private onAwarenessChange!: OnAwarenessChange;

  /** Handle of the interval that polls the signaling connection state. */
  private waitForConnecting: ReturnType<typeof setInterval> | undefined;
  /** Handle of the pending "reconnect in 1s" timeout, if any. */
  private reconnectTimer: ReturnType<typeof setTimeout> | undefined;

  /** Tables that are mirrored between the local database and Yjs. */
  private readonly syncTableKeys: string[] = [
    'sessions',
    'sessionGroups',
    'topics',
    'messages',
    'plugins',
  ];

  logger = Debug(LOG_NAME_SPACE);

  /**
   * Runs `fn` inside a Yjs transaction on the current document.
   * Does nothing when no document has been created yet.
   */
  transact(fn: (transaction: Transaction) => unknown) {
    this._ydoc?.transact(fn);
  }

  /**
   * Returns the Yjs map backing the given table, or `undefined` when no
   * document has been created yet.
   */
  getYMap = (tableKey: keyof LobeDBSchemaMap) => {
    return this._ydoc?.getMap(tableKey);
  };

  /**
   * Entry point: remembers the sync parameters, cleans up a provider left
   * over from a previous run (dev only) and opens a new connection.
   */
  startDataSync = async (params: StartDataSyncParams) => {
    this.syncParams = params;
    this.onAwarenessChange = params.onAwarenessChange;

    // During development, fast refresh keeps the global instance cached in the
    // runtime, so the previous instance has to be cleaned up on every reconnect.
    if (window.__ONLY_USE_FOR_CLEANUP_IN_DEV) {
      await this.cleanConnection(window.__ONLY_USE_FOR_CLEANUP_IN_DEV);
    }

    await this.connect(params);
  };

  /**
   * Creates the Yjs document and the WebRTC provider, then wires up status
   * polling, sync handling and awareness.
   *
   * @param params channel, signaling server, user and status callbacks
   * @returns the newly created WebRTC provider
   */
  connect = async (params: StartDataSyncParams) => {
    const { channel, onSyncEvent, onSyncStatusChange, user, onAwarenessChange, signaling } = params;
    // ====== 1. init yjs doc ====== //

    await this.initYDoc();

    this.logger('[YJS] start to listen sync event...');
    this.initYjsObserve(onSyncEvent, onSyncStatusChange);

    // ====== 2. init webrtc provider ====== //
    this.logger(`[WebRTC] init provider... room: ${channel.name}`);
    const { WebrtcProvider } = await import('y-webrtc');

    // clients connected to the same room-name share document updates
    const provider = new WebrtcProvider(channel.name, this._ydoc!, {
      password: channel.password,
      signaling: [signaling],
    });
    this.provider = provider;

    // when fast refresh in dev, the provider will be cached in window
    // so we need to clean it in destroy
    if (process.env.NODE_ENV === 'development') {
      window.__ONLY_USE_FOR_CLEANUP_IN_DEV = provider;
    }

    this.logger(`[WebRTC] provider init success`);

    // ====== 3. check signaling server connection  ====== //

    // The status event fires once the local device has connected to the
    // WebRTC provider; report that the connection is being established.
    provider.on('status', ({ connected }) => {
      this.logger('[WebRTC] peer status:', connected);
      if (connected) {
        // this.initObserve(onSyncEvent, onSyncStatusChange);
        onSyncStatusChange?.(PeerSyncStatus.Connecting);
      }
    });

    // check the connection with signaling server
    let connectionCheckCount = 0;

    // Never let two polling loops run at once: drop any loop left over from a
    // previous connect() before starting a new one.
    clearInterval(this.waitForConnecting);

    // The callback closes over its own handle and its own provider, so it can
    // never cancel a newer loop or inspect a provider it was not created for.
    const pollTimer = setInterval(() => {
      const signalingConnection: IWebsocketClient = provider.signalingConns[0];

      if (signalingConnection.connected) {
        onSyncStatusChange?.(PeerSyncStatus.Ready);
        clearInterval(pollTimer);
        return;
      }

      connectionCheckCount += 1;

      // give up once more than 5 checks have failed
      if (connectionCheckCount > 5) {
        onSyncStatusChange?.(PeerSyncStatus.Unconnected);
        clearInterval(pollTimer);
      }
    }, 2000);
    this.waitForConnecting = pollTimer;

    // ====== 4. handle data sync  ====== //

    // The synced event fires when all parties have finished syncing and the
    // data held by the YJS objects is consistent.
    provider.on('synced', async ({ synced }) => {
      this.logger('[WebRTC] peer sync status:', synced);
      if (synced) {
        this.logger('[WebRTC] start to init yjs data...');
        onSyncStatusChange?.(PeerSyncStatus.Syncing);
        await this.initSync();
        onSyncStatusChange?.(PeerSyncStatus.Synced);
        this.logger('[WebRTC] yjs data init success');
      } else {
        this.logger('[WebRTC] data not sync, try to reconnect in 1s...');
        // await this.reconnect(params);
        // Keep the handle so that cleanConnection() can cancel a pending
        // reconnect (e.g. when disconnect() is called within this second).
        clearTimeout(this.reconnectTimer);
        this.reconnectTimer = setTimeout(() => {
          onSyncStatusChange?.(PeerSyncStatus.Syncing);
          // reconnect() is async; surface failures instead of leaving an
          // unhandled promise rejection behind.
          this.reconnect(params).catch((error: unknown) => {
            this.logger('[WebRTC] reconnect failed:', error);
          });
        }, 1000);
      }
    });

    // ====== 5. handle awareness  ====== //

    this.initAwareness({ onAwarenessChange, user });

    return provider;
  };

  /** Tears down the current connection and connects again with `params`. */
  reconnect = async (params: StartDataSyncParams) => {
    await this.cleanConnection(this.provider);

    await this.connect(params);
  };

  /** Tears down the current connection, including any pending timers. */
  async disconnect() {
    await this.cleanConnection(this.provider);
  }

  /** Creates a fresh Yjs document; a no-op outside the browser. */
  private initYDoc = async () => {
    if (typeof window === 'undefined') return;

    this.logger('[YJS] init YDoc...');
    const { Doc } = await import('yjs');
    this._ydoc = new Doc();
  };

  /** Cancels the signaling poll loop and a pending delayed reconnect. */
  private stopTimers() {
    clearInterval(this.waitForConnecting);
    this.waitForConnecting = undefined;

    clearTimeout(this.reconnectTimer);
    this.reconnectTimer = undefined;
  }

  /**
   * Stops the timers started by connect() and, when a provider is given,
   * destroys its awareness, room, the provider itself and the Yjs document.
   */
  private async cleanConnection(provider: WebrtcProvider | null) {
    // Timers must not outlive the connection they were created for.
    this.stopTimers();

    if (provider) {
      this.logger(`[WebRTC] clean Connection...`);
      this.logger(`[WebRTC] clean awareness...`);
      provider.awareness.destroy();

      this.logger(`[WebRTC] clean room...`);
      provider.room?.disconnect();
      provider.room?.destroy();

      this.logger(`[WebRTC] clean provider...`);
      provider.disconnect();
      provider.destroy();

      this.logger(`[WebRTC] clean yjs doc...`);
      this._ydoc?.destroy();

      this.logger(`[WebRTC] -------------------`);
    }
  }

  /** Loads every synced table from the local database into Yjs, in parallel. */
  private initSync = async () => {
    await Promise.all(
      this.syncTableKeys.map(async (tableKey) =>
        this.loadDataFromDBtoYjs(tableKey as keyof LobeDBSchemaMap),
      ),
    );
  };

  /** Starts observing the Yjs map of every synced table. */
  private initYjsObserve = (onEvent: OnSyncEvent, onSyncStatusChange: OnSyncStatusChange) => {
    this.syncTableKeys.forEach((tableKey) => {
      // listen yjs change
      this.observeYMapChange(tableKey as keyof LobeDBSchemaMap, onEvent, onSyncStatusChange);
    });
  };

  /**
   * Applies remote changes of one Yjs map to the matching local table.
   *
   * `onEvent` is throttled to at most once per second; the status is set to
   * `Syncing` on every remote change and to `Synced` 2s after the last one.
   */
  private observeYMapChange = (
    tableKey: keyof LobeDBSchemaMap,
    onEvent: OnSyncEvent,
    onSyncStatusChange: OnSyncStatusChange,
  ) => {
    const table = browserDB[tableKey];
    const yItemMap = this.getYMap(tableKey);
    const updateSyncEvent = throttle(onEvent, 1000);

    // holds the id of the timer that flips the status back to 'synced'
    let debounceTimer: ReturnType<typeof setTimeout> | undefined;

    yItemMap?.observe(async (event) => {
      // abort local change
      if (event.transaction.local) return;

      // on every change, clear the previous timer (if any) before a new one
      // is scheduled further below
      clearTimeout(debounceTimer);

      onSyncStatusChange(PeerSyncStatus.Syncing);

      this.logger(`[YJS] observe ${tableKey} changes:`, event.keysChanged.size);
      const pools = Array.from(event.keys).map(async ([id, payload]) => {
        const item: any = yItemMap.get(id);

        switch (payload.action) {
          case 'add':
          case 'update': {
            await table.put(item, id);

            break;
          }

          case 'delete': {
            await table.delete(id);
            break;
          }
        }
      });

      await Promise.all(pools);

      updateSyncEvent(tableKey);

      // schedule the status update to 'synced' in 2000ms
      debounceTimer = setTimeout(() => {
        onSyncStatusChange(PeerSyncStatus.Synced);
      }, 2000);
    });
  };

  /**
   * Copies every row of a local table into its Yjs map, 50 rows per
   * transaction.
   */
  private loadDataFromDBtoYjs = async (tableKey: keyof LobeDBSchemaMap) => {
    const table = browserDB[tableKey];
    const items = await table.toArray();
    const yItemMap = this.getYMap(tableKey);

    // maximum number of items per batch
    const batchSize = 50;

    // total number of batches
    const totalBatches = Math.ceil(items.length / batchSize);

    for (let i = 0; i < totalBatches; i++) {
      // start and end index of the current batch
      const start = i * batchSize;
      const end = start + batchSize;

      // items of the current batch
      const batchItems = items.slice(start, end);

      // push the current batch into Yjs
      this._ydoc?.transact(() => {
        batchItems.forEach((item) => {
          yItemMap!.set(item.id, item);
        });
      });
    }

    this.logger('[DB]:', tableKey, yItemMap?.size);
  };

  /**
   * Publishes the local user through the provider's awareness, reports the
   * local user to the UI and keeps the UI updated on awareness changes.
   */
  private initAwareness = ({ user }: Pick<StartDataSyncParams, 'user' | 'onAwarenessChange'>) => {
    if (!this.provider) return;

    const awareness = this.provider.awareness;

    awareness.setLocalState({ clientID: awareness.clientID, user });
    this.onAwarenessChange?.([{ ...user, clientID: awareness.clientID, current: true }]);

    awareness.on('change', () => this.syncAwarenessToUI());
  };

  /**
   * Reports all known awareness states to the UI, de-duplicated by `id` and
   * with the local client flagged as `current`.
   */
  private syncAwarenessToUI = async () => {
    const awareness = this.provider?.awareness;

    if (!awareness) return;

    const state = Array.from(awareness.getStates().values()).map((s) => ({
      ...s.user,
      clientID: s.clientID,
      current: s.clientID === awareness.clientID,
    }));

    this.onAwarenessChange?.(uniqBy(state, 'id'));
  };
}
299

300
/** Module-level `DataSync` instance created once when this module is loaded. */
export const dataSync = new DataSync();
301

302
/**
 * Structural description of a websocket client object.
 *
 * Used in this file to type the entries of the provider's `signalingConns`
 * list, whose `connected` flag is polled while connecting.
 */
interface IWebsocketClient {
  // ----- connection target and underlying socket -----
  url: string;
  ws: WebSocket;
  binaryType: null | 'blob' | 'arraybuffer';

  // ----- connection state -----
  connected: boolean;
  connecting: boolean;
  shouldConnect: boolean;
  unsuccessfulReconnects: number;
  lastMessageReceived: number;

  // ----- operations -----
  connect(): void;
  disconnect(): void;
  destroy(): void;
  send(message: any): void;
}
316

317
declare global {
  interface Window {
    /**
     * Development-only slot that holds the most recently created WebRTC
     * provider, so that it can be cleaned up when the module instance is
     * recreated by fast refresh.
     */
    __ONLY_USE_FOR_CLEANUP_IN_DEV?: null | WebrtcProvider;
  }
}
322

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.