Skip to content

Commit f00d447

Browse files
committed
feat: add update throttle
1 parent 4bf25a6 commit f00d447

File tree

1 file changed

+67
-2
lines changed

1 file changed

+67
-2
lines changed

src/y-websocket.js

Lines changed: 67 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -266,6 +266,7 @@ export class WebsocketProvider extends ObservableV2 {
266266
* @param {number} [opts.resyncInterval] Request server state every `resyncInterval` milliseconds
267267
* @param {number} [opts.maxBackoffTime] Maximum amount of time to wait before trying to reconnect (we try to reconnect using exponential backoff)
268268
* @param {boolean} [opts.disableBc] Disable cross-tab BroadcastChannel communication
269+
* @param {number} [opts.throttle] Throttle interval for sending updates (in ms)
269270
*/
270271
constructor (serverUrl, roomname, doc, {
271272
connect = true,
@@ -275,7 +276,8 @@ export class WebsocketProvider extends ObservableV2 {
275276
WebSocketPolyfill = WebSocket,
276277
resyncInterval = -1,
277278
maxBackoffTime = 2500,
278-
disableBc = false
279+
disableBc = false,
280+
throttle = 0
279281
} = {}) {
280282
super()
281283
// ensure that serverUrl does not end with /
@@ -302,6 +304,7 @@ export class WebsocketProvider extends ObservableV2 {
302304
this.disableBc = disableBc
303305
this.wsUnsuccessfulReconnects = 0
304306
this.messageHandlers = messageHandlers.slice()
307+
this.throttle = throttle
305308
/**
306309
* @type {boolean}
307310
*/
@@ -358,7 +361,64 @@ export class WebsocketProvider extends ObservableV2 {
358361
broadcastMessage(this, encoding.toUint8Array(encoder))
359362
}
360363
}
361-
this.doc.on('update', this._updateHandler)
364+
365+
/**
366+
* queued updates if throttling is enabled
367+
* @type {*[]}
368+
* @private
369+
*/
370+
this._updateQueue = [];
371+
372+
/**
373+
* timer for throttling
374+
* @type {any}
375+
* @private
376+
*/
377+
this._throttleTimer = undefined;
378+
379+
/**
380+
* send the queued updates
381+
* @private
382+
*/
383+
this._batchBroadcastUpdates = () => {
384+
if (this._updateQueue.length > 0) {
385+
const mergedUpdate = this._updateQueue.length === 1
386+
? this._updateQueue[0]
387+
: Y.mergeUpdates(this._updateQueue);
388+
389+
this._updateQueue = [];
390+
391+
const encoder = encoding.createEncoder();
392+
encoding.writeVarUint(encoder, messageSync);
393+
syncProtocol.writeUpdate(encoder, mergedUpdate);
394+
395+
broadcastMessage(this, encoding.toUint8Array(encoder));
396+
}
397+
};
398+
399+
/**
400+
* Listens to Yjs updates and adds them to the update queue for throttling
401+
* @param {Uint8Array} update
402+
* @param {any} origin
403+
*/
404+
this._throttledUpdateHandler = (update, origin) => {
405+
if (origin !== this) {
406+
this._updateQueue.push(update)
407+
if (!this._throttleTimer) {
408+
this._throttleTimer = setTimeout(() => {
409+
delete this._throttleTimer;
410+
this._batchBroadcastUpdates();
411+
}, this.throttle);
412+
}
413+
}
414+
}
415+
416+
if (this.throttle > 0) {
417+
this.doc.on('update', this._throttledUpdateHandler)
418+
} else {
419+
this.doc.on('update', this._updateHandler)
420+
}
421+
362422
/**
363423
* @param {any} changed
364424
* @param {any} _origin
@@ -498,6 +558,11 @@ export class WebsocketProvider extends ObservableV2 {
498558

499559
disconnect () {
500560
this.shouldConnect = false
561+
if (this._throttleTimer) {
562+
clearTimeout(this._throttleTimer);
563+
delete this._throttleTimer;
564+
this._batchBroadcastUpdates();
565+
}
501566
this.disconnectBc()
502567
if (this.ws !== null) {
503568
closeWebsocketConnection(this, this.ws, null)

0 commit comments

Comments
 (0)