Skip to content

Commit f154daf

Browse files
authored
Core: Support incremental Scan in RESTCatalogAdapter for RemoteScanPlanning (#14661)
1 parent 9c1dd3b commit f154daf

File tree

2 files changed

+73
-28
lines changed

2 files changed

+73
-28
lines changed

core/src/main/java/org/apache/iceberg/rest/CatalogHandlers.java

Lines changed: 70 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,10 @@
4242
import org.apache.iceberg.BaseTable;
4343
import org.apache.iceberg.BaseTransaction;
4444
import org.apache.iceberg.FileScanTask;
45+
import org.apache.iceberg.IncrementalAppendScan;
4546
import org.apache.iceberg.MetadataUpdate.UpgradeFormatVersion;
4647
import org.apache.iceberg.PartitionSpec;
48+
import org.apache.iceberg.Scan;
4749
import org.apache.iceberg.Schema;
4850
import org.apache.iceberg.SortOrder;
4951
import org.apache.iceberg.Table;
@@ -641,29 +643,40 @@ public static PlanTableScanResponse planTableScan(
641643
Catalog catalog,
642644
TableIdentifier ident,
643645
PlanTableScanRequest request,
644-
Predicate<TableScan> shouldPlanAsync,
645-
ToIntFunction<TableScan> tasksPerPlanTask) {
646+
Predicate<Scan<?, FileScanTask, ?>> shouldPlanAsync,
647+
ToIntFunction<Scan<?, FileScanTask, ?>> tasksPerPlanTask) {
646648
Table table = catalog.loadTable(ident);
647-
TableScan tableScan = table.newScan();
649+
// Configure the appropriate scan type
650+
Scan<?, FileScanTask, ?> configuredScan;
651+
652+
if (request.startSnapshotId() != null && request.endSnapshotId() != null) {
653+
// Incremental append scan for reading changes between snapshots
654+
IncrementalAppendScan incrementalScan =
655+
table
656+
.newIncrementalAppendScan()
657+
.fromSnapshotInclusive(request.startSnapshotId())
658+
.toSnapshot(request.endSnapshotId());
659+
660+
configuredScan = configureScan(incrementalScan, request);
661+
} else {
662+
// Regular table scan at a specific snapshot
663+
TableScan tableScan = table.newScan();
648664

649-
if (request.snapshotId() != null) {
650-
tableScan = tableScan.useSnapshot(request.snapshotId());
651-
}
652-
if (request.select() != null) {
653-
tableScan = tableScan.select(request.select());
654-
}
655-
if (request.filter() != null) {
656-
tableScan = tableScan.filter(request.filter());
657-
}
658-
if (request.statsFields() != null) {
659-
tableScan = tableScan.includeColumnStats(request.statsFields());
660-
}
665+
if (request.snapshotId() != null) {
666+
tableScan = tableScan.useSnapshot(request.snapshotId());
667+
}
661668

662-
tableScan = tableScan.caseSensitive(request.caseSensitive());
669+
// Apply filters and projections using common method
670+
configuredScan = configureScan(tableScan, request);
671+
}
663672

664-
if (shouldPlanAsync.test(tableScan)) {
673+
if (shouldPlanAsync.test(configuredScan)) {
665674
String asyncPlanId = "async-" + UUID.randomUUID();
666-
asyncPlanFiles(tableScan, asyncPlanId, tasksPerPlanTask.applyAsInt(tableScan));
675+
asyncPlanFiles(
676+
configuredScan,
677+
asyncPlanId,
678+
table.uuid().toString(),
679+
tasksPerPlanTask.applyAsInt(configuredScan));
667680
return PlanTableScanResponse.builder()
668681
.withPlanId(asyncPlanId)
669682
.withPlanStatus(PlanStatus.SUBMITTED)
@@ -672,7 +685,11 @@ public static PlanTableScanResponse planTableScan(
672685
}
673686

674687
String planId = "sync-" + UUID.randomUUID();
675-
planFilesFor(tableScan, planId, tasksPerPlanTask.applyAsInt(tableScan));
688+
planFilesFor(
689+
configuredScan,
690+
planId,
691+
table.uuid().toString(),
692+
tasksPerPlanTask.applyAsInt(configuredScan));
676693
Pair<List<FileScanTask>, String> initial = IN_MEMORY_PLANNING_STATE.initialScanTasksFor(planId);
677694
return PlanTableScanResponse.builder()
678695
.withPlanStatus(PlanStatus.COMPLETED)
@@ -757,21 +774,48 @@ static void clearPlanningState() {
757774
InMemoryPlanningState.getInstance().clear();
758775
}
759776

777+
/**
778+
* Applies filters, projections, and other scan configurations from the request to the scan.
779+
*
780+
* @param scan the scan to configure
781+
* @param request the plan table scan request containing filters and projections
782+
* @param <T> the specific scan type (TableScan, IncrementalAppendScan, etc.)
783+
* @return the configured scan with filters and projections applied
784+
*/
785+
private static <T extends Scan<T, FileScanTask, ?>> T configureScan(
786+
T scan, PlanTableScanRequest request) {
787+
T configuredScan = scan;
788+
789+
if (request.select() != null) {
790+
configuredScan = configuredScan.select(request.select());
791+
}
792+
if (request.filter() != null) {
793+
configuredScan = configuredScan.filter(request.filter());
794+
}
795+
if (request.statsFields() != null) {
796+
configuredScan = configuredScan.includeColumnStats(request.statsFields());
797+
}
798+
configuredScan = configuredScan.caseSensitive(request.caseSensitive());
799+
800+
return configuredScan;
801+
}
802+
760803
/**
761804
* Plans file scan tasks for a table scan, grouping them into plan tasks for pagination.
762805
*
763-
* @param tableScan the table scan to plan
806+
* @param scan the table scan to plan files for
764807
* @param planId the unique identifier for this plan
808+
* @param tableId the uuid of the table being scanned
765809
* @param tasksPerPlanTask number of file scan tasks to group per plan task
766810
*/
767-
private static void planFilesFor(TableScan tableScan, String planId, int tasksPerPlanTask) {
811+
private static void planFilesFor(
812+
Scan<?, FileScanTask, ?> scan, String planId, String tableId, int tasksPerPlanTask) {
768813
Iterable<List<FileScanTask>> taskGroupings =
769-
Iterables.partition(tableScan.planFiles(), tasksPerPlanTask);
814+
Iterables.partition(scan.planFiles(), tasksPerPlanTask);
770815
int planTaskSequence = 0;
771816
String previousPlanTask = null;
772817
for (List<FileScanTask> taskGrouping : taskGroupings) {
773-
String planTaskKey =
774-
String.format("%s-%s-%s", planId, tableScan.table().uuid(), planTaskSequence++);
818+
String planTaskKey = String.format("%s-%s-%s", planId, tableId, planTaskSequence++);
775819
IN_MEMORY_PLANNING_STATE.addPlanTask(planTaskKey, taskGrouping);
776820
if (previousPlanTask != null) {
777821
IN_MEMORY_PLANNING_STATE.addNextPlanTask(previousPlanTask, planTaskKey);
@@ -783,11 +827,11 @@ private static void planFilesFor(TableScan tableScan, String planId, int tasksPe
783827

784828
@SuppressWarnings("FutureReturnValueIgnored")
785829
private static void asyncPlanFiles(
786-
TableScan tableScan, String asyncPlanId, int tasksPerPlanTask) {
830+
Scan<?, FileScanTask, ?> scan, String asyncPlanId, String tableId, int tasksPerPlanTask) {
787831
IN_MEMORY_PLANNING_STATE.addAsyncPlan(asyncPlanId);
788832
CompletableFuture.runAsync(
789833
() -> {
790-
planFilesFor(tableScan, asyncPlanId, tasksPerPlanTask);
834+
planFilesFor(scan, asyncPlanId, tableId, tasksPerPlanTask);
791835
},
792836
ASYNC_PLANNING_POOL)
793837
.whenComplete(

core/src/test/java/org/apache/iceberg/rest/RESTCatalogAdapter.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,9 @@
3131
import org.apache.http.HttpHeaders;
3232
import org.apache.iceberg.BaseTable;
3333
import org.apache.iceberg.BaseTransaction;
34+
import org.apache.iceberg.FileScanTask;
35+
import org.apache.iceberg.Scan;
3436
import org.apache.iceberg.Table;
35-
import org.apache.iceberg.TableScan;
3637
import org.apache.iceberg.Transaction;
3738
import org.apache.iceberg.Transactions;
3839
import org.apache.iceberg.catalog.Catalog;
@@ -588,7 +589,7 @@ default int numberFileScanTasksPerPlanTask() {
588589
return 100;
589590
}
590591

591-
default boolean shouldPlanTableScanAsync(TableScan tableScan) {
592+
default boolean shouldPlanTableScanAsync(Scan<?, FileScanTask, ?> scan) {
592593
return false;
593594
}
594595
}

0 commit comments

Comments
 (0)