diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index 88e599a92cd7..23769452d979 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -369,10 +369,15 @@ public static StreamThread create(final TopologyMetadata topologyMetadata, final Runnable shutdownErrorHook, final BiConsumer streamsUncaughtExceptionHandler) { + final boolean stateUpdaterEnabled = InternalConfig.stateUpdaterEnabled(config.originals()); + final String threadId = clientId + THREAD_ID_SUBSTRING + threadIdx; + final String stateUpdaterId = threadId.replace(THREAD_ID_SUBSTRING, STATE_UPDATER_ID_SUBSTRING); + final String effectiveThreadId = stateUpdaterEnabled ? stateUpdaterId : threadId; final String logPrefix = String.format("stream-thread [%s] ", threadId); final LogContext logContext = new LogContext(logPrefix); + final LogContext effectiveLogContext = stateUpdaterEnabled ? new LogContext(String.format("state-updater [%s] ", effectiveThreadId)): logContext ; final Logger log = logContext.logger(StreamThread.class); final ReferenceContainer referenceContainer = new ReferenceContainer(); @@ -382,13 +387,13 @@ public static StreamThread create(final TopologyMetadata topologyMetadata, referenceContainer.clientTags = config.getClientTags(); log.info("Creating restore consumer client"); - final Map restoreConsumerConfigs = config.getRestoreConsumerConfigs(restoreConsumerClientId(threadId)); + final Map restoreConsumerConfigs = config.getRestoreConsumerConfigs(restoreConsumerClientId(effectiveThreadId)); final Consumer restoreConsumer = clientSupplier.getRestoreConsumer(restoreConsumerConfigs); final StoreChangelogReader changelogReader = new StoreChangelogReader( time, config, - logContext, + effectiveLogContext, adminClient, restoreConsumer, userStateRestoreListener, @@ -397,7 +402,6 @@ public static StreamThread create(final TopologyMetadata topologyMetadata, final ThreadCache cache = new ThreadCache(logContext, cacheSizeBytes, streamsMetrics); - final boolean stateUpdaterEnabled = InternalConfig.stateUpdaterEnabled(config.originals()); final boolean proceessingThreadsEnabled = InternalConfig.processingThreadsEnabled(config.originals()); final ActiveTaskCreator activeTaskCreator = new ActiveTaskCreator( topologyMetadata, @@ -475,7 +479,6 @@ public static StreamThread create(final TopologyMetadata topologyMetadata, taskManager.setMainConsumer(mainConsumer); referenceContainer.mainConsumer = mainConsumer; - final String stateUpdaterId = threadId.replace(THREAD_ID_SUBSTRING, STATE_UPDATER_ID_SUBSTRING); final StreamsThreadMetricsDelegatingReporter reporter = new StreamsThreadMetricsDelegatingReporter(mainConsumer, threadId, stateUpdaterId); streamsMetrics.metricsRegistry().addReporter(reporter);