Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -93,12 +93,23 @@ public ClientState(final Set<TaskId> previousActiveTasks,
final Map<TaskId, Long> taskLagTotals,
final Map<String, String> clientTags,
final int capacity) {
this(previousActiveTasks, previousStandbyTasks, taskLagTotals, clientTags, capacity, null);
}

// For testing only
public ClientState(final Set<TaskId> previousActiveTasks,
final Set<TaskId> previousStandbyTasks,
final Map<TaskId, Long> taskLagTotals,
final Map<String, String> clientTags,
final int capacity,
final UUID processId) {
this.previousStandbyTasks.taskIds(unmodifiableSet(new TreeSet<>(previousStandbyTasks)));
this.previousActiveTasks.taskIds(unmodifiableSet(new TreeSet<>(previousActiveTasks)));
taskOffsetSums = emptyMap();
this.taskLagTotals = unmodifiableMap(taskLagTotals);
this.capacity = capacity;
this.clientTags = unmodifiableMap(clientTags);
this.processId = processId;
}

int capacity() {
Expand Down Expand Up @@ -133,6 +144,10 @@ public void assignActiveTasks(final Collection<TaskId> tasks) {
assignedActiveTasks.taskIds().addAll(tasks);
}

public void assignStandbyTasks(final Collection<TaskId> tasks) {
assignedStandbyTasks.taskIds().addAll(tasks);
}

public void assignActiveToConsumer(final TaskId task, final String consumer) {
if (!assignedActiveTasks.taskIds().contains(task)) {
throw new IllegalStateException("added not assign active task " + task + " to this client state.");
Expand Down Expand Up @@ -206,6 +221,10 @@ boolean hasStandbyTask(final TaskId taskId) {
return assignedStandbyTasks.taskIds().contains(taskId);
}

boolean hasActiveTask(final TaskId taskId) {
return assignedActiveTasks.taskIds().contains(taskId);
}

int standbyTaskCount() {
return assignedStandbyTasks.taskIds().size();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,11 @@
package org.apache.kafka.streams.processor.internals.assignment;

import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs;
import org.slf4j.Logger;
Expand Down Expand Up @@ -156,6 +159,48 @@ public boolean isAllowedTaskMovement(final ClientState source, final ClientState
return true;
}

/**
* Whether one task can be moved from source to destination. If the number of distinct tags including active
* and standby after the movement isn't decreased, then we can move the task. Otherwise, we can not move
* the task.
* @param source Source client
* @param destination Destination client
* @param sourceTask Task to move
* @param clientStateMap All client metadata
* @return If the task can be moved
*/
@Override
public boolean isAllowedTaskMovement(final ClientState source,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if I grog the semantics of this method.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method is to check whether a specific task can be moved between two clients. The method on line 149 checks if any task can be moved between two clients.

About the implementation in tag aware case, a task can be moved between two clients if the number of distinct tags after the movement is equal to or more than the number of tags before the movement. So the reliability constraint doesn't get worse.

final ClientState destination,
final TaskId sourceTask,
final Map<UUID, ClientState> clientStateMap) {

final BiConsumer<ClientState, Set<KeyValue<String, String>>> addTags = (cs, tagSet) -> {
final Map<String, String> tags = clientTagFunction.apply(cs.processId(), cs);
if (tags != null) {
tagSet.addAll(tags.entrySet().stream()
.map(entry -> KeyValue.pair(entry.getKey(), entry.getValue()))
.collect(Collectors.toList())
);
}
};

final Set<KeyValue<String, String>> tagsWithSource = new HashSet<>();
final Set<KeyValue<String, String>> tagsWithDestination = new HashSet<>();
for (final ClientState clientState : clientStateMap.values()) {
if (clientState.hasAssignedTask(sourceTask)
&& !clientState.processId().equals(source.processId())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if I can follow? Does clientState.hasAssignedTask(sourceTask) not imply that the processId of clientState is the same as the "source client"?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hasAssignedTask checks both active and standby. Since standby can also have multiple replicas, there couldn't multiple processId having the task

&& !clientState.processId().equals(destination.processId())) {
addTags.accept(clientState, tagsWithSource);
addTags.accept(clientState, tagsWithDestination);
}
}
addTags.accept(source, tagsWithSource);
addTags.accept(destination, tagsWithDestination);

return tagsWithDestination.size() >= tagsWithSource.size();
}

// Visible for testing
void fillClientsTagStatistics(final Map<UUID, ClientState> clientStates,
final Map<TagEntry, Set<UUID>> tagEntryToClients,
Expand Down
Loading