Skip to content

Commit

Permalink
Code improvements
Browse files Browse the repository at this point in the history
- Remove useless variables
- Update tests

Isue: BB-601
  • Loading branch information
williamlardier committed Sep 18, 2024
1 parent c070015 commit 3e67886
Show file tree
Hide file tree
Showing 11 changed files with 18 additions and 53 deletions.
4 changes: 0 additions & 4 deletions extensions/oplogPopulator/OplogPopulator.js
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,6 @@ class OplogPopulator {
this._allocationStrategy = this.initStrategy();
this._connectorsManager = new ConnectorsManager({
nbConnectors: this._config.numberOfConnectors,
maximumBucketsPerConnector: this._maximumBucketsPerConnector,
database: this._database,
mongoUrl: this._mongoUrl,
oplogTopic: this._config.topic,
Expand Down Expand Up @@ -340,7 +339,6 @@ class OplogPopulator {
// words, we cannot alter an existing pipeline. In this
// case, the strategy is to allow a maximum of one
// bucket per kafka connector.
this._maximumBucketsPerConnector = 1;
strategy = new ImmutableConnector({
logger: this._logger,
metricsHandler: this._metricsHandler,
Expand All @@ -351,12 +349,10 @@ class OplogPopulator {
// kafka connector. However, we want to proactively
// ensure that the pipeline will be accepted by
// mongodb.
this._maximumBucketsPerConnector = constants.maxBucketsPerConnector;
strategy = new LeastFullConnector({
logger: this._logger,
metricsHandler: this._metricsHandler,
connectorsManager: this._connectorsManager,
maximumBucketsPerConnector: this._maximumBucketsPerConnector,
});
}
return new RetainBucketsDecorator(
Expand Down
16 changes: 3 additions & 13 deletions extensions/oplogPopulator/allocationStrategy/LeastFullConnector.js
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
const constants = require('../constants');
const AllocationStrategy = require('./AllocationStrategy');

/**
Expand All @@ -10,17 +11,6 @@ const AllocationStrategy = require('./AllocationStrategy');
*/
class LeastFullConnector extends AllocationStrategy {

/**
* @constructor
* @param {Object} params params
* @param {Number} params.maximumBucketsPerConnector maximum number of buckets per connector
* @param {Logger} params.logger logger object
*/
constructor(params) {
super(params);
this._maximumBucketsPerConnector = params.maximumBucketsPerConnector;
}

/**
* Get best connector to assign a bucket to.
* If no connector is available, null is returned.
Expand All @@ -33,7 +23,7 @@ class LeastFullConnector extends AllocationStrategy {
return null;
}
const connector = connectors.reduce((prev, elt) => (elt.bucketCount < prev.bucketCount ? elt : prev));
if (connector.buckets.length >= this._maximumBucketsPerConnector) {
if (connector.buckets.length >= this.maximumBucketsPerConnector) {
return null;
}
return connector;
Expand All @@ -53,7 +43,7 @@ class LeastFullConnector extends AllocationStrategy {
* @returns {Number} maximum number of buckets per connector
*/
get maximumBucketsPerConnector() {
return this._maximumBucketsPerConnector;
return constants.maxBucketsPerConnector;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,11 +76,8 @@ class RetainBucketsDecorator extends AllocationStrategy {
* @returns {Connector | null} connector
*/
getConnector(connectors, bucket) {
if (this._retainedBuckets.has(bucket)) {
return this._retainedBuckets.get(bucket);
}

return this._strategy.getConnector(connectors, bucket);
return this._retainedBuckets.get(bucket) ||
this._strategy.getConnector(connectors, bucket);
}

/**
Expand Down
2 changes: 1 addition & 1 deletion extensions/oplogPopulator/modules/Allocator.js
Original file line number Diff line number Diff line change
Expand Up @@ -122,8 +122,8 @@ class Allocator extends EventEmitter {
try {
const connector = this._bucketsToConnectors.get(bucket);
if (connector) {
await connector.removeBucket(bucket);
this.emit('bucket-removed', bucket, connector);
await connector.removeBucket(bucket);
this._bucketsToConnectors.delete(bucket);
this._metricsHandler.onConnectorConfigured(connector, 'delete');
this._logger.info('Stopped listening to bucket', {
Expand Down
8 changes: 2 additions & 6 deletions extensions/oplogPopulator/modules/ConnectorsManager.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,6 @@ const paramsJoi = joi.object({
heartbeatIntervalMs: joi.number().required(),
kafkaConnectHost: joi.string().required(),
kafkaConnectPort: joi.number().required(),
maximumBucketsPerConnector: joi.alternatives().try(
joi.number().integer(),
).default(constants.maxBucketsPerConnector),
metricsHandler: joi.object()
.instance(OplogPopulatorMetrics).required(),
allocationStrategy: joi.object()
Expand All @@ -47,8 +44,6 @@ class ConnectorsManager extends EventEmitter {
* @constructor
* @param {Object} params params
* @param {number} params.nbConnectors number of connectors to have
* @param {boolean} params.maximumBucketsPerConnector maximum number of
* buckets per connector
* @param {string} params.database MongoDB database to use (for connector)
* @param {string} params.mongoUrl MongoDB connection url
* @param {string} params.oplogTopic topic to use for oplog
Expand Down Expand Up @@ -308,7 +303,8 @@ class ConnectorsManager extends EventEmitter {
};
}
if (connector.bucketCount > this._allocationStrategy.maximumBucketsPerConnector) {
bucketsExceedingLimit += connector.bucketCount - this._allocationStrategy.maximumBucketsPerConnector;
bucketsExceedingLimit +=

Check warning on line 306 in extensions/oplogPopulator/modules/ConnectorsManager.js

View check run for this annotation

Codecov / codecov/patch/Backbeat

extensions/oplogPopulator/modules/ConnectorsManager.js#L306

Added line #L306 was not covered by tests
connector.bucketCount - this._allocationStrategy.maximumBucketsPerConnector;
}
} catch (err) {
this._metricsHandler.onConnectorReconfiguration(connector, false);
Expand Down
2 changes: 0 additions & 2 deletions tests/unit/oplogPopulator/Allocator.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ const OplogPopulatorMetrics =
require('../../../extensions/oplogPopulator/OplogPopulatorMetrics');
const LeastFullConnector = require('../../../extensions/oplogPopulator/allocationStrategy/LeastFullConnector');
const RetainBucketsDecorator = require('../../../extensions/oplogPopulator/allocationStrategy/RetainBucketsDecorator');
const constants = require('../../../extensions/oplogPopulator/constants');

const logger = new werelogs.Logger('Allocator');

Expand Down Expand Up @@ -41,7 +40,6 @@ describe('Allocator', () => {
// Not needed to test all strategies here: we stub their methods
new LeastFullConnector({
logger,
maximumBucketsPerConnector: constants.maxBucketsPerConnector,
}),
{ logger, }
),
Expand Down
3 changes: 1 addition & 2 deletions tests/unit/oplogPopulator/ConnectorsManager.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ const OplogPopulatorMetrics =
require('../../../extensions/oplogPopulator/OplogPopulatorMetrics');
const RetainBucketsDecorator = require('../../../extensions/oplogPopulator/allocationStrategy/RetainBucketsDecorator');
const LeastFullConnector = require('../../../extensions/oplogPopulator/allocationStrategy/LeastFullConnector');
const constants = require('../../../extensions/oplogPopulator/constants');

const logger = new werelogs.Logger('ConnectorsManager');

Expand Down Expand Up @@ -103,7 +102,6 @@ describe('ConnectorsManager', () => {
// Not needed to test all strategies here: we stub their methods
new LeastFullConnector({
logger,
maximumBucketsPerConnector: 1,
}),
{ logger }
),
Expand Down Expand Up @@ -184,6 +182,7 @@ describe('ConnectorsManager', () => {
const config = { ...connectorConfig };
config['topic.namespace.map'] = 'outdated-topic';
config['offset.partitiom.name'] = 'partition-name';
sinon.stub(connectorsManager._allocationStrategy, 'maximumBucketsPerConnector').value(1);
sinon.stub(connectorsManager._kafkaConnect, 'getConnectorConfig')
.resolves(config);
sinon.stub(connectorsManager, '_extractBucketsFromConfig').returns(['bucket1', 'bucket2']);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ describe('ImmutableConnector', () => {
const result = await immutableConnector.canUpdate();
assert.strictEqual(result, false);
});

it('should return 1 for maximumBucketsPerConnector', async () => {
const immutableConnector = new ImmutableConnector({ logger });
const result = await immutableConnector.maximumBucketsPerConnector;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
const assert = require('assert');
const sinon = require('sinon');
const werelogs = require('werelogs');

const Connector =
require('../../../../extensions/oplogPopulator/modules/Connector');
const LeastFullConnector =
require('../../../../extensions/oplogPopulator/allocationStrategy/LeastFullConnector');
const constants = require('../../../../extensions/oplogPopulator/constants');

const logger = new werelogs.Logger('LeastFullConnector');

Expand Down Expand Up @@ -33,11 +35,14 @@ describe('LeastFullConnector', () => {
beforeEach(() => {
strategy = new LeastFullConnector({
logger,
maximumBucketsPerConnector: 2,
});
});

describe('getConnector', () => {
afterEach(() => {
sinon.restore();
});

it('should return connector with fewest buckets', () => {
const connector = strategy.getConnector([connector1, connector2]);
assert.strictEqual(connector.name, connector1.name);
Expand All @@ -49,6 +54,7 @@ describe('LeastFullConnector', () => {
});

it('should return null if the smallest connector is full', () => {
sinon.stub(strategy, 'maximumBucketsPerConnector').value(1);
const connector = strategy.getConnector([connector2]);
assert.strictEqual(connector, null);
});
Expand All @@ -62,7 +68,7 @@ describe('LeastFullConnector', () => {

describe('maximumBucketsPerConnector', () => {
it('should return the maximum number of buckets per connector', () => {
assert.strictEqual(strategy.maximumBucketsPerConnector, 2);
assert.strictEqual(strategy.maximumBucketsPerConnector, constants.maxBucketsPerConnector);
});
});
});
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ const connector2 = new Connector({
scenario: 'LeastFullConnector',
strategy: new LeastFullConnector({
logger,
maximumBucketsPerConnector: 2,
}),
}
].forEach(({ scenario, strategy }) => {
Expand Down
16 changes: 0 additions & 16 deletions tests/unit/oplogPopulator/oplogPopulator.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ const OplogPopulator =
const ChangeStream =
require('../../../lib/wrappers/ChangeStream');
const ConnectorsManager = require('../../../extensions/oplogPopulator/modules/ConnectorsManager');
const constants = require('../../../extensions/oplogPopulator/constants');
const RetainBucketsDecorator = require('../../../extensions/oplogPopulator/allocationStrategy/RetainBucketsDecorator');
const LeastFullConnector = require('../../../extensions/oplogPopulator/allocationStrategy/LeastFullConnector');
const ImmutableConnector = require('../../../extensions/oplogPopulator/allocationStrategy/ImmutableConnector');
Expand Down Expand Up @@ -136,20 +135,6 @@ describe('OplogPopulator', () => {
sinon.restore();
});

it('should set the maximum buckets per connector', () => {
const arePipelinesImmutableStub = sinon.stub(oplogPopulator, '_arePipelinesImmutable').returns(true);
oplogPopulator.initStrategy();
assert.strictEqual(oplogPopulator._maximumBucketsPerConnector, 1);
assert(arePipelinesImmutableStub.calledOnce);
});

it('should set the maximum buckets per connector to default if pipelines are not immutable', () => {
const arePipelinesImmutableStub = sinon.stub(oplogPopulator, '_arePipelinesImmutable').returns(false);
oplogPopulator.initStrategy();
assert.strictEqual(oplogPopulator._maximumBucketsPerConnector, constants.maxBucketsPerConnector);
assert(arePipelinesImmutableStub.calledOnce);
});

it('should return an instance of RetainBucketsDecorator for immutable pipelines', () => {
const arePipelinesImmutableStub = sinon.stub(oplogPopulator, '_arePipelinesImmutable').returns(true);
const strategy = oplogPopulator.initStrategy();
Expand Down Expand Up @@ -262,7 +247,6 @@ describe('OplogPopulator', () => {
it('should initialize connectors manager', async () => {
oplogPopulator._connectorsManager = new ConnectorsManager({
nbConnectors: oplogPopulator._config.numberOfConnectors,
maximumBucketsPerConnector: oplogPopulator._maximumBucketsPerConnector,
database: oplogPopulator._database,
mongoUrl: oplogPopulator._mongoUrl,
oplogTopic: oplogPopulator._config.topic,
Expand Down

0 comments on commit 3e67886

Please sign in to comment.