diff --git a/docs/streams/upgrade-guide.html b/docs/streams/upgrade-guide.html index 3ae4573d2071f..892d025be4459 100644 --- a/docs/streams/upgrade-guide.html +++ b/docs/streams/upgrade-guide.html @@ -117,6 +117,14 @@

Streams API

We removed the default implementation of RocksDBConfigSetter#close().

+ +

+ The public topicGroupId and partition fields on TaskId have been deprecated and replaced with getters. Please migrate to using the new TaskId.subtopology() + (which replaces topicGroupId) and TaskId.partition() APIs instead. Also, the TaskId#readFrom and TaskId#writeTo methods have been deprecated + and will be removed, as they were never intended for public use. Finally, we have deprecated the TaskMetadata.taskId() method as well as the TaskMetadata constructor. + These have been replaced with APIs that better represent the task id as an actual TaskId object instead of a String. Please migrate to the new TaskMetadata#getTaskId + method. See KIP-740 for more details. +

We removed the following deprecated APIs:

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TaskId.java b/streams/src/main/java/org/apache/kafka/streams/processor/TaskId.java index d53630225cc15..f4d8349eadd95 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/TaskId.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/TaskId.java @@ -16,11 +16,14 @@ */ package org.apache.kafka.streams.processor; +import org.apache.kafka.streams.errors.TaskIdFormatException; + import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; import java.nio.ByteBuffer; import java.util.Objects; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -80,6 +83,35 @@ public String toString() { return namedTopology != null ? namedTopology + "_" + topicGroupId + "_" + partition : topicGroupId + "_" + partition; } + /** + * @throws TaskIdFormatException if the taskIdStr is not a valid {@link TaskId} + */ + public static TaskId parse(final String taskIdStr) { + final int firstIndex = taskIdStr.indexOf('_'); + final int secondIndex = taskIdStr.indexOf('_', firstIndex + 1); + if (firstIndex <= 0 || firstIndex + 1 >= taskIdStr.length()) { + throw new TaskIdFormatException(taskIdStr); + } + + try { + // If only one copy of '_' exists, there is no named topology in the string + if (secondIndex < 0) { + final int topicGroupId = Integer.parseInt(taskIdStr.substring(0, firstIndex)); + final int partition = Integer.parseInt(taskIdStr.substring(firstIndex + 1)); + + return new TaskId(topicGroupId, partition); + } else { + final String namedTopology = taskIdStr.substring(0, firstIndex); + final int topicGroupId = Integer.parseInt(taskIdStr.substring(firstIndex + 1, secondIndex)); + final int partition = Integer.parseInt(taskIdStr.substring(secondIndex + 1)); + + return new TaskId(topicGroupId, partition, namedTopology); + } + } catch (final Exception e) { + throw new TaskIdFormatException(taskIdStr); + } + } + /** * @throws IOException if cannot write to output stream * @deprecated since 3.0, for internal use, will be removed diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TaskMetadata.java b/streams/src/main/java/org/apache/kafka/streams/processor/TaskMetadata.java index a35b7281118d7..f5a5a695bfdf7 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/TaskMetadata.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/TaskMetadata.java @@ -40,6 +40,19 @@ public class TaskMetadata { private final Optional timeCurrentIdlingStarted; + /** + * @deprecated since 3.0, not intended for public use + */ + @Deprecated + public TaskMetadata(final String taskId, + final Set topicPartitions, + final Map committedOffsets, + final Map endOffsets, + final Optional timeCurrentIdlingStarted) { + this(TaskId.parse(taskId), topicPartitions, committedOffsets, endOffsets, timeCurrentIdlingStarted); + } + + // For internal use -- not a public API public TaskMetadata(final TaskId taskId, final Set topicPartitions, final Map committedOffsets, diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java index b17169c64f143..fb1a486b573ed 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java @@ -148,6 +148,18 @@ private void assertPermissions(final File file) { } } + @Test + public void shouldParseUnnamedTaskId() { + final TaskId task = new TaskId(1, 0); + assertThat(TaskId.parse(task.toString()), equalTo(task)); + } + + @Test + public void shouldParseNamedTaskId() { + final TaskId task = new TaskId(1, 0, "namedTopology"); + assertThat(TaskId.parse(task.toString()), equalTo(task)); + } + @Test public void shouldCreateTaskStateDirectory() { final TaskId taskId = new TaskId(0, 0);