[fix][broker] One topic can be closed multiple times concurrently (apache#17524)

(cherry picked from commit 93afd89)
(cherry picked from commit 620fe9b)
poorbarcode authored and srinath-ctds committed May 16, 2024
1 parent 90036e1 commit 45de62a
Showing 4 changed files with 233 additions and 28 deletions.
@@ -48,6 +48,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
+ import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
@@ -259,6 +260,37 @@ protected TopicStatsHelper initialValue() {
@Getter
private final ExecutorService orderedExecutor;

+ private volatile CloseFutures closeFutures;

+ /**
+ * We use two futures to prevent a new close from starting while a delete or close is already in progress; Pulsar
+ * returns the in-progress future when close is called a second time.
+ *
+ * A topic close is triggered in the following scenarios:
+ * 1. Calling "pulsar-admin topics unload". Relates to {@link CloseFutures#waitDisconnectClients}.
+ * 2. Namespace bundle unloading. A topic unload triggered by unloading a namespace bundle does not wait for
+ * clients to disconnect. See {@link CloseFutures#notWaitDisconnectClients}.
+ *
+ * The two futures are set according to the following rules:
+ * Event: Topic close.
+ * - If the first close call is "close and do not wait for clients to disconnect":
+ * - {@link CloseFutures#waitDisconnectClients} is initialized as "waiting for clients to disconnect".
+ * - If the first close call is "close and wait for clients to disconnect", both futures are initialized as
+ * "waiting for clients to disconnect".
+ * Event: Topic delete.
+ * Both futures are initialized as "waiting for clients to disconnect".
+ */
+ private class CloseFutures {
+ private final CompletableFuture<Void> notWaitDisconnectClients;
+ private final CompletableFuture<Void> waitDisconnectClients;

+ public CloseFutures(CompletableFuture<Void> waitDisconnectClients,
+ CompletableFuture<Void> notWaitDisconnectClients) {
+ this.waitDisconnectClients = waitDisconnectClients;
+ this.notWaitDisconnectClients = notWaitDisconnectClients;
+ }
+ }

private static class TopicStatsHelper {
public double averageMsgSize;
public double aggMsgRateIn;
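
For context on why this class holds two separate futures, here is a minimal, hypothetical sketch of the same pattern outside Pulsar (only the two-future idea is taken from this diff): close() starts the shutdown once, and every later caller receives the in-progress future matching the guarantee it asked for.

import java.util.concurrent.CompletableFuture;

// Hypothetical, simplified model of the CloseFutures pattern -- not Pulsar code.
class TwoPhaseCloser {
    private CompletableFuture<Void> notWaitDisconnectClients; // done when the ledger is closed
    private CompletableFuture<Void> waitDisconnectClients;    // done when clients are also disconnected
    private boolean closing;

    synchronized CompletableFuture<Void> close(boolean waitForClients) {
        if (closing) {
            // Later callers get the in-progress future that matches the guarantee they
            // asked for, instead of an error such as "Topic is already fenced".
            return waitForClients ? waitDisconnectClients : notWaitDisconnectClients;
        }
        closing = true;
        notWaitDisconnectClients = new CompletableFuture<>();
        waitDisconnectClients = new CompletableFuture<>();
        startClosing();
        return waitForClients ? waitDisconnectClients : notWaitDisconnectClients;
    }

    private void startClosing() {
        // Stand-in for the real work: close the ledger, then disconnect clients,
        // completing the two futures in that order.
        notWaitDisconnectClients.complete(null);
        waitDisconnectClients.complete(null);
    }
}

Returning the cached future instead of failing is what removes the spurious "Topic is already fenced" error that the old close() below raised on concurrent calls.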
@@ -1356,8 +1388,10 @@ private CompletableFuture<Void> delete(boolean failIfHasSubscriptions,
}

fenceTopicToCloseOrDelete(); // Avoid client reconnections while deleting
+ // Mark the close as in progress to prevent concurrent calls of close.
+ this.closeFutures = new CloseFutures(new CompletableFuture<>(), new CompletableFuture<>());

- return getBrokerService().getPulsar().getPulsarResources().getNamespaceResources()
+ CompletableFuture<Void> res = getBrokerService().getPulsar().getPulsarResources().getNamespaceResources()
.getPartitionedTopicResources().runWithMarkDeleteAsync(TopicName.get(topic), () -> {
CompletableFuture<Void> deleteFuture = new CompletableFuture<>();

@@ -1460,6 +1494,10 @@ public void deleteLedgerComplete(Object ctx) {
unfenceTopicToResume();
}
});

+ FutureUtil.completeAfter(closeFutures.notWaitDisconnectClients, res);
+ FutureUtil.completeAfter(closeFutures.waitDisconnectClients, res);
+ return res;
} finally {
lock.writeLock().unlock();
}
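
FutureUtil.completeAfter and FutureUtil.completeAfterAll are not shown in this diff; judging from the call sites above, they forward the outcome of one or more source futures into an already-published target future. A rough sketch of that assumed behavior (semantics inferred from usage here, not taken from Pulsar's FutureUtil):

import java.util.concurrent.CompletableFuture;

final class FutureUtilSketch {
    private FutureUtilSketch() {
    }

    // Complete "target" with the same outcome as "source": its value on success,
    // its exception on failure. Assumed semantics, inferred from the call sites above.
    static <T> void completeAfter(CompletableFuture<T> target, CompletableFuture<T> source) {
        source.whenComplete((value, error) -> {
            if (error != null) {
                target.completeExceptionally(error);
            } else {
                target.complete(value);
            }
        });
    }

    // Complete "target" once all sources complete, exceptionally if any of them failed.
    static void completeAfterAll(CompletableFuture<Void> target, CompletableFuture<?>... sources) {
        CompletableFuture.allOf(sources).whenComplete((ignore, error) -> {
            if (error != null) {
                target.completeExceptionally(error);
            } else {
                target.complete(null);
            }
        });
    }
}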
@@ -1470,6 +1508,11 @@ public CompletableFuture<Void> close() {
return close(false);
}

+ private enum CloseTypes {
+ notWaitDisconnectClients,
+ waitDisconnectClients;
+ }

/**
* Close this topic - close all producers and subscriptions associated with this topic.
*
@@ -1478,19 +1521,32 @@ public CompletableFuture<Void> close() {
*/
@Override
public CompletableFuture<Void> close(boolean closeWithoutWaitingClientDisconnect) {
- CompletableFuture<Void> closeFuture = new CompletableFuture<>();

- lock.writeLock().lock();
- try {
+ CloseTypes closeType;
+ if (closeWithoutWaitingClientDisconnect) {
+ closeType = CloseTypes.notWaitDisconnectClients;
+ } else {
// Closing the managed ledger waits until all producers/consumers/replicators are closed. Sometimes the
// broker wants to close the managed ledger forcefully, without waiting for all resources to be closed.
- if (!isClosingOrDeleting || closeWithoutWaitingClientDisconnect) {
- fenceTopicToCloseOrDelete();
- } else {
- log.warn("[{}] Topic is already being closed or deleted", topic);
- closeFuture.completeExceptionally(new TopicFencedException("Topic is already fenced"));
- return closeFuture;
+ closeType = CloseTypes.waitDisconnectClients;
}

+ lock.writeLock().lock();
+ try {
+ // Return the in-progress future if one exists.
+ if (isClosingOrDeleting) {
+ switch (closeType) {
+ case notWaitDisconnectClients -> {
+ return closeFutures.notWaitDisconnectClients;
+ }
+ case waitDisconnectClients -> {
+ return closeFutures.waitDisconnectClients;
+ }
+ }
+ }
+ // No in-progress close.
+ fenceTopicToCloseOrDelete();
+ this.closeFutures = new CloseFutures(new CompletableFuture<>(), new CompletableFuture<>());
} finally {
lock.writeLock().unlock();
}
@@ -1528,11 +1584,22 @@ public CompletableFuture<Void> close(boolean closeWithoutWaitingClientDisconnect
}
}

- CompletableFuture<Void> clientCloseFuture = closeWithoutWaitingClientDisconnect
- ? CompletableFuture.completedFuture(null)
- : FutureUtil.waitForAll(futures);
+ CompletableFuture<Void> disconnectClientsInCurrentCall = null;
+ AtomicReference<CompletableFuture<Void>> disconnectClientsToCache = new AtomicReference<>();
+ switch (closeType) {
+ case notWaitDisconnectClients -> {
+ disconnectClientsInCurrentCall = CompletableFuture.completedFuture(null);
+ disconnectClientsToCache.set(FutureUtil.waitForAll(futures));
+ break;
+ }
+ case waitDisconnectClients -> {
+ disconnectClientsInCurrentCall = FutureUtil.waitForAll(futures);
+ disconnectClientsToCache.set(disconnectClientsInCurrentCall);
+ }
+ }
+ CompletableFuture<Void> closeFuture = new CompletableFuture<>();

- clientCloseFuture.thenRun(() -> {
+ Runnable closeLedgerAfterCloseClients = () -> {
// After having disconnected all producers/consumers, close the managed ledger
ledger.asyncClose(new CloseCallback() {
@Override
@@ -1547,13 +1614,32 @@ public void closeFailed(ManagedLedgerException exception, Object ctx) {
disposeTopic(closeFuture);
}
}, null);
- }).exceptionally(exception -> {
+ };
+ disconnectClientsInCurrentCall.thenRun(closeLedgerAfterCloseClients).exceptionally(exception -> {
log.error("[{}] Error closing topic", topic, exception);
unfenceTopicToResume();
closeFuture.completeExceptionally(exception);
return null;
});

+ switch (closeType) {
+ case notWaitDisconnectClients -> {
+ FutureUtil.completeAfter(closeFutures.notWaitDisconnectClients, closeFuture);
+ FutureUtil.completeAfterAll(closeFutures.waitDisconnectClients,
+ closeFuture.thenCompose(ignore -> disconnectClientsToCache.get().exceptionally(ex -> {
+ // Since the managed ledger has already been closed, swallow errors from disconnecting clients.
+ log.error("[{}] Closed the managed ledger, but disconnecting clients failed;"
+ + " this topic will be marked closed", topic, ex);
+ return null;
+ })));
+ break;
+ }
+ case waitDisconnectClients -> {
+ FutureUtil.completeAfter(closeFutures.notWaitDisconnectClients, closeFuture);
+ FutureUtil.completeAfterAll(closeFutures.waitDisconnectClients, closeFuture);
+ }
+ }

return closeFuture;
}
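
Taken together, the two switches give each kind of caller a future with the right meaning. A hypothetical usage sketch of the resulting contract (the concurrent-unload scenario is illustrative; only the close(boolean) overload comes from the code above):

import java.util.concurrent.CompletableFuture;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;

class CloseContractExample {
    // A second, concurrent close() now returns the cached in-progress future
    // instead of failing with TopicFencedException.
    static void unloadTwiceConcurrently(PersistentTopic topic) {
        CompletableFuture<Void> byBundle = topic.close(true);  // does not wait for clients to disconnect
        CompletableFuture<Void> byAdmin = topic.close(false);  // concurrent call: receives waitDisconnectClients

        byBundle.thenRun(() -> System.out.println("Ledger closed; clients may still be disconnecting."));
        byAdmin.thenRun(() -> System.out.println("Ledger closed and all clients disconnected."));
    }
}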

@@ -1839,10 +1925,10 @@ protected CompletableFuture<Void> addReplicationCluster(String remoteCluster, Ma
lock.readLock().lock();
try {
if (isClosingOrDeleting) {
// Whether is "transferring" or not, do not create new replicator.
// Do not create new replicator.
log.info("[{}] Skip to create replicator because this topic is closing."
+ " remote cluster: {}. State of transferring : {}",
topic, remoteCluster, transferring);
+ " remote cluster: {}.",
topic, remoteCluster);
return;
}
Replicator replicator = replicators.computeIfAbsent(remoteCluster, r -> {
@@ -25,6 +25,7 @@
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotEquals;
+ import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import com.google.common.collect.Sets;
@@ -232,7 +233,7 @@ public void testTopicCloseWhenInternalProducerCloseErrorOnce() throws Exception
});
}

- private void injectMockReplicatorProducerBuilder(
+ private Runnable injectMockReplicatorProducerBuilder(
BiFunction<ProducerConfigurationData, ProducerImpl, ProducerImpl> producerDecorator)
throws Exception {
String cluster2 = pulsar2.getConfig().getClusterName();
@@ -252,7 +253,8 @@ private void injectMockReplicatorProducerBuilder(
replicationClients = WhiteboxImpl.getInternalState(brokerService, "replicationClients");
PulsarClientImpl internalClient = (PulsarClientImpl) replicationClients.get(cluster2);
PulsarClient spyClient = spy(internalClient);
- replicationClients.put(cluster2, spyClient);
+ assertTrue(replicationClients.remove(cluster2, internalClient));
+ assertNull(replicationClients.putIfAbsent(cluster2, spyClient));

// Inject producer decorator.
doAnswer(invocation -> {
@@ -281,6 +283,12 @@ private void injectMockReplicatorProducerBuilder(
}).when(spyProducerBuilder).createAsync();
return spyProducerBuilder;
}).when(spyClient).newProducer(any(Schema.class));

+ // Return a task that cleans up the injection.
+ return () -> {
+ assertTrue(replicationClients.remove(cluster2, spyClient));
+ assertNull(replicationClients.putIfAbsent(cluster2, internalClient));
+ };
}
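
The remove(key, expectedValue)/putIfAbsent pair above makes the spy swap fail loudly if another thread has raced the map, where a plain put() would silently overwrite. The same inject-then-cleanup pattern in a standalone, hypothetical form (not the Pulsar test code):

import static org.mockito.Mockito.spy;
import java.util.concurrent.ConcurrentMap;

class SpyInjectionExample {
    // Swap a real component for a Mockito spy and return a task that restores it.
    // Atomic remove(key, expected) + putIfAbsent make a racing writer fail the swap
    // visibly instead of being silently overwritten.
    static <V> Runnable injectSpy(ConcurrentMap<String, V> registry, String key) {
        V real = registry.get(key);
        V spied = spy(real);
        if (!registry.remove(key, real) || registry.putIfAbsent(key, spied) != null) {
            throw new IllegalStateException("registry changed concurrently");
        }
        return () -> {
            if (!registry.remove(key, spied) || registry.putIfAbsent(key, real) != null) {
                throw new IllegalStateException("registry changed concurrently");
            }
        };
    }
}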

private SpyCursor spyCursor(PersistentTopic persistentTopic, String cursorName) throws Exception {
@@ -374,7 +382,7 @@ public void testConcurrencyOfUnloadBundleAndRecreateProducer() throws Exception
// If the retry counter is larger than 6, the next producer creation will be slow enough for the Replicator to close.
final AtomicInteger createProducerCounter = new AtomicInteger();
final int failTimes = 6;
- injectMockReplicatorProducerBuilder((producerCnf, originalProducer) -> {
+ Runnable taskToClearInjection = injectMockReplicatorProducerBuilder((producerCnf, originalProducer) -> {
if (topicName.equals(producerCnf.getTopicName())) {
// There is a switch that determines whether producer creation succeeds or not.
if (createProducerCounter.incrementAndGet() > failTimes) {
@@ -433,6 +441,7 @@ public void testConcurrencyOfUnloadBundleAndRecreateProducer() throws Exception
});

// cleanup.
+ taskToClearInjection.run();
cleanupTopics(() -> {
admin1.topics().delete(topicName);
admin2.topics().delete(topicName);
@@ -537,7 +546,7 @@ public void testConcurrencyOfUnloadBundleAndRecreateProducer2() throws Exception
// If the retry counter is larger than 6, the next producer creation will be slow enough for the Replicator to close.
final AtomicInteger createProducerCounter = new AtomicInteger();
final int failTimes = 6;
- injectMockReplicatorProducerBuilder((producerCnf, originalProducer) -> {
+ Runnable taskToClearInjection = injectMockReplicatorProducerBuilder((producerCnf, originalProducer) -> {
if (topicName.equals(producerCnf.getTopicName())) {
// There is a switch that determines whether producer creation succeeds or not.
if (createProducerCounter.incrementAndGet() > failTimes) {
@@ -599,6 +608,7 @@ public void testConcurrencyOfUnloadBundleAndRecreateProducer2() throws Exception
});

// cleanup.
+ taskToClearInjection.run();
cleanupTopics(namespaceName, () -> {
admin1.topics().delete(topicName);
admin2.topics().delete(topicName);
@@ -619,8 +629,6 @@ public void testUnFenceTopicToReuse() throws Exception {
final String mockProducerName = UUID.randomUUID().toString();
final org.apache.pulsar.broker.service.Producer mockProducer =
mock(org.apache.pulsar.broker.service.Producer.class);
- doAnswer(invocation -> CompletableFuture.failedFuture(new RuntimeException("mocked error")))
- .when(mockProducer).disconnect(any());
doAnswer(invocation -> CompletableFuture.failedFuture(new RuntimeException("mocked error")))
.when(mockProducer).disconnect();
PersistentTopic persistentTopic =
@@ -631,7 +639,7 @@
GeoPersistentReplicator replicator1 =
(GeoPersistentReplicator) persistentTopic.getReplicators().values().iterator().next();
try {
- persistentTopic.close(true, false).join();
+ persistentTopic.close(false).join();
fail("Expected close fails due to a producer close fails");
} catch (Exception ex) {
log.info("Expected error: {}", ex.getMessage());
@@ -650,8 +658,9 @@
assertTrue(replicator2.producer != null && replicator2.producer.isConnected());
});

- // cleanup.
+ // cleanup the injection.
persistentTopic.getProducers().remove(mockProducerName, mockProducer);
+ // cleanup.
producer1.close();
cleanupTopics(() -> {
admin1.topics().delete(topicName);