1
import Debug from 'debug';
2
import { throttle, uniqBy } from 'lodash-es';
3
import type { WebrtcProvider } from 'y-webrtc';
4
import type { Doc, Transaction } from 'yjs';
14
import { LobeDBSchemaMap, browserDB } from './db';
16
// Namespace string handed to `Debug(...)` to create this module's logger.
const LOG_NAME_SPACE = 'DataSync';
19
// Yjs document backing the sync; stays null until `initYDoc` assigns a new `Doc`.
private _ydoc: Doc | null = null;
20
// WebRTC provider for the current room; assigned in `connect`.
private provider: WebrtcProvider | null = null;
22
// Parameters of the most recent `startDataSync` call.
private syncParams!: StartDataSyncParams;
23
// Awareness callback taken from the `startDataSync` params.
private onAwarenessChange!: OnAwarenessChange;
25
// Handle returned by the `setInterval` that polls the signaling connection in `connect`.
private waitForConnecting: any;
27
// Debug logger bound to the `LOG_NAME_SPACE` namespace.
logger = Debug(LOG_NAME_SPACE);
29
/**
 * Runs `fn` inside a transaction on the current Yjs document.
 * Does nothing when no document has been created yet (optional chaining on `_ydoc`).
 *
 * @param fn - Callback forwarded unchanged to `Doc.transact`.
 */
transact(fn: (transaction: Transaction) => unknown) {
30
this._ydoc?.transact(fn);
33
/**
 * Looks up the Y.Map named after a database table on the current Yjs document.
 *
 * @param tableKey - Table name, used verbatim as the map name.
 * @returns The map, or `undefined` when no document exists yet.
 */
getYMap = (tableKey: keyof LobeDBSchemaMap) => {
34
return this._ydoc?.getMap(tableKey);
37
/**
 * Entry point for starting a sync session: remembers the params and awareness callback,
 * cleans up a provider left on `window` by a previous run, then connects.
 *
 * @param params - Sync options; stored on the instance and forwarded to `connect`.
 */
startDataSync = async (params: StartDataSyncParams) => {
38
this.syncParams = params;
39
this.onAwarenessChange = params.onAwarenessChange;
41
// In development, fast refresh keeps the global instance cached in the runtime,
42
// so the previous instance has to be cleaned up on every reconnect.
43
if (window.__ONLY_USE_FOR_CLEANUP_IN_DEV) {
44
await this.cleanConnection(window.__ONLY_USE_FOR_CLEANUP_IN_DEV);
47
await this.connect(params);
50
/**
 * Creates a fresh Yjs document and a WebRTC provider for `params.channel`, registers the
 * provider's `status` and `synced` handlers, and initialises awareness.
 *
 * NOTE(review): this view of the file is missing lines (closing braces and some branch
 * headers), so the exact nesting of the statements below should be confirmed against the
 * full file.
 *
 * @param params - Sync options; `channel`, `onSyncEvent`, `onSyncStatusChange`, `user`,
 *   `onAwarenessChange` and `signaling` are read from it.
 * @returns The provider created for this connection.
 */
connect = async (params: StartDataSyncParams) => {
51
const { channel, onSyncEvent, onSyncStatusChange, user, onAwarenessChange, signaling } = params;
52
// ====== 1. init yjs doc ====== //
54
await this.initYDoc();
56
this.logger('[YJS] start to listen sync event...');
57
this.initYjsObserve(onSyncEvent, onSyncStatusChange);
59
// ====== 2. init webrtc provider ====== //
60
this.logger(`[WebRTC] init provider... room: ${channel.name}`);
61
// y-webrtc is loaded with a dynamic import rather than a static one.
const { WebrtcProvider } = await import('y-webrtc');
63
// clients connected to the same room-name share document updates
64
this.provider = new WebrtcProvider(channel.name, this._ydoc!, {
65
password: channel.password,
66
signaling: [signaling],
69
// during fast refresh in dev, the provider stays cached on window,
70
// so it is stored there to be cleaned up (destroyed) on the next start
71
if (process.env.NODE_ENV === 'development') {
72
window.__ONLY_USE_FOR_CLEANUP_IN_DEV = this.provider;
75
this.logger(`[WebRTC] provider init success`);
77
// ====== 3. check signaling server connection ====== //
79
// The status event fires once the local device has connected to the WebRTC provider.
81
this.provider.on('status', async ({ connected }) => {
82
this.logger('[WebRTC] peer status:', connected);
84
// this.initObserve(onSyncEvent, onSyncStatusChange);
85
onSyncStatusChange?.(PeerSyncStatus.Connecting);
89
// check the connection with signaling server
90
let connectionCheckCount = 0;
92
// NOTE(review): the handle is overwritten here on each status event; no clearing of a
// previously stored interval is visible in this view — confirm against the full file.
this.waitForConnecting = setInterval(() => {
93
// Only the first signaling connection is inspected.
const signalingConnection: IWebsocketClient = this.provider!.signalingConns[0];
95
if (signalingConnection.connected) {
96
onSyncStatusChange?.(PeerSyncStatus.Ready);
97
clearInterval(this.waitForConnecting);
101
connectionCheckCount += 1;
103
// once the counter exceeds 5, report the peer as unconnected and stop polling
104
if (connectionCheckCount > 5) {
105
onSyncStatusChange?.(PeerSyncStatus.Unconnected);
106
clearInterval(this.waitForConnecting);
110
// ====== 4. handle data sync ====== //
112
// The synced event fires once all peers have finished syncing and the YJS objects hold consistent data.
113
this.provider.on('synced', async ({ synced }) => {
114
this.logger('[WebRTC] peer sync status:', synced);
116
this.logger('[WebRTC] start to init yjs data...');
117
onSyncStatusChange?.(PeerSyncStatus.Syncing);
118
await this.initSync();
119
onSyncStatusChange?.(PeerSyncStatus.Synced);
120
this.logger('[WebRTC] yjs data init success');
122
// NOTE(review): presumably the not-synced branch starts here; its header is not visible in this view.
this.logger('[WebRTC] data not sync, try to reconnect in 1s...');
123
// await this.reconnect(params);
125
onSyncStatusChange?.(PeerSyncStatus.Syncing);
126
// The reconnect promise is not awaited here.
this.reconnect(params);
131
// ====== 5. handle awareness ====== //
133
this.initAwareness({ onAwarenessChange, user });
135
return this.provider;
138
/**
 * Tears down the current provider via `cleanConnection`, then connects again.
 *
 * @param params - Sync options forwarded to `connect`.
 */
reconnect = async (params: StartDataSyncParams) => {
139
await this.cleanConnection(this.provider);
141
await this.connect(params);
145
await this.cleanConnection(this.provider);
148
/**
 * Creates a new Yjs `Doc` and stores it on the instance.
 * Returns early without creating anything when `window` is undefined.
 */
private initYDoc = async () => {
149
if (typeof window === 'undefined') return;
151
this.logger('[YJS] init YDoc...');
152
// yjs is loaded with a dynamic import rather than a static one.
const { Doc } = await import('yjs');
153
this._ydoc = new Doc();
156
/**
 * Tears down a provider step by step (awareness, room, provider) and destroys the
 * current Yjs document, logging each step.
 *
 * NOTE(review): `provider` is typed as nullable and is dereferenced below; a null guard
 * may sit on a line that is not visible in this view — confirm against the full file.
 *
 * @param provider - The provider to clean up.
 */
private async cleanConnection(provider: WebrtcProvider | null) {
158
this.logger(`[WebRTC] clean Connection...`);
159
this.logger(`[WebRTC] clean awareness...`);
160
provider.awareness.destroy();
162
this.logger(`[WebRTC] clean room...`);
163
// The room may be absent, hence the optional chaining.
provider.room?.disconnect();
164
provider.room?.destroy();
166
this.logger(`[WebRTC] clean provider...`);
167
provider.disconnect();
170
this.logger(`[WebRTC] clean yjs doc...`);
171
this._ydoc?.destroy();
173
this.logger(`[WebRTC] -------------------`);
177
/**
 * Starts loading each synced table from the local database into its Y.Map by calling
 * `loadDataFromDBtoYjs` once per table name.
 *
 * NOTE(review): how the resulting promises are awaited is not visible in this view.
 */
private initSync = async () => {
179
['sessions', 'sessionGroups', 'topics', 'messages', 'plugins'].map(async (tableKey) =>
180
this.loadDataFromDBtoYjs(tableKey as keyof LobeDBSchemaMap),
185
/**
 * Registers a Y.Map observer for every synced table.
 *
 * @param onEvent - Callback forwarded to `observeYMapChange` for each table.
 * @param onSyncStatusChange - Status callback forwarded to `observeYMapChange`.
 */
private initYjsObserve = (onEvent: OnSyncEvent, onSyncStatusChange: OnSyncStatusChange) => {
186
['sessions', 'sessionGroups', 'topics', 'messages', 'plugins'].forEach((tableKey) => {
188
this.observeYMapChange(tableKey as keyof LobeDBSchemaMap, onEvent, onSyncStatusChange);
192
/**
 * Observes the Y.Map for `tableKey` and applies non-local changes to the matching
 * `browserDB` table (`put` / `delete`), reporting progress through the callbacks.
 *
 * NOTE(review): the `switch` case labels and several closing lines are not visible in
 * this view, so which action maps to `put` vs `delete` should be confirmed in the full file.
 *
 * @param tableKey - Table whose Y.Map is observed.
 * @param onEvent - Called (throttled to once per 1000ms) with `tableKey` after changes are applied.
 * @param onSyncStatusChange - Receives `Syncing` when a change arrives and `Synced` from the timer below.
 */
private observeYMapChange = (
193
tableKey: keyof LobeDBSchemaMap,
194
onEvent: OnSyncEvent,
195
onSyncStatusChange: OnSyncStatusChange,
197
const table = browserDB[tableKey];
198
const yItemMap = this.getYMap(tableKey);
199
const updateSyncEvent = throttle(onEvent, 1000);
202
// eslint-disable-next-line no-undef
203
let debounceTimer: NodeJS.Timeout;
205
yItemMap?.observe(async (event) => {
206
// ignore changes that originated locally
207
if (event.transaction.local) return;
209
// On every change, clear the previous timer (if any) before a new one is set.
210
clearTimeout(debounceTimer);
212
onSyncStatusChange(PeerSyncStatus.Syncing);
214
this.logger(`[YJS] observe ${tableKey} changes:`, event.keysChanged.size);
215
// One async task per changed key; all of them are awaited together below.
const pools = Array.from(event.keys).map(async ([id, payload]) => {
216
const item: any = yItemMap.get(id);
218
switch (payload.action) {
221
await table.put(item, id);
227
await table.delete(id);
233
await Promise.all(pools);
235
updateSyncEvent(tableKey);
237
// Start a timer that sets the status to 'synced' after 2000ms.
238
debounceTimer = setTimeout(() => {
239
onSyncStatusChange(PeerSyncStatus.Synced);
244
/**
 * Copies every row of a local database table into the Y.Map of the same name,
 * keyed by `item.id`, in batches of 50 rows with one Yjs transaction per batch.
 *
 * @param tableKey - Table to read from `browserDB` and Y.Map to write into.
 */
private loadDataFromDBtoYjs = async (tableKey: keyof LobeDBSchemaMap) => {
245
const table = browserDB[tableKey];
246
const items = await table.toArray();
247
const yItemMap = this.getYMap(tableKey);
250
// Number of rows written per transaction.
const batchSize = 50;
253
const totalBatches = Math.ceil(items.length / batchSize);
255
for (let i = 0; i < totalBatches; i++) {
257
const start = i * batchSize;
258
const end = start + batchSize;
261
// `slice` clamps `end` to the array length, so the last batch may be shorter.
const batchItems = items.slice(start, end);
264
// Skipped entirely when no Yjs document exists.
this._ydoc?.transact(() => {
265
batchItems.forEach((item) => {
266
yItemMap!.set(item.id, item);
271
this.logger('[DB]:', tableKey, yItemMap?.size);
274
/**
 * Publishes the local user through the provider's awareness instance and subscribes to
 * awareness changes. Does nothing when there is no provider.
 *
 * Only `user` is destructured from the argument; the callback used is the one stored on
 * the instance (`this.onAwarenessChange`).
 *
 * @param user - Local user info placed into the awareness state.
 */
private initAwareness = ({ user }: Pick<StartDataSyncParams, 'user' | 'onAwarenessChange'>) => {
275
if (!this.provider) return;
277
const awareness = this.provider.awareness;
279
awareness.setLocalState({ clientID: awareness.clientID, user });
280
// Report the local user straight away, flagged as the current client.
this.onAwarenessChange?.([{ ...user, clientID: awareness.clientID, current: true }]);
282
awareness.on('change', () => this.syncAwarenessToUI());
285
/**
 * Reads all awareness states, marks the entry belonging to this client as `current`,
 * de-duplicates the list by its `id` field and passes it to `onAwarenessChange`.
 * Does nothing when there is no provider/awareness.
 *
 * NOTE(review): part of the mapped object literal is not visible in this view, so where
 * the `id` field comes from should be confirmed in the full file.
 */
private syncAwarenessToUI = async () => {
286
const awareness = this.provider?.awareness;
288
if (!awareness) return;
290
const state = Array.from(awareness.getStates().values()).map((s) => ({
292
clientID: s.clientID,
293
current: s.clientID === awareness.clientID,
296
this.onAwarenessChange?.(uniqBy(state, 'id'));
300
// Module-level `DataSync` instance exported for use by importers.
export const dataSync = new DataSync();
302
/**
 * Local typing for the entries of `provider.signalingConns` as used in `connect`.
 *
 * NOTE(review): only some members are visible in this view; `connected`, which
 * `connect` reads, is presumably declared on a line not shown here.
 */
interface IWebsocketClient {
303
binaryType: 'arraybuffer' | 'blob' | null;
309
lastMessageReceived: number;
310
send(message: any): void;
311
shouldConnect: boolean;
312
unsuccessfulReconnects: number;
319
__ONLY_USE_FOR_CLEANUP_IN_DEV?: WebrtcProvider | null;