Skip to content

Commit 353ffcb

Browse files
Technoboy-nicoloboschi
authored andcommitted
[feature][admin] Support to get topic properties. (apache#15944)
As apache#12818 has supported creating topics with metadata, this patch is adding a `get` API to support getting topic properties. (cherry picked from commit 1ebe4ee) (cherry picked from commit 6648f99)
1 parent f12f067 commit 353ffcb

File tree

8 files changed

+156
-0
lines changed

8 files changed

+156
-0
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java

+4
Original file line numberDiff line numberDiff line change
@@ -854,4 +854,8 @@ protected static boolean isRedirectException(Throwable ex) {
854854
&& ((WebApplicationException) realCause).getResponse().getStatus()
855855
== Status.TEMPORARY_REDIRECT.getStatusCode();
856856
}
857+
858+
protected static String getTopicNotFoundErrorMessage(String topic) {
859+
return String.format("Topic %s not found", topic);
860+
}
857861
}

pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java

+28
Original file line numberDiff line numberDiff line change
@@ -568,6 +568,34 @@ protected PartitionedTopicMetadata internalGetPartitionedMetadata(boolean author
568568
validateClientVersion();
569569
}
570570
return metadata;
571+
}
572+
573+
protected CompletableFuture<Map<String, String>> internalGetPropertiesAsync(boolean authoritative) {
574+
return validateTopicOwnershipAsync(topicName, authoritative)
575+
.thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.GET_METADATA))
576+
.thenCompose(__ -> {
577+
if (topicName.isPartitioned()) {
578+
return getPropertiesAsync();
579+
}
580+
return pulsar().getBrokerService().fetchPartitionedTopicMetadataAsync(topicName)
581+
.thenCompose(metadata -> {
582+
if (metadata.partitions == 0) {
583+
return getPropertiesAsync();
584+
}
585+
return CompletableFuture.completedFuture(metadata.properties);
586+
});
587+
});
588+
}
589+
590+
private CompletableFuture<Map<String, String>> getPropertiesAsync() {
591+
return pulsar().getBrokerService().getTopicIfExists(topicName.toString())
592+
.thenApply(opt -> {
593+
if (!opt.isPresent()) {
594+
throw new RestException(Status.NOT_FOUND,
595+
getTopicNotFoundErrorMessage(topicName.toString()));
596+
}
597+
return ((PersistentTopic) opt.get()).getManagedLedger().getProperties();
598+
});
571599
}
572600

573601
protected void internalDeletePartitionedTopic(AsyncResponse asyncResponse, boolean authoritative,

pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java

+34
Original file line numberDiff line numberDiff line change
@@ -861,6 +861,40 @@ public PartitionedTopicMetadata getPartitionedMetadata(
861861
return internalGetPartitionedMetadata(authoritative, checkAllowAutoCreation);
862862
}
863863

864+
@GET
865+
@Path("/{tenant}/{namespace}/{topic}/properties")
866+
@ApiOperation(value = "Get topic properties.")
867+
@ApiResponses(value = {
868+
@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
869+
@ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant"),
870+
@ApiResponse(code = 403, message = "Don't have admin permission"),
871+
@ApiResponse(code = 404, message = "Topic does not exist"),
872+
@ApiResponse(code = 409, message = "Concurrent modification"),
873+
@ApiResponse(code = 412, message = "Topic name is invalid"),
874+
@ApiResponse(code = 500, message = "Internal server error")
875+
})
876+
public void getProperties(
877+
@Suspended final AsyncResponse asyncResponse,
878+
@ApiParam(value = "Specify the tenant", required = true)
879+
@PathParam("tenant") String tenant,
880+
@ApiParam(value = "Specify the namespace", required = true)
881+
@PathParam("namespace") String namespace,
882+
@ApiParam(value = "Specify topic name", required = true)
883+
@PathParam("topic") @Encoded String encodedTopic,
884+
@ApiParam(value = "Is authentication required to perform this operation")
885+
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
886+
validatePersistentTopicName(tenant, namespace, encodedTopic);
887+
internalGetPropertiesAsync(authoritative)
888+
.thenAccept(asyncResponse::resume)
889+
.exceptionally(ex -> {
890+
if (!isRedirectException(ex)) {
891+
log.error("[{}] Failed to get topic {} properties", clientAppId(), topicName, ex);
892+
}
893+
resumeAsyncResponseExceptionally(asyncResponse, ex);
894+
return null;
895+
});
896+
}
897+
864898
@DELETE
865899
@Path("/{tenant}/{namespace}/{topic}/partitions")
866900
@ApiOperation(value = "Delete a partitioned topic.",

pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java

+21
Original file line numberDiff line numberDiff line change
@@ -858,6 +858,27 @@ public void testPersistentTopicList() throws Exception {
858858
assertEquals(topicsInNs.size(), 0);
859859
}
860860

861+
@Test
862+
public void testCreateAndGetTopicProperties() throws Exception {
863+
final String namespace = "prop-xyz/ns2";
864+
final String nonPartitionedTopicName = "persistent://" + namespace + "/non-partitioned-TopicProperties";
865+
admin.namespaces().createNamespace(namespace, 20);
866+
Map<String, String> nonPartitionedTopicProperties = new HashMap<>();
867+
nonPartitionedTopicProperties.put("key1", "value1");
868+
admin.topics().createNonPartitionedTopic(nonPartitionedTopicName, nonPartitionedTopicProperties);
869+
Map<String, String> properties11 = admin.topics().getProperties(nonPartitionedTopicName);
870+
Assert.assertNotNull(properties11);
871+
Assert.assertEquals(properties11.get("key1"), "value1");
872+
873+
final String partitionedTopicName = "persistent://" + namespace + "/partitioned-TopicProperties";
874+
Map<String, String> partitionedTopicProperties = new HashMap<>();
875+
partitionedTopicProperties.put("key2", "value2");
876+
admin.topics().createPartitionedTopic(partitionedTopicName, 2, partitionedTopicProperties);
877+
Map<String, String> properties22 = admin.topics().getProperties(partitionedTopicName);
878+
Assert.assertNotNull(properties22);
879+
Assert.assertEquals(properties22.get("key2"), "value2");
880+
}
881+
861882
@Test
862883
public void testNonPersistentTopics() throws Exception {
863884
final String namespace = "prop-xyz/ns2";

pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java

+13
Original file line numberDiff line numberDiff line change
@@ -424,6 +424,13 @@ public void testCreateNonPartitionedTopic() {
424424
PartitionedTopicMetadata pMetadata2 = persistentTopics.getPartitionedMetadata(
425425
testTenant, testNamespace, topicName2, true, false);
426426
Assert.assertNull(pMetadata2.properties);
427+
AsyncResponse metaResponse = mock(AsyncResponse.class);
428+
ArgumentCaptor<Map> metaResponseCaptor2 = ArgumentCaptor.forClass(Map.class);
429+
persistentTopics.getProperties(metaResponse,
430+
testTenant, testNamespace, topicName2, true);
431+
verify(metaResponse, timeout(5000).times(1)).resume(metaResponseCaptor2.capture());
432+
Assert.assertNotNull(metaResponseCaptor2.getValue());
433+
Assert.assertEquals(metaResponseCaptor2.getValue().get("key1"), "value1");
427434
}
428435

429436
@Test
@@ -447,6 +454,12 @@ public void testCreatePartitionedTopic() {
447454
Assert.assertEquals(pMetadata2.properties.size(), 1);
448455
Assert.assertEquals(pMetadata2.properties, topicMetadata);
449456
});
457+
AsyncResponse response3 = mock(AsyncResponse.class);
458+
ArgumentCaptor<Map> metaResponseCaptor2 = ArgumentCaptor.forClass(Map.class);
459+
persistentTopics.getProperties(response3, testTenant, testNamespace, topicName2, true);
460+
verify(response3, timeout(5000).times(1)).resume(metaResponseCaptor2.capture());
461+
Assert.assertNotNull(metaResponseCaptor2.getValue());
462+
Assert.assertEquals(metaResponseCaptor2.getValue().get("key1"), "value1");
450463
}
451464

452465
@Test

pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java

+16
Original file line numberDiff line numberDiff line change
@@ -663,6 +663,22 @@ void updatePartitionedTopic(String topic, int numPartitions, boolean updateLocal
663663
*/
664664
CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMetadataAsync(String topic);
665665

666+
/**
667+
* Get properties of a topic.
668+
* @param topic
669+
* Topic name
670+
* @return Topic properties
671+
*/
672+
Map<String, String> getProperties(String topic) throws PulsarAdminException;
673+
674+
/**
675+
* Get properties of a topic asynchronously.
676+
* @param topic
677+
* Topic name
678+
* @return a future that can be used to track when the topic properties is returned
679+
*/
680+
CompletableFuture<Map<String, String>> getPropertiesAsync(String topic);
681+
666682
/**
667683
* Delete a partitioned topic.
668684
* <p/>

pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java

+26
Original file line numberDiff line numberDiff line change
@@ -448,6 +448,32 @@ public void failed(Throwable throwable) {
448448
return future;
449449
}
450450

451+
@Override
452+
public Map<String, String> getProperties(String topic) throws PulsarAdminException {
453+
return sync(() -> getPropertiesAsync(topic));
454+
}
455+
456+
@Override
457+
public CompletableFuture<Map<String, String>> getPropertiesAsync(String topic) {
458+
TopicName tn = validateTopic(topic);
459+
WebTarget path = topicPath(tn, "properties");
460+
final CompletableFuture<Map<String, String>> future = new CompletableFuture<>();
461+
asyncGetRequest(path,
462+
new InvocationCallback<Map<String, String>>() {
463+
464+
@Override
465+
public void completed(Map<String, String> response) {
466+
future.complete(response);
467+
}
468+
469+
@Override
470+
public void failed(Throwable throwable) {
471+
future.completeExceptionally(getApiException(throwable.getCause()));
472+
}
473+
});
474+
return future;
475+
}
476+
451477
@Override
452478
public void deletePartitionedTopic(String topic) throws PulsarAdminException {
453479
deletePartitionedTopic(topic, false);

pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java

+14
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,7 @@ public CmdTopics(Supplier<PulsarAdmin> admin) {
118118
jcommander.addCommand("create", new CreateNonPartitionedCmd());
119119
jcommander.addCommand("update-partitioned-topic", new UpdatePartitionedCmd());
120120
jcommander.addCommand("get-partitioned-topic-metadata", new GetPartitionedTopicMetadataCmd());
121+
jcommander.addCommand("get-properties", new GetPropertiesCmd());
121122

122123
jcommander.addCommand("delete-partitioned-topic", new DeletePartitionedCmd());
123124
jcommander.addCommand("peek-messages", new PeekMessages());
@@ -591,6 +592,19 @@ void run() throws Exception {
591592
}
592593
}
593594

595+
@Parameters(commandDescription = "Get the topic properties.")
596+
private class GetPropertiesCmd extends CliCommand {
597+
598+
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
599+
private java.util.List<String> params;
600+
601+
@Override
602+
void run() throws Exception {
603+
String topic = validateTopicName(params);
604+
print(getTopics().getProperties(topic));
605+
}
606+
}
607+
594608
@Parameters(commandDescription = "Delete a partitioned topic. "
595609
+ "It will also delete all the partitions of the topic if it exists.")
596610
private class DeletePartitionedCmd extends CliCommand {

0 commit comments

Comments
 (0)