KAFKA-19019: Add support for remote storage fetch for share groups #12

Open
wants to merge 16 commits into base: trunk

Conversation

@arvi18 commented Apr 21, 2025

What

This PR adds support for remote storage fetch for share groups.

Limitation

Remote storage fetch for consumer groups has a known limitation: only a single topic partition can be fetched from remote storage per fetch request. Since the share fetch logic is largely modeled on how consumer groups work, this PR follows the same approach for remote storage fetch. The limitation should be addressed as part of KAFKA-19133, which should allow fetching multiple remote topic partitions in a single share fetch request.

Testing

I followed the AK documentation to test the code locally (using LocalTieredStorage.java) and verified the change with unit tests.
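
For context, a broker configuration for this kind of local test could look roughly like the fragment below. The generic remote-log properties come from the tiered storage documentation; the LocalTieredStorage directory key and all paths/values are assumptions for illustration, not part of this PR.

# Sketch of a server.properties fragment for local tiered-storage testing (values illustrative).
remote.log.storage.system.enable=true
remote.log.storage.manager.class.name=org.apache.kafka.server.log.remote.storage.LocalTieredStorage
remote.log.metadata.manager.class.name=org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManager
# Assumed LocalTieredStorage-specific setting: local directory that acts as the "remote" store.
remote.log.storage.local.dir=/tmp/kafka-remote-storage

# Topic-level settings so segments are offloaded quickly, e.g.:
#   kafka-topics.sh --create --topic share-test --partitions 1 \
#     --config remote.storage.enable=true --config local.retention.ms=60000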

Summary by CodeRabbit

  • New Features
    • Added support for remote storage fetches in share fetch operations, enabling asynchronous remote data retrieval alongside local log reads.
  • Bug Fixes
    • Improved error handling and completion logic for share fetches involving remote storage, ensuring robust operation in failure scenarios.
  • Tests
    • Introduced comprehensive tests covering remote storage fetch integration, including success, failure, and edge cases for delayed share fetch logic.


@visz11 (Collaborator) commented Apr 21, 2025

@coderabbitai full review


coderabbitai bot commented Apr 21, 2025

✅ Actions performed

Full review triggered.

coderabbitai bot left a comment

Actionable comments posted: 1

🧹 Nitpick comments (3)
core/src/main/java/kafka/server/share/DelayedShareFetch.java (2)

724-733: Pass an immutable copy when queueing follow‑up actions

topicIdPartitions is passed directly to the lambda queued via replicaManager.addToActionQueue.
Because the original Set is typically a mutable LinkedHashSet owned by the caller, later mutations
(e.g. clear()) will change the contents seen by the queued runnable, producing stale or empty work.

-replicaManager.addToActionQueue(() -> topicIdPartitions.forEach(topicIdPartition ->
+Set<TopicIdPartition> tpCopy = new LinkedHashSet<>(topicIdPartitions);
+replicaManager.addToActionQueue(() -> tpCopy.forEach(topicIdPartition ->
     replicaManager.completeDelayedShareFetchRequest(
         new DelayedShareFetchGroupKey(shareFetch.groupId(),
                                       topicIdPartition.topicId(),
                                       topicIdPartition.partition()))));

594-597: Expose remoteFetch as Optional instead of nullable

remoteFetch() collapses the Optional and returns null when absent, re‑introducing a nullable
contract that callers must special‑case.

-RemoteFetch remoteFetch() {
-    return remoteFetchOpt.orElse(null);
-}
+Optional<RemoteFetch> remoteFetch() {
+    return remoteFetchOpt;
+}

This keeps the null‑free invariant and simplifies call sites (ifPresent, orElse …).
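
A caller could then consume the value without a null check, for example (a hypothetical call site, not code from this PR):

// Hypothetical call site once remoteFetch() returns the Optional directly.
delayedShareFetch.remoteFetch().ifPresent(remoteFetch ->
    log.debug("Pending remote fetch for {}", remoteFetch.topicIdPartition()));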

core/src/test/java/kafka/server/share/DelayedShareFetchTest.java (1)

1760-1764: remoteFetch field in test‑builder is never set

The builder keeps a final Optional<DelayedShareFetch.RemoteFetch> remoteFetch = Optional.empty() and
always passes that to DelayedShareFetch. All tests therefore exercise only the production code path
that creates the RemoteFetch internally.

If you intended to inject a pre‑fabricated RemoteFetch for deterministic testing, expose a
withRemoteFetch(...) setter; otherwise drop the field to avoid confusion.
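
If injection is the goal, a minimal sketch of such a setter could look like the following; the builder class name is assumed, and the remoteFetch field would need to lose its final modifier.

// Hypothetical addition to the test's DelayedShareFetchBuilder (name assumed).
DelayedShareFetchBuilder withRemoteFetch(DelayedShareFetch.RemoteFetch remoteFetch) {
    this.remoteFetch = Optional.of(remoteFetch);   // field must no longer be final
    return this;
}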

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between ed79212 and 25788b3.

📒 Files selected for processing (2)
  • core/src/main/java/kafka/server/share/DelayedShareFetch.java (11 hunks)
  • core/src/test/java/kafka/server/share/DelayedShareFetchTest.java (11 hunks)
🔇 Additional comments (1)
core/src/main/java/kafka/server/share/DelayedShareFetch.java (1)

682-708:

❌ Incorrect review comment

Potential data race on remoteFetchOpt / remoteStorageFetchException

remoteFetchOpt and remoteStorageFetchException are mutated in tryComplete (thread‑pool threads) and
read in maybeCompletePendingRemoteFetch as well as onComplete without any synchronisation or volatile
declaration. DelayedOperation.tryComplete(...) may run concurrently on multiple threads, so these two
fields can be observed in a half‑initialised state causing:

  • NullPointerException (remoteFetchOpt.get() when another thread cleared it),
  • lost wake‑up (one thread completes the future while another still sees !isDone()).

Guard the two fields with the inherited lock or mark them volatile/AtomicReference.


🏁 Script executed:

#!/bin/bash
grep -n "class DelayedShareFetch" core/src/main/java/kafka/server/share/DelayedShareFetch.java
grep -n "remoteFetchOpt" core/src/main/java/kafka/server/share/DelayedShareFetch.java
grep -n "remoteStorageFetchException" core/src/main/java/kafka/server/share/DelayedShareFetch.java
grep -n "tryComplete" core/src/main/java/kafka/server/share/DelayedShareFetch.java
grep -n "maybeCompletePendingRemoteFetch" core/src/main/java/kafka/server/share/DelayedShareFetch.java
grep -n "onComplete" core/src/main/java/kafka/server/share/DelayedShareFetch.java

Length of output: 3971


#!/bin/bash
# Locate DelayedOperation class
find core/src -type f -name "DelayedOperation.java"

# Display the beginning to inspect locking and structure
DELAYED_OP_PATH=$(find core/src -type f -name "DelayedOperation.java" | head -n1)
echo "=== $DELAYED_OP_PATH ==="
sed -n '1,200p' "$DELAYED_OP_PATH"

# Show where tryComplete is implemented in DelayedOperation
grep -R "tryComplete" -n "$DELAYED_OP_PATH"
sed -n '200,350p' "$DELAYED_OP_PATH"

# Inspect usage of 'lock' in DelayedOperation
grep -R "lock" -n "$DELAYED_OP_PATH"

#!/bin/bash
set -e

FILE="core/src/main/java/kafka/server/share/DelayedShareFetch.java"

echo "=== maybeCompletePendingRemoteFetch (lines 660-740) ==="
sed -n '660,740p' "$FILE"

echo
echo "=== tryComplete (lines 270-360) ==="
sed -n '270,360p' "$FILE"

echo
echo "=== onComplete (lines 180-260) ==="
sed -n '180,260p' "$FILE"

echo
echo "Searching for lock usage in DelayedShareFetch:"
grep -R "lock" -n "$FILE"

#!/bin/bash
set -e

# 1. Show mutations of remoteFetchOpt
FILE="core/src/main/java/kafka/server/share/DelayedShareFetch.java"
echo "=== remoteFetchOpt assignments and updates (lines 140-240) ==="
sed -n '140,240p' "$FILE"

echo
# 2. Show where remoteStorageFetchException is set
echo "=== remoteStorageFetchException mutations (lines 600-700) ==="
sed -n '600,700p' "$FILE"

echo
# 3. Inspect DelayedOperation for lock usage
OP_PATH=$(find core/src -type f -name "DelayedOperation.java" | head -n1)
echo "=== DelayedOperation file: $OP_PATH (first 200 lines) ==="
sed -n '1,200p' "$OP_PATH"
echo
echo "=== 'lock' declarations and usage in DelayedOperation ==="
grep -n "lock" "$OP_PATH"

No data race – fields are already guarded by DelayedOperation’s lock

All accesses and mutations of remoteFetchOpt and remoteStorageFetchException occur inside the overridden tryComplete, maybeCompletePendingRemoteFetch, and onComplete methods, which DelayedOperation invokes under its internal ReentrantLock. Since these methods (and their helper calls) run only while holding that lock, there is no risk of observing a half‑initialized state or lost wake‑up. Please disregard the synchronization suggestion.

Likely an incorrect or invalid review comment.
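
For readers unfamiliar with that mechanism, the guarding pattern amounts to roughly the following simplified sketch; it is illustrative only, with names abbreviated, and is not the actual Kafka DelayedOperation source.

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;

// Simplified purgatory-style operation: tryComplete() always runs under the lock, and
// onComplete() runs at most once, from whichever thread wins the completion CAS.
abstract class DelayedOperationSketch {
    private final ReentrantLock lock = new ReentrantLock();
    private final AtomicBoolean completed = new AtomicBoolean(false);

    abstract boolean tryComplete();   // mutates operation state, e.g. kicks off a remote fetch
    abstract void onComplete();       // reads the same state to build the response

    boolean safeTryComplete() {
        lock.lock();
        try {
            return !completed.get() && tryComplete();   // tryComplete only runs while holding the lock
        } finally {
            lock.unlock();
        }
    }

    boolean forceComplete() {
        if (completed.compareAndSet(false, true)) {
            onComplete();   // exactly one thread wins the CAS and runs onComplete
            return true;
        }
        return false;
    }
}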

@visz11 (Collaborator) commented Jul 24, 2025

/refacto-test


refacto-test bot commented Aug 7, 2025

Refacto is reviewing this PR. Please wait for the review comments to be posted.

Comment on lines +851 to +857
private void cancelRemoteFetchTask() {
    boolean cancelled = remoteFetchOpt.get().remoteFetchTask().cancel(false);
    if (!cancelled) {
        log.debug("Remote fetch task for RemoteStorageFetchInfo: {} could not be cancelled and its isDone value is {}",
            remoteFetchOpt.get().remoteFetchInfo(), remoteFetchOpt.get().remoteFetchTask().isDone());
    }
}

Potential Resource Leakage in Remote Fetch Task

The cancelRemoteFetchTask method attempts to cancel a remote fetch task but only logs a debug message if cancellation fails. There's no fallback mechanism to ensure resources are released if the task cannot be cancelled. This could lead to resource leakage if multiple cancellation attempts fail, potentially causing memory leaks or thread exhaustion under high load.

Suggested change
private void cancelRemoteFetchTask() {
    boolean cancelled = remoteFetchOpt.get().remoteFetchTask().cancel(false);
    if (!cancelled) {
        log.debug("Remote fetch task for RemoteStorageFetchInfo: {} could not be cancelled and its isDone value is {}",
            remoteFetchOpt.get().remoteFetchInfo(), remoteFetchOpt.get().remoteFetchTask().isDone());
    }
}
private void cancelRemoteFetchTask() {
    boolean cancelled = remoteFetchOpt.get().remoteFetchTask().cancel(false);
    if (!cancelled) {
        log.debug("Remote fetch task for RemoteStorageFetchInfo: {} could not be cancelled and its isDone value is {}",
            remoteFetchOpt.get().remoteFetchInfo(), remoteFetchOpt.get().remoteFetchTask().isDone());
        // If cancellation failed and task is done, ensure we clean up any resources
        if (remoteFetchOpt.get().remoteFetchTask().isDone()) {
            try {
                // Force get() to ensure any exceptions are handled and resources released
                remoteFetchOpt.get().remoteFetchTask().get(0, TimeUnit.MILLISECONDS);
            } catch (Exception e) {
                // Expected if task completed exceptionally or timed out
                log.debug("Forced completion of remote fetch task: {}", e.getMessage());
            }
        }
    }
}

Comment on lines +746 to +756
RemoteLogReadResult remoteLogReadResult = remoteFetch.remoteFetchResult().get();
if (remoteLogReadResult.error.isPresent()) {
    Throwable error = remoteLogReadResult.error.get();
    // If there is any error for the remote fetch topic partition, we populate the error accordingly.
    shareFetchPartitionData.add(
        new ShareFetchPartitionData(
            remoteFetch.topicIdPartition(),
            partitionsAcquired.get(remoteFetch.topicIdPartition()),
            ReplicaManager.createLogReadResult(error).toFetchPartitionData(false)
        )
    );

Insufficient Error Handling in Remote Fetch Processing

The code handles remote fetch errors by simply wrapping them in a LogReadResult, but doesn't properly classify or sanitize the error information. This could potentially leak sensitive information about the system's internal structure or state through error messages. Additionally, there's no differentiation between different types of errors (e.g., transient network issues vs. permission problems), which could lead to incorrect handling of security-related failures.

Suggested change
RemoteLogReadResult remoteLogReadResult = remoteFetch.remoteFetchResult().get();
if (remoteLogReadResult.error.isPresent()) {
    Throwable error = remoteLogReadResult.error.get();
    // If there is any error for the remote fetch topic partition, we populate the error accordingly.
    shareFetchPartitionData.add(
        new ShareFetchPartitionData(
            remoteFetch.topicIdPartition(),
            partitionsAcquired.get(remoteFetch.topicIdPartition()),
            ReplicaManager.createLogReadResult(error).toFetchPartitionData(false)
        )
    );
RemoteLogReadResult remoteLogReadResult = remoteFetch.remoteFetchResult().get();
if (remoteLogReadResult.error.isPresent()) {
    Throwable error = remoteLogReadResult.error.get();
    // Classify and sanitize the error before propagating
    Errors kafkaError = Errors.UNKNOWN_SERVER_ERROR;
    if (error instanceof TimeoutException) {
        kafkaError = Errors.REQUEST_TIMED_OUT;
    } else if (error instanceof KafkaStorageException) {
        kafkaError = Errors.KAFKA_STORAGE_ERROR;
    } else if (error instanceof NotLeaderOrFollowerException) {
        kafkaError = Errors.NOT_LEADER_OR_FOLLOWER;
    } else if (error instanceof UnknownTopicOrPartitionException) {
        kafkaError = Errors.UNKNOWN_TOPIC_OR_PARTITION;
    }
    // Log the detailed error but return a sanitized error to the client
    log.debug("Remote fetch error for partition {}: {}", remoteFetch.topicIdPartition(), error.getMessage());
    shareFetchPartitionData.add(
        new ShareFetchPartitionData(
            remoteFetch.topicIdPartition(),
            partitionsAcquired.get(remoteFetch.topicIdPartition()),
            ReplicaManager.createLogReadResult(kafkaError.exception()).toFetchPartitionData(false)
        )
    );

Comment on lines +681 to +707
private boolean maybeCompletePendingRemoteFetch() {
    boolean canComplete = false;

    TopicIdPartition topicIdPartition = remoteFetchOpt.get().topicIdPartition();
    try {
        replicaManager.getPartitionOrException(topicIdPartition.topicPartition());
    } catch (KafkaStorageException e) { // Case a
        log.debug("TopicPartition {} is in an offline log directory, satisfy {} immediately", topicIdPartition, shareFetch.fetchParams());
        canComplete = true;
    } catch (UnknownTopicOrPartitionException e) { // Case b
        log.debug("Broker no longer knows of topicPartition {}, satisfy {} immediately", topicIdPartition, shareFetch.fetchParams());
        canComplete = true;
    } catch (NotLeaderOrFollowerException e) { // Case c
        log.debug("Broker is no longer the leader or follower of topicPartition {}, satisfy {} immediately", topicIdPartition, shareFetch.fetchParams());
        canComplete = true;
    }

    if (canComplete || remoteFetchOpt.get().remoteFetchResult().isDone()) { // Case d
        boolean completedByMe = forceComplete();
        // If invocation of forceComplete is not successful, then that means the request is already completed
        // hence release the acquired locks.
        if (!completedByMe) {
            releasePartitionLocks(partitionsAcquired.keySet());
        }
        return completedByMe;
    } else
        return false;

Potential Race Condition in Remote Fetch Completion

There's a potential race condition in the remote fetch completion logic. The code first checks if the remote fetch can complete due to various conditions, then separately checks if the remoteFetchResult is done. Between these checks, the state could change, leading to inconsistent behavior. Additionally, there's no synchronization mechanism to ensure that only one thread completes the remote fetch operation, which could lead to multiple threads attempting to process the same result or release the same locks.

Suggested change
private boolean maybeCompletePendingRemoteFetch() {
    boolean canComplete = false;
    TopicIdPartition topicIdPartition = remoteFetchOpt.get().topicIdPartition();
    try {
        replicaManager.getPartitionOrException(topicIdPartition.topicPartition());
    } catch (KafkaStorageException e) { // Case a
        log.debug("TopicPartition {} is in an offline log directory, satisfy {} immediately", topicIdPartition, shareFetch.fetchParams());
        canComplete = true;
    } catch (UnknownTopicOrPartitionException e) { // Case b
        log.debug("Broker no longer knows of topicPartition {}, satisfy {} immediately", topicIdPartition, shareFetch.fetchParams());
        canComplete = true;
    } catch (NotLeaderOrFollowerException e) { // Case c
        log.debug("Broker is no longer the leader or follower of topicPartition {}, satisfy {} immediately", topicIdPartition, shareFetch.fetchParams());
        canComplete = true;
    }
    if (canComplete || remoteFetchOpt.get().remoteFetchResult().isDone()) { // Case d
        boolean completedByMe = forceComplete();
        // If invocation of forceComplete is not successful, then that means the request is already completed
        // hence release the acquired locks.
        if (!completedByMe) {
            releasePartitionLocks(partitionsAcquired.keySet());
        }
        return completedByMe;
    } else
        return false;
private boolean maybeCompletePendingRemoteFetch() {
    boolean canComplete = false;
    TopicIdPartition topicIdPartition = remoteFetchOpt.get().topicIdPartition();
    try {
        replicaManager.getPartitionOrException(topicIdPartition.topicPartition());
    } catch (KafkaStorageException e) { // Case a
        log.debug("TopicPartition {} is in an offline log directory, satisfy {} immediately", topicIdPartition, shareFetch.fetchParams());
        canComplete = true;
    } catch (UnknownTopicOrPartitionException e) { // Case b
        log.debug("Broker no longer knows of topicPartition {}, satisfy {} immediately", topicIdPartition, shareFetch.fetchParams());
        canComplete = true;
    } catch (NotLeaderOrFollowerException e) { // Case c
        log.debug("Broker is no longer the leader or follower of topicPartition {}, satisfy {} immediately", topicIdPartition, shareFetch.fetchParams());
        canComplete = true;
    }
    // Atomically check if we can complete and if so, attempt completion
    synchronized(this) {
        // Re-check conditions inside synchronized block to prevent race conditions
        if ((canComplete || remoteFetchOpt.get().remoteFetchResult().isDone()) && !isCompleted()) { // Case d
            boolean completedByMe = forceComplete();
            // If invocation of forceComplete is not successful, then that means the request is already completed
            // hence release the acquired locks.
            if (!completedByMe) {
                releasePartitionLocks(partitionsAcquired.keySet());
            }
            return completedByMe;
        } else
            return false;
    }

Comment on lines +824 to +829
} catch (InterruptedException | ExecutionException e) {
    log.error("Exception occurred in completing remote fetch {} for delayed share fetch request {}", remoteFetchOpt.get(), e);
    handleExceptionInCompletingRemoteStorageShareFetchRequest(acquiredNonRemoteFetchTopicPartitionData.keySet(), e);
} catch (Exception e) {
    log.error("Unexpected error in processing delayed share fetch request", e);
    handleExceptionInCompletingRemoteStorageShareFetchRequest(acquiredNonRemoteFetchTopicPartitionData.keySet(), e);

Unchecked Exception in Remote Storage Fetch Implementation

The code catches and handles InterruptedException and ExecutionException specifically, but then also has a generic catch for all other exceptions. This is problematic because it could mask serious security issues like SecurityExceptions or AccessControlExceptions that should be handled differently. Additionally, the thread's interrupted status is not restored when catching InterruptedException, which is a common cause of thread management issues that can lead to resource leaks or deadlocks.

Suggested change
} catch (InterruptedException | ExecutionException e) {
    log.error("Exception occurred in completing remote fetch {} for delayed share fetch request {}", remoteFetchOpt.get(), e);
    handleExceptionInCompletingRemoteStorageShareFetchRequest(acquiredNonRemoteFetchTopicPartitionData.keySet(), e);
} catch (Exception e) {
    log.error("Unexpected error in processing delayed share fetch request", e);
    handleExceptionInCompletingRemoteStorageShareFetchRequest(acquiredNonRemoteFetchTopicPartitionData.keySet(), e);
} catch (InterruptedException e) {
    // Restore the interrupted status
    Thread.currentThread().interrupt();
    log.error("Thread interrupted while completing remote fetch {} for delayed share fetch request", remoteFetchOpt.get());
    handleExceptionInCompletingRemoteStorageShareFetchRequest(acquiredNonRemoteFetchTopicPartitionData.keySet(), e);
} catch (ExecutionException e) {
    log.error("Execution exception in completing remote fetch {} for delayed share fetch request: {}", remoteFetchOpt.get(), e.getCause());
    handleExceptionInCompletingRemoteStorageShareFetchRequest(acquiredNonRemoteFetchTopicPartitionData.keySet(), e);
} catch (SecurityException | AccessControlException e) {
    // Handle security-related exceptions differently
    log.error("Security exception in remote fetch operation: {}", e.getMessage());
    // Consider additional security-specific handling here
    handleExceptionInCompletingRemoteStorageShareFetchRequest(acquiredNonRemoteFetchTopicPartitionData.keySet(), e);
} catch (Exception e) {
    log.error("Unexpected error in processing delayed share fetch request", e);
    handleExceptionInCompletingRemoteStorageShareFetchRequest(acquiredNonRemoteFetchTopicPartitionData.keySet(), e);


refacto-test bot commented Aug 7, 2025

Remote Storage Fetch Implementation Lacks Proper Error Handling

👍 Well Done
Comprehensive Remote Fetch Implementation

The implementation thoroughly handles both local and remote storage fetch operations with appropriate separation of concerns.

Proper Resource Management

The code attempts to cancel remote fetch tasks when appropriate, preventing resource leaks when tasks are no longer needed.

Defensive Error Handling

The code generally handles exceptions well by wrapping them and propagating them to the appropriate handlers.

📌 Files Processed
  • core/src/main/java/kafka/server/share/DelayedShareFetch.java
  • core/src/test/java/kafka/server/share/DelayedShareFetchTest.java
📝 Additional Comments
core/src/main/java/kafka/server/share/DelayedShareFetch.java (1)
Missing Validation for Remote Fetch Result

The code directly retrieves the fetchDataInfo from remoteLogReadResult without validating that the Optional is present. While the code structure suggests this should always be present when error is not present, there's no explicit validation to ensure this invariant holds. If for some reason both error and fetchDataInfo are empty, this could lead to a NoSuchElementException.

Standards:

  • CWE-754: Improper Check for Unusual or Exceptional Conditions
  • Defensive Programming Principles
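
A defensive check along these lines would make that invariant explicit; this is a sketch only, reusing variable names from the snippet quoted earlier in the review rather than proposing an exact patch.

// Sketch: fail loudly if the remote read produced neither an error nor data.
if (remoteLogReadResult.fetchDataInfo.isEmpty()) {
    throw new IllegalStateException("RemoteLogReadResult for " + remoteFetch.topicIdPartition()
        + " contains neither an error nor fetch data");
}
FetchDataInfo fetchDataInfo = remoteLogReadResult.fetchDataInfo.get();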

