Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[feat][broker] PIP-157 Bucketing topic metadata to allow more topics per namespace #16901

Closed
wants to merge 8 commits into from
Original file line number Diff line number Diff line change
Expand Up @@ -102,20 +102,20 @@ private long getNumberOfEntries(Range<PositionImpl> range,

public PersistentOfflineTopicStats getEstimatedUnloadedTopicBacklog(ManagedLedgerFactoryImpl factory,
String managedLedgerName) throws Exception {
return estimateUnloadedTopicBacklog(factory, TopicName.get("persistent://" + managedLedgerName));
return estimateUnloadedTopicBacklog(factory, TopicName.get("persistent://" + managedLedgerName),
managedLedgerName);
}

public PersistentOfflineTopicStats estimateUnloadedTopicBacklog(ManagedLedgerFactoryImpl factory,
TopicName topicName) throws Exception {
String managedLedgerName = topicName.getPersistenceNamingEncoding();
TopicName topicName, String managedLedgerName) throws Exception {
long numberOfEntries = 0;
long totalSize = 0;
final NavigableMap<Long, MLDataFormats.ManagedLedgerInfo.LedgerInfo> ledgers = new ConcurrentSkipListMap<>();
final PersistentOfflineTopicStats offlineTopicStats = new PersistentOfflineTopicStats(managedLedgerName,
brokerName);

// calculate total managed ledger size and number of entries without loading the topic
readLedgerMeta(factory, topicName, ledgers);
readLedgerMeta(factory, topicName, ledgers, managedLedgerName);
for (MLDataFormats.ManagedLedgerInfo.LedgerInfo ls : ledgers.values()) {
numberOfEntries += ls.getEntries();
totalSize += ls.getSize();
Expand All @@ -130,15 +130,15 @@ public PersistentOfflineTopicStats estimateUnloadedTopicBacklog(ManagedLedgerFac
}

// calculate per cursor message backlog
calculateCursorBacklogs(factory, topicName, ledgers, offlineTopicStats);
calculateCursorBacklogs(factory, managedLedgerName, ledgers, offlineTopicStats);
offlineTopicStats.statGeneratedAt.setTime(System.currentTimeMillis());

return offlineTopicStats;
}

private void readLedgerMeta(final ManagedLedgerFactoryImpl factory, final TopicName topicName,
final NavigableMap<Long, MLDataFormats.ManagedLedgerInfo.LedgerInfo> ledgers) throws Exception {
String managedLedgerName = topicName.getPersistenceNamingEncoding();
final NavigableMap<Long, MLDataFormats.ManagedLedgerInfo.LedgerInfo> ledgers,
final String managedLedgerName) throws Exception {
MetaStore store = factory.getMetaStore();
BookKeeper bk = factory.getBookKeeper();
final CountDownLatch mlMetaCounter = new CountDownLatch(1);
Expand Down Expand Up @@ -208,14 +208,13 @@ public void operationFailed(ManagedLedgerException.MetaStoreException e) {
}
}

private void calculateCursorBacklogs(final ManagedLedgerFactoryImpl factory, final TopicName topicName,
private void calculateCursorBacklogs(final ManagedLedgerFactoryImpl factory, final String managedLedgerName,
final NavigableMap<Long, MLDataFormats.ManagedLedgerInfo.LedgerInfo> ledgers,
final PersistentOfflineTopicStats offlineTopicStats) throws Exception {

if (ledgers.isEmpty()) {
return;
}
String managedLedgerName = topicName.getPersistenceNamingEncoding();
MetaStore store = factory.getMetaStore();
BookKeeper bk = factory.getBookKeeper();
final CountDownLatch allCursorsCounter = new CountDownLatch(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -309,4 +309,10 @@ public CompletableFuture<Void> deleteBundleDataTenantAsync(String tenant) {
return getStore().deleteRecursive(tenantBundlePath);
}

public CompletableFuture<Integer> getBucketCountAsync(NamespaceName namespaceName) {
return getPoliciesAsync(namespaceName)
.thenApply(optPolicies -> optPolicies.map(policies -> policies.number_of_buckets).orElse(1));

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ public PulsarResources(MetadataStore localMetadataStore, MetadataStore configura
localPolicies = new LocalPoliciesResources(localMetadataStore, operationTimeoutSec);
loadReportResources = new LoadManagerReportResources(localMetadataStore, operationTimeoutSec);
bookieResources = new BookieResources(localMetadataStore, operationTimeoutSec);
topicResources = new TopicResources(localMetadataStore);
topicResources = new TopicResources(localMetadataStore, namespaceResources);
} else {
dynamicConfigResources = null;
localPolicies = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.metadata.api.MetadataStore;
import org.apache.pulsar.metadata.api.Notification;
import org.apache.pulsar.metadata.api.NotificationType;
Expand All @@ -41,22 +42,34 @@ public class TopicResources {
private static final String MANAGED_LEDGER_PATH = "/managed-ledgers";

private final MetadataStore store;
private final NamespaceResources namespaceResources;

private final Map<BiConsumer<String, NotificationType>, Pattern> topicListeners;

public TopicResources(MetadataStore store) {
public TopicResources(MetadataStore store, NamespaceResources namespaceResources) {
this.store = store;
this.namespaceResources = namespaceResources;
topicListeners = new ConcurrentHashMap<>();
store.registerListener(this::handleNotification);
}

public CompletableFuture<List<String>> listPersistentTopicsAsync(NamespaceName ns) {
String path = MANAGED_LEDGER_PATH + "/" + ns + "/persistent";

return store.getChildren(path).thenApply(children ->
children.stream().map(c -> TopicName.get(TopicDomain.persistent.toString(), ns, decode(c)).toString())
.collect(Collectors.toList())
);
CompletableFuture<List<String>> topics = getTopicsAtPath(path);
return topics.thenApply(children -> children.stream()
.map(name -> TopicName.get(TopicDomain.persistent.toString(), ns, decode(name)).toString())
.collect(Collectors.toList()));
}

private CompletableFuture<List<String>> getTopicsAtPath(String path) {
CompletableFuture<List<String>> childrenOfNamespace = store.getChildren(path);
CompletableFuture<List<String>> topics = childrenOfNamespace.thenCompose(children ->
!children.isEmpty() && children.get(0).startsWith(TopicName.BUCKET_ID_PERFIX)
? children.stream().map(bucket -> store.getChildren(path + "/" + bucket))
.collect(FutureUtil.toList())
: CompletableFuture.completedFuture(children));
Comment on lines +66 to +71
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks identical to L76-81, could they share a common helper method?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice catch. I did not realize this. I extracted the common part into a method.

return topics;
}

public CompletableFuture<List<String>> getExistingPartitions(TopicName topic) {
Expand All @@ -65,27 +78,34 @@ public CompletableFuture<List<String>> getExistingPartitions(TopicName topic) {

public CompletableFuture<List<String>> getExistingPartitions(NamespaceName ns, TopicDomain domain) {
String topicPartitionPath = MANAGED_LEDGER_PATH + "/" + ns + "/" + domain;
return store.getChildren(topicPartitionPath).thenApply(topics ->
topics.stream()
CompletableFuture<List<String>> topics = getTopicsAtPath(topicPartitionPath);
return topics.thenApply(localNames ->
localNames.stream()
.map(s -> String.format("%s://%s/%s", domain.value(), ns, decode(s)))
.collect(Collectors.toList())
);
}

public CompletableFuture<Void> deletePersistentTopicAsync(TopicName topic) {
String path = MANAGED_LEDGER_PATH + "/" + topic.getPersistenceNamingEncoding();
return store.delete(path, Optional.of(-1L));
return namespaceResources.getBucketCountAsync(topic.getNamespaceObject()).thenCompose(buckets -> {
String path = MANAGED_LEDGER_PATH + "/" + topic.getPersistenceNamingEncoding(buckets);
return store.delete(path, Optional.of(-1L));
});
}

public CompletableFuture<Void> createPersistentTopicAsync(TopicName topic) {
String path = MANAGED_LEDGER_PATH + "/" + topic.getPersistenceNamingEncoding();
return store.put(path, new byte[0], Optional.of(-1L))
.thenApply(__ -> null);
return namespaceResources.getBucketCountAsync(topic.getNamespaceObject()).thenCompose(buckets -> {
String path = MANAGED_LEDGER_PATH + "/" + topic.getPersistenceNamingEncoding(buckets);
return store.put(path, new byte[0], Optional.of(-1L))
.thenApply(__ -> null);
});
}

public CompletableFuture<Boolean> persistentTopicExists(TopicName topic) {
String path = MANAGED_LEDGER_PATH + "/" + topic.getPersistenceNamingEncoding();
return store.exists(path);
return namespaceResources.getBucketCountAsync(topic.getNamespaceObject()).thenCompose(buckets -> {
String path = MANAGED_LEDGER_PATH + "/" + topic.getPersistenceNamingEncoding(buckets);
return store.exists(path);
});
}

public CompletableFuture<Void> clearNamespacePersistence(NamespaceName ns) {
Expand Down Expand Up @@ -141,7 +161,8 @@ void handleNotification(Notification notification) {

Pattern namespaceNameToTopicNamePattern(NamespaceName namespaceName) {
return Pattern.compile(
MANAGED_LEDGER_PATH + "/(" + namespaceName + ")/(" + TopicDomain.persistent + ")/(" + "[^/]+)");
MANAGED_LEDGER_PATH + "/(" + namespaceName + ")/(" + TopicDomain.persistent
+ ")/(?:\\$[0-9]+/)?([^/]+)");
}

public void registerPersistentTopicListener(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,13 @@ public class TopicResourcesTest {

private MetadataStore metadataStore;
private TopicResources topicResources;
private NamespaceResources namespaceResources;

@BeforeMethod
public void setup() {
metadataStore = mock(MetadataStore.class);
topicResources = new TopicResources(metadataStore);
namespaceResources = mock(NamespaceResources.class);
topicResources = new TopicResources(metadataStore, namespaceResources);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,19 +166,35 @@ private static void deleteManagedLedgers(MetadataStore metadataStore, ManagedLed
final String tenantRoot = managedLedgersRoot + "/" + tenant;
metadataStore.getChildren(tenantRoot).join().forEach(namespace -> {
final String namespaceRoot = String.join("/", tenantRoot, namespace, "persistent");
metadataStore.getChildren(namespaceRoot).join().forEach(topic -> {
final TopicName topicName = TopicName.get(String.join("/", tenant, namespace, topic));
try {
managedLedgerFactory.delete(topicName.getPersistenceNamingEncoding());
} catch (InterruptedException | ManagedLedgerException e) {
log.error("Failed to delete ledgers of {}: {}", topicName, e);
throw new RuntimeException(e);
metadataStore.getChildren(namespaceRoot).join().forEach(namespaceChild -> {
if (namespaceChild.startsWith("$")) {
// "$" means this is a bucket and topics are children of the bucket
final String bucketRoot = String.join("/", namespaceRoot, namespaceChild);
metadataStore.getChildren(bucketRoot).join().stream().map(topic ->
TopicName.get(String.join("/", tenant, namespace, topic))).forEach(topicName ->{
deleteManagedLedger(managedLedgerFactory,
topicName.getPersistenceNamingEncoding(namespaceChild), topicName);
}
);
} else {
TopicName topicName = TopicName.get(String.join("/", tenant, namespace, namespaceChild));
deleteManagedLedger(managedLedgerFactory, topicName.getPersistenceNamingEncoding(1), topicName);
}
});
});
});
}

private static void deleteManagedLedger(ManagedLedgerFactory managedLedgerFactory, String name,
TopicName topicName) {
try {
managedLedgerFactory.delete(name);
} catch (InterruptedException | ManagedLedgerException e) {
log.error("Failed to delete ledgers of {}: {}", topicName, e);
throw new RuntimeException(e);
}
}

private static void deleteSchemaLedgers(MetadataStore metadataStore, BookKeeper bookKeeper) {
final String schemaLedgersRoot = "/schemas";
metadataStore.getChildren(schemaLedgersRoot).join().forEach(tenant -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1380,9 +1380,10 @@ protected void internalGetManagedLedgerInfo(AsyncResponse asyncResponse, boolean
}

protected void internalGetManagedLedgerInfoForNonPartitionedTopic(AsyncResponse asyncResponse) {
validateTopicOperationAsync(topicName, TopicOperation.GET_STATS)
pulsar().getPulsarResources().getNamespaceResources().getBucketCountAsync(namespaceName)
.thenCompose(buckets -> validateTopicOperationAsync(topicName, TopicOperation.GET_STATS)
.thenAccept(__ -> {
String managedLedger = topicName.getPersistenceNamingEncoding();
String managedLedger = topicName.getPersistenceNamingEncoding(buckets);
pulsar().getManagedLedgerFactory()
.asyncGetManagedLedgerInfo(managedLedger, new ManagedLedgerInfoCallback() {
@Override
Expand All @@ -1396,7 +1397,7 @@ public void getInfoFailed(ManagedLedgerException exception, Object ctx) {
asyncResponse.resume(exception);
}
}, null);
}).exceptionally(ex -> {
})).exceptionally(ex -> {
log.error("[{}] Failed to get managed info for {}", clientAppId(), topicName, ex);
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
Expand Down Expand Up @@ -3214,23 +3215,25 @@ protected CompletableFuture<PersistentOfflineTopicStats> internalGetBacklogAsync
}
}

return pulsar().getBrokerService().getManagedLedgerConfig(topicName)
.thenCompose(config -> {
ManagedLedgerOfflineBacklog offlineTopicBacklog =
new ManagedLedgerOfflineBacklog(config.getDigestType(), config.getPassword(),
pulsar().getAdvertisedAddress(), false);
try {
PersistentOfflineTopicStats estimateOfflineTopicStats =
offlineTopicBacklog.estimateUnloadedTopicBacklog(
(ManagedLedgerFactoryImpl) pulsar().getManagedLedgerFactory(),
topicName);
pulsar().getBrokerService()
.cacheOfflineTopicStats(topicName, estimateOfflineTopicStats);
return CompletableFuture.completedFuture(estimateOfflineTopicStats);
} catch (Exception e) {
throw new RestException(e);
}
});
return pulsar().getPulsarResources().getNamespaceResources()
.getBucketCountAsync(topicName.getNamespaceObject()).thenCompose(buckets ->
pulsar().getBrokerService().getManagedLedgerConfig(topicName)
.thenCompose(config -> {
ManagedLedgerOfflineBacklog offlineTopicBacklog =
new ManagedLedgerOfflineBacklog(config.getDigestType(),
config.getPassword(), pulsar().getAdvertisedAddress(), false);
try {
PersistentOfflineTopicStats estimateOfflineTopicStats =
offlineTopicBacklog.estimateUnloadedTopicBacklog(
(ManagedLedgerFactoryImpl) pulsar().getManagedLedgerFactory(),
topicName, topicName.getPersistenceNamingEncoding(buckets));
pulsar().getBrokerService()
.cacheOfflineTopicStats(topicName, estimateOfflineTopicStats);
return CompletableFuture.completedFuture(estimateOfflineTopicStats);
} catch (Exception e) {
throw new RestException(e);
}
}));

});
}
Expand Down
Loading