From dfcc2f5786ab6da8bd3e2fc893d897aed0a77fe8 Mon Sep 17 00:00:00 2001 From: Jonathan Gramain Date: Wed, 30 Dec 2020 18:14:53 -0800 Subject: [PATCH] bugfix: S3C-3769 Timeout for populator batches In case a populator batch takes too long to execute fully, trigger a timeout, which both logs it along with the step at which the current batch is, and allow a new batch to start. This is a workaround to ensure the queue populator can make progress, and should help investigation whenever a case of timeout happens, to know what calls are causing the timeout. --- bin/queuePopulator.js | 7 ++++++- lib/queuePopulator/LogReader.js | 14 ++++++++++++++ 2 files changed, 20 insertions(+), 1 deletion(-) diff --git a/bin/queuePopulator.js b/bin/queuePopulator.js index c71b96b74..eb34a67a6 100644 --- a/bin/queuePopulator.js +++ b/bin/queuePopulator.js @@ -24,10 +24,15 @@ function queueBatch(queuePopulator, taskState) { log.debug('skipping replication batch: previous one still in progress'); return undefined; } + const onTimeout = () => { + // reset the flag to allow a new batch to start in case the + // previous batch timed out + taskState.batchInProgress = false; + }; log.debug('start queueing replication batch'); taskState.batchInProgress = true; const maxRead = qpConfig.batchMaxRead; - queuePopulator.processAllLogEntries({ maxRead }, err => { + queuePopulator.processAllLogEntries({ maxRead, onTimeout }, err => { taskState.batchInProgress = false; if (err) { log.error('an error occurred during replication', { diff --git a/lib/queuePopulator/LogReader.js b/lib/queuePopulator/LogReader.js index 0545cd4f7..2ef587a12 100644 --- a/lib/queuePopulator/LogReader.js +++ b/lib/queuePopulator/LogReader.js @@ -10,6 +10,8 @@ const ReplicationQueuePopulator = const { metricsExtension, metricsTypeQueued } = require('../../extensions/replication/constants'); +const BATCH_TIMEOUT_SECONDS = 300; + class LogReader { /** @@ -170,6 +172,8 @@ class LogReader { * from the log. Records may contain multiple entries and all entries * are not queued, so the number of queued entries is not directly * related to this number. + * @param {function} [params.onTimeout] - optional callback, + * called when a single batch times out * @param {function} done - callback when done processing the * entries. Called with an error, or null and a statistics object as * second argument. On success, the statistics contain the following: @@ -194,6 +198,15 @@ class LogReader { debugStep: '[INIT]', }; + const batchTimeoutTimer = setTimeout(() => { + this.log.error('replication batch timeout', { + logStats: batchState.logStats, + batchStep: batchState.debugStep, + }); + if (params.onTimeout) { + params.onTimeout(); + } + }, BATCH_TIMEOUT_SECONDS * 1000); async.waterfall([ next => this._processReadRecords(params, batchState, next), next => this._processPrepareEntries(batchState, next), @@ -201,6 +214,7 @@ class LogReader { next => this._processSaveLogOffset(batchState, next), ], err => { + clearTimeout(batchTimeoutTimer); if (err) { return done(err); }