Skip to content

Commit

Permalink
- Also ensure no race condition when handling connector events
Browse files Browse the repository at this point in the history
Issue: BB-601

Rename metric to avoid unnecessary name

- also fix a comment

Issue: BB-601
  • Loading branch information
williamlardier committed Sep 19, 2024
1 parent 93efc96 commit c0be9ae
Show file tree
Hide file tree
Showing 9 changed files with 28 additions and 20 deletions.
6 changes: 3 additions & 3 deletions extensions/oplogPopulator/OplogPopulator.js
Original file line number Diff line number Diff line change
Expand Up @@ -287,11 +287,11 @@ class OplogPopulator {
});
// For now, we always use the RetainBucketsDecorator
// so, we map the events from the classes
this._connectorsManager.on('connector-updated', connector =>
this._connectorsManager.on(constants.connectorUpdatedEvent, connector =>
this._allocationStrategy.onConnectorUpdatedOrDestroyed(connector));
this._allocator.on('bucket-removed', (bucket, connector) =>
this._allocator.on(constants.bucketRemovedFromConnectorEvent, (bucket, connector) =>
this._allocationStrategy.onBucketRemoved(bucket, connector));
this._connectorsManager.on('connectors-reconciled', bucketsExceedingLimit => {
this._connectorsManager.on(constants.connectorsReconciledEvent, bucketsExceedingLimit => {
this._metricsHandler.onConnectorsReconciled(
bucketsExceedingLimit,
this._allocationStrategy.retainedBucketsCount,
Expand Down
4 changes: 2 additions & 2 deletions extensions/oplogPopulator/OplogPopulatorMetrics.js
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,11 @@ class OplogPopulatorMetrics {
labelNames: ['connector'],
});
this.bucketsExceedingLimit = ZenkoMetrics.createGauge({
name: 's3_oplog_populator_connector_buckets_exceeding_limit',
name: 's3_oplog_populator_buckets_exceeding_limit',
help: 'Total number of buckets exceeding the limit for all connectors',
});
this.retainedBuckets = ZenkoMetrics.createGauge({
name: 's3_oplog_populator_connector_retained_buckets',
name: 's3_oplog_populator_retained_buckets',
help: 'Current number of buckets still listened to by immutable connectors despite intended removal',
});
this.requestSize = ZenkoMetrics.createCounter({
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ class RetainBucketsDecorator extends AllocationStrategy {
super(params);
this._strategy = strategy;

// Stores buckets that should are removed from the connector
// Stores buckets that should be removed from the connector
// but still in use
this._retainedBuckets = new Map();
}
Expand Down
3 changes: 3 additions & 0 deletions extensions/oplogPopulator/constants.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ const constants = {
// to 16 MB (16777216B) / 64 (max length of a bucket name) ~= 260000
maxBucketsPerConnector: 260000,
mongodbVersionWithImmutablePipelines: '6.0.0',
connectorUpdatedEvent: 'connector-updated',
bucketRemovedFromConnectorEvent: 'bucket-removed',
connectorsReconciledEvent: 'connectors-reconciled',
defaultConnectorConfig: {
'connector.class': 'com.mongodb.kafka.connect.MongoSourceConnector',
'pipeline': '[]',
Expand Down
3 changes: 2 additions & 1 deletion extensions/oplogPopulator/modules/Allocator.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ const { errors } = require('arsenal');
const OplogPopulatorMetrics = require('../OplogPopulatorMetrics');
const AllocationStrategy = require('../allocationStrategy/AllocationStrategy');
const { EventEmitter } = require('./ConnectorsManager');
const constants = require('../constants');

const paramsJoi = joi.object({
connectorsManager: joi.object().required(),
Expand Down Expand Up @@ -126,8 +127,8 @@ class Allocator extends EventEmitter {
try {
const connector = this._bucketsToConnectors.get(bucket);
if (connector) {
this.emit(constants.bucketRemovedFromConnectorEvent, bucket, connector);
await connector.removeBucket(bucket);
this.emit('bucket-removed', bucket, connector);
this._bucketsToConnectors.delete(bucket);
this._metricsHandler.onConnectorConfigured(connector, 'delete');
this._logger.info('Stopped listening to bucket', {
Expand Down
7 changes: 6 additions & 1 deletion extensions/oplogPopulator/modules/Connector.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
const joi = require('joi');
const uuid = require('uuid');
const { errors } = require('arsenal');
const { EventEmitter } = require('stream');
const KafkaConnectWrapper = require('../../../lib/wrappers/KafkaConnectWrapper');
const constants = require('../constants');

const connectorParams = joi.object({
name: joi.string().required(),
Expand All @@ -21,7 +23,7 @@ const connectorParams = joi.object({
* destroy and update the config of the connector when adding
* or removing buckets from it
*/
class Connector {
class Connector extends EventEmitter {

/**
* @constructor
Expand All @@ -36,6 +38,7 @@ class Connector {
* @param {number} params.kafkaConnectPort kafka connect port
*/
constructor(params) {
super();
joi.attempt(params, connectorParams);
this._name = params.name;
this._config = params.config;
Expand Down Expand Up @@ -184,6 +187,7 @@ class Connector {
return;
}
try {
this.emit(constants.connectorUpdatedEvent, this);
await this._kafkaConnect.deleteConnector(this._name);
this._isRunning = false;
// resetting the resume point to set a new one on creation of the connector
Expand Down Expand Up @@ -339,6 +343,7 @@ class Connector {
if (doUpdate && this._isRunning) {
const timeBeforeUpdate = Date.now();
this._state.isUpdating = true;
this.emit(constants.connectorUpdatedEvent, this);
await this._kafkaConnect.updateConnectorConfig(this._name, this._config);
this._updateConnectorState(false, timeBeforeUpdate);
this._state.isUpdating = false;
Expand Down
11 changes: 4 additions & 7 deletions extensions/oplogPopulator/modules/ConnectorsManager.js
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,8 @@ class ConnectorsManager extends EventEmitter {
kafkaConnectHost: this._kafkaConnectHost,
kafkaConnectPort: this._kafkaConnectPort,
});
connector.on(constants.connectorUpdatedEvent,
connector => this.emit(constants.connectorUpdatedEvent, connector));
this._connectors.push(connector);
return connector;
}
Expand Down Expand Up @@ -246,7 +248,6 @@ class ConnectorsManager extends EventEmitter {
async _spawnOrDestroyConnector(connector) {
try {
if (connector.isRunning && connector.bucketCount === 0) {
this.emit('connector-updated', connector);
await connector.destroy();
this._metricsHandler.onConnectorDestroyed();
this._logger.info('Successfully destroyed a connector', {
Expand All @@ -263,11 +264,7 @@ class ConnectorsManager extends EventEmitter {
});
return true;
} else if (connector.isRunning && this._allocationStrategy.canUpdate()) {
const isPipelineUpdated = connector.updatePipeline(true);
if (isPipelineUpdated) {
this.emit('connector-updated', connector);
}
return isPipelineUpdated;
return connector.updatePipeline(true);
}

return false;
Expand Down Expand Up @@ -320,7 +317,7 @@ class ConnectorsManager extends EventEmitter {
});
}
});
this.emit('connectors-reconciled', bucketsExceedingLimit);
this.emit(constants.connectorsReconciledEvent, bucketsExceedingLimit);
if (Object.keys(connectorsStatus).length > 0) {
this._logger.info('Successfully updated connectors', {
method: 'ConnectorsManager._updateConnectors',
Expand Down
5 changes: 3 additions & 2 deletions tests/unit/oplogPopulator/ConnectorsManager.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ const OplogPopulatorMetrics =
require('../../../extensions/oplogPopulator/OplogPopulatorMetrics');
const RetainBucketsDecorator = require('../../../extensions/oplogPopulator/allocationStrategy/RetainBucketsDecorator');
const LeastFullConnector = require('../../../extensions/oplogPopulator/allocationStrategy/LeastFullConnector');
const constants = require('../../../extensions/oplogPopulator/constants');

const logger = new werelogs.Logger('ConnectorsManager');

Expand Down Expand Up @@ -238,9 +239,9 @@ describe('ConnectorsManager', () => {
connector1._isRunning = true;
connector1._state.bucketsGotModified = false;
connector1._buckets = new Set();
const emitStub = sinon.stub(connectorsManager, 'emit');
const emitStub = sinon.stub(connector1, 'emit');
await connectorsManager._spawnOrDestroyConnector(connector1);
assert(emitStub.calledOnceWith('connector-updated', connector1));
assert(emitStub.calledOnceWith(constants.connectorUpdatedEvent, connector1));
});

it('should spawn a non running connector when buckets are configured', async () => {
Expand Down
7 changes: 4 additions & 3 deletions tests/unit/oplogPopulator/oplogPopulator.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ const RetainBucketsDecorator = require('../../../extensions/oplogPopulator/alloc
const LeastFullConnector = require('../../../extensions/oplogPopulator/allocationStrategy/LeastFullConnector');
const ImmutableConnector = require('../../../extensions/oplogPopulator/allocationStrategy/ImmutableConnector');
const AllocationStrategy = require('../../../extensions/oplogPopulator/allocationStrategy/AllocationStrategy');
const constants = require('../../../extensions/oplogPopulator/constants');

const oplogPopulatorConfig = {
topic: 'oplog',
Expand Down Expand Up @@ -209,7 +210,7 @@ describe('OplogPopulator', () => {
assert(initializeConnectorsManagerStub.calledOnce);
const onConnectorUpdatedOrDestroyedStub =
sinon.stub(oplogPopulator._allocationStrategy, 'onConnectorUpdatedOrDestroyed');
oplogPopulator._connectorsManager.emit('connector-updated');
oplogPopulator._connectorsManager.emit(constants.connectorUpdatedEvent);
assert(onConnectorUpdatedOrDestroyedStub.calledOnce);
});

Expand All @@ -224,7 +225,7 @@ describe('OplogPopulator', () => {
assert(setMetastoreChangeStreamStub.calledOnce);
assert(initializeConnectorsManagerStub.calledOnce);
const onBucketRemovedStub = sinon.stub(oplogPopulator._allocationStrategy, 'onBucketRemoved');
oplogPopulator._allocator.emit('bucket-removed');
oplogPopulator._allocator.emit(constants.bucketRemovedFromConnectorEvent);
assert(onBucketRemovedStub.calledOnce);
});

Expand All @@ -239,7 +240,7 @@ describe('OplogPopulator', () => {
assert(setMetastoreChangeStreamStub.calledOnce);
assert(initializeConnectorsManagerStub.calledOnce);
const onConnectorsReconciledStub = sinon.stub(oplogPopulator._metricsHandler, 'onConnectorsReconciled');
oplogPopulator._connectorsManager.emit('connectors-reconciled');
oplogPopulator._connectorsManager.emit(constants.connectorsReconciledEvent);
assert(onConnectorsReconciledStub.calledOnce);
});
});
Expand Down

0 comments on commit c0be9ae

Please sign in to comment.