@@ -128,12 +128,6 @@ export class YSocketIO {
128128 * @readonly
129129 */
130130 debouncedPersistMap = new Map ( )
131- /**
132- * @type {Map<string, Y.Doc> }
133- * @private
134- * @readonly
135- */
136- debouncedPersistDocMap = new Map ( )
137131 /**
138132 * @type {Map<string, number> }
139133 * @private
@@ -373,8 +367,7 @@ export class YSocketIO {
373367 const nsp = this . namespaceMap . get ( ns )
374368 if ( nsp ?. sockets . size === 0 && stream ) {
375369 this . cleanupNamespace ( ns , stream , DEFAULT_CLEAR_TIMEOUT )
376- const doc = this . namespaceDocMap . get ( ns )
377- if ( doc ) this . debouncedPersist ( ns , doc . ydoc , true )
370+ if ( this . namespaceDocMap . has ( ns ) ) this . debouncedPersist ( ns , true )
378371 }
379372 }
380373 } )
@@ -492,19 +485,17 @@ export class YSocketIO {
492485 const shouldPersist = now - lastPersistCalledAt > MAX_PERSIST_INTERVAL
493486 if ( changed || shouldPersist || nsp . sockets . size === 0 ) {
494487 this . namespacePersistentMap . set ( namespace , now )
495- this . debouncedPersist ( namespace , doc . ydoc , nsp . sockets . size === 0 )
488+ this . debouncedPersist ( namespace , nsp . sockets . size === 0 )
496489 }
497490 this . namespaceDocMap . get ( namespace ) ?. ydoc . destroy ( )
498491 this . namespaceDocMap . set ( namespace , doc )
499492 }
500493
501494 /**
502495 * @param {string } namespace
503- * @param {Y.Doc } doc
504496 * @param {boolean= } immediate
505497 */
506- debouncedPersist ( namespace , doc , immediate = false ) {
507- this . debouncedPersistDocMap . set ( namespace , doc )
498+ debouncedPersist ( namespace , immediate = false ) {
508499 if ( this . debouncedPersistMap . has ( namespace ) ) {
509500 if ( ! immediate ) return
510501 clearTimeout ( this . debouncedPersistMap . get ( namespace ) || undefined )
@@ -514,9 +505,17 @@ export class YSocketIO {
514505 : PERSIST_INTERVAL + ( Math . random ( ) - 0.5 ) * PERSIST_INTERVAL
515506 const timeout = setTimeout (
516507 async ( ) => {
508+ // wait for previous persisting operation if exists
509+ const prev = this . awaitingPersistMap . get ( namespace )
510+ if ( prev ?. promise ) await prev . promise
511+ // delete persist entry to allow queueing next persisting operation
512+ // we can delete it here because the following until awaiting persist
513+ // are all synchronize operations
514+ this . debouncedPersistMap . delete ( namespace )
515+
517516 try {
518517 assert ( this . client )
519- const doc = this . debouncedPersistDocMap . get ( namespace )
518+ const doc = this . namespaceDocMap . get ( namespace ) ?. ydoc
520519 logSocketIO ( `trying to persist ${ namespace } ` )
521520 if ( ! doc ) return
522521 if ( this . client . persistWorker ) {
@@ -534,15 +533,14 @@ export class YSocketIO {
534533 } )
535534 await promise
536535 } else {
537- await this . client . store . persistDoc ( namespace , 'index' , doc )
536+ const promise = this . client . store . persistDoc ( namespace , 'index' , doc )
537+ this . awaitingPersistMap . set ( namespace , { promise, resolve : ( ) => { } } )
538+ await promise
538539 }
539540
540541 await this . client . trimRoomStream ( namespace , 'index' )
541542 } catch ( e ) {
542543 console . error ( e )
543- } finally {
544- this . debouncedPersistDocMap . delete ( namespace )
545- this . debouncedPersistMap . delete ( namespace )
546544 }
547545 } ,
548546 timeoutInterval
0 commit comments