diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java index 59b11281cc78..5bb9ba1378cb 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java @@ -245,7 +245,10 @@ private StartingScanner createIncrementalStartingScanner(SnapshotManager snapsho Options conf = options.toConfiguration(); TagManager tagManager = - new TagManager(snapshotManager.fileIO(), snapshotManager.tablePath()); + new TagManager( + snapshotManager.fileIO(), + snapshotManager.tablePath(), + snapshotManager.branch()); if (conf.contains(CoreOptions.INCREMENTAL_BETWEEN)) { Pair incrementalBetween = options.incrementalBetween(); Optional startTag = tagManager.get(incrementalBetween.getLeft()); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalTagStartingScanner.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalTagStartingScanner.java index 8c4b5d5cec01..835b7595a316 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalTagStartingScanner.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalTagStartingScanner.java @@ -20,6 +20,7 @@ import org.apache.paimon.CoreOptions; import org.apache.paimon.Snapshot; +import org.apache.paimon.schema.SchemaManager; import org.apache.paimon.tag.Tag; import org.apache.paimon.tag.TagPeriodHandler; import org.apache.paimon.utils.Pair; @@ -50,6 +51,14 @@ public IncrementalTagStartingScanner( this.start = start; this.end = end; this.startingSnapshotId = start.id(); + + TimeTravelUtil.checkRescaleBucketForIncrementalTagQuery( + new SchemaManager( + snapshotManager.fileIO(), + snapshotManager.tablePath(), + snapshotManager.branch()), + start.schemaId(), + end.schemaId()); } @Override @@ -66,7 +75,10 @@ public static AbstractStartingScanner create( endTagName); TagManager tagManager = - new TagManager(snapshotManager.fileIO(), snapshotManager.tablePath()); + new TagManager( + snapshotManager.fileIO(), + snapshotManager.tablePath(), + snapshotManager.branch()); Optional endTag = tagManager.get(endTagName); if (!endTag.isPresent()) { diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/TimeTravelUtil.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/TimeTravelUtil.java index 5b4ee4e58c50..4a0f4290df91 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/TimeTravelUtil.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/TimeTravelUtil.java @@ -20,8 +20,9 @@ import org.apache.paimon.CoreOptions; import org.apache.paimon.Snapshot; +import org.apache.paimon.schema.SchemaManager; +import org.apache.paimon.schema.TableSchema; import org.apache.paimon.table.FileStoreTable; -import org.apache.paimon.utils.Preconditions; import org.apache.paimon.utils.SnapshotManager; import org.apache.paimon.utils.SnapshotNotExistException; import org.apache.paimon.utils.TagManager; @@ -30,6 +31,7 @@ import java.util.List; import static org.apache.paimon.CoreOptions.SCAN_SNAPSHOT_ID; +import static org.apache.paimon.utils.Preconditions.checkArgument; /** The util class of resolve snapshot from scan params for time travel. */ public class TimeTravelUtil { @@ -58,7 +60,7 @@ public static Snapshot resolveSnapshotFromOptions( return snapshotManager.latestSnapshot(); } - Preconditions.checkArgument( + checkArgument( scanHandleKey.size() == 1, String.format( "Only one of the following parameters may be set : [%s, %s, %s, %s]", @@ -124,4 +126,22 @@ private static Snapshot resolveSnapshotByTagName( new TagManager(snapshotManager.fileIO(), snapshotManager.tablePath()); return tagManager.getOrThrow(tagName).trimToSnapshot(); } + + public static void checkRescaleBucketForIncrementalTagQuery( + SchemaManager schemaManager, long schemaId1, long schemaId2) { + if (schemaId1 != schemaId2) { + int bucketNumber1 = bucketNumber(schemaManager, schemaId1); + int bucketNumber2 = bucketNumber(schemaManager, schemaId2); + checkArgument( + bucketNumber1 == bucketNumber2, + "The bucket number of two tags are different (%s, %s), which is not supported in incremental tag query.", + bucketNumber1, + bucketNumber2); + } + } + + private static int bucketNumber(SchemaManager schemaManager, long schemaId) { + TableSchema schema = schemaManager.schema(schemaId); + return CoreOptions.fromMap(schema.options()).bucket(); + } } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java index ee24dc8ef352..d9a5cab1d455 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java @@ -21,6 +21,7 @@ import org.apache.paimon.CoreOptions; import org.apache.paimon.flink.util.AbstractTestBase; import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.Table; import org.apache.paimon.utils.BlockingIterator; import org.apache.paimon.utils.DateTimeUtils; import org.apache.paimon.utils.SnapshotNotExistException; @@ -37,6 +38,7 @@ import org.junit.jupiter.params.provider.ValueSource; import java.math.BigDecimal; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -652,6 +654,33 @@ public void testScanBounded() { assertThat(result).containsExactlyInAnyOrder(Row.of(1, 11, 111), Row.of(2, 22, 222)); } + @Test + public void testIncrementTagQueryWithRescaleBucket() throws Exception { + sql("CREATE TABLE test (a INT PRIMARY KEY NOT ENFORCED, b INT) WITH ('bucket' = '1')"); + Table table = paimonTable("test"); + + sql("INSERT INTO test VALUES (1, 11), (2, 22)"); + sql("ALTER TABLE test SET ('bucket' = '2')"); + sql("INSERT OVERWRITE test SELECT * FROM test"); + sql("INSERT INTO test VALUES (3, 33)"); + + table.createTag("2024-01-01", 1); + table.createTag("2024-01-02", 3); + + List incrementalOptions = + Arrays.asList( + "'incremental-between'='2024-01-01,2024-01-02'", + "'incremental-to-auto-tag'='2024-01-02'"); + + for (String option : incrementalOptions) { + assertThatThrownBy(() -> sql("SELECT * FROM test /*+ OPTIONS (%s) */", option)) + .satisfies( + anyCauseMatches( + IllegalArgumentException.class, + "The bucket number of two tags are different (1, 2), which is not supported in incremental tag query.")); + } + } + private void validateCount1PushDown(String sql) { Transformation transformation = AbstractTestBase.translate(tEnv, sql); while (!transformation.getInputs().isEmpty()) {