Spark 3.5: Structured Streaming read limit support follow-up #12260
```diff
@@ -154,8 +154,7 @@ public void testReadStreamOnIcebergTableWithMultipleSnapshots() throws Exception
   }
 
   @TestTemplate
-  public void testReadStreamOnIcebergTableWithMultipleSnapshots_WithNumberOfFiles_1()
-      throws Exception {
+  public void testReadStreamWithMaxFiles1() throws Exception {
     appendDataAsMultipleSnapshots(TEST_DATA_MULTIPLE_SNAPSHOTS);
 
     assertThat(
```

**Contributor (Author):** I renamed a few tests to be more concise. The old names were unwieldy and did not conform to Java naming conventions.
```diff
@@ -165,8 +164,7 @@ public void testReadStreamOnIcebergTableWithMultipleSnapshots_WithNumberOfFiles_
   }
 
   @TestTemplate
-  public void testReadStreamOnIcebergTableWithMultipleSnapshots_WithNumberOfFiles_2()
-      throws Exception {
+  public void testReadStreamWithMaxFiles2() throws Exception {
     appendDataAsMultipleSnapshots(TEST_DATA_MULTIPLE_SNAPSHOTS);
 
     assertThat(
```
```diff
@@ -176,8 +174,7 @@ public void testReadStreamOnIcebergTableWithMultipleSnapshots_WithNumberOfRows_
   }
 
   @TestTemplate
-  public void testReadStreamOnIcebergTableWithMultipleSnapshots_WithNumberOfRows_1()
-      throws Exception {
+  public void testReadStreamWithMaxRows1() throws Exception {
     appendDataAsMultipleSnapshots(TEST_DATA_MULTIPLE_SNAPSHOTS);
 
     // only 1 micro-batch will be formed and we will read data partially
```
```diff
@@ -186,7 +183,8 @@ public void testReadStreamOnIcebergTableWithMultipleSnapshots_WithNumberOfRows_1
             ImmutableMap.of(SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH, "1")))
         .isEqualTo(1);
 
-    StreamingQuery query = startStream(SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH, "1");
+    StreamingQuery query =
+        startStream(ImmutableMap.of(SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH, "1"));
 
     // check answer correctness only 1 record read the micro-batch will be stuck
     List<SimpleRecord> actual = rowsAvailable(query);
```
```diff
@@ -196,8 +194,24 @@ public void testReadStreamOnIcebergTableWithMultipleSnapshots_WithNumberOfRows_1
   }
 
   @TestTemplate
-  public void testReadStreamOnIcebergTableWithMultipleSnapshots_WithNumberOfRows_4()
-      throws Exception {
+  public void testReadStreamWithMaxRows2() throws Exception {
+    appendDataAsMultipleSnapshots(TEST_DATA_MULTIPLE_SNAPSHOTS);
+
+    assertThat(
+        microBatchCount(
+            ImmutableMap.of(SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH, "2")))
+        .isEqualTo(4);
+
+    StreamingQuery query =
+        startStream(ImmutableMap.of(SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH, "2"));
+
+    List<SimpleRecord> actual = rowsAvailable(query);
+    assertThat(actual)
+        .containsExactlyInAnyOrderElementsOf(Iterables.concat(TEST_DATA_MULTIPLE_SNAPSHOTS));
+  }
+
+  @TestTemplate
+  public void testReadStreamWithMaxRows4() throws Exception {
     appendDataAsMultipleSnapshots(TEST_DATA_MULTIPLE_SNAPSHOTS);
 
     assertThat(
```
```diff
@@ -206,6 +220,18 @@ public void testReadStreamOnIcebergTableWithMultipleSnapshots_WithNumberOfRows_4
         .isEqualTo(2);
   }
 
+  @TestTemplate
+  public void testReadStreamWithCompositeReadLimit() throws Exception {
+    appendDataAsMultipleSnapshots(TEST_DATA_MULTIPLE_SNAPSHOTS);
+
+    assertThat(
+        microBatchCount(
+            ImmutableMap.of(
+                SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH, "1",
+                SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH, "2")))
+        .isEqualTo(6);
+  }
+
   @TestTemplate
   public void testReadStreamOnIcebergThenAddData() throws Exception {
     List<List<SimpleRecord>> expected = TEST_DATA_MULTIPLE_SNAPSHOTS;
```

**Contributor (Author), on lines +227 to +232:** This fails without the fix to […].
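For background on why the composite case needs its own test: when both options are set, Spark hands the source a `CompositeReadLimit`, and the source must end a micro-batch as soon as either cap is hit. Below is a minimal sketch of how a source might unpack such a limit using Spark's public `ReadLimit` API; the `AdmissionControl` class and its `batchIsFull` helper are hypothetical names for illustration, not code from this PR.

```java
import org.apache.spark.sql.connector.read.streaming.CompositeReadLimit;
import org.apache.spark.sql.connector.read.streaming.ReadLimit;
import org.apache.spark.sql.connector.read.streaming.ReadMaxFiles;
import org.apache.spark.sql.connector.read.streaming.ReadMaxRows;

// Hypothetical helper: flattens a (possibly composite) ReadLimit into
// simple per-batch caps and reports when the planned batch is full.
final class AdmissionControl {
  private long maxFiles = Long.MAX_VALUE;
  private long maxRows = Long.MAX_VALUE;

  AdmissionControl(ReadLimit limit) {
    // A composite limit is just a collection of simple limits; honor each one.
    ReadLimit[] limits =
        limit instanceof CompositeReadLimit
            ? ((CompositeReadLimit) limit).getReadLimits()
            : new ReadLimit[] {limit};
    for (ReadLimit each : limits) {
      if (each instanceof ReadMaxFiles) {
        maxFiles = ((ReadMaxFiles) each).maxFiles();
      } else if (each instanceof ReadMaxRows) {
        maxRows = ((ReadMaxRows) each).maxRows();
      }
    }
  }

  // The batch ends once EITHER cap is reached, which is how maxFiles=1
  // combined with maxRows=2 can yield more micro-batches (6 in the test
  // above) than either option on its own.
  boolean batchIsFull(long filesSoFar, long rowsSoFar) {
    return filesSoFar >= maxFiles || rowsSoFar >= maxRows;
  }
}
```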
**Reviewer:** Bug!

**Contributor (Author):** Thank you for catching this! It got missed because we don't use the ReadLimit passed to the latestOffset API, but instead the limits from the configs, which are set earlier in the constructor.
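To make the described bug concrete, here is a minimal sketch of the pattern, assuming illustrative names (`MicroBatchStreamSketch`, `planEndOffsetFor`); it is not the actual `SparkMicroBatchStream` code. Spark's `SupportsAdmissionControl` interface passes the negotiated `ReadLimit` into `latestOffset`, so a stream that keeps using the limit it derived from read options in its constructor silently ignores that argument:

```java
import org.apache.spark.sql.connector.read.streaming.Offset;
import org.apache.spark.sql.connector.read.streaming.ReadLimit;

// Illustrative sketch of the bug pattern described in the comment above.
class MicroBatchStreamSketch {
  // Limit derived from read options when the stream was constructed.
  private final ReadLimit limitFromOptions = ReadLimit.maxRows(2L);

  // Buggy: the ReadLimit Spark passes in is ignored, so a composite
  // limit negotiated via getDefaultReadLimit() never takes effect.
  public Offset latestOffsetBuggy(Offset startOffset, ReadLimit limit) {
    return planEndOffsetFor(limitFromOptions); // should use `limit`
  }

  // Fixed: honor the ReadLimit handed to latestOffset.
  public Offset latestOffsetFixed(Offset startOffset, ReadLimit limit) {
    return planEndOffsetFor(limit);
  }

  private Offset planEndOffsetFor(ReadLimit limit) {
    // Planning logic elided; hypothetical placeholder.
    throw new UnsupportedOperationException("sketch only");
  }
}
```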