Commit ab2e6a8

Removed usage of sample/standalone/ns1 namespaces in standalone (#15186)
1 parent 40d7169 commit ab2e6a8

File tree: 17 files changed (+63 -103 lines)

bin/pulsar-managed-ledger-admin

+4 -4

@@ -195,7 +195,7 @@ mlPath : str
     managed-ledger path

     eg:
-    print-managed-ledger --zkServer localhost:2181 --managedLedgerPath sample/standalone/ns1/persistent/test
+    print-managed-ledger --zkServer localhost:2181 --managedLedgerPath public/default/persistent/test
     '''
 def printManagedLedgerCommand(zk, mlPath):
     print(getManagedLedgerInfo(zk, mlPath))
@@ -213,7 +213,7 @@ cursorName : str
     managed-cursor path

     eg:
-    print-cursor --zkServer localhost:2181 --managedLedgerPath sample/standalone/ns1/persistent/test --cursorName s1
+    print-cursor --zkServer localhost:2181 --managedLedgerPath public/default/persistent/test --cursorName s1
     '''
 def printManagedCursorCommand(zk, mlPath, cursorName):
     try:
@@ -236,7 +236,7 @@ mlPath : str
 deleteLedgerIds : str
     comma separated deleting ledger-ids (eg: 123,124)
     eg:
-    delete-managed-ledger-ids --zkServer localhost:2181 --managedLedgerPath sample/standalone/ns1/persistent/test --ledgerIds 3
+    delete-managed-ledger-ids --zkServer localhost:2181 --managedLedgerPath public/default/persistent/test --ledgerIds 3
     '''
 def deleteMLLedgerIdsCommand(zk, mlPath, deleteLedgerIds):
     try:
@@ -266,7 +266,7 @@ markDeletePosition: str
     markDeletePosition combination of <ledgerId>:<entryId> (eg. 123:1)

     eg:
-    update-mark-delete-cursor --zkServer localhost:2181 --managedLedgerPath sample/standalone/ns1/persistent/test --cursorName s1 --cursorMarkDelete 0:1
+    update-mark-delete-cursor --zkServer localhost:2181 --managedLedgerPath public/default/persistent/test --cursorName s1 --cursorMarkDelete 0:1
     '''
 def updateMarkDeleteOfCursorCommand(zk, mlPath, cursorName, markDeletePosition):
     try:

faq.md

+12 -12

@@ -94,8 +94,8 @@ There is regex subscription coming up in Pulsar 2.0. See [PIP-13](https://github
 ### Does Pulsar have, or plan to have, a concept of log compaction where only the latest message with the same key will be kept ?
 Yes, see [PIP-14](https://github.com/apache/pulsar/wiki/PIP-14:-Topic-compaction) for more details.

-### When I use an exclusive subscription to a partitioned topic, is the subscription attached to the "whole topic" or to a "topic partition"?
-On a partitioned topic, you can use all the 3 supported subscription types (exclusive, failover, shared), same as with non partitioned topics.
+### When I use an exclusive subscription to a partitioned topic, is the subscription attached to the "whole topic" or to a "topic partition"?
+On a partitioned topic, you can use all the 3 supported subscription types (exclusive, failover, shared), same as with non partitioned topics.
 The “subscription” concept is roughly similar to a “consumer-group” in Kafka. You can have multiple of them in the same topic, with different names.

 If you use “exclusive”, a consumer will try to consume from all partitions, or fail if any partition is already being consumed.
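
As an aside to the answer above (not part of this diff), a minimal Java-client sketch of a failover subscription on a partitioned topic under the default namespace; the topic and subscription names are illustrative:

```java
import org.apache.pulsar.client.api.*;

public class FailoverSubscriptionSketch {
    public static void main(String[] args) throws Exception {
        try (PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650")
                .build()) {
            // Consumers sharing the same subscription name act like one "consumer group";
            // with Failover, one consumer per partition is active and the rest stand by.
            Consumer<byte[]> consumer = client.newConsumer()
                    .topic("persistent://public/default/my-partitioned-topic")
                    .subscriptionName("my-sub")
                    .subscriptionType(SubscriptionType.Failover)
                    .subscribe();

            Message<byte[]> msg = consumer.receive();
            consumer.acknowledge(msg);
            consumer.close();
        }
    }
}
```
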
@@ -105,7 +105,7 @@ The mode similar to Kafka is “failover” subscription. In this case, you have
 ### What is the proxy component?
 It’s a component that was introduced recently. Essentially it’s a stateless proxy that speaks that Pulsar binary protocol. The motivation is to avoid (or overcome the impossibility) of direct connection between clients and brokers.

----
+---

 ## Usage and Configuration
 ### Can I manually change the number of bundles after creating namespaces?
@@ -119,7 +119,7 @@ Yes, you can use the cli tool `bin/pulsar-admin persistent unsubscribe $TOPIC -s

 ### How are subscription modes set? Can I create new subscriptions over the WebSocket API?
 Yes, you can set most of the producer/consumer configuration option in websocket, by passing them as HTTP query parameters like:
-`ws://localhost:8080/ws/consumer/persistent/sample/standalone/ns1/my-topic/my-sub?subscriptionType=Shared`
+`ws://localhost:8080/ws/consumer/persistent/public/default/my-topic/my-sub?subscriptionType=Shared`

 see [the doc](http://pulsar.apache.org/docs/latest/clients/WebSocket/#RunningtheWebSocketservice-1fhsvp).

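For illustration only (not part of the diff), a rough Java 11 sketch of consuming over that WebSocket endpoint; the URL reuses the new public/default path from the line above, and the listener is deliberately bare:

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.WebSocket;
import java.util.concurrent.CompletionStage;

public class WsConsumerSketch {
    public static void main(String[] args) throws Exception {
        String url = "ws://localhost:8080/ws/consumer/persistent/public/default/my-topic/my-sub"
                + "?subscriptionType=Shared";

        WebSocket.Listener listener = new WebSocket.Listener() {
            @Override
            public CompletionStage<?> onText(WebSocket ws, CharSequence data, boolean last) {
                // Each frame is a JSON envelope with the messageId, payload, and properties.
                System.out.println("Received frame: " + data);
                ws.request(1); // ask for the next frame
                return null;
            }
        };

        HttpClient.newHttpClient()
                .newWebSocketBuilder()
                .buildAsync(URI.create(url), listener)
                .join();

        Thread.sleep(10_000); // keep the demo connection open briefly
    }
}
```
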
@@ -153,7 +153,7 @@ There is no currently "infinite" retention, other than setting to very high valu
 The key is that you should use different subscriptions for each consumer. Each subscription is completely independent from others.

 ### The default when creating a consumer, is it to "tail" from "now" on the topic, or from the "last acknowledged" or something else?
-So when you spin up a consumer, it will try to subscribe to the topic, if the subscription doesn't exist, a new one will be created, and it will be positioned at the end of the topic ("now").
+So when you spin up a consumer, it will try to subscribe to the topic, if the subscription doesn't exist, a new one will be created, and it will be positioned at the end of the topic ("now").

 Once you reconnect, the subscription will still be there and it will be positioned on the last acknowledged messages from the previous session.

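A small, hedged follow-up sketch (not part of the diff): if you want a newly created subscription to start from the earliest available message instead of the tail described above, the Java client exposes an initial-position setting:

```java
import org.apache.pulsar.client.api.*;

public class EarliestPositionSketch {
    public static void main(String[] args) throws Exception {
        try (PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650")
                .build()) {
            Consumer<byte[]> consumer = client.newConsumer()
                    .topic("persistent://public/default/my-topic")
                    .subscriptionName("replay-sub")
                    // Only applies when the subscription is first created; on reconnect the
                    // cursor resumes from the last acknowledged position, as described above.
                    .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                    .subscribe();
            consumer.close();
        }
    }
}
```
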
@@ -190,16 +190,16 @@ What’s your use case for timeout on the `receiveAsync()`? Could that be achiev
 ### Why do we choose to use bookkeeper to store consumer offset instead of zookeeper? I mean what's the benefits?
 ZooKeeper is a “consensus” system that while it exposes a key/value interface is not meant to support a large volume of writes per second.

-ZK is not an “horizontally scalable” system, because every node receive every transaction and keeps the whole data set. Effectively, ZK is based on a single “log” that is replicated consistently across the participants.
+ZK is not an “horizontally scalable” system, because every node receive every transaction and keeps the whole data set. Effectively, ZK is based on a single “log” that is replicated consistently across the participants.

-The max throughput we have observed on a well configured ZK on good hardware was around ~10K writes/s. If you want to do more than that, you would have to shard it..
+The max throughput we have observed on a well configured ZK on good hardware was around ~10K writes/s. If you want to do more than that, you would have to shard it..

 To store consumers cursor positions, we need to write potentially a large number of updates per second. Typically we persist the cursor every 1 second, though the rate is configurable and if you want to reduce the amount of potential duplicates, you can increase the persistent frequency.

 With BookKeeper it’s very efficient to have a large throughput across a huge number of different “logs”. In our case, we use 1 log per cursor, and it becomes feasible to persist every single cursor update.

 ### I'm facing some issue using `.receiveAsync` that it seems to be related with `UnAckedMessageTracker` and `PartitionedConsumerImpl`. We are consuming messages with `receiveAsync`, doing instant `acknowledgeAsync` when message is received, after that the process will delay the next execution of itself. In such scenario we are consuming a lot more messages (repeated) than the num of messages produced. We are using Partitioned topics with setAckTimeout 30 seconds and I believe this issue could be related with `PartitionedConsumerImpl` because the same test in a non-partitioned topic does not generate any repeated message.
-PartitionedConsumer is composed of a set of regular consumers, one per partition. To have a single `receive()` abstraction, messages from all partitions are then pushed into a shared queue.
+PartitionedConsumer is composed of a set of regular consumers, one per partition. To have a single `receive()` abstraction, messages from all partitions are then pushed into a shared queue.

 The thing is that the unacked message tracker works at the partition level.So when the timeout happens, it’s able to request redelivery for the messages and clear them from the queue when that happens,
 but if the messages were already pushed into the shared queue, the “clearing” part will not happen.
@@ -229,8 +229,8 @@ A final option is to check the topic stats. This is a tiny bit involved, because
 There’s not currently an option for “infinite” (though it sounds a good idea! maybe we could use `-1` for that). The only option now is to use INT_MAX for `retentionTimeInMinutes` and LONG_MAX for `retentionSizeInMB`. It’s not “infinite” but 4085 years of retention should probably be enough!

 ### Is there a profiling option in Pulsar, so that we can breakdown the time costed in every stage? For instance, message A stay in queue 1ms, bk writing time 2ms(interval between sending to bk and receiving ack from bk) and so on.
-There are latency stats at different stages. In the client (eg: reported every 1min in info logs).
-In the broker: accessible through the broker metrics, and finally in bookies where there are several different latency metrics.
+There are latency stats at different stages. In the client (eg: reported every 1min in info logs).
+In the broker: accessible through the broker metrics, and finally in bookies where there are several different latency metrics.

 In broker there’s just the write latency on BK, because there is no other queuing involved in the write path.

@@ -242,7 +242,7 @@ you can create reader with `MessageId.earliest`
 yes, broker performs auth&auth while creating producer/consumer and this information presents under namespace policies.. so, if auth is enabled then broker does validation

 ### From what I’ve seen so far, it seems that I’d instead want to do a partitioned topic when I want a firehose/mix of data, and shuffle that firehose in to specific topics per entity when I’d have more discrete consumers. Is that accurate?
-Precisely, you can use either approach, and even combine them, depending on what is more convenient for the use case. The general traits to choose one or the other are:
+Precisely, you can use either approach, and even combine them, depending on what is more convenient for the use case. The general traits to choose one or the other are:

 - Partitions -> Maintain a single “logical” topic but scale throughput to multiple machines. Also, ability to consume in order for a “partition” of the keys. In general, consumers are assigned a partition (and thus a subset of keys) without specifying anything.

@@ -258,7 +258,7 @@ Main difference: a reader can be used when manually managing the offset/messageI


 ### Hey, question on routing mode for partitioned topics. What is the default configuration and what is used in the Kafka adaptor?
-The default is to use the hash of the key on a message. If the message has no key, the producer will use a “default” partition (picks 1 random partition and use it for all the messages it publishes).
+The default is to use the hash of the key on a message. If the message has no key, the producer will use a “default” partition (picks 1 random partition and use it for all the messages it publishes).

 This is to maintain the same ordering guarantee when no partitions are there: per-producer ordering.

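To illustrate the routing answer above (again, not part of the diff; the topic name is assumed), a short Java producer sketch; keyed messages are routed by the hash of the key, while key-less messages follow the configured routing mode:

```java
import org.apache.pulsar.client.api.*;

public class RoutingSketch {
    public static void main(String[] args) throws Exception {
        try (PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650")
                .build()) {
            Producer<String> producer = client.newProducer(Schema.STRING)
                    // Per the answer above: keyed messages are hashed to a partition; without an
                    // explicit mode, key-less messages stick to one partition chosen by the producer.
                    // RoundRobinPartition spreads key-less messages across partitions instead.
                    .topic("persistent://public/default/my-partitioned-topic")
                    .messageRoutingMode(MessageRoutingMode.RoundRobinPartition)
                    .create();

            producer.newMessage().key("user-42").value("keyed: routed by hash of the key").send();
            producer.newMessage().value("key-less: routed per the configured mode").send();

            producer.close();
        }
    }
}
```
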
pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandalone.java

+21 -58

@@ -24,20 +24,20 @@
 import com.google.common.collect.Sets;
 import java.io.File;
 import java.nio.file.Paths;
-import java.util.List;
+import java.util.Collections;
 import java.util.Optional;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.logging.log4j.LogManager;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.resources.NamespaceResources;
+import org.apache.pulsar.broker.resources.TenantResources;
 import org.apache.pulsar.client.admin.PulsarAdmin;
-import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
-import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.Policies;
 import org.apache.pulsar.common.policies.data.TenantInfo;
-import org.apache.pulsar.common.policies.data.TenantInfoImpl;
 import org.apache.pulsar.functions.worker.WorkerConfig;
 import org.apache.pulsar.functions.worker.WorkerService;
 import org.apache.pulsar.functions.worker.service.WorkerServiceLoader;
@@ -301,18 +301,11 @@ public void start() throws Exception {

         admin = broker.getAdminClient();

-        ClusterData clusterData = ClusterData.builder()
-                .serviceUrl(broker.getWebServiceAddress())
-                .serviceUrlTls(broker.getWebServiceAddressTls())
-                .brokerServiceUrl(broker.getBrokerServiceUrl())
-                .brokerServiceUrlTls(broker.getBrokerServiceUrlTls())
-                .build();
-        createSampleNameSpace(clusterData, cluster);
-
         //create default namespace
-        createNameSpace(cluster, TopicName.PUBLIC_TENANT, TopicName.PUBLIC_TENANT + "/" + TopicName.DEFAULT_NAMESPACE);
+        createNameSpace(cluster, TopicName.PUBLIC_TENANT,
+                NamespaceName.get(TopicName.PUBLIC_TENANT, TopicName.DEFAULT_NAMESPACE));
         //create pulsar system namespace
-        createNameSpace(cluster, SYSTEM_NAMESPACE.getTenant(), SYSTEM_NAMESPACE.toString());
+        createNameSpace(cluster, SYSTEM_NAMESPACE.getTenant(), SYSTEM_NAMESPACE);
         if (config.isTransactionCoordinatorEnabled()) {
             NamespaceResources.PartitionedTopicResources partitionedTopicResources =
                     broker.getPulsarResources().getNamespaceResources().getPartitionedTopicResources();
@@ -327,52 +320,22 @@ public void start() throws Exception {
         log.debug("--- setup completed ---");
     }

-    private void createNameSpace(String cluster, String publicTenant, String defaultNamespace) {
-        try {
-            if (!admin.tenants().getTenants().contains(publicTenant)) {
-                admin.tenants().createTenant(publicTenant,
-                        TenantInfo.builder()
-                                .adminRoles(Sets.newHashSet(config.getSuperUserRoles()))
-                                .allowedClusters(Sets.newHashSet(cluster))
-                                .build());
-            }
-            if (!admin.namespaces().getNamespaces(publicTenant).contains(defaultNamespace)) {
-                admin.namespaces().createNamespace(defaultNamespace);
-                admin.namespaces().setNamespaceReplicationClusters(
-                        defaultNamespace, Sets.newHashSet(config.getClusterName()));
-            }
-        } catch (PulsarAdminException e) {
-            log.info(e.getMessage(), e);
-        }
-    }
+    private void createNameSpace(String cluster, String publicTenant, NamespaceName ns) throws Exception {
+        TenantResources tr = broker.getPulsarResources().getTenantResources();
+        NamespaceResources nsr = broker.getPulsarResources().getNamespaceResources();

-    private void createSampleNameSpace(ClusterData clusterData, String cluster) {
-        // Create a sample namespace
-        final String tenant = "sample";
-        final String globalCluster = "global";
-        final String namespace = tenant + "/ns1";
-        try {
-            List<String> clusters = admin.clusters().getClusters();
-            if (!clusters.contains(cluster)) {
-                admin.clusters().createCluster(cluster, clusterData);
-            } else {
-                admin.clusters().updateCluster(cluster, clusterData);
-            }
-            // Create marker for "global" cluster
-            if (!clusters.contains(globalCluster)) {
-                admin.clusters().createCluster(globalCluster, ClusterData.builder().build());
-            }
-
-            if (!admin.tenants().getTenants().contains(tenant)) {
-                admin.tenants().createTenant(tenant,
-                        new TenantInfoImpl(Sets.newHashSet(config.getSuperUserRoles()), Sets.newHashSet(cluster)));
-            }
+        if (!tr.tenantExists(publicTenant)) {
+            tr.createTenant(publicTenant,
+                    TenantInfo.builder()
+                            .adminRoles(Sets.newHashSet(config.getSuperUserRoles()))
+                            .allowedClusters(Sets.newHashSet(cluster))
+                            .build());
+        }

-            if (!admin.namespaces().getNamespaces(tenant).contains(namespace)) {
-                admin.namespaces().createNamespace(namespace);
-            }
-        } catch (PulsarAdminException e) {
-            log.warn(e.getMessage(), e);
+        if (!nsr.namespaceExists(ns)) {
+            Policies nsp = new Policies();
+            nsp.replication_clusters = Collections.singleton(config.getClusterName());
+            nsr.createPolicies(ns, nsp);
         }
     }

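For orientation (not part of the commit), the `NamespaceName` helper used in the new `createNameSpace` signature resolves the standalone defaults like this; a minimal sketch:

```java
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;

public class DefaultNamespaceSketch {
    public static void main(String[] args) {
        // The same call the new createNameSpace(...) receives for the default namespace.
        NamespaceName defaultNs =
                NamespaceName.get(TopicName.PUBLIC_TENANT, TopicName.DEFAULT_NAMESPACE);
        System.out.println(defaultNs);                // public/default
        System.out.println(defaultNs.getTenant());    // public
        System.out.println(defaultNs.getLocalName()); // default
    }
}
```
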
pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java

+2 -2

@@ -1457,8 +1457,8 @@ public static Multimap<String, Metric> parseMetrics(String metrics) {
         // Example of lines are
         // jvm_threads_current{cluster="standalone",} 203.0
         // or
-        // pulsar_subscriptions_count{cluster="standalone", namespace="sample/standalone/ns1",
-        // topic="persistent://sample/standalone/ns1/test-2"} 0.0 1517945780897
+        // pulsar_subscriptions_count{cluster="standalone", namespace="public/default",
+        // topic="persistent://public/default/test-2"} 0.0 1517945780897
         Pattern pattern = Pattern.compile("^(\\w+)\\{([^\\}]+)\\}\\s([+-]?[\\d\\w\\.-]+)(\\s(\\d+))?$");
         Pattern tagsPattern = Pattern.compile("(\\w+)=\"([^\"]+)\"(,\\s?)?");

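A self-contained illustration (not part of the commit) of how the first pattern above splits the updated example line into name, tags, value, and timestamp:

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class MetricLineParseDemo {
    public static void main(String[] args) {
        String line = "pulsar_subscriptions_count{cluster=\"standalone\", namespace=\"public/default\", "
                + "topic=\"persistent://public/default/test-2\"} 0.0 1517945780897";

        // Same regex as the test: metric name, tag block, value, optional timestamp.
        Pattern pattern = Pattern.compile("^(\\w+)\\{([^\\}]+)\\}\\s([+-]?[\\d\\w\\.-]+)(\\s(\\d+))?$");
        Matcher m = pattern.matcher(line);
        if (m.matches()) {
            System.out.println("name  = " + m.group(1)); // pulsar_subscriptions_count
            System.out.println("tags  = " + m.group(2)); // cluster="standalone", namespace="public/default", ...
            System.out.println("value = " + m.group(3)); // 0.0
            System.out.println("ts    = " + m.group(5)); // 1517945780897
        }
    }
}
```
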
pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTestUtils.java

+2 -2

@@ -73,8 +73,8 @@ public static Map<String, Metric> parseMetrics(String metrics) {
         // Example of lines are
         // jvm_threads_current{cluster="standalone",} 203.0
         // or
-        // pulsar_subscriptions_count{cluster="standalone", namespace="sample/standalone/ns1",
-        // topic="persistent://sample/standalone/ns1/test-2"} 0.0 1517945780897
+        // pulsar_subscriptions_count{cluster="standalone", namespace="public/default",
+        // topic="persistent://public/default/test-2"} 0.0 1517945780897
         Pattern pattern = Pattern.compile("^(\\w+)(\\{[^\\}]+\\})?\\s([+-]?[\\d\\w\\.-]+)(\\s(\\d+))?$");
         Pattern tagsPattern = Pattern.compile("(\\w+)=\"([^\"]+)\"(,\\s?)?");
         Arrays.asList(metrics.split("\n")).forEach(line -> {

pulsar-client-cpp/docs/MainPage.md

+2 -2

@@ -111,7 +111,7 @@ $ make
 Client client("pulsar://localhost:6650");

 Consumer consumer;
-Result result = client.subscribe("persistent://sample/standalone/ns1/my-topic", "my-subscribtion-name", consumer);
+Result result = client.subscribe("persistent://public/default/my-topic", "my-subscribtion-name", consumer);
 if (result != ResultOk) {
     LOG_ERROR("Failed to subscribe: " << result);
     return -1;
@@ -136,7 +136,7 @@ client.close();
 Client client("pulsar://localhost:6650");

 Producer producer;
-Result result = client.createProducer("persistent://sample/standalone/ns1/my-topic", producer);
+Result result = client.createProducer("persistent://public/default/my-topic", producer);
 if (result != ResultOk) {
     LOG_ERROR("Error creating producer: " << result);
     return -1;
pulsar-client-cpp/pulsar-test-service-start.sh

+3-5
Original file line numberDiff line numberDiff line change
@@ -74,12 +74,10 @@ $PULSAR_DIR/bin/pulsar-admin clusters create \
7474
--broker-url pulsar://localhost:6650/ \
7575
--broker-url-secure pulsar+ssl://localhost:6651/
7676

77-
# Create "public" tenant
78-
$PULSAR_DIR/bin/pulsar-admin tenants create public -r "anonymous" -c "standalone"
77+
# Update "public" tenant
78+
$PULSAR_DIR/bin/pulsar-admin tenants update public -r "anonymous" -c "standalone"
7979

80-
# Create "public/default" with no auth required
81-
$PULSAR_DIR/bin/pulsar-admin namespaces create public/default \
82-
--clusters standalone
80+
# Update "public/default" with no auth required
8381
$PULSAR_DIR/bin/pulsar-admin namespaces grant-permission public/default \
8482
--actions produce,consume \
8583
--role "anonymous"

pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/DefaultSchemasTest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@
3535
public class DefaultSchemasTest {
3636
private PulsarClient client;
3737

38-
private static final String TEST_TOPIC = "persistent://sample/standalone/ns1/test-topic";
38+
private static final String TEST_TOPIC = "test-topic";
3939

4040
@BeforeClass
4141
public void setup() throws PulsarClientException {

pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java

+1 -1

@@ -75,7 +75,7 @@
 @Slf4j
 public class PulsarSinkTest {

-    private static final String TOPIC = "persistent://sample/standalone/ns1/test_result";
+    private static final String TOPIC = "test_result";

     public static class TestSerDe implements SerDe<String> {
