Skip to content

Commit

Permalink
Extend test and allow using Infinity in joi checks
Browse files Browse the repository at this point in the history
Issue: BB-601
  • Loading branch information
williamlardier committed Sep 11, 2024
1 parent 6bc0ff8 commit 7736410
Show file tree
Hide file tree
Showing 6 changed files with 68 additions and 5 deletions.
5 changes: 4 additions & 1 deletion extensions/oplogPopulator/modules/Allocator.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,10 @@ const constants = require('../constants');

const paramsJoi = joi.object({
connectorsManager: joi.object().required(),
maximumBucketsPerConnector: joi.number().default(constants.maxBucketPerConnector),
maximumBucketsPerConnector: joi.alternatives().try(
joi.number().integer(),
joi.any().valid(Infinity),
).default(constants.maxBucketPerConnector),
metricsHandler: joi.object()
.instance(OplogPopulatorMetrics).required(),
logger: joi.object().required(),
Expand Down
7 changes: 5 additions & 2 deletions extensions/oplogPopulator/modules/Connector.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,10 @@ const connectorParams = joi.object({
logger: joi.object().required(),
kafkaConnectHost: joi.string().required(),
kafkaConnectPort: joi.number().required(),
maximumBucketsPerConnector: joi.number().default(constants.maxBucketPerConnector),
maximumBucketsPerConnector: joi.alternatives().try(
joi.number().integer(),
joi.any().valid(Infinity),
).default(constants.maxBucketPerConnector),
isPipelineImmutable: joi.boolean().default(false),
singleChangeStream: joi.boolean().default(false),
});
Expand Down Expand Up @@ -246,7 +249,7 @@ class Connector {
* @throws {InternalError}
*/
async addBucket(bucket, doUpdate = false) {
if (this._buckets.size > this._maximumBucketsPerConnector) {
if (this._buckets.size >= this._maximumBucketsPerConnector) {
throw errors.InternalError.customizeDescription('Connector reached maximum number of buckets');
}
this._buckets.add(bucket);
Expand Down
5 changes: 4 additions & 1 deletion extensions/oplogPopulator/modules/ConnectorsManager.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,10 @@ const paramsJoi = joi.object({
kafkaConnectPort: joi.number().required(),
singleChangeStream: joi.boolean().default(false),
isPipelineImmutable: joi.boolean().default(false),
maximumBucketsPerConnector: joi.number().default(constants.maxBucketPerConnector),
maximumBucketsPerConnector: joi.alternatives().try(
joi.number().integer(),
joi.any().valid(Infinity),
).default(constants.maxBucketPerConnector),
metricsHandler: joi.object()
.instance(OplogPopulatorMetrics).required(),
logger: joi.object().required(),
Expand Down
9 changes: 9 additions & 0 deletions tests/unit/oplogPopulator/Connector.js
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,15 @@ describe('Connector', () => {
});

describe('removeBucket', () => {
it('should handle error', async () => {
const connectorUpdateStub = sinon.stub(connector, 'updatePipeline')
.rejects(errors.InternalError);

connector._buckets.add('example-bucket');
await assert.rejects(connector.removeBucket('example-bucket'));
assert(connectorUpdateStub.calledOnce);
});

it('Should remove bucket and update connector', async () => {
const connectorUpdateStub = sinon.stub(connector, 'updatePipeline')
.resolves();
Expand Down
14 changes: 14 additions & 0 deletions tests/unit/oplogPopulator/allocationStrategy/LeastFullConnector.js
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,20 @@ describe('LeastFullConnector (with multiple buckets per connector)', () => {
const connector = strategy.getConnector([connector1, connector2]);
assert.strictEqual(connector.name, connector1.name);
});

it('Should return connector with fewest buckets (single stream case)', () => {
strategy = new LeastFullConnector({
maximumBucketsPerConnector: Infinity,
addConnector: () => new Connector({
name: 'example-connector-3',
buckets: [],
...defaultConnectorParams,
}),
logger,
});
const connector = strategy.getConnector([connector1, connector2]);
assert.strictEqual(connector.name, connector1.name);
});
});
});

Expand Down
33 changes: 32 additions & 1 deletion tests/unit/oplogPopulator/oplogPopulator.js
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,6 @@ describe('OplogPopulator', () => {
});

describe('setup', () => {

it('should handle error during setup', async () => {
const error = new Error('InternalError');
const loadOplogHelperClassesStub = sinon.stub(oplogPopulator, '_loadOplogHelperClasses').throws(error);
Expand Down Expand Up @@ -155,6 +154,38 @@ describe('OplogPopulator', () => {
assert(setMetastoreChangeStreamStub.calledOnce);
assert(initializeConnectorsManagerStub.calledOnce);
});

it('should setup oplog populator with single change stream', async () => {
const setupMongoClientStub = sinon.stub(oplogPopulator, '_setupMongoClient').resolves();
const setMetastoreChangeStreamStub = sinon.stub(oplogPopulator, '_setMetastoreChangeStream');
const initializeConnectorsManagerStub = sinon.stub(oplogPopulator, '_initializeConnectorsManager');
const getBackbeatEnabledBucketsStub = sinon.stub(oplogPopulator, '_getBackbeatEnabledBuckets').resolves([]);

oplogPopulator._config.singleChangeStream = true;

await oplogPopulator.setup();

assert(setupMongoClientStub.calledOnce);
assert(getBackbeatEnabledBucketsStub.calledOnce);
assert(setMetastoreChangeStreamStub.calledOnce);
assert(initializeConnectorsManagerStub.calledOnce);
});

it('should setup oplog populator with immutable pipelines', async () => {
const setupMongoClientStub = sinon.stub(oplogPopulator, '_setupMongoClient').resolves();
const setMetastoreChangeStreamStub = sinon.stub(oplogPopulator, '_setMetastoreChangeStream');
const initializeConnectorsManagerStub = sinon.stub(oplogPopulator, '_initializeConnectorsManager');
const getBackbeatEnabledBucketsStub = sinon.stub(oplogPopulator, '_getBackbeatEnabledBuckets').resolves([]);

oplogPopulator._mongoVersion = '6.0.0';

await oplogPopulator.setup();

assert(setupMongoClientStub.calledOnce);
assert(getBackbeatEnabledBucketsStub.calledOnce);
assert(setMetastoreChangeStreamStub.calledOnce);
assert(initializeConnectorsManagerStub.calledOnce);
});
});

describe('_initializeConnectorsManager', () => {
Expand Down

0 comments on commit 7736410

Please sign in to comment.