From 5a1ef1cd57b5df2d0367ac4ef15a4405c1b21323 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Fri, 6 Sep 2024 22:41:44 +0800 Subject: [PATCH 1/3] [fix][broker] Execute the pending callbacks in order before ready for incoming requests --- .../apache/pulsar/broker/PulsarService.java | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index 0b994c640a9f5..86b6c6e98edca 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -298,6 +298,7 @@ public class PulsarService implements AutoCloseable, ShutdownService { private final DefaultMonotonicSnapshotClock monotonicSnapshotClock; private String brokerId; private final CompletableFuture readyForIncomingRequestsFuture = new CompletableFuture<>(); + private final List pendingTasksBeforeReadyForIncomingRequests = new ArrayList<>(); public enum State { Init, Started, Closing, Closed @@ -1023,7 +1024,11 @@ public void start() throws PulsarServerException { this.metricsGenerator = new MetricsGenerator(this); // the broker is ready to accept incoming requests by Pulsar binary protocol and http/https - readyForIncomingRequestsFuture.complete(null); + synchronized (pendingTasksBeforeReadyForIncomingRequests) { + pendingTasksBeforeReadyForIncomingRequests.forEach(Runnable::run); + pendingTasksBeforeReadyForIncomingRequests.clear(); + readyForIncomingRequestsFuture.complete(null); + } // Initialize the message protocol handlers. // start the protocol handlers only after the broker is ready, @@ -1082,7 +1087,16 @@ public void start() throws PulsarServerException { } public void runWhenReadyForIncomingRequests(Runnable runnable) { - readyForIncomingRequestsFuture.thenRun(runnable); + // Here we don't call the thenRun() methods because CompletableFuture maintains a stack for pending callbacks, + // not a queue. Once the future is complete, the pending callbacks will be executed in reverse order of + // when they were added. + synchronized (pendingTasksBeforeReadyForIncomingRequests) { + if (readyForIncomingRequestsFuture.isDone()) { + runnable.run(); + } else { + pendingTasksBeforeReadyForIncomingRequests.add(runnable); + } + } } public void waitUntilReadyForIncomingRequests() throws ExecutionException, InterruptedException { From cf0394d0bc24ca6cef215aed7b316e78b4d27db5 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Sat, 7 Sep 2024 19:11:42 +0800 Subject: [PATCH 2/3] Avoid unnecessary lock once the service is ready --- .../main/java/org/apache/pulsar/broker/PulsarService.java | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index 86b6c6e98edca..13948e4920189 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -1090,13 +1090,18 @@ public void runWhenReadyForIncomingRequests(Runnable runnable) { // Here we don't call the thenRun() methods because CompletableFuture maintains a stack for pending callbacks, // not a queue. Once the future is complete, the pending callbacks will be executed in reverse order of // when they were added. + final boolean addedToPendingTasks; synchronized (pendingTasksBeforeReadyForIncomingRequests) { if (readyForIncomingRequestsFuture.isDone()) { - runnable.run(); + addedToPendingTasks = false; } else { pendingTasksBeforeReadyForIncomingRequests.add(runnable); + addedToPendingTasks = true; } } + if (!addedToPendingTasks) { + runnable.run(); + } } public void waitUntilReadyForIncomingRequests() throws ExecutionException, InterruptedException { From 12b0c6032dce9da9643b167c81f7446a363de161 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Sun, 8 Sep 2024 00:00:00 +0800 Subject: [PATCH 3/3] Execute the runnables after the future is completed --- .../src/main/java/org/apache/pulsar/broker/PulsarService.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index 13948e4920189..b2e67bf4883dd 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -1024,11 +1024,13 @@ public void start() throws PulsarServerException { this.metricsGenerator = new MetricsGenerator(this); // the broker is ready to accept incoming requests by Pulsar binary protocol and http/https + final List runnables; synchronized (pendingTasksBeforeReadyForIncomingRequests) { - pendingTasksBeforeReadyForIncomingRequests.forEach(Runnable::run); + runnables = new ArrayList<>(pendingTasksBeforeReadyForIncomingRequests); pendingTasksBeforeReadyForIncomingRequests.clear(); readyForIncomingRequestsFuture.complete(null); } + runnables.forEach(Runnable::run); // Initialize the message protocol handlers. // start the protocol handlers only after the broker is ready,