KAFKA-18404: Remove partitionMaxBytes usage from DelayedShareFetch (apache#17870)

Reviewers: Apoorv Mittal <apoorvmittal10@gmail.com>, Andrew Schofield <aschofield@confluent.io>
adixitconfluent authored Jan 13, 2025
1 parent f9b4cdb commit 4e24c50
Showing 7 changed files with 596 additions and 55 deletions.
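
The core of the change: acquirablePartitions() now returns only per-partition fetch offsets, and per-partition size limits are computed at read time by a PartitionMaxBytesStrategy from the request-level max bytes. The strategy's implementation is not part of this file's diff; the sketch below is an assumption of what the UNIFORM strategy computes, based solely on how maxBytes(requestMaxBytes, partitions, acquiredPartitionsSize) is called in this file.

import java.util.LinkedHashMap;
import java.util.Set;

// Hypothetical sketch of a uniform partition-max-bytes computation. The real
// PartitionMaxBytesStrategy is introduced elsewhere in this change set; only
// its call sites are visible in this file's diff.
final class UniformMaxBytesSketch {

    // requestMaxBytes: the share-fetch request's max bytes;
    // partitions: the partitions being read right now;
    // acquiredPartitionsSize: the total number of acquired partitions, which can
    // exceed partitions.size() when only a subset is read (see maybeReadFromLog).
    static <P> LinkedHashMap<P, Integer> maxBytes(int requestMaxBytes,
                                                  Set<P> partitions,
                                                  int acquiredPartitionsSize) {
        LinkedHashMap<P, Integer> limits = new LinkedHashMap<>();
        // Divide the request budget evenly across all acquired partitions.
        int perPartition = requestMaxBytes / Math.max(acquiredPartitionsSize, 1);
        partitions.forEach(p -> limits.put(p, perPartition));
        return limits;
    }
}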
108 changes: 66 additions & 42 deletions core/src/main/java/kafka/server/share/DelayedShareFetch.java
@@ -27,6 +27,7 @@
import org.apache.kafka.server.purgatory.DelayedOperation;
import org.apache.kafka.server.share.SharePartitionKey;
import org.apache.kafka.server.share.fetch.DelayedShareFetchGroupKey;
import org.apache.kafka.server.share.fetch.PartitionMaxBytesStrategy;
import org.apache.kafka.server.share.fetch.ShareFetch;
import org.apache.kafka.server.storage.log.FetchIsolation;
import org.apache.kafka.server.storage.log.FetchPartitionData;
@@ -60,24 +61,35 @@ public class DelayedShareFetch extends DelayedOperation {
private final ShareFetch shareFetch;
private final ReplicaManager replicaManager;
private final BiConsumer<SharePartitionKey, Throwable> exceptionHandler;
private final PartitionMaxBytesStrategy partitionMaxBytesStrategy;
// The topic partitions that need to be completed for the share fetch request are given by sharePartitions.
// sharePartitions is a subset of shareFetchData. The order of insertion/deletion of entries in sharePartitions is important.
private final LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions;
private LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> partitionsAcquired;
private LinkedHashMap<TopicIdPartition, Long> partitionsAcquired;
private LinkedHashMap<TopicIdPartition, LogReadResult> partitionsAlreadyFetched;

DelayedShareFetch(
ShareFetch shareFetch,
ReplicaManager replicaManager,
BiConsumer<SharePartitionKey, Throwable> exceptionHandler,
LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions) {
this(shareFetch, replicaManager, exceptionHandler, sharePartitions, PartitionMaxBytesStrategy.type(PartitionMaxBytesStrategy.StrategyType.UNIFORM));
}

DelayedShareFetch(
ShareFetch shareFetch,
ReplicaManager replicaManager,
BiConsumer<SharePartitionKey, Throwable> exceptionHandler,
LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions,
PartitionMaxBytesStrategy partitionMaxBytesStrategy) {
super(shareFetch.fetchParams().maxWaitMs, Optional.empty());
this.shareFetch = shareFetch;
this.replicaManager = replicaManager;
this.partitionsAcquired = new LinkedHashMap<>();
this.partitionsAlreadyFetched = new LinkedHashMap<>();
this.exceptionHandler = exceptionHandler;
this.sharePartitions = sharePartitions;
this.partitionMaxBytesStrategy = partitionMaxBytesStrategy;
}

@Override
@@ -99,7 +111,7 @@ public void onComplete() {
partitionsAcquired.keySet());

try {
LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData;
LinkedHashMap<TopicIdPartition, Long> topicPartitionData;
// tryComplete did not invoke forceComplete, so we need to check if we have any partitions to fetch.
if (partitionsAcquired.isEmpty())
topicPartitionData = acquirablePartitions();
@@ -121,11 +133,13 @@ public void onComplete() {
}
}

private void completeShareFetchRequest(LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData) {
private void completeShareFetchRequest(LinkedHashMap<TopicIdPartition, Long> topicPartitionData) {
try {
LinkedHashMap<TopicIdPartition, LogReadResult> responseData;
if (partitionsAlreadyFetched.isEmpty())
responseData = readFromLog(topicPartitionData);
responseData = readFromLog(
topicPartitionData,
partitionMaxBytesStrategy.maxBytes(shareFetch.fetchParams().maxBytes, topicPartitionData.keySet(), topicPartitionData.size()));
else
// There shouldn't be a case when we have a partitionsAlreadyFetched value here and this variable is getting
// updated in a different tryComplete thread.
@@ -158,7 +172,7 @@ private void completeShareFetchRequest(LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData) {
*/
@Override
public boolean tryComplete() {
LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData = acquirablePartitions();
LinkedHashMap<TopicIdPartition, Long> topicPartitionData = acquirablePartitions();

try {
if (!topicPartitionData.isEmpty()) {
@@ -167,7 +181,7 @@ public boolean tryComplete() {
// those topic partitions.
LinkedHashMap<TopicIdPartition, LogReadResult> replicaManagerReadResponse = maybeReadFromLog(topicPartitionData);
maybeUpdateFetchOffsetMetadata(topicPartitionData, replicaManagerReadResponse);
if (anyPartitionHasLogReadError(replicaManagerReadResponse) || isMinBytesSatisfied(topicPartitionData)) {
if (anyPartitionHasLogReadError(replicaManagerReadResponse) || isMinBytesSatisfied(topicPartitionData, partitionMaxBytesStrategy.maxBytes(shareFetch.fetchParams().maxBytes, topicPartitionData.keySet(), topicPartitionData.size()))) {
partitionsAcquired = topicPartitionData;
partitionsAlreadyFetched = replicaManagerReadResponse;
boolean completedByMe = forceComplete();
@@ -202,28 +216,18 @@ public boolean tryComplete() {
* Prepare fetch request structure for partitions in the share fetch request for which we can acquire records.
*/
// Visible for testing
LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> acquirablePartitions() {
LinkedHashMap<TopicIdPartition, Long> acquirablePartitions() {
// Initialize the topic partitions for which the fetch should be attempted.
LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData = new LinkedHashMap<>();
LinkedHashMap<TopicIdPartition, Long> topicPartitionData = new LinkedHashMap<>();

sharePartitions.forEach((topicIdPartition, sharePartition) -> {
int partitionMaxBytes = shareFetch.partitionMaxBytes().getOrDefault(topicIdPartition, 0);
// Add the share partition to the list of partitions to be fetched only if we can
// acquire the fetch lock on it.
if (sharePartition.maybeAcquireFetchLock()) {
try {
// If the share partition is already at capacity, we should not attempt to fetch.
if (sharePartition.canAcquireRecords()) {
topicPartitionData.put(
topicIdPartition,
new FetchRequest.PartitionData(
topicIdPartition.topicId(),
sharePartition.nextFetchOffset(),
0,
partitionMaxBytes,
Optional.empty()
)
);
topicPartitionData.put(topicIdPartition, sharePartition.nextFetchOffset());
} else {
sharePartition.releaseFetchLock();
log.trace("Record lock partition limit exceeded for SharePartition {}, " +
@@ -239,24 +243,28 @@ LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> acquirablePartitions() {
return topicPartitionData;
}

private LinkedHashMap<TopicIdPartition, LogReadResult> maybeReadFromLog(LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData) {
LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> partitionsNotMatchingFetchOffsetMetadata = new LinkedHashMap<>();
topicPartitionData.forEach((topicIdPartition, partitionData) -> {
private LinkedHashMap<TopicIdPartition, LogReadResult> maybeReadFromLog(LinkedHashMap<TopicIdPartition, Long> topicPartitionData) {
LinkedHashMap<TopicIdPartition, Long> partitionsNotMatchingFetchOffsetMetadata = new LinkedHashMap<>();
topicPartitionData.forEach((topicIdPartition, fetchOffset) -> {
SharePartition sharePartition = sharePartitions.get(topicIdPartition);
if (sharePartition.fetchOffsetMetadata(partitionData.fetchOffset).isEmpty()) {
partitionsNotMatchingFetchOffsetMetadata.put(topicIdPartition, partitionData);
if (sharePartition.fetchOffsetMetadata(fetchOffset).isEmpty()) {
partitionsNotMatchingFetchOffsetMetadata.put(topicIdPartition, fetchOffset);
}
});
if (partitionsNotMatchingFetchOffsetMetadata.isEmpty()) {
return new LinkedHashMap<>();
}
// We fetch data from replica manager corresponding to the topic partitions that have missing fetch offset metadata.
return readFromLog(partitionsNotMatchingFetchOffsetMetadata);
// Although we are fetching partition max bytes only for partitionsNotMatchingFetchOffsetMetadata,
// we take acquired partitions size = topicPartitionData.size() so that the leftover
// partitions, which will be fetched later, do not starve.
return readFromLog(
partitionsNotMatchingFetchOffsetMetadata,
partitionMaxBytesStrategy.maxBytes(shareFetch.fetchParams().maxBytes, partitionsNotMatchingFetchOffsetMetadata.keySet(), topicPartitionData.size()));
}
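
The comment in maybeReadFromLog justifies passing topicPartitionData.size() (the total acquired count) rather than the refresh subset's size as the divisor. A worked example with concrete numbers, assuming the uniform division sketched near the top of this file:

public class StarvationExample {
    public static void main(String[] args) {
        int requestMaxBytes = 1_048_576;  // 1 MiB share-fetch budget
        int acquiredPartitions = 4;       // topicPartitionData.size()
        int metadataRefreshSubset = 2;    // partitionsNotMatchingFetchOffsetMetadata.size()

        // Dividing by the subset size would over-allocate to the partitions read early:
        System.out.println(requestMaxBytes / metadataRefreshSubset); // 524288
        // Dividing by the acquired count preserves an equal share for the
        // partitions fetched later:
        System.out.println(requestMaxBytes / acquiredPartitions);    // 262144
    }
}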

private void maybeUpdateFetchOffsetMetadata(
LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData,
LinkedHashMap<TopicIdPartition, LogReadResult> replicaManagerReadResponseData) {
private void maybeUpdateFetchOffsetMetadata(LinkedHashMap<TopicIdPartition, Long> topicPartitionData,
LinkedHashMap<TopicIdPartition, LogReadResult> replicaManagerReadResponseData) {
for (Map.Entry<TopicIdPartition, LogReadResult> entry : replicaManagerReadResponseData.entrySet()) {
TopicIdPartition topicIdPartition = entry.getKey();
SharePartition sharePartition = sharePartitions.get(topicIdPartition);
@@ -267,17 +275,18 @@ private void maybeUpdateFetchOffsetMetadata(
continue;
}
sharePartition.updateFetchOffsetMetadata(
topicPartitionData.get(topicIdPartition).fetchOffset,
topicPartitionData.get(topicIdPartition),
replicaManagerLogReadResult.info().fetchOffsetMetadata);
}
}

// minBytes estimation currently assumes the common case where all fetched data is acquirable.
private boolean isMinBytesSatisfied(LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData) {
private boolean isMinBytesSatisfied(LinkedHashMap<TopicIdPartition, Long> topicPartitionData,
LinkedHashMap<TopicIdPartition, Integer> partitionMaxBytes) {
long accumulatedSize = 0;
for (Map.Entry<TopicIdPartition, FetchRequest.PartitionData> entry : topicPartitionData.entrySet()) {
for (Map.Entry<TopicIdPartition, Long> entry : topicPartitionData.entrySet()) {
TopicIdPartition topicIdPartition = entry.getKey();
FetchRequest.PartitionData partitionData = entry.getValue();
long fetchOffset = entry.getValue();

LogOffsetMetadata endOffsetMetadata;
try {
@@ -294,7 +303,7 @@ private boolean isMinBytesSatisfied(LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData) {

SharePartition sharePartition = sharePartitions.get(topicIdPartition);

Optional<LogOffsetMetadata> optionalFetchOffsetMetadata = sharePartition.fetchOffsetMetadata(partitionData.fetchOffset);
Optional<LogOffsetMetadata> optionalFetchOffsetMetadata = sharePartition.fetchOffsetMetadata(fetchOffset);
if (optionalFetchOffsetMetadata.isEmpty() || optionalFetchOffsetMetadata.get() == LogOffsetMetadata.UNKNOWN_OFFSET_METADATA)
continue;
LogOffsetMetadata fetchOffsetMetadata = optionalFetchOffsetMetadata.get();
@@ -312,7 +321,7 @@ private boolean isMinBytesSatisfied(LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData) {
return true;
} else if (fetchOffsetMetadata.onSameSegment(endOffsetMetadata)) {
// we take the partition fetch size as upper bound when accumulating the bytes.
long bytesAvailable = Math.min(endOffsetMetadata.positionDiff(fetchOffsetMetadata), partitionData.maxBytes);
long bytesAvailable = Math.min(endOffsetMetadata.positionDiff(fetchOffsetMetadata), partitionMaxBytes.get(topicIdPartition));
accumulatedSize += bytesAvailable;
}
}
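
In the accumulation above, a partition on the same segment as the end offset contributes at most its per-partition max bytes, so plentiful data in one partition cannot substitute for the others. A small worked example, assuming the accumulated size is finally compared against the request-level minBytes (the comparison closes the method outside this hunk):

public class MinBytesAccumulation {
    public static void main(String[] args) {
        // Two partitions on the same segment as the end offset; per-partition
        // cap of 262_144 bytes assumed from a uniform split of a 1 MiB request
        // across four acquired partitions.
        long[] positionDiffs = {400_000L, 100_000L}; // bytes past each fetch offset
        int partitionMaxBytes = 262_144;
        int minBytes = 300_000; // hypothetical request-level minimum

        long accumulatedSize = 0;
        for (long diff : positionDiffs) {
            // Each partition contributes at most its max-bytes share.
            accumulatedSize += Math.min(diff, partitionMaxBytes);
        }
        System.out.println(accumulatedSize);             // 362144
        System.out.println(accumulatedSize >= minBytes); // true
    }
}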
@@ -335,13 +344,25 @@ else if (isolationType == FetchIsolation.HIGH_WATERMARK)

}

private LinkedHashMap<TopicIdPartition, LogReadResult> readFromLog(LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData) {
private LinkedHashMap<TopicIdPartition, LogReadResult> readFromLog(LinkedHashMap<TopicIdPartition, Long> topicPartitionFetchOffsets,
LinkedHashMap<TopicIdPartition, Integer> partitionMaxBytes) {
// Filter if there already exists any erroneous topic partition.
Set<TopicIdPartition> partitionsToFetch = shareFetch.filterErroneousTopicPartitions(topicPartitionData.keySet());
Set<TopicIdPartition> partitionsToFetch = shareFetch.filterErroneousTopicPartitions(topicPartitionFetchOffsets.keySet());
if (partitionsToFetch.isEmpty()) {
return new LinkedHashMap<>();
}

LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData = new LinkedHashMap<>();

topicPartitionFetchOffsets.forEach((topicIdPartition, fetchOffset) -> topicPartitionData.put(topicIdPartition,
new FetchRequest.PartitionData(
topicIdPartition.topicId(),
fetchOffset,
0,
partitionMaxBytes.get(topicIdPartition),
Optional.empty())
));

Seq<Tuple2<TopicIdPartition, LogReadResult>> responseLogResult = replicaManager.readFromLog(
shareFetch.fetchParams(),
CollectionConverters.asScala(
@@ -390,18 +411,21 @@ private void handleFetchException(
}

// Visible for testing.
LinkedHashMap<TopicIdPartition, LogReadResult> combineLogReadResponse(LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData,
LinkedHashMap<TopicIdPartition, LogReadResult> existingFetchedData) {
LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> missingLogReadTopicPartitions = new LinkedHashMap<>();
topicPartitionData.forEach((topicIdPartition, partitionData) -> {
LinkedHashMap<TopicIdPartition, LogReadResult> combineLogReadResponse(LinkedHashMap<TopicIdPartition, Long> topicPartitionData,
LinkedHashMap<TopicIdPartition, LogReadResult> existingFetchedData) {
LinkedHashMap<TopicIdPartition, Long> missingLogReadTopicPartitions = new LinkedHashMap<>();
topicPartitionData.forEach((topicIdPartition, fetchOffset) -> {
if (!existingFetchedData.containsKey(topicIdPartition)) {
missingLogReadTopicPartitions.put(topicIdPartition, partitionData);
missingLogReadTopicPartitions.put(topicIdPartition, fetchOffset);
}
});
if (missingLogReadTopicPartitions.isEmpty()) {
return existingFetchedData;
}
LinkedHashMap<TopicIdPartition, LogReadResult> missingTopicPartitionsLogReadResponse = readFromLog(missingLogReadTopicPartitions);

LinkedHashMap<TopicIdPartition, LogReadResult> missingTopicPartitionsLogReadResponse = readFromLog(
missingLogReadTopicPartitions,
partitionMaxBytesStrategy.maxBytes(shareFetch.fetchParams().maxBytes, missingLogReadTopicPartitions.keySet(), topicPartitionData.size()));
missingTopicPartitionsLogReadResponse.putAll(existingFetchedData);
return missingTopicPartitionsLogReadResponse;
}
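
combineLogReadResponse re-reads only the acquired partitions that are missing from existingFetchedData and merges the two maps. A minimal illustration of the merge semantics, using string stand-ins for the Kafka types (all names here are hypothetical):

import java.util.LinkedHashMap;

public class CombineExample {
    public static void main(String[] args) {
        // Acquired partitions -> fetch offsets (stand-ins for TopicIdPartition/Long).
        LinkedHashMap<String, Long> acquired = new LinkedHashMap<>();
        acquired.put("topicA-0", 42L);
        acquired.put("topicA-1", 7L);

        // tryComplete already read topicA-0; topicA-1 has no read result yet.
        LinkedHashMap<String, String> existing = new LinkedHashMap<>();
        existing.put("topicA-0", "readResult(topicA-0)");

        LinkedHashMap<String, String> combined = new LinkedHashMap<>();
        acquired.forEach((tp, offset) -> {
            if (!existing.containsKey(tp)) {
                combined.put(tp, "readResult(" + tp + ")"); // stands in for readFromLog
            }
        });
        combined.putAll(existing); // keys never overlap by construction
        System.out.println(combined); // {topicA-1=..., topicA-0=...}
    }
}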