Skip to content

Commit

Permalink
properly pass compressionType and requiredAcks to BackbeatProducer
Browse files Browse the repository at this point in the history
Issue: BB-620
  • Loading branch information
Kerkesni committed Nov 20, 2024
1 parent a1ca408 commit a207d8e
Show file tree
Hide file tree
Showing 21 changed files with 54 additions and 1 deletion.
2 changes: 2 additions & 0 deletions extensions/gc/GarbageCollector.js
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,8 @@ class GarbageCollector extends EventEmitter {
kafka: {
hosts: this._kafkaConfig.hosts,
site: this._kafkaConfig.site,
compressionType: this._kafkaConfig.compressionType,
requiredAcks: this._kafkaConfig.requiredAcks,
},
topic: this._gcConfig.topic,
groupId: this._gcConfig.consumer.groupId,
Expand Down
2 changes: 2 additions & 0 deletions extensions/gc/GarbageCollectorProducer.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ class GarbageCollectorProducer {
const producer = new BackbeatProducer({
kafka: { hosts: this._kafkaConfig.hosts },
maxRequestSize: this._kafkaConfig.maxRequestSize,
compressionType: this._kafkaConfig.compressionType,
requiredAcks: this._kafkaConfig.requiredAcks,
topic: this._topic,
});
producer.once('error', () => {});
Expand Down
2 changes: 2 additions & 0 deletions extensions/lifecycle/LifecycleQueuePopulator.js
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ class LifecycleQueuePopulator extends QueuePopulatorExtension {
const producer = new BackbeatProducer({
kafka: { hosts: this.kafkaConfig.hosts },
maxRequestSize: this.kafkaConfig.maxRequestSize,
compressionType: this.kafkaConfig.compressionType,
requiredAcks: this.kafkaConfig.requiredAcks,
topic,
});
producer.once('error', done);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -380,6 +380,8 @@ class LifecycleBucketProcessor {
const producer = new BackbeatProducer({
kafka: { hosts: this._kafkaConfig.hosts },
maxRequestSize: this._kafkaConfig.maxRequestSize,
compressionType: this._kafkaConfig.compressionType,
requiredAcks: this._kafkaConfig.requiredAcks,
topic: this._lcConfig.objectTasksTopic,
});
producer.once('error', err => {
Expand Down Expand Up @@ -418,6 +420,8 @@ class LifecycleBucketProcessor {
hosts: this._kafkaConfig.hosts,
site: this._kafkaConfig.site,
backlogMetrics: this._kafkaConfig.backlogMetrics,
compressionType: this._kafkaConfig.compressionType,
requiredAcks: this._kafkaConfig.requiredAcks,
},
topic: this._lcConfig.bucketTasksTopic,
groupId: this._lcConfig.bucketProcessor.groupId,
Expand Down
2 changes: 2 additions & 0 deletions extensions/lifecycle/conductor/LifecycleConductor.js
Original file line number Diff line number Diff line change
Expand Up @@ -872,6 +872,8 @@ class LifecycleConductor {
const producer = new BackbeatProducer({
kafka: { hosts: this.kafkaConfig.hosts },
maxRequestSize: this.kafkaConfig.maxRequestSize,
compressionType: this.kafkaConfig.compressionType,
requiredAcks: this.kafkaConfig.requiredAcks,
topic: this.lcConfig.bucketTasksTopic,
});
producer.once('error', cb);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ class LifecycleObjectProcessor extends EventEmitter {
hosts: this._kafkaConfig.hosts,
site: this._kafkaConfig.site,
backlogMetrics: this._kafkaConfig.backlogMetrics,
compressionType: this._kafkaConfig.compressionType,
requiredAcks: this._kafkaConfig.requiredAcks,
},
topic,
groupId: this._processConfig.groupId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ class LifecycleObjectTransitionProcessor extends LifecycleObjectProcessor {
const producer = new BackbeatProducer({
kafka: { hosts: this._kafkaConfig.hosts },
maxRequestSize: this._kafkaConfig.maxRequestSize,
compressionType: this._kafkaConfig.compressionType,
requiredAcks: this._kafkaConfig.requiredAcks,
});
producer.once('error', cb);
producer.once('ready', () => {
Expand Down Expand Up @@ -121,6 +123,8 @@ class LifecycleObjectTransitionProcessor extends LifecycleObjectProcessor {
hosts: this._kafkaConfig.hosts,
site: this._kafkaConfig.site,
backlogMetrics: this._kafkaConfig.backlogMetrics,
compressionType: this._kafkaConfig.compressionType,
requiredAcks: this._kafkaConfig.requiredAcks,
},
topic,
groupId: this._processConfig.groupId,
Expand Down
2 changes: 2 additions & 0 deletions extensions/mongoProcessor/MongoQueueProcessor.js
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,8 @@ class MongoQueueProcessor {
kafka: {
hosts: this.kafkaConfig.hosts,
site: this.kafkaConfig.site,
compressionType: this.kafkaConfig.compressionType,
requiredAcks: this.kafkaConfig.requiredAcks,
},
queueProcessor: this.processKafkaEntry.bind(this),
circuitBreaker: this.mongoProcessorConfig.circuitBreaker,
Expand Down
2 changes: 2 additions & 0 deletions extensions/notification/queueProcessor/QueueProcessor.js
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,8 @@ class QueueProcessor extends EventEmitter {
kafka: {
hosts: this.kafkaConfig.hosts,
site: this.kafkaConfig.site,
compressionType: this.kafkaConfig.compressionType,
requiredAcks: this.kafkaConfig.requiredAcks,
},
topic: internalTopic,
groupId: consumerGroupId,
Expand Down
2 changes: 2 additions & 0 deletions extensions/replication/failedCRR/FailedCRRConsumer.js
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ class FailedCRRConsumer {
kafka: {
hosts: this._kafkaConfig.hosts,
site: this._kafkaConfig.site,
compressionType: this._kafkaConfig.compressionType,
requiredAcks: this._kafkaConfig.requiredAcks,
},
topic: this._topic,
groupId: 'backbeat-retry-group',
Expand Down
2 changes: 2 additions & 0 deletions extensions/replication/failedCRR/FailedCRRProducer.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ class FailedCRRProducer {
this._producer = new BackbeatProducer({
kafka: { hosts: this._kafkaConfig.hosts },
maxRequestSize: this._kafkaConfig.maxRequestSize,
compressionType: this._kafkaConfig.compressionType,
requiredAcks: this._kafkaConfig.requiredAcks,
topic: this._topic,
});
this._producer.once('error', () => {});
Expand Down
4 changes: 4 additions & 0 deletions extensions/replication/queueProcessor/QueueProcessor.js
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,8 @@ class QueueProcessor extends EventEmitter {
const producer = new BackbeatProducer({
kafka: { hosts: this.kafkaConfig.hosts },
maxRequestSize: this.kafkaConfig.maxRequestSize,
compressionType: this.kafkaConfig.compressionType,
requiredAcks: this.kafkaConfig.requiredAcks,
topic: this.repConfig.replicationStatusTopic,
});
producer.once('error', done);
Expand Down Expand Up @@ -401,6 +403,8 @@ class QueueProcessor extends EventEmitter {
site: this.kafkaConfig.site,
backlogMetrics: options && options.enableBacklogMetrics ?
this.kafkaConfig.backlogMetrics : undefined,
compressionType: this.kafkaConfig.compressionType,
requiredAcks: this.kafkaConfig.requiredAcks,
},
topic,
groupId,
Expand Down
2 changes: 2 additions & 0 deletions extensions/replication/replay/ReplayProducer.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ class ReplayProducer {
this._producer = new BackbeatProducer({
kafka: { hosts: this._kafkaConfig.hosts },
maxRequestSize: this._kafkaConfig.maxRequestSize,
compressionType: this._kafkaConfig.compressionType,
requiredAcks: this._kafkaConfig.requiredAcks,
topic: this._topic,
});
this._producer.once('error', () => {});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -390,6 +390,8 @@ class ReplicationStatusProcessor {
kafka: {
hosts: this.kafkaConfig.hosts,
site: this.kafkaConfig.site,
compressionType: this.kafkaConfig.compressionType,
requiredAcks: this.kafkaConfig.requiredAcks,
},
topic: this.repConfig.replicationStatusTopic,
groupId: this.repConfig.replicationStatusProcessor.groupId,
Expand Down
9 changes: 9 additions & 0 deletions lib/BackbeatConsumer.js
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,9 @@ class BackbeatConsumer extends EventEmitter {
},
site: joi.string(),
maxPollIntervalMs: joi.number().min(45000).default(300000),
// Kafka producer params
compressionType: joi.string(),
requiredAcks: joi.number(),
}).required(),
topic: joi.string().required(),
groupId: joi.string().required(),
Expand All @@ -116,6 +119,8 @@ class BackbeatConsumer extends EventEmitter {
this._kafkaHosts = kafka.hosts;
this._kafkaBacklogMetricsConfig = kafka.backlogMetrics;
this._maxPollIntervalMs = kafka.maxPollIntervalMs;
this._producerCompressionType = kafka.compressionType;
this._producerRequiredAcks = kafka.requiredAcks;
this._site = kafka.site;
this._fromOffset = fromOffset;
this._log = new Logger(clientId);
Expand Down Expand Up @@ -839,6 +844,8 @@ class BackbeatConsumer extends EventEmitter {
assert.strictEqual(this._consumer, null);
producer = new BackbeatProducer({
kafka: { hosts: this._kafkaHosts },
compressionType: this._producerCompressionType,
requiredAcks: this._producerRequiredAcks,
topic: this._topic,
});
producer.on('ready', () => {
Expand Down Expand Up @@ -973,6 +980,8 @@ class BackbeatConsumer extends EventEmitter {
}));
const producer = new BackbeatProducer({
kafka: { hosts: this._kafkaHosts },
compressionType: this._producerCompressionType,
requiredAcks: this._producerRequiredAcks,
topic: this._topic,
});
return producer.on('ready', () => {
Expand Down
2 changes: 2 additions & 0 deletions lib/MetricsConsumer.js
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ class MetricsConsumer {
kafka: {
hosts: this.kafkaConfig.hosts,
site: this.kafkaConfig.site,
compressionType: this.kafkaConfig.compressionType,
requiredAcks: this.kafkaConfig.requiredAcks,
},
topic: this.mConfig.topic,
groupId: `${this.mConfig.groupIdPrefix}-${this._id}`,
Expand Down
2 changes: 2 additions & 0 deletions lib/MetricsProducer.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ class MetricsProducer {
const producer = new BackbeatProducer({
kafka: { hosts: this._kafkaConfig.hosts },
maxRequestSize: this._kafkaConfig.maxRequestSize,
compressionType: this._kafkaConfig.compressionType,
requiredAcks: this._kafkaConfig.requiredAcks,
topic: this._topic,
});
producer.once('error', done);
Expand Down
2 changes: 2 additions & 0 deletions lib/api/BackbeatAPI.js
Original file line number Diff line number Diff line change
Expand Up @@ -1290,6 +1290,8 @@ class BackbeatAPI {
const producer = new BackbeatProducer({
kafka: { hosts: this._kafkaConfig.hosts },
maxRequestSize: this._kafkaConfig.maxRequestSize,
compressionType: this._kafkaConfig.compressionType,
requiredAcks: this._kafkaConfig.requiredAcks,
topic,
});

Expand Down
2 changes: 1 addition & 1 deletion lib/config.joi.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ const {

const KAFKA_PRODUCER_MESSAGE_MAX_BYTES = 5000020;
const KAFKA_PRODUCER_DEFAULT_COMPRESSION_TYPE = 'Zstd';
const KAFKA_PRODUCER_DEFAULT_REQUIRED_ACKS = 'all';
const KAFKA_PRODUCER_DEFAULT_REQUIRED_ACKS = -1; // all brokers
const logSourcesJoi = joi.string().valid('bucketd', 'mongo', 'ingestion',
'dmd', 'kafka');

Expand Down
2 changes: 2 additions & 0 deletions lib/queuePopulator/IngestionPopulator.js
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,8 @@ class IngestionPopulator {
const producer = new BackbeatProducer({
kafka: { hosts: this.kafkaConfig.hosts },
maxRequestSize: this.kafkaConfig.maxRequestSize,
compressionType: this.kafkaConfig.compressionType,
requiredAcks: this.kafkaConfig.requiredAcks,
topic,
pollIntervalMs: POLL_INTERVAL_MS,
});
Expand Down
2 changes: 2 additions & 0 deletions lib/queuePopulator/LogReader.js
Original file line number Diff line number Diff line change
Expand Up @@ -592,6 +592,8 @@ class LogReader {
const producer = new BackbeatProducer({
kafka: { hosts: this.kafkaConfig.hosts },
maxRequestSize: this.kafkaConfig.maxRequestSize,
compressionType: this.kafkaConfig.compressionType,
requiredAcks: this.kafkaConfig.requiredAcks,
topic,
});
producer.once('error', done);
Expand Down

0 comments on commit a207d8e

Please sign in to comment.