Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion checkstyle/suppressions.xml
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@

<!-- Streams -->
<suppress checks="ClassFanOutComplexity"
files="(KafkaStreams|KStreamImpl|KTableImpl|StreamsPartitionAssignor).java"/>
files="(KafkaStreams|KStreamImpl|KTableImpl|InternalTopologyBuilder|StreamsPartitionAssignor).java"/>

<suppress checks="MethodLength"
files="KTableImpl.java"/>
Expand Down
92 changes: 39 additions & 53 deletions streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.StreamsNotStartedException;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;
import org.apache.kafka.streams.errors.TopologyException;
import org.apache.kafka.streams.errors.UnknownStateStoreException;
import org.apache.kafka.streams.errors.InvalidStateStorePartitionException;
import org.apache.kafka.streams.internals.metrics.ClientMetrics;
Expand All @@ -54,8 +53,7 @@
import org.apache.kafka.streams.processor.internals.ClientUtils;
import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier;
import org.apache.kafka.streams.processor.internals.GlobalStreamThread;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.processor.internals.ProcessorTopology;
import org.apache.kafka.streams.processor.internals.TopologyMetadata;
import org.apache.kafka.streams.processor.internals.StateDirectory;
import org.apache.kafka.streams.processor.internals.StreamThread;
import org.apache.kafka.streams.processor.internals.StreamsMetadataState;
Expand Down Expand Up @@ -159,15 +157,13 @@ public class KafkaStreams implements AutoCloseable {
private final ScheduledExecutorService rocksDBMetricsRecordingService;
private final Admin adminClient;
private final StreamsMetricsImpl streamsMetrics;
private final ProcessorTopology taskTopology;
private final ProcessorTopology globalTaskTopology;
private final long totalCacheSize;
private final StreamStateListener streamStateListener;
private final StateRestoreListener delegatingStateRestoreListener;
private final Map<Long, StreamThread.State> threadState;
private final UUID processId;
private final KafkaClientSupplier clientSupplier;
private final InternalTopologyBuilder internalTopologyBuilder;
protected final TopologyMetadata topologyMetadata;
private final QueryableStoreProvider queryableStoreProvider;

GlobalStreamThread globalStreamThread;
Expand Down Expand Up @@ -690,7 +686,7 @@ public void onRestoreEnd(final TopicPartition topicPartition, final String store
*/
public KafkaStreams(final Topology topology,
final Properties props) {
this(topology.internalTopologyBuilder, new StreamsConfig(props), new DefaultKafkaClientSupplier());
this(topology, new StreamsConfig(props), new DefaultKafkaClientSupplier());
}

/**
Expand All @@ -708,7 +704,7 @@ public KafkaStreams(final Topology topology,
public KafkaStreams(final Topology topology,
final Properties props,
final KafkaClientSupplier clientSupplier) {
this(topology.internalTopologyBuilder, new StreamsConfig(props), clientSupplier, Time.SYSTEM);
this(topology, new StreamsConfig(props), clientSupplier, Time.SYSTEM);
}

/**
Expand All @@ -725,7 +721,7 @@ public KafkaStreams(final Topology topology,
public KafkaStreams(final Topology topology,
final Properties props,
final Time time) {
this(topology.internalTopologyBuilder, new StreamsConfig(props), new DefaultKafkaClientSupplier(), time);
this(topology, new StreamsConfig(props), new DefaultKafkaClientSupplier(), time);
}

/**
Expand All @@ -745,7 +741,7 @@ public KafkaStreams(final Topology topology,
final Properties props,
final KafkaClientSupplier clientSupplier,
final Time time) {
this(topology.internalTopologyBuilder, new StreamsConfig(props), clientSupplier, time);
this(topology, new StreamsConfig(props), clientSupplier, time);
}

/**
Expand Down Expand Up @@ -778,7 +774,7 @@ public KafkaStreams(final Topology topology,
public KafkaStreams(final Topology topology,
final StreamsConfig config,
final KafkaClientSupplier clientSupplier) {
this(topology.internalTopologyBuilder, config, clientSupplier);
this(new TopologyMetadata(topology.internalTopologyBuilder, config), config, clientSupplier);
}

/**
Expand All @@ -795,41 +791,41 @@ public KafkaStreams(final Topology topology,
public KafkaStreams(final Topology topology,
final StreamsConfig config,
final Time time) {
this(topology.internalTopologyBuilder, config, new DefaultKafkaClientSupplier(), time);
this(new TopologyMetadata(topology.internalTopologyBuilder, config), config, new DefaultKafkaClientSupplier(), time);
}

private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
private KafkaStreams(final Topology topology,
final StreamsConfig config,
final KafkaClientSupplier clientSupplier) throws StreamsException {
this(internalTopologyBuilder, config, clientSupplier, Time.SYSTEM);
final KafkaClientSupplier clientSupplier,
final Time time) throws StreamsException {
this(new TopologyMetadata(topology.internalTopologyBuilder, config), config, clientSupplier, time);
}

protected KafkaStreams(final TopologyMetadata topologyMetadata,
final StreamsConfig config,
final KafkaClientSupplier clientSupplier) throws StreamsException {
this(topologyMetadata, config, clientSupplier, Time.SYSTEM);
}

private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
private KafkaStreams(final TopologyMetadata topologyMetadata,
final StreamsConfig config,
final KafkaClientSupplier clientSupplier,
final Time time) throws StreamsException {
this.config = config;
this.time = time;

this.internalTopologyBuilder = internalTopologyBuilder;
internalTopologyBuilder.rewriteTopology(config);
this.topologyMetadata = topologyMetadata;
this.topologyMetadata.buildAndRewriteTopology();

// sanity check to fail-fast in case we cannot build a ProcessorTopology due to an exception
taskTopology = internalTopologyBuilder.buildTopology();
globalTaskTopology = internalTopologyBuilder.buildGlobalStateTopology();

final boolean hasGlobalTopology = globalTaskTopology != null;
final boolean hasPersistentStores = taskTopology.hasPersistentLocalStore() ||
(hasGlobalTopology && globalTaskTopology.hasPersistentGlobalStore());
final boolean hasGlobalTopology = topologyMetadata.hasGlobalTopology();

try {
stateDirectory = new StateDirectory(config, time, hasPersistentStores, internalTopologyBuilder.hasNamedTopologies());
stateDirectory = new StateDirectory(config, time, topologyMetadata.hasPersistentStores(), topologyMetadata.hasNamedTopologies());
processId = stateDirectory.initializeProcessId();
} catch (final ProcessorStateException fatal) {
throw new StreamsException(fatal);
}


// The application ID is a required config and hence should always have value
final String userClientId = config.getString(StreamsConfig.CLIENT_ID_CONFIG);
final String applicationId = config.getString(StreamsConfig.APPLICATION_ID_CONFIG);
Expand Down Expand Up @@ -859,27 +855,27 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
ClientMetrics.addVersionMetric(streamsMetrics);
ClientMetrics.addCommitIdMetric(streamsMetrics);
ClientMetrics.addApplicationIdMetric(streamsMetrics, config.getString(StreamsConfig.APPLICATION_ID_CONFIG));
ClientMetrics.addTopologyDescriptionMetric(streamsMetrics, internalTopologyBuilder.describe().toString());
ClientMetrics.addTopologyDescriptionMetric(streamsMetrics, this.topologyMetadata.topologyDescriptionString());
ClientMetrics.addStateMetric(streamsMetrics, (metricsConfig, now) -> state);
ClientMetrics.addNumAliveStreamThreadMetric(streamsMetrics, (metricsConfig, now) -> getNumLiveStreamThreads());

streamsMetadataState = new StreamsMetadataState(
internalTopologyBuilder,
this.topologyMetadata,
parseHostInfo(config.getString(StreamsConfig.APPLICATION_SERVER_CONFIG)));

oldHandler = false;
streamsUncaughtExceptionHandler = this::defaultStreamsUncaughtExceptionHandler;
delegatingStateRestoreListener = new DelegatingStateRestoreListener();

totalCacheSize = config.getLong(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG);
final int numStreamThreads = getNumStreamThreads(hasGlobalTopology);
final int numStreamThreads = topologyMetadata.getNumStreamThreads(config);
final long cacheSizePerThread = getCacheSizePerThread(numStreamThreads);

GlobalStreamThread.State globalThreadState = null;
if (hasGlobalTopology) {
final String globalThreadId = clientId + "-GlobalStreamThread";
globalStreamThread = new GlobalStreamThread(
globalTaskTopology,
topologyMetadata.globalTaskTopology(),
config,
clientSupplier.getGlobalConsumer(config.getGlobalConsumerConfigs(clientId)),
stateDirectory,
Expand All @@ -897,7 +893,7 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
threadState = new HashMap<>(numStreamThreads);
streamStateListener = new StreamStateListener(threadState, globalThreadState);

final GlobalStateStoreProvider globalStateStoreProvider = new GlobalStateStoreProvider(internalTopologyBuilder.globalStateStores());
final GlobalStateStoreProvider globalStateStoreProvider = new GlobalStateStoreProvider(this.topologyMetadata.globalStateStores());

if (hasGlobalTopology) {
globalStreamThread.setStateListener(streamStateListener);
Expand All @@ -914,7 +910,7 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,

private StreamThread createAndAddStreamThread(final long cacheSizePerThread, final int threadIdx) {
final StreamThread streamThread = StreamThread.create(
internalTopologyBuilder,
topologyMetadata,
config,
clientSupplier,
adminClient,
Expand Down Expand Up @@ -953,23 +949,6 @@ private static Metrics getMetrics(final StreamsConfig config, final Time time, f
return new Metrics(metricConfig, reporters, time, metricsContext);
}

private int getNumStreamThreads(final boolean hasGlobalTopology) {
final int numStreamThreads;
if (internalTopologyBuilder.hasNoNonGlobalTopology()) {
log.info("Overriding number of StreamThreads to zero for global-only topology");
numStreamThreads = 0;
} else {
numStreamThreads = config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG);
}

if (numStreamThreads == 0 && !hasGlobalTopology) {
log.error("Topology with no input topics will create no stream threads and no global thread.");
throw new TopologyException("Topology has no stream threads and no global threads, " +
"must subscribe to at least one source topic or global table.");
}
return numStreamThreads;
}

/**
* Adds and starts a stream thread in addition to the stream threads that are already running in this
* Kafka Streams client.
Expand Down Expand Up @@ -1195,7 +1174,7 @@ private long getCacheSizePerThread(final int numStreamThreads) {
if (numStreamThreads == 0) {
return totalCacheSize;
}
return totalCacheSize / (numStreamThreads + ((globalTaskTopology != null) ? 1 : 0));
return totalCacheSize / (numStreamThreads + (topologyMetadata.hasGlobalTopology() ? 1 : 0));
}

private void resizeThreadCache(final long cacheSizePerThread) {
Expand Down Expand Up @@ -1284,6 +1263,14 @@ public synchronized void start() throws IllegalStateException, StreamsException
} else {
throw new IllegalStateException("The client is either already started or already stopped, cannot re-start");
}

if (topologyMetadata.isEmpty()) {
if (setState(State.RUNNING)) {
log.debug("Transitioning directly to RUNNING for app with no named topologies");
} else {
throw new IllegalStateException("Unexpected error in transitioning KafkaStreams with empty processing topology to RUNNING");
}
}
}

/**
Expand Down Expand Up @@ -1575,8 +1562,7 @@ public <K> KeyQueryMetadata queryMetadataForKey(final String storeName,
public <T> T store(final StoreQueryParameters<T> storeQueryParameters) {
validateIsRunningOrRebalancing();
final String storeName = storeQueryParameters.storeName();
if ((taskTopology == null || !taskTopology.hasStore(storeName))
&& (globalTaskTopology == null || !globalTaskTopology.hasStore(storeName))) {
if (!topologyMetadata.hasStore(storeName)) {
throw new UnknownStateStoreException(
"Cannot get state store " + storeName + " because no such store is registered in the topology."
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ public class TaskId implements Comparable<TaskId> {

private static final Logger LOG = LoggerFactory.getLogger(TaskId.class);

public static final String NAMED_TOPOLOGY_DELIMITER = "__";

/** The ID of the subtopology, aka topicGroupId. */
@Deprecated
public final int topicGroupId;
Expand Down Expand Up @@ -80,30 +82,30 @@ public String namedTopology() {

@Override
public String toString() {
return namedTopology != null ? namedTopology + "_" + topicGroupId + "_" + partition : topicGroupId + "_" + partition;
return namedTopology != null ? namedTopology + NAMED_TOPOLOGY_DELIMITER + topicGroupId + "_" + partition : topicGroupId + "_" + partition;
}

/**
* @throws TaskIdFormatException if the taskIdStr is not a valid {@link TaskId}
*/
public static TaskId parse(final String taskIdStr) {
final int firstIndex = taskIdStr.indexOf('_');
final int secondIndex = taskIdStr.indexOf('_', firstIndex + 1);
if (firstIndex <= 0 || firstIndex + 1 >= taskIdStr.length()) {
throw new TaskIdFormatException(taskIdStr);
}

try {
// If only one copy of '_' exists, there is no named topology in the string
if (secondIndex < 0) {
final int topicGroupId = Integer.parseInt(taskIdStr.substring(0, firstIndex));
final int partition = Integer.parseInt(taskIdStr.substring(firstIndex + 1));
final int namedTopologyDelimiterIndex = taskIdStr.indexOf(NAMED_TOPOLOGY_DELIMITER);
// If there is no copy of the NamedTopology delimiter, this task has no named topology and only one `_` char
if (namedTopologyDelimiterIndex < 0) {
final int index = taskIdStr.indexOf('_');

final int topicGroupId = Integer.parseInt(taskIdStr.substring(0, index));
final int partition = Integer.parseInt(taskIdStr.substring(index + 1));

return new TaskId(topicGroupId, partition);
} else {
final String namedTopology = taskIdStr.substring(0, firstIndex);
final int topicGroupId = Integer.parseInt(taskIdStr.substring(firstIndex + 1, secondIndex));
final int partition = Integer.parseInt(taskIdStr.substring(secondIndex + 1));
final int topicGroupIdIndex = namedTopologyDelimiterIndex + 2;
final int subtopologyPartitionDelimiterIndex = taskIdStr.indexOf('_', topicGroupIdIndex);

final String namedTopology = taskIdStr.substring(0, namedTopologyDelimiterIndex);
final int topicGroupId = Integer.parseInt(taskIdStr.substring(topicGroupIdIndex, subtopologyPartitionDelimiterIndex));
final int partition = Integer.parseInt(taskIdStr.substring(subtopologyPartitionDelimiterIndex + 1));

return new TaskId(topicGroupId, partition, namedTopology);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
import static org.apache.kafka.streams.processor.internals.StreamThread.ProcessingMode.EXACTLY_ONCE_V2;

class ActiveTaskCreator {
private final InternalTopologyBuilder builder;
private final TopologyMetadata topologyMetadata;
private final StreamsConfig config;
private final StreamsMetricsImpl streamsMetrics;
private final StateDirectory stateDirectory;
Expand All @@ -64,7 +64,7 @@ class ActiveTaskCreator {
private final Map<TaskId, StreamsProducer> taskProducers;
private final StreamThread.ProcessingMode processingMode;

ActiveTaskCreator(final InternalTopologyBuilder builder,
ActiveTaskCreator(final TopologyMetadata topologyMetadata,
final StreamsConfig config,
final StreamsMetricsImpl streamsMetrics,
final StateDirectory stateDirectory,
Expand All @@ -75,7 +75,7 @@ class ActiveTaskCreator {
final String threadId,
final UUID processId,
final Logger log) {
this.builder = builder;
this.topologyMetadata = topologyMetadata;
this.config = config;
this.streamsMetrics = streamsMetrics;
this.stateDirectory = stateDirectory;
Expand Down Expand Up @@ -143,7 +143,7 @@ Collection<Task> createTasks(final Consumer<byte[], byte[]> consumer,

final LogContext logContext = getLogContext(taskId);

final ProcessorTopology topology = builder.buildSubtopology(taskId.subtopology());
final ProcessorTopology topology = topologyMetadata.buildSubtopology(taskId);

final ProcessorStateManager stateManager = new ProcessorStateManager(
taskId,
Expand Down Expand Up @@ -194,7 +194,7 @@ StreamTask createActiveTaskFromStandby(final StandbyTask standbyTask,
inputPartitions,
consumer,
logContext,
builder.buildSubtopology(standbyTask.id.subtopology()),
topologyMetadata.buildSubtopology(standbyTask.id),
stateManager,
context
);
Expand Down
Loading