diff --git a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSplitPlanner.java b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSplitPlanner.java index a2df652e8190..2e50243240f6 100644 --- a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSplitPlanner.java +++ b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSplitPlanner.java @@ -25,6 +25,8 @@ import java.util.concurrent.ExecutorService; import org.apache.flink.annotation.Internal; import org.apache.iceberg.CombinedScanTask; +import org.apache.iceberg.IncrementalAppendScan; +import org.apache.iceberg.Scan; import org.apache.iceberg.Table; import org.apache.iceberg.TableProperties; import org.apache.iceberg.TableScan; @@ -76,50 +78,81 @@ public static List planIcebergSourceSplits( } static CloseableIterable planTasks(Table table, ScanContext context, ExecutorService workerPool) { - TableScan scan = table - .newScan() - .caseSensitive(context.caseSensitive()) - .project(context.project()) - .planWith(workerPool); + ScanMode scanMode = checkScanMode(context); + if (scanMode == ScanMode.INCREMENTAL_APPEND_SCAN) { + IncrementalAppendScan scan = table.newIncrementalAppendScan(); + scan = refineScanWithBaseConfigs(scan, context, workerPool); - if (context.includeColumnStats()) { - scan = scan.includeColumnStats(); - } + if (context.startSnapshotId() != null) { + scan = scan.fromSnapshotExclusive(context.startSnapshotId()); + } + + if (context.endSnapshotId() != null) { + scan = scan.toSnapshot(context.endSnapshotId()); + } + + return scan.planTasks(); + } else { + TableScan scan = table.newScan(); + scan = refineScanWithBaseConfigs(scan, context, workerPool); + + if (context.snapshotId() != null) { + scan = scan.useSnapshot(context.snapshotId()); + } + + if (context.asOfTimestamp() != null) { + scan = scan.asOfTime(context.asOfTimestamp()); + } - if (context.snapshotId() != null) { - scan = scan.useSnapshot(context.snapshotId()); + return scan.planTasks(); } + } + + private enum ScanMode { + BATCH, + INCREMENTAL_APPEND_SCAN + } - if (context.asOfTimestamp() != null) { - scan = scan.asOfTime(context.asOfTimestamp()); + private static ScanMode checkScanMode(ScanContext context) { + if (context.isStreaming() || context.startSnapshotId() != null || context.endSnapshotId() != null) { + return ScanMode.INCREMENTAL_APPEND_SCAN; + } else { + return ScanMode.BATCH; } + } - if (context.startSnapshotId() != null) { - if (context.endSnapshotId() != null) { - scan = scan.appendsBetween(context.startSnapshotId(), context.endSnapshotId()); - } else { - scan = scan.appendsAfter(context.startSnapshotId()); - } + /** + * refine scan with common configs + */ + private static > T refineScanWithBaseConfigs( + T scan, ScanContext context, ExecutorService workerPool) { + T refinedScan = scan + .caseSensitive(context.caseSensitive()) + .project(context.project()) + .planWith(workerPool); + + if (context.includeColumnStats()) { + refinedScan = refinedScan.includeColumnStats(); } if (context.splitSize() != null) { - scan = scan.option(TableProperties.SPLIT_SIZE, context.splitSize().toString()); + refinedScan = refinedScan.option(TableProperties.SPLIT_SIZE, context.splitSize().toString()); } if (context.splitLookback() != null) { - scan = scan.option(TableProperties.SPLIT_LOOKBACK, context.splitLookback().toString()); + refinedScan = refinedScan.option(TableProperties.SPLIT_LOOKBACK, context.splitLookback().toString()); } if (context.splitOpenFileCost() != null) { - scan = 
scan.option(TableProperties.SPLIT_OPEN_FILE_COST, context.splitOpenFileCost().toString()); + refinedScan = refinedScan.option(TableProperties.SPLIT_OPEN_FILE_COST, context.splitOpenFileCost().toString()); } if (context.filters() != null) { for (Expression filter : context.filters()) { - scan = scan.filter(filter); + refinedScan = refinedScan.filter(filter); } } - return scan.planTasks(); + return refinedScan; } } diff --git a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java index 806006dd1252..84c7652ef41e 100644 --- a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java +++ b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java @@ -23,9 +23,11 @@ import java.time.Duration; import java.util.List; import java.util.Map; +import org.apache.flink.annotation.Internal; import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.ConfigOptions; import org.apache.flink.configuration.Configuration; +import org.apache.flink.util.Preconditions; import org.apache.iceberg.Schema; import org.apache.iceberg.expressions.Expression; import org.apache.iceberg.flink.FlinkConfigOptions; @@ -35,7 +37,8 @@ /** * Context object with optional arguments for a Flink Scan. */ -class ScanContext implements Serializable { +@Internal +public class ScanContext implements Serializable { private static final long serialVersionUID = 1L; @@ -48,6 +51,13 @@ class ScanContext implements Serializable { private static final ConfigOption AS_OF_TIMESTAMP = ConfigOptions.key("as-of-timestamp").longType().defaultValue(null); + private static final ConfigOption STARTING_STRATEGY = + ConfigOptions.key("starting-strategy").enumType(StreamingStartingStrategy.class) + .defaultValue(StreamingStartingStrategy.INCREMENTAL_FROM_LATEST_SNAPSHOT); + + private static final ConfigOption START_SNAPSHOT_TIMESTAMP = + ConfigOptions.key("start-snapshot-timestamp").longType().defaultValue(null); + private static final ConfigOption START_SNAPSHOT_ID = ConfigOptions.key("start-snapshot-id").longType().defaultValue(null); @@ -75,7 +85,9 @@ class ScanContext implements Serializable { private final boolean caseSensitive; private final boolean exposeLocality; private final Long snapshotId; + private final StreamingStartingStrategy startingStrategy; private final Long startSnapshotId; + private final Long startSnapshotTimestamp; private final Long endSnapshotId; private final Long asOfTimestamp; private final Long splitSize; @@ -91,13 +103,15 @@ class ScanContext implements Serializable { private final boolean includeColumnStats; private final Integer planParallelism; - private ScanContext(boolean caseSensitive, Long snapshotId, Long startSnapshotId, Long endSnapshotId, - Long asOfTimestamp, Long splitSize, Integer splitLookback, Long splitOpenFileCost, - boolean isStreaming, Duration monitorInterval, String nameMapping, Schema schema, - List filters, long limit, boolean includeColumnStats, boolean exposeLocality, - Integer planParallelism) { + private ScanContext(boolean caseSensitive, Long snapshotId, StreamingStartingStrategy startingStrategy, + Long startSnapshotTimestamp, Long startSnapshotId, Long endSnapshotId, Long asOfTimestamp, + Long splitSize, Integer splitLookback, Long splitOpenFileCost, boolean isStreaming, + Duration monitorInterval, String nameMapping, Schema schema, List filters, + long limit, boolean includeColumnStats, boolean 
exposeLocality, Integer planParallelism) { this.caseSensitive = caseSensitive; this.snapshotId = snapshotId; + this.startingStrategy = startingStrategy; + this.startSnapshotTimestamp = startSnapshotTimestamp; this.startSnapshotId = startSnapshotId; this.endSnapshotId = endSnapshotId; this.asOfTimestamp = asOfTimestamp; @@ -114,77 +128,104 @@ private ScanContext(boolean caseSensitive, Long snapshotId, Long startSnapshotId this.includeColumnStats = includeColumnStats; this.exposeLocality = exposeLocality; this.planParallelism = planParallelism; + + validate(); + } + + private void validate() { + if (isStreaming) { + if (startingStrategy == StreamingStartingStrategy.INCREMENTAL_FROM_SNAPSHOT_ID) { + Preconditions.checkArgument(startSnapshotId != null, + "Invalid starting snapshot id for SPECIFIC_START_SNAPSHOT_ID strategy: null"); + Preconditions.checkArgument(startSnapshotTimestamp == null, + "Invalid starting snapshot timestamp for SPECIFIC_START_SNAPSHOT_ID strategy: not null"); + } + if (startingStrategy == StreamingStartingStrategy.INCREMENTAL_FROM_SNAPSHOT_TIMESTAMP) { + Preconditions.checkArgument(startSnapshotTimestamp != null, + "Invalid starting snapshot timestamp for SPECIFIC_START_SNAPSHOT_TIMESTAMP strategy: null"); + Preconditions.checkArgument(startSnapshotId == null, + "Invalid starting snapshot id for SPECIFIC_START_SNAPSHOT_ID strategy: not null"); + } + } } - boolean caseSensitive() { + public boolean caseSensitive() { return caseSensitive; } - Long snapshotId() { + public Long snapshotId() { return snapshotId; } - Long startSnapshotId() { + public StreamingStartingStrategy startingStrategy() { + return startingStrategy; + } + + public Long startSnapshotTimestamp() { + return startSnapshotTimestamp; + } + + public Long startSnapshotId() { return startSnapshotId; } - Long endSnapshotId() { + public Long endSnapshotId() { return endSnapshotId; } - Long asOfTimestamp() { + public Long asOfTimestamp() { return asOfTimestamp; } - Long splitSize() { + public Long splitSize() { return splitSize; } - Integer splitLookback() { + public Integer splitLookback() { return splitLookback; } - Long splitOpenFileCost() { + public Long splitOpenFileCost() { return splitOpenFileCost; } - boolean isStreaming() { + public boolean isStreaming() { return isStreaming; } - Duration monitorInterval() { + public Duration monitorInterval() { return monitorInterval; } - String nameMapping() { + public String nameMapping() { return nameMapping; } - Schema project() { + public Schema project() { return schema; } - List filters() { + public List filters() { return filters; } - long limit() { + public long limit() { return limit; } - boolean includeColumnStats() { + public boolean includeColumnStats() { return includeColumnStats; } - boolean exposeLocality() { + public boolean exposeLocality() { return exposeLocality; } - Integer planParallelism() { + public Integer planParallelism() { return planParallelism; } - ScanContext copyWithAppendsBetween(long newStartSnapshotId, long newEndSnapshotId) { + public ScanContext copyWithAppendsBetween(Long newStartSnapshotId, long newEndSnapshotId) { return ScanContext.builder() .caseSensitive(caseSensitive) .useSnapshotId(null) @@ -206,7 +247,7 @@ ScanContext copyWithAppendsBetween(long newStartSnapshotId, long newEndSnapshotI .build(); } - ScanContext copyWithSnapshotId(long newSnapshotId) { + public ScanContext copyWithSnapshotId(long newSnapshotId) { return ScanContext.builder() .caseSensitive(caseSensitive) .useSnapshotId(newSnapshotId) @@ -228,13 +269,15 @@ 
ScanContext copyWithSnapshotId(long newSnapshotId) { .build(); } - static Builder builder() { + public static Builder builder() { return new Builder(); } - static class Builder { + public static class Builder { private boolean caseSensitive = CASE_SENSITIVE.defaultValue(); private Long snapshotId = SNAPSHOT_ID.defaultValue(); + private StreamingStartingStrategy startingStrategy = STARTING_STRATEGY.defaultValue(); + private Long startSnapshotTimestamp = START_SNAPSHOT_TIMESTAMP.defaultValue(); private Long startSnapshotId = START_SNAPSHOT_ID.defaultValue(); private Long endSnapshotId = END_SNAPSHOT_ID.defaultValue(); private Long asOfTimestamp = AS_OF_TIMESTAMP.defaultValue(); @@ -254,98 +297,110 @@ static class Builder { private Builder() { } - Builder caseSensitive(boolean newCaseSensitive) { + public Builder caseSensitive(boolean newCaseSensitive) { this.caseSensitive = newCaseSensitive; return this; } - Builder useSnapshotId(Long newSnapshotId) { + public Builder useSnapshotId(Long newSnapshotId) { this.snapshotId = newSnapshotId; return this; } - Builder startSnapshotId(Long newStartSnapshotId) { + public Builder startingStrategy(StreamingStartingStrategy newStartingStrategy) { + this.startingStrategy = newStartingStrategy; + return this; + } + + public Builder startSnapshotTimestamp(Long newStartSnapshotTimestamp) { + this.startSnapshotTimestamp = newStartSnapshotTimestamp; + return this; + } + + public Builder startSnapshotId(Long newStartSnapshotId) { this.startSnapshotId = newStartSnapshotId; return this; } - Builder endSnapshotId(Long newEndSnapshotId) { + public Builder endSnapshotId(Long newEndSnapshotId) { this.endSnapshotId = newEndSnapshotId; return this; } - Builder asOfTimestamp(Long newAsOfTimestamp) { + public Builder asOfTimestamp(Long newAsOfTimestamp) { this.asOfTimestamp = newAsOfTimestamp; return this; } - Builder splitSize(Long newSplitSize) { + public Builder splitSize(Long newSplitSize) { this.splitSize = newSplitSize; return this; } - Builder splitLookback(Integer newSplitLookback) { + public Builder splitLookback(Integer newSplitLookback) { this.splitLookback = newSplitLookback; return this; } - Builder splitOpenFileCost(Long newSplitOpenFileCost) { + public Builder splitOpenFileCost(Long newSplitOpenFileCost) { this.splitOpenFileCost = newSplitOpenFileCost; return this; } - Builder streaming(boolean streaming) { + public Builder streaming(boolean streaming) { this.isStreaming = streaming; return this; } - Builder monitorInterval(Duration newMonitorInterval) { + public Builder monitorInterval(Duration newMonitorInterval) { this.monitorInterval = newMonitorInterval; return this; } - Builder nameMapping(String newNameMapping) { + public Builder nameMapping(String newNameMapping) { this.nameMapping = newNameMapping; return this; } - Builder project(Schema newProjectedSchema) { + public Builder project(Schema newProjectedSchema) { this.projectedSchema = newProjectedSchema; return this; } - Builder filters(List newFilters) { + public Builder filters(List newFilters) { this.filters = newFilters; return this; } - Builder limit(long newLimit) { + public Builder limit(long newLimit) { this.limit = newLimit; return this; } - Builder includeColumnStats(boolean newIncludeColumnStats) { + public Builder includeColumnStats(boolean newIncludeColumnStats) { this.includeColumnStats = newIncludeColumnStats; return this; } - Builder exposeLocality(boolean newExposeLocality) { + public Builder exposeLocality(boolean newExposeLocality) { this.exposeLocality = newExposeLocality; 
return this; } - Builder planParallelism(Integer parallelism) { + public Builder planParallelism(Integer parallelism) { this.planParallelism = parallelism; return this; } - Builder fromProperties(Map properties) { + public Builder fromProperties(Map properties) { Configuration config = new Configuration(); properties.forEach(config::setString); return this.useSnapshotId(config.get(SNAPSHOT_ID)) .caseSensitive(config.get(CASE_SENSITIVE)) .asOfTimestamp(config.get(AS_OF_TIMESTAMP)) + .startingStrategy(config.get(STARTING_STRATEGY)) + .startSnapshotTimestamp(config.get(START_SNAPSHOT_TIMESTAMP)) .startSnapshotId(config.get(START_SNAPSHOT_ID)) .endSnapshotId(config.get(END_SNAPSHOT_ID)) .splitSize(config.get(SPLIT_SIZE)) @@ -358,8 +413,8 @@ Builder fromProperties(Map properties) { } public ScanContext build() { - return new ScanContext(caseSensitive, snapshotId, startSnapshotId, - endSnapshotId, asOfTimestamp, splitSize, splitLookback, + return new ScanContext(caseSensitive, snapshotId, startingStrategy, startSnapshotTimestamp, + startSnapshotId, endSnapshotId, asOfTimestamp, splitSize, splitLookback, splitOpenFileCost, isStreaming, monitorInterval, nameMapping, projectedSchema, filters, limit, includeColumnStats, exposeLocality, planParallelism); } diff --git a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/StreamingStartingStrategy.java b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/StreamingStartingStrategy.java new file mode 100644 index 000000000000..3e83fbe7f5af --- /dev/null +++ b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/StreamingStartingStrategy.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.flink.source; + +/** + * Starting strategy for streaming execution. + */ +public enum StreamingStartingStrategy { + /** + * Do a regular table scan then switch to the incremental mode. + *

+ * The incremental mode starts from the current snapshot exclusive. + */ + TABLE_SCAN_THEN_INCREMENTAL, + + /** + * Start incremental mode from the latest snapshot inclusive. + *

+ * If the table is empty, all future append snapshots should be discovered. + */ + INCREMENTAL_FROM_LATEST_SNAPSHOT, + + /** + * Start incremental mode from the earliest snapshot inclusive. + *

+ * If the table is empty, all future append snapshots should be discovered. + */ + INCREMENTAL_FROM_EARLIEST_SNAPSHOT, + + /** + * Start incremental mode from a snapshot with a specific id inclusive. + */ + INCREMENTAL_FROM_SNAPSHOT_ID, + + /** + * Start incremental mode from a snapshot with a specific timestamp inclusive. + *

+ * If the timestamp is between two snapshots, it should start from the snapshot after the timestamp. + */ + INCREMENTAL_FROM_SNAPSHOT_TIMESTAMP +} diff --git a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/ContinuousEnumerationResult.java b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/ContinuousEnumerationResult.java new file mode 100644 index 000000000000..8c20f2cf22bc --- /dev/null +++ b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/ContinuousEnumerationResult.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.flink.source.enumerator; + +import java.util.Collection; +import org.apache.iceberg.flink.source.split.IcebergSourceSplit; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; + +class ContinuousEnumerationResult { + private final Collection splits; + private final IcebergEnumeratorPosition fromPosition; + private final IcebergEnumeratorPosition toPosition; + + /** + * @param splits should never be null. But it can be an empty collection + * @param fromPosition can be null + * @param toPosition should never be null. But it can have null snapshotId and snapshotTimestampMs + */ + ContinuousEnumerationResult( + Collection splits, + IcebergEnumeratorPosition fromPosition, + IcebergEnumeratorPosition toPosition) { + Preconditions.checkArgument(splits != null, "Invalid to splits collection: null"); + Preconditions.checkArgument(toPosition != null, "Invalid end position: null"); + this.splits = splits; + this.fromPosition = fromPosition; + this.toPosition = toPosition; + } + + public Collection splits() { + return splits; + } + + public IcebergEnumeratorPosition fromPosition() { + return fromPosition; + } + + public IcebergEnumeratorPosition toPosition() { + return toPosition; + } +} diff --git a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/ContinuousSplitPlanner.java b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/ContinuousSplitPlanner.java new file mode 100644 index 000000000000..1737ae6a5023 --- /dev/null +++ b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/ContinuousSplitPlanner.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
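For illustration, the ScanContext builder options and the StreamingStartingStrategy enum introduced above can be combined along these lines. This is a sketch against the API in this diff, not code from the PR; the wrapper class name, the 30-second monitor interval, and the snapshot id are placeholders.

import java.time.Duration;
import org.apache.iceberg.flink.source.ScanContext;
import org.apache.iceberg.flink.source.StreamingStartingStrategy;

class ScanContextExample {
  // Builds a streaming ScanContext that starts (inclusively) from a known snapshot id.
  // ScanContext.validate() accepts this combination because startSnapshotId is set and
  // startSnapshotTimestamp is left null for INCREMENTAL_FROM_SNAPSHOT_ID.
  static ScanContext streamingFromSnapshotId(long snapshotId) {
    return ScanContext.builder()
        .streaming(true)
        .monitorInterval(Duration.ofSeconds(30))
        .startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_SNAPSHOT_ID)
        .startSnapshotId(snapshotId)
        .build();
  }
}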
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.flink.source.enumerator; + +import java.io.Closeable; +import org.apache.flink.annotation.Internal; + +/** + * This interface is introduced so that we can plug in different split planner for unit test + */ +@Internal +public interface ContinuousSplitPlanner extends Closeable { + + /** + * Discover the files appended between {@code lastPosition} and current table snapshot + */ + ContinuousEnumerationResult planSplits(IcebergEnumeratorPosition lastPosition); +} diff --git a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/ContinuousSplitPlannerImpl.java b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/ContinuousSplitPlannerImpl.java new file mode 100644 index 000000000000..4d6e89684be0 --- /dev/null +++ b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/ContinuousSplitPlannerImpl.java @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.flink.source.enumerator; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.ExecutorService; +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.util.Preconditions; +import org.apache.iceberg.Snapshot; +import org.apache.iceberg.Table; +import org.apache.iceberg.flink.source.FlinkSplitPlanner; +import org.apache.iceberg.flink.source.ScanContext; +import org.apache.iceberg.flink.source.StreamingStartingStrategy; +import org.apache.iceberg.flink.source.split.IcebergSourceSplit; +import org.apache.iceberg.util.SnapshotUtil; +import org.apache.iceberg.util.ThreadPools; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@Internal +public class ContinuousSplitPlannerImpl implements ContinuousSplitPlanner { + private static final Logger LOG = LoggerFactory.getLogger(ContinuousSplitPlannerImpl.class); + + private final Table table; + private final ScanContext scanContext; + private final boolean isSharedPool; + private final ExecutorService workerPool; + + /** + * @param threadName thread name prefix for worker pool to run the split planning. + * If null, a shared worker pool will be used. 
+ */ + public ContinuousSplitPlannerImpl(Table table, ScanContext scanContext, String threadName) { + this.table = table; + this.scanContext = scanContext; + this.isSharedPool = threadName == null; + this.workerPool = isSharedPool ? ThreadPools.getWorkerPool() + : ThreadPools.newWorkerPool("iceberg-plan-worker-pool-" + threadName, scanContext.planParallelism()); + } + + @Override + public void close() throws IOException { + if (!isSharedPool) { + workerPool.shutdown(); + } + } + + @Override + public ContinuousEnumerationResult planSplits(IcebergEnumeratorPosition lastPosition) { + table.refresh(); + if (lastPosition != null) { + return discoverIncrementalSplits(lastPosition); + } else { + return discoverInitialSplits(); + } + } + + /** + * Discover incremental changes between @{code lastPosition} and current table snapshot + */ + private ContinuousEnumerationResult discoverIncrementalSplits(IcebergEnumeratorPosition lastPosition) { + Snapshot currentSnapshot = table.currentSnapshot(); + if (currentSnapshot == null) { + // empty table + Preconditions.checkArgument(lastPosition.snapshotId() == null, + "Invalid last enumerated position for an empty table: not null"); + LOG.info("Skip incremental scan because table is empty"); + return new ContinuousEnumerationResult(Collections.emptyList(), lastPosition, lastPosition); + } else if (lastPosition.snapshotId() != null && currentSnapshot.snapshotId() == lastPosition.snapshotId()) { + LOG.info("Current table snapshot is already enumerated: {}", currentSnapshot.snapshotId()); + return new ContinuousEnumerationResult(Collections.emptyList(), lastPosition, lastPosition); + } else { + IcebergEnumeratorPosition newPosition = IcebergEnumeratorPosition.of( + currentSnapshot.snapshotId(), currentSnapshot.timestampMillis()); + ScanContext incrementalScan = scanContext + .copyWithAppendsBetween(lastPosition.snapshotId(), currentSnapshot.snapshotId()); + List splits = FlinkSplitPlanner.planIcebergSourceSplits(table, incrementalScan, workerPool); + LOG.info("Discovered {} splits from incremental scan: " + + "from snapshot (exclusive) is {}, to snapshot (inclusive) is {}", + splits.size(), lastPosition, newPosition); + return new ContinuousEnumerationResult(splits, lastPosition, newPosition); + } + } + + /** + * Discovery initial set of splits based on {@link StreamingStartingStrategy}. + * + *

  • {@link ContinuousEnumerationResult#splits()} should contain initial splits + * discovered from table scan for {@link StreamingStartingStrategy#TABLE_SCAN_THEN_INCREMENTAL}. + * For all other strategies, splits collection should be empty. + *
  • {@link ContinuousEnumerationResult#toPosition()} points to the starting position + * for the next incremental split discovery with exclusive behavior. Meaning files committed + * by the snapshot from the position in {@code ContinuousEnumerationResult} won't be included + * in the next incremental scan. + */ + private ContinuousEnumerationResult discoverInitialSplits() { + Optional startSnapshotOptional = startSnapshot(table, scanContext); + if (!startSnapshotOptional.isPresent()) { + return new ContinuousEnumerationResult(Collections.emptyList(), null, + IcebergEnumeratorPosition.empty()); + } + + Snapshot startSnapshot = startSnapshotOptional.get(); + LOG.info("Get starting snapshot id {} based on strategy {}", + startSnapshot.snapshotId(), scanContext.startingStrategy()); + List splits; + IcebergEnumeratorPosition toPosition; + if (scanContext.startingStrategy() == StreamingStartingStrategy.TABLE_SCAN_THEN_INCREMENTAL) { + // do a batch table scan first + splits = FlinkSplitPlanner.planIcebergSourceSplits(table, scanContext, workerPool); + LOG.info("Discovered {} splits from initial batch table scan with snapshot Id {}", + splits.size(), startSnapshot.snapshotId()); + // For TABLE_SCAN_THEN_INCREMENTAL, incremental mode starts exclusive from the startSnapshot + toPosition = IcebergEnumeratorPosition.of(startSnapshot.snapshotId(), startSnapshot.timestampMillis()); + } else { + // For all other modes, starting snapshot should be consumed inclusively. + // Use parentId to achieve the inclusive behavior. It is fine if parentId is null. + splits = Collections.emptyList(); + Long parentSnapshotId = startSnapshot.parentId(); + if (parentSnapshotId != null) { + Snapshot parentSnapshot = table.snapshot(parentSnapshotId); + Long parentSnapshotTimestampMs = parentSnapshot != null ? parentSnapshot.timestampMillis() : null; + toPosition = IcebergEnumeratorPosition.of(parentSnapshotId, parentSnapshotTimestampMs); + } else { + toPosition = IcebergEnumeratorPosition.empty(); + } + + LOG.info("Start incremental scan with start snapshot (inclusive): id = {}, timestamp = {}", + startSnapshot.snapshotId(), startSnapshot.timestampMillis()); + } + + return new ContinuousEnumerationResult(splits, null, toPosition); + } + + /** + * Calculate the starting snapshot based on the {@link StreamingStartingStrategy} defined in {@code ScanContext}. + *

    + * If the {@link StreamingStartingStrategy} is not {@link StreamingStartingStrategy#TABLE_SCAN_THEN_INCREMENTAL}, + * the start snapshot should be consumed inclusively. + */ + @VisibleForTesting + static Optional startSnapshot(Table table, ScanContext scanContext) { + switch (scanContext.startingStrategy()) { + case TABLE_SCAN_THEN_INCREMENTAL: + case INCREMENTAL_FROM_LATEST_SNAPSHOT: + return Optional.ofNullable(table.currentSnapshot()); + case INCREMENTAL_FROM_EARLIEST_SNAPSHOT: + return Optional.ofNullable(SnapshotUtil.oldestAncestor(table)); + case INCREMENTAL_FROM_SNAPSHOT_ID: + Snapshot matchedSnapshotById = table.snapshot(scanContext.startSnapshotId()); + Preconditions.checkArgument(matchedSnapshotById != null, + "Start snapshot id not found in history: " + scanContext.startSnapshotId()); + return Optional.of(matchedSnapshotById); + case INCREMENTAL_FROM_SNAPSHOT_TIMESTAMP: + long snapshotIdAsOfTime = SnapshotUtil.snapshotIdAsOfTime(table, scanContext.startSnapshotTimestamp()); + Snapshot matchedSnapshotByTimestamp = table.snapshot(snapshotIdAsOfTime); + if (matchedSnapshotByTimestamp.timestampMillis() == scanContext.startSnapshotTimestamp()) { + return Optional.of(matchedSnapshotByTimestamp); + } else { + // if the snapshotIdAsOfTime has the timestamp value smaller than the scanContext.startSnapshotTimestamp(), + // return the child snapshot whose timestamp value is larger + return Optional.of(SnapshotUtil.snapshotAfter(table, snapshotIdAsOfTime)); + } + default: + throw new IllegalArgumentException("Unknown starting strategy: " + scanContext.startingStrategy()); + } + } +} diff --git a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/IcebergEnumeratorPosition.java b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/IcebergEnumeratorPosition.java new file mode 100644 index 000000000000..e024473da3c9 --- /dev/null +++ b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/IcebergEnumeratorPosition.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
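To make the discovery contract of ContinuousSplitPlannerImpl concrete, the following sketch shows how planSplits is intended to be driven: the first call passes null so the planner resolves the start position from the configured StreamingStartingStrategy, and each later call passes the previous toPosition() so only newly appended files are planned. It is not part of the PR; the class and method names are placeholders, and it assumes same-package access because ContinuousEnumerationResult and IcebergEnumeratorPosition are package-private.

package org.apache.iceberg.flink.source.enumerator;

class DiscoveryCycleSketch {
  // One discovery cycle: plan splits since lastPosition and return the position to
  // remember for the next cycle. Passing null triggers initial discovery.
  static IcebergEnumeratorPosition discoverOnce(
      ContinuousSplitPlanner planner, IcebergEnumeratorPosition lastPosition) {
    ContinuousEnumerationResult result = planner.planSplits(lastPosition);
    // result.splits() would be handed off to split assignment here.
    return result.toPosition();
  }
}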
+ */ + +package org.apache.iceberg.flink.source.enumerator; + +import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; +import org.apache.iceberg.relocated.com.google.common.base.Objects; + +class IcebergEnumeratorPosition { + private final Long snapshotId; + // Track snapshot timestamp mainly for info logging + private final Long snapshotTimestampMs; + + static IcebergEnumeratorPosition empty() { + return new IcebergEnumeratorPosition(null, null); + } + + static IcebergEnumeratorPosition of(long snapshotId, Long snapshotTimestampMs) { + return new IcebergEnumeratorPosition(snapshotId, snapshotTimestampMs); + } + + private IcebergEnumeratorPosition(Long snapshotId, Long snapshotTimestampMs) { + this.snapshotId = snapshotId; + this.snapshotTimestampMs = snapshotTimestampMs; + } + + boolean isEmpty() { + return snapshotId == null; + } + + Long snapshotId() { + return snapshotId; + } + + Long snapshotTimestampMs() { + return snapshotTimestampMs; + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("snapshotId", snapshotId) + .add("snapshotTimestampMs", snapshotTimestampMs) + .toString(); + } + + @Override + public int hashCode() { + return Objects.hashCode( + snapshotId, + snapshotTimestampMs); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + IcebergEnumeratorPosition other = (IcebergEnumeratorPosition) o; + return Objects.equal(snapshotId, other.snapshotId()) && + Objects.equal(snapshotTimestampMs, other.snapshotTimestampMs()); + } +} diff --git a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/HadoopTableResource.java b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/HadoopTableResource.java new file mode 100644 index 000000000000..bc4e209a46ba --- /dev/null +++ b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/HadoopTableResource.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.flink; + +import java.io.File; +import org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.hadoop.HadoopCatalog; +import org.junit.Assert; +import org.junit.rules.ExternalResource; +import org.junit.rules.TemporaryFolder; + +public class HadoopTableResource extends ExternalResource { + private final TemporaryFolder temporaryFolder; + private final String database; + private final String tableName; + private final Schema schema; + private final PartitionSpec partitionSpec; + + private HadoopCatalog catalog; + private TableLoader tableLoader; + private Table table; + + public HadoopTableResource(TemporaryFolder temporaryFolder, String database, String tableName, Schema schema) { + this(temporaryFolder, database, tableName, schema, null); + } + + public HadoopTableResource(TemporaryFolder temporaryFolder, String database, String tableName, + Schema schema, PartitionSpec partitionSpec) { + this.temporaryFolder = temporaryFolder; + this.database = database; + this.tableName = tableName; + this.schema = schema; + this.partitionSpec = partitionSpec; + } + + @Override + protected void before() throws Throwable { + File warehouseFile = temporaryFolder.newFolder(); + Assert.assertTrue(warehouseFile.delete()); + // before variables + String warehouse = "file:" + warehouseFile; + Configuration hadoopConf = new Configuration(); + this.catalog = new HadoopCatalog(hadoopConf, warehouse); + String location = String.format("%s/%s/%s", warehouse, database, tableName); + this.tableLoader = TableLoader.fromHadoopTable(location); + if (partitionSpec == null) { + this.table = catalog.createTable(TableIdentifier.of(database, tableName), schema); + } else { + this.table = catalog.createTable(TableIdentifier.of(database, tableName), schema, partitionSpec); + } + tableLoader.open(); + } + + @Override + protected void after() { + try { + catalog.dropTable(TableIdentifier.of(database, tableName)); + catalog.close(); + tableLoader.close(); + } catch (Exception e) { + throw new RuntimeException("Failed to close catalog resource"); + } + } + + public TableLoader tableLoader() { + return tableLoader; + } + + public Table table() { + return table; + } +} diff --git a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScan.java b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScan.java index 357f5ab14f7a..9284b8fa9ef1 100644 --- a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScan.java +++ b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScan.java @@ -237,15 +237,15 @@ public void testIncrementalRead() throws Exception { long snapshotId1 = table.currentSnapshot().snapshotId(); // snapshot 2 - List records2 = RandomGenericData.generate(TestFixtures.SCHEMA, 1, 0L); + List records2 = RandomGenericData.generate(TestFixtures.SCHEMA, 1, 1L); helper.appendToTable(records2); - List records3 = RandomGenericData.generate(TestFixtures.SCHEMA, 1, 0L); + List records3 = RandomGenericData.generate(TestFixtures.SCHEMA, 1, 2L); helper.appendToTable(records3); long snapshotId3 = table.currentSnapshot().snapshotId(); // snapshot 4 - helper.appendToTable(RandomGenericData.generate(TestFixtures.SCHEMA, 1, 0L)); + helper.appendToTable(RandomGenericData.generate(TestFixtures.SCHEMA, 1, 3L)); List expected2 = Lists.newArrayList(); 
expected2.addAll(records2); diff --git a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/ManualContinuousSplitPlanner.java b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/ManualContinuousSplitPlanner.java new file mode 100644 index 000000000000..f1db8ef5d6ad --- /dev/null +++ b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/ManualContinuousSplitPlanner.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.flink.source.enumerator; + +import java.io.IOException; +import java.util.ArrayDeque; +import java.util.List; +import org.apache.iceberg.flink.source.split.IcebergSourceSplit; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; + +class ManualContinuousSplitPlanner implements ContinuousSplitPlanner { + private final ArrayDeque splits = new ArrayDeque<>(); + private IcebergEnumeratorPosition latestPosition; + + @Override + public ContinuousEnumerationResult planSplits(IcebergEnumeratorPosition lastPosition) { + ContinuousEnumerationResult result = new ContinuousEnumerationResult( + Lists.newArrayList(splits), lastPosition, latestPosition); + return result; + } + + /** + * Add new splits to the collection + */ + public void addSplits(List newSplits, IcebergEnumeratorPosition newPosition) { + splits.addAll(newSplits); + this.latestPosition = newPosition; + } + + /** + * Clear the splits collection + */ + public void clearSplits() { + splits.clear(); + } + + @Override + public void close() throws IOException { + } +} diff --git a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestContinuousSplitPlannerImpl.java b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestContinuousSplitPlannerImpl.java new file mode 100644 index 000000000000..2bcf2f07da89 --- /dev/null +++ b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestContinuousSplitPlannerImpl.java @@ -0,0 +1,463 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
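A side note on the TestFlinkScan change above: RandomGenericData.generate is seeded, so calls that reuse a seed produce the same rows. The sketch below (illustrative only; the class and method names are placeholders) shows why the test now uses distinct seeds per snapshot.

import java.util.List;
import org.apache.iceberg.data.RandomGenericData;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.flink.TestFixtures;

class SeededDataNote {
  // Two calls with the same seed produce records with identical field values, which made
  // rows from different snapshots indistinguishable when every append used seed 0L.
  // The distinct seeds (1L, 2L, 3L) keep the incremental-read assertions unambiguous.
  static void show() {
    List<Record> first = RandomGenericData.generate(TestFixtures.SCHEMA, 1, 0L);
    List<Record> second = RandomGenericData.generate(TestFixtures.SCHEMA, 1, 0L);
  }
}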
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.flink.source.enumerator; + +import java.io.IOException; +import java.util.List; +import java.util.Set; +import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; +import org.apache.iceberg.AssertHelpers; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.Snapshot; +import org.apache.iceberg.data.GenericAppenderHelper; +import org.apache.iceberg.data.RandomGenericData; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.flink.HadoopTableResource; +import org.apache.iceberg.flink.TestFixtures; +import org.apache.iceberg.flink.source.ScanContext; +import org.apache.iceberg.flink.source.StreamingStartingStrategy; +import org.apache.iceberg.flink.source.split.IcebergSourceSplit; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; +import org.apache.iceberg.relocated.com.google.common.collect.Iterables; +import org.joda.time.format.DateTimeFormat; +import org.junit.Assert; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.rules.TestName; + +public class TestContinuousSplitPlannerImpl { + @ClassRule + public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder(); + + private static final FileFormat fileFormat = FileFormat.PARQUET; + private static final AtomicLong randomSeed = new AtomicLong(); + + @Rule + public final HadoopTableResource tableResource = new HadoopTableResource(TEMPORARY_FOLDER, + TestFixtures.DATABASE, TestFixtures.TABLE, TestFixtures.SCHEMA); + + @Rule + public TestName testName = new TestName(); + + private GenericAppenderHelper dataAppender; + private DataFile dataFile1; + private Snapshot snapshot1; + private DataFile dataFile2; + private Snapshot snapshot2; + + @Before + public void before() throws IOException { + dataAppender = new GenericAppenderHelper(tableResource.table(), fileFormat, TEMPORARY_FOLDER); + } + + private void appendTwoSnapshots() throws IOException { + // snapshot1 + List batch1 = RandomGenericData.generate(TestFixtures.SCHEMA, 2, 0L); + dataFile1 = dataAppender.writeFile(null, batch1); + dataAppender.appendToTable(dataFile1); + snapshot1 = tableResource.table().currentSnapshot(); + + // snapshot2 + List batch2 = RandomGenericData.generate(TestFixtures.SCHEMA, 2, 1L); + dataFile2 = dataAppender.writeFile(null, batch2); + dataAppender.appendToTable(dataFile2); + snapshot2 = tableResource.table().currentSnapshot(); + } + + /** + * @return the last enumerated snapshot id + */ + private IcebergEnumeratorPosition verifyOneCycle( + ContinuousSplitPlannerImpl splitPlanner, IcebergEnumeratorPosition lastPosition) throws Exception { + List batch = RandomGenericData.generate(TestFixtures.SCHEMA, 2, randomSeed.incrementAndGet()); + DataFile dataFile = dataAppender.writeFile(null, batch); + dataAppender.appendToTable(dataFile); + Snapshot snapshot = tableResource.table().currentSnapshot(); + + ContinuousEnumerationResult result = splitPlanner.planSplits(lastPosition); + Assert.assertEquals(lastPosition.snapshotId(), result.fromPosition().snapshotId()); + Assert.assertEquals(lastPosition.snapshotTimestampMs(), result.fromPosition().snapshotTimestampMs()); + Assert.assertEquals(snapshot.snapshotId(), result.toPosition().snapshotId().longValue()); + 
Assert.assertEquals(snapshot.timestampMillis(), result.toPosition().snapshotTimestampMs().longValue()); + Assert.assertEquals(1, result.splits().size()); + IcebergSourceSplit split = Iterables.getOnlyElement(result.splits()); + Assert.assertEquals(1, split.task().files().size()); + Assert.assertEquals(dataFile.path().toString(), + Iterables.getOnlyElement(split.task().files()).file().path().toString()); + return result.toPosition(); + } + + @Test + public void testTableScanThenIncrementalWithEmptyTable() throws Exception { + ScanContext scanContext = ScanContext.builder() + .startingStrategy(StreamingStartingStrategy.TABLE_SCAN_THEN_INCREMENTAL) + .build(); + ContinuousSplitPlannerImpl splitPlanner = new ContinuousSplitPlannerImpl( + tableResource.table(), scanContext, null); + + ContinuousEnumerationResult emptyTableInitialDiscoveryResult = splitPlanner.planSplits(null); + Assert.assertTrue(emptyTableInitialDiscoveryResult.splits().isEmpty()); + Assert.assertNull(emptyTableInitialDiscoveryResult.fromPosition()); + Assert.assertTrue(emptyTableInitialDiscoveryResult.toPosition().isEmpty()); + Assert.assertNull(emptyTableInitialDiscoveryResult.toPosition().snapshotTimestampMs()); + + ContinuousEnumerationResult emptyTableSecondDiscoveryResult = splitPlanner + .planSplits(emptyTableInitialDiscoveryResult.toPosition()); + Assert.assertTrue(emptyTableSecondDiscoveryResult.splits().isEmpty()); + Assert.assertTrue(emptyTableSecondDiscoveryResult.fromPosition().isEmpty()); + Assert.assertNull(emptyTableSecondDiscoveryResult.fromPosition().snapshotTimestampMs()); + Assert.assertTrue(emptyTableSecondDiscoveryResult.toPosition().isEmpty()); + Assert.assertNull(emptyTableSecondDiscoveryResult.toPosition().snapshotTimestampMs()); + + // next 3 snapshots + IcebergEnumeratorPosition lastPosition = emptyTableSecondDiscoveryResult.toPosition(); + for (int i = 0; i < 3; ++i) { + lastPosition = verifyOneCycle(splitPlanner, lastPosition); + } + } + + @Test + public void testTableScanThenIncrementalWithNonEmptyTable() throws Exception { + appendTwoSnapshots(); + + ScanContext scanContext = ScanContext.builder() + .startingStrategy(StreamingStartingStrategy.TABLE_SCAN_THEN_INCREMENTAL) + .build(); + ContinuousSplitPlannerImpl splitPlanner = new ContinuousSplitPlannerImpl( + tableResource.table(), scanContext, null); + + ContinuousEnumerationResult initialResult = splitPlanner.planSplits(null); + Assert.assertNull(initialResult.fromPosition()); + Assert.assertEquals(snapshot2.snapshotId(), initialResult.toPosition().snapshotId().longValue()); + Assert.assertEquals(snapshot2.timestampMillis(), initialResult.toPosition().snapshotTimestampMs().longValue()); + Assert.assertEquals(1, initialResult.splits().size()); + IcebergSourceSplit split = Iterables.getOnlyElement(initialResult.splits()); + Assert.assertEquals(2, split.task().files().size()); + Set discoveredFiles = split.task().files().stream() + .map(fileScanTask -> fileScanTask.file().path().toString()) + .collect(Collectors.toSet()); + Set expectedFiles = ImmutableSet.of(dataFile1.path().toString(), dataFile2.path().toString()); + Assert.assertEquals(expectedFiles, discoveredFiles); + + IcebergEnumeratorPosition lastPosition = initialResult.toPosition(); + for (int i = 0; i < 3; ++i) { + lastPosition = verifyOneCycle(splitPlanner, lastPosition); + } + } + + @Test + public void testIncrementalFromLatestSnapshotWithEmptyTable() throws Exception { + ScanContext scanContext = ScanContext.builder() + 
.startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_LATEST_SNAPSHOT) + .splitSize(1L) + .build(); + ContinuousSplitPlannerImpl splitPlanner = new ContinuousSplitPlannerImpl( + tableResource.table(), scanContext, null); + + ContinuousEnumerationResult emptyTableInitialDiscoveryResult = splitPlanner.planSplits(null); + Assert.assertTrue(emptyTableInitialDiscoveryResult.splits().isEmpty()); + Assert.assertNull(emptyTableInitialDiscoveryResult.fromPosition()); + Assert.assertTrue(emptyTableInitialDiscoveryResult.toPosition().isEmpty()); + Assert.assertNull(emptyTableInitialDiscoveryResult.toPosition().snapshotTimestampMs()); + + ContinuousEnumerationResult emptyTableSecondDiscoveryResult = splitPlanner + .planSplits(emptyTableInitialDiscoveryResult.toPosition()); + Assert.assertTrue(emptyTableSecondDiscoveryResult.splits().isEmpty()); + Assert.assertTrue(emptyTableSecondDiscoveryResult.fromPosition().isEmpty()); + Assert.assertNull(emptyTableSecondDiscoveryResult.fromPosition().snapshotTimestampMs()); + Assert.assertTrue(emptyTableSecondDiscoveryResult.toPosition().isEmpty()); + Assert.assertNull(emptyTableSecondDiscoveryResult.toPosition().snapshotTimestampMs()); + + // latest mode should discover both snapshots, as latest position is marked by when job starts + appendTwoSnapshots(); + ContinuousEnumerationResult afterTwoSnapshotsAppended = splitPlanner + .planSplits(emptyTableSecondDiscoveryResult.toPosition()); + Assert.assertEquals(2, afterTwoSnapshotsAppended.splits().size()); + + // next 3 snapshots + IcebergEnumeratorPosition lastPosition = afterTwoSnapshotsAppended.toPosition(); + for (int i = 0; i < 3; ++i) { + lastPosition = verifyOneCycle(splitPlanner, lastPosition); + } + } + + @Test + public void testIncrementalFromLatestSnapshotWithNonEmptyTable() throws Exception { + appendTwoSnapshots(); + + ScanContext scanContext = ScanContext.builder() + .startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_LATEST_SNAPSHOT) + .build(); + ContinuousSplitPlannerImpl splitPlanner = new ContinuousSplitPlannerImpl( + tableResource.table(), scanContext, null); + + ContinuousEnumerationResult initialResult = splitPlanner.planSplits(null); + Assert.assertNull(initialResult.fromPosition()); + // For inclusive behavior, the initial result should point to snapshot1 + // Then the next incremental scan shall discover files from latest snapshot2 (inclusive) + Assert.assertEquals(snapshot1.snapshotId(), initialResult.toPosition().snapshotId().longValue()); + Assert.assertEquals(snapshot1.timestampMillis(), initialResult.toPosition().snapshotTimestampMs().longValue()); + Assert.assertEquals(0, initialResult.splits().size()); + + ContinuousEnumerationResult secondResult = splitPlanner.planSplits(initialResult.toPosition()); + Assert.assertEquals(snapshot1.snapshotId(), secondResult.fromPosition().snapshotId().longValue()); + Assert.assertEquals(snapshot1.timestampMillis(), secondResult.fromPosition().snapshotTimestampMs().longValue()); + Assert.assertEquals(snapshot2.snapshotId(), secondResult.toPosition().snapshotId().longValue()); + Assert.assertEquals(snapshot2.timestampMillis(), secondResult.toPosition().snapshotTimestampMs().longValue()); + IcebergSourceSplit split = Iterables.getOnlyElement(secondResult.splits()); + Assert.assertEquals(1, split.task().files().size()); + Set discoveredFiles = split.task().files().stream() + .map(fileScanTask -> fileScanTask.file().path().toString()) + .collect(Collectors.toSet()); + // should discover dataFile2 appended in snapshot2 + Set expectedFiles = 
ImmutableSet.of(dataFile2.path().toString()); + Assert.assertEquals(expectedFiles, discoveredFiles); + + IcebergEnumeratorPosition lastPosition = secondResult.toPosition(); + for (int i = 0; i < 3; ++i) { + lastPosition = verifyOneCycle(splitPlanner, lastPosition); + } + } + + @Test + public void testIncrementalFromEarliestSnapshotWithEmptyTable() throws Exception { + ScanContext scanContext = ScanContext.builder() + .startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_EARLIEST_SNAPSHOT) + .build(); + ContinuousSplitPlannerImpl splitPlanner = new ContinuousSplitPlannerImpl( + tableResource.table(), scanContext, null); + + ContinuousEnumerationResult emptyTableInitialDiscoveryResult = splitPlanner.planSplits(null); + Assert.assertTrue(emptyTableInitialDiscoveryResult.splits().isEmpty()); + Assert.assertNull(emptyTableInitialDiscoveryResult.fromPosition()); + Assert.assertNull(emptyTableInitialDiscoveryResult.toPosition().snapshotId()); + Assert.assertNull(emptyTableInitialDiscoveryResult.toPosition().snapshotTimestampMs()); + + ContinuousEnumerationResult emptyTableSecondDiscoveryResult = splitPlanner + .planSplits(emptyTableInitialDiscoveryResult.toPosition()); + Assert.assertTrue(emptyTableSecondDiscoveryResult.splits().isEmpty()); + Assert.assertNull(emptyTableSecondDiscoveryResult.fromPosition().snapshotId()); + Assert.assertNull(emptyTableSecondDiscoveryResult.fromPosition().snapshotTimestampMs()); + Assert.assertNull(emptyTableSecondDiscoveryResult.toPosition().snapshotId()); + Assert.assertNull(emptyTableSecondDiscoveryResult.toPosition().snapshotTimestampMs()); + + // next 3 snapshots + IcebergEnumeratorPosition lastPosition = emptyTableSecondDiscoveryResult.toPosition(); + for (int i = 0; i < 3; ++i) { + lastPosition = verifyOneCycle(splitPlanner, lastPosition); + } + } + + @Test + public void testIncrementalFromEarliestSnapshotWithNonEmptyTable() throws Exception { + appendTwoSnapshots(); + + ScanContext scanContext = ScanContext.builder() + .startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_EARLIEST_SNAPSHOT) + .build(); + ContinuousSplitPlannerImpl splitPlanner = new ContinuousSplitPlannerImpl( + tableResource.table(), scanContext, null); + + ContinuousEnumerationResult initialResult = splitPlanner.planSplits(null); + Assert.assertNull(initialResult.fromPosition()); + // For inclusive behavior, the initial result should point to snapshot1's parent, + // which leads to null snapshotId and snapshotTimestampMs. 
+ Assert.assertNull(initialResult.toPosition().snapshotId()); + Assert.assertNull(initialResult.toPosition().snapshotTimestampMs()); + Assert.assertEquals(0, initialResult.splits().size()); + + ContinuousEnumerationResult secondResult = splitPlanner.planSplits(initialResult.toPosition()); + Assert.assertNull(secondResult.fromPosition().snapshotId()); + Assert.assertNull(secondResult.fromPosition().snapshotTimestampMs()); + Assert.assertEquals(snapshot2.snapshotId(), secondResult.toPosition().snapshotId().longValue()); + Assert.assertEquals(snapshot2.timestampMillis(), secondResult.toPosition().snapshotTimestampMs().longValue()); + IcebergSourceSplit split = Iterables.getOnlyElement(secondResult.splits()); + Assert.assertEquals(2, split.task().files().size()); + Set discoveredFiles = split.task().files().stream() + .map(fileScanTask -> fileScanTask.file().path().toString()) + .collect(Collectors.toSet()); + // should discover files appended in both snapshot1 and snapshot2 + Set expectedFiles = ImmutableSet.of(dataFile1.path().toString(), dataFile2.path().toString()); + Assert.assertEquals(expectedFiles, discoveredFiles); + + IcebergEnumeratorPosition lastPosition = secondResult.toPosition(); + for (int i = 0; i < 3; ++i) { + lastPosition = verifyOneCycle(splitPlanner, lastPosition); + } + } + + @Test + public void testIncrementalFromSnapshotIdWithEmptyTable() throws Exception { + ScanContext scanContextWithInvalidSnapshotId = ScanContext.builder() + .startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_SNAPSHOT_ID) + .startSnapshotId(1L) + .build(); + ContinuousSplitPlannerImpl splitPlanner = new ContinuousSplitPlannerImpl( + tableResource.table(), scanContextWithInvalidSnapshotId, null); + + AssertHelpers.assertThrows("Should detect invalid starting snapshot id", + IllegalArgumentException.class, + "Start snapshot id not found in history: 1", + () -> splitPlanner.planSplits(null)); + } + + @Test + public void testIncrementalFromSnapshotIdWithInvalidIds() throws Exception { + appendTwoSnapshots(); + + // find an invalid snapshotId + long invalidSnapshotId = 0L; + while (invalidSnapshotId == snapshot1.snapshotId() || invalidSnapshotId == snapshot2.snapshotId()) { + invalidSnapshotId++; + } + + ScanContext scanContextWithInvalidSnapshotId = ScanContext.builder() + .startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_SNAPSHOT_ID) + .startSnapshotId(invalidSnapshotId) + .build(); + + ContinuousSplitPlannerImpl splitPlanner = new ContinuousSplitPlannerImpl( + tableResource.table(), scanContextWithInvalidSnapshotId, null); + + AssertHelpers.assertThrows("Should detect invalid starting snapshot id", + IllegalArgumentException.class, + "Start snapshot id not found in history: " + invalidSnapshotId, + () -> splitPlanner.planSplits(null)); + } + + @Test + public void testIncrementalFromSnapshotId() throws Exception { + appendTwoSnapshots(); + + ScanContext scanContext = ScanContext.builder() + .startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_SNAPSHOT_ID) + .startSnapshotId(snapshot2.snapshotId()) + .build(); + ContinuousSplitPlannerImpl splitPlanner = new ContinuousSplitPlannerImpl( + tableResource.table(), scanContext, null); + + ContinuousEnumerationResult initialResult = splitPlanner.planSplits(null); + Assert.assertNull(initialResult.fromPosition()); + // For inclusive behavior of snapshot2, the initial result should point to snapshot1 (as snapshot2's parent) + Assert.assertEquals(snapshot1.snapshotId(), initialResult.toPosition().snapshotId().longValue()); + 
Assert.assertEquals(snapshot1.timestampMillis(), initialResult.toPosition().snapshotTimestampMs().longValue()); + Assert.assertEquals(0, initialResult.splits().size()); + + ContinuousEnumerationResult secondResult = splitPlanner.planSplits(initialResult.toPosition()); + Assert.assertEquals(snapshot1.snapshotId(), secondResult.fromPosition().snapshotId().longValue()); + Assert.assertEquals(snapshot1.timestampMillis(), secondResult.fromPosition().snapshotTimestampMs().longValue()); + Assert.assertEquals(snapshot2.snapshotId(), secondResult.toPosition().snapshotId().longValue()); + Assert.assertEquals(snapshot2.timestampMillis(), secondResult.toPosition().snapshotTimestampMs().longValue()); + IcebergSourceSplit split = Iterables.getOnlyElement(secondResult.splits()); + Assert.assertEquals(1, split.task().files().size()); + Set<String> discoveredFiles = split.task().files().stream() + .map(fileScanTask -> fileScanTask.file().path().toString()) + .collect(Collectors.toSet()); + // should discover dataFile2 appended in snapshot2 + Set<String> expectedFiles = ImmutableSet.of(dataFile2.path().toString()); + Assert.assertEquals(expectedFiles, discoveredFiles); + + IcebergEnumeratorPosition lastPosition = secondResult.toPosition(); + for (int i = 0; i < 3; ++i) { + lastPosition = verifyOneCycle(splitPlanner, lastPosition); + } + } + + @Test + public void testIncrementalFromSnapshotTimestampWithEmptyTable() throws Exception { + ScanContext scanContextWithInvalidSnapshotTimestamp = ScanContext.builder() + .startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_SNAPSHOT_TIMESTAMP) + .startSnapshotTimestamp(1L) + .build(); + ContinuousSplitPlannerImpl splitPlanner = new ContinuousSplitPlannerImpl( + tableResource.table(), scanContextWithInvalidSnapshotTimestamp, null); + + AssertHelpers.assertThrows("Should detect invalid starting snapshot timestamp", + IllegalArgumentException.class, + "Cannot find a snapshot older than 1970-01-01 00:00:00.001", + () -> splitPlanner.planSplits(null)); + } + + @Test + public void testIncrementalFromSnapshotTimestampWithInvalidTimestamp() throws Exception { + appendTwoSnapshots(); + + long invalidSnapshotTimestampMs = snapshot1.timestampMillis() - 1000L; + String invalidSnapshotTimestampMsStr = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss.SSS") + .withZoneUTC() + .print(invalidSnapshotTimestampMs); + + ScanContext scanContextWithInvalidSnapshotTimestamp = ScanContext.builder() + .startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_SNAPSHOT_TIMESTAMP) + .startSnapshotTimestamp(invalidSnapshotTimestampMs) + .build(); + + ContinuousSplitPlannerImpl splitPlanner = new ContinuousSplitPlannerImpl( + tableResource.table(), scanContextWithInvalidSnapshotTimestamp, null); + + AssertHelpers.assertThrows("Should detect invalid starting snapshot timestamp", + IllegalArgumentException.class, + "Cannot find a snapshot older than " + invalidSnapshotTimestampMsStr, + () -> splitPlanner.planSplits(null)); + } + + @Test + public void testIncrementalFromSnapshotTimestamp() throws Exception { + appendTwoSnapshots(); + + ScanContext scanContext = ScanContext.builder() + .startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_SNAPSHOT_TIMESTAMP) + .startSnapshotTimestamp(snapshot2.timestampMillis()) + .build(); + ContinuousSplitPlannerImpl splitPlanner = new ContinuousSplitPlannerImpl( + tableResource.table(), scanContext, null); + + ContinuousEnumerationResult initialResult = splitPlanner.planSplits(null); + Assert.assertNull(initialResult.fromPosition()); + // For inclusive behavior, the initial result should point to
snapshot1 (as snapshot2's parent). + Assert.assertEquals(snapshot1.snapshotId(), initialResult.toPosition().snapshotId().longValue()); + Assert.assertEquals(snapshot1.timestampMillis(), initialResult.toPosition().snapshotTimestampMs().longValue()); + Assert.assertEquals(0, initialResult.splits().size()); + + ContinuousEnumerationResult secondResult = splitPlanner.planSplits(initialResult.toPosition()); + Assert.assertEquals(snapshot1.snapshotId(), secondResult.fromPosition().snapshotId().longValue()); + Assert.assertEquals(snapshot1.timestampMillis(), secondResult.fromPosition().snapshotTimestampMs().longValue()); + Assert.assertEquals(snapshot2.snapshotId(), secondResult.toPosition().snapshotId().longValue()); + Assert.assertEquals(snapshot2.timestampMillis(), secondResult.toPosition().snapshotTimestampMs().longValue()); + IcebergSourceSplit split = Iterables.getOnlyElement(secondResult.splits()); + Assert.assertEquals(1, split.task().files().size()); + Set<String> discoveredFiles = split.task().files().stream() + .map(fileScanTask -> fileScanTask.file().path().toString()) + .collect(Collectors.toSet()); + // should discover dataFile2 appended in snapshot2 + Set<String> expectedFiles = ImmutableSet.of(dataFile2.path().toString()); + Assert.assertEquals(expectedFiles, discoveredFiles); + + IcebergEnumeratorPosition lastPosition = secondResult.toPosition(); + for (int i = 0; i < 3; ++i) { + lastPosition = verifyOneCycle(splitPlanner, lastPosition); + } + } +} diff --git a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestContinuousSplitPlannerImplStartStrategy.java b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestContinuousSplitPlannerImplStartStrategy.java new file mode 100644 index 000000000000..ef52653403b4 --- /dev/null +++ b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestContinuousSplitPlannerImplStartStrategy.java @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ + +package org.apache.iceberg.flink.source.enumerator; + +import java.io.IOException; +import java.util.List; +import org.apache.iceberg.AssertHelpers; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.Snapshot; +import org.apache.iceberg.data.GenericAppenderHelper; +import org.apache.iceberg.data.RandomGenericData; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.flink.HadoopTableResource; +import org.apache.iceberg.flink.TestFixtures; +import org.apache.iceberg.flink.source.ScanContext; +import org.apache.iceberg.flink.source.StreamingStartingStrategy; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.RuleChain; +import org.junit.rules.TemporaryFolder; +import org.junit.rules.TestRule; + +public class TestContinuousSplitPlannerImplStartStrategy { + private static final FileFormat FILE_FORMAT = FileFormat.PARQUET; + + public final TemporaryFolder temporaryFolder = new TemporaryFolder(); + public final HadoopTableResource tableResource = new HadoopTableResource(temporaryFolder, + TestFixtures.DATABASE, TestFixtures.TABLE, TestFixtures.SCHEMA); + @Rule + public final TestRule chain = RuleChain + .outerRule(temporaryFolder) + .around(tableResource); + + private GenericAppenderHelper dataAppender; + private Snapshot snapshot1; + private Snapshot snapshot2; + private Snapshot snapshot3; + + @Before + public void before() throws IOException { + dataAppender = new GenericAppenderHelper(tableResource.table(), FILE_FORMAT, temporaryFolder); + } + + private void appendThreeSnapshots() throws IOException { + List<Record> batch1 = RandomGenericData.generate(TestFixtures.SCHEMA, 2, 0L); + dataAppender.appendToTable(batch1); + snapshot1 = tableResource.table().currentSnapshot(); + + List<Record> batch2 = RandomGenericData.generate(TestFixtures.SCHEMA, 2, 1L); + dataAppender.appendToTable(batch2); + snapshot2 = tableResource.table().currentSnapshot(); + + List<Record> batch3 = RandomGenericData.generate(TestFixtures.SCHEMA, 2, 2L); + dataAppender.appendToTable(batch3); + snapshot3 = tableResource.table().currentSnapshot(); + } + + @Test + public void testTableScanThenIncrementalStrategy() throws IOException { + ScanContext scanContext = ScanContext.builder() + .streaming(true) + .startingStrategy(StreamingStartingStrategy.TABLE_SCAN_THEN_INCREMENTAL) + .build(); + + // empty table + Assert.assertFalse(ContinuousSplitPlannerImpl.startSnapshot(tableResource.table(), scanContext).isPresent()); + + appendThreeSnapshots(); + Snapshot startSnapshot = ContinuousSplitPlannerImpl.startSnapshot(tableResource.table(), scanContext).get(); + Assert.assertEquals(snapshot3.snapshotId(), startSnapshot.snapshotId()); + } + + @Test + public void testForLatestSnapshotStrategy() throws IOException { + ScanContext scanContext = ScanContext.builder() + .streaming(true) + .startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_LATEST_SNAPSHOT) + .build(); + + // empty table + Assert.assertFalse(ContinuousSplitPlannerImpl.startSnapshot(tableResource.table(), scanContext).isPresent()); + + appendThreeSnapshots(); + Snapshot startSnapshot = ContinuousSplitPlannerImpl.startSnapshot(tableResource.table(), scanContext).get(); + Assert.assertEquals(snapshot3.snapshotId(), startSnapshot.snapshotId()); + } + + @Test + public void testForEarliestSnapshotStrategy() throws IOException { + ScanContext scanContext = ScanContext.builder() + .streaming(true) + .startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_EARLIEST_SNAPSHOT) +
.build(); + + // empty table + Assert.assertFalse(ContinuousSplitPlannerImpl.startSnapshot(tableResource.table(), scanContext).isPresent()); + + appendThreeSnapshots(); + Snapshot startSnapshot = ContinuousSplitPlannerImpl.startSnapshot(tableResource.table(), scanContext).get(); + Assert.assertEquals(snapshot1.snapshotId(), startSnapshot.snapshotId()); + } + + @Test + public void testForSpecificSnapshotIdStrategy() throws IOException { + ScanContext scanContextInvalidSnapshotId = ScanContext.builder() + .streaming(true) + .startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_SNAPSHOT_ID) + .startSnapshotId(1L) + .build(); + + // empty table + AssertHelpers.assertThrows("Should detect invalid starting snapshot id", + IllegalArgumentException.class, + "Start snapshot id not found in history: 1", + () -> ContinuousSplitPlannerImpl.startSnapshot(tableResource.table(), scanContextInvalidSnapshotId)); + + appendThreeSnapshots(); + + ScanContext scanContext = ScanContext.builder() + .streaming(true) + .startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_SNAPSHOT_ID) + .startSnapshotId(snapshot2.snapshotId()) + .build(); + + Snapshot startSnapshot = ContinuousSplitPlannerImpl.startSnapshot(tableResource.table(), scanContext).get(); + Assert.assertEquals(snapshot2.snapshotId(), startSnapshot.snapshotId()); + } + + @Test + public void testForSpecificSnapshotTimestampStrategySnapshot2() throws IOException { + ScanContext scanContextInvalidSnapshotTimestamp = ScanContext.builder() + .streaming(true) + .startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_SNAPSHOT_TIMESTAMP) + .startSnapshotTimestamp(1L) + .build(); + + // empty table + AssertHelpers.assertThrows("Should detect invalid starting snapshot timestamp", + IllegalArgumentException.class, + "Cannot find a snapshot older than 1970-01-01 00:00:00.001", + () -> ContinuousSplitPlannerImpl.startSnapshot(tableResource.table(), scanContextInvalidSnapshotTimestamp)); + + appendThreeSnapshots(); + + ScanContext scanContext = ScanContext.builder() + .streaming(true) + .startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_SNAPSHOT_TIMESTAMP) + .startSnapshotTimestamp(snapshot2.timestampMillis()) + .build(); + + Snapshot startSnapshot = ContinuousSplitPlannerImpl.startSnapshot(tableResource.table(), scanContext).get(); + Assert.assertEquals(snapshot2.snapshotId(), startSnapshot.snapshotId()); + } + + @Test + public void testForSpecificSnapshotTimestampStrategySnapshot2Minus1() throws IOException { + appendThreeSnapshots(); + + ScanContext config = ScanContext.builder() + .streaming(true) + .startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_SNAPSHOT_TIMESTAMP) + .startSnapshotTimestamp(snapshot2.timestampMillis() - 1L) + .build(); + + Snapshot startSnapshot = ContinuousSplitPlannerImpl.startSnapshot(tableResource.table(), config).get(); + Assert.assertEquals(snapshot2.snapshotId(), startSnapshot.snapshotId()); + } +}
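
Editorial note, not part of the patch: the tests above exercise the new starting strategies directly against ContinuousSplitPlannerImpl. A minimal usage sketch follows, assuming a Table handle named table; the builder methods, constructor shape, and planSplits calls mirror the test code in this change (the third constructor argument is passed as null there).

// Build a streaming ScanContext that starts incrementally from a snapshot timestamp.
ScanContext scanContext = ScanContext.builder()
    .streaming(true)
    .startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_SNAPSHOT_TIMESTAMP)
    // assumed example value; per the tests above, it must not predate the table's oldest snapshot
    .startSnapshotTimestamp(startTimestampMs)
    .build();

// The planner resolves the starting snapshot on the first call (null position)
// and plans only newly appended snapshots on subsequent calls.
ContinuousSplitPlannerImpl splitPlanner = new ContinuousSplitPlannerImpl(table, scanContext, null);
ContinuousEnumerationResult initialResult = splitPlanner.planSplits(null);
ContinuousEnumerationResult nextResult = splitPlanner.planSplits(initialResult.toPosition());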