Skip to content

Commit eb99a03

Browse files
committed
feat: queue up ydoc sync task
1 parent e342bde commit eb99a03

File tree

1 file changed

+45
-10
lines changed

1 file changed

+45
-10
lines changed

src/y-socket-io/y-socket-io.js

Lines changed: 45 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,12 @@ export class YSocketIO {
9090
*/
9191
namespaceMap = new Map()
9292

93+
/**
94+
* @type {Promise<void>[]}
95+
* @private
96+
*/
97+
syncQueue = []
98+
9399
/**
94100
* YSocketIO constructor.
95101
* @constructor
@@ -157,16 +163,26 @@ export class YSocketIO {
157163
this.initAwarenessListeners(socket)
158164
this.initSocketListeners(socket)
159165

160-
const doc = await this.client.getDoc(namespace, 'index')
161-
162-
if (
163-
api.isSmallerRedisId(doc.redisLastId, socket.user.initialRedisSubId)
164-
) {
165-
// our subscription is newer than the content that we received from the api
166-
// need to renew subscription id and make sure that we catch the latest content.
167-
this.subscriber.ensureSubId(stream, doc.redisLastId)
168-
}
169-
this.startSynchronization(socket, doc)
166+
/**
167+
* @type {Promise<void>}
168+
*/
169+
const task = new Promise((resolve) => {
170+
assert(this.client)
171+
this.client.getDoc(namespace, 'index').then((doc) => {
172+
assert(socket.user)
173+
assert(this.subscriber)
174+
if (
175+
api.isSmallerRedisId(doc.redisLastId, socket.user.initialRedisSubId)
176+
) {
177+
// our subscription is newer than the content that we received from the api
178+
// need to renew subscription id and make sure that we catch the latest content.
179+
this.subscriber.ensureSubId(stream, doc.redisLastId)
180+
}
181+
this.startSynchronization(socket, doc)
182+
resolve()
183+
})
184+
})
185+
this.queueUpSyncTask(task)
170186
})
171187

172188
return { client, subscriber }
@@ -336,6 +352,25 @@ export class YSocketIO {
336352
}
337353
}
338354

355+
/**
356+
* @private
357+
* @param {Promise<void>} task
358+
*/
359+
queueUpSyncTask (task) {
360+
const len = this.syncQueue.push(task)
361+
if (len === 1) this.consumeSyncQueue()
362+
}
363+
364+
/**
365+
* @private
366+
*/
367+
async consumeSyncQueue () {
368+
if (this.syncQueue.length === 0) return
369+
const task = this.syncQueue.shift()
370+
await task
371+
this.consumeSyncQueue()
372+
}
373+
339374
/**
340375
* @param {Namespace} namespace
341376
*/

0 commit comments

Comments
 (0)