From 30b89969d8bd9a2babbdb1254b2b0aa029787f43 Mon Sep 17 00:00:00 2001 From: Chia-Ping Tsai Date: Fri, 12 Aug 2022 02:08:59 +0800 Subject: [PATCH] Enable consumer to trace all subscriptions (#576) --- .../app/consumer/SubscribedConsumer.java | 9 +++ .../astraea/app/consumer/TopicsBuilder.java | 44 ++++++++++-- .../astraea/app/consumer/ConsumerTest.java | 68 +++++++++++++++++++ 3 files changed, 117 insertions(+), 4 deletions(-) diff --git a/app/src/main/java/org/astraea/app/consumer/SubscribedConsumer.java b/app/src/main/java/org/astraea/app/consumer/SubscribedConsumer.java index 3690f074f9..369d190d36 100644 --- a/app/src/main/java/org/astraea/app/consumer/SubscribedConsumer.java +++ b/app/src/main/java/org/astraea/app/consumer/SubscribedConsumer.java @@ -17,7 +17,10 @@ package org.astraea.app.consumer; import java.time.Duration; +import java.util.Map; import java.util.Optional; +import java.util.Set; +import org.astraea.app.admin.TopicPartition; /** * This inherited consumer offers function related to consumer group. @@ -42,4 +45,10 @@ public interface SubscribedConsumer extends Consumer { /** @return group instance id (static member) */ Optional groupInstanceId(); + + /** + * @return the historical subscription. key is the time of getting assignments. value is the + * assignments. + */ + Map> historicalSubscription(); } diff --git a/app/src/main/java/org/astraea/app/consumer/TopicsBuilder.java b/app/src/main/java/org/astraea/app/consumer/TopicsBuilder.java index 0e350e4f28..16f19949c7 100644 --- a/app/src/main/java/org/astraea/app/consumer/TopicsBuilder.java +++ b/app/src/main/java/org/astraea/app/consumer/TopicsBuilder.java @@ -19,11 +19,13 @@ import static java.util.Objects.requireNonNull; import java.time.Duration; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.KafkaConsumer; @@ -34,6 +36,8 @@ public class TopicsBuilder extends Builder { private final Set topics; private ConsumerRebalanceListener listener = ignore -> {}; + private boolean enableTrace = false; + TopicsBuilder(Set topics) { this.topics = requireNonNull(topics); } @@ -119,6 +123,17 @@ public TopicsBuilder isolation(Isolation isolation) { return this; } + /** + * enable to trace the historical subscription. see {@link + * SubscribedConsumer#historicalSubscription()} + * + * @return this builder + */ + public TopicsBuilder enableTrace() { + this.enableTrace = true; + return this; + } + public TopicsBuilder disableAutoCommitOffsets() { return config(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); } @@ -135,11 +150,22 @@ public SubscribedConsumer build() { Deserializer.of((Deserializer) keyDeserializer), Deserializer.of((Deserializer) valueDeserializer)); + var tracker = + new ConsumerRebalanceListener() { + private final Map> history = new ConcurrentHashMap<>(); + + @Override + public void onPartitionAssigned(Set partitions) { + if (enableTrace) history.put(System.currentTimeMillis(), Set.copyOf(partitions)); + } + }; + if (seekStrategy != SeekStrategy.NONE) { // make sure this consumer is assigned before seeking var latch = new CountDownLatch(1); kafkaConsumer.subscribe( - topics, ConsumerRebalanceListener.of(List.of(listener, ignored -> latch.countDown()))); + topics, + ConsumerRebalanceListener.of(List.of(listener, ignored -> latch.countDown(), tracker))); while (latch.getCount() != 0) { // the offset will be reset, so it is fine to poll data // TODO: should we disable auto-commit here? @@ -148,12 +174,13 @@ public SubscribedConsumer build() { } } else { // nothing to seek so we just subscribe topics - kafkaConsumer.subscribe(topics, ConsumerRebalanceListener.of(List.of(listener))); + kafkaConsumer.subscribe(topics, ConsumerRebalanceListener.of(List.of(listener, tracker))); } seekStrategy.apply(kafkaConsumer, seekValue); - return new SubscribedConsumerImpl<>(kafkaConsumer, topics, listener); + return new SubscribedConsumerImpl<>( + kafkaConsumer, topics, listener, Collections.unmodifiableMap(tracker.history)); } private static class SubscribedConsumerImpl extends Builder.BaseConsumer @@ -161,13 +188,17 @@ private static class SubscribedConsumerImpl extends Builder.BaseCons private final Set topics; private final ConsumerRebalanceListener listener; + private final Map> history; + public SubscribedConsumerImpl( org.apache.kafka.clients.consumer.Consumer kafkaConsumer, Set topics, - ConsumerRebalanceListener listener) { + ConsumerRebalanceListener listener, + Map> history) { super(kafkaConsumer); this.topics = topics; this.listener = listener; + this.history = history; } @Override @@ -189,6 +220,11 @@ public Optional groupInstanceId() { return kafkaConsumer.groupMetadata().groupInstanceId(); } + @Override + public Map> historicalSubscription() { + return history; + } + @Override protected void doResubscribe() { kafkaConsumer.subscribe(topics, ConsumerRebalanceListener.of(List.of(listener))); diff --git a/app/src/test/java/org/astraea/app/consumer/ConsumerTest.java b/app/src/test/java/org/astraea/app/consumer/ConsumerTest.java index 9b0f9721eb..ee198e96dd 100644 --- a/app/src/test/java/org/astraea/app/consumer/ConsumerTest.java +++ b/app/src/test/java/org/astraea/app/consumer/ConsumerTest.java @@ -447,4 +447,72 @@ void testCreateConsumersConcurrent() throws ExecutionException, InterruptedExcep closed.set(true); fs.get(); } + + @Test + void testHistoricalSubscription() { + var partitions = 3; + var topic = Utils.randomString(10); + try (var admin = Admin.of(bootstrapServers())) { + admin.creator().topic(topic).numberOfPartitions(partitions).create(); + Utils.sleep(Duration.ofSeconds(3)); + } + + var groupId = Utils.randomString(10); + var closed = new AtomicBoolean(false); + + var consumers = + IntStream.range(0, 2) + .mapToObj( + ignored -> + Consumer.forTopics(Set.of(topic)) + .bootstrapServers(bootstrapServers()) + .groupId(groupId) + .enableTrace() + .build()) + .collect(Collectors.toUnmodifiableList()); + + var fs = + Utils.sequence( + consumers.stream() + .map( + c -> + CompletableFuture.runAsync( + () -> { + try (c) { + while (!closed.get()) c.poll(Duration.ofSeconds(1)); + } + })) + .collect(Collectors.toUnmodifiableList())); + + Utils.waitFor(() -> consumers.stream().allMatch(c -> c.historicalSubscription().size() >= 1)); + + // create another consumer to trigger balance + try (var consumer = + Consumer.forTopics(Set.of(topic)) + .bootstrapServers(bootstrapServers()) + .groupId(groupId) + .enableTrace() + .build()) { + + Utils.waitFor( + () -> { + consumer.poll(Duration.ofSeconds(1)); + return consumers.stream().allMatch(c -> c.historicalSubscription().size() > 1); + }); + } + + // produce data to make sure the balance get done. + produceData(topic, 100); + + try (var consumer = + Consumer.forTopics(Set.of(topic)) + .bootstrapServers(bootstrapServers()) + .groupId(groupId) + .fromBeginning() + .build()) { + Utils.waitFor(() -> !consumer.poll(Duration.ofSeconds(1)).isEmpty()); + Assertions.assertTrue( + consumers.stream().anyMatch(c -> c.historicalSubscription().size() > 1)); + } + } }