Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit 304d15a

Browse files
committedNov 15, 2021
PIP 110: Support Topic metadata (apache#12629).
1 parent 4cf4b85 commit 304d15a

File tree

16 files changed

+386
-50
lines changed

16 files changed

+386
-50
lines changed
 

‎managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java

+11
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@ public class ManagedLedgerConfig {
7777
private int newEntriesCheckDelayInMillis = 10;
7878
private Clock clock = Clock.systemUTC();
7979
private ManagedLedgerInterceptor managedLedgerInterceptor;
80+
private Map<String, String> topicMetadata;
8081

8182
public boolean isCreateIfMissing() {
8283
return createIfMissing;
@@ -623,6 +624,16 @@ public void setBookKeeperEnsemblePlacementPolicyProperties(
623624
this.bookKeeperEnsemblePlacementPolicyProperties = bookKeeperEnsemblePlacementPolicyProperties;
624625
}
625626

627+
628+
public Map<String, String> getTopicMetadata() {
629+
return topicMetadata;
630+
}
631+
632+
633+
public void setTopicMetadata(Map<String, String> topicMetadata) {
634+
this.topicMetadata = topicMetadata;
635+
}
636+
626637
public boolean isDeletionAtBatchIndexLevelEnabled() {
627638
return deletionAtBatchIndexLevelEnabled;
628639
}

‎managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -325,7 +325,8 @@ synchronized void initialize(final ManagedLedgerInitializeLedgerCallback callbac
325325
log.info("Opening managed ledger {}", name);
326326

327327
// Fetch the list of existing ledgers in the managed ledger
328-
store.getManagedLedgerInfo(name, config.isCreateIfMissing(), new MetaStoreCallback<ManagedLedgerInfo>() {
328+
store.getManagedLedgerInfo(name, config.isCreateIfMissing(), config.getTopicMetadata(),
329+
new MetaStoreCallback<ManagedLedgerInfo>() {
329330
@Override
330331
public void operationComplete(ManagedLedgerInfo mlInfo, Stat stat) {
331332
ledgersStat = stat;

‎managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStore.java

+19-1
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package org.apache.bookkeeper.mledger.impl;
2020

2121
import java.util.List;
22+
import java.util.Map;
2223
import java.util.concurrent.CompletableFuture;
2324

2425
import org.apache.bookkeeper.mledger.ManagedLedgerException.MetaStoreException;
@@ -52,7 +53,24 @@ interface MetaStoreCallback<T> {
5253
* whether the managed ledger metadata should be created if it doesn't exist already
5354
* @throws MetaStoreException
5455
*/
55-
void getManagedLedgerInfo(String ledgerName, boolean createIfMissing, MetaStoreCallback<ManagedLedgerInfo> callback);
56+
default void getManagedLedgerInfo(String ledgerName, boolean createIfMissing,
57+
MetaStoreCallback<ManagedLedgerInfo> callback) {
58+
getManagedLedgerInfo(ledgerName, createIfMissing, null, callback);
59+
}
60+
61+
/**
62+
* Get the metadata used by the ManagedLedger.
63+
*
64+
* @param ledgerName
65+
* the name of the ManagedLedger
66+
* @param createIfMissing
67+
* whether the managed ledger metadata should be created if it doesn't exist already
68+
* @param topicMetadata
69+
* topic metadata
70+
* @throws MetaStoreException
71+
*/
72+
void getManagedLedgerInfo(String ledgerName, boolean createIfMissing, Map<String, String> topicMetadata,
73+
MetaStoreCallback<ManagedLedgerInfo> callback);
5674

5775
/**
5876
*

‎managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java

+13-3
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222

2323
import java.util.ArrayList;
2424
import java.util.List;
25+
import java.util.Map;
2526
import java.util.Optional;
2627
import java.util.concurrent.CompletableFuture;
2728
import java.util.concurrent.CompletionException;
@@ -84,7 +85,7 @@ public MetaStoreImpl(MetadataStore store, OrderedExecutor executor, String compr
8485
}
8586

8687
@Override
87-
public void getManagedLedgerInfo(String ledgerName, boolean createIfMissing,
88+
public void getManagedLedgerInfo(String ledgerName, boolean createIfMissing, Map<String, String> topicMetadata,
8889
MetaStoreCallback<ManagedLedgerInfo> callback) {
8990
// Try to get the content or create an empty node
9091
String path = PREFIX + ledgerName;
@@ -106,8 +107,17 @@ public void getManagedLedgerInfo(String ledgerName, boolean createIfMissing,
106107

107108
store.put(path, new byte[0], Optional.of(-1L))
108109
.thenAccept(stat -> {
109-
ManagedLedgerInfo info = ManagedLedgerInfo.getDefaultInstance();
110-
callback.operationComplete(info, stat);
110+
ManagedLedgerInfo.Builder ledgerBuilder = ManagedLedgerInfo.newBuilder();
111+
if (topicMetadata != null) {
112+
topicMetadata.forEach((k, v) -> {
113+
ledgerBuilder.addProperties(
114+
MLDataFormats.KeyValue.newBuilder()
115+
.setKey(k)
116+
.setValue(v)
117+
.build());
118+
});
119+
}
120+
callback.operationComplete(ledgerBuilder.build(), stat);
111121
}).exceptionally(ex -> {
112122
callback.operationFailed(getException(ex));
113123
return null;

‎managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java

+30
Original file line numberDiff line numberDiff line change
@@ -1632,6 +1632,36 @@ public void cursorReadsWithDiscardedEmptyLedgers() throws Exception {
16321632
assertEquals(c1.readEntries(1).size(), 0);
16331633
}
16341634

1635+
@Test
1636+
public void testSetTopicMetadata() throws Exception {
1637+
Map<String, String> properties = new HashMap<>();
1638+
properties.put("key1", "value1");
1639+
properties.put("key2", "value2");
1640+
final MetaStore store = factory.getMetaStore();
1641+
final CountDownLatch latch = new CountDownLatch(1);
1642+
final ManagedLedgerInfo[] storedMLInfo = new ManagedLedgerInfo[1];
1643+
store.getManagedLedgerInfo("my_test_ledger", true, properties, new MetaStoreCallback<ManagedLedgerInfo>() {
1644+
@Override
1645+
public void operationComplete(ManagedLedgerInfo result, Stat version) {
1646+
storedMLInfo[0] = result;
1647+
latch.countDown();
1648+
}
1649+
1650+
@Override
1651+
public void operationFailed(MetaStoreException e) {
1652+
latch.countDown();
1653+
fail("Should have failed here");
1654+
}
1655+
});
1656+
latch.await();
1657+
1658+
assertEquals(storedMLInfo[0].getPropertiesCount(), 2);
1659+
assertEquals(storedMLInfo[0].getPropertiesList().get(0).getKey(), "key1");
1660+
assertEquals(storedMLInfo[0].getPropertiesList().get(0).getValue(), "value1");
1661+
assertEquals(storedMLInfo[0].getPropertiesList().get(1).getKey(), "key2");
1662+
assertEquals(storedMLInfo[0].getPropertiesList().get(1).getValue(), "value2");
1663+
}
1664+
16351665
@Test
16361666
public void cursorReadsWithDiscardedEmptyLedgersStillListed() throws Exception {
16371667
ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("my_test_ledger");

‎pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java

+12-6
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import com.google.errorprone.annotations.CanIgnoreReturnValue;
2323
import java.util.ArrayList;
2424
import java.util.List;
25+
import java.util.Map;
2526
import java.util.Optional;
2627
import java.util.Set;
2728
import java.util.concurrent.CompletableFuture;
@@ -574,6 +575,11 @@ protected List<String> getTopicPartitionList(TopicDomain topicDomain) {
574575

575576
protected void internalCreatePartitionedTopic(AsyncResponse asyncResponse, int numPartitions,
576577
boolean createLocalTopicOnly) {
578+
internalCreatePartitionedTopic(asyncResponse, numPartitions, createLocalTopicOnly, null);
579+
}
580+
581+
protected void internalCreatePartitionedTopic(AsyncResponse asyncResponse, int numPartitions,
582+
boolean createLocalTopicOnly, Map<String, String> topicMetadata) {
577583
Integer maxTopicsPerNamespace = null;
578584

579585
try {
@@ -640,7 +646,7 @@ protected void internalCreatePartitionedTopic(AsyncResponse asyncResponse, int n
640646
return;
641647
}
642648

643-
provisionPartitionedTopicPath(asyncResponse, numPartitions, createLocalTopicOnly)
649+
provisionPartitionedTopicPath(asyncResponse, numPartitions, createLocalTopicOnly, topicMetadata)
644650
.thenCompose(ignored -> tryCreatePartitionsAsync(numPartitions))
645651
.whenComplete((ignored, ex) -> {
646652
if (ex != null) {
@@ -679,7 +685,7 @@ protected void internalCreatePartitionedTopic(AsyncResponse asyncResponse, int n
679685
((TopicsImpl) pulsar().getBrokerService()
680686
.getClusterPulsarAdmin(cluster, clusterDataOp).topics())
681687
.createPartitionedTopicAsync(
682-
topicName.getPartitionedTopicName(), numPartitions, true);
688+
topicName.getPartitionedTopicName(), numPartitions, true, null);
683689
})
684690
.exceptionally(throwable -> {
685691
log.error("Failed to create partition topic in cluster {}.", cluster, throwable);
@@ -718,13 +724,13 @@ protected CompletableFuture<Boolean> checkTopicExistsAsync(TopicName topicName)
718724
});
719725
}
720726

721-
private CompletableFuture<Void> provisionPartitionedTopicPath(AsyncResponse asyncResponse,
722-
int numPartitions,
723-
boolean createLocalTopicOnly) {
727+
private CompletableFuture<Void> provisionPartitionedTopicPath(AsyncResponse asyncResponse, int numPartitions,
728+
boolean createLocalTopicOnly,
729+
Map<String, String> topicMetadata) {
724730
CompletableFuture<Void> future = new CompletableFuture<>();
725731
namespaceResources()
726732
.getPartitionedTopicResources()
727-
.createPartitionedTopicAsync(topicName, new PartitionedTopicMetadata(numPartitions))
733+
.createPartitionedTopicAsync(topicName, new PartitionedTopicMetadata(numPartitions, topicMetadata))
728734
.whenComplete((ignored, ex) -> {
729735
if (ex != null) {
730736
if (ex instanceof AlreadyExistsException) {

‎pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java

+8-4
Original file line numberDiff line numberDiff line change
@@ -357,7 +357,7 @@ protected void internalRevokePermissionsOnTopic(String role) {
357357
revokePermissions(topicName.toString(), role);
358358
}
359359

360-
protected void internalCreateNonPartitionedTopic(boolean authoritative) {
360+
protected void internalCreateNonPartitionedTopic(boolean authoritative, Map<String, String> topicMetadata) {
361361
validateNonPartitionTopicName(topicName.getLocalName());
362362
if (topicName.isGlobal()) {
363363
validateGlobalNamespaceOwnership(namespaceName);
@@ -378,7 +378,7 @@ protected void internalCreateNonPartitionedTopic(boolean authoritative) {
378378
throw new RestException(Status.CONFLICT, "This topic already exists");
379379
}
380380

381-
Topic createdTopic = getOrCreateTopic(topicName);
381+
Topic createdTopic = getOrCreateTopic(topicName, topicMetadata);
382382
log.info("[{}] Successfully created non-partitioned topic {}", clientAppId(), createdTopic);
383383
} catch (Exception e) {
384384
if (e instanceof RestException) {
@@ -3750,8 +3750,12 @@ private CompletableFuture<Topic> topicNotFoundReasonAsync(TopicName topicName) {
37503750
}
37513751

37523752
private Topic getOrCreateTopic(TopicName topicName) {
3753-
return pulsar().getBrokerService().getTopic(
3754-
topicName.toString(), true).thenApply(Optional::get).join();
3753+
return getOrCreateTopic(topicName, null);
3754+
}
3755+
3756+
private Topic getOrCreateTopic(TopicName topicName, Map<String, String> topicMetadata) {
3757+
return pulsar().getBrokerService().getTopic(topicName.toString(), true, topicMetadata)
3758+
.thenApply(Optional::get).join();
37553759
}
37563760

37573761
/**

‎pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java

+57-1
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,33 @@ public void createPartitionedTopic(
171171
}
172172
}
173173

174+
@PUT
175+
@Path("/{property}/{cluster}/{namespace}/{topic}/partitions/topicMetadata")
176+
@ApiOperation(hidden = true, value = "Create a partitioned topic.",
177+
notes = "It needs to be called before creating a producer on a partitioned topic.")
178+
@ApiResponses(value = {
179+
@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
180+
@ApiResponse(code = 403, message = "Don't have admin permission"),
181+
@ApiResponse(code = 406, message = "The number of partitions should be "
182+
+ "more than 0 and less than or equal to maxNumPartitionsPerPartitionedTopic"),
183+
@ApiResponse(code = 409, message = "Partitioned topic already exist")})
184+
public void createPartitionedTopic(
185+
@Suspended final AsyncResponse asyncResponse,
186+
@PathParam("property") String property,
187+
@PathParam("cluster") String cluster,
188+
@PathParam("namespace") String namespace,
189+
@PathParam("topic") @Encoded String encodedTopic,
190+
@QueryParam("createLocalTopicOnly") @DefaultValue("false") boolean createLocalTopicOnly,
191+
PartitionedTopicMetadata metadata) {
192+
try {
193+
validateTopicName(property, cluster, namespace, encodedTopic);
194+
internalCreatePartitionedTopic(asyncResponse, metadata.partitions, createLocalTopicOnly, metadata.topicMetadata);
195+
} catch (Exception e) {
196+
log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName, e);
197+
resumeAsyncResponseExceptionally(asyncResponse, e);
198+
}
199+
}
200+
174201
@PUT
175202
@Path("/{tenant}/{cluster}/{namespace}/{topic}")
176203
@ApiOperation(value = "Create a non-partitioned topic.",
@@ -196,10 +223,39 @@ public void createNonPartitionedTopic(
196223
@PathParam("topic") @Encoded String encodedTopic,
197224
@ApiParam(value = "Is authentication required to perform this operation")
198225
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
226+
createNonPartitionedTopic(tenant, cluster, namespace, encodedTopic, authoritative, null);
227+
}
228+
229+
@PUT
230+
@Path("/{tenant}/{cluster}/{namespace}/{topic}/topicMetadata")
231+
@ApiOperation(value = "Create a non-partitioned topic.",
232+
notes = "This is the only REST endpoint from which non-partitioned topics could be created.")
233+
@ApiResponses(value = {
234+
@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
235+
@ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant"),
236+
@ApiResponse(code = 403, message = "Don't have admin permission"),
237+
@ApiResponse(code = 409, message = "Partitioned topic already exist"),
238+
@ApiResponse(code = 412,
239+
message = "Failed Reason : Name is invalid or Namespace does not have any clusters configured"),
240+
@ApiResponse(code = 500, message = "Internal server error"),
241+
@ApiResponse(code = 503, message = "Failed to validate global cluster configuration")
242+
})
243+
public void createNonPartitionedTopic(
244+
@ApiParam(value = "Specify the tenant", required = true)
245+
@PathParam("tenant") String tenant,
246+
@ApiParam(value = "Specify the cluster", required = true)
247+
@PathParam("cluster") String cluster,
248+
@ApiParam(value = "Specify the namespace", required = true)
249+
@PathParam("namespace") String namespace,
250+
@ApiParam(value = "Specify topic name", required = true)
251+
@PathParam("topic") @Encoded String encodedTopic,
252+
@ApiParam(value = "Is authentication required to perform this operation")
253+
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
254+
@ApiParam(value = "Key value pair properties for the topic metadata") Map<String, String> topicMetadata) {
199255
validateNamespaceName(tenant, cluster, namespace);
200256
validateTopicName(tenant, cluster, namespace, encodedTopic);
201257
validateGlobalNamespaceOwnership();
202-
internalCreateNonPartitionedTopic(authoritative);
258+
internalCreateNonPartitionedTopic(authoritative, topicMetadata);
203259
}
204260

205261
/**

0 commit comments

Comments
 (0)
Please sign in to comment.