diff --git a/extensions/replication/ReplicationConfigValidator.js b/extensions/replication/ReplicationConfigValidator.js index 310d5a2e9..bdfbb5dea 100644 --- a/extensions/replication/ReplicationConfigValidator.js +++ b/extensions/replication/ReplicationConfigValidator.js @@ -1,7 +1,7 @@ const fs = require('fs'); const joi = require('joi'); const { hostPortJoi, transportJoi, bootstrapListJoi, adminCredsJoi, - retryParamsJoi, probeServerJoi } = + retryParamsJoi, probeServerJoi, probeServerPerSite } = require('../../lib/config/configItems.joi'); const qpRetryJoi = joi.object({ @@ -71,7 +71,10 @@ const joiSchema = joi.object({ concurrency: joi.number().greater(0).default(10), mpuPartsConcurrency: joi.number().greater(0).default(10), minMPUSizeMB: joi.number().greater(0).default(20), - probeServer: probeServerJoi.default(), + probeServer: joi.alternatives().try( + probeServerJoi, + probeServerPerSite, + ).default({ bindAddress: 'localhost', port: 4042 }), circuitBreaker: joi.object().optional(), }).required(), replicationStatusProcessor: { diff --git a/extensions/replication/queueProcessor/task.js b/extensions/replication/queueProcessor/task.js index aa5b1de5b..6eb351907 100644 --- a/extensions/replication/queueProcessor/task.js +++ b/extensions/replication/queueProcessor/task.js @@ -22,7 +22,7 @@ const internalHttpsConfig = config.internalHttps; const mConfig = config.metrics; const { connectionString, autoCreateNamespace } = zkConfig; const RESUME_NODE = 'scheduledResume'; -const { startProbeServer } = require('../../../lib/util/probe'); +const { startProbeServer, getProbeConfig } = require('../../../lib/util/probe'); const { DEFAULT_LIVE_ROUTE, DEFAULT_METRICS_ROUTE, DEFAULT_READY_ROUTE } = require('arsenal').network.probe.ProbeServer; const { sendSuccess } = require('arsenal').network.probe.Utils; @@ -240,7 +240,7 @@ function initAndStart(zkClient) { }); startProbeServer( - repConfig.queueProcessor.probeServer, + getProbeConfig(repConfig.queueProcessor, siteNames, log), (err, probeServer) => { if (err) { log.fatal('error creating probe server', { diff --git a/lib/config/configItems.joi.js b/lib/config/configItems.joi.js index d165ebcf7..757cbed21 100644 --- a/lib/config/configItems.joi.js +++ b/lib/config/configItems.joi.js @@ -112,6 +112,14 @@ const probeServerJoi = joi.object({ port: joi.number().required(), }); +const probeServerPerSite = joi.array().min(1).items( + joi.object({ + bindAddress: joi.string().default('localhost'), + port: joi.number().required(), + site: joi.string().required() + }) +); + const mongoJoi = joi.object({ replicaSetHosts: joi.string().default('localhost:27017'), logName: joi.string().default('s3-recordlog'), @@ -161,6 +169,7 @@ module.exports = { retryParamsJoi, certFilePathsJoi, probeServerJoi, + probeServerPerSite, stsConfigJoi, mongoJoi, qpKafkaJoi, diff --git a/lib/util/probe.js b/lib/util/probe.js index e51392d08..026e3111f 100644 --- a/lib/util/probe.js +++ b/lib/util/probe.js @@ -60,8 +60,42 @@ function observeKafkaStats(msg) { kafkaMetrics.observe(JSON.parse(msg.message)); } +/** + * Get probe config will pull the configuration for the probe server based on + * the provided site name. If siteNames is empty, it returns the global probe server config + * only if it's a single object. + * + * @param {Object} queueProcessorConfig - Configuration of the queue processor that + * holds the probe server configs for all sites + * @param {Array} siteNames - List of site names (should contain at most one element) + * @param {Logger} logger - Logger instance + * @returns {Object|undefined} Config for site or global config, undefined if no match found or invalid config + */ +function getProbeConfig(queueProcessorConfig, siteNames, logger) { + if (Array.isArray(queueProcessorConfig.probeServer)) { + if (siteNames.length !== 1) { + logger.error('Process configured for more than one site or no site provided', { + siteNames, + queueProcessorConfig, + }); + return undefined; + } + const siteConfig = queueProcessorConfig.probeServer.find(config => config.site === siteNames[0]); + if (siteConfig === undefined) { + logger.warn('Probe server configuration for site not found', { + siteName: siteNames[0], + queueProcessorConfig, + }); + } + return siteConfig; + } + + return queueProcessorConfig.probeServer; + } + module.exports = { startProbeServer, startProbeServerPromise, observeKafkaStats, + getProbeConfig, }; diff --git a/tests/unit/lib/util/probe.spec.js b/tests/unit/lib/util/probe.spec.js index a2fa05015..d3afdcd56 100644 --- a/tests/unit/lib/util/probe.spec.js +++ b/tests/unit/lib/util/probe.spec.js @@ -1,6 +1,7 @@ const assert = require('assert'); -const { startProbeServer } = +const { startProbeServer, getProbeConfig } = require('../../../../lib/util/probe'); +const Logger = require('werelogs').Logger; describe('Probe server', () => { it('is not created with no config', done => { @@ -25,3 +26,74 @@ describe('Probe server', () => { }); }); }); + +describe('getProbeConfig', () => { + const log = new Logger('getProbeConfig'); + it('returns the probeServer config when siteNames is empty and probeServer is a single object', () => { + const queueProcessorConfig = { + probeServer: { bindAddress: '127.0.0.1', port: '8080' } + }; + const siteNames = []; + + const result = getProbeConfig(queueProcessorConfig, siteNames, log); + assert.deepStrictEqual(result, { bindAddress: '127.0.0.1', port: '8080' }); + }); + + it('returns undefined when siteNames is empty and probeServer is not a single object', () => { + const queueProcessorConfig = { + probeServer: [{ site: 'site1', bindAddress: '127.0.0.1', port: '8080' }] + }; + const siteNames = []; + + const result = getProbeConfig(queueProcessorConfig, siteNames, log); + assert.strictEqual(result, undefined); + }); + + it('returns the correct site config when probeServer is an array and siteNames has one matching element', () => { + const queueProcessorConfig = { + probeServer: [ + { site: 'site1', bindAddress: '127.0.0.1', port: '8080' }, + { site: 'site2', bindAddress: '127.0.0.2', port: '8081' } + ] + }; + const siteNames = ['site2']; + + const result = getProbeConfig(queueProcessorConfig, siteNames, log); + assert.deepStrictEqual(result, { site: 'site2', bindAddress: '127.0.0.2', port: '8081' }); + }); + + it('returns undefined when probeServer is an array and siteNames has no matching element', () => { + const queueProcessorConfig = { + probeServer: [ + { site: 'site1', bindAddress: '127.0.0.1', port: '8080' } + ] + }; + const siteNames = ['site2']; + + const result = getProbeConfig(queueProcessorConfig, siteNames, log); + assert.strictEqual(result, undefined); + }); + + it('returns undefined when siteNames contains more than one element', () => { + const queueProcessorConfig = { + probeServer: [ + { site: 'site1', bindAddress: '127.0.0.1', port: '8080' }, + { site: 'site2', bindAddress: '127.0.0.2', port: '8081' } + ] + }; + const siteNames = ['site1', 'site2']; // More than one element in siteNames + + const result = getProbeConfig(queueProcessorConfig, siteNames, log); + assert.strictEqual(result, undefined); + }); + + it('returns probeserver when probeServer is not an array and siteNames is not empty', () => { + const queueProcessorConfig = { + probeServer: { bindAddress: '127.0.0.1', port: '8080' } // probeServer is a single object + }; + const siteNames = ['site1']; // siteNames is not empty + + const result = getProbeConfig(queueProcessorConfig, siteNames, log); + assert.deepStrictEqual(result, queueProcessorConfig.probeServer); + }); + });