
Conversation

@jerrypeng jerrypeng (Contributor) commented Oct 25, 2025

What changes were proposed in this pull request?

Add support for Real-time Mode in the Kafka source. This means KafkaMicroBatchStream needs to implement the SupportsRealTimeMode interface and KafkaPartitionBatchReader needs to extend the SupportRealTimeRead interface.
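A structural sketch of what that wiring looks like (stand-in trait and class stubs only; the real interfaces and their members live in Spark's DSv2 connector API):

```scala
// Stand-in declarations so the sketch is self-contained; NOT the Spark definitions.
trait SupportsRealTimeMode            // marks a MicroBatchStream as real-time capable
trait SupportRealTimeRead             // lets a partition reader keep polling for new records
abstract class MicroBatchStream       // stand-in for the DSv2 streaming interface
abstract class PartitionReader[T]     // stand-in for the DSv2 reader interface

// The shape of the change described above:
class KafkaMicroBatchStream extends MicroBatchStream with SupportsRealTimeMode
class KafkaPartitionBatchReader[T] extends PartitionReader[T] with SupportRealTimeRead
```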

Why are the changes needed?

So that the Kafka source and sink can be used by Real-time Mode queries.

Does this PR introduce any user-facing change?

Yes, the Kafka source and sink can now be used by Real-time Mode queries.

How was this patch tested?

Many tests were added.

Was this patch authored or co-authored using generative AI tooling?

No

@github-actions github-actions bot removed the CORE label Oct 28, 2025
@jerrypeng jerrypeng changed the title [WIP] [SPARK-54027] Kafka Source RTM support [SPARK-54027] Kafka Source RTM support Oct 28, 2025
)
}

// This function is used by Low Latency Mode, where we expect 1:1 mapping between a
Member

Suggested change
// This function is used by Low Latency Mode, where we expect 1:1 mapping between a
// This function is used by real time mode, where we expect 1:1 mapping between a

Comment on lines +119 to +125
if (record.timestampType() == TimestampType.LOG_APPEND_TIME ||
    record.timestampType() == TimestampType.CREATE_TIME) {
  if (!timestampTypeLogged) {
    logInfo(log"Kafka source record timestamp type is " +
      log"${MDC(LogKeys.TIMESTAMP_COLUMN_NAME, record.timestampType())}")
    timestampTypeLogged = true
  }
Member

Could you explain more about this logging behavior? Why do we need this logging?

Contributor Author

This tells us the semantics of the timestamp column of a Kafka record, i.e. whether the timestamp for records from this topic is set to the log append time (when the record is persisted by the Kafka brokers) or the create time (which is either when the record is produced by a Kafka producer or is user-defined). This information is used when calculating latency, to understand what journey we are actually measuring.
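For context, a minimal sketch of the kind of latency computation this enables (the helper below is hypothetical, not part of the PR):

```scala
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.record.TimestampType

// Hypothetical helper: what a per-record latency means depends on the timestamp type.
// LOG_APPEND_TIME measures broker-persist -> processing time; CREATE_TIME measures
// producer/user timestamp -> processing time.
def recordLatencyMs(record: ConsumerRecord[Array[Byte], Array[Byte]]): Option[Long] =
  record.timestampType() match {
    case TimestampType.LOG_APPEND_TIME | TimestampType.CREATE_TIME =>
      Some(System.currentTimeMillis() - record.timestamp())
    case _ => None // NO_TIMESTAMP_TYPE: nothing meaningful to measure
  }
```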

// admin function call. But we consider new partition is rare and getting earliest offset
// aligns with what we do in micro-batch mode and can potentially enable more sanity checks
// in executor side.
val newPartitionOffsets = kafkaOffsetReader.fetchEarliestOffsets(newPartitions.toSeq)
Member

KafkaMicroBatchStream's existing planInputPartitions calls kafkaOffsetReader.getOffsetRangesFromResolvedOffsets to handle partition offsets.

It handles the deleted-partitions case, but this new planInputPartitions doesn't; should we also do the same?

Contributor Author

Kafka doesn't support deleting partitions, so I am not sure that case is worth checking. If the topic was deleted and recreated, the offsets would not be valid and we would fail in that case anyway.

Member

Hmm, this is what is currently in getOffsetRangesFromResolvedOffsets, called by KafkaMicroBatchStream.planInputPartitions:

if (newPartitionInitialOffsets.keySet != newPartitions) {
  // We cannot get from offsets for some partitions. It means they got deleted.
  val deletedPartitions = newPartitions.diff(newPartitionInitialOffsets.keySet)
  reportDataLoss(
    s"Cannot find earliest offsets of ${deletedPartitions}. Some data may have been missed",
    () => KafkaExceptions.initialOffsetNotFoundForPartitions(deletedPartitions))
}

The behavior of reportDataLoss is configurable. It can be a failure, like what you did here, or a log warning.

I would suggest following the existing behavior instead of having two different behaviors.
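For reference, the configurable behavior being referred to is roughly the following (a simplified sketch; in the actual Kafka source the choice is driven by the failOnDataLoss source option):

```scala
// Simplified sketch: the same condition either fails the query or only warns,
// depending on failOnDataLoss.
def reportDataLoss(failOnDataLoss: Boolean)(message: String, cause: () => Throwable): Unit = {
  if (failOnDataLoss) {
    throw cause()
  } else {
    // stand-in for logWarning in the real code
    Console.err.println(s"WARN: $message. Continuing because failOnDataLoss=false.")
  }
}
```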

Contributor Author

ok

Comment on lines 285 to 293
// Filter out new partition offsets that are not 0 and log a warning
val nonZeroNewPartitionOffsets = newPartitionOffsets.filter {
  case (_, offset) => offset != 0
}
// Log the non-zero new partition offsets
if (nonZeroNewPartitionOffsets.nonEmpty) {
  logWarning(log"new partitions should start from offset 0: " +
    log"${MDC(OFFSETS, nonZeroNewPartitionOffsets)}")
}
Member

For the case of new partitions with non-zero offsets, getOffsetRangesFromResolvedOffsets delegates to the reportDataLoss closure. Should we do the same?

Contributor Author

Let me add that and make the behavior consistent.


case class WaitUntilBatchProcessed(batchId: Long) extends StreamAction with StreamMustBeRunning

case object WaitUntilCurrentBatchProcessed extends StreamAction with StreamMustBeRunning
Member

Why do we need this? Can't we use WaitUntilBatchProcessed?

Contributor Author

This waits until the current batch finishes. It is just an easier API to use in tests when we only need to wait for whatever batch is currently running, rather than for a specific batch ID.
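For illustration, a hypothetical usage in a StreamTest-based suite (the stream and data values are placeholders):

```scala
// Hypothetical test sketch: wait for whatever batch is currently running to finish
// before checking results, instead of tracking a concrete batch id.
testStream(kafkaStream)(                  // kafkaStream: a Kafka source mapped to Int values
  StartStream(),
  AddKafkaData(Set(topic), 1, 2, 3),      // helper from the Kafka source test suites
  WaitUntilCurrentBatchProcessed,         // vs. WaitUntilBatchProcessed(batchId = 0)
  CheckAnswer(1, 2, 3)
)
```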

viirya added a commit that referenced this pull request Oct 30, 2025
### What changes were proposed in this pull request?

This patch extracts the same method `getOffsetRangesFromResolvedOffsets` from two `KafkaOffsetReader` implementations.

### Why are the changes needed?

When reviewing #52729, I found that `KafkaOffsetReaderConsumer` and `KafkaOffsetReaderAdmin` have exactly the same `getOffsetRangesFromResolvedOffsets` method. The method is fairly long, so it seems good to extract it into a common one.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Existing tests.

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #52788 from viirya/kafkaoffsetreader_refactor.

Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: Liang-Chi Hsieh <viirya@gmail.com>
@jerrypeng jerrypeng requested a review from viirya October 30, 2025 23:25
// If we are in micro-batch mode, we need to get the latest partition offsets at the
// start of the batch and recalculate the latest offsets at the end for backlog
// estimation.
Some(kafkaOffsetReader.fetchLatestOffsets(Some(latestPartitionOffsets)))
Member

This changes the original behavior? Previously it just used latestPartitionOffsets without fetching the latest offsets again.

Contributor Author

This is actually fixing an issue with non-RTM queries using Kafka. The calculation here is not correct and will always result in the backlog metrics being zero. "latestPartitionOffsets" is calculated when "latestOffset" is called at the beginning of a batch. It is basically the offset this batch will read up to, so for non-RTM streaming queries latestConsumedOffset will be the same as latestPartitionOffsets, resulting in zero backlog. What we should be doing is getting the latest offsets from the source Kafka topic after the batch is processed, i.e. when metrics() is called, to calculate a useful backlog metric. I know this is not really related to RTM, so let me know if I should just create a separate PR for this.
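To make the issue concrete, a simplified sketch of the backlog computation being described (names are illustrative, not the actual method):

```scala
import org.apache.kafka.common.TopicPartition

// Illustrative only: backlog is the gap between the newest offsets available in Kafka
// and the offsets the query has consumed. If "available" is snapshotted at the start
// of the batch (the current behavior), it equals "consumed" for non-RTM queries and
// the backlog is always reported as zero.
def estimatedBacklog(
    availableAtMetricsTime: Map[TopicPartition, Long],
    consumedByBatch: Map[TopicPartition, Long]): Long = {
  availableAtMetricsTime.map { case (tp, available) =>
    math.max(0L, available - consumedByBatch.getOrElse(tp, available))
  }.sum
}
```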

Member

Yea, let's focus on RTM in this PR and don't change/fix existing behavior here. Please open a separate PR to fix it if you think it is an issue.

Contributor Author

ok

val latestOffsets = kafkaOffsetReader.fetchLatestOffsets(
  Some(latestConsumedOffset.get.asInstanceOf[KafkaSourceOffset].partitionToOffsets))
val endTime = System.currentTimeMillis()
rtmFetchLatestOffsetsTimeMs = Some(endTime - startTime)
Member

Hmm, I'm not sure if I'm missing something. rtmFetchLatestOffsetsTimeMs is assigned here, but it is not used anywhere?

Contributor Author

Let me remove this variable; it is not necessary.

*
* @param startOffsets, the starting positions to read from, inclusive.
*/
def getIterator(offset: Long): KafkaDataConsumerIterator = {
Member

The param doc says startOffsets instead of offset.

Suggested change
def getIterator(offset: Long): KafkaDataConsumerIterator = {
def getIterator(startOffsets: Long): KafkaDataConsumerIterator = {

props
}

test("Union two kafka streams, for each write to sink") {
Member

Why have this test separately in KafkaRealTimeModeE2ESuite instead of in KafkaRealTimeModeSuite or KafkaRealTimeIntegrationSuite?

KafkaRealTimeModeE2ESuite and KafkaRealTimeIntegrationSuite both look like e2e test suites. Should we have just one e2e test suite?

Contributor Author

There is some difference. KafkaRealTimeModeSuite only has tests that use the StreamTest framework, which allows us to perform step-wise testing that writes to a memory sink. KafkaRealTimeIntegrationSuite and KafkaRealTimeModeE2ESuite are more end-to-end, in a more realistic setting. KafkaRealTimeIntegrationSuite deploys a multi-worker cluster to run tests. KafkaRealTimeModeE2ESuite deploys a local in-process cluster to test a foreach use case. The reason we are not consolidating the two is that it is easier to retrieve results from the foreach sink writer if it is in process. Though we could consolidate the two by creating a Kafka producer in the foreach sink to write results to Kafka. Perhaps as a follow-up item I can look into consolidating KafkaRealTimeModeE2ESuite and KafkaRealTimeIntegrationSuite?

Member

Okay. Thanks for the explanation. Looks good to me.

* Tests with a distributed spark cluster with
* separate executors processes deployed.
*/
class KafkaRealTimeIntegrationSuite
Member

The difference between this test suite and KafkaRealTimeModeSuite is that this one is specifically for a distributed Spark cluster?

Contributor Author

Yes. I explain the differences between the test suites in more detail here:
#52729 (comment)

*/
class KafkaRealTimeIntegrationSuite
extends KafkaSourceTest
with StreamRealTimeModeSuiteBase
Member

Hmm, I saw you created two suite base classes: StreamRealTimeModeE2ESuiteBase and StreamRealTimeModeSuiteBase. This test suite looks like it is for e2e tests too, so why doesn't it use StreamRealTimeModeE2ESuiteBase instead of StreamRealTimeModeSuiteBase?

Contributor Author

We can; it is just that this test suite doesn't really need any of the additional functionality provided in StreamRealTimeModeE2ESuiteBase.

@viirya viirya left a comment (Member)

Overall looks good. Some minor comments.

@viirya viirya left a comment (Member)

To catch tomorrow's cut, since this looks good already, I will merge this later today. If some comments are not addressed yet, we can address them in follow-ups.

cc @HeartSaVioR

Comment on lines 360 to 362
// If we are in micro-batch mode, we need to get the latest partition offsets at the
// start of the batch and recalculate the latest offsets at the end for backlog
// estimation.
Member

This comment is not needed, right?

Contributor Author

Will revert this code based on this thread:
https://github.com/apache/spark/pull/52729/files#r2482113567

So this is not needed

@viirya viirya closed this in 928f253 Nov 1, 2025
@viirya viirya (Member) commented Nov 1, 2025

Merged to master branch.

Thanks @jerrypeng
