@@ -23,6 +23,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.UnsafeRow
import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader, PartitionReaderFactory}
import org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer

/** A [[InputPartition]] for reading Kafka data in a batch based streaming query. */
private[kafka010] case class KafkaBatchInputPartition(
@@ -30,6 +30,7 @@ import org.apache.spark.sql.catalyst.expressions.UnsafeRow
import org.apache.spark.sql.connector.read.InputPartition
import org.apache.spark.sql.connector.read.streaming.{ContinuousPartitionReader, ContinuousPartitionReaderFactory, ContinuousStream, Offset, PartitionOffset}
import org.apache.spark.sql.kafka010.KafkaSourceProvider._
import org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer
import org.apache.spark.sql.util.CaseInsensitiveStringMap

/**
@@ -19,18 +19,15 @@ package org.apache.spark.sql.kafka010

import java.{util => ju}

import scala.collection.mutable.ArrayBuffer

Review comment (PR author): I've also cleaned up unused imports and removed an unnecessary line (there were two empty lines between the last import statement and the class definition).

import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition

import org.apache.spark.{Partition, SparkContext, TaskContext}
import org.apache.spark.partial.{BoundedDouble, PartialResult}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer
import org.apache.spark.storage.StorageLevel
import org.apache.spark.util.NextIterator


/** Offset range that one partition of the KafkaSourceRDD has to read */
private[kafka010] case class KafkaSourceRDDOffsetRange(
topicPartition: TopicPartition,
@@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.spark.sql.kafka010
package org.apache.spark.sql.kafka010.consumer

import java.{util => ju}
import java.util.concurrent.{ScheduledExecutorService, ScheduledFuture, TimeUnit}
@@ -27,7 +27,8 @@ import org.apache.kafka.clients.consumer.ConsumerRecord

import org.apache.spark.SparkConf
import org.apache.spark.internal.Logging
import org.apache.spark.sql.kafka010.KafkaDataConsumer.{CacheKey, UNKNOWN_OFFSET}
import org.apache.spark.sql.kafka010.{FETCHED_DATA_CACHE_EVICTOR_THREAD_RUN_INTERVAL, FETCHED_DATA_CACHE_TIMEOUT}
import org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer.{CacheKey, UNKNOWN_OFFSET}
import org.apache.spark.util.{Clock, SystemClock, ThreadUtils, Utils}

/**
@@ -39,7 +40,7 @@ import org.apache.spark.util.{Clock, SystemClock, ThreadUtils, Utils}
* modified in same instance, this class cannot be replaced with general pool implementations
* including Apache Commons Pool which pools KafkaConsumer.
*/
private[kafka010] class FetchedDataPool(
private[consumer] class FetchedDataPool(
executorService: ScheduledExecutorService,
clock: Clock,
conf: SparkConf) extends Logging {
@@ -159,8 +160,8 @@ private[kafka010] class FetchedDataPool(
}
}

private[kafka010] object FetchedDataPool {
private[kafka010] case class CachedFetchedData(fetchedData: FetchedData) {
private[consumer] object FetchedDataPool {
private[consumer] case class CachedFetchedData(fetchedData: FetchedData) {
var lastReleasedTimestamp: Long = Long.MaxValue
var lastAcquiredTimestamp: Long = Long.MinValue
var inUse: Boolean = false
@@ -179,5 +180,5 @@ private[kafka010] object FetchedDataPool {
}
}

private[kafka010] type CachedFetchedDataList = mutable.ListBuffer[CachedFetchedData]
private[consumer] type CachedFetchedDataList = mutable.ListBuffer[CachedFetchedData]
}
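
For context: the FetchedDataPool hunks above keep, per CacheKey, a list of CachedFetchedData entries carrying lastAcquiredTimestamp / lastReleasedTimestamp / inUse flags, and a scheduled evictor drops entries idle longer than FETCHED_DATA_CACHE_TIMEOUT, which is why the doc says it cannot be replaced by a general-purpose pool. The Scala sketch below only illustrates that bookkeeping pattern under stated assumptions; IdlePool and its method names are invented for the example and are not part of this diff.

import scala.collection.mutable

// Illustrative only: a per-key pool with idle-based eviction, mirroring the
// lastAcquired/lastReleased/inUse bookkeeping shown in the hunks above.
class IdlePool[K, V](timeoutMs: Long, now: () => Long = () => System.currentTimeMillis()) {

  private case class Entry(value: V) {
    var lastAcquired: Long = Long.MinValue
    var lastReleased: Long = Long.MaxValue
    var inUse: Boolean = false
  }

  private val entries = mutable.Map.empty[K, mutable.ListBuffer[Entry]]

  // Reuse an idle entry for `key`, or create and cache a new one.
  def acquire(key: K)(create: => V): V = synchronized {
    val list = entries.getOrElseUpdate(key, mutable.ListBuffer.empty[Entry])
    val entry = list.find(e => !e.inUse).getOrElse {
      val e = Entry(create)
      list += e
      e
    }
    entry.inUse = true
    entry.lastAcquired = now()
    entry.value
  }

  // Return an entry to the pool; from now on it is a candidate for eviction.
  def release(key: K, value: V): Unit = synchronized {
    entries.get(key).flatMap(_.find(e => e.inUse && e.value == value)).foreach { e =>
      e.inUse = false
      e.lastReleased = now()
    }
  }

  // What the scheduled evictor task would run on each interval.
  def evictIdle(): Unit = synchronized {
    val cutoff = now() - timeoutMs
    entries.values.foreach { list =>
      list --= list.filter(e => !e.inUse && e.lastReleased <= cutoff)
    }
  }
}

A caller would acquire an entry for its key, read through it, then release it, while something like the ScheduledExecutorService passed to the real pool periodically runs the eviction step.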
@@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.spark.sql.kafka010
package org.apache.spark.sql.kafka010.consumer

import java.{util => ju}
import java.util.concurrent.ConcurrentHashMap
@@ -25,8 +25,9 @@ import org.apache.commons.pool2.impl.{DefaultEvictionPolicy, DefaultPooledObject

import org.apache.spark.SparkConf
import org.apache.spark.internal.Logging
import org.apache.spark.sql.kafka010.InternalKafkaConsumerPool._
import org.apache.spark.sql.kafka010.KafkaDataConsumer.CacheKey
import org.apache.spark.sql.kafka010._
import org.apache.spark.sql.kafka010.consumer.InternalKafkaConsumerPool._
import org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer.CacheKey

/**
* Provides object pool for [[InternalKafkaConsumer]] which is grouped by [[CacheKey]].
@@ -45,10 +46,9 @@ import org.apache.spark.sql.kafka010.KafkaDataConsumer.CacheKey
* not yet returned, hence provide thread-safety usage of non-thread-safe [[InternalKafkaConsumer]]
* unless caller shares the object to multiple threads.
*/
private[kafka010] class InternalKafkaConsumerPool(
private[consumer] class InternalKafkaConsumerPool(
objectFactory: ObjectFactory,
poolConfig: PoolConfig) extends Logging {

def this(conf: SparkConf) = {
this(new ObjectFactory, new PoolConfig(conf))
}
@@ -147,7 +147,7 @@ private[kafka010] class InternalKafkaConsumerPool(
}
}

private[kafka010] object InternalKafkaConsumerPool {
private[consumer] object InternalKafkaConsumerPool {
object CustomSwallowedExceptionListener extends SwallowedExceptionListener with Logging {
override def onSwallowException(e: Exception): Unit = {
logError(s"Error closing Kafka consumer", e)
@@ -218,4 +218,3 @@ private[kafka010] object InternalKafkaConsumerPool {
}
}
}


Review comment (PR author): There were two empty lines here, which can be just one.
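
For context: the class doc in the InternalKafkaConsumerPool hunks (an object pool of InternalKafkaConsumer grouped by CacheKey, where a borrowed object is not handed to anyone else until it is returned) matches the keyed-object-pool pattern from Apache Commons Pool 2, which the imports in this file reference. The sketch below shows only that generic pattern with made-up stand-in types (ConsumerKey, PooledConsumer) and config values; it is not the class being moved here, and it assumes a commons-pool2 version where GenericKeyedObjectPoolConfig is generic.

import org.apache.commons.pool2.{BaseKeyedPooledObjectFactory, PooledObject}
import org.apache.commons.pool2.impl.{DefaultPooledObject, GenericKeyedObjectPool, GenericKeyedObjectPoolConfig}

// Hypothetical stand-ins for the real CacheKey / InternalKafkaConsumer.
case class ConsumerKey(groupId: String, topic: String, partition: Int)
class PooledConsumer(val key: ConsumerKey) {
  def close(): Unit = ()
}

// Creates, wraps, and destroys pooled objects on behalf of the keyed pool.
class ConsumerFactory extends BaseKeyedPooledObjectFactory[ConsumerKey, PooledConsumer] {
  override def create(key: ConsumerKey): PooledConsumer = new PooledConsumer(key)
  override def wrap(value: PooledConsumer): PooledObject[PooledConsumer] =
    new DefaultPooledObject[PooledConsumer](value)
  override def destroyObject(key: ConsumerKey, p: PooledObject[PooledConsumer]): Unit =
    p.getObject.close()
}

object KeyedPoolSketch {
  def main(args: Array[String]): Unit = {
    val config = new GenericKeyedObjectPoolConfig[PooledConsumer]()
    config.setMaxTotalPerKey(1)                         // hand out at most one consumer per key at a time
    config.setMinEvictableIdleTimeMillis(5 * 60 * 1000L)
    config.setTimeBetweenEvictionRunsMillis(60 * 1000L)

    val pool = new GenericKeyedObjectPool[ConsumerKey, PooledConsumer](new ConsumerFactory, config)
    val key = ConsumerKey("group-1", "topic-1", 0)

    // While borrowed, the object is invisible to other callers, which is what
    // makes a non-thread-safe consumer safe to use through the pool.
    val consumer = pool.borrowObject(key)
    try {
      // ... read records with `consumer` ...
    } finally {
      pool.returnObject(key, consumer)
    }
    pool.close()
  }
}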

@@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.spark.sql.kafka010
package org.apache.spark.sql.kafka010.consumer

import java.{util => ju}
import java.io.Closeable
@@ -29,9 +29,9 @@ import org.apache.kafka.common.TopicPartition

import org.apache.spark.{SparkEnv, TaskContext}
import org.apache.spark.internal.Logging
import org.apache.spark.kafka010.{KafkaConfigUpdater, KafkaTokenClusterConf, KafkaTokenUtil}
import org.apache.spark.sql.kafka010.KafkaDataConsumer.{AvailableOffsetRange, UNKNOWN_OFFSET}
import org.apache.spark.kafka010.{KafkaConfigUpdater, KafkaTokenUtil}
import org.apache.spark.sql.kafka010.KafkaSourceProvider._
import org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer.{AvailableOffsetRange, UNKNOWN_OFFSET}
import org.apache.spark.util.{ShutdownHookManager, UninterruptibleThread}

/**
@@ -47,13 +47,15 @@ private[kafka010] class InternalKafkaConsumer(

val groupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]

private[kafka010] val clusterConfig = KafkaTokenUtil.findMatchingTokenClusterConfig(
// Exposed for testing
private[consumer] val clusterConfig = KafkaTokenUtil.findMatchingTokenClusterConfig(
SparkEnv.get.conf, kafkaParams.get(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG)
.asInstanceOf[String])

// Kafka consumer is not able to give back the params instantiated with so we need to store it.
// It must be updated whenever a new consumer is created.
private[kafka010] var kafkaParamsWithSecurity: ju.Map[String, Object] = _
// Exposed for testing
private[consumer] var kafkaParamsWithSecurity: ju.Map[String, Object] = _
private val consumer = createConsumer()

/**
@@ -139,7 +141,7 @@ private[kafka010] case class FetchedData(
* @param _offsetAfterPoll the Kafka offset after calling `poll`. We will use this offset to
* poll when `records` is drained.
*/
private[kafka010] case class FetchedData(
private[consumer] case class FetchedData(
private var _records: ju.ListIterator[ConsumerRecord[Array[Byte], Array[Byte]]],
private var _nextOffsetInFetchedData: Long,
private var _offsetAfterPoll: Long) {
@@ -196,7 +198,7 @@ private[kafka010] case class FetchedRecord(
* `isolation.level` is `read_committed`), and the caller should use `nextOffsetToFetch` to fetch
* instead.
*/
private[kafka010] case class FetchedRecord(
private[consumer] case class FetchedRecord(
var record: ConsumerRecord[Array[Byte], Array[Byte]],
var nextOffsetToFetch: Long) {

@@ -223,7 +225,8 @@ private[kafka010] class KafkaDataConsumer(
fetchedDataPool: FetchedDataPool) extends Logging {
import KafkaDataConsumer._

@volatile private[kafka010] var _consumer: Option[InternalKafkaConsumer] = None
// Exposed for testing
@volatile private[consumer] var _consumer: Option[InternalKafkaConsumer] = None
@volatile private var _fetchedData: Option[FetchedData] = None

private val groupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]
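
For context: the FetchedData / FetchedRecord hunks above document the buffering contract used by KafkaDataConsumer: the records returned by one poll are served through an iterator, _nextOffsetInFetchedData tracks what the buffer can serve next, and _offsetAfterPoll is where the next poll should resume once the buffer is drained. A rough, self-contained sketch of that contract follows; SimpleRecord and FetchedBuffer are simplified stand-ins invented for illustration, not the classes in this diff.

// Simplified stand-in for a Kafka ConsumerRecord.
final case class SimpleRecord(offset: Long, value: String)

// Buffers the outcome of one poll() and remembers where the next poll() must resume.
final class FetchedBuffer(initialPollOffset: Long) {
  private var records: Iterator[SimpleRecord] = Iterator.empty
  private var nextOffsetToServe: Long = initialPollOffset   // plays the role of _nextOffsetInFetchedData
  private var offsetAfterPoll: Long = initialPollOffset     // plays the role of _offsetAfterPoll

  // Replace the buffer with the records returned by a poll().
  def withNewPoll(polled: Seq[SimpleRecord], offsetAfter: Long): Unit = {
    records = polled.iterator
    nextOffsetToServe = polled.headOption.map(_.offset).getOrElse(offsetAfter)
    offsetAfterPoll = offsetAfter
  }

  // Serve the next buffered record, or None when the caller has to poll again
  // starting at `nextPollOffset`.
  def next(): Option[SimpleRecord] = {
    if (records.hasNext) {
      val r = records.next()
      nextOffsetToServe = r.offset + 1
      Some(r)
    } else {
      None
    }
  }

  def nextPollOffset: Long = offsetAfterPoll
}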
@@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.spark.sql.kafka010
package org.apache.spark.sql.kafka010.consumer

import java.{util => ju}
import java.util.concurrent.TimeUnit
@@ -29,7 +29,8 @@ import org.jmock.lib.concurrent.DeterministicScheduler
import org.scalatest.PrivateMethodTester

import org.apache.spark.SparkConf
import org.apache.spark.sql.kafka010.KafkaDataConsumer.CacheKey
import org.apache.spark.sql.kafka010.{FETCHED_DATA_CACHE_EVICTOR_THREAD_RUN_INTERVAL, FETCHED_DATA_CACHE_TIMEOUT}
import org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer.CacheKey
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.util.ManualClock

@@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.spark.sql.kafka010
package org.apache.spark.sql.kafka010.consumer

import java.{util => ju}

@@ -26,7 +26,8 @@ import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.ByteArrayDeserializer

import org.apache.spark.SparkConf
import org.apache.spark.sql.kafka010.KafkaDataConsumer.CacheKey
import org.apache.spark.sql.kafka010.{CONSUMER_CACHE_CAPACITY, CONSUMER_CACHE_EVICTOR_THREAD_RUN_INTERVAL, CONSUMER_CACHE_TIMEOUT}
import org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer.CacheKey
import org.apache.spark.sql.test.SharedSparkSession

class InternalKafkaConsumerPoolSuite extends SharedSparkSession {
@@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.spark.sql.kafka010
package org.apache.spark.sql.kafka010.consumer

import java.{util => ju}
import java.nio.charset.StandardCharsets
@@ -32,7 +32,8 @@ import org.scalatest.PrivateMethodTester

import org.apache.spark.{TaskContext, TaskContextImpl}
import org.apache.spark.kafka010.KafkaDelegationTokenTest
import org.apache.spark.sql.kafka010.KafkaDataConsumer.CacheKey
import org.apache.spark.sql.kafka010.{KafkaTestUtils, RecordBuilder}
import org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer.CacheKey
import org.apache.spark.sql.test.SharedSparkSession

class KafkaDataConsumerSuite