From f0f885c5aa64c157a9754a57a63ccaa8e328a0b7 Mon Sep 17 00:00:00 2001 From: HengZhang Date: Thu, 20 Aug 2020 00:50:46 -0700 Subject: [PATCH] Instead of choosing a random partition, AuditEventKafkaSender uses a fixed currentPartitionId of the audit_event topic to send audit events. This reduces the number of TCP connections from the audit client to the Kafka cluster hosting the audit_event topic. currentPartitionId is updated if it is in the badPartitions set. --- pom.xml | 2 +- singer-commons/pom.xml | 2 +- .../client/AuditEventKafkaSender.java | 58 +++++++++++++------ .../common/LoggingAuditClientMetrics.java | 1 + singer/pom.xml | 2 +- thrift-logger/pom.xml | 2 +- 6 files changed, 46 insertions(+), 21 deletions(-) diff --git a/pom.xml b/pom.xml index c9506bb2..4d3f31f0 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ 4.0.0 com.pinterest.singer singer-package - 0.8.0.26 + 0.8.0.27 pom Singer Logging Agent modules 2013 diff --git a/singer-commons/pom.xml b/singer-commons/pom.xml index 9e497eed..6fb36da1 100644 --- a/singer-commons/pom.xml +++ b/singer-commons/pom.xml @@ -20,7 +20,7 @@ com.pinterest.singer singer-package - 0.8.0.26 + 0.8.0.27 ../pom.xml diff --git a/singer-commons/src/main/java/com/pinterest/singer/loggingaudit/client/AuditEventKafkaSender.java b/singer-commons/src/main/java/com/pinterest/singer/loggingaudit/client/AuditEventKafkaSender.java index aee3af19..a92a6d44 100644 --- a/singer-commons/src/main/java/com/pinterest/singer/loggingaudit/client/AuditEventKafkaSender.java +++ b/singer-commons/src/main/java/com/pinterest/singer/loggingaudit/client/AuditEventKafkaSender.java @@ -141,7 +141,7 @@ public class AuditEventKafkaSender implements LoggingAuditEventSender { private Thread thread; /** - * For each partition, track the number of sending failures happened to this partition. 
+ * List of PartitionInfo */ private List partitionInfoList = new ArrayList<>(); @@ -165,6 +165,14 @@ public class AuditEventKafkaSender implements LoggingAuditEventSender { private Map eventTriedCount = new ConcurrentHashMap<>(); + /** + * currentPartitionId specifies the partition of audit_event topic used to receive audit events. + * The currentPartitionId will be reset in resetCurrentPartitionIdIfNeeded() method. This reduces + * the number of TCP connections from audit client to the Kafka Cluster hosting the audit_event + * topic. + */ + private int currentPartitionId = -1; + public AuditEventKafkaSender(KafkaSenderConfig config, LinkedBlockingDeque queue, LoggingAuditStage stage, String host, String name) { @@ -174,6 +182,7 @@ public AuditEventKafkaSender(KafkaSenderConfig config, this.host = host; this.name = name; this.stopGracePeriodInSeconds = config.getStopGracePeriodInSeconds(); + this.badPartitions.add(-1); } @@ -185,24 +194,12 @@ public void setKafkaProducer(KafkaProducer kafkaProducer) { this.kafkaProducer = kafkaProducer; } - public int getAlternatePartition(int numOfPartitions) { - int randomPartition = 0; - int trial = 0; - while (trial < MAX_RETRIES_FOR_SELECTION_RANDOM_PARTITION) { - trial += 1; - randomPartition = ThreadLocalRandom.current().nextInt(numOfPartitions); - if (!badPartitions.contains(randomPartition)) { - break; - } - } - return randomPartition; - } - private void refreshPartitionIfNeeded() { // refresh every 30 seconds if (System.currentTimeMillis() - lastTimeUpdate > 1000 * PARTITIONS_REFRESH_INTERVAL_IN_SECONDS) { try { badPartitions.clear(); + badPartitions.add(-1); partitionInfoList = this.kafkaProducer.partitionsFor(topic); lastTimeUpdate = System.currentTimeMillis(); OpenTsdbMetricConverter.incr( @@ -216,6 +213,33 @@ private void refreshPartitionIfNeeded() { } } + private void resetCurrentPartitionIdIfNeeded() { + if (partitionInfoList.size() == 0) { + currentPartitionId = -1; + return; + } + if 
(badPartitions.contains(currentPartitionId)){ + int trial = 0; + while (trial < MAX_RETRIES_FOR_SELECTION_RANDOM_PARTITION) { + trial += 1; + int index = ThreadLocalRandom.current().nextInt(partitionInfoList.size()); + int randomPartition = partitionInfoList.get(index).partition(); + if (!badPartitions.contains(randomPartition)) { + LOG.warn("Change current partition of audit event topic from {} to {}", currentPartitionId, + randomPartition); + currentPartitionId = randomPartition; + OpenTsdbMetricConverter.incr( + LoggingAuditClientMetrics.AUDIT_CLIENT_SENDER_KAFKA_DEFAULT_PARTITION_RESET, 1, + "host=" + host, "stage=" + stage.toString()); + return; + } + } + } + if (currentPartitionId == -1) { + currentPartitionId = 0; + } + } + /** * Sender dequeues LoggingAuditEvents and sends them to Kafka cluster. If send to one partition * fails, it will choose another partition. For each event, it will try at most @@ -232,13 +256,13 @@ public void run() { while (!cancelled.get()) { try { refreshPartitionIfNeeded(); + resetCurrentPartitionIdIfNeeded(); event = queue.poll(DEQUEUE_WAIT_IN_SECONDS, TimeUnit.SECONDS); if (event != null) { try { value = serializer.serialize(event); - int partition = getAlternatePartition(partitionInfoList.size()); - record = new ProducerRecord<>(this.topic, partition , null, value); - kafkaProducer.send(record, new KafkaProducerCallback(event, partition)); + record = new ProducerRecord<>(this.topic, currentPartitionId , null, value); + kafkaProducer.send(record, new KafkaProducerCallback(event, currentPartitionId)); } catch (TException e) { LOG.debug("[{}] failed to construct ProducerRecord because of serialization exception.", Thread.currentThread().getName(), e); diff --git a/singer-commons/src/main/java/com/pinterest/singer/loggingaudit/client/common/LoggingAuditClientMetrics.java b/singer-commons/src/main/java/com/pinterest/singer/loggingaudit/client/common/LoggingAuditClientMetrics.java index 4b0cfdf7..d916ce45 100644 --- 
a/singer-commons/src/main/java/com/pinterest/singer/loggingaudit/client/common/LoggingAuditClientMetrics.java +++ b/singer-commons/src/main/java/com/pinterest/singer/loggingaudit/client/common/LoggingAuditClientMetrics.java @@ -42,6 +42,7 @@ public class LoggingAuditClientMetrics { public static final String AUDIT_CLIENT_SENDER_KAFKA_PARTITIONS_REFRESH_ERROR = "audit.client.sender.kafka.partitions_refresh_error"; public static final String AUDIT_CLIENT_SENDER_KAFKA_PARTITIONS_REFRESH_COUNT = "audit.client.sender.kafka.partitions_refresh_count"; + public static final String AUDIT_CLIENT_SENDER_KAFKA_DEFAULT_PARTITION_RESET = "audit.client.sender.kafka.default_partition_reset"; public static final String AUDIT_CLIENT_SENDER_KAFKA_CALLBACK_EXCEPTION = "audit.client.sender.kafka.callback_exception"; } diff --git a/singer/pom.xml b/singer/pom.xml index d5539c63..23970af3 100644 --- a/singer/pom.xml +++ b/singer/pom.xml @@ -7,7 +7,7 @@ com.pinterest.singer singer-package - 0.8.0.26 + 0.8.0.27 ../pom.xml diff --git a/thrift-logger/pom.xml b/thrift-logger/pom.xml index efdd501c..41d24f7b 100644 --- a/thrift-logger/pom.xml +++ b/thrift-logger/pom.xml @@ -4,7 +4,7 @@ com.pinterest.singer singer-package - 0.8.0.26 + 0.8.0.27 ../pom.xml thrift-logger