@@ -823,6 +823,7 @@ public enum LogKeys implements LogKey {
TIMEOUT,
TIMER,
TIMESTAMP,
TIMESTAMP_COLUMN_NAME,
TIME_UNITS,
TIP,
TOKEN,
@@ -19,15 +19,19 @@ package org.apache.spark.sql.kafka010

import java.{util => ju}

import org.apache.kafka.common.record.TimestampType

import org.apache.spark.TaskContext
import org.apache.spark.internal.Logging
import org.apache.spark.internal.{Logging, LogKeys}
import org.apache.spark.internal.LogKeys._
import org.apache.spark.internal.MDC
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.UnsafeRow
import org.apache.spark.sql.connector.metric.CustomTaskMetric
import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader, PartitionReaderFactory}
import org.apache.spark.sql.connector.read.streaming.SupportsRealTimeRead
import org.apache.spark.sql.connector.read.streaming.SupportsRealTimeRead.RecordStatus
import org.apache.spark.sql.execution.streaming.runtime.{MicroBatchExecution, StreamExecution}
import org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer
import org.apache.spark.sql.kafka010.consumer.{KafkaDataConsumer, KafkaDataConsumerIterator}

/** A [[InputPartition]] for reading Kafka data in a batch based streaming query. */
private[kafka010] case class KafkaBatchInputPartition(
@@ -67,7 +71,8 @@ private case class KafkaBatchPartitionReader(
executorKafkaParams: ju.Map[String, Object],
pollTimeoutMs: Long,
failOnDataLoss: Boolean,
includeHeaders: Boolean) extends PartitionReader[InternalRow] with Logging {
includeHeaders: Boolean)
extends SupportsRealTimeRead[InternalRow] with Logging {

private val consumer = KafkaDataConsumer.acquire(offsetRange.topicPartition, executorKafkaParams)

@@ -77,6 +82,12 @@

private var nextOffset = rangeToRead.fromOffset
private var nextRow: UnsafeRow = _
private var iteratorForRealTimeMode: Option[KafkaDataConsumerIterator] = None

// Boolean flag that indicates whether we have logged the type of timestamp (i.e. create time,
// log-append time, etc.) for the Kafka source. We log upon reading the first record, and we
// then skip logging for subsequent records.
private var timestampTypeLogged = false

override def next(): Boolean = {
if (nextOffset < rangeToRead.untilOffset) {
@@ -93,6 +104,38 @@
}
}

override def nextWithTimeout(timeoutMs: java.lang.Long): RecordStatus = {
if (!iteratorForRealTimeMode.isDefined) {
logInfo(s"Getting a new kafka consuming iterator for ${offsetRange.topicPartition} " +
s"starting from ${nextOffset}, timeoutMs ${timeoutMs}")
iteratorForRealTimeMode = Some(consumer.getIterator(nextOffset))
}
assert(iteratorForRealTimeMode.isDefined)
val nextRecord = iteratorForRealTimeMode.get.nextWithTimeout(timeoutMs)
nextRecord.map { record =>
nextRow = unsafeRowProjector(record)
nextOffset = record.offset + 1
if (record.timestampType() == TimestampType.LOG_APPEND_TIME ||
record.timestampType() == TimestampType.CREATE_TIME) {
if (!timestampTypeLogged) {
logInfo(log"Kafka source record timestamp type is " +
log"${MDC(LogKeys.TIMESTAMP_COLUMN_NAME, record.timestampType())}")
timestampTypeLogged = true
}
Comment on lines +119 to +125

Member: Could you explain more on this logging behavior? Why do we need this logging?

Contributor Author: This tells us the semantics of the timestamp column of a Kafka record, that is, whether the timestamp for records from this topic is set to log-append time (when the record is persisted by the Kafka brokers) or to create time, which is either when the record is produced by a Kafka producer or user-defined. This information is used when calculating latency, to understand which journey we are actually measuring.


RecordStatus.newStatusWithArrivalTimeMs(record.timestamp())
} else {
RecordStatus.newStatusWithoutArrivalTime(true)
}
}.getOrElse(RecordStatus.newStatusWithoutArrivalTime(false))
}
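
For context on the logging discussed in the thread above: the arrival time returned by nextWithTimeout feeds latency accounting, and what that latency means depends on the topic's timestamp type. A minimal illustrative sketch (not part of this PR; the object and helper names are hypothetical, and only kafka-clients is assumed on the classpath):

import org.apache.kafka.clients.consumer.ConsumerRecord

object TimestampLatencyExample {
  // For CREATE_TIME topics, record.timestamp() is set by the producer (or user-supplied),
  // so "now - timestamp" approximates producer-to-consumer latency. For LOG_APPEND_TIME
  // topics, it is set by the broker on append, so the same difference only measures
  // broker-to-consumer latency. Logging the timestamp type once tells us which of the two
  // journeys the reported latency describes.
  def observedLatencyMs(record: ConsumerRecord[Array[Byte], Array[Byte]]): Long =
    System.currentTimeMillis() - record.timestamp()
}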

override def getOffset(): KafkaSourcePartitionOffset = {
KafkaSourcePartitionOffset(offsetRange.topicPartition, nextOffset)
}

override def get(): UnsafeRow = {
assert(nextRow != null)
nextRow
@@ -26,7 +26,7 @@ import org.apache.kafka.common.TopicPartition

import org.apache.spark.SparkEnv
import org.apache.spark.internal.Logging
import org.apache.spark.internal.LogKeys.{ERROR, OFFSETS, TIP}
import org.apache.spark.internal.LogKeys.{ERROR, OFFSETS, TIP, TOPIC_PARTITION_OFFSET}
import org.apache.spark.internal.config.Network.NETWORK_TIMEOUT
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.connector.read.{InputPartition, PartitionReaderFactory}
@@ -60,7 +60,11 @@ private[kafka010] class KafkaMicroBatchStream(
metadataPath: String,
startingOffsets: KafkaOffsetRangeLimit,
failOnDataLoss: Boolean)
extends SupportsTriggerAvailableNow with ReportsSourceMetrics with MicroBatchStream with Logging {
extends SupportsTriggerAvailableNow
with SupportsRealTimeMode
with ReportsSourceMetrics
with MicroBatchStream
with Logging {

private[kafka010] val pollTimeoutMs = options.getLong(
KafkaSourceProvider.CONSUMER_POLL_TIMEOUT,
@@ -93,6 +97,11 @@

private var isTriggerAvailableNow: Boolean = false

private var inRealTimeMode = false
override def prepareForRealTimeMode(): Unit = {
inRealTimeMode = true
}

/**
* Lazily initialize `initialPartitionOffsets` to make sure that `KafkaConsumer.poll` is only
* called in StreamExecutionThread. Otherwise, interrupting a thread while running
@@ -218,6 +227,107 @@
}.toArray
}

override def planInputPartitions(start: Offset): Array[InputPartition] = {
// This function is used for real time mode. Trigger restrictions won't be supported.
if (maxOffsetsPerTrigger.isDefined) {
throw new UnsupportedOperationException(
"maxOffsetsPerTrigger is not compatible with real time mode")
}
if (minOffsetPerTrigger.isDefined) {
throw new UnsupportedOperationException(
"minOffsetsPerTrigger is not compatible with real time mode"
)
}
if (options.containsKey(KafkaSourceProvider.MIN_PARTITIONS_OPTION_KEY)) {
throw new UnsupportedOperationException(
"minpartitions is not compatible with real time mode"
)
}
if (options.containsKey(KafkaSourceProvider.ENDING_TIMESTAMP_OPTION_KEY)) {
throw new UnsupportedOperationException(
"endingtimestamp is not compatible with real time mode"
)
}
if (options.containsKey(KafkaSourceProvider.MAX_TRIGGER_DELAY)) {
throw new UnsupportedOperationException(
"maxtriggerdelay is not compatible with real time mode"
)
}

// This function is used by real-time mode, where we expect a 1:1 mapping between a
// topic partition and an input partition.
// We skip the partition range check for performance reasons. We can always try to do
// it in tasks if needed.
val startPartitionOffsets = start.asInstanceOf[KafkaSourceOffset].partitionToOffsets

// Here we compare the previous topic partitions with the latest partition offsets to see
// if we need to update the partition list. The partition list doesn't need to be absolutely
// up to date, because there may already be minutes of delay since a new partition was
// created, and latestPartitionOffsets should have been fetched recently anyway.
// If the topic partitions change, we fetch the earliest offsets for all new partitions
// and add them to the list.
assert(latestPartitionOffsets != null, "latestPartitionOffsets should be set in latestOffset")
val latestTopicPartitions = latestPartitionOffsets.keySet
val newStartPartitionOffsets = if (startPartitionOffsets.keySet == latestTopicPartitions) {
startPartitionOffsets
} else {
val newPartitions = latestTopicPartitions.diff(startPartitionOffsets.keySet)
// Instead of fetching the earliest offsets, we could fill in offset 0 here and avoid this
// extra admin call. But new partitions are rare, and fetching the earliest offset aligns
// with what we do in micro-batch mode and can potentially enable more sanity checks on
// the executor side.
val newPartitionOffsets = kafkaOffsetReader.fetchEarliestOffsets(newPartitions.toSeq)
Member: KafkaMicroBatchStream's existing planInputPartitions calls kafkaOffsetReader.getOffsetRangesFromResolvedOffsets to handle partition offsets. It handles deleted-partition cases, but this new planInputPartitions doesn't; should we also do the same?

Contributor Author: Kafka doesn't support deleting partitions, so I am not sure that case is worth checking. If the topic was deleted and recreated, the offsets would not be valid and we would fail in that case anyway.

Member: Hmm, this is what is currently in getOffsetRangesFromResolvedOffsets, called by KafkaMicroBatchStream.planInputPartitions:

if (newPartitionInitialOffsets.keySet != newPartitions) {
  // We cannot get from offsets for some partitions. It means they got deleted.
  val deletedPartitions = newPartitions.diff(newPartitionInitialOffsets.keySet)
  reportDataLoss(
    s"Cannot find earliest offsets of ${deletedPartitions}. Some data may have been missed",
    () => KafkaExceptions.initialOffsetNotFoundForPartitions(deletedPartitions))
}

The behavior of reportDataLoss is configurable: it can be a failure like what you did here, or a log warning. I would suggest following the existing behavior instead of having two different behaviors.

Contributor Author: ok


assert(
newPartitionOffsets.keys.forall(!startPartitionOffsets.contains(_)),
"startPartitionOffsets should not contain any key in newPartitionOffsets")

logInfo(log"Partitions added: ${MDC(TOPIC_PARTITION_OFFSET, newPartitionOffsets)}")
// Filter out new partition offsets that are not 0 and log a warning
val nonZeroNewPartitionOffsets = newPartitionOffsets.filter {
case (_, offset) => offset != 0
}
// Log the non-zero new partition offsets
if (nonZeroNewPartitionOffsets.nonEmpty) {
logWarning(log"new partitions should start from offset 0: " +
log"${MDC(OFFSETS, nonZeroNewPartitionOffsets)}")
nonZeroNewPartitionOffsets.foreach {
case (p, o) =>
reportDataLoss(
s"Added partition $p starts from $o instead of 0. Some data may have been missed",
() => KafkaExceptions.addedPartitionDoesNotStartFromZero(p, o))
}
}

val deletedPartitions = startPartitionOffsets.keySet.diff(latestTopicPartitions)
if (deletedPartitions.nonEmpty) {
reportDataLoss(
s"$deletedPartitions are gone. Some data may have been missed",
() =>
KafkaExceptions.partitionsDeleted(deletedPartitions, None))
}

startPartitionOffsets ++ newPartitionOffsets
}

newStartPartitionOffsets.keySet.toSeq.map { tp =>
val fromOffset = newStartPartitionOffsets(tp)
KafkaBatchInputPartition(
KafkaOffsetRange(tp, fromOffset, Long.MaxValue, preferredLoc = None),
executorKafkaParams,
pollTimeoutMs,
failOnDataLoss,
includeHeaders)
}.toArray
}
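
Regarding the reportDataLoss discussion above: in the Kafka source this behavior is driven by the failOnDataLoss option, so a detected data-loss condition either fails the query or is only logged. A rough sketch of that pattern (illustrative only, not the actual Spark method; the class name and the plain-stderr logging are placeholders):

// When failOnDataLoss is true the supplied exception is thrown and the query fails;
// otherwise the condition is merely surfaced as a warning and processing continues.
class DataLossReporter(failOnDataLoss: Boolean) {
  def reportDataLoss(message: String, getException: () => Throwable): Unit = {
    if (failOnDataLoss) {
      throw getException()
    } else {
      Console.err.println(s"WARN: $message. Continuing because failOnDataLoss is false.")
    }
  }
}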

override def mergeOffsets(offsets: Array[PartitionOffset]): Offset = {
val mergedMap = offsets.map {
case KafkaSourcePartitionOffset(p, o) => (p, o)
}.toMap
KafkaSourceOffset(mergedMap)
}

override def createReaderFactory(): PartitionReaderFactory = {
KafkaBatchReaderFactory
}
@@ -235,7 +345,22 @@
override def toString(): String = s"KafkaV2[$kafkaOffsetReader]"

override def metrics(latestConsumedOffset: Optional[Offset]): ju.Map[String, String] = {
KafkaMicroBatchStream.metrics(latestConsumedOffset, latestPartitionOffsets)
val reCalculatedLatestPartitionOffsets =
if (inRealTimeMode) {
if (!latestConsumedOffset.isPresent) {
// this means a batch has no end offsets, which should not happen
None
} else {
Some {
kafkaOffsetReader.fetchLatestOffsets(
Some(latestConsumedOffset.get.asInstanceOf[KafkaSourceOffset].partitionToOffsets))
}
}
} else {
Some(latestPartitionOffsets)
}

KafkaMicroBatchStream.metrics(latestConsumedOffset, reCalculatedLatestPartitionOffsets)
}

/**
@@ -386,13 +511,14 @@ object KafkaMicroBatchStream extends Logging {
*/
def metrics(
latestConsumedOffset: Optional[Offset],
latestAvailablePartitionOffsets: PartitionOffsetMap): ju.Map[String, String] = {
latestAvailablePartitionOffsets: Option[PartitionOffsetMap]): ju.Map[String, String] = {
val offset = Option(latestConsumedOffset.orElse(null))

if (offset.nonEmpty && latestAvailablePartitionOffsets != null) {
if (offset.nonEmpty && latestAvailablePartitionOffsets.isDefined) {
val consumedPartitionOffsets = offset.map(KafkaSourceOffset(_)).get.partitionToOffsets
val offsetsBehindLatest = latestAvailablePartitionOffsets
.map(partitionOffset => partitionOffset._2 - consumedPartitionOffsets(partitionOffset._1))
val offsetsBehindLatest = latestAvailablePartitionOffsets.get
.map(partitionOffset => partitionOffset._2 -
consumedPartitionOffsets.getOrElse(partitionOffset._1, 0L))
if (offsetsBehindLatest.nonEmpty) {
val avgOffsetBehindLatest = offsetsBehindLatest.sum.toDouble / offsetsBehindLatest.size
return Map[String, String](