@@ -136,6 +136,12 @@ export class YSocketIO {
136136 * @readonly
137137 */
138138 namespacePersistentMap = new Map ( )
139+ /**
140+ * @type {Map<string, () => void> }
141+ * @private
142+ * @readonly
143+ */
144+ awaitingPersistMap = new Map ( )
139145
140146 /**
141147 * YSocketIO constructor.
@@ -156,16 +162,20 @@ export class YSocketIO {
156162 *
157163 * It also starts socket connection listeners.
158164 * @param {import('../storage.js').AbstractStorage } store
159- * @param {{ redisPrefix?: string, redisUrl?: string }= } opts
165+ * @param {{ redisPrefix?: string, redisUrl?: string, persistWorker?: import('worker_threads').Worker }= } opts
160166 * @public
161167 */
162- async initialize ( store , { redisUrl, redisPrefix = 'y' } = { } ) {
168+ async initialize ( store , { redisUrl, redisPrefix = 'y' , persistWorker } = { } ) {
163169 const [ client , subscriber ] = await promise . all ( [
164170 api . createApiClient ( store , { redisUrl, redisPrefix } ) ,
165171 createSubscriber ( store , { redisUrl, redisPrefix } )
166172 ] )
167173 this . client = client
168174 this . subscriber = subscriber
175+ if ( persistWorker ) {
176+ this . client . persistWorker = persistWorker
177+ this . registerPersistWorkerResolve ( )
178+ }
169179
170180 this . nsp = this . io . of ( / ^ \/ y j s \| .* $ / )
171181
@@ -475,7 +485,24 @@ export class YSocketIO {
475485 assert ( this . client )
476486 const doc = this . debouncedPersistDocMap . get ( namespace )
477487 if ( ! doc ) return
478- await this . client . store . persistDoc ( namespace , 'index' , doc )
488+ if ( this . client . persistWorker ) {
489+ /** @type {Promise<void> } */
490+ const promise = new Promise ( ( res ) => {
491+ assert ( this . client ?. persistWorker )
492+ this . awaitingPersistMap . set ( namespace , res )
493+
494+ const docState = Y . encodeStateAsUpdateV2 ( doc )
495+ const buf = new Uint8Array ( new SharedArrayBuffer ( docState . length ) )
496+ buf . set ( docState )
497+ this . client . persistWorker . postMessage ( {
498+ room : namespace ,
499+ docstate : buf
500+ } )
501+ } )
502+ await promise
503+ } else {
504+ await this . client . store . persistDoc ( namespace , 'index' , doc )
505+ }
479506 await this . client . trimRoomStream ( namespace , 'index' , true )
480507 this . debouncedPersistDocMap . delete ( namespace )
481508 this . debouncedPersistMap . delete ( namespace )
@@ -569,4 +596,11 @@ export class YSocketIO {
569596 console . error ( e )
570597 }
571598 }
599+
600+ registerPersistWorkerResolve ( ) {
601+ if ( ! this . client ?. persistWorker ) return
602+ this . client . persistWorker . on ( 'message' , ( { event, room } ) => {
603+ if ( event === 'persisted' ) this . awaitingPersistMap . get ( room ) ?. ( )
604+ } )
605+ }
572606}
0 commit comments