diff --git a/docs/structured-streaming-kafka-integration.md b/docs/structured-streaming-kafka-integration.md
index 339792b4139f3..c4378b4a02663 100644
--- a/docs/structured-streaming-kafka-integration.md
+++ b/docs/structured-streaming-kafka-integration.md
@@ -430,20 +430,75 @@ The following configurations are optional:
 ### Consumer Caching
 It's time-consuming to initialize Kafka consumers, especially in streaming scenarios where processing time is a key factor.
-Because of this, Spark caches Kafka consumers on executors. The caching key is built up from the following information:
+Because of this, Spark pools Kafka consumers on executors, by leveraging Apache Commons Pool.
+
+The caching key is built up from the following information:
+
 * Topic name
 * Topic partition
 * Group ID
-The size of the cache is limited by spark.kafka.consumer.cache.capacity (default: 64).
-If this threshold is reached, it tries to remove the least-used entry that is currently not in use.
-If it cannot be removed, then the cache will keep growing. In the worst case, the cache will grow to
-the max number of concurrent tasks that can run in the executor (that is, number of tasks slots),
-after which it will never reduce.
+The following properties are available to configure the consumer pool:
+
+| Property Name | Default | Meaning |
+| --- | --- | --- |
+| spark.kafka.consumer.cache.capacity | 64 | The maximum number of consumers cached. Note that this is a soft limit. |
+| spark.kafka.consumer.cache.timeout | 5m (5 minutes) | The minimum amount of time a consumer may sit idle in the pool before it is eligible for eviction by the evictor. |
+| spark.kafka.consumer.cache.evictorThreadRunInterval | 1m (1 minute) | The interval of time between runs of the idle evictor thread for the consumer pool. When non-positive, no idle evictor thread will be run. |
+| spark.kafka.consumer.cache.jmx.enable | false | Enable or disable JMX for pools created with this configuration instance. Statistics of the pool are available via JMX instance. The prefix of JMX name is set to "kafka010-cached-simple-kafka-consumer-pool". |
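As an illustration of how the consumer pool properties might be supplied, the sketch below renders them as `--conf key=value` arguments for `spark-submit`. The property names are taken from the table above; the values and the helper name `to_submit_args` are hypothetical and chosen only for the example.

```python
# Property names come from the consumer pool table above.
# The values are illustrative assumptions, not recommendations.
consumer_pool_conf = {
    "spark.kafka.consumer.cache.capacity": "128",
    "spark.kafka.consumer.cache.timeout": "10m",
    "spark.kafka.consumer.cache.evictorThreadRunInterval": "2m",
    "spark.kafka.consumer.cache.jmx.enable": "true",
}


def to_submit_args(conf):
    """Render a dict of Spark properties as spark-submit '--conf' arguments.

    Hypothetical helper for this example; keys are sorted so the output
    order is deterministic.
    """
    args = []
    for key in sorted(conf):
        args += ["--conf", f"{key}={conf[key]}"]
    return args


if __name__ == "__main__":
    print(" ".join(to_submit_args(consumer_pool_conf)))
```

The same key/value pairs could presumably also be set on the session's Spark configuration; the command-line form is shown only because it needs no running cluster.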
+
+The size of the pool is limited by spark.kafka.consumer.cache.capacity,
+but it works as a "soft limit" so that Spark tasks are not blocked.
+
+The idle eviction thread periodically removes consumers that have not been used for longer than the given timeout.
+If the capacity threshold is reached when borrowing, the pool tries to remove the least-used entry that is currently not in use.
+
+If it cannot be removed, then the pool will keep growing. In the worst case, the pool will grow to
+the max number of concurrent tasks that can run in the executor (that is, number of task slots).
+
+If a task fails for any reason, the new task is executed with a newly created Kafka consumer for safety reasons.
+At the same time, we invalidate all consumers in the pool that have the same caching key, to remove the consumer which was used
+in the failed execution. Consumers that other tasks are using will not be closed, but will be invalidated as well
+when they are returned to the pool.
+
+Along with consumers, Spark pools the records fetched from Kafka separately, to keep Kafka consumers stateless from
+Spark's point of view, and to maximize the efficiency of pooling. It uses the same cache key as the Kafka consumer pool.
+Note that it doesn't leverage Apache Commons Pool due to the difference in characteristics.
+
+The following properties are available to configure the fetched data pool:
-If a task fails for any reason the new task is executed with a newly created Kafka consumer for safety reasons.
-At the same time the cached Kafka consumer which was used in the failed execution will be invalidated. Here it has to
-be emphasized it will not be closed if any other task is using it.
+
+| Property Name | Default | Meaning |
+| --- | --- | --- |
+| spark.kafka.consumer.fetchedData.cache.timeout | 5m (5 minutes) | The minimum amount of time fetched data may sit idle in the pool before it is eligible for eviction by the evictor. |
+| spark.kafka.consumer.fetchedData.cache.evictorThreadRunInterval | 1m (1 minute) | The interval of time between runs of the idle evictor thread for the fetched data pool. When non-positive, no idle evictor thread will be run. |
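The pooling behavior described in this section (soft capacity limit, eviction of idle entries, and invalidation by caching key after a task failure) can be sketched in a few lines. This is a minimal single-threaded model and not the code from this patch: the class and method names are hypothetical, it does not use Apache Commons Pool, and it interprets "least-used" as least recently used, which is an assumption.

```python
import time
from collections import namedtuple

# Hypothetical key mirroring the three fields of the caching key.
CacheKey = namedtuple("CacheKey", ["topic", "partition", "group_id"])


class PooledConsumer:
    """Stand-in for a Kafka consumer; holds only bookkeeping state."""

    def __init__(self, key, now):
        self.key = key
        self.last_used = now
        self.in_use = False
        self.invalid = False
        self.closed = False


class SoftLimitPool:
    def __init__(self, capacity, idle_timeout, clock=time.monotonic):
        self.capacity = capacity          # soft limit on pooled consumers
        self.idle_timeout = idle_timeout  # seconds before an idle entry is evictable
        self.clock = clock
        self.entries = []                 # least recently used first

    def _close(self, entry):
        entry.closed = True
        self.entries.remove(entry)

    def borrow(self, key):
        now = self.clock()
        for entry in self.entries:
            if entry.key == key and not entry.in_use:
                entry.in_use, entry.last_used = True, now
                self.entries.remove(entry)
                self.entries.append(entry)  # mark as most recently used
                return entry
        if len(self.entries) >= self.capacity:
            idle = [e for e in self.entries if not e.in_use]
            if idle:
                self._close(idle[0])  # evict the least recently used idle entry
            # Otherwise grow past capacity instead of blocking the task.
        entry = PooledConsumer(key, now)
        entry.in_use = True
        self.entries.append(entry)
        return entry

    def give_back(self, entry):
        entry.in_use = False
        entry.last_used = self.clock()
        if entry.invalid:
            self._close(entry)  # invalidated while borrowed: close on return

    def invalidate_key(self, key):
        """Called after a task failure for every consumer sharing the key."""
        for entry in list(self.entries):
            if entry.key == key:
                entry.invalid = True
                if not entry.in_use:
                    self._close(entry)

    def evict_idle(self):
        """One run of the idle evictor."""
        now = self.clock()
        for entry in list(self.entries):
            if not entry.in_use and now - entry.last_used >= self.idle_timeout:
                self._close(entry)
```

In this model a borrow never blocks: when every pooled consumer is in use the pool simply grows, which is why the number of concurrently running tasks, rather than the configured capacity, bounds its size in the worst case.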
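 ## Writing Data to Kafka
diff --git a/external/kafka-0-10-sql/pom.xml b/external/kafka-0-10-sql/pom.xml
index 827ceb89a0c34..5b8738263a60d 100644
--- a/external/kafka-0-10-sql/pom.xml
+++ b/external/kafka-0-10-sql/pom.xml
@@ -72,6 +72,11 @@
       <artifactId>kafka-clients</artifactId>
       <version>${kafka.version}</version>
     </dependency>
+    <dependency>
+      <groupId>org.apache.commons</groupId>
+      <artifactId>commons-pool2</artifactId>
+      <version>${commons-pool2.version}</version>
+    </dependency>
     <dependency>
       <groupId>org.apache.kafka</groupId>
       <artifactId>kafka_${scala.binary.version}</artifactId>
@@ -125,6 +130,11 @@
       <groupId>org.apache.spark</groupId>
       <artifactId>spark-tags_${scala.binary.version}</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.jmock</groupId>
+      <artifactId>jmock-junit4</artifactId>
+      <scope>test</scope>
+    </dependency>
 3.8.1 + + 2.6.2 3.2.10 3.0.15 2.29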