Commit 59cf4a6

Merge pull request #47863 from ozangunalp/kafka_streams_topics_check
Extract Kafka Streams topics to check from the topology
2 parents d17c7e8 + 54dfb1e commit 59cf4a6

File tree

11 files changed: +348 / -142 lines
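
The net effect is that the set of topics the extension waits for at startup is now derived from the described Topology instead of a hand-maintained quarkus.kafka-streams.topics list. As a rough illustration only (a hypothetical application-side Topology producer, not code from this commit, assuming a current jakarta.*-based Quarkus application), the extraction would pick up the stream's input topic while ignoring topics that are only written to:

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.inject.Produces;

import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;

@ApplicationScoped
public class OrdersTopologyProducer {

    @Produces
    public Topology buildTopology() {
        StreamsBuilder builder = new StreamsBuilder();
        // "orders" is a source topic, so the startup check derived from this topology waits for it;
        // "orders-enriched" appears only as a sink, so it is not waited on.
        builder.stream("orders").to("orders-enriched");
        return builder.build();
    }
}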

extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/KafkaStreamsProducer.java

Lines changed: 13 additions & 64 deletions
@@ -5,19 +5,13 @@
 
 import java.net.InetSocketAddress;
 import java.time.Duration;
-import java.util.Collection;
 import java.util.Collections;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Properties;
-import java.util.Set;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
@@ -31,7 +25,6 @@
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.clients.admin.AdminClientConfig;
-import org.apache.kafka.clients.admin.ListTopicsResult;
 import org.apache.kafka.common.config.SaslConfigs;
 import org.apache.kafka.common.config.SslConfigs;
 import org.apache.kafka.streams.KafkaClientSupplier;
@@ -60,15 +53,12 @@
 public class KafkaStreamsProducer {
 
     private static final Logger LOGGER = Logger.getLogger(KafkaStreamsProducer.class.getName());
-    private static volatile boolean shutdown = false;
 
     private final ExecutorService executorService;
     private final StreamsConfig streamsConfig;
    private final KafkaStreams kafkaStreams;
-    private final KafkaStreamsTopologyManager kafkaStreamsTopologyManager;
+    private final KafkaStreamsTopologyManager topologyManager;
     private final Admin kafkaAdminClient;
-    private final Duration topicsTimeout;
-    private final List<String> trimmedTopics;
 
     @Inject
     public KafkaStreamsProducer(KafkaStreamsSupport kafkaStreamsSupport, KafkaStreamsRuntimeConfig runtimeConfig,
@@ -77,17 +67,14 @@ public KafkaStreamsProducer(KafkaStreamsSupport kafkaStreamsSupport, KafkaStream
             @Identifier("default-kafka-broker") Instance<Map<String, Object>> defaultConfiguration,
             Instance<StateListener> stateListener, Instance<StateRestoreListener> globalStateRestoreListener,
             Instance<StreamsUncaughtExceptionHandler> uncaughtExceptionHandlerListener) {
-        shutdown = false;
         // No producer for Topology -> nothing to do
         if (topology.isUnsatisfied()) {
            LOGGER.warn("No Topology producer; Kafka Streams will not be started");
             this.executorService = null;
             this.streamsConfig = null;
             this.kafkaStreams = null;
-            this.kafkaStreamsTopologyManager = null;
+            this.topologyManager = null;
             this.kafkaAdminClient = null;
-            this.topicsTimeout = null;
-            this.trimmedTopics = null;
             return;
         }
 
@@ -108,32 +95,23 @@ public KafkaStreamsProducer(KafkaStreamsSupport kafkaStreamsSupport, KafkaStream
         this.kafkaAdminClient = Admin.create(getAdminClientConfig(kafkaStreamsProperties));
 
         this.executorService = executorService;
-
-        this.topicsTimeout = runtimeConfig.topicsTimeout();
-        this.trimmedTopics = isTopicsCheckEnabled() ? runtimeConfig.getTrimmedTopics() : Collections.emptyList();
         this.streamsConfig = new StreamsConfig(kafkaStreamsProperties);
         this.kafkaStreams = initializeKafkaStreams(streamsConfig, topology.get(),
                 kafkaClientSupplier, stateListener, globalStateRestoreListener, uncaughtExceptionHandlerListener);
-        this.kafkaStreamsTopologyManager = new KafkaStreamsTopologyManager(kafkaAdminClient);
-    }
-
-    private boolean isTopicsCheckEnabled() {
-        return topicsTimeout.compareTo(Duration.ZERO) > 0;
+        this.topologyManager = new KafkaStreamsTopologyManager(kafkaAdminClient, topology.get(), runtimeConfig);
     }
 
     public void onStartup(@Observes StartupEvent event, Event<KafkaStreams> kafkaStreamsEvent) {
         if (kafkaStreams != null) {
             kafkaStreamsEvent.fire(kafkaStreams);
             executorService.execute(() -> {
-                if (isTopicsCheckEnabled()) {
-                    try {
-                        waitForTopicsToBeCreated(kafkaAdminClient, trimmedTopics, topicsTimeout);
-                    } catch (InterruptedException e) {
-                        Thread.currentThread().interrupt();
-                        return;
-                    }
+                try {
+                    topologyManager.waitForTopicsToBeCreated();
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                    return;
                 }
-                if (!shutdown) {
+                if (!topologyManager.isClosed()) {
                     LOGGER.debug("Starting Kafka Streams pipeline");
                     kafkaStreams.start();
                 }
@@ -159,11 +137,13 @@ public StreamsConfig getStreamsConfig() {
     @Singleton
     @Unremovable
     public KafkaStreamsTopologyManager kafkaStreamsTopologyManager() {
-        return kafkaStreamsTopologyManager;
+        return topologyManager;
     }
 
     void onStop(@Observes ShutdownEvent event) {
-        shutdown = true;
+        if (topologyManager != null) {
+            topologyManager.close();
+        }
         if (kafkaStreams != null) {
             LOGGER.debug("Stopping Kafka Streams pipeline");
             kafkaStreams.close();
@@ -329,37 +309,6 @@ private static String toHostPort(InetSocketAddress inetSocketAddress) {
         return inetSocketAddress.getHostString() + ":" + inetSocketAddress.getPort();
     }
 
-    private static void waitForTopicsToBeCreated(Admin adminClient, Collection<String> topicsToAwait, Duration timeout)
-            throws InterruptedException {
-        Set<String> lastMissingTopics = null;
-        while (!shutdown) {
-            try {
-                ListTopicsResult topics = adminClient.listTopics();
-                Set<String> existingTopics = topics.names().get(timeout.toMillis(), TimeUnit.MILLISECONDS);
-
-                if (existingTopics.containsAll(topicsToAwait)) {
-                    LOGGER.debug("All expected topics created: " + topicsToAwait);
-                    return;
-                } else {
-                    Set<String> missingTopics = new HashSet<>(topicsToAwait);
-                    missingTopics.removeAll(existingTopics);
-
-                    // Do not spam warnings - topics may take time to be created by an operator like Strimzi
-                    if (missingTopics.equals(lastMissingTopics)) {
-                        LOGGER.debug("Waiting for topic(s) to be created: " + missingTopics);
-                    } else {
-                        LOGGER.warn("Waiting for topic(s) to be created: " + missingTopics);
-                        lastMissingTopics = missingTopics;
-                    }
-                }
-            } catch (ExecutionException | TimeoutException e) {
-                LOGGER.error("Failed to get topic names from broker", e);
-            } finally {
-                Thread.sleep(1_000);
-            }
-        }
-    }
-
     private static Properties getAdminClientConfig(Properties properties) {
         Properties adminClientConfig = new Properties(properties);
         // include TLS config name if it has been configured

extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/KafkaStreamsRuntimeConfig.java

Lines changed: 7 additions & 5 deletions
@@ -4,7 +4,6 @@
 import java.time.Duration;
 import java.util.List;
 import java.util.Optional;
-import java.util.stream.Collectors;
 
 import io.quarkus.runtime.annotations.ConfigPhase;
 import io.quarkus.runtime.annotations.ConfigRoot;
@@ -47,6 +46,13 @@ public interface KafkaStreamsRuntimeConfig {
      */
     Optional<List<String>> topics();
 
+    /**
+     * A comma-separated list of topic name patterns.
+     * The pipeline will only be started once all these topics are present in the Kafka cluster
+     * and {@code ignore.topics} is set to false.
+     */
+    Optional<List<String>> topicPatterns();
+
     /**
      * Timeout to wait for topic names to be returned from admin client.
      * If set to 0 (or negative), {@code topics} check is ignored.
@@ -88,8 +94,4 @@ public interface KafkaStreamsRuntimeConfig {
      */
     SslConfig ssl();
 
-    default List<String> getTrimmedTopics() {
-        return topics().orElseThrow(() -> new IllegalArgumentException("Missing list of topics"))
-                .stream().map(String::trim).collect(Collectors.toList());
-    }
 }
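
To make the semantics of the new topicPatterns() option concrete: a pattern is treated as satisfied as soon as at least one existing topic name matches it, which is how the topology manager below evaluates patterns. A standalone, JDK-only sketch with made-up topic names (not code from this commit):

import java.util.Set;
import java.util.regex.Pattern;

public class TopicPatternCheckSketch {
    public static void main(String[] args) {
        Set<String> existingTopics = Set.of("orders-v1", "orders-v2", "payments");
        Pattern pattern = Pattern.compile("orders-.*");

        // The check treats a pattern as satisfied when any existing topic matches it.
        boolean satisfied = existingTopics.stream().anyMatch(pattern.asPredicate());
        System.out.println(pattern + " satisfied: " + satisfied); // prints: orders-.* satisfied: true
    }
}
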
extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/KafkaStreamsTopologyManager.java

Lines changed: 132 additions & 12 deletions
@@ -1,48 +1,168 @@
 package io.quarkus.kafka.streams.runtime;
 
 import java.time.Duration;
-import java.util.Collection;
+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.LinkedHashSet;
+import java.util.List;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.regex.Pattern;
 
 import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.clients.admin.ListTopicsResult;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.TopologyDescription;
 import org.jboss.logging.Logger;
 
 public class KafkaStreamsTopologyManager {
 
     private static final Logger LOGGER = Logger.getLogger(KafkaStreamsTopologyManager.class.getName());
 
     private final Admin adminClient;
+    private final List<String> sourceTopics;
+    private final List<Pattern> sourcePatterns;
+    private final Duration topicsTimeout;
 
-    public KafkaStreamsTopologyManager(Admin adminClient) {
+    private volatile boolean closed = false;
+
+    public KafkaStreamsTopologyManager(Admin adminClient, Topology topology, KafkaStreamsRuntimeConfig runtimeConfig) {
         this.adminClient = adminClient;
+        this.topicsTimeout = runtimeConfig.topicsTimeout();
+        if (isTopicsCheckEnabled()) {
+            if (runtimeConfig.topics().isEmpty() && runtimeConfig.topicPatterns().isEmpty()) {
+                Set<String> topics = new HashSet<>();
+                Set<Pattern> patterns = new HashSet<>();
+                extractSources(topology, topics, patterns);
+                this.sourceTopics = new ArrayList<>(topics);
+                this.sourcePatterns = new ArrayList<>(patterns);
+                LOGGER.infof("Kafka Streams will wait for topics: %s and topics matching patterns: %s to be created",
+                        sourceTopics, sourcePatterns);
+            } else {
+                this.sourceTopics = runtimeConfig.topics().orElse(Collections.emptyList());
+                this.sourcePatterns = runtimeConfig.topicPatterns().orElse(Collections.emptyList()).stream()
+                        .map(Pattern::compile)
+                        .toList();
+            }
+            if (sourceTopics.isEmpty() && sourcePatterns.isEmpty()) {
+                throw new IllegalArgumentException(
+                        "No topics or topic patterns specified; cannot wait for topics to be created, " +
+                                "in order to disable topics creation check set `quarkus.kafka-streams.topics-check-timeout=0`");
+            }
+        } else {
+            LOGGER.infof("Kafka Streams will not wait for topics to be created");
+            this.sourceTopics = Collections.emptyList();
+            this.sourcePatterns = Collections.emptyList();
+        }
+    }
+
+    public boolean isTopicsCheckEnabled() {
+        return topicsTimeout.compareTo(Duration.ZERO) > 0;
+    }
+
+    void close() {
+        this.closed = true;
+    }
+
+    public boolean isClosed() {
+        return closed;
+    }
+
+    // visible for testing
+    public static void extractSources(Topology topo, Set<String> topics, Set<Pattern> patterns) {
+        Set<String> sinkTopics = new HashSet<>();
+        TopologyDescription topologyDescription = topo.describe();
+        for (TopologyDescription.GlobalStore globalStore : topologyDescription.globalStores()) {
+            TopologyDescription.Source source = globalStore.source();
+            if (source.topicPattern() != null) {
+                patterns.add(source.topicPattern());
+            }
+            if (source.topicSet() != null) {
+                topics.addAll(source.topicSet());
+            }
+        }
+        for (TopologyDescription.Subtopology subtopology : topologyDescription.subtopologies()) {
+            for (TopologyDescription.Node node : subtopology.nodes()) {
+                if (node instanceof TopologyDescription.Sink sink) {
+                    if (sink.topic() != null) {
+                        sinkTopics.add(sink.topic());
+                    }
+                } else if (node instanceof TopologyDescription.Source source) {
+                    if (source.topicPattern() != null) {
+                        patterns.add(source.topicPattern());
+                    }
+                    if (source.topicSet() != null) {
+                        topics.addAll(source.topicSet());
+                    }
+                }
+            }
+        }
+        // remove topics used both in sinks and sources
+        topics.removeAll(sinkTopics);
+    }
+
+    public List<String> getSourceTopics() {
+        return sourceTopics;
     }
 
-    public Set<String> getMissingTopics(Collection<String> topicsToCheck) throws InterruptedException {
-        return getMissingTopics(topicsToCheck, Duration.ofSeconds(10)); // keep defaults
+    public List<Pattern> getSourcePatterns() {
+        return sourcePatterns;
     }
 
-    public Set<String> getMissingTopics(Collection<String> topicsToCheck, Duration timeout) throws InterruptedException {
-        Set<String> missing = new LinkedHashSet<>(topicsToCheck);
+    public Set<String> getMissingTopics() throws InterruptedException {
+        if (!isTopicsCheckEnabled()) {
+            return Collections.emptySet();
+        }
+
+        Set<String> missing = new LinkedHashSet<>(sourceTopics);
 
         try {
             ListTopicsResult topics = adminClient.listTopics();
-            Set<String> topicNames = topics.names().get(timeout.toMillis(), TimeUnit.MILLISECONDS);
+            Set<String> existingTopics = topics.names().get(topicsTimeout.toMillis(), TimeUnit.MILLISECONDS);
 
-            if (topicNames.containsAll(topicsToCheck)) {
-                return Collections.emptySet();
-            } else {
-                missing.removeAll(topicNames);
-            }
+            missing.removeAll(existingTopics);
+            missing.addAll(sourcePatterns.stream()
+                    .filter(p -> existingTopics.stream().noneMatch(p.asPredicate()))
+                    .map(Pattern::pattern)
+                    .toList());
         } catch (ExecutionException | TimeoutException e) {
             LOGGER.error("Failed to get topic names from broker", e);
         }
 
         return missing;
     }
+
+    public void waitForTopicsToBeCreated() throws InterruptedException {
+        Set<String> lastMissingTopics = null;
+        while (!closed) {
+            try {
+                ListTopicsResult topics = adminClient.listTopics();
+                Set<String> existingTopics = topics.names().get(topicsTimeout.toMillis(), TimeUnit.MILLISECONDS);
+
+                if (existingTopics.containsAll(sourceTopics)
+                        && sourcePatterns.stream().allMatch(p -> existingTopics.stream().anyMatch(p.asPredicate()))) {
+                    LOGGER.debugf("All expected topics %s and topics matching patterns %s ", sourceTopics, sourcePatterns);
+                    return;
+                } else {
+                    Set<String> missingTopics = new HashSet<>(sourceTopics);
+                    missingTopics.removeAll(existingTopics);
+
+                    // Do not spam warnings - topics may take time to be created by an operator like Strimzi
+                    if (missingTopics.equals(lastMissingTopics)) {
+                        LOGGER.debug("Waiting for topic(s) to be created: " + missingTopics);
+                    } else {
+                        LOGGER.warn("Waiting for topic(s) to be created: " + missingTopics);
+                        lastMissingTopics = missingTopics;
+                    }
+                }
+            } catch (ExecutionException | TimeoutException e) {
+                LOGGER.error("Failed to get topic names from broker", e);
+            } finally {
+                Thread.sleep(1_000);
+            }
+        }
+    }
 }
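
Because extractSources is public and marked "visible for testing", it can be exercised directly against a built topology. A minimal sketch with made-up topic names, assuming io.quarkus.kafka.streams.runtime.KafkaStreamsTopologyManager is on the classpath (this is not a test from the commit):

import java.util.HashSet;
import java.util.Set;
import java.util.regex.Pattern;

import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;

import io.quarkus.kafka.streams.runtime.KafkaStreamsTopologyManager;

public class ExtractSourcesSketch {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        // "orders" is a named source, "audit-.*" a pattern source; the .to(...) topics are sinks.
        builder.stream("orders").to("orders-enriched");
        builder.stream(Pattern.compile("audit-.*")).to("audit-sink");

        Topology topology = builder.build();
        Set<String> topics = new HashSet<>();
        Set<Pattern> patterns = new HashSet<>();
        KafkaStreamsTopologyManager.extractSources(topology, topics, patterns);

        System.out.println(topics);   // [orders]   (sink-only topics are excluded)
        System.out.println(patterns); // [audit-.*]
    }
}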
