Skip to content

Conversation

@cadonna
Copy link
Member

@cadonna cadonna commented Nov 15, 2021

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

*
* @param task task
*/
void add(final Task task);
Copy link
Member Author

@cadonna cadonna Nov 15, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Each time an active or standby task transits to state RESTORING it needs to be added to the state updater.

*
* @param task tasks to remove
*/
void remove(final Task task);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Each time a task is suspended in state RESTORING, we need to remove it from the state updater.

*
* @return list of active tasks with up-to-date states
*/
List<StreamTask> getRestoredActiveTasks(final Duration timeout);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When the restoration of an active task is completed, the task is returned call to this method. The calling thread can decide how long to wait for the next restored active tasks.

*
* @return failed tasks
*/
List<Task> getFailedTasks();
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If a task fails during restoration it is returned by this method. The exception that caused the failure can be retrieved from the task itself with Task#getException().

import java.time.Duration;
import java.util.List;

public interface StateUpdater {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

An implementation of the state updater is passed to the task manager in its constructor. Since the threading model of the state updater is encapsulated and one can pass the same state updater to multiple task managers there is no fixed 1:1 relationship between a restoration thread and a stream thread.

Copy link
Contributor

@guozhangwang guozhangwang left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM.

*
* @return exception that caused the failure of the task
*/
Optional<RuntimeException> getException();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can narrow down the scope of this exception, if it is only going to be used during restoration time. But nevertheless we can discuss about this later.

@cadonna cadonna changed the title [WIP] KAFKA-10199: Add interface for state updater KAFKA-10199: Add interface for state updater Feb 23, 2022
@guozhangwang guozhangwang merged commit 8d88b20 into apache:trunk Feb 23, 2022
@guozhangwang
Copy link
Contributor

Thanks @cadonna , merged to trunk.

yyu1993 added a commit to confluentinc/kafka that referenced this pull request Feb 25, 2022
* apache-kafka/trunk: (49 commits)
  KAFKA-12738: send LeaveGroup request when thread dies to optimize replacement time (apache#11801)
  MINOR: Skip fsync on parent directory to start Kafka on ZOS (apache#11793)
  KAFKA-12738: track processing errors and implement constant-time task backoff (apache#11787)
  MINOR: Cleanup admin creation logic in integration tests (apache#11790)
  KAFKA-10199: Add interface for state updater (apache#11499)
  KAFKA-10000: Utils methods for overriding user-supplied properties and dealing with Enum types (apache#11774)
  KAFKA-10000: Add new metrics for source task transactions (apache#11772)
  KAFKA-13676: Commit successfully processed tasks on error (apache#11791)
  KAFKA-13511: Add support for different unix precisions in TimestampConverter SMT (apache#11575)
  MINOR: Improve Connect docs (apache#11642)
  ...
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants