KAFKA-10888: Sticky partition leads to uneven produce msg #12049
junrao merged 7 commits into apache:trunk
Conversation
artemlivshits
left a comment
Added some annotations to explain the code movements.
Just added recordInfo here.
Instead of a flat map {tp -> batches}, it's now a map of maps {topic -> {partition -> batches}, etc.}, for a couple of reasons (see the sketch below):
- The queue size statistics calculation is more efficient if partitions are grouped by topic.
- There is other per-topic data to keep (partitioner info).
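A minimal sketch of the shape being described, under the assumption that the per-topic entry bundles the per-partition batch queues with per-topic partitioner state; the names here (TopicInfo, Batch, queueFor) are hypothetical, not the actual RecordAccumulator types:

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

class TopicInfoSketch {
    static class Batch { /* stand-in for ProducerBatch */ }

    // Per-topic state: per-partition batch queues plus per-topic partitioner data.
    static class TopicInfo {
        final ConcurrentMap<Integer, Deque<Batch>> batches = new ConcurrentHashMap<>();
        final Object builtInPartitioner = new Object();   // stand-in for per-topic partitioner state
    }

    // topic -> {partition -> batches, partitioner info} instead of a flat {TopicPartition -> batches}.
    final ConcurrentMap<String, TopicInfo> topicInfoMap = new ConcurrentHashMap<>();

    Deque<Batch> queueFor(String topic, int partition) {
        return topicInfoMap.computeIfAbsent(topic, t -> new TopicInfo())
                           .batches.computeIfAbsent(partition, p -> new ArrayDeque<>());
    }
}
```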
This is mostly just moving part of the logic from append to a separate function.
Just a new argument.
Now it's a different loop because it's a map of maps, but the body is the same.
Changes here are just refactoring of existing logic.
Needed to extend the mock to call the setPartition callback.
This and testAdaptiveBuiltInPartitioner are new additions, the rest is just tweaks to reflect changes in function signature.
This is a new addition, the rest is just updates to reflect function signatures / callback changes.
Changes here are just to reflect function signature changes.
junrao
left a comment
@artemlivshits : Thanks for the PR. Made a pass of non-testing files. A few comments below.
This can be a bit confusing since we don't run the full default partitioning logic here. Also, we probably want to refer to it as built-in partitioning since we haven't removed DefaultPartitioner.
Could we just fold the logic in ProducerInterceptors.extractTopicPartition to topicPartition()?
Could we add a comment that this is for testing purposes?
We typically don't use getters. So this can just be loadStatsRangeEnd.
Could we explain the difference between length and queueSizes.length here?
We are adding additional logic to estimate the accumulated bytes, which duplicates some of the code in RecordAccumulator. I am wondering if we could track the accumulated bytes by reusing the estimation logic inside RecordAccumulator. All appends to a partition are already synchronized on the corresponding dq in RecordAccumulator. Maybe we could do the following (see the sketch after this list).
- To append to RecordAccumulator, we find the current sticky partition instance and call tryAppend under the dq lock.
- After a successful append, we update the accumulated bytes for this partition based on MemoryRecordsBuilder.estimatedBytesWritten(). If the accumulated bytes exceed the sticky batch size, we call BuiltInPartitioner.nextPartition() to switch partitions.
- A concurrent send() call may try to append to the previous partition after the partition is switched. We could have the caller pass in a reference to the expected partition instance to tryAppend(). Inside tryAppend(), we could then check whether the expected partition is the same as the current sticky partition. If not, the caller needs to obtain the current sticky partition and retry tryAppend().
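For illustration, a rough sketch of that flow under the stated assumptions; the types and method names are simplified stand-ins, not the actual RecordAccumulator/BuiltInPartitioner code:

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.atomic.AtomicReference;

public class StickyAppendSketch {
    // Per-topic sticky partition state; a new instance is installed when we switch partitions.
    static class StickyPartitionInfo {
        final int partition;
        long accumulatedBytes;   // only mutated under the queue lock
        StickyPartitionInfo(int partition) { this.partition = partition; }
    }

    private final AtomicReference<StickyPartitionInfo> current =
        new AtomicReference<>(new StickyPartitionInfo(0));
    private final Deque<byte[]> queue = new ArrayDeque<>();   // stand-in for the per-partition dq
    private final int stickyBatchSize;

    StickyAppendSketch(int stickyBatchSize) { this.stickyBatchSize = stickyBatchSize; }

    // Caller passes the partition info it observed; returns false if a concurrent append switched partitions.
    boolean tryAppend(StickyPartitionInfo expected, byte[] record) {
        synchronized (queue) {
            if (current.get() != expected)
                return false;   // partition switched concurrently; caller must re-read and retry
            queue.add(record);
            // Stand-in for MemoryRecordsBuilder.estimatedBytesWritten()-based accounting.
            expected.accumulatedBytes += record.length;
            if (expected.accumulatedBytes >= stickyBatchSize)
                current.set(new StickyPartitionInfo(nextPartition(expected.partition)));
            return true;
        }
    }

    void append(byte[] record) {
        while (true) {
            StickyPartitionInfo info = current.get();   // peek the sticky partition outside the lock
            if (tryAppend(info, record))
                return;   // otherwise loop and retry against the new sticky partition
        }
    }

    private int nextPartition(int prev) { return prev + 1; }   // placeholder switch policy
}
```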
I think it could work, let me try to implement it. In the meanwhile, I've addressed all other comments.
Re-implemented this logic as suggested in 0013801.
Could we annotate this method as deprecated?
Since this is a public class, could we annotate it as deprecated?
You've probably meant to annotate the DefaultPartitioner.
artemlivshits
left a comment
@junrao thank you for review. I've addressed most comments except for changing the record size estimator; I'll experiment and see if it works out.
Renamed to cumulativeFrequencyTable.
This may go away if changing estimation logic works out.
junrao
left a comment
@artemlivshits : Thanks for the updated PR. Made a pass of all files. A few more comments.
This comment can be a bit confusing since we always call deallocate in the finally block.
It seems this method doesn't throw InterruptedException?
We used to check if the producer is closed here. Is that still needed?
This check is moved to tryAppend.
Should we assert that partitionInfo is not null?
It can be null in the case we didn't use built-in partitioner.
EMPTY_HEADERS is no longer referenced.
In the adaptive case, it seems that we should also avoid selecting partitions with no leader, if possible?
When we build the table, we exclude partitions with no leaders. Added a comment.
Do we need an array since we only return one partition?
Array is a way to capture the result from the callback.
Perhaps it's more intuitive to use an AtomicInteger?
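For illustration only (the callback and values here are hypothetical, not the PR's code), both ways of capturing a result produced inside a callback:

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntConsumer;

public class CallbackCaptureSketch {
    // Hypothetical callback-style API that reports the chosen partition.
    static void computePartition(IntConsumer setPartition) {
        setPartition.accept(3);
    }

    public static void main(String[] args) {
        int[] viaArray = new int[1];                   // single-element array as a mutable holder
        computePartition(p -> viaArray[0] = p);

        AtomicInteger viaAtomic = new AtomicInteger(); // AtomicInteger states the intent more clearly
        computePartition(viaAtomic::set);

        System.out.println(viaArray[0] + " " + viaAtomic.get());   // both print 3
    }
}
```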
artemlivshits
left a comment
@junrao thank you for comments, addressed.
Yeah, that's a good question. The MockProducer doesn't have RecordAccumulator, so I just left the code as it was. Interestingly, this code never calls "onNewBatch", so the default partitioner never switches partition. But I think we should fix it as a separate issue.
Updated javadoc to use fully qualified name.
I presume this would only work for keyed messages (partition is determined by the hash) because onNewBatch is never called, so a sticky partition would never switch. We should probably fix it, but as a separate change.
Good point, added some tracing.
junrao
left a comment
@artemlivshits : Thanks for the updated PR. Just a few more minor comments. A couple of other things.
- Could you rebase?
- It would be useful to see if we need to add any new metrics. For example, it might be useful to have a metric that tracks the number of nodes excluded due to high latency. This can be done in a separate PR.
It seems that by default, kstreams just forwards the partitioning logic to DefaultPartitioner (which implements onNewBatch) and it should work for both keyed and non-keyed messages. Since the new built-in partitioner is strictly better, it seems that we should just switch the default to the built-in partitioner. I agree that we could do that in a separate PR.
DefaultPartitioner implements onNewBatch, right? I agree that we could address this in a separate PR.
Perhaps it's better to do the logging once inside nextPartition() instead of here and updatePartitionInfo()?
Could we log what the two lengths mean?
Could we add a comment that partitions w/o leader are excluded from the input arrays?
Added comments in params.
Address review comments except for size estimation.
Re-implement appendedSize accounting as suggested by Jun.
Address review comments.
Address review comments.
artemlivshits
left a comment
@junrao thank you for review, addressed comments and rebased. Will file a ticket to add metrics once this PR is merged.
Missed, now rephrased.
DefaultPartitioner implements onNewBatch, but MockProducer doesn't call it. It only calls .partition, which for unkeyed messages returns the same partition until the onNewBatch is called.
DefaultPartitioner implements onNewBatch, but DefaultStreamPartitioner doesn't seem to ever call it (the DefaultPartitioner is a private object). Without onNewBatch, the DefaultPartitioner.partition would return the same partition for unkeyed messages.
Hmm, for some reason the update didn't trigger any build & test jobs. [Update: they started after some time]
Fix build break.
Fix failed test.
junrao
left a comment
@artemlivshits : Thanks for the latest PR. LGTM. Do you have additional test results to share?
Hmm, onNewBatch() in DefaultPartitioner is not called by the application code. It's called inside Sender.
Converting DefaultStreamPartitioner to the new built-in partitioner is a bit tricky since kstreams calls the partitioner explicitly and then passes in the partition during send. We need to change kstreams code so that it lets the producer determine the partition. This can be done in a separate PR. cc @guozhangwang
Commenting on this code in the diff:
    public TopicPartition topicPartition() {
        if (record == null)
We have a bunch of code in send() that depends on record being not null. Perhaps it's better to assert non-null record early in send()?
I've added it because there is a test that passes a null and expects some error handling. From that perspective, adding an assert would contradict the expectation that there is defined and tested error handling (in my mind, an assert means the behavior is undefined and all bets are off).
Hmm, by returning a null, we are breaking the contract that RecordMetadata.topic() and RecordMetadata.partition() are never null in the producer callback and interceptor. Is that a new or existing test?
This is an existing test https://github.com/apache/kafka/blob/trunk/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala#L1041
It passes in null and expects that the error goes through the interceptor's callback, but topicPartition() is called to supply an argument to the callback (which can handle null topicPartition) and it was throwing NPE.
I personally would just declare record as non-null and treat it as a bug in the application (undefined behavior), but as defined currently, it's a tested run-time error condition with some expected behavior.
Thanks for the explanation. Since this is the existing behavior, we could keep the code as it is in the PR and improve it in the future.
Commenting on this code in the diff:
     * @return sticky partition info object
     */
    StickyPartitionInfo peekCurrentPartitionInfo(Cluster cluster) {
        StickyPartitionInfo partitionInfo = stickyPartitionInfo.get();
This is an existing issue with the DefaultPartitioner. It seems that if the sticky partition loses the leader, we will continue sending new records to this partition. Perhaps it's better to force the partition selection in that case. This can be addressed separately.
artemlivshits
left a comment
@junrao I've re-done perf testing, the results are pretty much the same as published in the KIP-794.
…ache#12049)" This reverts commit f7db603.
…ache#12049)" This reverts commit f7db603. The main PR for this is apache#12342
Commenting on this code in the diff:
    assert partition != RecordMetadata.UNKNOWN_PARTITION;

    // Update the current time in case the buffer allocation blocked above.
    long nowMs = time.milliseconds();
Hmm, this call now happens under the queue lock, which might be the root cause for KAFKA-14020. Will move it outside, just after allocation of new buffer at line 312.
It seems this line doesn't need to be in the synchronized part. It just needs to be after the free.allocate() call. However, this code is only called when a new batch is created.
Yep, the previous code had it just after the allocate and while moving code around I accidentally moved it under the lock. Fix #12365.
The design is described in detail in KIP-794
https://cwiki.apache.org/confluence/display/KAFKA/KIP-794%3A+Strictly+Uniform+Sticky+Partitioner.
Implementation notes:
The default partitioning logic is moved to the BuiltInPartitioner class
(there is one object per topic). The object keeps track of how many
bytes are produced per partition, and once the amount exceeds batch.size
it switches to the next partition (note that the partition switch
decision is decoupled from batching). The object also keeps track of
probability weights that are based on the queue sizes (the larger the
queue size, the lower the chance of that partition being chosen next).
The queue sizes are calculated in the RecordAccumulator in the ready
method; that method already enumerates all partitions, so we just add
some extra logic to the existing O(N) pass. The partition switch
decision may take O(log N), where N is the number of partitions per
topic, but it happens only once per batch.size bytes (and the logic is
skipped when all queues are of equal size). The produced-bytes
accounting logic is lock-free.
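To illustrate the weighted selection step, here is a hedged sketch of a queue-size-based cumulative frequency table with an O(log N) binary-search lookup; the weight formula and all names are assumptions, not the actual BuiltInPartitioner code:

```java
import java.util.concurrent.ThreadLocalRandom;

public class WeightedPartitionChoiceSketch {
    // partitionIds: ids of partitions that currently have a leader (leaderless ones excluded).
    // queueSizes: per-partition queue sizes, in the same order as partitionIds.
    static int nextPartition(int[] partitionIds, int[] queueSizes) {
        int maxSize = 0;
        for (int s : queueSizes) maxSize = Math.max(maxSize, s);

        // Weight each partition inversely to its queue size, then accumulate into a table.
        int[] cumulativeFrequencyTable = new int[queueSizes.length];
        int running = 0;
        for (int i = 0; i < queueSizes.length; i++) {
            running += maxSize - queueSizes[i] + 1;   // larger queue -> smaller weight (always >= 1)
            cumulativeFrequencyTable[i] = running;
        }

        // Draw a uniform number and find its bucket via binary search, O(log N).
        int r = ThreadLocalRandom.current().nextInt(running);
        int lo = 0, hi = cumulativeFrequencyTable.length - 1;
        while (lo < hi) {
            int mid = (lo + hi) >>> 1;
            if (cumulativeFrequencyTable[mid] <= r) lo = mid + 1; else hi = mid;
        }
        return partitionIds[lo];
    }

    public static void main(String[] args) {
        // Partition 1 has the largest queue, so it is picked least often.
        System.out.println(nextPartition(new int[]{0, 1, 2}, new int[]{1, 5, 2}));
    }
}
```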
When partitioner.availability.timeout.ms is non-zero, RecordAccumulator
keeps stats on "node latency", which is defined as the difference
between the last time the node had a batch waiting to be sent and the
last time the node was ready to take a new batch. If this difference
exceeds partitioner.availability.timeout.ms, we don't switch to that
partition until the node is ready.
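A hedged sketch of that availability check (the field names and the exact bookkeeping are assumptions, not the actual RecordAccumulator code):

```java
public class NodeLatencySketch {
    static class NodeLatencyStats {
        volatile long readyTimeMs;   // last time the node had a batch waiting to be sent
        volatile long drainTimeMs;   // last time the node was ready to take a new batch
    }

    // Returns true if the partitioner may switch to a partition led by this node.
    static boolean nodeIsAvailable(NodeLatencyStats stats, long availabilityTimeoutMs) {
        if (availabilityTimeoutMs <= 0)
            return true;   // feature disabled when partitioner.availability.timeout.ms is 0
        return stats.readyTimeMs - stats.drainTimeMs <= availabilityTimeoutMs;
    }

    public static void main(String[] args) {
        NodeLatencyStats stats = new NodeLatencyStats();
        stats.readyTimeMs = 10_000;
        stats.drainTimeMs = 9_500;
        System.out.println(nodeIsAvailable(stats, 1_000));   // true: a 500 ms lag is within the timeout
    }
}
```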
The corresponding unit tests are added / modified.
The perf test results are in the KIP-794.