From 042cd915cbcba67f00e2c11b3e0f1d8684557eef Mon Sep 17 00:00:00 2001 From: vinlee19 Date: Fri, 5 Sep 2025 16:41:43 +0800 Subject: [PATCH 01/16] remove redurant code --- .../doris/datasource/paimon/source/PaimonScanNode.java | 9 --------- 1 file changed, 9 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java index bccecd30d7693b..0168b69be5105d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java @@ -695,16 +695,7 @@ private Table getProcessedTable() throws UserException { if (theScanParams.isBranch()) { return PaimonUtil.getTableByBranch(source, baseTable, PaimonUtil.extractBranchOrTagName(theScanParams)); } - if (theScanParams.isTag()) { - return PaimonUtil.getTableByTag(baseTable, PaimonUtil.extractBranchOrTagName(theScanParams)); - } - } - - TableSnapshot theTableSnapshot = getQueryTableSnapshot(); - if (theTableSnapshot != null) { - return PaimonUtil.getTableBySnapshot(baseTable, theTableSnapshot); } - return baseTable; } } From 714b59608e16827be0a3359509fe3b06c6107710 Mon Sep 17 00:00:00 2001 From: vinlee19 Date: Fri, 5 Sep 2025 20:37:58 +0800 Subject: [PATCH 02/16] remove redurant code --- .../paimon/PaimonExternalTable.java | 12 ++++---- .../paimon/PaimonMetadataCache.java | 29 ++++++++++--------- .../paimon/PaimonSnapshotCacheKey.java | 17 +++++++---- 3 files changed, 33 insertions(+), 25 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java index 013aab4f479177..e5f5f3dbee8f72 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java +++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java @@ -102,25 +102,25 @@ public Table getPaimonTable(Optional snapshot) { private PaimonSnapshotCacheValue getPaimonSnapshotCacheValue(Optional tableSnapshot, Optional scanParams) { makeSureInitialized(); + long snapshotId = PaimonSnapshot.INVALID_SNAPSHOT_ID; if (tableSnapshot.isPresent() || (scanParams.isPresent() && scanParams.get().isTag())) { // If a snapshot is specified, // use the specified snapshot and the corresponding schema(not the latest // schema). try { Snapshot snapshot = PaimonUtil.getPaimonSnapshot(paimonTable, tableSnapshot, scanParams); - return new PaimonSnapshotCacheValue(PaimonPartitionInfo.EMPTY, - new PaimonSnapshot(snapshot.id(), snapshot.schemaId(), paimonTable)); + snapshotId = snapshot.id(); } catch (Exception e) { LOG.warn("Failed to get Paimon snapshot for table {}", paimonTable.name(), e); throw new RuntimeException( "Failed to get Paimon snapshot: " + (e.getMessage() == null ? "unknown cause" : e.getMessage()), e); } - } else { - // Otherwise, use the latest snapshot and the latest schema. - return Env.getCurrentEnv().getExtMetaCacheMgr().getPaimonMetadataCache() - .getPaimonSnapshot(this); } + // Otherwise, use the latest snapshot and the latest schema. 
+ return Env.getCurrentEnv().getExtMetaCacheMgr().getPaimonMetadataCache() + .getPaimonSnapshot(this, snapshotId); + } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonMetadataCache.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonMetadataCache.java index 11a76b0022d604..326a25d877fec0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonMetadataCache.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonMetadataCache.java @@ -67,11 +67,11 @@ public PaimonMetadataCache(ExecutorService executor) { private PaimonSnapshotCacheValue loadSnapshot(PaimonSnapshotCacheKey key) { NameMapping nameMapping = key.getNameMapping(); try { - PaimonSnapshot latestSnapshot = loadLatestSnapshot(key); + PaimonSnapshot paimonSnapshot = loadPaimonSnapshot(key); List partitionColumns = getPaimonSchemaCacheValue(nameMapping, - latestSnapshot.getSchemaId()).getPartitionColumns(); + paimonSnapshot.getSchemaId()).getPartitionColumns(); PaimonPartitionInfo partitionInfo = loadPartitionInfo(key, partitionColumns); - return new PaimonSnapshotCacheValue(partitionInfo, latestSnapshot); + return new PaimonSnapshotCacheValue(partitionInfo, paimonSnapshot); } catch (Exception e) { throw new CacheException("failed to load paimon snapshot %s.%s.%s or reason: %s", e, nameMapping.getCtlId(), nameMapping.getLocalDbName(), nameMapping.getLocalTblName(), @@ -109,23 +109,26 @@ private PaimonPartitionInfo loadPartitionInfo(PaimonSnapshotCacheKey key, List new IOException("Catalog not found: " + id)); Table table = externalCatalog.getPaimonTable(nameMapping); Table snapshotTable = table; // snapshotId and schemaId - Long latestSnapshotId = PaimonSnapshot.INVALID_SNAPSHOT_ID; - Optional optionalSnapshot = table.latestSnapshot(); - if (optionalSnapshot.isPresent()) { - latestSnapshotId = optionalSnapshot.get().id(); - snapshotTable = - 
table.copy(Collections.singletonMap(CoreOptions.SCAN_SNAPSHOT_ID.key(), latestSnapshotId.toString())); + Long currentSnapshotId = key.getSnapshotId(); + if (currentSnapshotId == PaimonSnapshot.INVALID_SNAPSHOT_ID) { + Optional optionalSnapshot = table.latestSnapshot(); + if (optionalSnapshot.isPresent()) { + currentSnapshotId = optionalSnapshot.get().id(); + } } + snapshotTable = + table.copy(Collections.singletonMap(CoreOptions.SCAN_SNAPSHOT_ID.key(), + Long.toString(currentSnapshotId))); DataTable dataTable = (DataTable) table; long latestSchemaId = dataTable.schemaManager().latest().map(TableSchema::id).orElse(0L); - return new PaimonSnapshot(latestSnapshotId, latestSchemaId, snapshotTable); + return new PaimonSnapshot(currentSnapshotId, latestSchemaId, snapshotTable); } public void invalidateCatalogCache(long catalogId) { @@ -149,8 +152,8 @@ public void invalidateDbCache(long catalogId, String dbName) { .forEach(snapshotCache::invalidate); } - public PaimonSnapshotCacheValue getPaimonSnapshot(ExternalTable dorisTable) { - PaimonSnapshotCacheKey key = new PaimonSnapshotCacheKey(dorisTable.getOrBuildNameMapping()); + public PaimonSnapshotCacheValue getPaimonSnapshot(ExternalTable dorisTable, long snapshotId) { + PaimonSnapshotCacheKey key = new PaimonSnapshotCacheKey(dorisTable.getOrBuildNameMapping(), snapshotId); return snapshotCache.get(key); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSnapshotCacheKey.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSnapshotCacheKey.java index 6154d607f0b2b3..c4a195b7727505 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSnapshotCacheKey.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSnapshotCacheKey.java @@ -19,34 +19,38 @@ import org.apache.doris.datasource.NameMapping; +import java.util.Objects; import java.util.StringJoiner; public class PaimonSnapshotCacheKey { private final NameMapping nameMapping; + 
private final long snapshotId; - public PaimonSnapshotCacheKey(NameMapping nameMapping) { + public PaimonSnapshotCacheKey(NameMapping nameMapping, long snapshotId) { this.nameMapping = nameMapping; + this.snapshotId = snapshotId; } public NameMapping getNameMapping() { return nameMapping; } + public long getSnapshotId() { + return snapshotId; + } + @Override public boolean equals(Object o) { - if (this == o) { - return true; - } if (o == null || getClass() != o.getClass()) { return false; } PaimonSnapshotCacheKey that = (PaimonSnapshotCacheKey) o; - return nameMapping.equals(that.nameMapping); + return snapshotId == that.snapshotId && Objects.equals(nameMapping, that.nameMapping); } @Override public int hashCode() { - return nameMapping.hashCode(); + return Objects.hash(nameMapping, snapshotId); } @Override @@ -55,6 +59,7 @@ public String toString() { .add("catalog=" + nameMapping.getCtlId()) .add("dbName='" + nameMapping.getLocalDbName() + "'") .add("tableName='" + nameMapping.getLocalTblName() + "'") + .add("snapshotId='" + snapshotId + "'") .toString(); } } From 87c3c4d795f6f336edbc03d5c99a390766936f07 Mon Sep 17 00:00:00 2001 From: vinlee19 Date: Mon, 8 Sep 2025 14:35:25 +0800 Subject: [PATCH 03/16] fix checkstyle --- .../apache/doris/datasource/paimon/source/PaimonScanNode.java | 1 - 1 file changed, 1 deletion(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java index 0168b69be5105d..399eb44ac17eb0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java @@ -18,7 +18,6 @@ package org.apache.doris.datasource.paimon.source; import org.apache.doris.analysis.TableScanParams; -import org.apache.doris.analysis.TableSnapshot; import org.apache.doris.analysis.TupleDescriptor; import 
org.apache.doris.catalog.Env; import org.apache.doris.catalog.TableIf; From ba4a764f0b5ad727aa9924a368536a0699902056 Mon Sep 17 00:00:00 2001 From: vinlee19 Date: Wed, 10 Sep 2025 10:51:30 +0800 Subject: [PATCH 04/16] Revert "remove redurant code" This reverts commit 714b59608e16827be0a3359509fe3b06c6107710. --- .../paimon/PaimonExternalTable.java | 12 ++++---- .../paimon/PaimonMetadataCache.java | 29 +++++++++---------- .../paimon/PaimonSnapshotCacheKey.java | 17 ++++------- 3 files changed, 25 insertions(+), 33 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java index e5f5f3dbee8f72..013aab4f479177 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java @@ -102,25 +102,25 @@ public Table getPaimonTable(Optional snapshot) { private PaimonSnapshotCacheValue getPaimonSnapshotCacheValue(Optional tableSnapshot, Optional scanParams) { makeSureInitialized(); - long snapshotId = PaimonSnapshot.INVALID_SNAPSHOT_ID; if (tableSnapshot.isPresent() || (scanParams.isPresent() && scanParams.get().isTag())) { // If a snapshot is specified, // use the specified snapshot and the corresponding schema(not the latest // schema). try { Snapshot snapshot = PaimonUtil.getPaimonSnapshot(paimonTable, tableSnapshot, scanParams); - snapshotId = snapshot.id(); + return new PaimonSnapshotCacheValue(PaimonPartitionInfo.EMPTY, + new PaimonSnapshot(snapshot.id(), snapshot.schemaId(), paimonTable)); } catch (Exception e) { LOG.warn("Failed to get Paimon snapshot for table {}", paimonTable.name(), e); throw new RuntimeException( "Failed to get Paimon snapshot: " + (e.getMessage() == null ? "unknown cause" : e.getMessage()), e); } + } else { + // Otherwise, use the latest snapshot and the latest schema. 
+ return Env.getCurrentEnv().getExtMetaCacheMgr().getPaimonMetadataCache() + .getPaimonSnapshot(this); } - // Otherwise, use the latest snapshot and the latest schema. - return Env.getCurrentEnv().getExtMetaCacheMgr().getPaimonMetadataCache() - .getPaimonSnapshot(this, snapshotId); - } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonMetadataCache.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonMetadataCache.java index 326a25d877fec0..11a76b0022d604 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonMetadataCache.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonMetadataCache.java @@ -67,11 +67,11 @@ public PaimonMetadataCache(ExecutorService executor) { private PaimonSnapshotCacheValue loadSnapshot(PaimonSnapshotCacheKey key) { NameMapping nameMapping = key.getNameMapping(); try { - PaimonSnapshot paimonSnapshot = loadPaimonSnapshot(key); + PaimonSnapshot latestSnapshot = loadLatestSnapshot(key); List partitionColumns = getPaimonSchemaCacheValue(nameMapping, - paimonSnapshot.getSchemaId()).getPartitionColumns(); + latestSnapshot.getSchemaId()).getPartitionColumns(); PaimonPartitionInfo partitionInfo = loadPartitionInfo(key, partitionColumns); - return new PaimonSnapshotCacheValue(partitionInfo, paimonSnapshot); + return new PaimonSnapshotCacheValue(partitionInfo, latestSnapshot); } catch (Exception e) { throw new CacheException("failed to load paimon snapshot %s.%s.%s or reason: %s", e, nameMapping.getCtlId(), nameMapping.getLocalDbName(), nameMapping.getLocalTblName(), @@ -109,26 +109,23 @@ private PaimonPartitionInfo loadPartitionInfo(PaimonSnapshotCacheKey key, List new IOException("Catalog not found: " + id)); Table table = externalCatalog.getPaimonTable(nameMapping); Table snapshotTable = table; // snapshotId and schemaId - Long currentSnapshotId = key.getSnapshotId(); - if (currentSnapshotId == PaimonSnapshot.INVALID_SNAPSHOT_ID) { - 
Optional optionalSnapshot = table.latestSnapshot(); - if (optionalSnapshot.isPresent()) { - currentSnapshotId = optionalSnapshot.get().id(); - } + Long latestSnapshotId = PaimonSnapshot.INVALID_SNAPSHOT_ID; + Optional optionalSnapshot = table.latestSnapshot(); + if (optionalSnapshot.isPresent()) { + latestSnapshotId = optionalSnapshot.get().id(); + snapshotTable = + table.copy(Collections.singletonMap(CoreOptions.SCAN_SNAPSHOT_ID.key(), latestSnapshotId.toString())); } - snapshotTable = - table.copy(Collections.singletonMap(CoreOptions.SCAN_SNAPSHOT_ID.key(), - Long.toString(currentSnapshotId))); DataTable dataTable = (DataTable) table; long latestSchemaId = dataTable.schemaManager().latest().map(TableSchema::id).orElse(0L); - return new PaimonSnapshot(currentSnapshotId, latestSchemaId, snapshotTable); + return new PaimonSnapshot(latestSnapshotId, latestSchemaId, snapshotTable); } public void invalidateCatalogCache(long catalogId) { @@ -152,8 +149,8 @@ public void invalidateDbCache(long catalogId, String dbName) { .forEach(snapshotCache::invalidate); } - public PaimonSnapshotCacheValue getPaimonSnapshot(ExternalTable dorisTable, long snapshotId) { - PaimonSnapshotCacheKey key = new PaimonSnapshotCacheKey(dorisTable.getOrBuildNameMapping(), snapshotId); + public PaimonSnapshotCacheValue getPaimonSnapshot(ExternalTable dorisTable) { + PaimonSnapshotCacheKey key = new PaimonSnapshotCacheKey(dorisTable.getOrBuildNameMapping()); return snapshotCache.get(key); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSnapshotCacheKey.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSnapshotCacheKey.java index c4a195b7727505..6154d607f0b2b3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSnapshotCacheKey.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSnapshotCacheKey.java @@ -19,38 +19,34 @@ import org.apache.doris.datasource.NameMapping; -import java.util.Objects; 
import java.util.StringJoiner; public class PaimonSnapshotCacheKey { private final NameMapping nameMapping; - private final long snapshotId; - public PaimonSnapshotCacheKey(NameMapping nameMapping, long snapshotId) { + public PaimonSnapshotCacheKey(NameMapping nameMapping) { this.nameMapping = nameMapping; - this.snapshotId = snapshotId; } public NameMapping getNameMapping() { return nameMapping; } - public long getSnapshotId() { - return snapshotId; - } - @Override public boolean equals(Object o) { + if (this == o) { + return true; + } if (o == null || getClass() != o.getClass()) { return false; } PaimonSnapshotCacheKey that = (PaimonSnapshotCacheKey) o; - return snapshotId == that.snapshotId && Objects.equals(nameMapping, that.nameMapping); + return nameMapping.equals(that.nameMapping); } @Override public int hashCode() { - return Objects.hash(nameMapping, snapshotId); + return nameMapping.hashCode(); } @Override @@ -59,7 +55,6 @@ public String toString() { .add("catalog=" + nameMapping.getCtlId()) .add("dbName='" + nameMapping.getLocalDbName() + "'") .add("tableName='" + nameMapping.getLocalTblName() + "'") - .add("snapshotId='" + snapshotId + "'") .toString(); } } From 7c968406760800bebd1c9516a0de6fd4bfefd099 Mon Sep 17 00:00:00 2001 From: vinlee19 Date: Wed, 10 Sep 2025 11:09:32 +0800 Subject: [PATCH 05/16] remove time travel redundant code to reduce io times --- .../doris/datasource/paimon/PaimonUtil.java | 172 +----------------- .../paimon/source/PaimonScanNode.java | 9 - 2 files changed, 1 insertion(+), 180 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonUtil.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonUtil.java index 35883cd73161dd..56e7f54da347a9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonUtil.java @@ -48,8 +48,6 @@ import 
org.apache.commons.collections.CollectionUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.apache.paimon.CoreOptions; -import org.apache.paimon.CoreOptions.StartupMode; import org.apache.paimon.Snapshot; import org.apache.paimon.data.BinaryRow; import org.apache.paimon.data.InternalRow; @@ -88,7 +86,6 @@ import java.time.ZoneId; import java.time.format.DateTimeFormatter; import java.util.ArrayList; -import java.util.Arrays; import java.util.Base64; import java.util.HashMap; import java.util.List; @@ -103,22 +100,6 @@ public class PaimonUtil { private static final Base64.Encoder BASE64_ENCODER = java.util.Base64.getUrlEncoder().withoutPadding(); private static final Pattern DIGITAL_REGEX = Pattern.compile("\\d+"); - private static final List> PAIMON_FROM_TIMESTAMP_CONFLICT_OPTIONS = Arrays.asList( - CoreOptions.SCAN_SNAPSHOT_ID, - CoreOptions.SCAN_TAG_NAME, - CoreOptions.SCAN_FILE_CREATION_TIME_MILLIS, - CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP, - CoreOptions.INCREMENTAL_BETWEEN, - CoreOptions.INCREMENTAL_TO_AUTO_TAG); - - private static final List> PAIMON_FROM_SNAPSHOT_CONFLICT_OPTIONS = Arrays.asList( - CoreOptions.SCAN_TIMESTAMP_MILLIS, - CoreOptions.SCAN_TIMESTAMP, - CoreOptions.SCAN_FILE_CREATION_TIME_MILLIS, - CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP, - CoreOptions.INCREMENTAL_BETWEEN, - CoreOptions.INCREMENTAL_TO_AUTO_TAG); - public static List read( Table table, @Nullable int[] projection, @Nullable Predicate predicate, Pair, String>... dynamicOptions) @@ -509,100 +490,6 @@ private static String serializePartitionValue(org.apache.paimon.types.DataType t } } - /** - * Builds a snapshot-specific table for time travel queries. 
- * - * @param baseTable the base Paimon table to copy configuration from - * @param tableSnapshot the snapshot specification (type + value) - * @return a Table instance configured for the specified time travel query - * @throws UserException if snapshot configuration is invalid - */ - public static Table getTableBySnapshot(Table baseTable, TableSnapshot tableSnapshot) - throws UserException { - final String value = tableSnapshot.getValue(); - final TableSnapshot.VersionType type = tableSnapshot.getType(); - final boolean isDigital = DIGITAL_REGEX.matcher(value).matches(); - - switch (type) { - case TIME: - return isDigital - ? getTableBySnapshotTimestampMillis(baseTable, value) - : getTableBySnapshotTime(baseTable, value); - - case VERSION: - if (isDigital) { - return getTableBySnapshotId(baseTable, value); - } - return getTableByTag(baseTable, value); - - default: - throw new UserException(String.format("Unsupported version type: %s", type)); - } - } - - /** - * Builds a table configured to read from a specific snapshot ID. - * - * @param baseTable the base Paimon table to copy configuration from - * @param snapshotId the snapshot ID as a string - * @return a Table instance configured to read from the specified snapshot ID - */ - private static Table getTableBySnapshotId(Table baseTable, String snapshotId) { - Map options = new HashMap<>( - PAIMON_FROM_SNAPSHOT_CONFLICT_OPTIONS.size() + 3); - - // For Paimon FROM_SNAPSHOT startup mode, must set only one key in: - // [scan_tag_name, scan_watermark, scan_snapshot_id] - options.put(CoreOptions.SCAN_TAG_NAME.key(), null); - options.put(CoreOptions.SCAN_WATERMARK.key(), null); - options.put(CoreOptions.SCAN_SNAPSHOT_ID.key(), snapshotId); - options.putAll(excludePaimonConflictOptions(PAIMON_FROM_SNAPSHOT_CONFLICT_OPTIONS)); - - return baseTable.copy(options); - } - - /** - * Builds a table configured to read from a specific timestamp. 
- * - * @param baseTable the base Paimon table to copy configuration from - * @param timestampStr the timestamp as a string - * @return a Table instance configured to read from the specified timestamp - */ - private static Table getTableBySnapshotTime(Table baseTable, String timestampStr) { - Map options = new HashMap<>( - PAIMON_FROM_TIMESTAMP_CONFLICT_OPTIONS.size() + 3); - - // For Paimon FROM_TIMESTAMP startup mode, must set only one key in: - // [scan_timestamp, scan_timestamp_millis] - options.put(CoreOptions.SCAN_MODE.key(), StartupMode.FROM_TIMESTAMP.toString()); - options.put(CoreOptions.SCAN_TIMESTAMP.key(), timestampStr); - options.put(CoreOptions.SCAN_TIMESTAMP_MILLIS.key(), null); - options.putAll(excludePaimonConflictOptions(PAIMON_FROM_TIMESTAMP_CONFLICT_OPTIONS)); - - return baseTable.copy(options); - } - - /** - * Builds a table configured to read from a specific timestamp in milliseconds. - * - * @param baseTable the base Paimon table to copy configuration from - * @param timestampStr the timestamp in milliseconds as a string - * @return a Table instance configured to read from the specified timestamp - */ - private static Table getTableBySnapshotTimestampMillis(Table baseTable, String timestampStr) { - Map options = new HashMap<>( - PAIMON_FROM_TIMESTAMP_CONFLICT_OPTIONS.size() + 3); - - // For Paimon FROM_TIMESTAMP startup mode, must set only one key in: - // [scan_timestamp, scan_timestamp_millis] - options.put(CoreOptions.SCAN_MODE.key(), StartupMode.FROM_TIMESTAMP.toString()); - options.put(CoreOptions.SCAN_TIMESTAMP.key(), null); - options.put(CoreOptions.SCAN_TIMESTAMP_MILLIS.key(), timestampStr); - options.putAll(excludePaimonConflictOptions(PAIMON_FROM_TIMESTAMP_CONFLICT_OPTIONS)); - - return baseTable.copy(options); - } - /** * Extracts the reference name (branch or tag name) from table scan parameters. 
* @@ -624,7 +511,6 @@ public static String extractBranchOrTagName(TableScanParams scanParams) { } } - /** * Builds a branch-specific table for time travel queries. * @@ -645,32 +531,6 @@ public static Table getTableByBranch(PaimonSource source, Table baseTable, Strin return catalog.getPaimonTable(externalTable.getOrBuildNameMapping(), branchName, null); } - /** - * Builds a tag-specific table for time travel queries. - * - * @param baseTable the base Paimon table to copy configuration from - * @param tagName the tag name - * @return a Table instance configured to read from the specified tag - * @throws UserException if tag does not exist - */ - public static Table getTableByTag(Table baseTable, String tagName) throws UserException { - if (!checkTagsExists(baseTable, tagName)) { - throw new UserException(String.format("Tag '%s' does not exist", tagName)); - } - - Map options = new HashMap<>( - PAIMON_FROM_SNAPSHOT_CONFLICT_OPTIONS.size() + 3); - - // For Paimon FROM_SNAPSHOT startup mode, must set only one key in: - // [scan_tag_name, scan_watermark, scan_snapshot_id] - options.put(CoreOptions.SCAN_TAG_NAME.key(), tagName); - options.put(CoreOptions.SCAN_WATERMARK.key(), null); - options.put(CoreOptions.SCAN_SNAPSHOT_ID.key(), null); - options.putAll(excludePaimonConflictOptions(PAIMON_FROM_SNAPSHOT_CONFLICT_OPTIONS)); - - return baseTable.copy(options); - } - // get snapshot info from query like 'for version/time as of' or '@tag' public static Snapshot getPaimonSnapshot(Table table, Optional querySnapshot, Optional scanParams) throws UserException { @@ -718,7 +578,7 @@ private static Snapshot getPaimonSnapshotByTimestamp(DataTable table, String tim } Snapshot snapshot = table.snapshotManager().earlierOrEqualTimeMills(timestampMillis); if (snapshot == null) { - throw new UserException("can't find snapshot older than : " + timestamp); + throw new UserException("can't find snapshot older than : " + timestampMillis); } return snapshot; } @@ -740,36 +600,6 @@ private 
static Snapshot getPaimonSnapshotByTag(DataTable table, String tagName) return tag.orElseThrow(() -> new UserException("can't find snapshot by tag: " + tagName)); } - /** - * Creates a map of conflicting Paimon options with null values for exclusion. - * - * @param illegalOptions the list of ConfigOptions that should be set to null - * @return a HashMap containing the illegal options as keys with null values - */ - public static Map excludePaimonConflictOptions(List> illegalOptions) { - return illegalOptions.stream() - .collect(HashMap::new, - (m, option) -> m.put(option.key(), null), - HashMap::putAll); - } - - /** - * Checks if a tag exists in the given table. - * - * @param baseTable the Paimon table - * @param tagName the tag name to check - * @return true if tag exists, false otherwise - * @throws UserException if table is not a FileStoreTable - */ - public static boolean checkTagsExists(Table baseTable, String tagName) throws UserException { - if (!(baseTable instanceof FileStoreTable)) { - throw new UserException("Table type should be FileStoreTable but got: " + baseTable.getClass().getName()); - } - - final FileStoreTable fileStoreTable = (FileStoreTable) baseTable; - return fileStoreTable.tagManager().tagExists(tagName); - } - /** * Checks if a branch exists in the given table. * diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java index 399eb44ac17eb0..9111e53d3b7009 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java @@ -671,15 +671,6 @@ public static Map validateIncrementalReadParams(Map - * This method handles different scan modes including incremental reads and system tables, - * applying the necessary transformations to the base Paimon table. 
- * - * @return processed Paimon table object configured according to scan parameters - * @throws UserException when system table configuration is incorrect - */ private Table getProcessedTable() throws UserException { Table baseTable = source.getPaimonTable(); if (getScanParams() != null && getQueryTableSnapshot() != null) { From e163134dcb44c822601f91306fb3a73aad4ac14d Mon Sep 17 00:00:00 2001 From: vinlee19 Date: Wed, 10 Sep 2025 12:20:58 +0800 Subject: [PATCH 06/16] debug options --- .../apache/doris/datasource/paimon/source/PaimonScanNode.java | 1 + 1 file changed, 1 insertion(+) diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java index 9111e53d3b7009..bf376220fc5f0a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java @@ -673,6 +673,7 @@ public static Map validateIncrementalReadParams(Map options = baseTable.options(); if (getScanParams() != null && getQueryTableSnapshot() != null) { throw new UserException("Can not specify scan params and table snapshot at same time."); } From f8c98a46a649dfb29aed9eda483992f03e319146 Mon Sep 17 00:00:00 2001 From: vinlee19 Date: Wed, 10 Sep 2025 12:31:20 +0800 Subject: [PATCH 07/16] debug options --- .../apache/doris/datasource/paimon/source/PaimonScanNode.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java index bf376220fc5f0a..7f41b7730a1b9d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java @@ -674,6 +674,10 @@ public static Map 
validateIncrementalReadParams(Map options = baseTable.options(); + boolean b = options.containsKey(PAIMON_SCAN_SNAPSHOT_ID); + if (b) { + LOG.info("debeg snapshot id" + options.get(PAIMON_SCAN_SNAPSHOT_ID)); + } if (getScanParams() != null && getQueryTableSnapshot() != null) { throw new UserException("Can not specify scan params and table snapshot at same time."); } From c3b9a553e2010984ecc9841b3a3450410eec8ca1 Mon Sep 17 00:00:00 2001 From: vinlee19 Date: Thu, 11 Sep 2025 00:01:28 +0800 Subject: [PATCH 08/16] optimize code --- .../apache/doris/datasource/paimon/PaimonExternalTable.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java index 013aab4f479177..1e5ba50bed6396 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java @@ -108,8 +108,10 @@ private PaimonSnapshotCacheValue getPaimonSnapshotCacheValue(Optional Date: Thu, 11 Sep 2025 10:20:01 +0800 Subject: [PATCH 09/16] optimize code --- .../java/org/apache/doris/datasource/paimon/PaimonUtil.java | 2 +- .../doris/datasource/paimon/source/PaimonScanNode.java | 5 ----- 2 files changed, 1 insertion(+), 6 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonUtil.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonUtil.java index 56e7f54da347a9..83cf9588502ca4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonUtil.java @@ -578,7 +578,7 @@ private static Snapshot getPaimonSnapshotByTimestamp(DataTable table, String tim } Snapshot snapshot = table.snapshotManager().earlierOrEqualTimeMills(timestampMillis); if (snapshot == null) { 
- throw new UserException("can't find snapshot older than : " + timestampMillis); + throw new UserException("can't find snapshot earlier or equal timestamp millis : " + timestampMillis); } return snapshot; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java index 7f41b7730a1b9d..9111e53d3b7009 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java @@ -673,11 +673,6 @@ public static Map validateIncrementalReadParams(Map options = baseTable.options(); - boolean b = options.containsKey(PAIMON_SCAN_SNAPSHOT_ID); - if (b) { - LOG.info("debeg snapshot id" + options.get(PAIMON_SCAN_SNAPSHOT_ID)); - } if (getScanParams() != null && getQueryTableSnapshot() != null) { throw new UserException("Can not specify scan params and table snapshot at same time."); } From f68c583ff3d2b2123d4819ce056ae8888c9b033a Mon Sep 17 00:00:00 2001 From: vinlee19 Date: Thu, 11 Sep 2025 11:44:38 +0800 Subject: [PATCH 10/16] optimize code --- .../org/apache/doris/datasource/paimon/PaimonUtil.java | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonUtil.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonUtil.java index 83cf9588502ca4..24fe46ccd2ff35 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonUtil.java @@ -578,7 +578,15 @@ private static Snapshot getPaimonSnapshotByTimestamp(DataTable table, String tim } Snapshot snapshot = table.snapshotManager().earlierOrEqualTimeMills(timestampMillis); if (snapshot == null) { - throw new UserException("can't find snapshot earlier or equal timestamp millis : " + 
timestampMillis); + Snapshot earliestSnapshot = table.snapshotManager().earliestSnapshot(); + throw new UserException( + String.format( + "There is currently no snapshot earlier than or equal to timestamp [%s], " + + "the earliest snapshot's timestamp is [%s]", + timestampMillis, + earliestSnapshot == null + ? "null" + : String.valueOf(earliestSnapshot.timeMillis()))); } return snapshot; } From 6cc5c25eb4b1d0bd6aa47cff1c64b951a0123ada Mon Sep 17 00:00:00 2001 From: vinlee19 Date: Fri, 12 Sep 2025 22:26:56 +0800 Subject: [PATCH 11/16] branch independent schema --- .../paimon/PaimonExternalTable.java | 21 ++++++++++ .../doris/datasource/paimon/PaimonUtil.java | 39 ++++--------------- .../paimon/source/PaimonScanNode.java | 15 +++---- 3 files changed, 33 insertions(+), 42 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java index 1e5ba50bed6396..633d9c5d53929e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java @@ -118,6 +118,27 @@ private PaimonSnapshotCacheValue getPaimonSnapshotCacheValue(Optional latestSnapshot = table.latestSnapshot(); + long latestSnapshotId = PaimonSnapshot.INVALID_SNAPSHOT_ID; + if (latestSnapshot.isPresent()) { + latestSnapshotId = latestSnapshot.get().id(); + } + // Branches in Paimon can have independent schemas and snapshots. + // TODO: Add time travel support for paimon branch tables. 
+ DataTable dataTable = (DataTable) table; + Long schemaId = dataTable.schemaManager().latest().map(TableSchema::id).orElse(0L); + return new PaimonSnapshotCacheValue(PaimonPartitionInfo.EMPTY, + new PaimonSnapshot(latestSnapshotId, schemaId, dataTable)); + } catch (Exception e) { + LOG.warn("Failed to get Paimon branch for table {}", paimonTable.name(), e); + throw new RuntimeException( + "Failed to get Paimon branch: " + (e.getMessage() == null ? "unknown cause" : e.getMessage()), + e); + } } else { // Otherwise, use the latest snapshot and the latest schema. return Env.getCurrentEnv().getExtMetaCacheMgr().getPaimonMetadataCache() diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonUtil.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonUtil.java index 24fe46ccd2ff35..6cfb7c9bcba99c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonUtil.java @@ -29,9 +29,7 @@ import org.apache.doris.common.AnalysisException; import org.apache.doris.common.UserException; import org.apache.doris.common.util.TimeUtils; -import org.apache.doris.datasource.ExternalTable; import org.apache.doris.datasource.hive.HiveUtil; -import org.apache.doris.datasource.paimon.source.PaimonSource; import org.apache.doris.thrift.TColumnType; import org.apache.doris.thrift.TPrimitiveType; import org.apache.doris.thrift.schema.external.TArrayField; @@ -511,26 +509,6 @@ public static String extractBranchOrTagName(TableScanParams scanParams) { } } - /** - * Builds a branch-specific table for time travel queries. 
- * - * @param source the Paimon source containing catalog and table information - * @param baseTable the base Paimon table - * @param branchName the branch name - * @return a Table instance configured to read from the specified branch - * @throws UserException if branch does not exist - */ - public static Table getTableByBranch(PaimonSource source, Table baseTable, String branchName) throws UserException { - - if (!checkBranchExists(baseTable, branchName)) { - throw new UserException(String.format("Branch '%s' does not exist", branchName)); - } - - PaimonExternalCatalog catalog = (PaimonExternalCatalog) source.getCatalog(); - ExternalTable externalTable = (ExternalTable) source.getTargetTable(); - return catalog.getPaimonTable(externalTable.getOrBuildNameMapping(), branchName, null); - } - // get snapshot info from query like 'for version/time as of' or '@tag' public static Snapshot getPaimonSnapshot(Table table, Optional querySnapshot, Optional scanParams) throws UserException { @@ -608,20 +586,17 @@ private static Snapshot getPaimonSnapshotByTag(DataTable table, String tagName) return tag.orElseThrow(() -> new UserException("can't find snapshot by tag: " + tagName)); } - /** - * Checks if a branch exists in the given table. 
- * - * @param baseTable the Paimon table - * @param branchName the branch name to check - * @return true if branch exists, false otherwise - * @throws UserException if table is not a FileStoreTable - */ - public static boolean checkBranchExists(Table baseTable, String branchName) throws UserException { + public static String getPaimonBranch(TableScanParams tableScanParams, Table baseTable) + throws UserException { + String branchName = extractBranchOrTagName(tableScanParams); if (!(baseTable instanceof FileStoreTable)) { throw new UserException("Table type should be FileStoreTable but got: " + baseTable.getClass().getName()); } final FileStoreTable fileStoreTable = (FileStoreTable) baseTable; - return fileStoreTable.branchManager().branchExists(branchName); + if (!fileStoreTable.branchManager().branchExists(branchName)) { + throw new UserException("can't find branch: " + branchName); + } + return branchName; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java index 9111e53d3b7009..d8e572933806a7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java @@ -465,7 +465,7 @@ public String getNodeExplainString(String prefix, TExplainLevel detailLevel) { sb.append(prefix).append("PaimonSplitStats: \n"); int size = splitStats.size(); if (size <= 4) { - for (SplitStat splitStat : splitStats) { + for (PaimonScanNode.SplitStat splitStat : splitStats) { sb.append(String.format("%s %s\n", prefix, splitStat)); } } else { @@ -673,18 +673,13 @@ public static Map validateIncrementalReadParams(Map Date: Sat, 13 Sep 2025 11:37:47 +0800 Subject: [PATCH 12/16] optimize code --- .../doris/datasource/paimon/PaimonExternalTable.java | 7 ++++--- .../org/apache/doris/datasource/paimon/PaimonUtil.java | 2 +- 2 files 
changed, 5 insertions(+), 4 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java index 633d9c5d53929e..b025c5b18bdfd0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java @@ -102,6 +102,8 @@ public Table getPaimonTable(Optional snapshot) { private PaimonSnapshotCacheValue getPaimonSnapshotCacheValue(Optional tableSnapshot, Optional scanParams) { makeSureInitialized(); + + // Current limitation: cannot specify both table snapshot and scan parameters simultaneously. if (tableSnapshot.isPresent() || (scanParams.isPresent() && scanParams.get().isTag())) { // If a snapshot is specified, // use the specified snapshot and the corresponding schema(not the latest @@ -120,7 +122,7 @@ private PaimonSnapshotCacheValue getPaimonSnapshotCacheValue(Optional latestSnapshot = table.latestSnapshot(); long latestSnapshotId = PaimonSnapshot.INVALID_SNAPSHOT_ID; @@ -136,8 +138,7 @@ private PaimonSnapshotCacheValue getPaimonSnapshotCacheValue(Optional new UserException("can't find snapshot by tag: " + tagName)); } - public static String getPaimonBranch(TableScanParams tableScanParams, Table baseTable) + public static String resolvePaimonBranch(TableScanParams tableScanParams, Table baseTable) throws UserException { String branchName = extractBranchOrTagName(tableScanParams); if (!(baseTable instanceof FileStoreTable)) { From 77fa278ab01fef8b716f64e73ede00450046946c Mon Sep 17 00:00:00 2001 From: vinlee19 Date: Sat, 13 Sep 2025 12:16:58 +0800 Subject: [PATCH 13/16] add support more timestamp format --- .../java/org/apache/doris/datasource/paimon/PaimonUtil.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonUtil.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonUtil.java index 7ba29f5b8e9ccb..16a9c5c5b43f6b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonUtil.java @@ -549,7 +549,9 @@ private static Snapshot getPaimonSnapshotByTimestamp(DataTable table, String tim if (isDigital) { timestampMillis = Long.parseLong(timestamp); } else { - timestampMillis = TimeUtils.msTimeStringToLong(timestamp, TimeUtils.getTimeZone()); + // Supported formats include: yyyy-MM-dd, yyyy-MM-dd HH:mm:ss, yyyy-MM-dd HH:mm:ss.SSS. + // The string is interpreted in the session time zone (TimeUtils.getTimeZone()). + timestampMillis = DateTimeUtils.parseTimestampData(timestamp, 3, TimeUtils.getTimeZone()).getMillisecond(); if (timestampMillis < 0) { throw new DateTimeException("can't parse time: " + timestamp); } From 0b386e981907f92d4d0de7bd6d86244dabe9116a Mon Sep 17 00:00:00 2001 From: vinlee19 Date: Sat, 13 Sep 2025 13:12:20 +0800 Subject: [PATCH 14/16] optimize code --- .../apache/doris/datasource/paimon/PaimonExternalTable.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java index b025c5b18bdfd0..3fba39810a242c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java @@ -138,7 +138,8 @@ private PaimonSnapshotCacheValue getPaimonSnapshotCacheValue(Optional Date: Sat, 13 Sep 2025 14:33:13 +0800 Subject: [PATCH 15/16] add regression test --- .../paimon/run09.sql | 24 +++- .../paimon/paimon_time_travel.out | 98 +++++++++++++++ .../paimon/paimon_time_travel.groovy | 119 +++++++++++++++--- 3 files changed, 220 insertions(+), 21 deletions(-) diff --git
a/docker/thirdparties/docker-compose/iceberg/scripts/create_preinstalled_scripts/paimon/run09.sql b/docker/thirdparties/docker-compose/iceberg/scripts/create_preinstalled_scripts/paimon/run09.sql index 058bbfd7e191b6..87b290e1a551ea 100644 --- a/docker/thirdparties/docker-compose/iceberg/scripts/create_preinstalled_scripts/paimon/run09.sql +++ b/docker/thirdparties/docker-compose/iceberg/scripts/create_preinstalled_scripts/paimon/run09.sql @@ -85,4 +85,26 @@ INSERT INTO test_paimon_time_travel_db.`tbl_time_travel$branch_b_2` VALUES (20007, 8007, '2024-02-07', 378.45, '465 Lavender Avenue, Thorndale, WY 82201', 'PROCESSING', false), (20008, 8008, '2024-02-08', 92.30, '729 Iris Lane, Riverside, MN 55987', 'SHIPPED', true), (20009, 8009, '2024-02-09', 445.80, '856 Tulip Boulevard, Sunnydale, ND 58301', 'PENDING', false), -(20010, 8010, '2024-02-10', 167.25, '392 Daisy Court, Meadowbrook, SD 57401', 'CANCELLED', true); \ No newline at end of file +(20010, 8010, '2024-02-10', 167.25, '392 Daisy Court, Meadowbrook, SD 57401', 'CANCELLED', true); + + +-- time travel schema change +ALTER TABLE test_paimon_time_travel_db.tbl_time_travel ADD COLUMNS ( + new_col1 INT +); + +-- - snapshot 5 +INSERT INTO test_paimon_time_travel_db.tbl_time_travel VALUES +(6001, 9001, '2024-02-11', 456.80, '123 New Street, Downtown, CA 90210', 'COMPLETED', true, 100), +(6002, 9002, '2024-02-12', 289.45, '456 Updated Ave, Midtown, NY 10001', 'PROCESSING', false, 200), +(6003, 9003, '2024-02-13', 378.90, '789 Modern Blvd, Uptown, TX 75201', 'SHIPPED', true, 300); + +CALL sys.create_tag(table => 'test_paimon_time_travel_db.tbl_time_travel', tag => 't_5', snapshot => 5); + +-- - snapshot 6 +INSERT INTO test_paimon_time_travel_db.tbl_time_travel VALUES +(6004, 9004, '2024-02-14', 199.99, '321 Future Lane, Innovation, WA 98001', 'PENDING', true, 400), +(6005, 9005, '2024-02-15', 567.25, '654 Progress Drive, Tech City, OR 97201', 'COMPLETED', false, 500), +(6006, 9006, '2024-02-16', 123.75, '987
Advanced Court, Silicon Valley, CA 94301', 'CANCELLED', true, 600); + +CALL sys.create_tag(table => 'test_paimon_time_travel_db.tbl_time_travel', tag => 't_6', snapshot => 6); diff --git a/regression-test/data/external_table_p0/paimon/paimon_time_travel.out b/regression-test/data/external_table_p0/paimon/paimon_time_travel.out index 3527cc4604e6f3..93a7c42e165ba7 100644 --- a/regression-test/data/external_table_p0/paimon/paimon_time_travel.out +++ b/regression-test/data/external_table_p0/paimon/paimon_time_travel.out @@ -1001,3 +1001,101 @@ true 7 false 5 true 7 +-- !schema_change_snapshot_5_version_count -- +15 + +-- !schema_change_snapshot_5_version -- +1001 2001 2024-01-15 299.99 123 Maple Street, Springfield, IL 62701 COMPLETED true \N +1002 2002 2024-01-16 156.50 456 Oak Avenue, Riverside, CA 92507 PROCESSING false \N +1003 2003 2024-01-17 89.00 789 Pine Boulevard, Greenfield, TX 75001 SHIPPED true \N +2001 3001 2024-01-18 445.75 321 Cedar Lane, Millbrook, NY 12545 PENDING false \N +2002 3002 2024-01-19 67.25 654 Birch Drive, Lakewood, CO 80226 COMPLETED true \N +2003 3003 2024-01-20 188.90 987 Elm Court, Fairview, OR 97024 CANCELLED false \N +3001 4001 2024-01-21 325.40 159 Willow Street, Brookdale, FL 33602 SHIPPED true \N +3002 4002 2024-01-22 99.85 753 Aspen Road, Clearwater, WA 98012 PROCESSING true \N +3003 4003 2024-01-23 512.30 264 Chestnut Avenue, Westfield, MI 48097 COMPLETED false \N +5001 6001 2024-01-24 278.60 842 Hickory Lane, Stonewood, GA 30309 PENDING true \N +5002 6002 2024-01-25 134.75 417 Poplar Street, Ridgefield, NV 89109 SHIPPED false \N +5003 6003 2024-01-26 389.20 695 Sycamore Drive, Maplewood, AZ 85001 COMPLETED true \N +6001 9001 2024-02-11 456.80 123 New Street, Downtown, CA 90210 COMPLETED true 100 +6002 9002 2024-02-12 289.45 456 Updated Ave, Midtown, NY 10001 PROCESSING false 200 +6003 9003 2024-02-13 378.90 789 Modern Blvd, Uptown, TX 75201 SHIPPED true 300 + +-- !schema_change_snapshot_6_version_count -- +18 + +-- 
!schema_change_snapshot_6_version -- +1001 2001 2024-01-15 299.99 123 Maple Street, Springfield, IL 62701 COMPLETED true \N +1002 2002 2024-01-16 156.50 456 Oak Avenue, Riverside, CA 92507 PROCESSING false \N +1003 2003 2024-01-17 89.00 789 Pine Boulevard, Greenfield, TX 75001 SHIPPED true \N +2001 3001 2024-01-18 445.75 321 Cedar Lane, Millbrook, NY 12545 PENDING false \N +2002 3002 2024-01-19 67.25 654 Birch Drive, Lakewood, CO 80226 COMPLETED true \N +2003 3003 2024-01-20 188.90 987 Elm Court, Fairview, OR 97024 CANCELLED false \N +3001 4001 2024-01-21 325.40 159 Willow Street, Brookdale, FL 33602 SHIPPED true \N +3002 4002 2024-01-22 99.85 753 Aspen Road, Clearwater, WA 98012 PROCESSING true \N +3003 4003 2024-01-23 512.30 264 Chestnut Avenue, Westfield, MI 48097 COMPLETED false \N +5001 6001 2024-01-24 278.60 842 Hickory Lane, Stonewood, GA 30309 PENDING true \N +5002 6002 2024-01-25 134.75 417 Poplar Street, Ridgefield, NV 89109 SHIPPED false \N +5003 6003 2024-01-26 389.20 695 Sycamore Drive, Maplewood, AZ 85001 COMPLETED true \N +6001 9001 2024-02-11 456.80 123 New Street, Downtown, CA 90210 COMPLETED true 100 +6002 9002 2024-02-12 289.45 456 Updated Ave, Midtown, NY 10001 PROCESSING false 200 +6003 9003 2024-02-13 378.90 789 Modern Blvd, Uptown, TX 75201 SHIPPED true 300 +6004 9004 2024-02-14 199.99 321 Future Lane, Innovation, WA 98001 PENDING true 400 +6005 9005 2024-02-15 567.25 654 Progress Drive, Tech City, OR 97201 COMPLETED false 500 +6006 9006 2024-02-16 123.75 987 Advanced Court, Silicon Valley, CA 94301 CANCELLED true 600 + +-- !time_zone_time_travel_basic -- +1001 2001 2024-01-15 299.99 123 Maple Street, Springfield, IL 62701 COMPLETED true +1002 2002 2024-01-16 156.50 456 Oak Avenue, Riverside, CA 92507 PROCESSING false +1003 2003 2024-01-17 89.00 789 Pine Boulevard, Greenfield, TX 75001 SHIPPED true + +-- !time_zone_time_travel_plus08_jni_true -- +1001 2001 2024-01-15 299.99 123 Maple Street, Springfield, IL 62701 COMPLETED true +1002 2002 
2024-01-16 156.50 456 Oak Avenue, Riverside, CA 92507 PROCESSING false +1003 2003 2024-01-17 89.00 789 Pine Boulevard, Greenfield, TX 75001 SHIPPED true + +-- !time_zone_time_travel_plus08_jni_false -- +1001 2001 2024-01-15 299.99 123 Maple Street, Springfield, IL 62701 COMPLETED true +1002 2002 2024-01-16 156.50 456 Oak Avenue, Riverside, CA 92507 PROCESSING false +1003 2003 2024-01-17 89.00 789 Pine Boulevard, Greenfield, TX 75001 SHIPPED true + +-- !time_zone_time_travel_plus06_jni_true -- +1001 2001 2024-01-15 299.99 123 Maple Street, Springfield, IL 62701 COMPLETED true \N +1002 2002 2024-01-16 156.50 456 Oak Avenue, Riverside, CA 92507 PROCESSING false \N +1003 2003 2024-01-17 89.00 789 Pine Boulevard, Greenfield, TX 75001 SHIPPED true \N +2001 3001 2024-01-18 445.75 321 Cedar Lane, Millbrook, NY 12545 PENDING false \N +2002 3002 2024-01-19 67.25 654 Birch Drive, Lakewood, CO 80226 COMPLETED true \N +2003 3003 2024-01-20 188.90 987 Elm Court, Fairview, OR 97024 CANCELLED false \N +3001 4001 2024-01-21 325.40 159 Willow Street, Brookdale, FL 33602 SHIPPED true \N +3002 4002 2024-01-22 99.85 753 Aspen Road, Clearwater, WA 98012 PROCESSING true \N +3003 4003 2024-01-23 512.30 264 Chestnut Avenue, Westfield, MI 48097 COMPLETED false \N +5001 6001 2024-01-24 278.60 842 Hickory Lane, Stonewood, GA 30309 PENDING true \N +5002 6002 2024-01-25 134.75 417 Poplar Street, Ridgefield, NV 89109 SHIPPED false \N +5003 6003 2024-01-26 389.20 695 Sycamore Drive, Maplewood, AZ 85001 COMPLETED true \N +6001 9001 2024-02-11 456.80 123 New Street, Downtown, CA 90210 COMPLETED true 100 +6002 9002 2024-02-12 289.45 456 Updated Ave, Midtown, NY 10001 PROCESSING false 200 +6003 9003 2024-02-13 378.90 789 Modern Blvd, Uptown, TX 75201 SHIPPED true 300 +6004 9004 2024-02-14 199.99 321 Future Lane, Innovation, WA 98001 PENDING true 400 +6005 9005 2024-02-15 567.25 654 Progress Drive, Tech City, OR 97201 COMPLETED false 500 +6006 9006 2024-02-16 123.75 987 Advanced Court, Silicon Valley, 
CA 94301 CANCELLED true 600 + +-- !time_zone_time_travel_plus06_jni_false -- +1001 2001 2024-01-15 299.99 123 Maple Street, Springfield, IL 62701 COMPLETED true \N +1002 2002 2024-01-16 156.50 456 Oak Avenue, Riverside, CA 92507 PROCESSING false \N +1003 2003 2024-01-17 89.00 789 Pine Boulevard, Greenfield, TX 75001 SHIPPED true \N +2001 3001 2024-01-18 445.75 321 Cedar Lane, Millbrook, NY 12545 PENDING false \N +2002 3002 2024-01-19 67.25 654 Birch Drive, Lakewood, CO 80226 COMPLETED true \N +2003 3003 2024-01-20 188.90 987 Elm Court, Fairview, OR 97024 CANCELLED false \N +3001 4001 2024-01-21 325.40 159 Willow Street, Brookdale, FL 33602 SHIPPED true \N +3002 4002 2024-01-22 99.85 753 Aspen Road, Clearwater, WA 98012 PROCESSING true \N +3003 4003 2024-01-23 512.30 264 Chestnut Avenue, Westfield, MI 48097 COMPLETED false \N +5001 6001 2024-01-24 278.60 842 Hickory Lane, Stonewood, GA 30309 PENDING true \N +5002 6002 2024-01-25 134.75 417 Poplar Street, Ridgefield, NV 89109 SHIPPED false \N +5003 6003 2024-01-26 389.20 695 Sycamore Drive, Maplewood, AZ 85001 COMPLETED true \N +6001 9001 2024-02-11 456.80 123 New Street, Downtown, CA 90210 COMPLETED true 100 +6002 9002 2024-02-12 289.45 456 Updated Ave, Midtown, NY 10001 PROCESSING false 200 +6003 9003 2024-02-13 378.90 789 Modern Blvd, Uptown, TX 75201 SHIPPED true 300 +6004 9004 2024-02-14 199.99 321 Future Lane, Innovation, WA 98001 PENDING true 400 +6005 9005 2024-02-15 567.25 654 Progress Drive, Tech City, OR 97201 COMPLETED false 500 +6006 9006 2024-02-16 123.75 987 Advanced Court, Silicon Valley, CA 94301 CANCELLED true 600 + diff --git a/regression-test/suites/external_table_p0/paimon/paimon_time_travel.groovy b/regression-test/suites/external_table_p0/paimon/paimon_time_travel.groovy index 99a4e94a4c6025..96e6551aa884dc 100644 --- a/regression-test/suites/external_table_p0/paimon/paimon_time_travel.groovy +++ b/regression-test/suites/external_table_p0/paimon/paimon_time_travel.groovy @@ -16,6 +16,8 @@ // 
under the License. import java.time.format.DateTimeFormatter +import java.time.format.DateTimeFormatterBuilder +import java.time.temporal.ChronoField import java.time.LocalDateTime import java.time.ZoneId @@ -29,12 +31,23 @@ suite("paimon_time_travel", "p0,external,doris,external_docker,external_docker_d return } // Create date time formatter - + DateTimeFormatter unifiedFormatter = new DateTimeFormatterBuilder() + .appendPattern("yyyy-MM-dd") + .optionalStart() + .appendLiteral('T') + .optionalEnd() + .optionalStart() + .appendLiteral(' ') + .optionalEnd() + .appendPattern("HH:mm:ss") + .optionalStart() + .appendFraction(ChronoField.MILLI_OF_SECOND, 0, 3, true) + .optionalEnd() + .toFormatter() String minio_port = context.config.otherConfigs.get("iceberg_minio_port") String catalog_name = "test_paimon_time_travel_catalog" String externalEnvIp = context.config.otherConfigs.get("externalEnvIp") - DateTimeFormatter iso_formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS") - DateTimeFormatter standard_formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS") + DateTimeFormatter outputFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS") String db_name = "test_paimon_time_travel_db" String tableName = "tbl_time_travel" try { @@ -54,8 +67,8 @@ suite("paimon_time_travel", "p0,external,doris,external_docker,external_docker_d sql """switch `${catalog_name}`""" logger.info("switched to catalog " + catalog_name) sql """use ${db_name}""" - //system table snapshots to get create time. - List> snapshotRes = sql """ select snapshot_id,commit_time from ${tableName}\$snapshots order by snapshot_id;""" + // Query system table snapshots to get creation time. 
Get the first four snapshots before schema change (snapshot IDs 1-4) + List> snapshotRes = sql """ select snapshot_id,commit_time from ${tableName}\$snapshots order by snapshot_id limit 4;""" logger.info("Query result from ${tableName}\$snapshots: ${snapshotRes}") assertTrue(snapshotRes.size()==4) assertTrue(snapshotRes[0].size()==2) @@ -68,14 +81,8 @@ suite("paimon_time_travel", "p0,external,doris,external_docker,external_docker_d logger.info("Processing snapshot ${index + 1}: ID=${snapshotId}, commit_time=${commitTime}") try { - LocalDateTime dateTime; - if (commitTime.contains("T")){ - dateTime = LocalDateTime.parse(commitTime, iso_formatter) - }else { - dateTime = LocalDateTime.parse(commitTime, standard_formatter) - } - - String snapshotTime = dateTime.atZone(ZoneId.systemDefault()).format(standard_formatter); + LocalDateTime dateTime = LocalDateTime.parse(commitTime, unifiedFormatter) + String snapshotTime = dateTime.atZone(ZoneId.systemDefault()).format(outputFormatter); long timestamp = dateTime.atZone(ZoneId.systemDefault()) .toInstant() .toEpochMilli() @@ -191,8 +198,8 @@ suite("paimon_time_travel", "p0,external,doris,external_docker,external_docker_d } } - - List> tagsResult = sql """ select snapshot_id,tag_name from ${tableName}\$tags order by snapshot_id;""" + // Get the previous 4 snapshot IDs and their corresponding tags + List> tagsResult = sql """ select snapshot_id,tag_name from ${tableName}\$tags order by snapshot_id limit 4;""" logger.info("Query result from ${tableName}\$tags: ${tagsResult}") assertTrue(tagsResult.size()==4) assertTrue(tagsResult[0].size()==2) @@ -239,13 +246,87 @@ suite("paimon_time_travel", "p0,external,doris,external_docker,external_docker_d } } + /** + * Test time travel queries on snapshots created after schema changes. + * + * Background: run09.sql adds a new column, creating a schema evolution + * at snapshot ID 4. 
This test verifies that time travel works correctly + * with snapshots (ID > 4) that use the updated schema. + */ + List> snapshotSchemaChangeAfterRes = sql """ select snapshot_id,commit_time from ${tableName}\$snapshots where snapshot_id > 4 order by snapshot_id limit 2;""" + logger.info("Query result from ${tableName}\$snapshots after schema change: ${snapshotSchemaChangeAfterRes}") + + snapshotSchemaChangeAfterRes.eachWithIndex { snapshotRow, index -> + int snapshotId = snapshotRow[0] as int + try { + String baseQueryName = "qt_schema_change_snapshot_${snapshotId}" + + // Time travel by snapshot ID after schema change + "${baseQueryName}_version_count" """select count(*) from ${tableName} FOR VERSION AS OF ${snapshotId} ;""" + "${baseQueryName}_version" """select * from ${tableName} FOR VERSION AS OF ${snapshotId} order by order_id;""" + logger.info("Completed schema change queries for snapshot ${snapshotId}") + + } catch (Exception e) { + logger.error("Failed to process schema change snapshot ${snapshotId}: ${e.message}") + throw e + } + } + + + // Test time zone behavior with time travel queries + List> timeTravelZone = sql """ select snapshot_id,commit_time from ${tableName}\$snapshots order by snapshot_id limit 1;""" + logger.info("Query result from ${tableName}\$snapshots: ${timeTravelZone}") + + String commitTime = timeTravelZone[0][1] as String + + LocalDateTime dateTime = LocalDateTime.parse(commitTime, unifiedFormatter) + String snapshotTime = dateTime.atZone(ZoneId.systemDefault()).format(outputFormatter) + + try { + // Basic time string query + qt_time_zone_time_travel_basic """select * from ${tableName} FOR TIME AS OF \"${snapshotTime}\" order by order_id""" + + // Test with +08:00 timezone + sql """set force_jni_scanner=true; set time_zone='+08:00';""" + qt_time_zone_time_travel_plus08_jni_true """select * from ${tableName} FOR TIME AS OF \"${snapshotTime}\" order by order_id""" + + sql """set force_jni_scanner=false;""" + 
qt_time_zone_time_travel_plus08_jni_false """select * from ${tableName} FOR TIME AS OF \"${snapshotTime}\" order by order_id""" + + // Test with +06:00 timezone + sql """set force_jni_scanner=true; set time_zone='+06:00';""" + qt_time_zone_time_travel_plus06_jni_true """select * from ${tableName} FOR TIME AS OF \"${snapshotTime}\" order by order_id""" + + sql """set force_jni_scanner=false;""" + qt_time_zone_time_travel_plus06_jni_false """select * from ${tableName} FOR TIME AS OF \"${snapshotTime}\" order by order_id""" + + // Test with +10:00 timezone - these should throw exceptions + sql """set force_jni_scanner=true; set time_zone='+10:00';""" + test { + sql """select * from ${tableName} FOR TIME AS OF \"${snapshotTime}\" order by order_id""" + exception ("There is currently no snapshot earlier than or equal to timestamp") + } + + sql """set force_jni_scanner=false;""" + test { + sql """select * from ${tableName} FOR TIME AS OF \"${snapshotTime}\" order by order_id""" + exception ("There is currently no snapshot earlier than or equal to timestamp") + } + + } finally { + sql """ unset variable time_zone; """ + sql """ set force_jni_scanner = false; """ + } + + + // Error handling tests test { sql """ select * from ${tableName}@branch('name'='not_exists_branch'); """ - exception "Branch 'not_exists_branch' does not exist" + exception "can't find branch: not_exists_branch" } test { sql """ select * from ${tableName}@branch(not_exists_branch); """ - exception "Branch 'not_exists_branch' does not exist" + exception "can't find branch: not_exists_branch" } test { sql """ select * from ${tableName}@tag('name'='not_exists_tag'); """ @@ -278,6 +359,4 @@ suite("paimon_time_travel", "p0,external,doris,external_docker,external_docker_d } finally { // sql """drop catalog if exists ${catalog_name}""" } -} - - +} \ No newline at end of file From b862ab6d43defa4505af35cee1fd8ca9a5280e93 Mon Sep 17 00:00:00 2001 From: vinlee19 Date: Sat, 13 Sep 2025 14:37:49 +0800 Subject: 
[PATCH 16/16] revert code --- .../apache/doris/datasource/paimon/source/PaimonScanNode.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java index d8e572933806a7..c895c264c0dfc5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java @@ -465,7 +465,7 @@ public String getNodeExplainString(String prefix, TExplainLevel detailLevel) { sb.append(prefix).append("PaimonSplitStats: \n"); int size = splitStats.size(); if (size <= 4) { - for (PaimonScanNode.SplitStat splitStat : splitStats) { + for (SplitStat splitStat : splitStats) { sb.append(String.format("%s %s\n", prefix, splitStat)); } } else {