KAFKA-13676: Commit successfully processed tasks on error #11791
ableegoldman merged 7 commits into apache:trunk from wcarlson5:commit_on_error
Conversation
@guozhangwang @ableegoldman This is ready for a pass when you have a sec
guozhangwang left a comment
I left some detailed comments in the PR.
One meta question though: committing offsets is not a free operation, since it incurs a round trip to the brokers, and we try to batch all tasks into a single call.
Say we have 100 tasks, and the 2nd task in the list keeps throwing an error while the 1st task can be processed successfully; we would end up consuming only the 1st task and leaving the other 98 tasks untouched, which hurts performance a lot.
With that in mind, I think we need to consider this PR along with, e.g., sorting the tasks to put the errored ones at the back of the list. So I'd suggest we only merge this PR after that other work is done. WDYT?
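For context, a toy sketch of the kind of reordering being suggested here, i.e. pushing tasks that recently errored to the back of the processing order; the types and method below are simplified stand-ins for illustration, not the Streams internals:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Set;

// Toy illustration of the suggestion: order tasks so that ones which recently
// errored go to the back, letting healthy tasks make progress instead of being
// starved behind a repeatedly failing task. "taskIds" and "erroredTaskIds"
// are simplified stand-ins for the real task objects.
final class TaskOrderingSketch {
    static List<String> reorder(final List<String> taskIds, final Set<String> erroredTaskIds) {
        final List<String> ordered = new ArrayList<>(taskIds);
        ordered.sort(Comparator.comparing(erroredTaskIds::contains)); // false (healthy) sorts first
        return ordered;
    }
}
```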
streams/src/test/java/org/apache/kafka/streams/integration/EmitOnChangeIntegrationTest.java
    StringSerializer.class,
    new Properties()),
    0L);
IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(
Should we also check that outputTopic never sees the record 1 -> A, since that task kept throwing exceptions?
I will count the records using peek after the exception.
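A rough sketch of the peek()-based counting idea, assuming the test builds its topology with the DSL; the counter, topic names, and where the peek sits in the chain are illustrative rather than the actual test code:

```java
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;

// Illustrative fragment for the test's topology setup: count every record
// that makes it past this point so the test can assert exactly how many
// times the record was processed after the exception. "builder" is the
// test's StreamsBuilder.
final AtomicInteger processedCount = new AtomicInteger();
builder.stream("input-topic", Consumed.with(Serdes.Integer(), Serdes.String()))
       .peek((key, value) -> processedCount.incrementAndGet())
       .to("output-topic", Produced.with(Serdes.Integer(), Serdes.String()));
```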
        totalProcessed += processTask(task, maxNumRecords, time);
    }
} catch (final Exception e) {
    tasks.removeTaskFromCuccessfullyProcessedBeforeClosing(lastProcessed);
If an exception is thrown here, it means we should not have reached line 93 below, and hence this task should not have been added to the set, so we never need to call removeTaskFromCuccessfullyProcessedBeforeClosing, right?
Only if we hit the error in the first round of processing. We could have already processed it once this iteration and be coming back to do it again. I ran into this issue while testing and I'm pretty sure that is why it is coming up again.
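A self-contained toy example of the scenario described here: a task is processed successfully on an early pass, gets recorded, and then throws on a later pass of the same iteration, so it has to be taken back out of the set before the remaining tasks are committed. The task IDs and two-pass loop are made up for illustration:

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Toy illustration: task 0_1 succeeds on pass 0 and is recorded as
// successfully processed, then throws on pass 1 of the same iteration, so the
// error handler must remove it before committing the rest.
final class CommitOnErrorExample {
    public static void main(final String[] args) {
        final List<String> taskIds = List.of("0_0", "0_1");
        final Set<String> successfullyProcessed = new LinkedHashSet<>();
        String lastProcessed = null;
        try {
            for (int pass = 0; pass < 2; pass++) {
                for (final String task : taskIds) {
                    lastProcessed = task;
                    if ("0_1".equals(task) && pass == 1) {
                        throw new RuntimeException("task " + task + " failed on pass " + pass);
                    }
                    successfullyProcessed.add(task);
                }
            }
        } catch (final RuntimeException e) {
            // 0_1 was added on pass 0, so it must be removed before committing
            successfullyProcessed.remove(lastProcessed);
            System.out.println("Committing tasks: " + successfullyProcessed); // [0_0]
        }
    }
}
```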
}

private void commitSuccessfullyProcessedTasks() {
    if (processingMode == AT_LEAST_ONCE && !tasks.successfullyProcessed().isEmpty()) {
The check on the processingMode happens in two places, which seems redundant to me: e.g. if it is not ALOS, then successfullyProcessed() should always be empty here. I think we can simplify this line to only consider the second condition.
sounds good. I have no issues removing this check
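For what it's worth, a sketch of what the simplified method might look like once the redundant processingMode check is dropped; the commit(...) and clearSuccessfullyProcessed() calls below are placeholders for whatever the PR actually invokes, not the real method names:

```java
// Sketch only: successfullyProcessed() can only be non-empty under ALOS or
// EOS-v1, so the emptiness check alone is enough to guard the commit.
private void commitSuccessfullyProcessedTasks() {
    if (!tasks.successfullyProcessed().isEmpty()) {
        log.info("Streams encountered an error when processing tasks." +
            " Will commit all previously successfully processed tasks {}",
            tasks.successfullyProcessed());
        commit(tasks.successfullyProcessed()); // placeholder for the actual commit path
    }
    tasks.clearSuccessfullyProcessed();        // placeholder; clear the set either way
}
```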
    ),
    outputTopic2,
    Arrays.asList(
        new KeyValue<>(1, "A"),
Also, I think by just checking that 1->A and 1->B are there we do not guarantee there are no duplicates due to re-processing, right? I think we should check that the offset on the input topic can be committed, and also that there are no duplicates in the output.
Yeah good idea. I will check to make sure it doesn't get processed more than once.
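One way the no-duplicates assertion could be written, assuming the test already collects the consumed output into a List of KeyValue pairs; the helper and its name are illustrative, not the actual test code:

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.kafka.streams.KeyValue;

// Illustrative helper the test could define: every consumed output record must
// be unique, i.e. the successfully processed task was not re-processed after
// the thread was replaced. "outputRecords" comes from the test's consumer.
static void assertNoDuplicates(final List<KeyValue<Integer, String>> outputRecords) {
    final Set<KeyValue<Integer, String>> unique = new HashSet<>(outputRecords);
    if (unique.size() != outputRecords.size()) {
        throw new AssertionError("Found duplicate output records: " + outputRecords);
    }
}
```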
        task.clearTaskTimeout();
        processed++;
    }
    if (processingMode == AT_LEAST_ONCE) {
Can be AT_LEAST_ONCE or EXACTLY_ONCE_V1
How about !EXACTLY_ONCE_V2?
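To make the two options concrete, here is the guard as written versus the suggested inversion; the addToSuccessfullyProcessed() call stands in for whatever the loop body does, and the enum constant names are taken from this thread rather than verified against the final code:

```java
// As written in the diff: only ALOS tasks are recorded, which misses EOS-v1.
if (processingMode == AT_LEAST_ONCE) {
    tasks.addToSuccessfullyProcessed(task);
}

// Suggested alternative: everything except EOS-v2 is recorded, covering both
// ALOS and EOS-v1 with a single check.
if (processingMode != EXACTLY_ONCE_V2) {
    tasks.addToSuccessfullyProcessed(task);
}
```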
wcarlson5 left a comment
@guozhangwang I understand the issue with performance and we wouldn't want to leave it that way for sure. But seeing that this is a strict improvement over the current behavior (right now it won't make any progress in the case you describe), I don't think we need to hold off merging this. Also, I think it should make writing tests for the backing-off and reordering PR easier, so it might make sense to merge it.
if (!tasks.successfullyProcessed().isEmpty()) {
    log.info("Streams encountered an error when processing tasks." +
        " Will commit all previously successfully processed tasks {}",
        tasks.successfullyProcessed().toString());
For future reference, you can skip the toString() -- the logger should make the conversion implicitly
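A tiny illustration of that logging point: with SLF4J-style placeholders the logger applies toString() to the argument itself, and only when the message is actually emitted, so the explicit conversion is unnecessary (snippet adapted from the diff above):

```java
// Explicit conversion: always builds the string, even if INFO is disabled.
log.info("Will commit all previously successfully processed tasks {}",
    tasks.successfullyProcessed().toString());

// Implicit conversion: the logger stringifies the argument when needed.
log.info("Will commit all previously successfully processed tasks {}",
    tasks.successfullyProcessed());
```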
ableegoldman left a comment
Nice, LGTM. Will merge once the build passes
To make sure that we do not introduce side effects beyond named topologies, what about narrowing the scope of this optimization to only named topologies for now? Besides that, this sounds reasonable to me.
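A hypothetical sketch of how that narrower scope could be expressed; hasNamedTopologies() and the call site are assumptions made for illustration, not the PR's actual change:

```java
// Hypothetical guard: only commit successfully processed tasks on error when
// the application uses named topologies, so the behavior change cannot affect
// other applications yet.
if (topologyMetadata.hasNamedTopologies()) {
    commitSuccessfullyProcessedTasks();
}
```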
* apache-kafka/trunk: (49 commits)
  KAFKA-12738: send LeaveGroup request when thread dies to optimize replacement time (apache#11801)
  MINOR: Skip fsync on parent directory to start Kafka on ZOS (apache#11793)
  KAFKA-12738: track processing errors and implement constant-time task backoff (apache#11787)
  MINOR: Cleanup admin creation logic in integration tests (apache#11790)
  KAFKA-10199: Add interface for state updater (apache#11499)
  KAFKA-10000: Utils methods for overriding user-supplied properties and dealing with Enum types (apache#11774)
  KAFKA-10000: Add new metrics for source task transactions (apache#11772)
  KAFKA-13676: Commit successfully processed tasks on error (apache#11791)
  KAFKA-13511: Add support for different unix precisions in TimestampConverter SMT (apache#11575)
  MINOR: Improve Connect docs (apache#11642)
  ...
When we hit an exception while processing tasks, we should save the work we have done so far.
This is only relevant with ALOS and EOS-v1, not EOS-v2. It should actually reduce the number of duplicated records in ALOS, because in many cases we will no longer re-process tasks that were already successfully processed.
This is currently enabled only for named topologies.
The behavior was rather thoroughly tested. There is an EmitOnChangeIntegrationTest that makes sure that if a task throws an exception, the record is still processed once the thread is replaced, provided the exception is not thrown again. I just added another task to the test such that even if the other task keeps throwing exceptions and hence never completes processing its record, this new task, which can process successfully, is able to make progress and commit its offset.
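An outline of the test shape described above: one sub-topology that fails and one that keeps processing and should still commit. Topic names, the failure injection, and the serdes are illustrative rather than the actual EmitOnChangeIntegrationTest code:

```java
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;

// Illustrative topology: the "failing" stream throws while the flag is set
// (a test could flip it so the retry after thread replacement succeeds),
// while the "healthy" stream keeps processing and should have its offsets
// committed even though the other task is failing.
final class CommitOnErrorTestTopology {
    static final AtomicBoolean SHOULD_THROW = new AtomicBoolean(true);

    static Topology build() {
        final StreamsBuilder builder = new StreamsBuilder();
        builder.stream("failing-input", Consumed.with(Serdes.Integer(), Serdes.String()))
               .map((key, value) -> {
                   if (SHOULD_THROW.get()) {
                       throw new RuntimeException("injected failure");
                   }
                   return KeyValue.pair(key, value);
               })
               .to("failing-output", Produced.with(Serdes.Integer(), Serdes.String()));
        builder.stream("healthy-input", Consumed.with(Serdes.Integer(), Serdes.String()))
               .to("healthy-output", Produced.with(Serdes.Integer(), Serdes.String()));
        return builder.build();
    }
}
```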