From a6d3f90599b185cf0569a23826a06a9d42af7110 Mon Sep 17 00:00:00 2001 From: David Arthur Date: Mon, 19 Aug 2019 08:43:31 -0400 Subject: [PATCH 01/18] Only check for leader changes when there is new metadata --- .../clients/consumer/internals/Fetcher.java | 26 +++++++++++++------ .../consumer/internals/SubscriptionState.java | 4 +++ 2 files changed, 22 insertions(+), 8 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java index c1c42bd06ca4f..81fcfeafce6b9 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java @@ -153,6 +153,8 @@ public class Fetcher implements Closeable { private final OffsetsForLeaderEpochClient offsetsForLeaderEpochClient; private final Set nodesWithPendingFetchRequests; private final ApiVersions apiVersions; + private final AtomicInteger metadataUpdateVersion = new AtomicInteger(-1); + private PartitionRecords nextInLineRecords = null; @@ -479,10 +481,14 @@ public void validateOffsetsIfNeeded() { throw exception; // Validate each partition against the current leader and epoch - subscriptions.assignedPartitions().forEach(topicPartition -> { - ConsumerMetadata.LeaderAndEpoch leaderAndEpoch = metadata.leaderAndEpoch(topicPartition); - subscriptions.maybeValidatePositionForCurrentLeader(topicPartition, leaderAndEpoch); - }); + // If we see a new metadata version, check all partitions + int newMetadataUpdateVersion = metadata.updateVersion(); + if (metadataUpdateVersion.getAndSet(newMetadataUpdateVersion) != newMetadataUpdateVersion) { + subscriptions.forEachAssignedPartition(topicPartition -> { + ConsumerMetadata.LeaderAndEpoch leaderAndEpoch = metadata.leaderAndEpoch(topicPartition); + subscriptions.maybeValidatePositionForCurrentLeader(topicPartition, leaderAndEpoch); + }); + } // Collect positions needing validation, with backoff Map partitionsToValidate = subscriptions @@ -769,6 +775,7 @@ private void validateOffsetsAsync(Map> regrouped = regroupFetchPositionsByLeader(partitionsToValidate); + long nextResetTimeMs = time.milliseconds() + requestTimeoutMs; regrouped.forEach((node, fetchPostitions) -> { if (node.isEmpty()) { metadata.requestUpdate(); @@ -791,7 +798,7 @@ private void validateOffsetsAsync(Map future = offsetsForLeaderEpochClient.sendAsyncRequest(node, partitionsToValidate); future.addListener(new RequestFutureListener() { @@ -1091,9 +1098,12 @@ Node selectReadReplica(TopicPartition partition, Node leaderReplica, long curren private Map prepareFetchRequests() { Map fetchable = new LinkedHashMap<>(); - // Ensure the position has an up-to-date leader - subscriptions.assignedPartitions().forEach( - tp -> subscriptions.maybeValidatePositionForCurrentLeader(tp, metadata.leaderAndEpoch(tp))); + int newMetadataUpdateVersion = metadata.updateVersion(); + if (metadataUpdateVersion.getAndSet(newMetadataUpdateVersion) != newMetadataUpdateVersion) { + // Ensure the position has an up-to-date leader + subscriptions.forEachAssignedPartition( + tp -> subscriptions.maybeValidatePositionForCurrentLeader(tp, metadata.leaderAndEpoch(tp))); + } long currentTimeMs = time.milliseconds(); diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java index 7c965ae58a55d..94a361c5ba2e4 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java @@ -388,6 +388,10 @@ public synchronized Set assignedPartitions() { return new HashSet<>(this.assignment.partitionSet()); } + public void forEachAssignedPartition(Consumer consumer) { + this.assignment.partitionSet().forEach(consumer); + } + /** * @return a modifiable copy of the currently assigned partitions as a list */ From b21a304279b23fe803a610f9634f6240c09b6d22 Mon Sep 17 00:00:00 2001 From: David Arthur Date: Mon, 19 Aug 2019 09:33:31 -0400 Subject: [PATCH 02/18] Add missing import --- .../kafka/clients/consumer/internals/SubscriptionState.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java index 94a361c5ba2e4..1060819b8e267 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java @@ -30,6 +30,7 @@ import org.apache.kafka.common.utils.LogContext; import org.slf4j.Logger; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; @@ -40,6 +41,7 @@ import java.util.Objects; import java.util.Optional; import java.util.Set; +import java.util.function.Consumer; import java.util.function.Predicate; import java.util.function.Supplier; import java.util.regex.Pattern; From b2e9ccef28df013627ffc835b661dc13ba3c7565 Mon Sep 17 00:00:00 2001 From: David Arthur Date: Mon, 19 Aug 2019 13:20:56 -0400 Subject: [PATCH 03/18] checkstyle --- .../org/apache/kafka/clients/consumer/internals/Fetcher.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java index 81fcfeafce6b9..2e9e938862232 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java @@ -1102,7 +1102,7 @@ private Map prepareFetchRequests() { if (metadataUpdateVersion.getAndSet(newMetadataUpdateVersion) != newMetadataUpdateVersion) { // Ensure the position has an up-to-date leader subscriptions.forEachAssignedPartition( - tp -> subscriptions.maybeValidatePositionForCurrentLeader(tp, metadata.leaderAndEpoch(tp))); + tp -> subscriptions.maybeValidatePositionForCurrentLeader(tp, metadata.leaderAndEpoch(tp))); } long currentTimeMs = time.milliseconds(); From c4f75a002dbbb26437d5d5330c9d38ffafef595d Mon Sep 17 00:00:00 2001 From: David Arthur Date: Mon, 19 Aug 2019 14:23:18 -0400 Subject: [PATCH 04/18] Feedback from PR --- .../clients/consumer/internals/Fetcher.java | 27 ++++++++++--------- .../consumer/internals/SubscriptionState.java | 2 +- 2 files changed, 15 insertions(+), 14 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java index 2e9e938862232..25865e49c76f5 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java @@ -482,13 +482,7 @@ public void validateOffsetsIfNeeded() { // Validate each partition against the current leader and epoch // If we see a new metadata version, check all partitions - int newMetadataUpdateVersion = metadata.updateVersion(); - if (metadataUpdateVersion.getAndSet(newMetadataUpdateVersion) != newMetadataUpdateVersion) { - subscriptions.forEachAssignedPartition(topicPartition -> { - ConsumerMetadata.LeaderAndEpoch leaderAndEpoch = metadata.leaderAndEpoch(topicPartition); - subscriptions.maybeValidatePositionForCurrentLeader(topicPartition, leaderAndEpoch); - }); - } + maybeValidateAssignments(); // Collect positions needing validation, with backoff Map partitionsToValidate = subscriptions @@ -1091,6 +1085,18 @@ Node selectReadReplica(TopicPartition partition, Node leaderReplica, long curren } } + /** + * If we have seen new metadata (as tracked by {@link org.apache.kafka.clients.Metadata#updateVersion()}), then + * we should check that all of the assignments have a valid position. + */ + private void maybeValidateAssignments() { + int newMetadataUpdateVersion = metadata.updateVersion(); + if (metadataUpdateVersion.getAndSet(newMetadataUpdateVersion) != newMetadataUpdateVersion) { + subscriptions.forEachAssignedPartition( + tp -> subscriptions.maybeValidatePositionForCurrentLeader(tp, metadata.leaderAndEpoch(tp))); + } + } + /** * Create fetch requests for all nodes for which we have assigned partitions * that have no existing requests in flight. @@ -1098,12 +1104,7 @@ Node selectReadReplica(TopicPartition partition, Node leaderReplica, long curren private Map prepareFetchRequests() { Map fetchable = new LinkedHashMap<>(); - int newMetadataUpdateVersion = metadata.updateVersion(); - if (metadataUpdateVersion.getAndSet(newMetadataUpdateVersion) != newMetadataUpdateVersion) { - // Ensure the position has an up-to-date leader - subscriptions.forEachAssignedPartition( - tp -> subscriptions.maybeValidatePositionForCurrentLeader(tp, metadata.leaderAndEpoch(tp))); - } + maybeValidateAssignments(); long currentTimeMs = time.milliseconds(); diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java index 1060819b8e267..d08e61d9afe91 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java @@ -390,7 +390,7 @@ public synchronized Set assignedPartitions() { return new HashSet<>(this.assignment.partitionSet()); } - public void forEachAssignedPartition(Consumer consumer) { + public synchronized void forEachAssignedPartition(Consumer consumer) { this.assignment.partitionSet().forEach(consumer); } From 5ee309ed28864ca9d43e61d024786e5de47f205d Mon Sep 17 00:00:00 2001 From: David Arthur Date: Mon, 19 Aug 2019 15:09:41 -0400 Subject: [PATCH 05/18] checkstyle --- .../org/apache/kafka/clients/consumer/internals/Fetcher.java | 2 +- .../kafka/clients/consumer/internals/SubscriptionState.java | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java index 25865e49c76f5..7c4e3e19a76d9 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java @@ -1093,7 +1093,7 @@ private void maybeValidateAssignments() { int newMetadataUpdateVersion = metadata.updateVersion(); if (metadataUpdateVersion.getAndSet(newMetadataUpdateVersion) != newMetadataUpdateVersion) { subscriptions.forEachAssignedPartition( - tp -> subscriptions.maybeValidatePositionForCurrentLeader(tp, metadata.leaderAndEpoch(tp))); + tp -> subscriptions.maybeValidatePositionForCurrentLeader(tp, metadata.leaderAndEpoch(tp))); } } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java index d08e61d9afe91..0f59c3d20777d 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java @@ -16,7 +16,6 @@ */ package org.apache.kafka.clients.consumer.internals; -import java.util.ArrayList; import org.apache.kafka.clients.Metadata; import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.clients.consumer.NoOffsetForPartitionException; From 7e31731ba9e56ef0dffe9ab0bddc16be4edb8839 Mon Sep 17 00:00:00 2001 From: David Arthur Date: Tue, 20 Aug 2019 10:33:45 -0400 Subject: [PATCH 06/18] Fix FetcherTest tests --- .../clients/consumer/internals/FetcherTest.java | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java index 5f66689520f36..22bb23feb023f 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java @@ -498,7 +498,7 @@ public void testParseCorruptedRecord() throws Exception { buffer.flip(); - subscriptions.seek(tp0, 0); + subscriptions.seekUnvalidated(tp0, new SubscriptionState.FetchPosition(0, Optional.empty(), metadata.leaderAndEpoch(tp0))); // normal fetch assertEquals(1, fetcher.sendFetches()); @@ -536,7 +536,7 @@ private void ensureBlockOnRecord(long blockedOffset) { private void seekAndConsumeRecord(ByteBuffer responseBuffer, long toOffset) { // Seek to skip the bad record and fetch again. - subscriptions.seek(tp0, toOffset); + subscriptions.seekUnvalidated(tp0, new SubscriptionState.FetchPosition(toOffset, Optional.empty(), metadata.leaderAndEpoch(tp0))); // Should not throw exception after the seek. fetcher.fetchedRecords(); assertEquals(1, fetcher.sendFetches()); @@ -945,13 +945,13 @@ public void testFetchOnCompletedFetchesForSomePausedPartitions() { // seek to tp0 and tp1 in two polls to generate 2 complete requests and responses // #1 seek, request, poll, response - subscriptions.seek(tp0, 1); + subscriptions.seekUnvalidated(tp0, new SubscriptionState.FetchPosition(1, Optional.empty(), metadata.leaderAndEpoch(tp0))); assertEquals(1, fetcher.sendFetches()); client.prepareResponse(fullFetchResponse(tp0, this.records, Errors.NONE, 100L, 0)); consumerClient.poll(time.timer(0)); // #2 seek, request, poll, response - subscriptions.seek(tp1, 1); + subscriptions.seekUnvalidated(tp1, new SubscriptionState.FetchPosition(1, Optional.empty(), metadata.leaderAndEpoch(tp1))); assertEquals(1, fetcher.sendFetches()); client.prepareResponse(fullFetchResponse(tp1, this.nextRecords, Errors.NONE, 100L, 0)); @@ -980,13 +980,13 @@ public void testFetchOnCompletedFetchesForAllPausedPartitions() { // seek to tp0 and tp1 in two polls to generate 2 complete requests and responses // #1 seek, request, poll, response - subscriptions.seek(tp0, 1); + subscriptions.seekUnvalidated(tp0, new SubscriptionState.FetchPosition(1, Optional.empty(), metadata.leaderAndEpoch(tp0))); assertEquals(1, fetcher.sendFetches()); client.prepareResponse(fullFetchResponse(tp0, this.records, Errors.NONE, 100L, 0)); consumerClient.poll(time.timer(0)); // #2 seek, request, poll, response - subscriptions.seek(tp1, 1); + subscriptions.seekUnvalidated(tp1, new SubscriptionState.FetchPosition(1, Optional.empty(), metadata.leaderAndEpoch(tp1))); assertEquals(1, fetcher.sendFetches()); client.prepareResponse(fullFetchResponse(tp1, this.nextRecords, Errors.NONE, 100L, 0)); @@ -1350,7 +1350,8 @@ public void testSeekBeforeException() { assertEquals(2, fetcher.fetchedRecords().get(tp0).size()); subscriptions.assignFromUser(Utils.mkSet(tp0, tp1)); - subscriptions.seek(tp1, 1); + subscriptions.seekUnvalidated(tp1, new SubscriptionState.FetchPosition(1, Optional.empty(), metadata.leaderAndEpoch(tp1))); + assertEquals(1, fetcher.sendFetches()); partitions = new HashMap<>(); partitions.put(tp1, new FetchResponse.PartitionData<>(Errors.OFFSET_OUT_OF_RANGE, 100, @@ -1670,7 +1671,7 @@ public void testEarlierOffsetResetArrivesLate() throws InterruptedException { Object result = invocation.callRealMethod(); latchEarliestDone.countDown(); return result; - }).when(subscriptions).maybeSeekUnvalidated(tp0, 0L, OffsetResetStrategy.EARLIEST); + }).when(subscriptions).maybeSeekUnvalidated(tp0, new SubscriptionState.FetchPosition(0L), OffsetResetStrategy.EARLIEST); es.submit(() -> { subscriptions.requestOffsetReset(tp0, OffsetResetStrategy.EARLIEST); From 4db04ace0899211e3188e6c1f5b34283a351b9bb Mon Sep 17 00:00:00 2001 From: David Arthur Date: Tue, 20 Aug 2019 10:34:01 -0400 Subject: [PATCH 07/18] Use full FetchPosition when resetting position Previouslly, this would only update the offset and rely on future calls to Fetcher#maybeValidatePositionForCurrentLeader to get the leader information. Now that we are only calling maybeValidatePositionForCurrentLeader when the metadata has updated, we would get stuck after a reset. --- .../apache/kafka/clients/consumer/internals/Fetcher.java | 2 +- .../kafka/clients/consumer/internals/SubscriptionState.java | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java index 7c4e3e19a76d9..4ee09fb8a91d6 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java @@ -711,7 +711,7 @@ private void resetOffsetIfNeeded(TopicPartition partition, OffsetResetStrategy r SubscriptionState.FetchPosition position = new SubscriptionState.FetchPosition( offsetData.offset, offsetData.leaderEpoch, metadata.leaderAndEpoch(partition)); offsetData.leaderEpoch.ifPresent(epoch -> metadata.updateLastSeenEpochIfNewer(partition, epoch)); - subscriptions.maybeSeekUnvalidated(partition, position.offset, requestedResetStrategy); + subscriptions.maybeSeekUnvalidated(partition, position, requestedResetStrategy); } private void resetOffsetsAsync(Map partitionResetTimestamps) { diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java index 0f59c3d20777d..795cd5d3c8784 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java @@ -368,7 +368,7 @@ public void seekUnvalidated(TopicPartition tp, FetchPosition position) { assignedState(tp).seekUnvalidated(position); } - synchronized void maybeSeekUnvalidated(TopicPartition tp, long offset, OffsetResetStrategy requestedResetStrategy) { + synchronized void maybeSeekUnvalidated(TopicPartition tp, FetchPosition position, OffsetResetStrategy requestedResetStrategy) { TopicPartitionState state = assignedStateOrNull(tp); if (state == null) { log.debug("Skipping reset of partition {} since it is no longer assigned", tp); @@ -377,8 +377,8 @@ synchronized void maybeSeekUnvalidated(TopicPartition tp, long offset, OffsetRes } else if (requestedResetStrategy != state.resetStrategy) { log.debug("Skipping reset of partition {} since an alternative reset has been requested", tp); } else { - log.info("Resetting offset for partition {} to offset {}.", tp, offset); - state.seekUnvalidated(new FetchPosition(offset)); + log.info("Resetting offset for partition {} to position {}.", tp, position); + state.seekUnvalidated(position); } } From cdb4dcc636051598f2c30f5665570ea58c859ffd Mon Sep 17 00:00:00 2001 From: David Arthur Date: Tue, 20 Aug 2019 16:07:12 -0400 Subject: [PATCH 08/18] Fix mocking in unit test --- .../apache/kafka/clients/consumer/internals/FetcherTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java index cdd51cf1f7d0d..fe59cc625920d 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java @@ -1671,7 +1671,7 @@ public void testEarlierOffsetResetArrivesLate() throws InterruptedException { Object result = invocation.callRealMethod(); latchEarliestDone.countDown(); return result; - }).when(subscriptions).maybeSeekUnvalidated(tp0, new SubscriptionState.FetchPosition(0L), OffsetResetStrategy.EARLIEST); + }).when(subscriptions).maybeSeekUnvalidated(tp0, new SubscriptionState.FetchPosition(0L, Optional.empty(), metadata.leaderAndEpoch(tp0)), OffsetResetStrategy.EARLIEST); es.submit(() -> { subscriptions.requestOffsetReset(tp0, OffsetResetStrategy.EARLIEST); From 33776b2559eeb49537311a3ed712f54fb8673cb6 Mon Sep 17 00:00:00 2001 From: David Arthur Date: Wed, 21 Aug 2019 22:53:56 -0400 Subject: [PATCH 09/18] Ensure we don't enter validation when reseting offsets --- .../org/apache/kafka/clients/consumer/internals/Fetcher.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java index 4ee09fb8a91d6..0c90860ddf84a 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java @@ -709,7 +709,9 @@ private List> fetchRecords(PartitionRecords partitionRecord private void resetOffsetIfNeeded(TopicPartition partition, OffsetResetStrategy requestedResetStrategy, ListOffsetData offsetData) { SubscriptionState.FetchPosition position = new SubscriptionState.FetchPosition( - offsetData.offset, offsetData.leaderEpoch, metadata.leaderAndEpoch(partition)); + offsetData.offset, + Optional.empty(), // This will ensure we skip validation + metadata.leaderAndEpoch(partition)); offsetData.leaderEpoch.ifPresent(epoch -> metadata.updateLastSeenEpochIfNewer(partition, epoch)); subscriptions.maybeSeekUnvalidated(partition, position, requestedResetStrategy); } From 13d63d81691a4930185b5422e5e1920c744eba82 Mon Sep 17 00:00:00 2001 From: David Arthur Date: Thu, 29 Aug 2019 13:04:56 -0400 Subject: [PATCH 10/18] Temporarily revert back to always checking the position --- .../org/apache/kafka/clients/consumer/internals/Fetcher.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java index 0c90860ddf84a..7e7494010ada4 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java @@ -1093,10 +1093,10 @@ Node selectReadReplica(TopicPartition partition, Node leaderReplica, long curren */ private void maybeValidateAssignments() { int newMetadataUpdateVersion = metadata.updateVersion(); - if (metadataUpdateVersion.getAndSet(newMetadataUpdateVersion) != newMetadataUpdateVersion) { + //if (metadataUpdateVersion.getAndSet(newMetadataUpdateVersion) != newMetadataUpdateVersion) { subscriptions.forEachAssignedPartition( tp -> subscriptions.maybeValidatePositionForCurrentLeader(tp, metadata.leaderAndEpoch(tp))); - } + //} } /** From aa0a77d47612103bd45879aa2a419df24d2e8b20 Mon Sep 17 00:00:00 2001 From: David Arthur Date: Thu, 29 Aug 2019 13:38:18 -0400 Subject: [PATCH 11/18] Adding verbose logging --- .../apache/kafka/clients/consumer/internals/Fetcher.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java index 7e7494010ada4..f9eb17c8a2c9e 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java @@ -19,6 +19,7 @@ import org.apache.kafka.clients.ApiVersions; import org.apache.kafka.clients.ClientResponse; import org.apache.kafka.clients.FetchSessionHandler; +import org.apache.kafka.clients.Metadata; import org.apache.kafka.clients.MetadataCache; import org.apache.kafka.clients.NodeApiVersions; import org.apache.kafka.clients.StaleMetadataException; @@ -1093,10 +1094,10 @@ Node selectReadReplica(TopicPartition partition, Node leaderReplica, long curren */ private void maybeValidateAssignments() { int newMetadataUpdateVersion = metadata.updateVersion(); - //if (metadataUpdateVersion.getAndSet(newMetadataUpdateVersion) != newMetadataUpdateVersion) { + if (metadataUpdateVersion.getAndSet(newMetadataUpdateVersion) != newMetadataUpdateVersion) { subscriptions.forEachAssignedPartition( tp -> subscriptions.maybeValidatePositionForCurrentLeader(tp, metadata.leaderAndEpoch(tp))); - //} + } } /** @@ -1116,6 +1117,7 @@ private Map prepareFetchRequests() { Node node = selectReadReplica(partition, position.currentLeader.leader, currentTimeMs); if (node == null || node.isEmpty()) { + log.info("Requesting metadata update for partition {} since the position {} is missing the current leader node", partition, position); metadata.requestUpdate(); } else if (client.isUnavailable(node)) { client.maybeThrowAuthFailure(node); @@ -1123,7 +1125,6 @@ private Map prepareFetchRequests() { // If we try to send during the reconnect blackout window, then the request is just // going to be failed anyway before being sent, so skip the send for now log.trace("Skipping fetch for partition {} because node {} is awaiting reconnect backoff", partition, node); - } else if (this.nodesWithPendingFetchRequests.contains(node.id())) { log.trace("Skipping fetch for partition {} because previous request to {} has not been processed", partition, node); } else { From 0788f6ac8cee38ff4189e2ea4bd1b3057ef92d62 Mon Sep 17 00:00:00 2001 From: David Arthur Date: Thu, 29 Aug 2019 14:00:34 -0400 Subject: [PATCH 12/18] checkstyle --- .../org/apache/kafka/clients/consumer/internals/Fetcher.java | 1 - 1 file changed, 1 deletion(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java index f9eb17c8a2c9e..cfb17b169514b 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java @@ -19,7 +19,6 @@ import org.apache.kafka.clients.ApiVersions; import org.apache.kafka.clients.ClientResponse; import org.apache.kafka.clients.FetchSessionHandler; -import org.apache.kafka.clients.Metadata; import org.apache.kafka.clients.MetadataCache; import org.apache.kafka.clients.NodeApiVersions; import org.apache.kafka.clients.StaleMetadataException; From 8cfbfa116fa6097ec339d075fdfa691fb8305764 Mon Sep 17 00:00:00 2001 From: David Arthur Date: Wed, 5 Aug 2020 11:08:17 -0400 Subject: [PATCH 13/18] Fixup after merge --- .../clients/consumer/internals/Fetcher.java | 24 +++++++------------ .../consumer/internals/FetcherTest.java | 17 ++++++------- 2 files changed, 18 insertions(+), 23 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java index e7743e948c957..2aa5d6490708b 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java @@ -722,7 +722,9 @@ private List> fetchRecords(CompletedFetch completedFetch, i // Visible for testing void resetOffsetIfNeeded(TopicPartition partition, OffsetResetStrategy requestedResetStrategy, ListOffsetData offsetData) { FetchPosition position = new FetchPosition( - offsetData.offset, offsetData.leaderEpoch, metadata.currentLeader(partition)); + offsetData.offset, + Optional.empty(), // This will ensure we skip validation + metadata.currentLeader(partition)); offsetData.leaderEpoch.ifPresent(epoch -> metadata.updateLastSeenEpochIfNewer(partition, epoch)); subscriptions.maybeSeekUnvalidated(partition, position, requestedResetStrategy); } @@ -783,7 +785,7 @@ private void validateOffsetsAsync(Map partitionsT regroupFetchPositionsByLeader(partitionsToValidate); long nextResetTimeMs = time.milliseconds() + requestTimeoutMs; - regrouped.forEach((node, fetchPostitions) -> { + regrouped.forEach((node, fetchPositions) -> { if (node.isEmpty()) { metadata.requestUpdate(); return; @@ -805,7 +807,7 @@ private void validateOffsetsAsync(Map partitionsT return; } - subscriptions.setNextAllowedRetry(fetchPostitions.keySet(), nextResetTimeMs); + subscriptions.setNextAllowedRetry(fetchPositions.keySet(), nextResetTimeMs); RequestFuture future = offsetsForLeaderEpochClient.sendAsyncRequest(node, fetchPositions); @@ -1128,19 +1130,11 @@ Node selectReadReplica(TopicPartition partition, Node leaderReplica, long curren private void maybeValidateAssignments() { int newMetadataUpdateVersion = metadata.updateVersion(); if (metadataUpdateVersion.getAndSet(newMetadataUpdateVersion) != newMetadataUpdateVersion) { - subscriptions.forEachAssignedPartition( - tp -> subscriptions.maybeValidatePositionForCurrentLeader(tp, metadata.leaderAndEpoch(tp))); + subscriptions.assignedPartitions().forEach(topicPartition -> { + ConsumerMetadata.LeaderAndEpoch leaderAndEpoch = metadata.currentLeader(topicPartition); + subscriptions.maybeValidatePositionForCurrentLeader(apiVersions, topicPartition, leaderAndEpoch); + }); } - - // Ensure the position has an up-to-date leader - subscriptions.assignedPartitions().forEach(tp -> - subscriptions.maybeValidatePositionForCurrentLeader(apiVersions, tp, metadata.currentLeader(tp)) - ); - - subscriptions.assignedPartitions().forEach(topicPartition -> { - ConsumerMetadata.LeaderAndEpoch leaderAndEpoch = metadata.currentLeader(topicPartition); - subscriptions.maybeValidatePositionForCurrentLeader(apiVersions, topicPartition, leaderAndEpoch); - }); } /** diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java index 1c32c23fea12a..02481cac99654 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java @@ -504,7 +504,7 @@ public void testParseCorruptedRecord() throws Exception { buffer.flip(); - subscriptions.seekUnvalidated(tp0, new SubscriptionState.FetchPosition(0, Optional.empty(), metadata.leaderAndEpoch(tp0))); + subscriptions.seekUnvalidated(tp0, new SubscriptionState.FetchPosition(0, Optional.empty(), metadata.currentLeader(tp0))); // normal fetch assertEquals(1, fetcher.sendFetches()); @@ -542,7 +542,7 @@ private void ensureBlockOnRecord(long blockedOffset) { private void seekAndConsumeRecord(ByteBuffer responseBuffer, long toOffset) { // Seek to skip the bad record and fetch again. - subscriptions.seekUnvalidated(tp0, new SubscriptionState.FetchPosition(toOffset, Optional.empty(), metadata.leaderAndEpoch(tp0))); + subscriptions.seekUnvalidated(tp0, new SubscriptionState.FetchPosition(toOffset, Optional.empty(), metadata.currentLeader(tp0))); // Should not throw exception after the seek. fetcher.fetchedRecords(); assertEquals(1, fetcher.sendFetches()); @@ -979,13 +979,13 @@ public void testFetchOnCompletedFetchesForSomePausedPartitions() { // seek to tp0 and tp1 in two polls to generate 2 complete requests and responses // #1 seek, request, poll, response - subscriptions.seekUnvalidated(tp0, new SubscriptionState.FetchPosition(1, Optional.empty(), metadata.leaderAndEpoch(tp0))); + subscriptions.seekUnvalidated(tp0, new SubscriptionState.FetchPosition(1, Optional.empty(), metadata.currentLeader(tp0))); assertEquals(1, fetcher.sendFetches()); client.prepareResponse(fullFetchResponse(tp0, this.records, Errors.NONE, 100L, 0)); consumerClient.poll(time.timer(0)); // #2 seek, request, poll, response - subscriptions.seekUnvalidated(tp1, new SubscriptionState.FetchPosition(1, Optional.empty(), metadata.leaderAndEpoch(tp1))); + subscriptions.seekUnvalidated(tp1, new SubscriptionState.FetchPosition(1, Optional.empty(), metadata.currentLeader(tp1))); assertEquals(1, fetcher.sendFetches()); client.prepareResponse(fullFetchResponse(tp1, this.nextRecords, Errors.NONE, 100L, 0)); @@ -1014,13 +1014,13 @@ public void testFetchOnCompletedFetchesForAllPausedPartitions() { // seek to tp0 and tp1 in two polls to generate 2 complete requests and responses // #1 seek, request, poll, response - subscriptions.seekUnvalidated(tp0, new SubscriptionState.FetchPosition(1, Optional.empty(), metadata.leaderAndEpoch(tp0))); + subscriptions.seekUnvalidated(tp0, new SubscriptionState.FetchPosition(1, Optional.empty(), metadata.currentLeader(tp0))); assertEquals(1, fetcher.sendFetches()); client.prepareResponse(fullFetchResponse(tp0, this.records, Errors.NONE, 100L, 0)); consumerClient.poll(time.timer(0)); // #2 seek, request, poll, response - subscriptions.seekUnvalidated(tp1, new SubscriptionState.FetchPosition(1, Optional.empty(), metadata.leaderAndEpoch(tp1))); + subscriptions.seekUnvalidated(tp1, new SubscriptionState.FetchPosition(1, Optional.empty(), metadata.currentLeader(tp1))); assertEquals(1, fetcher.sendFetches()); client.prepareResponse(fullFetchResponse(tp1, this.nextRecords, Errors.NONE, 100L, 0)); @@ -1383,7 +1383,7 @@ public void testSeekBeforeException() { assertEquals(2, fetcher.fetchedRecords().get(tp0).size()); subscriptions.assignFromUser(Utils.mkSet(tp0, tp1)); - subscriptions.seekUnvalidated(tp1, new SubscriptionState.FetchPosition(1, Optional.empty(), metadata.leaderAndEpoch(tp1))); + subscriptions.seekUnvalidated(tp1, new SubscriptionState.FetchPosition(1, Optional.empty(), metadata.currentLeader(tp1))); assertEquals(1, fetcher.sendFetches()); partitions = new HashMap<>(); @@ -1751,7 +1751,8 @@ public void testEarlierOffsetResetArrivesLate() throws InterruptedException { Object result = invocation.callRealMethod(); latchEarliestDone.countDown(); return result; - }).when(subscriptions).maybeSeekUnvalidated(tp0, new SubscriptionState.FetchPosition(0L, Optional.empty(), metadata.leaderAndEpoch(tp0)), OffsetResetStrategy.EARLIEST); + }).when(subscriptions).maybeSeekUnvalidated(tp0, new SubscriptionState.FetchPosition(0L, + Optional.empty(), metadata.currentLeader(tp0)), OffsetResetStrategy.EARLIEST); es.submit(() -> { subscriptions.requestOffsetReset(tp0, OffsetResetStrategy.EARLIEST); From 89f87c0de1dba810500e0f2829cc5552a72c7ad2 Mon Sep 17 00:00:00 2001 From: David Arthur Date: Fri, 7 Aug 2020 13:46:33 -0400 Subject: [PATCH 14/18] PR feedback --- .../apache/kafka/clients/consumer/internals/Fetcher.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java index 2aa5d6490708b..2e5a1c71fce2e 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java @@ -489,7 +489,7 @@ public void validateOffsetsIfNeeded() { // Validate each partition against the current leader and epoch // If we see a new metadata version, check all partitions - maybeValidateAssignments(); + validatePositionsOnMetadataChange(); // Collect positions needing validation, with backoff Map partitionsToValidate = subscriptions @@ -1127,7 +1127,7 @@ Node selectReadReplica(TopicPartition partition, Node leaderReplica, long curren * If we have seen new metadata (as tracked by {@link org.apache.kafka.clients.Metadata#updateVersion()}), then * we should check that all of the assignments have a valid position. */ - private void maybeValidateAssignments() { + private void validatePositionsOnMetadataChange() { int newMetadataUpdateVersion = metadata.updateVersion(); if (metadataUpdateVersion.getAndSet(newMetadataUpdateVersion) != newMetadataUpdateVersion) { subscriptions.assignedPartitions().forEach(topicPartition -> { @@ -1144,7 +1144,7 @@ private void maybeValidateAssignments() { private Map prepareFetchRequests() { Map fetchable = new LinkedHashMap<>(); - maybeValidateAssignments(); + validatePositionsOnMetadataChange(); long currentTimeMs = time.milliseconds(); @@ -1156,7 +1156,7 @@ private Map prepareFetchRequests() { Optional leaderOpt = position.currentLeader.leader; if (!leaderOpt.isPresent()) { - log.info("Requesting metadata update for partition {} since the position {} is missing the current leader node", partition, position); + log.debug("Requesting metadata update for partition {} since the position {} is missing the current leader node", partition, position); metadata.requestUpdate(); continue; } From 293684d55e8b7285cf4c5d22200647837368207b Mon Sep 17 00:00:00 2001 From: David Arthur Date: Fri, 7 Aug 2020 13:46:43 -0400 Subject: [PATCH 15/18] Remove some streaming code --- .../consumer/internals/SubscriptionState.java | 59 ++++++++++--------- .../common/internals/PartitionStates.java | 11 +++- 2 files changed, 38 insertions(+), 32 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java index 59c3ab7bd6815..05f8edfe16660 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java @@ -36,17 +36,15 @@ import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.Set; -import java.util.function.Consumer; import java.util.function.LongSupplier; import java.util.function.Predicate; import java.util.regex.Pattern; -import java.util.stream.Collector; -import java.util.stream.Collectors; import static org.apache.kafka.clients.consumer.internals.Fetcher.hasUsableOffsetForLeaderEpochVersion; @@ -331,7 +329,7 @@ public synchronized Set subscription() { } public synchronized Set pausedPartitions() { - return collectPartitions(TopicPartitionState::isPaused, Collectors.toSet()); + return collectPartitions(TopicPartitionState::isPaused); } /** @@ -407,10 +405,6 @@ public synchronized Set assignedPartitions() { return new HashSet<>(this.assignment.partitionSet()); } - public synchronized void forEachAssignedPartition(Consumer consumer) { - this.assignment.partitionSet().forEach(consumer); - } - /** * @return a modifiable copy of the currently assigned partitions as a list */ @@ -427,10 +421,13 @@ synchronized int numAssignedPartitions() { } synchronized List fetchablePartitions(Predicate isAvailable) { - return assignment.stream() - .filter(tpState -> isAvailable.test(tpState.topicPartition()) && tpState.value().isFetchable()) - .map(PartitionStates.PartitionState::topicPartition) - .collect(Collectors.toList()); + List result = new ArrayList<>(); + assignment.forEach((topicPartition, topicPartitionState) -> { + if (topicPartitionState.isFetchable() && isAvailable.test(topicPartition)) { + result.add(topicPartition); + } + }); + return result; } public synchronized boolean hasAutoAssignedPartitions() { @@ -601,10 +598,9 @@ public synchronized Optional clearPreferredReadReplica(TopicPartition t public synchronized Map allConsumed() { Map allConsumed = new HashMap<>(); - assignment.stream().forEach(state -> { - TopicPartitionState partitionState = state.value(); + assignment.forEach((topicPartition, partitionState) -> { if (partitionState.hasValidPosition()) - allConsumed.put(state.topicPartition(), new OffsetAndMetadata(partitionState.position.offset, + allConsumed.put(topicPartition, new OffsetAndMetadata(partitionState.position.offset, partitionState.position.offsetEpoch, "")); }); return allConsumed; @@ -644,26 +640,33 @@ public synchronized OffsetResetStrategy resetStrategy(TopicPartition partition) } public synchronized boolean hasAllFetchPositions() { - return assignment.stream().allMatch(state -> state.value().hasValidPosition()); + Iterator it = assignment.stateIterator(); + while (it.hasNext()) { + if (!it.next().hasValidPosition()) { + return false; + } + } + return true; } public synchronized Set initializingPartitions() { - return collectPartitions(state -> state.fetchState.equals(FetchStates.INITIALIZING), Collectors.toSet()); + return collectPartitions(state -> state.fetchState.equals(FetchStates.INITIALIZING)); } - private > T collectPartitions(Predicate filter, Collector collector) { - return assignment.stream() - .filter(state -> filter.test(state.value())) - .map(PartitionStates.PartitionState::topicPartition) - .collect(collector); + private Set collectPartitions(Predicate filter) { + Set result = new HashSet<>(); + assignment.forEach((topicPartition, topicPartitionState) -> { + if (filter.test(topicPartitionState)) { + result.add(topicPartition); + } + }); + return result; } public synchronized void resetInitializingPositions() { final Set partitionsWithNoOffsets = new HashSet<>(); - assignment.stream().forEach(state -> { - TopicPartition tp = state.topicPartition(); - TopicPartitionState partitionState = state.value(); + assignment.forEach((tp, partitionState) -> { if (partitionState.fetchState.equals(FetchStates.INITIALIZING)) { if (defaultResetStrategy == OffsetResetStrategy.NONE) partitionsWithNoOffsets.add(tp); @@ -677,13 +680,11 @@ public synchronized void resetInitializingPositions() { } public synchronized Set partitionsNeedingReset(long nowMs) { - return collectPartitions(state -> state.awaitingReset() && !state.awaitingRetryBackoff(nowMs), - Collectors.toSet()); + return collectPartitions(state -> state.awaitingReset() && !state.awaitingRetryBackoff(nowMs)); } public synchronized Set partitionsNeedingValidation(long nowMs) { - return collectPartitions(state -> state.awaitingValidation() && !state.awaitingRetryBackoff(nowMs), - Collectors.toSet()); + return collectPartitions(state -> state.awaitingValidation() && !state.awaitingRetryBackoff(nowMs)); } public synchronized boolean isAssigned(TopicPartition tp) { diff --git a/clients/src/main/java/org/apache/kafka/common/internals/PartitionStates.java b/clients/src/main/java/org/apache/kafka/common/internals/PartitionStates.java index fb738cfc958f7..e19126d36d32f 100644 --- a/clients/src/main/java/org/apache/kafka/common/internals/PartitionStates.java +++ b/clients/src/main/java/org/apache/kafka/common/internals/PartitionStates.java @@ -20,12 +20,13 @@ import java.util.ArrayList; import java.util.Collections; +import java.util.Iterator; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Set; -import java.util.stream.Stream; +import java.util.function.BiConsumer; /** * This class is a useful building block for doing fetch requests where topic partitions have to be rotated via @@ -85,8 +86,12 @@ public boolean contains(TopicPartition topicPartition) { return map.containsKey(topicPartition); } - public Stream> stream() { - return map.entrySet().stream().map(entry -> new PartitionState<>(entry.getKey(), entry.getValue())); + public Iterator stateIterator() { + return map.values().iterator(); + } + + public void forEach(BiConsumer biConsumer) { + map.forEach(biConsumer); } public Map partitionStateMap() { From 315de4ae8bfd86067c5fa198c2da017ab06cedb1 Mon Sep 17 00:00:00 2001 From: David Arthur Date: Fri, 7 Aug 2020 16:05:36 -0400 Subject: [PATCH 16/18] Add some commentary about ugly code --- .../kafka/clients/consumer/internals/SubscriptionState.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java index 05f8edfe16660..62c53e839d5e0 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java @@ -421,8 +421,10 @@ synchronized int numAssignedPartitions() { } synchronized List fetchablePartitions(Predicate isAvailable) { + // Since this is in the hot-path for fetching, we do this instead of using java.util.stream API List result = new ArrayList<>(); assignment.forEach((topicPartition, topicPartitionState) -> { + // Cheap check is first to avoid evaluating the predicate if possible if (topicPartitionState.isFetchable() && isAvailable.test(topicPartition)) { result.add(topicPartition); } @@ -640,6 +642,7 @@ public synchronized OffsetResetStrategy resetStrategy(TopicPartition partition) } public synchronized boolean hasAllFetchPositions() { + // Since this is in the hot-path for fetching, we do this instead of using java.util.stream API Iterator it = assignment.stateIterator(); while (it.hasNext()) { if (!it.next().hasValidPosition()) { From d6552ce669a4f676e886c2b62723c268b3cb4f96 Mon Sep 17 00:00:00 2001 From: David Arthur Date: Mon, 10 Aug 2020 18:17:45 -0400 Subject: [PATCH 17/18] Add JMH for SubscriptionState --- checkstyle/import-control-jmh-benchmarks.xml | 2 +- .../consumer/internals/SubscriptionState.java | 3 +- .../consumer/SubscriptionStateBenchmark.java | 95 +++++++++++++++++++ 3 files changed, 98 insertions(+), 2 deletions(-) create mode 100644 jmh-benchmarks/src/main/java/org/apache/kafka/jmh/consumer/SubscriptionStateBenchmark.java diff --git a/checkstyle/import-control-jmh-benchmarks.xml b/checkstyle/import-control-jmh-benchmarks.xml index e5017e4e4c81f..f1538df3bcb4a 100644 --- a/checkstyle/import-control-jmh-benchmarks.xml +++ b/checkstyle/import-control-jmh-benchmarks.xml @@ -19,7 +19,6 @@ --> - @@ -49,6 +48,7 @@ + diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java index 62c53e839d5e0..4dcf160392a51 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java @@ -420,7 +420,8 @@ synchronized int numAssignedPartitions() { return this.assignment.size(); } - synchronized List fetchablePartitions(Predicate isAvailable) { + // Visible for testing + public synchronized List fetchablePartitions(Predicate isAvailable) { // Since this is in the hot-path for fetching, we do this instead of using java.util.stream API List result = new ArrayList<>(); assignment.forEach((topicPartition, topicPartitionState) -> { diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/consumer/SubscriptionStateBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/consumer/SubscriptionStateBenchmark.java new file mode 100644 index 0000000000000..d88a32ba4f495 --- /dev/null +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/consumer/SubscriptionStateBenchmark.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.jmh.consumer; + +import org.apache.kafka.clients.Metadata; +import org.apache.kafka.clients.consumer.OffsetResetStrategy; +import org.apache.kafka.clients.consumer.internals.SubscriptionState; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.utils.LogContext; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Warmup; + +import java.util.HashSet; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.stream.IntStream; + +@State(Scope.Benchmark) +@Fork(value = 1) +@Warmup(iterations = 5) +@Measurement(iterations = 15) +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.MILLISECONDS) +public class SubscriptionStateBenchmark { + @Param({"5000"}) + int topicCount; + + @Param({"50"}) + int partitionCount; + + SubscriptionState subscriptionState; + + @Setup(Level.Trial) + public void setup() { + Set assignment = new HashSet<>(topicCount * partitionCount); + IntStream.range(0, topicCount).forEach(topicId -> + IntStream.range(0, partitionCount).forEach(partitionId -> + assignment.add(new TopicPartition(String.format("topic-%04d", topicId), partitionId)) + ) + ); + subscriptionState = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST); + subscriptionState.assignFromUser(assignment); + SubscriptionState.FetchPosition position = new SubscriptionState.FetchPosition( + 0L, + Optional.of(0), + new Metadata.LeaderAndEpoch(Optional.of(new Node(0, "host", 9092)), Optional.of(10)) + ); + assignment.forEach(topicPartition -> { + subscriptionState.seekUnvalidated(topicPartition, position); + subscriptionState.completeValidation(topicPartition); + }); + } + + @Benchmark + public boolean testHasAllFetchPositions() { + return subscriptionState.hasAllFetchPositions(); + } + + @Benchmark + public int testFetchablePartitions() { + return subscriptionState.fetchablePartitions(tp -> true).size(); + } + + @Benchmark + public int testPartitionsNeedingValidation() { + return subscriptionState.partitionsNeedingValidation(0L).size(); + } +} From 6f3ed50b0c0557a3e31b0b6c2b13188ceab38fd8 Mon Sep 17 00:00:00 2001 From: David Arthur Date: Wed, 12 Aug 2020 09:51:53 -0400 Subject: [PATCH 18/18] Add spotbugs exclude for new jmh package --- gradle/spotbugs-exclude.xml | 1 + 1 file changed, 1 insertion(+) diff --git a/gradle/spotbugs-exclude.xml b/gradle/spotbugs-exclude.xml index f144165536128..37fba9e81ee9d 100644 --- a/gradle/spotbugs-exclude.xml +++ b/gradle/spotbugs-exclude.xml @@ -238,6 +238,7 @@ For a detailed description of spotbugs bug categories, see https://spotbugs.read +