From 9123a1c30b17f85b03469b0304a068e736599bf0 Mon Sep 17 00:00:00 2001
From: Maha Benzekri
Date: Thu, 7 Nov 2024 09:46:21 +0100
Subject: [PATCH] Remove ingestion metrics reporting from MongoQueueProcessor

---
 .../mongoProcessor/MongoQueueProcessor.js | 84 +------------------
 1 file changed, 1 insertion(+), 83 deletions(-)

diff --git a/extensions/mongoProcessor/MongoQueueProcessor.js b/extensions/mongoProcessor/MongoQueueProcessor.js
index 6811079f4..bfe2cd549 100644
--- a/extensions/mongoProcessor/MongoQueueProcessor.js
+++ b/extensions/mongoProcessor/MongoQueueProcessor.js
@@ -15,9 +15,7 @@ const BackbeatConsumer = require('../../lib/BackbeatConsumer');
 const QueueEntry = require('../../lib/models/QueueEntry');
 const DeleteOpQueueEntry = require('../../lib/models/DeleteOpQueueEntry');
 const ObjectQueueEntry = require('../../lib/models/ObjectQueueEntry');
-const MetricsProducer = require('../../lib/MetricsProducer');
-const { metricsExtension, metricsTypeCompleted, metricsTypePendingOnly } =
-    require('../ingestion/constants');
+
 const getContentType = require('./utils/contentTypeHelper');
 const BucketMemState = require('./utils/BucketMemState');
 const MongoProcessorMetrics = require('./MongoProcessorMetrics');
@@ -76,20 +74,9 @@ class MongoQueueProcessor {
         this._mongoClient = new MongoClient(this.mongoClientConfig);
         this._bucketMemState = new BucketMemState(Config);
-        // in-mem batch of metrics, we only track total entry count by location
-        // this._accruedMetrics = { zenko-location: 10 }
-        this._accruedMetrics = {};
-        setInterval(() => {
-            this._sendMetrics();
-        }, METRIC_REPORT_INTERVAL_MS);
     }
 
-    _setupMetricsClients(cb) {
-        // Metrics Producer
-        this._mProducer = new MetricsProducer(this.kafkaConfig, this._mConfig);
-        this._mProducer.setupProducer(cb);
-    }
 
     /**
      * Start kafka consumer
      *
      * @return {undefined}
@@ -99,15 +86,6 @@ class MongoQueueProcessor {
     start() {
         this.logger.info('starting mongo queue processor');
         async.series([
-            next => this._setupMetricsClients(err => {
-                if (err) {
-                    this.logger.error('error setting up metrics client', {
-                        method: 'MongoQueueProcessor.start',
-                        error: err,
-                    });
-                }
-                return next(err);
-            }),
             next => this._mongoClient.setup(err => {
                 if (err) {
                     this.logger.error('could not connect to MongoDB', {
@@ -177,18 +155,6 @@ class MongoQueueProcessor {
                 });
                 return next();
             },
-            next => {
-                if (this._mProducer) {
-                    this.logger.debug('closing metrics producer', {
-                        method: 'MongoQueueProcessor.stop',
-                    });
-                    return this._mProducer.close(next);
-                }
-                this.logger.debug('no metrics producer to close', {
-                    method: 'MongoQueueProcessor.stop',
-                });
-                return next();
-            },
         ], done);
     }
 
@@ -408,7 +374,6 @@ class MongoQueueProcessor {
         return this._mongoClient.deleteObject(bucket, key, options, log,
             err => {
                 if (err) {
-                    this._normalizePendingMetric(location);
                     log.end().error('error deleting object metadata ' +
                     'from mongo', {
                         bucket,
@@ -418,7 +383,6 @@ class MongoQueueProcessor {
                     });
                     return done(err);
                 }
-                this._produceMetricCompletionEntry(location);
                 log.end().info('object metadata deleted from mongo', {
                     entry: sourceEntry.getLogInfo(),
                     location,
@@ -443,7 +407,6 @@ class MongoQueueProcessor {
         this._getZenkoObjectMetadata(log, sourceEntry, bucketInfo,
             (err, zenkoObjMd) => {
                 if (err) {
-                    this._normalizePendingMetric(location);
                     log.end().error('error processing object queue entry', {
                         method: 'MongoQueueProcessor._processObjectQueueEntry',
                         entry: sourceEntry.getLogInfo(),
@@ -454,7 +417,6 @@ class MongoQueueProcessor {
 
                 const content = getContentType(sourceEntry, zenkoObjMd);
                 if (content.length === 0) {
-                    this._normalizePendingMetric(location);
                     log.end().debug('skipping duplicate entry', {
                         method: 'MongoQueueProcessor._processObjectQueueEntry',
                         entry: sourceEntry.getLogInfo(),
@@ -498,7 +460,6 @@ class MongoQueueProcessor {
         return this._mongoClient.putObject(bucket, key, objVal, params,
             this.logger, err => {
                 if (err) {
-                    this._normalizePendingMetric(location);
                     log.end().error('error putting object metadata ' +
                     'to mongo', {
                         bucket,
@@ -509,7 +470,6 @@ class MongoQueueProcessor {
                     });
                     return done(err);
                 }
-                this._produceMetricCompletionEntry(location);
                 log.end().info('object metadata put to mongo', {
                     entry: sourceEntry.getLogInfo(),
                     location,
@@ -519,49 +479,8 @@ class MongoQueueProcessor {
             });
     }
 
-    /**
-     * Send accrued metrics by location to kafka
-     * @return {undefined}
-     */
-    _sendMetrics() {
-        Object.keys(this._accruedMetrics).forEach(loc => {
-            const count = this._accruedMetrics[loc];
-
-            // only report metrics if something has been recorded for location
-            if (count > 0) {
-                this._accruedMetrics[loc] = 0;
-                const metric = { [loc]: { ops: count } };
-                this._mProducer.publishMetrics(metric, metricsTypeCompleted,
-                    metricsExtension, () => {});
-            }
-        });
-    }
 
-    /**
-     * Accrue metrics in-mem every METRIC_REPORT_INTERVAL_MS
-     * @param {string} location - zenko storage location name
-     * @return {undefined}
-     */
-    _produceMetricCompletionEntry(location) {
-        if (this._accruedMetrics[location]) {
-            this._accruedMetrics[location] += 1;
-        } else {
-            this._accruedMetrics[location] = 1;
-        }
-    }
 
-    /**
-     * For cases where we experience an error or skip an entry, we need to
-     * normalize pending metric. This means we will see pending metrics stuck
-     * above 0 and will need to bring those metrics down
-     * @param {string} location - location constraint name
-     * @return {undefined}
-     */
-    _normalizePendingMetric(location) {
-        const metric = { [location]: { ops: 1 } };
-        this._mProducer.publishMetrics(metric, metricsTypePendingOnly,
-            metricsExtension, () => {});
-    }
 
     /**
      * Get bucket info in memoize state if exists, otherwise fetch from Mongo
@@ -639,7 +558,6 @@ class MongoQueueProcessor {
                 entryType: sourceEntry.constructor.name,
                 method: 'MongoQueueProcessor.processKafkaEntry',
            });
-            this._normalizePendingMetric(location);
            return process.nextTick(done);
        });
    }
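
Note on the removed metrics path: `_produceMetricCompletionEntry` and
`_sendMetrics` together implemented an accrue-and-flush pattern. Each
completed entry bumps an in-memory counter keyed by Zenko location, and a
`setInterval` timer publishes the batched counts to Kafka every
METRIC_REPORT_INTERVAL_MS, so the per-entry hot path never does metrics I/O.
`_normalizePendingMetric` was the error/skip counterpart: it published a
pendingOnly metric so pending gauges would not stay stuck above zero for
entries that never complete. Below is a condensed, standalone sketch of the
accrue-and-flush half of that pattern; the class name and the `publish`
callback are illustrative stand-ins (the real code went through
`MetricsProducer.publishMetrics(metric, type, extension, cb)`), not the
backbeat API:

    // Illustrative sketch of the removed accrue-and-flush metrics pattern;
    // names are hypothetical, not the backbeat API.
    class AccruedMetrics {
        constructor(publish, intervalMs) {
            this._counts = {};       // { location: completedOpCount }
            this._publish = publish; // (metric) => void, e.g. a Kafka producer
            setInterval(() => this.flush(), intervalMs);
        }

        // Hot path: O(1) in-memory increment, no I/O.
        record(location) {
            this._counts[location] = (this._counts[location] || 0) + 1;
        }

        // Timer path: reset and publish every non-zero counter as a batch.
        flush() {
            Object.keys(this._counts).forEach(loc => {
                const count = this._counts[loc];
                if (count > 0) {
                    this._counts[loc] = 0;
                    this._publish({ [loc]: { ops: count } });
                }
            });
        }
    }

    // usage: const m = new AccruedMetrics(metric => console.log(metric), 10000);
    // m.record('us-east-1'); m.record('us-east-1');
    // -> flushed roughly every 10s as { 'us-east-1': { ops: 2 } }

Batching this way trades at most one reporting interval of metric latency for
a single Kafka produce per location per interval instead of one per entry.
After this patch the processor still requires MongoProcessorMetrics at the
top of the file; only the MetricsProducer-based ingestion metrics are gone.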