diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TaskId.java b/streams/src/main/java/org/apache/kafka/streams/processor/TaskId.java index 59e7ef4bbc841..d53630225cc15 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/TaskId.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/TaskId.java @@ -16,8 +16,6 @@ */ package org.apache.kafka.streams.processor; -import org.apache.kafka.streams.errors.TaskIdFormatException; - import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; @@ -26,21 +24,25 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import static org.apache.kafka.streams.processor.internals.assignment.StreamsAssignmentProtocolVersions.MIN_NAMED_TOPOLOGY_VERSION; +import static org.apache.kafka.streams.processor.internals.assignment.ConsumerProtocolUtils.readTaskIdFrom; +import static org.apache.kafka.streams.processor.internals.assignment.ConsumerProtocolUtils.writeTaskIdTo; /** - * The task ID representation composed as topic group ID plus the assigned partition ID. + * The task ID representation composed as subtopology (aka topicGroupId) plus the assigned partition ID. */ public class TaskId implements Comparable { private static final Logger LOG = LoggerFactory.getLogger(TaskId.class); - /** The ID of the topic group. */ + /** The ID of the subtopology, aka topicGroupId. */ + @Deprecated public final int topicGroupId; /** The ID of the partition. */ + @Deprecated public final int partition; + /** The namedTopology that this task belongs to, or null if it does not belong to one */ - protected final String namedTopology; + private final String namedTopology; public TaskId(final int topicGroupId, final int partition) { this(topicGroupId, partition, null); @@ -58,112 +60,58 @@ public TaskId(final int topicGroupId, final int partition, final String namedTop } } - @Override - public String toString() { - return namedTopology != null ? namedTopology + "_" + topicGroupId + "_" + partition : topicGroupId + "_" + partition; + public int subtopology() { + return topicGroupId; } - public String toTaskDirString() { - return topicGroupId + "_" + partition; + public int partition() { + return partition; } /** - * Parse the task directory name (of the form topicGroupId_partition) and construct the TaskId with the - * optional namedTopology (may be null) - * - * @throws TaskIdFormatException if the taskIdStr is not a valid {@link TaskId} + * Experimental feature -- will return null */ - public static TaskId parseTaskDirectoryName(final String taskIdStr, final String namedTopology) { - final int index = taskIdStr.indexOf('_'); - if (index <= 0 || index + 1 >= taskIdStr.length()) { - throw new TaskIdFormatException(taskIdStr); - } - - try { - final int topicGroupId = Integer.parseInt(taskIdStr.substring(0, index)); - final int partition = Integer.parseInt(taskIdStr.substring(index + 1)); + public String namedTopology() { + return namedTopology; + } - return new TaskId(topicGroupId, partition, namedTopology); - } catch (final Exception e) { - throw new TaskIdFormatException(taskIdStr); - } + @Override + public String toString() { + return namedTopology != null ? namedTopology + "_" + topicGroupId + "_" + partition : topicGroupId + "_" + partition; } /** * @throws IOException if cannot write to output stream + * @deprecated since 3.0, for internal use, will be removed */ + @Deprecated public void writeTo(final DataOutputStream out, final int version) throws IOException { - out.writeInt(topicGroupId); - out.writeInt(partition); - if (version >= MIN_NAMED_TOPOLOGY_VERSION) { - if (namedTopology != null) { - out.writeInt(namedTopology.length()); - out.writeChars(namedTopology); - } else { - out.writeInt(0); - } - } + writeTaskIdTo(this, out, version); } /** * @throws IOException if cannot read from input stream + * @deprecated since 3.0, for internal use, will be removed */ + @Deprecated public static TaskId readFrom(final DataInputStream in, final int version) throws IOException { - final int topicGroupId = in.readInt(); - final int partition = in.readInt(); - final String namedTopology; - if (version >= MIN_NAMED_TOPOLOGY_VERSION) { - final int numNamedTopologyChars = in.readInt(); - final StringBuilder namedTopologyBuilder = new StringBuilder(); - for (int i = 0; i < numNamedTopologyChars; ++i) { - namedTopologyBuilder.append(in.readChar()); - } - namedTopology = namedTopologyBuilder.toString(); - } else { - namedTopology = null; - } - return new TaskId(topicGroupId, partition, getNamedTopologyOrElseNull(namedTopology)); + return readTaskIdFrom(in, version); } + /** + * @deprecated since 3.0, for internal use, will be removed + */ + @Deprecated public void writeTo(final ByteBuffer buf, final int version) { - buf.putInt(topicGroupId); - buf.putInt(partition); - if (version >= MIN_NAMED_TOPOLOGY_VERSION) { - if (namedTopology != null) { - buf.putInt(namedTopology.length()); - for (final char c : namedTopology.toCharArray()) { - buf.putChar(c); - } - } else { - buf.putInt(0); - } - } - } - - public static TaskId readFrom(final ByteBuffer buf, final int version) { - final int topicGroupId = buf.getInt(); - final int partition = buf.getInt(); - final String namedTopology; - if (version >= MIN_NAMED_TOPOLOGY_VERSION) { - final int numNamedTopologyChars = buf.getInt(); - final StringBuilder namedTopologyBuilder = new StringBuilder(); - for (int i = 0; i < numNamedTopologyChars; ++i) { - namedTopologyBuilder.append(buf.getChar()); - } - namedTopology = namedTopologyBuilder.toString(); - } else { - namedTopology = null; - } - return new TaskId(topicGroupId, partition, getNamedTopologyOrElseNull(namedTopology)); + writeTaskIdTo(this, buf, version); } /** - * @return the namedTopology name, or null if the passed in namedTopology is null or the empty string + * @deprecated since 3.0, for internal use, will be removed */ - private static String getNamedTopologyOrElseNull(final String namedTopology) { - return (namedTopology == null || namedTopology.length() == 0) ? - null : - namedTopology; + @Deprecated + public static TaskId readFrom(final ByteBuffer buf, final int version) { + return readTaskIdFrom(buf, version); } @Override diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TaskMetadata.java b/streams/src/main/java/org/apache/kafka/streams/processor/TaskMetadata.java index 63129dacc203e..a35b7281118d7 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/TaskMetadata.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/TaskMetadata.java @@ -30,7 +30,7 @@ */ public class TaskMetadata { - private final String taskId; + private final TaskId taskId; private final Set topicPartitions; @@ -40,7 +40,7 @@ public class TaskMetadata { private final Optional timeCurrentIdlingStarted; - public TaskMetadata(final String taskId, + public TaskMetadata(final TaskId taskId, final Set topicPartitions, final Map committedOffsets, final Map endOffsets, @@ -52,10 +52,22 @@ public TaskMetadata(final String taskId, this.timeCurrentIdlingStarted = timeCurrentIdlingStarted; } - public String taskId() { + /** + * @return the basic task metadata such as subtopology and partition id + */ + public TaskId getTaskId() { return taskId; } + /** + * @return the basic task metadata such as subtopology and partition id + * @deprecated please use {@link #getTaskId()} instead. + */ + @Deprecated + public String taskId() { + return taskId.toString(); + } + public Set topicPartitions() { return topicPartitions; } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java index 788ae737dc00f..322ff56e74d00 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java @@ -143,7 +143,7 @@ Collection createTasks(final Consumer consumer, final LogContext logContext = getLogContext(taskId); - final ProcessorTopology topology = builder.buildSubtopology(taskId.topicGroupId); + final ProcessorTopology topology = builder.buildSubtopology(taskId.subtopology()); final ProcessorStateManager stateManager = new ProcessorStateManager( taskId, @@ -194,7 +194,7 @@ StreamTask createActiveTaskFromStandby(final StandbyTask standbyTask, inputPartitions, consumer, logContext, - builder.buildSubtopology(standbyTask.id.topicGroupId), + builder.buildSubtopology(standbyTask.id.subtopology()), stateManager, context ); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogTopics.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogTopics.java index c498bd030f9c8..a2ad123dab73e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogTopics.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogTopics.java @@ -75,7 +75,7 @@ public void setup() { final Set changelogTopicPartitions = topicsInfo.stateChangelogTopics .keySet() .stream() - .map(topic -> new TopicPartition(topic, task.partition)) + .map(topic -> new TopicPartition(topic, task.partition())) .collect(Collectors.toSet()); changelogPartitionsForStatefulTask.put(task, changelogTopicPartitions); } @@ -84,8 +84,8 @@ public void setup() { // the expected number of partitions is the max value of TaskId.partition + 1 int numPartitions = UNKNOWN; for (final TaskId task : topicGroupTasks) { - if (numPartitions < task.partition + 1) { - numPartitions = task.partition + 1; + if (numPartitions < task.partition() + 1) { + numPartitions = task.partition() + 1; } } topicConfig.setNumberOfPartitions(numPartitions); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java index 2b75d8ddcb579..9d646676335b7 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java @@ -615,7 +615,7 @@ private TopicPartition getStorePartition(final String storeName) { // NOTE we assume the partition of the topic can always be inferred from the task id; // if user ever use a custom partition grouper (deprecated in KIP-528) this would break and // it is not a regression (it would always break anyways) - return new TopicPartition(changelogFor(storeName), taskId.partition); + return new TopicPartition(changelogFor(storeName), taskId.partition()); } private boolean isLoggingEnabled(final String storeName) { diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTaskCreator.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTaskCreator.java index 56d4220bf70a9..3f0dd22f1bda2 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTaskCreator.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTaskCreator.java @@ -74,7 +74,7 @@ Collection createTasks(final Map> tasksToBeCre final TaskId taskId = newTaskAndPartitions.getKey(); final Set partitions = newTaskAndPartitions.getValue(); - final ProcessorTopology topology = builder.buildSubtopology(taskId.topicGroupId); + final ProcessorTopology topology = builder.buildSubtopology(taskId.subtopology()); if (topology.hasStateWithChangelogs()) { final ProcessorStateManager stateManager = new ProcessorStateManager( @@ -120,7 +120,7 @@ StandbyTask createStandbyTaskFromActive(final StreamTask streamTask, return createStandbyTask( streamTask.id(), inputPartitions, - builder.buildSubtopology(streamTask.id.topicGroupId), + builder.buildSubtopology(streamTask.id.subtopology()), stateManager, context ); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java index 5f83b6a00546f..f3a24aa0b52da 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java @@ -46,6 +46,7 @@ import java.util.regex.Pattern; import static org.apache.kafka.streams.processor.internals.StateManagerUtil.CHECKPOINT_FILE_NAME; +import static org.apache.kafka.streams.processor.internals.StateManagerUtil.parseTaskDirectoryName; /** * Manages the directories where the state of Tasks owned by a {@link StreamThread} are @@ -388,7 +389,7 @@ public synchronized void cleanRemovedTasks(final long cleanupDelayMs) { private void cleanRemovedTasksCalledByCleanerThread(final long cleanupDelayMs) { for (final File taskDir : listNonEmptyTaskDirectories()) { final String dirName = taskDir.getName(); - final TaskId id = TaskId.parseTaskDirectoryName(dirName, null); + final TaskId id = parseTaskDirectoryName(dirName, null); if (!lockedTasksToOwner.containsKey(id)) { try { if (lock(id)) { @@ -421,7 +422,7 @@ private void cleanStateAndTaskDirectoriesCalledByUser() throws Exception { final AtomicReference firstException = new AtomicReference<>(); for (final File taskDir : listAllTaskDirectories()) { final String dirName = taskDir.getName(); - final TaskId id = TaskId.parseTaskDirectoryName(dirName, null); + final TaskId id = parseTaskDirectoryName(dirName, null); try { log.info("{} Deleting state directory {} for task {} as user calling cleanup.", logPrefix(), dirName, id); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManagerUtil.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManagerUtil.java index d616d60dc530f..a1c8ca84125f1 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManagerUtil.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManagerUtil.java @@ -25,6 +25,7 @@ import org.apache.kafka.streams.errors.LockException; import org.apache.kafka.streams.errors.ProcessorStateException; import org.apache.kafka.streams.errors.StreamsException; +import org.apache.kafka.streams.errors.TaskIdFormatException; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.internals.Task.TaskType; @@ -152,4 +153,34 @@ static void closeStateManager(final Logger log, throw exception; } } + + /** + * Parse the task directory name (of the form topicGroupId_partition) and construct the TaskId with the + * optional namedTopology (may be null) + * + * @throws TaskIdFormatException if the taskIdStr is not a valid {@link TaskId} + */ + static TaskId parseTaskDirectoryName(final String taskIdStr, final String namedTopology) { + final int index = taskIdStr.indexOf('_'); + if (index <= 0 || index + 1 >= taskIdStr.length()) { + throw new TaskIdFormatException(taskIdStr); + } + + try { + final int topicGroupId = Integer.parseInt(taskIdStr.substring(0, index)); + final int partition = Integer.parseInt(taskIdStr.substring(index + 1)); + + return new TaskId(topicGroupId, partition, namedTopology); + } catch (final Exception e) { + throw new TaskIdFormatException(taskIdStr); + } + } + + /** + * @return The string representation of the subtopology and partition metadata, ie the task id string without + * the named topology, which defines the innermost task directory name of this task's state + */ + static String toTaskDirString(final TaskId taskId) { + return taskId.subtopology() + "_" + taskId.partition(); + } } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index fed8015521327..da79c77c4f7b0 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -1141,7 +1141,7 @@ private void updateThreadMetadata(final Map activeTasks, final Set activeTasksMetadata = new HashSet<>(); for (final Map.Entry task : activeTasks.entrySet()) { activeTasksMetadata.add(new TaskMetadata( - task.getValue().id().toString(), + task.getValue().id(), task.getValue().inputPartitions(), task.getValue().committedOffsets(), task.getValue().highWaterMark(), @@ -1151,7 +1151,7 @@ private void updateThreadMetadata(final Map activeTasks, final Set standbyTasksMetadata = new HashSet<>(); for (final Map.Entry task : standbyTasks.entrySet()) { standbyTasksMetadata.add(new TaskMetadata( - task.getValue().id().toString(), + task.getValue().id(), task.getValue().inputPartitions(), task.getValue().committedOffsets(), task.getValue().highWaterMark(), diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java index 6c3d6571def24..16d73838b9104 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java @@ -50,7 +50,6 @@ import org.apache.kafka.streams.processor.internals.assignment.StickyTaskAssignor; import org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo; import org.apache.kafka.streams.processor.internals.assignment.TaskAssignor; -import org.apache.kafka.streams.processor.internals.namedtopology.NamedTaskId; import org.apache.kafka.streams.state.HostInfo; import org.slf4j.Logger; @@ -516,7 +515,7 @@ private void populateTasksForMaps(final Map taskForParti } allAssignedPartitions.addAll(partitions); - tasksForTopicGroup.computeIfAbsent(new Subtopology(id.topicGroupId, NamedTaskId.namedTopology(id)), k -> new HashSet<>()).add(id); + tasksForTopicGroup.computeIfAbsent(new Subtopology(id.subtopology(), id.namedTopology()), k -> new HashSet<>()).add(id); } checkAllPartitions(allSourceTopics, partitionsForTask, allAssignedPartitions, fullMetadata); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java index 7fa624a0ac1bd..5d5217e4b1c45 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java @@ -64,6 +64,7 @@ import static org.apache.kafka.common.utils.Utils.intersection; import static org.apache.kafka.common.utils.Utils.union; +import static org.apache.kafka.streams.processor.internals.StateManagerUtil.parseTaskDirectoryName; import static org.apache.kafka.streams.processor.internals.StreamThread.ProcessingMode.EXACTLY_ONCE_ALPHA; import static org.apache.kafka.streams.processor.internals.StreamThread.ProcessingMode.EXACTLY_ONCE_V2; @@ -683,7 +684,7 @@ private void tryToLockAllNonEmptyTaskDirectories() { for (final File dir : stateDirectory.listNonEmptyTaskDirectories()) { try { - final TaskId id = TaskId.parseTaskDirectoryName(dir.getName(), null); + final TaskId id = parseTaskDirectoryName(dir.getName(), null); if (stateDirectory.lock(id)) { lockedTaskDirectories.add(id); if (!tasks.owned(id)) { diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java index 4fea12b4544fe..6ecc4ada3827c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java @@ -41,6 +41,8 @@ import java.util.stream.Collectors; import java.util.stream.Stream; +import static org.apache.kafka.streams.processor.internals.assignment.ConsumerProtocolUtils.readTaskIdFrom; +import static org.apache.kafka.streams.processor.internals.assignment.ConsumerProtocolUtils.writeTaskIdTo; import static org.apache.kafka.streams.processor.internals.assignment.StreamsAssignmentProtocolVersions.LATEST_SUPPORTED_VERSION; import static org.apache.kafka.streams.processor.internals.assignment.StreamsAssignmentProtocolVersions.UNKNOWN; @@ -207,14 +209,14 @@ private void encodeActiveAndStandbyTaskAssignment(final DataOutputStream out) th // encode active tasks out.writeInt(activeTasks.size()); for (final TaskId id : activeTasks) { - id.writeTo(out, usedVersion); + writeTaskIdTo(id, out, usedVersion); } // encode standby tasks out.writeInt(standbyTasks.size()); for (final Map.Entry> entry : standbyTasks.entrySet()) { final TaskId id = entry.getKey(); - id.writeTo(out, usedVersion); + writeTaskIdTo(id, out, usedVersion); final Set partitions = entry.getValue(); writeTopicPartitions(out, partitions); @@ -383,7 +385,7 @@ private static void decodeActiveTasks(final AssignmentInfo assignmentInfo, final int count = in.readInt(); assignmentInfo.activeTasks = new ArrayList<>(count); for (int i = 0; i < count; i++) { - assignmentInfo.activeTasks.add(TaskId.readFrom(in, assignmentInfo.usedVersion)); + assignmentInfo.activeTasks.add(readTaskIdFrom(in, assignmentInfo.usedVersion)); } } @@ -392,7 +394,7 @@ private static void decodeStandbyTasks(final AssignmentInfo assignmentInfo, final int count = in.readInt(); assignmentInfo.standbyTasks = new HashMap<>(count); for (int i = 0; i < count; i++) { - final TaskId id = TaskId.readFrom(in, assignmentInfo.usedVersion); + final TaskId id = readTaskIdFrom(in, assignmentInfo.usedVersion); assignmentInfo.standbyTasks.put(id, readTopicPartitions(in)); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ConsumerProtocolUtils.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ConsumerProtocolUtils.java new file mode 100644 index 0000000000000..f54aca7115455 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ConsumerProtocolUtils.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.processor.internals.assignment; + +import org.apache.kafka.streams.processor.TaskId; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; + +import static org.apache.kafka.streams.processor.internals.assignment.StreamsAssignmentProtocolVersions.MIN_NAMED_TOPOLOGY_VERSION; + +/** + * Utility class for common assignment or consumer protocol utility methods such as de/serialization + */ +public class ConsumerProtocolUtils { + + /** + * @throws IOException if cannot write to output stream + */ + public static void writeTaskIdTo(final TaskId taskId, final DataOutputStream out, final int version) throws IOException { + out.writeInt(taskId.subtopology()); + out.writeInt(taskId.partition()); + if (version >= MIN_NAMED_TOPOLOGY_VERSION) { + if (taskId.namedTopology() != null) { + out.writeInt(taskId.namedTopology().length()); + out.writeChars(taskId.namedTopology()); + } else { + out.writeInt(0); + } + } + } + + /** + * @throws IOException if cannot read from input stream + */ + public static TaskId readTaskIdFrom(final DataInputStream in, final int version) throws IOException { + final int subtopology = in.readInt(); + final int partition = in.readInt(); + final String namedTopology; + if (version >= MIN_NAMED_TOPOLOGY_VERSION) { + final int numNamedTopologyChars = in.readInt(); + final StringBuilder namedTopologyBuilder = new StringBuilder(); + for (int i = 0; i < numNamedTopologyChars; ++i) { + namedTopologyBuilder.append(in.readChar()); + } + namedTopology = namedTopologyBuilder.toString(); + } else { + namedTopology = null; + } + return new TaskId(subtopology, partition, getNamedTopologyOrElseNull(namedTopology)); + } + + public static void writeTaskIdTo(final TaskId taskId, final ByteBuffer buf, final int version) { + buf.putInt(taskId.subtopology()); + buf.putInt(taskId.partition()); + if (version >= MIN_NAMED_TOPOLOGY_VERSION) { + if (taskId.namedTopology() != null) { + buf.putInt(taskId.namedTopology().length()); + for (final char c : taskId.namedTopology().toCharArray()) { + buf.putChar(c); + } + } else { + buf.putInt(0); + } + } + } + + public static TaskId readTaskIdFrom(final ByteBuffer buf, final int version) { + final int subtopology = buf.getInt(); + final int partition = buf.getInt(); + final String namedTopology; + if (version >= MIN_NAMED_TOPOLOGY_VERSION) { + final int numNamedTopologyChars = buf.getInt(); + final StringBuilder namedTopologyBuilder = new StringBuilder(); + for (int i = 0; i < numNamedTopologyChars; ++i) { + namedTopologyBuilder.append(buf.getChar()); + } + namedTopology = namedTopologyBuilder.toString(); + } else { + namedTopology = null; + } + return new TaskId(subtopology, partition, getNamedTopologyOrElseNull(namedTopology)); + } + + /** + * @return the namedTopology name, or null if the passed in namedTopology is null or the empty string + */ + private static String getNamedTopologyOrElseNull(final String namedTopology) { + return (namedTopology == null || namedTopology.length() == 0) ? + null : + namedTopology; + } +} diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java index 5f6935f41dcb7..971439caa014d 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java @@ -129,9 +129,9 @@ private void setTaskOffsetSumDataFromTaskOffsetSumMap(final Map ta final Map> topicGroupIdToPartitionOffsetSum = new HashMap<>(); for (final Map.Entry taskEntry : taskOffsetSums.entrySet()) { final TaskId task = taskEntry.getKey(); - topicGroupIdToPartitionOffsetSum.computeIfAbsent(task.topicGroupId, t -> new ArrayList<>()).add( + topicGroupIdToPartitionOffsetSum.computeIfAbsent(task.subtopology(), t -> new ArrayList<>()).add( new SubscriptionInfoData.PartitionToOffsetSum() - .setPartition(task.partition) + .setPartition(task.partition()) .setOffsetSum(taskEntry.getValue())); } @@ -157,14 +157,14 @@ private void setPrevAndStandbySetsFromParsedTaskOffsetSumMap(final Map { final SubscriptionInfoData.TaskId taskId = new SubscriptionInfoData.TaskId(); - taskId.setTopicGroupId(t.topicGroupId); - taskId.setPartition(t.partition); + taskId.setTopicGroupId(t.subtopology()); + taskId.setPartition(t.partition()); return taskId; }).collect(Collectors.toList())); data.setStandbyTasks(standbyTasks.stream().map(t -> { final SubscriptionInfoData.TaskId taskId = new SubscriptionInfoData.TaskId(); - taskId.setTopicGroupId(t.topicGroupId); - taskId.setPartition(t.partition); + taskId.setTopicGroupId(t.subtopology()); + taskId.setPartition(t.partition()); return taskId; }).collect(Collectors.toList())); } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/NamedTaskId.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/NamedTaskId.java deleted file mode 100644 index 82be29cdb9e6f..0000000000000 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/NamedTaskId.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kafka.streams.processor.internals.namedtopology; - -import org.apache.kafka.streams.processor.TaskId; - -public class NamedTaskId extends TaskId { - public NamedTaskId(final int topicGroupId, final int partition, final String namedTopology) { - super(topicGroupId, partition, namedTopology); - if (namedTopology == null) { - throw new IllegalStateException("NamedTopology is required for a NamedTaskId"); - } - } - - public String namedTopology() { - return namedTopology; - } - - public static String namedTopology(final TaskId taskId) { - if (taskId instanceof NamedTaskId) { - return ((NamedTaskId) taskId).namedTopology(); - } else { - return null; - } - } -} diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java index 3099e9ca3a45c..8d0a2a876f9f7 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java @@ -232,7 +232,7 @@ private void init(final StateStore root) { changelogTopic = ProcessorStateManager.storeChangelogTopic(context.applicationId(), storeName); updateBufferMetrics(); open = true; - partition = context.taskId().partition; + partition = context.taskId().partition(); } @Override diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java index df541d3e2c9dc..7dd796e66ff8c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java @@ -54,7 +54,7 @@ public List stores(final StoreQueryParameters storeQueryParams) { if (storeQueryParams.partition() != null) { for (final Task task : tasks) { - if (task.id().partition == storeQueryParams.partition() && + if (task.id().partition() == storeQueryParams.partition() && task.getStore(storeName) != null && storeName.equals(task.getStore(storeName).name())) { final T typedStore = validateAndCastStores(task.getStore(storeName), queryableStoreType, storeName, task.id()); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java index cf6c61a9a17f8..aa5ac9b2ac127 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java @@ -499,7 +499,7 @@ public void shouldGetChangelogPartitionForRegisteredStore() { final TopicPartition changelogPartition = stateMgr.registeredChangelogPartitionFor(persistentStoreName); assertThat(changelogPartition.topic(), is(persistentStoreTopicName)); - assertThat(changelogPartition.partition(), is(taskId.partition)); + assertThat(changelogPartition.partition(), is(taskId.partition())); } @Test diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java index 71e6af16bf088..ef535628c11d0 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java @@ -1682,7 +1682,7 @@ public void shouldReturnActiveTaskMetadataWhileRunningState() { final ThreadMetadata metadata = thread.threadMetadata(); assertEquals(StreamThread.State.RUNNING.name(), metadata.threadState()); - assertTrue(metadata.activeTasks().contains(new TaskMetadata(task1.toString(), Utils.mkSet(t1p1), new HashMap<>(), new HashMap<>(), Optional.empty()))); + assertTrue(metadata.activeTasks().contains(new TaskMetadata(task1, Utils.mkSet(t1p1), new HashMap<>(), new HashMap<>(), Optional.empty()))); assertTrue(metadata.standbyTasks().isEmpty()); assertTrue("#threadState() was: " + metadata.threadState() + "; expected either RUNNING, STARTING, PARTITIONS_REVOKED, PARTITIONS_ASSIGNED, or CREATED", @@ -1735,7 +1735,7 @@ public void shouldReturnStandbyTaskMetadataWhileRunningState() { final ThreadMetadata threadMetadata = thread.threadMetadata(); assertEquals(StreamThread.State.RUNNING.name(), threadMetadata.threadState()); - assertTrue(threadMetadata.standbyTasks().contains(new TaskMetadata(task1.toString(), Utils.mkSet(t1p1), new HashMap<>(), new HashMap<>(), Optional.empty()))); + assertTrue(threadMetadata.standbyTasks().contains(new TaskMetadata(task1, Utils.mkSet(t1p1), new HashMap<>(), new HashMap<>(), Optional.empty()))); assertTrue(threadMetadata.activeTasks().isEmpty()); thread.taskManager().shutdown(true); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java index 3ef892a7c803d..860ed7383b089 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java @@ -830,7 +830,7 @@ private static Set tasksForState(final String storeName, if (stateChangelogTopics.contains(changelogTopic)) { for (final TaskId id : tasks) { - if (id.topicGroupId == entry.getKey().nodeGroupId) { + if (id.subtopology() == entry.getKey().nodeGroupId) { ids.add(id); } } @@ -2109,7 +2109,7 @@ private static AssignmentInfo checkAssignment(final Set expectedTopics, final Set partitions = entry.getValue(); for (final TopicPartition partition : partitions) { // since default grouper, taskid.partition == partition.partition() - assertEquals(id.partition, partition.partition()); + assertEquals(id.partition(), partition.partition()); standbyTopics.add(partition.topic()); } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentTestUtils.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentTestUtils.java index 1afa76f22a6bf..0338fa838c547 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentTestUtils.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentTestUtils.java @@ -375,7 +375,7 @@ static TaskSkewReport analyzeTaskAssignmentBalance(final Map final UUID client = entry.getKey(); final ClientState clientState = entry.getValue(); for (final TaskId task : clientState.activeTasks()) { - final int subtopology = task.topicGroupId; + final int subtopology = task.subtopology(); subtopologyToClientsWithPartition .computeIfAbsent(subtopology, initialClientCounts) .get(client) diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/LegacySubscriptionInfoSerde.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/LegacySubscriptionInfoSerde.java index cdd651f239b47..5de0c222a5c32 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/LegacySubscriptionInfoSerde.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/LegacySubscriptionInfoSerde.java @@ -16,6 +16,8 @@ */ package org.apache.kafka.streams.processor.internals.assignment; +import static org.apache.kafka.streams.processor.internals.assignment.ConsumerProtocolUtils.readTaskIdFrom; +import static org.apache.kafka.streams.processor.internals.assignment.ConsumerProtocolUtils.writeTaskIdTo; import static org.apache.kafka.streams.processor.internals.assignment.StreamsAssignmentProtocolVersions.LATEST_SUPPORTED_VERSION; import org.apache.kafka.streams.errors.TaskAssignmentException; @@ -168,7 +170,7 @@ public static void encodeTasks(final ByteBuffer buf, final int version) { buf.putInt(taskIds.size()); for (final TaskId id : taskIds) { - id.writeTo(buf, version); + writeTaskIdTo(id, buf, version); } } @@ -233,7 +235,7 @@ private static Set decodeTasks(final ByteBuffer data, final int version) final Set prevTasks = new HashSet<>(); final int numPrevTasks = data.getInt(); for (int i = 0; i < numPrevTasks; i++) { - prevTasks.add(TaskId.readFrom(data, version)); + prevTasks.add(readTaskIdFrom(data, version)); } return prevTasks; } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java index 47ca31922301e..7536ad2eb5914 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java @@ -733,7 +733,7 @@ private void assertActiveTaskTopicGroupIdsEvenlyDistributed() { final List topicGroupIds = new ArrayList<>(); final Set activeTasks = clientStateEntry.getValue().activeTasks(); for (final TaskId activeTask : activeTasks) { - topicGroupIds.add(activeTask.topicGroupId); + topicGroupIds.add(activeTask.subtopology()); } Collections.sort(topicGroupIds); assertThat(topicGroupIds, equalTo(expectedTopicGroupIds)); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java index 5f85803812974..2a16547f780bf 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java @@ -399,7 +399,7 @@ private StreamTask createStreamsTask(final StreamsConfig streamsConfig, final TaskId taskId) { final Metrics metrics = new Metrics(); final LogContext logContext = new LogContext("test-stream-task "); - final Set partitions = Collections.singleton(new TopicPartition(topicName, taskId.partition)); + final Set partitions = Collections.singleton(new TopicPartition(topicName, taskId.partition())); final ProcessorStateManager stateManager = new ProcessorStateManager( taskId, Task.TaskType.ACTIVE, diff --git a/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java index 55f22a2529e8b..0754829ff49e0 100644 --- a/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java +++ b/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java @@ -425,7 +425,7 @@ public void logChange(final String storeName, key, value, null, - taskId().partition, + taskId().partition(), timestamp, BYTES_KEY_SERIALIZER, BYTEARRAY_VALUE_SERIALIZER);