diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala index fc177cdc9037..907440ab3731 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala @@ -18,60 +18,68 @@ package org.apache.spark.sql.kafka010 import java.{util => ju} -import java.util.concurrent.{ConcurrentMap, ExecutionException, TimeUnit} +import java.io.Closeable -import com.google.common.cache._ -import com.google.common.util.concurrent.{ExecutionError, UncheckedExecutionException} -import org.apache.kafka.clients.producer.KafkaProducer import scala.collection.JavaConverters._ import scala.util.control.NonFatal +import org.apache.kafka.clients.producer.{Callback, KafkaProducer, ProducerRecord} + import org.apache.spark.SparkEnv import org.apache.spark.internal.Logging import org.apache.spark.kafka010.{KafkaConfigUpdater, KafkaRedactionUtil} +import org.apache.spark.sql.kafka010.InternalKafkaProducerPool._ +import org.apache.spark.util.ShutdownHookManager -private[kafka010] object CachedKafkaProducer extends Logging { +private[kafka010] class CachedKafkaProducer(val kafkaParams: ju.Map[String, Object]) + extends Closeable with Logging { private type Producer = KafkaProducer[Array[Byte], Array[Byte]] - private val defaultCacheExpireTimeout = TimeUnit.MINUTES.toMillis(10) - - private lazy val cacheExpireTimeout: Long = Option(SparkEnv.get) - .map(_.conf.get(PRODUCER_CACHE_TIMEOUT)) - .getOrElse(defaultCacheExpireTimeout) + private val producer = createProducer() - private val cacheLoader = new CacheLoader[Seq[(String, Object)], Producer] { - override def load(config: Seq[(String, Object)]): Producer = { - createKafkaProducer(config) + private def createProducer(): Producer = { + val producer: Producer = new Producer(kafkaParams) + if (log.isDebugEnabled()) { + val redactedParamsSeq = KafkaRedactionUtil.redactParams(toCacheKey(kafkaParams)) + logDebug(s"Created a new instance of kafka producer for $redactedParamsSeq.") } + producer } - private val removalListener = new RemovalListener[Seq[(String, Object)], Producer]() { - override def onRemoval( - notification: RemovalNotification[Seq[(String, Object)], Producer]): Unit = { - val paramsSeq: Seq[(String, Object)] = notification.getKey - val producer: Producer = notification.getValue - if (log.isDebugEnabled()) { - val redactedParamsSeq = KafkaRedactionUtil.redactParams(paramsSeq) - logDebug(s"Evicting kafka producer $producer params: $redactedParamsSeq, " + - s"due to ${notification.getCause}") + override def close(): Unit = { + try { + if (log.isInfoEnabled()) { + val redactedParamsSeq = KafkaRedactionUtil.redactParams(toCacheKey(kafkaParams)) + logInfo(s"Closing the KafkaProducer with params: ${redactedParamsSeq.mkString("\n")}.") } - close(paramsSeq, producer) + producer.close() + } catch { + case NonFatal(e) => logWarning("Error while closing kafka producer.", e) } } - private lazy val guavaCache: LoadingCache[Seq[(String, Object)], Producer] = - CacheBuilder.newBuilder().expireAfterAccess(cacheExpireTimeout, TimeUnit.MILLISECONDS) - .removalListener(removalListener) - .build[Seq[(String, Object)], Producer](cacheLoader) + def send(record: ProducerRecord[Array[Byte], Array[Byte]], callback: Callback): Unit = { + producer.send(record, callback) + } - private def 
createKafkaProducer(paramsSeq: Seq[(String, Object)]): Producer = { - val kafkaProducer: Producer = new Producer(paramsSeq.toMap.asJava) - if (log.isDebugEnabled()) { - val redactedParamsSeq = KafkaRedactionUtil.redactParams(paramsSeq) - logDebug(s"Created a new instance of KafkaProducer for $redactedParamsSeq.") + def flush(): Unit = { + producer.flush() + } +} + +private[kafka010] object CachedKafkaProducer extends Logging { + + private val sparkConf = SparkEnv.get.conf + private val producerPool = new InternalKafkaProducerPool(sparkConf) + + ShutdownHookManager.addShutdownHook { () => + try { + producerPool.close() + } catch { + case e: Throwable => + logWarning("Ignoring exception while shutting down pool from shutdown hook", e) } - kafkaProducer } /** @@ -79,50 +87,20 @@ private[kafka010] object CachedKafkaProducer extends Logging { * exist, a new KafkaProducer will be created. KafkaProducer is thread safe, it is best to keep * one instance per specified kafkaParams. */ - private[kafka010] def getOrCreate(kafkaParams: ju.Map[String, Object]): Producer = { - val updatedKafkaProducerConfiguration = + def acquire(kafkaParams: ju.Map[String, Object]): CachedKafkaProducer = { + val updatedKafkaParams = KafkaConfigUpdater("executor", kafkaParams.asScala.toMap) .setAuthenticationConfigIfNeeded() .build() - val paramsSeq: Seq[(String, Object)] = paramsToSeq(updatedKafkaProducerConfiguration) - try { - guavaCache.get(paramsSeq) - } catch { - case e @ (_: ExecutionException | _: UncheckedExecutionException | _: ExecutionError) - if e.getCause != null => - throw e.getCause - } - } - - private def paramsToSeq(kafkaParams: ju.Map[String, Object]): Seq[(String, Object)] = { - val paramsSeq: Seq[(String, Object)] = kafkaParams.asScala.toSeq.sortBy(x => x._1) - paramsSeq - } - - /** For explicitly closing kafka producer */ - private[kafka010] def close(kafkaParams: ju.Map[String, Object]): Unit = { - val paramsSeq = paramsToSeq(kafkaParams) - guavaCache.invalidate(paramsSeq) + val key = toCacheKey(updatedKafkaParams) + producerPool.borrowObject(key, updatedKafkaParams) } - /** Auto close on cache evict */ - private def close(paramsSeq: Seq[(String, Object)], producer: Producer): Unit = { - try { - if (log.isInfoEnabled()) { - val redactedParamsSeq = KafkaRedactionUtil.redactParams(paramsSeq) - logInfo(s"Closing the KafkaProducer with params: ${redactedParamsSeq.mkString("\n")}.") - } - producer.close() - } catch { - case NonFatal(e) => logWarning("Error while closing kafka producer.", e) - } + def release(producer: CachedKafkaProducer): Unit = { + producerPool.returnObject(producer) } private[kafka010] def clear(): Unit = { - logInfo("Cleaning up guava cache.") - guavaCache.invalidateAll() + producerPool.reset() } - - // Intended for testing purpose only. - private def getAsMap: ConcurrentMap[Seq[(String, Object)], Producer] = guavaCache.asMap() } diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/InternalKafkaConnectorPool.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/InternalKafkaConnectorPool.scala new file mode 100644 index 000000000000..0fb250e8d05a --- /dev/null +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/InternalKafkaConnectorPool.scala @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.kafka010 + +import java.{util => ju} +import java.io.Closeable +import java.util.concurrent.ConcurrentHashMap + +import org.apache.commons.pool2.{BaseKeyedPooledObjectFactory, PooledObject, SwallowedExceptionListener} +import org.apache.commons.pool2.impl.{DefaultEvictionPolicy, DefaultPooledObject, GenericKeyedObjectPool, GenericKeyedObjectPoolConfig} + +import org.apache.spark.internal.Logging + +/** + * Provides an object pool for objects which are grouped by a key. + * + * This class leverages [[GenericKeyedObjectPool]] internally, hence providing methods based on + * that class, and the same contract applies: after using the borrowed object, you must either + * call returnObject() if the object is healthy to return to the pool, or invalidateObject() if + * the object should be destroyed. + * + * The soft capacity of the pool is determined by the "softMaxSize" value of the pool config, + * and the pool will have a reasonable default value if the value is not provided. + * (The instance will do its best effort to respect the soft capacity, but it can be exceeded + * when there's a borrowing request and there's neither free space nor an idle object to clear.) + * + * This class guarantees that no caller will get a pooled object while the object is borrowed and + * not yet returned, hence providing thread-safe usage of non-thread-safe objects unless the + * caller shares the object across multiple threads. + */ +private[kafka010] abstract class InternalKafkaConnectorPool[K, V <: Closeable]( + objectFactory: ObjectFactory[K, V], + poolConfig: PoolConfig[V], + swallowedExceptionListener: SwallowedExceptionListener) extends Logging { + + // the class is intended to have only soft capacity + assert(poolConfig.getMaxTotal < 0) + + private val pool = { + val internalPool = new GenericKeyedObjectPool[K, V](objectFactory, poolConfig) + internalPool.setSwallowedExceptionListener(swallowedExceptionListener) + internalPool + } + + /** + * Borrows an object from the pool. If there's no idle object for the key, + * the pool will create one. + * + * If the pool doesn't have an idle object for the key and also exceeds the soft capacity, + * the pool will try to clear some of the idle objects. + * + * A borrowed object must be returned by either calling returnObject or invalidateObject, + * otherwise the object will be kept in the pool as an active object. + */ + def borrowObject(key: K, kafkaParams: ju.Map[String, Object]): V = { + updateKafkaParamForKey(key, kafkaParams) + + if (size >= poolConfig.softMaxSize) { + logWarning("Pool exceeds its soft max size, cleaning up idle objects...") + pool.clearOldest() + } + + pool.borrowObject(key) + } + + /** Returns a borrowed object to the pool. */ + def returnObject(connector: V): Unit = { + pool.returnObject(createKey(connector), connector) + } + + /** Invalidates (destroys) a borrowed object instead of returning it to the pool. 
*/ + def invalidateObject(connector: V): Unit = { + pool.invalidateObject(createKey(connector), connector) + } + + /** Invalidates all idle values for the key */ + def invalidateKey(key: K): Unit = { + pool.clear(key) + } + + /** + * Closes the keyed object pool. Once the pool is closed, + * borrowObject will fail with [[IllegalStateException]], but returnObject and invalidateObject + * will continue to work, with returned objects destroyed on return. + * + * Also destroys idle instances in the pool. + */ + def close(): Unit = { + pool.close() + } + + def reset(): Unit = { + // this is the best-effort of clearing up. otherwise we should close the pool and create again + // but we don't want to make it "var" only because of tests. + pool.clear() + } + + def numIdle: Int = pool.getNumIdle + + def numIdle(key: K): Int = pool.getNumIdle(key) + + def numActive: Int = pool.getNumActive + + def numActive(key: K): Int = pool.getNumActive(key) + + def size: Int = numIdle + numActive + + def size(key: K): Int = numIdle(key) + numActive(key) + + private def updateKafkaParamForKey(key: K, kafkaParams: ju.Map[String, Object]): Unit = { + // We can assume that kafkaParam should not be different for same cache key, + // otherwise we can't reuse the cached object and cache key should contain kafkaParam. + // So it should be safe to put the key/value pair only when the key doesn't exist. + val oldKafkaParams = objectFactory.keyToKafkaParams.putIfAbsent(key, kafkaParams) + require(oldKafkaParams == null || kafkaParams == oldKafkaParams, "Kafka parameters for same " + + s"cache key should be equal. old parameters: $oldKafkaParams new parameters: $kafkaParams") + } + + protected def createKey(connector: V): K +} + +private[kafka010] abstract class PoolConfig[V] extends GenericKeyedObjectPoolConfig[V] { + + init() + + def softMaxSize: Int + + def jmxEnabled: Boolean + + def minEvictableIdleTimeMillis: Long + + def evictorThreadRunIntervalMillis: Long + + def jmxNamePrefix: String + + def init(): Unit = { + // NOTE: Below lines define the behavior, so do not modify unless you know what you are + // doing, and update the class doc accordingly if necessary when you modify. + + // 1. Set min idle objects per key to 0 to avoid creating unnecessary object. + // 2. Set max idle objects per key to 3 but set total objects per key to infinite + // which ensures borrowing per key is not restricted. + // 3. Set max total objects to infinite which ensures all objects are managed in this pool. 
+ setMinIdlePerKey(0) + setMaxIdlePerKey(3) + setMaxTotalPerKey(-1) + setMaxTotal(-1) + + // Set minimum evictable idle time which will be referred from evictor thread + setMinEvictableIdleTimeMillis(minEvictableIdleTimeMillis) + setSoftMinEvictableIdleTimeMillis(-1) + + // evictor thread will run test with ten idle objects + setTimeBetweenEvictionRunsMillis(evictorThreadRunIntervalMillis) + setNumTestsPerEvictionRun(10) + setEvictionPolicy(new DefaultEvictionPolicy[V]()) + + // Immediately fail on exhausted pool while borrowing + setBlockWhenExhausted(false) + + setJmxEnabled(jmxEnabled) + setJmxNamePrefix(jmxNamePrefix) + } +} + +private[kafka010] abstract class ObjectFactory[K, V <: Closeable] + extends BaseKeyedPooledObjectFactory[K, V] { + val keyToKafkaParams = new ConcurrentHashMap[K, ju.Map[String, Object]]() + + override def create(key: K): V = { + Option(keyToKafkaParams.get(key)) match { + case Some(kafkaParams) => createValue(key, kafkaParams) + case None => throw new IllegalStateException("Kafka params should be set before " + + "borrowing object.") + } + } + + override def wrap(value: V): PooledObject[V] = { + new DefaultPooledObject[V](value) + } + + override def destroyObject(key: K, p: PooledObject[V]): Unit = { + p.getObject.close() + } + + protected def createValue(key: K, kafkaParams: ju.Map[String, Object]): V +} + +private[kafka010] class CustomSwallowedExceptionListener(connectorType: String) + extends SwallowedExceptionListener with Logging { + + override def onSwallowException(e: Exception): Unit = { + logWarning(s"Error closing Kafka $connectorType", e) + } +} diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/InternalKafkaConsumerPool.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/InternalKafkaConsumerPool.scala index 276a942742b8..a8e604564645 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/InternalKafkaConsumerPool.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/InternalKafkaConsumerPool.scala @@ -18,204 +18,46 @@ package org.apache.spark.sql.kafka010 import java.{util => ju} -import java.util.concurrent.ConcurrentHashMap -import org.apache.commons.pool2.{BaseKeyedPooledObjectFactory, PooledObject, SwallowedExceptionListener} -import org.apache.commons.pool2.impl.{DefaultEvictionPolicy, DefaultPooledObject, GenericKeyedObjectPool, GenericKeyedObjectPoolConfig} +import org.apache.commons.pool2.PooledObject import org.apache.spark.SparkConf -import org.apache.spark.internal.Logging -import org.apache.spark.sql.kafka010.InternalKafkaConsumerPool._ import org.apache.spark.sql.kafka010.KafkaDataConsumer.CacheKey -/** - * Provides object pool for [[InternalKafkaConsumer]] which is grouped by [[CacheKey]]. - * - * This class leverages [[GenericKeyedObjectPool]] internally, hence providing methods based on - * the class, and same contract applies: after using the borrowed object, you must either call - * returnObject() if the object is healthy to return to pool, or invalidateObject() if the object - * should be destroyed. - * - * The soft capacity of pool is determined by "spark.kafka.consumer.cache.capacity" config value, - * and the pool will have reasonable default value if the value is not provided. - * (The instance will do its best effort to respect soft capacity but it can exceed when there's - * a borrowing request and there's neither free space nor idle object to clear.) 
- * - * This class guarantees that no caller will get pooled object once the object is borrowed and - * not yet returned, hence provide thread-safety usage of non-thread-safe [[InternalKafkaConsumer]] - * unless caller shares the object to multiple threads. - */ +// TODO: revisit the relation between CacheKey and kafkaParams - for now it looks a bit weird +// as we force all consumers having same (groupId, topicPartition) to have same kafkaParams +// which might be viable in performance perspective (kafkaParams might be too huge to use +// as a part of key), but there might be the case kafkaParams could be different - +// cache key should be differentiated for both kafkaParams. private[kafka010] class InternalKafkaConsumerPool( - objectFactory: ObjectFactory, - poolConfig: PoolConfig) extends Logging { + objectFactory: ConsumerObjectFactory, + poolConfig: ConsumerPoolConfig) + extends InternalKafkaConnectorPool[CacheKey, InternalKafkaConsumer]( + objectFactory, + poolConfig, + new CustomSwallowedExceptionListener("consumer")) { def this(conf: SparkConf) = { - this(new ObjectFactory, new PoolConfig(conf)) - } - - // the class is intended to have only soft capacity - assert(poolConfig.getMaxTotal < 0) - - private val pool = { - val internalPool = new GenericKeyedObjectPool[CacheKey, InternalKafkaConsumer]( - objectFactory, poolConfig) - internalPool.setSwallowedExceptionListener(CustomSwallowedExceptionListener) - internalPool - } - - /** - * Borrows [[InternalKafkaConsumer]] object from the pool. If there's no idle object for the key, - * the pool will create the [[InternalKafkaConsumer]] object. - * - * If the pool doesn't have idle object for the key and also exceeds the soft capacity, - * pool will try to clear some of idle objects. - * - * Borrowed object must be returned by either calling returnObject or invalidateObject, otherwise - * the object will be kept in pool as active object. - */ - def borrowObject(key: CacheKey, kafkaParams: ju.Map[String, Object]): InternalKafkaConsumer = { - updateKafkaParamForKey(key, kafkaParams) - - if (size >= poolConfig.softMaxSize) { - logWarning("Pool exceeds its soft max size, cleaning up idle objects...") - pool.clearOldest() - } - - pool.borrowObject(key) - } - - /** Returns borrowed object to the pool. */ - def returnObject(consumer: InternalKafkaConsumer): Unit = { - pool.returnObject(extractCacheKey(consumer), consumer) - } - - /** Invalidates (destroy) borrowed object to the pool. */ - def invalidateObject(consumer: InternalKafkaConsumer): Unit = { - pool.invalidateObject(extractCacheKey(consumer), consumer) - } - - /** Invalidates all idle consumers for the key */ - def invalidateKey(key: CacheKey): Unit = { - pool.clear(key) - } - - /** - * Closes the keyed object pool. Once the pool is closed, - * borrowObject will fail with [[IllegalStateException]], but returnObject and invalidateObject - * will continue to work, with returned objects destroyed on return. - * - * Also destroys idle instances in the pool. - */ - def close(): Unit = { - pool.close() - } - - def reset(): Unit = { - // this is the best-effort of clearing up. otherwise we should close the pool and create again - // but we don't want to make it "var" only because of tests. 
- pool.clear() + this(new ConsumerObjectFactory, new ConsumerPoolConfig(conf)) } - def numIdle: Int = pool.getNumIdle - - def numIdle(key: CacheKey): Int = pool.getNumIdle(key) - - def numActive: Int = pool.getNumActive - - def numActive(key: CacheKey): Int = pool.getNumActive(key) - - def size: Int = numIdle + numActive - - def size(key: CacheKey): Int = numIdle(key) + numActive(key) - - // TODO: revisit the relation between CacheKey and kafkaParams - for now it looks a bit weird - // as we force all consumers having same (groupId, topicPartition) to have same kafkaParams - // which might be viable in performance perspective (kafkaParams might be too huge to use - // as a part of key), but there might be the case kafkaParams could be different - - // cache key should be differentiated for both kafkaParams. - private def updateKafkaParamForKey(key: CacheKey, kafkaParams: ju.Map[String, Object]): Unit = { - // We can assume that kafkaParam should not be different for same cache key, - // otherwise we can't reuse the cached object and cache key should contain kafkaParam. - // So it should be safe to put the key/value pair only when the key doesn't exist. - val oldKafkaParams = objectFactory.keyToKafkaParams.putIfAbsent(key, kafkaParams) - require(oldKafkaParams == null || kafkaParams == oldKafkaParams, "Kafka parameters for same " + - s"cache key should be equal. old parameters: $oldKafkaParams new parameters: $kafkaParams") - } - - private def extractCacheKey(consumer: InternalKafkaConsumer): CacheKey = { + override protected def createKey(consumer: InternalKafkaConsumer): CacheKey = { new CacheKey(consumer.topicPartition, consumer.kafkaParams) } } -private[kafka010] object InternalKafkaConsumerPool { - object CustomSwallowedExceptionListener extends SwallowedExceptionListener with Logging { - override def onSwallowException(e: Exception): Unit = { - logError(s"Error closing Kafka consumer", e) - } - } - - class PoolConfig(conf: SparkConf) extends GenericKeyedObjectPoolConfig[InternalKafkaConsumer] { - private var _softMaxSize = Int.MaxValue - - def softMaxSize: Int = _softMaxSize - - init() - - def init(): Unit = { - _softMaxSize = conf.get(CONSUMER_CACHE_CAPACITY) - - val jmxEnabled = conf.get(CONSUMER_CACHE_JMX_ENABLED) - val minEvictableIdleTimeMillis = conf.get(CONSUMER_CACHE_TIMEOUT) - val evictorThreadRunIntervalMillis = conf.get( - CONSUMER_CACHE_EVICTOR_THREAD_RUN_INTERVAL) - - // NOTE: Below lines define the behavior, so do not modify unless you know what you are - // doing, and update the class doc accordingly if necessary when you modify. - - // 1. Set min idle objects per key to 0 to avoid creating unnecessary object. - // 2. Set max idle objects per key to 3 but set total objects per key to infinite - // which ensures borrowing per key is not restricted. - // 3. Set max total objects to infinite which ensures all objects are managed in this pool. 
- setMinIdlePerKey(0) - setMaxIdlePerKey(3) - setMaxTotalPerKey(-1) - setMaxTotal(-1) - - // Set minimum evictable idle time which will be referred from evictor thread - setMinEvictableIdleTimeMillis(minEvictableIdleTimeMillis) - setSoftMinEvictableIdleTimeMillis(-1) - - // evictor thread will run test with ten idle objects - setTimeBetweenEvictionRunsMillis(evictorThreadRunIntervalMillis) - setNumTestsPerEvictionRun(10) - setEvictionPolicy(new DefaultEvictionPolicy[InternalKafkaConsumer]()) - - // Immediately fail on exhausted pool while borrowing - setBlockWhenExhausted(false) - - setJmxEnabled(jmxEnabled) - setJmxNamePrefix("kafka010-cached-simple-kafka-consumer-pool") - } - } - - class ObjectFactory extends BaseKeyedPooledObjectFactory[CacheKey, InternalKafkaConsumer] { - val keyToKafkaParams = new ConcurrentHashMap[CacheKey, ju.Map[String, Object]]() - - override def create(key: CacheKey): InternalKafkaConsumer = { - Option(keyToKafkaParams.get(key)) match { - case Some(kafkaParams) => new InternalKafkaConsumer(key.topicPartition, kafkaParams) - case None => throw new IllegalStateException("Kafka params should be set before " + - "borrowing object.") - } - } - - override def wrap(value: InternalKafkaConsumer): PooledObject[InternalKafkaConsumer] = { - new DefaultPooledObject[InternalKafkaConsumer](value) - } +private class ConsumerPoolConfig(conf: SparkConf) extends PoolConfig[InternalKafkaConsumer] { + def softMaxSize: Int = conf.get(CONSUMER_CACHE_CAPACITY) + def jmxEnabled: Boolean = conf.get(CONSUMER_CACHE_JMX_ENABLED) + def minEvictableIdleTimeMillis: Long = conf.get(CONSUMER_CACHE_TIMEOUT) + def evictorThreadRunIntervalMillis: Long = conf.get(CONSUMER_CACHE_EVICTOR_THREAD_RUN_INTERVAL) + def jmxNamePrefix: String = "kafka010-cached-simple-kafka-consumer-pool" +} - override def destroyObject(key: CacheKey, p: PooledObject[InternalKafkaConsumer]): Unit = { - p.getObject.close() - } +private class ConsumerObjectFactory extends ObjectFactory[CacheKey, InternalKafkaConsumer] { + override protected def createValue( + key: CacheKey, + kafkaParams: ju.Map[String, Object]): InternalKafkaConsumer = { + new InternalKafkaConsumer(key.topicPartition, kafkaParams) } } - diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/InternalKafkaProducerPool.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/InternalKafkaProducerPool.scala new file mode 100644 index 000000000000..165b64313abb --- /dev/null +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/InternalKafkaProducerPool.scala @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.kafka010 + +import java.{util => ju} + +import scala.collection.JavaConverters._ + +import org.apache.commons.pool2.PooledObject + +import org.apache.spark.SparkConf +import org.apache.spark.sql.kafka010.InternalKafkaProducerPool.CacheKey + +private[kafka010] class InternalKafkaProducerPool( + objectFactory: ProducerObjectFactory, + poolConfig: ProducerPoolConfig) + extends InternalKafkaConnectorPool[CacheKey, CachedKafkaProducer]( + objectFactory, + poolConfig, + new CustomSwallowedExceptionListener("producer")) { + + def this(conf: SparkConf) = { + this(new ProducerObjectFactory, new ProducerPoolConfig(conf)) + } + + override protected def createKey(producer: CachedKafkaProducer): CacheKey = { + InternalKafkaProducerPool.toCacheKey(producer.kafkaParams) + } +} + +private class ProducerPoolConfig(conf: SparkConf) extends PoolConfig[CachedKafkaProducer] { + def softMaxSize: Int = conf.get(PRODUCER_CACHE_CAPACITY) + def jmxEnabled: Boolean = conf.get(PRODUCER_CACHE_JMX_ENABLED) + def minEvictableIdleTimeMillis: Long = conf.get(PRODUCER_CACHE_TIMEOUT) + def evictorThreadRunIntervalMillis: Long = conf.get(PRODUCER_CACHE_EVICTOR_THREAD_RUN_INTERVAL) + def jmxNamePrefix: String = "kafka010-cached-simple-kafka-producer-pool" +} + +private class ProducerObjectFactory extends ObjectFactory[CacheKey, CachedKafkaProducer] { + override protected def createValue( + key: CacheKey, + kafkaParams: ju.Map[String, Object]): CachedKafkaProducer = { + new CachedKafkaProducer(kafkaParams) + } +} + +private[kafka010] object InternalKafkaProducerPool { + type CacheKey = Seq[(String, Object)] + + def toCacheKey(params: ju.Map[String, Object]): CacheKey = { + params.asScala.toSeq.sortBy(x => x._1) + } +} diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala index ca82c908f441..f2dad945719e 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala @@ -613,7 +613,7 @@ private[kafka010] object KafkaDataConsumer extends Logging { consumerPool.close() } catch { case e: Throwable => - logWarning("Ignoring Exception while shutting down pools from shutdown hook", e) + logWarning("Ignoring exception while shutting down pools from shutdown hook", e) } } @@ -639,6 +639,11 @@ private[kafka010] object KafkaDataConsumer extends Logging { new KafkaDataConsumer(topicPartition, kafkaParams, consumerPool, fetchedDataPool) } + private[kafka010] def clear(): Unit = { + consumerPool.reset() + fetchedDataPool.reset() + } + private def reportDataLoss0( failOnDataLoss: Boolean, finalMessage: String, diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataWriter.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataWriter.scala index 3f8d3d2da579..870ed7aef4de 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataWriter.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataWriter.scala @@ -44,7 +44,7 @@ private[kafka010] class KafkaDataWriter( inputSchema: Seq[Attribute]) extends KafkaRowWriter(inputSchema, targetTopic) with DataWriter[InternalRow] { - private lazy val producer = CachedKafkaProducer.getOrCreate(producerParams) + private var producer = 
CachedKafkaProducer.acquire(producerParams) def write(row: InternalRow): Unit = { checkForErrors() @@ -55,20 +55,36 @@ private[kafka010] class KafkaDataWriter( // Send is asynchronous, but we can't commit until all rows are actually in Kafka. // This requires flushing and then checking that no callbacks produced errors. // We also check for errors before to fail as soon as possible - the check is cheap. - checkForErrors() - producer.flush() - checkForErrors() + try { + checkForErrors() + producer.flush() + checkForErrors() + } finally { + releaseProducer() + } KafkaDataWriterCommitMessage } - def abort(): Unit = {} + def abort(): Unit = { + close() + } def close(): Unit = { - checkForErrors() - if (producer != null) { - producer.flush() + try { checkForErrors() - CachedKafkaProducer.close(producerParams) + if (producer != null) { + producer.flush() + checkForErrors() + } + } finally { + releaseProducer() + } + } + + private def releaseProducer(): Unit = { + if (producer != null) { + CachedKafkaProducer.release(producer) + producer = null } } } diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriteTask.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriteTask.scala index b423ddc959c1..eb95603cf994 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriteTask.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriteTask.scala @@ -39,13 +39,13 @@ private[kafka010] class KafkaWriteTask( inputSchema: Seq[Attribute], topic: Option[String]) extends KafkaRowWriter(inputSchema, topic) { // used to synchronize with Kafka callbacks - private var producer: KafkaProducer[Array[Byte], Array[Byte]] = _ + private var producer: CachedKafkaProducer = _ /** * Writes key value data out to topics. */ def execute(iterator: Iterator[InternalRow]): Unit = { - producer = CachedKafkaProducer.getOrCreate(producerConfiguration) + producer = CachedKafkaProducer.acquire(producerConfiguration) while (iterator.hasNext && failedWrite == null) { val currentRow = iterator.next() sendRow(currentRow, producer) @@ -53,11 +53,17 @@ private[kafka010] class KafkaWriteTask( } def close(): Unit = { - checkForErrors() - if (producer != null) { - producer.flush() + try { checkForErrors() - producer = null + if (producer != null) { + producer.flush() + checkForErrors() + } + } finally { + if (producer != null) { + CachedKafkaProducer.release(producer) + producer = null + } } } } @@ -83,7 +89,7 @@ private[kafka010] abstract class KafkaRowWriter( * assuming the row is in Kafka. 
*/ protected def sendRow( - row: InternalRow, producer: KafkaProducer[Array[Byte], Array[Byte]]): Unit = { + row: InternalRow, producer: CachedKafkaProducer): Unit = { val projectedRow = projection(row) val topic = projectedRow.getUTF8String(0) val key = projectedRow.getBinary(1) diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/package.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/package.scala index 6f6ae55fc497..f103b5b69b58 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/package.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/package.scala @@ -26,12 +26,6 @@ package object kafka010 { // scalastyle:ignore // ^^ scalastyle:ignore is for ignoring warnings about digits in package name type PartitionOffsetMap = Map[TopicPartition, Long] - private[kafka010] val PRODUCER_CACHE_TIMEOUT = - ConfigBuilder("spark.kafka.producer.cache.timeout") - .doc("The expire time to remove the unused producers.") - .timeConf(TimeUnit.MILLISECONDS) - .createWithDefaultString("10m") - private[kafka010] val CONSUMER_CACHE_CAPACITY = ConfigBuilder("spark.kafka.consumer.cache.capacity") .doc("The maximum number of consumers cached. Please note it's a soft limit" + @@ -74,4 +68,32 @@ package object kafka010 { // scalastyle:ignore "When non-positive, no idle evictor thread will be run.") .timeConf(TimeUnit.MILLISECONDS) .createWithDefaultString("1m") + + private[kafka010] val PRODUCER_CACHE_CAPACITY = + ConfigBuilder("spark.kafka.producer.cache.capacity") + .doc("The maximum number of producers cached. Please note it's a soft limit" + + " (check Structured Streaming Kafka integration guide for further details).") + .intConf + .createWithDefault(64) + + private[kafka010] val PRODUCER_CACHE_JMX_ENABLED = + ConfigBuilder("spark.kafka.producer.cache.jmx.enable") + .doc("Enable or disable JMX for pools created with this configuration instance.") + .booleanConf + .createWithDefault(false) + + private[kafka010] val PRODUCER_CACHE_TIMEOUT = + ConfigBuilder("spark.kafka.producer.cache.timeout") + .doc("The minimum amount of time a producer may sit idle in the pool before " + + "it is eligible for eviction by the evictor. " + + "When non-positive, no producers will be evicted from the pool due to idle time alone.") + .timeConf(TimeUnit.MILLISECONDS) + .createWithDefaultString("5m") + + private[kafka010] val PRODUCER_CACHE_EVICTOR_THREAD_RUN_INTERVAL = + ConfigBuilder("spark.kafka.producer.cache.evictorThreadRunInterval") + .doc("The interval of time between runs of the idle evictor thread for producer pool. 
" + + "When non-positive, no idle evictor thread will be run.") + .timeConf(TimeUnit.MILLISECONDS) + .createWithDefaultString("1m") } diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/CachedKafkaProducerSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/CachedKafkaProducerSuite.scala index 7425a74315e1..4506a4029d88 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/CachedKafkaProducerSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/CachedKafkaProducerSuite.scala @@ -17,61 +17,133 @@ package org.apache.spark.sql.kafka010 -import java.{util => ju} -import java.util.concurrent.ConcurrentMap +import java.util.concurrent.{Executors, TimeUnit} -import org.apache.kafka.clients.producer.KafkaProducer +import scala.collection.JavaConverters._ +import scala.util.Random + +import org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG +import org.apache.kafka.clients.producer.{Callback, ProducerRecord, RecordMetadata} +import org.apache.kafka.clients.producer.ProducerConfig.{KEY_SERIALIZER_CLASS_CONFIG, VALUE_SERIALIZER_CLASS_CONFIG} import org.apache.kafka.common.serialization.ByteArraySerializer import org.scalatest.PrivateMethodTester +import org.apache.spark.{TaskContext, TaskContextImpl} +import org.apache.spark.sql.kafka010.InternalKafkaProducerPool._ import org.apache.spark.sql.test.SharedSparkSession class CachedKafkaProducerSuite extends SharedSparkSession with PrivateMethodTester with KafkaTest { - type KP = KafkaProducer[Array[Byte], Array[Byte]] + private var testUtils: KafkaTestUtils = _ + private val topic = "topic" + Random.nextInt() + private var producerPool: InternalKafkaProducerPool = _ + + override def beforeAll(): Unit = { + super.beforeAll() + testUtils = new KafkaTestUtils(Map[String, Object]()) + testUtils.setup() + } + + override def afterAll(): Unit = { + if (testUtils != null) { + testUtils.teardown() + testUtils = null + } + super.afterAll() + } - protected override def beforeEach(): Unit = { + override def beforeEach(): Unit = { super.beforeEach() - CachedKafkaProducer.clear() + + producerPool = { + val internalKafkaConsumerPoolMethod = PrivateMethod[InternalKafkaProducerPool]('producerPool) + CachedKafkaProducer.invokePrivate(internalKafkaConsumerPoolMethod()) + } + + producerPool.reset() } - test("Should return the cached instance on calling getOrCreate with same params.") { - val kafkaParams = new ju.HashMap[String, Object]() - kafkaParams.put("acks", "0") + private def getKafkaParams(acks: Int = 0) = Map[String, Object]( + "acks" -> acks.toString, // Here only host should be resolvable, it does not need a running instance of kafka server. 
- kafkaParams.put("bootstrap.servers", "127.0.0.1:9022") - kafkaParams.put("key.serializer", classOf[ByteArraySerializer].getName) - kafkaParams.put("value.serializer", classOf[ByteArraySerializer].getName) - val producer = CachedKafkaProducer.getOrCreate(kafkaParams) - val producer2 = CachedKafkaProducer.getOrCreate(kafkaParams) - assert(producer == producer2) - - val cacheMap = PrivateMethod[ConcurrentMap[Seq[(String, Object)], KP]](Symbol("getAsMap")) - val map = CachedKafkaProducer.invokePrivate(cacheMap()) - assert(map.size == 1) + BOOTSTRAP_SERVERS_CONFIG -> testUtils.brokerAddress, + KEY_SERIALIZER_CLASS_CONFIG -> classOf[ByteArraySerializer].getName, + VALUE_SERIALIZER_CLASS_CONFIG -> classOf[ByteArraySerializer].getName + ).asJava + + test("acquire should return the cached instance with same params") { + val kafkaParams = getKafkaParams() + + val producer1 = CachedKafkaProducer.acquire(kafkaParams) + CachedKafkaProducer.release(producer1) + val producer2 = CachedKafkaProducer.acquire(kafkaParams) + CachedKafkaProducer.release(producer2) + + assert(producer1 === producer2) + assert(producerPool.size(toCacheKey(kafkaParams)) === 1) } - test("Should close the correct kafka producer for the given kafkaPrams.") { - val kafkaParams = new ju.HashMap[String, Object]() - kafkaParams.put("acks", "0") - kafkaParams.put("bootstrap.servers", "127.0.0.1:9022") - kafkaParams.put("key.serializer", classOf[ByteArraySerializer].getName) - kafkaParams.put("value.serializer", classOf[ByteArraySerializer].getName) - val producer: KP = CachedKafkaProducer.getOrCreate(kafkaParams) - kafkaParams.put("acks", "1") - val producer2: KP = CachedKafkaProducer.getOrCreate(kafkaParams) - // With updated conf, a new producer instance should be created. - assert(producer != producer2) - - val cacheMap = PrivateMethod[ConcurrentMap[Seq[(String, Object)], KP]](Symbol("getAsMap")) - val map = CachedKafkaProducer.invokePrivate(cacheMap()) - assert(map.size == 2) - - CachedKafkaProducer.close(kafkaParams) - val map2 = CachedKafkaProducer.invokePrivate(cacheMap()) - assert(map2.size == 1) - import scala.collection.JavaConverters._ - val (seq: Seq[(String, Object)], _producer: KP) = map2.asScala.toArray.apply(0) - assert(_producer == producer) + test("acquire should return a new instance with different params") { + val kafkaParams1 = getKafkaParams() + val kafkaParams2 = getKafkaParams(1) + + val producer1 = CachedKafkaProducer.acquire(kafkaParams1) + CachedKafkaProducer.release(producer1) + val producer2 = CachedKafkaProducer.acquire(kafkaParams2) + CachedKafkaProducer.release(producer2) + + assert(producer1 !== producer2) + assert(producerPool.size(toCacheKey(kafkaParams1)) === 1) + assert(producerPool.size(toCacheKey(kafkaParams2)) === 1) + } + + test("Concurrent use of CachedKafkaProducer") { + val data = (1 to 1000).map(_.toString) + testUtils.createTopic(topic, 1) + + val kafkaParams = getKafkaParams() + val numThreads = 100 + val numProducerUsages = 500 + + @volatile var error: Throwable = null + + val callback = new Callback() { + override def onCompletion(recordMetadata: RecordMetadata, e: Exception): Unit = { + if (error == null && e != null) { + error = e + } + } + } + + def produce(): Unit = { + val taskContext = if (Random.nextBoolean) { + new TaskContextImpl(0, 0, 0, 0, attemptNumber = Random.nextInt(2), null, null, null) + } else { + null + } + TaskContext.setTaskContext(taskContext) + val producer = CachedKafkaProducer.acquire(kafkaParams) + try { + data.foreach { d => + val record = new 
ProducerRecord[Array[Byte], Array[Byte]](topic, 0, null, d.getBytes) + producer.send(record, callback) + } + } finally { + CachedKafkaProducer.release(producer) + } + } + + val threadpool = Executors.newFixedThreadPool(numThreads) + try { + val futures = (1 to numProducerUsages).map { _ => + threadpool.submit(new Runnable { + override def run(): Unit = { produce() } + }) + } + futures.foreach(_.get(1, TimeUnit.MINUTES)) + assert(error == null) + } finally { + threadpool.shutdown() + } } } diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/InternalKafkaConsumerPoolSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/InternalKafkaConnectorPoolSuite.scala similarity index 96% rename from external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/InternalKafkaConsumerPoolSuite.scala rename to external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/InternalKafkaConnectorPoolSuite.scala index 78d7feef5851..3143429abd71 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/InternalKafkaConsumerPoolSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/InternalKafkaConnectorPoolSuite.scala @@ -29,7 +29,13 @@ import org.apache.spark.SparkConf import org.apache.spark.sql.kafka010.KafkaDataConsumer.CacheKey import org.apache.spark.sql.test.SharedSparkSession -class InternalKafkaConsumerPoolSuite extends SharedSparkSession { +/* + * There are multiple implementations of [[InternalKafkaConnectorPool]] but they don't differ + * significantly. Because of that, only [[InternalKafkaConsumerPool]] is used to test all the + * functionality. If the behavior of the implementations starts to differ, it is worth adding + * further tests, but for now it would be mainly copy-paste. 
+ */ +class InternalKafkaConnectorPoolSuite extends SharedSparkSession { test("basic multiple borrows and returns for single key") { val pool = new InternalKafkaConsumerPool(new SparkConf()) diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaDataConsumerSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaDataConsumerSuite.scala index d22955180d05..6e1f10e7f6d7 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaDataConsumerSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaDataConsumerSuite.scala @@ -195,7 +195,7 @@ class KafkaDataConsumerSuite @volatile var error: Throwable = null - def consume(i: Int): Unit = { + def consume(): Unit = { val taskContext = if (Random.nextBoolean) { new TaskContextImpl(0, 0, 0, 0, attemptNumber = Random.nextInt(2), null, null, null) } else { @@ -233,9 +233,9 @@ class KafkaDataConsumerSuite val threadpool = Executors.newFixedThreadPool(numThreads) try { - val futures = (1 to numConsumerUsages).map { i => + val futures = (1 to numConsumerUsages).map { _ => threadpool.submit(new Runnable { - override def run(): Unit = { consume(i) } + override def run(): Unit = { consume() } }) } futures.foreach(_.get(1, TimeUnit.MINUTES)) diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTest.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTest.scala index 19acda95c707..2900322b947b 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTest.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTest.scala @@ -21,12 +21,16 @@ import org.scalatest.BeforeAndAfterAll import org.apache.spark.SparkFunSuite -/** A trait to clean cached Kafka producers in `afterAll` */ +/** A trait to clean cached Kafka connectors in `afterAll` */ trait KafkaTest extends BeforeAndAfterAll { self: SparkFunSuite => override def afterAll(): Unit = { - super.afterAll() - CachedKafkaProducer.clear() + try { + KafkaDataConsumer.clear() + CachedKafkaProducer.clear() + } finally { + super.afterAll() + } } } diff --git a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumerSuite.scala b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumerSuite.scala index 82913cf416a5..246672bcbacf 100644 --- a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumerSuite.scala +++ b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumerSuite.scala @@ -121,8 +121,6 @@ class KafkaDataConsumerSuite extends SparkFunSuite with MockitoSugar with Before val numThreads = 100 val numConsumerUsages = 500 - @volatile var error: Throwable = null - def consume(i: Int): Unit = { val useCache = Random.nextBoolean val taskContext = if (Random.nextBoolean) { @@ -138,10 +136,6 @@ class KafkaDataConsumerSuite extends SparkFunSuite with MockitoSugar with Before new String(bytes) } assert(rcvd == data) - } catch { - case e: Throwable => - error = e - throw e } finally { consumer.release() } @@ -155,7 +149,6 @@ class KafkaDataConsumerSuite extends SparkFunSuite with MockitoSugar with Before }) } futures.foreach(_.get(1, TimeUnit.MINUTES)) - assert(error == null) } finally { threadPool.shutdown() }
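
Usage sketch (illustrative, not part of the patch): the snippet below shows the acquire/flush/release lifecycle that the pooled producer expects, mirroring how KafkaDataWriter and KafkaWriteTask use it above. It assumes it is compiled inside the org.apache.spark.sql.kafka010 package, since CachedKafkaProducer is package-private, and that a live SparkEnv is available; the object name, topic, and parameter values are placeholders.

package org.apache.spark.sql.kafka010

import java.{util => ju}

import scala.collection.JavaConverters._

import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.ByteArraySerializer

// Hypothetical helper, only to show the borrow/return contract of the pooled producer.
object ProducerPoolUsageSketch {
  def writeOnce(topic: String, payload: Array[Byte]): Unit = {
    // Placeholder params; acquire() passes them through KafkaConfigUpdater before keying the pool.
    val producerParams: ju.Map[String, Object] = Map[String, Object](
      "bootstrap.servers" -> "localhost:9092",
      "key.serializer" -> classOf[ByteArraySerializer].getName,
      "value.serializer" -> classOf[ByteArraySerializer].getName
    ).asJava

    // Borrow a pooled producer; a new one is created only if no idle instance exists for the key.
    val producer = CachedKafkaProducer.acquire(producerParams)
    try {
      // send() is asynchronous; flush() blocks until the outstanding records have completed.
      producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic, payload), null)
      producer.flush()
    } finally {
      // Return the producer to the pool rather than closing it; the idle evictor configured by
      // spark.kafka.producer.cache.timeout closes it later if it stays unused.
      CachedKafkaProducer.release(producer)
    }
  }
}

The explicit release() call is what lets the pool distinguish active from idle producers, so the capacity check and the idle evictor only ever close instances that no task is currently using.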