Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
120 changes: 34 additions & 86 deletions streams/src/main/java/org/apache/kafka/streams/processor/TaskId.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@
*/
package org.apache.kafka.streams.processor;

import org.apache.kafka.streams.errors.TaskIdFormatException;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
Expand All @@ -26,21 +24,25 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.kafka.streams.processor.internals.assignment.StreamsAssignmentProtocolVersions.MIN_NAMED_TOPOLOGY_VERSION;
import static org.apache.kafka.streams.processor.internals.assignment.ConsumerProtocolUtils.readTaskIdFrom;
import static org.apache.kafka.streams.processor.internals.assignment.ConsumerProtocolUtils.writeTaskIdTo;

/**
* The task ID representation composed as topic group ID plus the assigned partition ID.
* The task ID representation composed as subtopology (aka topicGroupId) plus the assigned partition ID.
*/
public class TaskId implements Comparable<TaskId> {

private static final Logger LOG = LoggerFactory.getLogger(TaskId.class);

/** The ID of the topic group. */
/** The ID of the subtopology, aka topicGroupId. */
@Deprecated
public final int topicGroupId;
/** The ID of the partition. */
@Deprecated
public final int partition;

/** The namedTopology that this task belongs to, or null if it does not belong to one */
protected final String namedTopology;
private final String namedTopology;

public TaskId(final int topicGroupId, final int partition) {
this(topicGroupId, partition, null);
Expand All @@ -58,112 +60,58 @@ public TaskId(final int topicGroupId, final int partition, final String namedTop
}
}

@Override
public String toString() {
return namedTopology != null ? namedTopology + "_" + topicGroupId + "_" + partition : topicGroupId + "_" + partition;
public int subtopology() {
return topicGroupId;
}

public String toTaskDirString() {
return topicGroupId + "_" + partition;
public int partition() {
return partition;
}

/**
* Parse the task directory name (of the form topicGroupId_partition) and construct the TaskId with the
* optional namedTopology (may be null)
*
* @throws TaskIdFormatException if the taskIdStr is not a valid {@link TaskId}
* Experimental feature -- will return null
*/
public static TaskId parseTaskDirectoryName(final String taskIdStr, final String namedTopology) {
final int index = taskIdStr.indexOf('_');
if (index <= 0 || index + 1 >= taskIdStr.length()) {
throw new TaskIdFormatException(taskIdStr);
}

try {
final int topicGroupId = Integer.parseInt(taskIdStr.substring(0, index));
final int partition = Integer.parseInt(taskIdStr.substring(index + 1));
public String namedTopology() {
return namedTopology;
}

return new TaskId(topicGroupId, partition, namedTopology);
} catch (final Exception e) {
throw new TaskIdFormatException(taskIdStr);
}
@Override
public String toString() {
return namedTopology != null ? namedTopology + "_" + topicGroupId + "_" + partition : topicGroupId + "_" + partition;
}

/**
* @throws IOException if cannot write to output stream
* @deprecated since 3.0, for internal use, will be removed
*/
@Deprecated
public void writeTo(final DataOutputStream out, final int version) throws IOException {
out.writeInt(topicGroupId);
out.writeInt(partition);
if (version >= MIN_NAMED_TOPOLOGY_VERSION) {
if (namedTopology != null) {
out.writeInt(namedTopology.length());
out.writeChars(namedTopology);
} else {
out.writeInt(0);
}
}
writeTaskIdTo(this, out, version);
}

/**
* @throws IOException if cannot read from input stream
* @deprecated since 3.0, for internal use, will be removed
*/
@Deprecated
public static TaskId readFrom(final DataInputStream in, final int version) throws IOException {
final int topicGroupId = in.readInt();
final int partition = in.readInt();
final String namedTopology;
if (version >= MIN_NAMED_TOPOLOGY_VERSION) {
final int numNamedTopologyChars = in.readInt();
final StringBuilder namedTopologyBuilder = new StringBuilder();
for (int i = 0; i < numNamedTopologyChars; ++i) {
namedTopologyBuilder.append(in.readChar());
}
namedTopology = namedTopologyBuilder.toString();
} else {
namedTopology = null;
}
return new TaskId(topicGroupId, partition, getNamedTopologyOrElseNull(namedTopology));
return readTaskIdFrom(in, version);
}

/**
* @deprecated since 3.0, for internal use, will be removed
*/
@Deprecated
public void writeTo(final ByteBuffer buf, final int version) {
buf.putInt(topicGroupId);
buf.putInt(partition);
if (version >= MIN_NAMED_TOPOLOGY_VERSION) {
if (namedTopology != null) {
buf.putInt(namedTopology.length());
for (final char c : namedTopology.toCharArray()) {
buf.putChar(c);
}
} else {
buf.putInt(0);
}
}
}

public static TaskId readFrom(final ByteBuffer buf, final int version) {
final int topicGroupId = buf.getInt();
final int partition = buf.getInt();
final String namedTopology;
if (version >= MIN_NAMED_TOPOLOGY_VERSION) {
final int numNamedTopologyChars = buf.getInt();
final StringBuilder namedTopologyBuilder = new StringBuilder();
for (int i = 0; i < numNamedTopologyChars; ++i) {
namedTopologyBuilder.append(buf.getChar());
}
namedTopology = namedTopologyBuilder.toString();
} else {
namedTopology = null;
}
return new TaskId(topicGroupId, partition, getNamedTopologyOrElseNull(namedTopology));
writeTaskIdTo(this, buf, version);
}

/**
* @return the namedTopology name, or null if the passed in namedTopology is null or the empty string
* @deprecated since 3.0, for internal use, will be removed
*/
private static String getNamedTopologyOrElseNull(final String namedTopology) {
return (namedTopology == null || namedTopology.length() == 0) ?
null :
namedTopology;
@Deprecated
public static TaskId readFrom(final ByteBuffer buf, final int version) {
return readTaskIdFrom(buf, version);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
*/
public class TaskMetadata {

private final String taskId;
private final TaskId taskId;

private final Set<TopicPartition> topicPartitions;

Expand All @@ -40,7 +40,7 @@ public class TaskMetadata {

private final Optional<Long> timeCurrentIdlingStarted;

public TaskMetadata(final String taskId,
public TaskMetadata(final TaskId taskId,
final Set<TopicPartition> topicPartitions,
final Map<TopicPartition, Long> committedOffsets,
final Map<TopicPartition, Long> endOffsets,
Expand All @@ -52,10 +52,22 @@ public TaskMetadata(final String taskId,
this.timeCurrentIdlingStarted = timeCurrentIdlingStarted;
}

public String taskId() {
/**
* @return the basic task metadata such as subtopology and partition id
*/
public TaskId getTaskId() {
return taskId;
}

/**
* @return the basic task metadata such as subtopology and partition id
* @deprecated please use {@link #getTaskId()} instead.
*/
@Deprecated
public String taskId() {
return taskId.toString();
}

public Set<TopicPartition> topicPartitions() {
return topicPartitions;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ Collection<Task> createTasks(final Consumer<byte[], byte[]> consumer,

final LogContext logContext = getLogContext(taskId);

final ProcessorTopology topology = builder.buildSubtopology(taskId.topicGroupId);
final ProcessorTopology topology = builder.buildSubtopology(taskId.subtopology());

final ProcessorStateManager stateManager = new ProcessorStateManager(
taskId,
Expand Down Expand Up @@ -194,7 +194,7 @@ StreamTask createActiveTaskFromStandby(final StandbyTask standbyTask,
inputPartitions,
consumer,
logContext,
builder.buildSubtopology(standbyTask.id.topicGroupId),
builder.buildSubtopology(standbyTask.id.subtopology()),
stateManager,
context
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ public void setup() {
final Set<TopicPartition> changelogTopicPartitions = topicsInfo.stateChangelogTopics
.keySet()
.stream()
.map(topic -> new TopicPartition(topic, task.partition))
.map(topic -> new TopicPartition(topic, task.partition()))
.collect(Collectors.toSet());
changelogPartitionsForStatefulTask.put(task, changelogTopicPartitions);
}
Expand All @@ -84,8 +84,8 @@ public void setup() {
// the expected number of partitions is the max value of TaskId.partition + 1
int numPartitions = UNKNOWN;
for (final TaskId task : topicGroupTasks) {
if (numPartitions < task.partition + 1) {
numPartitions = task.partition + 1;
if (numPartitions < task.partition() + 1) {
numPartitions = task.partition() + 1;
}
}
topicConfig.setNumberOfPartitions(numPartitions);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -615,7 +615,7 @@ private TopicPartition getStorePartition(final String storeName) {
// NOTE we assume the partition of the topic can always be inferred from the task id;
// if user ever use a custom partition grouper (deprecated in KIP-528) this would break and
// it is not a regression (it would always break anyways)
return new TopicPartition(changelogFor(storeName), taskId.partition);
return new TopicPartition(changelogFor(storeName), taskId.partition());
}

private boolean isLoggingEnabled(final String storeName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ Collection<Task> createTasks(final Map<TaskId, Set<TopicPartition>> tasksToBeCre
final TaskId taskId = newTaskAndPartitions.getKey();
final Set<TopicPartition> partitions = newTaskAndPartitions.getValue();

final ProcessorTopology topology = builder.buildSubtopology(taskId.topicGroupId);
final ProcessorTopology topology = builder.buildSubtopology(taskId.subtopology());

if (topology.hasStateWithChangelogs()) {
final ProcessorStateManager stateManager = new ProcessorStateManager(
Expand Down Expand Up @@ -120,7 +120,7 @@ StandbyTask createStandbyTaskFromActive(final StreamTask streamTask,
return createStandbyTask(
streamTask.id(),
inputPartitions,
builder.buildSubtopology(streamTask.id.topicGroupId),
builder.buildSubtopology(streamTask.id.subtopology()),
stateManager,
context
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import java.util.regex.Pattern;

import static org.apache.kafka.streams.processor.internals.StateManagerUtil.CHECKPOINT_FILE_NAME;
import static org.apache.kafka.streams.processor.internals.StateManagerUtil.parseTaskDirectoryName;

/**
* Manages the directories where the state of Tasks owned by a {@link StreamThread} are
Expand Down Expand Up @@ -388,7 +389,7 @@ public synchronized void cleanRemovedTasks(final long cleanupDelayMs) {
private void cleanRemovedTasksCalledByCleanerThread(final long cleanupDelayMs) {
for (final File taskDir : listNonEmptyTaskDirectories()) {
final String dirName = taskDir.getName();
final TaskId id = TaskId.parseTaskDirectoryName(dirName, null);
final TaskId id = parseTaskDirectoryName(dirName, null);
if (!lockedTasksToOwner.containsKey(id)) {
try {
if (lock(id)) {
Expand Down Expand Up @@ -421,7 +422,7 @@ private void cleanStateAndTaskDirectoriesCalledByUser() throws Exception {
final AtomicReference<Exception> firstException = new AtomicReference<>();
for (final File taskDir : listAllTaskDirectories()) {
final String dirName = taskDir.getName();
final TaskId id = TaskId.parseTaskDirectoryName(dirName, null);
final TaskId id = parseTaskDirectoryName(dirName, null);
try {
log.info("{} Deleting state directory {} for task {} as user calling cleanup.",
logPrefix(), dirName, id);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.kafka.streams.errors.LockException;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TaskIdFormatException;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.Task.TaskType;
Expand Down Expand Up @@ -152,4 +153,34 @@ static void closeStateManager(final Logger log,
throw exception;
}
}

/**
* Parse the task directory name (of the form topicGroupId_partition) and construct the TaskId with the
* optional namedTopology (may be null)
*
* @throws TaskIdFormatException if the taskIdStr is not a valid {@link TaskId}
*/
static TaskId parseTaskDirectoryName(final String taskIdStr, final String namedTopology) {
final int index = taskIdStr.indexOf('_');
if (index <= 0 || index + 1 >= taskIdStr.length()) {
throw new TaskIdFormatException(taskIdStr);
}

try {
final int topicGroupId = Integer.parseInt(taskIdStr.substring(0, index));
final int partition = Integer.parseInt(taskIdStr.substring(index + 1));

return new TaskId(topicGroupId, partition, namedTopology);
} catch (final Exception e) {
throw new TaskIdFormatException(taskIdStr);
}
}

/**
* @return The string representation of the subtopology and partition metadata, ie the task id string without
* the named topology, which defines the innermost task directory name of this task's state
*/
static String toTaskDirString(final TaskId taskId) {
return taskId.subtopology() + "_" + taskId.partition();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1141,7 +1141,7 @@ private void updateThreadMetadata(final Map<TaskId, Task> activeTasks,
final Set<TaskMetadata> activeTasksMetadata = new HashSet<>();
for (final Map.Entry<TaskId, Task> task : activeTasks.entrySet()) {
activeTasksMetadata.add(new TaskMetadata(
task.getValue().id().toString(),
task.getValue().id(),
task.getValue().inputPartitions(),
task.getValue().committedOffsets(),
task.getValue().highWaterMark(),
Expand All @@ -1151,7 +1151,7 @@ private void updateThreadMetadata(final Map<TaskId, Task> activeTasks,
final Set<TaskMetadata> standbyTasksMetadata = new HashSet<>();
for (final Map.Entry<TaskId, Task> task : standbyTasks.entrySet()) {
standbyTasksMetadata.add(new TaskMetadata(
task.getValue().id().toString(),
task.getValue().id(),
task.getValue().inputPartitions(),
task.getValue().committedOffsets(),
task.getValue().highWaterMark(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@
import org.apache.kafka.streams.processor.internals.assignment.StickyTaskAssignor;
import org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo;
import org.apache.kafka.streams.processor.internals.assignment.TaskAssignor;
import org.apache.kafka.streams.processor.internals.namedtopology.NamedTaskId;
import org.apache.kafka.streams.state.HostInfo;
import org.slf4j.Logger;

Expand Down Expand Up @@ -516,7 +515,7 @@ private void populateTasksForMaps(final Map<TopicPartition, TaskId> taskForParti
}
allAssignedPartitions.addAll(partitions);

tasksForTopicGroup.computeIfAbsent(new Subtopology(id.topicGroupId, NamedTaskId.namedTopology(id)), k -> new HashSet<>()).add(id);
tasksForTopicGroup.computeIfAbsent(new Subtopology(id.subtopology(), id.namedTopology()), k -> new HashSet<>()).add(id);
}

checkAllPartitions(allSourceTopics, partitionsForTask, allAssignedPartitions, fullMetadata);
Expand Down
Loading