From 7df59b9066e71a202727f7d9dca72fc3a7b5c134 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Mon, 22 Jan 2024 03:44:36 +0800 Subject: [PATCH 1/7] rebase master --- .../service/persistent/PersistentTopic.java | 120 ++++++++++++++---- .../persistent/PersistentTopicTest.java | 83 +++++++++++- .../apache/pulsar/common/util/FutureUtil.java | 33 ++++- 3 files changed, 212 insertions(+), 24 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index c1a75d67e3c4e..cb2d67fb8980b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -53,6 +53,7 @@ import java.util.function.BiFunction; import java.util.stream.Collectors; import javax.annotation.Nonnull; +import lombok.AllArgsConstructor; import lombok.Getter; import lombok.Value; import org.apache.bookkeeper.client.api.LedgerMetadata; @@ -273,6 +274,8 @@ protected TopicStatsHelper initialValue() { @Getter private final ExecutorService orderedExecutor; + private volatile CloseFutures closeFutures; + @Getter private final PersistentTopicMetrics persistentTopicMetrics = new PersistentTopicMetrics(); @@ -296,6 +299,44 @@ private static class EstimateTimeBasedBacklogQuotaCheckResult { Long estimatedOldestUnacknowledgedMessageTimestamp; } + /*** + * We use 3 futures to prevent a new closing if there is an in-progress deletion or closing. We make Pulsar return + * the in-progress one when it is called the second time. + * + * The topic closing will be called the below scenarios: + * 1. Calling "pulsar-admin topics unload". Relate to {@link CloseFutures#waitDisconnectClients}. + * 2. Namespace bundle transfer or unloading. + * a. The unloading topic triggered by unloading namespace bundles will not wait for clients disconnect. Relate + * to {@link CloseFutures#notWaitDisconnectClients}. + * b. The unloading topic triggered by unloading namespace bundles was seperated to two steps when using + * {@link ExtensibleLoadManagerImpl}. + * b-1. step-1: fence the topic on the original Broker, and do not trigger reconnections of clients. Relate + * to {@link CloseFutures#transferring}. This step is a half closing. + * b-2. step-2: send the owner broker information to clients and disconnect clients. Relate + * to {@link CloseFutures#notWaitDisconnectClients}. + * + * The three futures will be setting as the below rule: + * Event: Topic close. + * - If the first one closing is called by "close and not disconnect clients": + * - {@link CloseFutures#transferring} will be initialized as "close and not disconnect clients". + * - {@link CloseFutures#waitDisconnectClients} ang {@link CloseFutures#notWaitDisconnectClients} will be empty, + * the second closing will do a new close after {@link CloseFutures#transferring} is completed. + * - If the first one closing is called by "close and not wait for clients disconnect": + * - {@link CloseFutures#waitDisconnectClients} will be initialized as "waiting for clients disconnect". + * - {@link CloseFutures#notWaitDisconnectClients} ang {@link CloseFutures#transferring} will be + * initialized as "not waiting for clients disconnect" . + * - If the first one closing is called by "close and wait for clients disconnect", the three futures will be + * initialized as "waiting for clients disconnect". + * Event: Topic delete. + * the three futures will be initialized as "waiting for clients disconnect". + */ + @AllArgsConstructor + private class CloseFutures { + private final CompletableFuture waitDisconnectClients; + private final CompletableFuture notWaitDisconnectClients; + private final CompletableFuture transferring; + } + private static class TopicStatsHelper { public double averageMsgSize; public double aggMsgRateIn; @@ -1414,8 +1455,11 @@ private CompletableFuture delete(boolean failIfHasSubscriptions, } fenceTopicToCloseOrDelete(); // Avoid clients reconnections while deleting + // Mark the progress of close to prevent close calling concurrently. + this.closeFutures = + new CloseFutures(new CompletableFuture(), new CompletableFuture(), new CompletableFuture()); - return getBrokerService().getPulsar().getPulsarResources().getNamespaceResources() + CompletableFuture res = getBrokerService().getPulsar().getPulsarResources().getNamespaceResources() .getPartitionedTopicResources().runWithMarkDeleteAsync(TopicName.get(topic), () -> { CompletableFuture deleteFuture = new CompletableFuture<>(); @@ -1517,6 +1561,11 @@ public void deleteLedgerComplete(Object ctx) { unfenceTopicToResume(); } }); + + FutureUtil.completeAfter(closeFutures.transferring, res); + FutureUtil.completeAfter(closeFutures.notWaitDisconnectClients, res); + FutureUtil.completeAfter(closeFutures.waitDisconnectClients, res); + return res; } finally { lock.writeLock().unlock(); } @@ -1542,27 +1591,45 @@ public CompletableFuture close(boolean closeWithoutWaitingClientDisconnect @Override public CompletableFuture close( boolean disconnectClients, boolean closeWithoutWaitingClientDisconnect) { - CompletableFuture closeFuture = new CompletableFuture<>(); - lock.writeLock().lock(); + CompletableFuture inProgressTransferCloseTask = null; try { if (!disconnectClients) { transferring = true; } // closing managed-ledger waits until all producers/consumers/replicators get closed. Sometimes, broker // forcefully wants to close managed-ledger without waiting all resources to be closed. - if (!isClosingOrDeleting || closeWithoutWaitingClientDisconnect) { - fenceTopicToCloseOrDelete(); + if (isClosingOrDeleting) { + // Return in-progress future if exists. + if (!disconnectClients) { + return closeFutures.transferring; + } + if (closeWithoutWaitingClientDisconnect && closeFutures.notWaitDisconnectClients != null) { + return closeFutures.notWaitDisconnectClients; + } + if (!closeWithoutWaitingClientDisconnect && closeFutures.waitDisconnectClients != null) { + return closeFutures.waitDisconnectClients; + } + /** There is a in-progress half closing task. see the section 2-b-1 of {@link CloseFutures}. **/ + if (transferring) { + inProgressTransferCloseTask = closeFutures.transferring; + } + } + fenceTopicToCloseOrDelete(); + if (!disconnectClients) { + this.closeFutures = new CloseFutures(new CompletableFuture(), null, null); } else { - log.warn("[{}] Topic is already being closed or deleted", topic); - closeFuture.completeExceptionally(new TopicFencedException("Topic is already fenced")); - return closeFuture; + this.closeFutures = + new CloseFutures(new CompletableFuture(), new CompletableFuture(), new CompletableFuture()); } } finally { lock.writeLock().unlock(); } List> futures = new ArrayList<>(); + if (inProgressTransferCloseTask != null) { + futures.add(inProgressTransferCloseTask); + } futures.add(transactionBuffer.closeAsync()); replicators.forEach((cluster, replicator) -> futures.add(replicator.terminate())); @@ -1605,20 +1672,17 @@ public CompletableFuture close( } } - CompletableFuture clientCloseFuture = closeWithoutWaitingClientDisconnect - ? CompletableFuture.completedFuture(null) - : FutureUtil.waitForAll(futures); - - clientCloseFuture.thenRun(() -> { - // After having disconnected all producers/consumers, close the managed ledger + CompletableFuture disconnectClientsFuture = FutureUtil.waitForAll(futures); + CompletableFuture closeMLFuture = new CompletableFuture<>(); + FutureUtil.runWithCurrentThread(() -> { ledger.asyncClose(new CloseCallback() { @Override public void closeComplete(Object ctx) { if (disconnectClients) { // Everything is now closed, remove the topic from map - disposeTopic(closeFuture); + disposeTopic(closeMLFuture); } else { - closeFuture.complete(null); + closeMLFuture.complete(null); } } @@ -1626,20 +1690,32 @@ public void closeComplete(Object ctx) { public void closeFailed(ManagedLedgerException exception, Object ctx) { log.error("[{}] Failed to close managed ledger, proceeding anyway.", topic, exception); if (disconnectClients) { - disposeTopic(closeFuture); + disposeTopic(closeMLFuture); } else { - closeFuture.complete(null); + closeMLFuture.complete(null); } } }, null); }).exceptionally(exception -> { - log.error("[{}] Error closing topic", topic, exception); - unfenceTopicToResume(); - closeFuture.completeExceptionally(exception); + log.error("[{}] Error closing managed ledger", topic, exception); + closeMLFuture.completeExceptionally(exception); return null; }); - return closeFuture; + + if (!disconnectClients) { + // If closing for change the topic state to transferring, + // only initialize the variable "closeFutures.transferring". + FutureUtil.completeAfterAll(closeFutures.transferring, closeMLFuture, disconnectClientsFuture); + return closeFutures.transferring; + } + FutureUtil.completeAfterAll(closeFutures.transferring, closeMLFuture, disconnectClientsFuture); + FutureUtil.completeAfter(closeFutures.notWaitDisconnectClients, closeMLFuture); + FutureUtil.completeAfterAll(closeFutures.waitDisconnectClients, closeMLFuture, disconnectClientsFuture); + if (closeWithoutWaitingClientDisconnect) { + return closeFutures.notWaitDisconnectClients; + } + return closeFutures.waitDisconnectClients; } private void disposeTopic(CompletableFuture closeFuture) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java index 44d24668cc381..1efee58d0084b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java @@ -49,6 +49,8 @@ import java.util.List; import java.util.Optional; import java.util.Set; +import java.util.Map; +import java.util.Objects; import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; @@ -56,6 +58,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Supplier; +import java.util.stream.Collectors; import lombok.Cleanup; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.client.LedgerHandle; @@ -70,6 +73,8 @@ import org.apache.pulsar.broker.service.BrokerTestBase; import org.apache.pulsar.broker.service.Topic; import org.apache.pulsar.broker.service.TopicPoliciesService; +import org.apache.pulsar.broker.service.Topic; +import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGenerator; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; @@ -322,6 +327,83 @@ public void testPersistentPartitionedTopicUnload() throws Exception { } } + @DataProvider(name = "closeWithoutWaitingClientDisconnectInFirstBatch") + public Object[][] closeWithoutWaitingClientDisconnectInFirstBatch() { + return new Object[][]{ + new Object[] {true}, + new Object[] {false}, + }; + } + + @Test(dataProvider = "closeWithoutWaitingClientDisconnectInFirstBatch") + public void testConcurrentClose(boolean closeWithoutWaitingClientDisconnectInFirstBatch) throws Exception { + final String topicName = "persistent://prop/ns/concurrentClose"; + final String ns = "prop/ns"; + admin.namespaces().createNamespace(ns, 1); + admin.topics().createNonPartitionedTopic(topicName); + final Topic topic = pulsar.getBrokerService().getTopicIfExists(topicName).get().get(); + List> futureList = + make2ConcurrentBatchesOfClose(topic, 10, closeWithoutWaitingClientDisconnectInFirstBatch); + Map>> futureMap = + futureList.stream().collect(Collectors.groupingBy(Objects::hashCode)); + /** + * The first call: get the return value of "topic.close". + * The other 19 calls: get the cached value which related {@link PersistentTopic#closeFutures}. + */ + assertEquals(futureMap.size(), 3); + for (List list : futureMap.values()){ + if (list.size() == 1){ + // This is the first call, the future is the return value of `topic.close`. + } else { + // Two types future list: wait client close or not. + assertTrue(list.size() >= 9 && list.size() <= 10); + } + } + } + + private List> make2ConcurrentBatchesOfClose(Topic topic, int tryTimes, + boolean closeWithoutWaitingClientDisconnectInFirstBatch){ + final List> futureList = Collections.synchronizedList(new ArrayList<>()); + final List taskList = new ArrayList<>(); + CountDownLatch allTaskBeginLatch = new CountDownLatch(1); + // Call a batch of close. + for (int i = 0; i < tryTimes; i++) { + Thread thread = new Thread(() -> { + try { + allTaskBeginLatch.await(5, TimeUnit.SECONDS); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + futureList.add(topic.close(closeWithoutWaitingClientDisconnectInFirstBatch)); + }); + thread.start(); + taskList.add(thread); + } + // Call another batch of close. + for (int i = 0; i < tryTimes; i++) { + Thread thread = new Thread(() -> { + try { + allTaskBeginLatch.await(5, TimeUnit.SECONDS); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + futureList.add(topic.close(!closeWithoutWaitingClientDisconnectInFirstBatch)); + }); + thread.start(); + taskList.add(thread); + } + // Wait close task executed. + allTaskBeginLatch.countDown(); + Awaitility.await().atMost(5, TimeUnit.SECONDS).until(()->{ + for (Thread thread : taskList){ + if (thread.isAlive()){ + return false; + } + } + return true; + }); + return futureList; + } @DataProvider(name = "topicAndMetricsLevel") public Object[][] indexPatternTestData() { @@ -331,7 +413,6 @@ public Object[][] indexPatternTestData() { }; } - @Test(dataProvider = "topicAndMetricsLevel") public void testDelayedDeliveryTrackerMemoryUsageMetric(String topic, boolean exposeTopicLevelMetrics) throws Exception { PulsarClient client = pulsar.getClient(); diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java index 6f62589853593..f6fcb12f35939 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java @@ -21,6 +21,7 @@ import com.google.common.util.concurrent.MoreExecutors; import java.time.Duration; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.List; import java.util.Objects; @@ -69,6 +70,36 @@ public static CompletableFuture> waitForAll(Stream void completeAfter(final CompletableFuture dest, CompletableFuture src) { + src.whenComplete((v, ex) -> { + if (ex != null) { + dest.completeExceptionally(ex); + } else { + dest.complete(v); + } + }); + } + + /** + * Make the dest future complete after others. {@param dest} is will be completed with a {@link Void} value + * if all the futures of {@param src} is completed, or be completed exceptionally with the same error as the first + * one completed exceptionally future of {@param src}. + */ + public static void completeAfterAll(final CompletableFuture dest, + CompletableFuture... src) { + FutureUtil.waitForAll(Arrays.asList(src)).whenComplete((ignore, ex) -> { + if (ex != null) { + dest.completeExceptionally(ex); + } else { + dest.complete(null); + } + }); + } + /** * Return a future that represents the completion of any future in the provided Collection. * @@ -131,7 +162,7 @@ public static CompletableFuture> waitForAny(Collection waitForAllAndSupportCancel( - Collection> futures) { + Collection> futures) { CompletableFuture[] futuresArray = futures.toArray(new CompletableFuture[0]); CompletableFuture combinedFuture = CompletableFuture.allOf(futuresArray); whenCancelledOrTimedOut(combinedFuture, () -> { From a6b2601d155a01813a775d7dc25001319f140299 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Fri, 26 Apr 2024 10:42:42 +0800 Subject: [PATCH 2/7] remove useless imports --- .../pulsar/broker/service/persistent/PersistentTopicTest.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java index 1efee58d0084b..41093c03cf352 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java @@ -73,8 +73,6 @@ import org.apache.pulsar.broker.service.BrokerTestBase; import org.apache.pulsar.broker.service.Topic; import org.apache.pulsar.broker.service.TopicPoliciesService; -import org.apache.pulsar.broker.service.Topic; -import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGenerator; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; From 0d643734cf7c9d4d6895f79146ed298da38a8ff4 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Fri, 26 Apr 2024 15:59:12 +0800 Subject: [PATCH 3/7] fix NPE --- .../broker/service/persistent/PersistentTopic.java | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index cb2d67fb8980b..c91507213d5d7 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -330,11 +330,17 @@ private static class EstimateTimeBasedBacklogQuotaCheckResult { * Event: Topic delete. * the three futures will be initialized as "waiting for clients disconnect". */ - @AllArgsConstructor private class CloseFutures { - private final CompletableFuture waitDisconnectClients; - private final CompletableFuture notWaitDisconnectClients; private final CompletableFuture transferring; + private final CompletableFuture notWaitDisconnectClients; + private final CompletableFuture waitDisconnectClients; + + public CloseFutures(CompletableFuture transferring, CompletableFuture waitDisconnectClients, + CompletableFuture notWaitDisconnectClients) { + this.transferring = transferring; + this.waitDisconnectClients = waitDisconnectClients; + this.notWaitDisconnectClients = notWaitDisconnectClients; + } } private static class TopicStatsHelper { From 651c84e1d04e6841b0fe9dbf2ef5824b304b7ba6 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Fri, 26 Apr 2024 16:04:05 +0800 Subject: [PATCH 4/7] remove useless imports --- .../apache/pulsar/broker/service/persistent/PersistentTopic.java | 1 - 1 file changed, 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index c91507213d5d7..1f270f5bde61d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -53,7 +53,6 @@ import java.util.function.BiFunction; import java.util.stream.Collectors; import javax.annotation.Nonnull; -import lombok.AllArgsConstructor; import lombok.Getter; import lombok.Value; import org.apache.bookkeeper.client.api.LedgerMetadata; From 30a6c21ff1565408bf8e6cb6c337d27b4e3a36cd Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Fri, 26 Apr 2024 16:50:47 +0800 Subject: [PATCH 5/7] fix flaky test --- .../pulsar/broker/service/persistent/PersistentTopicTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java index 41093c03cf352..d523586c2e2d3 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java @@ -348,7 +348,7 @@ public void testConcurrentClose(boolean closeWithoutWaitingClientDisconnectInFir * The first call: get the return value of "topic.close". * The other 19 calls: get the cached value which related {@link PersistentTopic#closeFutures}. */ - assertEquals(futureMap.size(), 3); + assertTrue(futureMap.size() <= 3); for (List list : futureMap.values()){ if (list.size() == 1){ // This is the first call, the future is the return value of `topic.close`. From 057fe1d64819ef7c332c99c3b03e644f4d668023 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Sun, 28 Apr 2024 00:54:57 +0800 Subject: [PATCH 6/7] fix the behavior change --- .../service/persistent/PersistentTopic.java | 139 +++++++++++------- .../broker/service/OneWayReplicatorTest.java | 21 ++- 2 files changed, 105 insertions(+), 55 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 1f270f5bde61d..954257113555f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -85,6 +85,7 @@ import org.apache.bookkeeper.net.BookieId; import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.mutable.MutableObject; import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.delayed.BucketDelayedDeliveryTrackerFactory; import org.apache.pulsar.broker.delayed.DelayedDeliveryTrackerFactory; @@ -1586,6 +1587,12 @@ public CompletableFuture close(boolean closeWithoutWaitingClientDisconnect return close(true, closeWithoutWaitingClientDisconnect); } + private enum CloseTypes { + transferring, + notWaitDisconnectClients, + waitDisconnectClients; + } + /** * Close this topic - close all producers and subscriptions associated with this topic. * @@ -1597,31 +1604,38 @@ public CompletableFuture close(boolean closeWithoutWaitingClientDisconnect public CompletableFuture close( boolean disconnectClients, boolean closeWithoutWaitingClientDisconnect) { lock.writeLock().lock(); - CompletableFuture inProgressTransferCloseTask = null; - try { - if (!disconnectClients) { - transferring = true; - } + // Choose the close type. + CloseTypes closeType; + if (!disconnectClients) { + closeType = CloseTypes.transferring; + } else if (closeWithoutWaitingClientDisconnect) { + closeType = CloseTypes.notWaitDisconnectClients; + } else { // closing managed-ledger waits until all producers/consumers/replicators get closed. Sometimes, broker // forcefully wants to close managed-ledger without waiting all resources to be closed. + closeType = CloseTypes.waitDisconnectClients; + } + /** Maybe there is a in-progress half closing task. see the section 2-b-1 of {@link CloseFutures}. **/ + CompletableFuture inProgressTransferCloseTask = null; + try { + // Return in-progress future if exists. if (isClosingOrDeleting) { - // Return in-progress future if exists. - if (!disconnectClients) { - return closeFutures.transferring; + if (closeType == CloseTypes.transferring) { + return closeFutures.transferring; } - if (closeWithoutWaitingClientDisconnect && closeFutures.notWaitDisconnectClients != null) { + if (closeType == CloseTypes.notWaitDisconnectClients && closeFutures.notWaitDisconnectClients != null) { return closeFutures.notWaitDisconnectClients; } - if (!closeWithoutWaitingClientDisconnect && closeFutures.waitDisconnectClients != null) { + if (closeType == CloseTypes.waitDisconnectClients && closeFutures.waitDisconnectClients != null) { return closeFutures.waitDisconnectClients; } - /** There is a in-progress half closing task. see the section 2-b-1 of {@link CloseFutures}. **/ if (transferring) { inProgressTransferCloseTask = closeFutures.transferring; } } fenceTopicToCloseOrDelete(); - if (!disconnectClients) { + if (closeType == CloseTypes.transferring) { + transferring = true; this.closeFutures = new CloseFutures(new CompletableFuture(), null, null); } else { this.closeFutures = @@ -1639,7 +1653,7 @@ public CompletableFuture close( futures.add(transactionBuffer.closeAsync()); replicators.forEach((cluster, replicator) -> futures.add(replicator.terminate())); shadowReplicators.forEach((__, replicator) -> futures.add(replicator.terminate())); - if (disconnectClients) { + if (closeType != CloseTypes.transferring) { futures.add(ExtensibleLoadManagerImpl.getAssignedBrokerLookupData( brokerService.getPulsar(), topic).thenAccept(lookupData -> { producers.values().forEach(producer -> futures.add(producer.disconnect(lookupData))); @@ -1677,50 +1691,75 @@ public CompletableFuture close( } } - CompletableFuture disconnectClientsFuture = FutureUtil.waitForAll(futures); - CompletableFuture closeMLFuture = new CompletableFuture<>(); - FutureUtil.runWithCurrentThread(() -> { - ledger.asyncClose(new CloseCallback() { - @Override - public void closeComplete(Object ctx) { - if (disconnectClients) { - // Everything is now closed, remove the topic from map - disposeTopic(closeMLFuture); - } else { - closeMLFuture.complete(null); - } + CompletableFuture disconnectClientsInCurrentCall = null; + // Note: "disconnectClientsToCache" is a non-able value, it is null when close type is transferring. + MutableObject> disconnectClientsToCache = new MutableObject<>(); + switch (closeType) { + case transferring -> { + disconnectClientsInCurrentCall = FutureUtil.waitForAll(futures); + break; + } + case notWaitDisconnectClients -> { + disconnectClientsInCurrentCall = CompletableFuture.completedFuture(null); + disconnectClientsToCache.setValue(FutureUtil.waitForAll(futures)); + break; + } + case waitDisconnectClients -> { + disconnectClientsInCurrentCall = FutureUtil.waitForAll(futures); + disconnectClientsToCache.setValue(disconnectClientsInCurrentCall); + } + } + + CompletableFuture closeFuture = new CompletableFuture<>(); + Runnable closeLedgerAfterCloseClients = (() -> ledger.asyncClose(new CloseCallback() { + @Override + public void closeComplete(Object ctx) { + if (closeType != CloseTypes.transferring) { + // Everything is now closed, remove the topic from map + disposeTopic(closeFuture); + } else { + closeFuture.complete(null); } + } - @Override - public void closeFailed(ManagedLedgerException exception, Object ctx) { - log.error("[{}] Failed to close managed ledger, proceeding anyway.", topic, exception); - if (disconnectClients) { - disposeTopic(closeMLFuture); - } else { - closeMLFuture.complete(null); - } + @Override + public void closeFailed(ManagedLedgerException exception, Object ctx) { + log.error("[{}] Failed to close managed ledger, proceeding anyway.", topic, exception); + if (closeType != CloseTypes.transferring) { + disposeTopic(closeFuture); + } else { + closeFuture.complete(null); } - }, null); - }).exceptionally(exception -> { - log.error("[{}] Error closing managed ledger", topic, exception); - closeMLFuture.completeExceptionally(exception); + } + }, null)); + + disconnectClientsInCurrentCall.thenRun(closeLedgerAfterCloseClients).exceptionally(exception -> { + log.error("[{}] Error closing topic", topic, exception); + unfenceTopicToResume(); + closeFuture.completeExceptionally(exception); return null; }); + switch (closeType) { + case transferring -> { + FutureUtil.completeAfterAll(closeFutures.transferring, closeFuture); + break; + } + case notWaitDisconnectClients -> { + FutureUtil.completeAfterAll(closeFutures.transferring, closeFuture); + FutureUtil.completeAfter(closeFutures.notWaitDisconnectClients, closeFuture); + FutureUtil.completeAfterAll(closeFutures.waitDisconnectClients, + closeFuture.thenCompose(ignore -> disconnectClientsToCache.getValue())); + break; + } + case waitDisconnectClients -> { + FutureUtil.completeAfterAll(closeFutures.transferring, closeFuture); + FutureUtil.completeAfter(closeFutures.notWaitDisconnectClients, closeFuture); + FutureUtil.completeAfterAll(closeFutures.waitDisconnectClients, closeFuture); + } + } - if (!disconnectClients) { - // If closing for change the topic state to transferring, - // only initialize the variable "closeFutures.transferring". - FutureUtil.completeAfterAll(closeFutures.transferring, closeMLFuture, disconnectClientsFuture); - return closeFutures.transferring; - } - FutureUtil.completeAfterAll(closeFutures.transferring, closeMLFuture, disconnectClientsFuture); - FutureUtil.completeAfter(closeFutures.notWaitDisconnectClients, closeMLFuture); - FutureUtil.completeAfterAll(closeFutures.waitDisconnectClients, closeMLFuture, disconnectClientsFuture); - if (closeWithoutWaitingClientDisconnect) { - return closeFutures.notWaitDisconnectClients; - } - return closeFutures.waitDisconnectClients; + return closeFuture; } private void disposeTopic(CompletableFuture closeFuture) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java index 9b8b567af081b..eb31c13b0d528 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java @@ -25,6 +25,7 @@ import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNotEquals; +import static org.testng.Assert.assertNull; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; import com.google.common.collect.Sets; @@ -226,7 +227,7 @@ public void testTopicCloseWhenInternalProducerCloseErrorOnce() throws Exception }); } - private void injectMockReplicatorProducerBuilder( + private Runnable injectMockReplicatorProducerBuilder( BiFunction producerDecorator) throws Exception { String cluster2 = pulsar2.getConfig().getClusterName(); @@ -246,7 +247,8 @@ private void injectMockReplicatorProducerBuilder( replicationClients = WhiteboxImpl.getInternalState(brokerService, "replicationClients"); PulsarClientImpl internalClient = (PulsarClientImpl) replicationClients.get(cluster2); PulsarClient spyClient = spy(internalClient); - replicationClients.put(cluster2, spyClient); + assertTrue(replicationClients.remove(cluster2, internalClient)); + assertNull(replicationClients.putIfAbsent(cluster2, spyClient)); // Inject producer decorator. doAnswer(invocation -> { @@ -275,6 +277,12 @@ private void injectMockReplicatorProducerBuilder( }).when(spyProducerBuilder).createAsync(); return spyProducerBuilder; }).when(spyClient).newProducer(any(Schema.class)); + + // Return a cleanup injection task; + return () -> { + assertTrue(replicationClients.remove(cluster2, spyClient)); + assertNull(replicationClients.putIfAbsent(cluster2, internalClient)); + }; } private SpyCursor spyCursor(PersistentTopic persistentTopic, String cursorName) throws Exception { @@ -368,7 +376,7 @@ public void testConcurrencyOfUnloadBundleAndRecreateProducer() throws Exception // If the retry counter is larger than 6, the next creation will be slow enough to close Replicator. final AtomicInteger createProducerCounter = new AtomicInteger(); final int failTimes = 6; - injectMockReplicatorProducerBuilder((producerCnf, originalProducer) -> { + Runnable taskToClearInjection = injectMockReplicatorProducerBuilder((producerCnf, originalProducer) -> { if (topicName.equals(producerCnf.getTopicName())) { // There is a switch to determine create producer successfully or not. if (createProducerCounter.incrementAndGet() > failTimes) { @@ -427,6 +435,7 @@ public void testConcurrencyOfUnloadBundleAndRecreateProducer() throws Exception }); // cleanup. + taskToClearInjection.run(); cleanupTopics(() -> { admin1.topics().delete(topicName); admin2.topics().delete(topicName); @@ -531,7 +540,7 @@ public void testConcurrencyOfUnloadBundleAndRecreateProducer2() throws Exception // If the retry counter is larger than 6, the next creation will be slow enough to close Replicator. final AtomicInteger createProducerCounter = new AtomicInteger(); final int failTimes = 6; - injectMockReplicatorProducerBuilder((producerCnf, originalProducer) -> { + Runnable taskToClearInjection = injectMockReplicatorProducerBuilder((producerCnf, originalProducer) -> { if (topicName.equals(producerCnf.getTopicName())) { // There is a switch to determine create producer successfully or not. if (createProducerCounter.incrementAndGet() > failTimes) { @@ -593,6 +602,7 @@ public void testConcurrencyOfUnloadBundleAndRecreateProducer2() throws Exception }); // cleanup. + taskToClearInjection.run(); cleanupTopics(namespaceName, () -> { admin1.topics().delete(topicName); admin2.topics().delete(topicName); @@ -644,8 +654,9 @@ public void testUnFenceTopicToReuse() throws Exception { assertTrue(replicator2.producer != null && replicator2.producer.isConnected()); }); - // cleanup. + // cleanup the injection. persistentTopic.getProducers().remove(mockProducerName, mockProducer); + // cleanup. producer1.close(); cleanupTopics(() -> { admin1.topics().delete(topicName); From 996cebaf2a156d6881c00e5fa1574c42f45a30a7 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Sun, 28 Apr 2024 11:08:48 +0800 Subject: [PATCH 7/7] address comment --- .../service/persistent/PersistentTopic.java | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 954257113555f..ea3feeac003d3 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -49,6 +49,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import java.util.function.BiFunction; import java.util.stream.Collectors; @@ -85,7 +86,6 @@ import org.apache.bookkeeper.net.BookieId; import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.lang3.StringUtils; -import org.apache.commons.lang3.mutable.MutableObject; import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.delayed.BucketDelayedDeliveryTrackerFactory; import org.apache.pulsar.broker.delayed.DelayedDeliveryTrackerFactory; @@ -1693,7 +1693,7 @@ public CompletableFuture close( CompletableFuture disconnectClientsInCurrentCall = null; // Note: "disconnectClientsToCache" is a non-able value, it is null when close type is transferring. - MutableObject> disconnectClientsToCache = new MutableObject<>(); + AtomicReference> disconnectClientsToCache = new AtomicReference<>(); switch (closeType) { case transferring -> { disconnectClientsInCurrentCall = FutureUtil.waitForAll(futures); @@ -1701,12 +1701,12 @@ public CompletableFuture close( } case notWaitDisconnectClients -> { disconnectClientsInCurrentCall = CompletableFuture.completedFuture(null); - disconnectClientsToCache.setValue(FutureUtil.waitForAll(futures)); + disconnectClientsToCache.set(FutureUtil.waitForAll(futures)); break; } case waitDisconnectClients -> { disconnectClientsInCurrentCall = FutureUtil.waitForAll(futures); - disconnectClientsToCache.setValue(disconnectClientsInCurrentCall); + disconnectClientsToCache.set(disconnectClientsInCurrentCall); } } @@ -1749,7 +1749,12 @@ public void closeFailed(ManagedLedgerException exception, Object ctx) { FutureUtil.completeAfterAll(closeFutures.transferring, closeFuture); FutureUtil.completeAfter(closeFutures.notWaitDisconnectClients, closeFuture); FutureUtil.completeAfterAll(closeFutures.waitDisconnectClients, - closeFuture.thenCompose(ignore -> disconnectClientsToCache.getValue())); + closeFuture.thenCompose(ignore -> disconnectClientsToCache.get().exceptionally(ex -> { + // Since the managed ledger has been closed, eat the error of clients disconnection. + log.error("[{}] Closed managed ledger, but disconnect clients failed," + + " this topic will be marked closed", topic, ex); + return null; + }))); break; } case waitDisconnectClients -> {