diff --git a/docker/thirdparties/docker-compose/iceberg/scripts/create_preinstalled_scripts/paimon/run09.sql b/docker/thirdparties/docker-compose/iceberg/scripts/create_preinstalled_scripts/paimon/run09.sql index 058bbfd7e191b6..87b290e1a551ea 100644 --- a/docker/thirdparties/docker-compose/iceberg/scripts/create_preinstalled_scripts/paimon/run09.sql +++ b/docker/thirdparties/docker-compose/iceberg/scripts/create_preinstalled_scripts/paimon/run09.sql @@ -85,4 +85,26 @@ INSERT INTO test_paimon_time_travel_db.`tbl_time_travel$branch_b_2` VALUES (20007, 8007, '2024-02-07', 378.45, '465 Lavender Avenue, Thorndale, WY 82201', 'PROCESSING', false), (20008, 8008, '2024-02-08', 92.30, '729 Iris Lane, Riverside, MN 55987', 'SHIPPED', true), (20009, 8009, '2024-02-09', 445.80, '856 Tulip Boulevard, Sunnydale, ND 58301', 'PENDING', false), -(20010, 8010, '2024-02-10', 167.25, '392 Daisy Court, Meadowbrook, SD 57401', 'CANCELLED', true); \ No newline at end of file +(20010, 8010, '2024-02-10', 167.25, '392 Daisy Court, Meadowbrook, SD 57401', 'CANCELLED', true); + + +-- time travel schema change +ALTER TABLE test_paimon_time_travel_db.tbl_time_travel ADD COLUMNS ( + new_col1 INT +); + +-- - snapshot 5 +INSERT INTO test_paimon_time_travel_db.tbl_time_travel VALUES +(6001, 9001, '2024-02-11', 456.80, '123 New Street, Downtown, CA 90210', 'COMPLETED', true, 100), +(6002, 9002, '2024-02-12', 289.45, '456 Updated Ave, Midtown, NY 10001', 'PROCESSING', false, 200), +(6003, 9003, '2024-02-13', 378.90, '789 Modern Blvd, Uptown, TX 75201', 'SHIPPED', true, 300); + +CALL sys.create_tag(table => 'test_paimon_time_travel_db.tbl_time_travel', tag => 't_5', snapshot => 5); + +-- - snapshot 6 +INSERT INTO test_paimon_time_travel_db.tbl_time_travel VALUES +(6004, 9004, '2024-02-14', 199.99, '321 Future Lane, Innovation, WA 98001', 'PENDING', true, 400), +(6005, 9005, '2024-02-15', 567.25, '654 Progress Drive, Tech City, OR 97201', 'COMPLETED', false, 500), +(6006, 9006, '2024-02-16', 123.75, 
'987 Advanced Court, Silicon Valley, CA 94301', 'CANCELLED', true, 600); + +CALL sys.create_tag(table => 'test_paimon_time_travel_db.tbl_time_travel', tag => 't_6', snapshot => 6); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java index 15f83d0636cb50..06983786849428 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java @@ -102,20 +102,45 @@ public Table getPaimonTable(Optional snapshot) { private PaimonSnapshotCacheValue getPaimonSnapshotCacheValue(Optional tableSnapshot, Optional scanParams) { makeSureInitialized(); + + // Current limitation: cannot specify both table snapshot and scan parameters simultaneously. if (tableSnapshot.isPresent() || (scanParams.isPresent() && scanParams.get().isTag())) { // If a snapshot is specified, // use the specified snapshot and the corresponding schema(not the latest // schema). try { Snapshot snapshot = PaimonUtil.getPaimonSnapshot(paimonTable, tableSnapshot, scanParams); + Table dataTable = paimonTable.copy( + Collections.singletonMap(CoreOptions.SCAN_SNAPSHOT_ID.key(), String.valueOf(snapshot.id()))); return new PaimonSnapshotCacheValue(PaimonPartitionInfo.EMPTY, - new PaimonSnapshot(snapshot.id(), snapshot.schemaId(), paimonTable)); + new PaimonSnapshot(snapshot.id(), snapshot.schemaId(), dataTable)); } catch (Exception e) { LOG.warn("Failed to get Paimon snapshot for table {}", paimonTable.name(), e); throw new RuntimeException( "Failed to get Paimon snapshot: " + (e.getMessage() == null ? 
"unknown cause" : e.getMessage()), e); } + } else if (scanParams.isPresent() && scanParams.get().isBranch()) { + try { + String branch = PaimonUtil.resolvePaimonBranch(scanParams.get(), paimonTable); + Table table = ((PaimonExternalCatalog) catalog).getPaimonTable(getOrBuildNameMapping(), branch, null); + Optional latestSnapshot = table.latestSnapshot(); + long latestSnapshotId = PaimonSnapshot.INVALID_SNAPSHOT_ID; + if (latestSnapshot.isPresent()) { + latestSnapshotId = latestSnapshot.get().id(); + } + // Branches in Paimon can have independent schemas and snapshots. + // TODO: Add time travel support for paimon branch tables. + DataTable dataTable = (DataTable) table; + Long schemaId = dataTable.schemaManager().latest().map(TableSchema::id).orElse(0L); + return new PaimonSnapshotCacheValue(PaimonPartitionInfo.EMPTY, + new PaimonSnapshot(latestSnapshotId, schemaId, dataTable)); + } catch (Exception e) { + LOG.warn("Failed to get Paimon branch for table {}", paimonTable.name(), e); + throw new RuntimeException( + "Failed to get Paimon branch: " + (e.getMessage() == null ? "unknown cause" : e.getMessage()), + e); + } } else { // Otherwise, use the latest snapshot and the latest schema. 
return Env.getCurrentEnv().getExtMetaCacheMgr().getPaimonMetadataCache() diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonUtil.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonUtil.java index 9d108c2ad9a45a..641322a7be8e56 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonUtil.java @@ -29,9 +29,7 @@ import org.apache.doris.common.AnalysisException; import org.apache.doris.common.UserException; import org.apache.doris.common.util.TimeUtils; -import org.apache.doris.datasource.ExternalTable; import org.apache.doris.datasource.hive.HiveUtil; -import org.apache.doris.datasource.paimon.source.PaimonSource; import org.apache.doris.thrift.TColumnType; import org.apache.doris.thrift.TPrimitiveType; import org.apache.doris.thrift.schema.external.TArrayField; @@ -48,8 +46,6 @@ import org.apache.commons.collections.CollectionUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.apache.paimon.CoreOptions; -import org.apache.paimon.CoreOptions.StartupMode; import org.apache.paimon.Snapshot; import org.apache.paimon.data.BinaryRow; import org.apache.paimon.data.InternalRow; @@ -88,7 +84,6 @@ import java.time.ZoneId; import java.time.format.DateTimeFormatter; import java.util.ArrayList; -import java.util.Arrays; import java.util.Base64; import java.util.HashMap; import java.util.List; @@ -103,22 +98,6 @@ public class PaimonUtil { private static final Base64.Encoder BASE64_ENCODER = java.util.Base64.getUrlEncoder().withoutPadding(); private static final Pattern DIGITAL_REGEX = Pattern.compile("\\d+"); - private static final List> PAIMON_FROM_TIMESTAMP_CONFLICT_OPTIONS = Arrays.asList( - CoreOptions.SCAN_SNAPSHOT_ID, - CoreOptions.SCAN_TAG_NAME, - CoreOptions.SCAN_FILE_CREATION_TIME_MILLIS, - CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP, - CoreOptions.INCREMENTAL_BETWEEN, - 
CoreOptions.INCREMENTAL_TO_AUTO_TAG); - - private static final List> PAIMON_FROM_SNAPSHOT_CONFLICT_OPTIONS = Arrays.asList( - CoreOptions.SCAN_TIMESTAMP_MILLIS, - CoreOptions.SCAN_TIMESTAMP, - CoreOptions.SCAN_FILE_CREATION_TIME_MILLIS, - CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP, - CoreOptions.INCREMENTAL_BETWEEN, - CoreOptions.INCREMENTAL_TO_AUTO_TAG); - public static List read( Table table, @Nullable int[] projection, @Nullable Predicate predicate, Pair, String>... dynamicOptions) @@ -430,100 +409,6 @@ public static String encodeObjectToString(T t) { } } - /** - * Builds a snapshot-specific table for time travel queries. - * - * @param baseTable the base Paimon table to copy configuration from - * @param tableSnapshot the snapshot specification (type + value) - * @return a Table instance configured for the specified time travel query - * @throws UserException if snapshot configuration is invalid - */ - public static Table getTableBySnapshot(Table baseTable, TableSnapshot tableSnapshot) - throws UserException { - final String value = tableSnapshot.getValue(); - final TableSnapshot.VersionType type = tableSnapshot.getType(); - final boolean isDigital = DIGITAL_REGEX.matcher(value).matches(); - - switch (type) { - case TIME: - return isDigital - ? getTableBySnapshotTimestampMillis(baseTable, value) - : getTableBySnapshotTime(baseTable, value); - - case VERSION: - if (isDigital) { - return getTableBySnapshotId(baseTable, value); - } - return getTableByTag(baseTable, value); - - default: - throw new UserException(String.format("Unsupported version type: %s", type)); - } - } - - /** - * Builds a table configured to read from a specific snapshot ID. 
- * - * @param baseTable the base Paimon table to copy configuration from - * @param snapshotId the snapshot ID as a string - * @return a Table instance configured to read from the specified snapshot ID - */ - private static Table getTableBySnapshotId(Table baseTable, String snapshotId) { - Map options = new HashMap<>( - PAIMON_FROM_SNAPSHOT_CONFLICT_OPTIONS.size() + 3); - - // For Paimon FROM_SNAPSHOT startup mode, must set only one key in: - // [scan_tag_name, scan_watermark, scan_snapshot_id] - options.put(CoreOptions.SCAN_TAG_NAME.key(), null); - options.put(CoreOptions.SCAN_WATERMARK.key(), null); - options.put(CoreOptions.SCAN_SNAPSHOT_ID.key(), snapshotId); - options.putAll(excludePaimonConflictOptions(PAIMON_FROM_SNAPSHOT_CONFLICT_OPTIONS)); - - return baseTable.copy(options); - } - - /** - * Builds a table configured to read from a specific timestamp. - * - * @param baseTable the base Paimon table to copy configuration from - * @param timestampStr the timestamp as a string - * @return a Table instance configured to read from the specified timestamp - */ - private static Table getTableBySnapshotTime(Table baseTable, String timestampStr) { - Map options = new HashMap<>( - PAIMON_FROM_TIMESTAMP_CONFLICT_OPTIONS.size() + 3); - - // For Paimon FROM_TIMESTAMP startup mode, must set only one key in: - // [scan_timestamp, scan_timestamp_millis] - options.put(CoreOptions.SCAN_MODE.key(), StartupMode.FROM_TIMESTAMP.toString()); - options.put(CoreOptions.SCAN_TIMESTAMP.key(), timestampStr); - options.put(CoreOptions.SCAN_TIMESTAMP_MILLIS.key(), null); - options.putAll(excludePaimonConflictOptions(PAIMON_FROM_TIMESTAMP_CONFLICT_OPTIONS)); - - return baseTable.copy(options); - } - - /** - * Builds a table configured to read from a specific timestamp in milliseconds. 
- * - * @param baseTable the base Paimon table to copy configuration from - * @param timestampStr the timestamp in milliseconds as a string - * @return a Table instance configured to read from the specified timestamp - */ - private static Table getTableBySnapshotTimestampMillis(Table baseTable, String timestampStr) { - Map options = new HashMap<>( - PAIMON_FROM_TIMESTAMP_CONFLICT_OPTIONS.size() + 3); - - // For Paimon FROM_TIMESTAMP startup mode, must set only one key in: - // [scan_timestamp, scan_timestamp_millis] - options.put(CoreOptions.SCAN_MODE.key(), StartupMode.FROM_TIMESTAMP.toString()); - options.put(CoreOptions.SCAN_TIMESTAMP.key(), null); - options.put(CoreOptions.SCAN_TIMESTAMP_MILLIS.key(), timestampStr); - options.putAll(excludePaimonConflictOptions(PAIMON_FROM_TIMESTAMP_CONFLICT_OPTIONS)); - - return baseTable.copy(options); - } - /** * Extracts the reference name (branch or tag name) from table scan parameters. * @@ -545,53 +430,6 @@ public static String extractBranchOrTagName(TableScanParams scanParams) { } } - - /** - * Builds a branch-specific table for time travel queries. - * - * @param source the Paimon source containing catalog and table information - * @param baseTable the base Paimon table - * @param branchName the branch name - * @return a Table instance configured to read from the specified branch - * @throws UserException if branch does not exist - */ - public static Table getTableByBranch(PaimonSource source, Table baseTable, String branchName) throws UserException { - - if (!checkBranchExists(baseTable, branchName)) { - throw new UserException(String.format("Branch '%s' does not exist", branchName)); - } - - PaimonExternalCatalog catalog = (PaimonExternalCatalog) source.getCatalog(); - ExternalTable externalTable = (ExternalTable) source.getTargetTable(); - return catalog.getPaimonTable(externalTable.getOrBuildNameMapping(), branchName, null); - } - - /** - * Builds a tag-specific table for time travel queries. 
- * - * @param baseTable the base Paimon table to copy configuration from - * @param tagName the tag name - * @return a Table instance configured to read from the specified tag - * @throws UserException if tag does not exist - */ - public static Table getTableByTag(Table baseTable, String tagName) throws UserException { - if (!checkTagsExists(baseTable, tagName)) { - throw new UserException(String.format("Tag '%s' does not exist", tagName)); - } - - Map options = new HashMap<>( - PAIMON_FROM_SNAPSHOT_CONFLICT_OPTIONS.size() + 3); - - // For Paimon FROM_SNAPSHOT startup mode, must set only one key in: - // [scan_tag_name, scan_watermark, scan_snapshot_id] - options.put(CoreOptions.SCAN_TAG_NAME.key(), tagName); - options.put(CoreOptions.SCAN_WATERMARK.key(), null); - options.put(CoreOptions.SCAN_SNAPSHOT_ID.key(), null); - options.putAll(excludePaimonConflictOptions(PAIMON_FROM_SNAPSHOT_CONFLICT_OPTIONS)); - - return baseTable.copy(options); - } - // get snapshot info from query like 'for version/time as of' or '@tag' public static Snapshot getPaimonSnapshot(Table table, Optional querySnapshot, Optional scanParams) throws UserException { @@ -632,14 +470,24 @@ private static Snapshot getPaimonSnapshotByTimestamp(DataTable table, String tim if (isDigital) { timestampMillis = Long.parseLong(timestamp); } else { - timestampMillis = TimeUtils.msTimeStringToLong(timestamp, TimeUtils.getTimeZone()); + // Supported formats include:yyyy-MM-dd, yyyy-MM-dd HH:mm:ss, yyyy-MM-dd HH:mm:ss.SSS. + // use default local time zone. 
+ timestampMillis = DateTimeUtils.parseTimestampData(timestamp, 3, TimeUtils.getTimeZone()).getMillisecond(); if (timestampMillis < 0) { throw new DateTimeException("can't parse time: " + timestamp); } } Snapshot snapshot = table.snapshotManager().earlierOrEqualTimeMills(timestampMillis); if (snapshot == null) { - throw new UserException("can't find snapshot older than : " + timestamp); + Snapshot earliestSnapshot = table.snapshotManager().earliestSnapshot(); + throw new UserException( + String.format( + "There is currently no snapshot earlier than or equal to timestamp [%s], " + + "the earliest snapshot's timestamp is [%s]", + timestampMillis, + earliestSnapshot == null + ? "null" + : String.valueOf(earliestSnapshot.timeMillis()))); } return snapshot; } @@ -661,51 +509,18 @@ private static Snapshot getPaimonSnapshotByTag(DataTable table, String tagName) return tag.orElseThrow(() -> new UserException("can't find snapshot by tag: " + tagName)); } - /** - * Creates a map of conflicting Paimon options with null values for exclusion. - * - * @param illegalOptions the list of ConfigOptions that should be set to null - * @return a HashMap containing the illegal options as keys with null values - */ - public static Map excludePaimonConflictOptions(List> illegalOptions) { - return illegalOptions.stream() - .collect(HashMap::new, - (m, option) -> m.put(option.key(), null), - HashMap::putAll); - } - - /** - * Checks if a tag exists in the given table. 
- * - * @param baseTable the Paimon table - * @param tagName the tag name to check - * @return true if tag exists, false otherwise - * @throws UserException if table is not a FileStoreTable - */ - public static boolean checkTagsExists(Table baseTable, String tagName) throws UserException { + public static String resolvePaimonBranch(TableScanParams tableScanParams, Table baseTable) + throws UserException { + String branchName = extractBranchOrTagName(tableScanParams); if (!(baseTable instanceof FileStoreTable)) { throw new UserException("Table type should be FileStoreTable but got: " + baseTable.getClass().getName()); } final FileStoreTable fileStoreTable = (FileStoreTable) baseTable; - return fileStoreTable.tagManager().tagExists(tagName); - } - - /** - * Checks if a branch exists in the given table. - * - * @param baseTable the Paimon table - * @param branchName the branch name to check - * @return true if branch exists, false otherwise - * @throws UserException if table is not a FileStoreTable - */ - public static boolean checkBranchExists(Table baseTable, String branchName) throws UserException { - if (!(baseTable instanceof FileStoreTable)) { - throw new UserException("Table type should be FileStoreTable but got: " + baseTable.getClass().getName()); + if (!fileStoreTable.branchManager().branchExists(branchName)) { + throw new UserException("can't find branch: " + branchName); } - - final FileStoreTable fileStoreTable = (FileStoreTable) baseTable; - return fileStoreTable.branchManager().branchExists(branchName); + return branchName; } public static Map getPartitionInfoMap(Table table, BinaryRow partitionValues, String timeZone) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java index a5d3060c4dd55d..05a4a133fc0cbc 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java +++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java @@ -18,7 +18,6 @@ package org.apache.doris.datasource.paimon.source; import org.apache.doris.analysis.TableScanParams; -import org.apache.doris.analysis.TableSnapshot; import org.apache.doris.analysis.TupleDescriptor; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.TableIf; @@ -677,39 +676,16 @@ public static Map validateIncrementalReadParams(Map - * This method handles different scan modes including incremental reads and system tables, - * applying the necessary transformations to the base Paimon table. - * - * @return processed Paimon table object configured according to scan parameters - * @throws UserException when system table configuration is incorrect - */ private Table getProcessedTable() throws UserException { Table baseTable = source.getPaimonTable(); - if (getScanParams() != null && getQueryTableSnapshot() != null) { - throw new UserException("Can not specify scan params and table snapshot at same time."); - } TableScanParams theScanParams = getScanParams(); - if (theScanParams != null) { - if (theScanParams.incrementalRead()) { - return baseTable.copy(getIncrReadParams()); - } - - if (theScanParams.isBranch()) { - return PaimonUtil.getTableByBranch(source, baseTable, PaimonUtil.extractBranchOrTagName(theScanParams)); - } - if (theScanParams.isTag()) { - return PaimonUtil.getTableByTag(baseTable, PaimonUtil.extractBranchOrTagName(theScanParams)); - } + if (theScanParams != null && getQueryTableSnapshot() != null) { + throw new UserException("Can not specify scan params and table snapshot at same time."); } - TableSnapshot theTableSnapshot = getQueryTableSnapshot(); - if (theTableSnapshot != null) { - return PaimonUtil.getTableBySnapshot(baseTable, theTableSnapshot); + if (theScanParams != null && theScanParams.incrementalRead()) { + return baseTable.copy(getIncrReadParams()); } - return baseTable; } } diff --git 
a/regression-test/data/external_table_p0/paimon/paimon_time_travel.out b/regression-test/data/external_table_p0/paimon/paimon_time_travel.out index 3527cc4604e6f3..93a7c42e165ba7 100644 --- a/regression-test/data/external_table_p0/paimon/paimon_time_travel.out +++ b/regression-test/data/external_table_p0/paimon/paimon_time_travel.out @@ -1001,3 +1001,101 @@ true 7 false 5 true 7 +-- !schema_change_snapshot_5_version_count -- +15 + +-- !schema_change_snapshot_5_version -- +1001 2001 2024-01-15 299.99 123 Maple Street, Springfield, IL 62701 COMPLETED true \N +1002 2002 2024-01-16 156.50 456 Oak Avenue, Riverside, CA 92507 PROCESSING false \N +1003 2003 2024-01-17 89.00 789 Pine Boulevard, Greenfield, TX 75001 SHIPPED true \N +2001 3001 2024-01-18 445.75 321 Cedar Lane, Millbrook, NY 12545 PENDING false \N +2002 3002 2024-01-19 67.25 654 Birch Drive, Lakewood, CO 80226 COMPLETED true \N +2003 3003 2024-01-20 188.90 987 Elm Court, Fairview, OR 97024 CANCELLED false \N +3001 4001 2024-01-21 325.40 159 Willow Street, Brookdale, FL 33602 SHIPPED true \N +3002 4002 2024-01-22 99.85 753 Aspen Road, Clearwater, WA 98012 PROCESSING true \N +3003 4003 2024-01-23 512.30 264 Chestnut Avenue, Westfield, MI 48097 COMPLETED false \N +5001 6001 2024-01-24 278.60 842 Hickory Lane, Stonewood, GA 30309 PENDING true \N +5002 6002 2024-01-25 134.75 417 Poplar Street, Ridgefield, NV 89109 SHIPPED false \N +5003 6003 2024-01-26 389.20 695 Sycamore Drive, Maplewood, AZ 85001 COMPLETED true \N +6001 9001 2024-02-11 456.80 123 New Street, Downtown, CA 90210 COMPLETED true 100 +6002 9002 2024-02-12 289.45 456 Updated Ave, Midtown, NY 10001 PROCESSING false 200 +6003 9003 2024-02-13 378.90 789 Modern Blvd, Uptown, TX 75201 SHIPPED true 300 + +-- !schema_change_snapshot_6_version_count -- +18 + +-- !schema_change_snapshot_6_version -- +1001 2001 2024-01-15 299.99 123 Maple Street, Springfield, IL 62701 COMPLETED true \N +1002 2002 2024-01-16 156.50 456 Oak Avenue, Riverside, CA 92507 PROCESSING 
false \N +1003 2003 2024-01-17 89.00 789 Pine Boulevard, Greenfield, TX 75001 SHIPPED true \N +2001 3001 2024-01-18 445.75 321 Cedar Lane, Millbrook, NY 12545 PENDING false \N +2002 3002 2024-01-19 67.25 654 Birch Drive, Lakewood, CO 80226 COMPLETED true \N +2003 3003 2024-01-20 188.90 987 Elm Court, Fairview, OR 97024 CANCELLED false \N +3001 4001 2024-01-21 325.40 159 Willow Street, Brookdale, FL 33602 SHIPPED true \N +3002 4002 2024-01-22 99.85 753 Aspen Road, Clearwater, WA 98012 PROCESSING true \N +3003 4003 2024-01-23 512.30 264 Chestnut Avenue, Westfield, MI 48097 COMPLETED false \N +5001 6001 2024-01-24 278.60 842 Hickory Lane, Stonewood, GA 30309 PENDING true \N +5002 6002 2024-01-25 134.75 417 Poplar Street, Ridgefield, NV 89109 SHIPPED false \N +5003 6003 2024-01-26 389.20 695 Sycamore Drive, Maplewood, AZ 85001 COMPLETED true \N +6001 9001 2024-02-11 456.80 123 New Street, Downtown, CA 90210 COMPLETED true 100 +6002 9002 2024-02-12 289.45 456 Updated Ave, Midtown, NY 10001 PROCESSING false 200 +6003 9003 2024-02-13 378.90 789 Modern Blvd, Uptown, TX 75201 SHIPPED true 300 +6004 9004 2024-02-14 199.99 321 Future Lane, Innovation, WA 98001 PENDING true 400 +6005 9005 2024-02-15 567.25 654 Progress Drive, Tech City, OR 97201 COMPLETED false 500 +6006 9006 2024-02-16 123.75 987 Advanced Court, Silicon Valley, CA 94301 CANCELLED true 600 + +-- !time_zone_time_travel_basic -- +1001 2001 2024-01-15 299.99 123 Maple Street, Springfield, IL 62701 COMPLETED true +1002 2002 2024-01-16 156.50 456 Oak Avenue, Riverside, CA 92507 PROCESSING false +1003 2003 2024-01-17 89.00 789 Pine Boulevard, Greenfield, TX 75001 SHIPPED true + +-- !time_zone_time_travel_plus08_jni_true -- +1001 2001 2024-01-15 299.99 123 Maple Street, Springfield, IL 62701 COMPLETED true +1002 2002 2024-01-16 156.50 456 Oak Avenue, Riverside, CA 92507 PROCESSING false +1003 2003 2024-01-17 89.00 789 Pine Boulevard, Greenfield, TX 75001 SHIPPED true + +-- !time_zone_time_travel_plus08_jni_false -- 
+1001 2001 2024-01-15 299.99 123 Maple Street, Springfield, IL 62701 COMPLETED true +1002 2002 2024-01-16 156.50 456 Oak Avenue, Riverside, CA 92507 PROCESSING false +1003 2003 2024-01-17 89.00 789 Pine Boulevard, Greenfield, TX 75001 SHIPPED true + +-- !time_zone_time_travel_plus06_jni_true -- +1001 2001 2024-01-15 299.99 123 Maple Street, Springfield, IL 62701 COMPLETED true \N +1002 2002 2024-01-16 156.50 456 Oak Avenue, Riverside, CA 92507 PROCESSING false \N +1003 2003 2024-01-17 89.00 789 Pine Boulevard, Greenfield, TX 75001 SHIPPED true \N +2001 3001 2024-01-18 445.75 321 Cedar Lane, Millbrook, NY 12545 PENDING false \N +2002 3002 2024-01-19 67.25 654 Birch Drive, Lakewood, CO 80226 COMPLETED true \N +2003 3003 2024-01-20 188.90 987 Elm Court, Fairview, OR 97024 CANCELLED false \N +3001 4001 2024-01-21 325.40 159 Willow Street, Brookdale, FL 33602 SHIPPED true \N +3002 4002 2024-01-22 99.85 753 Aspen Road, Clearwater, WA 98012 PROCESSING true \N +3003 4003 2024-01-23 512.30 264 Chestnut Avenue, Westfield, MI 48097 COMPLETED false \N +5001 6001 2024-01-24 278.60 842 Hickory Lane, Stonewood, GA 30309 PENDING true \N +5002 6002 2024-01-25 134.75 417 Poplar Street, Ridgefield, NV 89109 SHIPPED false \N +5003 6003 2024-01-26 389.20 695 Sycamore Drive, Maplewood, AZ 85001 COMPLETED true \N +6001 9001 2024-02-11 456.80 123 New Street, Downtown, CA 90210 COMPLETED true 100 +6002 9002 2024-02-12 289.45 456 Updated Ave, Midtown, NY 10001 PROCESSING false 200 +6003 9003 2024-02-13 378.90 789 Modern Blvd, Uptown, TX 75201 SHIPPED true 300 +6004 9004 2024-02-14 199.99 321 Future Lane, Innovation, WA 98001 PENDING true 400 +6005 9005 2024-02-15 567.25 654 Progress Drive, Tech City, OR 97201 COMPLETED false 500 +6006 9006 2024-02-16 123.75 987 Advanced Court, Silicon Valley, CA 94301 CANCELLED true 600 + +-- !time_zone_time_travel_plus06_jni_false -- +1001 2001 2024-01-15 299.99 123 Maple Street, Springfield, IL 62701 COMPLETED true \N +1002 2002 2024-01-16 156.50 456 Oak 
Avenue, Riverside, CA 92507 PROCESSING false \N +1003 2003 2024-01-17 89.00 789 Pine Boulevard, Greenfield, TX 75001 SHIPPED true \N +2001 3001 2024-01-18 445.75 321 Cedar Lane, Millbrook, NY 12545 PENDING false \N +2002 3002 2024-01-19 67.25 654 Birch Drive, Lakewood, CO 80226 COMPLETED true \N +2003 3003 2024-01-20 188.90 987 Elm Court, Fairview, OR 97024 CANCELLED false \N +3001 4001 2024-01-21 325.40 159 Willow Street, Brookdale, FL 33602 SHIPPED true \N +3002 4002 2024-01-22 99.85 753 Aspen Road, Clearwater, WA 98012 PROCESSING true \N +3003 4003 2024-01-23 512.30 264 Chestnut Avenue, Westfield, MI 48097 COMPLETED false \N +5001 6001 2024-01-24 278.60 842 Hickory Lane, Stonewood, GA 30309 PENDING true \N +5002 6002 2024-01-25 134.75 417 Poplar Street, Ridgefield, NV 89109 SHIPPED false \N +5003 6003 2024-01-26 389.20 695 Sycamore Drive, Maplewood, AZ 85001 COMPLETED true \N +6001 9001 2024-02-11 456.80 123 New Street, Downtown, CA 90210 COMPLETED true 100 +6002 9002 2024-02-12 289.45 456 Updated Ave, Midtown, NY 10001 PROCESSING false 200 +6003 9003 2024-02-13 378.90 789 Modern Blvd, Uptown, TX 75201 SHIPPED true 300 +6004 9004 2024-02-14 199.99 321 Future Lane, Innovation, WA 98001 PENDING true 400 +6005 9005 2024-02-15 567.25 654 Progress Drive, Tech City, OR 97201 COMPLETED false 500 +6006 9006 2024-02-16 123.75 987 Advanced Court, Silicon Valley, CA 94301 CANCELLED true 600 + diff --git a/regression-test/suites/external_table_p0/paimon/paimon_time_travel.groovy b/regression-test/suites/external_table_p0/paimon/paimon_time_travel.groovy index 99a4e94a4c6025..96e6551aa884dc 100644 --- a/regression-test/suites/external_table_p0/paimon/paimon_time_travel.groovy +++ b/regression-test/suites/external_table_p0/paimon/paimon_time_travel.groovy @@ -16,6 +16,8 @@ // under the License. 
import java.time.format.DateTimeFormatter +import java.time.format.DateTimeFormatterBuilder +import java.time.temporal.ChronoField import java.time.LocalDateTime import java.time.ZoneId @@ -29,12 +31,23 @@ suite("paimon_time_travel", "p0,external,doris,external_docker,external_docker_d return } // Create date time formatter - + DateTimeFormatter unifiedFormatter = new DateTimeFormatterBuilder() + .appendPattern("yyyy-MM-dd") + .optionalStart() + .appendLiteral('T') + .optionalEnd() + .optionalStart() + .appendLiteral(' ') + .optionalEnd() + .appendPattern("HH:mm:ss") + .optionalStart() + .appendFraction(ChronoField.MILLI_OF_SECOND, 0, 3, true) + .optionalEnd() + .toFormatter() String minio_port = context.config.otherConfigs.get("iceberg_minio_port") String catalog_name = "test_paimon_time_travel_catalog" String externalEnvIp = context.config.otherConfigs.get("externalEnvIp") - DateTimeFormatter iso_formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS") - DateTimeFormatter standard_formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS") + DateTimeFormatter outputFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS") String db_name = "test_paimon_time_travel_db" String tableName = "tbl_time_travel" try { @@ -54,8 +67,8 @@ suite("paimon_time_travel", "p0,external,doris,external_docker,external_docker_d sql """switch `${catalog_name}`""" logger.info("switched to catalog " + catalog_name) sql """use ${db_name}""" - //system table snapshots to get create time. - List> snapshotRes = sql """ select snapshot_id,commit_time from ${tableName}\$snapshots order by snapshot_id;""" + // Query system table snapshots to get creation time. 
Get the first four snapshots before schema change (snapshot IDs 1-4) + List> snapshotRes = sql """ select snapshot_id,commit_time from ${tableName}\$snapshots order by snapshot_id limit 4;""" logger.info("Query result from ${tableName}\$snapshots: ${snapshotRes}") assertTrue(snapshotRes.size()==4) assertTrue(snapshotRes[0].size()==2) @@ -68,14 +81,8 @@ suite("paimon_time_travel", "p0,external,doris,external_docker,external_docker_d logger.info("Processing snapshot ${index + 1}: ID=${snapshotId}, commit_time=${commitTime}") try { - LocalDateTime dateTime; - if (commitTime.contains("T")){ - dateTime = LocalDateTime.parse(commitTime, iso_formatter) - }else { - dateTime = LocalDateTime.parse(commitTime, standard_formatter) - } - - String snapshotTime = dateTime.atZone(ZoneId.systemDefault()).format(standard_formatter); + LocalDateTime dateTime = LocalDateTime.parse(commitTime, unifiedFormatter) + String snapshotTime = dateTime.atZone(ZoneId.systemDefault()).format(outputFormatter); long timestamp = dateTime.atZone(ZoneId.systemDefault()) .toInstant() .toEpochMilli() @@ -191,8 +198,8 @@ suite("paimon_time_travel", "p0,external,doris,external_docker,external_docker_d } } - - List> tagsResult = sql """ select snapshot_id,tag_name from ${tableName}\$tags order by snapshot_id;""" + // Get the previous 4 snapshot IDs and their corresponding tags + List> tagsResult = sql """ select snapshot_id,tag_name from ${tableName}\$tags order by snapshot_id limit 4;""" logger.info("Query result from ${tableName}\$tags: ${tagsResult}") assertTrue(tagsResult.size()==4) assertTrue(tagsResult[0].size()==2) @@ -239,13 +246,87 @@ suite("paimon_time_travel", "p0,external,doris,external_docker,external_docker_d } } + /** + * Test time travel queries on snapshots created after schema changes. + * + * Background: run09.sql adds a new column, creating a schema evolution + * after snapshot ID 4. 
This test verifies that time travel works correctly + * with snapshots (ID > 4) that use the updated schema. + */ + List> snapshotSchemaChangeAfterRes = sql """ select snapshot_id,commit_time from ${tableName}\$snapshots where snapshot_id > 4 order by snapshot_id limit 2;""" + logger.info("Query result from ${tableName}\$snapshots after schema change: ${snapshotSchemaChangeAfterRes}") + + snapshotSchemaChangeAfterRes.eachWithIndex { snapshotRow, index -> + int snapshotId = snapshotRow[0] as int + try { + String baseQueryName = "qt_schema_change_snapshot_${snapshotId}" + + // Time travel by snapshot ID after schema change + "${baseQueryName}_version_count" """select count(*) from ${tableName} FOR VERSION AS OF ${snapshotId} ;""" + "${baseQueryName}_version" """select * from ${tableName} FOR VERSION AS OF ${snapshotId} order by order_id;""" + logger.info("Completed schema change queries for snapshot ${snapshotId}") + + } catch (Exception e) { + logger.error("Failed to process schema change snapshot ${snapshotId}: ${e.message}") + throw e + } + } + + + // Test time zone behavior with time travel queries + List> timeTravelZone = sql """ select snapshot_id,commit_time from ${tableName}\$snapshots order by snapshot_id limit 1;""" + logger.info("Query result from ${tableName}\$snapshots: ${timeTravelZone}") + + String commitTime = timeTravelZone[0][1] as String + + LocalDateTime dateTime = LocalDateTime.parse(commitTime, unifiedFormatter) + String snapshotTime = dateTime.atZone(ZoneId.systemDefault()).format(outputFormatter) + + try { + // Basic time string query + qt_time_zone_time_travel_basic """select * from ${tableName} FOR TIME AS OF \"${snapshotTime}\" order by order_id""" + + // Test with +08:00 timezone + sql """set force_jni_scanner=true; set time_zone='+08:00';""" + qt_time_zone_time_travel_plus08_jni_true """select * from ${tableName} FOR TIME AS OF \"${snapshotTime}\" order by order_id""" + + sql """set force_jni_scanner=false;""" + 
qt_time_zone_time_travel_plus08_jni_false """select * from ${tableName} FOR TIME AS OF \"${snapshotTime}\" order by order_id""" + + // Test with +06:00 timezone + sql """set force_jni_scanner=true; set time_zone='+06:00';""" + qt_time_zone_time_travel_plus06_jni_true """select * from ${tableName} FOR TIME AS OF \"${snapshotTime}\" order by order_id""" + + sql """set force_jni_scanner=false;""" + qt_time_zone_time_travel_plus06_jni_false """select * from ${tableName} FOR TIME AS OF \"${snapshotTime}\" order by order_id""" + + // Test with +10:00 timezone - these should throw exceptions + sql """set force_jni_scanner=true; set time_zone='+10:00';""" + test { + sql """select * from ${tableName} FOR TIME AS OF \"${snapshotTime}\" order by order_id""" + exception ("There is currently no snapshot earlier than or equal to timestamp") + } + + sql """set force_jni_scanner=false;""" + test { + sql """select * from ${tableName} FOR TIME AS OF \"${snapshotTime}\" order by order_id""" + exception ("There is currently no snapshot earlier than or equal to timestamp") + } + + } finally { + sql """ unset variable time_zone; """ + sql """ set force_jni_scanner = false; """ + } + + + // Error handling tests test { sql """ select * from ${tableName}@branch('name'='not_exists_branch'); """ - exception "Branch 'not_exists_branch' does not exist" + exception "can't find branch: not_exists_branch" } test { sql """ select * from ${tableName}@branch(not_exists_branch); """ - exception "Branch 'not_exists_branch' does not exist" + exception "can't find branch: not_exists_branch" } test { sql """ select * from ${tableName}@tag('name'='not_exists_tag'); """ @@ -278,6 +359,4 @@ suite("paimon_time_travel", "p0,external,doris,external_docker,external_docker_d } finally { // sql """drop catalog if exists ${catalog_name}""" } -} - - +} \ No newline at end of file