Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
benzekrimaha committed Nov 6, 2024
1 parent 7f5a553 commit 8d8381a
Show file tree
Hide file tree
Showing 7 changed files with 9 additions and 15 deletions.
4 changes: 2 additions & 2 deletions lib/queuePopulator/IngestionReader.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@ function _isVersionedLogKey(key) {
class IngestionReader extends LogReader {
constructor(params) {
const { zkClient, ingestionConfig, kafkaConfig, bucketdConfig, qpConfig,
logger, extensions, producer, metricsProducer, s3Config } = params;
logger, extensions, producer, s3Config } = params;
super({ zkClient, kafkaConfig, logConsumer: {}, logId: '', logger,
extensions, metricsProducer, zkMetricsHandler: IngestionPopulatorMetrics });
extensions, zkMetricsHandler: IngestionPopulatorMetrics });
this._ingestionConfig = ingestionConfig;
this.qpConfig = qpConfig;
this.s3Config = s3Config;
Expand Down
6 changes: 2 additions & 4 deletions lib/queuePopulator/KafkaLogReader.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,12 @@ class KafkaLogReader extends LogReader {
* @param {Object} params.qpKafkaConfig - queue populator kafka configuration
* @param {QueuePopulatorExtension[]} params.extensions - array of
* queue populator extension modules
* @param {MetricsProducer} params.metricsProducer - instance of metrics
* producer
* @param {MetricsHandler} params.metricsHandler - instance of metrics
* handler
*/
constructor(params) {
const { zkClient, kafkaConfig, zkConfig, qpKafkaConfig,
logger, extensions, metricsProducer, metricsHandler } = params;
logger, extensions, metricsHandler } = params;
// conf contains global kafka and queuePoplator kafka configs
const conf = {
hosts: kafkaConfig.hosts,
Expand All @@ -31,7 +29,7 @@ class KafkaLogReader extends LogReader {
const logConsumer = new LogConsumer(conf, logger);
super({ zkClient, kafkaConfig, zkConfig, logConsumer,
logId: `kafka_${qpKafkaConfig.logName}`, logger, extensions,
metricsProducer, metricsHandler });
metricsHandler });
this._kafkaConfig = conf;
}

Expand Down
2 changes: 0 additions & 2 deletions lib/queuePopulator/LogReader.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@ class LogReader {
* @param {Logger} params.logger - logger object
* @param {QueuePopulatorExtension[]} params.extensions - array of
* queue populator extension modules
* @param {MetricsProducer} params.metricsProducer - instance of metrics
* producer
* @param {MetricsHandler} params.metricsHandler - instance of metrics
* handler
* @param {ZkMetricsHandler} params.zkMetricsHandler - instance of zookeeper
Expand Down
4 changes: 2 additions & 2 deletions lib/queuePopulator/MongoLogReader.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,14 @@ const LogReader = require('./LogReader');
class MongoLogReader extends LogReader {
constructor(params) {
const { zkClient, kafkaConfig, zkConfig, mongoConfig,
logger, extensions, metricsProducer, metricsHandler } = params;
logger, extensions, metricsHandler } = params;
logger.info('initializing mongo log reader',
{ method: 'MongoLogReader.constructor',
mongoConfig });
const logConsumer = new LogConsumer(mongoConfig, logger);
super({ zkClient, kafkaConfig, zkConfig, logConsumer,
logId: `mongo_${mongoConfig.logName}`, logger, extensions,
metricsProducer, metricsHandler });
metricsHandler });
this._mongoConfig = mongoConfig;
}

Expand Down
4 changes: 2 additions & 2 deletions lib/queuePopulator/RaftLogReader.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ const LogReader = require('./LogReader');
class RaftLogReader extends LogReader {
constructor(params) {
const { zkClient, kafkaConfig, bucketdConfig, httpsConfig,
raftId, logger, extensions, metricsProducer, metricsHandler } = params;
raftId, logger, extensions, metricsHandler } = params;
const { host, port } = bucketdConfig;
logger.info('initializing raft log reader',
{ method: 'RaftLogReader.constructor',
Expand All @@ -25,7 +25,7 @@ class RaftLogReader extends LogReader {
raftSession: raftId,
logger });
super({ zkClient, kafkaConfig, logConsumer, logId: `raft_${raftId}`,
logger, extensions, metricsProducer, metricsHandler });
logger, extensions, metricsHandler });
this.raftId = raftId;
}

Expand Down
2 changes: 0 additions & 2 deletions tests/functional/ingestion/IngestionReader.js
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,6 @@ describe('ingestion reader tests with mock', function fD() {
qpConfig: testConfig.queuePopulator,
logger: dummyLogger,
extensions: [ingestionQP],
metricsProducer: { publishMetrics: () => { } },
s3Config: testConfig.s3,
producer,
});
Expand Down Expand Up @@ -404,7 +403,6 @@ describe('ingestion reader tests with mock', function fD() {
qpConfig: testConfig.queuePopulator,
logger: dummyLogger,
extensions: [ingestionQP],
metricsProducer: { publishMetrics: () => { } },
s3Config: testConfig.s3,
producer,
});
Expand Down
2 changes: 1 addition & 1 deletion tests/functional/replication/queueProcessor.js
Original file line number Diff line number Diff line change
Expand Up @@ -882,7 +882,7 @@ describe('queue processor functional tests with mocking', () => {
async.parallel([
done => queueProcessorSF.stop(done),
done => queueProcessorAzure.stop(done),
// done => replicationStatusProcessor.stop(done),
done => replicationStatusProcessor.stop(done),
done => copyLocationResultsConsumer.close(done),
], done);
});
Expand Down

0 comments on commit 8d8381a

Please sign in to comment.