Skip to content

Commit

Permalink
Scale KStream threads with pipeline steps (#406)
Browse files Browse the repository at this point in the history
* Add extension method to set number of KStream threads in pipeline config

* Add method to calc num KStream threads from pipeline size

* Set number of KStream threads per pipeline
  • Loading branch information
agrski authored Aug 21, 2022
1 parent 0103393 commit d3a6df7
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -136,9 +136,14 @@ class PipelineSubscriber(
}
}

val pipelineKafkaProperties = kafkaProperties.withAppId(
HashUtils.hashIfLong(metadata.name),
)
val pipelineKafkaProperties = kafkaProperties
.withAppId(
HashUtils.hashIfLong(metadata.name),
)
.withStreamThreads(
PipelineTopology.getNumThreadsFor(topology)
)

val streamsApp = KafkaStreams(builder.build(), pipelineKafkaProperties)

val pipelineTopology = PipelineTopology(metadata, topology, streamsApp)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,3 +61,12 @@ fun KafkaProperties.withAppId(name: String): KafkaProperties {

return properties
}

fun KafkaProperties.withStreamThreads(n: Int): KafkaProperties {
val properties = KafkaProperties()

properties.putAll(this.toMap())
this[StreamsConfig.NUM_STREAM_THREADS_CONFIG] = n

return properties
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ import io.klogging.noCoLogger
import org.apache.kafka.streams.KafkaStreams
import org.apache.kafka.streams.KafkaStreams.StateListener
import java.util.concurrent.CountDownLatch
import kotlin.math.floor
import kotlin.math.log2
import kotlin.math.max

typealias PipelineId = String

Expand Down Expand Up @@ -47,5 +50,11 @@ class PipelineTopology(

companion object {
private val logger = noCoLogger(PipelineTopology::class)

fun getNumThreadsFor(pipelineSteps: List<PipelineStep>): Int {
val numSteps = pipelineSteps.size.toFloat()
val scale = floor(log2(numSteps))
return max(1, scale.toInt())
}
}
}

0 comments on commit d3a6df7

Please sign in to comment.