diff --git a/lib/constants.js b/lib/constants.js
index 932793a95..bcd3334b8 100644
--- a/lib/constants.js
+++ b/lib/constants.js
@@ -14,6 +14,7 @@ const constants = {
     statusUndefined: 'UNDEFINED',
     statusNotReady: 'NOT_READY',
     statusNotConnected: 'NOT_CONNECTED',
+    statusTimedOut: 'TIMED_OUT',
     authTypeAssumeRole: 'assumeRole',
     authTypeAccount: 'account',
     authTypeService: 'service',
diff --git a/lib/queuePopulator/LogReader.js b/lib/queuePopulator/LogReader.js
index 4189cdfea..7c187d7ea 100644
--- a/lib/queuePopulator/LogReader.js
+++ b/lib/queuePopulator/LogReader.js
@@ -52,6 +52,9 @@ class LogReader {
         this._metricsHandler = params.metricsHandler;
         // TODO: use a common handler for zk metrics from all extensions
         this._zkMetricsHandler = params.zkMetricsHandler;
+
+        this._batchTimeoutSeconds = parseInt(process.env.BATCH_TIMEOUT_SECONDS, 10) || 300;
+        this._batchTimedOut = false;
     }
 
     _setEntryBatch(entryBatch) {
@@ -291,6 +294,21 @@ class LogReader {
             timeoutMs: params.timeoutMs,
             logger: this.log.newRequestLogger(),
         };
+        // When using the RaftLogReader log consumer, the batch
+        // processing can sometimes get stuck for unknown reasons.
+        // As a temporary fix, we set a timeout to kill the process.
+        const batchTimeoutTimer = setTimeout(() => {
+            this.log.error('queue populator batch timeout', {
+                logStats: batchState.logStats,
+            });
+            // S3C doesn't currently support restarts on healthcheck failure,
+            // so we just crash for now.
+            if (process.env.CRASH_ON_BATCH_TIMEOUT) {
+                process.emit('SIGTERM');
+            } else {
+                this._batchTimedOut = true;
+            }
+        }, this._batchTimeoutSeconds * 1000);
         async.waterfall([
             next => this._processReadRecords(params, batchState, next),
             next => this._processPrepareEntries(batchState, next),
@@ -299,6 +317,7 @@
             next => this._processSaveLogOffset(batchState, next),
         ], err => {
+            clearTimeout(batchTimeoutTimer);
             if (err) {
                 return done(err);
             }
@@ -726,6 +745,10 @@ class LogReader {
         });
         return statuses;
     }
+
+    batchProcessTimedOut() {
+        return this._batchTimedOut;
+    }
 }
 
 module.exports = LogReader;
diff --git a/lib/queuePopulator/QueuePopulator.js b/lib/queuePopulator/QueuePopulator.js
index f16d6fd80..a5ef48155 100644
--- a/lib/queuePopulator/QueuePopulator.js
+++ b/lib/queuePopulator/QueuePopulator.js
@@ -592,6 +592,12 @@ class QueuePopulator {
                     });
                 }
             });
+            if (reader.batchProcessTimedOut()) {
+                responses.push({
+                    component: 'log reader',
+                    status: constants.statusTimedOut,
+                });
+            }
         });
 
         log.debug('verbose liveness', verboseLiveness);
diff --git a/tests/unit/QueuePopulator.spec.js b/tests/unit/QueuePopulator.spec.js
index 583030888..cb9ef7a6f 100644
--- a/tests/unit/QueuePopulator.spec.js
+++ b/tests/unit/QueuePopulator.spec.js
@@ -49,6 +49,7 @@ describe('QueuePopulator', () => {
             const mockLogReader = sinon.spy();
             mockLogReader.getProducerStatus = sinon.fake(() => prodStatus);
             mockLogReader.getLogInfo = sinon.fake(() => logInfo);
+            mockLogReader.batchProcessTimedOut = sinon.fake(() => false);
             qp.logReaders = [
                 mockLogReader,
             ];
@@ -72,6 +73,7 @@ describe('QueuePopulator', () => {
             };
             mockLogReader.getProducerStatus = sinon.fake(() => prodStatus);
             mockLogReader.getLogInfo = sinon.fake(() => logInfo);
+            mockLogReader.batchProcessTimedOut = sinon.fake(() => false);
             qp.logReaders = [
                 mockLogReader,
             ];
@@ -91,5 +93,31 @@ describe('QueuePopulator', () => {
                 ])
             );
         });
+
+        it('returns proper details when batch process timed out', () => {
+            const mockLogReader = sinon.spy();
+            mockLogReader.getProducerStatus = sinon.fake(() => ({
+                topicA: true,
+            }));
+            mockLogReader.getLogInfo = sinon.fake(() => {});
+            mockLogReader.batchProcessTimedOut = sinon.fake(() => true);
+            qp.logReaders = [
+                mockLogReader,
+            ];
+            qp.zkClient = {
+                getState: () => zookeeper.State.SYNC_CONNECTED,
+            };
+            qp.handleLiveness(mockRes, mockLog);
+            sinon.assert.calledOnceWithExactly(mockRes.writeHead, 500);
+            sinon.assert.calledOnceWithExactly(
+                mockRes.end,
+                JSON.stringify([
+                    {
+                        component: 'log reader',
+                        status: constants.statusTimedOut,
+                    },
+                ])
+            );
+        });
     });
 });
diff --git a/tests/unit/lib/queuePopulator/LogReader.spec.js b/tests/unit/lib/queuePopulator/LogReader.spec.js
index 638878987..2c515d1dd 100644
--- a/tests/unit/lib/queuePopulator/LogReader.spec.js
+++ b/tests/unit/lib/queuePopulator/LogReader.spec.js
@@ -297,4 +297,73 @@ describe('LogReader', () => {
             });
         });
     });
+
+    describe('processLogEntries', () => {
+        it('should shut down when batch processing is stuck and CRASH_ON_BATCH_TIMEOUT is set', done => {
+            process.env.CRASH_ON_BATCH_TIMEOUT = 'true';
+            logReader._batchTimeoutSeconds = 1;
+            // logReader will become stuck as _processReadRecords will never
+            // call the callback
+            sinon.stub(logReader, '_processReadRecords').returns();
+            let emitted = false;
+            process.once('SIGTERM', () => {
+                emitted = true;
+            });
+            logReader.processLogEntries({}, () => {});
+            setTimeout(() => {
+                assert.strictEqual(emitted, true);
+                delete process.env.CRASH_ON_BATCH_TIMEOUT;
+                done();
+            }, 2000);
+        }).timeout(4000);
+
+        it('should fail healthcheck when batch processing is stuck', done => {
+            delete process.env.CRASH_ON_BATCH_TIMEOUT;
+            logReader._batchTimeoutSeconds = 1;
+            // logReader will become stuck as _processReadRecords will never
+            // call the callback
+            sinon.stub(logReader, '_processReadRecords').returns();
+            let emitted = false;
+            process.once('SIGTERM', () => {
+                emitted = true;
+            });
+            logReader.processLogEntries({}, () => {});
+            setTimeout(() => {
+                assert.strictEqual(emitted, false);
+                assert.strictEqual(logReader.batchProcessTimedOut(), true);
+                done();
+            }, 2000);
+        }).timeout(4000);
+
+        it('should not shut down if timeout not reached', done => {
+            process.env.CRASH_ON_BATCH_TIMEOUT = 'true';
+            sinon.stub(logReader, '_processReadRecords').yields();
+            sinon.stub(logReader, '_processPrepareEntries').yields();
+            sinon.stub(logReader, '_processFilterEntries').yields();
+            sinon.stub(logReader, '_processPublishEntries').yields();
+            sinon.stub(logReader, '_processSaveLogOffset').yields();
+            let emitted = false;
+            process.once('SIGTERM', () => {
+                emitted = true;
+            });
+            logReader.processLogEntries({}, () => {
+                assert.strictEqual(emitted, false);
+                delete process.env.CRASH_ON_BATCH_TIMEOUT;
+                done();
+            });
+        });
+
+        it('should not fail healthcheck if timeout not reached', done => {
+            delete process.env.CRASH_ON_BATCH_TIMEOUT;
+            sinon.stub(logReader, '_processReadRecords').yields();
+            sinon.stub(logReader, '_processPrepareEntries').yields();
+            sinon.stub(logReader, '_processFilterEntries').yields();
+            sinon.stub(logReader, '_processPublishEntries').yields();
+            sinon.stub(logReader, '_processSaveLogOffset').yields();
+            logReader.processLogEntries({}, () => {
+                assert.strictEqual(logReader.batchProcessTimedOut(), false);
+                done();
+            });
+        });
+    });
 });
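
For reference, below is a minimal standalone sketch of the watchdog pattern this diff introduces, assuming the same BATCH_TIMEOUT_SECONDS and CRASH_ON_BATCH_TIMEOUT environment variables; the runBatch() and healthcheck() helpers are illustrative names only and are not part of the codebase.

// Illustrative sketch only (not part of the diff): the watchdog pattern used
// above, with hypothetical helpers runBatch() and healthcheck().
const async = require('async');

const BATCH_TIMEOUT_SECONDS =
    parseInt(process.env.BATCH_TIMEOUT_SECONDS, 10) || 300;
let batchTimedOut = false;

function runBatch(steps, done) {
    // Arm the watchdog before the batch starts.
    const timer = setTimeout(() => {
        if (process.env.CRASH_ON_BATCH_TIMEOUT) {
            // Crash so the orchestrator restarts the process.
            process.emit('SIGTERM');
        } else {
            // Otherwise only mark the batch as stuck; the liveness
            // endpoint reports TIMED_OUT on the next probe.
            batchTimedOut = true;
        }
    }, BATCH_TIMEOUT_SECONDS * 1000);

    async.waterfall(steps, err => {
        // A batch that completes, even with an error, is not stuck.
        clearTimeout(timer);
        done(err);
    });
}

function healthcheck() {
    return batchTimedOut ? 'TIMED_OUT' : 'READY';
}

Because the timer is cleared in the waterfall's final callback, a batch that fails quickly is still treated as healthy; only a batch that never completes trips the flag or triggers the crash.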