[Issue 11496][C++] Allow partitioned producers to start lazily (#11570)
Fixes #11496. Also matches part of PIP 79.

C++ implementation that closely matches the proposed Java client changes for reducing partitioned producer connections and lookups (PR 10279).

### Motivation

Producers that send messages to partitioned topics start a producer per partition, even when using single partition routing. For topics that combine a large number of producers with a large number of partitions, this can put strain on the brokers. With, say, 1000 partitions and single partition routing with non-keyed messages, each producer performs 999 topic owner lookups and producer registrations that could be avoided.

PIP 79 describes the same change; I wrote this before realising that it was already covered there. This implementation can be reviewed and contrasted with the Java client implementation in apache/pulsar#10279.

### Modifications

Allows partitioned producers to start producers for individual partitions lazily. Starting a producer involves a topic owner
lookup to find out which broker is the owner of the partition, then registering the producer for that partition with the owner
broker. For topics with many partitions and when using SinglePartition routing without keyed messages, all of these
lookups and producer registrations are a waste except for the single chosen partition.

This change lets the user control whether a producer on a partitioned topic uses this lazy start, via a new setting
in ProducerConfiguration. When ProducerConfiguration.setLazyStartPartitionedProducers(true) is called, PartitionedProducerImpl.start() becomes a synchronous operation that only does housekeeping (no network operations).
The producer of any given partition is started (which includes a topic owner lookup and registration) upon sending the first message to that partition. While the producer starts, messages are buffered.

The sendTimeout timer is only activated once a producer has been fully started, which should give enough time for any buffered messages to be sent. For very short send timeouts, this setting could cause send timeouts during the start phase. The default of 30s should however not cause this issue.
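
The lazy-start behaviour described above can be modelled in a few lines of Python. This is a toy sketch under stated assumptions, not the C++ implementation: the class `LazyPartitionedProducerModel`, its `finish_start` method and the `lookups` counter are hypothetical names introduced for illustration, and the broker round trip is reduced to an explicit method call. It follows the description in this commit message, where `start()` performs no network operations in lazy mode.

```python
class LazyPartitionedProducerModel:
    """Toy model of lazy start; all names here are hypothetical."""

    def __init__(self, num_partitions, lazy_start=True):
        self.num_partitions = num_partitions
        self.lazy_start = lazy_start
        # Each partition moves through: idle -> starting -> ready.
        self.state = ["idle"] * num_partitions
        self.buffered = [[] for _ in range(num_partitions)]
        self.delivered = []
        self.lookups = 0  # simulated topic owner lookups + registrations

    def _begin_start(self, partition):
        # Stands in for the topic owner lookup and producer registration.
        if self.state[partition] == "idle":
            self.state[partition] = "starting"
            self.lookups += 1

    def finish_start(self, partition):
        # Simulates the broker confirming the registration; buffered
        # messages are flushed in the order they were sent.
        if self.state[partition] == "starting":
            self.state[partition] = "ready"
            self.delivered.extend((partition, m) for m in self.buffered[partition])
            self.buffered[partition].clear()

    def start(self):
        # Eager mode starts every partition; lazy mode only does housekeeping.
        if not self.lazy_start:
            for partition in range(self.num_partitions):
                self._begin_start(partition)
                self.finish_start(partition)

    def send(self, partition, message):
        if self.state[partition] == "ready":
            self.delivered.append((partition, message))
            return
        # First message to this partition triggers the start; messages
        # are buffered until the start completes.
        self._begin_start(partition)
        self.buffered[partition].append(message)
```

In this model, an eager producer on a 1000-partition topic performs 1000 simulated lookups in `start()`, while a lazy producer that only ever sends to one partition performs a single lookup, triggered by its first `send`.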
Vanlightly authored Aug 16, 2021
1 parent cc9f6e4 commit 8521729
Showing 2 changed files with 16 additions and 0 deletions.
14 changes: 14 additions & 0 deletions pulsar/__init__.py
@@ -464,6 +464,7 @@ def create_producer(self, topic,
batching_max_allowed_size_in_bytes=128*1024,
batching_max_publish_delay_ms=10,
message_routing_mode=PartitionsRoutingMode.RoundRobinDistribution,
lazy_start_partitioned_producers=False,
properties=None,
batching_type=BatchingType.Default,
encryption_key=None,
@@ -518,6 +519,17 @@ def create_producer(self, topic,
* `message_routing_mode`:
Set the message routing mode for the partitioned producer. Default is `PartitionsRoutingMode.RoundRobinDistribution`,
other option is `PartitionsRoutingMode.UseSinglePartition`
* `lazy_start_partitioned_producers`:
This config affects producers of partitioned topics only. It controls whether
producers register and connect immediately to the owner broker of each partition
or start lazily on demand. The internal producer of one partition is always
started eagerly, chosen by the routing policy, but the internal producers of
any additional partitions are started on demand, upon receiving their first
message.
Using this mode can reduce the strain on brokers for topics with large numbers of
partitions and when the SinglePartition routing policy is used without keyed messages.
Because producer connection can be on demand, this can produce extra send latency
for the first messages of a given partition.
* `properties`:
Sets the properties for the producer. The properties associated with a producer
can be used to identify a producer at the broker side.
@@ -558,6 +570,7 @@ def create_producer(self, topic,
_check_type(BatchingType, batching_type, 'batching_type')
_check_type_or_none(str, encryption_key, 'encryption_key')
_check_type_or_none(CryptoKeyReader, crypto_key_reader, 'crypto_key_reader')
_check_type(bool, lazy_start_partitioned_producers, 'lazy_start_partitioned_producers')

conf = _pulsar.ProducerConfiguration()
conf.send_timeout_millis(send_timeout_millis)
@@ -571,6 +584,7 @@ def create_producer(self, topic,
conf.batching_max_publish_delay_ms(batching_max_publish_delay_ms)
conf.partitions_routing_mode(message_routing_mode)
conf.batching_type(batching_type)
conf.lazy_start_partitioned_producers(lazy_start_partitioned_producers)
if producer_name:
conf.producer_name(producer_name)
if initial_sequence_id:
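
From the caller's side, the new keyword in `create_producer` could be used roughly as follows. This is a minimal sketch: the helper name `create_lazy_producer` is hypothetical, `client` is assumed to be a `pulsar.Client` built from a version that includes this commit, and `routing_mode` would typically be `PartitionsRoutingMode.UseSinglePartition`, the case the commit message targets.

```python
def create_lazy_producer(client, topic, routing_mode):
    """Create a producer whose per-partition producers start on demand.

    Hypothetical helper: `client` is assumed to expose the create_producer
    signature shown in the diff above.
    """
    return client.create_producer(
        topic,
        message_routing_mode=routing_mode,
        # Must be a bool: the wrapper type-checks this argument.
        lazy_start_partitioned_producers=True,
    )
```

As the docstring in the diff notes, the first messages sent to a given partition may see extra latency, since the internal producer for that partition connects on demand.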
2 changes: 2 additions & 0 deletions src/config.cc
@@ -233,6 +233,8 @@ void export_config() {
.def("block_if_queue_full", &ProducerConfiguration::setBlockIfQueueFull, return_self<>())
.def("partitions_routing_mode", &ProducerConfiguration::getPartitionsRoutingMode)
.def("partitions_routing_mode", &ProducerConfiguration::setPartitionsRoutingMode, return_self<>())
.def("lazy_start_partitioned_producers", &ProducerConfiguration::getLazyStartPartitionedProducers)
.def("lazy_start_partitioned_producers", &ProducerConfiguration::setLazyStartPartitionedProducers, return_self<>())
.def("batching_enabled", &ProducerConfiguration::getBatchingEnabled, return_value_policy<copy_const_reference>())
.def("batching_enabled", &ProducerConfiguration::setBatchingEnabled, return_self<>())
.def("batching_max_messages", &ProducerConfiguration::getBatchingMaxMessages, return_value_policy<copy_const_reference>())
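
The two `.def` lines added in `src/config.cc` register a getter and a setter under the same Python name, with the setter returning the configuration object itself. A rough pure-Python picture of that calling convention, together with the bool check added in `pulsar/__init__.py`, might look like the sketch below. `ProducerConfigurationStandIn` and `check_bool` are hypothetical stand-ins, and the exact behaviour of the wrapper's own `_check_type` helper is an assumption here (an `isinstance` test that raises `ValueError`).

```python
class ProducerConfigurationStandIn:
    """Hypothetical stand-in for the bound ProducerConfiguration class."""

    def __init__(self):
        # Matches the Python wrapper default shown in the diff.
        self._lazy_start = False

    def lazy_start_partitioned_producers(self, *args):
        if not args:
            return self._lazy_start  # getter overload: no arguments
        (value,) = args
        self._lazy_start = value     # setter overload: one argument
        return self                  # mirrors return_self<>()


def check_bool(value, name):
    # Assumed behaviour of the wrapper's type check: reject anything
    # that is not a bool, including ints such as 0 and 1.
    if not isinstance(value, bool):
        raise ValueError(
            "Argument %s is expected to be of type 'bool' and not '%s'"
            % (name, type(value).__name__)
        )


conf = ProducerConfigurationStandIn()
check_bool(True, "lazy_start_partitioned_producers")
conf.lazy_start_partitioned_producers(True)
```

Because the setter returns the object, calls of this kind can be chained, although the wrapper in this commit calls each setter on its own line.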