Skip to content

Commit 1ebe4ee

Browse files
authored
[feature][admin] Support to get topic properties. (#15944)
### Motivation As #12818 has supported creating topics with metadata, this patch is adding a `get` API to support getting topic properties.
1 parent 2c7c9e5 commit 1ebe4ee

File tree

7 files changed

+154
-2
lines changed

7 files changed

+154
-2
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java

+28
Original file line numberDiff line numberDiff line change
@@ -565,6 +565,34 @@ protected CompletableFuture<PartitionedTopicMetadata> internalGetPartitionedMeta
565565
});
566566
}
567567

568+
protected CompletableFuture<Map<String, String>> internalGetPropertiesAsync(boolean authoritative) {
569+
return validateTopicOwnershipAsync(topicName, authoritative)
570+
.thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.GET_METADATA))
571+
.thenCompose(__ -> {
572+
if (topicName.isPartitioned()) {
573+
return getPropertiesAsync();
574+
}
575+
return pulsar().getBrokerService().fetchPartitionedTopicMetadataAsync(topicName)
576+
.thenCompose(metadata -> {
577+
if (metadata.partitions == 0) {
578+
return getPropertiesAsync();
579+
}
580+
return CompletableFuture.completedFuture(metadata.properties);
581+
});
582+
});
583+
}
584+
585+
private CompletableFuture<Map<String, String>> getPropertiesAsync() {
586+
return pulsar().getBrokerService().getTopicIfExists(topicName.toString())
587+
.thenApply(opt -> {
588+
if (!opt.isPresent()) {
589+
throw new RestException(Status.NOT_FOUND,
590+
getTopicNotFoundErrorMessage(topicName.toString()));
591+
}
592+
return ((PersistentTopic) opt.get()).getManagedLedger().getProperties();
593+
});
594+
}
595+
568596
protected CompletableFuture<Void> internalCheckTopicExists(TopicName topicName) {
569597
return pulsar().getNamespaceService().checkTopicExists(topicName)
570598
.thenAccept(exist -> {

pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java

+34
Original file line numberDiff line numberDiff line change
@@ -918,6 +918,40 @@ public void getPartitionedMetadata(
918918
});
919919
}
920920

921+
@GET
922+
@Path("/{tenant}/{namespace}/{topic}/properties")
923+
@ApiOperation(value = "Get topic properties.")
924+
@ApiResponses(value = {
925+
@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
926+
@ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant"),
927+
@ApiResponse(code = 403, message = "Don't have admin permission"),
928+
@ApiResponse(code = 404, message = "Topic does not exist"),
929+
@ApiResponse(code = 409, message = "Concurrent modification"),
930+
@ApiResponse(code = 412, message = "Topic name is invalid"),
931+
@ApiResponse(code = 500, message = "Internal server error")
932+
})
933+
public void getProperties(
934+
@Suspended final AsyncResponse asyncResponse,
935+
@ApiParam(value = "Specify the tenant", required = true)
936+
@PathParam("tenant") String tenant,
937+
@ApiParam(value = "Specify the namespace", required = true)
938+
@PathParam("namespace") String namespace,
939+
@ApiParam(value = "Specify topic name", required = true)
940+
@PathParam("topic") @Encoded String encodedTopic,
941+
@ApiParam(value = "Is authentication required to perform this operation")
942+
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
943+
validatePersistentTopicName(tenant, namespace, encodedTopic);
944+
internalGetPropertiesAsync(authoritative)
945+
.thenAccept(asyncResponse::resume)
946+
.exceptionally(ex -> {
947+
if (!isRedirectException(ex)) {
948+
log.error("[{}] Failed to get topic {} properties", clientAppId(), topicName, ex);
949+
}
950+
resumeAsyncResponseExceptionally(asyncResponse, ex);
951+
return null;
952+
});
953+
}
954+
921955
@DELETE
922956
@Path("/{tenant}/{namespace}/{topic}/partitions")
923957
@ApiOperation(value = "Delete a partitioned topic.",

pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java

+21
Original file line numberDiff line numberDiff line change
@@ -858,6 +858,27 @@ public void testPersistentTopicList() throws Exception {
858858
assertEquals(topicsInNs.size(), 0);
859859
}
860860

861+
@Test
862+
public void testCreateAndGetTopicProperties() throws Exception {
863+
final String namespace = "prop-xyz/ns2";
864+
final String nonPartitionedTopicName = "persistent://" + namespace + "/non-partitioned-TopicProperties";
865+
admin.namespaces().createNamespace(namespace, 20);
866+
Map<String, String> nonPartitionedTopicProperties = new HashMap<>();
867+
nonPartitionedTopicProperties.put("key1", "value1");
868+
admin.topics().createNonPartitionedTopic(nonPartitionedTopicName, nonPartitionedTopicProperties);
869+
Map<String, String> properties11 = admin.topics().getProperties(nonPartitionedTopicName);
870+
Assert.assertNotNull(properties11);
871+
Assert.assertEquals(properties11.get("key1"), "value1");
872+
873+
final String partitionedTopicName = "persistent://" + namespace + "/partitioned-TopicProperties";
874+
Map<String, String> partitionedTopicProperties = new HashMap<>();
875+
partitionedTopicProperties.put("key2", "value2");
876+
admin.topics().createPartitionedTopic(partitionedTopicName, 2, partitionedTopicProperties);
877+
Map<String, String> properties22 = admin.topics().getProperties(partitionedTopicName);
878+
Assert.assertNotNull(properties22);
879+
Assert.assertEquals(properties22.get("key2"), "value2");
880+
}
881+
861882
@Test
862883
public void testNonPersistentTopics() throws Exception {
863884
final String namespace = "prop-xyz/ns2";

pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java

+15-2
Original file line numberDiff line numberDiff line change
@@ -453,7 +453,7 @@ public void testNonPartitionedTopics() {
453453

454454
@Test
455455
public void testCreateNonPartitionedTopic() {
456-
final String topic = "standard-topic-partition-a";
456+
final String topic = "testCreateNonPartitionedTopic-a";
457457
TopicName topicName = TopicName.get(TopicDomain.persistent.value(), testTenant, testNamespace, topic);
458458
AsyncResponse response = mock(AsyncResponse.class);
459459
persistentTopics.createNonPartitionedTopic(response, testTenant, testNamespace, topic, true, null);
@@ -476,7 +476,7 @@ public void testCreateNonPartitionedTopic() {
476476
response = mock(AsyncResponse.class);
477477
metaResponse = mock(AsyncResponse.class);
478478
metaResponseCaptor = ArgumentCaptor.forClass(PartitionedTopicMetadata.class);
479-
final String topic2 = "standard-topic-partition-b";
479+
final String topic2 = "testCreateNonPartitionedTopic-b";
480480
TopicName topicName2 = TopicName.get(TopicDomain.persistent.value(), testTenant, testNamespace, topic2);
481481
Map<String, String> topicMetadata = Maps.newHashMap();
482482
topicMetadata.put("key1", "value1");
@@ -488,6 +488,13 @@ public void testCreateNonPartitionedTopic() {
488488
testTenant, testNamespace, topic2, true, false);
489489
verify(metaResponse, timeout(5000).times(1)).resume(metaResponseCaptor.capture());
490490
Assert.assertNull(metaResponseCaptor.getValue().properties);
491+
metaResponse = mock(AsyncResponse.class);
492+
ArgumentCaptor<Map> metaResponseCaptor2 = ArgumentCaptor.forClass(Map.class);
493+
persistentTopics.getProperties(metaResponse,
494+
testTenant, testNamespace, topic2, true);
495+
verify(metaResponse, timeout(5000).times(1)).resume(metaResponseCaptor2.capture());
496+
Assert.assertNotNull(metaResponseCaptor2.getValue());
497+
Assert.assertEquals(metaResponseCaptor2.getValue().get("key1"), "value1");
491498
}
492499

493500
@Test
@@ -516,6 +523,12 @@ public void testCreatePartitionedTopic() {
516523
Assert.assertEquals(responseCaptor2.getValue().properties.size(), 1);
517524
Assert.assertEquals(responseCaptor2.getValue().properties, topicMetadata);
518525
});
526+
AsyncResponse response3 = mock(AsyncResponse.class);
527+
ArgumentCaptor<Map> metaResponseCaptor2 = ArgumentCaptor.forClass(Map.class);
528+
persistentTopics.getProperties(response3, testTenant, testNamespace, topicName2, true);
529+
verify(response3, timeout(5000).times(1)).resume(metaResponseCaptor2.capture());
530+
Assert.assertNotNull(metaResponseCaptor2.getValue());
531+
Assert.assertEquals(metaResponseCaptor2.getValue().get("key1"), "value1");
519532
}
520533

521534
@Test

pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java

+16
Original file line numberDiff line numberDiff line change
@@ -717,6 +717,22 @@ void updatePartitionedTopic(String topic, int numPartitions, boolean updateLocal
717717
*/
718718
CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMetadataAsync(String topic);
719719

720+
/**
721+
* Get properties of a topic.
722+
* @param topic
723+
* Topic name
724+
* @return Topic properties
725+
*/
726+
Map<String, String> getProperties(String topic) throws PulsarAdminException;
727+
728+
/**
729+
* Get properties of a topic asynchronously.
730+
* @param topic
731+
* Topic name
732+
* @return a future that can be used to track when the topic properties is returned
733+
*/
734+
CompletableFuture<Map<String, String>> getPropertiesAsync(String topic);
735+
720736
/**
721737
* Delete a partitioned topic and its schemas.
722738
* <p/>

pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java

+26
Original file line numberDiff line numberDiff line change
@@ -482,6 +482,32 @@ public void failed(Throwable throwable) {
482482
return future;
483483
}
484484

485+
@Override
486+
public Map<String, String> getProperties(String topic) throws PulsarAdminException {
487+
return sync(() -> getPropertiesAsync(topic));
488+
}
489+
490+
@Override
491+
public CompletableFuture<Map<String, String>> getPropertiesAsync(String topic) {
492+
TopicName tn = validateTopic(topic);
493+
WebTarget path = topicPath(tn, "properties");
494+
final CompletableFuture<Map<String, String>> future = new CompletableFuture<>();
495+
asyncGetRequest(path,
496+
new InvocationCallback<Map<String, String>>() {
497+
498+
@Override
499+
public void completed(Map<String, String> response) {
500+
future.complete(response);
501+
}
502+
503+
@Override
504+
public void failed(Throwable throwable) {
505+
future.completeExceptionally(getApiException(throwable.getCause()));
506+
}
507+
});
508+
return future;
509+
}
510+
485511
@Override
486512
public void deletePartitionedTopic(String topic) throws PulsarAdminException {
487513
deletePartitionedTopic(topic, false);

pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java

+14
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,7 @@ public CmdTopics(Supplier<PulsarAdmin> admin) {
117117
jcommander.addCommand("create", new CreateNonPartitionedCmd());
118118
jcommander.addCommand("update-partitioned-topic", new UpdatePartitionedCmd());
119119
jcommander.addCommand("get-partitioned-topic-metadata", new GetPartitionedTopicMetadataCmd());
120+
jcommander.addCommand("get-properties", new GetPropertiesCmd());
120121

121122
jcommander.addCommand("delete-partitioned-topic", new DeletePartitionedCmd());
122123
jcommander.addCommand("peek-messages", new PeekMessages());
@@ -605,6 +606,19 @@ void run() throws Exception {
605606
}
606607
}
607608

609+
@Parameters(commandDescription = "Get the topic properties.")
610+
private class GetPropertiesCmd extends CliCommand {
611+
612+
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
613+
private java.util.List<String> params;
614+
615+
@Override
616+
void run() throws Exception {
617+
String topic = validateTopicName(params);
618+
print(getTopics().getProperties(topic));
619+
}
620+
}
621+
608622
@Parameters(commandDescription = "Delete a partitioned topic. "
609623
+ "It will also delete all the partitions of the topic if it exists."
610624
+ "And the application is not able to connect to the topic(delete then re-create with same name) again "

0 commit comments

Comments
 (0)