@@ -11,7 +11,6 @@ import * as math from 'lib0/math'
1111import * as protocol from './protocol.js'
1212import * as env from 'lib0/environment'
1313import * as logging from 'lib0/logging'
14- import * as time from 'lib0/time'
1514
1615const logWorker = logging . createModuleLogger ( '@y/redis/api/worker' )
1716// const logApi = logging.createModuleLogger('@y/redis/api')
@@ -280,6 +279,7 @@ export class Api {
280279 } )
281280 }
282281 tasks . length > 0 && logWorker ( 'Accepted tasks ' , { tasks } )
282+ let reclaimCounts = 0
283283 await promise . all ( tasks . map ( async task => {
284284 const streamlen = await this . redis . xLen ( task . stream )
285285 if ( streamlen === 0 ) {
@@ -289,6 +289,7 @@ export class Api {
289289 . exec ( )
290290 logWorker ( 'Stream still empty, removing recurring task from queue ' , { stream : task . stream } )
291291 } else {
292+ reclaimCounts ++
292293 const { room, docid } = decodeRedisRoomStreamName ( task . stream , this . prefix )
293294 const { ydoc, storeReferences, redisLastId } = await this . getDoc ( room , docid )
294295 const lastId = math . max ( number . parseInt ( redisLastId . split ( '-' ) [ 0 ] ) , number . parseInt ( task . id . split ( '-' ) [ 0 ] ) )
@@ -321,7 +322,7 @@ export class Api {
321322 ydoc . destroy ( )
322323 }
323324 } ) )
324- return tasks
325+ return { tasks, reclaimCounts }
325326 }
326327
327328 async destroy ( ) {
@@ -353,14 +354,17 @@ export class Worker {
353354 this . client = client
354355 logWorker ( 'Created worker process ' , { id : client . consumername , prefix : client . prefix , minMessageLifetime : client . redisMinMessageLifetime } )
355356 ; ( async ( ) => {
356- const startRedisTime = await client . redis . time ( )
357- const timeDiff = startRedisTime . getTime ( ) - time . getUnixTime ( )
357+ let prev = performance . now ( )
358358 while ( ! client . _destroyed ) {
359359 try {
360- const tasks = await client . consumeWorkerQueue ( opts )
361- if ( tasks . length === 0 || ( client . redisMinMessageLifetime > time . getUnixTime ( ) + timeDiff - number . parseInt ( tasks [ 0 ] . id . split ( '-' ) [ 0 ] ) ) ) {
360+ const { reclaimCounts } = await client . consumeWorkerQueue ( opts )
361+ const now = performance . now ( )
362+ if ( reclaimCounts === 0 ) {
362363 await promise . wait ( client . redisWorkerTimeout )
364+ } else if ( now - prev < client . redisWorkerTimeout ) {
365+ await promise . wait ( client . redisWorkerTimeout - ( now - prev ) )
363366 }
367+ prev = now
364368 } catch ( e ) {
365369 console . error ( e )
366370 }
0 commit comments