-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[PIP 79][client] Add lazy-loading feature to PartitionedProducer #10279
Conversation
/pulsarbot run-failure-checks |
164d1b6
to
d49aab4
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks this awesome proposal.
I left some early feedback after doing a first pass.
I am not sure about concurrent aspects during producer creation. I will take another look soon
@@ -1521,6 +1522,17 @@ public void testMaxTopicsPerNamespace() throws Exception { | |||
} | |||
|
|||
// check producer/consumer auto create partitioned topic | |||
final Function<Producer<byte[]>, Producer<byte[]>> send = (p) -> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why are you modifying an existing test?
It is okay to refactor and reuse an existing test but in this case probably you are altering the behaviour of the existing guest and also you are not sure that you are testing your feature properly
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why are you modifying an existing test?
Because this feature is modifying existing behavior. Currently, a partitioned producer connects to all partitions. Some cases of tests suppose "partitioned producer connects to all partitions(and also create all partition automatically)".
In this change, I modified this procedure to lazy-loading and modified test cases to follow it(here is a part of test codes for this feature).
If we should not change existing behavior, I'll change this feature to be able to stop.
log.error("[{}] Could not create internal producer. partitionIndex: {}", topic, partition, | ||
createException); | ||
try { | ||
producers.remove(partition).close(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We are not sure that we are removing the newly create producer here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The reason for it is to recreate internal producer when producer creation is failed. This behavior is tested here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I mean that we should test the result for "producers.remove(partition)" for equality ('==') with prod
@sijie @eolivelli @merlimat Though there are conflicts, we want to confirm the motivation and solution make sense before resolving conflicts. |
I have submitted a PR for lazy producers in the C++ client (#11570), I did the work before seeing this PIP-79. The approach I took was to create the producers without starting the lookup and connection procedure. So the collection of producers is as big as the number of partitions, and changes when the number partitions change, but the lookup and connect procedure is only started on a producer's first message. It does not block in sendAsync but kicks off the procedure asynchronously allowing messages to be buffered until the producer is connected. In the C++ client, deadlocks were an issue so I avoided any blocking code. I don't know if that approach makes sense for the Java client as I have not studied how the Java client works in detail yet. But that seems to be the main difference between my C++ changes and this Java client change. |
Fixes #11496 also matches part of PIP 79. C++ implementation that closely matches the proposed Java client changes from reducing partitioned producer connections and lookups: PR 10279 ### Motivation Producers that send messages to partitioned topics start a producer per partition, even when using single partition routing. For topics that have the combination of a large number of producers and a large number of partitions, this can put strain on the brokers. With say 1000 partitions and single partition routing with non-keyed messages, 999 topic owner lookups and producer registrations are performed that could be avoided. PIP 79 also describes this. I wrote this before realising that PIP 79 also covers this. This implementation can be reviewed and contrasted to the Java client implementation in #10279. ### Modifications Allows partitioned producers to start producers for individual partitions lazily. Starting a producer involves a topic owner lookup to find out which broker is the owner of the partition, then registering the producer for that partition with the owner broker. For topics with many partitions and when using SinglePartition routing without keyed messages, all of these lookups and producer registrations are a waste except for the single chosen partition. This change allows the user to control whether a producer on a partitioned topic uses this lazy start or not, via a new config in ProducerConfiguration. When ProducerConfiguration.setLazyStartPartitionedProducers(true) is set, the PartitionedProducerImpl.start() becomes a synchronous operation that only does housekeeping (no network operations). The producer of any given partition is started (which includes a topic owner lookup and registration) upon sending the first message to that partition. While the producer starts, messages are buffered. The sendTimeout timer is only activated once a producer has been fully started, which should give enough time for any buffered messages to be sent. For very short send timeouts, this setting could cause send timeouts during the start phase. The default of 30s should however not cause this issue.
d49aab4
to
844cca7
Compare
return producers.get(0).getProducerName(); | ||
return producers.get(firstPartitionIndex).getProducerName(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This change breaks existing behavior. If it is not approved, I'll be changed to create the partition-0
producer for backward compatibility.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
shouldn't be the same producerName for all the producers ?
why not computing the name in the constructor and cache it ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
shouldn't be the same producerName for all the producers ?
In my understanding, currently producerName is calculated for each partition if doesn't set it at ProducerBuilder.
pulsar/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
Lines 1110 to 1111 in fe4cd09
final String producerName = cmdProducer.hasProducerName() ? cmdProducer.getProducerName() | |
: service.generateUniqueProducerName(); |
pulsar/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
Lines 1364 to 1366 in fe4cd09
if (this.producerName == null) { | |
this.producerName = producerName; | |
} |
why not computing the name in the constructor and cache it ?
If needed, I'll add the behavior that compute and cache producerName to below.
pulsar/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
Lines 134 to 160 in fe4cd09
producer.producerCreatedFuture().handle((prod, createException) -> { | |
if (createException != null) { | |
setState(State.Failed); | |
createFail.compareAndSet(null, createException); | |
} | |
// we mark success if all the partitions are created | |
// successfully, else we throw an exception | |
// due to any | |
// failure in one of the partitions and close the successfully | |
// created partitions | |
if (completed.incrementAndGet() == topicMetadata.numPartitions()) { | |
if (createFail.get() == null) { | |
setState(State.Ready); | |
log.info("[{}] Created partitioned producer", topic); | |
producerCreatedFuture().complete(PartitionedProducerImpl.this); | |
} else { | |
log.error("[{}] Could not create partitioned producer.", topic, createFail.get().getCause()); | |
closeAsync().handle((ok, closeException) -> { | |
producerCreatedFuture().completeExceptionally(createFail.get()); | |
client.cleanupProducer(this); | |
return null; | |
}); | |
} | |
} | |
return null; | |
}); |
/pulsarbot run-failure-checks |
Rebased to current master commit |
@eolivelli @Vanlightly |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I left another round of comments
@merlimat @codelipenghui @rdhabalia can you please take a look ?
for (int i = 0; i < 3; i++) { | ||
try { | ||
producer.newMessage().value("msg".getBytes()).send(); | ||
} catch (Throwable e) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the test will fail anyway, no need to catch the exception
p.newMessage().value("msg".getBytes()).send(); | ||
} catch (Throwable e) { | ||
log.info("Exception: ", e); | ||
fail(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what about
catch (Exception e) {
log.info("Exception: ", e);
throw new RuntimeException(e);
}
for (int i = 0; i < 2; i++) { | ||
try { | ||
p.newMessage().value("msg".getBytes()).send(); | ||
} catch (Throwable e) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
same here
.newProducer(Schema.JSON(Schemas.BytesRecord.class)) | ||
.topic(topic) | ||
.enableBatching(false) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why do you need to change this test ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In this PR, I'll introduce internal producer lazy-loading feature. When internal producer is elected by MessageRouter and isn't created yet, create it lazily.
https://github.com/equanz/pulsar/blob/844cca7cf84b76b7d27f12de4dbf36e23cbf29e9/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java#L193-L211
Therefore, if it isn't elected by MessageRouter, doesn't create producer and also topic in this test case.
For above reason, we should add Producer#send
.
When enableBatching, not easier to elect all of internal producers at client side.
Lines 82 to 84 in fe4cd09
if (isBatchingEnabled) { // if batching is enabled, choose partition on `partitionSwitchMs` boundary. | |
long currentMs = clock.millis(); | |
return signSafeMod(currentMs / partitionSwitchMs + startPtnIdx, topicMetadata.numPartitions()); |
@@ -418,7 +418,11 @@ public void testSeekTimeOnPartitionedTopic() throws Exception { | |||
long resetTimeInMillis = TimeUnit.SECONDS | |||
.toMillis(RelativeTimeUtil.parseRelativeTimeInSeconds(resetTimeStr)); | |||
admin.topics().createPartitionedTopic(topicName, partitions); | |||
Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create(); | |||
Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).enableBatching(false).create(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why do you need to change this test ? (enableBatching(false)
)
@@ -863,7 +863,7 @@ public void testConsumerDistributionInFailoverSubscriptionWhenUpdatePartitions() | |||
.messageRouter(new MessageRouter() { | |||
@Override | |||
public int choosePartition(Message<?> msg, TopicMetadata metadata) { | |||
return Integer.parseInt(msg.getKey()) % metadata.numPartitions(); | |||
return msg.hasKey() ? Integer.parseInt(msg.getKey()) % metadata.numPartitions() : 0; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this change looks unrelated to this patch, please revert
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'll add this behavior for authn/authz (please see also #11570 (comment) ). Elect first partition index by MessageRouter with blank message.
https://github.com/equanz/pulsar/blob/844cca7cf84b76b7d27f12de4dbf36e23cbf29e9/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java#L87-L88
If we shouldn't change this test and behavior,
- add pseudo message key like
xxx
to blank message - handle exceptions in PartitionedProducerImpl
- select first partition index without MessageRouter (e.g. randomly select)
- if message router elects partition from part of partitions like singlepartition routing mode, create redundant producer for this change
- etc.
@@ -182,7 +182,7 @@ public void testPublishCompactAndConsumePartitionedTopics(Supplier<String> servi | |||
.messageRouter(new MessageRouter() { | |||
@Override | |||
public int choosePartition(Message<?> msg, TopicMetadata metadata) { | |||
return Integer.parseInt(msg.getKey()) % metadata.numPartitions(); | |||
return msg.hasKey() ? Integer.parseInt(msg.getKey()) % metadata.numPartitions() : 0; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why do you need to change this test ?
return producers.get(0).getProducerName(); | ||
return producers.get(firstPartitionIndex).getProducerName(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
shouldn't be the same producerName for all the producers ?
why not computing the name in the constructor and cache it ?
log.error("[{}] Could not create internal producer. partitionIndex: {}", topic, partition, | ||
createException); | ||
try { | ||
producers.remove(partition).close(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I mean that we should test the result for "producers.remove(partition)" for equality ('==') with prod
@eolivelli |
/pulsarbot run-failure-checks |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I believe that the changes in the test cases are showing too many behaviour changes.
We should add a configuration option to enable this behaviour explicitly
8c20c02
to
fc83a5d
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Lgtm
/pulsarbot run-failure-checks |
2 similar comments
/pulsarbot run-failure-checks |
/pulsarbot run-failure-checks |
@eolivelli Thank you for your review! @massakam @k2la Rebased to resolve conflicts. Also, add fc83a5d commit to follow #12287 feature even if lazy-loading feature is enabled. PTAL. |
pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PartialPartitionedProducerTest.java
Outdated
Show resolved
Hide resolved
.../main/java/org/apache/pulsar/client/impl/customroute/PartialRoundRobinMessageRouterImpl.java
Show resolved
Hide resolved
pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
Outdated
Show resolved
Hide resolved
/pulsarbot run-failure-checks |
1 similar comment
/pulsarbot run-failure-checks |
@massakam PTAL |
…e#11570) Fixes apache#11496 also matches part of PIP 79. C++ implementation that closely matches the proposed Java client changes from reducing partitioned producer connections and lookups: PR 10279 ### Motivation Producers that send messages to partitioned topics start a producer per partition, even when using single partition routing. For topics that have the combination of a large number of producers and a large number of partitions, this can put strain on the brokers. With say 1000 partitions and single partition routing with non-keyed messages, 999 topic owner lookups and producer registrations are performed that could be avoided. PIP 79 also describes this. I wrote this before realising that PIP 79 also covers this. This implementation can be reviewed and contrasted to the Java client implementation in apache#10279. ### Modifications Allows partitioned producers to start producers for individual partitions lazily. Starting a producer involves a topic owner lookup to find out which broker is the owner of the partition, then registering the producer for that partition with the owner broker. For topics with many partitions and when using SinglePartition routing without keyed messages, all of these lookups and producer registrations are a waste except for the single chosen partition. This change allows the user to control whether a producer on a partitioned topic uses this lazy start or not, via a new config in ProducerConfiguration. When ProducerConfiguration.setLazyStartPartitionedProducers(true) is set, the PartitionedProducerImpl.start() becomes a synchronous operation that only does housekeeping (no network operations). The producer of any given partition is started (which includes a topic owner lookup and registration) upon sending the first message to that partition. While the producer starts, messages are buffered. The sendTimeout timer is only activated once a producer has been fully started, which should give enough time for any buffered messages to be sent. For very short send timeouts, this setting could cause send timeouts during the start phase. The default of 30s should however not cause this issue.
…che#10279) * feat: add lazy loading feature to PartitionedProducerImpl * feat: add partial round robin routing mode * test: add tests for lazy-loading * fix: fix producer closing code at lazy-loading * test: remove unnecessary handling, fail from test codes * feat: add enableLazyStartPartitionedProducers config * test: fix test for lazy-loading config * fix: address comments * fix: add partition-change interceptor * fix: address comments
Fixes #11496 also matches part of PIP 79. C++ implementation that closely matches the proposed Java client changes from reducing partitioned producer connections and lookups: PR 10279 ### Motivation Producers that send messages to partitioned topics start a producer per partition, even when using single partition routing. For topics that have the combination of a large number of producers and a large number of partitions, this can put strain on the brokers. With say 1000 partitions and single partition routing with non-keyed messages, 999 topic owner lookups and producer registrations are performed that could be avoided. PIP 79 also describes this. I wrote this before realising that PIP 79 also covers this. This implementation can be reviewed and contrasted to the Java client implementation in apache/pulsar#10279. ### Modifications Allows partitioned producers to start producers for individual partitions lazily. Starting a producer involves a topic owner lookup to find out which broker is the owner of the partition, then registering the producer for that partition with the owner broker. For topics with many partitions and when using SinglePartition routing without keyed messages, all of these lookups and producer registrations are a waste except for the single chosen partition. This change allows the user to control whether a producer on a partitioned topic uses this lazy start or not, via a new config in ProducerConfiguration. When ProducerConfiguration.setLazyStartPartitionedProducers(true) is set, the PartitionedProducerImpl.start() becomes a synchronous operation that only does housekeeping (no network operations). The producer of any given partition is started (which includes a topic owner lookup and registration) upon sending the first message to that partition. While the producer starts, messages are buffered. The sendTimeout timer is only activated once a producer has been fully started, which should give enough time for any buffered messages to be sent. For very short send timeouts, this setting could cause send timeouts during the start phase. The default of 30s should however not cause this issue.
Master Issue: https://github.com/apache/pulsar/wiki/PIP-79%3A-Reduce-redundant-producers-from-partitioned-producer
Motivation
Please see the PIP document.
This is a part of implementations.
I will submit the next PR about PartitionedTopicStats later.#10534 is the next PR.Modifications
Verifying this change
This change added tests and can be verified as follows:
Does this pull request potentially affect one of the following parts:
If
yes
was chosen, please highlight the changesDocumentation