/**
 * Returns a copy of these properties with the Kafka Streams thread count
 * ([StreamsConfig.NUM_STREAM_THREADS_CONFIG]) set to [n].
 *
 * The receiver is not modified; every existing entry is copied into a fresh
 * [KafkaProperties] instance and the thread count is written to that copy only.
 *
 * @param n value stored under [StreamsConfig.NUM_STREAM_THREADS_CONFIG].
 * @return a new [KafkaProperties] holding the receiver's entries plus the thread count.
 */
fun KafkaProperties.withStreamThreads(n: Int): KafkaProperties {
    val properties = KafkaProperties()

    properties.putAll(this.toMap())
    // Write to the copy, not to `this`: the receiver is shared between pipelines,
    // so mutating it would leak one pipeline's thread count into all the others
    // while the returned copy would not carry the setting at all.
    properties[StreamsConfig.NUM_STREAM_THREADS_CONFIG] = n

    return properties
}
/**
 * Chooses how many Kafka Streams threads to run for a pipeline.
 *
 * The count scales logarithmically with the number of steps:
 * `floor(log2(steps))`, never less than 1.
 *
 * @param pipelineSteps the steps making up the pipeline; only the list's size is used.
 * @return the number of stream threads, always at least 1.
 */
fun getNumThreadsFor(pipelineSteps: List<*>): Int {
    val numSteps = pipelineSteps.size
    // log2(0) is -Infinity and log2(1) is 0; both clamp to the minimum of one thread,
    // so answer directly instead of relying on float-to-int saturation.
    if (numSteps < 2) return 1

    val scale = floor(log2(numSteps.toFloat()))
    return max(1, scale.toInt())
}