@@ -4,10 +4,13 @@ import * as promise from 'lib0/promise'
44import * as encoding from 'lib0/encoding'
55import * as decoding from 'lib0/decoding'
66import { assert } from 'lib0/testing'
7- import { User } from './user.js'
87import * as api from '../api.js'
98import * as protocol from '../protocol.js'
109import { createSubscriber } from '../subscriber.js'
10+ import { isDeepStrictEqual } from 'util'
11+ import { User } from './user.js'
12+
13+ const PERSIST_INTERVAL = 5000
1114
1215/**
1316 * @typedef {import('socket.io').Namespace } Namespace
@@ -89,6 +92,18 @@ export class YSocketIO {
8992 * @readonly
9093 */
9194 namespaceMap = new Map ( )
95+ /**
96+ * @type {Map<string, RedisDoc> }
97+ * @private
98+ * @readonly
99+ */
100+ namespaceDocMap = new Map ( )
101+ /**
102+ * @type {Map<Socket, { user: UserLike, validatedAt: number }> }
103+ * @private
104+ * @readonly
105+ */
106+ socketUserCache = new Map ( )
92107
93108 /**
94109 * YSocketIO constructor.
@@ -123,12 +138,21 @@ export class YSocketIO {
123138 this . nsp = this . io . of ( / ^ \/ y j s \| .* $ / )
124139
125140 this . nsp . use ( async ( socket , next ) => {
126- if ( this . configuration . authenticate == null ) return next ( )
127- const user = await this . configuration . authenticate ( socket )
128- if ( user ) {
129- socket . user = new User ( this . getNamespaceString ( socket . nsp ) , user . userid )
130- return next ( )
131- } else return next ( new Error ( 'Unauthorized' ) )
141+ if ( this . configuration . authenticate === null ) return next ( )
142+ const userCache = this . socketUserCache . get ( socket )
143+ const namespace = this . getNamespaceString ( socket . nsp )
144+ if ( ! userCache || Date . now ( ) - userCache . validatedAt > 60_000 ) {
145+ this . socketUserCache . delete ( socket )
146+ const user = await this . configuration . authenticate ( socket )
147+ if ( ! user ) return next ( new Error ( 'Unauthorized' ) )
148+ this . socketUserCache . set ( socket , { user, validatedAt : Date . now ( ) } )
149+ socket . user = new User ( namespace , user . userid )
150+ } else {
151+ socket . user = new User ( namespace , userCache . user . userid )
152+ }
153+
154+ if ( socket . user ) return next ( )
155+ else return next ( new Error ( 'Unauthorized' ) )
132156 } )
133157
134158 this . nsp . on ( 'connection' , async ( socket ) => {
@@ -156,17 +180,23 @@ export class YSocketIO {
156180 this . initSyncListeners ( socket )
157181 this . initAwarenessListeners ( socket )
158182 this . initSocketListeners ( socket )
183+ ; ( async ( ) => {
184+ assert ( this . client )
185+ assert ( socket . user )
186+ const doc =
187+ this . namespaceDocMap . get ( namespace ) ||
188+ ( await this . client . getDoc ( namespace , 'index' ) )
189+ this . namespaceDocMap . set ( namespace , doc )
159190
160- const doc = await this . client . getDoc ( namespace , 'index' )
161-
162- if (
163- api . isSmallerRedisId ( doc . redisLastId , socket . user . initialRedisSubId )
164- ) {
165- // our subscription is newer than the content that we received from the api
166- // need to renew subscription id and make sure that we catch the latest content.
167- this . subscriber . ensureSubId ( stream , doc . redisLastId )
168- }
169- this . startSynchronization ( socket , doc )
191+ if (
192+ api . isSmallerRedisId ( doc . redisLastId , socket . user . initialRedisSubId )
193+ ) {
194+ // our subscription is newer than the content that we received from the api
195+ // need to renew subscription id and make sure that we catch the latest content.
196+ this . subscriber ?. ensureSubId ( stream , doc . redisLastId )
197+ }
198+ this . startSynchronization ( socket , doc )
199+ } ) ( )
170200 } )
171201
172202 return { client, subscriber }
@@ -200,22 +230,31 @@ export class YSocketIO {
200230 syncStep2
201231 ) => {
202232 assert ( this . client )
203- const doc = await this . client . getDoc (
204- this . getNamespaceString ( socket . nsp ) ,
205- 'index'
206- )
233+ const namespace = this . getNamespaceString ( socket . nsp )
234+ const doc =
235+ this . namespaceDocMap . get ( namespace ) ||
236+ ( await this . client . getDoc ( namespace , 'index' ) )
237+ this . namespaceDocMap . set ( namespace , doc )
238+ assert ( doc )
207239 syncStep2 ( Y . encodeStateAsUpdate ( doc . ydoc , stateVector ) )
208240 }
209241 )
210242
243+ /** @type {unknown } */
244+ let prevMsg = null
211245 socket . on ( 'sync-update' , ( /** @type {ArrayBuffer } */ update ) => {
246+ if ( isDeepStrictEqual ( update , prevMsg ) ) return
212247 assert ( this . client )
248+ const namespace = this . getNamespaceString ( socket . nsp )
213249 const message = Buffer . from ( update . slice ( 0 , update . byteLength ) )
214- this . client . addMessage (
215- this . getNamespaceString ( socket . nsp ) ,
216- 'index' ,
217- Buffer . from ( this . toRedis ( 'sync-update' , message ) )
218- ) . catch ( console . error )
250+ this . client
251+ . addMessage (
252+ namespace ,
253+ 'index' ,
254+ Buffer . from ( this . toRedis ( 'sync-update' , message ) )
255+ )
256+ . catch ( console . error )
257+ prevMsg = update
219258 } )
220259 }
221260
@@ -232,14 +271,19 @@ export class YSocketIO {
232271 * @readonly
233272 */
234273 initAwarenessListeners = ( socket ) => {
274+ /** @type {unknown } */
275+ const prevMsg = null
235276 socket . on ( 'awareness-update' , ( /** @type {ArrayBuffer } */ update ) => {
277+ if ( isDeepStrictEqual ( update , prevMsg ) ) return
236278 assert ( this . client )
237279 const message = Buffer . from ( update . slice ( 0 , update . byteLength ) )
238- this . client . addMessage (
239- this . getNamespaceString ( socket . nsp ) ,
240- 'index' ,
241- Buffer . from ( this . toRedis ( 'awareness-update' , new Uint8Array ( message ) ) )
242- ) . catch ( console . error )
280+ this . client
281+ . addMessage (
282+ this . getNamespaceString ( socket . nsp ) ,
283+ 'index' ,
284+ Buffer . from ( this . toRedis ( 'awareness-update' , new Uint8Array ( message ) ) )
285+ )
286+ . catch ( console . error )
243287 } )
244288 }
245289
@@ -253,14 +297,18 @@ export class YSocketIO {
253297 socket . on ( 'disconnect' , async ( ) => {
254298 assert ( this . subscriber )
255299 if ( ! socket . user ) return
256- for ( const ns of socket . user . subs ) {
257- const stream = this . namespaceStreamMap . get ( ns )
300+ this . socketUserCache . delete ( socket )
301+ for ( const stream of socket . user . subs ) {
302+ const ns = this . streamNamespaceMap . get ( stream )
303+ if ( ! ns ) continue
258304 const nsp = this . namespaceMap . get ( ns )
259305 if ( nsp ?. sockets . size === 0 && stream ) {
260306 this . subscriber . unsubscribe ( stream , this . redisMessageSubscriber )
261307 this . namespaceStreamMap . delete ( ns )
262308 this . streamNamespaceMap . delete ( stream )
263309 this . namespaceMap . delete ( ns )
310+ this . namespaceDocMap . get ( ns ) ?. ydoc . destroy ( )
311+ this . namespaceDocMap . delete ( ns )
264312 }
265313 }
266314 } )
@@ -280,11 +328,13 @@ export class YSocketIO {
280328 ( /** @type {Uint8Array } */ update ) => {
281329 assert ( this . client )
282330 const message = Buffer . from ( update . slice ( 0 , update . byteLength ) )
283- this . client . addMessage (
284- this . getNamespaceString ( socket . nsp ) ,
285- 'index' ,
286- Buffer . from ( this . toRedis ( 'sync-step-2' , message ) )
287- ) . catch ( console . error )
331+ this . client
332+ . addMessage (
333+ this . getNamespaceString ( socket . nsp ) ,
334+ 'index' ,
335+ Buffer . from ( this . toRedis ( 'sync-step-2' , message ) )
336+ )
337+ . catch ( console . error )
288338 }
289339 )
290340 if ( doc . awareness . states . size > 0 ) {
@@ -303,7 +353,7 @@ export class YSocketIO {
303353 * @param {string } stream
304354 * @param {Array<Uint8Array> } messages
305355 */
306- redisMessageSubscriber = ( stream , messages ) => {
356+ redisMessageSubscriber = async ( stream , messages ) => {
307357 const namespace = this . streamNamespaceMap . get ( stream )
308358 if ( ! namespace ) return
309359 const nsp = this . namespaceMap . get ( namespace )
@@ -313,6 +363,8 @@ export class YSocketIO {
313363 this . namespaceStreamMap . delete ( namespace )
314364 this . streamNamespaceMap . delete ( stream )
315365 this . namespaceMap . delete ( namespace )
366+ this . namespaceDocMap . get ( namespace ) ?. ydoc . destroy ( )
367+ this . namespaceDocMap . delete ( namespace )
316368 }
317369
318370 /** @type {Uint8Array[] } */
@@ -334,6 +386,65 @@ export class YSocketIO {
334386 if ( msg . length === 0 ) continue
335387 nsp . emit ( 'awareness-update' , msg )
336388 }
389+
390+ let changed = false
391+ const existDoc = this . namespaceDocMap . get ( namespace )
392+ if ( existDoc ) {
393+ existDoc . ydoc . on ( 'afterTransaction' , ( tr ) => {
394+ changed = tr . changed . size > 0
395+ } )
396+ Y . transact ( existDoc . ydoc , ( ) => {
397+ for ( const msg of updates ) Y . applyUpdate ( existDoc . ydoc , msg )
398+ for ( const msg of awareness ) {
399+ AwarenessProtocol . applyAwarenessUpdate ( existDoc . awareness , msg , null )
400+ }
401+ } )
402+ }
403+
404+ assert ( this . client )
405+ let doc = existDoc
406+ if ( ! existDoc ) {
407+ const getDoc = await this . client . getDoc ( namespace , 'index' )
408+ doc = getDoc
409+ changed = getDoc . changed
410+ }
411+ assert ( doc )
412+ if ( changed ) this . debouncedPersist ( namespace , doc . ydoc )
413+ this . namespaceDocMap . get ( namespace ) ?. ydoc . destroy ( )
414+ this . namespaceDocMap . set ( namespace , doc )
415+ await this . client . trimRoomStream ( namespace , 'index' , nsp . sockets . size === 0 )
416+ }
417+
418+ /**
419+ * @type {Map<string, NodeJS.Timeout | null> }
420+ */
421+ debouncedPersistMap = new Map ( )
422+ /**
423+ * @type {Map<string, Y.Doc> }
424+ */
425+ debouncedPersistDocMap = new Map ( )
426+
427+ /**
428+ * @param {string } namespace
429+ * @param {Y.Doc } doc
430+ */
431+ async debouncedPersist ( namespace , doc ) {
432+ this . debouncedPersistDocMap . set ( namespace , doc )
433+ if ( this . debouncedPersistMap . has ( namespace ) ) return
434+ this . debouncedPersistMap . set (
435+ namespace ,
436+ setTimeout (
437+ async ( ) => {
438+ assert ( this . client )
439+ const doc = this . debouncedPersistDocMap . get ( namespace )
440+ if ( ! doc ) return
441+ await this . client . store . persistDoc ( namespace , 'index' , doc )
442+ this . debouncedPersistDocMap . delete ( namespace )
443+ this . debouncedPersistMap . delete ( namespace )
444+ } ,
445+ PERSIST_INTERVAL + ( Math . random ( ) - 0.5 ) * PERSIST_INTERVAL
446+ )
447+ )
337448 }
338449
339450 /**
0 commit comments