diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
index 1527263af5cef..fbfd9e173a44b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
@@ -314,16 +314,17 @@ private void mergeDuplicateSourceNodes() {
             if (graphNode instanceof StreamSourceNode) {
                 final StreamSourceNode currentSourceNode = (StreamSourceNode) graphNode;
-                if (currentSourceNode.topicPattern() != null) {
-                    if (!patternsToSourceNodes.containsKey(currentSourceNode.topicPattern())) {
-                        patternsToSourceNodes.put(currentSourceNode.topicPattern(), currentSourceNode);
+                if (currentSourceNode.topicPattern().isPresent()) {
+                    final Pattern topicPattern = currentSourceNode.topicPattern().get();
+                    if (!patternsToSourceNodes.containsKey(topicPattern)) {
+                        patternsToSourceNodes.put(topicPattern, currentSourceNode);
                     } else {
-                        final StreamSourceNode mainSourceNode = patternsToSourceNodes.get(currentSourceNode.topicPattern());
+                        final StreamSourceNode mainSourceNode = patternsToSourceNodes.get(topicPattern);
                         mainSourceNode.merge(currentSourceNode);
                         root.removeChild(graphNode);
                     }
                 } else {
-                    for (final String topic : currentSourceNode.topicNames()) {
+                    for (final String topic : currentSourceNode.topicNames().get()) {
                         if (!topicsToSourceNodes.containsKey(topic)) {
                             topicsToSourceNodes.put(topic, currentSourceNode);
                         } else {
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/SourceGraphNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/SourceGraphNode.java
index affce1bc0e68d..05052279b5721 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/SourceGraphNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/SourceGraphNode.java
@@ -19,6 +19,7 @@
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
+import java.util.Optional;
 import java.util.Set;
 import java.util.regex.Pattern;
 import org.apache.kafka.common.serialization.Serde;
@@ -51,12 +52,12 @@ public SourceGraphNode(final String nodeName,
         this.consumedInternal = consumedInternal;
     }
 
-    public Set<String> topicNames() {
-        return Collections.unmodifiableSet(topicNames);
+    public Optional<Set<String>> topicNames() {
+        return topicNames == null ? Optional.empty() : Optional.of(Collections.unmodifiableSet(topicNames));
     }
 
-    public Pattern topicPattern() {
-        return topicPattern;
+    public Optional<Pattern> topicPattern() {
+        return Optional.ofNullable(topicPattern);
     }
 
     public ConsumedInternal<K, V> consumedInternal() {
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamSourceNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamSourceNode.java
index d4adc894de5e0..e68a9c6039b4a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamSourceNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamSourceNode.java
@@ -63,8 +63,8 @@ public void merge(final StreamSourceNode other) {
     @Override
     public String toString() {
         return "StreamSourceNode{" +
-               "topicNames=" + topicNames() +
-               ", topicPattern=" + topicPattern() +
+               "topicNames=" + (topicNames().isPresent() ? topicNames().get() : null) +
+               ", topicPattern=" + (topicPattern().isPresent() ? topicPattern().get() : null) +
                ", consumedInternal=" + consumedInternal() +
                "} " + super.toString();
     }
@@ -72,21 +72,20 @@ public String toString() {
 
     @Override
     public void writeToTopology(final InternalTopologyBuilder topologyBuilder, final Properties props) {
-        if (topicPattern() != null) {
+        if (topicPattern().isPresent()) {
             topologyBuilder.addSource(consumedInternal().offsetResetPolicy(),
                                       nodeName(),
                                       consumedInternal().timestampExtractor(),
                                       consumedInternal().keyDeserializer(),
                                       consumedInternal().valueDeserializer(),
-                                      topicPattern());
+                                      topicPattern().get());
         } else {
             topologyBuilder.addSource(consumedInternal().offsetResetPolicy(),
                                       nodeName(),
                                       consumedInternal().timestampExtractor(),
                                       consumedInternal().keyDeserializer(),
                                       consumedInternal().valueDeserializer(),
-                                      topicNames().toArray(new String[0]));
-
+                                      topicNames().get().toArray(new String[0]));
         }
     }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableSourceNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableSourceNode.java
index b708961073401..77ab7ad6b103a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableSourceNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableSourceNode.java
@@ -17,6 +17,7 @@
 
 package org.apache.kafka.streams.kstream.internals.graph;
 
+import java.util.Iterator;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.kstream.internals.ConsumedInternal;
 import org.apache.kafka.streams.kstream.internals.KTableSource;
@@ -83,7 +84,16 @@ public static TableSourceNodeBuilder tableSourceNodeBuilder() {
     @Override
     @SuppressWarnings("unchecked")
     public void writeToTopology(final InternalTopologyBuilder topologyBuilder, final Properties props) {
-        final String topicName = topicNames().iterator().next();
+        final String topicName;
+        if (topicNames().isPresent()) {
+            final Iterator<String> topicNames = topicNames().get().iterator();
+            topicName = topicNames.next();
+            if (topicNames.hasNext()) {
+                throw new IllegalStateException("A table source node must have a single topic as input");
+            }
+        } else {
+            throw new IllegalStateException("A table source node must have a single topic as input");
+        }
 
         // TODO: we assume source KTables can only be timestamped-key-value stores for now.
         // should be expanded for other types of stores as well.