[fix][broker] One topic can be closed multiple times concurrently #17524

Merged: 7 commits, Apr 29, 2024
@@ -49,6 +49,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
@@ -273,6 +274,8 @@ protected TopicStatsHelper initialValue() {
@Getter
private final ExecutorService orderedExecutor;

private volatile CloseFutures closeFutures;

@Getter
private final PersistentTopicMetrics persistentTopicMetrics = new PersistentTopicMetrics();

@@ -296,6 +299,50 @@ private static class EstimateTimeBasedBacklogQuotaCheckResult {
Long estimatedOldestUnacknowledgedMessageTimestamp;
}

/***
 * We use three futures to prevent a new close from starting while there is an in-progress deletion or close;
 * Pulsar returns the in-progress future when close is called a second time.
 *
 * Topic close is triggered in the following scenarios:
 * 1. Calling "pulsar-admin topics unload". Relates to {@link CloseFutures#waitDisconnectClients}.
 * 2. Namespace bundle transfer or unloading.
 *   a. A topic unload triggered by unloading namespace bundles does not wait for clients to disconnect. Relates
 *     to {@link CloseFutures#notWaitDisconnectClients}.
 *   b. A topic unload triggered by unloading namespace bundles is separated into two steps when using
 *     {@link ExtensibleLoadManagerImpl}.
 *     b-1. Step 1: fence the topic on the original broker without triggering client reconnections. Relates
 *       to {@link CloseFutures#transferring}. This step is a half close.
 *     b-2. Step 2: send the owner broker information to clients and disconnect them. Relates
 *       to {@link CloseFutures#notWaitDisconnectClients}.
 *
 * The three futures are set according to the following rules:
 * Event: Topic close.
 * - If the first close is called as "close and do not disconnect clients":
 *   - {@link CloseFutures#transferring} is initialized as "close and do not disconnect clients".
 *   - {@link CloseFutures#waitDisconnectClients} and {@link CloseFutures#notWaitDisconnectClients} stay empty,
 *     so a second close starts a new close after {@link CloseFutures#transferring} completes.
 * - If the first close is called as "close and do not wait for clients to disconnect":
 *   - {@link CloseFutures#waitDisconnectClients} is initialized as "waiting for clients to disconnect".
 *   - {@link CloseFutures#notWaitDisconnectClients} and {@link CloseFutures#transferring} are
 *     initialized as "not waiting for clients to disconnect".
 * - If the first close is called as "close and wait for clients to disconnect", all three futures are
 *   initialized as "waiting for clients to disconnect".
 * Event: Topic delete.
 * - All three futures are initialized as "waiting for clients to disconnect".
 */
private class CloseFutures {
private final CompletableFuture<Void> transferring;
private final CompletableFuture<Void> notWaitDisconnectClients;
private final CompletableFuture<Void> waitDisconnectClients;

public CloseFutures(CompletableFuture<Void> transferring, CompletableFuture<Void> waitDisconnectClients,
CompletableFuture<Void> notWaitDisconnectClients) {
this.transferring = transferring;
this.waitDisconnectClients = waitDisconnectClients;
this.notWaitDisconnectClients = notWaitDisconnectClients;
}
}
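
For orientation, here is a minimal, self-contained sketch of the deduplication pattern this class enables (hypothetical names, not the PR's code): the first caller installs a future under a lock, and later callers get the in-progress future back instead of starting a second close.

import java.util.concurrent.CompletableFuture;

class CloseDedupSketch {
    private final Object mutex = new Object();
    private volatile CompletableFuture<Void> closeFuture;

    CompletableFuture<Void> close() {
        synchronized (mutex) {
            // Return the in-progress close instead of starting a new one.
            if (closeFuture != null) {
                return closeFuture;
            }
            closeFuture = new CompletableFuture<>();
        }
        doCloseAsync().whenComplete((ignore, ex) -> {
            if (ex != null) {
                closeFuture.completeExceptionally(ex);
            } else {
                closeFuture.complete(null);
            }
        });
        return closeFuture;
    }

    private CompletableFuture<Void> doCloseAsync() {
        return CompletableFuture.completedFuture(null); // placeholder for the real close work
    }
}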

private static class TopicStatsHelper {
public double averageMsgSize;
public double aggMsgRateIn;
@@ -1414,8 +1461,11 @@ private CompletableFuture<Void> delete(boolean failIfHasSubscriptions,
}

fenceTopicToCloseOrDelete(); // Avoid client reconnections while deleting
// Mark the close as in progress to prevent concurrent close calls.
this.closeFutures =
new CloseFutures(new CompletableFuture(), new CompletableFuture(), new CompletableFuture());

return getBrokerService().getPulsar().getPulsarResources().getNamespaceResources()
CompletableFuture<Void> res = getBrokerService().getPulsar().getPulsarResources().getNamespaceResources()
.getPartitionedTopicResources().runWithMarkDeleteAsync(TopicName.get(topic), () -> {
CompletableFuture<Void> deleteFuture = new CompletableFuture<>();

@@ -1517,6 +1567,11 @@ public void deleteLedgerComplete(Object ctx) {
unfenceTopicToResume();
}
});

FutureUtil.completeAfter(closeFutures.transferring, res);
FutureUtil.completeAfter(closeFutures.notWaitDisconnectClients, res);
FutureUtil.completeAfter(closeFutures.waitDisconnectClients, res);
return res;
} finally {
lock.writeLock().unlock();
}
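
The code above relies on FutureUtil.completeAfter and FutureUtil.completeAfterAll to chain the cached close futures to the actual result. Assuming these helpers simply propagate completion from a source future to a target (an assumption; see FutureUtil in pulsar-common for the exact semantics), they behave roughly like this sketch:

import java.util.concurrent.CompletableFuture;

// Assumed semantics only, not the actual FutureUtil code: "dest"
// completes with the same result or failure as "src".
static <T> void completeAfter(CompletableFuture<T> dest, CompletableFuture<? extends T> src) {
    src.whenComplete((value, error) -> {
        if (error != null) {
            dest.completeExceptionally(error);
        } else {
            dest.complete(value);
        }
    });
}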
@@ -1532,6 +1587,12 @@ public CompletableFuture<Void> close(boolean closeWithoutWaitingClientDisconnect
return close(true, closeWithoutWaitingClientDisconnect);
}

private enum CloseTypes {
transferring,
notWaitDisconnectClients,
waitDisconnectClients;
}
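
The two boolean parameters of close(...) map onto these three types; the selection logic shown further down is equivalent to this small sketch:

// Sketch restating the close-type selection from the two flags.
static CloseTypes chooseCloseType(boolean disconnectClients, boolean closeWithoutWaitingClientDisconnect) {
    if (!disconnectClients) {
        return CloseTypes.transferring;             // keep clients attached during transfer
    } else if (closeWithoutWaitingClientDisconnect) {
        return CloseTypes.notWaitDisconnectClients; // disconnect, but do not wait for it
    } else {
        return CloseTypes.waitDisconnectClients;    // full close, wait for clients to disconnect
    }
}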

/**
* Close this topic - close all producers and subscriptions associated with this topic.
*
@@ -1542,32 +1603,57 @@ public CompletableFuture<Void> close(boolean closeWithoutWaitingClientDisconnect
@Override
public CompletableFuture<Void> close(
boolean disconnectClients, boolean closeWithoutWaitingClientDisconnect) {
CompletableFuture<Void> closeFuture = new CompletableFuture<>();

lock.writeLock().lock();
try {
if (!disconnectClients) {
transferring = true;
}
// Choose the close type.
CloseTypes closeType;
if (!disconnectClients) {
closeType = CloseTypes.transferring;
} else if (closeWithoutWaitingClientDisconnect) {
closeType = CloseTypes.notWaitDisconnectClients;
} else {
// Closing the managed ledger waits until all producers/consumers/replicators are closed. Sometimes the
// broker wants to forcefully close the managed ledger without waiting for all resources to be closed.
if (!isClosingOrDeleting || closeWithoutWaitingClientDisconnect) {
fenceTopicToCloseOrDelete();
closeType = CloseTypes.waitDisconnectClients;
}
/** There may be an in-progress half-close task; see section 2-b-1 of {@link CloseFutures}. **/
CompletableFuture<Void> inProgressTransferCloseTask = null;
try {
// Return the in-progress future if one exists.
if (isClosingOrDeleting) {
if (closeType == CloseTypes.transferring) {
return closeFutures.transferring;
}
if (closeType == CloseTypes.notWaitDisconnectClients && closeFutures.notWaitDisconnectClients != null) {
return closeFutures.notWaitDisconnectClients;
}
if (closeType == CloseTypes.waitDisconnectClients && closeFutures.waitDisconnectClients != null) {
return closeFutures.waitDisconnectClients;
}
if (transferring) {
inProgressTransferCloseTask = closeFutures.transferring;
}
}
fenceTopicToCloseOrDelete();
if (closeType == CloseTypes.transferring) {
transferring = true;
this.closeFutures = new CloseFutures(new CompletableFuture(), null, null);
} else {
log.warn("[{}] Topic is already being closed or deleted", topic);
closeFuture.completeExceptionally(new TopicFencedException("Topic is already fenced"));
return closeFuture;
this.closeFutures =
new CloseFutures(new CompletableFuture(), new CompletableFuture(), new CompletableFuture());
}
} finally {
lock.writeLock().unlock();
}

List<CompletableFuture<Void>> futures = new ArrayList<>();
if (inProgressTransferCloseTask != null) {
futures.add(inProgressTransferCloseTask);
}

futures.add(transactionBuffer.closeAsync());
replicators.forEach((cluster, replicator) -> futures.add(replicator.terminate()));
shadowReplicators.forEach((__, replicator) -> futures.add(replicator.terminate()));
if (disconnectClients) {
if (closeType != CloseTypes.transferring) {
futures.add(ExtensibleLoadManagerImpl.getAssignedBrokerLookupData(
brokerService.getPulsar(), topic).thenAccept(lookupData -> {
producers.values().forEach(producer -> futures.add(producer.disconnect(lookupData)));
@@ -1605,40 +1691,79 @@ public CompletableFuture<Void> close(
}
}

CompletableFuture<Void> clientCloseFuture = closeWithoutWaitingClientDisconnect
? CompletableFuture.completedFuture(null)
: FutureUtil.waitForAll(futures);
CompletableFuture<Void> disconnectClientsInCurrentCall = null;
// Note: "disconnectClientsToCache" holds a nullable value; it is null when the close type is transferring.
AtomicReference<CompletableFuture<Void>> disconnectClientsToCache = new AtomicReference<>();
switch (closeType) {
case transferring -> {
disconnectClientsInCurrentCall = FutureUtil.waitForAll(futures);
break;
}
case notWaitDisconnectClients -> {
disconnectClientsInCurrentCall = CompletableFuture.completedFuture(null);
disconnectClientsToCache.set(FutureUtil.waitForAll(futures));
break;
}
case waitDisconnectClients -> {
disconnectClientsInCurrentCall = FutureUtil.waitForAll(futures);
disconnectClientsToCache.set(disconnectClientsInCurrentCall);
}
}

clientCloseFuture.thenRun(() -> {
// After having disconnected all producers/consumers, close the managed ledger
ledger.asyncClose(new CloseCallback() {
@Override
public void closeComplete(Object ctx) {
if (disconnectClients) {
// Everything is now closed, remove the topic from map
disposeTopic(closeFuture);
} else {
closeFuture.complete(null);
}
CompletableFuture<Void> closeFuture = new CompletableFuture<>();
Runnable closeLedgerAfterCloseClients = (() -> ledger.asyncClose(new CloseCallback() {
@Override
public void closeComplete(Object ctx) {
if (closeType != CloseTypes.transferring) {
// Everything is now closed, remove the topic from map
disposeTopic(closeFuture);
} else {
closeFuture.complete(null);
}
}

@Override
public void closeFailed(ManagedLedgerException exception, Object ctx) {
log.error("[{}] Failed to close managed ledger, proceeding anyway.", topic, exception);
if (disconnectClients) {
disposeTopic(closeFuture);
} else {
closeFuture.complete(null);
}
@Override
public void closeFailed(ManagedLedgerException exception, Object ctx) {
log.error("[{}] Failed to close managed ledger, proceeding anyway.", topic, exception);
if (closeType != CloseTypes.transferring) {
disposeTopic(closeFuture);
} else {
closeFuture.complete(null);
}
}, null);
}).exceptionally(exception -> {
}
}, null));

disconnectClientsInCurrentCall.thenRun(closeLedgerAfterCloseClients).exceptionally(exception -> {
log.error("[{}] Error closing topic", topic, exception);
unfenceTopicToResume();
closeFuture.completeExceptionally(exception);
return null;
});

switch (closeType) {
case transferring -> {
FutureUtil.completeAfterAll(closeFutures.transferring, closeFuture);
break;
}
case notWaitDisconnectClients -> {
FutureUtil.completeAfterAll(closeFutures.transferring, closeFuture);
FutureUtil.completeAfter(closeFutures.notWaitDisconnectClients, closeFuture);
FutureUtil.completeAfterAll(closeFutures.waitDisconnectClients,
closeFuture.thenCompose(ignore -> disconnectClientsToCache.get().exceptionally(ex -> {
// Since the managed ledger has been closed, swallow any client disconnection error.
log.error("[{}] Closed managed ledger, but disconnect clients failed,"
+ " this topic will be marked closed", topic, ex);
return null;
})));
break;
}
case waitDisconnectClients -> {
FutureUtil.completeAfterAll(closeFutures.transferring, closeFuture);
FutureUtil.completeAfter(closeFutures.notWaitDisconnectClients, closeFuture);
FutureUtil.completeAfterAll(closeFutures.waitDisconnectClients, closeFuture);
}
}

return closeFuture;
}
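
The practical effect is that concurrent callers now share one close operation. A hedged usage sketch (test-style; assumes a PersistentTopic reference named persistentTopic):

// Two concurrent close calls with identical arguments should resolve
// to the same in-progress operation instead of racing each other.
CompletableFuture<Void> first = persistentTopic.close(true, false);
CompletableFuture<Void> second = persistentTopic.close(true, false);
// The second call is expected to return the cached in-progress future,
// so both complete together once clients have disconnected.
first.join();
second.join();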

@@ -25,6 +25,7 @@
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import com.google.common.collect.Sets;
@@ -226,7 +227,7 @@ public void testTopicCloseWhenInternalProducerCloseErrorOnce() throws Exception
});
}

private void injectMockReplicatorProducerBuilder(
private Runnable injectMockReplicatorProducerBuilder(
BiFunction<ProducerConfigurationData, ProducerImpl, ProducerImpl> producerDecorator)
throws Exception {
String cluster2 = pulsar2.getConfig().getClusterName();
@@ -246,7 +247,8 @@ private void injectMockReplicatorProducerBuilder(
replicationClients = WhiteboxImpl.getInternalState(brokerService, "replicationClients");
PulsarClientImpl internalClient = (PulsarClientImpl) replicationClients.get(cluster2);
PulsarClient spyClient = spy(internalClient);
replicationClients.put(cluster2, spyClient);
assertTrue(replicationClients.remove(cluster2, internalClient));
assertNull(replicationClients.putIfAbsent(cluster2, spyClient));

// Inject producer decorator.
doAnswer(invocation -> {
@@ -275,6 +277,12 @@ private void injectMockReplicatorProducerBuilder(
}).when(spyProducerBuilder).createAsync();
return spyProducerBuilder;
}).when(spyClient).newProducer(any(Schema.class));

// Return a task that cleans up the injection.
return () -> {
assertTrue(replicationClients.remove(cluster2, spyClient));
assertNull(replicationClients.putIfAbsent(cluster2, internalClient));
};
}
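
The remove(key, expectedValue)/putIfAbsent pair used above (and in the returned cleanup task) makes the swap fail loudly if another thread replaced the entry in the meantime, instead of silently clobbering it. A minimal illustration of the pattern with placeholder values:

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

class SwapRestoreSketch {
    static void swapAndRestore() {
        ConcurrentMap<String, Object> clients = new ConcurrentHashMap<>();
        Object original = new Object();
        Object spy = new Object();
        clients.put("cluster2", original);
        // Swap in the spy: remove(key, expected) succeeds only while the
        // current mapping is still the original, so a concurrent
        // replacement is detected instead of silently overwritten.
        if (!clients.remove("cluster2", original)) {
            throw new IllegalStateException("entry was replaced concurrently");
        }
        clients.putIfAbsent("cluster2", spy);
        // ... exercise the code under test ...
        // Restore the original client with the same pattern in reverse.
        if (!clients.remove("cluster2", spy)) {
            throw new IllegalStateException("entry was replaced concurrently");
        }
        clients.putIfAbsent("cluster2", original);
    }
}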

private SpyCursor spyCursor(PersistentTopic persistentTopic, String cursorName) throws Exception {
@@ -368,7 +376,7 @@ public void testConcurrencyOfUnloadBundleAndRecreateProducer() throws Exception
// If the retry counter is larger than 6, the next creation will be slow enough to close Replicator.
final AtomicInteger createProducerCounter = new AtomicInteger();
final int failTimes = 6;
injectMockReplicatorProducerBuilder((producerCnf, originalProducer) -> {
Runnable taskToClearInjection = injectMockReplicatorProducerBuilder((producerCnf, originalProducer) -> {
if (topicName.equals(producerCnf.getTopicName())) {
// A switch that determines whether producer creation succeeds or fails.
if (createProducerCounter.incrementAndGet() > failTimes) {
@@ -427,6 +435,7 @@ public void testConcurrencyOfUnloadBundleAndRecreateProducer() throws Exception
});

// cleanup.
taskToClearInjection.run();
cleanupTopics(() -> {
admin1.topics().delete(topicName);
admin2.topics().delete(topicName);
@@ -531,7 +540,7 @@ public void testConcurrencyOfUnloadBundleAndRecreateProducer2() throws Exception
// If the retry counter is larger than 6, the next creation will be slow enough to close Replicator.
final AtomicInteger createProducerCounter = new AtomicInteger();
final int failTimes = 6;
injectMockReplicatorProducerBuilder((producerCnf, originalProducer) -> {
Runnable taskToClearInjection = injectMockReplicatorProducerBuilder((producerCnf, originalProducer) -> {
if (topicName.equals(producerCnf.getTopicName())) {
// A switch that determines whether producer creation succeeds or fails.
if (createProducerCounter.incrementAndGet() > failTimes) {
@@ -593,6 +602,7 @@ public void testConcurrencyOfUnloadBundleAndRecreateProducer2() throws Exception
});

// cleanup.
taskToClearInjection.run();
cleanupTopics(namespaceName, () -> {
admin1.topics().delete(topicName);
admin2.topics().delete(topicName);
@@ -644,8 +654,9 @@ public void testUnFenceTopicToReuse() throws Exception {
assertTrue(replicator2.producer != null && replicator2.producer.isConnected());
});

// cleanup.
// cleanup the injection.
persistentTopic.getProducers().remove(mockProducerName, mockProducer);
// cleanup.
producer1.close();
cleanupTopics(() -> {
admin1.topics().delete(topicName);