diff --git a/pom.xml b/pom.xml
index c9506bb2..4d3f31f0 100644
--- a/pom.xml
+++ b/pom.xml
@@ -6,7 +6,7 @@
4.0.0
com.pinterest.singer
singer-package
- 0.8.0.26
+ 0.8.0.27
pom
Singer Logging Agent modules
2013
diff --git a/singer-commons/pom.xml b/singer-commons/pom.xml
index 9e497eed..6fb36da1 100644
--- a/singer-commons/pom.xml
+++ b/singer-commons/pom.xml
@@ -20,7 +20,7 @@
com.pinterest.singer
singer-package
- 0.8.0.26
+ 0.8.0.27
../pom.xml
diff --git a/singer-commons/src/main/java/com/pinterest/singer/loggingaudit/client/AuditEventKafkaSender.java b/singer-commons/src/main/java/com/pinterest/singer/loggingaudit/client/AuditEventKafkaSender.java
index aee3af19..f97d00fa 100644
--- a/singer-commons/src/main/java/com/pinterest/singer/loggingaudit/client/AuditEventKafkaSender.java
+++ b/singer-commons/src/main/java/com/pinterest/singer/loggingaudit/client/AuditEventKafkaSender.java
@@ -141,7 +141,7 @@ public class AuditEventKafkaSender implements LoggingAuditEventSender {
private Thread thread;
/**
- * For each partition, track the number of sending failures happened to this partition.
+ * List of PartitionInfo
*/
private List partitionInfoList = new ArrayList<>();
@@ -165,6 +165,14 @@ public class AuditEventKafkaSender implements LoggingAuditEventSender {
private Map eventTriedCount = new ConcurrentHashMap<>();
+ /**
+ * currentPartitionId specifies the partition of audit_event topic used to receive audit events.
+ * The currentPartitionId will be reset in resetCurrentPartitionIdIfNeeded() method. This reduces
+ * the number of TCP connections from audit client to the Kafka Cluster hosting the audit_event
+ * topic.
+ */
+ private int currentPartitionId = -1;
+
public AuditEventKafkaSender(KafkaSenderConfig config,
LinkedBlockingDeque queue,
LoggingAuditStage stage, String host, String name) {
@@ -174,6 +182,7 @@ public AuditEventKafkaSender(KafkaSenderConfig config,
this.host = host;
this.name = name;
this.stopGracePeriodInSeconds = config.getStopGracePeriodInSeconds();
+ this.badPartitions.add(-1);
}
@@ -185,24 +194,12 @@ public void setKafkaProducer(KafkaProducer kafkaProducer) {
this.kafkaProducer = kafkaProducer;
}
- public int getAlternatePartition(int numOfPartitions) {
- int randomPartition = 0;
- int trial = 0;
- while (trial < MAX_RETRIES_FOR_SELECTION_RANDOM_PARTITION) {
- trial += 1;
- randomPartition = ThreadLocalRandom.current().nextInt(numOfPartitions);
- if (!badPartitions.contains(randomPartition)) {
- break;
- }
- }
- return randomPartition;
- }
-
private void refreshPartitionIfNeeded() {
// refresh every 30 seconds
if (System.currentTimeMillis() - lastTimeUpdate > 1000 * PARTITIONS_REFRESH_INTERVAL_IN_SECONDS) {
try {
badPartitions.clear();
+ badPartitions.add(-1);
partitionInfoList = this.kafkaProducer.partitionsFor(topic);
lastTimeUpdate = System.currentTimeMillis();
OpenTsdbMetricConverter.incr(
@@ -214,6 +211,36 @@ private void refreshPartitionIfNeeded() {
"host=" + host, "stage=" + stage.toString());
}
}
+ resetCurrentPartitionIdIfNeeded();
+ }
+
+ private void resetCurrentPartitionIdIfNeeded() {
+ if (partitionInfoList.size() == 0) {
+ currentPartitionId = -1;
+ return;
+ }
+ if (badPartitions.contains(currentPartitionId)){
+ int trial = 0;
+ while (trial < MAX_RETRIES_FOR_SELECTION_RANDOM_PARTITION) {
+ trial += 1;
+ int index = ThreadLocalRandom.current().nextInt(partitionInfoList.size());
+ int randomPartition = partitionInfoList.get(index).partition();
+ if (!badPartitions.contains(randomPartition)) {
+ LOG.warn("Change current partition of audit event topic from {} to {}", currentPartitionId,
+ randomPartition);
+ currentPartitionId = randomPartition;
+ OpenTsdbMetricConverter.incr(
+ LoggingAuditClientMetrics.AUDIT_CLIENT_SENDER_KAFKA_CURRENT_PARTITION_RESET, 1,
+ "host=" + host, "stage=" + stage.toString());
+ return;
+ }
+ }
+ currentPartitionId = partitionInfoList.get(ThreadLocalRandom.current().nextInt(
+ partitionInfoList.size())).partition();
+ LOG.warn("After {} trials, set current partition to {}",
+ MAX_RETRIES_FOR_SELECTION_RANDOM_PARTITION, currentPartitionId);
+
+ }
}
/**
@@ -236,9 +263,8 @@ public void run() {
if (event != null) {
try {
value = serializer.serialize(event);
- int partition = getAlternatePartition(partitionInfoList.size());
- record = new ProducerRecord<>(this.topic, partition , null, value);
- kafkaProducer.send(record, new KafkaProducerCallback(event, partition));
+ record = new ProducerRecord<>(this.topic, currentPartitionId , null, value);
+ kafkaProducer.send(record, new KafkaProducerCallback(event, currentPartitionId));
} catch (TException e) {
LOG.debug("[{}] failed to construct ProducerRecord because of serialization exception.",
Thread.currentThread().getName(), e);
diff --git a/singer-commons/src/main/java/com/pinterest/singer/loggingaudit/client/common/LoggingAuditClientMetrics.java b/singer-commons/src/main/java/com/pinterest/singer/loggingaudit/client/common/LoggingAuditClientMetrics.java
index 4b0cfdf7..5351a024 100644
--- a/singer-commons/src/main/java/com/pinterest/singer/loggingaudit/client/common/LoggingAuditClientMetrics.java
+++ b/singer-commons/src/main/java/com/pinterest/singer/loggingaudit/client/common/LoggingAuditClientMetrics.java
@@ -42,6 +42,7 @@ public class LoggingAuditClientMetrics {
public static final String AUDIT_CLIENT_SENDER_KAFKA_PARTITIONS_REFRESH_ERROR = "audit.client.sender.kafka.partitions_refresh_error";
public static final String AUDIT_CLIENT_SENDER_KAFKA_PARTITIONS_REFRESH_COUNT = "audit.client.sender.kafka.partitions_refresh_count";
+ public static final String AUDIT_CLIENT_SENDER_KAFKA_CURRENT_PARTITION_RESET = "audit.client.sender.kafka.current_partition_reset";
public static final String AUDIT_CLIENT_SENDER_KAFKA_CALLBACK_EXCEPTION = "audit.client.sender.kafka.callback_exception";
}
diff --git a/singer/pom.xml b/singer/pom.xml
index d5539c63..23970af3 100644
--- a/singer/pom.xml
+++ b/singer/pom.xml
@@ -7,7 +7,7 @@
com.pinterest.singer
singer-package
- 0.8.0.26
+ 0.8.0.27
../pom.xml
diff --git a/thrift-logger/pom.xml b/thrift-logger/pom.xml
index efdd501c..41d24f7b 100644
--- a/thrift-logger/pom.xml
+++ b/thrift-logger/pom.xml
@@ -4,7 +4,7 @@
com.pinterest.singer
singer-package
- 0.8.0.26
+ 0.8.0.27
../pom.xml
thrift-logger