properly pass compressionType and requiredAcks to BackbeatProducer #2573

Merged · 7 commits · Nov 21, 2024
Changes from all commits
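In short: the PR threads the existing `compressionType` and `requiredAcks` Kafka settings through every `BackbeatProducer` and `BackbeatConsumer` construction site that previously dropped them, adds the two options to the notification destination schema, removes the notification producer's hardcoded `getRequireAcks()` override, and deletes the per-service Kafka lag gauges (`s3_notification_queue_lag`, `s3_replication_queue_lag`, `s3_replication_status_queue_lag`) together with the `logConsumerMetricsIntervalS` plumbing that fed them. A minimal sketch of the `kafkaConfig` object these call sites read from — the field names all appear in the diff below, but the enclosing layout and example values are assumptions:

    // Sketch only: field names are taken from the call sites in this diff;
    // the surrounding structure and values are assumed for illustration.
    const kafkaConfig = {
        hosts: 'kafka-1:9092,kafka-2:9092',
        site: 'local',
        maxRequestSize: 5000020,        // read by the producer call sites
        backlogMetrics: { /* ... */ },  // read by some consumer call sites
        compressionType: 'snappy',      // now forwarded instead of dropped
        requiredAcks: -1,               // now forwarded instead of dropped
    };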
extensions/gc/GarbageCollector.js (2 additions, 0 deletions)

@@ -132,6 +132,8 @@ class GarbageCollector extends EventEmitter {
             kafka: {
                 hosts: this._kafkaConfig.hosts,
                 site: this._kafkaConfig.site,
+                compressionType: this._kafkaConfig.compressionType,
+                requiredAcks: this._kafkaConfig.requiredAcks,
             },
             topic: this._gcConfig.topic,
             groupId: this._gcConfig.consumer.groupId,
extensions/gc/GarbageCollectorProducer.js (2 additions, 0 deletions)

@@ -26,6 +26,8 @@ class GarbageCollectorProducer {
         const producer = new BackbeatProducer({
             kafka: { hosts: this._kafkaConfig.hosts },
             maxRequestSize: this._kafkaConfig.maxRequestSize,
+            compressionType: this._kafkaConfig.compressionType,
+            requiredAcks: this._kafkaConfig.requiredAcks,
             topic: this._topic,
         });
         producer.once('error', () => {});
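The two options are added to every producer call site in the same way, so one example stands for all of them. `BackbeatProducer` itself is not part of this diff; assuming it wraps node-rdkafka, the options would correspond to these standard librdkafka producer properties:

    // Hypothetical mapping — BackbeatProducer internals are not shown in
    // this diff; these are the standard librdkafka property names.
    const kafka = require('node-rdkafka');

    const producer = new kafka.Producer({
        'metadata.broker.list': 'localhost:9092',
        'compression.codec': 'snappy',  // <- compressionType ('none', 'gzip', 'snappy', 'lz4', 'zstd')
        'request.required.acks': -1,    // <- requiredAcks (-1 = wait for all in-sync replicas)
    });
    producer.connect();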
extensions/lifecycle/LifecycleQueuePopulator.js (2 additions, 0 deletions)

@@ -62,6 +62,8 @@ class LifecycleQueuePopulator extends QueuePopulatorExtension {
         const producer = new BackbeatProducer({
             kafka: { hosts: this.kafkaConfig.hosts },
             maxRequestSize: this.kafkaConfig.maxRequestSize,
+            compressionType: this.kafkaConfig.compressionType,
+            requiredAcks: this.kafkaConfig.requiredAcks,
             topic,
         });
         producer.once('error', done);
@@ -380,6 +380,8 @@ class LifecycleBucketProcessor {
         const producer = new BackbeatProducer({
             kafka: { hosts: this._kafkaConfig.hosts },
             maxRequestSize: this._kafkaConfig.maxRequestSize,
+            compressionType: this._kafkaConfig.compressionType,
+            requiredAcks: this._kafkaConfig.requiredAcks,
             topic: this._lcConfig.objectTasksTopic,
         });
         producer.once('error', err => {

@@ -418,6 +420,8 @@ class LifecycleBucketProcessor {
                 hosts: this._kafkaConfig.hosts,
                 site: this._kafkaConfig.site,
                 backlogMetrics: this._kafkaConfig.backlogMetrics,
+                compressionType: this._kafkaConfig.compressionType,
+                requiredAcks: this._kafkaConfig.requiredAcks,
             },
             topic: this._lcConfig.bucketTasksTopic,
             groupId: this._lcConfig.bucketProcessor.groupId,
extensions/lifecycle/conductor/LifecycleConductor.js (2 additions, 0 deletions)

@@ -876,6 +876,8 @@ class LifecycleConductor {
         const producer = new BackbeatProducer({
             kafka: { hosts: this.kafkaConfig.hosts },
             maxRequestSize: this.kafkaConfig.maxRequestSize,
+            compressionType: this.kafkaConfig.compressionType,
+            requiredAcks: this.kafkaConfig.requiredAcks,
             topic: this.lcConfig.bucketTasksTopic,
         });
         producer.once('error', cb);
@@ -78,6 +78,8 @@ class LifecycleObjectProcessor extends EventEmitter {
                 hosts: this._kafkaConfig.hosts,
                 site: this._kafkaConfig.site,
                 backlogMetrics: this._kafkaConfig.backlogMetrics,
+                compressionType: this._kafkaConfig.compressionType,
+                requiredAcks: this._kafkaConfig.requiredAcks,
             },
             topic,
             groupId: this._processConfig.groupId,
@@ -75,6 +75,8 @@ class LifecycleObjectTransitionProcessor extends LifecycleObjectProcessor {
         const producer = new BackbeatProducer({
             kafka: { hosts: this._kafkaConfig.hosts },
             maxRequestSize: this._kafkaConfig.maxRequestSize,
+            compressionType: this._kafkaConfig.compressionType,
+            requiredAcks: this._kafkaConfig.requiredAcks,
         });
         producer.once('error', cb);
         producer.once('ready', () => {

@@ -121,6 +123,8 @@ class LifecycleObjectTransitionProcessor extends LifecycleObjectProcessor {
                 hosts: this._kafkaConfig.hosts,
                 site: this._kafkaConfig.site,
                 backlogMetrics: this._kafkaConfig.backlogMetrics,
+                compressionType: this._kafkaConfig.compressionType,
+                requiredAcks: this._kafkaConfig.requiredAcks,
             },
             topic,
             groupId: this._processConfig.groupId,
extensions/mongoProcessor/MongoQueueProcessor.js (2 additions, 0 deletions)

@@ -136,6 +136,8 @@ class MongoQueueProcessor {
             kafka: {
                 hosts: this.kafkaConfig.hosts,
                 site: this.kafkaConfig.site,
+                compressionType: this.kafkaConfig.compressionType,
+                requiredAcks: this.kafkaConfig.requiredAcks,
             },
             queueProcessor: this.processKafkaEntry.bind(this),
             circuitBreaker: this.mongoProcessorConfig.circuitBreaker,
extensions/notification/NotificationConfigValidator.js (10 additions, 0 deletions)

@@ -22,6 +22,16 @@ const destinationSchema = joi.object({
     internalTopic: joi.string(),
     topic: joi.string().required(),
     auth: authSchema.default({}),
+    requiredAcks: joi.number().when('type', {
+        is: joi.string().not('kafka'),
+        then: joi.forbidden(),
+        otherwise: joi.number().default(1),
+    }),
+    compressionType: joi.string().when('type', {
+        is: joi.string().not('kafka'),
+        then: joi.forbidden(),
+        otherwise: joi.string().default('none'),
+    }),
 });

 const joiSchema = joi.object({
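The `when('type', ...)` conditionals make the two options valid only for Kafka destinations, with defaults matching the old behavior (`requiredAcks: 1`, `compressionType: 'none'`). A reduced, runnable sketch of the same pattern — the real `destinationSchema` has more fields than shown here:

    const joi = require('joi');

    // Reduced version of the conditional above, assuming a 'type' field
    // elsewhere in destinationSchema distinguishes destination kinds.
    const schema = joi.object({
        type: joi.string().required(),
        requiredAcks: joi.number().when('type', {
            is: joi.string().not('kafka'), // any non-kafka destination type
            then: joi.forbidden(),         // option is Kafka-specific
            otherwise: joi.number().default(1),
        }),
    });

    schema.validate({ type: 'kafka' });
    // => value: { type: 'kafka', requiredAcks: 1 }  (default applied)
    schema.validate({ type: 'webhook', requiredAcks: 0 });
    // => error: "requiredAcks" is not allowed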
@@ -46,7 +46,7 @@ class KafkaNotificationDestination extends NotificationDestination {
     }

     _setupProducer(done) {
-        const { host, port, topic, pollIntervalMs, auth } = this._destinationConfig;
+        const { host, port, topic, pollIntervalMs, auth, requiredAcks, compressionType } = this._destinationConfig;
         let kafkaHost = host;
         if (port) {
             kafkaHost = `${host}:${port}`;

@@ -56,6 +56,8 @@ class KafkaNotificationDestination extends NotificationDestination {
             topic,
             pollIntervalMs,
             auth,
+            compressionType,
+            requiredAcks,
         });
         producer.once('error', done);
         producer.once('ready', () => {
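With the schema change above and this destructure, each Kafka destination can now tune both options individually. A plausible destination entry — `resource` and `type` appear elsewhere in this diff, but the exact config layout and values are illustrative:

    // Illustrative destination entry; exact config layout may differ.
    {
        resource: 'destination1',  // matched against destinationId elsewhere
        type: 'kafka',
        host: 'kafka.internal',
        port: 9092,
        topic: 'bucket-notifications',
        compressionType: 'gzip',   // forwarded to the producer above
        requiredAcks: -1,          // no longer pinned to 1
    }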
extensions/notification/destination/KafkaProducer.js (0 additions, 4 deletions)

@@ -17,10 +17,6 @@ class KafkaProducer extends BackbeatProducer {
         return 'NotificationProducer';
     }

-    getRequireAcks() {
-        return 1;
-    }
-
     setFromConfig(joiResult) {
         super.setFromConfig(joiResult);
         this._auth = joiResult.auth ? authUtil.generateKafkaAuthObject(joiResult.auth) : {};
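This removal is the key behavioral fix on the notification path: the override pinned acks to 1 regardless of configuration, so the new `requiredAcks` option would never have taken effect. With it gone, the base-class value applies. A hypothetical sketch of the inheritance at play — `BackbeatProducer` is not shown in this diff, so the accessor below is an assumption about its shape:

    // Hypothetical — illustrates why deleting the override restores
    // configurability; BackbeatProducer's real implementation is not shown.
    class BackbeatProducer {
        constructor(config) {
            this._requiredAcks = config.requiredAcks ?? 1; // assumed default
        }
        getRequireAcks() {
            return this._requiredAcks; // honors the configured value
        }
    }

    class KafkaProducer extends BackbeatProducer {
        // no getRequireAcks() override anymore -> base class value wins
    }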
extensions/notification/queueProcessor/QueueProcessor.js (2 additions, 35 deletions)

@@ -5,15 +5,13 @@
 const Logger = require('werelogs').Logger;
 const async = require('async');
 const assert = require('assert');
 const { ZenkoMetrics } = require('arsenal').metrics;
-const { wrapGaugeSet } = require('../../../lib/util/metrics');
 const errors = require('arsenal').errors;

 const BackbeatConsumer = require('../../../lib/BackbeatConsumer');
 const NotificationDestination = require('../destination');
 const configUtil = require('../utils/config');
 const messageUtil = require('../utils/message');
 const NotificationConfigManager = require('../NotificationConfigManager');
-const { notificationQueueProcessor } = require('../../../lib/constants').services;

 const processedEvents = ZenkoMetrics.createCounter({
     name: 's3_notification_queue_processor_events_total',

@@ -28,25 +26,6 @@ function onQueueProcessorEventProcessed(destination, eventType) {
     });
 }

-const kafkaLagMetric = ZenkoMetrics.createGauge({
-    name: 's3_notification_queue_lag',
-    help: 'Number of update entries waiting to be consumed from the Kafka topic',
-    labelNames: ['origin', 'containerName', 'partition', 'serviceName'],
-});
-
-const defaultLabels = {
-    origin: 'notification',
-};
-
-/**
- * Contains methods to incrememt different metrics
- * @typedef {Object} MetricsHandler
- * @property {GaugeSet} lag - kafka lag metric
- */
-const metricsHandler = {
-    lag: wrapGaugeSet(kafkaLagMetric, defaultLabels),
-};
-
 class QueueProcessor extends EventEmitter {
     /**
      * Create a queue processor object to activate notification from a

@@ -94,7 +73,6 @@ class QueueProcessor extends EventEmitter {
         this.kafkaConfig = kafkaConfig;
         this.notifConfig = notifConfig;
         this.destinationId = destinationId;
-        this.serviceName = notificationQueueProcessor;
         this.destinationConfig
             = notifConfig.destinations.find(dest => dest.resource === destinationId);
         assert(this.destinationConfig, `Invalid destination argument "${destinationId}".` +

@@ -183,6 +161,8 @@ class QueueProcessor extends EventEmitter {
             kafka: {
                 hosts: this.kafkaConfig.hosts,
                 site: this.kafkaConfig.site,
+                compressionType: this.kafkaConfig.compressionType,
+                requiredAcks: this.kafkaConfig.requiredAcks,
             },
             topic: internalTopic,
             groupId: consumerGroupId,

@@ -352,19 +332,6 @@ class QueueProcessor extends EventEmitter {
      */
     async handleMetrics(res, log) {
         log.debug('metrics requested');
-
-        if (this.repConfig.queueProcessor.logConsumerMetricsIntervalS && this._consumer) {
-            // consumer stats lag is on a different update cycle so we need to
-            // update the metrics when requested
-            const lagStats = this._consumer.consumerStats.lag;
-            Object.keys(lagStats).forEach(partition => {
-                metricsHandler.lag({
-                    partition,
-                    serviceName: this.serviceName,
-                }, lagStats[partition]);
-            });
-        }
-
         res.writeHead(200, {
             'Content-Type': ZenkoMetrics.asPrometheusContentType(),
         });
extensions/replication/failedCRR/FailedCRRConsumer.js (2 additions, 0 deletions)

@@ -40,6 +40,8 @@ class FailedCRRConsumer {
             kafka: {
                 hosts: this._kafkaConfig.hosts,
                 site: this._kafkaConfig.site,
+                compressionType: this._kafkaConfig.compressionType,
+                requiredAcks: this._kafkaConfig.requiredAcks,
             },
             topic: this._topic,
             groupId: 'backbeat-retry-group',
extensions/replication/failedCRR/FailedCRRProducer.js (2 additions, 0 deletions)

@@ -28,6 +28,8 @@ class FailedCRRProducer {
         this._producer = new BackbeatProducer({
             kafka: { hosts: this._kafkaConfig.hosts },
             maxRequestSize: this._kafkaConfig.maxRequestSize,
+            compressionType: this._kafkaConfig.compressionType,
+            requiredAcks: this._kafkaConfig.requiredAcks,
             topic: this._topic,
         });
         this._producer.once('error', cb);
extensions/replication/queueProcessor/QueueProcessor.js (5 additions, 22 deletions)

@@ -30,7 +30,7 @@
 const BucketQueueEntry = require('../../../lib/models/BucketQueueEntry');
 const ActionQueueEntry = require('../../../lib/models/ActionQueueEntry');
 const MetricsProducer = require('../../../lib/MetricsProducer');
 const libConstants = require('../../../lib/constants');
-const { wrapCounterInc, wrapHistogramObserve, wrapGaugeSet } = require('../../../lib/util/metrics');
+const { wrapCounterInc, wrapHistogramObserve } = require('../../../lib/util/metrics');
 const { http: HttpAgent, https: HttpsAgent } = require('httpagent');

 const {

@@ -79,12 +79,6 @@ const metadataReplicationBytesMetric = ZenkoMetrics.createCounter({
     labelNames: ['origin', 'serviceName', 'location'],
 });

-const kafkaLagMetric = ZenkoMetrics.createGauge({
-    name: 's3_replication_queue_lag',
-    help: 'Number of update entries waiting to be consumed from the Kafka topic',
-    labelNames: ['origin', 'containerName', 'partition', 'serviceName'],
-});
-
 const sourceDataBytesMetric = ZenkoMetrics.createCounter({
     name: 's3_replication_source_data_bytes_total',
     help: 'Total number of data bytes read from replication source',

@@ -129,7 +123,6 @@ const defaultLabels = {
  * @property {CounterInc} dataReplicationBytes - Increments the replication bytes metric for data operation
  * @property {CounterInc} metadataReplicationBytes - Increments the replication bytes metric for metadata operation
  * @property {CounterInc} sourceDataBytes - Increments the source data bytes metric
- * @property {GaugeSet} lag - Kafka lag metric
  * @property {CounterInc} reads - Increments the read metric
  * @property {CounterInc} writes - Increments the write metric
  * @property {HistogramObserve} timeElapsed - Observes the time elapsed metric

@@ -140,7 +133,6 @@ const metricsHandler = {
     dataReplicationBytes: wrapCounterInc(dataReplicationBytesMetric, defaultLabels),
     metadataReplicationBytes: wrapCounterInc(metadataReplicationBytesMetric, defaultLabels),
     sourceDataBytes: wrapCounterInc(sourceDataBytesMetric, defaultLabels),
-    lag: wrapGaugeSet(kafkaLagMetric, defaultLabels),
     reads: wrapCounterInc(readMetric, defaultLabels),
     writes: wrapCounterInc(writeMetric, defaultLabels),
     timeElapsed: wrapHistogramObserve(timeElapsedMetric, defaultLabels),

@@ -380,6 +372,8 @@ class QueueProcessor extends EventEmitter {
         const producer = new BackbeatProducer({
             kafka: { hosts: this.kafkaConfig.hosts },
             maxRequestSize: this.kafkaConfig.maxRequestSize,
+            compressionType: this.kafkaConfig.compressionType,
+            requiredAcks: this.kafkaConfig.requiredAcks,
             topic: this.repConfig.replicationStatusTopic,
         });
         producer.once('error', done);

@@ -409,12 +403,13 @@ class QueueProcessor extends EventEmitter {
                 site: this.kafkaConfig.site,
                 backlogMetrics: options && options.enableBacklogMetrics ?
                     this.kafkaConfig.backlogMetrics : undefined,
+                compressionType: this.kafkaConfig.compressionType,
+                requiredAcks: this.kafkaConfig.requiredAcks,
             },
             topic,
             groupId,
             concurrency: this.repConfig.queueProcessor.concurrency,
             queueProcessor: queueProcessorFunc,
-            logConsumerMetricsIntervalS: this.repConfig.queueProcessor.logConsumerMetricsIntervalS,
             canary: true,
             circuitBreaker: this.circuitBreakerConfig,
             circuitBreakerMetrics: {

@@ -1029,18 +1024,6 @@ class QueueProcessor extends EventEmitter {
      */
     static async handleMetrics(res, log) {
         log.debug('metrics requested');
-
-        if (this.repConfig.queueProcessor.logConsumerMetricsIntervalS && this._consumer) {
-            // consumer stats lag is on a different update cycle so we need to
-            // update the metrics when requested
-            const lagStats = this._consumer.consumerStats.lag;
-            Object.keys(lagStats).forEach(partition => {
-                metricsHandler.lag({
-                    partition,
-                    serviceName: this.serviceName,
-                }, lagStats[partition]);
-            });
-        }
         const metrics = await ZenkoMetrics.asPrometheus();
         res.writeHead(200, {
             'Content-Type': ZenkoMetrics.asPrometheusContentType(),
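For reference, the standard Kafka acknowledgement levels a `requiredAcks` value selects, assuming the configured value is passed through to the broker unchanged:

    // Standard Kafka producer ack levels (not specific to this codebase):
    //    0 -> fire-and-forget; no broker acknowledgement
    //    1 -> leader has written the record (the old hardcoded default
    //         in the notification KafkaProducer above)
    //   -1 -> all in-sync replicas have written the record ("acks=all")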
extensions/replication/replay/ReplayProducer.js (2 additions, 0 deletions)

@@ -28,6 +28,8 @@ class ReplayProducer {
         this._producer = new BackbeatProducer({
             kafka: { hosts: this._kafkaConfig.hosts },
             maxRequestSize: this._kafkaConfig.maxRequestSize,
+            compressionType: this._kafkaConfig.compressionType,
+            requiredAcks: this._kafkaConfig.requiredAcks,
             topic: this._topic,
         });
         this._producer.once('error', cb);
@@ -18,14 +18,13 @@
 const FailedCRRProducer = require('../failedCRR/FailedCRRProducer');
 const ReplayProducer = require('../replay/ReplayProducer');
 const MetricsProducer = require('../../../lib/MetricsProducer');
 const { http: HttpAgent, https: HttpsAgent } = require('httpagent');
-const { replicationStatusProcessor } = require('../../../lib/constants').services;

 // StatsClient constant default for site metrics
 const INTERVAL = 300; // 5 minutes;
 const constants = require('../../../lib/constants');
 const {
     wrapCounterInc,
     wrapHistogramObserve,
-    wrapGaugeSet,
 } = require('../../../lib/util/metrics');

@@ -61,12 +60,6 @@ const loadMetricHandlers = jsutil.once(repConfig => {
         labelNames: ['origin', 'replicationStatus'],
     });

-    const kafkaLagMetric = ZenkoMetrics.createGauge({
-        name: 's3_replication_status_queue_lag',
-        help: 'Number of update entries waiting to be consumed from the Kafka topic',
-        labelNames: ['origin', 'containerName', 'partition', 'serviceName'],
-    });
-
     const replicationStatusDurationSeconds = ZenkoMetrics.createHistogram({
         name: 's3_replication_status_process_duration_seconds',
         help: 'Duration of replication status processing',

@@ -145,7 +138,6 @@ const loadMetricHandlers = jsutil.once(repConfig => {
     };
     return {
         status: wrapCounterInc(replicationStatusMetric, defaultLabels),
-        lag: wrapGaugeSet(kafkaLagMetric, defaultLabels),
         statusDuration: wrapHistogramObserve(replicationStatusDurationSeconds,
             defaultLabels),
         replicationLatency: wrapHistogramObserve(replicationLatency,

@@ -225,7 +217,6 @@ class ReplicationStatusProcessor {
         this._consumer = null;
         this._gcProducer = null;
         this._mProducer = null;
-        this.serviceName = replicationStatusProcessor;

         this.logger =
             new Logger('Backbeat:Replication:ReplicationStatusProcessor');

@@ -399,6 +390,8 @@ class ReplicationStatusProcessor {
             kafka: {
                 hosts: this.kafkaConfig.hosts,
                 site: this.kafkaConfig.site,
+                compressionType: this.kafkaConfig.compressionType,
+                requiredAcks: this.kafkaConfig.requiredAcks,
             },
             topic: this.repConfig.replicationStatusTopic,
             groupId: this.repConfig.replicationStatusProcessor.groupId,

@@ -561,19 +554,6 @@ class ReplicationStatusProcessor {
     async handleMetrics(res, log) {
         log.debug('metrics requested');
         const metrics = await ZenkoMetrics.asPrometheus();
-
-        if (this.repConfig.queueProcessor.logConsumerMetricsIntervalS && this._consumer) {
-            // consumer stats lag is on a different update cycle so we need to
-            // update the metrics when requested
-            const lagStats = this._consumer.consumerStats.lag;
-            Object.keys(lagStats).forEach(partition => {
-                this.metricHandlers.lag({
-                    partition,
-                    serviceName: this.serviceName,
-                }, lagStats[partition]);
-            });
-        }
-
         res.writeHead(200, {
             'Content-Type': ZenkoMetrics.asPrometheusContentType(),
         });