Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix][broker] Execute the pending callbacks in order before ready for incoming requests #23266

Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,7 @@ public class PulsarService implements AutoCloseable, ShutdownService {
private final DefaultMonotonicSnapshotClock monotonicSnapshotClock;
private String brokerId;
private final CompletableFuture<Void> readyForIncomingRequestsFuture = new CompletableFuture<>();
private final List<Runnable> pendingTasksBeforeReadyForIncomingRequests = new ArrayList<>();

public enum State {
Init, Started, Closing, Closed
Expand Down Expand Up @@ -1023,7 +1024,13 @@ public void start() throws PulsarServerException {
this.metricsGenerator = new MetricsGenerator(this);

// the broker is ready to accept incoming requests by Pulsar binary protocol and http/https
readyForIncomingRequestsFuture.complete(null);
final List<Runnable> runnables;
synchronized (pendingTasksBeforeReadyForIncomingRequests) {
runnables = new ArrayList<>(pendingTasksBeforeReadyForIncomingRequests);
pendingTasksBeforeReadyForIncomingRequests.clear();
readyForIncomingRequestsFuture.complete(null);
lhotari marked this conversation as resolved.
Show resolved Hide resolved
}
runnables.forEach(Runnable::run);

// Initialize the message protocol handlers.
// start the protocol handlers only after the broker is ready,
Expand Down Expand Up @@ -1082,7 +1089,21 @@ public void start() throws PulsarServerException {
}

public void runWhenReadyForIncomingRequests(Runnable runnable) {
readyForIncomingRequestsFuture.thenRun(runnable);
// Here we don't call the thenRun() methods because CompletableFuture maintains a stack for pending callbacks,
// not a queue. Once the future is complete, the pending callbacks will be executed in reverse order of
// when they were added.
final boolean addedToPendingTasks;
synchronized (pendingTasksBeforeReadyForIncomingRequests) {
if (readyForIncomingRequestsFuture.isDone()) {
addedToPendingTasks = false;
} else {
pendingTasksBeforeReadyForIncomingRequests.add(runnable);
addedToPendingTasks = true;
}
}
if (!addedToPendingTasks) {
runnable.run();
}
}

public void waitUntilReadyForIncomingRequests() throws ExecutionException, InterruptedException {
Expand Down
Loading