KAFKA-12738: track processing errors and implement constant-time task backoff (#11787)
Conversation
Not sure whether it is or will be cleaner in the long run to have this separate class that now has to keep up with topology additions/removals vs just doing all this bookkeeping inside the TopologyMetadata/InternalTopologyBuilder classes -- but until we can carve out time for a real tech debt cleanup of those classes which are already pretty out of control, I felt it best to pull everything out even if it meant duplicated un/registration of topologies
Thanks for bringing this up; I think we can come back and clean this up after we've gained confidence and are ready to extend beyond named topologies later.
guozhangwang
left a comment
Made a pass; just minor comment otherwise LGTM.
Should we check here that the topologyName is always UNNAMED_TOPOLOGY or null?
See this -- we do check to make sure that if named topologies are used, then the topologyName is not null or UNNAMED_TOPOLOGY. However, for apps that do not use named topologies I think it's fair to just return `true` here and not require any assertions about the name, since it's a dummy name in that case anyway.
SG?
I think it's a bit overkill to first organize all tasks into activeTasksByTopology; it may also introduce unexpected scheduling bias compared to what we do today, which is more or less a random round-robin. What about just checking, for each task, if canProcess(taskName) && canProcessTopology(task.topologyName())?
Guess this was a case of premature optimization -- I'll update with your suggestion for this PR and work out a better solution that doesn't skew processing for the later PR where this matters. (For some context, I did this in part because in one of the followups we will back off entire named topologies when one task is failing recurringly, to avoid getting out of sync, in which case it seemed wasteful to check each task in the topology if we already know it's not ready to process.)
But we can revisit this when we get to that PR 🙂
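For illustration, the per-task check suggested above could be sketched roughly as follows. This is a minimal standalone sketch, not the actual `TaskExecutionMetadata` code: the class name `ProcessingGate`, the error-time maps, and the `CONSTANT_BACKOFF_MS` value are all hypothetical stand-ins.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: instead of grouping tasks by topology up front, each
// task in the processing loop is tested individually against both its own
// backoff state and its topology's backoff state.
public class ProcessingGate {
    static final long CONSTANT_BACKOFF_MS = 5_000L;

    final Map<String, Long> taskErrorTime = new HashMap<>();
    final Map<String, Long> topologyErrorTime = new HashMap<>();

    boolean canProcessTask(final String taskId, final long now) {
        final Long errorTime = taskErrorTime.get(taskId);
        return errorTime == null || now - errorTime > CONSTANT_BACKOFF_MS;
    }

    boolean canProcessTopology(final String topologyName, final long now) {
        final Long errorTime = topologyErrorTime.get(topologyName);
        return errorTime == null || now - errorTime > CONSTANT_BACKOFF_MS;
    }

    // the combined per-task check applied inside the processing loop
    boolean canProcess(final String taskId, final String topologyName, final long now) {
        return canProcessTask(taskId, now) && canProcessTopology(topologyName, now);
    }
}
```

This keeps the existing round-robin iteration order intact, since no task is moved into a per-topology bucket before the check.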
```java
@Test
public void shouldBackOffTaskAndEmitDataWithinSameTopology() throws Exception {
```
Would like to add a test for the case of tasks in different named topologies, but perhaps in a followup PR to unblock this one
A meta question: do we really need an integration test that brings up the full stack for this coverage? I'd think a unit test with mock time, just on the task executor, would be sufficient.
I guess the reason I felt an integration test would be good is that there's some subtlety in how we may poll multiple times, and also in how the thread replacement interplays with the backoff.
Now that the integration test only takes 0.5s, WDYT about leaving it as an integration test but moving it to the NamedTopologyIntegrationTest class, since what does take a long time is bringing up the CLUSTER for each integration test?
Sounds good to me -- indeed the setup time to bring up a CLUSTER was my first concern, the second being that an integration test is more vulnerable to system-time flakiness than a mock time. Since this test seems less exposed to the latter concern, moving it into an existing test class is sufficient.
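The mock-time style of unit test mentioned above could look roughly like this: drive the backoff logic with a manually advanced clock instead of a real cluster. All names here are illustrative sketches, not the actual Streams test classes.

```java
// Sketch of a mock-time unit test for the backoff: a fake clock is advanced
// manually, so no real cluster (and no wall-clock sleeping) is needed.
public class BackoffUnitTestSketch {
    // minimal stand-in for a MockTime utility
    static class MockTime {
        long nowMs = 0L;
        void sleep(final long ms) { nowMs += ms; }
    }

    static final long BACKOFF_MS = 5_000L;

    static boolean canRetry(final long errorTimeMs, final long nowMs) {
        return nowMs - errorTimeMs > BACKOFF_MS;
    }

    public static void main(final String[] args) {
        final MockTime time = new MockTime();
        final long errorTime = time.nowMs;        // task fails at t=0
        assert !canRetry(errorTime, time.nowMs);  // immediately blocked
        time.sleep(BACKOFF_MS + 1);               // advance the mock clock
        assert canRetry(errorTime, time.nowMs);   // backoff has elapsed
    }
}
```

Because the clock only moves when the test says so, this style is immune to the system-time flakiness discussed above.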
```java
public void registerTaskError(final Task task, final Throwable t, final long now) {
    if (hasNamedTopologies) {
        final String topologyName = task.id().topologyName();
        topologyNameToErrorMetadata.computeIfAbsent(topologyName, n -> new NamedTopologyMetadata(topologyName))
```
Changed it so that we only track topologies in this map if they have an active backoff/task in error, rather than registering and unregistering named topologies and trying to keep them in sync between the TopologyMetadata and the individual StreamThreads' view (which was starting to look pretty ugly).
Instead we just pop the topology's metadata into the map when one of its tasks hits a new error, and clear it if/when all tasks are healthy again.
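That lazy-registration pattern could be sketched roughly like this. Class and method names (`TopologyErrorTracker`, `registerTaskSuccess`) are simplified stand-ins for the `TaskExecutionMetadata` internals, not the real API.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Sketch of the lazy-registration pattern: a topology only appears in the
// map while at least one of its tasks is in error, and its entry is removed
// once all of its tasks are healthy again.
public class TopologyErrorTracker {
    static class TopologyErrors {
        final Map<String, Long> taskErrorTime = new ConcurrentHashMap<>();
    }

    final Map<String, TopologyErrors> topologyNameToErrorMetadata = new ConcurrentHashMap<>();

    void registerTaskError(final String topologyName, final String taskId, final long now) {
        topologyNameToErrorMetadata
            .computeIfAbsent(topologyName, n -> new TopologyErrors())
            .taskErrorTime.put(taskId, now);
    }

    void registerTaskSuccess(final String topologyName, final String taskId) {
        final TopologyErrors errors = topologyNameToErrorMetadata.get(topologyName);
        if (errors != null) {
            errors.taskErrorTime.remove(taskId);
            if (errors.taskErrorTime.isEmpty()) {
                // all tasks healthy again: drop the topology from the map
                topologyNameToErrorMetadata.remove(topologyName);
            }
        }
    }
}
```

The upside is that no registration bookkeeping has to be kept in sync with topology additions/removals; the map is purely a reflection of current error state.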
guozhangwang
left a comment
Made a second pass.
My major comment is around the integration test -- I think we're a bit overkill in adding integration tests for cases like this, which not only increase test time but are also vulnerable to flakiness. A unit test with mock time on the task executor looks sufficient to me. All other comments are minor.
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutionMetadata.java
```java
final Long errorTime = tasksToErrorTime.get(task.id());
if (errorTime == null) {
    return true;
} else if (now - errorTime > 15000L) {
```
Curious why the magic number of 15s?
Because it was actually taking the thread 10s to come back up (in the integration test, where we overrode session.timeout to 10s) before we had #11801.
Now with that fix it takes 0.5-4s for the thread to be replaced, so there's no particular reason for it to be 15s. I think it makes sense to lower it to maybe 5s for now; then, when we have true exponential backoff, it can obviously start lower and grow from there.
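Pulling the magic number into a named constant would make the reasoning above explicit in the code. A minimal sketch, with `CONSTANT_BACKOFF_MS` as an assumed name rather than the actual Streams constant:

```java
// Sketch of the backoff check with the magic number replaced by a named
// constant: 5s, since thread replacement now takes 0.5-4s.
public class ConstantBackoff {
    static final long CONSTANT_BACKOFF_MS = 5_000L;

    static boolean canProcess(final Long errorTimeMs, final long nowMs) {
        if (errorTimeMs == null) {
            return true;  // task has never errored, nothing to back off
        }
        return nowMs - errorTimeMs > CONSTANT_BACKOFF_MS;
    }
}
```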
```diff
      throw e;
  } catch (final StreamsException e) {
-     log.error("Failed to process stream task {} due to the following error:", task.id(), e);
+     log.error(String.format("Failed to process stream task %s due to the following error:", task.id()), e);
```
Sorry to see log4j still hasn't figured out a way to handle both string parameters and an exception in the same call..
wcarlson5
left a comment
LGTM; not sure why you moved the integration test to a separate file, but I don't have anything real against it.
Test failures are unrelated (filed https://issues.apache.org/jira/browse/KAFKA-13690). Merging to trunk, but will do an immediate followup PR to:
…lacement time (#11801) Quick followup to #11787 to optimize the impact of the task backoff by reducing the time to replace a thread. When a thread is going through a dirty close, ie shutting down from an uncaught exception, we should be sending a LeaveGroup request to make sure the broker acknowledges the thread has died and won't wait up to the `session.timeout` for it to join the group if the user opts to `REPLACE_THREAD` in the handler Reviewers: Walker Carlson <wcarlson@confluent.io>, John Roesler <vvcephei@apache.org>
* apache-kafka/trunk: (49 commits) KAFKA-12738: send LeaveGroup request when thread dies to optimize replacement time (apache#11801) MINOR: Skip fsync on parent directory to start Kafka on ZOS (apache#11793) KAFKA-12738: track processing errors and implement constant-time task backoff (apache#11787) MINOR: Cleanup admin creation logic in integration tests (apache#11790) KAFKA-10199: Add interface for state updater (apache#11499) KAFKA-10000: Utils methods for overriding user-supplied properties and dealing with Enum types (apache#11774) KAFKA-10000: Add new metrics for source task transactions (apache#11772) KAFKA-13676: Commit successfully processed tasks on error (apache#11791) KAFKA-13511: Add support for different unix precisions in TimestampConverter SMT (apache#11575) MINOR: Improve Connect docs (apache#11642) ...
… of PR #11787 (#11812) This PR addresses the remaining nits from the final review of #11787. It also deletes two integration test classes which had only one test each, moving those tests into another test class to save the time of bringing up an entire embedded Kafka cluster for a single run. Reviewers: Guozhang Wang <guozhang@confluent.io>, Walker Carlson <wcarlson@confluent.io>
Part 1 in the initial series of error handling for named topologies*:
Part 1: Track tasks with errors within a named topology & implement constant-time based task backoff
Part 2: Implement exponential task backoff to account for recurring errors
Part 3: Pause/backoff all tasks within a named topology in case of a long backoff/frequent errors for any individual task
*note: not sure whether some or all of this would require a KIP, so I'm limiting the error handling work to just named topologies for now
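As a rough preview of Part 2, an exponential backoff could replace the constant-time check by doubling the wait with each consecutive error on a task, capped at a maximum. This is purely illustrative; the constants and names are assumptions, not the eventual Streams implementation.

```java
// Illustrative sketch of Part 2: exponential task backoff. The wait doubles
// per consecutive error, starting at 1s and capped at 60s. Assumes
// consecutiveErrors >= 1 (a task is only backed off after its first error).
public class ExponentialBackoff {
    static final long INITIAL_BACKOFF_MS = 1_000L;
    static final long MAX_BACKOFF_MS = 60_000L;

    static long backoffMs(final int consecutiveErrors) {
        // left-shift doubles the initial backoff per extra error; the shift
        // is bounded to avoid overflow before the cap is applied
        final long backoff = INITIAL_BACKOFF_MS << Math.min(consecutiveErrors - 1, 30);
        return Math.min(backoff, MAX_BACKOFF_MS);
    }

    static boolean canRetry(final long lastErrorMs, final int consecutiveErrors, final long nowMs) {
        return nowMs - lastErrorMs > backoffMs(consecutiveErrors);
    }
}
```

With this shape, the constant-time backoff of Part 1 is just the special case where `backoffMs` ignores the error count.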