
Commit

[fix][broker] One topic can be closed multiple times concurrently (#17524)

(cherry picked from commit 93afd89)
poorbarcode committed May 7, 2024
1 parent 788124d commit 06a9fec
Showing 4 changed files with 291 additions and 44 deletions.
@@ -48,6 +48,8 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
@@ -270,6 +272,52 @@ protected TopicStatsHelper initialValue() {
@Getter
private final ExecutorService orderedExecutor;

private volatile CloseFutures closeFutures;

/***
 * We use three futures to prevent starting a new close while a deletion or another close is still in progress.
 * When close is called a second time, Pulsar returns the in-progress future instead of starting over.
 *
 * Topic closing is triggered in the following scenarios:
 * 1. Calling "pulsar-admin topics unload". Relates to {@link CloseFutures#waitDisconnectClients}.
 * 2. Namespace bundle transfer or unloading.
 *   a. A topic unload triggered by unloading namespace bundles does not wait for clients to disconnect. Relates
 *      to {@link CloseFutures#notWaitDisconnectClients}.
 *   b. A topic unload triggered by unloading namespace bundles is separated into two steps when using
 *      {@link ExtensibleLoadManagerImpl}.
 *      b-1. Step 1: fence the topic on the original broker and do not trigger client reconnections. Relates
 *           to {@link CloseFutures#transferring}. This step is a half close.
 *      b-2. Step 2: send the owner broker information to clients and disconnect them. Relates
 *           to {@link CloseFutures#notWaitDisconnectClients}.
 *
 * The three futures are set according to the following rules:
 * Event: topic close.
 * - If the first close is "close and do not disconnect clients":
 *   - {@link CloseFutures#transferring} is initialized as "close and do not disconnect clients".
 *   - {@link CloseFutures#waitDisconnectClients} and {@link CloseFutures#notWaitDisconnectClients} stay empty,
 *     so a second close starts a new close after {@link CloseFutures#transferring} completes.
 * - If the first close is "close and do not wait for clients to disconnect":
 *   - {@link CloseFutures#waitDisconnectClients} is initialized as "waiting for clients to disconnect".
 *   - {@link CloseFutures#notWaitDisconnectClients} and {@link CloseFutures#transferring} are
 *     initialized as "not waiting for clients to disconnect".
 * - If the first close is "close and wait for clients to disconnect", all three futures are
 *   initialized as "waiting for clients to disconnect".
 * Event: topic delete.
 * - All three futures are initialized as "waiting for clients to disconnect".
 */
private class CloseFutures {
private final CompletableFuture<Void> transferring;
private final CompletableFuture<Void> notWaitDisconnectClients;
private final CompletableFuture<Void> waitDisconnectClients;

public CloseFutures(CompletableFuture<Void> transferring, CompletableFuture<Void> waitDisconnectClients,
CompletableFuture<Void> notWaitDisconnectClients) {
this.transferring = transferring;
this.waitDisconnectClients = waitDisconnectClients;
this.notWaitDisconnectClients = notWaitDisconnectClients;
}
}
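The reuse-or-return decision described in the comment above can be shown outside of Pulsar with a small self-contained sketch (illustrative only; the class and method names below are invented, and only the caching of an in-progress close is reproduced, not the broker logic):

// Minimal standalone sketch of the "reuse the in-progress close" idea described above.
// It is not the broker implementation; it only shows how a second close() call can
// return the future created by the first call instead of starting a new close.
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

class CloseOnceExample {
    private volatile CompletableFuture<Void> inProgressClose;

    public synchronized CompletableFuture<Void> close() {
        if (inProgressClose != null) {
            // A close is already running: return the same future to the second caller.
            return inProgressClose;
        }
        inProgressClose = CompletableFuture.runAsync(() -> {
            // Simulate the work of disconnecting clients and closing the managed ledger.
            try {
                TimeUnit.MILLISECONDS.sleep(100);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        return inProgressClose;
    }

    public static void main(String[] args) {
        CloseOnceExample topic = new CloseOnceExample();
        CompletableFuture<Void> first = topic.close();
        CompletableFuture<Void> second = topic.close();
        // Both callers observe the same in-progress close.
        System.out.println(first == second); // true
    }
}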

private static class TopicStatsHelper {
public double averageMsgSize;
public double aggMsgRateIn;
@@ -1380,8 +1428,11 @@ private CompletableFuture<Void> delete(boolean failIfHasSubscriptions,
}

fenceTopicToCloseOrDelete(); // Avoid clients reconnections while deleting
// Mark the close as in progress to prevent concurrent close calls.
this.closeFutures =
new CloseFutures(new CompletableFuture(), new CompletableFuture(), new CompletableFuture());

return getBrokerService().getPulsar().getPulsarResources().getNamespaceResources()
CompletableFuture<Void> res = getBrokerService().getPulsar().getPulsarResources().getNamespaceResources()
.getPartitionedTopicResources().runWithMarkDeleteAsync(TopicName.get(topic), () -> {
CompletableFuture<Void> deleteFuture = new CompletableFuture<>();

@@ -1484,6 +1535,11 @@ public void deleteLedgerComplete(Object ctx) {
unfenceTopicToResume();
}
});

FutureUtil.completeAfter(closeFutures.transferring, res);
FutureUtil.completeAfter(closeFutures.notWaitDisconnectClients, res);
FutureUtil.completeAfter(closeFutures.waitDisconnectClients, res);
return res;
} finally {
lock.writeLock().unlock();
}
@@ -1499,6 +1555,12 @@ public CompletableFuture<Void> close(boolean closeWithoutWaitingClientDisconnect
return close(true, closeWithoutWaitingClientDisconnect);
}

private enum CloseTypes {
transferring,
notWaitDisconnectClients,
waitDisconnectClients;
}

/**
* Close this topic - close all producers and subscriptions associated with this topic.
*
@@ -1509,32 +1571,57 @@ public CompletableFuture<Void> close(boolean closeWithoutWaitingClientDisconnect
@Override
public CompletableFuture<Void> close(
boolean disconnectClients, boolean closeWithoutWaitingClientDisconnect) {
CompletableFuture<Void> closeFuture = new CompletableFuture<>();

lock.writeLock().lock();
try {
if (!disconnectClients) {
transferring = true;
}
// Choose the close type.
CloseTypes closeType;
if (!disconnectClients) {
closeType = CloseTypes.transferring;
} else if (closeWithoutWaitingClientDisconnect) {
closeType = CloseTypes.notWaitDisconnectClients;
} else {
// Closing the managed ledger waits until all producers/consumers/replicators get closed. Sometimes, the broker
// forcefully wants to close the managed ledger without waiting for all resources to be closed.
if (!isClosingOrDeleting || closeWithoutWaitingClientDisconnect) {
fenceTopicToCloseOrDelete();
closeType = CloseTypes.waitDisconnectClients;
}
/** There may be an in-progress half-close task; see section 2-b-1 of {@link CloseFutures}. **/
CompletableFuture<Void> inProgressTransferCloseTask = null;
try {
// Return the in-progress future if one exists.
if (isClosingOrDeleting) {
if (closeType == CloseTypes.transferring) {
return closeFutures.transferring;
}
if (closeType == CloseTypes.notWaitDisconnectClients && closeFutures.notWaitDisconnectClients != null) {
return closeFutures.notWaitDisconnectClients;
}
if (closeType == CloseTypes.waitDisconnectClients && closeFutures.waitDisconnectClients != null) {
return closeFutures.waitDisconnectClients;
}
if (transferring) {
inProgressTransferCloseTask = closeFutures.transferring;
}
}
fenceTopicToCloseOrDelete();
if (closeType == CloseTypes.transferring) {
transferring = true;
this.closeFutures = new CloseFutures(new CompletableFuture(), null, null);
} else {
log.warn("[{}] Topic is already being closed or deleted", topic);
closeFuture.completeExceptionally(new TopicFencedException("Topic is already fenced"));
return closeFuture;
this.closeFutures =
new CloseFutures(new CompletableFuture(), new CompletableFuture(), new CompletableFuture());
}
} finally {
lock.writeLock().unlock();
}

List<CompletableFuture<Void>> futures = new ArrayList<>();
if (inProgressTransferCloseTask != null) {
futures.add(inProgressTransferCloseTask);
}

futures.add(transactionBuffer.closeAsync());
replicators.forEach((cluster, replicator) -> futures.add(replicator.terminate()));
shadowReplicators.forEach((__, replicator) -> futures.add(replicator.terminate()));
if (disconnectClients) {
if (closeType != CloseTypes.transferring) {
futures.add(ExtensibleLoadManagerImpl.getAssignedBrokerLookupData(
brokerService.getPulsar(), topic).thenAccept(lookupData -> {
producers.values().forEach(producer -> futures.add(producer.disconnect(lookupData)));
@@ -1572,40 +1659,79 @@ public CompletableFuture<Void> close(
}
}

CompletableFuture<Void> clientCloseFuture = closeWithoutWaitingClientDisconnect
? CompletableFuture.completedFuture(null)
: FutureUtil.waitForAll(futures);
CompletableFuture<Void> disconnectClientsInCurrentCall = null;
// Note: "disconnectClientsToCache" is a non-able value, it is null when close type is transferring.
AtomicReference<CompletableFuture<Void>> disconnectClientsToCache = new AtomicReference<>();
switch (closeType) {
case transferring -> {
disconnectClientsInCurrentCall = FutureUtil.waitForAll(futures);
break;
}
case notWaitDisconnectClients -> {
disconnectClientsInCurrentCall = CompletableFuture.completedFuture(null);
disconnectClientsToCache.set(FutureUtil.waitForAll(futures));
break;
}
case waitDisconnectClients -> {
disconnectClientsInCurrentCall = FutureUtil.waitForAll(futures);
disconnectClientsToCache.set(disconnectClientsInCurrentCall);
}
}
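One subtlety of the switch above is the notWaitDisconnectClients case: the future returned to the caller completes without waiting, while the real disconnect work is cached so that a later "wait for clients to disconnect" close can still observe it. A self-contained illustration of that split (invented names, not the broker code):

// Illustrative sketch: why the disconnect future is cached for notWaitDisconnectClients.
// The caller of that close is released immediately, but a later "wait for clients to
// disconnect" close can still be chained onto the cached future.
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

class CachedDisconnectSketch {
    public static void main(String[] args) {
        AtomicReference<CompletableFuture<Void>> cachedDisconnect = new AtomicReference<>();

        // First close: "do not wait for clients to disconnect".
        CompletableFuture<Void> disconnectWork = new CompletableFuture<>();
        cachedDisconnect.set(disconnectWork);
        CompletableFuture<Void> firstClose = CompletableFuture.completedFuture(null);
        System.out.println(firstClose.isDone());   // true: the caller is not blocked

        // Second close: "wait for clients to disconnect" reuses the cached future.
        CompletableFuture<Void> secondClose = cachedDisconnect.get();
        System.out.println(secondClose.isDone());  // false: disconnects still running

        disconnectWork.complete(null);             // clients finally disconnected
        System.out.println(secondClose.isDone());  // true
    }
}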

clientCloseFuture.thenRun(() -> {
// After having disconnected all producers/consumers, close the managed ledger
ledger.asyncClose(new CloseCallback() {
@Override
public void closeComplete(Object ctx) {
if (disconnectClients) {
// Everything is now closed, remove the topic from map
disposeTopic(closeFuture);
} else {
closeFuture.complete(null);
}
CompletableFuture<Void> closeFuture = new CompletableFuture<>();
Runnable closeLedgerAfterCloseClients = (() -> ledger.asyncClose(new CloseCallback() {
@Override
public void closeComplete(Object ctx) {
if (closeType != CloseTypes.transferring) {
// Everything is now closed, remove the topic from map
disposeTopic(closeFuture);
} else {
closeFuture.complete(null);
}
}

@Override
public void closeFailed(ManagedLedgerException exception, Object ctx) {
log.error("[{}] Failed to close managed ledger, proceeding anyway.", topic, exception);
if (disconnectClients) {
disposeTopic(closeFuture);
} else {
closeFuture.complete(null);
}
@Override
public void closeFailed(ManagedLedgerException exception, Object ctx) {
log.error("[{}] Failed to close managed ledger, proceeding anyway.", topic, exception);
if (closeType != CloseTypes.transferring) {
disposeTopic(closeFuture);
} else {
closeFuture.complete(null);
}
}, null);
}).exceptionally(exception -> {
}
}, null));

disconnectClientsInCurrentCall.thenRun(closeLedgerAfterCloseClients).exceptionally(exception -> {
log.error("[{}] Error closing topic", topic, exception);
unfenceTopicToResume();
closeFuture.completeExceptionally(exception);
return null;
});

switch (closeType) {
case transferring -> {
FutureUtil.completeAfterAll(closeFutures.transferring, closeFuture);
break;
}
case notWaitDisconnectClients -> {
FutureUtil.completeAfterAll(closeFutures.transferring, closeFuture);
FutureUtil.completeAfter(closeFutures.notWaitDisconnectClients, closeFuture);
FutureUtil.completeAfterAll(closeFutures.waitDisconnectClients,
closeFuture.thenCompose(ignore -> disconnectClientsToCache.get().exceptionally(ex -> {
// Since the managed ledger has been closed, swallow the error from disconnecting clients.
log.error("[{}] Closed managed ledger, but disconnect clients failed,"
+ " this topic will be marked closed", topic, ex);
return null;
})));
break;
}
case waitDisconnectClients -> {
FutureUtil.completeAfterAll(closeFutures.transferring, closeFuture);
FutureUtil.completeAfter(closeFutures.notWaitDisconnectClients, closeFuture);
FutureUtil.completeAfterAll(closeFutures.waitDisconnectClients, closeFuture);
}
}

return closeFuture;
}
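The closeFutures wiring above depends on FutureUtil.completeAfter / completeAfterAll propagating the outcome of the current close into the cached futures. The helper below is a hypothetical re-implementation that only illustrates the presumed propagation semantics; Pulsar's actual FutureUtil may differ in details such as how multiple sources are combined:

// Hypothetical "complete target after source" helper, written only to illustrate the
// propagation pattern used with closeFutures above. Not Pulsar's FutureUtil code.
import java.util.concurrent.CompletableFuture;

final class CompleteAfterSketch {
    // Complete `target` with the same outcome (success or failure) as `source`.
    static void completeAfter(CompletableFuture<Void> target, CompletableFuture<Void> source) {
        source.whenComplete((ignore, error) -> {
            if (error != null) {
                target.completeExceptionally(error);
            } else {
                target.complete(null);
            }
        });
    }

    public static void main(String[] args) {
        CompletableFuture<Void> cached = new CompletableFuture<>();        // e.g. closeFutures.waitDisconnectClients
        CompletableFuture<Void> currentClose = new CompletableFuture<>();  // the close work of this call
        completeAfter(cached, currentClose);
        currentClose.complete(null);
        System.out.println(cached.isDone()); // true: callers waiting on the cached future are released
    }
}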

@@ -25,6 +25,7 @@
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import com.google.common.collect.Sets;
@@ -226,7 +227,7 @@ public void testTopicCloseWhenInternalProducerCloseErrorOnce() throws Exception
});
}

private void injectMockReplicatorProducerBuilder(
private Runnable injectMockReplicatorProducerBuilder(
BiFunction<ProducerConfigurationData, ProducerImpl, ProducerImpl> producerDecorator)
throws Exception {
String cluster2 = pulsar2.getConfig().getClusterName();
@@ -246,7 +247,8 @@ private void injectMockReplicatorProducerBuilder(
replicationClients = WhiteboxImpl.getInternalState(brokerService, "replicationClients");
PulsarClientImpl internalClient = (PulsarClientImpl) replicationClients.get(cluster2);
PulsarClient spyClient = spy(internalClient);
replicationClients.put(cluster2, spyClient);
assertTrue(replicationClients.remove(cluster2, internalClient));
assertNull(replicationClients.putIfAbsent(cluster2, spyClient));

// Inject producer decorator.
doAnswer(invocation -> {
@@ -275,6 +277,12 @@ private void injectMockReplicatorProducerBuilder(
}).when(spyProducerBuilder).createAsync();
return spyProducerBuilder;
}).when(spyClient).newProducer(any(Schema.class));

// Return a task that cleans up the injection.
return () -> {
assertTrue(replicationClients.remove(cluster2, spyClient));
assertNull(replicationClients.putIfAbsent(cluster2, internalClient));
};
}
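The injection above swaps the replication client for a spy using remove(key, expectedValue) / putIfAbsent and returns a Runnable that restores the original the same way, so the test fails loudly if the map entry changed concurrently. A minimal sketch of that guarded-swap pattern (invented names, not the test code):

// Sketch of the guarded swap: replace a map entry with a spy and return a task that
// restores the original, asserting that no one else touched the entry in between.
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

class GuardedSwapSketch {
    static Runnable swap(ConcurrentMap<String, Object> map, String key, Object original, Object spy) {
        if (!map.remove(key, original)) {
            throw new AssertionError("entry changed before injection");
        }
        if (map.putIfAbsent(key, spy) != null) {
            throw new AssertionError("entry re-created concurrently");
        }
        // The returned task undoes the injection with the same guarded checks.
        return () -> {
            if (!map.remove(key, spy) || map.putIfAbsent(key, original) != null) {
                throw new AssertionError("entry changed while the spy was installed");
            }
        };
    }

    public static void main(String[] args) {
        ConcurrentMap<String, Object> clients = new ConcurrentHashMap<>();
        Object real = new Object();
        Object spy = new Object();
        clients.put("cluster2", real);
        Runnable cleanup = swap(clients, "cluster2", real, spy);
        cleanup.run();
        System.out.println(clients.get("cluster2") == real); // true: original restored
    }
}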

private SpyCursor spyCursor(PersistentTopic persistentTopic, String cursorName) throws Exception {
@@ -368,7 +376,7 @@ public void testConcurrencyOfUnloadBundleAndRecreateProducer() throws Exception
// If the retry counter is larger than 6, the next creation will be slow enough to close Replicator.
final AtomicInteger createProducerCounter = new AtomicInteger();
final int failTimes = 6;
injectMockReplicatorProducerBuilder((producerCnf, originalProducer) -> {
Runnable taskToClearInjection = injectMockReplicatorProducerBuilder((producerCnf, originalProducer) -> {
if (topicName.equals(producerCnf.getTopicName())) {
// There is a switch that determines whether the producer creation succeeds or not.
if (createProducerCounter.incrementAndGet() > failTimes) {
@@ -427,6 +435,7 @@ public void testConcurrencyOfUnloadBundleAndRecreateProducer() throws Exception
});

// cleanup.
taskToClearInjection.run();
cleanupTopics(() -> {
admin1.topics().delete(topicName);
admin2.topics().delete(topicName);
@@ -531,7 +540,7 @@ public void testConcurrencyOfUnloadBundleAndRecreateProducer2() throws Exception
// If the retry counter is larger than 6, the next creation will be slow enough to close Replicator.
final AtomicInteger createProducerCounter = new AtomicInteger();
final int failTimes = 6;
injectMockReplicatorProducerBuilder((producerCnf, originalProducer) -> {
Runnable taskToClearInjection = injectMockReplicatorProducerBuilder((producerCnf, originalProducer) -> {
if (topicName.equals(producerCnf.getTopicName())) {
// There is a switch that determines whether the producer creation succeeds or not.
if (createProducerCounter.incrementAndGet() > failTimes) {
@@ -593,6 +602,7 @@ public void testConcurrencyOfUnloadBundleAndRecreateProducer2() throws Exception
});

// cleanup.
taskToClearInjection.run();
cleanupTopics(namespaceName, () -> {
admin1.topics().delete(topicName);
admin2.topics().delete(topicName);
@@ -644,8 +654,9 @@ public void testUnFenceTopicToReuse() throws Exception {
assertTrue(replicator2.producer != null && replicator2.producer.isConnected());
});

// cleanup.
// cleanup the injection.
persistentTopic.getProducers().remove(mockProducerName, mockProducer);
// cleanup.
producer1.close();
cleanupTopics(() -> {
admin1.topics().delete(topicName);