Skip to content

Commit

Permalink
fixup! forward port workarround for queue populator batch getting stuck
Browse files Browse the repository at this point in the history
  • Loading branch information
Kerkesni committed Sep 26, 2024
1 parent 5a630a0 commit 00b882b
Show file tree
Hide file tree
Showing 3 changed files with 113 additions and 49 deletions.
108 changes: 60 additions & 48 deletions bin/queuePopulator.js
Original file line number Diff line number Diff line change
Expand Up @@ -51,58 +51,70 @@ function queueBatch(queuePopulator, taskState) {
}
/* eslint-enable no-param-reassign */

const queuePopulator = new QueuePopulator(zkConfig, kafkaConfig,
qpConfig, httpsConfig, mConfig, rConfig, vConfig, extConfigs);
function main() {
const queuePopulator = new QueuePopulator(zkConfig, kafkaConfig,

Check warning on line 55 in bin/queuePopulator.js

View check run for this annotation

Codecov / codecov/patch/Backbeat

bin/queuePopulator.js#L55

Added line #L55 was not covered by tests
qpConfig, httpsConfig, mConfig, rConfig, vConfig, extConfigs);

async.waterfall([
done => startProbeServer(qpConfig.probeServer, (err, probeServer) => {
async.waterfall([
done => startProbeServer(qpConfig.probeServer, (err, probeServer) => {
if (err) {
log.error('error starting probe server', {

Check warning on line 61 in bin/queuePopulator.js

View check run for this annotation

Codecov / codecov/patch/Backbeat

bin/queuePopulator.js#L58-L61

Added lines #L58 - L61 were not covered by tests
error: err,
method: 'QueuePopulator::startProbeServer',
});
done(err);
return;

Check warning on line 66 in bin/queuePopulator.js

View check run for this annotation

Codecov / codecov/patch/Backbeat

bin/queuePopulator.js#L65-L66

Added lines #L65 - L66 were not covered by tests
}
if (probeServer !== undefined) {
probeServer.addHandler([DEFAULT_LIVE_ROUTE, DEFAULT_READY_ROUTE],
(res, log) => queuePopulator.handleLiveness(res, log)

Check warning on line 70 in bin/queuePopulator.js

View check run for this annotation

Codecov / codecov/patch/Backbeat

bin/queuePopulator.js#L68-L70

Added lines #L68 - L70 were not covered by tests
);
probeServer.addHandler(DEFAULT_METRICS_ROUTE,
(res, log) => queuePopulator.handleMetrics(res, log)

Check warning on line 73 in bin/queuePopulator.js

View check run for this annotation

Codecov / codecov/patch/Backbeat

bin/queuePopulator.js#L72-L73

Added lines #L72 - L73 were not covered by tests
);
}
done();

Check warning on line 76 in bin/queuePopulator.js

View check run for this annotation

Codecov / codecov/patch/Backbeat

bin/queuePopulator.js#L76

Added line #L76 was not covered by tests
}),
done => queuePopulator.open(done),

Check warning on line 78 in bin/queuePopulator.js

View check run for this annotation

Codecov / codecov/patch/Backbeat

bin/queuePopulator.js#L78

Added line #L78 was not covered by tests
done => {
const taskState = {

Check warning on line 80 in bin/queuePopulator.js

View check run for this annotation

Codecov / codecov/patch/Backbeat

bin/queuePopulator.js#L80

Added line #L80 was not covered by tests
batchInProgress: false,
};
schedule.scheduleJob(qpConfig.cronRule, () => {
queueBatch(queuePopulator, taskState);

Check warning on line 84 in bin/queuePopulator.js

View check run for this annotation

Codecov / codecov/patch/Backbeat

bin/queuePopulator.js#L83-L84

Added lines #L83 - L84 were not covered by tests
});
done();

Check warning on line 86 in bin/queuePopulator.js

View check run for this annotation

Codecov / codecov/patch/Backbeat

bin/queuePopulator.js#L86

Added line #L86 was not covered by tests
},
], err => {
if (err) {
log.error('error starting probe server', {
log.error('error during queue populator initialization', {

Check warning on line 90 in bin/queuePopulator.js

View check run for this annotation

Codecov / codecov/patch/Backbeat

bin/queuePopulator.js#L90

Added line #L90 was not covered by tests
method: 'QueuePopulator::task',
error: err,
method: 'QueuePopulator::startProbeServer',
});
done(err);
return;
}
if (probeServer !== undefined) {
probeServer.addHandler([DEFAULT_LIVE_ROUTE, DEFAULT_READY_ROUTE],
(res, log) => queuePopulator.handleLiveness(res, log)
);
probeServer.addHandler(DEFAULT_METRICS_ROUTE,
(res, log) => queuePopulator.handleMetrics(res, log)
);
}
done();
}),
done => queuePopulator.open(done),
done => {
const taskState = {
batchInProgress: false,
};
schedule.scheduleJob(qpConfig.cronRule, () => {
queueBatch(queuePopulator, taskState);
});
done();
},
], err => {
if (err) {
log.error('error during queue populator initialization', {
method: 'QueuePopulator::task',
error: err,
});
process.exit(1);
}
});

process.on('SIGTERM', () => {
log.info('received SIGTERM, exiting');
queuePopulator.close(error => {
if (error) {
log.error('failed to exit properly', {
error,
});
process.exit(1);
}
process.exit(0);
});
});

process.on('SIGTERM', () => {
log.info('received SIGTERM, exiting');
queuePopulator.close(error => {
if (error) {
log.error('failed to exit properly', {

Check warning on line 102 in bin/queuePopulator.js

View check run for this annotation

Codecov / codecov/patch/Backbeat

bin/queuePopulator.js#L98-L102

Added lines #L98 - L102 were not covered by tests
error,
});
process.exit(1);

Check warning on line 105 in bin/queuePopulator.js

View check run for this annotation

Codecov / codecov/patch/Backbeat

bin/queuePopulator.js#L105

Added line #L105 was not covered by tests
}
process.exit(0);

Check warning on line 107 in bin/queuePopulator.js

View check run for this annotation

Codecov / codecov/patch/Backbeat

bin/queuePopulator.js#L107

Added line #L107 was not covered by tests
});
});
}

module.exports = {
queueBatch,
};

// run the main function if this script is run directly
// otherwise the file is required as a module for testing
if (require.main === module) {
main();

Check warning on line 119 in bin/queuePopulator.js

View check run for this annotation

Codecov / codecov/patch/Backbeat

bin/queuePopulator.js#L119

Added line #L119 was not covered by tests
}
49 changes: 49 additions & 0 deletions tests/unit/bin/queuePopulator.spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
const assert = require('assert');
const sinon = require('sinon');

const QueuePopulator = require('../../../lib/queuePopulator/QueuePopulator');
const { queueBatch } = require('../../../bin/queuePopulator');

describe('queuePopulator', () => {

afterEach(() => {
sinon.restore();
});

describe('processLogEntries', () => {
it('should reset batchInProgress if processLogEntries gets stuck', done => {
const qp = new QueuePopulator({}, {}, {}, {}, {}, {}, {}, {});
const processLogEntries = sinon.stub(qp, 'processLogEntries').callsFake(options => {
setTimeout(() => {
options.onTimeout();
}, 1000);
});
const taskStatus = { batchInProgress: false };
queueBatch(qp, taskStatus);
assert.strictEqual(taskStatus.batchInProgress, true);
assert(processLogEntries.calledOnce);
setTimeout(() => {
assert.strictEqual(taskStatus.batchInProgress, false);
done();
}, 2000);
}).timeout(4000);

it('should reset batchInProgress when processLogEntries finishes', () => {
const qp = new QueuePopulator({}, {}, {}, {}, {}, {}, {}, {});
const processLogEntries = sinon.stub(qp, 'processLogEntries').yields();
const taskStatus = { batchInProgress: false };
queueBatch(qp, taskStatus);
assert(processLogEntries.calledOnce);
assert.strictEqual(taskStatus.batchInProgress, false);
});

it('should skip batch if one is currently in progress', () => {
const qp = new QueuePopulator({}, {}, {}, {}, {}, {}, {}, {});
const processLogEntries = sinon.stub(qp, 'processLogEntries').yields();
const taskStatus = { batchInProgress: true };
queueBatch(qp, taskStatus);
assert.strictEqual(taskStatus.batchInProgress, true);
assert(processLogEntries.notCalled);
});
});
});
5 changes: 4 additions & 1 deletion tests/unit/lib/queuePopulator/LogReader.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ const { Logger } = require('werelogs');

const LogReader = require('../../../../lib/queuePopulator/LogReader');


class MockLogConsumer {
constructor(params) {
this.params = params || {};
Expand Down Expand Up @@ -40,6 +39,10 @@ describe('LogReader', () => {
});
});

afterEach(() => {
sinon.restore();
});

// Currently the initial offset is set to 1 with mongodb backend,
// it looks odd considering mongodb uses random IDs as log cursors
// and could be cleaned up, but this test coming from 7.4 branch
Expand Down

0 comments on commit 00b882b

Please sign in to comment.