diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index a406f0353ce97..985f9f8656624 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -881,17 +881,15 @@ private void initializeAndRestorePhase() { // Check if the topology has been updated since we last checked, ie via #addNamedTopology or #removeNamedTopology private void checkForTopologyUpdates() { if (topologyMetadata.isEmpty() || topologyMetadata.needsUpdate(getName())) { + log.info("StreamThread has detected an update to the topology"); + taskManager.handleTopologyUpdates(); - log.info("StreamThread has detected an update to the topology, triggering a rebalance to refresh the assignment"); - if (topologyMetadata.isEmpty()) { - mainConsumer.unsubscribe(); - } - topologyMetadata.maybeNotifyTopologyVersionWaitersAndUpdateThreadsTopologyVersion(getName()); topologyMetadata.maybeWaitForNonEmptyTopology(() -> state); // We don't need to manually trigger a rebalance to pick up tasks from the new topology, as // a rebalance will always occur when the metadata is updated after a change in subscription + log.info("Updating consumer subscription following topology update"); subscribeConsumer(); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java index 9cb2b9b2862ce..d0f4a460dc3e7 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java @@ -1129,13 +1129,25 @@ public void updateTaskEndMetadata(final TopicPartition topicPartition, final Lon * added NamedTopology and create them if so, then 
close any tasks whose named topology no longer exists */ void handleTopologyUpdates() { - tasks.maybeCreateTasksFromNewTopologies(); + final Set currentNamedTopologies = topologyMetadata.updateThreadTopologyVersion(Thread.currentThread().getName()); + tasks.maybeCreateTasksFromNewTopologies(currentNamedTopologies); + maybeCloseTasksFromRemovedTopologies(currentNamedTopologies); + + if (topologyMetadata.isEmpty()) { + log.info("Proactively unsubscribing from all topics due to empty topology"); + mainConsumer.unsubscribe(); + } + + topologyMetadata.maybeNotifyTopologyVersionListeners(); + } + + void maybeCloseTasksFromRemovedTopologies(final Set currentNamedTopologies) { try { final Set activeTasksToRemove = new HashSet<>(); final Set standbyTasksToRemove = new HashSet<>(); for (final Task task : tasks.allTasks()) { - if (!topologyMetadata.namedTopologiesView().contains(task.id().topologyName())) { + if (!currentNamedTopologies.contains(task.id().topologyName())) { if (task.isActive()) { activeTasksToRemove.add(task); } else { diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java index 2740791f8f628..c4aec35d4e970 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java @@ -93,8 +93,7 @@ void handleNewAssignmentAndCreateTasks(final Map> ac createTasks(activeTasksToCreate, standbyTasksToCreate); } - void maybeCreateTasksFromNewTopologies() { - final Set currentNamedTopologies = topologyMetadata.namedTopologiesView(); + void maybeCreateTasksFromNewTopologies(final Set currentNamedTopologies) { createTasks( activeTaskCreator.uncreatedTasksForTopologies(currentNamedTopologies), standbyTaskCreator.uncreatedTasksForTopologies(currentNamedTopologies) diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java index 15e73aee09624..53940bfeb12a4 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java @@ -84,14 +84,14 @@ public static class TopologyVersion { public AtomicLong topologyVersion = new AtomicLong(0L); // the local topology version public ReentrantLock topologyLock = new ReentrantLock(); public Condition topologyCV = topologyLock.newCondition(); - public List activeTopologyWaiters = new LinkedList<>(); + public List activeTopologyUpdateListeners = new LinkedList<>(); } - public static class TopologyVersionWaiters { + public static class TopologyVersionListener { final long topologyVersion; // the (minimum) version to wait for these threads to cross final KafkaFutureImpl future; // the future waiting on all threads to be updated - public TopologyVersionWaiters(final long topologyVersion, final KafkaFutureImpl future) { + public TopologyVersionListener(final long topologyVersion, final KafkaFutureImpl future) { this.topologyVersion = topologyVersion; this.future = future; } @@ -161,28 +161,36 @@ public void registerThread(final String threadName) { public void unregisterThread(final String threadName) { threadVersions.remove(threadName); - maybeNotifyTopologyVersionWaitersAndUpdateThreadsTopologyVersion(threadName); + maybeNotifyTopologyVersionListeners(); } public TaskExecutionMetadata taskExecutionMetadata() { return taskExecutionMetadata; } - public void maybeNotifyTopologyVersionWaitersAndUpdateThreadsTopologyVersion(final String threadName) { + public Set updateThreadTopologyVersion(final String threadName) { try { - lock(); - final Iterator iterator = version.activeTopologyWaiters.listIterator(); - TopologyVersionWaiters 
topologyVersionWaiters; + version.topologyLock.lock(); threadVersions.put(threadName, topologyVersion()); + return namedTopologiesView(); + } finally { + version.topologyLock.unlock(); + } + } + + public void maybeNotifyTopologyVersionListeners() { + try { + lock(); + final long minThreadVersion = getMinimumThreadVersion(); + final Iterator iterator = version.activeTopologyUpdateListeners.listIterator(); + TopologyVersionListener topologyVersionListener; while (iterator.hasNext()) { - topologyVersionWaiters = iterator.next(); - final long topologyVersionWaitersVersion = topologyVersionWaiters.topologyVersion; - if (topologyVersionWaitersVersion <= threadVersions.get(threadName)) { - if (threadVersions.values().stream().allMatch(t -> t >= topologyVersionWaitersVersion)) { - topologyVersionWaiters.future.complete(null); - iterator.remove(); - log.info("All threads are now on topology version {}", topologyVersionWaiters.topologyVersion); - } + topologyVersionListener = iterator.next(); + final long topologyVersionWaitersVersion = topologyVersionListener.topologyVersion; + if (minThreadVersion >= topologyVersionWaitersVersion) { + topologyVersionListener.future.complete(null); + iterator.remove(); + log.info("All threads are now on topology version {}", topologyVersionListener.topologyVersion); } } } finally { @@ -190,6 +198,10 @@ public void maybeNotifyTopologyVersionWaitersAndUpdateThreadsTopologyVersion(fin } } + private long getMinimumThreadVersion() { + return threadVersions.values().stream().min(Long::compare).orElse(Long.MAX_VALUE); /* no registered threads left: nothing to wait on, so every pending listener may complete */ + } + public void wakeupThreads() { try { lock(); @@ -224,9 +236,9 @@ public void registerAndBuildNewTopology(final KafkaFutureImpl future, fina try { lock(); buildAndVerifyTopology(newTopologyBuilder); - log.info("New NamedTopology passed validation and will be added {}, old topology version is {}", newTopologyBuilder.topologyName(), version.topologyVersion.get()); + log.info("New NamedTopology {} passed validation and will be added, old topology 
version is {}", newTopologyBuilder.topologyName(), version.topologyVersion.get()); version.topologyVersion.incrementAndGet(); - version.activeTopologyWaiters.add(new TopologyVersionWaiters(topologyVersion(), future)); + version.activeTopologyUpdateListeners.add(new TopologyVersionListener(topologyVersion(), future)); builders.put(newTopologyBuilder.topologyName(), newTopologyBuilder); wakeupThreads(); log.info("Added NamedTopology {} and updated topology version to {}", newTopologyBuilder.topologyName(), version.topologyVersion.get()); @@ -247,7 +259,7 @@ public KafkaFuture unregisterTopology(final KafkaFutureImpl removeTo lock(); log.info("Beginning removal of NamedTopology {}, old topology version is {}", topologyName, version.topologyVersion.get()); version.topologyVersion.incrementAndGet(); - version.activeTopologyWaiters.add(new TopologyVersionWaiters(topologyVersion(), removeTopologyFuture)); + version.activeTopologyUpdateListeners.add(new TopologyVersionListener(topologyVersion(), removeTopologyFuture)); final InternalTopologyBuilder removedBuilder = builders.remove(topologyName); removedBuilder.fullSourceTopicNames().forEach(allInputTopics::remove); removedBuilder.allSourcePatternStrings().forEach(allInputTopics::remove); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.java index 920c08c6a9bc6..8625965467b1a 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.java @@ -612,38 +612,40 @@ public void shouldAllowMixedCollectionAndPatternSubscriptionWithMultipleNamedTop @Test public void shouldAddToEmptyInitialTopologyRemoveResetOffsetsThenAddSameNamedTopology() throws Exception { CLUSTER.createTopics(SUM_OUTPUT, COUNT_OUTPUT); - // Build up named topology with two 
stateful subtopologies - final KStream inputStream1 = topology1Builder.stream(INPUT_STREAM_1); - inputStream1.groupByKey().count().toStream().to(COUNT_OUTPUT); - inputStream1.groupByKey().reduce(Long::sum).toStream().to(SUM_OUTPUT); - streams.start(); - final NamedTopology namedTopology = topology1Builder.build(); - streams.addNamedTopology(namedTopology).all().get(); - - assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, COUNT_OUTPUT, 3), equalTo(COUNT_OUTPUT_DATA)); - assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, SUM_OUTPUT, 3), equalTo(SUM_OUTPUT_DATA)); - streams.removeNamedTopology("topology-1", true).all().get(); - streams.cleanUpNamedTopology("topology-1"); - - CLUSTER.getAllTopicsInCluster().stream().filter(t -> t.contains("changelog")).forEach(t -> { - try { - CLUSTER.deleteTopicAndWait(t); - } catch (final InterruptedException e) { - e.printStackTrace(); - } - }); + try { + // Build up named topology with two stateful subtopologies + final KStream inputStream1 = topology1Builder.stream(INPUT_STREAM_1); + inputStream1.groupByKey().count().toStream().to(COUNT_OUTPUT); + inputStream1.groupByKey().reduce(Long::sum).toStream().to(SUM_OUTPUT); + streams.start(); + final NamedTopology namedTopology = topology1Builder.build(); + streams.addNamedTopology(namedTopology).all().get(); - final KStream inputStream = topology1BuilderDup.stream(INPUT_STREAM_1); - inputStream.groupByKey().count().toStream().to(COUNT_OUTPUT); - inputStream.groupByKey().reduce(Long::sum).toStream().to(SUM_OUTPUT); + assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, COUNT_OUTPUT, 3), equalTo(COUNT_OUTPUT_DATA)); + assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, SUM_OUTPUT, 3), equalTo(SUM_OUTPUT_DATA)); + streams.removeNamedTopology("topology-1", true).all().get(); + streams.cleanUpNamedTopology("topology-1"); + + CLUSTER.getAllTopicsInCluster().stream().filter(t -> t.contains("changelog")).forEach(t -> { + try { + 
CLUSTER.deleteTopicAndWait(t); + } catch (final InterruptedException e) { + e.printStackTrace(); + } + }); - final NamedTopology namedTopologyDup = topology1BuilderDup.build(); - streams.addNamedTopology(namedTopologyDup).all().get(); + final KStream inputStream = topology1BuilderDup.stream(INPUT_STREAM_1); + inputStream.groupByKey().count().toStream().to(COUNT_OUTPUT); + inputStream.groupByKey().reduce(Long::sum).toStream().to(SUM_OUTPUT); - assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, COUNT_OUTPUT, 3), equalTo(COUNT_OUTPUT_DATA)); - assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, SUM_OUTPUT, 3), equalTo(SUM_OUTPUT_DATA)); + final NamedTopology namedTopologyDup = topology1BuilderDup.build(); + streams.addNamedTopology(namedTopologyDup).all().get(); - CLUSTER.deleteTopicsAndWait(SUM_OUTPUT, COUNT_OUTPUT); + assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, COUNT_OUTPUT, 3), equalTo(COUNT_OUTPUT_DATA)); + assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, SUM_OUTPUT, 3), equalTo(SUM_OUTPUT_DATA)); + } finally { + CLUSTER.deleteTopicsAndWait(SUM_OUTPUT, COUNT_OUTPUT); + } } @Test