diff --git a/checkstyle/import-control-jmh-benchmarks.xml b/checkstyle/import-control-jmh-benchmarks.xml index e5017e4e4c81f..f1538df3bcb4a 100644 --- a/checkstyle/import-control-jmh-benchmarks.xml +++ b/checkstyle/import-control-jmh-benchmarks.xml @@ -19,7 +19,6 @@ --> - @@ -49,6 +48,7 @@ + diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java index ed14c97712423..2e5a1c71fce2e 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java @@ -155,6 +155,8 @@ public class Fetcher implements Closeable { private final OffsetsForLeaderEpochClient offsetsForLeaderEpochClient; private final Set nodesWithPendingFetchRequests; private final ApiVersions apiVersions; + private final AtomicInteger metadataUpdateVersion = new AtomicInteger(-1); + private CompletedFetch nextInLineFetch = null; @@ -486,10 +488,8 @@ public void validateOffsetsIfNeeded() { throw exception; // Validate each partition against the current leader and epoch - subscriptions.assignedPartitions().forEach(topicPartition -> { - ConsumerMetadata.LeaderAndEpoch leaderAndEpoch = metadata.currentLeader(topicPartition); - subscriptions.maybeValidatePositionForCurrentLeader(apiVersions, topicPartition, leaderAndEpoch); - }); + // If we see a new metadata version, check all partitions + validatePositionsOnMetadataChange(); // Collect positions needing validation, with backoff Map partitionsToValidate = subscriptions @@ -722,9 +722,11 @@ private List> fetchRecords(CompletedFetch completedFetch, i // Visible for testing void resetOffsetIfNeeded(TopicPartition partition, OffsetResetStrategy requestedResetStrategy, ListOffsetData offsetData) { FetchPosition position = new FetchPosition( - offsetData.offset, offsetData.leaderEpoch, metadata.currentLeader(partition)); + offsetData.offset, + Optional.empty(), // This will ensure we skip validation + metadata.currentLeader(partition)); offsetData.leaderEpoch.ifPresent(epoch -> metadata.updateLastSeenEpochIfNewer(partition, epoch)); - subscriptions.maybeSeekUnvalidated(partition, position.offset, requestedResetStrategy); + subscriptions.maybeSeekUnvalidated(partition, position, requestedResetStrategy); } private void resetOffsetsAsync(Map partitionResetTimestamps) { @@ -782,6 +784,7 @@ private void validateOffsetsAsync(Map partitionsT final Map> regrouped = regroupFetchPositionsByLeader(partitionsToValidate); + long nextResetTimeMs = time.milliseconds() + requestTimeoutMs; regrouped.forEach((node, fetchPositions) -> { if (node.isEmpty()) { metadata.requestUpdate(); @@ -804,7 +807,7 @@ private void validateOffsetsAsync(Map partitionsT return; } - subscriptions.setNextAllowedRetry(fetchPositions.keySet(), time.milliseconds() + requestTimeoutMs); + subscriptions.setNextAllowedRetry(fetchPositions.keySet(), nextResetTimeMs); RequestFuture future = offsetsForLeaderEpochClient.sendAsyncRequest(node, fetchPositions); @@ -1120,6 +1123,20 @@ Node selectReadReplica(TopicPartition partition, Node leaderReplica, long curren } } + /** + * If we have seen new metadata (as tracked by {@link org.apache.kafka.clients.Metadata#updateVersion()}), then + * we should check that all of the assignments have a valid position. + */ + private void validatePositionsOnMetadataChange() { + int newMetadataUpdateVersion = metadata.updateVersion(); + if (metadataUpdateVersion.getAndSet(newMetadataUpdateVersion) != newMetadataUpdateVersion) { + subscriptions.assignedPartitions().forEach(topicPartition -> { + ConsumerMetadata.LeaderAndEpoch leaderAndEpoch = metadata.currentLeader(topicPartition); + subscriptions.maybeValidatePositionForCurrentLeader(apiVersions, topicPartition, leaderAndEpoch); + }); + } + } + /** * Create fetch requests for all nodes for which we have assigned partitions * that have no existing requests in flight. @@ -1127,10 +1144,7 @@ Node selectReadReplica(TopicPartition partition, Node leaderReplica, long curren private Map prepareFetchRequests() { Map fetchable = new LinkedHashMap<>(); - // Ensure the position has an up-to-date leader - subscriptions.assignedPartitions().forEach(tp -> - subscriptions.maybeValidatePositionForCurrentLeader(apiVersions, tp, metadata.currentLeader(tp)) - ); + validatePositionsOnMetadataChange(); long currentTimeMs = time.milliseconds(); @@ -1142,6 +1156,7 @@ private Map prepareFetchRequests() { Optional leaderOpt = position.currentLeader.leader; if (!leaderOpt.isPresent()) { + log.debug("Requesting metadata update for partition {} since the position {} is missing the current leader node", partition, position); metadata.requestUpdate(); continue; } @@ -1154,7 +1169,6 @@ private Map prepareFetchRequests() { // If we try to send during the reconnect blackout window, then the request is just // going to be failed anyway before being sent, so skip the send for now log.trace("Skipping fetch for partition {} because node {} is awaiting reconnect backoff", partition, node); - } else if (this.nodesWithPendingFetchRequests.contains(node.id())) { log.trace("Skipping fetch for partition {} because previous request to {} has not been processed", partition, node); } else { diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java index 13103bc0c038f..4dcf160392a51 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java @@ -36,6 +36,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Objects; @@ -44,8 +45,6 @@ import java.util.function.LongSupplier; import java.util.function.Predicate; import java.util.regex.Pattern; -import java.util.stream.Collector; -import java.util.stream.Collectors; import static org.apache.kafka.clients.consumer.internals.Fetcher.hasUsableOffsetForLeaderEpochVersion; @@ -330,7 +329,7 @@ public synchronized Set subscription() { } public synchronized Set pausedPartitions() { - return collectPartitions(TopicPartitionState::isPaused, Collectors.toSet()); + return collectPartitions(TopicPartitionState::isPaused); } /** @@ -385,7 +384,7 @@ public void seekUnvalidated(TopicPartition tp, FetchPosition position) { assignedState(tp).seekUnvalidated(position); } - synchronized void maybeSeekUnvalidated(TopicPartition tp, long offset, OffsetResetStrategy requestedResetStrategy) { + synchronized void maybeSeekUnvalidated(TopicPartition tp, FetchPosition position, OffsetResetStrategy requestedResetStrategy) { TopicPartitionState state = assignedStateOrNull(tp); if (state == null) { log.debug("Skipping reset of partition {} since it is no longer assigned", tp); @@ -394,8 +393,8 @@ synchronized void maybeSeekUnvalidated(TopicPartition tp, long offset, OffsetRes } else if (requestedResetStrategy != state.resetStrategy) { log.debug("Skipping reset of partition {} since an alternative reset has been requested", tp); } else { - log.info("Resetting offset for partition {} to offset {}.", tp, offset); - state.seekUnvalidated(new FetchPosition(offset)); + log.info("Resetting offset for partition {} to position {}.", tp, position); + state.seekUnvalidated(position); } } @@ -421,11 +420,17 @@ synchronized int numAssignedPartitions() { return this.assignment.size(); } - synchronized List fetchablePartitions(Predicate isAvailable) { - return assignment.stream() - .filter(tpState -> isAvailable.test(tpState.topicPartition()) && tpState.value().isFetchable()) - .map(PartitionStates.PartitionState::topicPartition) - .collect(Collectors.toList()); + // Visible for testing + public synchronized List fetchablePartitions(Predicate isAvailable) { + // Since this is in the hot-path for fetching, we do this instead of using java.util.stream API + List result = new ArrayList<>(); + assignment.forEach((topicPartition, topicPartitionState) -> { + // Cheap check is first to avoid evaluating the predicate if possible + if (topicPartitionState.isFetchable() && isAvailable.test(topicPartition)) { + result.add(topicPartition); + } + }); + return result; } public synchronized boolean hasAutoAssignedPartitions() { @@ -596,10 +601,9 @@ public synchronized Optional clearPreferredReadReplica(TopicPartition t public synchronized Map allConsumed() { Map allConsumed = new HashMap<>(); - assignment.stream().forEach(state -> { - TopicPartitionState partitionState = state.value(); + assignment.forEach((topicPartition, partitionState) -> { if (partitionState.hasValidPosition()) - allConsumed.put(state.topicPartition(), new OffsetAndMetadata(partitionState.position.offset, + allConsumed.put(topicPartition, new OffsetAndMetadata(partitionState.position.offset, partitionState.position.offsetEpoch, "")); }); return allConsumed; @@ -639,26 +643,34 @@ public synchronized OffsetResetStrategy resetStrategy(TopicPartition partition) } public synchronized boolean hasAllFetchPositions() { - return assignment.stream().allMatch(state -> state.value().hasValidPosition()); + // Since this is in the hot-path for fetching, we do this instead of using java.util.stream API + Iterator it = assignment.stateIterator(); + while (it.hasNext()) { + if (!it.next().hasValidPosition()) { + return false; + } + } + return true; } public synchronized Set initializingPartitions() { - return collectPartitions(state -> state.fetchState.equals(FetchStates.INITIALIZING), Collectors.toSet()); + return collectPartitions(state -> state.fetchState.equals(FetchStates.INITIALIZING)); } - private > T collectPartitions(Predicate filter, Collector collector) { - return assignment.stream() - .filter(state -> filter.test(state.value())) - .map(PartitionStates.PartitionState::topicPartition) - .collect(collector); + private Set collectPartitions(Predicate filter) { + Set result = new HashSet<>(); + assignment.forEach((topicPartition, topicPartitionState) -> { + if (filter.test(topicPartitionState)) { + result.add(topicPartition); + } + }); + return result; } public synchronized void resetInitializingPositions() { final Set partitionsWithNoOffsets = new HashSet<>(); - assignment.stream().forEach(state -> { - TopicPartition tp = state.topicPartition(); - TopicPartitionState partitionState = state.value(); + assignment.forEach((tp, partitionState) -> { if (partitionState.fetchState.equals(FetchStates.INITIALIZING)) { if (defaultResetStrategy == OffsetResetStrategy.NONE) partitionsWithNoOffsets.add(tp); @@ -672,13 +684,11 @@ public synchronized void resetInitializingPositions() { } public synchronized Set partitionsNeedingReset(long nowMs) { - return collectPartitions(state -> state.awaitingReset() && !state.awaitingRetryBackoff(nowMs), - Collectors.toSet()); + return collectPartitions(state -> state.awaitingReset() && !state.awaitingRetryBackoff(nowMs)); } public synchronized Set partitionsNeedingValidation(long nowMs) { - return collectPartitions(state -> state.awaitingValidation() && !state.awaitingRetryBackoff(nowMs), - Collectors.toSet()); + return collectPartitions(state -> state.awaitingValidation() && !state.awaitingRetryBackoff(nowMs)); } public synchronized boolean isAssigned(TopicPartition tp) { diff --git a/clients/src/main/java/org/apache/kafka/common/internals/PartitionStates.java b/clients/src/main/java/org/apache/kafka/common/internals/PartitionStates.java index fb738cfc958f7..e19126d36d32f 100644 --- a/clients/src/main/java/org/apache/kafka/common/internals/PartitionStates.java +++ b/clients/src/main/java/org/apache/kafka/common/internals/PartitionStates.java @@ -20,12 +20,13 @@ import java.util.ArrayList; import java.util.Collections; +import java.util.Iterator; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Set; -import java.util.stream.Stream; +import java.util.function.BiConsumer; /** * This class is a useful building block for doing fetch requests where topic partitions have to be rotated via @@ -85,8 +86,12 @@ public boolean contains(TopicPartition topicPartition) { return map.containsKey(topicPartition); } - public Stream> stream() { - return map.entrySet().stream().map(entry -> new PartitionState<>(entry.getKey(), entry.getValue())); + public Iterator stateIterator() { + return map.values().iterator(); + } + + public void forEach(BiConsumer biConsumer) { + map.forEach(biConsumer); } public Map partitionStateMap() { diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java index edc7c8548652e..02481cac99654 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java @@ -504,7 +504,7 @@ public void testParseCorruptedRecord() throws Exception { buffer.flip(); - subscriptions.seek(tp0, 0); + subscriptions.seekUnvalidated(tp0, new SubscriptionState.FetchPosition(0, Optional.empty(), metadata.currentLeader(tp0))); // normal fetch assertEquals(1, fetcher.sendFetches()); @@ -542,7 +542,7 @@ private void ensureBlockOnRecord(long blockedOffset) { private void seekAndConsumeRecord(ByteBuffer responseBuffer, long toOffset) { // Seek to skip the bad record and fetch again. - subscriptions.seek(tp0, toOffset); + subscriptions.seekUnvalidated(tp0, new SubscriptionState.FetchPosition(toOffset, Optional.empty(), metadata.currentLeader(tp0))); // Should not throw exception after the seek. fetcher.fetchedRecords(); assertEquals(1, fetcher.sendFetches()); @@ -979,13 +979,13 @@ public void testFetchOnCompletedFetchesForSomePausedPartitions() { // seek to tp0 and tp1 in two polls to generate 2 complete requests and responses // #1 seek, request, poll, response - subscriptions.seek(tp0, 1); + subscriptions.seekUnvalidated(tp0, new SubscriptionState.FetchPosition(1, Optional.empty(), metadata.currentLeader(tp0))); assertEquals(1, fetcher.sendFetches()); client.prepareResponse(fullFetchResponse(tp0, this.records, Errors.NONE, 100L, 0)); consumerClient.poll(time.timer(0)); // #2 seek, request, poll, response - subscriptions.seek(tp1, 1); + subscriptions.seekUnvalidated(tp1, new SubscriptionState.FetchPosition(1, Optional.empty(), metadata.currentLeader(tp1))); assertEquals(1, fetcher.sendFetches()); client.prepareResponse(fullFetchResponse(tp1, this.nextRecords, Errors.NONE, 100L, 0)); @@ -1014,13 +1014,13 @@ public void testFetchOnCompletedFetchesForAllPausedPartitions() { // seek to tp0 and tp1 in two polls to generate 2 complete requests and responses // #1 seek, request, poll, response - subscriptions.seek(tp0, 1); + subscriptions.seekUnvalidated(tp0, new SubscriptionState.FetchPosition(1, Optional.empty(), metadata.currentLeader(tp0))); assertEquals(1, fetcher.sendFetches()); client.prepareResponse(fullFetchResponse(tp0, this.records, Errors.NONE, 100L, 0)); consumerClient.poll(time.timer(0)); // #2 seek, request, poll, response - subscriptions.seek(tp1, 1); + subscriptions.seekUnvalidated(tp1, new SubscriptionState.FetchPosition(1, Optional.empty(), metadata.currentLeader(tp1))); assertEquals(1, fetcher.sendFetches()); client.prepareResponse(fullFetchResponse(tp1, this.nextRecords, Errors.NONE, 100L, 0)); @@ -1383,7 +1383,8 @@ public void testSeekBeforeException() { assertEquals(2, fetcher.fetchedRecords().get(tp0).size()); subscriptions.assignFromUser(Utils.mkSet(tp0, tp1)); - subscriptions.seek(tp1, 1); + subscriptions.seekUnvalidated(tp1, new SubscriptionState.FetchPosition(1, Optional.empty(), metadata.currentLeader(tp1))); + assertEquals(1, fetcher.sendFetches()); partitions = new HashMap<>(); partitions.put(tp1, new FetchResponse.PartitionData<>(Errors.OFFSET_OUT_OF_RANGE, 100, @@ -1750,7 +1751,8 @@ public void testEarlierOffsetResetArrivesLate() throws InterruptedException { Object result = invocation.callRealMethod(); latchEarliestDone.countDown(); return result; - }).when(subscriptions).maybeSeekUnvalidated(tp0, 0L, OffsetResetStrategy.EARLIEST); + }).when(subscriptions).maybeSeekUnvalidated(tp0, new SubscriptionState.FetchPosition(0L, + Optional.empty(), metadata.currentLeader(tp0)), OffsetResetStrategy.EARLIEST); es.submit(() -> { subscriptions.requestOffsetReset(tp0, OffsetResetStrategy.EARLIEST); diff --git a/gradle/spotbugs-exclude.xml b/gradle/spotbugs-exclude.xml index f144165536128..37fba9e81ee9d 100644 --- a/gradle/spotbugs-exclude.xml +++ b/gradle/spotbugs-exclude.xml @@ -238,6 +238,7 @@ For a detailed description of spotbugs bug categories, see https://spotbugs.read + diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/consumer/SubscriptionStateBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/consumer/SubscriptionStateBenchmark.java new file mode 100644 index 0000000000000..d88a32ba4f495 --- /dev/null +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/consumer/SubscriptionStateBenchmark.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.jmh.consumer; + +import org.apache.kafka.clients.Metadata; +import org.apache.kafka.clients.consumer.OffsetResetStrategy; +import org.apache.kafka.clients.consumer.internals.SubscriptionState; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.utils.LogContext; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Warmup; + +import java.util.HashSet; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.stream.IntStream; + +@State(Scope.Benchmark) +@Fork(value = 1) +@Warmup(iterations = 5) +@Measurement(iterations = 15) +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.MILLISECONDS) +public class SubscriptionStateBenchmark { + @Param({"5000"}) + int topicCount; + + @Param({"50"}) + int partitionCount; + + SubscriptionState subscriptionState; + + @Setup(Level.Trial) + public void setup() { + Set assignment = new HashSet<>(topicCount * partitionCount); + IntStream.range(0, topicCount).forEach(topicId -> + IntStream.range(0, partitionCount).forEach(partitionId -> + assignment.add(new TopicPartition(String.format("topic-%04d", topicId), partitionId)) + ) + ); + subscriptionState = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST); + subscriptionState.assignFromUser(assignment); + SubscriptionState.FetchPosition position = new SubscriptionState.FetchPosition( + 0L, + Optional.of(0), + new Metadata.LeaderAndEpoch(Optional.of(new Node(0, "host", 9092)), Optional.of(10)) + ); + assignment.forEach(topicPartition -> { + subscriptionState.seekUnvalidated(topicPartition, position); + subscriptionState.completeValidation(topicPartition); + }); + } + + @Benchmark + public boolean testHasAllFetchPositions() { + return subscriptionState.hasAllFetchPositions(); + } + + @Benchmark + public int testFetchablePartitions() { + return subscriptionState.fetchablePartitions(tp -> true).size(); + } + + @Benchmark + public int testPartitionsNeedingValidation() { + return subscriptionState.partitionsNeedingValidation(0L).size(); + } +}