From dfda8b25af9b761421d0be599c1d80a091e59d0d Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Thu, 27 Apr 2023 00:12:28 +0800 Subject: [PATCH 01/10] [fix][broker]Fix deadlock of metadata store --- .../coordination/impl/LockManagerImpl.java | 2 +- .../coordination/impl/ResourceLockImpl.java | 26 +++++++++++-------- 2 files changed, 16 insertions(+), 12 deletions(-) diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LockManagerImpl.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LockManagerImpl.java index 4da6b7998a0c4..05edfea65ebb4 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LockManagerImpl.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LockManagerImpl.java @@ -83,7 +83,7 @@ public CompletableFuture> readLock(String path) { @Override public CompletableFuture> acquireLock(String path, T value) { - ResourceLockImpl lock = new ResourceLockImpl<>(store, serde, path); + ResourceLockImpl lock = new ResourceLockImpl<>(store, serde, path, executor); CompletableFuture> result = new CompletableFuture<>(); lock.acquire(value).thenRun(() -> { diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/ResourceLockImpl.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/ResourceLockImpl.java index 93c994b2436b9..07b52754d10bb 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/ResourceLockImpl.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/ResourceLockImpl.java @@ -21,6 +21,7 @@ import java.util.EnumSet; import java.util.Optional; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.common.concurrent.FutureUtils; import org.apache.pulsar.common.util.FutureUtil; @@ -46,6 +47,8 @@ public class ResourceLockImpl implements ResourceLock { private boolean revalidateAfterReconnection = false; private final FutureUtil.Sequencer sequencer; + private final Executor executor; + private enum State { Init, Valid, @@ -55,7 +58,7 @@ private enum State { private State state; - public ResourceLockImpl(MetadataStoreExtended store, MetadataSerde serde, String path) { + public ResourceLockImpl(MetadataStoreExtended store, MetadataSerde serde, String path, Executor executor) { this.store = store; this.serde = serde; this.path = path; @@ -63,6 +66,7 @@ public ResourceLockImpl(MetadataStoreExtended store, MetadataSerde serde, Str this.expiredFuture = new CompletableFuture<>(); this.sequencer = FutureUtil.Sequencer.create(); this.state = State.Init; + this.executor = executor; } @Override @@ -96,13 +100,13 @@ public synchronized CompletableFuture release() { CompletableFuture result = new CompletableFuture<>(); store.delete(path, Optional.of(version)) - .thenRun(() -> { + .thenRunAsync(() -> { synchronized (ResourceLockImpl.this) { state = State.Released; } expiredFuture.complete(null); result.complete(null); - }).exceptionally(ex -> { + }, executor).exceptionallyAsync(ex -> { if (ex.getCause() instanceof MetadataStoreException.NotFoundException) { // The lock is not there on release. We can anyway proceed synchronized (ResourceLockImpl.this) { @@ -114,7 +118,7 @@ public synchronized CompletableFuture release() { result.completeExceptionally(ex); } return null; - }); + }, executor); return result; } @@ -169,7 +173,7 @@ private CompletableFuture acquireWithNoRevalidation(T newValue) { CompletableFuture result = new CompletableFuture<>(); store.put(path, payload, Optional.of(version), EnumSet.of(CreateOption.Ephemeral)) - .thenAccept(stat -> { + .thenAcceptAsync(stat -> { synchronized (ResourceLockImpl.this) { state = State.Valid; version = stat.getVersion(); @@ -177,7 +181,7 @@ private CompletableFuture acquireWithNoRevalidation(T newValue) { } log.info("Acquired resource lock on {}", path); result.complete(null); - }).exceptionally(ex -> { + }, executor).exceptionallyAsync(ex -> { if (ex.getCause() instanceof BadVersionException) { result.completeExceptionally( new LockBusyException("Resource at " + path + " is already locked")); @@ -185,7 +189,7 @@ private CompletableFuture acquireWithNoRevalidation(T newValue) { result.completeExceptionally(ex.getCause()); } return null; - }); + }, executor); return result; } @@ -243,7 +247,7 @@ private synchronized CompletableFuture revalidate(T newValue) { log.debug("doRevalidate with newValue={}, version={}", newValue, version); } return store.get(path) - .thenCompose(optGetResult -> { + .thenComposeAsync(optGetResult -> { if (!optGetResult.isPresent()) { // The lock just disappeared, try to acquire it again // Reset the expectation on the version @@ -286,7 +290,7 @@ private synchronized CompletableFuture revalidate(T newValue) { // Reset the expectation that the key is not there anymore setVersion(-1L) ) - .thenCompose(__ -> acquireWithNoRevalidation(newValue)) + .thenComposeAsync(__ -> acquireWithNoRevalidation(newValue), executor) .thenRun(() -> log.info("Successfully re-acquired stale lock at {}", path)); } } @@ -305,10 +309,10 @@ private synchronized CompletableFuture revalidate(T newValue) { // Reset the expectation that the key is not there anymore setVersion(-1L) ) - .thenCompose(__ -> acquireWithNoRevalidation(newValue)) + .thenComposeAsync(__ -> acquireWithNoRevalidation(newValue), executor) .thenRun(() -> log.info("Successfully re-acquired lock at {}", path)); } - }); + }, executor); } private synchronized void setVersion(long version) { From 0d0c315edc219de1145e52b50788086e2d342ea1 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Thu, 27 Apr 2023 03:08:38 +0800 Subject: [PATCH 02/10] add test --- .../pulsar/broker/MetadataStoreStuckTest.java | 231 ++++++++++++++++++ 1 file changed, 231 insertions(+) create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/broker/MetadataStoreStuckTest.java diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/MetadataStoreStuckTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/MetadataStoreStuckTest.java new file mode 100644 index 0000000000000..9e12832551583 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/MetadataStoreStuckTest.java @@ -0,0 +1,231 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker; + +import java.net.Inet4Address; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicBoolean; +import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.client.BookKeeper; +import org.apache.pulsar.broker.namespace.NamespaceEphemeralData; +import org.apache.pulsar.broker.namespace.OwnershipCache; +import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.common.naming.NamespaceBundle; +import org.apache.pulsar.common.naming.NamespaceName; +import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.policies.data.ClusterData; +import org.apache.pulsar.common.policies.data.TenantInfo; +import org.apache.pulsar.common.policies.data.TopicType; +import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble; +import org.junit.Assert; +import org.testcontainers.shaded.org.awaitility.Awaitility; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +@Slf4j +@Test(groups = "broker") +public class MetadataStoreStuckTest { + + // prefer inet4. + private static final String LOCALHOST = Inet4Address.getLoopbackAddress().getHostAddress(); + private static final String CLUSTER = "metadata_stuck_cluster"; + private static final String DEFAULT_TENANT = "public"; + private static final String DEFAULT_NAMESPACE = DEFAULT_TENANT + "/default"; + + protected LocalBookkeeperEnsemble bkEnsemble; + protected ServiceConfiguration pulsarConfig; + protected PulsarService pulsarService; + protected int brokerWebServicePort; + protected int brokerServicePort; + protected String metadataServiceUri; + protected BookKeeper bookKeeperClient; + protected String brokerUrl; + protected String brokerServiceUrl; + protected PulsarAdmin pulsarAdmin; + protected PulsarClient pulsarClient; + + @BeforeClass + protected void setup() throws Exception { + log.info("--- Start cluster ---"); + startLocalBookie(); + initPulsarConfig(); + startPulsar(); + } + + @AfterClass + protected void cleanup() throws Exception { + log.info("--- Shutting down ---"); + silentStopPulsar(); + stopLocalBookie(); + } + + protected void startLocalBookie() throws Exception{ + log.info("Start bookie "); + bkEnsemble = new LocalBookkeeperEnsemble(3, 0, () -> 0); + bkEnsemble.start(); + metadataServiceUri = String.format("zk:%s:%s", LOCALHOST, bkEnsemble.getZookeeperPort()); + initBookieClient(); + } + + protected void initBookieClient() throws Exception { + bookKeeperClient = new BookKeeper(String.format("%s:%s", LOCALHOST, bkEnsemble.getZookeeperPort())); + } + + protected void stopLocalBookie() { + log.info("Close bookie client"); + try { + bookKeeperClient.close(); + // TODO delete bk files. + // TODO delete zk files. + } catch (Exception e){ + log.error("Close bookie client fail", e); + } + log.info("Stop bookie "); + try { + bkEnsemble.stop(); + // TODO delete bk files. + // TODO delete zk files. + } catch (Exception e){ + log.error("Stop bookie fail", e); + } + } + + protected void initPulsarConfig() throws Exception{ + pulsarConfig = new ServiceConfiguration(); + pulsarConfig.setAdvertisedAddress(LOCALHOST); + pulsarConfig.setMetadataStoreUrl(metadataServiceUri); + pulsarConfig.setClusterName(CLUSTER); + pulsarConfig.setTransactionCoordinatorEnabled(false); + pulsarConfig.setAllowAutoTopicCreation(true); + pulsarConfig.setAllowAutoTopicCreationType(TopicType.NON_PARTITIONED); + pulsarConfig.setAutoSkipNonRecoverableData(true); + } + + protected void startPulsar() throws Exception { + log.info("Start pulsar "); + pulsarService = new PulsarService(pulsarConfig); + pulsarService.start(); + brokerWebServicePort = pulsarService.getListenPortHTTP().get(); + brokerServicePort = pulsarService.getBrokerListenPort().get(); + brokerUrl = String.format("http://%s:%s", LOCALHOST, brokerWebServicePort); + brokerServiceUrl = String.format("pulsar://%s:%s", LOCALHOST, brokerServicePort); + initPulsarAdmin(); + initPulsarClient(); + initDefaultNamespace(); + } + + protected void silentStopPulsar() throws Exception { + log.info("Close pulsar client "); + try { + pulsarClient.close(); + }catch (Exception e){ + log.error("Close pulsar client fail", e); + } + log.info("Close pulsar admin "); + try { + pulsarAdmin.close(); + }catch (Exception e){ + log.error("Close pulsar admin fail", e); + } + log.info("Stop pulsar service "); + try { + pulsarService.close(); + }catch (Exception e){ + log.error("Stop pulsar service fail", e); + } + } + + protected void initPulsarAdmin() throws Exception { + pulsarAdmin = PulsarAdmin.builder().serviceHttpUrl(brokerUrl).build(); + } + + protected void initPulsarClient() throws Exception { + pulsarClient = PulsarClient.builder().serviceUrl(brokerServiceUrl).build(); + } + + protected void initDefaultNamespace() throws Exception { + if (!pulsarAdmin.clusters().getClusters().contains(CLUSTER)) { + pulsarAdmin.clusters().createCluster(CLUSTER, ClusterData.builder().serviceUrl(brokerUrl).build()); + } + if (!pulsarAdmin.tenants().getTenants().contains(DEFAULT_TENANT)){ + pulsarAdmin.tenants().createTenant(DEFAULT_TENANT, + TenantInfo.builder().allowedClusters(Collections.singleton(CLUSTER)).build()); + } + if (!pulsarAdmin.namespaces().getNamespaces(DEFAULT_TENANT).contains(DEFAULT_NAMESPACE)) { + pulsarAdmin.namespaces().createNamespace(DEFAULT_NAMESPACE, Collections.singleton(CLUSTER)); + } + } + + + @Test + public void testSchemaLedgerLost() throws Exception { + OwnershipCache ownershipCache = pulsarService.getNamespaceService().getOwnershipCache(); + NamespaceName namespaceName = NamespaceName.get(DEFAULT_NAMESPACE); + List bundles = pulsarService.getNamespaceService().getNamespaceBundleFactory() + .getBundles(namespaceName).getBundles(); + String topicName = BrokerTestUtil.newUniqueName("persistent://" + DEFAULT_NAMESPACE + "/tp_"); + final NamespaceBundle bundle0 = bundles.get(0); + final NamespaceBundle bundle1 = bundles.get(1); + + // task: lookup. + final AtomicBoolean lookupTaskRunning = new AtomicBoolean(); + final Thread lookupTask = new Thread(() -> { + pulsarService.getPulsarResources().getNamespaceResources().getPartitionedTopicResources() + .getPartitionedTopicMetadataAsync(TopicName.get(topicName), true).join(); + lookupTaskRunning.set(true); + }); + + // task: create topic. + Thread createTopicTask = new Thread(() -> { + acquiringOwnership(ownershipCache, bundle0).thenCompose(res -> { + log.info("Acquire ownership_0: {}, result: {}", bundle0, res); + return acquiringOwnership(ownershipCache, bundle1); + }).thenCompose(res -> { + log.info("Acquire ownership_1: {}, result: {}", bundle1, res); + lookupTask.start(); + Awaitility.await().untilAsserted(() -> { + Assert.assertTrue(lookupTaskRunning.get()); + }); + try {Thread.sleep(1000);} catch (InterruptedException e) {} + List bundleList = pulsarService.getNamespaceService().getNamespaceBundleFactory() + .getBundles(namespaceName).getBundles(); + log.info("get bundle list: {}", bundleList); + return CompletableFuture.completedFuture(res); + }).join(); + }); + + // Verify all tasks will be finished in time. + createTopicTask.start(); + createTopicTask.join(); + lookupTask.join(); + } + + private CompletableFuture acquiringOwnership(OwnershipCache ownershipCache, + NamespaceBundle bundle) { + try { + return ownershipCache.tryAcquiringOwnership(bundle); + } catch (Exception e) { + return CompletableFuture.failedFuture(e); + } + } +} \ No newline at end of file From a928f4a387478b71b6b9cbda6b91301c4a5c3917 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Thu, 27 Apr 2023 03:14:36 +0800 Subject: [PATCH 03/10] improve test --- .../apache/pulsar/broker/MetadataStoreStuckTest.java | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/MetadataStoreStuckTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/MetadataStoreStuckTest.java index 9e12832551583..0e0529d8581fe 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/MetadataStoreStuckTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/MetadataStoreStuckTest.java @@ -23,6 +23,7 @@ import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.client.BookKeeper; import org.apache.pulsar.broker.namespace.NamespaceEphemeralData; @@ -187,12 +188,15 @@ public void testSchemaLedgerLost() throws Exception { final NamespaceBundle bundle0 = bundles.get(0); final NamespaceBundle bundle1 = bundles.get(1); + AtomicInteger finishedTaskCount = new AtomicInteger(0); + // task: lookup. final AtomicBoolean lookupTaskRunning = new AtomicBoolean(); final Thread lookupTask = new Thread(() -> { pulsarService.getPulsarResources().getNamespaceResources().getPartitionedTopicResources() .getPartitionedTopicMetadataAsync(TopicName.get(topicName), true).join(); lookupTaskRunning.set(true); + finishedTaskCount.incrementAndGet(); }); // task: create topic. @@ -210,14 +214,15 @@ public void testSchemaLedgerLost() throws Exception { List bundleList = pulsarService.getNamespaceService().getNamespaceBundleFactory() .getBundles(namespaceName).getBundles(); log.info("get bundle list: {}", bundleList); + finishedTaskCount.incrementAndGet(); return CompletableFuture.completedFuture(res); - }).join(); + }); }); // Verify all tasks will be finished in time. createTopicTask.start(); - createTopicTask.join(); - lookupTask.join(); + + Awaitility.await().untilAsserted(() -> Assert.assertEquals(finishedTaskCount.get(), 2)); } private CompletableFuture acquiringOwnership(OwnershipCache ownershipCache, From f9430aa1dc4d821d810fc37654de7274e1f11a02 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Tue, 9 May 2023 16:18:10 +0800 Subject: [PATCH 04/10] switch to other threads after acquire lock --- .../broker/namespace/OwnershipCache.java | 15 +++++++---- .../coordination/impl/LockManagerImpl.java | 2 +- .../coordination/impl/ResourceLockImpl.java | 26 ++++++++----------- 3 files changed, 22 insertions(+), 21 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java index 7d0b5a4147721..b08dc9395897e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java @@ -96,7 +96,7 @@ private class OwnedServiceUnitCacheLoader implements AsyncCacheLoader asyncLoad(NamespaceBundle namespaceBundle, Executor executor) { return lockManager.acquireLock(ServiceUnitUtils.path(namespaceBundle), selfOwnerInfo) - .thenApply(rl -> { + .thenApplyAsync(rl -> { locallyAcquiredLocks.put(namespaceBundle, rl); rl.getLockExpiredFuture() .thenRun(() -> { @@ -106,7 +106,12 @@ public CompletableFuture asyncLoad(NamespaceBundle namespaceBundle, namespaceService.onNamespaceBundleUnload(namespaceBundle); }); return new OwnedBundle(namespaceBundle); - }); + }, executor).exceptionallyAsync(ex -> { + if (ex instanceof RuntimeException){ + throw (RuntimeException) ex; + } + throw new RuntimeException(ex); + }, executor); } } @@ -288,10 +293,10 @@ public Optional> getOwnedBundleAsync(NamespaceBun /** * Disable bundle in local cache and on zk. - * - * @param bundle - * @throws Exception + * @Deprecated This is a dangerous method that will occupy the ZK thread. Please switch to your own thread after + * calling this method, which is currently only used for test */ + @Deprecated public CompletableFuture disableOwnership(NamespaceBundle bundle) { return updateBundleState(bundle, false) .thenCompose(__ -> { diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LockManagerImpl.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LockManagerImpl.java index 05edfea65ebb4..4da6b7998a0c4 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LockManagerImpl.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LockManagerImpl.java @@ -83,7 +83,7 @@ public CompletableFuture> readLock(String path) { @Override public CompletableFuture> acquireLock(String path, T value) { - ResourceLockImpl lock = new ResourceLockImpl<>(store, serde, path, executor); + ResourceLockImpl lock = new ResourceLockImpl<>(store, serde, path); CompletableFuture> result = new CompletableFuture<>(); lock.acquire(value).thenRun(() -> { diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/ResourceLockImpl.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/ResourceLockImpl.java index 07b52754d10bb..93c994b2436b9 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/ResourceLockImpl.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/ResourceLockImpl.java @@ -21,7 +21,6 @@ import java.util.EnumSet; import java.util.Optional; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.Executor; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.common.concurrent.FutureUtils; import org.apache.pulsar.common.util.FutureUtil; @@ -47,8 +46,6 @@ public class ResourceLockImpl implements ResourceLock { private boolean revalidateAfterReconnection = false; private final FutureUtil.Sequencer sequencer; - private final Executor executor; - private enum State { Init, Valid, @@ -58,7 +55,7 @@ private enum State { private State state; - public ResourceLockImpl(MetadataStoreExtended store, MetadataSerde serde, String path, Executor executor) { + public ResourceLockImpl(MetadataStoreExtended store, MetadataSerde serde, String path) { this.store = store; this.serde = serde; this.path = path; @@ -66,7 +63,6 @@ public ResourceLockImpl(MetadataStoreExtended store, MetadataSerde serde, Str this.expiredFuture = new CompletableFuture<>(); this.sequencer = FutureUtil.Sequencer.create(); this.state = State.Init; - this.executor = executor; } @Override @@ -100,13 +96,13 @@ public synchronized CompletableFuture release() { CompletableFuture result = new CompletableFuture<>(); store.delete(path, Optional.of(version)) - .thenRunAsync(() -> { + .thenRun(() -> { synchronized (ResourceLockImpl.this) { state = State.Released; } expiredFuture.complete(null); result.complete(null); - }, executor).exceptionallyAsync(ex -> { + }).exceptionally(ex -> { if (ex.getCause() instanceof MetadataStoreException.NotFoundException) { // The lock is not there on release. We can anyway proceed synchronized (ResourceLockImpl.this) { @@ -118,7 +114,7 @@ public synchronized CompletableFuture release() { result.completeExceptionally(ex); } return null; - }, executor); + }); return result; } @@ -173,7 +169,7 @@ private CompletableFuture acquireWithNoRevalidation(T newValue) { CompletableFuture result = new CompletableFuture<>(); store.put(path, payload, Optional.of(version), EnumSet.of(CreateOption.Ephemeral)) - .thenAcceptAsync(stat -> { + .thenAccept(stat -> { synchronized (ResourceLockImpl.this) { state = State.Valid; version = stat.getVersion(); @@ -181,7 +177,7 @@ private CompletableFuture acquireWithNoRevalidation(T newValue) { } log.info("Acquired resource lock on {}", path); result.complete(null); - }, executor).exceptionallyAsync(ex -> { + }).exceptionally(ex -> { if (ex.getCause() instanceof BadVersionException) { result.completeExceptionally( new LockBusyException("Resource at " + path + " is already locked")); @@ -189,7 +185,7 @@ private CompletableFuture acquireWithNoRevalidation(T newValue) { result.completeExceptionally(ex.getCause()); } return null; - }, executor); + }); return result; } @@ -247,7 +243,7 @@ private synchronized CompletableFuture revalidate(T newValue) { log.debug("doRevalidate with newValue={}, version={}", newValue, version); } return store.get(path) - .thenComposeAsync(optGetResult -> { + .thenCompose(optGetResult -> { if (!optGetResult.isPresent()) { // The lock just disappeared, try to acquire it again // Reset the expectation on the version @@ -290,7 +286,7 @@ private synchronized CompletableFuture revalidate(T newValue) { // Reset the expectation that the key is not there anymore setVersion(-1L) ) - .thenComposeAsync(__ -> acquireWithNoRevalidation(newValue), executor) + .thenCompose(__ -> acquireWithNoRevalidation(newValue)) .thenRun(() -> log.info("Successfully re-acquired stale lock at {}", path)); } } @@ -309,10 +305,10 @@ private synchronized CompletableFuture revalidate(T newValue) { // Reset the expectation that the key is not there anymore setVersion(-1L) ) - .thenComposeAsync(__ -> acquireWithNoRevalidation(newValue), executor) + .thenCompose(__ -> acquireWithNoRevalidation(newValue)) .thenRun(() -> log.info("Successfully re-acquired lock at {}", path)); } - }, executor); + }); } private synchronized void setVersion(long version) { From 4ff18f7ffea59904620b4705098c345e43266bc1 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Tue, 9 May 2023 16:27:01 +0800 Subject: [PATCH 05/10] java comment --- .../org/apache/pulsar/broker/namespace/OwnershipCache.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java index b08dc9395897e..4b6d4d805bb74 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java @@ -293,8 +293,8 @@ public Optional> getOwnedBundleAsync(NamespaceBun /** * Disable bundle in local cache and on zk. - * @Deprecated This is a dangerous method that will occupy the ZK thread. Please switch to your own thread after - * calling this method, which is currently only used for test + * @Deprecated This is a dangerous method which is currently only used for test, it will occupy the ZK thread. + * Please switch to your own thread after calling this method. */ @Deprecated public CompletableFuture disableOwnership(NamespaceBundle bundle) { From f817444a6c1a6772756271dc903c0437e896bdaf Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Tue, 9 May 2023 16:33:31 +0800 Subject: [PATCH 06/10] improve the code --- .../pulsar/broker/namespace/OwnershipCache.java | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java index 4b6d4d805bb74..708e0cb7eca1b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java @@ -95,7 +95,8 @@ private class OwnedServiceUnitCacheLoader implements AsyncCacheLoader asyncLoad(NamespaceBundle namespaceBundle, Executor executor) { - return lockManager.acquireLock(ServiceUnitUtils.path(namespaceBundle), selfOwnerInfo) + CompletableFuture res = new CompletableFuture<>(); + lockManager.acquireLock(ServiceUnitUtils.path(namespaceBundle), selfOwnerInfo) .thenApplyAsync(rl -> { locallyAcquiredLocks.put(namespaceBundle, rl); rl.getLockExpiredFuture() @@ -106,12 +107,14 @@ public CompletableFuture asyncLoad(NamespaceBundle namespaceBundle, namespaceService.onNamespaceBundleUnload(namespaceBundle); }); return new OwnedBundle(namespaceBundle); - }, executor).exceptionallyAsync(ex -> { - if (ex instanceof RuntimeException){ - throw (RuntimeException) ex; + }, executor).whenCompleteAsync((bundle, ex) -> { + if (ex != null){ + res.completeExceptionally(ex); + } else { + res.complete(bundle); } - throw new RuntimeException(ex); }, executor); + return res; } } From 56e54e4eb79f291cbcf65f723ff6867a2eee8381 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Tue, 9 May 2023 16:34:16 +0800 Subject: [PATCH 07/10] format --- .../broker/namespace/OwnershipCache.java | 34 +++++++++---------- 1 file changed, 17 insertions(+), 17 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java index 708e0cb7eca1b..401c401b3b2cd 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java @@ -97,23 +97,23 @@ private class OwnedServiceUnitCacheLoader implements AsyncCacheLoader asyncLoad(NamespaceBundle namespaceBundle, Executor executor) { CompletableFuture res = new CompletableFuture<>(); lockManager.acquireLock(ServiceUnitUtils.path(namespaceBundle), selfOwnerInfo) - .thenApplyAsync(rl -> { - locallyAcquiredLocks.put(namespaceBundle, rl); - rl.getLockExpiredFuture() - .thenRun(() -> { - log.info("Resource lock for {} has expired", rl.getPath()); - namespaceService.unloadNamespaceBundle(namespaceBundle); - invalidateLocalOwnerCache(namespaceBundle); - namespaceService.onNamespaceBundleUnload(namespaceBundle); - }); - return new OwnedBundle(namespaceBundle); - }, executor).whenCompleteAsync((bundle, ex) -> { - if (ex != null){ - res.completeExceptionally(ex); - } else { - res.complete(bundle); - } - }, executor); + .thenApplyAsync(rl -> { + locallyAcquiredLocks.put(namespaceBundle, rl); + rl.getLockExpiredFuture() + .thenRun(() -> { + log.info("Resource lock for {} has expired", rl.getPath()); + namespaceService.unloadNamespaceBundle(namespaceBundle); + invalidateLocalOwnerCache(namespaceBundle); + namespaceService.onNamespaceBundleUnload(namespaceBundle); + }); + return new OwnedBundle(namespaceBundle); + }, executor).whenCompleteAsync((bundle, ex) -> { + if (ex != null){ + res.completeExceptionally(ex); + } else { + res.complete(bundle); + } + }, executor); return res; } } From db05361bd19e1104fefd5a13687308a443c1e11b Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Fri, 12 May 2023 07:37:33 +0800 Subject: [PATCH 08/10] getBundle(tp) -> getBundleAsync(tp) --- .../broker/namespace/NamespaceService.java | 13 +- .../broker/namespace/OwnershipCache.java | 32 +-- .../pulsar/broker/MetadataStoreStuckTest.java | 236 ------------------ 3 files changed, 19 insertions(+), 262 deletions(-) delete mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/broker/MetadataStoreStuckTest.java diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java index a0e2bf7534c02..d07ab3d6dd65d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java @@ -1156,12 +1156,13 @@ public CompletableFuture isServiceUnitActiveAsync(TopicName topicName) return getBundleAsync(topicName) .thenCompose(bundle -> loadManager.get().checkOwnershipAsync(Optional.of(topicName), bundle)); } - Optional> res = ownershipCache.getOwnedBundleAsync(getBundle(topicName)); - if (!res.isPresent()) { - return CompletableFuture.completedFuture(false); - } - - return res.get().thenApply(ob -> ob != null && ob.isActive()); + return getBundleAsync(topicName).thenCompose(bundle -> { + Optional> optionalFuture = ownershipCache.getOwnedBundleAsync(bundle); + if (!optionalFuture.isPresent()) { + return CompletableFuture.completedFuture(false); + } + return optionalFuture.get().thenApply(ob -> ob != null && ob.isActive()); + }); } private boolean isNamespaceOwned(NamespaceName fqnn) throws Exception { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java index 401c401b3b2cd..86003153714cb 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java @@ -95,26 +95,18 @@ private class OwnedServiceUnitCacheLoader implements AsyncCacheLoader asyncLoad(NamespaceBundle namespaceBundle, Executor executor) { - CompletableFuture res = new CompletableFuture<>(); - lockManager.acquireLock(ServiceUnitUtils.path(namespaceBundle), selfOwnerInfo) - .thenApplyAsync(rl -> { - locallyAcquiredLocks.put(namespaceBundle, rl); - rl.getLockExpiredFuture() - .thenRun(() -> { - log.info("Resource lock for {} has expired", rl.getPath()); - namespaceService.unloadNamespaceBundle(namespaceBundle); - invalidateLocalOwnerCache(namespaceBundle); - namespaceService.onNamespaceBundleUnload(namespaceBundle); - }); - return new OwnedBundle(namespaceBundle); - }, executor).whenCompleteAsync((bundle, ex) -> { - if (ex != null){ - res.completeExceptionally(ex); - } else { - res.complete(bundle); - } - }, executor); - return res; + return lockManager.acquireLock(ServiceUnitUtils.path(namespaceBundle), selfOwnerInfo) + .thenApply(rl -> { + locallyAcquiredLocks.put(namespaceBundle, rl); + rl.getLockExpiredFuture() + .thenRun(() -> { + log.info("Resource lock for {} has expired", rl.getPath()); + namespaceService.unloadNamespaceBundle(namespaceBundle); + invalidateLocalOwnerCache(namespaceBundle); + namespaceService.onNamespaceBundleUnload(namespaceBundle); + }); + return new OwnedBundle(namespaceBundle); + }); } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/MetadataStoreStuckTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/MetadataStoreStuckTest.java deleted file mode 100644 index 0e0529d8581fe..0000000000000 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/MetadataStoreStuckTest.java +++ /dev/null @@ -1,236 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.broker; - -import java.net.Inet4Address; -import java.util.Collections; -import java.util.List; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import lombok.extern.slf4j.Slf4j; -import org.apache.bookkeeper.client.BookKeeper; -import org.apache.pulsar.broker.namespace.NamespaceEphemeralData; -import org.apache.pulsar.broker.namespace.OwnershipCache; -import org.apache.pulsar.client.admin.PulsarAdmin; -import org.apache.pulsar.client.api.PulsarClient; -import org.apache.pulsar.common.naming.NamespaceBundle; -import org.apache.pulsar.common.naming.NamespaceName; -import org.apache.pulsar.common.naming.TopicName; -import org.apache.pulsar.common.policies.data.ClusterData; -import org.apache.pulsar.common.policies.data.TenantInfo; -import org.apache.pulsar.common.policies.data.TopicType; -import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble; -import org.junit.Assert; -import org.testcontainers.shaded.org.awaitility.Awaitility; -import org.testng.annotations.AfterClass; -import org.testng.annotations.BeforeClass; -import org.testng.annotations.Test; - -@Slf4j -@Test(groups = "broker") -public class MetadataStoreStuckTest { - - // prefer inet4. - private static final String LOCALHOST = Inet4Address.getLoopbackAddress().getHostAddress(); - private static final String CLUSTER = "metadata_stuck_cluster"; - private static final String DEFAULT_TENANT = "public"; - private static final String DEFAULT_NAMESPACE = DEFAULT_TENANT + "/default"; - - protected LocalBookkeeperEnsemble bkEnsemble; - protected ServiceConfiguration pulsarConfig; - protected PulsarService pulsarService; - protected int brokerWebServicePort; - protected int brokerServicePort; - protected String metadataServiceUri; - protected BookKeeper bookKeeperClient; - protected String brokerUrl; - protected String brokerServiceUrl; - protected PulsarAdmin pulsarAdmin; - protected PulsarClient pulsarClient; - - @BeforeClass - protected void setup() throws Exception { - log.info("--- Start cluster ---"); - startLocalBookie(); - initPulsarConfig(); - startPulsar(); - } - - @AfterClass - protected void cleanup() throws Exception { - log.info("--- Shutting down ---"); - silentStopPulsar(); - stopLocalBookie(); - } - - protected void startLocalBookie() throws Exception{ - log.info("Start bookie "); - bkEnsemble = new LocalBookkeeperEnsemble(3, 0, () -> 0); - bkEnsemble.start(); - metadataServiceUri = String.format("zk:%s:%s", LOCALHOST, bkEnsemble.getZookeeperPort()); - initBookieClient(); - } - - protected void initBookieClient() throws Exception { - bookKeeperClient = new BookKeeper(String.format("%s:%s", LOCALHOST, bkEnsemble.getZookeeperPort())); - } - - protected void stopLocalBookie() { - log.info("Close bookie client"); - try { - bookKeeperClient.close(); - // TODO delete bk files. - // TODO delete zk files. - } catch (Exception e){ - log.error("Close bookie client fail", e); - } - log.info("Stop bookie "); - try { - bkEnsemble.stop(); - // TODO delete bk files. - // TODO delete zk files. - } catch (Exception e){ - log.error("Stop bookie fail", e); - } - } - - protected void initPulsarConfig() throws Exception{ - pulsarConfig = new ServiceConfiguration(); - pulsarConfig.setAdvertisedAddress(LOCALHOST); - pulsarConfig.setMetadataStoreUrl(metadataServiceUri); - pulsarConfig.setClusterName(CLUSTER); - pulsarConfig.setTransactionCoordinatorEnabled(false); - pulsarConfig.setAllowAutoTopicCreation(true); - pulsarConfig.setAllowAutoTopicCreationType(TopicType.NON_PARTITIONED); - pulsarConfig.setAutoSkipNonRecoverableData(true); - } - - protected void startPulsar() throws Exception { - log.info("Start pulsar "); - pulsarService = new PulsarService(pulsarConfig); - pulsarService.start(); - brokerWebServicePort = pulsarService.getListenPortHTTP().get(); - brokerServicePort = pulsarService.getBrokerListenPort().get(); - brokerUrl = String.format("http://%s:%s", LOCALHOST, brokerWebServicePort); - brokerServiceUrl = String.format("pulsar://%s:%s", LOCALHOST, brokerServicePort); - initPulsarAdmin(); - initPulsarClient(); - initDefaultNamespace(); - } - - protected void silentStopPulsar() throws Exception { - log.info("Close pulsar client "); - try { - pulsarClient.close(); - }catch (Exception e){ - log.error("Close pulsar client fail", e); - } - log.info("Close pulsar admin "); - try { - pulsarAdmin.close(); - }catch (Exception e){ - log.error("Close pulsar admin fail", e); - } - log.info("Stop pulsar service "); - try { - pulsarService.close(); - }catch (Exception e){ - log.error("Stop pulsar service fail", e); - } - } - - protected void initPulsarAdmin() throws Exception { - pulsarAdmin = PulsarAdmin.builder().serviceHttpUrl(brokerUrl).build(); - } - - protected void initPulsarClient() throws Exception { - pulsarClient = PulsarClient.builder().serviceUrl(brokerServiceUrl).build(); - } - - protected void initDefaultNamespace() throws Exception { - if (!pulsarAdmin.clusters().getClusters().contains(CLUSTER)) { - pulsarAdmin.clusters().createCluster(CLUSTER, ClusterData.builder().serviceUrl(brokerUrl).build()); - } - if (!pulsarAdmin.tenants().getTenants().contains(DEFAULT_TENANT)){ - pulsarAdmin.tenants().createTenant(DEFAULT_TENANT, - TenantInfo.builder().allowedClusters(Collections.singleton(CLUSTER)).build()); - } - if (!pulsarAdmin.namespaces().getNamespaces(DEFAULT_TENANT).contains(DEFAULT_NAMESPACE)) { - pulsarAdmin.namespaces().createNamespace(DEFAULT_NAMESPACE, Collections.singleton(CLUSTER)); - } - } - - - @Test - public void testSchemaLedgerLost() throws Exception { - OwnershipCache ownershipCache = pulsarService.getNamespaceService().getOwnershipCache(); - NamespaceName namespaceName = NamespaceName.get(DEFAULT_NAMESPACE); - List bundles = pulsarService.getNamespaceService().getNamespaceBundleFactory() - .getBundles(namespaceName).getBundles(); - String topicName = BrokerTestUtil.newUniqueName("persistent://" + DEFAULT_NAMESPACE + "/tp_"); - final NamespaceBundle bundle0 = bundles.get(0); - final NamespaceBundle bundle1 = bundles.get(1); - - AtomicInteger finishedTaskCount = new AtomicInteger(0); - - // task: lookup. - final AtomicBoolean lookupTaskRunning = new AtomicBoolean(); - final Thread lookupTask = new Thread(() -> { - pulsarService.getPulsarResources().getNamespaceResources().getPartitionedTopicResources() - .getPartitionedTopicMetadataAsync(TopicName.get(topicName), true).join(); - lookupTaskRunning.set(true); - finishedTaskCount.incrementAndGet(); - }); - - // task: create topic. - Thread createTopicTask = new Thread(() -> { - acquiringOwnership(ownershipCache, bundle0).thenCompose(res -> { - log.info("Acquire ownership_0: {}, result: {}", bundle0, res); - return acquiringOwnership(ownershipCache, bundle1); - }).thenCompose(res -> { - log.info("Acquire ownership_1: {}, result: {}", bundle1, res); - lookupTask.start(); - Awaitility.await().untilAsserted(() -> { - Assert.assertTrue(lookupTaskRunning.get()); - }); - try {Thread.sleep(1000);} catch (InterruptedException e) {} - List bundleList = pulsarService.getNamespaceService().getNamespaceBundleFactory() - .getBundles(namespaceName).getBundles(); - log.info("get bundle list: {}", bundleList); - finishedTaskCount.incrementAndGet(); - return CompletableFuture.completedFuture(res); - }); - }); - - // Verify all tasks will be finished in time. - createTopicTask.start(); - - Awaitility.await().untilAsserted(() -> Assert.assertEquals(finishedTaskCount.get(), 2)); - } - - private CompletableFuture acquiringOwnership(OwnershipCache ownershipCache, - NamespaceBundle bundle) { - try { - return ownershipCache.tryAcquiringOwnership(bundle); - } catch (Exception e) { - return CompletableFuture.failedFuture(e); - } - } -} \ No newline at end of file From f686e1f6087a132c66d1b231a1c2102c7d2c3e24 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Fri, 12 May 2023 07:45:59 +0800 Subject: [PATCH 09/10] deprecated the method NamespaceService.isServiceUnitActive --- .../org/apache/pulsar/broker/namespace/NamespaceService.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java index d07ab3d6dd65d..b84879f785c68 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java @@ -1137,6 +1137,10 @@ public CompletableFuture isServiceUnitOwnedAsync(ServiceUnitId suName) new IllegalArgumentException("Invalid class of NamespaceBundle: " + suName.getClass().getName())); } + /** + * @Deprecated This method is only used by test. call "isServiceUnitActiveAsync" is better. + */ + @Deprecated public boolean isServiceUnitActive(TopicName topicName) { try { OwnedBundle ownedBundle = ownershipCache.getOwnedBundle(getBundle(topicName)); From 03ae7588e07ce68701e586b5d79186cfd3c660bf Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Fri, 12 May 2023 21:52:25 +0800 Subject: [PATCH 10/10] edit method isServiceUnitActive to make the test work better --- .../broker/namespace/NamespaceService.java | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java index b84879f785c68..9d8d9e3890a19 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java @@ -38,7 +38,9 @@ import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.regex.Matcher; @@ -1138,19 +1140,16 @@ public CompletableFuture isServiceUnitOwnedAsync(ServiceUnitId suName) } /** - * @Deprecated This method is only used by test. call "isServiceUnitActiveAsync" is better. + * @Deprecated This method is only used in test now. */ @Deprecated public boolean isServiceUnitActive(TopicName topicName) { try { - OwnedBundle ownedBundle = ownershipCache.getOwnedBundle(getBundle(topicName)); - if (ownedBundle == null) { - return false; - } - return ownedBundle.isActive(); - } catch (Exception e) { - LOG.warn("Unable to find OwnedBundle for topic - [{}]", topicName, e); - return false; + return isServiceUnitActiveAsync(topicName).get(pulsar.getConfig() + .getMetadataStoreOperationTimeoutSeconds(), SECONDS); + } catch (InterruptedException | ExecutionException | TimeoutException e) { + LOG.warn("Unable to find OwnedBundle for topic in time - [{}]", topicName, e); + throw new RuntimeException(e); } }