KAFKA-10199: Implement removing active and standby tasks from the state updater #12270
This PR adds the removal of active and standby tasks from the default implementation of the state updater. The PR also includes refactorings to clean up the code.
  private final AtomicBoolean isRunning = new AtomicBoolean(true);
  private final Consumer<Set<TopicPartition>> offsetResetter;
- private final Map<TaskId, Task> updatingTasks = new HashMap<>();
+ private final Map<TaskId, Task> updatingTasks = new ConcurrentHashMap<>();
This allows the main thread to read the map in a thread-safe way.
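A minimal sketch of the idea, assuming one thread mutates the map while another thread takes snapshots of it. The class and method names are hypothetical and not taken from the PR; task IDs and tasks are plain strings here.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for the bookkeeping of the state updater thread.
class UpdatingTasksRegistry {
    // ConcurrentHashMap tolerates concurrent reads and writes; its iterators are
    // weakly consistent and never throw ConcurrentModificationException.
    private final Map<String, String> updatingTasks = new ConcurrentHashMap<>();

    // Intended to be called by the updater thread.
    void add(final String taskId, final String task) {
        updatingTasks.put(taskId, task);
    }

    // Intended to be called by the updater thread; returns the removed task or null.
    String remove(final String taskId) {
        return updatingTasks.remove(taskId);
    }

    // Intended to be called by the main thread: copies the current values.
    Set<String> snapshot() {
        return new HashSet<>(updatingTasks.values());
    }
}
```

With a plain HashMap, the copy in snapshot() could observe the map in the middle of a structural change made by the other thread.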
enum Action {
    ADD
}

private static class TaskAndAction {
    public final Task task;
    public final Action action;

    public TaskAndAction(final Task task, final Action action) {
        this.task = task;
        this.action = action;
    }
}
Moved this to a new class to make construction of the object safer.
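One way construction could be made safer is sketched below, assuming a private constructor plus static factory methods. The REMOVE action, the factory names, and the accessor checks are assumptions for illustration, not quotes from the PR; Task and TaskId are replaced by String to keep the sketch self-contained.

```java
import java.util.Objects;

// Hypothetical sketch; Task and TaskId are replaced by String for brevity.
final class TaskAndActionSketch {
    enum Action { ADD, REMOVE }

    private final String task;   // set only for ADD
    private final String taskId; // set only for REMOVE
    private final Action action;

    // Private constructor: callers must go through the factories below,
    // so an ADD without a task or a REMOVE without an ID cannot be built.
    private TaskAndActionSketch(final String task, final String taskId, final Action action) {
        this.task = task;
        this.taskId = taskId;
        this.action = action;
    }

    static TaskAndActionSketch createAddTask(final String task) {
        Objects.requireNonNull(task, "Task to add is null!");
        return new TaskAndActionSketch(task, null, Action.ADD);
    }

    static TaskAndActionSketch createRemoveTask(final String taskId) {
        Objects.requireNonNull(taskId, "Task ID of task to remove is null!");
        return new TaskAndActionSketch(null, taskId, Action.REMOVE);
    }

    Action action() {
        return action;
    }

    String task() {
        if (action != Action.ADD) {
            throw new IllegalStateException("Action " + action + " does not carry a task");
        }
        return task;
    }

    String taskId() {
        if (action != Action.REMOVE) {
            throw new IllegalStateException("Action " + action + " does not carry a task ID");
        }
        return taskId;
    }
}
```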
public Set<StandbyTask> getUpdatingStandbyTasks() {
    return Collections.unmodifiableSet(new HashSet<>(stateUpdaterThread.getUpdatingStandbyTasks()));
}

public Set<Task> getUpdatingTasks() {
    return Collections.unmodifiableSet(new HashSet<>(stateUpdaterThread.getUpdatingTasks()));
}

public Set<StreamTask> getRestoredActiveTasks() {
Added a couple of get* methods to improve unit testing. These methods allow looking into the queues without draining them. They are not part of the StateUpdater interface.
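The difference between looking into a queue and draining it can be sketched as follows. This is an illustration under assumptions: the class name is hypothetical, tasks are plain strings, and the queue type is chosen for the example only.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical holder for removed tasks.
class RemovedTasksQueue {
    private final BlockingQueue<String> removedTasks = new LinkedBlockingQueue<>();

    void add(final String task) {
        removedTasks.add(task);
    }

    // Destructive: empties the queue and hands the tasks to the caller.
    Set<String> drain() {
        final List<String> drained = new ArrayList<>();
        removedTasks.drainTo(drained);
        return new HashSet<>(drained);
    }

    // Non-destructive: returns an unmodifiable copy and leaves the queue untouched.
    Set<String> get() {
        return Collections.unmodifiableSet(new HashSet<>(removedTasks));
    }
}
```

A test can call get() repeatedly to assert on the queue contents, whereas a second call to drain() returns an empty set.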
final List<ExceptionAndTasks> failedTasks = getFailedTasks(1);
assertEquals(1, failedTasks.size());
final ExceptionAndTasks actualFailedTasks = failedTasks.get(0);
assertEquals(2, actualFailedTasks.tasks.size());
assertTrue(actualFailedTasks.tasks.containsAll(Arrays.asList(task1, task2)));
assertTrue(actualFailedTasks.exception instanceof StreamsException);
final StreamsException actualException = (StreamsException) actualFailedTasks.exception;
assertFalse(actualException.taskId().isPresent());
assertEquals(expectedMessage, actualException.getMessage());
assertTrue(stateUpdater.getAllTasks().isEmpty());
Replaced with a call to containsAll() in verifyExceptionsAndFailedTasks(). There, reference equality is verified for the exception and the tasks, which I think is fine in this case.
Yup, agreed.
verifyRemovedTasks(task);
verifyRestoredActiveTasks();
verifyUpdatingTasks();
verifyExceptionsAndFailedTasks();
Now, I verify that tasks are in the correct location.
}

private void removeTask(final TaskId taskId) {
    final Task task = updatingTasks.remove(taskId);
Not a comment on this change: one of the bug fixes we want to piggy-back on the restoration work is to write a new checkpoint when we stop restoring a task; otherwise the restoration progress so far would be lost. The restore callback should be triggered as well (KAFKA-10575).
I will try to add this in a separate PR. It is also a good exercise in how the state updater could interact with the processor state manager.
Yeah, I was also thinking about checkpointing, but I was not clear about all the details. Here are my thoughts about checkpointing:
We only checkpoint if we are not in EOS mode, because otherwise we would have a checkpoint file when we close dirty. On the other hand, the offsets in that checkpoint file should also be safe under EOS, since the file was written during restoration and not during a commit.
Is this correct, or am I missing something?
During processing: yes, today we should not write the checkpoint file when we commit.
During restoration: we can always write the checkpoint file regardless of EOS or ALOS, since after a failure we would just over-restore upon recovery, so no EOS violation can happen.
Also during restoration: when we complete restoration or remove a task, we should enforce a checkpoint as well (in failure cases, though, we should not write a new one).
OK, my understanding was correct then. Thank you for the clarification!
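The restoration-time rules from this thread can be written down as a small decision function. This is only a sketch of the rules as stated in the comments above; the enum, class, and method names are hypothetical, and nothing here is code from the PR.

```java
// Hypothetical: the ways a task can leave restoration, as discussed above.
enum RestorationExit { RESTORATION_COMPLETED, TASK_REMOVED, TASK_FAILED }

final class RestorationCheckpointPolicy {
    private RestorationCheckpointPolicy() { }

    // eosEnabled is accepted but intentionally unused: per the discussion, a
    // checkpoint written while restoring is safe under both EOS and ALOS,
    // because a failure only leads to over-restoring on recovery.
    static boolean shouldWriteCheckpoint(final RestorationExit exit, final boolean eosEnabled) {
        switch (exit) {
            case RESTORATION_COMPLETED:
            case TASK_REMOVED:
                return true;  // enforce a checkpoint so restoration progress is kept
            case TASK_FAILED:
                return false; // do not write a new checkpoint for a failed task
            default:
                throw new IllegalArgumentException("Unknown exit: " + exit);
        }
    }
}
```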
@Override
public List<ExceptionAndTasks> getFailedTasksAndExceptions() {
public Set<Task> drainRemovedTasks() {
When we add the logic for recycling a task, which still has to be done in the task manager, we would need two round-trips: first remove the task as active/standby, then, after recycling it, add the new task as standby/active. I'm still trying to flesh out the details here. In case we also need a timeout for removed tasks, similar to the one for restored active tasks, this data structure may need to change from a blocking queue to a lock + condition + queue.
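A minimal sketch of what a lock + condition + queue with a timeout could look like, assuming tasks are plain strings. The class name, the drain(Duration) signature, and the behavior on interruption are assumptions for illustration and are not part of the PR.

```java
import java.time.Duration;
import java.util.ArrayDeque;
import java.util.HashSet;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical container for removed tasks with a timed drain.
class TimedRemovedTasks {
    private final Lock lock = new ReentrantLock();
    private final Condition notEmpty = lock.newCondition();
    private final Queue<String> removedTasks = new ArrayDeque<>();

    // Called by the producing thread after it removed a task.
    void add(final String task) {
        lock.lock();
        try {
            removedTasks.add(task);
            notEmpty.signalAll();
        } finally {
            lock.unlock();
        }
    }

    // Waits up to the timeout for at least one removed task, then drains all of them.
    // Returns an empty set if the timeout elapses or the thread is interrupted.
    Set<String> drain(final Duration timeout) {
        long remainingNanos = timeout.toNanos();
        lock.lock();
        try {
            while (removedTasks.isEmpty() && remainingNanos > 0) {
                // awaitNanos returns an estimate of the time left to wait
                remainingNanos = notEmpty.awaitNanos(remainingNanos);
            }
            final Set<String> drained = new HashSet<>(removedTasks);
            removedTasks.clear();
            return drained;
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return new HashSet<>();
        } finally {
            lock.unlock();
        }
    }
}
```

Compared with a blocking queue, the explicit lock lets the caller wait once for a bounded time and then take everything that has accumulated in a single critical section.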
} finally {
    tasksAndActionsLock.unlock();
}
public List<ExceptionAndTasks> getExceptionsAndFailedTasks() {
I feel some of the getters here will not end up being needed in non-testing code, since the corresponding drainXX functions are declared in the interface and would be used there. If that turns out to be true, let's cluster the getters together and add a comment that they are for testing only.
Since we will use the StateUpdater interface in production code and not its implementation DefaultStateUpdater, those getters should not be visible to production code as long as no casting is used.
I am not a fan of a "for testing only" comment since it does not enforce anything and only pollutes the code.
Moreover, I think there is a good chance that we will need those getters to expose the tasks to the task manager. But let's see what the future brings.
Sounds good. I'm not advocating for the "for testing only" comment here; maybe it's just my paranoid gene that would like to see those functions clustered together, but I don't feel strongly either way :)
    log.debug((task.isActive() ? "Active" : "Standby")
        + " task " + task.id() + " was removed from the updating tasks and added to the removed tasks.");
} else {
    log.debug("Task " + taskId + " was not removed since it is not updating.");
This is a meta comment: for tasks that have already been restored or have failed, should we still include them in the removed tasks returned by the drain function?
The reason I'm wondering is that the caller of updater.remove would likely expect the task to eventually show up in a future drain call (one example, again, is the recycle scenario). If we do not add such tasks there, the caller's logic gets more complicated, because it also has to check the restored and failed sets from the updater to find out when the task has been removed completely.
The downside, of course, is that a task can show up in more than one of these channels, but I feel the caller's logic to de-duplicate such cases would be easier as long as there is a deterministic order for checking removed, completed, and failed tasks from the updater.
WDYT?
I am not sure that one way is less complex than the other. In any case, users need to keep track of the tasks they want to remove in one way or another. However, I am open to changes here, since it is not trivial to give users feedback about what happened to the removed tasks.
Yeah, fair enough, we can revisit this when we are about to close the loop. I just wanted to put this on your radar sooner rather than later.
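To make the de-duplication idea from this thread concrete, here is a sketch of caller-side bookkeeping for the overlapping variant, in which a task may appear in more than one channel. Everything in it is hypothetical: the class, the outcome names, and in particular the chosen check order (failed, then restored, then removed) are assumptions, since the thread leaves the semantics open.

```java
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical caller-side tracker for tasks whose removal was requested.
class RemovalTracker {
    enum Outcome { FAILED, RESTORED, REMOVED }

    private final Set<String> pendingRemovals = new HashSet<>();

    void requestRemoval(final String taskId) {
        pendingRemovals.add(taskId);
    }

    boolean hasPending() {
        return !pendingRemovals.isEmpty();
    }

    // Checks the channels in a fixed order. The first channel that reports a
    // pending task decides its outcome, so a task that shows up in several
    // channels is resolved exactly once. Tasks never requested are ignored.
    Map<String, Outcome> resolve(final Set<String> failed,
                                 final Set<String> restored,
                                 final Set<String> removed) {
        final Map<String, Outcome> resolved = new LinkedHashMap<>();
        record(failed, Outcome.FAILED, resolved);
        record(restored, Outcome.RESTORED, resolved);
        record(removed, Outcome.REMOVED, resolved);
        return resolved;
    }

    private void record(final Set<String> channel,
                        final Outcome outcome,
                        final Map<String, Outcome> resolved) {
        for (final String taskId : channel) {
            if (pendingRemovals.remove(taskId)) {
                resolved.put(taskId, outcome);
            }
        }
    }
}
```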
 * Get all tasks (active and standby) that are managed by the state updater.
 * Drains the removed tasks (active and standbys) from the state updater.
 *
 * Removed tasks returned by this method are tasks extraordinarily removed from the state updater. These do not
I like the word "extraordinarily" :)
Jokes aside, I have a slightly different thought about the semantics here, i.e. whether the drained removed tasks and the restored/failed tasks should be mutually exclusive or may overlap, mainly with regard to how easily the caller could handle the overlapping scenarios. I left an earlier comment above.
Merged to trunk, thanks @cadonna! Please feel free to discuss the comments in follow-up PRs.