KAFKA-14020: Performance regression in Producer #12365
Conversation
As part of the KAFKA-10888 work, a call to time.milliseconds() got moved under the queue lock; this change moves it back outside the lock. The call may be expensive and cause lock contention.
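For illustration, a minimal sketch of the pattern this change restores (class and method names are loosely modeled on the accumulator code and are not the real implementation): read the clock before taking the queue lock so a slow Time.milliseconds() call never extends the lock hold time.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Illustrative sketch only; not the actual RecordAccumulator code.
class AppendSketch {
    private final Deque<Object> batches = new ArrayDeque<>();

    long milliseconds() {                 // stand-in for Time.milliseconds(), which may be expensive
        return System.currentTimeMillis();
    }

    void append(Object record) {
        // Read the clock BEFORE taking the lock so a slow clock call cannot extend lock hold time.
        long nowMs = milliseconds();
        synchronized (batches) {
            // Only cheap work happens under the lock; nowMs may be slightly stale, which is acceptable here.
            batches.addLast(record);      // placeholder for tryAppend(nowMs, ...)
        }
    }
}
```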
@artemlivshits : Thanks for the PR. Does this resolve all of the perf regression reported in #12342?

Haven't checked the streams benchmark yet. But it is a regression that is visible in the lock profile, so from that perspective this seems to be a net positive.
```java
// Update the current time in case the buffer allocation blocked above.
// NOTE: getting time may be expensive, so calling it under a lock
// should be avoided.
nowMs = time.milliseconds();
```
This change has some other side-effects to the code logic:

1. In case of retries (multiple iterations of the while loop) when buffer allocation blocks, prior to this change `tryAppend` on line 283 was being called with an older `nowMs`. After this change it's a more recent time. This is a positive change.
2. In case of retries, when buffer allocation does not occur, prior to this change the time was computed inside `appendNewBatch` and hence was guaranteed to be the latest. After this change, there might be threads blocked on `synchronized`, or the time consumed by the previous retry isn't factored into the `nowMs` being passed to `appendNewBatch`. Hence, the `nowMs` being passed to `appendNewBatch` might be stale by some amount (depending on how long threads were waiting to acquire the lock). Is that acceptable?
@divijvaidya : For 2, before KAFKA-10888, nowMs was also computed before synchronized. So, it has the same behavior as this PR.

Looking at the code, I am not sure if nowMs is strictly needed. nowMs is used to populate ProducerBatch.lastAppendTime. However, since KAFKA-5886, expiration is based on createTime and not on lastAppendTime. lastAppendTime is only used to upper bound lastAttemptMs. This may not be needed. @hachikuji : Could we just get rid of ProducerBatch.lastAppendTime?
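For context on why lastAppendTime may be redundant: since KAFKA-5886 the delivery timeout is measured from the batch's creation time. A simplified sketch of that check, with field and method names assumed rather than copied from ProducerBatch:

```java
// Simplified sketch of delivery-timeout expiration based on batch creation time.
// Names are assumptions modeled on ProducerBatch, not verbatim Kafka code.
class BatchExpirySketch {
    final long createdMs;        // set once when the batch is created
    long lastAppendTime;         // updated on every append, but not consulted for expiration below

    BatchExpirySketch(long createdMs) {
        this.createdMs = createdMs;
        this.lastAppendTime = createdMs;
    }

    boolean hasReachedDeliveryTimeout(long deliveryTimeoutMs, long nowMs) {
        // Expiration depends only on createdMs; a stale lastAppendTime cannot delay or hasten it.
        return deliveryTimeoutMs <= nowMs - createdMs;
    }
}
```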
Did we reach a conclusion regarding this?
I synced up with Jun offline; for this change it makes sense to preserve the current behavior (too close to release).
Do we want to file a JIRA for changing this for the next release?
As part of the KAFKA-10888 work, the reference to the ProducerRecord was held in the batch completion callback, so it was kept alive as long as the batch was alive, which may increase the amount of memory used in certain scenarios and cause excessive GC work. Now the reference is reset early, so the ProducerRecord lifetime isn't bound to the batch lifetime.
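A hedged sketch of the lifetime problem and the fix (hypothetical class, not the actual producer internals): if the completion callback keeps a field pointing at the ProducerRecord, the record's key and value stay reachable until the batch completes; clearing the field once it is no longer needed lets the GC reclaim them earlier.

```java
import org.apache.kafka.clients.producer.ProducerRecord;

// Hypothetical sketch of a per-record completion callback; not the actual producer code.
class InterceptorCallbackSketch<K, V> {
    private volatile ProducerRecord<K, V> record;   // held only until the partition is known
    private final String topic;                     // extracted up front, cheap to retain

    InterceptorCallbackSketch(ProducerRecord<K, V> record) {
        this.record = record;
        this.topic = record != null ? record.topic() : null;
    }

    void onPartitionKnown(int partition) {
        // ... any tracing that still needs the record would go here ...
        // Drop the reference so the record's key/value payload doesn't live as long as the batch.
        record = null;
    }

    String topic() {
        return topic;
    }
}
```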
Hey @artemlivshits, I just re-ran the Streams benchmarks that originally found the regression. It looks like it's resolved, as of your latest commit! As a reminder, this was the baseline for "good" performance: [benchmark results]. And when I ran the same benchmark on 3a6500b, I got: [benchmark results].
junrao left a comment
@vvcephei : Thanks for the update.
@artemlivshits : Thanks for the updated PR. Just one minor comment.
```diff
     private final Callback userCallback;
     private final ProducerInterceptors<K, V> interceptors;
-    private final ProducerRecord<K, V> record;
+    private ProducerRecord<K, V> record;
```
Should we make record volatile since it's being updated and read by different threads?
Will do. And partition should then be volatile as well.
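A tiny, generic illustration of why volatile matters here (illustrative field names, not the PR's exact code): one thread writes the field once the partition is chosen and another thread reads it later; volatile guarantees the reader sees the write.

```java
// Generic visibility example; not the PR's actual fields.
class PartitionHolder {
    // volatile guarantees that a write by the partitioning thread is visible
    // to the thread that later reads it when completing the batch.
    private volatile int partition = -1;

    void setPartition(int partition) {   // called on the application thread
        this.partition = partition;
    }

    int partition() {                    // may be called on the sender/I/O thread
        return partition;
    }
}
```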
Fix test failures.
```java
this.record = record;
// Note a record would be null only if the client application has a bug, but we don't want to
// have NPE here, because the interceptors would not be notified (see .doSend).
topic = record != null ? record.topic() : null;
```
Can you elaborate on this? What kind of application bug would surface itself in a silent way like this?
https://github.com/apache/kafka/blob/trunk/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala#L1041 has a test, which effectively codifies the contract. I would agree that it's weird to have a contract about null handling, but at this point I'd rather preserve whatever behavior is codified.
I don't think this answered my question. What application bug would result in this?
Would passing a null record not be a bug? I've changed the comment to not mention that it would be a bug.
If a user passes in a null record to send(), we will be throwing a NullPointerException somewhere. So, we probably could just throw an exception early in that case without going through the callback and fix the test accordingly. We probably could do that in a separate PR in trunk.
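A sketch of the early validation being suggested (hypothetical placement at the top of send(); not the current code):

```java
import java.util.Objects;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.ProducerRecord;

// Hypothetical early check at the API boundary, as suggested for a follow-up PR.
class SendValidationSketch<K, V> {
    public void send(ProducerRecord<K, V> record, Callback callback) {
        // Fail fast with a clear message instead of hitting an NPE deeper in the append path
        // (interceptors would not be invoked for a null record under this approach).
        Objects.requireNonNull(record, "ProducerRecord passed to send() must not be null");
        // ... rest of the send path ...
    }
}
```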
I looked at the test and it seems to check that an exception is thrown? As @junrao said, this can be done by validating what send receives instead of polluting the whole codebase. I'm OK if we file a JIRA for that and do it as a separate PR. But we should remove this code when we do that.
It checks that the exception is thrown and then it checks that interceptors are called. Probably the test is just sloppy and could use a different error condition. KAFKA-14086
```diff
-            ? ProducerInterceptors.extractTopicPartition(record)
-            : new TopicPartition(record.topic(), partition);
+        if (partition != RecordMetadata.UNKNOWN_PARTITION)
+            return new TopicPartition(topic, partition);
```
It's a bit surprising to allocate every time a method like this is called. Can we not allocate the topic partition once and reuse it?
topicPartition() is called once in the success case (maybe twice in the error case). I'll add a comment.
Even so, the way this method is used can change over time. And then you end up with a lot of unexpected allocation. The way I suggested is more robust.
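A sketch of the reuse being suggested (assumed names, not the code in this PR): compute the TopicPartition lazily once and hand back the cached instance on later calls.

```java
import org.apache.kafka.common.TopicPartition;

// Sketch of lazily caching the TopicPartition instead of allocating on every call.
// Names are assumptions for illustration, not the PR's implementation.
class TopicPartitionCacheSketch {
    private final String topic;
    private volatile int partition = -1;            // set once the partition is known
    private volatile TopicPartition topicPartition; // cached on first use

    TopicPartitionCacheSketch(String topic) {
        this.topic = topic;
    }

    void setPartition(int partition) {
        this.partition = partition;
    }

    TopicPartition topicPartition() {
        TopicPartition tp = topicPartition;
        if (tp == null && partition >= 0) {
            tp = new TopicPartition(topic, partition);
            topicPartition = tp;                    // benign race: worst case a duplicate allocation
        }
        return tp;
    }
}
```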
```java
}

// Reset record to null here so that it doesn't have to be alive as long as the batch is.
record = null;
```
It's a bit surprising that a method called setPartition resets the record. Maybe we can make the method name clearer. It would also be useful for the comment to state why we no longer need the record after this.
The method overrides a callback that is called setPartition to reflect what the caller does with it (it sets partition). I agree that it's a little non-intuitive to do a state transition here but there doesn't seem to be a better place to do it if we want to preserve the behavior -- we need record until here to do the tracing and we cannot do tracing earlier because we may not know the partition; at the same time, we don't want to keep it longer.
We can add another callback method since this is an internal interface, right? This kind of thing leads to a lot of maintenance pain down the line.
I think it would be non-intuitive to control record lifetime from the RecordAccumulator.append (that calls the callback) -- here we know that we don't need the record once the partition is set, but RecordAccumulator.append doesn't know it (in fact, it doesn't even know that we have the record). But I can change it if you think this would make it easier to understand.
Thanks for the explanation. Is there a possibility the trace could be done in the caller context? Or is that missing some of the required information?
It's missing the partition info. Previously, partition was calculated before doing RecordAccumulator.append so we could do tracing in the doSend, but now the partition may be calculated at the beginning of RecordAccumulator.append, so tracing needs to happen after it's known, but before the actual append proceeds.
I find that the code complexity to achieve this trace logging is a bit high. I have some ideas on how to improve it, but we can leave that for later. A simple suggestion for now would be to change setPartition to onPartitionAssigned or something like that. This would indicate a general callback that can do anything once the partition is known.
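A sketch of the kind of renamed callback described above (hypothetical interface and names, purely illustrative): a general hook invoked once the partition is known, under which tracing and any cleanup can live.

```java
// Hypothetical internal callback shape; not an actual Kafka interface.
interface PartitionCallback {
    // General-purpose hook: invoked by the accumulator as soon as the partition is known,
    // before the append proceeds. Implementations may trace, release references, etc.
    void onPartitionAssigned(int partition);
}

class TracingPartitionCallback implements PartitionCallback {
    private final String topic;

    TracingPartitionCallback(String topic) {
        this.topic = topic;
    }

    @Override
    public void onPartitionAssigned(int partition) {
        // Trace only once the partition is known; any no-longer-needed state could be dropped here too.
        System.out.println("Assigned partition " + partition + " for topic " + topic);
    }
}
```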
Since the only info we need from record is record.partition(), could we keep record.partition() in the instance instead of the whole record? Since record.partition() is much smaller, maybe there is no need to nullify it in setPartition()?
Updated to extract all record info in the constructor.
```diff
@@ -1491,20 +1495,25 @@ public void setPartition(int partition) {

             if (log.isTraceEnabled()) {
```
A couple of lines above we use a language-level assert. In Kafka, we typically use assert-like methods such as those in the Objects class since language-level asserts are disabled by default.
The intention of an assert is to run in tests, but be disabled in prod, so if my understanding is correct, this is the proper usage.
That's not considered best practice in Java and we don't typically do it in Kafka. What would be the reason to run it in tests only?
All my previous life I was using asserts extensively in C/C++, they provide both validation and contract documentation. They do redundant validation in builds that are used in system tests without adding perf cost in prod. I can remove it, if it's not compatible with style, though I don't think this is just style -- using asserts makes a material difference in early bug detection and in code comprehension.
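For reference, a self-contained example of Java's language-level assert: the check only runs when assertions are enabled (java -ea), and with the default JVM flags it is skipped entirely.

```java
// Generic illustration, unrelated to the PR's exact code.
public class AssertDemo {
    public static void main(String[] args) {
        int partition = 3;
        // Only evaluated when the JVM is started with -ea; with default flags it is a no-op.
        assert partition >= 0 : "partition should not be negative, but it was " + partition;
        System.out.println("assertions enabled: " + AssertDemo.class.desiredAssertionStatus());
    }
}
```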
It seems we have been mostly using Objects.requireNonNull for null assertion in our code. It doesn't seem to add too much overhead and helps identify issues in production early on. For consistency, perhaps we could use Objects.requireNonNull instead of assert.
@ijuma : What do you recommend that we use for assertions like assert partitionInfo == stickyPartitionInfo.get()?
The point is that you can run these checks in prod without measurable cost. Then why limit it to tests?
Correct @junrao, Objects.requireNonNull would be the recommended way to assert non-null. The reference equality check is less common; we could add our own utility method in Utils for that or inline it.
The main thing is to get the appropriate signal if this happens in prod when the cost is low (both examples would be in that category).
Ok, so what's the suggestion in this change? Should I leave the code as is or remove the assert? Creating a new utility seems to be out of scope for this change. We could have an offline discussion about asserts; I would be happy to see them used more often in Kafka.
For the line in this method, you could do something like:

```java
if (partition < 0)
    throw new IllegalArgumentException("partition should be positive, but it was " + partition);
```

Which is more informative and idiomatic and checks the more general case that we expect partitions to be positive. But I see that we have sprinkled the same check in other methods. So, having an assertPartitionIsPositive would probably be a better approach. In any case, since this code was introduced in a different change, we can file a JIRA and do it as a separate PR.
I am happy to discuss more, but we should be clear about terminology. Language level asserts in Java aren't used much. Checking preconditions through API boundaries is useful. Within a given boundary, it's best to use the type system to avoid having noise all over the code.
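A sketch of the helper suggested above; the method name follows the suggestion in this thread, while the class and placement are assumptions:

```java
// Sketch of a shared precondition helper; assertPartitionIsPositive is the name suggested in
// this discussion, not an existing Kafka utility.
final class ProducerPreconditions {
    private ProducerPreconditions() { }

    static int assertPartitionIsPositive(int partition) {
        if (partition < 0)
            throw new IllegalArgumentException("partition should be positive, but it was " + partition);
        return partition;
    }
}
```

A call site would then read `this.partition = ProducerPreconditions.assertPartitionIsPositive(partition);`, keeping the check and its message in one place.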
ijuma left a comment
Thanks for the PR. A few comments below.
Address review comments.
junrao left a comment
@artemlivshits : Thanks for the updated PR. LGTM. Waiting for the tests to pass.
@jsancio : We plan to cherry-pick this to the 3.3 branch since this fixes a performance issue introduced in KAFKA-10888.
Address review comments.
artemlivshits left a comment
Addressed @ijuma comments.
junrao left a comment

@artemlivshits : Thanks for the updated PR. A couple more comments.
Address review comments.
junrao left a comment
@artemlivshits : Thanks for the updated PR. Just a minor comment.
```java
this.interceptors = interceptors;
this.record = record;
// Extract record info as we don't want to keep a reference to the record during
// whole lifetime of the batch.
```
Could we move these two lines to the immediate line before where we set recordPartition?
I think it applies to all 3 fields: topic, recordPartition and recordLogString - we extract all this info from the record, so the comment is before we do that (in the PR it's kind of hard to see because of the inline discussion). Let me know if you think otherwise.
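A sketch of the shape described here (the three field names come from this discussion; everything else is illustrative): the constructor pulls out what the callback needs later, so only small immutable pieces have to outlive the append.

```java
import org.apache.kafka.clients.producer.ProducerRecord;

// Illustrative sketch only; mirrors the idea of extracting record info up front.
class ExtractedRecordInfo<K, V> {
    private final String topic;
    private final Integer recordPartition;
    private final String recordLogString;

    ExtractedRecordInfo(ProducerRecord<K, V> record, boolean traceEnabled) {
        // Extract record info eagerly so the callback does not need to keep the record
        // (and its key/value payload) alive for the whole lifetime of the batch.
        topic = record != null ? record.topic() : null;
        recordPartition = record != null ? record.partition() : null;
        recordLogString = traceEnabled && record != null ? record.toString() : "";
    }

    String topic() { return topic; }
    Integer recordPartition() { return recordPartition; }
    String recordLogString() { return recordLogString; }
}
```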
Thanks for the explanation. Sounds good.
Sounds good @junrao. I set the fix version for KAFKA-14020 to 3.3.0.
junrao left a comment
@artemlivshits : Thanks for the explanation. LGTM. Waiting for the tests to pass.
@artemlivshits : Are the test failures related to the PR? @ijuma : Do you have any other comments?
Fix test break.
Yes, just pushed the fix.
Looked at the failed tests; they seem unrelated and pass locally.
@junrao the updated version looks good to me. Thanks @artemlivshits for the patience and iterations.
junrao left a comment
@artemlivshits : Thanks for the latest PR. LGTM
As part of the KAFKA-10888 work, there were a couple of regressions introduced:

A call to time.milliseconds() got moved under the queue lock. The call may be expensive and cause lock contention. Now the call is moved back outside of the lock.

The reference to ProducerRecord was held in the batch completion callback, so it was kept alive as long as the batch was alive, which may increase the amount of memory used in certain scenarios and cause excessive GC work. Now the reference is reset early, so the ProducerRecord lifetime isn't bound to the batch lifetime.

Tested via a manually crafted benchmark: the lock profile shows ~15% lock contention on the ArrayQueue lock without the fix and ~5% with the fix (which is also consistent with the pre-KAFKA-10888 profile). The alloc profile shows ~10% spent in ProducerBatch.completeFutureAndFireCallbacks without the fix vs. ~0.25% with the fix (which is also consistent with the pre-KAFKA-10888 profile).

Reviewers: Ismael Juma <ismael@juma.me.uk>, Jun Rao <junrao@gmail.com>
Cherry-picked the PR to 3.3.
Hello Artem @artemlivshits, I was studying the PR / ... Happy to make a PR in case you see it reasonable. Regards, Eugene
Hi @etolbakov, making this method private sounds reasonable to me. Thank you for the suggestion.
As part of the KAFKA-10888 work, there were a couple of regressions introduced:

A call to time.milliseconds() got moved under the queue lock. The call may be expensive and cause lock contention. Now the call is moved back outside of the lock.

The reference to ProducerRecord was held in the batch completion callback, so it was kept alive as long as the batch was alive, which may increase the amount of memory used in certain scenarios and cause excessive GC work. Now the reference is reset early, so the ProducerRecord lifetime isn't bound to the batch lifetime.

Tested via a manually crafted benchmark: the lock profile shows ~15% lock contention on the ArrayQueue lock without the fix and ~5% with the fix (which is also consistent with the pre-KAFKA-10888 profile).

The alloc profile shows ~10% spent in ProducerBatch.completeFutureAndFireCallbacks without the fix vs. ~0.25% with the fix (which is also consistent with the pre-KAFKA-10888 profile).
Will add a proper jmh benchmark for producer (looks like we don't have one) in a follow-up change.
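A minimal JMH skeleton of the kind of follow-up benchmark mentioned above (hypothetical class; the measured body is only a stand-in for the real append path):

```java
import java.util.concurrent.TimeUnit;

import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.Threads;

// Hypothetical skeleton of a producer append benchmark; not part of this PR.
@State(Scope.Benchmark)
@BenchmarkMode(Mode.Throughput)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
public class ProducerAppendBenchmark {

    private final byte[] payload = new byte[100];

    @Benchmark
    @Threads(8)   // multiple threads to surface contention on the accumulator's queue lock
    public int append() {
        // Placeholder for the real append path; returning a value keeps JMH from
        // dead-code-eliminating the work.
        int acc = 0;
        for (byte b : payload)
            acc += b;
        return acc;
    }
}
```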