Commit: try2

benzekrimaha committed Nov 6, 2024
1 parent 6694ca8 commit 11ad023
Showing 15 changed files with 18 additions and 270 deletions.
64 changes: 1 addition & 63 deletions extensions/mongoProcessor/MongoQueueProcessor.js
@@ -15,16 +15,11 @@ const BackbeatConsumer = require('../../lib/BackbeatConsumer');
const QueueEntry = require('../../lib/models/QueueEntry');
const DeleteOpQueueEntry = require('../../lib/models/DeleteOpQueueEntry');
const ObjectQueueEntry = require('../../lib/models/ObjectQueueEntry');
const MetricsProducer = require('../../lib/MetricsProducer');
const { metricsExtension, metricsTypeCompleted, metricsTypePendingOnly } =
require('../ingestion/constants');

const getContentType = require('./utils/contentTypeHelper');
const BucketMemState = require('./utils/BucketMemState');
const MongoProcessorMetrics = require('./MongoProcessorMetrics');

// batch metrics by location and send to kafka metrics topic every 5 seconds
const METRIC_REPORT_INTERVAL_MS = process.env.CI === 'true' ? 1000 : 5000;

// TODO - ADD PREFIX BASED ON SOURCE
// april 6, 2018

@@ -79,16 +74,6 @@ class MongoQueueProcessor {
// in-mem batch of metrics, we only track total entry count by location
// this._accruedMetrics = { zenko-location: 10 }
this._accruedMetrics = {};

setInterval(() => {
this._sendMetrics();
}, METRIC_REPORT_INTERVAL_MS);
}

_setupMetricsClients(cb) {
// Metrics Producer
this._mProducer = new MetricsProducer(this.kafkaConfig, this._mConfig);
this._mProducer.setupProducer(cb);
}

/**
@@ -177,18 +162,6 @@
});
return next();
},
next => {
if (this._mProducer) {
this.logger.debug('closing metrics producer', {
method: 'MongoQueueProcessor.stop',
});
return this._mProducer.close(next);
}
this.logger.debug('no metrics producer to close', {
method: 'MongoQueueProcessor.stop',
});
return next();
},
], done);
}

@@ -408,7 +381,6 @@
return this._mongoClient.deleteObject(bucket, key, options, log,
err => {
if (err) {
this._normalizePendingMetric(location);
log.end().error('error deleting object metadata ' +
'from mongo', {
bucket,
@@ -443,7 +415,6 @@
this._getZenkoObjectMetadata(log, sourceEntry, bucketInfo,
(err, zenkoObjMd) => {
if (err) {
this._normalizePendingMetric(location);
log.end().error('error processing object queue entry', {
method: 'MongoQueueProcessor._processObjectQueueEntry',
entry: sourceEntry.getLogInfo(),
@@ -454,7 +425,6 @@

const content = getContentType(sourceEntry, zenkoObjMd);
if (content.length === 0) {
this._normalizePendingMetric(location);
log.end().debug('skipping duplicate entry', {
method: 'MongoQueueProcessor._processObjectQueueEntry',
entry: sourceEntry.getLogInfo(),
@@ -498,7 +468,6 @@
return this._mongoClient.putObject(bucket, key, objVal, params,
this.logger, err => {
if (err) {
this._normalizePendingMetric(location);
log.end().error('error putting object metadata ' +
'to mongo', {
bucket,
@@ -519,23 +488,6 @@
});
}

/**
* Send accrued metrics by location to kafka
* @return {undefined}
*/
_sendMetrics() {
Object.keys(this._accruedMetrics).forEach(loc => {
const count = this._accruedMetrics[loc];

// only report metrics if something has been recorded for location
if (count > 0) {
this._accruedMetrics[loc] = 0;
const metric = { [loc]: { ops: count } };
this._mProducer.publishMetrics(metric, metricsTypeCompleted,
metricsExtension, () => {});
}
});
}

/**
* Accrue metrics in-mem every METRIC_REPORT_INTERVAL_MS
@@ -550,19 +502,6 @@
}
}

/**
* For cases where we experience an error or skip an entry, we need to
* normalize pending metric. This means we will see pending metrics stuck
* above 0 and will need to bring those metrics down
* @param {string} location - location constraint name
* @return {undefined}
*/
_normalizePendingMetric(location) {
const metric = { [location]: { ops: 1 } };
this._mProducer.publishMetrics(metric, metricsTypePendingOnly,
metricsExtension, () => {});
}

/**
* Get bucket info in memoize state if exists, otherwise fetch from Mongo
* @param {ObjectQueueEntry} sourceEntry - object metadata entry
@@ -639,7 +578,6 @@
entryType: sourceEntry.constructor.name,
method: 'MongoQueueProcessor.processKafkaEntry',
});
this._normalizePendingMetric(location);
return process.nextTick(done);
});
}
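
The bulk of the deletions above implement a single pattern: per-location operation counts accrue in memory (this._accruedMetrics) and a timer flushes them to the Kafka metrics topic every METRIC_REPORT_INTERVAL_MS. A minimal standalone sketch of that pattern follows; the class name is illustrative, and the publishMetrics(metric, type, extension, cb) signature simply mirrors the removed call sites rather than any documented API.

// Minimal sketch of the interval-batched metrics pattern removed above.
// Assumes a producer exposing publishMetrics(metric, type, extension, cb),
// matching the MetricsProducer call sites visible in this diff.
class BatchedLocationMetrics {
    constructor(producer, intervalMs, metricsType, extensionName) {
        this._producer = producer;
        this._metricsType = metricsType;
        this._extensionName = extensionName;
        this._accrued = {}; // e.g. { 'zenko-location': 10 }
        this._timer = setInterval(() => this._flush(), intervalMs);
    }

    increment(location) {
        this._accrued[location] = (this._accrued[location] || 0) + 1;
    }

    _flush() {
        Object.keys(this._accrued).forEach(loc => {
            const count = this._accrued[loc];
            // only report locations that recorded something this interval
            if (count > 0) {
                this._accrued[loc] = 0;
                this._producer.publishMetrics({ [loc]: { ops: count } },
                    this._metricsType, this._extensionName, () => {});
            }
        });
    }

    stop() {
        clearInterval(this._timer);
    }
}

In the code shown in this diff, the removed setInterval was started in the constructor and never cleared in stop(), so the sketch adds the clearInterval a reimplementation would likely want.
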
10 changes: 0 additions & 10 deletions extensions/replication/tasks/MultipleBackendTask.js
@@ -8,8 +8,6 @@ const ObjectQueueEntry = require('../../../lib/models/ObjectQueueEntry');

const ReplicateObject = require('./ReplicateObject');
const { attachReqUids } = require('../../../lib/clients/utils');
const getExtMetrics = require('../utils/getExtMetrics');
const { metricsExtension, metricsTypeQueued } = require('../constants');

const MPU_GCP_MAX_PARTS = 1024;

@@ -702,21 +700,13 @@ class MultipleBackendTask extends ReplicateObject {
if (err) {
return doneOnce(err);
}
const extMetrics = getExtMetrics(this.site,
sourceEntry.getContentLength(), sourceEntry);
this.mProducer.publishMetrics(extMetrics, metricsTypeQueued,
metricsExtension, () => {});
return this._completeRangedMPU(sourceEntry,
uploadId, log, doneOnce);
});
}

_getAndPutObject(sourceEntry, log, cb) {
const partLogger = this.logger.newRequestLogger(log.getUids());
const extMetrics = getExtMetrics(this.site,
sourceEntry.getContentLength(), sourceEntry);
this.mProducer.publishMetrics(extMetrics, metricsTypeQueued,
metricsExtension, () => {});

if (BACKBEAT_INJECT_REPLICATION_ERROR_COPYOBJ) {
if (Math.random() < BACKBEAT_INJECT_REPLICATION_ERROR_RATE) {
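
Both call sites deleted from this file built per-site throughput metrics with getExtMetrics(this.site, sourceEntry.getContentLength(), sourceEntry) and published them with the queued type before starting the transfer. The helper itself does not appear in this diff; judging from the per-site { ops, bytes } object assembled in UpdateReplicationStatus further down, a plausible sketch is:

// Hypothetical sketch of the removed getExtMetrics helper
// (extensions/replication/utils/getExtMetrics). The { ops, bytes } shape
// is inferred from UpdateReplicationStatus._reportMetrics, also removed
// in this commit; the real helper may differ.
function getExtMetrics(site, bytes, sourceEntry) {
    // sourceEntry is accepted only to match the removed call sites
    return { [site]: { ops: 1, bytes } };
}
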
10 changes: 1 addition & 9 deletions extensions/replication/tasks/ReplicateObject.js
@@ -10,11 +10,10 @@ const BackbeatMetadataProxy = require('../../../lib/BackbeatMetadataProxy');

const mapLimitWaitPendingIfError = require('../../../lib/util/mapLimitWaitPendingIfError');
const { attachReqUids, TIMEOUT_MS } = require('../../../lib/clients/utils');
const getExtMetrics = require('../utils/getExtMetrics');
const BackbeatTask = require('../../../lib/tasks/BackbeatTask');
const { getAccountCredentials } = require('../../../lib/credentials/AccountCredentials');
const RoleCredentials = require('../../../lib/credentials/RoleCredentials');
-const { metricsExtension, metricsTypeQueued, metricsTypeCompleted, replicationStages } = require('../constants');
+const { replicationStages } = require('../constants');

const ObjectQueueEntry = require('../../../lib/models/ObjectQueueEntry');

@@ -434,9 +433,6 @@
location: this.site,
replicationContent: 'data',
});
const extMetrics = getExtMetrics(this.site, size, sourceEntry);
this.mProducer.publishMetrics(extMetrics,
metricsTypeCompleted, metricsExtension, () => {});
}

_publishMetadataWriteMetrics(buffer, writeStartTime) {
@@ -758,10 +754,6 @@
// Get data from source bucket and put it on the target bucket
next => {
if (!mdOnly) {
const extMetrics = getExtMetrics(this.site,
sourceEntry.getContentLength(), sourceEntry);
this.mProducer.publishMetrics(extMetrics,
metricsTypeQueued, metricsExtension, () => {});
return this._getAndPutData(sourceEntry, destEntry, log,
next);
}
38 changes: 0 additions & 38 deletions extensions/replication/tasks/UpdateReplicationStatus.js
@@ -8,11 +8,6 @@ const ActionQueueEntry = require('../../../lib/models/ActionQueueEntry');
const BackbeatTask = require('../../../lib/tasks/BackbeatTask');
const BackbeatMetadataProxy = require('../../../lib/BackbeatMetadataProxy');

const {
metricsExtension,
metricsTypeCompleted,
metricsTypeFailed,
} = require('../constants');
const {
getSortedSetMember,
getSortedSetKey,
@@ -106,39 +101,6 @@
this.failedCRRProducer.publishFailedCRREntry(JSON.stringify(message));
}

/**
* Report CRR metrics
* @param {ObjectQueueEntry} sourceEntry - The original entry
* @param {ObjectQueueEntry} updatedSourceEntry - updated object entry
* @return {undefined}
*/
_reportMetrics(sourceEntry, updatedSourceEntry) {
const content = updatedSourceEntry.getReplicationContent();
const contentLength = updatedSourceEntry.getContentLength();
const bytes = content.includes('DATA') ? contentLength : 0;
const data = {};
const site = sourceEntry.getSite();
data[site] = { ops: 1, bytes };
const status = sourceEntry.getReplicationSiteStatus(site);
// Report to MetricsProducer with completed/failed metrics.
if (status === 'COMPLETED' || status === 'FAILED') {
const entryType = status === 'COMPLETED' ?
metricsTypeCompleted : metricsTypeFailed;

this.mProducer.publishMetrics(data, entryType, metricsExtension,
err => {
if (err) {
this.logger.trace('error occurred in publishing metrics', {
error: err,
method: 'UpdateReplicationStatus._reportMetrics',
});
}
});
// TODO: update ZenkoMetrics
}
return undefined;
}

/**
* Get the appropriate source metadata for a non-versioned bucket. If the
* object metadata has changed since we performed CRR, then we want to
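
For reference, the removed _reportMetrics only reported terminal statuses, and counted bytes only when the replication content included 'DATA'. A short worked example, using a hypothetical site name:

// Worked example of the removed _reportMetrics branching
// ('aws-east' is a hypothetical site name):
//   COMPLETED, content ['DATA', 'METADATA'], contentLength 1024
//     -> publishes { 'aws-east': { ops: 1, bytes: 1024 } } as metricsTypeCompleted
//   COMPLETED, content ['METADATA'], contentLength 1024
//     -> publishes { 'aws-east': { ops: 1, bytes: 0 } } as metricsTypeCompleted
//   FAILED -> same { ops, bytes } shape, published as metricsTypeFailed
//   any other status (e.g. PENDING) -> nothing is published
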
4 changes: 2 additions & 2 deletions lib/queuePopulator/BucketFileLogReader.js
@@ -6,10 +6,10 @@ const LogReader = require('./LogReader');
class BucketFileLogReader extends LogReader {
constructor(params) {
const { zkClient, kafkaConfig, dmdConfig, logger,
-extensions, metricsProducer, metricsHandler } = params;
+extensions, metricsHandler } = params;
super({ zkClient, kafkaConfig, logConsumer: null,
logId: `bucketFile_${dmdConfig.logName}`, logger, extensions,
-metricsProducer, metricsHandler });
+metricsHandler });

this._dmdConfig = dmdConfig;
this._log = logger;
44 changes: 0 additions & 44 deletions lib/queuePopulator/IngestionPopulator.js
@@ -9,9 +9,6 @@ const Logger = require('werelogs').Logger;
const config = require('../Config');
const IngestionReader = require('./IngestionReader');
const BackbeatProducer = require('../BackbeatProducer');
const MetricsConsumer = require('../MetricsConsumer');
const MetricsProducer = require('../MetricsProducer');
const { metricsExtension } = require('../../extensions/ingestion/constants');
const IngestionPopulatorMetrics = require('./IngestionPopulatorMetrics');
const {
startCircuitBreakerMetricsExport,
@@ -79,11 +76,6 @@ class IngestionPopulator {
// shared producer across readers
this._producer = null;

// metrics clients
this._mProducer = null;
this._mConsumer = null;
this._redis = null;

// all ingestion readers (including paused ones)
// i.e.: { zenko-bucket-name: IngestionReader() }
this._ingestionSources = {};
@@ -152,17 +144,6 @@
});
}

_setupMetricsClients(cb) {
// Metrics Consumer
this._mConsumer = new MetricsConsumer(this.rConfig, this.mConfig,
this.kafkaConfig, metricsExtension);
this._mConsumer.start();

// Metrics Producer
this._mProducer = new MetricsProducer(this.kafkaConfig, this.mConfig);
this._mProducer.setupProducer(cb);
}

_setupProducer(cb) {
if (this._producer) {
return process.nextTick(cb);
@@ -580,7 +561,6 @@
logger: this.log,
extensions: [this._extension],
producer: this._producer,
metricsProducer: this._mProducer,
qpConfig: this.qpConfig,
s3Config: this.s3Config,
});
@@ -662,30 +642,6 @@
*/
close(done) {
async.series([
next => {
if (this._mProducer) {
this.log.debug('closing metrics producer', {
method: 'IngestionPopulator.close',
});
return this._mProducer.close(next);
}
this.log.debug('no metrics producer to close', {
method: 'IngestionPopulator.close',
});
return next();
},
next => {
if (this._mConsumer) {
this.log.debug('closing metrics consumer', {
method: 'IngestionPopulator.close',
});
return this._mConsumer.close(next);
}
this.log.debug('no metrics consumer to close', {
method: 'IngestionPopulator.close',
});
return next();
},
next => {
if (this._producer) {
this.log.debug('closing producer', {
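
Each step deleted from close() follows the same guarded-teardown shape as the producer step that remains: skip with a debug log when the client was never created, otherwise close it and hand the callback along. Distilled into a helper (closeAll and the clients array are illustrative names; async is the caolan async library this file already uses):

// Illustrative distillation of the guarded-teardown steps deleted from
// close(); each optional client is closed only if it was created.
const async = require('async');

function closeAll(clients, log, done) {
    async.series(clients.map(({ name, instance }) => next => {
        if (!instance) {
            log.debug(`no ${name} to close`);
            return next();
        }
        log.debug(`closing ${name}`);
        return instance.close(next);
    }), done);
}

// e.g. closeAll([{ name: 'metrics producer', instance: this._mProducer },
//                { name: 'metrics consumer', instance: this._mConsumer }],
//               this.log, done);
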
21 changes: 2 additions & 19 deletions lib/queuePopulator/IngestionReader.js
@@ -6,10 +6,6 @@ const VID_SEP = require('arsenal').versioning.VersioningConstants
const IngestionProducer = require('./IngestionProducer');
const IngestionPopulatorMetrics = require('./IngestionPopulatorMetrics');
const LogReader = require('./LogReader');
const {
metricsExtension,
metricsTypeQueued
} = require('../../extensions/ingestion/constants');
const { transformKey } = require('../util/entry');

function _isVersionedLogKey(key) {
@@ -19,9 +15,9 @@ function _isVersionedLogKey(key) {
class IngestionReader extends LogReader {
constructor(params) {
const { zkClient, ingestionConfig, kafkaConfig, bucketdConfig, qpConfig,
-logger, extensions, producer, metricsProducer, s3Config } = params;
+logger, extensions, producer, s3Config } = params;
super({ zkClient, kafkaConfig, logConsumer: {}, logId: '', logger,
-extensions, metricsProducer, zkMetricsHandler: IngestionPopulatorMetrics });
+extensions, zkMetricsHandler: IngestionPopulatorMetrics });
this._ingestionConfig = ingestionConfig;
this.qpConfig = qpConfig;
this.s3Config = s3Config;
@@ -423,7 +419,6 @@
if (err) {
return done(err);
}
this._publishMetrics();
return done();
});
}
@@ -446,18 +441,6 @@
], done);
}

_publishMetrics() {
// Ingestion extensions is a single IngestionQueuePopulatorExt
const extension = this._extensions[0];
const location = this.getLocationConstraint();
const metric = extension.getAndResetMetrics(this._targetZenkoBucket);
if (metric && metric.ops > 0) {
const value = { [location]: metric };
this._mProducer.publishMetrics(value, metricsTypeQueued,
metricsExtension, () => {});
}
}


/**
* Bucket configs have user editable fields: credentials, endpoint
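
The removed _publishMetrics drained a per-bucket counter through extension.getAndResetMetrics(this._targetZenkoBucket) and published any non-zero result with the queued type. Only the call shape appears in this diff; a sketch of the get-and-reset pattern it relies on, with assumed internals:

// Sketch of the get-and-reset counter behind the removed _publishMetrics.
// The real getAndResetMetrics lives in IngestionQueuePopulatorExt; its
// internals are assumed here, only the method name appears in the diff.
class QueuedEntryCounter {
    constructor() {
        this._metrics = {}; // { zenkoBucketName: { ops: N } }
    }

    increment(bucket) {
        const m = this._metrics[bucket] || { ops: 0 };
        m.ops += 1;
        this._metrics[bucket] = m;
    }

    // return accumulated counts and reset in one step, so each batch of
    // queued entries is reported at most once
    getAndResetMetrics(bucket) {
        const m = this._metrics[bucket];
        delete this._metrics[bucket];
        return m;
    }
}
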
[Diffs for the remaining 8 changed files did not load in this view.]
