Enable consumer to trace all subscriptions (opensource4you#576)
chia7712 authored Aug 11, 2022
1 parent 20de1dd commit 30b8996
Showing 3 changed files with 117 additions and 4 deletions.
9 changes: 9 additions & 0 deletions app/src/main/java/org/astraea/app/consumer/SubscribedConsumer.java
@@ -17,7 +17,10 @@
package org.astraea.app.consumer;

import java.time.Duration;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import org.astraea.app.admin.TopicPartition;

/**
* This inherited consumer offers function related to consumer group.
@@ -42,4 +45,10 @@ public interface SubscribedConsumer<Key, Value> extends Consumer<Key, Value> {

/** @return group instance id (static member) */
Optional<String> groupInstanceId();

/**
* @return the historical subscriptions. The key is the time (epoch milliseconds) at which the
* assignment was received; the value is the assigned partitions.
*/
Map<Long, Set<TopicPartition>> historicalSubscription();
}
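
As an illustrative sketch only (not part of this commit), a caller holding a consumer built with tracing enabled could render the returned history; the printHistory helper below is hypothetical, and the map keys are the epoch-millisecond timestamps recorded when each assignment arrived:

// Hypothetical helper, assuming a SubscribedConsumer built with enableTrace() (see TopicsBuilder below).
static void printHistory(SubscribedConsumer<?, ?> consumer) {
  consumer
      .historicalSubscription()
      .forEach(
          (time, partitions) ->
              // key: epoch milliseconds of the assignment; value: partitions assigned at that time
              System.out.println(java.time.Instant.ofEpochMilli(time) + " -> " + partitions));
}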
44 changes: 40 additions & 4 deletions app/src/main/java/org/astraea/app/consumer/TopicsBuilder.java
@@ -19,11 +19,13 @@
import static java.util.Objects.requireNonNull;

import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -34,6 +36,8 @@ public class TopicsBuilder<Key, Value> extends Builder<Key, Value> {
private final Set<String> topics;
private ConsumerRebalanceListener listener = ignore -> {};

private boolean enableTrace = false;

TopicsBuilder(Set<String> topics) {
this.topics = requireNonNull(topics);
}
@@ -119,6 +123,17 @@ public TopicsBuilder<Key, Value> isolation(Isolation isolation) {
return this;
}

/**
* Enable tracing of the historical subscriptions. See {@link
* SubscribedConsumer#historicalSubscription()}.
*
* @return this builder
*/
public TopicsBuilder<Key, Value> enableTrace() {
this.enableTrace = true;
return this;
}

public TopicsBuilder<Key, Value> disableAutoCommitOffsets() {
return config(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
}
@@ -135,11 +150,22 @@ public SubscribedConsumer<Key, Value> build() {
Deserializer.of((Deserializer<Key>) keyDeserializer),
Deserializer.of((Deserializer<Value>) valueDeserializer));

var tracker =
new ConsumerRebalanceListener() {
private final Map<Long, Set<TopicPartition>> history = new ConcurrentHashMap<>();

@Override
public void onPartitionAssigned(Set<TopicPartition> partitions) {
if (enableTrace) history.put(System.currentTimeMillis(), Set.copyOf(partitions));
}
};

if (seekStrategy != SeekStrategy.NONE) {
// make sure this consumer is assigned before seeking
var latch = new CountDownLatch(1);
kafkaConsumer.subscribe(
topics, ConsumerRebalanceListener.of(List.of(listener, ignored -> latch.countDown())));
topics,
ConsumerRebalanceListener.of(List.of(listener, ignored -> latch.countDown(), tracker)));
while (latch.getCount() != 0) {
// the offset will be reset, so it is fine to poll data
// TODO: should we disable auto-commit here?
@@ -148,26 +174,31 @@ public SubscribedConsumer<Key, Value> build() {
}
} else {
// nothing to seek, so we just subscribe to the topics
kafkaConsumer.subscribe(topics, ConsumerRebalanceListener.of(List.of(listener)));
kafkaConsumer.subscribe(topics, ConsumerRebalanceListener.of(List.of(listener, tracker)));
}

seekStrategy.apply(kafkaConsumer, seekValue);

return new SubscribedConsumerImpl<>(kafkaConsumer, topics, listener);
return new SubscribedConsumerImpl<>(
kafkaConsumer, topics, listener, Collections.unmodifiableMap(tracker.history));
}

private static class SubscribedConsumerImpl<Key, Value> extends Builder.BaseConsumer<Key, Value>
implements SubscribedConsumer<Key, Value> {
private final Set<String> topics;
private final ConsumerRebalanceListener listener;

private final Map<Long, Set<TopicPartition>> history;

public SubscribedConsumerImpl(
org.apache.kafka.clients.consumer.Consumer<Key, Value> kafkaConsumer,
Set<String> topics,
ConsumerRebalanceListener listener) {
ConsumerRebalanceListener listener,
Map<Long, Set<TopicPartition>> history) {
super(kafkaConsumer);
this.topics = topics;
this.listener = listener;
this.history = history;
}

@Override
@@ -189,6 +220,11 @@ public Optional<String> groupInstanceId() {
return kafkaConsumer.groupMetadata().groupInstanceId();
}

@Override
public Map<Long, Set<TopicPartition>> historicalSubscription() {
return history;
}

@Override
protected void doResubscribe() {
kafkaConsumer.subscribe(topics, ConsumerRebalanceListener.of(List.of(listener)));
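For reference, a minimal usage sketch of the new builder flag, mirroring the test added below; the bootstrap address, topic, and group id are placeholders rather than values from this commit:

try (var consumer =
    Consumer.forTopics(Set.of("my-topic"))   // placeholder topic
        .bootstrapServers("localhost:9092")  // placeholder address
        .groupId("my-group")                 // placeholder group id
        .enableTrace()                       // record each assignment with a timestamp
        .build()) {
  consumer.poll(Duration.ofSeconds(1));
  // after at least one rebalance, the traced assignments are available
  var history = consumer.historicalSubscription();
}
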
68 changes: 68 additions & 0 deletions app/src/test/java/org/astraea/app/consumer/ConsumerTest.java
@@ -447,4 +447,72 @@ void testCreateConsumersConcurrent() throws ExecutionException, InterruptedException {
closed.set(true);
fs.get();
}

@Test
void testHistoricalSubscription() {
var partitions = 3;
var topic = Utils.randomString(10);
try (var admin = Admin.of(bootstrapServers())) {
admin.creator().topic(topic).numberOfPartitions(partitions).create();
Utils.sleep(Duration.ofSeconds(3));
}

var groupId = Utils.randomString(10);
var closed = new AtomicBoolean(false);

var consumers =
IntStream.range(0, 2)
.mapToObj(
ignored ->
Consumer.forTopics(Set.of(topic))
.bootstrapServers(bootstrapServers())
.groupId(groupId)
.enableTrace()
.build())
.collect(Collectors.toUnmodifiableList());

var fs =
Utils.sequence(
consumers.stream()
.map(
c ->
CompletableFuture.runAsync(
() -> {
try (c) {
while (!closed.get()) c.poll(Duration.ofSeconds(1));
}
}))
.collect(Collectors.toUnmodifiableList()));

Utils.waitFor(() -> consumers.stream().allMatch(c -> c.historicalSubscription().size() >= 1));

// create another consumer to trigger a rebalance
try (var consumer =
Consumer.forTopics(Set.of(topic))
.bootstrapServers(bootstrapServers())
.groupId(groupId)
.enableTrace()
.build()) {

Utils.waitFor(
() -> {
consumer.poll(Duration.ofSeconds(1));
return consumers.stream().allMatch(c -> c.historicalSubscription().size() > 1);
});
}

// produce data to make sure the rebalance is done.
produceData(topic, 100);

try (var consumer =
Consumer.forTopics(Set.of(topic))
.bootstrapServers(bootstrapServers())
.groupId(groupId)
.fromBeginning()
.build()) {
Utils.waitFor(() -> !consumer.poll(Duration.ofSeconds(1)).isEmpty());
Assertions.assertTrue(
consumers.stream().anyMatch(c -> c.historicalSubscription().size() > 1));
}
}
}
