diff --git a/bin/ingestion.js b/bin/ingestion.js index 6accc0313..d7ea6a16c 100644 --- a/bin/ingestion.js +++ b/bin/ingestion.js @@ -28,7 +28,7 @@ const qpConfig = config.queuePopulator; const mConfig = config.metrics; const rConfig = config.redis; const s3Config = config.s3; -const { connectionString, autoCreateNamespace } = zkConfig; +const { connectionString, autoCreateNamespace, retries } = zkConfig; const RESUME_NODE = 'scheduledResume'; @@ -316,6 +316,7 @@ function initAndStart(zkClient) { const zkClient = new ZookeeperManager(connectionString, { autoCreateNamespace, + retries, }, log); zkClient.once('error', err => { log.fatal('error connecting to zookeeper', { diff --git a/extensions/gc/GarbageCollector.js b/extensions/gc/GarbageCollector.js index 01172a217..3825dbd20 100644 --- a/extensions/gc/GarbageCollector.js +++ b/extensions/gc/GarbageCollector.js @@ -132,6 +132,8 @@ class GarbageCollector extends EventEmitter { kafka: { hosts: this._kafkaConfig.hosts, site: this._kafkaConfig.site, + compressionType: this._kafkaConfig.compressionType, + requiredAcks: this._kafkaConfig.requiredAcks, }, topic: this._gcConfig.topic, groupId: this._gcConfig.consumer.groupId, diff --git a/extensions/gc/GarbageCollectorProducer.js b/extensions/gc/GarbageCollectorProducer.js index c7b9dade9..e93051b87 100644 --- a/extensions/gc/GarbageCollectorProducer.js +++ b/extensions/gc/GarbageCollectorProducer.js @@ -26,6 +26,8 @@ class GarbageCollectorProducer { const producer = new BackbeatProducer({ kafka: { hosts: this._kafkaConfig.hosts }, maxRequestSize: this._kafkaConfig.maxRequestSize, + compressionType: this._kafkaConfig.compressionType, + requiredAcks: this._kafkaConfig.requiredAcks, topic: this._topic, }); producer.once('error', () => {}); diff --git a/extensions/lifecycle/LifecycleQueuePopulator.js b/extensions/lifecycle/LifecycleQueuePopulator.js index a6a58b1b9..02608cde7 100644 --- a/extensions/lifecycle/LifecycleQueuePopulator.js +++ b/extensions/lifecycle/LifecycleQueuePopulator.js @@ -62,6 +62,8 @@ class LifecycleQueuePopulator extends QueuePopulatorExtension { const producer = new BackbeatProducer({ kafka: { hosts: this.kafkaConfig.hosts }, maxRequestSize: this.kafkaConfig.maxRequestSize, + compressionType: this.kafkaConfig.compressionType, + requiredAcks: this.kafkaConfig.requiredAcks, topic, }); producer.once('error', done); diff --git a/extensions/lifecycle/bucketProcessor/LifecycleBucketProcessor.js b/extensions/lifecycle/bucketProcessor/LifecycleBucketProcessor.js index 761b9ae89..f35208824 100644 --- a/extensions/lifecycle/bucketProcessor/LifecycleBucketProcessor.js +++ b/extensions/lifecycle/bucketProcessor/LifecycleBucketProcessor.js @@ -380,6 +380,8 @@ class LifecycleBucketProcessor { const producer = new BackbeatProducer({ kafka: { hosts: this._kafkaConfig.hosts }, maxRequestSize: this._kafkaConfig.maxRequestSize, + compressionType: this._kafkaConfig.compressionType, + requiredAcks: this._kafkaConfig.requiredAcks, topic: this._lcConfig.objectTasksTopic, }); producer.once('error', err => { @@ -418,6 +420,8 @@ class LifecycleBucketProcessor { hosts: this._kafkaConfig.hosts, site: this._kafkaConfig.site, backlogMetrics: this._kafkaConfig.backlogMetrics, + compressionType: this._kafkaConfig.compressionType, + requiredAcks: this._kafkaConfig.requiredAcks, }, topic: this._lcConfig.bucketTasksTopic, groupId: this._lcConfig.bucketProcessor.groupId, diff --git a/extensions/lifecycle/conductor/LifecycleConductor.js b/extensions/lifecycle/conductor/LifecycleConductor.js index 
0070d01e6..81590bd25 100644 --- a/extensions/lifecycle/conductor/LifecycleConductor.js +++ b/extensions/lifecycle/conductor/LifecycleConductor.js @@ -876,6 +876,8 @@ class LifecycleConductor { const producer = new BackbeatProducer({ kafka: { hosts: this.kafkaConfig.hosts }, maxRequestSize: this.kafkaConfig.maxRequestSize, + compressionType: this.kafkaConfig.compressionType, + requiredAcks: this.kafkaConfig.requiredAcks, topic: this.lcConfig.bucketTasksTopic, }); producer.once('error', cb); diff --git a/extensions/lifecycle/objectProcessor/LifecycleObjectProcessor.js b/extensions/lifecycle/objectProcessor/LifecycleObjectProcessor.js index c1780023f..16ff917c3 100644 --- a/extensions/lifecycle/objectProcessor/LifecycleObjectProcessor.js +++ b/extensions/lifecycle/objectProcessor/LifecycleObjectProcessor.js @@ -78,6 +78,8 @@ class LifecycleObjectProcessor extends EventEmitter { hosts: this._kafkaConfig.hosts, site: this._kafkaConfig.site, backlogMetrics: this._kafkaConfig.backlogMetrics, + compressionType: this._kafkaConfig.compressionType, + requiredAcks: this._kafkaConfig.requiredAcks, }, topic, groupId: this._processConfig.groupId, diff --git a/extensions/lifecycle/objectProcessor/LifecycleObjectTransitionProcessor.js b/extensions/lifecycle/objectProcessor/LifecycleObjectTransitionProcessor.js index 90f0be9e5..54405d41e 100644 --- a/extensions/lifecycle/objectProcessor/LifecycleObjectTransitionProcessor.js +++ b/extensions/lifecycle/objectProcessor/LifecycleObjectTransitionProcessor.js @@ -75,6 +75,8 @@ class LifecycleObjectTransitionProcessor extends LifecycleObjectProcessor { const producer = new BackbeatProducer({ kafka: { hosts: this._kafkaConfig.hosts }, maxRequestSize: this._kafkaConfig.maxRequestSize, + compressionType: this._kafkaConfig.compressionType, + requiredAcks: this._kafkaConfig.requiredAcks, }); producer.once('error', cb); producer.once('ready', () => { @@ -121,6 +123,8 @@ class LifecycleObjectTransitionProcessor extends LifecycleObjectProcessor { hosts: this._kafkaConfig.hosts, site: this._kafkaConfig.site, backlogMetrics: this._kafkaConfig.backlogMetrics, + compressionType: this._kafkaConfig.compressionType, + requiredAcks: this._kafkaConfig.requiredAcks, }, topic, groupId: this._processConfig.groupId, diff --git a/extensions/mongoProcessor/MongoQueueProcessor.js b/extensions/mongoProcessor/MongoQueueProcessor.js index 26d7c8143..9ec0e306f 100644 --- a/extensions/mongoProcessor/MongoQueueProcessor.js +++ b/extensions/mongoProcessor/MongoQueueProcessor.js @@ -136,6 +136,8 @@ class MongoQueueProcessor { kafka: { hosts: this.kafkaConfig.hosts, site: this.kafkaConfig.site, + compressionType: this.kafkaConfig.compressionType, + requiredAcks: this.kafkaConfig.requiredAcks, }, queueProcessor: this.processKafkaEntry.bind(this), circuitBreaker: this.mongoProcessorConfig.circuitBreaker, diff --git a/extensions/notification/NotificationConfigManager.js b/extensions/notification/NotificationConfigManager.js index beeed69bf..e00755ddb 100644 --- a/extensions/notification/NotificationConfigManager.js +++ b/extensions/notification/NotificationConfigManager.js @@ -1,80 +1,13 @@ -const joi = require('joi'); -const semver = require('semver'); - -const { ZenkoMetrics } = require('arsenal').metrics; -const LRUCache = require('arsenal').algorithms - .cache.LRUCache; -const MongoClient = require('mongodb').MongoClient; -const ChangeStream = require('../../lib/wrappers/ChangeStream'); -const constants = require('./constants'); -const { constructConnectionString, getMongoVersion } = 
require('../utils/MongoUtils'); - -const paramsJoi = joi.object({ - mongoConfig: joi.object().required(), - logger: joi.object().required(), -}).required(); - -const MAX_CACHED_ENTRIES = Number(process.env.MAX_CACHED_BUCKET_NOTIFICATION_CONFIGS) - || 1000; - -// should equal true if config manager's cache was hit during a get operation -const CONFIG_MANAGER_CACHE_HIT = 'cache_hit'; -// Type of operation performed on the cache -const CONFIG_MANAGER_OPERATION_TYPE = 'op'; - -const cacheUpdates = ZenkoMetrics.createCounter({ - name: 's3_notification_config_manager_cache_updates_total', - help: 'Total number of cache updates', - labelNames: [ - CONFIG_MANAGER_OPERATION_TYPE, - ], -}); - -const configGetLag = ZenkoMetrics.createHistogram({ - name: 's3_notification_config_manager_config_get_seconds', - help: 'Time it takes in seconds to get a bucket notification config from MongoDB', - labelNames: [ - CONFIG_MANAGER_CACHE_HIT, - ], - buckets: [0.001, 0.01, 1, 10, 100, 1000], -}); - -const cachedBuckets = ZenkoMetrics.createGauge({ - name: 's3_notification_config_manager_cached_buckets_count', - help: 'Total number of cached buckets in the notification config manager', -}); - -function onConfigManagerCacheUpdate(op) { - cacheUpdates.inc({ - [CONFIG_MANAGER_OPERATION_TYPE]: op, - }); - if (op === 'add') { - cachedBuckets.inc({}); - } else if (op === 'delete') { - cachedBuckets.dec({}); - } -} - -function onConfigManagerConfigGet(cacheHit, delay) { - configGetLag.observe({ - [CONFIG_MANAGER_CACHE_HIT]: cacheHit, - }, delay); -} +const MongoConfigManager = require('./configManager/MongoConfigManager'); +const ZookeeperConfigManager = require('./configManager/ZookeeperConfigManager'); /** * @class NotificationConfigManager * - * @classdesc Manages bucket notification configurations, the configurations - * are directly retrieved from the metastore, and are locally cached. Cache - * is invalidated using MongoDB change streams. + * @classdesc Manages bucket notification configurations */ class NotificationConfigManager { - /** - * @constructor - * @param {Object} params - constructor params - * @param {Object} params.mongoConfig - mongoDB config - * @param {Logger} params.logger - logger object - */ + constructor(params) { joi.attempt(params, paramsJoi); this._logger = params.logger; @@ -83,6 +16,26 @@ class NotificationConfigManager { this._mongoClient = null; this._metastore = null; this._metastoreChangeStream = null; + + const { + mongoConfig, bucketMetastore, maxCachedConfigs, zkClient, zkConfig, zkPath, zkConcurrency, logger, + } = params; + if (mongoConfig) { + this._configManagerBackend = new MongoConfigManager({ + mongoConfig, + bucketMetastore, + maxCachedConfigs, + logger, + }); + } else { + this._usesZookeeperBackend = true; + this._configManagerBackend = new ZookeeperConfigManager({ + zkClient, + zkConfig, + zkPath, + zkConcurrency, + logger, + }); } /** @@ -163,120 +116,55 @@ class NotificationConfigManager { }); break; } - this._logger.debug('Change stream event processed', { - method: 'NotificationConfigManager._handleChangeStreamChange', - }); } /** - * Initializes a change stream on the metastore collection - * Only document delete and update/replace operations are - * taken into consideration to invalidate cache. 
- * Newly created buckets (insert operations) are not cached - * as queue populator instances read from different kafka - * partitions and so don't need the configs for all buckets - * @returns {undefined} + * Get bucket notification configuration + * + * @param {String} bucket - bucket + * @param {function} [cb] - callback + * @return {Object|undefined} - configuration if available or undefined */ - _setMetastoreChangeStream() { - /** - * To avoid processing irrelevant events - * we filter by the operation types and - * only project the fields needed - */ - const changeStreamPipeline = [ - { - $match: { - $or: [ - { operationType: 'delete' }, - { operationType: 'replace' }, - { operationType: 'update' }, - ] - } - }, - { - $project: { - '_id': 1, - 'operationType': 1, - 'documentKey._id': 1, - 'fullDocument._id': 1, - 'fullDocument.value.notificationConfiguration': 1 - }, - }, - ]; - this._metastoreChangeStream = new ChangeStream({ - logger: this._logger, - collection: this._metastore, - pipeline: changeStreamPipeline, - handler: this._handleChangeStreamChangeEvent.bind(this), - throwOnError: false, - useStartAfter: semver.gte(this._mongoVersion, '4.2.0'), - }); - // start watching metastore - this._metastoreChangeStream.start(); + getConfig(bucket, cb) { + const val = this._configManagerBackend.getConfig(bucket); + if (!cb) { + return val; + } + if (val instanceof Promise) { + return val.then(res => cb(null, res)).catch(err => cb(err)); + } + return cb(null, val); } /** - * Sets up the NotificationConfigManager by - * connecting to mongo and initializing the - * change stream - * @param {Function} cb callback - * @returns {undefined} + * Add/update bucket notification configuration. + * + * @param {String} bucket - bucket + * @param {Object} config - bucket notification configuration + * @return {boolean} - true if set */ - setup(cb) { - this._setupMongoClient(err => { - if (err) { - this._logger.error('An error occured while setting up mongo client', { - method: 'NotificationConfigManager.setup', - }); - return cb(err); - } - try { - this._setMetastoreChangeStream(); - } catch (error) { - this._logger.error('An error occured while establishing the change stream', { - method: 'NotificationConfigManager._setMetastoreChangeStream', - }); - return cb(error); - } - return cb(); - }); + setConfig(bucket, config) { + return this._configManagerBackend.setConfig(bucket, config); } /** - * Get bucket notification configuration + * Remove bucket notification configuration * * @param {String} bucket - bucket - * @return {Object|undefined} - configuration if available or undefined + * @return {undefined} */ - async getConfig(bucket) { - const startTime = Date.now(); - // return cached config for bucket if it exists - const cachedConfig = this._cachedConfigs.get(bucket); - if (cachedConfig) { - const delay = (Date.now() - startTime) / 1000; - onConfigManagerConfigGet(true, delay); - return cachedConfig; - } - try { - // retreiving bucket metadata from the metastore - const bucketMetadata = await this._metastore.findOne({ _id: bucket }); - const bucketNotificationConfiguration = (bucketMetadata && bucketMetadata.value && - bucketMetadata.value.notificationConfiguration) || undefined; - // caching the bucket configuration - this._cachedConfigs.add(bucket, bucketNotificationConfiguration); - const delay = (Date.now() - startTime) / 1000; - onConfigManagerConfigGet(false, delay); - onConfigManagerCacheUpdate('add'); - return bucketNotificationConfiguration; - } catch (err) { - this._logger.error('An 
error occured when getting notification ' + - 'configuration of bucket', { - method: 'NotificationConfigManager.getConfig', - bucket, - error: err.message, - }); - return undefined; - } + removeConfig(bucket) { + return this._configManagerBackend.removeConfig(bucket); + } + + /** + * Setup bucket notification configuration manager + * + * @param {function} [cb] - callback + * @return {undefined} + */ + setup(cb) { + return this._configManagerBackend.setup(cb); + } } diff --git a/extensions/notification/NotificationConfigValidator.js b/extensions/notification/NotificationConfigValidator.js index 924f89e94..7be25c870 100644 --- a/extensions/notification/NotificationConfigValidator.js +++ b/extensions/notification/NotificationConfigValidator.js @@ -18,16 +18,27 @@ const destinationSchema = joi.object({ resource: joi.string().required(), type: joi.string().required(), host: joi.string().required(), - port: joi.number().required(), + port: joi.number().optional(), internalTopic: joi.string(), topic: joi.string().required(), auth: authSchema.default({}), + requiredAcks: joi.number().when('type', { + is: joi.string().not('kafka'), + then: joi.forbidden(), + otherwise: joi.number().default(1), + }), + compressionType: joi.string().when('type', { + is: joi.string().not('kafka'), + then: joi.forbidden(), + otherwise: joi.string().default('none'), + }), }); const joiSchema = joi.object({ topic: joi.string(), monitorNotificationFailures: joi.boolean().default(true), notificationFailedTopic: joi.string().optional(), + zookeeperPath: joi.string().optional(), queueProcessor: { groupId: joi.string().required(), concurrency: joi.number().greater(0).default(1000), @@ -35,7 +46,12 @@ const joiSchema = joi.object({ destinations: joi.array().items(destinationSchema).default([]), // TODO: BB-625 reset to being required after supporting probeserver in S3C // for bucket notification proceses - probeServer: probeServerJoi.optional() + probeServer: probeServerJoi.optional(), + bucketMetastore: joi.string().default('__metastore'), + maxCachedConfigs: joi.number().default(1000), + // Concurrency to use when updating all local bucket notification configs + // from zookeeper + zookeeperOpConcurrency: joi.number().default(10), }); function configValidator(backbeatConfig, extConfig) { diff --git a/extensions/notification/NotificationQueuePopulator.js b/extensions/notification/NotificationQueuePopulator.js index d0af2a48f..a8d760a17 100644 --- a/extensions/notification/NotificationQueuePopulator.js +++ b/extensions/notification/NotificationQueuePopulator.js @@ -3,6 +3,7 @@ const util = require('util'); const { isMasterKey } = require('arsenal').versioning; const { usersBucket, mpuBucketPrefix, supportedNotificationEvents } = require('arsenal').constants; +const VID_SEPERATOR = require('arsenal').versioning.VersioningConstants.VersionId.Separator; const configUtil = require('./utils/config'); const safeJsonParse = require('./utils/safeJsonParse'); const messageUtil = require('./utils/message'); @@ -38,27 +39,100 @@ class NotificationQueuePopulator extends QueuePopulatorExtension { * @return {boolean} - true if bucket entry */ _isBucketEntry(bucket, key) { - return ((bucket.toLowerCase() === notifConstants.bucketMetastore && !!key) + return ((bucket.toLowerCase() === this.notificationConfig.bucketMetastore && !!key) || key === undefined); } + /** + * Get bucket attributes from log entry + * + * @param {Object} value - log entry object + * @return {Object|undefined} - bucket attributes if available + */ + 
_getBucketAttributes(value) { + if (value && value.attributes) { + const { error, result } = safeJsonParse(value.attributes); + if (error) { + return undefined; + } + return result; + } + return undefined; + } + + /** + * Get bucket name from bucket attributes + * + * @param {Object} attributes - bucket attributes from log entry + * @return {String|undefined} - bucket name if available + */ + _getBucketNameFromAttributes(attributes) { + if (attributes && attributes.name) { + return attributes.name; + } + return undefined; + } + + /** + * Get notification configuration from bucket attributes + * + * @param {Object} attributes - bucket attributes from log entry + * @return {Object|undefined} - notification configuration if available + */ + _getBucketNotificationConfiguration(attributes) { + if (attributes && attributes.notificationConfiguration) { + return attributes.notificationConfiguration; + } + return undefined; + } + + /** + * Process bucket entry from the log + * + * @param {string} bucket - bucket name from log entry + * @param {Object} value - log entry object + * @return {undefined} + */ + _processBucketEntry(bucket, value) { + const attributes = this._getBucketAttributes(value); + const bucketName = this._getBucketNameFromAttributes(attributes); + const notificationConfiguration + = this._getBucketNotificationConfiguration(attributes); + if (notificationConfiguration && + Object.keys(notificationConfiguration).length > 0) { + const bnConfig = { + bucket: bucketName, + notificationConfiguration, + }; + // bucket notification config is available, update node + this.bnConfigManager.setConfig(bucketName, bnConfig); + return undefined; + } + // bucket was deleted or notification config has been removed, so remove zk node + this.bnConfigManager.removeConfig(bucketName || bucket); + return undefined; + } + /** * Returns the correct versionId * to display according to the * versioning state of the object * @param {Object} value log entry object + * @param {Object} overheadFields - extra fields + * @param {Object} overheadFields.versionId - object versionId * @return {String} versionId */ - _getVersionId(value) { + _getVersionId(value, overheadFields) { + const versionId = value.versionId || (overheadFields && overheadFields.versionId); const isNullVersion = value.isNull; - const isVersioned = !!value.versionId; + const isVersioned = !!versionId; // Versioning suspended objects have // a versionId, however it is internal // and should not be used to get the object if (isNullVersion || !isVersioned) { return null; } else { - return value.versionId; + return versionId; } } @@ -103,34 +177,69 @@ class NotificationQueuePopulator extends QueuePopulatorExtension { return supportedNotificationEvents.has(eventType); } + /** + * Extract base key from versioned key + * + * @param {String} key - object key + * @return {String} - versioned base key + */ + _extractVersionedBaseKey(key) { + return key.split(VID_SEPERATOR)[0]; + } + + /** + * Returns the dateTime of the event + * based on the event message property if present + * or overhead fields + * @param {ObjectMD} value object metadata + * @param {Object} overheadFields overhead fields + * @param {Object} overheadFields.commitTimestamp - Kafka commit timestamp + * @param {Object} overheadFields.opTimestamp - MongoDB operation timestamp + * @returns {string} dateTime of the event + */ + _getEventDateTime(value, overheadFields) { + if (overheadFields) { + return overheadFields.opTimestamp || overheadFields.commitTimestamp || null; + } + return (value 
&& value[notifConstants.eventMessageProperty.dateTime]) || null; + } + /** * Process object entry from the log * * @param {String} bucket - bucket * @param {String} key - object key * @param {Object} value - log entry object - * @param {String} timestamp - operation timestamp + * @param {String} type - log entry type + * @param {Object} overheadFields - extra fields + * @param {Object} overheadFields.commitTimestamp - Kafka commit timestamp + * @param {Object} overheadFields.opTimestamp - MongoDB operation timestamp * @return {undefined} */ - async _processObjectEntry(bucket, key, value, timestamp) { + async _processObjectEntry(bucket, key, value, type, overheadFields) { this._metricsStore.notifEvent(); if (!this._shouldProcessEntry(key, value)) { return undefined; } - const { eventMessageProperty } = notifConstants; - const eventType = value[eventMessageProperty.eventType]; + const { eventMessageProperty, deleteEvent } = notifConstants; + let eventType = value[eventMessageProperty.eventType]; + if (eventType === undefined && type === 'del') { + eventType = deleteEvent; + } if (!this._isNotificationEventSupported(eventType)) { return undefined; } - const versionId = this._getVersionId(value); + const baseKey = this._extractVersionedBaseKey(key); + const versionId = this._getVersionId(value, overheadFields); + const dateTime = this._getEventDateTime(value, overheadFields); const config = await this.bnConfigManager.getConfig(bucket); if (config && Object.keys(config).length > 0) { const ent = { bucket, - key: value.key, + key: baseKey, eventType, versionId, - dateTime: timestamp, + dateTime, }; this.log.debug('validating entry', { method: 'NotificationQueuePopulator._processObjectEntry', @@ -149,20 +258,23 @@ class NotificationQueuePopulator extends QueuePopulatorExtension { return undefined; } // get destination specific notification config - const destBnConf = config.queueConfig.filter( - c => c.queueArn.split(':').pop() - === destination.resource); - if (!destBnConf.length) { + const queueConfig = config.notificationConfiguration.queueConfig.filter( + c => c.queueArn.split(':').pop() === destination.resource + ); + if (!queueConfig.length) { // skip, if there is no config for the current // destination resource return undefined; } // pass only destination resource specific config to // validate entry - const bnConfig = { - queueConfig: destBnConf, + const destConfig = { + bucket, + notificationConfiguration: { + queueConfig, + }, }; - const { isValid, matchingConfig } = configUtil.validateEntry(bnConfig, ent); + const { isValid, matchingConfig } = configUtil.validateEntry(destConfig, ent); if (isValid) { const message = messageUtil.addLogAttributes(value, ent); @@ -200,7 +312,7 @@ class NotificationQueuePopulator extends QueuePopulatorExtension { * @return {undefined} Promise|undefined */ filterAsync(entry, cb) { - const { bucket, key, overheadFields } = entry; + const { bucket, key, type, overheadFields } = entry; const value = entry.value || '{}'; const { error, result } = safeJsonParse(value); // ignore if entry's value is not valid @@ -208,16 +320,18 @@ class NotificationQueuePopulator extends QueuePopulatorExtension { this.log.error('could not parse log entry', { value, error }); return cb(); } - // ignore bucket operations, mpu's or if the entry has no bucket - const isUserBucketOp = !bucket || bucket === usersBucket; - const isMpuOp = key && key.startsWith(mpuBucketPrefix); - const isBucketOp = bucket && result && this._isBucketEntry(bucket, key); - if ([isUserBucketOp, isMpuOp, 
isBucketOp].some(cond => cond)) { + // ignore bucket op, mpu's or if the entry has no bucket + if (!bucket || bucket === usersBucket || (key && key.startsWith(mpuBucketPrefix))) { + return cb(); + } + // bucket notification configuration updates + if (bucket && result && this._isBucketEntry(bucket, key)) { + this._processBucketEntry(key, result); + return cb(); + } // object entry processing - filter and publish if (key && result) { - return this._processObjectEntryCb(bucket, key, result, overheadFields.opTimestamp, cb); + return this._processObjectEntryCb(bucket, key, result, type, overheadFields, cb); } return cb(); } diff --git a/extensions/notification/configManager/BaseConfigManager.js b/extensions/notification/configManager/BaseConfigManager.js new file mode 100644 index 000000000..36582bed7 --- /dev/null +++ b/extensions/notification/configManager/BaseConfigManager.js @@ -0,0 +1,40 @@ +const { errors } = require('arsenal'); + +class BaseConfigManager { + + /** + * Setup the config manager + * @param {Function} cb callback + * @return {undefined} + */ + setup(cb) { + return cb(errors.NotImplemented); + } + + /** + * Get bucket notification configuration + * @param {String} bucket - bucket + * @return {Object|undefined} - configuration if available or undefined + */ + getConfig(bucket) { // eslint-disable-line no-unused-vars + throw errors.NotImplemented.customizeDescription('Method not implemented'); + } + + /** + * Set bucket notification configuration + * @throws {ArsenalError} - NotImplemented + */ + setConfig() { + throw errors.NotImplemented.customizeDescription('Method not implemented'); + } + + /** + * Remove bucket notification configuration + * @throws {ArsenalError} - NotImplemented + */ + removeConfig() { + throw errors.NotImplemented.customizeDescription('Method not implemented'); + } +} + +module.exports = BaseConfigManager; diff --git a/extensions/notification/configManager/MongoConfigManager.js b/extensions/notification/configManager/MongoConfigManager.js new file mode 100644 index 000000000..18eba3f1b --- /dev/null +++ b/extensions/notification/configManager/MongoConfigManager.js @@ -0,0 +1,316 @@ +const joi = require('joi'); +const semver = require('semver'); + +const { ZenkoMetrics } = require('arsenal').metrics; +const LRUCache = require('arsenal').algorithms + .cache.LRUCache; +const { errors } = require('arsenal'); + +const MongoClient = require('mongodb').MongoClient; + +const ChangeStream = require('../../../lib/wrappers/ChangeStream'); +const { constructConnectionString, getMongoVersion } = require('../../utils/MongoUtils'); +const BaseConfigManager = require('./BaseConfigManager'); + +const paramsJoi = joi.object({ + mongoConfig: joi.object().required(), + bucketMetastore: joi.string().required(), + maxCachedConfigs: joi.number().required(), + logger: joi.object().required(), +}).required(); + +// should equal true if config manager's cache was hit during a get operation +const CONFIG_MANAGER_CACHE_HIT = 'cache_hit'; +// Type of operation performed on the cache +const CONFIG_MANAGER_OPERATION_TYPE = 'op'; + +const cacheUpdates = ZenkoMetrics.createCounter({ + name: 's3_notification_config_manager_cache_updates_total', + help: 'Total number of cache updates', + labelNames: [ + CONFIG_MANAGER_OPERATION_TYPE, + ], +}); + +const configGetLag = ZenkoMetrics.createHistogram({ + name: 's3_notification_config_manager_config_get_seconds', + help: 'Time it takes in seconds to get a bucket notification config from MongoDB', + labelNames: [ + CONFIG_MANAGER_CACHE_HIT, + ], + buckets: [0.001, 0.01, 1, 10, 100, 1000], +}); + +const cachedBuckets = 
ZenkoMetrics.createGauge({ + name: 's3_notification_config_manager_cached_buckets_count', + help: 'Total number of cached buckets in the notification config manager', +}); + +function onConfigManagerCacheUpdate(op) { + cacheUpdates.inc({ + [CONFIG_MANAGER_OPERATION_TYPE]: op, + }); + if (op === 'add') { + cachedBuckets.inc({}); + } else if (op === 'delete') { + cachedBuckets.dec({}); + } +} + +function onConfigManagerConfigGet(cacheHit, delay) { + configGetLag.observe({ + [CONFIG_MANAGER_CACHE_HIT]: cacheHit, + }, delay); +} + +/** + * @class MongoConfigManager + * + * @classdesc Manages bucket notification configurations; the configurations + * are directly retrieved from the metastore, and are locally cached. Cache + * is invalidated using MongoDB change streams. + */ +class MongoConfigManager extends BaseConfigManager { + /** + * @constructor + * @param {Object} params - constructor params + * @param {Object} params.mongoConfig - mongoDB config + * @param {Logger} params.logger - logger object + */ + constructor(params) { + super(); + joi.attempt(params, paramsJoi); + this._logger = params.logger; + this._mongoConfig = params.mongoConfig; + this._cachedConfigs = new LRUCache(params.maxCachedConfigs); + this._bucketMetastore = params.bucketMetastore; + this._mongoClient = null; + this._metastore = null; + this._metastoreChangeStream = null; + } + + /** + * Connects to MongoDB using the MongoClientInterface + * and retrieves the metastore collection + * @param {Function} cb callback + * @returns {undefined} + */ + _setupMongoClient(cb) { + const mongoUrl = constructConnectionString(this._mongoConfig); + MongoClient.connect(mongoUrl, { + replicaSet: this._mongoConfig.replicaSet, + useNewUrlParser: true, + }, + (err, client) => { + if (err) { + this._logger.error('Could not connect to MongoDB', { + method: 'MongoConfigManager._setupMongoClient', + error: err.message, + }); + return cb(err); + } + this._logger.debug('Connected to MongoDB', { + method: 'MongoConfigManager._setupMongoClient', + }); + try { + this._mongoClient = client.db(this._mongoConfig.database, { + ignoreUndefined: true, + }); + this._metastore = this._mongoClient.collection(this._bucketMetastore); + // get mongodb version + getMongoVersion(this._mongoClient, (err, version) => { + if (err) { + this._logger.error('Could not get MongoDB version', { + method: 'MongoConfigManager._setupMongoClient', + error: err.message, + }); + return cb(err); + } + this._mongoVersion = version; + return cb(); + }); + return undefined; + } catch (error) { + return cb(error); + } + }); + } + + /** + * Handler for the change stream "change" event. + * Invalidates cached bucket configs based on the change. + * @param {ChangeStreamDocument} change Change stream change object + * @returns {undefined} + */ + _handleChangeStreamChangeEvent(change) { + // invalidating cached notification configs + const cachedConfig = this._cachedConfigs.get(change.documentKey._id); + const bucketNotificationConfiguration = change.fullDocument ? change.fullDocument.value. 
+ notificationConfiguration : null; + switch (change.operationType) { + case 'delete': + if (cachedConfig) { + this._cachedConfigs.remove(change.documentKey._id); + onConfigManagerCacheUpdate('delete'); + } + break; + case 'replace': + case 'update': + if (cachedConfig) { + // add() replaces the value of an entry if it exists in cache + this._cachedConfigs.add(change.documentKey._id, bucketNotificationConfiguration); + onConfigManagerCacheUpdate('update'); + } + break; + default: + this._logger.debug('Skipping unsupported change stream event', { + method: 'MongoConfigManager._handleChangeStreamChange', + }); + break; + } + this._logger.debug('Change stream event processed', { + method: 'MongoConfigManager._handleChangeStreamChange', + }); + } + + /** + * Initializes a change stream on the metastore collection + * Only document delete and update/replace operations are + * taken into consideration to invalidate cache. + * Newly created buckets (insert operations) are not cached + * as queue populator instances read from different kafka + * partitions and so don't need the configs for all buckets + * @returns {undefined} + */ + _setMetastoreChangeStream() { + /** + * To avoid processing irrelevant events + * we filter by the operation types and + * only project the fields needed + */ + const changeStreamPipeline = [ + { + $match: { + $or: [ + { operationType: 'delete' }, + { operationType: 'replace' }, + { operationType: 'update' }, + ] + } + }, + { + $project: { + '_id': 1, + 'operationType': 1, + 'documentKey._id': 1, + 'fullDocument._id': 1, + 'fullDocument.value.notificationConfiguration': 1 + }, + }, + ]; + this._metastoreChangeStream = new ChangeStream({ + logger: this._logger, + collection: this._metastore, + pipeline: changeStreamPipeline, + handler: this._handleChangeStreamChangeEvent.bind(this), + throwOnError: false, + useStartAfter: semver.gte(this._mongoVersion, '4.2.0'), + }); + // start watching metastore + this._metastoreChangeStream.start(); + } + + /** + * Sets up the MongoConfigManager by + * connecting to mongo and initializing the + * change stream + * @param {Function} cb callback + * @returns {undefined} + */ + setup(cb) { + this._setupMongoClient(err => { + if (err) { + this._logger.error('An error occurred while setting up mongo client', { + method: 'MongoConfigManager.setup', + }); + return cb(err); + } + try { + this._setMetastoreChangeStream(); + } catch (error) { + this._logger.error('An error occurred while establishing the change stream', { + method: 'MongoConfigManager._setMetastoreChangeStream', + }); + return cb(error); + } + return cb(); + }); + } + + /** + * Get bucket notification configuration + * + * @param {String} bucket - bucket + * @return {Object|undefined} - configuration if available or undefined + */ + async getConfig(bucket) { + const startTime = Date.now(); + // return cached config for bucket if it exists + const cachedConfig = this._cachedConfigs.get(bucket); + if (cachedConfig) { + const delay = (Date.now() - startTime) / 1000; + onConfigManagerConfigGet(true, delay); + return { + bucket, + notificationConfiguration: cachedConfig, + }; + } + try { + // retrieving bucket metadata from the metastore + const bucketMetadata = await this._metastore.findOne({ _id: bucket }); + const notificationConfiguration = bucketMetadata?.value?.notificationConfiguration; + const delay = (Date.now() - startTime) / 1000; + onConfigManagerConfigGet(false, delay); + if (!notificationConfiguration) { + return undefined; + } + // caching the bucket configuration + 
this._cachedConfigs.add(bucket, notificationConfiguration); + onConfigManagerCacheUpdate('add'); + return { + bucket, + notificationConfiguration + }; + } catch (err) { + this._logger.error('An error occurred when getting notification ' + + 'configuration of bucket', { + method: 'MongoConfigManager.getConfig', + bucket, + error: err.message, + }); + throw errors.InternalError.customizeDescription(err.message); + } + } + + /** + * Set bucket notification configuration + * Not needed for the MongoDB backend, as we + * use change streams to watch for config changes + * @return {boolean} - false + */ + setConfig() { + return false; + } + + /** + * Remove bucket notification configuration + * Not needed for the MongoDB backend, as we + * use change streams to watch for config changes + * @return {boolean} - false + */ + removeConfig() { + return false; + } +} + +module.exports = MongoConfigManager; diff --git a/extensions/notification/configManager/ZookeeperConfigManager.js b/extensions/notification/configManager/ZookeeperConfigManager.js new file mode 100644 index 000000000..603106ab8 --- /dev/null +++ b/extensions/notification/configManager/ZookeeperConfigManager.js @@ -0,0 +1,470 @@ +const async = require('async'); +const joi = require('joi'); +const { EventEmitter } = require('events'); +const zookeeper = require('node-zookeeper-client'); + +const ZookeeperManager = require('../../../lib/clients/ZookeeperManager'); +const BaseConfigManager = require('./BaseConfigManager'); + +const safeJsonParse = require('../utils/safeJsonParse'); +const constants = require('../constants'); + +const paramsJoi = joi.object({ + zkClient: joi.object().optional(), + zkConfig: joi.object().when('zkClient', { + is: joi.exist(), + then: joi.optional(), + otherwise: joi.required(), + }), + zkPath: joi.string().when('zkClient', { + is: joi.exist(), + then: joi.optional(), + otherwise: joi.required(), + }), + zkConcurrency: joi.number().required(), + logger: joi.object().required(), +}).required(); + +/** + * @class ZookeeperConfigManager + * + * @classdesc Manages bucket notification configurations in zookeeper + */ +class ZookeeperConfigManager extends BaseConfigManager { + /** + * @constructor + * @param {Object} params - constructor params + * @param {Object} params.zkClient - zookeeper client + * @param {Logger} params.logger - logger object + */ + constructor(params) { + super(); + joi.attempt(params, paramsJoi); + this._zkClient = params.zkClient; + this._zkPath = params.zkPath; + this._zkConfig = params.zkConfig; + this._zkConcurrency = params.zkConcurrency; + this.log = params.logger; + this._configs = new Map(); + this._emitter = new EventEmitter(); + this._setupEventListeners(); + } + + _errorListener(error, listener) { + this.log.error('ZookeeperConfigManager.emitter.error', { + listener, + error, + }); + return undefined; + } + + _setConfigListener(bucket, config) { + this.log.debug('ZookeeperConfigManager.emitter.setConfig', { + event: 'setConfig', + bucket, + config, + }); + this._setBucketNotifConfig(bucket, JSON.stringify(config), err => { + if (err) { + this._emitter.emit('error', err, 'setConfigListener'); + } + return undefined; + }); + } + + _getConfigListener(updatedBucket = '') { + this.log.debug('ZookeeperConfigManager.emitter.getConfig', { + event: 'getConfig', + }); + this._listBucketsWithConfig((err, buckets) => { + if (err) { + this._emitter.emit('error', err, 'getConfigListener'); + return undefined; + } + this.log.debug('bucket config to be updated in map', { + bucket: updatedBucket, + 
}); + const newBuckets = this._getNewBucketNodes(buckets); + this.log.debug('new bucket configs to be added to map', { + buckets: newBuckets, + }); + const bucketsToMap = updatedBucket ? [updatedBucket, ...newBuckets] : newBuckets; + this.log.debug('bucket configs to be added/updated to map', { + buckets: bucketsToMap, + }); + if (bucketsToMap.length > 0) { + this._updateLocalStore(bucketsToMap); + } + return undefined; + }); + } + + _removeConfigListener(bucket) { + this.log.debug('ZookeeperConfigManager.emitter.removeConfig', { + event: 'removeConfig', + bucket, + }); + this._removeBucketNotifConfigNode(bucket, err => { + if (err) { + this._emitter.emit('error', err, 'removeConfigListener'); + } + return undefined; + }); + } + + _setupEventListeners() { + this._emitter + .on('error', error => this._errorListener(error)) + .on('setConfig', + (bucket, config) => this._setConfigListener(bucket, config)) + .on('getConfig', bucket => this._getConfigListener(bucket)) + .on('removeConfig', bucket => this._removeConfigListener(bucket)); + } + + _callbackHandler(cb, err, result) { + if (cb && typeof cb === 'function') { + return cb(err, result); + } + return undefined; + } + + _getBucketNodeZkPath(bucket) { + return `/${constants.zkConfigParentNode}/${bucket}`; + } + + _getConfigDataFromBuffer(data) { + const { error, result } = safeJsonParse(data); + if (error) { + this.log.error('invalid config', { error, config: data }); + return undefined; + } + return result; + } + + _getBucketNotifConfig(bucket, cb) { + const method + = 'ZookeeperConfigManager._getBucketNotifConfig'; + const zkPath = this._getBucketNodeZkPath(bucket); + this.log.debug('fetching bucket notification configuration', { + method, + bucket, + zkPath, + }); + return this._zkClient.getData(zkPath, event => { + this.log.debug('zookeeper getData watcher triggered', { + zkPath, + method, + event, + bucket, + }); + if (event.type === zookeeper.Event.NODE_DATA_CHANGED) { + this._emitter.emit('getConfig', bucket); + } + if (event.type === zookeeper.Event.NODE_DELETED) { + this.removeConfig(bucket, false); + } + }, (error, data) => { + if (error && error.name !== 'NO_NODE') { + const errMsg + = 'error fetching bucket notification configuration'; + this.log.error(errMsg, { + method, + error, + }); + return this._callbackHandler(cb, error); + } + if (data) { + return this._callbackHandler(cb, null, data); + } + // no configuration + return this._callbackHandler(cb); + }); + } + + _checkNodeExists(zkPath, cb) { + const method + = 'ZookeeperConfigManager._checkNodeExists'; + return this._zkClient.exists(zkPath, (err, stat) => { + if (err) { + this.log.error('error checking node existence', + { method, zkPath }); + return this._callbackHandler(cb, err); + } + if (stat) { + this.log.debug('node exists', { method, zkPath }); + return this._callbackHandler(cb, null, true); + } + this.log.debug('node does not exist', { method, zkPath }); + return this._callbackHandler(cb, null, false); + }); + } + + _setBucketNotifConfig(bucket, data, cb) { + const method + = 'ZookeeperConfigManager._setBucketNotifConfig'; + const zkPath = this._getBucketNodeZkPath(bucket); + this.log.debug('setting bucket notification configuration', { + method, + bucket, + zkPath, + }); + return async.waterfall([ + next => this._checkNodeExists(zkPath, next), + (exists, next) => { + if (!exists) { + return this._createBucketNotifConfigNode(bucket, + err => next(err)); + } + return next(); + }, + next => this._zkClient.setData(zkPath, Buffer.from(data), -1, next), + ], err => { + 
if (err) { + this.log.error('error saving config', { method, zkPath, data }); + } + return this._callbackHandler(cb, err); + }); + } + + _checkConfigurationParentNode(cb) { + const method + = 'ZookeeperConfigManager._checkConfigurationParentNode'; + const zkPath = `/${constants.zkConfigParentNode}`; + return async.waterfall([ + next => this._checkNodeExists(zkPath, next), + (exists, next) => { + if (!exists) { + this.log.debug('parent configuration zookeeper node does ' + + 'not exist', { method, zkPath }); + return this._zkClient.mkdirp(zkPath, err => next(err)); + } + this.log.debug('parent configuration zookeeper node exists', + { method, zkPath }); + return next(); + }, + ], err => { + if (err) { + const errMsg + = 'error checking configuration zookeeper parent node'; + this.log.error(errMsg, { method, zkPath, error: err.message }); + return this._callbackHandler(cb, err); + } + this.log.debug('parent configuration zookeeper checked/added', + { method, zkPath }); + return this._callbackHandler(cb); + }); + } + + _createBucketNotifConfigNode(bucket, cb) { + const method + = 'ZookeeperConfigManager._createBucketNotifConfigNode'; + const zkPath = this._getBucketNodeZkPath(bucket); + this.log.debug('creating bucket notification configuration node', { + method, + bucket, + zkPath, + }); + return this._zkClient.mkdirp(zkPath, err => { + if (err) { + this.log.error('Could not pre-create path in zookeeper', { + method, + zkPath, + error: err, + }); + return this._callbackHandler(cb, err); + } + return this._callbackHandler(cb); + }); + } + + _removeBucketNotifConfigNode(bucket, cb) { + const method + = 'ZookeeperConfigManager._removeBucketNotifConfigNode'; + const zkPath = this._getBucketNodeZkPath(bucket); + this.log.debug('removing bucket notification configuration node', { + method, + bucket, + zkPath, + }); + return this._zkClient.remove(zkPath, error => { + if (error && error.name !== 'NO_NODE') { + this.log.error('Could not remove zookeeper node', { + method, + zkPath, + error, + }); + return this._callbackHandler(cb, error); + } + if (!error) { + const msg + = 'removed notification configuration zookeeper node'; + this.log.debug(msg, { + method, + bucket, + }); + } + return this._callbackHandler(cb); + }); + } + + _getNewBucketNodes(bucketsNodeList) { + if (Array.isArray(bucketsNodeList)) { + const bucketsFromMap = [...this._configs.keys()]; + return bucketsNodeList.filter(b => !bucketsFromMap.includes(b)); + } + return []; + } + + _listBucketsWithConfig(cb) { + const method + = 'ZookeeperConfigManager._listBucketsWithConfig'; + const zkPath = `/${constants.zkConfigParentNode}`; + this._zkClient.getChildren(zkPath, event => { + this.log.debug('zookeeper getChildren watcher triggered', { + zkPath, + method, + event, + }); + if (event.type === zookeeper.Event.NODE_CHILDREN_CHANGED) { + this._emitter.emit('getConfig'); + } + }, (error, buckets) => { + if (error) { + const errMsg + = 'error listing buckets with configuration'; + this.log.error(errMsg, { + zkPath, + method, + error, + }); + return this._callbackHandler(cb, error); + } + this._callbackHandler(cb, null, buckets); + }); + } + + _updateLocalStore(buckets, cb) { + async.eachLimit(buckets, this._zkConcurrency, (bucket, next) => { + this._getBucketNotifConfig(bucket, (err, data) => { + if (err) { + return next(err); + } + const configObject = this._getConfigDataFromBuffer(data); + if (configObject) { + this._configs.set(bucket, configObject); + } + return next(); + }); + }, err => this._callbackHandler(cb, err)); + } + + /** + * Get 
bucket notification configuration + * + * @param {String} bucket - bucket + * @return {Object|undefined} - configuration if available or undefined + */ + getConfig(bucket) { + return this._configs.get(bucket); + } + + /** + * Add/update bucket notification configuration + * + * @param {String} bucket - bucket + * @param {Object} config - bucket notification configuration + * @return {boolean} - true if set + */ + setConfig(bucket, config) { + try { + this.log.debug('set config', { + method: 'ZookeeperConfigManager.setConfig', + bucket, + config, + }); + this._configs.set(bucket, config); + this._emitter.emit('setConfig', bucket, config); + return true; + } catch (err) { + const errMsg + = 'error setting bucket notification configuration'; + this.log.error(errMsg, { + method: 'ZookeeperConfigManager.setConfig', + error: err.message, + bucket, + config, + }); + return false; + } + } + + /** + * Remove bucket notification configuration + * + * @param {String} bucket - bucket + * @return {boolean} - true if removed + */ + removeConfig(bucket) { + try { + this.log.debug('remove config', { + method: 'ZookeeperConfigManager.removeConfig', + bucket, + }); + this._configs.delete(bucket); + this._emitter.emit('removeConfig', bucket); + return true; + } catch (err) { + const errMsg + = 'error removing bucket notification configuration'; + this.log.error(errMsg, { + method: 'ZookeeperConfigManager.removeConfig', + error: err, + bucket, + }); + return false; + } + } + + _setupZookeeper(done) { + if (this._zkClient) { + done(); + return; + } + + const zookeeperUrl = + `${this._zkConfig.connectionString}${this._zkPath}`; + this.log.info('opening zookeeper connection for reading ' + + 'bucket notification configuration', { + zookeeperUrl, + method: 'ZookeeperConfigManager._setupZookeeper', + }); + this._zkClient = new ZookeeperManager(zookeeperUrl, { + autoCreateNamespace: this._zkConfig.autoCreateNamespace, + retries: this._zkConfig.retries, + }, this.log); + + this._zkClient.once('error', done); + this._zkClient.once('ready', () => { + // just in case there would be more 'error' events emitted + this._zkClient.removeAllListeners('error'); + done(); + }); + } + + /** + * Setup bucket notification configuration manager + * + * @param {function} [cb] - callback + * @return {undefined} + */ + setup(cb) { + async.waterfall([ + next => this._setupZookeeper(next), + next => this._checkConfigurationParentNode(next), + next => this._listBucketsWithConfig(next), + (buckets, next) => this._updateLocalStore(buckets, next), + ], cb); + } +} + +module.exports = ZookeeperConfigManager; diff --git a/extensions/notification/constants.js b/extensions/notification/constants.js index 8c5add9aa..5ec9eda8f 100644 --- a/extensions/notification/constants.js +++ b/extensions/notification/constants.js @@ -5,6 +5,7 @@ const constants = { suffix: 'Suffix', }, bucketNotifConfigPropName: 'notificationConfiguration', + zkConfigParentNode: 'config', arn: { partition: 'scality', service: 'bucketnotif', @@ -13,6 +14,7 @@ supportedAuthTypes: ['kerberos'], deleteEvent: 's3:ObjectRemoved:Delete', eventMessageProperty: { + dateTime: 'last-modified', eventType: 'originOp', region: 'dataStoreName', schemaVersion: 'md-model-version', @@ -22,7 +24,6 @@ eventVersion: '1.0', eventSource: 'scality:s3', eventS3SchemaVersion: '1.0', - bucketMetastore: '__metastore', }; module.exports = constants; diff --git a/extensions/notification/destination/KafkaNotificationDestination.js 
b/extensions/notification/destination/KafkaNotificationDestination.js index 237db7652..b7936f164 100644 --- a/extensions/notification/destination/KafkaNotificationDestination.js +++ b/extensions/notification/destination/KafkaNotificationDestination.js @@ -46,12 +46,18 @@ class KafkaNotificationDestination extends NotificationDestination { } _setupProducer(done) { - const { host, port, topic, pollIntervalMs, auth } = this._destinationConfig; + const { host, port, topic, pollIntervalMs, auth, requiredAcks, compressionType } = this._destinationConfig; + let kafkaHost = host; + if (port) { + kafkaHost = `${host}:${port}`; + } const producer = new KafkaProducer({ - kafka: { hosts: `${host}:${port}` }, + kafka: { hosts: kafkaHost }, topic, pollIntervalMs, auth, + compressionType, + requiredAcks, }); producer.once('error', done); producer.once('ready', () => { diff --git a/extensions/notification/destination/KafkaProducer.js b/extensions/notification/destination/KafkaProducer.js index 10bbb3caa..59921e692 100644 --- a/extensions/notification/destination/KafkaProducer.js +++ b/extensions/notification/destination/KafkaProducer.js @@ -17,10 +17,6 @@ class KafkaProducer extends BackbeatProducer { return 'NotificationProducer'; } - getRequireAcks() { - return 1; - } - setFromConfig(joiResult) { super.setFromConfig(joiResult); this._auth = joiResult.auth ? authUtil.generateKafkaAuthObject(joiResult.auth) : {}; diff --git a/extensions/notification/queueProcessor/QueueProcessor.js b/extensions/notification/queueProcessor/QueueProcessor.js index 3e0aaf946..5eea4ada6 100644 --- a/extensions/notification/queueProcessor/QueueProcessor.js +++ b/extensions/notification/queueProcessor/QueueProcessor.js @@ -4,9 +4,7 @@ const { EventEmitter } = require('events'); const Logger = require('werelogs').Logger; const async = require('async'); const assert = require('assert'); -const util = require('util'); const { ZenkoMetrics } = require('arsenal').metrics; -const { wrapGaugeSet } = require('../../../lib/util/metrics'); const errors = require('arsenal').errors; const BackbeatConsumer = require('../../../lib/BackbeatConsumer'); @@ -14,7 +12,6 @@ const NotificationDestination = require('../destination'); const configUtil = require('../utils/config'); const messageUtil = require('../utils/message'); const NotificationConfigManager = require('../NotificationConfigManager'); -const { notificationQueueProcessor } = require('../../../lib/constants').services; const processedEvents = ZenkoMetrics.createCounter({ name: 's3_notification_queue_processor_events_total', @@ -29,25 +26,6 @@ function onQueueProcessorEventProcessed(destination, eventType) { }); } -const kafkaLagMetric = ZenkoMetrics.createGauge({ - name: 's3_notification_queue_lag', - help: 'Number of update entries waiting to be consumed from the Kafka topic', - labelNames: ['origin', 'containerName', 'partition', 'serviceName'], -}); - -const defaultLabels = { - origin: 'notification', -}; - -/** - * Contains methods to incrememt different metrics - * @typedef {Object} MetricsHandler - * @property {GaugeSet} lag - kafka lag metric - */ -const metricsHandler = { - lag: wrapGaugeSet(kafkaLagMetric, defaultLabels), -}; - class QueueProcessor extends EventEmitter { /** * Create a queue processor object to activate notification from a @@ -55,6 +33,7 @@ class QueueProcessor extends EventEmitter { * * @constructor * @param {Object} mongoConfig - mongodb connnection configuration object + * @param {Object} zkConfig - zookeeper configuration object * @param {Object} 
kafkaConfig - kafka configuration object * @param {string} kafkaConfig.hosts - list of kafka brokers * as "host:port[,host:port...]" @@ -86,14 +65,14 @@ class QueueProcessor extends EventEmitter { * @param {String} destinationId - resource name/id of destination * @param {Object} destinationAuth - destination authentication config */ - constructor(mongoConfig, kafkaConfig, notifConfig, destinationId, + constructor(mongoConfig, zkConfig, kafkaConfig, notifConfig, destinationId, destinationAuth) { super(); this.mongoConfig = mongoConfig; + this.zkConfig = zkConfig; this.kafkaConfig = kafkaConfig; this.notifConfig = notifConfig; this.destinationId = destinationId; - this.serviceName = notificationQueueProcessor; this.destinationConfig = notifConfig.destinations.find(dest => dest.resource === destinationId); assert(this.destinationConfig, `Invalid destination argument "${destinationId}".` + @@ -106,10 +85,6 @@ class QueueProcessor extends EventEmitter { this.bnConfigManager = null; this._consumer = null; this._destination = null; - // Once the notification manager is initialized - // this will hold the callback version of the getConfig - // function of the notification config manager - this._getConfig = null; this.logger = new Logger('Backbeat:Notification:QueueProcessor'); } @@ -123,6 +98,11 @@ class QueueProcessor extends EventEmitter { try { this.bnConfigManager = new NotificationConfigManager({ mongoConfig: this.mongoConfig, + bucketMetastore: this.notifConfig.bucketMetastore, + maxCachedConfigs: this.notifConfig.maxCachedConfigs, + zkConfig: this.zkConfig, + zkPath: this.notifConfig.zookeeperPath, + zkConcurrency: this.notifConfig.zookeeperOpConcurrency, logger: this.logger, }); return this.bnConfigManager.setup(done); @@ -181,6 +161,8 @@ class QueueProcessor extends EventEmitter { kafka: { hosts: this.kafkaConfig.hosts, site: this.kafkaConfig.site, + compressionType: this.kafkaConfig.compressionType, + requiredAcks: this.kafkaConfig.requiredAcks, }, topic: internalTopic, groupId: consumerGroupId, @@ -203,9 +185,6 @@ class QueueProcessor extends EventEmitter { this.emit('ready'); return next(); }); - // callbackify getConfig from notification config manager - this._getConfig = util.callbackify(this.bnConfigManager - .getConfig.bind(this.bnConfigManager)); return undefined; }, ], err => { @@ -253,7 +232,7 @@ class QueueProcessor extends EventEmitter { } const { bucket, key, eventType } = sourceEntry; try { - return this._getConfig(bucket, (err, notifConfig) => { + return this.bnConfigManager.getConfig(bucket, (err, notifConfig) => { if (err) { this.logger.error('Error while getting notification configuration', { bucket, @@ -264,19 +243,15 @@ class QueueProcessor extends EventEmitter { return done(err); } if (notifConfig && Object.keys(notifConfig).length > 0) { - const destBnConf = notifConfig.queueConfig.filter( - c => c.queueArn.split(':').pop() - === this.destinationId); - if (!destBnConf.length) { + // get destination specific notification config + const queueConfig = notifConfig.notificationConfiguration.queueConfig.filter( + c => c.queueArn.split(':').pop() === this.destinationId + ); + if (!queueConfig.length) { // skip, if there is no config for the current // destination resource - return done(); + return undefined; } - // pass only destination resource specific config to - // validate entry - const bnConfig = { - queueConfig: destBnConf, - }; this.logger.debug('validating entry', { method: 'QueueProcessor.processKafkaEntry', bucket, @@ -285,7 +260,13 @@ class QueueProcessor 
extends EventEmitter { eventType, destination: this.destinationId, }); - const { isValid, matchingConfig } = configUtil.validateEntry(bnConfig, sourceEntry); + const destConfig = { + bucket, + notificationConfiguration: { + queueConfig, + }, + }; + const { isValid, matchingConfig } = configUtil.validateEntry(destConfig, sourceEntry); if (isValid) { // add notification configuration id to the message sourceEntry.configurationId = matchingConfig.id; @@ -351,19 +332,6 @@ class QueueProcessor extends EventEmitter { */ async handleMetrics(res, log) { log.debug('metrics requested'); - - if (this.repConfig.queueProcessor.logConsumerMetricsIntervalS && this._consumer) { - // consumer stats lag is on a different update cycle so we need to - // update the metrics when requested - const lagStats = this._consumer.consumerStats.lag; - Object.keys(lagStats).forEach(partition => { - metricsHandler.lag({ - partition, - serviceName: this.serviceName, - }, lagStats[partition]); - }); - } - res.writeHead(200, { 'Content-Type': ZenkoMetrics.asPrometheusContentType(), }); diff --git a/extensions/notification/queueProcessor/task.js b/extensions/notification/queueProcessor/task.js index e0fe0ba0e..ee1b45224 100644 --- a/extensions/notification/queueProcessor/task.js +++ b/extensions/notification/queueProcessor/task.js @@ -16,6 +16,7 @@ const config = require('../../../lib/Config'); const kafkaConfig = config.kafka; const notifConfig = config.extensions.notification; const mongoConfig = config.queuePopulator.mongo; +const zkConfig = config.zookeeper; const log = new werelogs.Logger('Backbeat:NotificationProcessor:task'); werelogs.configure({ @@ -44,7 +45,7 @@ if (isDestinationAuthEmpty) { destinationAuth = null; } const queueProcessor = new QueueProcessor( - mongoConfig, kafkaConfig, notifConfig, destination, destinationAuth); + mongoConfig, zkConfig, kafkaConfig, notifConfig, destination, destinationAuth); /** * Handle ProbeServer liveness check diff --git a/extensions/notification/utils/config.js b/extensions/notification/utils/config.js index efe5950bb..2435eeea8 100644 --- a/extensions/notification/utils/config.js +++ b/extensions/notification/utils/config.js @@ -76,7 +76,7 @@ function filterConfigsByEvent(queueConfig, eventType) { * @return {Object} Result with validity boolean and matching configuration rule. 
 */
function validateEntry(bnConfig, entry) {
-    const eventType = entry.eventType;
+    const { bucket, eventType } = entry;
    /**
     * if the event type is unavailable, it is an entry that is a
@@ -87,9 +87,14 @@ function validateEntry(bnConfig, entry) {
        return { isValid: false, matchingConfig: null };
    }
+    if (bucket !== bnConfig.bucket) {
+        return { isValid: false, matchingConfig: null };
+    }
+
    // check if the event type matches at least one supported event from
    // one of the queue configurations
-    const qConfigs = filterConfigsByEvent(bnConfig.queueConfig, eventType);
+    const notifConf = bnConfig.notificationConfiguration;
+    const qConfigs = filterConfigsByEvent(notifConf.queueConfig, eventType);
    if (qConfigs.length > 0) {
        const matchingConfig = qConfigs.find(c => {
diff --git a/extensions/replication/failedCRR/FailedCRRConsumer.js b/extensions/replication/failedCRR/FailedCRRConsumer.js
index 4af970385..5bf940ebe 100644
--- a/extensions/replication/failedCRR/FailedCRRConsumer.js
+++ b/extensions/replication/failedCRR/FailedCRRConsumer.js
@@ -40,6 +40,8 @@ class FailedCRRConsumer {
            kafka: {
                hosts: this._kafkaConfig.hosts,
                site: this._kafkaConfig.site,
+                compressionType: this._kafkaConfig.compressionType,
+                requiredAcks: this._kafkaConfig.requiredAcks,
            },
            topic: this._topic,
            groupId: 'backbeat-retry-group',
diff --git a/extensions/replication/failedCRR/FailedCRRProducer.js b/extensions/replication/failedCRR/FailedCRRProducer.js
index e78f429c9..a342df148 100644
--- a/extensions/replication/failedCRR/FailedCRRProducer.js
+++ b/extensions/replication/failedCRR/FailedCRRProducer.js
@@ -28,6 +28,8 @@ class FailedCRRProducer {
        this._producer = new BackbeatProducer({
            kafka: { hosts: this._kafkaConfig.hosts },
            maxRequestSize: this._kafkaConfig.maxRequestSize,
+            compressionType: this._kafkaConfig.compressionType,
+            requiredAcks: this._kafkaConfig.requiredAcks,
            topic: this._topic,
        });
        this._producer.once('error', cb);
diff --git a/extensions/replication/queueProcessor/QueueProcessor.js b/extensions/replication/queueProcessor/QueueProcessor.js
index 1a3e21929..0c8eafd35 100644
--- a/extensions/replication/queueProcessor/QueueProcessor.js
+++ b/extensions/replication/queueProcessor/QueueProcessor.js
@@ -30,7 +30,7 @@ const BucketQueueEntry = require('../../../lib/models/BucketQueueEntry');
const ActionQueueEntry = require('../../../lib/models/ActionQueueEntry');
const MetricsProducer = require('../../../lib/MetricsProducer');
const libConstants = require('../../../lib/constants');
-const { wrapCounterInc, wrapHistogramObserve, wrapGaugeSet } = require('../../../lib/util/metrics');
+const { wrapCounterInc, wrapHistogramObserve } = require('../../../lib/util/metrics');
const { http: HttpAgent, https: HttpsAgent } = require('httpagent');
const {
@@ -79,12 +79,6 @@ const metadataReplicationBytesMetric = ZenkoMetrics.createCounter({
    labelNames: ['origin', 'serviceName', 'location'],
});
-const kafkaLagMetric = ZenkoMetrics.createGauge({
-    name: 's3_replication_queue_lag',
-    help: 'Number of update entries waiting to be consumed from the Kafka topic',
-    labelNames: ['origin', 'containerName', 'partition', 'serviceName'],
-});
-
const sourceDataBytesMetric = ZenkoMetrics.createCounter({
    name: 's3_replication_source_data_bytes_total',
    help: 'Total number of data bytes read from replication source',
@@ -129,7 +123,6 @@ const defaultLabels = {
 * @property {CounterInc} dataReplicationBytes - Increments the replication bytes metric for data operation
 * @property {CounterInc} metadataReplicationBytes - Increments the replication bytes metric for metadata operation
 * @property {CounterInc} sourceDataBytes - Increments the source data bytes metric
- * @property {GaugeSet} lag - Kafka lag metric
 * @property {CounterInc} reads - Increments the read metric
 * @property {CounterInc} writes - Increments the write metric
 * @property {HistogramObserve} timeElapsed - Observes the time elapsed metric
@@ -140,7 +133,6 @@ const metricsHandler = {
    dataReplicationBytes: wrapCounterInc(dataReplicationBytesMetric, defaultLabels),
    metadataReplicationBytes: wrapCounterInc(metadataReplicationBytesMetric, defaultLabels),
    sourceDataBytes: wrapCounterInc(sourceDataBytesMetric, defaultLabels),
-    lag: wrapGaugeSet(kafkaLagMetric, defaultLabels),
    reads: wrapCounterInc(readMetric, defaultLabels),
    writes: wrapCounterInc(writeMetric, defaultLabels),
    timeElapsed: wrapHistogramObserve(timeElapsedMetric, defaultLabels),
@@ -380,6 +372,8 @@ class QueueProcessor extends EventEmitter {
        const producer = new BackbeatProducer({
            kafka: { hosts: this.kafkaConfig.hosts },
            maxRequestSize: this.kafkaConfig.maxRequestSize,
+            compressionType: this.kafkaConfig.compressionType,
+            requiredAcks: this.kafkaConfig.requiredAcks,
            topic: this.repConfig.replicationStatusTopic,
        });
        producer.once('error', done);
@@ -409,12 +403,13 @@ class QueueProcessor extends EventEmitter {
                site: this.kafkaConfig.site,
                backlogMetrics: options && options.enableBacklogMetrics ?
                    this.kafkaConfig.backlogMetrics : undefined,
+                compressionType: this.kafkaConfig.compressionType,
+                requiredAcks: this.kafkaConfig.requiredAcks,
            },
            topic,
            groupId,
            concurrency: this.repConfig.queueProcessor.concurrency,
            queueProcessor: queueProcessorFunc,
-            logConsumerMetricsIntervalS: this.repConfig.queueProcessor.logConsumerMetricsIntervalS,
            canary: true,
            circuitBreaker: this.circuitBreakerConfig,
            circuitBreakerMetrics: {
@@ -1029,18 +1024,6 @@ class QueueProcessor extends EventEmitter {
     */
    static async handleMetrics(res, log) {
        log.debug('metrics requested');
-
-        if (this.repConfig.queueProcessor.logConsumerMetricsIntervalS && this._consumer) {
-            // consumer stats lag is on a different update cycle so we need to
-            // update the metrics when requested
-            const lagStats = this._consumer.consumerStats.lag;
-            Object.keys(lagStats).forEach(partition => {
-                metricsHandler.lag({
-                    partition,
-                    serviceName: this.serviceName,
-                }, lagStats[partition]);
-            });
-        }
        const metrics = await ZenkoMetrics.asPrometheus();
        res.writeHead(200, {
            'Content-Type': ZenkoMetrics.asPrometheusContentType(),
diff --git a/extensions/replication/queueProcessor/task.js b/extensions/replication/queueProcessor/task.js
index 0acbe0a43..d2d451797 100644
--- a/extensions/replication/queueProcessor/task.js
+++ b/extensions/replication/queueProcessor/task.js
@@ -20,7 +20,7 @@ const redisConfig = config.redis;
const httpsConfig = config.https;
const internalHttpsConfig = config.internalHttps;
const mConfig = config.metrics;
-const { connectionString, autoCreateNamespace } = zkConfig;
+const { connectionString, autoCreateNamespace, retries } = zkConfig;
const RESUME_NODE = 'scheduledResume';
const { startProbeServer, getReplicationProbeConfig } = require('../../../lib/util/probe');
const { DEFAULT_LIVE_ROUTE, DEFAULT_METRICS_ROUTE, DEFAULT_READY_ROUTE } =
@@ -288,6 +288,7 @@ function initAndStart(zkClient) {
    const zkClient = new ZookeeperManager(connectionString, {
        autoCreateNamespace,
+        retries,
    }, log);
    zkClient.once('error', err => {
diff --git a/extensions/replication/replay/ReplayProducer.js b/extensions/replication/replay/ReplayProducer.js
index bc3f728c9..221c6838b 100644
--- a/extensions/replication/replay/ReplayProducer.js
+++ b/extensions/replication/replay/ReplayProducer.js
@@ -28,6 +28,8 @@ class ReplayProducer {
        this._producer = new BackbeatProducer({
            kafka: { hosts: this._kafkaConfig.hosts },
            maxRequestSize: this._kafkaConfig.maxRequestSize,
+            compressionType: this._kafkaConfig.compressionType,
+            requiredAcks: this._kafkaConfig.requiredAcks,
            topic: this._topic,
        });
        this._producer.once('error', cb);
diff --git a/extensions/replication/replicationStatusProcessor/ReplicationStatusProcessor.js b/extensions/replication/replicationStatusProcessor/ReplicationStatusProcessor.js
index 7434e4ef6..2f35461a1 100644
--- a/extensions/replication/replicationStatusProcessor/ReplicationStatusProcessor.js
+++ b/extensions/replication/replicationStatusProcessor/ReplicationStatusProcessor.js
@@ -18,14 +18,13 @@ const FailedCRRProducer = require('../failedCRR/FailedCRRProducer');
const ReplayProducer = require('../replay/ReplayProducer');
const MetricsProducer = require('../../../lib/MetricsProducer');
const { http: HttpAgent, https: HttpsAgent } = require('httpagent');
-const { replicationStatusProcessor } = require('../../../lib/constants').services;
+
// StatsClient constant default for site metrics
const INTERVAL = 300; // 5 minutes;
const constants = require('../../../lib/constants');
const {
    wrapCounterInc,
    wrapHistogramObserve,
-    wrapGaugeSet,
} = require('../../../lib/util/metrics');
@@ -61,12 +60,6 @@ const loadMetricHandlers = jsutil.once(repConfig => {
        labelNames: ['origin', 'replicationStatus'],
    });
-    const kafkaLagMetric = ZenkoMetrics.createGauge({
-        name: 's3_replication_status_queue_lag',
-        help: 'Number of update entries waiting to be consumed from the Kafka topic',
-        labelNames: ['origin', 'containerName', 'partition', 'serviceName'],
-    });
-
    const replicationStatusDurationSeconds = ZenkoMetrics.createHistogram({
        name: 's3_replication_status_process_duration_seconds',
        help: 'Duration of replication status processing',
@@ -145,7 +138,6 @@ const loadMetricHandlers = jsutil.once(repConfig => {
    };
    return {
        status: wrapCounterInc(replicationStatusMetric, defaultLabels),
-        lag: wrapGaugeSet(kafkaLagMetric, defaultLabels),
        statusDuration: wrapHistogramObserve(replicationStatusDurationSeconds,
            defaultLabels),
        replicationLatency: wrapHistogramObserve(replicationLatency,
@@ -225,7 +217,6 @@ class ReplicationStatusProcessor {
        this._consumer = null;
        this._gcProducer = null;
        this._mProducer = null;
-        this.serviceName = replicationStatusProcessor;
        this.logger = new Logger('Backbeat:Replication:ReplicationStatusProcessor');
@@ -411,6 +402,8 @@ class ReplicationStatusProcessor {
            kafka: {
                hosts: this.kafkaConfig.hosts,
                site: this.kafkaConfig.site,
+                compressionType: this.kafkaConfig.compressionType,
+                requiredAcks: this.kafkaConfig.requiredAcks,
            },
            topic: this.repConfig.replicationStatusTopic,
            groupId: this.repConfig.replicationStatusProcessor.groupId,
@@ -575,19 +568,6 @@ class ReplicationStatusProcessor {
    async handleMetrics(res, log) {
        log.debug('metrics requested');
        const metrics = await ZenkoMetrics.asPrometheus();
-
-        if (this.repConfig.queueProcessor.logConsumerMetricsIntervalS && this._consumer) {
-            // consumer stats lag is on a different update cycle so we need to
-            // update the metrics when requested
-            const lagStats = this._consumer.consumerStats.lag;
-            Object.keys(lagStats).forEach(partition => {
-                this.metricHandlers.lag({
-                    partition,
-                    serviceName: this.serviceName,
-                }, lagStats[partition]);
-            });
-        }
-
        res.writeHead(200, {
            'Content-Type': ZenkoMetrics.asPrometheusContentType(),
        });
diff --git a/lib/BackbeatConsumer.js b/lib/BackbeatConsumer.js
index d70be33f7..801d132af 100644
--- a/lib/BackbeatConsumer.js
+++ b/lib/BackbeatConsumer.js
@@ -19,7 +19,6 @@ const { observeKafkaStats } = require('./util/probe');
const CONCURRENCY_DEFAULT = 1;
const CLIENT_ID = 'BackbeatConsumer';
const { withTopicPrefix } = require('./util/topic');
-const safeJsonParse = require('../extensions/lifecycle/util/safeJsonParse');
const UNASSIGN_STATUS = {
    IDLE: 'idle',
@@ -92,6 +91,9 @@ class BackbeatConsumer extends EventEmitter {
                },
                site: joi.string(),
                maxPollIntervalMs: joi.number().min(45000).default(300000),
+                // Kafka producer params
+                compressionType: joi.string(),
+                requiredAcks: joi.number(),
            }).required(),
            topic: joi.string().required(),
            groupId: joi.string().required(),
@@ -105,19 +107,20 @@ class BackbeatConsumer extends EventEmitter {
            circuitBreakerMetrics: joi.object({
                type: joi.string().required(),
            }).optional(),
-            logConsumerMetricsIntervalS: joi.number(),
        });
        const validConfig = joi.attempt(config, configJoi, 'invalid config params');
        const { clientId, zookeeper, kafka, topic, groupId, queueProcessor,
-            fromOffset, concurrency, fetchMaxBytes, logConsumerMetricsIntervalS,
+            fromOffset, concurrency, fetchMaxBytes,
            canary, bootstrap, circuitBreaker, circuitBreakerMetrics } = validConfig;
        this._zookeeperEndpoint = zookeeper && zookeeper.connectionString;
        this._kafkaHosts = kafka.hosts;
        this._kafkaBacklogMetricsConfig = kafka.backlogMetrics;
        this._maxPollIntervalMs = kafka.maxPollIntervalMs;
+        this._producerCompressionType = kafka.compressionType;
+        this._producerRequiredAcks = kafka.requiredAcks;
        this._site = kafka.site;
        this._fromOffset = fromOffset;
        this._log = new Logger(clientId);
@@ -129,7 +132,6 @@ class BackbeatConsumer extends EventEmitter {
        this._canary = canary;
        this._bootstrap = bootstrap;
        this._offsetLedger = new OffsetLedger();
-        this._logConsumerMetricsIntervalS = logConsumerMetricsIntervalS;
        this._processingQueue = null;
        this._messagesConsumed = 0;
@@ -214,9 +216,6 @@ class BackbeatConsumer extends EventEmitter {
        if (this._fetchMaxBytes !== undefined) {
            consumerParams['fetch.message.max.bytes'] = this._fetchMaxBytes;
        }
-        if (this._logConsumerMetricsIntervalS !== undefined) {
-            consumerParams['statistics.interval.ms'] = this._logConsumerMetricsIntervalS * 1000;
-        }
        if (process.env.RDKAFKA_DEBUG_LOGS) {
            consumerParams.debug = process.env.RDKAFKA_DEBUG_LOGS;
        }
@@ -251,40 +250,6 @@ class BackbeatConsumer extends EventEmitter {
        return this._consumer.once('ready', () => {
            this._consumerReady = true;
            this._checkIfReady();
-            if (this._logConsumerMetricsIntervalS !== undefined) {
-                this._consumer.on('event.stats', res => {
-                    const statsObj = safeJsonParse(res.message);
-                    if (statsObj.error) {
-                        this._log.error('error parsing consumer stats', { error: statsObj.error, topic: this._topic,
-                            groupId: this._groupId });
-                        return undefined;
-                    }
-
-                    const topicStats = statsObj.result.topics?.[this._topic];
-                    if (typeof topicStats !== 'object') {
-                        return undefined;
-                    }
-                    const consumerStats = {
-                        lag: {},
-                    };
-                    // Gather stats per partition consumed by this
-                    // consumer instance
-                    Object.keys(topicStats.partitions).forEach(partition => {
-                        /* eslint-disable camelcase */
-                        const { consumer_lag, fetch_state } =
-                            topicStats.partitions[partition];
-                        if (fetch_state === 'active' && consumer_lag >= 0) {
-                            consumerStats.lag[partition] = consumer_lag;
-                        }
-                        /* eslint-enable camelcase */
-                    });
-                    this._log.info('topic consumer statistics', {
-                        topic: this._topic,
-                        consumerStats,
-                    });
-                    return undefined;
-                });
-            }
        });
    }
@@ -887,6 +852,8 @@ class BackbeatConsumer extends EventEmitter {
        assert.strictEqual(this._consumer, null);
        producer = new BackbeatProducer({
            kafka: { hosts: this._kafkaHosts },
+            compressionType: this._producerCompressionType,
+            requiredAcks: this._producerRequiredAcks,
            topic: this._topic,
        });
        producer.on('ready', () => {
@@ -1021,6 +988,8 @@ class BackbeatConsumer extends EventEmitter {
        }));
        const producer = new BackbeatProducer({
            kafka: { hosts: this._kafkaHosts },
+            compressionType: this._producerCompressionType,
+            requiredAcks: this._producerRequiredAcks,
            topic: this._topic,
        });
        return producer.on('ready', () => {
diff --git a/lib/MetricsConsumer.js b/lib/MetricsConsumer.js
index 73fe5680e..3a5aee309 100644
--- a/lib/MetricsConsumer.js
+++ b/lib/MetricsConsumer.js
@@ -63,6 +63,8 @@ class MetricsConsumer {
            kafka: {
                hosts: this.kafkaConfig.hosts,
                site: this.kafkaConfig.site,
+                compressionType: this.kafkaConfig.compressionType,
+                requiredAcks: this.kafkaConfig.requiredAcks,
            },
            topic: this.mConfig.topic,
            groupId: `${this.mConfig.groupIdPrefix}-${this._id}`,
diff --git a/lib/MetricsProducer.js b/lib/MetricsProducer.js
index 866bba7d7..1b1c24d41 100644
--- a/lib/MetricsProducer.js
+++ b/lib/MetricsProducer.js
@@ -24,6 +24,8 @@ class MetricsProducer {
        const producer = new BackbeatProducer({
            kafka: { hosts: this._kafkaConfig.hosts },
            maxRequestSize: this._kafkaConfig.maxRequestSize,
+            compressionType: this._kafkaConfig.compressionType,
+            requiredAcks: this._kafkaConfig.requiredAcks,
            topic: this._topic,
        });
        producer.once('error', done);
diff --git a/lib/api/BackbeatAPI.js b/lib/api/BackbeatAPI.js
index ecebed858..7eac1b006 100644
--- a/lib/api/BackbeatAPI.js
+++ b/lib/api/BackbeatAPI.js
@@ -70,7 +70,7 @@ class BackbeatAPI {
        // topics
        this._crrTopic = this._repConfig.topic;
        this._crrStatusTopic = this._repConfig.replicationStatusTopic;
-        this._ingestionTopic = this._ingestionConfig.topic;
+        this._ingestionTopic = this._ingestionConfig?.topic;
        this._metricsTopic = config.metrics.topic;
        this._updateConfigSites();
@@ -1291,6 +1291,8 @@ class BackbeatAPI {
        const producer = new BackbeatProducer({
            kafka: { hosts: this._kafkaConfig.hosts },
            maxRequestSize: this._kafkaConfig.maxRequestSize,
+            compressionType: this._kafkaConfig.compressionType,
+            requiredAcks: this._kafkaConfig.requiredAcks,
            topic,
        });
@@ -1305,9 +1307,10 @@ class BackbeatAPI {
    }
    _setZookeeper(cb) {
-        const { connectionString, autoCreateNamespace } = this._zkConfig;
+        const { connectionString, autoCreateNamespace, retries } = this._zkConfig;
        const zkClient = new ZookeeperManager(connectionString, {
            autoCreateNamespace,
+            retries,
        }, this._logger);
        zkClient.once('error', cb);
diff --git a/lib/config.joi.js b/lib/config.joi.js
index 6d1975fbc..c2d1ad39f 100644
--- a/lib/config.joi.js
+++ b/lib/config.joi.js
@@ -14,15 +14,16 @@ const {
const KAFKA_PRODUCER_MESSAGE_MAX_BYTES = 5000020;
const KAFKA_PRODUCER_DEFAULT_COMPRESSION_TYPE = 'Zstd';
-const KAFKA_PRODUCER_DEFAULT_REQUIRED_ACKS = 'all';
+const KAFKA_PRODUCER_DEFAULT_REQUIRED_ACKS = -1; // all brokers
const logSourcesJoi = joi.string().valid('bucketd', 'mongo', 'ingestion', 'dmd', 'kafka');
const joiSchema = joi.object({
-    replicationGroupId: joi.string().length(7).required(),
+    replicationGroupId: joi.string().length(7).default('RG00001'),
    zookeeper: {
        connectionString: joi.string().required(),
        autoCreateNamespace: joi.boolean().default(false),
+        retries: joi.number().default(3),
    },
    kafka: {
        hosts: joi.string().required(),
@@ -36,7 +37,7 @@ const joiSchema = joi.object({
        requiredAcks: joi.number().default(KAFKA_PRODUCER_DEFAULT_REQUIRED_ACKS),
    },
    transport: transportJoi,
-    s3: hostPortJoi.required(),
+    s3: hostPortJoi.optional(),
    vaultAdmin: hostPortJoi,
    queuePopulator: {
        auth: authJoi,
diff --git a/lib/config/configItems.joi.js b/lib/config/configItems.joi.js
index 126ac24ee..4bc53c362 100644
--- a/lib/config/configItems.joi.js
+++ b/lib/config/configItems.joi.js
@@ -115,7 +115,7 @@ const probeServerJoi = joi.object({
    port: joi.number().required(),
});
-const probeServerPerSite = joi.array().min(1).items(
+const probeServerPerSite = joi.array().items(
    joi.object({
        bindAddress: joi.string().default('localhost'),
        port: joi.number().required(),
diff --git a/lib/constants.js b/lib/constants.js
index 909bd3500..e822e739f 100644
--- a/lib/constants.js
+++ b/lib/constants.js
@@ -24,7 +24,6 @@ const constants = {
        replicationQueueProcessor: 'ReplicationQueueProcessor',
        replicationReplayProcessor: 'ReplicationReplayProcessor',
        replicationStatusProcessor: 'ReplicationStatusProcessor',
-        notificationQueueProcessor: 'NotificationQueueProcessor',
    },
    locationStatusCollection: '__locationStatusStore',
    lifecycleListing: {
diff --git a/lib/queuePopulator/IngestionPopulator.js b/lib/queuePopulator/IngestionPopulator.js
index c077ce0ef..08c95c170 100644
--- a/lib/queuePopulator/IngestionPopulator.js
+++ b/lib/queuePopulator/IngestionPopulator.js
@@ -171,6 +171,8 @@ class IngestionPopulator {
        const producer = new BackbeatProducer({
            kafka: { hosts: this.kafkaConfig.hosts },
            maxRequestSize: this.kafkaConfig.maxRequestSize,
+            compressionType: this.kafkaConfig.compressionType,
+            requiredAcks: this.kafkaConfig.requiredAcks,
            topic,
            pollIntervalMs: POLL_INTERVAL_MS,
        });
diff --git a/lib/queuePopulator/KafkaLogConsumer/ListRecordStream.js b/lib/queuePopulator/KafkaLogConsumer/ListRecordStream.js
index 675550ae9..bc5e65959 100644
--- a/lib/queuePopulator/KafkaLogConsumer/ListRecordStream.js
+++ b/lib/queuePopulator/KafkaLogConsumer/ListRecordStream.js
@@ -118,6 +118,10 @@ class ListRecordStream extends stream.Transform {
            return callback(null, null);
        }
        const objectMd = this._getObjectMd(changeStreamDocument);
+        if (!objectMd) {
+            // skipping empty events
+            return callback(null, null);
+        }
        const opType = this._getType(changeStreamDocument.operationType, objectMd);
        const streamObject = {
            // timestamp of the kafka message
diff --git a/lib/queuePopulator/LogReader.js b/lib/queuePopulator/LogReader.js
index 26d4dfdee..1c894d77f 100644
--- a/lib/queuePopulator/LogReader.js
+++ b/lib/queuePopulator/LogReader.js
@@ -515,9 +515,6 @@ class LogReader {
            }
        }
        async.eachSeries(this._extensions, (ext, next) => {
-            if (entry.value === undefined) {
-                return next();
-            }
            const overheadFields = {
                commitTimestamp: record.timestamp,
                opTimestamp: entry.timestamp,
@@ -591,6 +588,8 @@ class LogReader {
        const producer = new BackbeatProducer({
            kafka: { hosts: this.kafkaConfig.hosts },
            maxRequestSize: this.kafkaConfig.maxRequestSize,
+            compressionType: this.kafkaConfig.compressionType,
+            requiredAcks: this.kafkaConfig.requiredAcks,
            topic,
        });
        producer.once('error', done);
diff --git a/lib/queuePopulator/QueuePopulator.js b/lib/queuePopulator/QueuePopulator.js
index 540bdfca6..ed9c530cb 100644
--- a/lib/queuePopulator/QueuePopulator.js
+++ b/lib/queuePopulator/QueuePopulator.js
@@ -199,8 +199,8 @@ class QueuePopulator {
                    return next(err);
                }),
            next => this._setupFailedCRRClients(next),
-            next => this._setupNotificationConfigManager(next),
-            next => this._setupZookeeper(err => {
+            next => this._setupZookeeper(next),
+            next => this._setupNotificationConfigManager(err => {
                if (err) {
                    return next(err);
                }
@@ -400,6 +400,7 @@ class QueuePopulator {
        this.zkClient = new ZookeeperManager(zookeeperUrl, {
            autoCreateNamespace: this.zkConfig.autoCreateNamespace,
+            retries: this.zkConfig.retries,
        }, this.log);
        this.zkClient.once('error', done);
@@ -417,6 +418,10 @@ class QueuePopulator {
        try {
            this.bnConfigManager = new NotificationConfigManager({
                mongoConfig: this.qpConfig.mongo,
+                bucketMetastore: this.extConfigs.notification.bucketMetastore,
+                maxCachedConfigs: this.extConfigs.notification.maxCachedConfigs,
+                zkClient: this.zkClient,
+                zkConcurrency: this.extConfigs.notification.zookeeperOpConcurrency,
                logger: this.log,
            });
            return this.bnConfigManager.setup(done);
diff --git a/lib/queuePopulator/QueuePopulatorExtension.js b/lib/queuePopulator/QueuePopulatorExtension.js
index 7ccd7ca3d..0ef5a879f 100644
--- a/lib/queuePopulator/QueuePopulatorExtension.js
+++ b/lib/queuePopulator/QueuePopulatorExtension.js
@@ -36,12 +36,13 @@ class QueuePopulatorExtension {
     * @return {undefined}
     */
    setupZookeeper(cb) {
-        const { connectionString, autoCreateNamespace } = this.zkConfig;
+        const { connectionString, autoCreateNamespace, retries } = this.zkConfig;
        this.log.info('opening zookeeper connection for populator extensions', {
            zookeeperUrl: connectionString,
        });
        this.zkClient = new ZookeeperManager(connectionString, {
            autoCreateNamespace,
+            retries,
        }, this.log);
        this.zkClient.once('error', cb);
        this.zkClient.once('ready', () => {
diff --git a/lib/util/probe.js b/lib/util/probe.js
index d214db424..8157dfec5 100644
--- a/lib/util/probe.js
+++ b/lib/util/probe.js
@@ -79,7 +79,7 @@ function getReplicationProbeConfig(config, siteNames, topicName, logger) {
    }
    if (Array.isArray(probeConfig)) {
-        if (siteNames.length !== 1) {
+        if (siteNames.length > 1) {
            logger.error('Process configured for more than one site or no site provided', {
                siteNames,
                probeConfig,
diff --git a/tests/config.notification.json b/tests/config.notification.json
index 8891aa6ed..5fc51ab1d 100644
--- a/tests/config.notification.json
+++ b/tests/config.notification.json
@@ -28,6 +28,7 @@
    },
    "extensions": {
        "notification": {
+            "bucketMetastore": "__metastore",
            "topic": "backbeat-bucket-notification",
            "monitorNotificationFailures": true,
            "queueProcessor": {
diff --git a/tests/functional/lib/BackbeatConsumer.js b/tests/functional/lib/BackbeatConsumer.js
index 442fbe5e2..ca374d6e6 100644
--- a/tests/functional/lib/BackbeatConsumer.js
+++ b/tests/functional/lib/BackbeatConsumer.js
@@ -932,96 +932,3 @@ describe('BackbeatConsumer shutdown tests', () => {
    ], done);
    }).timeout(60000);
});
-
-describe('BackbeatConsumer statistics logging tests', () => {
-    const topic = 'backbeat-consumer-spec-statistics';
-    const groupId = `replication-group-${Math.random()}`;
-    const messages = [
-        { key: 'foo', message: '{"hello":"foo"}' },
-        { key: 'bar', message: '{"world":"bar"}' },
-        { key: 'qux', message: '{"hi":"qux"}' },
-    ];
-    let producer;
-    let consumer;
-    let consumedMessages = [];
-    function queueProcessor(message, cb) {
-        consumedMessages.push(message.value);
-        process.nextTick(cb);
-    }
-    before(function before(done) {
-        this.timeout(60000);
-        producer = new BackbeatProducer({
-            kafka: producerKafkaConf,
-            topic,
-            pollIntervalMs: 100,
-        });
-        consumer = new BackbeatConsumer({
-            zookeeper: zookeeperConf,
-            kafka: consumerKafkaConf, groupId, topic,
-            queueProcessor,
-            concurrency: 10,
-            bootstrap: true,
-            // this enables statistics logging
-            logConsumerMetricsIntervalS: 1,
-        });
-        async.parallel([
-            innerDone => producer.on('ready', innerDone),
-            innerDone => consumer.on('ready', innerDone),
-        ], done);
-    });
-    afterEach(() => {
-        consumedMessages = [];
-        consumer.removeAllListeners('consumed');
-    });
-    after(done => {
-        async.parallel([
-            innerDone => producer.close(innerDone),
-            innerDone => consumer.close(innerDone),
-        ], done);
-    });
-    it('should be able to log consumer statistics', done => {
-        producer.send(messages, err => {
-            assert.ifError(err);
-        });
-        let totalConsumed = 0;
-        // It would have been nice to check that the lag is strictly
-        // positive when we haven't consumed yet, but the lag seems
-        // off when no consumer offset has been written yet to Kafka,
-        // so it cannot be tested reliably until we start consuming.
-        consumer.subscribe();
-        consumer.on('consumed', messagesConsumed => {
-            totalConsumed += messagesConsumed;
-            assert(totalConsumed <= messages.length);
-            if (totalConsumed === messages.length) {
-                let firstTime = true;
-                setTimeout(() => {
-                    consumer._log = {
-                        error: () => {},
-                        warn: () => {},
-                        info: (message, args) => {
-                            if (firstTime && message.indexOf('statistics') !== -1) {
-                                firstTime = false;
-                                assert.strictEqual(args.topic, topic);
-                                const consumerStats = args.consumerStats;
-                                assert.strictEqual(typeof consumerStats, 'object');
-                                const lagStats = consumerStats.lag;
-                                assert.strictEqual(typeof lagStats, 'object');
-                                // there should be one consumed partition
-                                assert.strictEqual(Object.keys(lagStats).length, 1);
-                                // everything should have been
-                                // consumed hence consumer offsets
-                                // stored equal topic offset, and lag
-                                // should be 0.
-                                const partitionLag = lagStats['0'];
-                                assert.strictEqual(partitionLag, 0);
-                                done();
-                            }
-                        },
-                        debug: () => {},
-                        trace: () => {},
-                    };
-                }, 5000);
-            }
-        });
-    }).timeout(30000);
-});
diff --git a/tests/functional/notification/NotificationConfigManager.js b/tests/functional/notification/NotificationConfigManager.js
index ed3d2f224..a663b2c32 100644
--- a/tests/functional/notification/NotificationConfigManager.js
+++ b/tests/functional/notification/NotificationConfigManager.js
@@ -34,6 +34,8 @@ const notificationConfigurationVariant = {
describe('NotificationConfigManager ::', () => {
    const params = {
        mongoConfig,
+        bucketMetastore: '__metastore',
+        maxCachedConfigs: 1000,
        logger,
    };
    let manager;
@@ -71,28 +73,33 @@ describe('NotificationConfigManager ::', () => {
    it('getConfig should return correct value', async () => {
        // should store new configs in cache
        const config = await manager.getConfig('example-bucket-1');
-        assert.deepEqual(config, notificationConfiguration);
+        assert.deepEqual(config, {
+            bucket: 'example-bucket-1',
+            notificationConfiguration,
+        });
    });
    it('Cache should store new configs', async () => {
+        const backend = manager._configManagerBackend;
        // cache should initially be empty
-        assert.strictEqual(manager._cachedConfigs.count(), 0);
+        assert.strictEqual(backend._cachedConfigs.count(), 0);
        // should store new configs in cache
        await manager.getConfig('example-bucket-1');
-        assert.strictEqual(manager._cachedConfigs.count(), 1);
-        assert.deepEqual(manager._cachedConfigs.get('example-bucket-1'),
+        assert.strictEqual(backend._cachedConfigs.count(), 1);
+        assert.deepEqual(backend._cachedConfigs.get('example-bucket-1'),
            notificationConfiguration);
        await manager.getConfig('example-bucket-2');
-        assert.deepEqual(manager._cachedConfigs.get('example-bucket-2'),
+        assert.deepEqual(backend._cachedConfigs.get('example-bucket-2'),
            notificationConfiguration);
-        assert.strictEqual(manager._cachedConfigs.count(), 2);
+        assert.strictEqual(backend._cachedConfigs.count(), 2);
        // should retreive config fom cache without re-adding it
        await manager.getConfig('example-bucket-1');
-        assert.strictEqual(manager._cachedConfigs.count(), 2);
+        assert.strictEqual(backend._cachedConfigs.count(), 2);
    });
    it('Cache should be invalidated when change stream event occurs (delete event)', async () => {
+        const backend = manager._configManagerBackend;
        // adding configs to cache
        await manager.getConfig('example-bucket-1');
        await manager.getConfig('example-bucket-2');
@@ -104,15 +111,16 @@ describe('NotificationConfigManager ::', () => {
                _id: 'example-bucket-1',
            },
        };
-        manager._metastoreChangeStream._changeStream.emit('change', changeStreamEvent);
+        backend._metastoreChangeStream._changeStream.emit('change', changeStreamEvent);
        // cached config for "example-bucket-1" should be invalidated
-        assert.strictEqual(manager._cachedConfigs.count(), 1);
-        assert.strictEqual(manager._cachedConfigs.get('example-bucket-1'), undefined);
-        assert(manager._cachedConfigs.get('example-bucket-2'));
+        assert.strictEqual(backend._cachedConfigs.count(), 1);
+        assert.strictEqual(backend._cachedConfigs.get('example-bucket-1'), undefined);
+        assert(backend._cachedConfigs.get('example-bucket-2'));
    });
    it('Cache should be invalidated when change stream event occurs (replace/update events)', async () => {
+        const backend = manager._configManagerBackend;
        // adding configs to cache
        await manager.getConfig('example-bucket-1');
        await manager.getConfig('example-bucket-2');
@@ -130,17 +138,17 @@ describe('NotificationConfigManager ::', () => {
                }
            }
        };
-        manager._metastoreChangeStream._changeStream.emit('change', changeStreamEvent);
+        backend._metastoreChangeStream._changeStream.emit('change', changeStreamEvent);
        // cached config for "example-bucket-1" should be invalidated
-        assert.strictEqual(manager._cachedConfigs.count(), 2);
-        assert.deepEqual(manager._cachedConfigs.get('example-bucket-1'),
+        assert.strictEqual(backend._cachedConfigs.count(), 2);
+        assert.deepEqual(backend._cachedConfigs.get('example-bucket-1'),
            notificationConfigurationVariant);
        // update event should yield the same results
        changeStreamEvent.operationType = 'update';
        changeStreamEvent.fullDocument._id = 'example-bucket-2';
-        manager._metastoreChangeStream._changeStream.emit('change', changeStreamEvent);
-        assert.strictEqual(manager._cachedConfigs.count(), 2);
-        assert.deepEqual(manager._cachedConfigs.get('example-bucket-2'),
+        backend._metastoreChangeStream._changeStream.emit('change', changeStreamEvent);
+        assert.strictEqual(backend._cachedConfigs.count(), 2);
+        assert.deepEqual(backend._cachedConfigs.get('example-bucket-2'),
            notificationConfiguration);
    });
});
diff --git a/tests/unit/lib/queuePopulator/kafkaLogConsumer/ListRecordStream.js b/tests/unit/lib/queuePopulator/kafkaLogConsumer/ListRecordStream.js
index 766899072..5cebb2042 100644
--- a/tests/unit/lib/queuePopulator/kafkaLogConsumer/ListRecordStream.js
+++ b/tests/unit/lib/queuePopulator/kafkaLogConsumer/ListRecordStream.js
@@ -111,7 +111,7 @@ describe('ListRecordStream', () => {
    });
    describe('_transform', () => {
-        it('should correct format entry', done => {
+        it('should return correct format entry', done => {
            const kafkaMessage = getKafkaMessage(JSON.stringify(changeStreamDocument));
            listRecordStream.write(kafkaMessage);
            listRecordStream.once('data', data => {
@@ -130,9 +130,10 @@ describe('ListRecordStream', () => {
                return done();
            });
        });
+
        it('should skip record if format is invalid', done => {
            const kafkaMessage = getKafkaMessage(JSON.stringify(changeStreamDocument));
-            const InvalidKafkaMessage = getKafkaMessage('');
+            const InvalidKafkaMessage = getKafkaMessage('}{');
            listRecordStream.write(InvalidKafkaMessage);
            listRecordStream.write(kafkaMessage);
            listRecordStream.once('data', data => {
@@ -157,6 +158,34 @@ describe('ListRecordStream', () => {
                return done();
            });
        });
+
+        it('should skip empty record', done => {
+            const kafkaMessage = getKafkaMessage(JSON.stringify(changeStreamDocument));
+            const EmptyKafkaMessage = getKafkaMessage('{}');
+            listRecordStream.write(EmptyKafkaMessage);
+            listRecordStream.write(kafkaMessage);
+            listRecordStream.once('data', data => {
+                // Streams guarantee that data is kept in the
+                // same order when writing and reading it.
+                // This means that if the function doesn't work
+                // as intended and processed the empty event,
+                // that event would be read first by this
+                // handler, which would fail the test.
+                assert.deepEqual(data, {
+                    timestamp: new Date(kafkaMessage.timestamp),
+                    db: 'example-bucket',
+                    entries: [{
+                        key: 'example-key',
+                        type: 'put',
+                        value: JSON.stringify({
+                            field: 'value'
+                        }),
+                        timestamp: '2023-11-29T15:05:57.000Z',
+                    }],
+                });
+                return done();
+            });
+        });
    });
    describe('_getObjectMd', () => {
diff --git a/tests/unit/lifecycle/LifecycleQueuePopulator.spec.js b/tests/unit/lifecycle/LifecycleQueuePopulator.spec.js
index 1ab8844fa..f1a88c23f 100644
--- a/tests/unit/lifecycle/LifecycleQueuePopulator.spec.js
+++ b/tests/unit/lifecycle/LifecycleQueuePopulator.spec.js
@@ -383,7 +383,10 @@ describe('LifecycleQueuePopulator', () => {
        lcqp = new LifecycleQueuePopulator(params);
        lcqp.locationConfigs = Object.assign({}, coldLocationConfigs, locationConfigs);
    });
-    it('it should call _handleDeleteOp on delete message', () => {
+    afterEach(() => {
+        sinon.restore();
+    });
+    it('should call _handleDeleteOp on delete message', () => {
        const handleDeleteStub = sinon.stub(lcqp, '_handleDeleteOp').returns();
        lcqp.filter({
            type: 'delete',
diff --git a/tests/unit/notification/KafkaNotificationDestination.js b/tests/unit/notification/KafkaNotificationDestination.js
new file mode 100644
index 000000000..e7fccc025
--- /dev/null
+++ b/tests/unit/notification/KafkaNotificationDestination.js
@@ -0,0 +1,40 @@
+const assert = require('assert');
+const sinon = require('sinon');
+
+const FakeLogger = require('../../utils/fakeLogger');
+
+const KafkaNotificationDestination =
+    require('../../../extensions/notification/destination/KafkaNotificationDestination');
+const KafkaProducer =
+    require('../../../extensions/notification/destination/KafkaProducer');
+
+describe('KafkaNotificationDestination ::', () => {
+    afterEach(() => {
+        sinon.restore();
+    });
+    it('should properly configure producer', done => {
+        const destConfig = {
+            host: 'localhost',
+            port: 9092,
+            topic: 'test',
+            pollIntervalMs: 1000,
+            requiredAcks: 1,
+            compressionType: 'none',
+        };
+
+        sinon.stub(KafkaProducer.prototype, 'connect').callsFake(function connect() {
+            setTimeout(() => this.emit('ready'), 100);
+        });
+
+        const kafkaNotificationDestination = new KafkaNotificationDestination({ destConfig, logger: FakeLogger });
+        kafkaNotificationDestination._setupProducer(err => {
+            assert.ifError(err);
+            assert.strictEqual(kafkaNotificationDestination._notificationProducer._kafkaHosts, 'localhost:9092');
+            assert.strictEqual(kafkaNotificationDestination._notificationProducer._pollIntervalMs, 1000);
+            assert.strictEqual(kafkaNotificationDestination._notificationProducer._topic, 'test');
+            assert.strictEqual(kafkaNotificationDestination._notificationProducer._compressionType, 'none');
+            assert.strictEqual(kafkaNotificationDestination._notificationProducer._requiredAcks, 1);
+            done();
+        });
+    });
+});
diff --git a/tests/unit/notification/NotificationConfigManager.js b/tests/unit/notification/NotificationConfigManager.js
index 51eb145a1..db1787ca2 100644
--- a/tests/unit/notification/NotificationConfigManager.js
+++ b/tests/unit/notification/NotificationConfigManager.js
@@ -1,355 +1,187 @@
const assert = require('assert');
const werelogs = require('werelogs');
const sinon = require('sinon');
-const events = require('events');
-const MongoClient = require('mongodb').MongoClient;
-const ChangeStream = require('../../../lib/wrappers/ChangeStream');
-const NotificationConfigManager
-    = require('../../../extensions/notification/NotificationConfigManager');
-const { errors } = require('arsenal');
-const mongoConfig
-    = require('../../config.json').queuePopulator.mongo;
+const NotificationConfigManager = require('../../../extensions/notification/NotificationConfigManager');
+const MongoConfigManager = require('../../../extensions/notification/configManager/MongoConfigManager');
+const ZookeeperConfigManager = require('../../../extensions/notification/configManager/ZookeeperConfigManager');
const logger = new werelogs.Logger('NotificationConfigManager:test');
-const notificationConfiguration = {
-    queueConfig: [
-        {
-            events: ['s3:ObjectCreated:Put'],
-            queueArn: 'arn:scality:bucketnotif:::destination1',
-            filterRules: [],
-        },
-    ],
+const bucketConfig = {
+    bucket: 'bucket1',
+    notificationConfiguration: {
+        queueConfig: [
+            {
+                events: ['s3:ObjectCreated:Put'],
+                queueArn: 'arn:scality:bucketnotif:::destination1',
+                filterRules: [],
+            },
+        ],
+    },
};
-const notificationConfigurationVariant = {
-    queueConfig: [
-        {
-            events: ['s3:ObjectCreated:*'],
-            queueArn: 'arn:scality:bucketnotif:::destination2',
-            filterRules: [],
-        },
-    ],
-};
-
-describe('NotificationConfigManager ::', () => {
-    const params = {
-        mongoConfig,
-        logger,
-    };
-
-    afterEach(() => {
-        sinon.restore();
-    });
+describe('NotificationConfigManager', () => {
+    describe('constructor', () => {
+        it('should use the mongodb backend', () => {
+            const params = {
+                mongoConfig: {
+                    database: 'eb1e786d-da1e-3fc5-83d2-46f083ab9764',
+                    readPreference: 'primary',
+                    replicaSetHosts: 'localhost:27017',
+                    shardCollections: true,
+                    writeConcern: 'majority'
+                },
+                bucketMetastore: '__metastore',
+                maxCachedConfigs: 1000,
+                logger,
+            };
-    describe('Constructor & setup ::', () => {
-        it('Constructor should validate params', done => {
-            assert.throws(() => new NotificationConfigManager());
-            assert.throws(() => new NotificationConfigManager({}));
-            assert.throws(() => new NotificationConfigManager({
-                mongoConfig: null,
-                logger: null,
-            }));
            const manager = new NotificationConfigManager(params);
-            assert(manager instanceof NotificationConfigManager);
-            return done();
+            assert(manager._configManagerBackend instanceof MongoConfigManager);
        });
-        it('Setup should initialize the mongoClient and the change stream', done => {
-            const manager = new NotificationConfigManager(params);
-            const setMongoStub = sinon.stub(manager, '_setupMongoClient').callsArg(0);
-            const setChangeStreamStub = sinon.stub(manager, '_setMetastoreChangeStream').returns();
-            manager.setup(err => {
-                assert.ifError(err);
-                assert(setMongoStub.calledOnce);
-                assert(setChangeStreamStub.calledOnce);
-                // cache should initially be empty
-                assert.strictEqual(manager._cachedConfigs.count(), 0);
-                return done();
-            });
-        });
-
-        it('Setup should fail when mongo setup fails', done => {
-            const manager = new NotificationConfigManager(params);
-            const setMongoStub = sinon.stub(manager, '_setupMongoClient').callsArgWith(0,
-                errors.InternalError);
-            const setChangeStreamStub = sinon.stub(manager, '_setMetastoreChangeStream');
-            manager.setup(err => {
-                assert.deepEqual(err, errors.InternalError);
-                assert(setMongoStub.calledOnce);
-                assert(setChangeStreamStub.notCalled);
-                return done();
-            });
-        });
+        it('should use the zookeeper backend', () => {
+            const params = {
+                zkClient: {},
+                zkConcurrency: 10,
+                logger,
+            };
-        it('Setup should fail when changeStream setup fails', done => {
            const manager = new NotificationConfigManager(params);
-            const setMongoStub = sinon.stub(manager, '_setupMongoClient').callsArg(0);
-            const setChangeStreamStub = sinon.stub(manager, '_setMetastoreChangeStream').throws(
-                errors.InternalError);
-            manager.setup(err => {
-                assert.deepEqual(err, errors.InternalError);
-                assert(setMongoStub.calledOnce);
-                assert(setChangeStreamStub.calledOnce);
-                return done();
-            });
+            assert(manager._configManagerBackend instanceof ZookeeperConfigManager);
+            assert.strictEqual(manager._usesZookeeperBackend, true);
        });
    });
-    describe('_setupMongoClient ::', () => {
-        it('should setup the mongo client and get metastore collection', () => {
+    describe('getConfig', () => {
+        it('should return the configuration', () => {
+            const params = {
+                zkClient: {},
+                zkConcurrency: 10,
+                logger,
+            };
            const manager = new NotificationConfigManager(params);
-            const getCollectionStub = sinon.stub();
-            const mongoCommandStub = sinon.stub().returns({
-                version: '4.3.17',
-            });
-            const getDbStub = sinon.stub().returns({
-                collection: getCollectionStub,
-                command: mongoCommandStub,
-            });
-            const mongoConnectStub = sinon.stub(MongoClient, 'connect').callsArgWith(2, null, {
-                db: getDbStub,
-            });
-            manager._setupMongoClient(err => {
-                assert.ifError(err);
-                assert(mongoConnectStub.calledOnce);
-                assert(getDbStub.calledOnce);
-                assert(getCollectionStub.calledOnce);
-                assert(mongoCommandStub.calledOnceWith({
-                    buildInfo: 1,
-                }));
-                assert.equal(manager._mongoVersion, '4.3.17');
-            });
-        });
-        it('should fail when mongo client setup fails', () => {
-            const manager = new NotificationConfigManager(params);
-            sinon.stub(MongoClient, 'connect').callsArgWith(2,
-                errors.InternalError, null);
-            manager._setupMongoClient(err => {
-                assert.deepEqual(err, errors.InternalError);
-            });
-        });
+            sinon.stub(manager._configManagerBackend, 'getConfig').returns(bucketConfig);
-        it('should fail when when getting the metadata db', () => {
-            const manager = new NotificationConfigManager(params);
-            const getDbStub = sinon.stub().throws(errors.InternalError);
-            sinon.stub(MongoClient, 'connect').callsArgWith(2, null, {
-                db: getDbStub,
-            });
-            manager._setupMongoClient(err => {
-                assert.deepEqual(err, errors.InternalError);
-            });
+            const result = manager.getConfig('bucket1');
+            assert.strictEqual(result, bucketConfig);
        });
-        it('should fail when mongo client fails to get metastore', () => {
+        it('should return the configuration with a callback', done => {
+            const params = {
+                zkClient: {},
+                zkConcurrency: 10,
+                logger,
+            };
            const manager = new NotificationConfigManager(params);
-            const getCollectionStub = sinon.stub().throws(errors.InternalError);
-            const getDbStub = sinon.stub().returns({
-                collection: getCollectionStub,
-            });
-            sinon.stub(MongoClient, 'connect').callsArgWith(2, null, {
-                db: getDbStub,
-            });
-            manager._setupMongoClient(err => {
-                assert.deepEqual(err, errors.InternalError);
+
+            sinon.stub(manager._configManagerBackend, 'getConfig').returns(bucketConfig);
+
+            manager.getConfig('bucket1', (err, result) => {
+                assert.ifError(err);
+                assert.strictEqual(result, bucketConfig);
+                done();
            });
        });
-    });
-    describe('_handleChangeStreamChangeEvent ::', () => {
-        it('should remove entry from cache', () => {
-            const changeStreamEvent = {
-                _id: 'resumeToken',
-                operationType: 'delete',
-                documentKey: {
-                    _id: 'example-bucket-1',
-                },
-                fullDocument: {
-                    _id: 'example-bucket-1',
-                    value: {
-                        notificationConfiguration,
-                    }
-                }
+        it('should return the configuration in a promise', done => {
+            const params = {
+                zkClient: {},
+                zkConcurrency: 10,
+                logger,
+            };
            const manager = new NotificationConfigManager(params);
-            // populating cache
-            manager._cachedConfigs.add('example-bucket-1', notificationConfiguration);
-            assert.strictEqual(manager._cachedConfigs.count(), 1);
-            // handling change stream event
-            manager._handleChangeStreamChangeEvent(changeStreamEvent);
-            // should delete bucket config from cache
-            assert.strictEqual(manager._cachedConfigs.get('example-bucket-1'),
-                undefined);
-            assert.strictEqual(manager._cachedConfigs.count(), 0);
-        });
-        it('should replace entry from cache', () => {
-            const changeStreamEvent = {
-                _id: 'resumeToken',
-                operationType: 'replace',
-                documentKey: {
-                    _id: 'example-bucket-1',
-                },
-                fullDocument: {
-                    _id: 'example-bucket-1',
-                    value: {
-                        notificationConfiguration:
-                        notificationConfigurationVariant,
-                    }
-                }
-            };
-            const manager = new NotificationConfigManager(params);
-            // populating cache
-            manager._cachedConfigs.add('example-bucket-1', notificationConfiguration);
-            assert.strictEqual(manager._cachedConfigs.count(), 1);
-            // handling change stream event
-            manager._handleChangeStreamChangeEvent(changeStreamEvent);
-            // should update bucket config in cache
-            assert.deepEqual(manager._cachedConfigs.get('example-bucket-1'),
-                notificationConfigurationVariant);
-            assert.strictEqual(manager._cachedConfigs.count(), 1);
-            // same thing should happen with "update" event
-            changeStreamEvent.operationType = 'update';
-            // reseting config to default one
-            changeStreamEvent.fullDocument.value.notificationConfiguration =
-                notificationConfiguration;
-            // emiting the new "update" event
-            manager._handleChangeStreamChangeEvent(changeStreamEvent);
-            // cached config must be updated
-            assert.deepEqual(manager._cachedConfigs.get('example-bucket-1'),
-                notificationConfiguration);
-            assert.strictEqual(manager._cachedConfigs.count(), 1);
+            sinon.stub(manager._configManagerBackend, 'getConfig').resolves(bucketConfig);
+
+            manager.getConfig('bucket1')
+                .then(result => {
+                    assert.strictEqual(result, bucketConfig);
+                    done();
+                })
+                .catch(err => assert.ifError(err));
        });
-        it('should do nothing when config not in cache', () => {
-            const changeStreamEvent = {
-                _id: 'resumeToken',
-                operationType: 'delete',
-                documentKey: {
-                    _id: 'example-bucket-2',
-                },
-                fullDocument: {
-                    _id: 'example-bucket-2',
-                    value: {
-                        notificationConfiguration:
-                        notificationConfigurationVariant,
-                    }
-                }
+        it('should call callback when the promise resolves', done => {
+            const params = {
+                zkClient: {},
+                zkConcurrency: 10,
+                logger,
            };
            const manager = new NotificationConfigManager(params);
-            // populating cache
-            manager._cachedConfigs.add('example-bucket-1', notificationConfiguration);
-            assert.strictEqual(manager._cachedConfigs.count(), 1);
-            // handling change stream event
-            manager._handleChangeStreamChangeEvent(changeStreamEvent);
-            // cache should not change
-            assert.deepEqual(manager._cachedConfigs.get('example-bucket-1'),
-                notificationConfiguration);
-            assert.strictEqual(manager._cachedConfigs.count(), 1);
+
+            sinon.stub(manager._configManagerBackend, 'getConfig').resolves(bucketConfig);
+
+            manager.getConfig('bucket1', (err, result) => {
+                assert.ifError(err);
+                assert.strictEqual(result, bucketConfig);
+                done();
+            });
        });
+    });
-        it('should do nothing when operation is not supported', () => {
-            const changeStreamEvent = {
-                _id: 'resumeToken',
-                operationType: 'insert',
-                documentKey: {
-                    _id: 'example-bucket-1',
+    describe('setConfig', () => {
+        it('should call setConfig of the backend', () => {
+            const params = {
+                mongoConfig: {
+                    database: 'eb1e786d-da1e-3fc5-83d2-46f083ab9764',
+                    readPreference: 'primary',
+                    replicaSetHosts: 'localhost:27017',
+                    shardCollections: true,
+                    writeConcern: 'majority'
                },
-                fullDocument: {
-                    _id: 'example-bucket-2',
-                    value: {
-                        notificationConfiguration:
-                        notificationConfigurationVariant,
-                    }
-                }
+                bucketMetastore: '__metastore',
+                maxCachedConfigs: 1000,
+                logger,
            };
-            const manager = new NotificationConfigManager(params);
-            // populating cache
-            manager._cachedConfigs.add('example-bucket-1', notificationConfiguration);
-            assert.strictEqual(manager._cachedConfigs.count(), 1);
-            assert(manager._cachedConfigs.get('example-bucket-1'));
-            // handling change stream event
-            manager._handleChangeStreamChangeEvent(changeStreamEvent);
-            // cache should not change
-            assert.deepEqual(manager._cachedConfigs.get('example-bucket-1'),
-                notificationConfiguration);
-            assert.strictEqual(manager._cachedConfigs.count(), 1);
-        });
-    });
-    describe('_setMetastoreChangeStream ::', () => {
-        it('should use resumeAfter with mongo 3.6', () => {
            const manager = new NotificationConfigManager(params);
-            manager._mongoVersion = '3.6.2';
-            manager._metastore = {
-                watch: sinon.stub().returns(new events.EventEmitter()),
-            };
-            manager._setMetastoreChangeStream();
-            assert(manager._metastoreChangeStream instanceof ChangeStream);
-            assert.equal(manager._metastoreChangeStream._resumeField, 'resumeAfter');
+            manager._configManagerBackend.setConfig = sinon.stub().returns(false);
+            manager.setConfig('bucket1', bucketConfig);
+            assert(manager._configManagerBackend.setConfig.calledOnce);
        });
+    });
-        it('should use resumeAfter with mongo 4.0', () => {
-            const manager = new NotificationConfigManager(params);
-            manager._mongoVersion = '4.0.7';
-            manager._metastore = {
-                watch: sinon.stub().returns(new events.EventEmitter()),
+    describe('removeConfig', () => {
+        it('should call removeConfig of the backend', () => {
+            const params = {
+                mongoConfig: {
+                    database: 'eb1e786d-da1e-3fc5-83d2-46f083ab9764',
+                    readPreference: 'primary',
+                    replicaSetHosts: 'localhost:27017',
+                    shardCollections: true,
+                    writeConcern: 'majority'
+                },
+                bucketMetastore: '__metastore',
+                maxCachedConfigs: 1000,
+                logger,
            };
-            manager._setMetastoreChangeStream();
-            assert(manager._metastoreChangeStream instanceof ChangeStream);
-            assert.equal(manager._metastoreChangeStream._resumeField, 'resumeAfter');
-        });
-        it('should use startAfter with mongo 4.2', () => {
            const manager = new NotificationConfigManager(params);
-            manager._mongoVersion = '4.2.3';
-            manager._metastore = {
-                watch: sinon.stub().returns(new events.EventEmitter()),
-            };
-            manager._setMetastoreChangeStream();
-            assert(manager._metastoreChangeStream instanceof ChangeStream);
-            assert.equal(manager._metastoreChangeStream._resumeField, 'startAfter');
+            manager._configManagerBackend.removeConfig = sinon.stub().returns(false);
+            manager.removeConfig('bucket1');
+            assert(manager._configManagerBackend.removeConfig.calledOnce);
        });
    });
-    describe('getConfig ::', () => {
-        it('should return notification configuration of bucket', async () => {
-            const manager = new NotificationConfigManager(params);
-            manager._metastore = {
-                findOne: () => (
-                    {
-                        value: {
-                            notificationConfiguration,
-                        }
-                    }),
+    describe('setup', () => {
+        it('should call the setup method of the backend', done => {
+            const params = {
+                zkClient: {},
+                zkConcurrency: 10,
+                logger,
            };
-            assert.strictEqual(manager._cachedConfigs.count(), 0);
-            const config = await manager.getConfig('example-bucket-1');
-            assert.deepEqual(config, notificationConfiguration);
-            // should also cache config
-            assert.strictEqual(manager._cachedConfigs.count(), 1);
-            assert.deepEqual(manager._cachedConfigs.get('example-bucket-1'),
-                notificationConfiguration);
-        });
-        it('should return undefined when bucket doesn\'t have notification configuration', async () => {
            const manager = new NotificationConfigManager(params);
-            manager._metastore = {
-                findOne: () => ({ value: {} }),
-            };
-            const config = await manager.getConfig('example-bucket-1');
-            assert.strictEqual(config, undefined);
-        });
+            const stub = sinon.stub(manager._configManagerBackend, 'setup').yields();
-        it('should return undefined when mongo findOne fails', async () => {
-            const manager = new NotificationConfigManager(params);
-            manager._metastore = {
-                findOne: sinon.stub().throws(errors.InternalError),
-            };
-            const config = await manager.getConfig('example-bucket-1');
-            assert.strictEqual(config, undefined);
+            manager.setup(err => {
+                assert.ifError(err);
+                assert(stub.calledOnce);
+                done();
+            });
        });
    });
});
diff --git a/tests/unit/notification/NotificationConfigValidator.js b/tests/unit/notification/NotificationConfigValidator.js
new file mode 100644
index 000000000..be576a196
--- /dev/null
+++ b/tests/unit/notification/NotificationConfigValidator.js
@@ -0,0 +1,81 @@
+const assert = require('assert');
+
+const configValidator = require('../../../extensions/notification/NotificationConfigValidator');
+
+describe('NotificationConfigValidator ::', () => {
+    it('should throw an error if requiredAcks is specified for a non-kafka destination', () => {
+        const extConfig = {
+            topic: 'topic',
+            monitorNotificationFailures: true,
+            notificationFailedTopic: 'failed-topic',
+            queueProcessor: {
+                groupId: 'groupId',
+                concurrency: 1000,
+            },
+            destinations: [{
+                resource: 'resource',
+                type: 'other',
+                host: 'host',
+                port: 8000,
+                topic: 'topic',
+                requiredAcks: 1,
+            }],
+            probeServer: {
+                bindAddress: 'localhost',
+                port: 8000,
+            },
+        };
+        assert.throws(() => configValidator(null, extConfig));
+    });
+
+    it('should throw an error if compressionType is specified for a non-kafka destination', () => {
+        const extConfig = {
+            topic: 'topic',
+            monitorNotificationFailures: true,
+            notificationFailedTopic: 'failed-topic',
+            queueProcessor: {
+                groupId: 'groupId',
+                concurrency: 1000,
+            },
+            destinations: [{
+                resource: 'resource',
+                type: 'other',
+                host: 'host',
+                port: 8000,
+                topic: 'topic',
+                compressionType: 'none',
+            }],
+            probeServer: {
+                bindAddress: 'localhost',
+                port: 8000,
+            },
+        };
+        assert.throws(() => configValidator(null, extConfig));
+    });
+
+    it('should not throw an error if requiredAcks and compressionType is specified for a kafka destination', () => {
+        const extConfig = {
+            topic: 'topic',
+            monitorNotificationFailures: true,
+            notificationFailedTopic: 'failed-topic',
+            queueProcessor: {
+                groupId: 'groupId',
+                concurrency: 1000,
+            },
+            destinations: [{
+                resource: 'resource',
+                type: 'kafka',
+                host: 'host',
+                port: 8000,
+                topic: 'topic',
+                requiredAcks: 1,
+                compressionType: 'none',
+            }],
+            probeServer: {
+                bindAddress: 'localhost',
+                port: 8000,
+            },
+        };
+        assert.doesNotThrow(() => configValidator(null, extConfig));
+    });
+});
diff --git a/tests/unit/notification/NotificationQueuePopulator.js b/tests/unit/notification/NotificationQueuePopulator.js
index 3db51f70e..6805e40e3 100644
--- a/tests/unit/notification/NotificationQueuePopulator.js
+++ b/tests/unit/notification/NotificationQueuePopulator.js
@@ -13,19 +13,22 @@ const notificationConfig
const logger = new werelogs.Logger('NotificationConfigManager:test');
werelogs.Logger('NotificationConfigManager:test'); -const notificationConfiguration = { - queueConfig: [ - { - events: ['s3:ObjectCreated:Put'], - queueArn: 'arn:scality:bucketnotif:::destination1', - filterRules: [], - }, - { - events: ['s3:ObjectRemoved:Delete'], - queueArn: 'arn:scality:bucketnotif:::destination2', - filterRules: [], - }, - ], +const config = { + bucket: 'example-bucket', + notificationConfiguration: { + queueConfig: [ + { + events: ['s3:ObjectCreated:Put'], + queueArn: 'arn:scality:bucketnotif:::destination1', + filterRules: [], + }, + { + events: ['s3:ObjectRemoved:Delete'], + queueArn: 'arn:scality:bucketnotif:::destination2', + filterRules: [], + }, + ], + }, }; describe('NotificationQueuePopulator ::', () => { @@ -35,6 +38,8 @@ describe('NotificationQueuePopulator ::', () => { beforeEach(() => { bnConfigManager = new NotificationConfigManager({ mongoConfig, + bucketMetastore: '__metastore', + maxCachedConfigs: 1000, logger, }); notificationQueuePopulator = new NotificationQueuePopulator({ @@ -64,7 +69,7 @@ describe('NotificationQueuePopulator ::', () => { describe('_processObjectEntry ::', () => { it('should publish object entry in notification topic of destination1', async () => { - sinon.stub(bnConfigManager, 'getConfig').returns(notificationConfiguration); + sinon.stub(bnConfigManager, 'getConfig').returns(config); const publishStub = sinon.stub(notificationQueuePopulator, 'publish'); await notificationQueuePopulator._processObjectEntry( 'example-bucket', @@ -80,7 +85,7 @@ describe('NotificationQueuePopulator ::', () => { }); it('should publish object entry in notification topic of destination2', async () => { - sinon.stub(bnConfigManager, 'getConfig').returns(notificationConfiguration); + sinon.stub(bnConfigManager, 'getConfig').returns(config); const publishStub = sinon.stub(notificationQueuePopulator, 'publish'); await notificationQueuePopulator._processObjectEntry( 'example-bucket', @@ -97,7 +102,7 @@ describe('NotificationQueuePopulator ::', () => { it('should not publish object entry in notification topic if ' + 'config validation failed', async () => { - sinon.stub(bnConfigManager, 'getConfig').returns(notificationConfiguration); + sinon.stub(bnConfigManager, 'getConfig').returns(config); const publishStub = sinon.stub(notificationQueuePopulator, 'publish'); await notificationQueuePopulator._processObjectEntry( 'example-bucket', @@ -124,18 +129,21 @@ describe('NotificationQueuePopulator ::', () => { })), }; sinon.stub(bnConfigManager, 'getConfig').returns({ - queueConfig: [ - { - events: ['s3:ObjectCreated:Put'], - queueArn: 'arn:scality:bucketnotif:::destination1', - filterRules: [], - }, - { - events: ['s3:ObjectCreated:Put'], - queueArn: 'arn:scality:bucketnotif:::destination2', - filterRules: [], - } - ], + bucket: 'example-bucket', + notificationConfiguration: { + queueConfig: [ + { + events: ['s3:ObjectCreated:Put'], + queueArn: 'arn:scality:bucketnotif:::destination1', + filterRules: [], + }, + { + events: ['s3:ObjectCreated:Put'], + queueArn: 'arn:scality:bucketnotif:::destination2', + filterRules: [], + } + ], + }, }); const publishStub = sinon.stub(notificationQueuePopulator, 'publish'); await notificationQueuePopulator._processObjectEntry( @@ -163,18 +171,21 @@ describe('NotificationQueuePopulator ::', () => { })), }; sinon.stub(bnConfigManager, 'getConfig').returns({ - queueConfig: [ - { - events: ['s3:ObjectCreated:Put'], - queueArn: 'arn:scality:bucketnotif:::destination1', - filterRules: [], - }, - { - events: ['s3:ObjectCreated:Put'], - 
queueArn: 'arn:scality:bucketnotif:::destination2', - filterRules: [], - } - ], + bucket: 'example-bucket', + notificationConfiguration: { + queueConfig: [ + { + events: ['s3:ObjectCreated:Put'], + queueArn: 'arn:scality:bucketnotif:::destination1', + filterRules: [], + }, + { + events: ['s3:ObjectCreated:Put'], + queueArn: 'arn:scality:bucketnotif:::destination2', + filterRules: [], + } + ], + }, }); const publishStub = sinon.stub(notificationQueuePopulator, 'publish'); await notificationQueuePopulator._processObjectEntry( @@ -194,28 +205,31 @@ describe('NotificationQueuePopulator ::', () => { it('should publish object entry to each entry\'s destination topic when multiple ' + 'destinations are valid for an event', async () => { sinon.stub(bnConfigManager, 'getConfig').returns({ - queueConfig: [ - { - events: ['s3:ObjectCreated:Put'], - queueArn: 'arn:scality:bucketnotif:::destination1', - filterRules: [], - }, - { - events: ['s3:ObjectCreated:Put'], - queueArn: 'arn:scality:bucketnotif:::destination2', - filterRules: [], - }, - { - events: ['s3:ObjectCreated:Put'], - queueArn: 'arn:scality:bucketnotif:::destination3', - filterRules: [], - }, - { - events: ['s3:ObjectCreated:Put'], - queueArn: 'arn:scality:bucketnotif:::destination4', - filterRules: [], - }, - ], + bucket: 'example-bucket', + notificationConfiguration: { + queueConfig: [ + { + events: ['s3:ObjectCreated:Put'], + queueArn: 'arn:scality:bucketnotif:::destination1', + filterRules: [], + }, + { + events: ['s3:ObjectCreated:Put'], + queueArn: 'arn:scality:bucketnotif:::destination2', + filterRules: [], + }, + { + events: ['s3:ObjectCreated:Put'], + queueArn: 'arn:scality:bucketnotif:::destination3', + filterRules: [], + }, + { + events: ['s3:ObjectCreated:Put'], + queueArn: 'arn:scality:bucketnotif:::destination4', + filterRules: [], + }, + ], + }, }); // override the destinations' config to add two new destinations that use // the default shared internal topic @@ -253,13 +267,16 @@ describe('NotificationQueuePopulator ::', () => { it('should not publish object entry in notification topic if ' + 'notification is non standard', async () => { sinon.stub(bnConfigManager, 'getConfig').returns({ - queueConfig: [ - { - events: ['s3:ObjectCreated:*'], - queueArn: 'arn:scality:bucketnotif:::destination1', - filterRules: [], - }, - ], + bucket: 'example-bucket', + notificationConfiguration: { + queueConfig: [ + { + events: ['s3:ObjectCreated:*'], + queueArn: 'arn:scality:bucketnotif:::destination1', + filterRules: [], + }, + ], + }, }); const publishStub = sinon.stub(notificationQueuePopulator, 'publish'); await notificationQueuePopulator._processObjectEntry( @@ -274,6 +291,36 @@ describe('NotificationQueuePopulator ::', () => { }); assert(publishStub.notCalled); }); + + it('should use proper fields for S3C delete notification', async () => { + sinon.stub(bnConfigManager, 'getConfig').returns(config); + const publishStub = sinon.stub(notificationQueuePopulator, 'publish').returns(); + const timestamp = new Date().toISOString(); + await notificationQueuePopulator._processObjectEntry( + 'example-bucket', + 'example-key\x0098500086134471999999RG001 0', + {}, + 'del', + { + versionId: '123456', + commitTimestamp: timestamp, + } + ); + const expectedMessage = { + bucket: 'example-bucket', + key: 'example-key', + versionId: '123456', + dateTime: timestamp, + eventType: 's3:ObjectRemoved:Delete', + region: null, + schemaVersion: null, + size: null, + }; + assert(publishStub.calledOnce); + assert.strictEqual(publishStub.getCall(0).args.at(0), 
'internal-notification-topic-destination2'); + assert.strictEqual(publishStub.getCall(0).args.at(1), 'example-bucket/example-key'); + assert.deepEqual(JSON.parse(publishStub.getCall(0).args.at(2)), expectedMessage); + }); }); describe('filterAsync ::', () => { @@ -295,25 +342,124 @@ describe('NotificationQueuePopulator ::', () => { }); }); - it('should fail if entry is a bucket entry', done => { - const processEntryStub = sinon.stub(notificationQueuePopulator, '_processObjectEntry'); + it('should ignore bucket operations', done => { + const processObjectEntryStub = sinon.stub(notificationQueuePopulator, '_processObjectEntry'); + const processBucketEntryStub = sinon.stub(notificationQueuePopulator, '_processBucketEntry'); + const entry = { + bucket: 'users..bucket', + key: 'example-key', + type: 'put', + value: '{}', + overheadFields: { + opTimestamp: new Date().toISOString(), + }, + }; + notificationQueuePopulator.filterAsync(entry, err => { + assert.ifError(err); + assert(processObjectEntryStub.notCalled); + assert(processBucketEntryStub.notCalled); + return done(); + }); + }); + + it('should ignore mpu bucket operations', done => { + const processObjectEntryStub = sinon.stub(notificationQueuePopulator, '_processObjectEntry'); + const processBucketEntryStub = sinon.stub(notificationQueuePopulator, '_processBucketEntry'); const entry = { bucket: '__metastore', - key: 'example-bucket', + key: 'mpuShadowBucketexample-bucket', type: 'put', value: '{}', overheadFields: { opTimestamp: new Date().toISOString(), }, }; + notificationQueuePopulator.filterAsync(entry, err => { + assert.ifError(err); + assert(processObjectEntryStub.notCalled); + assert(processBucketEntryStub.notCalled); + return done(); + }); + }); + + it('should update config when a bucket entry contains notification configuration', done => { + const processEntryStub = sinon.stub(notificationQueuePopulator, '_processObjectEntry').yields(); + const setConfigStub = sinon.stub(notificationQueuePopulator.bnConfigManager, 'setConfig').returns(true); + const entry = { + bucket: '__metastore', + key: 'example-bucket', + type: 'put', + value: '{"attributes":"{\\"name\\":\\"example-bucket\\",\\"notificationConfiguration\\":' + + '{\\"queueConfig\\":[{\\"events\\":[\\"s3:ObjectCreated:*\\"],\\"queueArn\\":' + + '\\"arn:scality:bucketnotif:::notification-target\\"}]}}"}', + overheadFields: { + opTimestamp: new Date().toISOString(), + }, + }; + notificationQueuePopulator.filterAsync(entry, err => { + assert.ifError(err); + assert(processEntryStub.notCalled); + assert(setConfigStub.calledWithMatch( + 'example-bucket', + { + bucket: 'example-bucket', + notificationConfiguration: { + queueConfig: [ + { + events: ['s3:ObjectCreated:*'], + queueArn: 'arn:scality:bucketnotif:::notification-target', + }, + ], + }, + }, + )); + return done(); + }); + }); + + it('should remove config when bucket no longer has notification configured', done => { + const processEntryStub = sinon.stub(notificationQueuePopulator, '_processObjectEntry').yields(); + const removeConfigStub = sinon.stub(notificationQueuePopulator.bnConfigManager, 'removeConfig') + .returns(true); + const entry = { + bucket: '__metastore', + key: 'example-bucket', + type: 'put', + value: '{"attributes":"{\\"name\\":\\"example-bucket\\"}"}', + overheadFields: { + opTimestamp: new Date().toISOString(), + }, + }; notificationQueuePopulator.filterAsync(entry, err => { assert.ifError(err); assert(processEntryStub.notCalled); + assert(removeConfigStub.calledWithMatch('example-bucket')); return done(); });
}); - it('should process the entry', done => { + it('should remove config when bucket is deleted', done => { + const processEntryStub = sinon.stub(notificationQueuePopulator, '_processObjectEntry').yields(); + const removeConfigStub = sinon.stub(notificationQueuePopulator.bnConfigManager, 'removeConfig') + .returns(true); + const entry = { + bucket: '__metastore', + key: 'example-bucket', + type: 'del', + value: undefined, + overheadFields: { + opTimestamp: new Date().toISOString(), + }, + }; + notificationQueuePopulator.filterAsync(entry, err => { + assert.ifError(err); + assert(processEntryStub.notCalled); + assert(removeConfigStub.calledWithMatch('example-bucket')); + return done(); + }); + }); + + it('should process an object entry', done => { const processEntryCbStub = sinon.stub(notificationQueuePopulator, '_processObjectEntryCb') .yields(); const entry = { @@ -338,22 +484,31 @@ describe('NotificationQueuePopulator ::', () => { { desc: 'non versioned', input: {}, + overhead: null, out: null }, { desc: 'versioned', input: { versionId: '1234' }, + overhead: null, + out: '1234' + }, + { + desc: 'versioned (S3C delete case)', + input: {}, + overhead: { versionId: '1234' }, out: '1234' }, { desc: 'a null version', input: { isNull: true, versionId: '1234' }, + overhead: null, out: null }, ].forEach(tests => { - const { desc, input, out } = tests; + const { desc, input, overhead, out } = tests; it(`Should return ${out} when object is ${desc}`, () => { - const versionId = notificationQueuePopulator._getVersionId(input); + const versionId = notificationQueuePopulator._getVersionId(input, overhead); assert.strictEqual(versionId, out); }); }); @@ -418,32 +573,37 @@ describe('NotificationQueuePopulator with multiple rules ::', () => { beforeEach(() => { bnConfigManager = new NotificationConfigManager({ mongoConfig, + bucketMetastore: '__metastore', + maxCachedConfigs: 1000, logger, }); sinon.stub(bnConfigManager, 'getConfig').returns({ - queueConfig: [ - { - events: ['s3:ObjectCreated:*'], - queueArn: 'arn:scality:bucketnotif:::destination1', - id: '0', - filterRules: [ - { - name: 'Prefix', - value: 'toto/', - }, - ], - }, { - events: ['s3:ObjectCreated:*'], - queueArn: 'arn:scality:bucketnotif:::destination1', - id: '1', - filterRules: [ - { - name: 'Prefix', - value: 'tata/', - }, - ], - }, - ], + bucket: 'example-bucket', + notificationConfiguration: { + queueConfig: [ + { + events: ['s3:ObjectCreated:*'], + queueArn: 'arn:scality:bucketnotif:::destination1', + id: '0', + filterRules: [ + { + name: 'Prefix', + value: 'toto/', + }, + ], + }, { + events: ['s3:ObjectCreated:*'], + queueArn: 'arn:scality:bucketnotif:::destination1', + id: '1', + filterRules: [ + { + name: 'Prefix', + value: 'tata/', + }, + ], + }, + ], + }, }); notificationQueuePopulator = new NotificationQueuePopulator({ config: notificationConfig, diff --git a/tests/unit/notification/NotificationQueueProcessor.js b/tests/unit/notification/NotificationQueueProcessor.js index 8a910c86b..32b8170e5 100644 --- a/tests/unit/notification/NotificationQueueProcessor.js +++ b/tests/unit/notification/NotificationQueueProcessor.js @@ -11,17 +11,22 @@ const kafkaConfig = require('../../config.notification.json').queuePopulator.kafka; const notificationConfig = require('../../config.notification.json').extensions.notification; +const zookeeperConfig + = require('../../config.notification.json').zookeeper; const logger = new werelogs.Logger('NotificationQueueProcessor:test'); -const notificationConfiguration = { - queueConfig: [ - { - events: 
['s3:ObjectCreated:*'], - queueArn: 'arn:scality:bucketnotif:::destination1', - filterRules: [], - }, - ], +const config = { + bucket: 'example-bucket', + notificationConfiguration: { + queueConfig: [ + { + events: ['s3:ObjectCreated:*'], + queueArn: 'arn:scality:bucketnotif:::destination1', + filterRules: [], + }, + ], + }, }; const kafkaEntry = { @@ -80,7 +85,7 @@ describe('NotificationQueueProcessor:: ', () => { let notificationQueueProcessor; beforeEach(() => { - notificationQueueProcessor = new NotificationQueueProcessor(mongoConfig, kafkaConfig, + notificationQueueProcessor = new NotificationQueueProcessor(mongoConfig, zookeeperConfig, kafkaConfig, notificationConfig, notificationConfig.destinations[0].resource, null); notificationQueueProcessor.logger = logger; }); @@ -91,7 +96,9 @@ describe('NotificationQueueProcessor:: ', () => { describe('processKafkaEntry ::', () => { it('should publish notification in correct format', async () => { - notificationQueueProcessor._getConfig = sinon.stub().yields(null, notificationConfiguration); + notificationQueueProcessor.bnConfigManager = { + getConfig: sinon.stub().yields(null, config), + }; const sendStub = sinon.stub().yields(null); notificationQueueProcessor._destination = { send: sendStub, diff --git a/tests/unit/notification/QueueProcessor.spec.js b/tests/unit/notification/QueueProcessor.spec.js index acc2206c2..4414c0575 100644 --- a/tests/unit/notification/QueueProcessor.spec.js +++ b/tests/unit/notification/QueueProcessor.spec.js @@ -9,7 +9,7 @@ describe('notification QueueProcessor', () => { let qp; before(done => { - qp = new QueueProcessor(null, null, { + qp = new QueueProcessor(null, null, null, { destinations: [ { resource: 'destId', @@ -17,22 +17,22 @@ describe('notification QueueProcessor', () => { }, ], }, 'destId', null); - qp._getConfig = (bucket, cb) => cb(null, { - queueConfig: [ - { - queueArn: 'arn:scality:bucketnotif:::destId', - events: [ - 's3:ObjectCreated:*', - ], - }, - ], - }); qp.bnConfigManager = { setConfig: () => {}, + getConfig: (bucket, cb) => cb(null, { + bucket: 'mybucket', + notificationConfiguration: { + queueConfig: [ + { + queueArn: 'arn:scality:bucketnotif:::destId', + events: [ + 's3:ObjectCreated:*', + ], + }, + ], + }, + }) }; - qp.bnConfigManager.setConfig('mybucket', { - host: 'foo', - }); qp._setupDestination('kafka', done); }); @@ -175,7 +175,7 @@ describe('notification QueueProcessor with multiple rules', () => { let sendStub; before(done => { - qp = new QueueProcessor(null, null, { + qp = new QueueProcessor(null, null, null, { destinations: [ { resource: 'destId', @@ -184,34 +184,36 @@ describe('notification QueueProcessor with multiple rules', () => { ], }, 'destId', null); - qp._getConfig = (bucket, cb) => cb(null, { - queueConfig: [ - { - events: ['s3:ObjectCreated:*'], - queueArn: 'arn:scality:bucketnotif:::destId', - id: '0', - filterRules: [ - { - name: 'Prefix', - value: 'toto/', - }, - ], - }, { - events: ['s3:ObjectCreated:*'], - queueArn: 'arn:scality:bucketnotif:::destId', - id: '1', - filterRules: [ + qp.bnConfigManager = { + setConfig: () => {}, + getConfig: (bucket, cb) => cb(null, { + bucket: 'mybucket', + notificationConfiguration: { + queueConfig: [ { - name: 'Prefix', - value: 'tata/', + events: ['s3:ObjectCreated:*'], + queueArn: 'arn:scality:bucketnotif:::destId', + id: '0', + filterRules: [ + { + name: 'Prefix', + value: 'toto/', + }, + ], + }, { + events: ['s3:ObjectCreated:*'], + queueArn: 'arn:scality:bucketnotif:::destId', + id: '1', + filterRules: [ + { + name: 
'Prefix', + value: 'tata/', + }, + ], }, ], }, - ], - }); - - qp.bnConfigManager = { - setConfig: () => {}, + }), }; qp._setupDestination('kafka', done); @@ -360,7 +362,7 @@ describe('notification QueueProcessor with one rule filtering by prefix and suff let sendStub; before(done => { - qp = new QueueProcessor(null, null, { + qp = new QueueProcessor(null, null, null, { destinations: [ { resource: 'destId', @@ -369,28 +371,30 @@ describe('notification QueueProcessor with one rule filtering by prefix and suff ], }, 'destId', null); - qp._getConfig = (bucket, cb) => cb(null, { - queueConfig: [ - { - events: ['s3:ObjectCreated:*'], - queueArn: 'arn:scality:bucketnotif:::destId', - id: '0', - filterRules: [ - { - name: 'Prefix', - value: 'toto/', - }, + qp.bnConfigManager = { + setConfig: () => {}, + getConfig: (bucket, cb) => cb(null, { + bucket: 'mybucket', + notificationConfiguration: { + queueConfig: [ { - name: 'Suffix', - value: '.png', + events: ['s3:ObjectCreated:*'], + queueArn: 'arn:scality:bucketnotif:::destId', + id: '0', + filterRules: [ + { + name: 'Prefix', + value: 'toto/', + }, + { + name: 'Suffix', + value: '.png', + }, + ], }, ], }, - ], - }); - - qp.bnConfigManager = { - setConfig: () => {}, + }), }; qp._setupDestination('kafka', done); @@ -505,7 +509,7 @@ describe('notification QueueProcessor with multiple rules and object matching al let sendStub; before(done => { - qp = new QueueProcessor(null, null, { + qp = new QueueProcessor(null, null, null, { destinations: [ { resource: 'destId', @@ -514,34 +518,36 @@ describe('notification QueueProcessor with multiple rules and object matching al ], }, 'destId', null); - qp._getConfig = (bucket, cb) => cb(null, { - queueConfig: [ - { - events: ['s3:ObjectCreated:*'], - queueArn: 'arn:scality:bucketnotif:::destId', - id: '0', - filterRules: [ - { - name: 'Prefix', - value: 'toto/', - }, - ], - }, { - events: ['s3:ObjectCreated:*'], - queueArn: 'arn:scality:bucketnotif:::destId', - id: '1', - filterRules: [ + qp.bnConfigManager = { + setConfig: () => {}, + getConfig: (bucket, cb) => cb(null, { + bucket: 'mybucket', + notificationConfiguration: { + queueConfig: [ { - name: 'Suffix', - value: '.png', + events: ['s3:ObjectCreated:*'], + queueArn: 'arn:scality:bucketnotif:::destId', + id: '0', + filterRules: [ + { + name: 'Prefix', + value: 'toto/', + }, + ], + }, { + events: ['s3:ObjectCreated:*'], + queueArn: 'arn:scality:bucketnotif:::destId', + id: '1', + filterRules: [ + { + name: 'Suffix', + value: '.png', + }, + ], }, ], }, - ], - }); - - qp.bnConfigManager = { - setConfig: () => {}, + }), }; qp._setupDestination('kafka', done); @@ -616,7 +622,7 @@ describe('notification QueueProcessor destination id not matching the rule desti let sendStub; before(done => { - qp = new QueueProcessor(null, null, { + qp = new QueueProcessor(null, null, null, { destinations: [ { resource: 'destId', @@ -642,18 +648,24 @@ describe('notification QueueProcessor destination id not matching the rule desti ]; mismatchedARN.forEach(arn => { - qp._getConfig = (bucket, cb) => cb(null, { - queueConfig: [ - { - events: ['s3:ObjectCreated:*'], - queueArn: arn, - id: '0', - filterRules: [ - { name: 'Prefix', value: 'toto/' }, + qp.bnConfigManager = { + setConfig: () => {}, + getConfig: (bucket, cb) => cb(null, { + bucket: 'mybucket', + notificationConfiguration: { + queueConfig: [ + { + events: ['s3:ObjectCreated:*'], + queueArn: arn, + id: '0', + filterRules: [ + { name: 'Prefix', value: 'toto/' }, + ], + }, ], }, - ], - }); + }), + }; const kafkaEntry = { value: 
JSON.stringify({ diff --git a/tests/unit/notification/configManager/MongoConfigManager.spec.js b/tests/unit/notification/configManager/MongoConfigManager.spec.js new file mode 100644 index 000000000..3ca439a8e --- /dev/null +++ b/tests/unit/notification/configManager/MongoConfigManager.spec.js @@ -0,0 +1,380 @@ +const assert = require('assert'); +const werelogs = require('werelogs'); +const sinon = require('sinon'); +const events = require('events'); +const MongoClient = require('mongodb').MongoClient; + +const ChangeStream = require('../../../../lib/wrappers/ChangeStream'); +const MongoConfigManager + = require('../../../../extensions/notification/configManager/MongoConfigManager'); +const { errors } = require('arsenal'); +const mongoConfig + = require('../../../config.json').queuePopulator.mongo; + +const logger = new werelogs.Logger('MongoConfigManager:test'); + +const notificationConfiguration = { + queueConfig: [ + { + events: ['s3:ObjectCreated:Put'], + queueArn: 'arn:scality:bucketnotif:::destination1', + filterRules: [], + }, + ], +}; + +const notificationConfigurationVariant = { + queueConfig: [ + { + events: ['s3:ObjectCreated:*'], + queueArn: 'arn:scality:bucketnotif:::destination2', + filterRules: [], + }, + ], +}; + +describe('MongoConfigManager ::', () => { + const params = { + mongoConfig, + bucketMetastore: '__metastore', + maxCachedConfigs: 100, + logger, + }; + + afterEach(() => { + sinon.restore(); + }); + + describe('Constructor & setup ::', () => { + it('Constructor should validate params', done => { + assert.throws(() => new MongoConfigManager()); + assert.throws(() => new MongoConfigManager({})); + assert.throws(() => new MongoConfigManager({ + mongoConfig: null, + logger: null, + })); + const manager = new MongoConfigManager(params); + assert(manager instanceof MongoConfigManager); + return done(); + }); + + it('Setup should initialize the mongoClient and the change stream', done => { + const manager = new MongoConfigManager(params); + const setMongoStub = sinon.stub(manager, '_setupMongoClient').callsArg(0); + const setChangeStreamStub = sinon.stub(manager, '_setMetastoreChangeStream').returns(); + manager.setup(err => { + assert.ifError(err); + assert(setMongoStub.calledOnce); + assert(setChangeStreamStub.calledOnce); + // cache should initially be empty + assert.strictEqual(manager._cachedConfigs.count(), 0); + return done(); + }); + }); + + it('Setup should fail when mongo setup fails', done => { + const manager = new MongoConfigManager(params); + const setMongoStub = sinon.stub(manager, '_setupMongoClient').callsArgWith(0, + errors.InternalError); + const setChangeStreamStub = sinon.stub(manager, '_setMetastoreChangeStream'); + manager.setup(err => { + assert.deepEqual(err, errors.InternalError); + assert(setMongoStub.calledOnce); + assert(setChangeStreamStub.notCalled); + return done(); + }); + }); + + it('Setup should fail when changeStream setup fails', done => { + const manager = new MongoConfigManager(params); + const setMongoStub = sinon.stub(manager, '_setupMongoClient').callsArg(0); + const setChangeStreamStub = sinon.stub(manager, '_setMetastoreChangeStream').throws( + errors.InternalError); + manager.setup(err => { + assert.deepEqual(err, errors.InternalError); + assert(setMongoStub.calledOnce); + assert(setChangeStreamStub.calledOnce); + return done(); + }); + }); + }); + + describe('_setupMongoClient ::', () => { + it('should setup the mongo client and get metastore collection', () => { + const manager = new MongoConfigManager(params); + const 
getCollectionStub = sinon.stub(); + const mongoCommandStub = sinon.stub().returns({ + version: '4.3.17', + }); + const getDbStub = sinon.stub().returns({ + collection: getCollectionStub, + command: mongoCommandStub, + }); + const mongoConnectStub = sinon.stub(MongoClient, 'connect').callsArgWith(2, null, { + db: getDbStub, + }); + manager._setupMongoClient(err => { + assert.ifError(err); + assert(mongoConnectStub.calledOnce); + assert(getDbStub.calledOnce); + assert(getCollectionStub.calledOnce); + assert(mongoCommandStub.calledOnceWith({ + buildInfo: 1, + })); + assert.equal(manager._mongoVersion, '4.3.17'); + }); + }); + + it('should fail when mongo client setup fails', () => { + const manager = new MongoConfigManager(params); + sinon.stub(MongoClient, 'connect').callsArgWith(2, + errors.InternalError, null); + manager._setupMongoClient(err => { + assert.deepEqual(err, errors.InternalError); + }); + }); + + it('should fail when getting the metadata db', () => { + const manager = new MongoConfigManager(params); + const getDbStub = sinon.stub().throws(errors.InternalError); + sinon.stub(MongoClient, 'connect').callsArgWith(2, null, { + db: getDbStub, + }); + manager._setupMongoClient(err => { + assert.deepEqual(err, errors.InternalError); + }); + }); + + it('should fail when mongo client fails to get metastore', () => { + const manager = new MongoConfigManager(params); + const getCollectionStub = sinon.stub().throws(errors.InternalError); + const getDbStub = sinon.stub().returns({ + collection: getCollectionStub, + }); + sinon.stub(MongoClient, 'connect').callsArgWith(2, null, { + db: getDbStub, + }); + manager._setupMongoClient(err => { + assert.deepEqual(err, errors.InternalError); + }); + }); + }); + + describe('_handleChangeStreamChangeEvent ::', () => { + it('should remove entry from cache', () => { + const changeStreamEvent = { + _id: 'resumeToken', + operationType: 'delete', + documentKey: { + _id: 'example-bucket-1', + }, + fullDocument: { + _id: 'example-bucket-1', + value: { + notificationConfiguration, + } + } + }; + const manager = new MongoConfigManager(params); + // populating cache + manager._cachedConfigs.add('example-bucket-1', notificationConfiguration); + assert.strictEqual(manager._cachedConfigs.count(), 1); + // handling change stream event + manager._handleChangeStreamChangeEvent(changeStreamEvent); + // should delete bucket config from cache + assert.strictEqual(manager._cachedConfigs.get('example-bucket-1'), + undefined); + assert.strictEqual(manager._cachedConfigs.count(), 0); + }); + + it('should replace entry in cache', () => { + const changeStreamEvent = { + _id: 'resumeToken', + operationType: 'replace', + documentKey: { + _id: 'example-bucket-1', + }, + fullDocument: { + _id: 'example-bucket-1', + value: { + notificationConfiguration: + notificationConfigurationVariant, + } + } + }; + const manager = new MongoConfigManager(params); + // populating cache + manager._cachedConfigs.add('example-bucket-1', notificationConfiguration); + assert.strictEqual(manager._cachedConfigs.count(), 1); + // handling change stream event + manager._handleChangeStreamChangeEvent(changeStreamEvent); + // should update bucket config in cache + assert.deepEqual(manager._cachedConfigs.get('example-bucket-1'), + notificationConfigurationVariant); + assert.strictEqual(manager._cachedConfigs.count(), 1); + // same thing should happen with "update" event + changeStreamEvent.operationType = 'update'; + // resetting config to the default one + 
changeStreamEvent.fullDocument.value.notificationConfiguration = + notificationConfiguration; + // emitting the new "update" event + manager._handleChangeStreamChangeEvent(changeStreamEvent); + // cached config must be updated + assert.deepEqual(manager._cachedConfigs.get('example-bucket-1'), + notificationConfiguration); + assert.strictEqual(manager._cachedConfigs.count(), 1); + }); + + it('should do nothing when config not in cache', () => { + const changeStreamEvent = { + _id: 'resumeToken', + operationType: 'delete', + documentKey: { + _id: 'example-bucket-2', + }, + fullDocument: { + _id: 'example-bucket-2', + value: { + notificationConfiguration: + notificationConfigurationVariant, + } + } + }; + const manager = new MongoConfigManager(params); + // populating cache + manager._cachedConfigs.add('example-bucket-1', notificationConfiguration); + assert.strictEqual(manager._cachedConfigs.count(), 1); + // handling change stream event + manager._handleChangeStreamChangeEvent(changeStreamEvent); + // cache should not change + assert.deepEqual(manager._cachedConfigs.get('example-bucket-1'), + notificationConfiguration); + assert.strictEqual(manager._cachedConfigs.count(), 1); + }); + + it('should do nothing when operation is not supported', () => { + const changeStreamEvent = { + _id: 'resumeToken', + operationType: 'insert', + documentKey: { + _id: 'example-bucket-1', + }, + fullDocument: { + _id: 'example-bucket-2', + value: { + notificationConfiguration: + notificationConfigurationVariant, + } + } + }; + const manager = new MongoConfigManager(params); + // populating cache + manager._cachedConfigs.add('example-bucket-1', notificationConfiguration); + assert.strictEqual(manager._cachedConfigs.count(), 1); + assert(manager._cachedConfigs.get('example-bucket-1')); + // handling change stream event + manager._handleChangeStreamChangeEvent(changeStreamEvent); + // cache should not change + assert.deepEqual(manager._cachedConfigs.get('example-bucket-1'), + notificationConfiguration); + assert.strictEqual(manager._cachedConfigs.count(), 1); + }); + }); + + describe('_setMetastoreChangeStream ::', () => { + it('should use resumeAfter with mongo 3.6', () => { + const manager = new MongoConfigManager(params); + manager._mongoVersion = '3.6.2'; + manager._metastore = { + watch: sinon.stub().returns(new events.EventEmitter()), + }; + manager._setMetastoreChangeStream(); + assert(manager._metastoreChangeStream instanceof ChangeStream); + assert.equal(manager._metastoreChangeStream._resumeField, 'resumeAfter'); + }); + + it('should use resumeAfter with mongo 4.0', () => { + const manager = new MongoConfigManager(params); + manager._mongoVersion = '4.0.7'; + manager._metastore = { + watch: sinon.stub().returns(new events.EventEmitter()), + }; + manager._setMetastoreChangeStream(); + assert(manager._metastoreChangeStream instanceof ChangeStream); + assert.equal(manager._metastoreChangeStream._resumeField, 'resumeAfter'); + }); + + it('should use startAfter with mongo 4.2', () => { + const manager = new MongoConfigManager(params); + manager._mongoVersion = '4.2.3'; + manager._metastore = { + watch: sinon.stub().returns(new events.EventEmitter()), + }; + manager._setMetastoreChangeStream(); + assert(manager._metastoreChangeStream instanceof ChangeStream); + assert.equal(manager._metastoreChangeStream._resumeField, 'startAfter'); + }); + }); + + describe('getConfig ::', () => { + it('should return notification configuration of bucket', async () => { + const manager = new MongoConfigManager(params); + 
manager._metastore = { + findOne: () => ( + { + value: { + notificationConfiguration, + } + }), + }; + const expectedConfig = { + bucket: 'example-bucket-1', + notificationConfiguration, + }; + assert.strictEqual(manager._cachedConfigs.count(), 0); + const config = await manager.getConfig('example-bucket-1'); + assert.deepEqual(config, expectedConfig); + // should also cache config + assert.strictEqual(manager._cachedConfigs.count(), 1); + assert.deepEqual(manager._cachedConfigs.get('example-bucket-1'), notificationConfiguration); + }); + + it('should return undefined when bucket doesn\'t have notification configuration', async () => { + const manager = new MongoConfigManager(params); + manager._metastore = { + findOne: () => ({ value: {} }), + }; + const config = await manager.getConfig('example-bucket-1'); + assert.deepEqual(config, undefined); + }); + + it('should throw when mongo findOne fails', done => { + const manager = new MongoConfigManager(params); + manager._metastore = { + findOne: sinon.stub().throws(errors.InternalError), + }; + manager.getConfig('example-bucket-1') + .then(() => { + assert.fail('should have thrown'); + }) + .catch(err => { + assert.deepEqual(err, errors.InternalError); + done(); + }); + }); + }); + + describe('setConfig ::', () => { + it('should always return false', async () => { + const manager = new MongoConfigManager(params); + assert.strictEqual(manager.setConfig('example-bucket-1', {}), false); + }); + }); + + describe('removeConfig ::', () => { + it('should always return false', async () => { + const manager = new MongoConfigManager(params); + assert.strictEqual(manager.removeConfig('example-bucket-1'), false); + }); + }); +}); diff --git a/tests/unit/notification/configManager/ZookeeperConfigManager.spec.js b/tests/unit/notification/configManager/ZookeeperConfigManager.spec.js new file mode 100644 index 000000000..75791775c --- /dev/null +++ b/tests/unit/notification/configManager/ZookeeperConfigManager.spec.js @@ -0,0 +1,258 @@ +const assert = require('assert'); +const async = require('async'); +const werelogs = require('werelogs'); +const ZookeeperMock = require('zookeeper-mock'); +const ZookeeperManager = require('../../../../lib/clients/ZookeeperManager'); +const sinon = require('sinon'); + +const ZookeeperConfigManager + = require('../../../../extensions/notification/configManager/ZookeeperConfigManager'); +const constants + = require('../../../../extensions/notification/constants'); + +const logger = new werelogs.Logger('ZookeeperConfigManager:test'); +const { zkConfigParentNode } = constants; +const concurrency = 10; +const bucketPrefix = 'bucket'; +const timeoutMs = 100; + +function getTestConfigValue(bucket) { + const data = { + bucket, + notificationConfiguration: { + queueConfig: [ + { + events: ['s3:ObjectCreated:Put'], + queueArn: 'arn:scality:bucketnotif:::destination1', + filterRules: [], + id: `${zkConfigParentNode}-${bucket}`, + }, + ], + }, + }; + return data; +} + +function populateTestConfigs(zkClient, numberOfConfigs, cb) { + async.timesLimit(numberOfConfigs, concurrency, (n, done) => { + const bucket = `${bucketPrefix}${n + 1}`; + const val = getTestConfigValue(bucket); + const strVal = JSON.stringify(val); + const node = `/${zkConfigParentNode}/${bucket}`; + async.series([ + next => zkClient.mkdirp(node, next), + next => zkClient.setData(node, strVal, next), + ], done); + }, cb); +} + +function listBuckets(zkClient, cb) { + const node = `/${zkConfigParentNode}`; + zkClient.getChildren(node, cb); +} + +function 
deleteTestConfigs(zkClient, cb) { + const node = `/${zkConfigParentNode}`; + listBuckets(zkClient, (error, children) => { + if (error) { + assert.ifError(error); + cb(error); + } else { + async.eachLimit(children, concurrency, (child, next) => { + const childNode = `${node}/${child}`; + zkClient.remove(childNode, next); + }, cb); + } + }); +} + +function checkCount(zkClient, manager, cb) { + listBuckets(zkClient, (err, buckets) => { + assert.ifError(err); + const zkConfigCount = buckets.length; + const managerConfigs = [...manager._configs.keys()]; + assert.strictEqual(zkConfigCount, managerConfigs.length); + cb(); + }); +} + +function managerInit(manager, cb) { + manager.setup(err => { + assert.ifError(err); + cb(); + }); +} + +function checkParentConfigZkNode(manager, cb) { + const zkPath = `/${zkConfigParentNode}`; + manager._checkNodeExists(zkPath, (err, exists) => { + assert.ifError(err); + assert(exists); + cb(); + }); +} + +describe('ZookeeperConfigManager', () => { + const zk = new ZookeeperMock({ doLog: false }); + const zkClient = zk.createClient(); + const params = { + zkClient, + zkConcurrency: 10, + logger, + }; + + describe('Constructor', () => { + function checkConfigParentNodeStub(cb) { + return cb(new Error('error checking config parent node')); + } + + after(() => { + sinon.restore(); + zk._resetState(); + }); + + it('constructor and setup checks', done => { + assert.throws(() => new ZookeeperConfigManager()); + assert.throws(() => new ZookeeperConfigManager({})); + assert.throws(() => new ZookeeperConfigManager({ + zkClient: null, + logger: null, + })); + const manager = new ZookeeperConfigManager(params); + assert(manager instanceof ZookeeperConfigManager); + async.series([ + next => managerInit(manager, next), + next => checkParentConfigZkNode(manager, next), + ], done); + }); + + it('should return error if checkConfigParentNode fails', done => { + const manager = new ZookeeperConfigManager(params); + sinon.stub(manager, '_checkConfigurationParentNode') + .callsFake(checkConfigParentNodeStub); + manager.setup(err => { + assert(err); + return done(); + }); + }); + }); + + describe('Operations', () => { + beforeEach(done => populateTestConfigs(zkClient, 5, done)); + + afterEach(() => { + zk._resetState(); + }); + + it('should get bucket notification configuration', () => { + const manager = new ZookeeperConfigManager(params); + managerInit(manager, () => { + const bucket = 'bucket1'; + const result = manager.getConfig(bucket); + assert.strictEqual(result.bucket, bucket); + }); + }); + + it('should return undefined for a non-existing bucket', () => { + const manager = new ZookeeperConfigManager(params); + managerInit(manager, () => { + const bucket = 'bucket100'; + const result = manager.getConfig(bucket); + assert.strictEqual(result, undefined); + }); + }); + + it('should add bucket notification configuration', done => { + const manager = new ZookeeperConfigManager(params); + managerInit(manager, () => { + const bucket = 'bucket100'; + const config = getTestConfigValue(bucket); + const result = manager.setConfig(bucket, config); + assert(result); + setTimeout(() => { + checkCount(zkClient, manager, done); + }, timeoutMs); + }); + }); + + it('should update bucket notification configuration', done => { + const manager = new ZookeeperConfigManager(params); + managerInit(manager, () => { + const bucket = 'bucket1'; + const config = getTestConfigValue(`${bucket}-updated`); + const result = manager.setConfig(bucket, config); + assert(result); + setTimeout(() => { + const 
updatedConfig = manager.getConfig(bucket); + assert.strictEqual(updatedConfig.bucket, + `${bucket}-updated`); + checkCount(zkClient, manager, done); + }, timeoutMs); + }); + }); + + it('should remove bucket notification configuration', done => { + const manager = new ZookeeperConfigManager(params); + managerInit(manager, () => { + const bucket = 'bucket1'; + let result = manager.removeConfig(bucket); + assert(result); + setTimeout(() => { + result = manager.getConfig(bucket); + assert.strictEqual(result, undefined); + checkCount(zkClient, manager, done); + }, timeoutMs); + }); + }); + + it('config count should be zero when all configs are removed', done => { + const manager = new ZookeeperConfigManager(params); + async.series([ + next => managerInit(manager, next), + next => deleteTestConfigs(zkClient, next), + next => setTimeout(next, timeoutMs), + next => checkCount(zkClient, manager, next), + ], done); + }); + }); + + describe('setup', () => { + it('should setup zookeeper client when it\'s not provided', done => { + const manager = new ZookeeperConfigManager({ + zkPath: '/notification', + zkConfig: { + connectionString: '127.0.0.1:2181', + autoCreateNamespace: false, + }, + zkConcurrency: 10, + logger, + }); + sinon.stub(manager, '_checkConfigurationParentNode').yields(null, null); + sinon.stub(manager, '_listBucketsWithConfig').yields(null, null); + sinon.stub(manager, '_updateLocalStore').yields(null); + const interval = setInterval(() => { + if (manager._zkClient) { + manager._zkClient.emit('ready'); + clearInterval(interval); + } + }, 30); + manager.setup(err => { + assert.ifError(err); + assert(manager._zkClient instanceof ZookeeperManager); + done(); + }); + }); + + it('should not create zookeeper client when it\'s provided', done => { + const manager = new ZookeeperConfigManager(params); + sinon.stub(manager, '_checkConfigurationParentNode').yields(null, null); + sinon.stub(manager, '_listBucketsWithConfig').yields(null, null); + sinon.stub(manager, '_updateLocalStore').yields(null); + manager.setup(err => { + assert.ifError(err); + assert(!(manager._zkClient instanceof ZookeeperManager)); + done(); + }); + }); + }); +}); diff --git a/tests/unit/notification/utils/config.js b/tests/unit/notification/utils/config.js index b5e5483da..97eecd51b 100644 --- a/tests/unit/notification/utils/config.js +++ b/tests/unit/notification/utils/config.js @@ -518,7 +518,7 @@ describe('Notification configuration util', () => { const bnConfig = getBucketNotifConfig(test.entry.bucket, configMap); const result - = notifConfUtil.validateEntry(bnConfig.notificationConfiguration, test.entry); + = notifConfUtil.validateEntry(bnConfig, test.entry); assert.strictEqual(test.pass, result.isValid); if (test.pass) { assert.deepStrictEqual(test.expectedMatchingConfig, result.matchingConfig); @@ -527,5 +527,29 @@ describe('Notification configuration util', () => { } }); }); + + it('should fail if the configuration is for another bucket', () => { + const config = { + bucket: 'bucket1', + notificationConfiguration: { + queueConfig: [ + { + events: ['s3:ObjectCreated:Put'], + queueArn: 'q1', + filterRules: [], + id: 'config1', + }, + ], + }, + }; + const entry = { + eventType: 's3:ObjectCreated:Put', + bucket: 'bucket10', + key: 'test.png', + }; + const result = notifConfUtil.validateEntry(config, entry); + assert.strictEqual(false, result.isValid); + assert(!result.matchingConfig); + }); }); });
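Note on the validateEntry change in the last hunk above: callers now pass the whole cached config object ({ bucket, notificationConfiguration }) instead of the bare notificationConfiguration, which lets the validator also reject entries whose bucket does not own the configuration. Below is a minimal sketch of that contract, assuming only the shapes used in the tests; the validateEntry body is illustrative, not the actual extensions/notification/utils/config implementation (which also applies filter rules).

const assert = require('assert');

// Illustrative stand-in for notifConfUtil.validateEntry: shows the new
// bucket-ownership check and the { isValid, matchingConfig } result shape.
function validateEntry(config, entry) {
    // config is the full cached object: { bucket, notificationConfiguration }
    if (!config || config.bucket !== entry.bucket) {
        return { isValid: false, matchingConfig: null };
    }
    // simplified matching: event type only, no filter-rule evaluation
    const matchingConfig = config.notificationConfiguration.queueConfig
        .find(qc => qc.events.includes(entry.eventType)) || null;
    return { isValid: !!matchingConfig, matchingConfig };
}

const config = {
    bucket: 'bucket1',
    notificationConfiguration: {
        queueConfig: [
            {
                events: ['s3:ObjectCreated:Put'],
                queueArn: 'q1',
                filterRules: [],
                id: 'config1',
            },
        ],
    },
};

// An entry from another bucket no longer matches, as exercised by the
// 'should fail if the configuration is for another bucket' test above.
assert.strictEqual(
    validateEntry(config, {
        eventType: 's3:ObjectCreated:Put',
        bucket: 'bucket10',
        key: 'test.png',
    }).isValid,
    false);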