diff --git a/src/api.js b/src/api.js index aaffa7a..51efe3f 100644 --- a/src/api.js +++ b/src/api.js @@ -11,7 +11,6 @@ import * as math from 'lib0/math' import * as protocol from './protocol.js' import * as env from 'lib0/environment' import * as logging from 'lib0/logging' -import * as time from 'lib0/time' const logWorker = logging.createModuleLogger('@y/redis/api/worker') // const logApi = logging.createModuleLogger('@y/redis/api') @@ -280,6 +279,7 @@ export class Api { }) } tasks.length > 0 && logWorker('Accepted tasks ', { tasks }) + let reclaimCounts = 0 await promise.all(tasks.map(async task => { const streamlen = await this.redis.xLen(task.stream) if (streamlen === 0) { @@ -289,6 +289,7 @@ export class Api { .exec() logWorker('Stream still empty, removing recurring task from queue ', { stream: task.stream }) } else { + reclaimCounts++ const { room, docid } = decodeRedisRoomStreamName(task.stream, this.prefix) const { ydoc, storeReferences, redisLastId } = await this.getDoc(room, docid) const lastId = math.max(number.parseInt(redisLastId.split('-')[0]), number.parseInt(task.id.split('-')[0])) @@ -321,7 +322,7 @@ export class Api { ydoc.destroy() } })) - return tasks + return { tasks, reclaimCounts } } async destroy () { @@ -353,14 +354,17 @@ export class Worker { this.client = client logWorker('Created worker process ', { id: client.consumername, prefix: client.prefix, minMessageLifetime: client.redisMinMessageLifetime }) ;(async () => { - const startRedisTime = await client.redis.time() - const timeDiff = startRedisTime.getTime() - time.getUnixTime() + let prev = performance.now() while (!client._destroyed) { try { - const tasks = await client.consumeWorkerQueue(opts) - if (tasks.length === 0 || (client.redisMinMessageLifetime > time.getUnixTime() + timeDiff - number.parseInt(tasks[0].id.split('-')[0]))) { + const { reclaimCounts } = await client.consumeWorkerQueue(opts) + const now = performance.now() + if (reclaimCounts === 0) { await promise.wait(client.redisWorkerTimeout) + } else if (now - prev < client.redisWorkerTimeout) { + await promise.wait(client.redisWorkerTimeout - (now - prev)) } + prev = now } catch (e) { console.error(e) }