Skip to content

Commit

Permalink
[fix][broker] The topic might reference a closed ledger (apache#22860)
Browse files Browse the repository at this point in the history
  • Loading branch information
shibd authored Jun 13, 2024
1 parent 75d7e55 commit a91a172
Show file tree
Hide file tree
Showing 4 changed files with 151 additions and 87 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1964,6 +1964,11 @@ protected BrokerService newBrokerService(PulsarService pulsar) throws Exception
return new BrokerService(pulsar, ioEventLoopGroup);
}

@VisibleForTesting
public void setTransactionExecutorProvider(TransactionBufferProvider transactionBufferProvider) {
this.transactionBufferProvider = transactionBufferProvider;
}

private CompactionServiceFactory loadCompactionServiceFactory() {
String compactionServiceFactoryClassName = config.getCompactionServiceFactoryClassName();
var compactionServiceFactory =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1001,38 +1001,38 @@ public CompletableFuture<Optional<Topic>> getTopic(final String topic, boolean c
return getTopic(TopicName.get(topic), createIfMissing, properties);
}

/**
* Retrieves or creates a topic based on the specified parameters.
* 0. If disable PersistentTopics or NonPersistentTopics, it will return a failed future with NotAllowedException.
* 1. If topic future exists in the cache returned directly regardless of whether it fails or timeout.
* 2. If the topic metadata exists, the topic is created regardless of {@code createIfMissing}.
* 3. If the topic metadata not exists, and {@code createIfMissing} is false,
* returns an empty Optional in a CompletableFuture. And this empty future not be added to the map.
* 4. Otherwise, use computeIfAbsent. It returns the existing topic or creates and adds a new topicFuture.
* Any exceptions will remove the topicFuture from the map.
*
* @param topicName The name of the topic, potentially including partition information.
* @param createIfMissing If true, creates the topic if it does not exist.
* @param properties Topic configuration properties used during creation.
* @return CompletableFuture with an Optional of the topic if found or created, otherwise empty.
*/
public CompletableFuture<Optional<Topic>> getTopic(final TopicName topicName, boolean createIfMissing,
Map<String, String> properties) {
try {
CompletableFuture<Optional<Topic>> topicFuture = topics.get(topicName.toString());
if (topicFuture != null) {
if (topicFuture.isCompletedExceptionally()
|| (topicFuture.isDone() && !topicFuture.getNow(Optional.empty()).isPresent())) {
// Exceptional topics should be recreated.
topics.remove(topicName.toString(), topicFuture);
} else {
// a non-existing topic in the cache shouldn't prevent creating a topic
if (createIfMissing) {
if (topicFuture.isDone() && topicFuture.getNow(Optional.empty()).isPresent()) {
return topicFuture;
} else {
return topicFuture.thenCompose(value -> {
if (!value.isPresent()) {
// retry and create topic
return getTopic(topicName, createIfMissing, properties);
} else {
// in-progress future completed successfully
return CompletableFuture.completedFuture(value);
}
});
}
} else {
return topicFuture;
}
}
// If topic future exists in the cache returned directly regardless of whether it fails or timeout.
CompletableFuture<Optional<Topic>> tp = topics.get(topicName.toString());
if (tp != null) {
return tp;
}
final boolean isPersistentTopic = topicName.getDomain().equals(TopicDomain.persistent);
if (isPersistentTopic) {
if (!pulsar.getConfiguration().isEnablePersistentTopics()) {
if (log.isDebugEnabled()) {
log.debug("Broker is unable to load persistent topic {}", topicName);
}
return FutureUtil.failedFuture(new NotAllowedException(
"Broker is unable to load persistent topic"));
}
return pulsar.getPulsarResources().getTopicResources().persistentTopicExists(topicName)
.thenCompose(exists -> {
if (!exists && !createIfMissing) {
Expand All @@ -1047,44 +1047,48 @@ public CompletableFuture<Optional<Topic>> getTopic(final TopicName topicName, bo
throw FutureUtil.wrapToCompletionException(new ServiceUnitNotReadyException(errorInfo));
}).thenCompose(optionalTopicPolicies -> {
final TopicPolicies topicPolicies = optionalTopicPolicies.orElse(null);
return topics.computeIfAbsent(topicName.toString(), (tpName) -> {
if (topicName.isPartitioned()) {
final TopicName topicNameEntity = TopicName.get(topicName.getPartitionedTopicName());
return fetchPartitionedTopicMetadataAsync(topicNameEntity)
.thenCompose((metadata) -> {
// Allow crate non-partitioned persistent topic that name includes
// `partition`
if (metadata.partitions == 0
|| topicName.getPartitionIndex() < metadata.partitions) {
return loadOrCreatePersistentTopic(tpName, createIfMissing,
properties, topicPolicies);
}
if (topicName.isPartitioned()) {
final TopicName topicNameEntity = TopicName.get(topicName.getPartitionedTopicName());
return fetchPartitionedTopicMetadataAsync(topicNameEntity)
.thenCompose((metadata) -> {
// Allow crate non-partitioned persistent topic that name includes
// `partition`
if (metadata.partitions == 0
|| topicName.getPartitionIndex() < metadata.partitions) {
return topics.computeIfAbsent(topicName.toString(), (tpName) ->
loadOrCreatePersistentTopic(tpName,
createIfMissing, properties, topicPolicies));
} else {
final String errorMsg =
String.format("Illegal topic partition name %s with max allowed "
+ "%d partitions", topicName, metadata.partitions);
log.warn(errorMsg);
return FutureUtil.failedFuture(
new BrokerServiceException.NotAllowedException(errorMsg));
});
}
return loadOrCreatePersistentTopic(tpName, createIfMissing, properties, topicPolicies);
}).thenCompose(optionalTopic -> {
if (!optionalTopic.isPresent() && createIfMissing) {
log.warn("[{}] Try to recreate the topic with createIfMissing=true "
+ "but the returned topic is empty", topicName);
return getTopic(topicName, createIfMissing, properties);
}
return CompletableFuture.completedFuture(optionalTopic);
});
}
});
} else {
return topics.computeIfAbsent(topicName.toString(), (tpName) ->
loadOrCreatePersistentTopic(tpName, createIfMissing, properties, topicPolicies));
}
});
});
} else {
return topics.computeIfAbsent(topicName.toString(), (name) -> {
if (!pulsar.getConfiguration().isEnableNonPersistentTopics()) {
if (log.isDebugEnabled()) {
log.debug("Broker is unable to load non-persistent topic {}", topicName);
}
return FutureUtil.failedFuture(new NotAllowedException(
"Broker is unable to load persistent topic"));
}
if (!topics.containsKey(topicName.toString())) {
topicEventsDispatcher.notify(topicName.toString(), TopicEvent.LOAD, EventStage.BEFORE);
if (topicName.isPartitioned()) {
final TopicName partitionedTopicName = TopicName.get(topicName.getPartitionedTopicName());
return this.fetchPartitionedTopicMetadataAsync(partitionedTopicName).thenCompose((metadata) -> {
if (topicName.getPartitionIndex() < metadata.partitions) {
}
if (topicName.isPartitioned()) {
final TopicName partitionedTopicName = TopicName.get(topicName.getPartitionedTopicName());
return this.fetchPartitionedTopicMetadataAsync(partitionedTopicName).thenCompose((metadata) -> {
if (topicName.getPartitionIndex() < metadata.partitions) {
return topics.computeIfAbsent(topicName.toString(), (name) -> {
topicEventsDispatcher
.notify(topicName.toString(), TopicEvent.CREATE, EventStage.BEFORE);

Expand All @@ -1095,11 +1099,13 @@ public CompletableFuture<Optional<Topic>> getTopic(final TopicName topicName, bo
topicEventsDispatcher
.notifyOnCompletion(eventFuture, topicName.toString(), TopicEvent.LOAD);
return res;
}
topicEventsDispatcher.notify(topicName.toString(), TopicEvent.LOAD, EventStage.FAILURE);
return CompletableFuture.completedFuture(Optional.empty());
});
} else if (createIfMissing) {
});
}
topicEventsDispatcher.notify(topicName.toString(), TopicEvent.LOAD, EventStage.FAILURE);
return CompletableFuture.completedFuture(Optional.empty());
});
} else if (createIfMissing) {
return topics.computeIfAbsent(topicName.toString(), (name) -> {
topicEventsDispatcher.notify(topicName.toString(), TopicEvent.CREATE, EventStage.BEFORE);

CompletableFuture<Optional<Topic>> res = createNonPersistentTopic(name);
Expand All @@ -1109,11 +1115,15 @@ public CompletableFuture<Optional<Topic>> getTopic(final TopicName topicName, bo
topicEventsDispatcher
.notifyOnCompletion(eventFuture, topicName.toString(), TopicEvent.LOAD);
return res;
} else {
});
} else {
CompletableFuture<Optional<Topic>> topicFuture = topics.get(topicName.toString());
if (topicFuture == null) {
topicEventsDispatcher.notify(topicName.toString(), TopicEvent.LOAD, EventStage.FAILURE);
return CompletableFuture.completedFuture(Optional.empty());
topicFuture = CompletableFuture.completedFuture(Optional.empty());
}
});
return topicFuture;
}
}
} catch (IllegalArgumentException e) {
log.warn("[{}] Illegalargument exception when loading topic", topicName, e);
Expand Down Expand Up @@ -1252,15 +1262,9 @@ private CompletableFuture<Optional<Topic>> createNonPersistentTopic(String topic
CompletableFuture<Optional<Topic>> topicFuture = new CompletableFuture<>();
topicFuture.exceptionally(t -> {
pulsarStats.recordTopicLoadFailed();
pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture));
return null;
});
if (!pulsar.getConfiguration().isEnableNonPersistentTopics()) {
if (log.isDebugEnabled()) {
log.debug("Broker is unable to load non-persistent topic {}", topic);
}
return FutureUtil.failedFuture(
new NotAllowedException("Broker is not unable to load non-persistent topic"));
}
final long topicCreateTimeMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
NonPersistentTopic nonPersistentTopic;
try {
Expand All @@ -1283,7 +1287,6 @@ private CompletableFuture<Optional<Topic>> createNonPersistentTopic(String topic
}).exceptionally(ex -> {
log.warn("Replication check failed. Removing topic from topics list {}, {}", topic, ex.getCause());
nonPersistentTopic.stopReplProducers().whenComplete((v, exception) -> {
pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture));
topicFuture.completeExceptionally(ex);
});
return null;
Expand Down Expand Up @@ -1534,14 +1537,6 @@ protected CompletableFuture<Optional<Topic>> loadOrCreatePersistentTopic(final S
final CompletableFuture<Optional<Topic>> topicFuture = FutureUtil.createFutureWithTimeout(
Duration.ofSeconds(pulsar.getConfiguration().getTopicLoadTimeoutSeconds()), executor(),
() -> FAILED_TO_LOAD_TOPIC_TIMEOUT_EXCEPTION);
if (!pulsar.getConfiguration().isEnablePersistentTopics()) {
if (log.isDebugEnabled()) {
log.debug("Broker is unable to load persistent topic {}", topic);
}
topicFuture.completeExceptionally(new NotAllowedException(
"Broker is unable to load persistent topic"));
return topicFuture;
}

checkTopicNsOwnership(topic)
.thenRun(() -> {
Expand All @@ -1556,6 +1551,7 @@ protected CompletableFuture<Optional<Topic>> loadOrCreatePersistentTopic(final S
// do not recreate topic if topic is already migrated and deleted by broker
// so, avoid creating a new topic if migration is already started
if (ex != null && (ex.getCause() instanceof TopicMigratedException)) {
pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture));
topicFuture.completeExceptionally(ex.getCause());
return null;
}
Expand All @@ -1570,6 +1566,7 @@ protected CompletableFuture<Optional<Topic>> loadOrCreatePersistentTopic(final S
}
}
}).exceptionally(ex -> {
pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture));
topicFuture.completeExceptionally(ex.getCause());
return null;
});
Expand Down Expand Up @@ -1744,6 +1741,7 @@ public void openLedgerComplete(ManagedLedger ledger, Object ctx) {
+ " topic", topic, FutureUtil.getException(topicFuture));
executor().submit(() -> {
persistentTopic.close().whenComplete((ignore, ex) -> {
topics.remove(topic, topicFuture);
if (ex != null) {
log.warn("[{}] Get an error when closing topic.",
topic, ex);
Expand All @@ -1760,6 +1758,7 @@ public void openLedgerComplete(ManagedLedger ledger, Object ctx) {
+ " Removing topic from topics list {}, {}", topic, ex);
executor().submit(() -> {
persistentTopic.close().whenComplete((ignore, closeEx) -> {
topics.remove(topic, topicFuture);
if (closeEx != null) {
log.warn("[{}] Get an error when closing topic.",
topic, closeEx);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,10 @@
package org.apache.pulsar.broker.service;

import static org.apache.pulsar.broker.BrokerTestUtil.newUniqueName;
import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.retryStrategically;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.spy;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
Expand Down Expand Up @@ -1434,13 +1432,6 @@ public void testCleanupTopic() throws Exception {
// Ok
}

final CompletableFuture<Optional<Topic>> timedOutTopicFuture = topicFuture;
// timeout topic future should be removed from cache
retryStrategically((test) -> pulsar1.getBrokerService().getTopic(topicName, false) != timedOutTopicFuture, 5,
1000);

assertNotEquals(timedOutTopicFuture, pulsar1.getBrokerService().getTopics().get(topicName));

try {
Consumer<byte[]> consumer = client1.newConsumer().topic(topicName).subscriptionType(SubscriptionType.Shared)
.subscriptionName("my-subscriber-name").subscribeAsync().get(100, TimeUnit.MILLISECONDS);
Expand All @@ -1452,6 +1443,7 @@ public void testCleanupTopic() throws Exception {
ManagedLedgerImpl ml = (ManagedLedgerImpl) mlFactory.open(topicMlName + "-2");
mlFuture.complete(ml);

// Re-create topic will success.
Consumer<byte[]> consumer = client1.newConsumer().topic(topicName).subscriptionName("my-subscriber-name")
.subscriptionType(SubscriptionType.Shared).subscribeAsync()
.get(2 * topicLoadTimeoutSeconds, TimeUnit.SECONDS);
Expand Down
Loading

0 comments on commit a91a172

Please sign in to comment.