@@ -881,17 +881,15 @@ private void initializeAndRestorePhase() {
// Check if the topology has been updated since we last checked, ie via #addNamedTopology or #removeNamedTopology
private void checkForTopologyUpdates() {
if (topologyMetadata.isEmpty() || topologyMetadata.needsUpdate(getName())) {
log.info("StreamThread has detected an update to the topology");

taskManager.handleTopologyUpdates();
log.info("StreamThread has detected an update to the topology, triggering a rebalance to refresh the assignment");
if (topologyMetadata.isEmpty()) {
mainConsumer.unsubscribe();
}
topologyMetadata.maybeNotifyTopologyVersionWaitersAndUpdateThreadsTopologyVersion(getName());
Comment on lines -885 to -889
Member Author: All of this has been moved into taskManager#handleTopologyUpdates.

topologyMetadata.maybeWaitForNonEmptyTopology(() -> state);

// We don't need to manually trigger a rebalance to pick up tasks from the new topology, as
// a rebalance will always occur when the metadata is updated after a change in subscription
log.info("Updating consumer subscription following topology update");
subscribeConsumer();
}
}
@@ -1129,13 +1129,25 @@ public void updateTaskEndMetadata(final TopicPartition topicPartition, final Lon
* added NamedTopology and create them if so, then close any tasks whose named topology no longer exists
*/
void handleTopologyUpdates() {
tasks.maybeCreateTasksFromNewTopologies();
final Set<String> currentNamedTopologies = topologyMetadata.updateThreadTopologyVersion(Thread.currentThread().getName());
Member Author: This isn't the main fix, but we were playing a little fast and loose with the topology version we were reporting as having acked -- tightened this up by first atomically updating the topology version and saving the set of current named topologies, then doing the actual update handling, and finally checking the listeners and completing any finished add/remove topology requests.

tasks.maybeCreateTasksFromNewTopologies(currentNamedTopologies);
maybeCloseTasksFromRemovedTopologies(currentNamedTopologies);

if (topologyMetadata.isEmpty()) {
log.info("Proactively unsubscribing from all topics due to empty topology");
mainConsumer.unsubscribe();
}

topologyMetadata.maybeNotifyTopologyVersionListeners();
}

void maybeCloseTasksFromRemovedTopologies(final Set<String> currentNamedTopologies) {
try {
final Set<Task> activeTasksToRemove = new HashSet<>();
final Set<Task> standbyTasksToRemove = new HashSet<>();
for (final Task task : tasks.allTasks()) {
if (!topologyMetadata.namedTopologiesView().contains(task.id().topologyName())) {
if (!currentNamedTopologies.contains(task.id().topologyName())) {
if (task.isActive()) {
activeTasksToRemove.add(task);
} else {
@@ -93,8 +93,7 @@ void handleNewAssignmentAndCreateTasks(final Map<TaskId, Set<TopicPartition>> ac
createTasks(activeTasksToCreate, standbyTasksToCreate);
}

void maybeCreateTasksFromNewTopologies() {
final Set<String> currentNamedTopologies = topologyMetadata.namedTopologiesView();
void maybeCreateTasksFromNewTopologies(final Set<String> currentNamedTopologies) {
createTasks(
activeTaskCreator.uncreatedTasksForTopologies(currentNamedTopologies),
standbyTaskCreator.uncreatedTasksForTopologies(currentNamedTopologies)
@@ -84,14 +84,14 @@ public static class TopologyVersion {
public AtomicLong topologyVersion = new AtomicLong(0L); // the local topology version
public ReentrantLock topologyLock = new ReentrantLock();
public Condition topologyCV = topologyLock.newCondition();
public List<TopologyVersionWaiters> activeTopologyWaiters = new LinkedList<>();
public List<TopologyVersionListener> activeTopologyUpdateListeners = new LinkedList<>();
Member Author: Just renamed from waiters to listeners.

Contributor: Another quick question: why do we need to keep topologyVersion an AtomicLong? It seems that, besides the getters, all of its updaters are under the lock as well.

Member Author: Good find, yeah I believe it no longer needs to be an AtomicLong; I'll change it back to a long.

Member Author: Oh right, actually no, we do still need it to be an AtomicLong, as we check it in the StreamThread main loop when looking for topology updates, and obviously we don't want to have to grab the full lock for that.
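A minimal sketch of the access pattern being discussed, with simplified, hypothetical names (not the actual TopologyMetadata class): reads of the version from the StreamThread main loop stay lock-free via the AtomicLong, while all writers still go through the lock.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical sketch: version reads are lock-free, version bumps stay under the lock.
class VersionSketch {
    private final AtomicLong topologyVersion = new AtomicLong(0L);
    private final ReentrantLock topologyLock = new ReentrantLock();

    // Called from the StreamThread main loop on every iteration -- must not block on the lock.
    boolean needsUpdate(final long threadVersion) {
        return topologyVersion.get() > threadVersion;
    }

    // Called by add/removeNamedTopology, which already mutate other shared state, so they take the lock.
    void bumpVersion() {
        topologyLock.lock();
        try {
            topologyVersion.incrementAndGet();
        } finally {
            topologyLock.unlock();
        }
    }
}
```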

}

public static class TopologyVersionWaiters {
public static class TopologyVersionListener {
final long topologyVersion; // the (minimum) version to wait for these threads to cross
final KafkaFutureImpl<Void> future; // the future waiting on all threads to be updated

public TopologyVersionWaiters(final long topologyVersion, final KafkaFutureImpl<Void> future) {
public TopologyVersionListener(final long topologyVersion, final KafkaFutureImpl<Void> future) {
this.topologyVersion = topologyVersion;
this.future = future;
}
@@ -161,35 +161,47 @@ public void registerThread(final String threadName) {

public void unregisterThread(final String threadName) {
threadVersions.remove(threadName);
maybeNotifyTopologyVersionWaitersAndUpdateThreadsTopologyVersion(threadName);
maybeNotifyTopologyVersionListeners();
}

public TaskExecutionMetadata taskExecutionMetadata() {
return taskExecutionMetadata;
}

public void maybeNotifyTopologyVersionWaitersAndUpdateThreadsTopologyVersion(final String threadName) {
public Set<String> updateThreadTopologyVersion(final String threadName) {
try {
lock();
final Iterator<TopologyVersionWaiters> iterator = version.activeTopologyWaiters.listIterator();
TopologyVersionWaiters topologyVersionWaiters;
version.topologyLock.lock();
threadVersions.put(threadName, topologyVersion());
Member Author: @wcarlson5 / @guozhangwang / @vvcephei This is the main fix -- we need to split out the version update, where we add the current thread with the latest topology version to this threadVersions map, since this should of course only be done when we're reacting to a topology update.

The other function of the method was to check whether we could complete any of the queued listeners, which is why we were also invoking it when shutting down a thread. Splitting this out into a separate method avoids ghost threads being left behind in the threadVersions map.
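A condensed, hypothetical sketch of the split described above (simplified names, futures elided; not the actual TopologyMetadata code):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical condensed sketch of the two call paths; not the actual TopologyMetadata class.
class ThreadVersionsSketch {
    private final Map<String, Long> threadVersions = new ConcurrentHashMap<>();
    private long topologyVersion = 0L;

    // Topology-update path (invoked from TaskManager#handleTopologyUpdates): only here does a
    // StreamThread record itself as having caught up to the latest topology version.
    synchronized void updateThreadTopologyVersion(final String threadName) {
        threadVersions.put(threadName, topologyVersion);
    }

    // Shutdown path: drop the thread's entry first, so a dying thread can never linger as a
    // "ghost" holding the minimum version down, then re-check the queued listeners.
    synchronized void unregisterThread(final String threadName) {
        threadVersions.remove(threadName);
        maybeNotifyTopologyVersionListeners();
    }

    // Complete every queued add/removeNamedTopology future whose target version is at or below
    // the minimum version across the remaining registered threads (futures elided here).
    synchronized void maybeNotifyTopologyVersionListeners() {
        final long minThreadVersion =
            threadVersions.values().stream().min(Long::compare).orElse(Long.MAX_VALUE);
        // ... complete listeners with listener.topologyVersion <= minThreadVersion ...
    }
}
```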

return namedTopologiesView();
} finally {
version.topologyLock.unlock();
}
}

public void maybeNotifyTopologyVersionListeners() {
try {
lock();
final long minThreadVersion = getMinimumThreadVersion();
final Iterator<TopologyVersionListener> iterator = version.activeTopologyUpdateListeners.listIterator();
TopologyVersionListener topologyVersionListener;
while (iterator.hasNext()) {
topologyVersionWaiters = iterator.next();
final long topologyVersionWaitersVersion = topologyVersionWaiters.topologyVersion;
if (topologyVersionWaitersVersion <= threadVersions.get(threadName)) {
if (threadVersions.values().stream().allMatch(t -> t >= topologyVersionWaitersVersion)) {
topologyVersionWaiters.future.complete(null);
iterator.remove();
log.info("All threads are now on topology version {}", topologyVersionWaiters.topologyVersion);
}
topologyVersionListener = iterator.next();
final long topologyVersionWaitersVersion = topologyVersionListener.topologyVersion;
if (minThreadVersion >= topologyVersionWaitersVersion) {
Member Author: I also refactored this slightly to optimize/clean up this method. It's less about the optimization, as we should generally not have too many threads per KafkaStreams runtime, but I found it much easier to follow the logic by computing the minimum version across all threads and then completing all futures listening for the topology to be updated up to that version.

Contributor: I think we also want to remove the listeners for threads that were removed as well, right?

Contributor: The listeners are for the caller threads, not stream threads, right? I thought that since the thread is removed, it would not be counted in getMinimumThreadVersion() and hence would not block the listeners from being completed.

Contributor: No, the listeners are for stream threads. They get added in the task manager. Once all threads are at the version, the future blocking the calling thread is completed.

Contributor: Hmm... just to make sure we are talking about version.activeTopologyUpdateListeners, right? These listeners are for the calling thread of removeNamedTopology / addNamedTopology / start, which would get the wrapped futures these listeners are constructed on.

Anyways, my understanding is that when a thread is removed, the version returned by getMinimumThreadVersion would not take that removed thread into consideration, so even if the removed thread's version is low it would not block the future from being completed.

Contributor: Ah @guozhangwang yeah, getMinimumThreadVersion should take care of it.

Member Author: Yeah, I think this was already resolved, but just to clarify for anyone else reading this (or ourselves in the future): yes, the listeners are for the callers of add/removeNamedTopology 👍
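A tiny, hypothetical standalone demo of this point (not part of the PR): once a thread is unregistered, its stale entry drops out of the minimum computation and can no longer hold back listener completion.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical demo: a removed thread's stale version no longer participates in the minimum.
public class MinVersionDemo {
    public static void main(final String[] args) {
        final Map<String, Long> threadVersions = new HashMap<>();
        threadVersions.put("StreamThread-1", 3L);
        threadVersions.put("StreamThread-2", 1L); // lagging thread that is about to be removed

        System.out.println(minVersion(threadVersions)); // 1 -> listeners for versions 2 and 3 stay queued

        threadVersions.remove("StreamThread-2");         // unregisterThread(...)
        System.out.println(minVersion(threadVersions)); // 3 -> listeners up to version 3 can now complete
    }

    private static long minVersion(final Map<String, Long> threadVersions) {
        return threadVersions.values().stream().min(Long::compare).orElse(Long.MAX_VALUE);
    }
}
```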

topologyVersionListener.future.complete(null);
iterator.remove();
log.info("All threads are now on topology version {}", topologyVersionListener.topologyVersion);
}
}
} finally {
unlock();
}
}

private long getMinimumThreadVersion() {
return threadVersions.values().stream().min(Long::compare).get();
}

public void wakeupThreads() {
try {
lock();
@@ -224,9 +236,9 @@ public void registerAndBuildNewTopology(final KafkaFutureImpl<Void> future, fina
try {
lock();
buildAndVerifyTopology(newTopologyBuilder);
log.info("New NamedTopology passed validation and will be added {}, old topology version is {}", newTopologyBuilder.topologyName(), version.topologyVersion.get());
log.info("New NamedTopology {} passed validation and will be added, old topology version is {}", newTopologyBuilder.topologyName(), version.topologyVersion.get());
version.topologyVersion.incrementAndGet();
version.activeTopologyWaiters.add(new TopologyVersionWaiters(topologyVersion(), future));
version.activeTopologyUpdateListeners.add(new TopologyVersionListener(topologyVersion(), future));
builders.put(newTopologyBuilder.topologyName(), newTopologyBuilder);
wakeupThreads();
log.info("Added NamedTopology {} and updated topology version to {}", newTopologyBuilder.topologyName(), version.topologyVersion.get());
@@ -247,7 +259,7 @@ public KafkaFuture<Void> unregisterTopology(final KafkaFutureImpl<Void> removeTo
lock();
log.info("Beginning removal of NamedTopology {}, old topology version is {}", topologyName, version.topologyVersion.get());
version.topologyVersion.incrementAndGet();
version.activeTopologyWaiters.add(new TopologyVersionWaiters(topologyVersion(), removeTopologyFuture));
version.activeTopologyUpdateListeners.add(new TopologyVersionListener(topologyVersion(), removeTopologyFuture));
final InternalTopologyBuilder removedBuilder = builders.remove(topologyName);
removedBuilder.fullSourceTopicNames().forEach(allInputTopics::remove);
removedBuilder.allSourcePatternStrings().forEach(allInputTopics::remove);
@@ -612,38 +612,40 @@ public void shouldAllowMixedCollectionAndPatternSubscriptionWithMultipleNamedTop
@Test
public void shouldAddToEmptyInitialTopologyRemoveResetOffsetsThenAddSameNamedTopology() throws Exception {
CLUSTER.createTopics(SUM_OUTPUT, COUNT_OUTPUT);
// Build up named topology with two stateful subtopologies
final KStream<String, Long> inputStream1 = topology1Builder.stream(INPUT_STREAM_1);
inputStream1.groupByKey().count().toStream().to(COUNT_OUTPUT);
inputStream1.groupByKey().reduce(Long::sum).toStream().to(SUM_OUTPUT);
streams.start();
final NamedTopology namedTopology = topology1Builder.build();
streams.addNamedTopology(namedTopology).all().get();

assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, COUNT_OUTPUT, 3), equalTo(COUNT_OUTPUT_DATA));
assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, SUM_OUTPUT, 3), equalTo(SUM_OUTPUT_DATA));
streams.removeNamedTopology("topology-1", true).all().get();
streams.cleanUpNamedTopology("topology-1");

CLUSTER.getAllTopicsInCluster().stream().filter(t -> t.contains("changelog")).forEach(t -> {
try {
CLUSTER.deleteTopicAndWait(t);
} catch (final InterruptedException e) {
e.printStackTrace();
}
});
try {
// Build up named topology with two stateful subtopologies
final KStream<String, Long> inputStream1 = topology1Builder.stream(INPUT_STREAM_1);
inputStream1.groupByKey().count().toStream().to(COUNT_OUTPUT);
inputStream1.groupByKey().reduce(Long::sum).toStream().to(SUM_OUTPUT);
streams.start();
final NamedTopology namedTopology = topology1Builder.build();
streams.addNamedTopology(namedTopology).all().get();

final KStream<String, Long> inputStream = topology1BuilderDup.stream(INPUT_STREAM_1);
inputStream.groupByKey().count().toStream().to(COUNT_OUTPUT);
inputStream.groupByKey().reduce(Long::sum).toStream().to(SUM_OUTPUT);
assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, COUNT_OUTPUT, 3), equalTo(COUNT_OUTPUT_DATA));
assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, SUM_OUTPUT, 3), equalTo(SUM_OUTPUT_DATA));
streams.removeNamedTopology("topology-1", true).all().get();
streams.cleanUpNamedTopology("topology-1");

CLUSTER.getAllTopicsInCluster().stream().filter(t -> t.contains("changelog")).forEach(t -> {
try {
CLUSTER.deleteTopicAndWait(t);
} catch (final InterruptedException e) {
e.printStackTrace();
}
});

final NamedTopology namedTopologyDup = topology1BuilderDup.build();
streams.addNamedTopology(namedTopologyDup).all().get();
final KStream<String, Long> inputStream = topology1BuilderDup.stream(INPUT_STREAM_1);
inputStream.groupByKey().count().toStream().to(COUNT_OUTPUT);
inputStream.groupByKey().reduce(Long::sum).toStream().to(SUM_OUTPUT);

assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, COUNT_OUTPUT, 3), equalTo(COUNT_OUTPUT_DATA));
assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, SUM_OUTPUT, 3), equalTo(SUM_OUTPUT_DATA));
final NamedTopology namedTopologyDup = topology1BuilderDup.build();
streams.addNamedTopology(namedTopologyDup).all().get();

CLUSTER.deleteTopicsAndWait(SUM_OUTPUT, COUNT_OUTPUT);
assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, COUNT_OUTPUT, 3), equalTo(COUNT_OUTPUT_DATA));
assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, SUM_OUTPUT, 3), equalTo(SUM_OUTPUT_DATA));
} finally {
CLUSTER.deleteTopicsAndWait(SUM_OUTPUT, COUNT_OUTPUT);
}
}

@Test