Instead of choosing a random partition, AuditEventKafkaSender uses a fixed currentPartitionId of the audit_event topic to send audit events.

This reduces the number of TCP connections from the audit client to the Kafka cluster hosting the audit_event topic.
currentPartitionId is updated if it is in the badPartitions set.
zzhhhzz committed Aug 20, 2020
1 parent 7e0646a commit 9191193
Showing 6 changed files with 48 additions and 21 deletions.
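
As a rough illustration of the connection argument in the commit message: a KafkaProducer only opens data connections to the leader brokers of the partitions it actually writes to, so pinning every record to one partition needs a single leader connection (metadata requests aside) instead of one per partition hit by random selection. The sketch below is not Singer code; the class name, topic name, and bootstrap address are placeholders.

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;

public class FixedPartitionSenderSketch {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092"); // placeholder broker address
    props.put("key.serializer", ByteArraySerializer.class.getName());
    props.put("value.serializer", ByteArraySerializer.class.getName());

    int fixedPartition = 0; // plays the role of currentPartitionId
    try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props)) {
      byte[] payload = "audit-event".getBytes();
      // An explicit partition bypasses the partitioner, so every record goes to
      // the same leader broker until the partition is reselected.
      producer.send(new ProducerRecord<>("audit_event", fixedPartition, null, payload));
    }
  }
}
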
2 changes: 1 addition & 1 deletion pom.xml
@@ -6,7 +6,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>com.pinterest.singer</groupId>
<artifactId>singer-package</artifactId>
<version>0.8.0.26</version>
<version>0.8.0.27</version>
<packaging>pom</packaging>
<description>Singer Logging Agent modules</description>
<inceptionYear>2013</inceptionYear>
2 changes: 1 addition & 1 deletion singer-commons/pom.xml
@@ -20,7 +20,7 @@
<parent>
<groupId>com.pinterest.singer</groupId>
<artifactId>singer-package</artifactId>
<version>0.8.0.26</version>
<version>0.8.0.27</version>
<relativePath>../pom.xml</relativePath>
</parent>
<developers>
AuditEventKafkaSender.java
@@ -141,7 +141,7 @@ public class AuditEventKafkaSender implements LoggingAuditEventSender {
private Thread thread;

/**
* For each partition, track the number of sending failures happened to this partition.
* List of PartitionInfo
*/
private List<PartitionInfo> partitionInfoList = new ArrayList<>();

@@ -165,6 +165,14 @@ public class AuditEventKafkaSender implements LoggingAuditEventSender {

private Map<LoggingAuditHeaders, Integer> eventTriedCount = new ConcurrentHashMap<>();

/**
* currentPartitionId specifies the partition of audit_event topic used to receive audit events.
* The currentPartitionId will be reset in resetCurrentPartitionIdIfNeeded() method. This reduces
* the number of TCP connections from audit client to the Kafka Cluster hosting the audit_event
* topic.
*/
private int currentPartitionId = -1;

public AuditEventKafkaSender(KafkaSenderConfig config,
LinkedBlockingDeque<LoggingAuditEvent> queue,
LoggingAuditStage stage, String host, String name) {
@@ -174,6 +182,7 @@ public AuditEventKafkaSender(KafkaSenderConfig config,
this.host = host;
this.name = name;
this.stopGracePeriodInSeconds = config.getStopGracePeriodInSeconds();
this.badPartitions.add(-1);
}


@@ -185,24 +194,12 @@ public void setKafkaProducer(KafkaProducer<byte[], byte[]> kafkaProducer) {
this.kafkaProducer = kafkaProducer;
}

public int getAlternatePartition(int numOfPartitions) {
int randomPartition = 0;
int trial = 0;
while (trial < MAX_RETRIES_FOR_SELECTION_RANDOM_PARTITION) {
trial += 1;
randomPartition = ThreadLocalRandom.current().nextInt(numOfPartitions);
if (!badPartitions.contains(randomPartition)) {
break;
}
}
return randomPartition;
}

private void refreshPartitionIfNeeded() {
// refresh every 30 seconds
if (System.currentTimeMillis() - lastTimeUpdate > 1000 * PARTITIONS_REFRESH_INTERVAL_IN_SECONDS) {
try {
badPartitions.clear();
badPartitions.add(-1);
partitionInfoList = this.kafkaProducer.partitionsFor(topic);
lastTimeUpdate = System.currentTimeMillis();
OpenTsdbMetricConverter.incr(
@@ -214,6 +211,36 @@ private void refreshPartitionIfNeeded() {
"host=" + host, "stage=" + stage.toString());
}
}
resetCurrentPartitionIdIfNeeded();
}

private void resetCurrentPartitionIdIfNeeded() {
if (partitionInfoList.size() == 0) {
currentPartitionId = -1;
return;
}
if (badPartitions.contains(currentPartitionId)){
int trial = 0;
while (trial < MAX_RETRIES_FOR_SELECTION_RANDOM_PARTITION) {
trial += 1;
int index = ThreadLocalRandom.current().nextInt(partitionInfoList.size());
int randomPartition = partitionInfoList.get(index).partition();
if (!badPartitions.contains(randomPartition)) {
LOG.warn("Change current partition of audit event topic from {} to {}", currentPartitionId,
randomPartition);
currentPartitionId = randomPartition;
OpenTsdbMetricConverter.incr(
LoggingAuditClientMetrics.AUDIT_CLIENT_SENDER_KAFKA_CURRENT_PARTITION_RESET, 1,
"host=" + host, "stage=" + stage.toString());
return;
}
}
currentPartitionId = partitionInfoList.get(ThreadLocalRandom.current().nextInt(
partitionInfoList.size())).partition();
LOG.warn("After {} trials, set current partition to {}",
MAX_RETRIES_FOR_SELECTION_RANDOM_PARTITION, currentPartitionId);

}
}

/**
@@ -236,9 +263,8 @@ public void run() {
if (event != null) {
try {
value = serializer.serialize(event);
int partition = getAlternatePartition(partitionInfoList.size());
record = new ProducerRecord<>(this.topic, partition , null, value);
kafkaProducer.send(record, new KafkaProducerCallback(event, partition));
record = new ProducerRecord<>(this.topic, currentPartitionId , null, value);
kafkaProducer.send(record, new KafkaProducerCallback(event, currentPartitionId));
} catch (TException e) {
LOG.debug("[{}] failed to construct ProducerRecord because of serialization exception.",
Thread.currentThread().getName(), e);
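
For readers skimming the resetCurrentPartitionIdIfNeeded() hunk above, here is a standalone sketch of the same reselection strategy with the Kafka types stripped out: keep the pinned partition while it is healthy, retry a bounded number of times for a partition outside badPartitions, and otherwise fall back to an arbitrary partition rather than stall. The class name, constant value, and plain-integer partition list are stand-ins, not the Singer API.

import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;

public class PartitionReselectionSketch {
  private static final int MAX_RETRIES = 10; // stand-in for MAX_RETRIES_FOR_SELECTION_RANDOM_PARTITION

  static int reselect(List<Integer> partitions, Set<Integer> badPartitions, int current) {
    if (partitions.isEmpty()) {
      return -1; // no partition metadata yet, mirrors currentPartitionId = -1
    }
    if (!badPartitions.contains(current)) {
      return current; // keep the pinned partition while it is healthy
    }
    for (int trial = 0; trial < MAX_RETRIES; trial++) {
      int candidate = partitions.get(ThreadLocalRandom.current().nextInt(partitions.size()));
      if (!badPartitions.contains(candidate)) {
        return candidate; // first healthy partition found wins
      }
    }
    // Every trial hit a bad partition: accept a random one instead of blocking sends.
    return partitions.get(ThreadLocalRandom.current().nextInt(partitions.size()));
  }

  public static void main(String[] args) {
    List<Integer> partitions = Arrays.asList(0, 1, 2, 3);
    Set<Integer> bad = new HashSet<>(Arrays.asList(-1, 1));
    System.out.println(reselect(partitions, bad, 1)); // 1 is marked bad, so a healthy partition is printed
  }
}
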
LoggingAuditClientMetrics.java
@@ -42,6 +42,7 @@ public class LoggingAuditClientMetrics {

public static final String AUDIT_CLIENT_SENDER_KAFKA_PARTITIONS_REFRESH_ERROR = "audit.client.sender.kafka.partitions_refresh_error";
public static final String AUDIT_CLIENT_SENDER_KAFKA_PARTITIONS_REFRESH_COUNT = "audit.client.sender.kafka.partitions_refresh_count";
public static final String AUDIT_CLIENT_SENDER_KAFKA_CURRENT_PARTITION_RESET = "audit.client.sender.kafka.current_partition_reset";

public static final String AUDIT_CLIENT_SENDER_KAFKA_CALLBACK_EXCEPTION = "audit.client.sender.kafka.callback_exception";
}
2 changes: 1 addition & 1 deletion singer/pom.xml
@@ -7,7 +7,7 @@
<parent>
<groupId>com.pinterest.singer</groupId>
<artifactId>singer-package</artifactId>
<version>0.8.0.26</version>
<version>0.8.0.27</version>
<relativePath>../pom.xml</relativePath>
</parent>
<licenses>
2 changes: 1 addition & 1 deletion thrift-logger/pom.xml
@@ -4,7 +4,7 @@
<parent>
<groupId>com.pinterest.singer</groupId>
<artifactId>singer-package</artifactId>
<version>0.8.0.26</version>
<version>0.8.0.27</version>
<relativePath>../pom.xml</relativePath>
</parent>
<artifactId>thrift-logger</artifactId>
