diff --git a/extensions/mongoProcessor/MongoProcessorConfigValidator.js b/extensions/mongoProcessor/MongoProcessorConfigValidator.js index cc72ada5f..ac932cebb 100644 --- a/extensions/mongoProcessor/MongoProcessorConfigValidator.js +++ b/extensions/mongoProcessor/MongoProcessorConfigValidator.js @@ -5,6 +5,7 @@ const joiSchema = joi.object({ topic: joi.string().required(), groupId: joi.string().required(), retry: retryParamsJoi, + concurrency: joi.number().greater(0).default(1), probeServer: probeServerJoi.default(), circuitBreaker: joi.object().optional(), }); diff --git a/extensions/mongoProcessor/MongoQueueProcessor.js b/extensions/mongoProcessor/MongoQueueProcessor.js index 6811079f4..30339f788 100644 --- a/extensions/mongoProcessor/MongoQueueProcessor.js +++ b/extensions/mongoProcessor/MongoQueueProcessor.js @@ -60,6 +60,7 @@ class MongoQueueProcessor { * randomness * @param {number} [mongoProcessorConfig.retry.backoff.factor] - * backoff factor + * @param {number} [mongoProcessorConfig.concurrency] - consumer concurrency * @param {Object} mongoClientConfig - config for connecting to mongo * @param {Object} mConfig - metrics config */ @@ -141,6 +142,7 @@ class MongoQueueProcessor { circuitBreakerMetrics: { type: 'mongo_queue_processor', }, + concurrency: this.mongoProcessorConfig.concurrency, }); this._consumer.on('error', () => { MongoProcessorMetrics.onIngestionKafkaConsume('error');