From 20c82b7ceac36840b3db3a2697803fdd36cbeeb1 Mon Sep 17 00:00:00 2001
From: "Jungtaek Lim (HeartSaVioR)"
Date: Tue, 24 Dec 2019 09:32:06 +0900
Subject: [PATCH 1/2] [SPARK-30336][SQL][SS] Move Kafka consumer-related classes to its own package

---
 .../sql/kafka010/KafkaBatchPartitionReader.scala    |  1 +
 .../spark/sql/kafka010/KafkaContinuousStream.scala  |  1 +
 .../apache/spark/sql/kafka010/KafkaSourceRDD.scala  |  7 ++-----
 .../kafka010/{ => consumer}/FetchedDataPool.scala   | 13 +++++++------
 .../{ => consumer}/InternalKafkaConsumerPool.scala  | 13 ++++++-------
 .../kafka010/{ => consumer}/KafkaDataConsumer.scala |  6 +++---
 .../{ => consumer}/FetchedDataPoolSuite.scala       |  5 +++--
 .../InternalKafkaConsumerPoolSuite.scala            |  5 +++--
 .../{ => consumer}/KafkaDataConsumerSuite.scala     |  5 +++--
 9 files changed, 29 insertions(+), 27 deletions(-)
 rename external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/{ => consumer}/FetchedDataPool.scala (92%)
 rename external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/{ => consumer}/InternalKafkaConsumerPool.scala (96%)
 rename external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/{ => consumer}/KafkaDataConsumer.scala (99%)
 rename external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/{ => consumer}/FetchedDataPoolSuite.scala (97%)
 rename external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/{ => consumer}/InternalKafkaConsumerPoolSuite.scala (97%)
 rename external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/{ => consumer}/KafkaDataConsumerSuite.scala (98%)

diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaBatchPartitionReader.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaBatchPartitionReader.scala
index 645b68b0c407..8b37fd6e7e2b 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaBatchPartitionReader.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaBatchPartitionReader.scala
@@ -23,6 +23,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow
 import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader, PartitionReaderFactory}
+import org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer
 
 /** A [[InputPartition]] for reading Kafka data in a batch based streaming query. */
 private[kafka010] case class KafkaBatchInputPartition(
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousStream.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousStream.scala
index 0603ae39ba62..0b549870a348 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousStream.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousStream.scala
@@ -30,6 +30,7 @@ import org.apache.spark.sql.catalyst.expressions.UnsafeRow
 import org.apache.spark.sql.connector.read.InputPartition
 import org.apache.spark.sql.connector.read.streaming.{ContinuousPartitionReader, ContinuousPartitionReaderFactory, ContinuousStream, Offset, PartitionOffset}
 import org.apache.spark.sql.kafka010.KafkaSourceProvider._
+import org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
 /**
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceRDD.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceRDD.scala
index dae9515205f5..f1f3871fc7db 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceRDD.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceRDD.scala
@@ -19,18 +19,15 @@ package org.apache.spark.sql.kafka010
 
 import java.{util => ju}
 
-import scala.collection.mutable.ArrayBuffer
-
-import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
+import org.apache.kafka.clients.consumer.ConsumerRecord
 import org.apache.kafka.common.TopicPartition
 
 import org.apache.spark.{Partition, SparkContext, TaskContext}
-import org.apache.spark.partial.{BoundedDouble, PartialResult}
 import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.util.NextIterator
 
-
 /** Offset range that one partition of the KafkaSourceRDD has to read */
 private[kafka010] case class KafkaSourceRDDOffsetRange(
     topicPartition: TopicPartition,
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/FetchedDataPool.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/consumer/FetchedDataPool.scala
similarity index 92%
rename from external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/FetchedDataPool.scala
rename to external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/consumer/FetchedDataPool.scala
index 6f18407a1700..6174bfb20342 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/FetchedDataPool.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/consumer/FetchedDataPool.scala
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.spark.sql.kafka010
+package org.apache.spark.sql.kafka010.consumer
 
 import java.{util => ju}
 import java.util.concurrent.{ScheduledExecutorService, ScheduledFuture, TimeUnit}
@@ -27,7 +27,8 @@ import org.apache.kafka.clients.consumer.ConsumerRecord
 
 import org.apache.spark.SparkConf
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.kafka010.KafkaDataConsumer.{CacheKey, UNKNOWN_OFFSET}
+import org.apache.spark.sql.kafka010.{FETCHED_DATA_CACHE_EVICTOR_THREAD_RUN_INTERVAL, FETCHED_DATA_CACHE_TIMEOUT}
+import org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer.{CacheKey, UNKNOWN_OFFSET}
 import org.apache.spark.util.{Clock, SystemClock, ThreadUtils, Utils}
 
 /**
@@ -39,7 +40,7 @@ import org.apache.spark.util.{Clock, SystemClock, ThreadUtils, Utils}
  * modified in same instance, this class cannot be replaced with general pool implementations
  * including Apache Commons Pool which pools KafkaConsumer.
  */
-private[kafka010] class FetchedDataPool(
+private[consumer] class FetchedDataPool(
     executorService: ScheduledExecutorService,
     clock: Clock,
     conf: SparkConf) extends Logging {
@@ -159,8 +160,8 @@ private[kafka010] class FetchedDataPool(
   }
 }
 
-private[kafka010] object FetchedDataPool {
-  private[kafka010] case class CachedFetchedData(fetchedData: FetchedData) {
+private[consumer] object FetchedDataPool {
+  private[consumer] case class CachedFetchedData(fetchedData: FetchedData) {
     var lastReleasedTimestamp: Long = Long.MaxValue
     var lastAcquiredTimestamp: Long = Long.MinValue
     var inUse: Boolean = false
@@ -179,5 +180,5 @@ private[kafka010] object FetchedDataPool {
     }
   }
 
-  private[kafka010] type CachedFetchedDataList = mutable.ListBuffer[CachedFetchedData]
+  private[consumer] type CachedFetchedDataList = mutable.ListBuffer[CachedFetchedData]
 }
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/InternalKafkaConsumerPool.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/consumer/InternalKafkaConsumerPool.scala
similarity index 96%
rename from external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/InternalKafkaConsumerPool.scala
rename to external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/consumer/InternalKafkaConsumerPool.scala
index 276a942742b8..2256f96c660b 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/InternalKafkaConsumerPool.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/consumer/InternalKafkaConsumerPool.scala
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.spark.sql.kafka010
+package org.apache.spark.sql.kafka010.consumer
 
 import java.{util => ju}
 import java.util.concurrent.ConcurrentHashMap
@@ -25,8 +25,9 @@ import org.apache.commons.pool2.impl.{DefaultEvictionPolicy, DefaultPooledObject
 
 import org.apache.spark.SparkConf
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.kafka010.InternalKafkaConsumerPool._
-import org.apache.spark.sql.kafka010.KafkaDataConsumer.CacheKey
+import org.apache.spark.sql.kafka010._
+import org.apache.spark.sql.kafka010.consumer.InternalKafkaConsumerPool._
+import org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer.CacheKey
 
 /**
  * Provides object pool for [[InternalKafkaConsumer]] which is grouped by [[CacheKey]].
@@ -45,10 +46,9 @@ import org.apache.spark.sql.kafka010.KafkaDataConsumer.CacheKey
  * not yet returned, hence provide thread-safety usage of non-thread-safe [[InternalKafkaConsumer]]
  * unless caller shares the object to multiple threads.
  */
-private[kafka010] class InternalKafkaConsumerPool(
+private[consumer] class InternalKafkaConsumerPool(
     objectFactory: ObjectFactory,
     poolConfig: PoolConfig) extends Logging {
-
   def this(conf: SparkConf) = {
     this(new ObjectFactory, new PoolConfig(conf))
   }
@@ -147,7 +147,7 @@ private[kafka010] class InternalKafkaConsumerPool(
   }
 }
 
-private[kafka010] object InternalKafkaConsumerPool {
+private[consumer] object InternalKafkaConsumerPool {
   object CustomSwallowedExceptionListener extends SwallowedExceptionListener with Logging {
     override def onSwallowException(e: Exception): Unit = {
       logError(s"Error closing Kafka consumer", e)
@@ -218,4 +218,3 @@ private[kafka010] object InternalKafkaConsumerPool {
     }
   }
 }
-
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/consumer/KafkaDataConsumer.scala
similarity index 99%
rename from external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala
rename to external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/consumer/KafkaDataConsumer.scala
index ca82c908f441..b4123475e808 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/consumer/KafkaDataConsumer.scala
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.spark.sql.kafka010
+package org.apache.spark.sql.kafka010.consumer
 
 import java.{util => ju}
 import java.io.Closeable
@@ -29,9 +29,9 @@ import org.apache.kafka.common.TopicPartition
 
 import org.apache.spark.{SparkEnv, TaskContext}
 import org.apache.spark.internal.Logging
-import org.apache.spark.kafka010.{KafkaConfigUpdater, KafkaTokenClusterConf, KafkaTokenUtil}
-import org.apache.spark.sql.kafka010.KafkaDataConsumer.{AvailableOffsetRange, UNKNOWN_OFFSET}
+import org.apache.spark.kafka010.{KafkaConfigUpdater, KafkaTokenUtil}
 import org.apache.spark.sql.kafka010.KafkaSourceProvider._
+import org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer.{AvailableOffsetRange, UNKNOWN_OFFSET}
 import org.apache.spark.util.{ShutdownHookManager, UninterruptibleThread}
 
 /**
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/FetchedDataPoolSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/consumer/FetchedDataPoolSuite.scala
similarity index 97%
rename from external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/FetchedDataPoolSuite.scala
rename to external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/consumer/FetchedDataPoolSuite.scala
index 5449f5d733c5..23bab5cd4808 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/FetchedDataPoolSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/consumer/FetchedDataPoolSuite.scala
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.spark.sql.kafka010
+package org.apache.spark.sql.kafka010.consumer
 
 import java.{util => ju}
 import java.util.concurrent.TimeUnit
@@ -29,7 +29,8 @@ import org.jmock.lib.concurrent.DeterministicScheduler
 import org.scalatest.PrivateMethodTester
 
 import org.apache.spark.SparkConf
-import org.apache.spark.sql.kafka010.KafkaDataConsumer.CacheKey
+import org.apache.spark.sql.kafka010.{FETCHED_DATA_CACHE_EVICTOR_THREAD_RUN_INTERVAL, FETCHED_DATA_CACHE_TIMEOUT}
+import org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer.CacheKey
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.util.ManualClock
 
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/InternalKafkaConsumerPoolSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/consumer/InternalKafkaConsumerPoolSuite.scala
similarity index 97%
rename from external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/InternalKafkaConsumerPoolSuite.scala
rename to external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/consumer/InternalKafkaConsumerPoolSuite.scala
index 78d7feef5851..3797d5b5bd6a 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/InternalKafkaConsumerPoolSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/consumer/InternalKafkaConsumerPoolSuite.scala
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.spark.sql.kafka010
+package org.apache.spark.sql.kafka010.consumer
 
 import java.{util => ju}
 
@@ -26,7 +26,8 @@ import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.serialization.ByteArrayDeserializer
 
 import org.apache.spark.SparkConf
-import org.apache.spark.sql.kafka010.KafkaDataConsumer.CacheKey
+import org.apache.spark.sql.kafka010.{CONSUMER_CACHE_CAPACITY, CONSUMER_CACHE_EVICTOR_THREAD_RUN_INTERVAL, CONSUMER_CACHE_TIMEOUT}
+import org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer.CacheKey
 import org.apache.spark.sql.test.SharedSparkSession
 
 class InternalKafkaConsumerPoolSuite extends SharedSparkSession {
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaDataConsumerSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/consumer/KafkaDataConsumerSuite.scala
similarity index 98%
rename from external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaDataConsumerSuite.scala
rename to external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/consumer/KafkaDataConsumerSuite.scala
index d22955180d05..c607c4fc81b7 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaDataConsumerSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/consumer/KafkaDataConsumerSuite.scala
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.spark.sql.kafka010
+package org.apache.spark.sql.kafka010.consumer
 
 import java.{util => ju}
 import java.nio.charset.StandardCharsets
@@ -32,7 +32,8 @@ import org.scalatest.PrivateMethodTester
 
 import org.apache.spark.{TaskContext, TaskContextImpl}
 import org.apache.spark.kafka010.KafkaDelegationTokenTest
-import org.apache.spark.sql.kafka010.KafkaDataConsumer.CacheKey
+import org.apache.spark.sql.kafka010.{KafkaTestUtils, RecordBuilder}
+import org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer.CacheKey
 import org.apache.spark.sql.test.SharedSparkSession
 
 class KafkaDataConsumerSuite

From 757c7da43a1150c7aa321dcc88dfde901c71d25a Mon Sep 17 00:00:00 2001
From: "Jungtaek Lim (HeartSaVioR)"
Date: Tue, 24 Dec 2019 09:48:45 +0900
Subject: [PATCH 2/2] More restrictive narrow-down

---
 .../sql/kafka010/consumer/KafkaDataConsumer.scala | 13 ++++++++-----
 1 file changed, 8 insertions(+), 5 deletions(-)

diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/consumer/KafkaDataConsumer.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/consumer/KafkaDataConsumer.scala
index b4123475e808..0eac18cffa7b 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/consumer/KafkaDataConsumer.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/consumer/KafkaDataConsumer.scala
@@ -47,13 +47,15 @@ private[kafka010] class InternalKafkaConsumer(
 
   val groupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]
 
-  private[kafka010] val clusterConfig = KafkaTokenUtil.findMatchingTokenClusterConfig(
+  // Exposed for testing
+  private[consumer] val clusterConfig = KafkaTokenUtil.findMatchingTokenClusterConfig(
     SparkEnv.get.conf, kafkaParams.get(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG)
       .asInstanceOf[String])
 
   // Kafka consumer is not able to give back the params instantiated with so we need to store it.
   // It must be updated whenever a new consumer is created.
-  private[kafka010] var kafkaParamsWithSecurity: ju.Map[String, Object] = _
+  // Exposed for testing
+  private[consumer] var kafkaParamsWithSecurity: ju.Map[String, Object] = _
   private val consumer = createConsumer()
 
 /**
@@ -141,7 +143,7 @@
  * @param _offsetAfterPoll the Kafka offset after calling `poll`. We will use this offset to
  *                         poll when `records` is drained.
  */
-private[kafka010] case class FetchedData(
+private[consumer] case class FetchedData(
     private var _records: ju.ListIterator[ConsumerRecord[Array[Byte], Array[Byte]]],
     private var _nextOffsetInFetchedData: Long,
     private var _offsetAfterPoll: Long) {
@@ -200,7 +202,7 @@
  * `isolation.level` is `read_committed`), and the caller should use `nextOffsetToFetch` to fetch
  * instead.
  */
-private[kafka010] case class FetchedRecord(
+private[consumer] case class FetchedRecord(
    var record: ConsumerRecord[Array[Byte], Array[Byte]],
    var nextOffsetToFetch: Long) {
 
@@ -227,7 +229,8 @@ private[kafka010] class KafkaDataConsumer(
     fetchedDataPool: FetchedDataPool) extends Logging {
   import KafkaDataConsumer._
 
-  @volatile private[kafka010] var _consumer: Option[InternalKafkaConsumer] = None
+  // Exposed for testing
+  @volatile private[consumer] var _consumer: Option[InternalKafkaConsumer] = None
   @volatile private var _fetchedData: Option[FetchedData] = None
 
   private val groupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]
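Note on the visibility changes above: Scala's qualified access modifier private[p] grants access up to and including the enclosing package p, so private[consumer] is strictly narrower than private[kafka010]. The narrowing in PATCH 2/2 works only because PATCH 1/2 moved the test suites into org.apache.spark.sql.kafka010.consumer; qualified visibility is resolved by package name, not source tree, so the "Exposed for testing" members remain reachable from the relocated tests while disappearing from the rest of kafka010. A minimal sketch of the rule, using hypothetical org.example packages rather than the Spark sources:

package org.example.kafka010 {
  package consumer {
    // Analogue of a private[consumer] member such as kafkaParamsWithSecurity:
    // reachable only from org.example.kafka010.consumer, which after the move
    // also contains the test suites.
    private[consumer] class PoolInternals {
      private[consumer] var exposedForTesting: Int = 0
    }

    // Analogue of a class the parent package still needs to reach.
    private[kafka010] class ConsumerFacade {
      private[consumer] val internals = new PoolInternals
    }
  }

  object ParentPackageCaller {
    def demo(): Unit = {
      val facade = new consumer.ConsumerFacade // compiles: private[kafka010] reaches the parent package
      // new consumer.PoolInternals            // would not compile: private[consumer]
      // facade.internals                      // would not compile: private[consumer]
    }
  }
}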