Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/development/9.0' into HEAD
Browse files Browse the repository at this point in the history
  • Loading branch information
KillianG committed Nov 21, 2024
2 parents df36e67 + 13e845a commit 35a6c85
Show file tree
Hide file tree
Showing 54 changed files with 2,523 additions and 982 deletions.
3 changes: 2 additions & 1 deletion bin/ingestion.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ const qpConfig = config.queuePopulator;
const mConfig = config.metrics;
const rConfig = config.redis;
const s3Config = config.s3;
const { connectionString, autoCreateNamespace } = zkConfig;
const { connectionString, autoCreateNamespace, retries } = zkConfig;

const RESUME_NODE = 'scheduledResume';

Expand Down Expand Up @@ -316,6 +316,7 @@ function initAndStart(zkClient) {

const zkClient = new ZookeeperManager(connectionString, {
autoCreateNamespace,
retries,
}, log);
zkClient.once('error', err => {
log.fatal('error connecting to zookeeper', {
Expand Down
2 changes: 2 additions & 0 deletions extensions/gc/GarbageCollector.js
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,8 @@ class GarbageCollector extends EventEmitter {
kafka: {
hosts: this._kafkaConfig.hosts,
site: this._kafkaConfig.site,
compressionType: this._kafkaConfig.compressionType,
requiredAcks: this._kafkaConfig.requiredAcks,
},
topic: this._gcConfig.topic,
groupId: this._gcConfig.consumer.groupId,
Expand Down
2 changes: 2 additions & 0 deletions extensions/gc/GarbageCollectorProducer.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ class GarbageCollectorProducer {
const producer = new BackbeatProducer({
kafka: { hosts: this._kafkaConfig.hosts },
maxRequestSize: this._kafkaConfig.maxRequestSize,
compressionType: this._kafkaConfig.compressionType,
requiredAcks: this._kafkaConfig.requiredAcks,
topic: this._topic,
});
producer.once('error', () => {});
Expand Down
2 changes: 2 additions & 0 deletions extensions/lifecycle/LifecycleQueuePopulator.js
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ class LifecycleQueuePopulator extends QueuePopulatorExtension {
const producer = new BackbeatProducer({
kafka: { hosts: this.kafkaConfig.hosts },
maxRequestSize: this.kafkaConfig.maxRequestSize,
compressionType: this.kafkaConfig.compressionType,
requiredAcks: this.kafkaConfig.requiredAcks,
topic,
});
producer.once('error', done);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -380,6 +380,8 @@ class LifecycleBucketProcessor {
const producer = new BackbeatProducer({
kafka: { hosts: this._kafkaConfig.hosts },
maxRequestSize: this._kafkaConfig.maxRequestSize,
compressionType: this._kafkaConfig.compressionType,
requiredAcks: this._kafkaConfig.requiredAcks,
topic: this._lcConfig.objectTasksTopic,
});
producer.once('error', err => {
Expand Down Expand Up @@ -418,6 +420,8 @@ class LifecycleBucketProcessor {
hosts: this._kafkaConfig.hosts,
site: this._kafkaConfig.site,
backlogMetrics: this._kafkaConfig.backlogMetrics,
compressionType: this._kafkaConfig.compressionType,
requiredAcks: this._kafkaConfig.requiredAcks,
},
topic: this._lcConfig.bucketTasksTopic,
groupId: this._lcConfig.bucketProcessor.groupId,
Expand Down
2 changes: 2 additions & 0 deletions extensions/lifecycle/conductor/LifecycleConductor.js
Original file line number Diff line number Diff line change
Expand Up @@ -876,6 +876,8 @@ class LifecycleConductor {
const producer = new BackbeatProducer({
kafka: { hosts: this.kafkaConfig.hosts },
maxRequestSize: this.kafkaConfig.maxRequestSize,
compressionType: this.kafkaConfig.compressionType,
requiredAcks: this.kafkaConfig.requiredAcks,
topic: this.lcConfig.bucketTasksTopic,
});
producer.once('error', cb);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ class LifecycleObjectProcessor extends EventEmitter {
hosts: this._kafkaConfig.hosts,
site: this._kafkaConfig.site,
backlogMetrics: this._kafkaConfig.backlogMetrics,
compressionType: this._kafkaConfig.compressionType,
requiredAcks: this._kafkaConfig.requiredAcks,
},
topic,
groupId: this._processConfig.groupId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ class LifecycleObjectTransitionProcessor extends LifecycleObjectProcessor {
const producer = new BackbeatProducer({
kafka: { hosts: this._kafkaConfig.hosts },
maxRequestSize: this._kafkaConfig.maxRequestSize,
compressionType: this._kafkaConfig.compressionType,
requiredAcks: this._kafkaConfig.requiredAcks,
});
producer.once('error', cb);
producer.once('ready', () => {
Expand Down Expand Up @@ -121,6 +123,8 @@ class LifecycleObjectTransitionProcessor extends LifecycleObjectProcessor {
hosts: this._kafkaConfig.hosts,
site: this._kafkaConfig.site,
backlogMetrics: this._kafkaConfig.backlogMetrics,
compressionType: this._kafkaConfig.compressionType,
requiredAcks: this._kafkaConfig.requiredAcks,
},
topic,
groupId: this._processConfig.groupId,
Expand Down
2 changes: 2 additions & 0 deletions extensions/mongoProcessor/MongoQueueProcessor.js
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,8 @@ class MongoQueueProcessor {
kafka: {
hosts: this.kafkaConfig.hosts,
site: this.kafkaConfig.site,
compressionType: this.kafkaConfig.compressionType,
requiredAcks: this.kafkaConfig.requiredAcks,
},
queueProcessor: this.processKafkaEntry.bind(this),
circuitBreaker: this.mongoProcessorConfig.circuitBreaker,
Expand Down
230 changes: 59 additions & 171 deletions extensions/notification/NotificationConfigManager.js
Original file line number Diff line number Diff line change
@@ -1,80 +1,13 @@
const joi = require('joi');
const semver = require('semver');

const { ZenkoMetrics } = require('arsenal').metrics;
const LRUCache = require('arsenal').algorithms
.cache.LRUCache;
const MongoClient = require('mongodb').MongoClient;
const ChangeStream = require('../../lib/wrappers/ChangeStream');
const constants = require('./constants');
const { constructConnectionString, getMongoVersion } = require('../utils/MongoUtils');

// Joi schema for NotificationConfigManager constructor params.
// NOTE(review): only mongoConfig/logger are marked required here, yet the
// constructor also consumes zookeeper-backend params (zkClient, zkConfig,
// zkPath, zkConcurrency) — confirm whether the schema is extended elsewhere.
const paramsJoi = joi.object({
mongoConfig: joi.object().required(),
logger: joi.object().required(),
}).required();

// Upper bound on locally cached bucket notification configs; the env
// override falls back to 1000 when unset or non-numeric (Number(...) is
// falsy for NaN and 0).
const MAX_CACHED_ENTRIES = Number(process.env.MAX_CACHED_BUCKET_NOTIFICATION_CONFIGS)
|| 1000;

// should equal true if config manager's cache was hit during a get operation
const CONFIG_MANAGER_CACHE_HIT = 'cache_hit';
// Type of operation performed on the cache
const CONFIG_MANAGER_OPERATION_TYPE = 'op';

// Counter: total number of cache updates, labeled by the operation
// performed ('add'/'delete'/... via CONFIG_MANAGER_OPERATION_TYPE).
const cacheUpdates = ZenkoMetrics.createCounter({
name: 's3_notification_config_manager_cache_updates_total',
help: 'Total number of cache updates',
labelNames: [
CONFIG_MANAGER_OPERATION_TYPE,
],
});

// Histogram: latency (seconds) of fetching a bucket notification config,
// labeled by whether the local cache was hit.
const configGetLag = ZenkoMetrics.createHistogram({
name: 's3_notification_config_manager_config_get_seconds',
help: 'Time it takes in seconds to get a bucket notification config from MongoDB',
labelNames: [
CONFIG_MANAGER_CACHE_HIT,
],
buckets: [0.001, 0.01, 1, 10, 100, 1000],
});

// Gauge: current number of bucket configs held in the local cache.
const cachedBuckets = ZenkoMetrics.createGauge({
name: 's3_notification_config_manager_cached_buckets_count',
help: 'Total number of cached buckets in the notification config manager',
});

/**
 * Record a cache update metric and keep the cached-bucket gauge in sync.
 *
 * The update counter is always incremented with the operation label; the
 * gauge moves only for 'add' (up) and 'delete' (down).
 *
 * @param {String} op - cache operation performed ('add', 'delete', ...)
 * @return {undefined}
 */
function onConfigManagerCacheUpdate(op) {
    cacheUpdates.inc({ [CONFIG_MANAGER_OPERATION_TYPE]: op });
    switch (op) {
    case 'add':
        cachedBuckets.inc({});
        break;
    case 'delete':
        cachedBuckets.dec({});
        break;
    default:
        break;
    }
}

/**
 * Observe the latency of a bucket notification config lookup.
 *
 * @param {Boolean} cacheHit - whether the config was served from the cache
 * @param {Number} delay - lookup duration in seconds
 * @return {undefined}
 */
function onConfigManagerConfigGet(cacheHit, delay) {
    const labels = { [CONFIG_MANAGER_CACHE_HIT]: cacheHit };
    configGetLag.observe(labels, delay);
}
const MongoConfigManager = require('./configManager/MongoConfigManager');
const ZookeeperConfigManager = require('./configManager/ZookeeperConfigManager');

/**
* @class NotificationConfigManager
*
* @classdesc Manages bucket notification configurations, the configurations
* are directly retrieved from the metastore, and are locally cached. Cache
* is invalidated using MongoDB change streams.
* @classdesc Manages bucket notification configurations
*/
class NotificationConfigManager {
/**
* @constructor
* @param {Object} params - constructor params
* @param {Object} params.mongoConfig - mongoDB config
* @param {Logger} params.logger - logger object
*/

constructor(params) {
joi.attempt(params, paramsJoi);
this._logger = params.logger;
Expand All @@ -83,6 +16,26 @@ class NotificationConfigManager {
this._mongoClient = null;
this._metastore = null;
this._metastoreChangeStream = null;

const {
mongoConfig, bucketMetastore, maxCachedConfigs, zkClient, zkConfig, zkPath, zkConcurrency, logger,
} = params;
if (mongoConfig) {
this._configManagerBackend = new MongoConfigManager({
mongoConfig,
bucketMetastore,
maxCachedConfigs,
logger,
});
} else {
this._usesZookeeperBackend = true;
this._configManagerBackend = new ZookeeperConfigManager({
zkClient,
zkConfig,
zkPath,
zkConcurrency,
logger,
});
}

/**
Expand Down Expand Up @@ -163,120 +116,55 @@ class NotificationConfigManager {
});
break;
}
this._logger.debug('Change stream event processed', {
method: 'NotificationConfigManager._handleChangeStreamChange',
});
}

/**
* Initializes a change stream on the metastore collection
* Only document delete and update/replace operations are
* taken into consideration to invalidate cache.
* Newly created buckets (insert operations) are not cached
* as queue populator instances read from different kafka
* partitions and so don't need the configs for all buckets
* @returns {undefined}
* Get bucket notification configuration
*
* @param {String} bucket - bucket
* @param {function} [cb] - callback
* @return {Object|undefined} - configuration if available or undefined
*/
_setMetastoreChangeStream() {
/**
* To avoid processing irrelevant events
* we filter by the operation types and
* only project the fields needed
*/
const changeStreamPipeline = [
{
$match: {
$or: [
{ operationType: 'delete' },
{ operationType: 'replace' },
{ operationType: 'update' },
]
}
},
{
$project: {
'_id': 1,
'operationType': 1,
'documentKey._id': 1,
'fullDocument._id': 1,
'fullDocument.value.notificationConfiguration': 1
},
},
];
this._metastoreChangeStream = new ChangeStream({
logger: this._logger,
collection: this._metastore,
pipeline: changeStreamPipeline,
handler: this._handleChangeStreamChangeEvent.bind(this),
throwOnError: false,
useStartAfter: semver.gte(this._mongoVersion, '4.2.0'),
});
// start watching metastore
this._metastoreChangeStream.start();
getConfig(bucket, cb) {
const val = this._configManagerBackend.getConfig(bucket);
if (!cb) {
return val;
}
if (val instanceof Promise) {
return val.then(res => cb(null, res)).catch(err => cb(err));
}
return cb(null, val);
}

/**
* Sets up the NotificationConfigManager by
* connecting to mongo and initializing the
* change stream
* @param {Function} cb callback
* @returns {undefined}
* Add/update bucket notification configuration.
*
* @param {String} bucket - bucket
* @param {Object} config - bucket notification configuration
* @return {boolean} - true if set
*/
setup(cb) {
this._setupMongoClient(err => {
if (err) {
this._logger.error('An error occured while setting up mongo client', {
method: 'NotificationConfigManager.setup',
});
return cb(err);
}
try {
this._setMetastoreChangeStream();
} catch (error) {
this._logger.error('An error occured while establishing the change stream', {
method: 'NotificationConfigManager._setMetastoreChangeStream',
});
return cb(error);
}
return cb();
});
// Delegates bucket notification config writes to the active backend
// (Mongo or Zookeeper, selected in the constructor).
setConfig(bucket, config) {
return this._configManagerBackend.setConfig(bucket, config);
}

/**
* Get bucket notification configuration
* Remove bucket notification configuration
*
* @param {String} bucket - bucket
* @return {Object|undefined} - configuration if available or undefined
* @return {undefined}
*/
async getConfig(bucket) {
const startTime = Date.now();
// return cached config for bucket if it exists
const cachedConfig = this._cachedConfigs.get(bucket);
if (cachedConfig) {
const delay = (Date.now() - startTime) / 1000;
onConfigManagerConfigGet(true, delay);
return cachedConfig;
}
try {
// retreiving bucket metadata from the metastore
const bucketMetadata = await this._metastore.findOne({ _id: bucket });
const bucketNotificationConfiguration = (bucketMetadata && bucketMetadata.value &&
bucketMetadata.value.notificationConfiguration) || undefined;
// caching the bucket configuration
this._cachedConfigs.add(bucket, bucketNotificationConfiguration);
const delay = (Date.now() - startTime) / 1000;
onConfigManagerConfigGet(false, delay);
onConfigManagerCacheUpdate('add');
return bucketNotificationConfiguration;
} catch (err) {
this._logger.error('An error occured when getting notification ' +
'configuration of bucket', {
method: 'NotificationConfigManager.getConfig',
bucket,
error: err.message,
});
return undefined;
}
// Delegates bucket notification config removal to the active backend
// (Mongo or Zookeeper, selected in the constructor).
removeConfig(bucket) {
return this._configManagerBackend.removeConfig(bucket);
}

/**
* Setup bucket notification configuration manager
*
* @param {function} [cb] - callback
* @return {undefined}
*/
// Delegates initialization (connections, watchers) to the active backend;
// cb is invoked with any setup error.
setup(cb) {
return this._configManagerBackend.setup(cb);
}
}

Expand Down
Loading

0 comments on commit 35a6c85

Please sign in to comment.