bugfix: S3C-3769 Timeout for populator batches
If a populator batch takes too long to complete, trigger a timeout
that logs the event, along with the step the batch has reached, and
allows a new batch to start.

This is a workaround to ensure the queue populator keeps making
progress. It should also help investigation whenever a timeout
occurs, by showing which calls are causing it.
jonathan-gramain committed Jan 18, 2021
1 parent 63272ae commit dfcc2f5
Showing 2 changed files with 20 additions and 1 deletion.
7 changes: 6 additions & 1 deletion bin/queuePopulator.js
@@ -24,10 +24,15 @@ function queueBatch(queuePopulator, taskState) {
         log.debug('skipping replication batch: previous one still in progress');
         return undefined;
     }
+    const onTimeout = () => {
+        // reset the flag to allow a new batch to start in case the
+        // previous batch timed out
+        taskState.batchInProgress = false;
+    };
     log.debug('start queueing replication batch');
     taskState.batchInProgress = true;
     const maxRead = qpConfig.batchMaxRead;
-    queuePopulator.processAllLogEntries({ maxRead }, err => {
+    queuePopulator.processAllLogEntries({ maxRead, onTimeout }, err => {
         taskState.batchInProgress = false;
         if (err) {
             log.error('an error occurred during replication', {
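The effect of the `onTimeout` flag reset can be illustrated with a small standalone sketch. It is not the project's code: `stuckPopulator` and the `started` array are hypothetical stand-ins, `queueBatch` is a simplified copy with logging removed, and the timeout is shortened to 20 ms for the demonstration.

```javascript
// Simplified, hypothetical model of the scheduling flag (not the real code).
const taskState = { batchInProgress: false };
const started = []; // records each batch that actually starts

function queueBatch(populator, state) {
    if (state.batchInProgress) {
        return undefined; // previous batch still in progress: skip
    }
    const onTimeout = () => {
        // allow a new batch to start even though this one never finished
        state.batchInProgress = false;
    };
    state.batchInProgress = true;
    started.push(started.length + 1);
    return populator.processAllLogEntries({ onTimeout }, () => {
        state.batchInProgress = false;
    });
}

// Hypothetical populator whose batches never complete: it only fires
// the timeout callback, 20 ms after the batch starts.
const stuckPopulator = {
    processAllLogEntries(params, _done) {
        setTimeout(params.onTimeout, 20);
    },
};

queueBatch(stuckPopulator, taskState); // starts batch 1
queueBatch(stuckPopulator, taskState); // skipped, batch 1 in progress
// by 50 ms the timeout has reset the flag, so batch 2 can start
setTimeout(() => queueBatch(stuckPopulator, taskState), 50);
```

Without the reset in `onTimeout`, the third call would be skipped as well and no further batch would ever start. Note that in this sketch, as in the diff above, nothing cancels a timed-out batch: if it eventually completes, its callback also clears the flag.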
14 changes: 14 additions & 0 deletions lib/queuePopulator/LogReader.js
@@ -10,6 +10,8 @@ const ReplicationQueuePopulator =
 const { metricsExtension, metricsTypeQueued } =
     require('../../extensions/replication/constants');
 
+const BATCH_TIMEOUT_SECONDS = 300;
+
 class LogReader {
 
     /**
@@ -170,6 +172,8 @@ class LogReader {
      * from the log. Records may contain multiple entries and all entries
      * are not queued, so the number of queued entries is not directly
      * related to this number.
+     * @param {function} [params.onTimeout] - optional callback,
+     * called when a single batch times out
      * @param {function} done - callback when done processing the
      * entries. Called with an error, or null and a statistics object as
      * second argument. On success, the statistics contain the following:
@@ -194,13 +198,23 @@ class LogReader {
             debugStep: '[INIT]',
         };
 
+        const batchTimeoutTimer = setTimeout(() => {
+            this.log.error('replication batch timeout', {
+                logStats: batchState.logStats,
+                batchStep: batchState.debugStep,
+            });
+            if (params.onTimeout) {
+                params.onTimeout();
+            }
+        }, BATCH_TIMEOUT_SECONDS * 1000);
         async.waterfall([
             next => this._processReadRecords(params, batchState, next),
             next => this._processPrepareEntries(batchState, next),
             next => this._processPublishEntries(batchState, next),
             next => this._processSaveLogOffset(batchState, next),
         ],
         err => {
+            clearTimeout(batchTimeoutTimer);
             if (err) {
                 return done(err);
             }
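The watchdog pattern in this hunk can be sketched in isolation as follows. This is an illustration under assumptions, not the LogReader implementation: `runBatch`, `steps` and `report` are hypothetical names, a small hand-written sequencer stands in for `async.waterfall`, and it assumes each step records its name in `batchState.debugStep`, which the diff does not show.

```javascript
// Hypothetical sketch of a batch watchdog that reports the current step.
function runBatch(steps, timeoutMs, report, done) {
    const batchState = { debugStep: '[INIT]' };
    // The watchdog only reports where the batch is; it cancels nothing.
    const timer = setTimeout(
        () => report({ batchStep: batchState.debugStep }), timeoutMs);
    let i = 0;
    const next = err => {
        if (err || i === steps.length) {
            // batch finished (or failed): disarm the watchdog
            clearTimeout(timer);
            return done(err || null);
        }
        const step = steps[i++];
        batchState.debugStep = step.name; // assumed to happen in each step
        return step.run(next);
    };
    next();
}

const reports = [];
let finished = false;
runBatch([
    { name: 'readRecords', run: cb => setTimeout(cb, 5) },
    { name: 'publishEntries', run: cb => setTimeout(cb, 60) }, // slow step
    { name: 'saveLogOffset', run: cb => setTimeout(cb, 5) },
], 30, r => reports.push(r), () => { finished = true; });
```

Here the 30 ms watchdog fires while the slow second step is running, so the report names that step, and the batch still runs to completion afterwards. If every step finished within the limit, `clearTimeout` would disarm the timer and nothing would be reported.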
