From 620fe9be239fbb35c2db279660a8634410f9f357 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Mon, 29 Apr 2024 13:40:18 +0800 Subject: [PATCH] [fix][broker] One topic can be closed multiple times concurrently (#17524) (cherry picked from commit 93afd89b047ac56d3b7e476f578993197cf41935) --- .../service/persistent/PersistentTopic.java | 122 +++++++++++++++--- .../broker/service/OneWayReplicatorTest.java | 25 ++-- .../persistent/PersistentTopicTest.java | 81 +++++++++++- .../apache/pulsar/common/util/FutureUtil.java | 33 ++++- 4 files changed, 233 insertions(+), 28 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 3933706da8232..904e1ed670eff 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -48,6 +48,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiFunction; import java.util.stream.Collectors; import javax.annotation.Nonnull; @@ -258,6 +259,37 @@ protected TopicStatsHelper initialValue() { @Getter private final ExecutorService orderedExecutor; + private volatile CloseFutures closeFutures; + + /*** + * We use two futures to prevent starting a new close while a deletion or close is already in progress; Pulsar + * returns the in-progress future when close is called a second time. + * + * Topic closing is triggered in the following scenarios: + * 1. Calling "pulsar-admin topics unload". Relates to {@link CloseFutures#waitDisconnectClients}. + * 2. Namespace bundle unloading. A topic close triggered by unloading a namespace bundle does not wait for + * clients to disconnect. See {@link CloseFutures#notWaitDisconnectClients}. + * + * The two futures are set according to the following rules: + * Event: Topic close. + * - If the first close call is "close without waiting for clients to disconnect": + * - {@link CloseFutures#notWaitDisconnectClients} completes once the topic is closed, while + * {@link CloseFutures#waitDisconnectClients} is initialized as "waiting for clients to disconnect". + * - If the first close call is "close and wait for clients to disconnect", both futures are + * initialized as "waiting for clients to disconnect". + * Event: Topic delete. + * Both futures are initialized as "waiting for clients to disconnect". + */ + private class CloseFutures { + private final CompletableFuture notWaitDisconnectClients; + private final CompletableFuture waitDisconnectClients; + + public CloseFutures(CompletableFuture waitDisconnectClients, + CompletableFuture notWaitDisconnectClients) { + this.waitDisconnectClients = waitDisconnectClients; + this.notWaitDisconnectClients = notWaitDisconnectClients; + } + } + private static class TopicStatsHelper { public double averageMsgSize; public double aggMsgRateIn; @@ -1349,8 +1381,10 @@ private CompletableFuture delete(boolean failIfHasSubscriptions, } fenceTopicToCloseOrDelete(); // Avoid clients reconnections while deleting + // Mark the progress of the close to prevent concurrent close calls.
+ this.closeFutures = new CloseFutures(new CompletableFuture(), new CompletableFuture()); - return getBrokerService().getPulsar().getPulsarResources().getNamespaceResources() + CompletableFuture res = getBrokerService().getPulsar().getPulsarResources().getNamespaceResources() .getPartitionedTopicResources().runWithMarkDeleteAsync(TopicName.get(topic), () -> { CompletableFuture deleteFuture = new CompletableFuture<>(); @@ -1453,6 +1487,10 @@ public void deleteLedgerComplete(Object ctx) { unfenceTopicToResume(); } }); + + FutureUtil.completeAfter(closeFutures.notWaitDisconnectClients, res); + FutureUtil.completeAfter(closeFutures.waitDisconnectClients, res); + return res; } finally { lock.writeLock().unlock(); } @@ -1463,6 +1501,11 @@ public CompletableFuture close() { return close(false); } + private enum CloseTypes { + notWaitDisconnectClients, + waitDisconnectClients; + } + /** * Close this topic - close all producers and subscriptions associated with this topic. * @@ -1471,19 +1514,32 @@ public CompletableFuture close() { */ @Override public CompletableFuture close(boolean closeWithoutWaitingClientDisconnect) { - CompletableFuture closeFuture = new CompletableFuture<>(); - lock.writeLock().lock(); - try { + CloseTypes closeType; + if (closeWithoutWaitingClientDisconnect) { + closeType = CloseTypes.notWaitDisconnectClients; + } else { // closing managed-ledger waits until all producers/consumers/replicators get closed. Sometimes, broker // forcefully wants to close managed-ledger without waiting all resources to be closed. - if (!isClosingOrDeleting || closeWithoutWaitingClientDisconnect) { - fenceTopicToCloseOrDelete(); - } else { - log.warn("[{}] Topic is already being closed or deleted", topic); - closeFuture.completeExceptionally(new TopicFencedException("Topic is already fenced")); - return closeFuture; + closeType = CloseTypes.waitDisconnectClients; + } + + lock.writeLock().lock(); + try { + // Return in-progress future if exists. + if (isClosingOrDeleting) { + switch (closeType) { + case notWaitDisconnectClients -> { + return closeFutures.notWaitDisconnectClients; + } + case waitDisconnectClients -> { + return closeFutures.waitDisconnectClients; + } + } } + // No in-progress closing. + fenceTopicToCloseOrDelete(); + this.closeFutures = new CloseFutures(new CompletableFuture(), new CompletableFuture()); } finally { lock.writeLock().unlock(); } @@ -1513,11 +1569,22 @@ public CompletableFuture close(boolean closeWithoutWaitingClientDisconnect }); } - CompletableFuture clientCloseFuture = closeWithoutWaitingClientDisconnect - ? 
CompletableFuture.completedFuture(null) - : FutureUtil.waitForAll(futures); + CompletableFuture disconnectClientsInCurrentCall = null; + AtomicReference> disconnectClientsToCache = new AtomicReference<>(); + switch (closeType) { + case notWaitDisconnectClients -> { + disconnectClientsInCurrentCall = CompletableFuture.completedFuture(null); + disconnectClientsToCache.set(FutureUtil.waitForAll(futures)); + break; + } + case waitDisconnectClients -> { + disconnectClientsInCurrentCall = FutureUtil.waitForAll(futures); + disconnectClientsToCache.set(disconnectClientsInCurrentCall); + } + } + CompletableFuture closeFuture = new CompletableFuture<>(); - clientCloseFuture.thenRun(() -> { + Runnable closeLedgerAfterCloseClients = () -> { // After having disconnected all producers/consumers, close the managed ledger ledger.asyncClose(new CloseCallback() { @Override @@ -1532,13 +1599,32 @@ public void closeFailed(ManagedLedgerException exception, Object ctx) { disposeTopic(closeFuture); } }, null); - }).exceptionally(exception -> { + }; + disconnectClientsInCurrentCall.thenRun(closeLedgerAfterCloseClients).exceptionally(exception -> { log.error("[{}] Error closing topic", topic, exception); unfenceTopicToResume(); closeFuture.completeExceptionally(exception); return null; }); + switch (closeType) { + case notWaitDisconnectClients -> { + FutureUtil.completeAfter(closeFutures.notWaitDisconnectClients, closeFuture); + FutureUtil.completeAfterAll(closeFutures.waitDisconnectClients, + closeFuture.thenCompose(ignore -> disconnectClientsToCache.get().exceptionally(ex -> { + // Since the managed ledger has been closed, eat the error of clients disconnection. + log.error("[{}] Closed managed ledger, but disconnect clients failed," + + " this topic will be marked closed", topic, ex); + return null; + }))); + break; + } + case waitDisconnectClients -> { + FutureUtil.completeAfter(closeFutures.notWaitDisconnectClients, closeFuture); + FutureUtil.completeAfterAll(closeFutures.waitDisconnectClients, closeFuture); + } + } + return closeFuture; } @@ -1824,10 +1910,10 @@ protected CompletableFuture addReplicationCluster(String remoteCluster, Ma lock.readLock().lock(); try { if (isClosingOrDeleting) { - // Whether is "transferring" or not, do not create new replicator. + // Do not create new replicator. log.info("[{}] Skip to create replicator because this topic is closing." - + " remote cluster: {}. 
State of transferring : {}", - topic, remoteCluster, transferring); + + " remote cluster: {}.", + topic, remoteCluster); return; } Replicator replicator = replicators.computeIfAbsent(remoteCluster, r -> { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java index a1fbd8fad128d..e422f3d8c6fda 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java @@ -25,6 +25,7 @@ import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNotEquals; +import static org.testng.Assert.assertNull; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; import com.google.common.collect.Sets; @@ -232,7 +233,7 @@ public void testTopicCloseWhenInternalProducerCloseErrorOnce() throws Exception }); } - private void injectMockReplicatorProducerBuilder( + private Runnable injectMockReplicatorProducerBuilder( BiFunction producerDecorator) throws Exception { String cluster2 = pulsar2.getConfig().getClusterName(); @@ -252,7 +253,8 @@ private void injectMockReplicatorProducerBuilder( replicationClients = WhiteboxImpl.getInternalState(brokerService, "replicationClients"); PulsarClientImpl internalClient = (PulsarClientImpl) replicationClients.get(cluster2); PulsarClient spyClient = spy(internalClient); - replicationClients.put(cluster2, spyClient); + assertTrue(replicationClients.remove(cluster2, internalClient)); + assertNull(replicationClients.putIfAbsent(cluster2, spyClient)); // Inject producer decorator. doAnswer(invocation -> { @@ -281,6 +283,12 @@ private void injectMockReplicatorProducerBuilder( }).when(spyProducerBuilder).createAsync(); return spyProducerBuilder; }).when(spyClient).newProducer(any(Schema.class)); + + // Return a cleanup injection task; + return () -> { + assertTrue(replicationClients.remove(cluster2, spyClient)); + assertNull(replicationClients.putIfAbsent(cluster2, internalClient)); + }; } private SpyCursor spyCursor(PersistentTopic persistentTopic, String cursorName) throws Exception { @@ -374,7 +382,7 @@ public void testConcurrencyOfUnloadBundleAndRecreateProducer() throws Exception // If the retry counter is larger than 6, the next creation will be slow enough to close Replicator. final AtomicInteger createProducerCounter = new AtomicInteger(); final int failTimes = 6; - injectMockReplicatorProducerBuilder((producerCnf, originalProducer) -> { + Runnable taskToClearInjection = injectMockReplicatorProducerBuilder((producerCnf, originalProducer) -> { if (topicName.equals(producerCnf.getTopicName())) { // There is a switch to determine create producer successfully or not. if (createProducerCounter.incrementAndGet() > failTimes) { @@ -433,6 +441,7 @@ public void testConcurrencyOfUnloadBundleAndRecreateProducer() throws Exception }); // cleanup. + taskToClearInjection.run(); cleanupTopics(() -> { admin1.topics().delete(topicName); admin2.topics().delete(topicName); @@ -537,7 +546,7 @@ public void testConcurrencyOfUnloadBundleAndRecreateProducer2() throws Exception // If the retry counter is larger than 6, the next creation will be slow enough to close Replicator. 
final AtomicInteger createProducerCounter = new AtomicInteger(); final int failTimes = 6; - injectMockReplicatorProducerBuilder((producerCnf, originalProducer) -> { + Runnable taskToClearInjection = injectMockReplicatorProducerBuilder((producerCnf, originalProducer) -> { if (topicName.equals(producerCnf.getTopicName())) { // There is a switch to determine create producer successfully or not. if (createProducerCounter.incrementAndGet() > failTimes) { @@ -599,6 +608,7 @@ public void testConcurrencyOfUnloadBundleAndRecreateProducer2() throws Exception }); // cleanup. + taskToClearInjection.run(); cleanupTopics(namespaceName, () -> { admin1.topics().delete(topicName); admin2.topics().delete(topicName); @@ -619,8 +629,6 @@ public void testUnFenceTopicToReuse() throws Exception { final String mockProducerName = UUID.randomUUID().toString(); final org.apache.pulsar.broker.service.Producer mockProducer = mock(org.apache.pulsar.broker.service.Producer.class); - doAnswer(invocation -> CompletableFuture.failedFuture(new RuntimeException("mocked error"))) - .when(mockProducer).disconnect(any()); doAnswer(invocation -> CompletableFuture.failedFuture(new RuntimeException("mocked error"))) .when(mockProducer).disconnect(); PersistentTopic persistentTopic = @@ -631,7 +639,7 @@ public void testUnFenceTopicToReuse() throws Exception { GeoPersistentReplicator replicator1 = (GeoPersistentReplicator) persistentTopic.getReplicators().values().iterator().next(); try { - persistentTopic.close(true, false).join(); + persistentTopic.close(false).join(); fail("Expected close fails due to a producer close fails"); } catch (Exception ex) { log.info("Expected error: {}", ex.getMessage()); @@ -650,8 +658,9 @@ public void testUnFenceTopicToReuse() throws Exception { assertTrue(replicator2.producer != null && replicator2.producer.isConnected()); }); - // cleanup. + // cleanup the injection. persistentTopic.getProducers().remove(mockProducerName, mockProducer); + // cleanup. 
producer1.close(); cleanupTopics(() -> { admin1.topics().delete(topicName); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java index 464243f4bbbef..7c62c115f0711 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java @@ -47,6 +47,8 @@ import java.util.List; import java.util.Optional; import java.util.Set; +import java.util.Map; +import java.util.Objects; import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; @@ -54,6 +56,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Supplier; +import java.util.stream.Collectors; import lombok.Cleanup; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.client.LedgerHandle; @@ -321,6 +324,83 @@ public void testPersistentPartitionedTopicUnload() throws Exception { } } + @DataProvider(name = "closeWithoutWaitingClientDisconnectInFirstBatch") + public Object[][] closeWithoutWaitingClientDisconnectInFirstBatch() { + return new Object[][]{ + new Object[] {true}, + new Object[] {false}, + }; + } + + @Test(dataProvider = "closeWithoutWaitingClientDisconnectInFirstBatch") + public void testConcurrentClose(boolean closeWithoutWaitingClientDisconnectInFirstBatch) throws Exception { + final String topicName = "persistent://prop/ns/concurrentClose"; + final String ns = "prop/ns"; + admin.namespaces().createNamespace(ns, 1); + admin.topics().createNonPartitionedTopic(topicName); + final Topic topic = pulsar.getBrokerService().getTopicIfExists(topicName).get().get(); + List> futureList = + make2ConcurrentBatchesOfClose(topic, 10, closeWithoutWaitingClientDisconnectInFirstBatch); + Map>> futureMap = + futureList.stream().collect(Collectors.groupingBy(Objects::hashCode)); + /** + * The first call: get the return value of "topic.close". + * The other 19 calls: get the cached value which related {@link PersistentTopic#closeFutures}. + */ + assertTrue(futureMap.size() <= 3); + for (List list : futureMap.values()){ + if (list.size() == 1){ + // This is the first call, the future is the return value of `topic.close`. + } else { + // Two types future list: wait client close or not. + assertTrue(list.size() >= 9 && list.size() <= 10); + } + } + } + + private List> make2ConcurrentBatchesOfClose(Topic topic, int tryTimes, + boolean closeWithoutWaitingClientDisconnectInFirstBatch){ + final List> futureList = Collections.synchronizedList(new ArrayList<>()); + final List taskList = new ArrayList<>(); + CountDownLatch allTaskBeginLatch = new CountDownLatch(1); + // Call a batch of close. + for (int i = 0; i < tryTimes; i++) { + Thread thread = new Thread(() -> { + try { + allTaskBeginLatch.await(5, TimeUnit.SECONDS); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + futureList.add(topic.close(closeWithoutWaitingClientDisconnectInFirstBatch)); + }); + thread.start(); + taskList.add(thread); + } + // Call another batch of close. 
+ for (int i = 0; i < tryTimes; i++) { + Thread thread = new Thread(() -> { + try { + allTaskBeginLatch.await(5, TimeUnit.SECONDS); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + futureList.add(topic.close(!closeWithoutWaitingClientDisconnectInFirstBatch)); + }); + thread.start(); + taskList.add(thread); + } + // Wait for all close tasks to finish. + allTaskBeginLatch.countDown(); + Awaitility.await().atMost(5, TimeUnit.SECONDS).until(()->{ + for (Thread thread : taskList){ + if (thread.isAlive()){ + return false; + } + } + return true; + }); + return futureList; + } @DataProvider(name = "topicAndMetricsLevel") public Object[][] indexPatternTestData() { @@ -330,7 +410,6 @@ public Object[][] indexPatternTestData() { }; } - @Test(dataProvider = "topicAndMetricsLevel") public void testDelayedDeliveryTrackerMemoryUsageMetric(String topic, boolean exposeTopicLevelMetrics) throws Exception { PulsarClient client = pulsar.getClient(); diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java index 6f62589853593..f6fcb12f35939 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java @@ -21,6 +21,7 @@ import com.google.common.util.concurrent.MoreExecutors; import java.time.Duration; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.List; import java.util.Objects; @@ -69,6 +70,36 @@ public static CompletableFuture> waitForAll(Stream void completeAfter(final CompletableFuture dest, CompletableFuture src) { + src.whenComplete((v, ex) -> { + if (ex != null) { + dest.completeExceptionally(ex); + } else { + dest.complete(v); + } + }); + } + + /** + * Make the dest future complete after the others. {@param dest} will be completed with a {@link Void} value + * once all the futures of {@param src} have completed, or be completed exceptionally with the same error as the + * first future of {@param src} that completed exceptionally. + */ + public static void completeAfterAll(final CompletableFuture dest, + CompletableFuture... src) { + FutureUtil.waitForAll(Arrays.asList(src)).whenComplete((ignore, ex) -> { + if (ex != null) { + dest.completeExceptionally(ex); + } else { + dest.complete(null); + } + }); + } + /** * Return a future that represents the completion of any future in the provided Collection. * @@ -131,7 +162,7 @@ public static CompletableFuture> waitForAny(Collection waitForAllAndSupportCancel( - Collection> futures) { + Collection> futures) { CompletableFuture[] futuresArray = futures.toArray(new CompletableFuture[0]); CompletableFuture combinedFuture = CompletableFuture.allOf(futuresArray); whenCancelledOrTimedOut(combinedFuture, () -> {
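The core idea of the patch, that concurrent close() callers should all observe the same in-progress operation, can be sketched with plain JDK futures. The sketch below is illustrative only: CloseOnceSketch and doRealClose are made-up names, a single cached future guarded by synchronized stands in for the patch's two CloseFutures guarded by the topic's write lock, and completeAfter mirrors the helper the patch adds to FutureUtil.

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

public class CloseOnceSketch {

    // Mirrors FutureUtil.completeAfter from the patch: dest completes with the same value/exception as src.
    static <T> void completeAfter(CompletableFuture<T> dest, CompletableFuture<T> src) {
        src.whenComplete((v, ex) -> {
            if (ex != null) {
                dest.completeExceptionally(ex);
            } else {
                dest.complete(v);
            }
        });
    }

    // Cached in-progress close, analogous to the new PersistentTopic#closeFutures field.
    private volatile CompletableFuture<Void> closeFuture;

    // First caller starts the real close; every later caller gets the same cached future back.
    public synchronized CompletableFuture<Void> close() {
        if (closeFuture != null) {
            return closeFuture; // an in-progress close exists: reuse it instead of closing again
        }
        CompletableFuture<Void> cached = new CompletableFuture<>();
        closeFuture = cached;
        completeAfter(cached, doRealClose());
        return cached;
    }

    // Stand-in for disconnecting clients and closing the managed ledger.
    private CompletableFuture<Void> doRealClose() {
        return CompletableFuture.runAsync(() -> {
            try {
                TimeUnit.MILLISECONDS.sleep(100);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
    }

    public static void main(String[] args) {
        CloseOnceSketch topic = new CloseOnceSketch();
        CompletableFuture<Void> first = topic.close();
        CompletableFuture<Void> second = topic.close();
        // Both callers share one close operation, so the underlying resource is only closed once.
        System.out.println("same future: " + (first == second));
        first.join();
    }
}

With such a cached future in place, racing unload and delete requests can no longer fence the topic twice or close the managed ledger concurrently, which is what the testConcurrentClose case in this patch verifies by checking that only a few distinct futures are returned across twenty concurrent close calls.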