Merged
8 changes: 8 additions & 0 deletions docs/streams/upgrade-guide.html
@@ -117,6 +117,14 @@ <h3><a id="streams_api_changes_300" href="#streams_api_changes_300">Streams API
<p>
We removed the default implementation of <code>RocksDBConfigSetter#close()</code>.
</p>

<p>
The public <code>topicGroupId</code> and <code>partition</code> fields on TaskId have been deprecated and replaced with getters. Please migrate to using the new <code>TaskId.subtopology()</code>
(which replaces <code>topicGroupId</code>) and <code>TaskId.partition()</code> APIs instead. Also, the <code>TaskId#readFrom</code> and <code>TaskId#writeTo</code> methods have been deprecated
and will be removed, as they were never intended for public use. Finally, we have deprecated the <code>TaskMetadata.taskId()</code> method as well as the <code>TaskMetadata</code> constructor.
These have been replaced with APIs that better represent the task id as an actual <code>TaskId</code> object instead of a String. Please migrate to the new <code>TaskMetadata#getTaskId</code>
method. See <a href="https://cwiki.apache.org/confluence/x/vYTOCg">KIP-740</a> for more details.
</p>
<p>
We removed the following deprecated APIs:
</p>
@@ -16,11 +16,14 @@
*/
package org.apache.kafka.streams.processor;

import org.apache.kafka.streams.errors.TaskIdFormatException;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Objects;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -80,6 +83,35 @@ public String toString() {
        return namedTopology != null ? namedTopology + "_" + topicGroupId + "_" + partition : topicGroupId + "_" + partition;
    }

    /**
     * @throws TaskIdFormatException if the taskIdStr is not a valid {@link TaskId}
     */
    public static TaskId parse(final String taskIdStr) {
Member Author

I noticed this has been (re)moved since 2.8, so I put it back with updates to handle named topologies, plus tests, which it did not seem to have.

Member

If it was removed (and released in 2.8), why add it back? It seems we don't need it?

Member Author

Well, removing a non-deprecated API would be a breaking change, no? And in this case I actually believe we should not deprecate it: if toString is part of the public TaskId API (and it should be), then this parse method, which does the reverse, should be as well. As I discussed with some people during the KIP-740 debate, part of the public contract of TaskId is its string representation, since that is what ends up in logs, metrics, etc. So imo it does make sense to provide a String-to-TaskId API.
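
To illustrate the round-trip contract argued for in this thread, here is a minimal sketch. It assumes a hypothetical stand-in class, `SketchTaskId`, rather than the real `TaskId`, so that it runs without Kafka on the classpath; `IllegalArgumentException` stands in for `TaskIdFormatException`. The splitting logic mirrors the front-to-back parsing in this diff, but it is an illustration, not the committed implementation.

```java
import java.util.Objects;

// Hypothetical stand-in for TaskId; not the Kafka class.
final class SketchTaskId {
    final int subtopology;
    final int partition;
    final String namedTopology; // null when the task has no named topology

    SketchTaskId(final int subtopology, final int partition, final String namedTopology) {
        this.subtopology = subtopology;
        this.partition = partition;
        this.namedTopology = namedTopology;
    }

    @Override
    public String toString() {
        return namedTopology != null
            ? namedTopology + "_" + subtopology + "_" + partition
            : subtopology + "_" + partition;
    }

    // Splits on the first (and, if present, second) underscore, front to back.
    static SketchTaskId parse(final String s) {
        final int first = s.indexOf('_');
        final int second = s.indexOf('_', first + 1);
        if (first <= 0 || first + 1 >= s.length()) {
            throw new IllegalArgumentException("Invalid task id: " + s);
        }
        try {
            if (second < 0) {
                // Only one underscore: no named topology in the string.
                return new SketchTaskId(
                    Integer.parseInt(s.substring(0, first)),
                    Integer.parseInt(s.substring(first + 1)),
                    null);
            }
            return new SketchTaskId(
                Integer.parseInt(s.substring(first + 1, second)),
                Integer.parseInt(s.substring(second + 1)),
                s.substring(0, first));
        } catch (final NumberFormatException e) {
            throw new IllegalArgumentException("Invalid task id: " + s, e);
        }
    }

    @Override
    public boolean equals(final Object o) {
        if (!(o instanceof SketchTaskId)) {
            return false;
        }
        final SketchTaskId other = (SketchTaskId) o;
        return subtopology == other.subtopology
            && partition == other.partition
            && Objects.equals(namedTopology, other.namedTopology);
    }

    @Override
    public int hashCode() {
        return Objects.hash(subtopology, partition, namedTopology);
    }

    public static void main(final String[] args) {
        final SketchTaskId unnamed = new SketchTaskId(1, 0, null);
        final SketchTaskId named = new SketchTaskId(1, 0, "namedTopology");
        // parse(toString(x)) should give back an equal id in both cases.
        System.out.println(parse(unnamed.toString()).equals(unnamed));
        System.out.println(parse(named.toString()).equals(named));
    }
}
```

With this front-to-back splitting, a topology name that itself contains `_` (for example `my_topology_1_0`) fails to parse, which is why the thread below discusses restricting that character.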

        final int firstIndex = taskIdStr.indexOf('_');
Member

I guess I missed the "named topology" change. What is a topology name? How to set it? And do we ensure that we don't allow _ in its name?

Member Author

Yes, we plan to restrict the _ character. If we want to loosen that up later we can just parse this from the back, but I think it's reasonable to disallow _ completely.

> What is a topology name?

Great question. There isn't necessarily a short answer, but I can try: a named topology is basically an independent and isolated piece of a topology that can be added, removed, etc. at will, even on a running app.

> How to set it?

The skeleton API was merged in #10615. It has been evolving a bit since then, but the basic idea holds: each NamedTopology is built up with a special builder called the NamedTopologyStreamsBuilder, and a dedicated KafkaStreams wrapper is the entry point for starting up an app using NamedTopologies. It is all currently under the internals package while it's in the experimental phase, so it should not be possible for a user to end up with anything NamedTopology-related through public APIs.
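
The "parse this from the back" alternative mentioned in this reply could look roughly like the sketch below. It is not part of this PR: `BackParsedTaskId` is a hypothetical holder class, and `IllegalArgumentException` stands in for `TaskIdFormatException`. The idea is that the last two `_`-separated fields are always integers, so everything before them can be treated as the topology name even if it contains underscores.

```java
// Hypothetical holder for the three parsed fields; not a Kafka class.
final class BackParsedTaskId {
    final String namedTopology; // null when the string has no named topology
    final int subtopology;
    final int partition;

    private BackParsedTaskId(final String namedTopology, final int subtopology, final int partition) {
        this.namedTopology = namedTopology;
        this.subtopology = subtopology;
        this.partition = partition;
    }

    static BackParsedTaskId parse(final String s) {
        // The partition is whatever follows the last underscore.
        final int last = s.lastIndexOf('_');
        if (last <= 0 || last + 1 >= s.length()) {
            throw new IllegalArgumentException("Invalid task id: " + s);
        }
        // The subtopology sits between the previous underscore (if any) and the last one.
        final int prev = s.lastIndexOf('_', last - 1);
        if (prev == 0) {
            // A leading underscore would mean an empty topology name.
            throw new IllegalArgumentException("Invalid task id: " + s);
        }
        try {
            final int partition = Integer.parseInt(s.substring(last + 1));
            final int subtopology = Integer.parseInt(s.substring(prev + 1, last));
            // Everything before the subtopology is the name, underscores included.
            final String namedTopology = prev < 0 ? null : s.substring(0, prev);
            return new BackParsedTaskId(namedTopology, subtopology, partition);
        } catch (final NumberFormatException e) {
            throw new IllegalArgumentException("Invalid task id: " + s, e);
        }
    }

    public static void main(final String[] args) {
        final BackParsedTaskId id = parse("my_topology_1_0");
        System.out.println(id.namedTopology + " / " + id.subtopology + " / " + id.partition);
    }
}
```

The trade-off is a slightly less obvious parser in exchange for more permissive names; disallowing `_` outright, as proposed above, keeps the simpler front-to-back logic.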

        final int secondIndex = taskIdStr.indexOf('_', firstIndex + 1);
        if (firstIndex <= 0 || firstIndex + 1 >= taskIdStr.length()) {
            throw new TaskIdFormatException(taskIdStr);
        }

        try {
            // If only one copy of '_' exists, there is no named topology in the string
            if (secondIndex < 0) {
                final int topicGroupId = Integer.parseInt(taskIdStr.substring(0, firstIndex));
                final int partition = Integer.parseInt(taskIdStr.substring(firstIndex + 1));

                return new TaskId(topicGroupId, partition);
            } else {
                final String namedTopology = taskIdStr.substring(0, firstIndex);
                final int topicGroupId = Integer.parseInt(taskIdStr.substring(firstIndex + 1, secondIndex));
                final int partition = Integer.parseInt(taskIdStr.substring(secondIndex + 1));

                return new TaskId(topicGroupId, partition, namedTopology);
            }
        } catch (final Exception e) {
            throw new TaskIdFormatException(taskIdStr);
        }
    }

    /**
     * @throws IOException if cannot write to output stream
     * @deprecated since 3.0, for internal use, will be removed
@@ -40,6 +40,19 @@ public class TaskMetadata {

    private final Optional<Long> timeCurrentIdlingStarted;

    /**
     * @deprecated since 3.0, not intended for public use
     */
    @Deprecated
    public TaskMetadata(final String taskId,
                        final Set<TopicPartition> topicPartitions,
                        final Map<TopicPartition, Long> committedOffsets,
                        final Map<TopicPartition, Long> endOffsets,
                        final Optional<Long> timeCurrentIdlingStarted) {
        this(TaskId.parse(taskId), topicPartitions, committedOffsets, endOffsets, timeCurrentIdlingStarted);
    }

    // For internal use -- not a public API
    public TaskMetadata(final TaskId taskId,
Member

Should we make this protected and add an internal "impl" class? (And also mark it as non-public so we can eventually move to an interface?)

Member Author

Well, we would still need a public constructor no matter what, even with an internal impl class. I'll add a comment to clarify that it's not intended for public use, and maybe file a follow-up ticket to migrate this to an interface in case someone wants to get into that.

Member

> Well, we would still need a public constructor

Why can't we make it protected?

Member Author

I meant it seems like we need it as long as TaskMetadata continues to be a public class, i.e. if we take away the constructor then we've all but made it into an interface already, which was not what was agreed on in KIP-740, and I don't want to drag this out further if there is any pushback...

Imo it's sufficient to just leave the constructor public but not present it as a public API, and leave the rest for the KIP for KAFKA-12849. It's a simple KIP, so I'm optimistic someone new to Kafka may want to pick it up 🙂

All that said, I don't feel too strongly (in fact I agree with you), so I can be convinced if you do.

Member Author

In fact, we already got a bite on KAFKA-12849. If it's all the same to you, I'd prefer to just let this be handled by that KIP. Assuming we get it in by 3.0, there won't ever be a new public constructor published for TaskMetadata.

Member

Works for me. Thanks for the details.
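
For readers following the interface-versus-constructor discussion, the shape being deferred to the follow-up KIP might look something like the sketch below. Everything here is an assumption for illustration: `TaskMetadataView` and `TaskMetadataViewImpl` are hypothetical names, and the fields are simplified to strings so the example is self-contained. It is not the design adopted by KAFKA-12849.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Hypothetical public-facing interface: callers can read metadata but cannot construct it.
interface TaskMetadataView {
    String taskId();

    Set<String> topicPartitions();
}

// Hypothetical implementation; in a real codebase it would live in an internals package,
// so its constructor would never be presented as public API.
final class TaskMetadataViewImpl implements TaskMetadataView {
    private final String taskId;
    private final Set<String> topicPartitions;

    TaskMetadataViewImpl(final String taskId, final Set<String> topicPartitions) {
        this.taskId = taskId;
        // Defensive copy plus read-only wrapper so callers cannot mutate internal state.
        this.topicPartitions = Collections.unmodifiableSet(new HashSet<>(topicPartitions));
    }

    @Override
    public String taskId() {
        return taskId;
    }

    @Override
    public Set<String> topicPartitions() {
        return topicPartitions;
    }

    public static void main(final String[] args) {
        final Set<String> partitions = new HashSet<>();
        partitions.add("input-0");
        final TaskMetadataView view = new TaskMetadataViewImpl("0_0", partitions);
        System.out.println(view.taskId() + " -> " + view.topicPartitions());
    }
}
```

With this split, only the interface would need to be documented and kept compatible, which is the motivation given above for moving away from a public class with a public constructor.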

                        final Set<TopicPartition> topicPartitions,
                        final Map<TopicPartition, Long> committedOffsets,
@@ -148,6 +148,18 @@ private void assertPermissions(final File file) {
        }
    }

    @Test
    public void shouldParseUnnamedTaskId() {
        final TaskId task = new TaskId(1, 0);
        assertThat(TaskId.parse(task.toString()), equalTo(task));
    }

    @Test
    public void shouldParseNamedTaskId() {
        final TaskId task = new TaskId(1, 0, "namedTopology");
        assertThat(TaskId.parse(task.toString()), equalTo(task));
    }

    @Test
    public void shouldCreateTaskStateDirectory() {
        final TaskId taskId = new TaskId(0, 0);