Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,20 +23,21 @@
import org.apache.kafka.streams.errors.TaskCorruptedException;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.Task.State;
import org.apache.kafka.streams.processor.internals.TaskAndAction.Action;
import org.slf4j.Logger;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
Expand All @@ -57,7 +58,7 @@ private class StateUpdaterThread extends Thread {
private final ChangelogReader changelogReader;
private final AtomicBoolean isRunning = new AtomicBoolean(true);
private final Consumer<Set<TopicPartition>> offsetResetter;
private final Map<TaskId, Task> updatingTasks = new HashMap<>();
private final Map<TaskId, Task> updatingTasks = new ConcurrentHashMap<>();
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To allow the main thread to read the map in a thread-safe way.

private final Logger log;

public StateUpdaterThread(final String name,
Expand All @@ -72,7 +73,7 @@ public StateUpdaterThread(final String name,
log = logContext.logger(DefaultStateUpdater.class);
}

public Collection<Task> getAllUpdatingTasks() {
public Collection<Task> getUpdatingTasks() {
return updatingTasks.values();
}

Expand Down Expand Up @@ -117,11 +118,13 @@ private void performActionsOnTasks() {
tasksAndActionsLock.lock();
try {
for (final TaskAndAction taskAndAction : getTasksAndActions()) {
final Task task = taskAndAction.task;
final Action action = taskAndAction.action;
final Action action = taskAndAction.getAction();
switch (action) {
case ADD:
addTask(task);
addTask(taskAndAction.getTask());
break;
case REMOVE:
removeTask(taskAndAction.getTaskId());
break;
}
}
Expand Down Expand Up @@ -149,7 +152,7 @@ private void handleRuntimeException(final RuntimeException runtimeException) {
log.error("An unexpected error occurred within the state updater thread: " + runtimeException);
final ExceptionAndTasks exceptionAndTasks = new ExceptionAndTasks(new HashSet<>(updatingTasks.values()), runtimeException);
updatingTasks.clear();
failedTasks.add(exceptionAndTasks);
exceptionsAndFailedTasks.add(exceptionAndTasks);
isRunning.set(false);
}

Expand All @@ -164,7 +167,7 @@ private void handleTaskCorruptedException(final TaskCorruptedException taskCorru
}
corruptedTasks.add(corruptedTask);
}
failedTasks.add(new ExceptionAndTasks(corruptedTasks, taskCorruptedException));
exceptionsAndFailedTasks.add(new ExceptionAndTasks(corruptedTasks, taskCorruptedException));
}

private void handleStreamsException(final StreamsException streamsException) {
Expand All @@ -175,7 +178,7 @@ private void handleStreamsException(final StreamsException streamsException) {
} else {
exceptionAndTasks = handleStreamsExceptionWithoutTask(streamsException);
}
failedTasks.add(exceptionAndTasks);
exceptionsAndFailedTasks.add(exceptionAndTasks);
}

private ExceptionAndTasks handleStreamsExceptionWithTask(final StreamsException streamsException) {
Expand Down Expand Up @@ -230,23 +233,35 @@ private List<TaskAndAction> getTasksAndActions() {

private void addTask(final Task task) {
if (isStateless(task)) {
log.debug("Stateless active task " + task.id() + " was added to the state updater");
addTaskToRestoredTasks((StreamTask) task);
log.debug("Stateless active task " + task.id() + " was added to the restored tasks of the state updater");
} else {
updatingTasks.put(task.id(), task);
if (task.isActive()) {
updatingTasks.put(task.id(), task);
log.debug("Stateful active task " + task.id() + " was added to the state updater");
log.debug("Stateful active task " + task.id() + " was added to the updating tasks of the state updater");
changelogReader.enforceRestoreActive();
} else {
updatingTasks.put(task.id(), task);
log.debug("Standby task " + task.id() + " was added to the state updater");
log.debug("Standby task " + task.id() + " was added to the updating tasks of the state updater");
if (updatingTasks.size() == 1) {
changelogReader.transitToUpdateStandby();
}
}
}
}

private void removeTask(final TaskId taskId) {
final Task task = updatingTasks.remove(taskId);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not a comment: one of the bug fixes we want to piggy-back on the restoration work is to write a new checkpoint when we stop restoring a task; otherwise the restoration progress so far would be lost. The restore callback should be triggered as well (KAFKA-10575).

I will try to add this in a separate PR, also a good exercise on how the state updater could interact with the processor state manager.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I was also thinking about checkpointing, but I was not clear about all the details. Here are my thoughts about checkpointing:
We only checkpoint if we are not in EOS mode, because otherwise we would have a checkpoint file when we close dirty. On the other hand, even in EOS the offsets in that checkpoint file should be safe, since the file was written during restoration and not during a commit.
Is this correct, or am I missing something?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

During processing: yes, today we should not write a checkpoint file when we commit.
During restoring: we can always write a checkpoint file regardless of EOS or ALOS, since if there's a failure we would just over-restore upon recovery, so no EOS violation would happen.

Also during restoring: when we complete restoration or remove a task, we should enforce a checkpoint as well (for failing cases, though, we should not write a new one).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, my understanding was correct then. Thank you for the clarification!

if (task != null) {
final Collection<TopicPartition> changelogPartitions = task.changelogPartitions();
changelogReader.unregister(changelogPartitions);
removedTasks.add(task);
log.debug((task.isActive() ? "Active" : "Standby")
+ " task " + task.id() + " was removed from the updating tasks and added to the removed tasks.");
} else {
log.debug("Task " + taskId + " was not removed since it is not updating.");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a meta comment: for tasks that have been restored or have failed, should we still include them in the removed tasks returned by the drain function?

The reason I'm wondering about it is that the caller of updater.remove would likely expect to eventually see the task show up in a future drain call (again, one example here would be the recycle scenario). If we do not add them there, the caller's logic becomes more complicated, since it also has to check the restored / failed sets from the updater when determining whether this task has been removed completely.

The downside, of course, is that a task can show up in several of these channels, but I feel the caller's logic to "de-dup" such cases would be easier as long as there's a deterministic ordering for checking removed/completed/failed tasks from the updater.

WDYT?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure that one way is less complex than the other. In any case, users need to keep track of the tasks they want to remove in one way or another. However, I am open to changes here, since it is not trivial how to give users feedback about what happened to the removed tasks.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, fair enough; we can revisit this when we are about to close the loop. I just wanted to put this on your radar sooner rather than later.

}
}

/**
 * Checks whether the given task is treated as stateless by the state updater, i.e., it reports
 * no changelog partitions and it is an active task.
 *
 * @param task task to check
 * @return {@code true} if the task has no changelog partitions and is active, {@code false} otherwise
 */
private boolean isStateless(final Task task) {
    // A task with changelog partitions has state; the active check is not needed in that case.
    if (!task.changelogPartitions().isEmpty()) {
        return false;
    }
    return task.isActive();
}
Expand Down Expand Up @@ -277,20 +292,6 @@ private void addTaskToRestoredTasks(final StreamTask task) {
}
}

// Kind of operation that is queued together with a task for the state updater thread.
// Only adding a task is declared in this enum.
enum Action {
ADD
}

/**
 * Pairs a task with the action that should be performed on it.
 */
private static class TaskAndAction {
// task the action applies to
public final Task task;
// action to perform on the task
public final Action action;

/**
 * Creates a pair of a task and an action; neither argument is validated here.
 *
 * @param task   task the action applies to
 * @param action action to perform on the task
 */
public TaskAndAction(final Task task, final Action action) {
this.task = task;
this.action = action;
}
}

Comment on lines -280 to -293
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moved this to a new class to make construction of the object safer.

private final Time time;
private final ChangelogReader changelogReader;
private final Consumer<Set<TopicPartition>> offsetResetter;
Expand All @@ -300,7 +301,8 @@ public TaskAndAction(final Task task, final Action action) {
private final Queue<StreamTask> restoredActiveTasks = new LinkedList<>();
private final Lock restoredActiveTasksLock = new ReentrantLock();
private final Condition restoredActiveTasksCondition = restoredActiveTasksLock.newCondition();
private final BlockingQueue<ExceptionAndTasks> failedTasks = new LinkedBlockingQueue<>();
private final BlockingQueue<ExceptionAndTasks> exceptionsAndFailedTasks = new LinkedBlockingQueue<>();
private final BlockingQueue<Task> removedTasks = new LinkedBlockingQueue<>();
private CountDownLatch shutdownGate;

private StateUpdaterThread stateUpdaterThread = null;
Expand All @@ -325,7 +327,7 @@ public void add(final Task task) {

tasksAndActionsLock.lock();
try {
tasksAndActions.add(new TaskAndAction(task, Action.ADD));
tasksAndActions.add(TaskAndAction.createAddTask(task));
tasksAndActionsCondition.signalAll();
} finally {
tasksAndActionsLock.unlock();
Expand All @@ -342,11 +344,18 @@ private void verifyStateFor(final Task task) {
}

@Override
public void remove(final Task task) {
public void remove(final TaskId taskId) {
tasksAndActionsLock.lock();
try {
tasksAndActions.add(TaskAndAction.createRemoveTask(taskId));
tasksAndActionsCondition.signalAll();
} finally {
tasksAndActionsLock.unlock();
}
}

@Override
public Set<StreamTask> getRestoredActiveTasks(final Duration timeout) {
public Set<StreamTask> drainRestoredActiveTasks(final Duration timeout) {
final long timeoutMs = timeout.toMillis();
final long startTime = time.milliseconds();
final long deadline = startTime + timeoutMs;
Expand Down Expand Up @@ -375,52 +384,42 @@ public Set<StreamTask> getRestoredActiveTasks(final Duration timeout) {
}

@Override
public List<ExceptionAndTasks> getFailedTasksAndExceptions() {
public Set<Task> drainRemovedTasks() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When we add the logic for recycling a task, which still has to be done in the task manager, we would need two round-trips: first remove the task as active/standby, then, after recycling it, add the new task as standby/active. I'm still trying to flesh out the details here, but in case we also need a timeout for removed tasks, similar to the one for restored active tasks, this data structure may need to be changed from a blocking queue to a lock + condition + queue.

final List<Task> result = new ArrayList<>();
removedTasks.drainTo(result);
return new HashSet<>(result);
}

@Override
public List<ExceptionAndTasks> drainExceptionsAndFailedTasks() {
final List<ExceptionAndTasks> result = new ArrayList<>();
failedTasks.drainTo(result);
exceptionsAndFailedTasks.drainTo(result);
return result;
}

@Override
public Set<Task> getAllTasks() {
tasksAndActionsLock.lock();
/**
 * Gets a snapshot of the updating standby tasks as reported by the state updater thread.
 *
 * The field holding the state updater thread is initialized to {@code null}. While no thread
 * exists, there cannot be any updating tasks, so an empty set is returned instead of failing
 * with a {@link NullPointerException}.
 *
 * @return unmodifiable copy of the updating standby tasks; empty if there is no state updater thread
 */
public Set<StandbyTask> getUpdatingStandbyTasks() {
    if (stateUpdaterThread == null) {
        return Collections.emptySet();
    }
    // Copy before wrapping so that the caller gets a snapshot instead of a live view.
    return Collections.unmodifiableSet(new HashSet<>(stateUpdaterThread.getUpdatingStandbyTasks()));
}

/**
 * Gets a snapshot of the updating tasks as reported by the state updater thread.
 *
 * The field holding the state updater thread is initialized to {@code null}. While no thread
 * exists, there cannot be any updating tasks, so an empty set is returned instead of failing
 * with a {@link NullPointerException}.
 *
 * @return unmodifiable copy of the updating tasks; empty if there is no state updater thread
 */
public Set<Task> getUpdatingTasks() {
    if (stateUpdaterThread == null) {
        return Collections.emptySet();
    }
    // Copy before wrapping so that the caller gets a snapshot instead of a live view.
    return Collections.unmodifiableSet(new HashSet<>(stateUpdaterThread.getUpdatingTasks()));
}

public Set<StreamTask> getRestoredActiveTasks() {
Comment on lines +400 to +408
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a couple of get* methods to improve unit testing. These methods allow looking into the queues without draining them. They are not part of the StateUpdater interface.

restoredActiveTasksLock.lock();
try {
final Set<Task> allTasks = new HashSet<>();
allTasks.addAll(tasksAndActions.stream()
.filter(t -> t.action == Action.ADD)
.map(t -> t.task)
.collect(Collectors.toList())
);
allTasks.addAll(stateUpdaterThread.getAllUpdatingTasks());
allTasks.addAll(restoredActiveTasks);
return Collections.unmodifiableSet(allTasks);
return Collections.unmodifiableSet(new HashSet<>(restoredActiveTasks));
} finally {
restoredActiveTasksLock.unlock();
tasksAndActionsLock.unlock();
}
}

@Override
public Set<StandbyTask> getStandbyTasks() {
tasksAndActionsLock.lock();
try {
final Set<StandbyTask> standbyTasks = new HashSet<>();
standbyTasks.addAll(tasksAndActions.stream()
.filter(t -> t.action == Action.ADD)
.filter(t -> !t.task.isActive())
.map(t -> (StandbyTask) t.task)
.collect(Collectors.toList())
);
standbyTasks.addAll(getUpdatingStandbyTasks());
return Collections.unmodifiableSet(standbyTasks);
} finally {
tasksAndActionsLock.unlock();
}
public List<ExceptionAndTasks> getExceptionsAndFailedTasks() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel some of the getters here will not end up being needed in non-testing code, since we have the corresponding drainXX functions that are declared in the interface and would be used in non-testing code. If that turns out to be true, let's cluster them together and add a comment that they are for testing only.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we will use the interface StateUpdater in production code and not its implementation DefaultStateUpdater, those getters should not be visible to production code as long as no casting is used.
I am not a fan of the "for testing only" comment since it does not enforce anything and only pollutes the code.
Moreover, I think there is a good chance that we will need those getters to expose the tasks to the task manager. But let's see what the future brings.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good. I'm not advocating for the "for testing only" comment here; it's maybe just my paranoid gene that would like to see those functions clustered together, but I don't feel strongly either way :)

return Collections.unmodifiableList(new ArrayList<>(exceptionsAndFailedTasks));
}

public Set<StandbyTask> getUpdatingStandbyTasks() {
return Collections.unmodifiableSet(new HashSet<>(stateUpdaterThread.getUpdatingStandbyTasks()));
/**
 * Gets a snapshot of the removed tasks without taking them out of the queue of removed tasks.
 *
 * @return unmodifiable copy of the tasks currently in the queue of removed tasks
 */
public Set<Task> getRemovedTasks() {
    final Set<Task> snapshot = new HashSet<>(removedTasks);
    return Collections.unmodifiableSet(snapshot);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,65 +16,101 @@
*/
package org.apache.kafka.streams.processor.internals;

import org.apache.kafka.streams.processor.TaskId;

import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Set;

public interface StateUpdater {

class ExceptionAndTasks {
public final Set<Task> tasks;
public final RuntimeException exception;
private final Set<Task> tasks;
private final RuntimeException exception;

public ExceptionAndTasks(final Set<Task> tasks, final RuntimeException exception) {
this.tasks = tasks;
this.exception = exception;
this.tasks = Objects.requireNonNull(tasks);
this.exception = Objects.requireNonNull(exception);
}

/**
 * Gets the tasks associated with the exception.
 *
 * @return unmodifiable view of the tasks; the view cannot be used to modify the underlying set
 */
public Set<Task> tasks() {
return Collections.unmodifiableSet(tasks);
}

/**
 * Gets the exception associated with the tasks.
 *
 * @return the exception passed to the constructor
 */
public RuntimeException exception() {
return exception;
}

/**
 * Compares this object with another object for equality.
 *
 * Two instances are equal if their task sets are equal and their exceptions are equal
 * according to the exceptions' own {@code equals()}.
 *
 * @param o object to compare with
 * @return {@code true} if the given object is an {@code ExceptionAndTasks} with equal tasks and
 *         an equal exception, {@code false} otherwise
 */
@Override
public boolean equals(final Object o) {
    if (this == o) {
        return true;
    }
    if (!(o instanceof ExceptionAndTasks)) {
        return false;
    }
    final ExceptionAndTasks other = (ExceptionAndTasks) o;
    // Tasks are compared first; the exceptions are only compared if the tasks match.
    if (!tasks.equals(other.tasks)) {
        return false;
    }
    return exception.equals(other.exception);
}

/**
 * Computes the hash code from the tasks and the exception, i.e., from the same two fields
 * that are compared in equals().
 */
@Override
public int hashCode() {
return Objects.hash(tasks, exception);
}
}

/**
* Adds a task (active or standby) to the state updater.
*
* This method does not block until the task is added to the state updater.
*
* @param task task to add
*/
void add(final Task task);

/**
* Removes a task (active or standby) from the state updater.
* Removes a task (active or standby) from the state updater and adds the removed task to the removed tasks.
*
* This method does not block until the removed task is removed from the state updater.
*
* @param task task ro remove
* The task to be removed is not removed from the restored active tasks and the failed tasks.
* Stateless tasks will never be added to the removed tasks since they are immediately added to the
* restored active tasks.
*
* @param taskId ID of the task to remove
*/
void remove(final Task task);
void remove(final TaskId taskId);

/**
* Gets restored active tasks from state restoration/update
* Drains the restored active tasks from the state updater.
*
* The returned active tasks are removed from the state updater.
*
* @param timeout duration how long the calling thread should wait for restored active tasks
*
* @return set of active tasks with up-to-date states
*/
Set<StreamTask> getRestoredActiveTasks(final Duration timeout);
Set<StreamTask> drainRestoredActiveTasks(final Duration timeout);

/**
* Gets failed tasks and the corresponding exceptions
*
* @return list of failed tasks and the corresponding exceptions
*/
List<ExceptionAndTasks> getFailedTasksAndExceptions();

/**
* Get all tasks (active and standby) that are managed by the state updater.
* Drains the removed tasks (active and standbys) from the state updater.
*
* Removed tasks returned by this method are tasks extraordinarily removed from the state updater. These do not
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Like the word "extraordinarily" :)

Jokes aside, I have a slightly different thought about the semantics here, i.e. whether the drained removed tasks and the restored/failed tasks should be mutually exclusive or may overlap, mainly with regard to how easily the caller could handle the overlapping scenarios. I left an earlier comment above.

* include restored or failed tasks.
*
* The returned removed tasks are removed from the state updater.
*
* @return set of tasks managed by the state updater
* @return set of tasks removed from the state updater
*/
Set<Task> getAllTasks();
Set<Task> drainRemovedTasks();

/**
* Get standby tasks that are managed by the state updater.
* Drains the failed tasks and the corresponding exceptions.
*
* @return set of standby tasks managed by the state updater
* The returned failed tasks are removed from the state updater.
*
* @return list of failed tasks and the corresponding exceptions
*/
Set<StandbyTask> getStandbyTasks();
List<ExceptionAndTasks> drainExceptionsAndFailedTasks();

/**
* Shuts down the state updater.
Expand Down
Loading