@@ -44,6 +44,9 @@ private[kafka010] case class CachedKafkaConsumer private(

private var consumer = createConsumer

/** Indicates whether this consumer is currently in use. */
private var inuse = true

/** Iterator over the already fetched data */
private var fetchedData = ju.Collections.emptyIterator[ConsumerRecord[Array[Byte], Array[Byte]]]
private var nextOffsetInFetchedData = UNKNOWN_OFFSET
@@ -57,6 +60,20 @@ private[kafka010] case class CachedKafkaConsumer private(
c
}

case class AvailableOffsetRange(earliest: Long, latest: Long)

/**
* Return the available offset range of the current partition as an [[AvailableOffsetRange]],
* holding the earliest offset and the (exclusive) latest offset.
*/
def getAvailableOffsetRange(): AvailableOffsetRange = {
consumer.seekToBeginning(Set(topicPartition).asJava)
val earliestOffset = consumer.position(topicPartition)
consumer.seekToEnd(Set(topicPartition).asJava)
val latestOffset = consumer.position(topicPartition)
AvailableOffsetRange(earliestOffset, latestOffset)
}
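
As a usage sketch (variable names are illustrative; this mirrors how fetchData consumes the range later in this diff), note that latest is exclusive, so an offset is servable only while range.earliest <= offset < range.latest:

val range = getAvailableOffsetRange()
// `earliest` is the first offset still retained by the broker;
// `latest` is exclusive: the position of the next record to be produced.
if (offset < range.earliest || offset >= range.latest) {
  // requested offset was aged out by retention, or has not been produced yet
}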

/**
* Get the record for the given offset if available. Otherwise it will either throw an error
* (if failOnDataLoss = true), or return the next available offset within [offset, untilOffset),
@@ -107,9 +124,9 @@ private[kafka010] case class CachedKafkaConsumer private(
* `UNKNOWN_OFFSET`.
*/
private def getEarliestAvailableOffsetBetween(offset: Long, untilOffset: Long): Long = {
val (earliestOffset, latestOffset) = getAvailableOffsetRange()
logWarning(s"Some data may be lost. Recovering from the earliest offset: $earliestOffset")
if (offset >= latestOffset || earliestOffset >= untilOffset) {
val range = getAvailableOffsetRange()
logWarning(s"Some data may be lost. Recovering from the earliest offset: ${range.earliest}")
if (offset >= range.latest || range.earliest >= untilOffset) {
// [offset, untilOffset) and [earliestOffset, latestOffset) have no overlap,
// either
// --------------------------------------------------------
@@ -124,13 +141,13 @@ private[kafka010] case class CachedKafkaConsumer private(
// offset untilOffset earliestOffset latestOffset
val warningMessage =
s"""
|The current available offset range is [$earliestOffset, $latestOffset).
|The current available offset range is $range.
| Offset ${offset} is out of range, and records in [$offset, $untilOffset) will be
| skipped ${additionalMessage(failOnDataLoss = false)}
""".stripMargin
logWarning(warningMessage)
UNKNOWN_OFFSET
} else if (offset >= earliestOffset) {
} else if (offset >= range.earliest) {
// -----------------------------------------------------------------------------
// ^ ^ ^ ^
// | | | |
@@ -149,12 +166,12 @@ private[kafka010] case class CachedKafkaConsumer private(
// offset earliestOffset min(untilOffset,latestOffset) max(untilOffset, latestOffset)
val warningMessage =
s"""
|The current available offset range is [$earliestOffset, $latestOffset).
| Offset ${offset} is out of range, and records in [$offset, $earliestOffset) will be
|The current available offset range is $range.
| Offset ${offset} is out of range, and records in [$offset, ${range.earliest}) will be
| skipped ${additionalMessage(failOnDataLoss = false)}
""".stripMargin
logWarning(warningMessage)
earliestOffset
range.earliest
}
}
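
A concrete walk-through of these branches (numbers illustrative): with an available range of [100, 200), a request for [10, 50) has no overlap (100 >= 50), so a warning is logged and UNKNOWN_OFFSET is returned; a request for [10, 150) falls through to the final branch, so records in [10, 100) are skipped and range.earliest = 100 is returned. The middle branch (offset >= range.earliest, body elided in this hunk) covers an offset that is still inside the available range.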

@@ -183,8 +200,8 @@ private[kafka010] case class CachedKafkaConsumer private(
// - `offset` is out of range so that Kafka returns nothing. Just throw
// `OffsetOutOfRangeException` to let the caller handle it.
// - Cannot fetch any data before timeout. TimeoutException will be thrown.
val (earliestOffset, latestOffset) = getAvailableOffsetRange()
if (offset < earliestOffset || offset >= latestOffset) {
val range = getAvailableOffsetRange()
if (offset < range.earliest || offset >= range.latest) {
throw new OffsetOutOfRangeException(
Map(topicPartition -> java.lang.Long.valueOf(offset)).asJava)
} else {
@@ -284,18 +301,6 @@ private[kafka010] case class CachedKafkaConsumer private(
logDebug(s"Polled $groupId ${p.partitions()} ${r.size}")
fetchedData = r.iterator
}
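
For context, a hedged reconstruction of the enclosing poll helper (only the last two lines appear in this hunk; the rest is an assumption based on the names p and r):

private def poll(pollTimeoutMs: Long): Unit = {
  val p = consumer.poll(pollTimeoutMs)  // fetch a batch from the broker
  val r = p.records(topicPartition)     // records for this consumer's sole partition
  logDebug(s"Polled $groupId ${p.partitions()} ${r.size}")
  fetchedData = r.iterator              // cached as an iterator for sequential reads
}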

/**
* Return the available offset range of the current partition. It's a pair of the earliest offset
* and the latest offset.
*/
private def getAvailableOffsetRange(): (Long, Long) = {
consumer.seekToBeginning(Set(topicPartition).asJava)
val earliestOffset = consumer.position(topicPartition)
consumer.seekToEnd(Set(topicPartition).asJava)
val latestOffset = consumer.position(topicPartition)
(earliestOffset, latestOffset)
}
}

private[kafka010] object CachedKafkaConsumer extends Logging {
@@ -310,7 +315,7 @@ private[kafka010] object CachedKafkaConsumer extends Logging {
new ju.LinkedHashMap[CacheKey, CachedKafkaConsumer](capacity, 0.75f, true) {
override def removeEldestEntry(
entry: ju.Map.Entry[CacheKey, CachedKafkaConsumer]): Boolean = {
if (this.size > capacity) {
if (!entry.getValue.inuse && this.size > capacity) {
logWarning(s"KafkaConsumer cache hitting max capacity of $capacity, " +
s"removing consumer for ${entry.getKey}")
try {
@@ -327,6 +332,43 @@ private[kafka010] object CachedKafkaConsumer extends Logging {
}
}
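
For reference, a minimal sketch of the access-ordered java.util.LinkedHashMap idiom the cache builds on (the capacity value here is illustrative): with accessOrder = true, iteration order is least-recently-accessed first, and removeEldestEntry is consulted after every put.

import java.{util => ju}

val capacity = 64 // illustrative
val lru = new ju.LinkedHashMap[String, String](capacity, 0.75f, /* accessOrder = */ true) {
  override def removeEldestEntry(eldest: ju.Map.Entry[String, String]): Boolean =
    this.size > capacity // returning true evicts the least-recently-accessed entry
}

The variant above additionally refuses to evict an entry whose inuse flag is set, protecting consumers a task is still reading from, at the cost of letting the cache temporarily exceed capacity.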

def releaseKafkaConsumer(
topic: String,
partition: Int,
kafkaParams: ju.Map[String, Object]): Unit = {
val groupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]
val topicPartition = new TopicPartition(topic, partition)
val key = CacheKey(groupId, topicPartition)

synchronized {
val consumer = cache.get(key)
if (consumer != null) {
consumer.inuse = false
} else {
logWarning(s"Attempting to release consumer that does not exist")
}
}
}

/**
* Removes (and closes) the Kafka Consumer for the given topic, partition and group id.
*/
def removeKafkaConsumer(
topic: String,
partition: Int,
kafkaParams: ju.Map[String, Object]): Unit = {
val groupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]
val topicPartition = new TopicPartition(topic, partition)
val key = CacheKey(groupId, topicPartition)

synchronized {
val removedConsumer = cache.remove(key)
if (removedConsumer != null) {
removedConsumer.close()
}
}
}

/**
* Get a cached consumer for groupId, assigned to topic and partition.
* If a matching consumer doesn't already exist, it will be created using kafkaParams.
@@ -342,16 +384,18 @@ private[kafka010] object CachedKafkaConsumer extends Logging {
// If this is reattempt at running the task, then invalidate cache and start with
// a new consumer
if (TaskContext.get != null && TaskContext.get.attemptNumber > 1) {
val removedConsumer = cache.remove(key)
if (removedConsumer != null) {
removedConsumer.close()
}
new CachedKafkaConsumer(topicPartition, kafkaParams)
removeKafkaConsumer(topic, partition, kafkaParams)
val consumer = new CachedKafkaConsumer(topicPartition, kafkaParams)
consumer.inuse = true
cache.put(key, consumer)
consumer
} else {
if (!cache.containsKey(key)) {
cache.put(key, new CachedKafkaConsumer(topicPartition, kafkaParams))
}
cache.get(key)
val consumer = cache.get(key)
consumer.inuse = true
consumer
}
}
}
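
Taken together, a hedged sketch of the intended lifecycle from a task (the try/finally wrapper is illustrative; only the method names come from this diff):

val consumer = CachedKafkaConsumer.getOrCreate(topic, partition, kafkaParams) // marks inuse = true
try {
  // read this task's offset range with the cached consumer
} finally {
  // hand the consumer back so the LRU policy is again allowed to evict it
  CachedKafkaConsumer.releaseKafkaConsumer(topic, partition, kafkaParams)
}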
New file: ConsumerStrategy.scala
@@ -0,0 +1,84 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.kafka010

import java.{util => ju}

import scala.collection.JavaConverters._

import org.apache.kafka.clients.consumer.{Consumer, KafkaConsumer}
import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener
import org.apache.kafka.common.TopicPartition

/**
* Subscribe allows you to subscribe to a fixed collection of topics.
* SubscribePattern allows you to use a regex to specify topics of interest.
* Note that unlike the 0.8 integration, using Subscribe or SubscribePattern
* should respond to adding partitions during a running stream.
* Finally, Assign allows you to specify a fixed collection of partitions.
* All three strategies have overloaded constructors that allow you to specify
* the starting offset for a particular partition.
*/
sealed trait ConsumerStrategy {
/** Create a [[KafkaConsumer]] and subscribe to topics according to a desired strategy */
def createConsumer(kafkaParams: ju.Map[String, Object]): Consumer[Array[Byte], Array[Byte]]
}

/**
* Specify a fixed collection of partitions.
*/
case class AssignStrategy(partitions: Array[TopicPartition]) extends ConsumerStrategy {
override def createConsumer(
kafkaParams: ju.Map[String, Object]): Consumer[Array[Byte], Array[Byte]] = {
val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](kafkaParams)
consumer.assign(ju.Arrays.asList(partitions: _*))
consumer
}

override def toString: String = s"Assign[${partitions.mkString(", ")}]"
}

/**
* Subscribe to a fixed collection of topics.
*/
case class SubscribeStrategy(topics: Seq[String]) extends ConsumerStrategy {
override def createConsumer(
kafkaParams: ju.Map[String, Object]): Consumer[Array[Byte], Array[Byte]] = {
val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](kafkaParams)
consumer.subscribe(topics.asJava)
consumer
}

override def toString: String = s"Subscribe[${topics.mkString(", ")}]"
}

/**
* Use a regex to specify topics of interest.
*/
case class SubscribePatternStrategy(topicPattern: String) extends ConsumerStrategy {
override def createConsumer(
kafkaParams: ju.Map[String, Object]): Consumer[Array[Byte], Array[Byte]] = {
val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](kafkaParams)
consumer.subscribe(
ju.regex.Pattern.compile(topicPattern),
new NoOpConsumerRebalanceListener())
consumer
}

override def toString: String = s"SubscribePattern[$topicPattern]"
}
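
A hedged usage sketch (bootstrap servers and group id are placeholders): each strategy is a small factory, so callers can hold a ConsumerStrategy and defer consumer creation until the Kafka params are known:

import java.{util => ju}
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.ByteArrayDeserializer

val kafkaParams = new ju.HashMap[String, Object]()
kafkaParams.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") // placeholder
kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, "example-group")           // placeholder
kafkaParams.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[ByteArrayDeserializer].getName)
kafkaParams.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[ByteArrayDeserializer].getName)

val strategy: ConsumerStrategy = SubscribePatternStrategy("topic-.*")
val consumer = strategy.createConsumer(kafkaParams)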
New file: KafkaOffsetRangeLimit.scala
@@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.kafka010

import org.apache.kafka.common.TopicPartition

/**
* Objects that represent desired offset range limits for starting,
* ending, and specific offsets.
*/
private[kafka010] sealed trait KafkaOffsetRangeLimit

/**
* Represents the desire to bind to the earliest offsets in Kafka
*/
private[kafka010] case object EarliestOffsetRangeLimit extends KafkaOffsetRangeLimit

/**
* Represents the desire to bind to the latest offsets in Kafka
*/
private[kafka010] case object LatestOffsetRangeLimit extends KafkaOffsetRangeLimit

/**
* Represents the desire to bind to specific offsets. An offset == -1 binds to the
* latest offset, and offset == -2 binds to the earliest offset.
*/
private[kafka010] case class SpecificOffsetRangeLimit(
partitionOffsets: Map[TopicPartition, Long]) extends KafkaOffsetRangeLimit

private[kafka010] object KafkaOffsetRangeLimit {
/**
* Used to denote offset range limits that are resolved via Kafka
*/
val LATEST = -1L // indicates resolution to the latest offset
val EARLIEST = -2L // indicates resolution to the earliest offset
}
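
A hedged sketch of how these limits might be mapped to per-partition sentinel offsets (the resolve helper is hypothetical, not part of this PR; the -1L/-2L sentinels still have to be resolved against the broker afterwards):

import org.apache.kafka.common.TopicPartition

// Hypothetical helper, not from this PR.
def resolve(
    limit: KafkaOffsetRangeLimit,
    partitions: Set[TopicPartition]): Map[TopicPartition, Long] = limit match {
  case EarliestOffsetRangeLimit => partitions.map(_ -> KafkaOffsetRangeLimit.EARLIEST).toMap
  case LatestOffsetRangeLimit => partitions.map(_ -> KafkaOffsetRangeLimit.LATEST).toMap
  case SpecificOffsetRangeLimit(partitionOffsets) => partitionOffsets
}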