Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import java.util.concurrent.ExecutorService;
import org.apache.flink.annotation.Internal;
import org.apache.iceberg.CombinedScanTask;
import org.apache.iceberg.IncrementalAppendScan;
import org.apache.iceberg.Scan;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.TableScan;
Expand Down Expand Up @@ -76,50 +78,81 @@ public static List<IcebergSourceSplit> planIcebergSourceSplits(
}

static CloseableIterable<CombinedScanTask> planTasks(Table table, ScanContext context, ExecutorService workerPool) {
TableScan scan = table
.newScan()
.caseSensitive(context.caseSensitive())
.project(context.project())
.planWith(workerPool);
ScanMode scanMode = checkScanMode(context);
if (scanMode == ScanMode.INCREMENTAL_APPEND_SCAN) {
IncrementalAppendScan scan = table.newIncrementalAppendScan();
scan = refineScanWithBaseConfigs(scan, context, workerPool);

if (context.includeColumnStats()) {
scan = scan.includeColumnStats();
}
if (context.startSnapshotId() != null) {
scan = scan.fromSnapshotExclusive(context.startSnapshotId());
}

if (context.endSnapshotId() != null) {
scan = scan.toSnapshot(context.endSnapshotId());
}

return scan.planTasks();
} else {
TableScan scan = table.newScan();
scan = refineScanWithBaseConfigs(scan, context, workerPool);

if (context.snapshotId() != null) {
scan = scan.useSnapshot(context.snapshotId());
}

if (context.asOfTimestamp() != null) {
scan = scan.asOfTime(context.asOfTimestamp());
}

if (context.snapshotId() != null) {
scan = scan.useSnapshot(context.snapshotId());
return scan.planTasks();
}
}

/**
 * Scan flavours that planTasks can build; the value is picked by checkScanMode.
 */
private enum ScanMode {
// Planned from table.newScan() in planTasks.
BATCH,
// Planned from table.newIncrementalAppendScan() in planTasks.
INCREMENTAL_APPEND_SCAN
}

if (context.asOfTimestamp() != null) {
scan = scan.asOfTime(context.asOfTimestamp());
/**
 * Decides which scan flavour to plan for the given context.
 *
 * @param context the scan settings to inspect
 * @return {@code INCREMENTAL_APPEND_SCAN} when the context is streaming or carries a start or
 *     end snapshot id; {@code BATCH} otherwise
 */
private static ScanMode checkScanMode(ScanContext context) {
  // Evaluated left to right with short-circuiting, same order as the original condition.
  boolean incremental = context.isStreaming()
      || context.startSnapshotId() != null
      || context.endSnapshotId() != null;
  return incremental ? ScanMode.INCREMENTAL_APPEND_SCAN : ScanMode.BATCH;
}

if (context.startSnapshotId() != null) {
if (context.endSnapshotId() != null) {
scan = scan.appendsBetween(context.startSnapshotId(), context.endSnapshotId());
} else {
scan = scan.appendsAfter(context.startSnapshotId());
}
/**
 * Applies the settings that do not depend on the scan flavour and returns the resulting scan.
 *
 * <p>Always applied: case sensitivity, projection and the planning worker pool. Applied only
 * when set in {@code context}: column stats, {@code SPLIT_SIZE}, {@code SPLIT_LOOKBACK},
 * {@code SPLIT_OPEN_FILE_COST} and each filter expression (in iteration order).
 *
 * <p>Every refinement call's return value is reassigned and the last one is returned; the
 * {@code scan} argument itself is never reassigned, so callers must use the returned value.
 *
 * @param scan the freshly created scan to configure
 * @param context the scan settings
 * @param workerPool executor handed to the scan via {@code planWith}
 * @param <T> the concrete scan type, preserved so callers can keep chaining type-specific calls
 * @return the scan with all applicable settings applied
 */
private static <T extends Scan<T>> T refineScanWithBaseConfigs(
    T scan, ScanContext context, ExecutorService workerPool) {
  T refinedScan = scan
      .caseSensitive(context.caseSensitive())
      .project(context.project())
      .planWith(workerPool);

  if (context.includeColumnStats()) {
    refinedScan = refinedScan.includeColumnStats();
  }

  if (context.splitSize() != null) {
    refinedScan = refinedScan.option(TableProperties.SPLIT_SIZE, context.splitSize().toString());
  }

  if (context.splitLookback() != null) {
    refinedScan = refinedScan.option(TableProperties.SPLIT_LOOKBACK, context.splitLookback().toString());
  }

  if (context.splitOpenFileCost() != null) {
    refinedScan = refinedScan.option(TableProperties.SPLIT_OPEN_FILE_COST, context.splitOpenFileCost().toString());
  }

  if (context.filters() != null) {
    for (Expression filter : context.filters()) {
      refinedScan = refinedScan.filter(filter);
    }
  }

  return refinedScan;
}
}
Loading