Commit 1911c94

Spark 4.0: Structured Streaming read limit support follow-up (#13095)

1 parent: e139dbc

2 files changed: +82 −14 lines

spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java

Lines changed: 47 additions & 5 deletions
@@ -58,9 +58,12 @@
 import org.apache.spark.broadcast.Broadcast;
 import org.apache.spark.sql.connector.read.InputPartition;
 import org.apache.spark.sql.connector.read.PartitionReaderFactory;
+import org.apache.spark.sql.connector.read.streaming.CompositeReadLimit;
 import org.apache.spark.sql.connector.read.streaming.MicroBatchStream;
 import org.apache.spark.sql.connector.read.streaming.Offset;
 import org.apache.spark.sql.connector.read.streaming.ReadLimit;
+import org.apache.spark.sql.connector.read.streaming.ReadMaxFiles;
+import org.apache.spark.sql.connector.read.streaming.ReadMaxRows;
 import org.apache.spark.sql.connector.read.streaming.SupportsAdmissionControl;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -309,6 +312,47 @@ private static StreamingOffset determineStartingOffset(Table table, Long fromTim
     }
   }

+  private static int getMaxFiles(ReadLimit readLimit) {
+    if (readLimit instanceof ReadMaxFiles) {
+      return ((ReadMaxFiles) readLimit).maxFiles();
+    }
+
+    if (readLimit instanceof CompositeReadLimit) {
+      // We do not expect a CompositeReadLimit to contain a nested CompositeReadLimit.
+      // In fact, it should only be a composite of two or more of ReadMinRows, ReadMaxRows and
+      // ReadMaxFiles, with no more than one of each.
+      ReadLimit[] limits = ((CompositeReadLimit) readLimit).getReadLimits();
+      for (ReadLimit limit : limits) {
+        if (limit instanceof ReadMaxFiles) {
+          return ((ReadMaxFiles) limit).maxFiles();
+        }
+      }
+    }
+
+    // there is no ReadMaxFiles, so return the default
+    return Integer.MAX_VALUE;
+  }
+
+  private static int getMaxRows(ReadLimit readLimit) {
+    if (readLimit instanceof ReadMaxRows) {
+      long maxRows = ((ReadMaxRows) readLimit).maxRows();
+      return Math.toIntExact(maxRows);
+    }
+
+    if (readLimit instanceof CompositeReadLimit) {
+      ReadLimit[] limits = ((CompositeReadLimit) readLimit).getReadLimits();
+      for (ReadLimit limit : limits) {
+        if (limit instanceof ReadMaxRows) {
+          long maxRows = ((ReadMaxRows) limit).maxRows();
+          return Math.toIntExact(maxRows);
+        }
+      }
+    }
+
+    // there is no ReadMaxRows, so return the default
+    return Integer.MAX_VALUE;
+  }
+
   @Override
   @SuppressWarnings("checkstyle:CyclomaticComplexity")
   public Offset latestOffset(Offset startOffset, ReadLimit limit) {
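For context, here is a minimal standalone sketch (not part of this commit) of how a composite limit built from the public ReadLimit factories resolves to per-type caps; it mirrors the logic of the new getMaxFiles/getMaxRows helpers, which are private to SparkMicroBatchStream. The class name ReadLimitResolutionSketch is a placeholder.

import org.apache.spark.sql.connector.read.streaming.CompositeReadLimit;
import org.apache.spark.sql.connector.read.streaming.ReadLimit;
import org.apache.spark.sql.connector.read.streaming.ReadMaxFiles;
import org.apache.spark.sql.connector.read.streaming.ReadMaxRows;

public class ReadLimitResolutionSketch {
  public static void main(String[] args) {
    // Composite of a file cap and a row cap, the same shape getDefaultReadLimit()
    // produces when both streaming options are configured.
    ReadLimit limit =
        ReadLimit.compositeLimit(new ReadLimit[] {ReadLimit.maxFiles(1), ReadLimit.maxRows(2)});

    int maxFiles = Integer.MAX_VALUE;
    int maxRows = Integer.MAX_VALUE;
    if (limit instanceof CompositeReadLimit) {
      // A composite is expected to hold at most one limit of each type, never a nested composite.
      for (ReadLimit child : ((CompositeReadLimit) limit).getReadLimits()) {
        if (child instanceof ReadMaxFiles) {
          maxFiles = ((ReadMaxFiles) child).maxFiles();
        } else if (child instanceof ReadMaxRows) {
          maxRows = Math.toIntExact(((ReadMaxRows) child).maxRows());
        }
      }
    }

    System.out.println("maxFiles=" + maxFiles + ", maxRows=" + maxRows); // maxFiles=1, maxRows=2
  }
}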
@@ -368,10 +412,8 @@ public Offset latestOffset(Offset startOffset, ReadLimit limit) {
       while (taskIter.hasNext()) {
         FileScanTask task = taskIter.next();
         if (curPos >= startPosOfSnapOffset) {
-          // TODO : use readLimit provided in function param, the readLimits are derived from
-          // these 2 properties.
-          if ((curFilesAdded + 1) > maxFilesPerMicroBatch
-              || (curRecordCount + task.file().recordCount()) > maxRecordsPerMicroBatch) {
+          if ((curFilesAdded + 1) > getMaxFiles(limit)
+              || (curRecordCount + task.file().recordCount()) > getMaxRows(limit)) {
            shouldContinueReading = false;
            break;
          }
@@ -458,7 +500,7 @@ public ReadLimit getDefaultReadLimit() {
         && maxRecordsPerMicroBatch != Integer.MAX_VALUE) {
       ReadLimit[] readLimits = new ReadLimit[2];
       readLimits[0] = ReadLimit.maxFiles(maxFilesPerMicroBatch);
-      readLimits[1] = ReadLimit.maxRows(maxFilesPerMicroBatch);
+      readLimits[1] = ReadLimit.maxRows(maxRecordsPerMicroBatch);
       return ReadLimit.compositeLimit(readLimits);
     } else if (maxFilesPerMicroBatch != Integer.MAX_VALUE) {
       return ReadLimit.maxFiles(maxFilesPerMicroBatch);
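With the hunk above, the default composite limit now pairs ReadLimit.maxFiles(maxFilesPerMicroBatch) with ReadLimit.maxRows(maxRecordsPerMicroBatch). A hedged sketch of the streaming read options that exercise this path follows; the table identifier db.table, the console sink, and the class name are placeholders, not taken from this commit.

import org.apache.iceberg.spark.SparkReadOptions;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class StreamingReadLimitExample {
  public static void main(String[] args) throws Exception {
    SparkSession spark = SparkSession.builder().getOrCreate();

    // With both options set, getDefaultReadLimit() returns a composite of maxFiles and maxRows,
    // and latestOffset(startOffset, limit) now admits files and rows based on that limit.
    Dataset<Row> stream =
        spark
            .readStream()
            .format("iceberg")
            .option(SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH, "1")
            .option(SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH, "2")
            .load("db.table");

    stream.writeStream().format("console").start().awaitTermination();
  }
}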

spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java

Lines changed: 35 additions & 9 deletions
@@ -154,8 +154,7 @@ public void testReadStreamOnIcebergTableWithMultipleSnapshots() throws Exception
   }

   @TestTemplate
-  public void testReadStreamOnIcebergTableWithMultipleSnapshots_WithNumberOfFiles_1()
-      throws Exception {
+  public void testReadStreamWithMaxFiles1() throws Exception {
     appendDataAsMultipleSnapshots(TEST_DATA_MULTIPLE_SNAPSHOTS);

     assertThat(
@@ -165,8 +164,7 @@ public void testReadStreamOnIcebergTableWithMultipleSnapshots_WithNumberOfFiles_
   }

   @TestTemplate
-  public void testReadStreamOnIcebergTableWithMultipleSnapshots_WithNumberOfFiles_2()
-      throws Exception {
+  public void testReadStreamWithMaxFiles2() throws Exception {
     appendDataAsMultipleSnapshots(TEST_DATA_MULTIPLE_SNAPSHOTS);

     assertThat(
@@ -176,8 +174,7 @@ public void testReadStreamOnIcebergTableWithMultipleSnapshots_WithNumberOfFiles_
   }

   @TestTemplate
-  public void testReadStreamOnIcebergTableWithMultipleSnapshots_WithNumberOfRows_1()
-      throws Exception {
+  public void testReadStreamWithMaxRows1() throws Exception {
     appendDataAsMultipleSnapshots(TEST_DATA_MULTIPLE_SNAPSHOTS);

     // only 1 micro-batch will be formed and we will read data partially
@@ -186,7 +183,8 @@ public void testReadStreamOnIcebergTableWithMultipleSnapshots_WithNumberOfRows_1
             ImmutableMap.of(SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH, "1")))
         .isEqualTo(1);

-    StreamingQuery query = startStream(SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH, "1");
+    StreamingQuery query =
+        startStream(ImmutableMap.of(SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH, "1"));

     // check answer correctness only 1 record read the micro-batch will be stuck
     List<SimpleRecord> actual = rowsAvailable(query);
@@ -196,8 +194,24 @@ public void testReadStreamOnIcebergTableWithMultipleSnapshots_WithNumberOfRows_1
   }

   @TestTemplate
-  public void testReadStreamOnIcebergTableWithMultipleSnapshots_WithNumberOfRows_4()
-      throws Exception {
+  public void testReadStreamWithMaxRows2() throws Exception {
+    appendDataAsMultipleSnapshots(TEST_DATA_MULTIPLE_SNAPSHOTS);
+
+    assertThat(
+            microBatchCount(
+                ImmutableMap.of(SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH, "2")))
+        .isEqualTo(4);
+
+    StreamingQuery query =
+        startStream(ImmutableMap.of(SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH, "2"));
+
+    List<SimpleRecord> actual = rowsAvailable(query);
+    assertThat(actual)
+        .containsExactlyInAnyOrderElementsOf(Iterables.concat(TEST_DATA_MULTIPLE_SNAPSHOTS));
+  }
+
+  @TestTemplate
+  public void testReadStreamWithMaxRows4() throws Exception {
     appendDataAsMultipleSnapshots(TEST_DATA_MULTIPLE_SNAPSHOTS);

     assertThat(
@@ -206,6 +220,18 @@ public void testReadStreamOnIcebergTableWithMultipleSnapshots_WithNumberOfRows_4
         .isEqualTo(2);
   }

+  @TestTemplate
+  public void testReadStreamWithCompositeReadLimit() throws Exception {
+    appendDataAsMultipleSnapshots(TEST_DATA_MULTIPLE_SNAPSHOTS);
+
+    assertThat(
+            microBatchCount(
+                ImmutableMap.of(
+                    SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH, "1",
+                    SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH, "2")))
+        .isEqualTo(6);
+  }
+
   @TestTemplate
   public void testReadStreamOnIcebergThenAddData() throws Exception {
     List<List<SimpleRecord>> expected = TEST_DATA_MULTIPLE_SNAPSHOTS;