Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature: BB-309 add oplog populator metrics #2357

Merged
merged 5 commits into from
Jan 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 48 additions & 3 deletions extensions/oplogPopulator/OplogPopulator.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,15 @@ const { constructConnectionString } = require('../utils/MongoUtils');
const ChangeStream = require('../../lib/wrappers/ChangeStream');
const Allocator = require('./modules/Allocator');
const ConnectorsManager = require('./modules/ConnectorsManager');
const { ZenkoMetrics } = require('arsenal').metrics;
const OplogPopulatorMetrics = require('./OplogPopulatorMetrics');

const paramsJoi = joi.object({
config: joi.object().required(),
mongoConfig: joi.object().required(),
activeExtensions: joi.array().required(),
logger: joi.object().required(),
enableMetrics: joi.boolean().default(true),
Copy link
Contributor

@francoisferrand francoisferrand Jan 5, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we need to make metrics optional?
i don't really see a use-case here or "precedent" in other modules, shouldn't metrics always be collected?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In normal cases we always have the metrics enabled, that's why it's set to true by default.
The option was put there to be able to run unit tests with no issues, as otherwise we would get errors about the metrics being registered multiple times.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we may indeed have something for the tests, but it should not propagate to the config...

}).required();

/**
Expand All @@ -37,7 +40,8 @@ class OplogPopulator {
* @param {Object} params.logger - logger
*/
constructor(params) {
joi.attempt(params, paramsJoi);
const validatedParams = joi.attempt(params, paramsJoi);
Object.assign(params, validatedParams);
this._config = params.config;
this._mongoConfig = params.mongoConfig;
this._activeExtensions = params.activeExtensions;
Expand All @@ -54,6 +58,11 @@ class OplogPopulator {
this._mongoUrl = constructConnectionString(this._mongoConfig);
this._replicaSet = this._mongoConfig.replicaSet;
this._database = this._mongoConfig.database;
// initialize metrics
this._metricsHandler = new OplogPopulatorMetrics(this._logger);
if (params.enableMetrics) {
this._metricsHandler.registerMetrics();
}
}

/**
Expand All @@ -67,6 +76,7 @@ class OplogPopulator {
const client = await MongoClient.connect(this._mongoUrl, {
replicaSet: this._replicaSet,
useNewUrlParser: true,
useUnifiedTopology: true,
Kerkesni marked this conversation as resolved.
Show resolved Hide resolved
});
// connect to metadata DB
this._mongoClient = client.db(this._database, {
Expand Down Expand Up @@ -181,6 +191,8 @@ class OplogPopulator {
});
break;
}
const delta = (Date.now() - new Date(change.clusterTime).getTime()) / 1000;
this._metricsHandler.onOplogEventProcessed(change.operationType, delta);
this._logger.info('Change stream event processed', {
method: 'OplogPopulator._handleChangeStreamChange',
type: change.operationType,
Expand All @@ -200,7 +212,16 @@ class OplogPopulator {
'_id': 1,
'operationType': 1,
'documentKey._id': 1,
'fullDocument.value': 1
'fullDocument.value': 1,
// transforming the BSON timestamp
// into a usable date
Kerkesni marked this conversation as resolved.
Show resolved Hide resolved
'clusterTime': {
$toDate: {
$dateToString: {
date: '$clusterTime'
}
}
},
},
},
];
Expand Down Expand Up @@ -232,11 +253,13 @@ class OplogPopulator {
prefix: this._config.prefix,
kafkaConnectHost: this._config.kafkaConnectHost,
kafkaConnectPort: this._config.kafkaConnectPort,
metricsHandler: this._metricsHandler,
logger: this._logger,
});
await this._connectorsManager.initializeConnectors();
this._allocator = new Allocator({
connectorsManager: this._connectorsManager,
metricsHandler: this._metricsHandler,
logger: this._logger,
});
// initialize mongo client
Expand All @@ -248,7 +271,14 @@ class OplogPopulator {
// establish change stream
this._setMetastoreChangeStream();
// remove no longer valid buckets from old connectors
await this._connectorsManager.removeInvalidBuckets(validBuckets);
const oldConnectorBuckets = this._connectorsManager.oldConnectors
.map(connector => connector.buckets)
.flat();
const invalidBuckets = oldConnectorBuckets.filter(bucket => !validBuckets.includes(bucket));
await Promise.all(invalidBuckets.map(bucket => this._allocator.stopListeningToBucket(bucket)));
this._logger.info('Successfully removed invalid buckets from old connectors', {
method: 'ConnectorsManager.removeInvalidBuckets',
});
// start scheduler for updating connectors
this._connectorsManager.scheduleConnectorUpdates();
this._logger.info('OplogPopulator setup complete', {
Expand Down Expand Up @@ -282,6 +312,21 @@ class OplogPopulator {
}
return allReady;
}

/**
* Handle ProbeServer metrics
*
* @param {http.HTTPServerResponse} res - HTTP Response to respond with
* @param {Logger} log - Logger
* @returns {undefined}
*/
handleMetrics(res, log) {
log.debug('metrics requested');
res.writeHead(200, {
'Content-Type': ZenkoMetrics.asPrometheusContentType(),
});
res.end(ZenkoMetrics.asPrometheus());
}
}

module.exports = OplogPopulator;
165 changes: 165 additions & 0 deletions extensions/oplogPopulator/OplogPopulatorMetrics.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
const { ZenkoMetrics } = require('arsenal').metrics;

const { getStringSizeInBytes } = require('../../lib/util/buffer');

class OplogPopulatorMetrics {
/**
* @param {Logger} logger logger instance
*/
constructor(logger) {
this.acknowledgementLag = null;
this.connectorConfiguration = null;
this.requestSize = null;
this.connectors = null;
this.reconfigurationLag = null;
this.connectorConfigurationApplied = null;
this._logger = logger;
}

registerMetrics() {
this.acknowledgementLag = ZenkoMetrics.createHistogram({
name: 'oplog_populator_acknowledgement_lag_sec',
help: 'Delay between a config change in mongo and the start of processing by the oplogPopulator in seconds',
labelNames: ['opType'],
buckets: [0.001, 0.01, 1, 10, 100, 1000, 10000],
});
this.connectorConfiguration = ZenkoMetrics.createCounter({
Kerkesni marked this conversation as resolved.
Show resolved Hide resolved
name: 'oplog_populator_connector_configuration',
help: 'Number of times we update the configuration of a connector',
labelNames: ['connector', 'opType'],
});
this.buckets = ZenkoMetrics.createGauge({
name: 'oplog_populator_connector_buckets',
help: 'Number of buckets per connector',
labelNames: ['connector'],
});
this.requestSize = ZenkoMetrics.createCounter({
name: 'oplog_populator_connector_request_bytes',
help: 'Size of kafka connect request in bytes',
labelNames: ['connector'],
});
this.mongoPipelineSize = ZenkoMetrics.createGauge({
name: 'oplog_populator_connector_pipeline_bytes',
help: 'Size of mongo pipeline in bytes',
labelNames: ['connector'],
});
this.connectors = ZenkoMetrics.createGauge({
name: 'oplog_populator_connectors',
help: 'Total number of configured connectors',
labelNames: ['isOld'],
});
this.reconfigurationLag = ZenkoMetrics.createHistogram({
name: 'oplog_populator_reconfiguration_lag_sec',
help: 'Time it takes kafka-connect to respond to a connector configuration request',
labelNames: ['connector'],
buckets: [0.001, 0.01, 1, 10, 100, 1000, 10000],
});
this.connectorConfigurationApplied = ZenkoMetrics.createCounter({
name: 'oplog_populator_connector_configuration_applied',
help: 'Number of times we submit the configuration of a connector to kafka-connect',
labelNames: ['connector', 'success'],
});
}

/**
* updates oplog_populator_acknowledgement_lag_sec metric
* @param {string} opType oplog operation type
* @param {number} delta delay between a config change
* in mongo and it getting processed by the oplogPopulator
* @returns {undefined}
*/
onOplogEventProcessed(opType, delta) {
try {
this.acknowledgementLag.observe({
opType,
}, delta);
} catch (error) {
this._logger.error('An error occured while pushing metric', {
method: 'OplogPopulatorMetrics.onOplogEventProcessed',
error: error.message,
});
}
}

/**
* updates oplog_populator_connector_configuration &
* oplog_populator_connector_request_bytes metrics
* @param {Connector} connector connector instance
* @param {string} opType operation type, could be one of
* "add" and "delete"
* @param {number} buckets number of buckets updated
* @returns {undefined}
*/
onConnectorConfigured(connector, opType, buckets = 1) {
try {
this.connectorConfiguration.inc({
connector: connector.name,
opType,
}, buckets);
const reqSize = getStringSizeInBytes(JSON.stringify(connector.config));
this.requestSize.inc({
connector: connector.name,
}, reqSize);
const pipelineSize = getStringSizeInBytes(JSON.stringify(connector.config.pipeline));
this.mongoPipelineSize.set({
connector: connector.name,
}, pipelineSize);
} catch (error) {
this._logger.error('An error occured while pushing metrics', {
method: 'OplogPopulatorMetrics.onConnectorConfigured',
error: error.message,
});
}
}

/**
* updates oplog_populator_connectors metric
* @param {boolean} isOld true if connectors were not
* created by this OplogPopulator instance
* @param {number} count number of connectors added
* @returns {undefined}
*/
onConnectorsInstantiated(isOld, count = 1) {
try {
this.connectors.inc({
isOld,
}, count);
} catch (error) {
this._logger.error('An error occured while pushing metrics', {
method: 'OplogPopulatorMetrics.onConnectorsInstantiated',
error: error.message,
});
}
}

/**
* updates oplog_populator_reconfiguration_lag_sec metric
* @param {Connector} connector connector instance
* @param {Boolean} success true if reconfiguration was successful
* @param {number} delta time it takes to reconfigure a connector
* @returns {undefined}
*/
onConnectorReconfiguration(connector, success, delta = null) {
try {
this.connectorConfigurationApplied.inc({
connector: connector.name,
success,
});
if (success) {
this.reconfigurationLag.observe({
connector: connector.name,
}, delta);
this.buckets.set({
connector: connector.name,
}, connector.bucketCount);
}
} catch (error) {
this._logger.error('An error occured while pushing metrics', {
method: 'OplogPopulatorMetrics.onConnectorReconfiguration',
error: error.message,
});
}
}
}

module.exports = OplogPopulatorMetrics;
6 changes: 5 additions & 1 deletion extensions/oplogPopulator/OplogPopulatorTask.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ const OplogPopulator = require('./OplogPopulator');
const {
DEFAULT_LIVE_ROUTE,
DEFAULT_READY_ROUTE,
DEFAULT_METRICS_ROUTE,
} = require('arsenal').network.probe.ProbeServer;
const { sendSuccess, sendError } = require('arsenal').network.probe.Utils;
const { startProbeServerPromise } = require('../../lib/util/probe');
Expand Down Expand Up @@ -40,7 +41,7 @@ const oplogPopulator = new OplogPopulator({
if (oplogPopulator.isReady()) {
sendSuccess(res, log);
} else {
log.error('Notification Queue Processor is not ready');
log.error('OplogPopulator is not ready');
sendError(res, log, errors.ServiceUnavailable, 'unhealthy');
}
}
Expand All @@ -53,6 +54,9 @@ const oplogPopulator = new OplogPopulator({
// following the same pattern as other extensions, where liveness
// and readiness are handled by the same handler
probeServer.addHandler([DEFAULT_LIVE_ROUTE, DEFAULT_READY_ROUTE], handleLiveness);
probeServer.addHandler(DEFAULT_METRICS_ROUTE,
(res, log) => oplogPopulator.handleMetrics(res, log)
);
}
} catch (error) {
logger.error('Error when starting up the oplog populator', {
Expand Down
12 changes: 11 additions & 1 deletion extensions/oplogPopulator/modules/Allocator.js
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
const joi = require('joi');
const { errors } = require('arsenal');

const OplogPopulatorMetrics = require('../OplogPopulatorMetrics');
const LeastFullConnector = require('../allocationStrategy/LeastFullConnector');

const paramsJoi = joi.object({
connectorsManager: joi.object().required(),
metricsHandler: joi.object()
.instance(OplogPopulatorMetrics).required(),
logger: joi.object().required(),
}).required();

Expand All @@ -28,6 +32,7 @@ class Allocator {
this._allocationStrategy = new LeastFullConnector({
logger: params.logger,
});
this._metricsHandler = params.metricsHandler;
// Stores connector assigned for each bucket
this._bucketsToConnectors = new Map();
this._initConnectorToBucketMap();
Expand All @@ -41,7 +46,10 @@ class Allocator {
const connectors = this._connectorsManager.connectors;
connectors.forEach(connector => {
connector.buckets
.forEach(bucket => this._bucketsToConnectors.set(bucket, connector));
.forEach(bucket => {
this._bucketsToConnectors.set(bucket, connector);
this._metricsHandler.onConnectorConfigured(connector, 'add');
});
});
}

Expand Down Expand Up @@ -69,6 +77,7 @@ class Allocator {
const connector = this._allocationStrategy.getConnector(connectors);
await connector.addBucket(bucket);
this._bucketsToConnectors.set(bucket, connector);
this._metricsHandler.onConnectorConfigured(connector, 'add');
this._logger.info('Started listening to bucket', {
method: 'Allocator.listenToBucket',
bucket,
Expand Down Expand Up @@ -99,6 +108,7 @@ class Allocator {
if (connector) {
await connector.removeBucket(bucket);
this._bucketsToConnectors.delete(bucket);
this._metricsHandler.onConnectorConfigured(connector, 'delete');
this._logger.info('Stopped listening to bucket', {
method: 'Allocator.listenToBucket',
bucket,
Expand Down
25 changes: 25 additions & 0 deletions extensions/oplogPopulator/modules/Connector.js
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,31 @@ class Connector {
*/
get bucketCount() { return this._buckets.size; }

/**
* Get connector config
* @returns {Object} connector config
*/
get config() { return this._config; }

/**
* Calculate config size in bytes
* @returns {number} config size
*/
getConfigSizeInBytes() {
try {
const configSize = Buffer.byteLength(JSON.stringify(this._config));
return configSize;
} catch (err) {
this._logger.error('Error while calculating config size', {
method: 'Connector.getConfigSizeInBytes',
connector: this._name,
config: this._config,
error: err.description || err.message,
});
throw errors.InternalError.customizeDescription(err.description);
}
}

/**
* Creates the Kafka-connect mongo connector
* @returns {Promise|undefined} undefined
Expand Down
Loading