From 0bb0dd85358ce7fa9e72b6af6e3bcaf4392a09d2 Mon Sep 17 00:00:00 2001 From: Heesung Sohn Date: Mon, 30 Sep 2024 19:19:17 -0700 Subject: [PATCH 1/6] [fix][broker] timeout if broker registry hangs and monitor registry --- .../extensions/BrokerRegistry.java | 5 +++ .../extensions/BrokerRegistryImpl.java | 26 ++++++++++++--- .../extensions/ExtensibleLoadManagerImpl.java | 18 +++++++++- .../extensions/BrokerRegistryTest.java | 33 +++++++++++++++++++ .../ExtensibleLoadManagerImplTest.java | 15 +++++++++ 5 files changed, 91 insertions(+), 6 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistry.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistry.java index 79dba9c63342e..d154edfbb320e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistry.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistry.java @@ -48,6 +48,11 @@ public interface BrokerRegistry extends AutoCloseable { */ boolean isStarted(); + /** + * Return whether the broker has been registered. + */ + boolean isRegistered(); + /** * Register local broker to metadata store. 
*/ diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java index a13b332e6eb5f..45fee528aacdc 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java @@ -77,10 +77,11 @@ protected enum State { @VisibleForTesting final AtomicReference state = new AtomicReference<>(State.Init); - public BrokerRegistryImpl(PulsarService pulsar) { + @VisibleForTesting + BrokerRegistryImpl(PulsarService pulsar, MetadataCache brokerLookupDataMetadataCache) { this.pulsar = pulsar; this.conf = pulsar.getConfiguration(); - this.brokerLookupDataMetadataCache = pulsar.getLocalMetadataStore().getMetadataCache(BrokerLookupData.class); + this.brokerLookupDataMetadataCache = brokerLookupDataMetadataCache; this.scheduler = pulsar.getLoadManagerExecutor(); this.listeners = new ArrayList<>(); this.brokerIdKeyPath = keyPath(pulsar.getBrokerId()); @@ -99,6 +100,10 @@ public BrokerRegistryImpl(PulsarService pulsar) { pulsar.getConfig().lookupProperties()); } + public BrokerRegistryImpl(PulsarService pulsar) { + this(pulsar, pulsar.getLocalMetadataStore().getMetadataCache(BrokerLookupData.class)); + } + @Override public synchronized void start() throws PulsarServerException { if (!this.state.compareAndSet(State.Init, State.Started)) { @@ -118,6 +123,12 @@ public boolean isStarted() { return state == State.Started || state == State.Registered; } + @Override + public boolean isRegistered() { + final var state = this.state.get(); + return state == State.Registered; + } + @Override public CompletableFuture registerAsync() { final var state = this.state.get(); @@ -127,9 +138,14 @@ public CompletableFuture registerAsync() { } log.info("[{}] Started registering self to {} (state: {})", 
getBrokerId(), brokerIdKeyPath, state); return brokerLookupDataMetadataCache.put(brokerIdKeyPath, brokerLookupData, EnumSet.of(CreateOption.Ephemeral)) - .thenAccept(__ -> { - this.state.set(State.Registered); - log.info("[{}] Finished registering self", getBrokerId()); + .orTimeout(pulsar.getConfiguration().getMetadataStoreOperationTimeoutSeconds(), TimeUnit.SECONDS) + .whenComplete((__, ex) -> { + if (ex == null) { + this.state.set(State.Registered); + log.info("[{}] Finished registering self", getBrokerId()); + } else { + log.error("[{}] Failed registering self", getBrokerId(), ex); + } }); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java index d8a279b854576..abca2bb398232 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java @@ -35,8 +35,10 @@ import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; @@ -987,8 +989,12 @@ protected void monitor() { return; } + // Monitor broker registry + // Periodically check the broker registry in case metadata store fails. + validateBrokerRegistry(); + // Monitor role - // Periodically check the role in case ZK watcher fails. + // Periodically check the role in case metadata store fails. 
var isChannelOwner = serviceUnitStateChannel.isChannelOwner(); if (isChannelOwner) { // System topic config might fail due to the race condition @@ -1087,5 +1093,15 @@ private boolean isPersistentSystemTopicUsed() { .equals(pulsar.getConfiguration().getLoadManagerServiceUnitStateTableViewClassName()); } + private void validateBrokerRegistry() + throws ExecutionException, InterruptedException, TimeoutException { + var timeout = pulsar.getConfiguration().getMetadataStoreOperationTimeoutSeconds(); + var lookup = brokerRegistry.lookupAsync(brokerRegistry.getBrokerId()).get(timeout, TimeUnit.SECONDS); + if (lookup.isEmpty()) { + log.warn("Found this broker:{} has not registered yet. Trying to register it", + brokerRegistry.getBrokerId()); + brokerRegistry.registerAsync().get(timeout, TimeUnit.SECONDS); + } + } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryTest.java index 28a2a18500f5f..941d0e4cbc3a0 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryTest.java @@ -19,7 +19,10 @@ package org.apache.pulsar.broker.loadbalance.extensions; import static org.apache.pulsar.broker.loadbalance.LoadManager.LOADBALANCE_BROKERS_ROOT; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; @@ -36,6 +39,7 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.function.BiConsumer; 
import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; @@ -48,6 +52,7 @@ import org.apache.pulsar.common.naming.ServiceUnitId; import org.apache.pulsar.common.stats.Metrics; import org.apache.pulsar.common.util.FutureUtil; +import org.apache.pulsar.metadata.api.MetadataCache; import org.apache.pulsar.metadata.api.Notification; import org.apache.pulsar.metadata.api.NotificationType; import org.apache.pulsar.policies.data.loadbalancer.LoadManagerReport; @@ -396,6 +401,34 @@ public void testKeyPath() { assertEquals(keyPath, LOADBALANCE_BROKERS_ROOT + "/brokerId"); } + @Test + public void testRegisterAsyncTimeout() throws Exception { + var pulsar1 = createPulsarService(); + pulsar1.start(); + pulsar1.getConfiguration().setMetadataStoreOperationTimeoutSeconds(1); + var metadataCache = mock(MetadataCache.class); + var brokerRegistry = new BrokerRegistryImpl(pulsar1, metadataCache); + + // happy case + doReturn(CompletableFuture.completedFuture(null)).when(metadataCache).put(any(), any(), any()); + brokerRegistry.start(); + + // unhappy case (timeout) + doAnswer(invocationOnMock -> { + return CompletableFuture.supplyAsync(() -> null, CompletableFuture.delayedExecutor(5, TimeUnit.SECONDS)); + }).when(metadataCache).put(any(), any(), any()); + try { + brokerRegistry.registerAsync().join(); + } catch (Exception e) { + assertTrue(e.getCause() instanceof TimeoutException); + } + + // happy case again + doReturn(CompletableFuture.completedFuture(null)).when(metadataCache).put(any(), any(), any()); + brokerRegistry.registerAsync().join(); + } + + private static BrokerRegistryImpl.State getState(BrokerRegistryImpl brokerRegistry) { return brokerRegistry.state.get(); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java index 7871e612c847a..d8d3e5bb44ffb 100644 --- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java @@ -132,6 +132,7 @@ import org.apache.pulsar.common.policies.data.NamespaceOwnershipStatus; import org.apache.pulsar.common.stats.Metrics; import org.apache.pulsar.common.util.FutureUtil; +import org.apache.pulsar.metadata.api.MetadataStoreException; import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats; import org.apache.pulsar.policies.data.loadbalancer.ResourceUsage; import org.apache.pulsar.policies.data.loadbalancer.SystemResourceUsage; @@ -2106,6 +2107,20 @@ public void compactionScheduleTest() { }); } + @Test(timeOut = 30 * 1000) + public void testMonitorBrokerRegistry() throws MetadataStoreException { + primaryLoadManager.getBrokerRegistry().unregister(); + assertFalse(primaryLoadManager.getBrokerRegistry().isRegistered()); + Awaitility.await() + .pollInterval(200, TimeUnit.MILLISECONDS) + .atMost(30, TimeUnit.SECONDS) + .ignoreExceptions() + .untilAsserted(() -> { // wait until true + primaryLoadManager.monitor(); + assertTrue(primaryLoadManager.getBrokerRegistry().isRegistered()); + }); + } + private static abstract class MockBrokerFilter implements BrokerFilter { @Override From ead02461d9477a7306397cc9c5e207520eeb888f Mon Sep 17 00:00:00 2001 From: Heesung Sohn Date: Tue, 1 Oct 2024 19:27:05 -0700 Subject: [PATCH 2/6] add health check verification --- .../pulsar/broker/admin/impl/BrokersBase.java | 11 ++- .../extensions/BrokerRegistryImpl.java | 55 ++++++++++-- .../channel/ServiceUnitStateChannelImpl.java | 90 ++++++++++++++++--- .../channel/ServiceUnitStateChannelTest.java | 58 ++++++++---- .../apache/pulsar/client/admin/Brokers.java | 12 ++- .../client/admin/internal/BrokersImpl.java | 18 +++- 6 files changed, 199 insertions(+), 45 deletions(-) diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java index 4d0b598a8e4f1..e13cb1858f79d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java @@ -48,6 +48,7 @@ import javax.ws.rs.container.Suspended; import javax.ws.rs.core.Response; import javax.ws.rs.core.Response.Status; +import org.apache.commons.lang.StringUtils; import org.apache.pulsar.PulsarVersion; import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.PulsarService.State; @@ -368,20 +369,26 @@ public void isReady(@Suspended AsyncResponse asyncResponse) { @ApiOperation(value = "Run a healthCheck against the broker") @ApiResponses(value = { @ApiResponse(code = 200, message = "Everything is OK"), + @ApiResponse(code = 307, message = "Current broker is not the target broker"), @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Cluster doesn't exist"), @ApiResponse(code = 500, message = "Internal server error")}) public void healthCheck(@Suspended AsyncResponse asyncResponse, @ApiParam(value = "Topic Version") - @QueryParam("topicVersion") TopicVersion topicVersion) { + @QueryParam("topicVersion") TopicVersion topicVersion, + @QueryParam("brokerId") String brokerId) { validateSuperUserAccessAsync() .thenAccept(__ -> checkDeadlockedThreads()) + .thenCompose(__ -> maybeRedirectToBroker( + StringUtils.isBlank(brokerId) ? 
pulsar().getBrokerId() : brokerId)) .thenCompose(__ -> internalRunHealthCheck(topicVersion)) .thenAccept(__ -> { LOG.info("[{}] Successfully run health check.", clientAppId()); asyncResponse.resume(Response.ok("ok").build()); }).exceptionally(ex -> { - LOG.error("[{}] Fail to run health check.", clientAppId(), ex); + if (!isRedirectException(ex)) { + LOG.error("[{}] Fail to run health check.", clientAppId(), ex); + } resumeAsyncResponseExceptionally(asyncResponse, ex); return null; }); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java index 45fee528aacdc..24bd19c008ba6 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java @@ -54,6 +54,10 @@ public class BrokerRegistryImpl implements BrokerRegistry { private final PulsarService pulsar; + private static final int MAX_REGISTER_RETRY = 10; + + private static final int MAX_REGISTER_RETRY_DELAY_IN_MILLIS = 1000; + private final ServiceConfiguration conf; private final BrokerLookupData brokerLookupData; @@ -149,6 +153,35 @@ public CompletableFuture registerAsync() { }); } + private void doRegisterAsyncWithRetries(int retry, CompletableFuture future) { + this.scheduler.schedule(() -> { + registerAsync().whenComplete((__, e) -> { + if (e != null) { + if (retry == MAX_REGISTER_RETRY) { + future.completeExceptionally(new PulsarServerException("Stopped registering self retries", e)); + } else { + doRegisterAsyncWithRetries(retry + 1, future); + } + } else { + future.complete(null); + } + }); + }, Math.min(MAX_REGISTER_RETRY_DELAY_IN_MILLIS, retry * retry * 50), TimeUnit.MILLISECONDS); + } + + private CompletableFuture registerAsyncWithRetries() { + var future = registerAsync(); + return 
future.handle((__, e) -> { + if (e != null) { + var retryFuture = new CompletableFuture(); + doRegisterAsyncWithRetries(1, retryFuture); + return retryFuture.join(); + } else { + return null; + } + }); + } + @Override public synchronized void unregister() throws MetadataStoreException { if (state.compareAndSet(State.Registered, State.Unregistering)) { @@ -235,17 +268,25 @@ private void handleMetadataStoreNotification(Notification t) { // The registered node is an ephemeral node that could be deleted when the metadata store client's session // is expired. In this case, we should register again. final var brokerId = t.getPath().substring(LOADBALANCE_BROKERS_ROOT.length() + 1); + + CompletableFuture register; if (t.getType() == NotificationType.Deleted && getBrokerId().equals(brokerId)) { - registerAsync(); + register = registerAsyncWithRetries(); + } else { + register = CompletableFuture.completedFuture(null); } - if (listeners.isEmpty()) { - return; - } - this.scheduler.submit(() -> { - for (BiConsumer listener : listeners) { - listener.accept(brokerId, t.getType()); + // Make sure to run the listeners after re-registered. 
+ register.thenAccept(__ -> { + if (listeners.isEmpty()) { + return; } + this.scheduler.submit(() -> { + for (BiConsumer listener : listeners) { + listener.accept(brokerId, t.getType()); + } + }); }); + } catch (RejectedExecutionException e) { // Executor is shutting down } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java index ce975495feb2a..27b87dcb6159f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java @@ -86,11 +86,13 @@ import org.apache.pulsar.broker.namespace.LookupOptions; import org.apache.pulsar.broker.namespace.NamespaceService; import org.apache.pulsar.broker.service.BrokerServiceException; +import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.common.naming.NamespaceBundle; import org.apache.pulsar.common.naming.NamespaceBundleFactory; import org.apache.pulsar.common.naming.NamespaceBundleSplitAlgorithm; import org.apache.pulsar.common.naming.NamespaceBundles; +import org.apache.pulsar.common.naming.TopicVersion; import org.apache.pulsar.common.stats.Metrics; import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.common.util.Reflections; @@ -108,6 +110,8 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel { private static final long MIN_CLEAN_UP_DELAY_TIME_IN_SECS = 0; // 0 secs to clean immediately private static final long MAX_CHANNEL_OWNER_ELECTION_WAITING_TIME_IN_SECS = 10; private static final long MAX_OWNED_BUNDLE_COUNT_DELAY_TIME_IN_MILLIS = 10 * 60 * 1000; + private static final long MAX_BROKER_HEALTH_CHECK_RETRY = 3; + private static 
final long MAX_BROKER_HEALTH_CHECK_DELAY_IN_MILLIS = 1000; private final PulsarService pulsar; private final ServiceConfiguration config; private final Schema schema; @@ -115,6 +119,7 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel { private final String brokerId; private final Map> cleanupJobs; private final StateChangeListeners stateChangeListeners; + private BrokerRegistry brokerRegistry; private LeaderElectionService leaderElectionService; @@ -350,6 +355,11 @@ protected LeaderElectionService getLeaderElectionService() { .get().getLeaderElectionService(); } + @VisibleForTesting + protected PulsarAdmin getPulsarAdmin() throws PulsarServerException { + return pulsar.getAdminClient(); + } + @Override public synchronized void close() throws PulsarServerException { channelState = Closed; @@ -1255,20 +1265,24 @@ private MetadataState getMetadataState() { } private void handleBrokerCreationEvent(String broker) { - CompletableFuture future = cleanupJobs.remove(broker); - if (future != null) { - future.cancel(false); - totalInactiveBrokerCleanupCancelledCnt++; - log.info("Successfully cancelled the ownership cleanup for broker:{}." - + " Active cleanup job count:{}", - broker, cleanupJobs.size()); - } else { - if (debug()) { - log.info("No needs to cancel the ownership cleanup for broker:{}." - + " There was no scheduled cleanup job. Active cleanup job count:{}", - broker, cleanupJobs.size()); - } - } + + healthCheckBrokerAsync(broker) + .thenAccept(__ -> { + CompletableFuture future = cleanupJobs.remove(broker); + if (future != null) { + future.cancel(false); + totalInactiveBrokerCleanupCancelledCnt++; + log.info("Successfully cancelled the ownership cleanup for broker:{}." + + " Active cleanup job count:{}", + broker, cleanupJobs.size()); + } else { + if (debug()) { + log.info("No needs to cancel the ownership cleanup for broker:{}." + + " There was no scheduled cleanup job. 
Active cleanup job count:{}", + broker, cleanupJobs.size()); + } + } + }); } private void handleBrokerDeletionEvent(String broker) { @@ -1431,6 +1445,37 @@ private void waitForCleanups(String broker, boolean excludeSystemTopics, int max System.currentTimeMillis() - started); } + private CompletableFuture healthCheckBrokerAsync(String brokerId) { + CompletableFuture future = new CompletableFuture<>(); + doHealthCheckBrokerAsyncWithRetries(brokerId, 0, future); + return future; + } + + private void doHealthCheckBrokerAsyncWithRetries(String brokerId, int retry, CompletableFuture future) { + try { + var admin = getPulsarAdmin(); + admin.brokers().healthcheckAsync(TopicVersion.V2, Optional.of(brokerId)) + .whenComplete((__, e) -> { + if (e == null) { + log.info("Completed health-check broker :{}", brokerId, e); + future.complete(null); + return; + } + if (retry == MAX_BROKER_HEALTH_CHECK_RETRY) { + log.error("Failed health-check broker :{}", brokerId, e); + future.completeExceptionally(FutureUtil.unwrapCompletionException(e)); + } else { + pulsar.getExecutor() + .schedule(() -> doHealthCheckBrokerAsyncWithRetries(brokerId, retry + 1, future), + Math.min(MAX_BROKER_HEALTH_CHECK_DELAY_IN_MILLIS, retry * retry * 50), + MILLISECONDS); + } + }); + } catch (PulsarServerException e) { + future.completeExceptionally(e); + } + } + private synchronized void doCleanup(String broker, boolean gracefully) { try { if (getChannelOwnerAsync().get(MAX_CHANNEL_OWNER_ELECTION_WAITING_TIME_IN_SECS, TimeUnit.SECONDS) @@ -1444,6 +1489,23 @@ private synchronized void doCleanup(String broker, boolean gracefully) { return; } + // if not gracefully, verify the broker is inactive by health-check. + if (!gracefully) { + try { + healthCheckBrokerAsync(broker).get( + pulsar.getConfiguration().getMetadataStoreOperationTimeoutSeconds(), SECONDS); + log.warn("Found that the broker to clean is healthy. 
Skip the broker:{}'s orphan bundle cleanup", + broker); + return; + } catch (Exception e) { + if (debug()) { + log.info("Failed to check broker:{} health", broker, e); + } + log.info("Checked the broker:{} health. Continue the orphan bundle cleanup", broker); + } + } + + long startTime = System.nanoTime(); log.info("Started ownership cleanup for the inactive broker:{}", broker); int orphanServiceUnitCleanupCnt = 0; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java index 92cdf61f44269..7e58cc4502ac5 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java @@ -44,6 +44,7 @@ import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.reset; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -89,6 +90,8 @@ import org.apache.pulsar.broker.namespace.NamespaceService; import org.apache.pulsar.broker.service.BrokerServiceException; import org.apache.pulsar.broker.testcontext.PulsarTestContext; +import org.apache.pulsar.client.admin.Brokers; +import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.api.TableView; import org.apache.pulsar.common.naming.NamespaceBundle; import org.apache.pulsar.common.naming.TopicName; @@ -136,10 +139,14 @@ public class ServiceUnitStateChannelTest extends MockedPulsarServiceBaseTest { private BrokerRegistryImpl registry; + private PulsarAdmin pulsarAdmin; + private ExtensibleLoadManagerImpl loadManager; private final String 
serviceUnitStateTableViewClassName; + private Brokers brokers; + @DataProvider(name = "serviceUnitStateTableViewClassName") public static Object[][] serviceUnitStateTableViewClassName() { return new Object[][]{ @@ -174,7 +181,8 @@ protected void setup() throws Exception { admin.namespaces().createNamespace(namespaceName2); pulsar1 = pulsar; - registry = new BrokerRegistryImpl(pulsar); + registry = spy(new BrokerRegistryImpl(pulsar1)); + pulsarAdmin = spy(pulsar.getAdminClient()); loadManagerContext = mock(LoadManagerContext.class); doReturn(mock(LoadDataStore.class)).when(loadManagerContext).brokerLoadDataStore(); doReturn(mock(LoadDataStore.class)).when(loadManagerContext).topBundleLoadDataStore(); @@ -207,6 +215,10 @@ protected void setup() throws Exception { childBundle31 = namespaceName2 + "/" + childBundle1Range; childBundle32 = namespaceName2 + "/" + childBundle2Range; + + brokers = mock(Brokers.class); + doReturn(CompletableFuture.failedFuture(new RuntimeException("failed"))).when(brokers) + .healthcheckAsync(any(), any()); } @BeforeMethod @@ -220,6 +232,7 @@ protected void initChannels() throws Exception { cleanMetadataState(channel1); cleanMetadataState(channel2); enableChannels(); + reset(pulsarAdmin); } @@ -719,17 +732,19 @@ public void handleMetadataSessionEventTest() throws IllegalAccessException { @Test(priority = 8) public void handleBrokerCreationEventTest() throws IllegalAccessException { var cleanupJobs = getCleanupJobs(channel1); - String broker = "broker-1"; + String broker = brokerId2; var future = new CompletableFuture(); cleanupJobs.put(broker, future); ((ServiceUnitStateChannelImpl) channel1).handleBrokerRegistrationEvent(broker, NotificationType.Created); - assertEquals(0, cleanupJobs.size()); - assertTrue(future.isCancelled()); + Awaitility.await().atMost(5, TimeUnit.SECONDS).untilAsserted(() -> { + assertEquals(0, cleanupJobs.size()); + assertTrue(future.isCancelled()); + }); + } @Test(priority = 9) - public void 
handleBrokerDeletionEventTest() - throws IllegalAccessException, ExecutionException, InterruptedException, TimeoutException { + public void handleBrokerDeletionEventTest() throws Exception { var cleanupJobs1 = getCleanupJobs(channel1); var cleanupJobs2 = getCleanupJobs(channel2); @@ -782,8 +797,12 @@ public void handleBrokerDeletionEventTest() System.currentTimeMillis() - (MAX_CLEAN_UP_DELAY_TIME_IN_SECS * 1000 + 1000), true); FieldUtils.writeDeclaredField(followerChannel, "lastMetadataSessionEventTimestamp", System.currentTimeMillis() - (MAX_CLEAN_UP_DELAY_TIME_IN_SECS * 1000 + 1000), true); + + doReturn(brokers).when(pulsarAdmin).brokers(); leaderChannel.handleBrokerRegistrationEvent(broker, NotificationType.Deleted); followerChannel.handleBrokerRegistrationEvent(broker, NotificationType.Deleted); + + leaderChannel.handleBrokerRegistrationEvent(brokerId2, NotificationType.Deleted); followerChannel.handleBrokerRegistrationEvent(brokerId2, @@ -841,6 +860,7 @@ public void handleBrokerDeletionEventTest() 3, 0, 0); + reset(pulsarAdmin); // broker is back online leaderChannel.handleBrokerRegistrationEvent(broker, NotificationType.Created); @@ -865,6 +885,7 @@ public void handleBrokerDeletionEventTest() // broker is offline again + doReturn(brokers).when(pulsarAdmin).brokers(); FieldUtils.writeDeclaredField(leaderChannel, "maxCleanupDelayTimeInSecs", 3, true); leaderChannel.handleBrokerRegistrationEvent(broker, NotificationType.Deleted); followerChannel.handleBrokerRegistrationEvent(broker, NotificationType.Deleted); @@ -906,6 +927,7 @@ public void handleBrokerDeletionEventTest() 4, 0, 1); + reset(pulsarAdmin); // test unstable state channel1.publishUnloadEventAsync(new Unload(brokerId2, bundle1, Optional.of(broker))); @@ -1585,9 +1607,12 @@ public void testOverrideInactiveBrokerStateData() System.currentTimeMillis() - (MAX_CLEAN_UP_DELAY_TIME_IN_SECS * 1000 + 1000), true); FieldUtils.writeDeclaredField(followerChannel, "lastMetadataSessionEventTimestamp", 
System.currentTimeMillis() - (MAX_CLEAN_UP_DELAY_TIME_IN_SECS * 1000 + 1000), true); + + doReturn(brokers).when(pulsarAdmin).brokers(); leaderChannel.handleBrokerRegistrationEvent(broker, NotificationType.Deleted); followerChannel.handleBrokerRegistrationEvent(broker, NotificationType.Deleted); + waitUntilNewOwner(channel2, releasingBundle, brokerId2); waitUntilNewOwner(channel2, childBundle11, brokerId2); waitUntilNewOwner(channel2, childBundle12, brokerId2); @@ -1600,7 +1625,7 @@ public void testOverrideInactiveBrokerStateData() // clean-up FieldUtils.writeDeclaredField(leaderChannel, "maxCleanupDelayTimeInSecs", 3 * 60, true); cleanTableViews(); - + reset(pulsarAdmin); } @Test(priority = 19) @@ -1736,13 +1761,10 @@ public void testActiveGetOwner() throws Exception { } // case 5: the owner lookup gets delayed - var spyRegistry = spy(new BrokerRegistryImpl(pulsar)); - FieldUtils.writeDeclaredField(channel1, - "brokerRegistry", spyRegistry, true); FieldUtils.writeDeclaredField(channel1, "inFlightStateWaitingTimeInMillis", 1000, true); var delayedFuture = new CompletableFuture(); - doReturn(delayedFuture).when(spyRegistry).lookupAsync(eq(broker)); + doReturn(delayedFuture).when(registry).lookupAsync(eq(broker)); CompletableFuture.runAsync(() -> { try { Thread.sleep(500); @@ -1760,7 +1782,7 @@ public void testActiveGetOwner() throws Exception { // case 6: the owner is inactive doReturn(CompletableFuture.completedFuture(Optional.empty())) - .when(spyRegistry).lookupAsync(eq(broker)); + .when(registry).lookupAsync(eq(broker)); // verify getOwnerAsync times out start = System.currentTimeMillis(); @@ -1778,9 +1800,11 @@ public void testActiveGetOwner() throws Exception { if (leader1.equals(brokerId2)) { leaderChannel = (ServiceUnitStateChannelImpl) channel2; } + System.out.println("$$$ running"); leaderChannel.handleMetadataSessionEvent(SessionReestablished); FieldUtils.writeDeclaredField(leaderChannel, "lastMetadataSessionEventTimestamp", System.currentTimeMillis() - 
(MAX_CLEAN_UP_DELAY_TIME_IN_SECS * 1000 + 1000), true); + doReturn(brokers).when(pulsarAdmin).brokers(); leaderChannel.handleBrokerRegistrationEvent(broker, NotificationType.Deleted); // verify the ownership cleanup, and channel's getOwnerAsync returns empty result without timeout @@ -1792,7 +1816,7 @@ public void testActiveGetOwner() throws Exception { waitUntilState(channel2, bundle, Init); assertTrue(System.currentTimeMillis() - start < 20_000); - + reset(pulsarAdmin); // case 8: simulate ownership cleanup(brokerId1 as the new owner) by the leader channel try { disableChannels(); @@ -1807,6 +1831,7 @@ public void testActiveGetOwner() throws Exception { FieldUtils.writeDeclaredField(leaderChannel, "lastMetadataSessionEventTimestamp", System.currentTimeMillis() - (MAX_CLEAN_UP_DELAY_TIME_IN_SECS * 1000 + 1000), true); getCleanupJobs(leaderChannel).clear(); + doReturn(brokers).when(pulsarAdmin).brokers(); leaderChannel.handleBrokerRegistrationEvent(broker, NotificationType.Deleted); // verify the ownership cleanup, and channel's getOwnerAsync returns brokerId1 without timeout @@ -1817,10 +1842,8 @@ public void testActiveGetOwner() throws Exception { // test clean-up FieldUtils.writeDeclaredField(channel1, "inFlightStateWaitingTimeInMillis", 30 * 1000, true); - FieldUtils.writeDeclaredField(channel1, - "brokerRegistry", registry, true); cleanTableViews(); - + reset(pulsarAdmin); } @Test(priority = 21) @@ -2253,7 +2276,7 @@ private static void validateMonitorCounters(ServiceUnitStateChannel channel, } ServiceUnitStateChannelImpl createChannel(PulsarService pulsar) - throws IllegalAccessException { + throws IllegalAccessException, PulsarServerException { var tmpChannel = new ServiceUnitStateChannelImpl(pulsar); FieldUtils.writeDeclaredField(tmpChannel, "ownershipMonitorDelayTimeInSecs", 5, true); var channel = spy(tmpChannel); @@ -2261,6 +2284,7 @@ ServiceUnitStateChannelImpl createChannel(PulsarService pulsar) doReturn(loadManagerContext).when(channel).getContext(); 
doReturn(registry).when(channel).getBrokerRegistry(); doReturn(loadManager).when(channel).getLoadManager(); + doReturn(pulsarAdmin).when(channel).getPulsarAdmin(); var leaderElectionService = new LeaderElectionService( diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Brokers.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Brokers.java index dc0b7c9885a9a..eed73f38282ac 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Brokers.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Brokers.java @@ -20,6 +20,7 @@ import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.concurrent.CompletableFuture; import org.apache.pulsar.client.admin.PulsarAdminException.NotAuthorizedException; import org.apache.pulsar.client.admin.PulsarAdminException.NotFoundException; @@ -320,10 +321,19 @@ Map getOwnedNamespaces(String cluster, String */ void healthcheck(TopicVersion topicVersion) throws PulsarAdminException; + /** + * Run a healthcheck on the target broker or on the broker. + * @param brokerId target broker id to check the health. If empty, it checks the health on the connected broker. + * + * @throws PulsarAdminException if the healthcheck fails. + */ + void healthcheck(TopicVersion topicVersion, Optional brokerId) throws PulsarAdminException; + /** * Run a healthcheck on the broker asynchronously. */ - CompletableFuture healthcheckAsync(TopicVersion topicVersion); + CompletableFuture healthcheckAsync(TopicVersion topicVersion, Optional brokerId); + /** * Trigger the current broker to graceful-shutdown asynchronously. 
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BrokersImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BrokersImpl.java index b82c3fd0f414b..35b261b196eee 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BrokersImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BrokersImpl.java @@ -20,6 +20,7 @@ import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.concurrent.CompletableFuture; import javax.ws.rs.client.Entity; import javax.ws.rs.client.InvocationCallback; @@ -168,26 +169,35 @@ public CompletableFuture backlogQuotaCheckAsync() { @Override @Deprecated public void healthcheck() throws PulsarAdminException { - healthcheck(TopicVersion.V1); + healthcheck(TopicVersion.V1, Optional.empty()); } @Override @Deprecated public CompletableFuture healthcheckAsync() { - return healthcheckAsync(TopicVersion.V1); + return healthcheckAsync(TopicVersion.V1, Optional.empty()); } + @Override public void healthcheck(TopicVersion topicVersion) throws PulsarAdminException { - sync(() -> healthcheckAsync(topicVersion)); + sync(() -> healthcheckAsync(topicVersion, Optional.empty())); } @Override - public CompletableFuture healthcheckAsync(TopicVersion topicVersion) { + public void healthcheck(TopicVersion topicVersion, Optional brokerId) throws PulsarAdminException { + sync(() -> healthcheckAsync(topicVersion, brokerId)); + } + + @Override + public CompletableFuture healthcheckAsync(TopicVersion topicVersion, Optional brokerId) { WebTarget path = adminBrokers.path("health"); if (topicVersion != null) { path = path.queryParam("topicVersion", topicVersion); } + if (brokerId.isPresent()) { + path = path.queryParam("brokerId", brokerId.get()); + } final CompletableFuture future = new CompletableFuture<>(); asyncGetRequest(path, new InvocationCallback() { From e790d7bb0ceffa9375327345f75dd51987b62f64 Mon Sep 
17 00:00:00 2001 From: Heesung Sohn Date: Wed, 2 Oct 2024 05:49:33 -0700 Subject: [PATCH 3/6] skip active check when not registered --- .../extensions/BrokerRegistryImpl.java | 18 ++++++------------ .../channel/ServiceUnitStateChannelImpl.java | 8 ++++++++ .../channel/ServiceUnitStateChannelTest.java | 14 +++++++++++++- 3 files changed, 27 insertions(+), 13 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java index 24bd19c008ba6..4e967074c7808 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java @@ -52,12 +52,10 @@ @Slf4j public class BrokerRegistryImpl implements BrokerRegistry { - private final PulsarService pulsar; - - private static final int MAX_REGISTER_RETRY = 10; - private static final int MAX_REGISTER_RETRY_DELAY_IN_MILLIS = 1000; + private final PulsarService pulsar; + private final ServiceConfiguration conf; private final BrokerLookupData brokerLookupData; @@ -154,14 +152,10 @@ public CompletableFuture registerAsync() { } private void doRegisterAsyncWithRetries(int retry, CompletableFuture future) { - this.scheduler.schedule(() -> { + pulsar.getExecutor().schedule(() -> { registerAsync().whenComplete((__, e) -> { if (e != null) { - if (retry == MAX_REGISTER_RETRY) { - future.completeExceptionally(new PulsarServerException("Stopped registering self retries", e)); - } else { - doRegisterAsyncWithRetries(retry + 1, future); - } + doRegisterAsyncWithRetries(retry + 1, future); } else { future.complete(null); } @@ -170,8 +164,7 @@ private void doRegisterAsyncWithRetries(int retry, CompletableFuture futur } private CompletableFuture registerAsyncWithRetries() { - var future = registerAsync(); - return future.handle((__, e) -> 
{ + return registerAsync().handle((__, e) -> { if (e != null) { var retryFuture = new CompletableFuture(); doRegisterAsyncWithRetries(1, retryFuture); @@ -271,6 +264,7 @@ private void handleMetadataStoreNotification(Notification t) { CompletableFuture register; if (t.getType() == NotificationType.Deleted && getBrokerId().equals(brokerId)) { + this.state.set(State.Started); register = registerAsyncWithRetries(); } else { register = CompletableFuture.completedFuture(null); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java index 27b87dcb6159f..148c146c74063 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java @@ -458,6 +458,14 @@ private CompletableFuture> getActiveOwnerAsync( String serviceUnit, ServiceUnitState state, Optional owner) { + + // If this broker is not registered in the broker registry (possibly because it is struggling to connect to the metadata store), + // we return the owner without checking whether it is active. + // This broker tries to serve lookups on a best-effort basis when the metadata store connection is unstable.
+ if (!brokerRegistry.isRegistered()) { + return CompletableFuture.completedFuture(owner); + } + return dedupeGetOwnerRequest(serviceUnit) .thenCompose(newOwner -> { if (newOwner == null) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java index 7e58cc4502ac5..b6e38d4f6956c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java @@ -182,6 +182,7 @@ protected void setup() throws Exception { pulsar1 = pulsar; registry = spy(new BrokerRegistryImpl(pulsar1)); + registry.start(); pulsarAdmin = spy(pulsar.getAdminClient()); loadManagerContext = mock(LoadManagerContext.class); doReturn(mock(LoadDataStore.class)).when(loadManagerContext).brokerLoadDataStore(); @@ -1790,6 +1791,18 @@ public void testActiveGetOwner() throws Exception { assertTrue(ex.getCause() instanceof IllegalStateException); assertTrue(System.currentTimeMillis() - start >= 1000); + try { + // verify getOwnerAsync returns immediately when not registered + registry.unregister(); + start = System.currentTimeMillis(); + assertEquals(broker, channel1.getOwnerAsync(bundle).get().get()); + elapsed = System.currentTimeMillis() - start; + assertTrue(elapsed < 1000); + } finally { + registry.registerAsync().join(); + } + + // case 7: the ownership cleanup(no new owner) by the leader channel doReturn(CompletableFuture.completedFuture(Optional.empty())) .when(loadManager).selectAsync(any(), any(), any()); @@ -1800,7 +1813,6 @@ public void testActiveGetOwner() throws Exception { if (leader1.equals(brokerId2)) { leaderChannel = (ServiceUnitStateChannelImpl) channel2; } - System.out.println("$$$ running"); 
leaderChannel.handleMetadataSessionEvent(SessionReestablished); FieldUtils.writeDeclaredField(leaderChannel, "lastMetadataSessionEventTimestamp", System.currentTimeMillis() - (MAX_CLEAN_UP_DELAY_TIME_IN_SECS * 1000 + 1000), true); From f812acdc5f7529e3ebdda1a1b950a6724a479b89 Mon Sep 17 00:00:00 2001 From: Heesung Sohn Date: Wed, 2 Oct 2024 13:58:05 -0700 Subject: [PATCH 4/6] registerAsyncWithRetries on executor thread only --- .../loadbalance/extensions/BrokerRegistryImpl.java | 12 +++--------- 1 file changed, 3 insertions(+), 9 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java index 4e967074c7808..afa6a099a273b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java @@ -164,15 +164,9 @@ private void doRegisterAsyncWithRetries(int retry, CompletableFuture futur } private CompletableFuture registerAsyncWithRetries() { - return registerAsync().handle((__, e) -> { - if (e != null) { - var retryFuture = new CompletableFuture(); - doRegisterAsyncWithRetries(1, retryFuture); - return retryFuture.join(); - } else { - return null; - } - }); + var retryFuture = new CompletableFuture(); + doRegisterAsyncWithRetries(1, retryFuture); + return retryFuture; } @Override From 36c455b6df281c89ac1c6e8c538032b4da392aba Mon Sep 17 00:00:00 2001 From: Heesung Sohn Date: Wed, 2 Oct 2024 15:17:48 -0700 Subject: [PATCH 5/6] improved handleBrokerCreationEvent --- .../channel/ServiceUnitStateChannelImpl.java | 32 ++++++++++--------- 1 file changed, 17 insertions(+), 15 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java index 148c146c74063..49d038d512e59 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java @@ -1274,23 +1274,25 @@ private MetadataState getMetadataState() { private void handleBrokerCreationEvent(String broker) { - healthCheckBrokerAsync(broker) - .thenAccept(__ -> { - CompletableFuture future = cleanupJobs.remove(broker); - if (future != null) { - future.cancel(false); - totalInactiveBrokerCleanupCancelledCnt++; - log.info("Successfully cancelled the ownership cleanup for broker:{}." - + " Active cleanup job count:{}", - broker, cleanupJobs.size()); - } else { - if (debug()) { - log.info("No needs to cancel the ownership cleanup for broker:{}." - + " There was no scheduled cleanup job. Active cleanup job count:{}", + if (!cleanupJobs.isEmpty() && cleanupJobs.containsKey(broker)) { + healthCheckBrokerAsync(broker) + .thenAccept(__ -> { + CompletableFuture future = cleanupJobs.remove(broker); + if (future != null) { + future.cancel(false); + totalInactiveBrokerCleanupCancelledCnt++; + log.info("Successfully cancelled the ownership cleanup for broker:{}." + + " Active cleanup job count:{}", broker, cleanupJobs.size()); + } else { + if (debug()) { + log.info("No needs to cancel the ownership cleanup for broker:{}." + + " There was no scheduled cleanup job. 
Active cleanup job count:{}", + broker, cleanupJobs.size()); + } } - } - }); + }); + } } private void handleBrokerDeletionEvent(String broker) { From 051295dedd74ac14d47d337a6d873a9f1716f317 Mon Sep 17 00:00:00 2001 From: Heesung Sohn Date: Wed, 2 Oct 2024 17:07:41 -0700 Subject: [PATCH 6/6] registerAsyncWithRetries asap --- .../broker/loadbalance/extensions/BrokerRegistryImpl.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java index afa6a099a273b..5a8307df27a63 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java @@ -165,7 +165,7 @@ private void doRegisterAsyncWithRetries(int retry, CompletableFuture futur private CompletableFuture registerAsyncWithRetries() { var retryFuture = new CompletableFuture(); - doRegisterAsyncWithRetries(1, retryFuture); + doRegisterAsyncWithRetries(0, retryFuture); return retryFuture; }