Topic auto-creation
- The partitioned topic auto-creation is dependent on `pulsarClient.getPartitionsForTopic`.
  - It triggers partitioned metadata creation by `pulsarClient.getPartitionsForTopic`.
  - And triggers the topic partition creation by producers' registration and consumers' registration.
- When calling `pulsarClient.getPartitionsForTopic(topicName)`, Pulsar will automatically create the partitioned topic metadata if it does not exist, using either `HttpLookupService` or `BinaryProtoLookupService`.
Current behavior of `pulsarClient.getPartitionsForTopic`

| case | broker allow auto-create | param allow create if not exists | non-partitioned topic | partitioned topic | current behavior |
|---|---|---|---|---|---|
| 1 | `true/false` | `true/false` | `exists: true` | | REST API: `partitions: 0`<br>Binary API: `partitions: 0` |
| 2 | `true/false` | `true/false` | | `exists: true`<br>`partitions: 3` | REST API: `partitions: 3`<br>Binary API: `partitions: 3` |
| 3 | `true` | `true` | | | REST API: `create new: true`, `partitions: 3`<br>Binary API: `create new: true`, `partitions: 3` |
| 4 | `true` | `false` | | | REST API: `create new: false`, `partitions: 0`<br>Binary API: not supported |
| 5 | `false` | `true` | | | REST API: `create new: false`, `partitions: 0`<br>Binary API: `create new: false`, `partitions: 0` |

- Broker allows `auto-create`: see also the config `allowAutoTopicCreation` in `broker.conf`.
- Param allow `create if not exists`
  - Regarding the HTTP API `PersistentTopics.getPartitionedMetadata`, it is an optional param named `checkAllowAutoCreation`, and the default value is `false`.
  - Regarding the `pulsar-admin` API, it depends on the HTTP API `PersistentTopics.getPartitionedMetadata`; it always sets the param `checkAllowAutoCreation` to `false`, and this cannot be changed manually.
  - Regarding the client API `HttpLookupService.getPartitionedTopicMetadata`, it depends on the HTTP API `PersistentTopics.getPartitionedMetadata`; it always sets the param `checkAllowAutoCreation` to `true`, and this cannot be changed manually.
  - Regarding the client API `BinaryProtoLookupService.getPartitionedTopicMetadata`, it always tries to create partitioned metadata.
- `REST API & Binary API`: There are four ways to get partitioned metadata but only two implementations. The HTTP API `PersistentTopics.getPartitionedMetadata`, `pulsar-admin`, and `HttpLookupService.getPartitionedTopicMetadata` are collectively called the HTTP API (shown as REST API in the table), and `BinaryProtoLookupService.getPartitionedTopicMetadata` is called the Binary API.

The param `create if not exists` of the Binary API is always `true`.
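
The five cases above can be restated as a small decision function. The following is only an illustrative model of the table, not Pulsar source code: the class `CurrentLookupModel`, the `Result` type, the `defaultPartitions` parameter, and the use of `UnsupportedOperationException` for the "not supported" cell are all assumptions made for this sketch.

```java
import java.util.OptionalInt;

/** Illustrative model of the behavior table; this is not Pulsar source code. */
final class CurrentLookupModel {
    enum Api { REST, BINARY }

    /** Partition count reported to the caller, and whether metadata was newly created. */
    static final class Result {
        final int partitions;
        final boolean created;

        Result(int partitions, boolean created) {
            this.partitions = partitions;
            this.created = created;
        }
    }

    /**
     * @param existingPartitions empty if no topic exists, 0 for an existing
     *                           non-partitioned topic, n for a partitioned topic
     * @param defaultPartitions  hypothetical partition count used on auto-creation
     */
    static Result lookup(Api api, OptionalInt existingPartitions,
                         boolean brokerAllowsAutoCreate, boolean createIfNotExists,
                         int defaultPartitions) {
        if (existingPartitions.isPresent()) {
            // Cases 1 and 2: the topic exists, so nothing is created.
            return new Result(existingPartitions.getAsInt(), false);
        }
        if (api == Api.BINARY && !createIfNotExists) {
            // Case 4, Binary API: the param cannot be set to false.
            throw new UnsupportedOperationException("Binary API always tries to create");
        }
        if (brokerAllowsAutoCreate && createIfNotExists) {
            // Case 3: partitioned metadata is created as a side effect of a "get".
            return new Result(defaultPartitions, true);
        }
        // Case 4 (REST) and case 5: reports 0 partitions although no topic exists.
        return new Result(0, false);
    }

    public static void main(String[] args) {
        Result case5 = lookup(Api.BINARY, OptionalInt.empty(), false, true, 3);
        System.out.println("case 5 -> partitions=" + case5.partitions
                + ", created=" + case5.created);
    }
}
```

The last branch is the one that makes a missing topic indistinguishable from an existing non-partitioned topic, since both report `0` partitions.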
- For case 4 of `pulsarClient.getPartitionsForTopic`'s behavior, it always tries to create the partitioned metadata, even though the API name is `getxxx`.
- For case 5 of `pulsarClient.getPartitionsForTopic`'s behavior, it returns a `0` partitioned metadata, but the topic does not exist. The correct behavior for this case was discussed here before.
- BTW, flink-connector-pulsar is using this API to create partitioned topic metadata.
- Regarding case 4: Add a new API `PulsarClient.getPartitionsForTopic(String, boolean)` that only gets the partitioned topic metadata and does not try to create it. See details below.
- Regarding case 5: Instead of returning a `0` partitioned metadata, respond with a not-found error when calling `pulsarClient.getPartitionsForTopic(String)` if the topic does not exist.

When you call the public API `pulsarClient.getPartitionsForTopic`, Pulsar will not create the partitioned metadata anymore.

LookupService.java
```java
- CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMetadata(TopicName topicName);

/**
 * 1. Get the partitions if the topic exists. Returns "{partition: n}" if a partitioned topic exists;
 *    returns "{partition: 0}" if a non-partitioned topic exists.
 * 2. When {@code createIfAutoCreationEnabled} is "false" and neither a partitioned topic nor a
 *    non-partitioned topic exists, you will get a {@link PulsarClientException.NotFoundException}.
 *  2-1. You will get a {@link PulsarClientException.NotSupportedException} if the broker's version is
 *    an older one that does not support this feature and the Pulsar client is using a binary
 *    protocol "serviceUrl".
 * 3. When {@code createIfAutoCreationEnabled} is "true", it will trigger an auto-creation for this
 *    topic (using the default topic auto-creation strategy you set for the broker), and the
 *    corresponding result is returned. For the result, see case 1.
 * @version 3.3.0
 */
+ CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMetadata(TopicName topicName, boolean createIfAutoCreationEnabled);
```

The behavior of the new API `LookupService.getPartitionedTopicMetadata(TopicName, boolean)`.

| case | client-side param: `createIfAutoCreationEnabled` | non-partitioned topic | partitioned topic | broker-side: topic auto-creation strategy | current behavior |
|---|---|---|---|---|---|
| 1 | `true/false` | `exists: true` | | | REST/Binary API: `{partitions: 0}` |
| 2 | `true/false` | | `exists: true`<br>`partitions: 2` | | REST/Binary API: `{partitions: 2}` |
| 3 | `true` | | | `allowAutoTopicCreation`: `true`<br>`allowAutoTopicCreationType`: `non-partitioned` | REST/Binary API: `create new: true`, `{partitions: 0}` |
| 4 | `true` | | | `allowAutoTopicCreation`: `true`<br>`allowAutoTopicCreationType`: `partitioned`<br>`defaultNumPartitions`: `2` | REST/Binary API: `create new: true`, `{partitions: 2}` |
| 5 | `false` | | | `allowAutoTopicCreation`: `true` | REST/Binary API: Not found error |
| 6 | `true` | | | `allowAutoTopicCreation`: `false` | REST/Binary API: Not found error |

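
To make the six cases easier to check, the table can be sketched as one function that either returns a partition count or fails. This is a hedged model only: `NewLookupModel`, `TopicNotFound` (a stand-in for `PulsarClientException.NotFoundException`), and the `AutoCreationType` enum are names invented for this sketch, and the real broker and client code paths are not shown.

```java
import java.util.OptionalInt;

/** Illustrative model of the proposed lookup behavior; not Pulsar source code. */
final class NewLookupModel {
    /** Stand-in for PulsarClientException.NotFoundException (hypothetical type). */
    static final class TopicNotFound extends RuntimeException {
        TopicNotFound(String message) {
            super(message);
        }
    }

    /** Mirrors the two values of allowAutoTopicCreationType used in the table. */
    enum AutoCreationType { NON_PARTITIONED, PARTITIONED }

    /**
     * @param existingPartitions empty if no topic exists, 0 for an existing
     *                           non-partitioned topic, n for a partitioned topic
     * @return the partition count reported to the caller
     */
    static int getPartitions(OptionalInt existingPartitions,
                             boolean createIfAutoCreationEnabled,
                             boolean allowAutoTopicCreation,
                             AutoCreationType allowAutoTopicCreationType,
                             int defaultNumPartitions) {
        if (existingPartitions.isPresent()) {
            // Cases 1 and 2: an existing topic is reported as-is.
            return existingPartitions.getAsInt();
        }
        if (!createIfAutoCreationEnabled || !allowAutoTopicCreation) {
            // Cases 5 and 6: either side disallows creation, so the lookup fails.
            throw new TopicNotFound("topic does not exist");
        }
        // Cases 3 and 4: the broker's auto-creation strategy decides the result.
        return allowAutoTopicCreationType == AutoCreationType.PARTITIONED
                ? defaultNumPartitions
                : 0;
    }
}
```

In this model a result of `0` always means that a non-partitioned topic exists, either beforehand or through auto-creation, which is the ambiguity the proposal aims to remove.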
PulsarClient.java
```java
// This API existed before. Its signature is not changed, thus ensuring compatibility.
+ /** @deprecated It is not suggested to use now; please use {@link #getPartitionsForTopic(String, boolean)}. */
+ @Deprecated
- CompletableFuture<List<String>> getPartitionsForTopic(String topic);
+ default CompletableFuture<List<String>> getPartitionsForTopic(String topic) {
+     return getPartitionsForTopic(topic, true);
+ }

/**
 * 1. Get the partitions if the topic exists. Returns "[{partition-0}, {partition-1}....{partition-n}]"
 *    if a partitioned topic exists; returns "[{topic}]" if a non-partitioned topic exists.
 * 2. When {@code createIfAutoCreationEnabled} is "false" and neither the partitioned topic nor the
 *    non-partitioned topic exists, you will get a {@link PulsarClientException.NotFoundException}.
 *  2-1. You will get a {@link PulsarClientException.NotSupportedException} if the broker's version is
 *    an older one that does not support this feature and the Pulsar client is using a binary
 *    protocol "serviceUrl".
 * 3. When {@code createIfAutoCreationEnabled} is "true", it will trigger an auto-creation for this
 *    topic (using the default topic auto-creation strategy you set for the broker), and the
 *    corresponding result is returned. For the result, see case 1.
 * @version 3.3.0
 */
CompletableFuture<List<String>> getPartitionsForTopic(String topic, boolean createIfAutoCreationEnabled);
```

The behavior of the new API `PulsarClient.getPartitionsForTopic(String, boolean)`.

| case | client-side param: `createIfAutoCreationEnabled` | non-partitioned topic | partitioned topic | broker-side: topic auto-creation strategy | current behavior |
|---|---|---|---|---|---|
| 1 | `true/false` | `exists: true` | | | REST/Binary API: `["{tenant}/{ns}/topic"]` |
| 2 | `true/false` | | `exists: true`<br>`partitions: 2` | | REST/Binary API: `["{tenant}/{ns}/topic-partition-0", "{tenant}/{ns}/topic-partition-1"]` |
| 3 | `true` | | | `allowAutoTopicCreation`: `true`<br>`allowAutoTopicCreationType`: `non-partitioned` | REST/Binary API: `create new: true`, `["{tenant}/{ns}/topic"]` |
| 4 | `true` | | | `allowAutoTopicCreation`: `true`<br>`allowAutoTopicCreationType`: `partitioned`<br>`defaultNumPartitions`: `2` | REST/Binary API: `create new: true`, `["{tenant}/{ns}/topic-partition-0", "{tenant}/{ns}/topic-partition-1"]` |
| 5 | `false` | | | `allowAutoTopicCreation`: `true` | REST/Binary API: Not found error |
| 6 | `true` | | | `allowAutoTopicCreation`: `false` | REST/Binary API: Not found error |

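
The list results in the table follow from the partition count: a count of `0` yields the topic name itself, and a count of `n` yields one name per partition with a `-partition-<index>` suffix. A minimal sketch of that mapping, assuming a hypothetical helper class `PartitionNames` that is not part of the Pulsar client:

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative helper: maps a partition count to the names shown in the table. */
final class PartitionNames {
    /**
     * @param topic      full topic name, for example "persistent://tenant/ns/topic"
     * @param partitions 0 for a non-partitioned topic, n for a partitioned topic
     */
    static List<String> forTopic(String topic, int partitions) {
        if (partitions < 0) {
            throw new IllegalArgumentException("partitions must not be negative");
        }
        List<String> names = new ArrayList<>();
        if (partitions == 0) {
            // Non-partitioned topic: the list contains only the topic itself.
            names.add(topic);
            return names;
        }
        for (int i = 0; i < partitions; i++) {
            // Partitioned topic: one entry per partition, indexed from 0.
            names.add(topic + "-partition-" + i);
        }
        return names;
    }

    public static void main(String[] args) {
        System.out.println(forTopic("persistent://tenant/ns/topic", 2));
    }
}
```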

CommandPartitionedTopicMetadata
```protobuf
message CommandPartitionedTopicMetadata {
+   optional bool metadata_auto_creation_enabled = 6 [default = true];
}
```
FeatureFlags
```protobuf
message FeatureFlags {
+   optional bool supports_binary_api_get_partitioned_meta_with_param_created_false = 5 [default = false];
}
```

- Old version client and New version Broker: The client will call the old API.
- New version client and Old version Broker: The feature flag `supports_binary_api_get_partitioned_meta_with_param_created_false` will be `false`. The client will get a not-supported error if the param `createIfAutoCreationEnabled` is `false`.
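
The compatibility rule for a new client talking to an old broker can be sketched as a single check made before the request is sent. This is an assumption-laden illustration rather than the client's actual code: `CompatibilityCheck`, `NotSupported` (a stand-in for `PulsarClientException.NotSupportedException`), and the method name are hypothetical.

```java
/** Illustrative check for the new-client / old-broker case; not Pulsar source code. */
final class CompatibilityCheck {
    /** Stand-in for PulsarClientException.NotSupportedException (hypothetical type). */
    static final class NotSupported extends RuntimeException {
        NotSupported(String message) {
            super(message);
        }
    }

    /**
     * Decides the value to send in metadata_auto_creation_enabled.
     *
     * @param brokerSupportsFlag          value of the broker's feature flag
     *        supports_binary_api_get_partitioned_meta_with_param_created_false
     * @param createIfAutoCreationEnabled value requested by the caller
     */
    static boolean resolveAutoCreationField(boolean brokerSupportsFlag,
                                            boolean createIfAutoCreationEnabled) {
        if (!createIfAutoCreationEnabled && !brokerSupportsFlag) {
            // An old broker would ignore the field and create the metadata anyway,
            // so the request is rejected instead of silently creating a topic.
            throw new NotSupported("broker does not support lookup without auto-creation");
        }
        return createIfAutoCreationEnabled;
    }
}
```

Requests with `createIfAutoCreationEnabled` set to `true` pass through unchanged in this sketch, which matches the old behavior and keeps existing callers working against either broker version.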