Commit 3641c3d
gaborgsomogyi authored and Marcelo Vanzin committed
[SPARK-21869][SS] Apply Apache Commons Pool to Kafka producer
### What changes were proposed in this pull request?

Kafka producers are currently closed when `spark.kafka.producer.cache.timeout` is reached, which can be a significant problem when processing big SQL queries. The workaround was to increase `spark.kafka.producer.cache.timeout` to a value large enough for the biggest SQL query to finish. In this PR I've adapted a similar solution to the one that already exists on the consumer side, namely applying Apache Commons Pool on the producer side as well.

Main advantages of choosing this solution:
* Producers are not closed while they're in use
* No manual reference counting is needed (which may be error prone)
* Thread-safe by design
* Provides a JMX connection to the pool from which metrics can be fetched

What this PR contains:
* Introduced producer-side parameters to configure the pool
* Renamed `InternalKafkaConsumerPool` to `InternalKafkaConnectorPool` and made it abstract
* Created 2 implementations of it: `InternalKafkaConsumerPool` and `InternalKafkaProducerPool`
* Adapted `CachedKafkaProducer` to use `InternalKafkaProducerPool`
* Changed `KafkaDataWriter` and `KafkaDataWriteTask` to release the producer even in the failure scenario
* Added several new tests
* Extended `KafkaTest` to clear not only producers but consumers as well
* Renamed `InternalKafkaConsumerPoolSuite` to `InternalKafkaConnectorPoolSuite`, where only consumer tests check the behavior (please see the comment for reasoning)

What this PR does not yet contain (but is intended once the main concept is stable):
* User-facing documentation

### Why are the changes needed?

The Kafka producer is closed after 10 minutes (with default settings).

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

Existing + additional unit tests. Cluster tests have been started.

Closes #25853 from gaborgsomogyi/SPARK-21869.

Authored-by: Gabor Somogyi <gabor.g.somogyi@gmail.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
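To make the new contract concrete, here is a minimal caller-side sketch (not part of the commit): `writeBatch` is a hypothetical helper, while `acquire`, `release`, `send` and `flush` are the methods of the reworked `CachedKafkaProducer` shown in the diff below. The point is that the producer is borrowed from the pool and returned in a `finally` block, so it is released even on failure.

```scala
import java.{util => ju}
import org.apache.kafka.clients.producer.ProducerRecord

// Hypothetical write task: borrow a pooled producer, use it, always return it.
def writeBatch(
    kafkaParams: ju.Map[String, Object],
    records: Seq[ProducerRecord[Array[Byte], Array[Byte]]]): Unit = {
  val producer = CachedKafkaProducer.acquire(kafkaParams)  // borrow from the pool
  try {
    records.foreach(producer.send(_, null))                // reuse the same producer instance
    producer.flush()
  } finally {
    CachedKafkaProducer.release(producer)                  // return to the pool, even on error
  }
}
```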
1 parent: 4ec04e5 · commit: 3641c3d

File tree: 13 files changed, +554 −332 lines

external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala

Lines changed: 48 additions & 70 deletions
```diff
@@ -18,111 +18,89 @@
 package org.apache.spark.sql.kafka010
 
 import java.{util => ju}
-import java.util.concurrent.{ConcurrentMap, ExecutionException, TimeUnit}
+import java.io.Closeable
 
-import com.google.common.cache._
-import com.google.common.util.concurrent.{ExecutionError, UncheckedExecutionException}
-import org.apache.kafka.clients.producer.KafkaProducer
 import scala.collection.JavaConverters._
 import scala.util.control.NonFatal
 
+import org.apache.kafka.clients.producer.{Callback, KafkaProducer, ProducerRecord}
+
 import org.apache.spark.SparkEnv
 import org.apache.spark.internal.Logging
 import org.apache.spark.kafka010.{KafkaConfigUpdater, KafkaRedactionUtil}
+import org.apache.spark.sql.kafka010.InternalKafkaProducerPool._
+import org.apache.spark.util.ShutdownHookManager
 
-private[kafka010] object CachedKafkaProducer extends Logging {
+private[kafka010] class CachedKafkaProducer(val kafkaParams: ju.Map[String, Object])
+  extends Closeable with Logging {
 
   private type Producer = KafkaProducer[Array[Byte], Array[Byte]]
 
-  private val defaultCacheExpireTimeout = TimeUnit.MINUTES.toMillis(10)
-
-  private lazy val cacheExpireTimeout: Long = Option(SparkEnv.get)
-    .map(_.conf.get(PRODUCER_CACHE_TIMEOUT))
-    .getOrElse(defaultCacheExpireTimeout)
+  private val producer = createProducer()
 
-  private val cacheLoader = new CacheLoader[Seq[(String, Object)], Producer] {
-    override def load(config: Seq[(String, Object)]): Producer = {
-      createKafkaProducer(config)
+  private def createProducer(): Producer = {
+    val producer: Producer = new Producer(kafkaParams)
+    if (log.isDebugEnabled()) {
+      val redactedParamsSeq = KafkaRedactionUtil.redactParams(toCacheKey(kafkaParams))
+      logDebug(s"Created a new instance of kafka producer for $redactedParamsSeq.")
     }
+    producer
   }
 
-  private val removalListener = new RemovalListener[Seq[(String, Object)], Producer]() {
-    override def onRemoval(
-        notification: RemovalNotification[Seq[(String, Object)], Producer]): Unit = {
-      val paramsSeq: Seq[(String, Object)] = notification.getKey
-      val producer: Producer = notification.getValue
-      if (log.isDebugEnabled()) {
-        val redactedParamsSeq = KafkaRedactionUtil.redactParams(paramsSeq)
-        logDebug(s"Evicting kafka producer $producer params: $redactedParamsSeq, " +
-          s"due to ${notification.getCause}")
+  override def close(): Unit = {
+    try {
+      if (log.isInfoEnabled()) {
+        val redactedParamsSeq = KafkaRedactionUtil.redactParams(toCacheKey(kafkaParams))
+        logInfo(s"Closing the KafkaProducer with params: ${redactedParamsSeq.mkString("\n")}.")
       }
-      close(paramsSeq, producer)
+      producer.close()
+    } catch {
+      case NonFatal(e) => logWarning("Error while closing kafka producer.", e)
     }
   }
 
-  private lazy val guavaCache: LoadingCache[Seq[(String, Object)], Producer] =
-    CacheBuilder.newBuilder().expireAfterAccess(cacheExpireTimeout, TimeUnit.MILLISECONDS)
-      .removalListener(removalListener)
-      .build[Seq[(String, Object)], Producer](cacheLoader)
+  def send(record: ProducerRecord[Array[Byte], Array[Byte]], callback: Callback): Unit = {
+    producer.send(record, callback)
+  }
 
-  private def createKafkaProducer(paramsSeq: Seq[(String, Object)]): Producer = {
-    val kafkaProducer: Producer = new Producer(paramsSeq.toMap.asJava)
-    if (log.isDebugEnabled()) {
-      val redactedParamsSeq = KafkaRedactionUtil.redactParams(paramsSeq)
-      logDebug(s"Created a new instance of KafkaProducer for $redactedParamsSeq.")
+  def flush(): Unit = {
+    producer.flush()
+  }
+}
+
+private[kafka010] object CachedKafkaProducer extends Logging {
+
+  private val sparkConf = SparkEnv.get.conf
+  private val producerPool = new InternalKafkaProducerPool(sparkConf)
+
+  ShutdownHookManager.addShutdownHook { () =>
+    try {
+      producerPool.close()
+    } catch {
+      case e: Throwable =>
+        logWarning("Ignoring exception while shutting down pool from shutdown hook", e)
     }
-    kafkaProducer
   }
 
   /**
    * Get a cached KafkaProducer for a given configuration. If matching KafkaProducer doesn't
    * exist, a new KafkaProducer will be created. KafkaProducer is thread safe, it is best to keep
    * one instance per specified kafkaParams.
    */
-  private[kafka010] def getOrCreate(kafkaParams: ju.Map[String, Object]): Producer = {
-    val updatedKafkaProducerConfiguration =
+  def acquire(kafkaParams: ju.Map[String, Object]): CachedKafkaProducer = {
+    val updatedKafkaParams =
       KafkaConfigUpdater("executor", kafkaParams.asScala.toMap)
         .setAuthenticationConfigIfNeeded()
         .build()
-    val paramsSeq: Seq[(String, Object)] = paramsToSeq(updatedKafkaProducerConfiguration)
-    try {
-      guavaCache.get(paramsSeq)
-    } catch {
-      case e @ (_: ExecutionException | _: UncheckedExecutionException | _: ExecutionError)
-          if e.getCause != null =>
-        throw e.getCause
-    }
-  }
-
-  private def paramsToSeq(kafkaParams: ju.Map[String, Object]): Seq[(String, Object)] = {
-    val paramsSeq: Seq[(String, Object)] = kafkaParams.asScala.toSeq.sortBy(x => x._1)
-    paramsSeq
-  }
-
-  /** For explicitly closing kafka producer */
-  private[kafka010] def close(kafkaParams: ju.Map[String, Object]): Unit = {
-    val paramsSeq = paramsToSeq(kafkaParams)
-    guavaCache.invalidate(paramsSeq)
+    val key = toCacheKey(updatedKafkaParams)
+    producerPool.borrowObject(key, updatedKafkaParams)
   }
 
-  /** Auto close on cache evict */
-  private def close(paramsSeq: Seq[(String, Object)], producer: Producer): Unit = {
-    try {
-      if (log.isInfoEnabled()) {
-        val redactedParamsSeq = KafkaRedactionUtil.redactParams(paramsSeq)
-        logInfo(s"Closing the KafkaProducer with params: ${redactedParamsSeq.mkString("\n")}.")
-      }
-      producer.close()
-    } catch {
-      case NonFatal(e) => logWarning("Error while closing kafka producer.", e)
-    }
+  def release(producer: CachedKafkaProducer): Unit = {
+    producerPool.returnObject(producer)
   }
 
   private[kafka010] def clear(): Unit = {
-    logInfo("Cleaning up guava cache.")
-    guavaCache.invalidateAll()
+    producerPool.reset()
   }
-
-  // Intended for testing purpose only.
-  private def getAsMap: ConcurrentMap[Seq[(String, Object)], Producer] = guavaCache.asMap()
 }
```
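For background, here is a minimal standalone sketch of the Apache Commons Pool 2 borrow/return contract that the reworked producer relies on. It is illustrative only: `StringBuilderFactory` and `PoolContractDemo` are invented names and are not part of this commit; the API calls (`borrowObject`, `returnObject`, `getNumIdle`, `getNumActive`) are the same ones used by the pool classes in this PR.

```scala
import org.apache.commons.pool2.{BaseKeyedPooledObjectFactory, PooledObject}
import org.apache.commons.pool2.impl.{DefaultPooledObject, GenericKeyedObjectPool}

// Factory that tells the pool how to create and wrap objects for a key.
class StringBuilderFactory extends BaseKeyedPooledObjectFactory[String, StringBuilder] {
  override def create(key: String): StringBuilder = new StringBuilder(key)
  override def wrap(value: StringBuilder): PooledObject[StringBuilder] =
    new DefaultPooledObject[StringBuilder](value)
}

object PoolContractDemo extends App {
  val pool = new GenericKeyedObjectPool[String, StringBuilder](new StringBuilderFactory)

  val sb = pool.borrowObject("key1")   // created lazily for this key
  try {
    sb.append(", borrowed")            // exclusive use while borrowed
  } finally {
    pool.returnObject("key1", sb)      // becomes idle, reusable for "key1"
  }

  // After returning, the object counts as idle rather than active.
  assert(pool.getNumIdle("key1") == 1)
  assert(pool.getNumActive("key1") == 0)
  pool.close()
}
```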
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/InternalKafkaConnectorPool.scala

Lines changed: 210 additions & 0 deletions
@@ -0,0 +1,210 @@ (new file)

```scala
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.kafka010

import java.{util => ju}
import java.io.Closeable
import java.util.concurrent.ConcurrentHashMap

import org.apache.commons.pool2.{BaseKeyedPooledObjectFactory, PooledObject, SwallowedExceptionListener}
import org.apache.commons.pool2.impl.{DefaultEvictionPolicy, DefaultPooledObject, GenericKeyedObjectPool, GenericKeyedObjectPoolConfig}

import org.apache.spark.internal.Logging

/**
 * Provides object pool for objects which is grouped by a key.
 *
 * This class leverages [[GenericKeyedObjectPool]] internally, hence providing methods based on
 * the class, and same contract applies: after using the borrowed object, you must either call
 * returnObject() if the object is healthy to return to pool, or invalidateObject() if the object
 * should be destroyed.
 *
 * The soft capacity of pool is determined by "poolConfig.capacity" config value,
 * and the pool will have reasonable default value if the value is not provided.
 * (The instance will do its best effort to respect soft capacity but it can exceed when there's
 * a borrowing request and there's neither free space nor idle object to clear.)
 *
 * This class guarantees that no caller will get pooled object once the object is borrowed and
 * not yet returned, hence provide thread-safety usage of non-thread-safe objects unless caller
 * shares the object to multiple threads.
 */
private[kafka010] abstract class InternalKafkaConnectorPool[K, V <: Closeable](
    objectFactory: ObjectFactory[K, V],
    poolConfig: PoolConfig[V],
    swallowedExceptionListener: SwallowedExceptionListener) extends Logging {

  // the class is intended to have only soft capacity
  assert(poolConfig.getMaxTotal < 0)

  private val pool = {
    val internalPool = new GenericKeyedObjectPool[K, V](objectFactory, poolConfig)
    internalPool.setSwallowedExceptionListener(swallowedExceptionListener)
    internalPool
  }

  /**
   * Borrows object from the pool. If there's no idle object for the key,
   * the pool will create the object.
   *
   * If the pool doesn't have idle object for the key and also exceeds the soft capacity,
   * pool will try to clear some of idle objects.
   *
   * Borrowed object must be returned by either calling returnObject or invalidateObject, otherwise
   * the object will be kept in pool as active object.
   */
  def borrowObject(key: K, kafkaParams: ju.Map[String, Object]): V = {
    updateKafkaParamForKey(key, kafkaParams)

    if (size >= poolConfig.softMaxSize) {
      logWarning("Pool exceeds its soft max size, cleaning up idle objects...")
      pool.clearOldest()
    }

    pool.borrowObject(key)
  }

  /** Returns borrowed object to the pool. */
  def returnObject(connector: V): Unit = {
    pool.returnObject(createKey(connector), connector)
  }

  /** Invalidates (destroy) borrowed object to the pool. */
  def invalidateObject(connector: V): Unit = {
    pool.invalidateObject(createKey(connector), connector)
  }

  /** Invalidates all idle values for the key */
  def invalidateKey(key: K): Unit = {
    pool.clear(key)
  }

  /**
   * Closes the keyed object pool. Once the pool is closed,
   * borrowObject will fail with [[IllegalStateException]], but returnObject and invalidateObject
   * will continue to work, with returned objects destroyed on return.
   *
   * Also destroys idle instances in the pool.
   */
  def close(): Unit = {
    pool.close()
  }

  def reset(): Unit = {
    // this is the best-effort of clearing up. otherwise we should close the pool and create again
    // but we don't want to make it "var" only because of tests.
    pool.clear()
  }

  def numIdle: Int = pool.getNumIdle

  def numIdle(key: K): Int = pool.getNumIdle(key)

  def numActive: Int = pool.getNumActive

  def numActive(key: K): Int = pool.getNumActive(key)

  def size: Int = numIdle + numActive

  def size(key: K): Int = numIdle(key) + numActive(key)

  private def updateKafkaParamForKey(key: K, kafkaParams: ju.Map[String, Object]): Unit = {
    // We can assume that kafkaParam should not be different for same cache key,
    // otherwise we can't reuse the cached object and cache key should contain kafkaParam.
    // So it should be safe to put the key/value pair only when the key doesn't exist.
    val oldKafkaParams = objectFactory.keyToKafkaParams.putIfAbsent(key, kafkaParams)
    require(oldKafkaParams == null || kafkaParams == oldKafkaParams, "Kafka parameters for same " +
      s"cache key should be equal. old parameters: $oldKafkaParams new parameters: $kafkaParams")
  }

  protected def createKey(connector: V): K
}

private[kafka010] abstract class PoolConfig[V] extends GenericKeyedObjectPoolConfig[V] {

  init()

  def softMaxSize: Int

  def jmxEnabled: Boolean

  def minEvictableIdleTimeMillis: Long

  def evictorThreadRunIntervalMillis: Long

  def jmxNamePrefix: String

  def init(): Unit = {
    // NOTE: Below lines define the behavior, so do not modify unless you know what you are
    // doing, and update the class doc accordingly if necessary when you modify.

    // 1. Set min idle objects per key to 0 to avoid creating unnecessary object.
    // 2. Set max idle objects per key to 3 but set total objects per key to infinite
    // which ensures borrowing per key is not restricted.
    // 3. Set max total objects to infinite which ensures all objects are managed in this pool.
    setMinIdlePerKey(0)
    setMaxIdlePerKey(3)
    setMaxTotalPerKey(-1)
    setMaxTotal(-1)

    // Set minimum evictable idle time which will be referred from evictor thread
    setMinEvictableIdleTimeMillis(minEvictableIdleTimeMillis)
    setSoftMinEvictableIdleTimeMillis(-1)

    // evictor thread will run test with ten idle objects
    setTimeBetweenEvictionRunsMillis(evictorThreadRunIntervalMillis)
    setNumTestsPerEvictionRun(10)
    setEvictionPolicy(new DefaultEvictionPolicy[V]())

    // Immediately fail on exhausted pool while borrowing
    setBlockWhenExhausted(false)

    setJmxEnabled(jmxEnabled)
    setJmxNamePrefix(jmxNamePrefix)
  }
}

private[kafka010] abstract class ObjectFactory[K, V <: Closeable]
    extends BaseKeyedPooledObjectFactory[K, V] {
  val keyToKafkaParams = new ConcurrentHashMap[K, ju.Map[String, Object]]()

  override def create(key: K): V = {
    Option(keyToKafkaParams.get(key)) match {
      case Some(kafkaParams) => createValue(key, kafkaParams)
      case None => throw new IllegalStateException("Kafka params should be set before " +
        "borrowing object.")
    }
  }

  override def wrap(value: V): PooledObject[V] = {
    new DefaultPooledObject[V](value)
  }

  override def destroyObject(key: K, p: PooledObject[V]): Unit = {
    p.getObject.close()
  }

  protected def createValue(key: K, kafkaParams: ju.Map[String, Object]): V
}

private[kafka010] class CustomSwallowedExceptionListener(connectorType: String)
  extends SwallowedExceptionListener with Logging {

  override def onSwallowException(e: Exception): Unit = {
    logWarning(s"Error closing Kafka $connectorType", e)
  }
}
```
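To illustrate how the abstract pieces above are meant to be specialized, here is a hypothetical concrete pool. The real `InternalKafkaProducerPool` and `InternalKafkaConsumerPool` live in other files not shown in this section; every `Example*` name below is invented for illustration, and the config values are arbitrary placeholders.

```scala
package org.apache.spark.sql.kafka010

import java.{util => ju}
import java.io.Closeable

// A trivial pooled object standing in for a Kafka consumer/producer wrapper.
private[kafka010] class ExampleConnector(val key: String) extends Closeable {
  override def close(): Unit = ()
}

// Concrete pool: wires an ObjectFactory, a PoolConfig and an exception listener together.
private[kafka010] class ExampleConnectorPool(conf: ExamplePoolConfig)
  extends InternalKafkaConnectorPool[String, ExampleConnector](
    new ExampleObjectFactory,
    conf,
    new CustomSwallowedExceptionListener("example")) {

  // The pool must be able to recover the cache key from a returned object.
  override protected def createKey(connector: ExampleConnector): String = connector.key
}

// Factory: create() in the base class looks up kafkaParams registered for the key,
// then delegates here to build the actual connector.
private[kafka010] class ExampleObjectFactory extends ObjectFactory[String, ExampleConnector] {
  override protected def createValue(
      key: String,
      kafkaParams: ju.Map[String, Object]): ExampleConnector = new ExampleConnector(key)
}

// Pool configuration: only the knobs left abstract by PoolConfig need to be supplied.
private[kafka010] class ExamplePoolConfig extends PoolConfig[ExampleConnector] {
  override def softMaxSize: Int = 64
  override def jmxEnabled: Boolean = false
  override def minEvictableIdleTimeMillis: Long = 5 * 60 * 1000L
  override def evictorThreadRunIntervalMillis: Long = 60 * 1000L
  override def jmxNamePrefix: String = "example-pool"
}
```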