Conversation

@wypoon (Contributor) commented Feb 13, 2025

This fixes the TODO in #4479.
Use the ReadLimit passed in to SparkMicroBatchStream::latestOffset(Offset, ReadLimit). In testing this, a bug was found in SparkMicroBatchStream::getDefaultReadLimit() and fixed.

Use the ReadLimit passed in to SparkMicroBatchStream::latestOffset.
In addition, fix a bug.
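
For orientation, here is a sketch of how getDefaultReadLimit() composes the two caps, reconstructed from the diff and review comments below. The field names maxFilesPerMicroBatch and maxRecordsPerMicroBatch come from the diff; the surrounding structure is an assumption based on Spark's ReadLimit factory methods, not verbatim source.

  // Sketch of SparkMicroBatchStream#getDefaultReadLimit(); reconstructed, not verbatim.
  @Override
  public ReadLimit getDefaultReadLimit() {
    if (maxFilesPerMicroBatch != Integer.MAX_VALUE
        && maxRecordsPerMicroBatch != Integer.MAX_VALUE) {
      ReadLimit[] readLimits = new ReadLimit[2];
      readLimits[0] = ReadLimit.maxFiles(maxFilesPerMicroBatch);
      // The fixed line: the row cap must be built from maxRecordsPerMicroBatch,
      // not maxFilesPerMicroBatch (the bug this PR corrects).
      readLimits[1] = ReadLimit.maxRows(maxRecordsPerMicroBatch);
      return ReadLimit.compositeLimit(readLimits);
    } else if (maxFilesPerMicroBatch != Integer.MAX_VALUE) {
      return ReadLimit.maxFiles(maxFilesPerMicroBatch);
    } else if (maxRecordsPerMicroBatch != Integer.MAX_VALUE) {
      return ReadLimit.maxRows(maxRecordsPerMicroBatch);
    } else {
      return ReadLimit.allAvailable();
    }
  }
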
@github-actions bot added the spark label Feb 13, 2025
Comment on lines -461 to +505
-      readLimits[1] = ReadLimit.maxRows(maxFilesPerMicroBatch);
+      readLimits[1] = ReadLimit.maxRows(maxRecordsPerMicroBatch);
Contributor Author

Bug!

Contributor

Thank you for catching this! It got missed because we don't take the ReadLimit we get from the latestOffset API, but rather use the configs that are set earlier in the constructor!
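
To make the fix concrete, here is a minimal sketch of the intended shape of latestOffset(Offset, ReadLimit). The helper getMaxRows(readLimit) is quoted later in this thread; getMaxFiles is my assumed counterpart, and the scan-planning body is elided.

  @Override
  public Offset latestOffset(Offset startOffset, ReadLimit limit) {
    // Derive the admission caps from the ReadLimit Spark passed in,
    // instead of the values captured from SparkReadConf in the constructor.
    int maxFiles = getMaxFiles(limit);
    long maxRows = getMaxRows(limit);
    // ... plan the micro-batch, admitting files until either cap is reached,
    // then return the resulting end offset (planning logic unchanged) ...
    return null; // placeholder for the computed end offset
  }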

  @TestTemplate
- public void testReadStreamOnIcebergTableWithMultipleSnapshots_WithNumberOfFiles_1()
-     throws Exception {
+ public void testReadStreamWithMaxFiles1() throws Exception {
Contributor Author

I renamed a few tests to be more concise. The old names were unwieldy and did not conform to Java naming conventions.

Comment on lines +227 to +232
assertThat(
microBatchCount(
ImmutableMap.of(
SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH, "1",
SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH, "2")))
.isEqualTo(6);
Contributor Author

This fails without the fix to SparkMicroBatchStream::getDefaultReadLimit(), as Spark then calls SparkMicroBatchStream::latestOffset(Offset, ReadLimit) with a CompositeReadLimit where one of the ReadLimits is a ReadMaxRows(1).
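
To spell out the arithmetic: with max files set to 1 and max rows set to 2, the pre-fix code built the row cap from the file cap. A small illustration using Spark's ReadLimit factories, with constants chosen to match the test options above:

  import org.apache.spark.sql.connector.read.streaming.ReadLimit;

  // What the buggy getDefaultReadLimit() produced: the row cap picked up the
  // file cap's value, so Spark admitted at most one row per micro-batch.
  ReadLimit buggy =
      ReadLimit.compositeLimit(
          new ReadLimit[] {ReadLimit.maxFiles(1), ReadLimit.maxRows(1)});

  // What it should, and after the fix does, produce.
  ReadLimit fixed =
      ReadLimit.compositeLimit(
          new ReadLimit[] {ReadLimit.maxFiles(1), ReadLimit.maxRows(2)});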

@wypoon (Contributor, Author) commented Feb 14, 2025

@singhpk234 @jackye1995 @RussellSpitzer this is a small fix; can you please review?

@singhpk234 (Contributor) left a comment

Mostly LGTM with a minor suggestion. Thanks @wypoon!

Comment on lines 325 to 330
for (int i = 0; i < limits.length; i++) {
  ReadLimit limit = limits[i];
  if (limit instanceof ReadMaxFiles) {
    return ((ReadMaxFiles) limit).maxFiles();
  }
}
Contributor

[minor] Can we use this?

Suggested change
-    for (int i = 0; i < limits.length; i++) {
-      ReadLimit limit = limits[i];
-      if (limit instanceof ReadMaxFiles) {
-        return ((ReadMaxFiles) limit).maxFiles();
-      }
-    }
+    for (ReadLimit limit : limits) {
+      if (limit instanceof ReadMaxFiles) {
+        return ((ReadMaxFiles) limit).maxFiles();
+      }
+    }

Contributor Author

Adopted.
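
For context, the quoted loop lives inside a helper that unpacks a possibly composite limit. A sketch of that shape follows; CompositeReadLimit.getReadLimits() is Spark's API, while the helper name getMaxFiles is inferred from this thread:

  import org.apache.spark.sql.connector.read.streaming.CompositeReadLimit;
  import org.apache.spark.sql.connector.read.streaming.ReadLimit;
  import org.apache.spark.sql.connector.read.streaming.ReadMaxFiles;

  // Returns the file cap carried by the limit, or Integer.MAX_VALUE if none applies.
  private static int getMaxFiles(ReadLimit readLimit) {
    if (readLimit instanceof ReadMaxFiles) {
      return ((ReadMaxFiles) readLimit).maxFiles();
    }

    if (readLimit instanceof CompositeReadLimit) {
      for (ReadLimit limit : ((CompositeReadLimit) readLimit).getReadLimits()) {
        if (limit instanceof ReadMaxFiles) {
          return ((ReadMaxFiles) limit).maxFiles();
        }
      }
    }

    return Integer.MAX_VALUE; // no file-based cap in effect
  }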

@wypoon (Contributor, Author) commented Feb 14, 2025

Thanks @singhpk234.

@singhpk234 (Contributor) left a comment

LGTM. Thanks @wypoon!

@wypoon (Contributor, Author) commented Feb 18, 2025

@jackye1995 @RussellSpitzer can you please review?

@wypoon (Contributor, Author) commented Mar 6, 2025

@szehon-ho @aokolnychyi would you mind reviewing this?

@wypoon (Contributor, Author) commented Mar 12, 2025

@RussellSpitzer would you mind reviewing this when you have some time? It is a small change which @singhpk234 has already reviewed and approved.

@wypoon (Contributor, Author) commented Mar 25, 2025

@huaxingao would you mind reviewing this, since you're a Spark expert? It's a small change.

@wypoon (Contributor, Author) commented Apr 21, 2025

@RussellSpitzer @huaxingao can you please review this?

@sririshindra (Contributor) left a comment

LGTM

@huaxingao (Contributor)

@wypoon Thanks for the PR — the changes look good to me. I have a question about the tests. It seems that a test like testReadStreamWithMaxRows2() would pass with both the original implementation (using readConf.maxRecordsPerMicroBatch()) and the new logic (using getMaxRows(readLimit)), since both return the same static value.
Shall we add a test that would fail under the old implementation but pass with the new one?

@wypoon (Contributor, Author) commented May 17, 2025

Hi @huaxingao, thank you for reviewing this!
You are correct that the tests would pass with the original implementation, except for testReadStreamWithCompositeReadLimit, which would fail due to the bug in SparkMicroBatchStream::getDefaultReadLimit().
As I understand it, SparkMicroBatchStream implements getDefaultReadLimit() (built from the configuration options), and Spark calls latestOffset(Offset, ReadLimit) with that ReadLimit. In principle, Spark can call latestOffset(Offset, ReadLimit) with any ReadLimit, and SparkMicroBatchStream should respond according to whatever it receives. As long as Spark passes the ReadLimit given by getDefaultReadLimit(), there is no behavioral difference between the original implementation (which ignores the ReadLimit passed in and just uses the one derived from the configuration options) and the one in this PR. Technically, though, we should not rely on that assumption, and should use the ReadLimit passed in.
Do you know if and how Spark would pass in a different ReadLimit than what is returned by getDefaultReadLimit()?
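
For reference, the contract under discussion, paraphrased from Spark's SupportsAdmissionControl interface (see org.apache.spark.sql.connector.read.streaming in Spark for the authoritative definition; the comments are mine):

  public interface SupportsAdmissionControl extends SparkDataStream {
    // The limit Spark falls back to; Iceberg builds this from the
    // streaming max-files/max-rows read options.
    default ReadLimit getDefaultReadLimit() {
      return ReadLimit.allAvailable();
    }

    // Spark passes a ReadLimit on every micro-batch; a source should honor
    // whatever it receives rather than assume it equals getDefaultReadLimit().
    Offset latestOffset(Offset startOffset, ReadLimit limit);
  }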

@huaxingao (Contributor)

@wypoon I looked into the Spark side and didn’t see a way to dynamically change the ReadLimit. So it seems there’s no easy way to write a regression test that fails without the fix and passes with it. I’ll go ahead and approve the PR and leave it open for a couple of days in case anyone else wants to review it.

@wypoon (Contributor, Author) commented May 19, 2025

Thanks @huaxingao!
This PR predates the Spark 4.0 support, so rather than update this PR, I opened #13095 to port the change to Spark 4.0.

@wypoon wypoon changed the title from "Spark: Structured Streaming read limit support follow-up" to "Spark 3.5: Structured Streaming read limit support follow-up" May 19, 2025
@huaxingao merged commit 8a38f5a into apache:main May 20, 2025
31 checks passed
@huaxingao (Contributor)

Merged. Thanks @wypoon for the PR! Thanks @singhpk234 @sririshindra for reviewing!

pvary pushed a commit that referenced this pull request May 21, 2025
devendra-nr pushed a commit to devendra-nr/iceberg that referenced this pull request Dec 8, 2025
…12260)

* Spark: Structured Streaming read limit support follow-up

Use the ReadLimit passed in to SparkMicroBatchStream::latestOffset.
In addition, fix a bug.

* Use enhanced for loop.
devendra-nr pushed a commit to devendra-nr/iceberg that referenced this pull request Dec 8, 2025