InternalStreamsBuilder.java

@@ -314,16 +314,17 @@ private void mergeDuplicateSourceNodes() {
         if (graphNode instanceof StreamSourceNode) {
             final StreamSourceNode<?, ?> currentSourceNode = (StreamSourceNode<?, ?>) graphNode;

-            if (currentSourceNode.topicPattern() != null) {
-                if (!patternsToSourceNodes.containsKey(currentSourceNode.topicPattern())) {
-                    patternsToSourceNodes.put(currentSourceNode.topicPattern(), currentSourceNode);
+            if (currentSourceNode.topicPattern().isPresent()) {
+                final Pattern topicPattern = currentSourceNode.topicPattern().get();
+                if (!patternsToSourceNodes.containsKey(topicPattern)) {
+                    patternsToSourceNodes.put(topicPattern, currentSourceNode);
                 } else {
-                    final StreamSourceNode<?, ?> mainSourceNode = patternsToSourceNodes.get(currentSourceNode.topicPattern());
+                    final StreamSourceNode<?, ?> mainSourceNode = patternsToSourceNodes.get(topicPattern);
                     mainSourceNode.merge(currentSourceNode);
                     root.removeChild(graphNode);
                 }
             } else {
-                for (final String topic : currentSourceNode.topicNames()) {
+                for (final String topic : currentSourceNode.topicNames().get()) {
                     if (!topicsToSourceNodes.containsKey(topic)) {
                         topicsToSourceNodes.put(topic, currentSourceNode);
                     } else {
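Note on the merge keyed by `Pattern`: `java.util.regex.Pattern` does not override `equals`/`hashCode`, so the map above compares keys by instance identity; two `Pattern` objects compiled from the same regex would not collide. A minimal, self-contained sketch of the same dedup-by-key idea, using hypothetical stand-in types (`Source` and `dedupByPattern` are not Kafka classes):

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.regex.Pattern;

public final class DedupByPatternSketch {

    // Hypothetical stand-in for StreamSourceNode: a source subscribes either
    // by pattern or by concrete topic names, never both.
    static final class Source {
        private final Pattern pattern;  // nullable when subscribed by topic names

        Source(final Pattern pattern) {
            this.pattern = pattern;
        }

        Optional<Pattern> topicPattern() {
            return Optional.ofNullable(pattern);
        }
    }

    // Keep the first source seen per pattern; later duplicates are dropped,
    // mirroring the keep-and-merge step in mergeDuplicateSourceNodes().
    // Pattern uses identity equality, so only the *same* instance dedups.
    static Map<Pattern, Source> dedupByPattern(final List<Source> sources) {
        final Map<Pattern, Source> byPattern = new HashMap<>();
        for (final Source source : sources) {
            source.topicPattern().ifPresent(p -> byPattern.putIfAbsent(p, source));
        }
        return byPattern;
    }
}
```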
SourceGraphNode.java

@@ -19,6 +19,7 @@
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
+import java.util.Optional;
 import java.util.Set;
 import java.util.regex.Pattern;
 import org.apache.kafka.common.serialization.Serde;
@@ -51,12 +52,12 @@ public SourceGraphNode(final String nodeName,
         this.consumedInternal = consumedInternal;
     }

-    public Set<String> topicNames() {
-        return Collections.unmodifiableSet(topicNames);
+    public Optional<Set<String>> topicNames() {
+        return topicNames == null ? Optional.empty() : Optional.of(Collections.unmodifiableSet(topicNames));
     }

-    public Pattern topicPattern() {
-        return topicPattern;
+    public Optional<Pattern> topicPattern() {
+        return Optional.ofNullable(topicPattern);
     }

     public ConsumedInternal<K, V> consumedInternal() {
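With both accessors now returning `Optional`, call sites unwrap explicitly instead of null-checking. A hedged usage sketch (`describeInput` is a hypothetical helper, not part of this PR):

```java
import java.util.Set;
import java.util.regex.Pattern;

// Hypothetical helper showing how a caller consumes the Optional-returning
// accessors; `node` is any SourceGraphNode<?, ?>.
static String describeInput(final SourceGraphNode<?, ?> node) {
    return node.topicPattern()
               .map(Pattern::pattern)              // subscribed by pattern
               .orElseGet(() -> node.topicNames()  // otherwise by explicit names
                                    .map(Set::toString)
                                    .orElse("<no input>"));
}
```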
StreamSourceNode.java

@@ -63,30 +63,29 @@ public void merge(final StreamSourceNode<?, ?> other) {
     @Override
     public String toString() {
         return "StreamSourceNode{" +
-               "topicNames=" + topicNames() +
-               ", topicPattern=" + topicPattern() +
+               "topicNames=" + (topicNames().isPresent() ? topicNames().get() : null) +
+               ", topicPattern=" + (topicPattern().isPresent() ? topicPattern().get() : null) +
                ", consumedInternal=" + consumedInternal() +
                "} " + super.toString();
     }

     @Override
     public void writeToTopology(final InternalTopologyBuilder topologyBuilder, final Properties props) {

-        if (topicPattern() != null) {
+        if (topicPattern().isPresent()) {
             topologyBuilder.addSource(consumedInternal().offsetResetPolicy(),
                                       nodeName(),
                                       consumedInternal().timestampExtractor(),
                                       consumedInternal().keyDeserializer(),
                                       consumedInternal().valueDeserializer(),
-                                      topicPattern());
+                                      topicPattern().get());
         } else {
             topologyBuilder.addSource(consumedInternal().offsetResetPolicy(),
                                       nodeName(),
                                       consumedInternal().timestampExtractor(),
                                       consumedInternal().keyDeserializer(),
                                       consumedInternal().valueDeserializer(),
-                                      topicNames().toArray(new String[0]));
-
+                                      topicNames().get().toArray(new String[0]));
         }
     }

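Aside: the `isPresent()`/`get()` ternaries in `toString()` behave identically to `Optional#orElse(null)`, since string concatenation renders a null reference as "null" either way. A small runnable sketch of that equivalence (class name hypothetical):

```java
import java.util.Optional;

public final class OrElseNullSketch {
    public static void main(final String[] args) {
        final Optional<String> present = Optional.of("topic-a");
        final Optional<String> empty = Optional.empty();

        // isPresent()/get() ternary, as written in the diff above:
        System.out.println("topicNames=" + (present.isPresent() ? present.get() : null));  // topic-a
        // Equivalent orElse(null) form:
        System.out.println("topicNames=" + empty.orElse(null));                            // null
    }
}
```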
TableSourceNode.java

@@ -17,6 +17,7 @@

 package org.apache.kafka.streams.kstream.internals.graph;

+import java.util.Iterator;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.kstream.internals.ConsumedInternal;
 import org.apache.kafka.streams.kstream.internals.KTableSource;
@@ -83,7 +84,16 @@ public static <K, V> TableSourceNodeBuilder<K, V> tableSourceNodeBuilder() {
     @Override
     @SuppressWarnings("unchecked")
     public void writeToTopology(final InternalTopologyBuilder topologyBuilder, final Properties props) {
-        final String topicName = topicNames().iterator().next();
+        final String topicName;
+        if (topicNames().isPresent()) {
+            final Iterator<String> topicNames = topicNames().get().iterator();
+            topicName = topicNames.next();
+            if (topicNames.hasNext()) {
+                throw new IllegalStateException("A table source node must have a single topic as input");
+            }
+        } else {
+            throw new IllegalStateException("A table source node must have a single topic as input");
+        }

         // TODO: we assume source KTables can only be timestamped-key-value stores for now.
         //       should be expanded for other types of stores as well.
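The new guard makes the single-topic invariant explicit: an empty `Optional` and a set with more than one topic both fail. A compact, self-contained sketch of the same check (`requireSingleTopic` is hypothetical):

```java
import java.util.Optional;
import java.util.Set;

public final class SingleTopicGuardSketch {

    // Returns the sole topic, or throws when there are zero or several --
    // the invariant writeToTopology() enforces above.
    static String requireSingleTopic(final Optional<Set<String>> topicNames) {
        final Set<String> names = topicNames.orElseThrow(() ->
                new IllegalStateException("A table source node must have a single topic as input"));
        if (names.size() != 1) {
            throw new IllegalStateException("A table source node must have a single topic as input");
        }
        return names.iterator().next();
    }

    public static void main(final String[] args) {
        System.out.println(requireSingleTopic(Optional.of(Set.of("users-topic"))));  // users-topic
    }
}
```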