From 034367198b9716838d5ed82354b6a42b9428c867 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Fri, 13 Sep 2024 11:14:08 +0800 Subject: [PATCH] [fix][broker] Fail fast if the extensible load manager failed to start (#23297) (cherry picked from commit fc60ec06ae98fa4000473a636bfb06729c210048) --- .../pulsar/broker/PulsarServerException.java | 17 +++ .../apache/pulsar/broker/PulsarService.java | 2 +- .../extensions/ExtensibleLoadManagerImpl.java | 122 ++++++------------ .../extensions/LoadManagerFailFastTest.java | 120 +++++++++++++++++ 4 files changed, 179 insertions(+), 82 deletions(-) create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/LoadManagerFailFastTest.java diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/PulsarServerException.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/PulsarServerException.java index 2235b9a7128b8..d7c0d0adb3afc 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/PulsarServerException.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/PulsarServerException.java @@ -19,6 +19,7 @@ package org.apache.pulsar.broker; import java.io.IOException; +import java.util.concurrent.CompletionException; public class PulsarServerException extends IOException { private static final long serialVersionUID = 1; @@ -44,4 +45,20 @@ public NotFoundException(Throwable t) { super(t); } } + + public static PulsarServerException from(Throwable throwable) { + if (throwable instanceof CompletionException) { + return from(throwable.getCause()); + } + if (throwable instanceof PulsarServerException pulsarServerException) { + return pulsarServerException; + } else { + return new PulsarServerException(throwable); + } + } + + // Wrap this checked exception into a specific unchecked exception + public static CompletionException toUncheckedException(PulsarServerException e) { + return new CompletionException(e); + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index 1c4d3b168b622..392e536e39db9 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -1001,7 +1001,7 @@ public void start() throws PulsarServerException { state = State.Started; } catch (Exception e) { LOG.error("Failed to start Pulsar service: {}", e.getMessage(), e); - PulsarServerException startException = new PulsarServerException(e); + PulsarServerException startException = PulsarServerException.from(e); readyForIncomingRequestsFuture.completeExceptionally(startException); throw startException; } finally { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java index 40efa6390a78a..8e34f2f697fb1 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java @@ -81,7 +81,6 @@ import org.apache.pulsar.broker.loadbalance.extensions.scheduler.SplitScheduler; import org.apache.pulsar.broker.loadbalance.extensions.scheduler.UnloadScheduler; import org.apache.pulsar.broker.loadbalance.extensions.store.LoadDataStore; -import org.apache.pulsar.broker.loadbalance.extensions.store.LoadDataStoreException; import org.apache.pulsar.broker.loadbalance.extensions.store.LoadDataStoreFactory; import org.apache.pulsar.broker.loadbalance.extensions.strategy.BrokerSelectionStrategy; import org.apache.pulsar.broker.loadbalance.extensions.strategy.BrokerSelectionStrategyFactory; @@ -99,10 +98,7 @@ import org.apache.pulsar.common.naming.TopicDomain; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.stats.Metrics; -import org.apache.pulsar.common.util.Backoff; -import org.apache.pulsar.common.util.BackoffBuilder; import org.apache.pulsar.common.util.FutureUtil; -import org.apache.pulsar.metadata.api.MetadataStoreException; import org.apache.pulsar.metadata.api.coordination.LeaderElectionState; import org.slf4j.Logger; @@ -125,10 +121,6 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager, BrokerS public static final long COMPACTION_THRESHOLD = 5 * 1024 * 1024; - public static final int STARTUP_TIMEOUT_SECONDS = 30; - - public static final int MAX_RETRY = 5; - private static final String ELECTION_ROOT = "/loadbalance/extension/leader"; public static final Set INTERNAL_TOPICS = @@ -212,7 +204,7 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager, BrokerS private final ConcurrentHashMap>> lookupRequests = new ConcurrentHashMap<>(); - private final CompletableFuture initWaiter = new CompletableFuture<>(); + private final CompletableFuture initWaiter = new CompletableFuture<>(); /** * Get all the bundles that are owned by this broker. @@ -385,7 +377,7 @@ public void start() throws PulsarServerException { return; } try { - this.brokerRegistry = new BrokerRegistryImpl(pulsar); + this.brokerRegistry = createBrokerRegistry(pulsar); this.leaderElectionService = new LeaderElectionService( pulsar.getCoordinationService(), pulsar.getBrokerId(), pulsar.getSafeWebServiceAddress(), ELECTION_ROOT, @@ -400,53 +392,14 @@ public void start() throws PulsarServerException { }); }); }); - this.serviceUnitStateChannel = new ServiceUnitStateChannelImpl(pulsar); + this.serviceUnitStateChannel = createServiceUnitStateChannel(pulsar); this.brokerRegistry.start(); this.splitManager = new SplitManager(splitCounter); this.unloadManager = new UnloadManager(unloadCounter, pulsar.getBrokerId()); this.serviceUnitStateChannel.listen(unloadManager); this.serviceUnitStateChannel.listen(splitManager); this.leaderElectionService.start(); - pulsar.runWhenReadyForIncomingRequests(() -> { - Backoff backoff = new BackoffBuilder() - .setInitialTime(100, TimeUnit.MILLISECONDS) - .setMax(STARTUP_TIMEOUT_SECONDS, TimeUnit.SECONDS) - .create(); - int retry = 0; - while (!Thread.currentThread().isInterrupted()) { - try { - brokerRegistry.register(); - this.serviceUnitStateChannel.start(); - break; - } catch (Exception e) { - log.warn("The broker:{} failed to start service unit state channel. Retrying {} th ...", - pulsar.getBrokerId(), ++retry, e); - try { - Thread.sleep(backoff.next()); - } catch (InterruptedException ex) { - log.warn("Interrupted while sleeping."); - // preserve thread's interrupt status - Thread.currentThread().interrupt(); - try { - pulsar.close(); - } catch (PulsarServerException exc) { - log.error("Failed to close pulsar service.", exc); - } - return; - } - failStarting(e); - if (retry >= MAX_RETRY) { - log.error("Failed to start the service unit state channel after retry {} th. " - + "Closing pulsar service.", retry, e); - try { - pulsar.close(); - } catch (PulsarServerException ex) { - log.error("Failed to close pulsar service.", ex); - } - } - } - } - }); + this.antiAffinityGroupPolicyHelper = new AntiAffinityGroupPolicyHelper(pulsar, serviceUnitStateChannel); antiAffinityGroupPolicyHelper.listenFailureDomainUpdate(); @@ -455,15 +408,10 @@ public void start() throws PulsarServerException { SimpleResourceAllocationPolicies policies = new SimpleResourceAllocationPolicies(pulsar); this.isolationPoliciesHelper = new IsolationPoliciesHelper(policies); this.brokerFilterPipeline.add(new BrokerIsolationPoliciesFilter(isolationPoliciesHelper)); - - try { - this.brokerLoadDataStore = LoadDataStoreFactory - .create(pulsar, BROKER_LOAD_DATA_STORE_TOPIC, BrokerLoadData.class); - this.topBundlesLoadDataStore = LoadDataStoreFactory - .create(pulsar, TOP_BUNDLES_LOAD_DATA_STORE_TOPIC, TopBundlesLoadData.class); - } catch (LoadDataStoreException e) { - throw new PulsarServerException(e); - } + this.brokerLoadDataStore = LoadDataStoreFactory + .create(pulsar, BROKER_LOAD_DATA_STORE_TOPIC, BrokerLoadData.class); + this.topBundlesLoadDataStore = LoadDataStoreFactory + .create(pulsar, TOP_BUNDLES_LOAD_DATA_STORE_TOPIC, TopBundlesLoadData.class); this.context = LoadManagerContextImpl.builder() .configuration(conf) @@ -487,6 +435,7 @@ public void start() throws PulsarServerException { pulsar.runWhenReadyForIncomingRequests(() -> { try { + this.serviceUnitStateChannel.start(); var interval = conf.getLoadBalancerReportUpdateMinIntervalMillis(); this.brokerLoadDataReportTask = this.pulsar.getLoadManagerExecutor() @@ -521,38 +470,33 @@ public void start() throws PulsarServerException { MONITOR_INTERVAL_IN_MILLIS, TimeUnit.MILLISECONDS); this.splitScheduler.start(); - this.initWaiter.complete(null); + this.initWaiter.complete(true); this.started = true; log.info("Started load manager."); - } catch (Exception ex) { - failStarting(ex); + } catch (Throwable e) { + failStarting(e); } }); - } catch (Exception ex) { + } catch (Throwable ex) { failStarting(ex); } } - private void failStarting(Exception ex) { - log.error("Failed to start the extensible load balance and close broker registry {}.", - this.brokerRegistry, ex); + private void failStarting(Throwable throwable) { if (this.brokerRegistry != null) { try { - brokerRegistry.unregister(); - } catch (MetadataStoreException e) { - // ignore - } - } - if (this.serviceUnitStateChannel != null) { - try { - serviceUnitStateChannel.close(); - } catch (IOException e) { - // ignore + brokerRegistry.close(); + } catch (PulsarServerException e) { + // If close failed, this broker might still exist in the metadata store. Then it could be found by other + // brokers as an available broker. Hence, print a warning log for it. + log.warn("Failed to close the broker registry: {}", e.getMessage()); } } - initWaiter.completeExceptionally(ex); + initWaiter.complete(false); // exit the background thread gracefully + throw PulsarServerException.toUncheckedException(PulsarServerException.from(throwable)); } + @Override public void initialize(PulsarService pulsar) { this.pulsar = pulsar; @@ -897,7 +841,9 @@ synchronized void playLeader() { boolean becameFollower = false; while (!Thread.currentThread().isInterrupted()) { try { - initWaiter.get(); + if (!initWaiter.get()) { + return; + } if (!serviceUnitStateChannel.isChannelOwner()) { becameFollower = true; break; @@ -947,7 +893,9 @@ synchronized void playFollower() { boolean becameLeader = false; while (!Thread.currentThread().isInterrupted()) { try { - initWaiter.get(); + if (!initWaiter.get()) { + return; + } if (serviceUnitStateChannel.isChannelOwner()) { becameLeader = true; break; @@ -1018,7 +966,9 @@ private List getIgnoredCommandMetrics(String advertisedBrokerAddress) { @VisibleForTesting protected void monitor() { try { - initWaiter.get(); + if (!initWaiter.get()) { + return; + } // Monitor role // Periodically check the role in case ZK watcher fails. @@ -1073,4 +1023,14 @@ private void closeInternalTopics() { log.warn("Failed to wait for closing internal topics", e); } } + + @VisibleForTesting + protected BrokerRegistry createBrokerRegistry(PulsarService pulsar) { + return new BrokerRegistryImpl(pulsar); + } + + @VisibleForTesting + protected ServiceUnitStateChannel createServiceUnitStateChannel(PulsarService pulsar) { + return new ServiceUnitStateChannelImpl(pulsar); + } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/LoadManagerFailFastTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/LoadManagerFailFastTest.java new file mode 100644 index 0000000000000..a400bf733e557 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/LoadManagerFailFastTest.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.loadbalance.extensions; + +import java.util.Optional; +import lombok.Cleanup; +import org.apache.pulsar.broker.PulsarServerException; +import org.apache.pulsar.broker.PulsarService; +import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.broker.loadbalance.LoadManager; +import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannel; +import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl; +import org.apache.pulsar.common.util.PortManager; +import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble; +import org.awaitility.Awaitility; +import org.mockito.Mockito; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +public class LoadManagerFailFastTest { + + private static final String cluster = "test"; + private final int zkPort = PortManager.nextLockedFreePort(); + private final LocalBookkeeperEnsemble bk = new LocalBookkeeperEnsemble(2, zkPort, PortManager::nextLockedFreePort); + private final ServiceConfiguration config = new ServiceConfiguration(); + + @BeforeClass + protected void setup() throws Exception { + bk.start(); + config.setClusterName(cluster); + config.setAdvertisedAddress("localhost"); + config.setBrokerServicePort(Optional.of(0)); + config.setWebServicePort(Optional.of(0)); + config.setMetadataStoreUrl("zk:localhost:" + zkPort); + } + + @AfterClass + protected void cleanup() throws Exception { + bk.stop(); + } + + @Test(timeOut = 30000) + public void testBrokerRegistryFailure() throws Exception { + config.setLoadManagerClassName(BrokerRegistryLoadManager.class.getName()); + @Cleanup final var pulsar = new PulsarService(config); + try { + pulsar.start(); + Assert.fail(); + } catch (PulsarServerException e) { + Assert.assertNull(e.getCause()); + Assert.assertEquals(e.getMessage(), "Cannot start BrokerRegistry"); + } + Assert.assertTrue(pulsar.getLocalMetadataStore().getChildren(LoadManager.LOADBALANCE_BROKERS_ROOT).get() + .isEmpty()); + } + + @Test(timeOut = 30000) + public void testServiceUnitStateChannelFailure() throws Exception { + config.setLoadManagerClassName(ChannelLoadManager.class.getName()); + @Cleanup final var pulsar = new PulsarService(config); + try { + pulsar.start(); + Assert.fail(); + } catch (PulsarServerException e) { + Assert.assertNull(e.getCause()); + Assert.assertEquals(e.getMessage(), "Cannot start ServiceUnitStateChannel"); + } + Awaitility.await().untilAsserted(() -> Assert.assertTrue(pulsar.getLocalMetadataStore() + .getChildren(LoadManager.LOADBALANCE_BROKERS_ROOT).get().isEmpty())); + } + + private static class BrokerRegistryLoadManager extends ExtensibleLoadManagerImpl { + + @Override + protected BrokerRegistry createBrokerRegistry(PulsarService pulsar) { + final var mockBrokerRegistry = Mockito.mock(BrokerRegistryImpl.class); + try { + Mockito.doThrow(new PulsarServerException("Cannot start BrokerRegistry")).when(mockBrokerRegistry) + .start(); + } catch (PulsarServerException e) { + throw new RuntimeException(e); + } + return mockBrokerRegistry; + } + } + + private static class ChannelLoadManager extends ExtensibleLoadManagerImpl { + + @Override + protected ServiceUnitStateChannel createServiceUnitStateChannel(PulsarService pulsar) { + final var channel = Mockito.mock(ServiceUnitStateChannelImpl.class); + try { + Mockito.doThrow(new PulsarServerException("Cannot start ServiceUnitStateChannel")).when(channel) + .start(); + } catch (PulsarServerException e) { + throw new RuntimeException(e); + } + Mockito.doAnswer(__ -> null).when(channel).listen(Mockito.any()); + return channel; + } + } +}