2 changes: 1 addition & 1 deletion checkstyle/import-control-jmh-benchmarks.xml
@@ -19,7 +19,6 @@
 -->

 <import-control pkg="org.apache.kafka.jmh">
-
     <allow pkg="java"/>
     <allow pkg="scala"/>
     <allow pkg="javax.management"/>
@@ -49,6 +48,7 @@
     <allow pkg="org.mockito"/>
     <allow pkg="kafka.security.authorizer"/>
     <allow pkg="org.apache.kafka.server"/>
+    <allow pkg="org.apache.kafka.clients"/>

     <subpackage name="cache">
     </subpackage>
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -155,6 +155,8 @@ public class Fetcher<K, V> implements Closeable {
     private final OffsetsForLeaderEpochClient offsetsForLeaderEpochClient;
     private final Set<Integer> nodesWithPendingFetchRequests;
     private final ApiVersions apiVersions;
+    private final AtomicInteger metadataUpdateVersion = new AtomicInteger(-1);

     private CompletedFetch nextInLineFetch = null;
@@ -486,10 +488,8 @@ public void validateOffsetsIfNeeded() {
             throw exception;

-        // Validate each partition against the current leader and epoch
-        subscriptions.assignedPartitions().forEach(topicPartition -> {
-            ConsumerMetadata.LeaderAndEpoch leaderAndEpoch = metadata.currentLeader(topicPartition);
-            subscriptions.maybeValidatePositionForCurrentLeader(apiVersions, topicPartition, leaderAndEpoch);
-        });
+        // If we see a new metadata version, check all partitions
+        validatePositionsOnMetadataChange();

         // Collect positions needing validation, with backoff
         Map<TopicPartition, FetchPosition> partitionsToValidate = subscriptions
@@ -722,9 +722,11 @@ private List<ConsumerRecord<K, V>> fetchRecords(CompletedFetch completedFetch, i
     // Visible for testing
     void resetOffsetIfNeeded(TopicPartition partition, OffsetResetStrategy requestedResetStrategy, ListOffsetData offsetData) {
         FetchPosition position = new FetchPosition(
-                offsetData.offset, offsetData.leaderEpoch, metadata.currentLeader(partition));
+                offsetData.offset,
+                Optional.empty(), // This will ensure we skip validation
+                metadata.currentLeader(partition));
         offsetData.leaderEpoch.ifPresent(epoch -> metadata.updateLastSeenEpochIfNewer(partition, epoch));
-        subscriptions.maybeSeekUnvalidated(partition, position.offset, requestedResetStrategy);
+        subscriptions.maybeSeekUnvalidated(partition, position, requestedResetStrategy);
     }

     private void resetOffsetsAsync(Map<TopicPartition, Long> partitionResetTimestamps) {
@@ -782,6 +784,7 @@ private void validateOffsetsAsync(Map<TopicPartition, FetchPosition> partitionsT
         final Map<Node, Map<TopicPartition, FetchPosition>> regrouped =
                 regroupFetchPositionsByLeader(partitionsToValidate);

+        long nextResetTimeMs = time.milliseconds() + requestTimeoutMs;
         regrouped.forEach((node, fetchPositions) -> {
             if (node.isEmpty()) {
                 metadata.requestUpdate();
@@ -804,7 +807,7 @@ private void validateOffsetsAsync(Map<TopicPartition, FetchPosition> partitionsT
                 return;
             }

-            subscriptions.setNextAllowedRetry(fetchPositions.keySet(), time.milliseconds() + requestTimeoutMs);
+            subscriptions.setNextAllowedRetry(fetchPositions.keySet(), nextResetTimeMs);

             RequestFuture<OffsetForEpochResult> future =
                     offsetsForLeaderEpochClient.sendAsyncRequest(node, fetchPositions);
@@ -1120,17 +1123,28 @@ Node selectReadReplica(TopicPartition partition, Node leaderReplica, long curren
         }
     }

+    /**
+     * If we have seen new metadata (as tracked by {@link org.apache.kafka.clients.Metadata#updateVersion()}), then
+     * we should check that all of the assignments have a valid position.
+     */
+    private void validatePositionsOnMetadataChange() {
+        int newMetadataUpdateVersion = metadata.updateVersion();
+        if (metadataUpdateVersion.getAndSet(newMetadataUpdateVersion) != newMetadataUpdateVersion) {
+            subscriptions.assignedPartitions().forEach(topicPartition -> {
+                ConsumerMetadata.LeaderAndEpoch leaderAndEpoch = metadata.currentLeader(topicPartition);
+                subscriptions.maybeValidatePositionForCurrentLeader(apiVersions, topicPartition, leaderAndEpoch);
+            });
+        }
+    }
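
To make the idiom concrete: a minimal, runnable sketch of the version gate that validatePositionsOnMetadataChange relies on, with a plain int standing in for the consumer's metadata update version (the class and method names below are illustrative, not Kafka API):

    import java.util.concurrent.atomic.AtomicInteger;

    // Remember the last metadata version we acted on; re-run the (potentially
    // expensive) per-partition validation only when the version has advanced.
    public class MetadataVersionGate {
        private final AtomicInteger metadataUpdateVersion = new AtomicInteger(-1);

        // getAndSet returns the previous value, so this is true exactly when
        // the version changed since the last call (including the first call).
        boolean shouldRevalidate(int newVersion) {
            return metadataUpdateVersion.getAndSet(newVersion) != newVersion;
        }

        public static void main(String[] args) {
            MetadataVersionGate gate = new MetadataVersionGate();
            System.out.println(gate.shouldRevalidate(1)); // true: first sighting
            System.out.println(gate.shouldRevalidate(1)); // false: unchanged
            System.out.println(gate.shouldRevalidate(2)); // true: metadata advanced
        }
    }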

     /**
      * Create fetch requests for all nodes for which we have assigned partitions
      * that have no existing requests in flight.
      */
     private Map<Node, FetchSessionHandler.FetchRequestData> prepareFetchRequests() {
hachikuji (Contributor) commented:
The fetchablePartitions method used below is probably another nice opportunity to use something like forEachAssignedPartition.

mumrah (Member, Author) commented:
I'm not sure it saves anything in this case. The main benefit of forEachAssignedPartition is avoiding making a copy of the assignment set. Since fetchablePartitions iterates across the internal set directly I don't think it would help.

ijuma (Member) commented on Aug 20, 2019:
We can avoid a copy in the case @hachikuji mentions as well, right? See below:

    synchronized List<TopicPartition> fetchablePartitions(Predicate<TopicPartition> isAvailable) {
        return assignment.stream()
                .filter(tpState -> isAvailable.test(tpState.topicPartition()) && tpState.value().isFetchable())
                .map(PartitionStates.PartitionState::topicPartition)
                .collect(Collectors.toList());
    }

mumrah (Member, Author) commented:
Ah ok, I see what he means. I'll look into this.

mumrah (Member, Author) commented:
I made a pass at this and it wasn't so simple. Deferring for now.
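
A minimal sketch of the forEachAssignedPartition pattern discussed in this thread, assuming a plain map-backed assignment; all names below are illustrative stand-ins, not the actual Kafka internals:

    import java.util.LinkedHashMap;
    import java.util.Map;
    import java.util.function.Consumer;

    public class ForEachAssignedPartitionSketch {
        private final Map<String, Long> assignment = new LinkedHashMap<>();

        void assign(String topicPartition, long position) {
            assignment.put(topicPartition, position);
        }

        // Hand each assigned partition to the caller in place, without first
        // copying the assigned-partition set into a new collection.
        synchronized void forEachAssignedPartition(Consumer<String> consumer) {
            assignment.keySet().forEach(consumer);
        }

        public static void main(String[] args) {
            ForEachAssignedPartitionSketch subscriptions = new ForEachAssignedPartitionSketch();
            subscriptions.assign("topic-0", 0L);
            subscriptions.assign("topic-1", 5L);
            subscriptions.forEachAssignedPartition(tp -> System.out.println("validating " + tp));
        }
    }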

         Map<Node, FetchSessionHandler.Builder> fetchable = new LinkedHashMap<>();

-        // Ensure the position has an up-to-date leader
-        subscriptions.assignedPartitions().forEach(tp ->
-            subscriptions.maybeValidatePositionForCurrentLeader(apiVersions, tp, metadata.currentLeader(tp))
-        );
+        validatePositionsOnMetadataChange();

         long currentTimeMs = time.milliseconds();

@@ -1142,6 +1156,7 @@ private Map<Node, FetchSessionHandler.FetchRequestData> prepareFetchRequests() {

             Optional<Node> leaderOpt = position.currentLeader.leader;
             if (!leaderOpt.isPresent()) {
+                log.debug("Requesting metadata update for partition {} since the position {} is missing the current leader node", partition, position);
                 metadata.requestUpdate();
                 continue;
             }
@@ -1154,7 +1169,6 @@ private Map<Node, FetchSessionHandler.FetchRequestData> prepareFetchRequests() {
                 // If we try to send during the reconnect blackout window, then the request is just
                 // going to be failed anyway before being sent, so skip the send for now
                 log.trace("Skipping fetch for partition {} because node {} is awaiting reconnect backoff", partition, node);
-
             } else if (this.nodesWithPendingFetchRequests.contains(node.id())) {
                 log.trace("Skipping fetch for partition {} because previous request to {} has not been processed", partition, node);
             } else {
clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
@@ -36,6 +36,7 @@
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -44,8 +45,6 @@
 import java.util.function.LongSupplier;
 import java.util.function.Predicate;
 import java.util.regex.Pattern;
-import java.util.stream.Collector;
-import java.util.stream.Collectors;

 import static org.apache.kafka.clients.consumer.internals.Fetcher.hasUsableOffsetForLeaderEpochVersion;

@@ -330,7 +329,7 @@ public synchronized Set<String> subscription() {
     }

     public synchronized Set<TopicPartition> pausedPartitions() {
-        return collectPartitions(TopicPartitionState::isPaused, Collectors.toSet());
+        return collectPartitions(TopicPartitionState::isPaused);
     }

     /**
@@ -385,7 +384,7 @@ public void seekUnvalidated(TopicPartition tp, FetchPosition position) {
         assignedState(tp).seekUnvalidated(position);
     }

-    synchronized void maybeSeekUnvalidated(TopicPartition tp, long offset, OffsetResetStrategy requestedResetStrategy) {
+    synchronized void maybeSeekUnvalidated(TopicPartition tp, FetchPosition position, OffsetResetStrategy requestedResetStrategy) {
         TopicPartitionState state = assignedStateOrNull(tp);
         if (state == null) {
             log.debug("Skipping reset of partition {} since it is no longer assigned", tp);
@@ -394,8 +393,8 @@ synchronized void maybeSeekUnvalidated(TopicPartition tp, long offset, OffsetRes
         } else if (requestedResetStrategy != state.resetStrategy) {
             log.debug("Skipping reset of partition {} since an alternative reset has been requested", tp);
         } else {
-            log.info("Resetting offset for partition {} to offset {}.", tp, offset);
-            state.seekUnvalidated(new FetchPosition(offset));
+            log.info("Resetting offset for partition {} to position {}.", tp, position);
+            state.seekUnvalidated(position);
         }
     }

@@ -421,11 +420,17 @@ synchronized int numAssignedPartitions() {
         return this.assignment.size();
     }

-    synchronized List<TopicPartition> fetchablePartitions(Predicate<TopicPartition> isAvailable) {
-        return assignment.stream()
-                .filter(tpState -> isAvailable.test(tpState.topicPartition()) && tpState.value().isFetchable())
-                .map(PartitionStates.PartitionState::topicPartition)
-                .collect(Collectors.toList());
+    // Visible for testing
+    public synchronized List<TopicPartition> fetchablePartitions(Predicate<TopicPartition> isAvailable) {
+        // Since this is in the hot-path for fetching, we do this instead of using java.util.stream API
+        List<TopicPartition> result = new ArrayList<>();
hachikuji (Contributor) commented:
We should add a small comment that this is in the hotpath and is written the "ugly" way for a reason. It's also probably worth mentioning that we do the cheap isFetchable check first.

+        assignment.forEach((topicPartition, topicPartitionState) -> {
+            // Cheap check is first to avoid evaluating the predicate if possible
+            if (topicPartitionState.isFetchable() && isAvailable.test(topicPartition)) {
+                result.add(topicPartition);
+            }
+        });
+        return result;
     }

     public synchronized boolean hasAutoAssignedPartitions() {
@@ -596,10 +601,9 @@ public synchronized Optional<Integer> clearPreferredReadReplica(TopicPartition t

     public synchronized Map<TopicPartition, OffsetAndMetadata> allConsumed() {
         Map<TopicPartition, OffsetAndMetadata> allConsumed = new HashMap<>();
-        assignment.stream().forEach(state -> {
-            TopicPartitionState partitionState = state.value();
+        assignment.forEach((topicPartition, partitionState) -> {
             if (partitionState.hasValidPosition())
-                allConsumed.put(state.topicPartition(), new OffsetAndMetadata(partitionState.position.offset,
+                allConsumed.put(topicPartition, new OffsetAndMetadata(partitionState.position.offset,
                     partitionState.position.offsetEpoch, ""));
         });
         return allConsumed;
@@ -639,26 +643,34 @@ public synchronized OffsetResetStrategy resetStrategy(TopicPartition partition)
     }

     public synchronized boolean hasAllFetchPositions() {
-        return assignment.stream().allMatch(state -> state.value().hasValidPosition());
+        // Since this is in the hot-path for fetching, we do this instead of using java.util.stream API
+        Iterator<TopicPartitionState> it = assignment.stateIterator();
+        while (it.hasNext()) {
+            if (!it.next().hasValidPosition()) {
+                return false;
+            }
+        }
+        return true;
ijuma (Member) commented:
@mumrah I now understand why the previous version was slower, it was allocating a PartitionState instance per element. But we only use the value here. So, we could still use allMatch without the performance penalty.
     }
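
A runnable sketch of what that suggestion could look like: stream over the map's values directly so no per-element wrapper is allocated (the map-backed assignment and trimmed TopicPartitionState below are simplified stand-ins, not the real classes):

    import java.util.LinkedHashMap;
    import java.util.Map;

    public class AllMatchSketch {
        static final class TopicPartitionState {
            private final boolean validPosition;
            TopicPartitionState(boolean validPosition) { this.validPosition = validPosition; }
            boolean hasValidPosition() { return validPosition; }
        }

        public static void main(String[] args) {
            Map<String, TopicPartitionState> assignment = new LinkedHashMap<>();
            assignment.put("topic-0", new TopicPartitionState(true));
            assignment.put("topic-1", new TopicPartitionState(false));

            // values() is a live view, so allMatch inspects each state without
            // allocating a wrapper object per element.
            boolean allValid = assignment.values().stream()
                    .allMatch(TopicPartitionState::hasValidPosition);
            System.out.println(allValid); // false: topic-1 lacks a valid position
        }
    }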

     public synchronized Set<TopicPartition> initializingPartitions() {
-        return collectPartitions(state -> state.fetchState.equals(FetchStates.INITIALIZING), Collectors.toSet());
+        return collectPartitions(state -> state.fetchState.equals(FetchStates.INITIALIZING));
     }

-    private <T extends Collection<TopicPartition>> T collectPartitions(Predicate<TopicPartitionState> filter, Collector<TopicPartition, ?, T> collector) {
-        return assignment.stream()
-                .filter(state -> filter.test(state.value()))
-                .map(PartitionStates.PartitionState::topicPartition)
-                .collect(collector);
+    private Set<TopicPartition> collectPartitions(Predicate<TopicPartitionState> filter) {
+        Set<TopicPartition> result = new HashSet<>();
+        assignment.forEach((topicPartition, topicPartitionState) -> {
+            if (filter.test(topicPartitionState)) {
+                result.add(topicPartition);
+            }
+        });
+        return result;
     }

     public synchronized void resetInitializingPositions() {
         final Set<TopicPartition> partitionsWithNoOffsets = new HashSet<>();
-        assignment.stream().forEach(state -> {
-            TopicPartition tp = state.topicPartition();
-            TopicPartitionState partitionState = state.value();
+        assignment.forEach((tp, partitionState) -> {
             if (partitionState.fetchState.equals(FetchStates.INITIALIZING)) {
                 if (defaultResetStrategy == OffsetResetStrategy.NONE)
                     partitionsWithNoOffsets.add(tp);
@@ -672,13 +684,11 @@ public synchronized void resetInitializingPositions() {
     }

     public synchronized Set<TopicPartition> partitionsNeedingReset(long nowMs) {
-        return collectPartitions(state -> state.awaitingReset() && !state.awaitingRetryBackoff(nowMs),
-                Collectors.toSet());
+        return collectPartitions(state -> state.awaitingReset() && !state.awaitingRetryBackoff(nowMs));
     }

     public synchronized Set<TopicPartition> partitionsNeedingValidation(long nowMs) {
-        return collectPartitions(state -> state.awaitingValidation() && !state.awaitingRetryBackoff(nowMs),
-                Collectors.toSet());
+        return collectPartitions(state -> state.awaitingValidation() && !state.awaitingRetryBackoff(nowMs));
     }

     public synchronized boolean isAssigned(TopicPartition tp) {
clients/src/main/java/org/apache/kafka/common/internals/PartitionStates.java
@@ -20,12 +20,13 @@

 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
-import java.util.stream.Stream;
+import java.util.function.BiConsumer;

 /**
  * This class is a useful building block for doing fetch requests where topic partitions have to be rotated via
@@ -85,8 +86,12 @@ public boolean contains(TopicPartition topicPartition) {
         return map.containsKey(topicPartition);
     }

-    public Stream<PartitionState<S>> stream() {
-        return map.entrySet().stream().map(entry -> new PartitionState<>(entry.getKey(), entry.getValue()));
+    public Iterator<S> stateIterator() {
+        return map.values().iterator();
+    }
+
+    public void forEach(BiConsumer<TopicPartition, S> biConsumer) {
+        map.forEach(biConsumer);
     }

     public Map<TopicPartition, S> partitionStateMap() {
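For illustration, a minimal generic stand-in for PartitionStates showing how callers use the two accessors that replace stream(): stateIterator() walks the values and forEach(BiConsumer) hands over key and state directly, so neither allocates a per-element wrapper (String keys stand in for TopicPartition):

    import java.util.Iterator;
    import java.util.LinkedHashMap;
    import java.util.Map;
    import java.util.function.BiConsumer;

    public class PartitionStatesSketch<S> {
        private final Map<String, S> map = new LinkedHashMap<>();

        public void update(String topicPartition, S state) {
            map.put(topicPartition, state);
        }

        public Iterator<S> stateIterator() {
            return map.values().iterator();
        }

        public void forEach(BiConsumer<String, S> biConsumer) {
            map.forEach(biConsumer);
        }

        public static void main(String[] args) {
            PartitionStatesSketch<Long> states = new PartitionStatesSketch<>();
            states.update("topic-0", 42L);
            states.update("topic-1", 7L);
            states.forEach((tp, offset) -> System.out.println(tp + " -> " + offset));
            states.stateIterator().forEachRemaining(System.out::println);
        }
    }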
clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -504,7 +504,7 @@ public void testParseCorruptedRecord() throws Exception {

         buffer.flip();

-        subscriptions.seek(tp0, 0);
+        subscriptions.seekUnvalidated(tp0, new SubscriptionState.FetchPosition(0, Optional.empty(), metadata.currentLeader(tp0)));

         // normal fetch
         assertEquals(1, fetcher.sendFetches());
@@ -542,7 +542,7 @@ private void ensureBlockOnRecord(long blockedOffset) {

     private void seekAndConsumeRecord(ByteBuffer responseBuffer, long toOffset) {
         // Seek to skip the bad record and fetch again.
-        subscriptions.seek(tp0, toOffset);
+        subscriptions.seekUnvalidated(tp0, new SubscriptionState.FetchPosition(toOffset, Optional.empty(), metadata.currentLeader(tp0)));
         // Should not throw exception after the seek.
         fetcher.fetchedRecords();
         assertEquals(1, fetcher.sendFetches());
@@ -979,13 +979,13 @@ public void testFetchOnCompletedFetchesForSomePausedPartitions() {
         // seek to tp0 and tp1 in two polls to generate 2 complete requests and responses

         // #1 seek, request, poll, response
-        subscriptions.seek(tp0, 1);
+        subscriptions.seekUnvalidated(tp0, new SubscriptionState.FetchPosition(1, Optional.empty(), metadata.currentLeader(tp0)));
         assertEquals(1, fetcher.sendFetches());
         client.prepareResponse(fullFetchResponse(tp0, this.records, Errors.NONE, 100L, 0));
         consumerClient.poll(time.timer(0));

         // #2 seek, request, poll, response
-        subscriptions.seek(tp1, 1);
+        subscriptions.seekUnvalidated(tp1, new SubscriptionState.FetchPosition(1, Optional.empty(), metadata.currentLeader(tp1)));
         assertEquals(1, fetcher.sendFetches());
         client.prepareResponse(fullFetchResponse(tp1, this.nextRecords, Errors.NONE, 100L, 0));

@@ -1014,13 +1014,13 @@ public void testFetchOnCompletedFetchesForAllPausedPartitions() {
         // seek to tp0 and tp1 in two polls to generate 2 complete requests and responses

         // #1 seek, request, poll, response
-        subscriptions.seek(tp0, 1);
+        subscriptions.seekUnvalidated(tp0, new SubscriptionState.FetchPosition(1, Optional.empty(), metadata.currentLeader(tp0)));
         assertEquals(1, fetcher.sendFetches());
         client.prepareResponse(fullFetchResponse(tp0, this.records, Errors.NONE, 100L, 0));
         consumerClient.poll(time.timer(0));

         // #2 seek, request, poll, response
-        subscriptions.seek(tp1, 1);
+        subscriptions.seekUnvalidated(tp1, new SubscriptionState.FetchPosition(1, Optional.empty(), metadata.currentLeader(tp1)));
         assertEquals(1, fetcher.sendFetches());
         client.prepareResponse(fullFetchResponse(tp1, this.nextRecords, Errors.NONE, 100L, 0));

@@ -1383,7 +1383,8 @@ public void testSeekBeforeException() {
         assertEquals(2, fetcher.fetchedRecords().get(tp0).size());

         subscriptions.assignFromUser(Utils.mkSet(tp0, tp1));
-        subscriptions.seek(tp1, 1);
+        subscriptions.seekUnvalidated(tp1, new SubscriptionState.FetchPosition(1, Optional.empty(), metadata.currentLeader(tp1)));
+
         assertEquals(1, fetcher.sendFetches());
         partitions = new HashMap<>();
         partitions.put(tp1, new FetchResponse.PartitionData<>(Errors.OFFSET_OUT_OF_RANGE, 100,
@@ -1750,7 +1751,8 @@ public void testEarlierOffsetResetArrivesLate() throws InterruptedException {
             Object result = invocation.callRealMethod();
             latchEarliestDone.countDown();
             return result;
-        }).when(subscriptions).maybeSeekUnvalidated(tp0, 0L, OffsetResetStrategy.EARLIEST);
+        }).when(subscriptions).maybeSeekUnvalidated(tp0, new SubscriptionState.FetchPosition(0L,
+                Optional.empty(), metadata.currentLeader(tp0)), OffsetResetStrategy.EARLIEST);

         es.submit(() -> {
             subscriptions.requestOffsetReset(tp0, OffsetResetStrategy.EARLIEST);
1 change: 1 addition & 0 deletions gradle/spotbugs-exclude.xml
@@ -238,6 +238,7 @@ For a detailed description of spotbugs bug categories, see https://spotbugs.read
           <Package name="org.apache.kafka.jmh.fetchsession.generated"/>
           <Package name="org.apache.kafka.jmh.fetcher.generated"/>
           <Package name="org.apache.kafka.jmh.server.generated"/>
+          <Package name="org.apache.kafka.jmh.consumer.generated"/>
         </Or>
     </Match>
