diff --git a/docker/thirdparties/docker-compose/iceberg/scripts/create_preinstalled_scripts/paimon/run01.sql b/docker/thirdparties/docker-compose/iceberg/scripts/create_preinstalled_scripts/paimon/run01.sql index 11532b8b66c16b..1f1c750e841dda 100644 --- a/docker/thirdparties/docker-compose/iceberg/scripts/create_preinstalled_scripts/paimon/run01.sql +++ b/docker/thirdparties/docker-compose/iceberg/scripts/create_preinstalled_scripts/paimon/run01.sql @@ -76,4 +76,12 @@ CREATE TABLE date_partition ( 'file.format'='orc' ); -insert into date_partition values(1,date '2020-01-01'); \ No newline at end of file +insert into date_partition values(1,date '2020-01-01'); + +drop table if exists test_schema_change; +create table test_schema_change; +alter table test_schema_change add column id int; +insert into test_schema_change values(1); +CALL sys.create_tag(table => 'test_schema_change', tag => 'tag1', snapshot => 1); +CALL sys.create_branch('test_schema_change', 'branch1', 'tag1'); +alter table test_schema_change add column name string; \ No newline at end of file diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/TimeUtils.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/TimeUtils.java index 8d019be68b75fc..d4a5c51a50eef9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/TimeUtils.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/TimeUtils.java @@ -292,6 +292,18 @@ public static long timeStringToLong(String timeStr, TimeZone timeZone) { return d.atZone(timeZone.toZoneId()).toInstant().toEpochMilli(); } + public static long msTimeStringToLong(String timeStr, TimeZone timeZone) { + DateTimeFormatter dateFormatTimeZone = getDatetimeMsFormatWithTimeZone(); + dateFormatTimeZone.withZone(timeZone.toZoneId()); + LocalDateTime d; + try { + d = LocalDateTime.parse(timeStr, dateFormatTimeZone); + } catch (DateTimeParseException e) { + return -1; + } + return d.atZone(timeZone.toZoneId()).toInstant().toEpochMilli(); + } + // Check if the time zone_value is valid public static String checkTimeZoneValidAndStandardize(String value) throws DdlException { Function standardizeValue = s -> { diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java index a80218e8bb86da..013aab4f479177 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java @@ -54,6 +54,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.paimon.CoreOptions; +import org.apache.paimon.Snapshot; import org.apache.paimon.partition.Partition; import org.apache.paimon.schema.TableSchema; import org.apache.paimon.table.DataTable; @@ -98,10 +99,28 @@ public Table getPaimonTable(Optional snapshot) { return getOrFetchSnapshotCacheValue(snapshot).getSnapshot().getTable(); } - private PaimonSnapshotCacheValue getPaimonSnapshotCacheValue() { + private PaimonSnapshotCacheValue getPaimonSnapshotCacheValue(Optional tableSnapshot, + Optional scanParams) { makeSureInitialized(); - return Env.getCurrentEnv().getExtMetaCacheMgr().getPaimonMetadataCache() - .getPaimonSnapshot(this); + if (tableSnapshot.isPresent() || (scanParams.isPresent() && scanParams.get().isTag())) { + // If a snapshot is specified, + // use the specified snapshot and the corresponding schema(not the latest + // schema). + try { + Snapshot snapshot = PaimonUtil.getPaimonSnapshot(paimonTable, tableSnapshot, scanParams); + return new PaimonSnapshotCacheValue(PaimonPartitionInfo.EMPTY, + new PaimonSnapshot(snapshot.id(), snapshot.schemaId(), paimonTable)); + } catch (Exception e) { + LOG.warn("Failed to get Paimon snapshot for table {}", paimonTable.name(), e); + throw new RuntimeException( + "Failed to get Paimon snapshot: " + (e.getMessage() == null ? "unknown cause" : e.getMessage()), + e); + } + } else { + // Otherwise, use the latest snapshot and the latest schema. + return Env.getCurrentEnv().getExtMetaCacheMgr().getPaimonMetadataCache() + .getPaimonSnapshot(this); + } } @Override @@ -212,7 +231,8 @@ public MTMVSnapshotIf getTableSnapshot(Optional snapshot) throws A @Override public long getNewestUpdateVersionOrTime() { - return getPaimonSnapshotCacheValue().getPartitionInfo().getNameToPartition().values().stream() + return getPaimonSnapshotCacheValue(Optional.empty(), Optional.empty()).getPartitionInfo().getNameToPartition() + .values().stream() .mapToLong(Partition::lastFileCreationTime).max().orElse(0); } @@ -228,7 +248,7 @@ public boolean isPartitionColumnAllowNull() { @Override public MvccSnapshot loadSnapshot(Optional tableSnapshot, Optional scanParams) { - return new PaimonMvccSnapshot(getPaimonSnapshotCacheValue()); + return new PaimonMvccSnapshot(getPaimonSnapshotCacheValue(tableSnapshot, scanParams)); } @Override @@ -289,7 +309,7 @@ private PaimonSnapshotCacheValue getOrFetchSnapshotCacheValue(Optional optionalSnapshot = table.latestSnapshot(); if (optionalSnapshot.isPresent()) { latestSnapshotId = optionalSnapshot.get().id(); - latestSchemaId = table.snapshot(latestSnapshotId).schemaId(); snapshotTable = table.copy(Collections.singletonMap(CoreOptions.SCAN_SNAPSHOT_ID.key(), latestSnapshotId.toString())); } + DataTable dataTable = (DataTable) table; + long latestSchemaId = dataTable.schemaManager().latest().map(TableSchema::id).orElse(0L); return new PaimonSnapshot(latestSnapshotId, latestSchemaId, snapshotTable); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonUtil.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonUtil.java index 877923932940bc..fac655928ffc57 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonUtil.java @@ -28,6 +28,7 @@ import org.apache.doris.catalog.Type; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.UserException; +import org.apache.doris.common.util.TimeUtils; import org.apache.doris.datasource.ExternalTable; import org.apache.doris.datasource.hive.HiveUtil; import org.apache.doris.datasource.paimon.source.PaimonSource; @@ -49,6 +50,7 @@ import org.apache.logging.log4j.Logger; import org.apache.paimon.CoreOptions; import org.apache.paimon.CoreOptions.StartupMode; +import org.apache.paimon.Snapshot; import org.apache.paimon.data.BinaryRow; import org.apache.paimon.data.InternalRow; import org.apache.paimon.data.Timestamp; @@ -58,9 +60,11 @@ import org.apache.paimon.predicate.Predicate; import org.apache.paimon.reader.RecordReader; import org.apache.paimon.schema.TableSchema; +import org.apache.paimon.table.DataTable; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.Table; import org.apache.paimon.table.source.ReadBuilder; +import org.apache.paimon.tag.Tag; import org.apache.paimon.types.ArrayType; import org.apache.paimon.types.CharType; import org.apache.paimon.types.DataField; @@ -75,8 +79,10 @@ import org.apache.paimon.utils.Projection; import org.apache.paimon.utils.RowDataToObjectArrayConverter; +import java.io.FileNotFoundException; import java.io.IOException; import java.nio.charset.StandardCharsets; +import java.time.DateTimeException; import java.time.LocalDate; import java.time.LocalTime; import java.time.ZoneId; @@ -87,6 +93,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.regex.Pattern; import java.util.stream.Collectors; import javax.annotation.Nullable; @@ -656,6 +663,75 @@ public static Table getTableByTag(Table baseTable, String tagName) throws UserEx return baseTable.copy(options); } + // get snapshot info from query like 'for version/time as of' or '@tag' + public static Snapshot getPaimonSnapshot(Table table, Optional querySnapshot, + Optional scanParams) throws UserException { + Preconditions.checkArgument(querySnapshot.isPresent() || (scanParams.isPresent() && scanParams.get().isTag()), + "should spec version or time or tag"); + Preconditions.checkArgument(!(querySnapshot.isPresent() && scanParams.isPresent()), + "should not spec both snapshot and scan params"); + + DataTable dataTable = (DataTable) table; + if (querySnapshot.isPresent()) { + return getPaimonSnapshotByTableSnapshot(dataTable, querySnapshot.get()); + } else if (scanParams.isPresent() && scanParams.get().isTag()) { + return getPaimonSnapshotByTag(dataTable, extractBranchOrTagName(scanParams.get())); + } else { + throw new UserException("should spec version or time or tag"); + } + } + + private static Snapshot getPaimonSnapshotByTableSnapshot(DataTable table, TableSnapshot tableSnapshot) + throws UserException { + final String value = tableSnapshot.getValue(); + final TableSnapshot.VersionType type = tableSnapshot.getType(); + final boolean isDigital = DIGITAL_REGEX.matcher(value).matches(); + + switch (type) { + case TIME: + return getPaimonSnapshotByTimestamp(table, value, isDigital); + case VERSION: + return isDigital ? getPaimonSnapshotBySnapshotId(table, value) : getPaimonSnapshotByTag(table, value); + default: + throw new UserException("Unsupported snapshot type: " + type); + } + } + + private static Snapshot getPaimonSnapshotByTimestamp(DataTable table, String timestamp, boolean isDigital) + throws UserException { + long timestampMillis = 0; + if (isDigital) { + timestampMillis = Long.parseLong(timestamp); + } else { + timestampMillis = TimeUtils.msTimeStringToLong(timestamp, TimeUtils.getTimeZone()); + if (timestampMillis < 0) { + throw new DateTimeException("can't parse time: " + timestamp); + } + } + Snapshot snapshot = table.snapshotManager().earlierOrEqualTimeMills(timestampMillis); + if (snapshot == null) { + throw new UserException("can't find snapshot older than : " + timestamp); + } + return snapshot; + } + + private static Snapshot getPaimonSnapshotBySnapshotId(DataTable table, String snapshotString) + throws UserException { + long snapshotId = Long.parseLong(snapshotString); + try { + Snapshot snapshot = table.snapshotManager().tryGetSnapshot(snapshotId); + return snapshot; + } catch (FileNotFoundException e) { + throw new UserException("can't find snapshot by id: " + snapshotId, e); + } + } + + private static Snapshot getPaimonSnapshotByTag(DataTable table, String tagName) + throws UserException { + Optional tag = table.tagManager().get(tagName); + return tag.orElseThrow(() -> new UserException("can't find snapshot by tag: " + tagName)); + } + /** * Creates a map of conflicting Paimon options with null values for exclusion. * diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java index 37a840f34c89a5..bccecd30d7693b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java @@ -49,7 +49,6 @@ import com.google.common.annotations.VisibleForTesting; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.apache.paimon.CoreOptions; import org.apache.paimon.data.BinaryRow; import org.apache.paimon.predicate.Predicate; import org.apache.paimon.schema.TableSchema; @@ -384,20 +383,16 @@ public Map getIncrReadParams() throws UserException { @VisibleForTesting public List getPaimonSplitFromAPI() throws UserException { - if (!source.getPaimonTable().options().containsKey(CoreOptions.SCAN_SNAPSHOT_ID.key())) { - // an empty table in PaimonSnapshotCacheValue - return Collections.emptyList(); - } + Table paimonTable = getProcessedTable(); int[] projected = desc.getSlots().stream().mapToInt( - slot -> source.getPaimonTable().rowType() - .getFieldNames() - .stream() - .map(String::toLowerCase) - .collect(Collectors.toList()) - .indexOf(slot.getColumn().getName())) + slot -> paimonTable.rowType() + .getFieldNames() + .stream() + .map(String::toLowerCase) + .collect(Collectors.toList()) + .indexOf(slot.getColumn().getName())) + .filter(i -> i >= 0) .toArray(); - - Table paimonTable = getProcessedTable(); ReadBuilder readBuilder = paimonTable.newReadBuilder(); return readBuilder.withFilter(predicates) .withProjection(projected) @@ -713,5 +708,3 @@ private Table getProcessedTable() throws UserException { return baseTable; } } - - diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java index 4e934509bc85cf..4ac6ba67132c35 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java @@ -468,7 +468,7 @@ private void beforeMTMVRefresh() throws AnalysisException, DdlException { } if (tableIf instanceof MvccTable) { MvccTable mvccTable = (MvccTable) tableIf; - MvccSnapshot mvccSnapshot = mvccTable.loadSnapshot(Optional.empty(), null); + MvccSnapshot mvccSnapshot = mvccTable.loadSnapshot(Optional.empty(), Optional.empty()); snapshots.put(new MvccTableInfo(mvccTable), mvccSnapshot); } } diff --git a/regression-test/data/external_table_p0/iceberg/test_iceberg_schema_change_with_branch_tag.out b/regression-test/data/external_table_p0/iceberg/test_iceberg_schema_change_with_branch_tag.out new file mode 100644 index 00000000000000..97e995bc6dcac3 --- /dev/null +++ b/regression-test/data/external_table_p0/iceberg/test_iceberg_schema_change_with_branch_tag.out @@ -0,0 +1,30 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !desc_schema -- +id int Yes true \N +name text Yes true \N + +-- !select_table -- +1 \N +2 \N +3 \N + +-- !select_branch -- +1 \N +2 \N +3 \N + +-- !select_branch2 -- +1 \N +2 \N +3 \N + +-- !select_tag -- +1 +2 +3 + +-- !select_tag2 -- +1 +2 +3 + diff --git a/regression-test/data/external_table_p0/paimon/test_paimon_schema_change.out b/regression-test/data/external_table_p0/paimon/test_paimon_schema_change.out index fed740a880378d..90b56b63a3dcca 100644 --- a/regression-test/data/external_table_p0/paimon/test_paimon_schema_change.out +++ b/regression-test/data/external_table_p0/paimon/test_paimon_schema_change.out @@ -230,3 +230,22 @@ new_col1 array Yes true \N -- !count_4 -- 3 +-- !desc_latest_schema -- +id int Yes true \N +name varchar(2147483647) Yes true \N + +-- !query_latest_schema -- +1 \N + +-- !time_travel_schema_branch -- +1 \N + +-- !time_travel_schema_tag -- +1 + +-- !time_travel_schema_tag2 -- +1 + +-- !time_travel_schema_snapshot -- +1 + diff --git a/regression-test/suites/external_table_p0/iceberg/test_iceberg_schema_change_with_branch_tag.groovy b/regression-test/suites/external_table_p0/iceberg/test_iceberg_schema_change_with_branch_tag.groovy new file mode 100644 index 00000000000000..470499c8f4b1cc --- /dev/null +++ b/regression-test/suites/external_table_p0/iceberg/test_iceberg_schema_change_with_branch_tag.groovy @@ -0,0 +1,67 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + + + +suite("iceberg_schema_change_with_branch_tag", "p0,external,doris,external_docker,external_docker_doris") { + + String enabled = context.config.otherConfigs.get("enableIcebergTest") + if (enabled == null || !enabled.equalsIgnoreCase("true")) { + logger.info("disable iceberg test.") + return + } + + String rest_port = context.config.otherConfigs.get("iceberg_rest_uri_port") + String minio_port = context.config.otherConfigs.get("iceberg_minio_port") + String externalEnvIp = context.config.otherConfigs.get("externalEnvIp") + String catalog_name = "iceberg_schema_change_with_branch_tag" + + sql """drop catalog if exists ${catalog_name}""" + sql """ + CREATE CATALOG ${catalog_name} PROPERTIES ( + 'type'='iceberg', + 'iceberg.catalog.type'='rest', + 'uri' = 'http://${externalEnvIp}:${rest_port}', + "s3.access_key" = "admin", + "s3.secret_key" = "password", + "s3.endpoint" = "http://${externalEnvIp}:${minio_port}", + "s3.region" = "us-east-1" + );""" + + logger.info("catalog " + catalog_name + " created") + sql """switch ${catalog_name};""" + logger.info("switched to catalog " + catalog_name) + sql """ use test_db;""" + // init table test_schema_change_with_branch_tag + sql """ drop table if exists test_schema_change_with_branch_tag; """ + sql """ create table test_schema_change_with_branch_tag (id int); """ + sql """ insert into test_schema_change_with_branch_tag values (1), (2), (3); """ + // create branch and tag + sql """ alter table test_schema_change_with_branch_tag create branch test_branch; """ + sql """ alter table test_schema_change_with_branch_tag create tag test_tag; """ + // schema change but no insert data, no snaptshot + sql """ alter table test_schema_change_with_branch_tag add column name string; """ + + // this should get latest schema + qt_desc_schema """ desc test_schema_change_with_branch_tag; """ + qt_select_table """ select * from test_schema_change_with_branch_tag order by id; """ + + qt_select_branch """ select * from test_schema_change_with_branch_tag FOR VERSION AS OF 'test_branch' """ + qt_select_branch2 """ select * from test_schema_change_with_branch_tag@branch(test_branch) """ + qt_select_tag """ select * from test_schema_change_with_branch_tag FOR VERSION AS OF 'test_tag' """ + qt_select_tag2 """ select * from test_schema_change_with_branch_tag@tag(test_tag) """ +} diff --git a/regression-test/suites/external_table_p0/paimon/paimon_incr_read.groovy b/regression-test/suites/external_table_p0/paimon/paimon_incr_read.groovy index bfc2457c6a3e27..08a271e054bd39 100644 --- a/regression-test/suites/external_table_p0/paimon/paimon_incr_read.groovy +++ b/regression-test/suites/external_table_p0/paimon/paimon_incr_read.groovy @@ -94,7 +94,7 @@ suite("test_paimon_incr_read", "p0,external,doris,external_docker,external_docke } test { sql """select * from paimon_incr@incr('startSnapshotId'=1, 'endSnapshotId'=2) for version as of 1""" - exception "Can not specify scan params and table snapshot" + exception "should not spec both snapshot and scan params" } } diff --git a/regression-test/suites/external_table_p0/paimon/paimon_time_travel.groovy b/regression-test/suites/external_table_p0/paimon/paimon_time_travel.groovy index b6d3caddb8c4c4..1432b75f49f9ca 100644 --- a/regression-test/suites/external_table_p0/paimon/paimon_time_travel.groovy +++ b/regression-test/suites/external_table_p0/paimon/paimon_time_travel.groovy @@ -249,15 +249,15 @@ suite("paimon_time_travel", "p0,external,doris,external_docker,external_docker_d } test { sql """ select * from ${tableName}@tag('name'='not_exists_tag'); """ - exception "Tag 'not_exists_tag' does not exist" + exception "can't find snapshot by tag: not_exists_tag" } test { sql """ select * from ${tableName}@tag(not_exists_tag); """ - exception "Tag 'not_exists_tag' does not exist" + exception "can't find snapshot by tag: not_exists_tag" } test { sql """ select * from ${tableName} for version as of 'not_exists_tag'; """ - exception "Tag 'not_exists_tag' does not exist" + exception "can't find snapshot by tag: not_exists_tag" } // Use branch function to query tags diff --git a/regression-test/suites/external_table_p0/paimon/test_paimon_schema_change.groovy b/regression-test/suites/external_table_p0/paimon/test_paimon_schema_change.groovy index 2e9f9790a28583..04a7200d6c272b 100644 --- a/regression-test/suites/external_table_p0/paimon/test_paimon_schema_change.groovy +++ b/regression-test/suites/external_table_p0/paimon/test_paimon_schema_change.groovy @@ -81,7 +81,15 @@ suite("test_paimon_schema_change", "p0,external,doris,external_docker,external_d qt_count_3 """ select count(*) from sc_parquet;""" qt_count_4 """ select count(*) from sc_orc;""" - + // should get latest schema + qt_desc_latest_schema """ desc test_paimon_spark.test_schema_change; """ + qt_query_latest_schema """ SELECT * FROM test_paimon_spark.test_schema_change; """ + // shoudle get latest schme for branch + qt_time_travel_schema_branch """ select * from test_paimon_spark.test_schema_change@branch(branch1); """ + // should get the schema in tag + qt_time_travel_schema_tag """ select * from test_paimon_spark.test_schema_change for version as of 'tag1'; """ + qt_time_travel_schema_tag2 """ select * from test_paimon_spark.test_schema_change@tag(tag1); """ + qt_time_travel_schema_snapshot """ select * from test_paimon_spark.test_schema_change for version as of '1'; """ } }