Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
benzekrimaha committed Nov 7, 2024
1 parent bc44839 commit 9123a1c
Showing 1 changed file with 1 addition and 83 deletions.
84 changes: 1 addition & 83 deletions extensions/mongoProcessor/MongoQueueProcessor.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,7 @@ const BackbeatConsumer = require('../../lib/BackbeatConsumer');
const QueueEntry = require('../../lib/models/QueueEntry');
const DeleteOpQueueEntry = require('../../lib/models/DeleteOpQueueEntry');
const ObjectQueueEntry = require('../../lib/models/ObjectQueueEntry');
const MetricsProducer = require('../../lib/MetricsProducer');
const { metricsExtension, metricsTypeCompleted, metricsTypePendingOnly } =
require('../ingestion/constants');

const getContentType = require('./utils/contentTypeHelper');
const BucketMemState = require('./utils/BucketMemState');
const MongoProcessorMetrics = require('./MongoProcessorMetrics');
Expand Down Expand Up @@ -76,20 +74,9 @@ class MongoQueueProcessor {
this._mongoClient = new MongoClient(this.mongoClientConfig);
this._bucketMemState = new BucketMemState(Config);

// in-mem batch of metrics, we only track total entry count by location
// this._accruedMetrics = { zenko-location: 10 }
this._accruedMetrics = {};

setInterval(() => {
this._sendMetrics();
}, METRIC_REPORT_INTERVAL_MS);
}

_setupMetricsClients(cb) {
// Metrics Producer
this._mProducer = new MetricsProducer(this.kafkaConfig, this._mConfig);
this._mProducer.setupProducer(cb);
}

/**
* Start kafka consumer
Expand All @@ -99,15 +86,6 @@ class MongoQueueProcessor {
start() {
this.logger.info('starting mongo queue processor');
async.series([
next => this._setupMetricsClients(err => {
if (err) {
this.logger.error('error setting up metrics client', {
method: 'MongoQueueProcessor.start',
error: err,
});
}
return next(err);
}),
next => this._mongoClient.setup(err => {
if (err) {
this.logger.error('could not connect to MongoDB', {
Expand Down Expand Up @@ -177,18 +155,6 @@ class MongoQueueProcessor {
});
return next();
},
next => {
if (this._mProducer) {
this.logger.debug('closing metrics producer', {
method: 'MongoQueueProcessor.stop',
});
return this._mProducer.close(next);
}
this.logger.debug('no metrics producer to close', {
method: 'MongoQueueProcessor.stop',
});
return next();
},
], done);
}

Expand Down Expand Up @@ -408,7 +374,6 @@ class MongoQueueProcessor {
return this._mongoClient.deleteObject(bucket, key, options, log,
err => {
if (err) {
this._normalizePendingMetric(location);
log.end().error('error deleting object metadata ' +
'from mongo', {
bucket,
Expand All @@ -418,7 +383,6 @@ class MongoQueueProcessor {
});
return done(err);
}
this._produceMetricCompletionEntry(location);
log.end().info('object metadata deleted from mongo', {
entry: sourceEntry.getLogInfo(),
location,
Expand All @@ -443,7 +407,6 @@ class MongoQueueProcessor {
this._getZenkoObjectMetadata(log, sourceEntry, bucketInfo,
(err, zenkoObjMd) => {
if (err) {
this._normalizePendingMetric(location);
log.end().error('error processing object queue entry', {
method: 'MongoQueueProcessor._processObjectQueueEntry',
entry: sourceEntry.getLogInfo(),
Expand All @@ -454,7 +417,6 @@ class MongoQueueProcessor {

const content = getContentType(sourceEntry, zenkoObjMd);
if (content.length === 0) {
this._normalizePendingMetric(location);
log.end().debug('skipping duplicate entry', {
method: 'MongoQueueProcessor._processObjectQueueEntry',
entry: sourceEntry.getLogInfo(),
Expand Down Expand Up @@ -498,7 +460,6 @@ class MongoQueueProcessor {
return this._mongoClient.putObject(bucket, key, objVal, params,
this.logger, err => {
if (err) {
this._normalizePendingMetric(location);
log.end().error('error putting object metadata ' +
'to mongo', {
bucket,
Expand All @@ -509,7 +470,6 @@ class MongoQueueProcessor {
});
return done(err);
}
this._produceMetricCompletionEntry(location);
log.end().info('object metadata put to mongo', {
entry: sourceEntry.getLogInfo(),
location,
Expand All @@ -519,49 +479,8 @@ class MongoQueueProcessor {
});
}

/**
* Send accrued metrics by location to kafka
* @return {undefined}
*/
_sendMetrics() {
Object.keys(this._accruedMetrics).forEach(loc => {
const count = this._accruedMetrics[loc];

// only report metrics if something has been recorded for location
if (count > 0) {
this._accruedMetrics[loc] = 0;
const metric = { [loc]: { ops: count } };
this._mProducer.publishMetrics(metric, metricsTypeCompleted,
metricsExtension, () => {});
}
});
}

/**
* Accrue metrics in-mem every METRIC_REPORT_INTERVAL_MS
* @param {string} location - zenko storage location name
* @return {undefined}
*/
_produceMetricCompletionEntry(location) {
if (this._accruedMetrics[location]) {
this._accruedMetrics[location] += 1;
} else {
this._accruedMetrics[location] = 1;
}
}

Check failure on line 483 in extensions/mongoProcessor/MongoQueueProcessor.js — GitHub Actions / tests: "More than 2 blank lines not allowed" (view workflow job for this annotation)
/**
* For cases where we experience an error or skip an entry, we need to
* normalize pending metric. This means we will see pending metrics stuck
* above 0 and will need to bring those metrics down
* @param {string} location - location constraint name
* @return {undefined}
*/
_normalizePendingMetric(location) {
const metric = { [location]: { ops: 1 } };
this._mProducer.publishMetrics(metric, metricsTypePendingOnly,
metricsExtension, () => {});
}

/**
* Get bucket info in memoize state if exists, otherwise fetch from Mongo
Expand Down Expand Up @@ -639,7 +558,6 @@ class MongoQueueProcessor {
entryType: sourceEntry.constructor.name,
method: 'MongoQueueProcessor.processKafkaEntry',
});
this._normalizePendingMetric(location);
return process.nextTick(done);
});
}
Expand Down

0 comments on commit 9123a1c

Please sign in to comment.