From 5394f238bd3fb4d31b383ca11ffee411b946fce6 Mon Sep 17 00:00:00 2001
From: wuwenchi
Date: Tue, 29 Apr 2025 13:42:14 +0800
Subject: [PATCH] [fix](iceberg)Use the correct schema for query (#50376)

Follow-up to #49956

Problem Summary:

When a snapshot is specified in a query, the schema corresponding to that
snapshot should be used for parsing; otherwise, the schema of the latest
snapshot should be used.

1. When using the HMS catalog type, the executor pool must also be initialized.
2. Set the thread pool size equal to the number of cores on the current machine.
3. When no snapshot is specified, use the latest schema.
4. When a snapshot is specified, use the schema corresponding to that snapshot.
5. When generating a scan node, save the schema information instead of
   re-reading it from the cache, so that a cache refresh cannot change the
   schema mid-query.
6. When refreshing the schema, refresh all schemas of the related tables.
---
 .../iceberg/run09.sql                         |  37 ++
 .../org/apache/doris/catalog/TableIf.java     |  11 -
 .../doris/datasource/ExternalCatalog.java     |   2 +
 .../doris/datasource/ExternalSchemaCache.java |   8 +-
 .../doris/datasource/FileQueryScanNode.java   |   9 +-
 .../apache/doris/datasource/FileScanNode.java |   2 +-
 .../doris/datasource/hive/HMSDlaTable.java    |   9 +
 .../datasource/hive/HMSExternalCatalog.java   |   6 +
 .../datasource/hive/HMSExternalTable.java     |  16 +-
 .../datasource/hive/IcebergDlaTable.java      | 147 +++++++
 .../iceberg/IcebergExternalCatalog.java       |   1 -
 .../iceberg/IcebergExternalTable.java         | 352 +---------------
 .../iceberg/IcebergMetadataCache.java         |  17 +-
 .../iceberg/IcebergPartitionInfo.java         |   6 +
 .../datasource/iceberg/IcebergUtils.java      | 391 ++++++++++++++++--
 .../iceberg/source/IcebergScanNode.java       |  14 +-
 .../org/apache/doris/planner/ScanNode.java    |  10 +-
 .../iceberg/IcebergExternalTableTest.java     | 114 +++++
 ..._iceberg_schema_change_with_timetravel.out |  77 ++++
 ...eberg_schema_change_with_timetravel.groovy |  87 ++++
 20 files changed, 921 insertions(+), 395 deletions(-)
 create mode 100644 docker/thirdparties/docker-compose/iceberg/scripts/create_preinstalled_scripts/iceberg/run09.sql
 create mode 100644 fe/fe-core/src/main/java/org/apache/doris/datasource/hive/IcebergDlaTable.java
 create mode 100644 regression-test/data/external_table_p0/iceberg/test_iceberg_schema_change_with_timetravel.out
 create mode 100644 regression-test/suites/external_table_p0/iceberg/test_iceberg_schema_change_with_timetravel.groovy

diff --git a/docker/thirdparties/docker-compose/iceberg/scripts/create_preinstalled_scripts/iceberg/run09.sql b/docker/thirdparties/docker-compose/iceberg/scripts/create_preinstalled_scripts/iceberg/run09.sql
new file mode 100644
index 00000000000000..c5795883c4defd
--- /dev/null
+++ b/docker/thirdparties/docker-compose/iceberg/scripts/create_preinstalled_scripts/iceberg/run09.sql
@@ -0,0 +1,37 @@
+use demo.test_db;
+
+create table schema_change_with_time_travel (c1 int);
+insert into schema_change_with_time_travel values (1);
+
+alter table schema_change_with_time_travel add column c2 int;
+insert into schema_change_with_time_travel values (2,3);
+
+alter table schema_change_with_time_travel add column c3 int;
+insert into schema_change_with_time_travel values (4,5,6);
+
+alter table schema_change_with_time_travel drop column c2;
+insert into schema_change_with_time_travel values (7,8);
+
+alter table schema_change_with_time_travel add column c2 int;
+insert into schema_change_with_time_travel values (9,10,11);
+
+alter table schema_change_with_time_travel add column c4 int;
+
+
+create 
table schema_change_with_time_travel_orc (c1 int) tblproperties ("write.format.default"="orc"); +insert into schema_change_with_time_travel_orc values (1); + +alter table schema_change_with_time_travel_orc add column c2 int; +insert into schema_change_with_time_travel_orc values (2,3); + +alter table schema_change_with_time_travel_orc add column c3 int; +insert into schema_change_with_time_travel_orc values (4,5,6); + +alter table schema_change_with_time_travel_orc drop column c2; +insert into schema_change_with_time_travel_orc values (7,8); + +alter table schema_change_with_time_travel_orc add column c2 int; +insert into schema_change_with_time_travel_orc values (9,10,11); + +alter table schema_change_with_time_travel_orc add column c4 int; + diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java index 78fbc77698d34f..7d95d959ba618d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java @@ -143,17 +143,6 @@ default List getBaseSchemaOrEmpty() { Column getColumn(String name); - default int getBaseColumnIdxByName(String colName) { - int i = 0; - for (Column col : getBaseSchema()) { - if (col.getName().equalsIgnoreCase(colName)) { - return i; - } - ++i; - } - return -1; - } - String getMysqlType(); String getEngine(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java index 25b74bb6eff2a2..5c2857cc0b61f1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java @@ -133,6 +133,8 @@ public abstract class ExternalCatalog CREATE_TIME, USE_META_CACHE); + protected static final int ICEBERG_CATALOG_EXECUTOR_THREAD_NUM = Runtime.getRuntime().availableProcessors(); + // Unique id of this catalog, will be assigned after catalog is loaded. 
@SerializedName(value = "id") protected long id; diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalSchemaCache.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalSchemaCache.java index 0fb66c4e3f9e45..da9a25d71be7ef 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalSchemaCache.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalSchemaCache.java @@ -100,11 +100,9 @@ public void addSchemaForTest(String dbName, String tblName, ImmutableList key.dbName.equals(dbName) && key.tblName.equals(tblName)) + .forEach(schemaCache::invalidate); } public void invalidateDbCache(String dbName) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java index 56fecb24afcef3..fc1058105f102a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java @@ -252,7 +252,14 @@ private void setColumnPositionMapping() } SlotDescriptor slotDesc = desc.getSlot(slot.getSlotId()); String colName = slotDesc.getColumn().getName(); - int idx = tbl.getBaseColumnIdxByName(colName); + int idx = -1; + List columns = getColumns(); + for (int i = 0; i < columns.size(); i++) { + if (columns.get(i).getName().equals(colName)) { + idx = i; + break; + } + } if (idx == -1) { throw new UserException("Column " + colName + " not found in table " + tbl.getName()); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java index 8d3aeaa6a267a0..d2c6230ba6bba3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java @@ -193,7 +193,7 @@ protected void setDefaultValueExprs(TableIf tbl, TExpr tExpr = new TExpr(); tExpr.setNodes(Lists.newArrayList()); - for (Column column : tbl.getBaseSchema()) { + for (Column column : getColumns()) { Expr expr; if (column.getDefaultValue() != null) { if (column.getDefaultValueExprDef() != null) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSDlaTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSDlaTable.java index 7894279b2950ac..034ce1ae443644 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSDlaTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSDlaTable.java @@ -73,4 +73,13 @@ abstract MTMVSnapshotIf getTableSnapshot(MTMVRefreshContext context, Optional getFullSchema() { if (getDlaType() == DLAType.HUDI) { return ((HudiDlaTable) dlaTable).getHudiSchemaCacheValue(MvccUtil.getSnapshotFromContext(this)) .getSchema(); + } else if (getDlaType() == DLAType.ICEBERG) { + return IcebergUtils.getIcebergSchema(this, getCatalog(), getDbName(), getName()); } Optional schemaCacheValue = cache.getSchemaValue(dbName, name); return schemaCacheValue.map(SchemaCacheValue::getSchema).orElse(null); @@ -1047,8 +1050,12 @@ public void beforeMTMVRefresh(MTMV mtmv) throws DdlException { public MvccSnapshot loadSnapshot(Optional tableSnapshot) { if (getDlaType() == DLAType.HUDI) { return new HudiMvccSnapshot(HudiUtils.getPartitionValues(tableSnapshot, this)); + } else if (getDlaType() == DLAType.ICEBERG) { + return new IcebergMvccSnapshot( + IcebergUtils.getIcebergSnapshotCacheValue(tableSnapshot, getCatalog(), getDbName(), 
getName())); + } else { + return new EmptyMvccSnapshot(); } - return new EmptyMvccSnapshot(); } public boolean firstColumnIsString() { @@ -1084,4 +1091,9 @@ public List getSupportedSysTables() { return Lists.newArrayList(); } } + + public boolean isValidRelatedTable() { + makeSureInitialized(); + return dlaTable.isValidRelatedTable(); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/IcebergDlaTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/IcebergDlaTable.java new file mode 100644 index 00000000000000..36b871282a97f7 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/IcebergDlaTable.java @@ -0,0 +1,147 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource.hive; + +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.PartitionItem; +import org.apache.doris.catalog.PartitionType; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.datasource.iceberg.IcebergSchemaCacheValue; +import org.apache.doris.datasource.iceberg.IcebergSnapshotCacheValue; +import org.apache.doris.datasource.iceberg.IcebergUtils; +import org.apache.doris.datasource.mvcc.MvccSnapshot; +import org.apache.doris.mtmv.MTMVRefreshContext; +import org.apache.doris.mtmv.MTMVSnapshotIdSnapshot; +import org.apache.doris.mtmv.MTMVSnapshotIf; + +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; +import org.apache.iceberg.PartitionField; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Table; + +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; + +public class IcebergDlaTable extends HMSDlaTable { + + private boolean isValidRelatedTableCached = false; + private boolean isValidRelatedTable = false; + + public IcebergDlaTable(HMSExternalTable table) { + super(table); + } + + @Override + public Map getAndCopyPartitionItems(Optional snapshot) { + return Maps.newHashMap( + IcebergUtils.getOrFetchSnapshotCacheValue( + snapshot, hmsTable.getCatalog(), hmsTable.getDbName(), hmsTable.getName()) + .getPartitionInfo().getNameToPartitionItem()); + } + + @Override + public PartitionType getPartitionType(Optional snapshot) { + return isValidRelatedTable() ? 
PartitionType.RANGE : PartitionType.UNPARTITIONED; + } + + @Override + public Set getPartitionColumnNames(Optional snapshot) { + return getPartitionColumns(snapshot).stream().map(Column::getName).collect(Collectors.toSet()); + } + + @Override + public List getPartitionColumns(Optional snapshot) { + IcebergSnapshotCacheValue snapshotValue = + IcebergUtils.getOrFetchSnapshotCacheValue( + snapshot, hmsTable.getCatalog(), hmsTable.getDbName(), hmsTable.getName()); + IcebergSchemaCacheValue schemaValue = IcebergUtils.getSchemaCacheValue( + hmsTable.getCatalog(), hmsTable.getDbName(), hmsTable.getName(), + snapshotValue.getSnapshot().getSchemaId()); + return schemaValue.getPartitionColumns(); + } + + @Override + public MTMVSnapshotIf getPartitionSnapshot(String partitionName, MTMVRefreshContext context, + Optional snapshot) throws AnalysisException { + IcebergSnapshotCacheValue snapshotValue = + IcebergUtils.getOrFetchSnapshotCacheValue( + snapshot, hmsTable.getCatalog(), hmsTable.getDbName(), hmsTable.getName()); + long latestSnapshotId = snapshotValue.getPartitionInfo().getLatestSnapshotId(partitionName); + if (latestSnapshotId <= 0) { + throw new AnalysisException("can not find partition: " + partitionName); + } + return new MTMVSnapshotIdSnapshot(latestSnapshotId); + } + + @Override + public MTMVSnapshotIf getTableSnapshot(MTMVRefreshContext context, Optional snapshot) + throws AnalysisException { + hmsTable.makeSureInitialized(); + IcebergSnapshotCacheValue snapshotValue = + IcebergUtils.getOrFetchSnapshotCacheValue( + snapshot, hmsTable.getCatalog(), hmsTable.getDbName(), hmsTable.getName()); + return new MTMVSnapshotIdSnapshot(snapshotValue.getSnapshot().getSnapshotId()); + } + + @Override + boolean isPartitionColumnAllowNull() { + return true; + } + + @Override + protected boolean isValidRelatedTable() { + if (isValidRelatedTableCached) { + return isValidRelatedTable; + } + isValidRelatedTable = false; + Set allFields = Sets.newHashSet(); + Table table = IcebergUtils.getIcebergTable( + hmsTable.getCatalog(), + hmsTable.getDbName(), + hmsTable.getName() + ); + for (PartitionSpec spec : table.specs().values()) { + if (spec == null) { + isValidRelatedTableCached = true; + return false; + } + List fields = spec.fields(); + if (fields.size() != 1) { + isValidRelatedTableCached = true; + return false; + } + PartitionField partitionField = spec.fields().get(0); + String transformName = partitionField.transform().toString(); + if (!IcebergUtils.YEAR.equals(transformName) + && !IcebergUtils.MONTH.equals(transformName) + && !IcebergUtils.DAY.equals(transformName) + && !IcebergUtils.HOUR.equals(transformName)) { + isValidRelatedTableCached = true; + return false; + } + allFields.add(table.schema().findColumnName(partitionField.sourceId())); + } + isValidRelatedTableCached = true; + isValidRelatedTable = allFields.size() == 1; + return isValidRelatedTable; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java index b695a268b0f649..411493700975b1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java @@ -45,7 +45,6 @@ public abstract class IcebergExternalCatalog extends ExternalCatalog { public static final String EXTERNAL_CATALOG_NAME = "external_catalog.name"; protected String icebergCatalogType; protected Catalog 
catalog; - private static final int ICEBERG_CATALOG_EXECUTOR_THREAD_NUM = 16; public IcebergExternalCatalog(long catalogId, String name, String comment) { super(catalogId, name, InitCatalogLog.Type.ICEBERG, comment); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalTable.java index e9bffbf56e3623..6990e8a7449c6d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalTable.java @@ -17,25 +17,18 @@ package org.apache.doris.datasource.iceberg; -import org.apache.doris.analysis.PartitionValue; import org.apache.doris.analysis.TableSnapshot; import org.apache.doris.catalog.Column; -import org.apache.doris.catalog.Env; import org.apache.doris.catalog.MTMV; import org.apache.doris.catalog.PartitionItem; -import org.apache.doris.catalog.PartitionKey; import org.apache.doris.catalog.PartitionType; -import org.apache.doris.catalog.RangePartitionItem; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.DdlException; -import org.apache.doris.datasource.CacheException; -import org.apache.doris.datasource.ExternalSchemaCache; import org.apache.doris.datasource.ExternalSchemaCache.SchemaCacheKey; import org.apache.doris.datasource.ExternalTable; import org.apache.doris.datasource.SchemaCacheValue; import org.apache.doris.datasource.mvcc.MvccSnapshot; import org.apache.doris.datasource.mvcc.MvccTable; -import org.apache.doris.datasource.mvcc.MvccUtil; import org.apache.doris.datasource.systable.SupportedSysTables; import org.apache.doris.datasource.systable.SysTable; import org.apache.doris.mtmv.MTMVBaseTableIf; @@ -52,32 +45,12 @@ import org.apache.doris.thrift.TTableType; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; import com.google.common.collect.Maps; -import com.google.common.collect.Range; import com.google.common.collect.Sets; -import org.apache.iceberg.FileScanTask; -import org.apache.iceberg.MetadataTableType; -import org.apache.iceberg.MetadataTableUtils; import org.apache.iceberg.PartitionField; import org.apache.iceberg.PartitionSpec; -import org.apache.iceberg.PartitionsTable; -import org.apache.iceberg.Snapshot; -import org.apache.iceberg.StructLike; import org.apache.iceberg.Table; -import org.apache.iceberg.io.CloseableIterable; -import org.apache.iceberg.types.Types; -import org.apache.iceberg.util.StructProjection; - -import java.io.IOException; -import java.time.Instant; -import java.time.LocalDateTime; -import java.time.Month; -import java.time.ZoneId; -import java.time.format.DateTimeFormatter; -import java.util.ArrayList; -import java.util.Comparator; + import java.util.HashMap; import java.util.List; import java.util.Map; @@ -87,15 +60,7 @@ public class IcebergExternalTable extends ExternalTable implements MTMVRelatedTableIf, MTMVBaseTableIf, MvccTable { - public static final String YEAR = "year"; - public static final String MONTH = "month"; - public static final String DAY = "day"; - public static final String HOUR = "hour"; - public static final String IDENTITY = "identity"; - public static final int PARTITION_DATA_ID_START = 1000; // org.apache.iceberg.PartitionSpec - private Table table; - private List partitionColumns; private boolean isValidRelatedTableCached = false; private boolean 
isValidRelatedTable = false; @@ -120,29 +85,9 @@ public void setTable(Table table) { this.table = table; } - @VisibleForTesting - public void setPartitionColumns(List partitionColumns) { - this.partitionColumns = partitionColumns; - } - @Override public Optional initSchema(SchemaCacheKey key) { - table = getIcebergTable(); - List schema = IcebergUtils.getSchema(catalog, dbName, name, - ((IcebergSchemaCacheKey) key).getSchemaId()); - List tmpColumns = Lists.newArrayList(); - PartitionSpec spec = table.spec(); - for (PartitionField field : spec.fields()) { - Types.NestedField col = table.schema().findField(field.sourceId()); - for (Column c : schema) { - if (c.getName().equalsIgnoreCase(col.name())) { - tmpColumns.add(c); - break; - } - } - } - partitionColumns = tmpColumns; - return Optional.of(new IcebergSchemaCacheValue(schema, partitionColumns)); + return IcebergUtils.loadSchemaCacheValue(catalog, dbName, name, ((IcebergSchemaCacheKey) key).getSchemaId()); } @Override @@ -180,23 +125,21 @@ public Table getIcebergTable() { return IcebergUtils.getIcebergTable(getCatalog(), getDbName(), getName()); } - private IcebergSnapshotCacheValue getIcebergSnapshotCacheValue() { - return Env.getCurrentEnv().getExtMetaCacheMgr().getIcebergMetadataCache() - .getSnapshotCache(catalog, dbName, name); - } - @Override public void beforeMTMVRefresh(MTMV mtmv) throws DdlException { } @Override public Map getAndCopyPartitionItems(Optional snapshot) { - return Maps.newHashMap(getOrFetchSnapshotCacheValue(snapshot).getPartitionInfo().getNameToPartitionItem()); + return Maps.newHashMap( + IcebergUtils.getOrFetchSnapshotCacheValue(snapshot, getCatalog(), getDbName(), getName()) + .getPartitionInfo().getNameToPartitionItem()); } @Override public Map getNameToPartitionItems(Optional snapshot) { - return getOrFetchSnapshotCacheValue(snapshot).getPartitionInfo().getNameToPartitionItem(); + return IcebergUtils.getOrFetchSnapshotCacheValue(snapshot, getCatalog(), getDbName(), getName()) + .getPartitionInfo().getNameToPartitionItem(); } @Override @@ -211,15 +154,18 @@ public Set getPartitionColumnNames(Optional snapshot) thro @Override public List getPartitionColumns(Optional snapshot) { - IcebergSnapshotCacheValue snapshotValue = getOrFetchSnapshotCacheValue(snapshot); - IcebergSchemaCacheValue schemaValue = getIcebergSchemaCacheValue(snapshotValue.getSnapshot().getSchemaId()); + IcebergSnapshotCacheValue snapshotValue = + IcebergUtils.getOrFetchSnapshotCacheValue(snapshot, getCatalog(), getDbName(), getName()); + IcebergSchemaCacheValue schemaValue = IcebergUtils.getSchemaCacheValue( + catalog, getDbName(), getName(), snapshotValue.getSnapshot().getSchemaId()); return schemaValue.getPartitionColumns(); } @Override public MTMVSnapshotIf getPartitionSnapshot(String partitionName, MTMVRefreshContext context, Optional snapshot) throws AnalysisException { - IcebergSnapshotCacheValue snapshotValue = getOrFetchSnapshotCacheValue(snapshot); + IcebergSnapshotCacheValue snapshotValue = + IcebergUtils.getOrFetchSnapshotCacheValue(snapshot, getCatalog(), getDbName(), getName()); long latestSnapshotId = snapshotValue.getPartitionInfo().getLatestSnapshotId(partitionName); if (latestSnapshotId <= 0) { throw new AnalysisException("can not find partition: " + partitionName); @@ -231,7 +177,8 @@ public MTMVSnapshotIf getPartitionSnapshot(String partitionName, MTMVRefreshCont public MTMVSnapshotIf getTableSnapshot(MTMVRefreshContext context, Optional snapshot) throws AnalysisException { makeSureInitialized(); - IcebergSnapshotCacheValue 
snapshotValue = getOrFetchSnapshotCacheValue(snapshot); + IcebergSnapshotCacheValue snapshotValue = + IcebergUtils.getOrFetchSnapshotCacheValue(snapshot, getCatalog(), getDbName(), getName()); return new MTMVSnapshotIdSnapshot(snapshotValue.getSnapshot().getSnapshotId()); } @@ -266,10 +213,10 @@ public boolean isValidRelatedTable() { } PartitionField partitionField = spec.fields().get(0); String transformName = partitionField.transform().toString(); - if (!YEAR.equals(transformName) - && !MONTH.equals(transformName) - && !DAY.equals(transformName) - && !HOUR.equals(transformName)) { + if (!IcebergUtils.YEAR.equals(transformName) + && !IcebergUtils.MONTH.equals(transformName) + && !IcebergUtils.DAY.equals(transformName) + && !IcebergUtils.HOUR.equals(transformName)) { isValidRelatedTableCached = true; return false; } @@ -282,27 +229,13 @@ public boolean isValidRelatedTable() { @Override public MvccSnapshot loadSnapshot(Optional tableSnapshot) { - return new IcebergMvccSnapshot(getIcebergSnapshotCacheValue()); - } - - public long getLatestSnapshotId() { - table = getIcebergTable(); - Snapshot snapshot = table.currentSnapshot(); - return snapshot == null ? IcebergUtils.UNKNOWN_SNAPSHOT_ID : table.currentSnapshot().snapshotId(); - } - - public long getSchemaId(long snapshotId) { - table = getIcebergTable(); - return snapshotId == IcebergUtils.UNKNOWN_SNAPSHOT_ID - ? IcebergUtils.UNKNOWN_SNAPSHOT_ID - : table.snapshot(snapshotId).schemaId(); + return new IcebergMvccSnapshot(IcebergUtils.getIcebergSnapshotCacheValue( + tableSnapshot, getCatalog(), getDbName(), getName())); } @Override public List getFullSchema() { - Optional snapshotFromContext = MvccUtil.getSnapshotFromContext(this); - IcebergSnapshotCacheValue cacheValue = getOrFetchSnapshotCacheValue(snapshotFromContext); - return getIcebergSchemaCacheValue(cacheValue.getSnapshot().getSchemaId()).getSchema(); + return IcebergUtils.getIcebergSchema(this, getCatalog(), getDbName(), getName()); } @Override @@ -310,239 +243,6 @@ public boolean supportInternalPartitionPruned() { return true; } - public IcebergSchemaCacheValue getIcebergSchemaCacheValue(long schemaId) { - ExternalSchemaCache cache = Env.getCurrentEnv().getExtMetaCacheMgr().getSchemaCache(catalog); - Optional schemaCacheValue = cache.getSchemaValue( - new IcebergSchemaCacheKey(dbName, name, schemaId)); - if (!schemaCacheValue.isPresent()) { - throw new CacheException("failed to getSchema for: %s.%s.%s.%s", - null, catalog.getName(), dbName, name, schemaId); - } - return (IcebergSchemaCacheValue) schemaCacheValue.get(); - } - - public IcebergPartitionInfo loadPartitionInfo(long snapshotId) throws AnalysisException { - // snapshotId == UNKNOWN_SNAPSHOT_ID means this is an empty table, haven't contained any snapshot yet. 
- if (!isValidRelatedTable() || snapshotId == IcebergUtils.UNKNOWN_SNAPSHOT_ID) { - return new IcebergPartitionInfo(); - } - List icebergPartitions = loadIcebergPartition(snapshotId); - Map nameToPartition = Maps.newHashMap(); - Map nameToPartitionItem = Maps.newHashMap(); - table = getIcebergTable(); - partitionColumns = getIcebergSchemaCacheValue(table.snapshot(snapshotId).schemaId()).getPartitionColumns(); - for (IcebergPartition partition : icebergPartitions) { - nameToPartition.put(partition.getPartitionName(), partition); - String transform = table.specs().get(partition.getSpecId()).fields().get(0).transform().toString(); - Range partitionRange = getPartitionRange( - partition.getPartitionValues().get(0), transform, partitionColumns); - PartitionItem item = new RangePartitionItem(partitionRange); - nameToPartitionItem.put(partition.getPartitionName(), item); - } - Map> partitionNameMap = mergeOverlapPartitions(nameToPartitionItem); - return new IcebergPartitionInfo(nameToPartitionItem, nameToPartition, partitionNameMap); - } - - public List loadIcebergPartition(long snapshotId) { - PartitionsTable partitionsTable = (PartitionsTable) MetadataTableUtils - .createMetadataTableInstance(table, MetadataTableType.PARTITIONS); - List partitions = Lists.newArrayList(); - try (CloseableIterable tasks = partitionsTable.newScan().useSnapshot(snapshotId).planFiles()) { - for (FileScanTask task : tasks) { - CloseableIterable rows = task.asDataTask().rows(); - for (StructLike row : rows) { - partitions.add(generateIcebergPartition(row)); - } - } - } catch (IOException e) { - LOG.warn("Failed to get Iceberg table {} partition info.", name, e); - } - return partitions; - } - - public IcebergPartition generateIcebergPartition(StructLike row) { - // row format : - // 0. partitionData, - // 1. spec_id, - // 2. record_count, - // 3. file_count, - // 4. total_data_file_size_in_bytes, - // 5. position_delete_record_count, - // 6. position_delete_file_count, - // 7. equality_delete_record_count, - // 8. equality_delete_file_count, - // 9. last_updated_at, - // 10. last_updated_snapshot_id - table = getIcebergTable(); - Preconditions.checkState(!table.spec().fields().isEmpty(), table.name() + " is not a partition table."); - int specId = row.get(1, Integer.class); - PartitionSpec partitionSpec = table.specs().get(specId); - StructProjection partitionData = row.get(0, StructProjection.class); - StringBuilder sb = new StringBuilder(); - List partitionValues = Lists.newArrayList(); - List transforms = Lists.newArrayList(); - for (int i = 0; i < partitionSpec.fields().size(); ++i) { - PartitionField partitionField = partitionSpec.fields().get(i); - Class fieldClass = partitionSpec.javaClasses()[i]; - int fieldId = partitionField.fieldId(); - // Iceberg partition field id starts at PARTITION_DATA_ID_START, - // So we can get the field index in partitionData using fieldId - PARTITION_DATA_ID_START - int index = fieldId - PARTITION_DATA_ID_START; - Object o = partitionData.get(index, fieldClass); - String fieldValue = o == null ? 
null : o.toString(); - String fieldName = partitionField.name(); - sb.append(fieldName); - sb.append("="); - sb.append(fieldValue); - sb.append("/"); - partitionValues.add(fieldValue); - transforms.add(partitionField.transform().toString()); - } - if (sb.length() > 0) { - sb.delete(sb.length() - 1, sb.length()); - } - String partitionName = sb.toString(); - long recordCount = row.get(2, Long.class); - long fileCount = row.get(3, Integer.class); - long fileSizeInBytes = row.get(4, Long.class); - long lastUpdateTime = row.get(9, Long.class); - long lastUpdateSnapShotId = row.get(10, Long.class); - return new IcebergPartition(partitionName, specId, recordCount, fileSizeInBytes, fileCount, - lastUpdateTime, lastUpdateSnapShotId, partitionValues, transforms); - } - - @VisibleForTesting - public Range getPartitionRange(String value, String transform, List partitionColumns) - throws AnalysisException { - // For NULL value, create a minimum partition for it. - if (value == null) { - PartitionKey nullLowKey = PartitionKey.createPartitionKey( - Lists.newArrayList(new PartitionValue("0000-01-01")), partitionColumns); - PartitionKey nullUpKey = nullLowKey.successor(); - return Range.closedOpen(nullLowKey, nullUpKey); - } - LocalDateTime epoch = Instant.EPOCH.atZone(ZoneId.of("UTC")).toLocalDateTime(); - LocalDateTime target; - LocalDateTime lower; - LocalDateTime upper; - long longValue = Long.parseLong(value); - switch (transform) { - case HOUR: - target = epoch.plusHours(longValue); - lower = LocalDateTime.of(target.getYear(), target.getMonth(), target.getDayOfMonth(), - target.getHour(), 0, 0); - upper = lower.plusHours(1); - break; - case DAY: - target = epoch.plusDays(longValue); - lower = LocalDateTime.of(target.getYear(), target.getMonth(), target.getDayOfMonth(), 0, 0, 0); - upper = lower.plusDays(1); - break; - case MONTH: - target = epoch.plusMonths(longValue); - lower = LocalDateTime.of(target.getYear(), target.getMonth(), 1, 0, 0, 0); - upper = lower.plusMonths(1); - break; - case YEAR: - target = epoch.plusYears(longValue); - lower = LocalDateTime.of(target.getYear(), Month.JANUARY, 1, 0, 0, 0); - upper = lower.plusYears(1); - break; - default: - throw new RuntimeException("Unsupported transform " + transform); - } - DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); - Column c = partitionColumns.get(0); - Preconditions.checkState(c.getDataType().isDateType(), "Only support date type partition column"); - if (c.getType().isDate() || c.getType().isDateV2()) { - formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd"); - } - PartitionValue lowerValue = new PartitionValue(lower.format(formatter)); - PartitionValue upperValue = new PartitionValue(upper.format(formatter)); - PartitionKey lowKey = PartitionKey.createPartitionKey(Lists.newArrayList(lowerValue), partitionColumns); - PartitionKey upperKey = PartitionKey.createPartitionKey(Lists.newArrayList(upperValue), partitionColumns); - return Range.closedOpen(lowKey, upperKey); - } - - /** - * Merge overlapped iceberg partitions into one Doris partition. 
- */ - public Map> mergeOverlapPartitions(Map originPartitions) { - List> entries = sortPartitionMap(originPartitions); - Map> map = Maps.newHashMap(); - for (int i = 0; i < entries.size() - 1; i++) { - Range firstValue = entries.get(i).getValue().getItems(); - String firstKey = entries.get(i).getKey(); - Range secondValue = entries.get(i + 1).getValue().getItems(); - String secondKey = entries.get(i + 1).getKey(); - // If the first entry enclose the second one, remove the second entry and keep a record in the return map. - // So we can track the iceberg partitions those contained by one Doris partition. - while (i < entries.size() && firstValue.encloses(secondValue)) { - originPartitions.remove(secondKey); - map.putIfAbsent(firstKey, Sets.newHashSet(firstKey)); - String finalSecondKey = secondKey; - map.computeIfPresent(firstKey, (key, value) -> { - value.add(finalSecondKey); - return value; - }); - i++; - if (i >= entries.size() - 1) { - break; - } - secondValue = entries.get(i + 1).getValue().getItems(); - secondKey = entries.get(i + 1).getKey(); - } - } - return map; - } - - /** - * Sort the given map entries by PartitionItem Range(LOW, HIGH) - * When comparing two ranges, the one with smaller LOW value is smaller than the other one. - * If two ranges have same values of LOW, the one with larger HIGH value is smaller. - * - * For now, we only support year, month, day and hour, - * so it is impossible to have two partially intersect partitions. - * One range is either enclosed by another or has no intersection at all with another. - * - * - * For example, we have these 4 ranges: - * [10, 20), [30, 40), [0, 30), [10, 15) - * - * After sort, they become: - * [0, 30), [10, 20), [10, 15), [30, 40) - */ - public List> sortPartitionMap(Map originPartitions) { - List> entries = new ArrayList<>(originPartitions.entrySet()); - entries.sort(new RangeComparator()); - return entries; - } - - public static class RangeComparator implements Comparator> { - @Override - public int compare(Map.Entry p1, Map.Entry p2) { - PartitionItem value1 = p1.getValue(); - PartitionItem value2 = p2.getValue(); - if (value1 instanceof RangePartitionItem && value2 instanceof RangePartitionItem) { - Range items1 = value1.getItems(); - Range items2 = value2.getItems(); - if (!items1.hasLowerBound()) { - return -1; - } - if (!items2.hasLowerBound()) { - return 1; - } - PartitionKey upper1 = items1.upperEndpoint(); - PartitionKey lower1 = items1.lowerEndpoint(); - PartitionKey upper2 = items2.upperEndpoint(); - PartitionKey lower2 = items2.lowerEndpoint(); - int compareLow = lower1.compareTo(lower2); - return compareLow == 0 ? 
upper2.compareTo(upper1) : compareLow; - } - return 0; - } - } - @VisibleForTesting public boolean isValidRelatedTableCached() { return isValidRelatedTableCached; @@ -557,14 +257,6 @@ public void setIsValidRelatedTableCached(boolean isCached) { this.isValidRelatedTableCached = isCached; } - public IcebergSnapshotCacheValue getOrFetchSnapshotCacheValue(Optional snapshot) { - if (snapshot.isPresent()) { - return ((IcebergMvccSnapshot) snapshot.get()).getSnapshotCacheValue(); - } else { - return getIcebergSnapshotCacheValue(); - } - } - @Override public List getSupportedSysTables() { makeSureInitialized(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataCache.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataCache.java index b776a8c3a472b3..f99b652b42dd16 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataCache.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataCache.java @@ -24,6 +24,7 @@ import org.apache.doris.datasource.ExternalCatalog; import org.apache.doris.datasource.ExternalMetaCacheMgr; import org.apache.doris.datasource.hive.HMSExternalCatalog; +import org.apache.doris.mtmv.MTMVRelatedTableIf; import com.github.benmanes.caffeine.cache.LoadingCache; import com.google.common.collect.Iterables; @@ -123,12 +124,18 @@ private Table loadTable(IcebergMetadataCacheKey key) { @NotNull private IcebergSnapshotCacheValue loadSnapshot(IcebergMetadataCacheKey key) throws AnalysisException { - IcebergExternalTable table = (IcebergExternalTable) key.catalog.getDbOrAnalysisException(key.dbName) + MTMVRelatedTableIf table = (MTMVRelatedTableIf) key.catalog.getDbOrAnalysisException(key.dbName) .getTableOrAnalysisException(key.tableName); - long snapshotId = table.getLatestSnapshotId(); - long schemaId = table.getSchemaId(snapshotId); - IcebergPartitionInfo icebergPartitionInfo = table.loadPartitionInfo(snapshotId); - return new IcebergSnapshotCacheValue(icebergPartitionInfo, new IcebergSnapshot(snapshotId, schemaId)); + IcebergSnapshot lastedIcebergSnapshot = IcebergUtils.getLastedIcebergSnapshot( + (ExternalCatalog) key.catalog, key.dbName, key.tableName); + IcebergPartitionInfo icebergPartitionInfo; + if (!table.isValidRelatedTable()) { + icebergPartitionInfo = IcebergPartitionInfo.empty(); + } else { + icebergPartitionInfo = IcebergUtils.loadPartitionInfo( + (ExternalCatalog) key.catalog, key.dbName, key.tableName, lastedIcebergSnapshot.getSnapshotId()); + } + return new IcebergSnapshotCacheValue(icebergPartitionInfo, lastedIcebergSnapshot); } public void invalidateCatalogCache(long catalogId) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergPartitionInfo.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergPartitionInfo.java index 9edb2137f4f389..bb46eb2122399d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergPartitionInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergPartitionInfo.java @@ -29,12 +29,18 @@ public class IcebergPartitionInfo { private final Map nameToIcebergPartition; private final Map> nameToIcebergPartitionNames; + private static final IcebergPartitionInfo EMPTY = new IcebergPartitionInfo(); + public IcebergPartitionInfo() { this.nameToPartitionItem = Maps.newHashMap(); this.nameToIcebergPartition = Maps.newHashMap(); this.nameToIcebergPartitionNames = Maps.newHashMap(); } + static IcebergPartitionInfo empty() { + 
return EMPTY; + } + public IcebergPartitionInfo(Map nameToPartitionItem, Map nameToIcebergPartition, Map> nameToIcebergPartitionNames) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java index f61c226ed752d3..ab91d9c3d90d59 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java @@ -31,39 +31,58 @@ import org.apache.doris.analysis.LiteralExpr; import org.apache.doris.analysis.NullLiteral; import org.apache.doris.analysis.PartitionDesc; +import org.apache.doris.analysis.PartitionValue; import org.apache.doris.analysis.SlotRef; import org.apache.doris.analysis.StringLiteral; import org.apache.doris.analysis.Subquery; +import org.apache.doris.analysis.TableSnapshot; import org.apache.doris.catalog.ArrayType; import org.apache.doris.catalog.Column; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.MapType; +import org.apache.doris.catalog.PartitionItem; +import org.apache.doris.catalog.PartitionKey; +import org.apache.doris.catalog.RangePartitionItem; import org.apache.doris.catalog.ScalarType; import org.apache.doris.catalog.StructField; import org.apache.doris.catalog.StructType; import org.apache.doris.catalog.TableIf; import org.apache.doris.catalog.Type; +import org.apache.doris.common.AnalysisException; import org.apache.doris.common.UserException; import org.apache.doris.common.info.SimpleTableInfo; import org.apache.doris.common.util.TimeUtils; +import org.apache.doris.datasource.CacheException; import org.apache.doris.datasource.ExternalCatalog; +import org.apache.doris.datasource.ExternalSchemaCache; import org.apache.doris.datasource.SchemaCacheValue; +import org.apache.doris.datasource.mvcc.MvccSnapshot; +import org.apache.doris.datasource.mvcc.MvccUtil; import org.apache.doris.datasource.property.constants.HMSProperties; import org.apache.doris.nereids.exceptions.NotSupportedException; import org.apache.doris.thrift.TExprOpcode; import com.github.benmanes.caffeine.cache.Caffeine; import com.github.benmanes.caffeine.cache.LoadingCache; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Range; +import com.google.common.collect.Sets; import org.apache.commons.lang3.exception.ExceptionUtils; import org.apache.iceberg.CatalogProperties; import org.apache.iceberg.FileFormat; +import org.apache.iceberg.FileScanTask; import org.apache.iceberg.ManifestFile; +import org.apache.iceberg.MetadataTableType; +import org.apache.iceberg.MetadataTableUtils; import org.apache.iceberg.PartitionField; import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.PartitionsTable; import org.apache.iceberg.Schema; import org.apache.iceberg.Snapshot; +import org.apache.iceberg.StructLike; import org.apache.iceberg.Table; import org.apache.iceberg.TableProperties; import org.apache.iceberg.expressions.And; @@ -79,14 +98,25 @@ import org.apache.iceberg.types.Type.TypeID; import org.apache.iceberg.types.Types; import org.apache.iceberg.util.LocationUtil; +import org.apache.iceberg.util.SnapshotUtil; +import org.apache.iceberg.util.StructProjection; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import java.io.IOException; +import 
java.time.DateTimeException; +import java.time.Instant; +import java.time.LocalDateTime; +import java.time.Month; +import java.time.ZoneId; +import java.time.format.DateTimeFormatter; import java.util.ArrayList; +import java.util.Comparator; import java.util.List; import java.util.Locale; import java.util.Map; import java.util.Optional; +import java.util.Set; import java.util.stream.Collectors; /** @@ -117,7 +147,15 @@ public Integer initialValue() { // nickname in spark public static final String SPARK_SQL_COMPRESSION_CODEC = "spark.sql.iceberg.compression-codec"; - public static final long UNKNOWN_SNAPSHOT_ID = -1; + public static final long UNKNOWN_SNAPSHOT_ID = -1; // means an empty table + public static final long NEWEST_SCHEMA_ID = -1; + + public static final String YEAR = "year"; + public static final String MONTH = "month"; + public static final String DAY = "day"; + public static final String HOUR = "hour"; + public static final String IDENTITY = "identity"; + public static final int PARTITION_DATA_ID_START = 1000; // org.apache.iceberg.PartitionSpec public static Expression convertToIcebergExpr(Expr expr, Schema schema) { if (expr == null) { @@ -582,10 +620,6 @@ private static org.apache.iceberg.Table getIcebergTableInternal(ExternalCatalog : metadataCache.getIcebergTable(catalog, dbName, tblName); } - public static List getSchema(ExternalCatalog catalog, String dbName, String name) { - return getSchema(catalog, dbName, name, UNKNOWN_SNAPSHOT_ID); - } - /** * Get iceberg schema from catalog and convert them to doris schema */ @@ -594,7 +628,7 @@ public static List getSchema(ExternalCatalog catalog, String dbName, Str return catalog.getPreExecutionAuthenticator().execute(() -> { org.apache.iceberg.Table icebergTable = getIcebergTable(catalog, dbName, name); Schema schema; - if (schemaId == UNKNOWN_SNAPSHOT_ID || icebergTable.currentSnapshot() == null) { + if (schemaId == NEWEST_SCHEMA_ID || icebergTable.currentSnapshot() == null) { schema = icebergTable.schema(); } else { schema = icebergTable.schemas().get((int) schemaId); @@ -725,25 +759,6 @@ public static HiveCatalog createIcebergHiveCatalog(ExternalCatalog externalCatal return hiveCatalog; } - // load table schema from iceberg API to external schema cache. 
- public static Optional loadSchemaCacheValue( - ExternalCatalog catalog, String dbName, String tbName, long schemaId) { - Table table = IcebergUtils.getIcebergTable(catalog, dbName, tbName); - List schema = IcebergUtils.getSchema(catalog, dbName, tbName, schemaId); - List tmpColumns = Lists.newArrayList(); - PartitionSpec spec = table.spec(); - for (PartitionField field : spec.fields()) { - Types.NestedField col = table.schema().findField(field.sourceId()); - for (Column c : schema) { - if (c.getName().equalsIgnoreCase(col.name())) { - tmpColumns.add(c); - break; - } - } - } - return Optional.of(new IcebergSchemaCacheValue(schema, tmpColumns)); - } - // Retrieve the manifest files that match the query based on partitions in filter public static CloseableIterable getMatchingManifest( List dataManifests, @@ -772,4 +787,330 @@ public static CloseableIterable getMatchingManifest( return matchingManifests; } + + // get snapshot id from query like 'for version/time as of' + public static long getQuerySpecSnapshot(Table table, TableSnapshot queryTableSnapshot) { + TableSnapshot.VersionType type = queryTableSnapshot.getType(); + if (type == TableSnapshot.VersionType.VERSION) { + return queryTableSnapshot.getVersion(); + } else { + long timestamp = TimeUtils.timeStringToLong(queryTableSnapshot.getTime(), TimeUtils.getTimeZone()); + if (timestamp < 0) { + throw new DateTimeException("can't parse time: " + queryTableSnapshot.getTime()); + } + return SnapshotUtil.snapshotIdAsOfTime(table, timestamp); + } + } + + // read schema from external schema cache + public static IcebergSchemaCacheValue getSchemaCacheValue( + ExternalCatalog catalog, String dbName, String name, long schemaId) { + ExternalSchemaCache cache = Env.getCurrentEnv().getExtMetaCacheMgr().getSchemaCache(catalog); + Optional schemaCacheValue = cache.getSchemaValue( + new IcebergSchemaCacheKey(dbName, name, schemaId)); + if (!schemaCacheValue.isPresent()) { + throw new CacheException("failed to getSchema for: %s.%s.%s.%s", + null, catalog.getName(), dbName, name, schemaId); + } + return (IcebergSchemaCacheValue) schemaCacheValue.get(); + } + + public static IcebergSnapshot getLastedIcebergSnapshot(ExternalCatalog catalog, String dbName, String tbName) { + Table table = IcebergUtils.getIcebergTable(catalog, dbName, tbName); + Snapshot snapshot = table.currentSnapshot(); + long snapshotId = snapshot == null ? IcebergUtils.UNKNOWN_SNAPSHOT_ID : snapshot.snapshotId(); + return new IcebergSnapshot(snapshotId, table.schema().schemaId()); + } + + public static IcebergPartitionInfo loadPartitionInfo( + ExternalCatalog catalog, String dbName, String tbName, long snapshotId) throws AnalysisException { + // snapshotId == UNKNOWN_SNAPSHOT_ID means this is an empty table, haven't contained any snapshot yet. 
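+        // In that case there are no partitions to load, so the shared empty partition info is returned.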
+ if (snapshotId == IcebergUtils.UNKNOWN_SNAPSHOT_ID) { + return IcebergPartitionInfo.empty(); + } + Table table = getIcebergTable(catalog, dbName, tbName); + List icebergPartitions = loadIcebergPartition(table, snapshotId); + Map nameToPartition = Maps.newHashMap(); + Map nameToPartitionItem = Maps.newHashMap(); + + List partitionColumns = IcebergUtils.getSchemaCacheValue( + catalog, dbName, tbName, table.snapshot(snapshotId).schemaId()).getPartitionColumns(); + for (IcebergPartition partition : icebergPartitions) { + nameToPartition.put(partition.getPartitionName(), partition); + String transform = table.specs().get(partition.getSpecId()).fields().get(0).transform().toString(); + Range partitionRange = getPartitionRange( + partition.getPartitionValues().get(0), transform, partitionColumns); + PartitionItem item = new RangePartitionItem(partitionRange); + nameToPartitionItem.put(partition.getPartitionName(), item); + } + Map> partitionNameMap = mergeOverlapPartitions(nameToPartitionItem); + return new IcebergPartitionInfo(nameToPartitionItem, nameToPartition, partitionNameMap); + } + + private static List loadIcebergPartition(Table table, long snapshotId) { + PartitionsTable partitionsTable = (PartitionsTable) MetadataTableUtils + .createMetadataTableInstance(table, MetadataTableType.PARTITIONS); + List partitions = Lists.newArrayList(); + try (CloseableIterable tasks = partitionsTable.newScan().useSnapshot(snapshotId).planFiles()) { + for (FileScanTask task : tasks) { + CloseableIterable rows = task.asDataTask().rows(); + for (StructLike row : rows) { + partitions.add(generateIcebergPartition(table, row)); + } + } + } catch (IOException e) { + LOG.warn("Failed to get Iceberg table {} partition info.", table.name(), e); + } + return partitions; + } + + private static IcebergPartition generateIcebergPartition(Table table, StructLike row) { + // row format : + // 0. partitionData, + // 1. spec_id, + // 2. record_count, + // 3. file_count, + // 4. total_data_file_size_in_bytes, + // 5. position_delete_record_count, + // 6. position_delete_file_count, + // 7. equality_delete_record_count, + // 8. equality_delete_file_count, + // 9. last_updated_at, + // 10. last_updated_snapshot_id + Preconditions.checkState(!table.spec().fields().isEmpty(), table.name() + " is not a partition table."); + int specId = row.get(1, Integer.class); + PartitionSpec partitionSpec = table.specs().get(specId); + StructProjection partitionData = row.get(0, StructProjection.class); + StringBuilder sb = new StringBuilder(); + List partitionValues = Lists.newArrayList(); + List transforms = Lists.newArrayList(); + for (int i = 0; i < partitionSpec.fields().size(); ++i) { + PartitionField partitionField = partitionSpec.fields().get(i); + Class fieldClass = partitionSpec.javaClasses()[i]; + int fieldId = partitionField.fieldId(); + // Iceberg partition field id starts at PARTITION_DATA_ID_START, + // So we can get the field index in partitionData using fieldId - PARTITION_DATA_ID_START + int index = fieldId - PARTITION_DATA_ID_START; + Object o = partitionData.get(index, fieldClass); + String fieldValue = o == null ? 
null : o.toString(); + String fieldName = partitionField.name(); + sb.append(fieldName); + sb.append("="); + sb.append(fieldValue); + sb.append("/"); + partitionValues.add(fieldValue); + transforms.add(partitionField.transform().toString()); + } + if (sb.length() > 0) { + sb.delete(sb.length() - 1, sb.length()); + } + String partitionName = sb.toString(); + long recordCount = row.get(2, Long.class); + long fileCount = row.get(3, Integer.class); + long fileSizeInBytes = row.get(4, Long.class); + long lastUpdateTime = row.get(9, Long.class); + long lastUpdateSnapShotId = row.get(10, Long.class); + return new IcebergPartition(partitionName, specId, recordCount, fileSizeInBytes, fileCount, + lastUpdateTime, lastUpdateSnapShotId, partitionValues, transforms); + } + + @VisibleForTesting + public static Range getPartitionRange(String value, String transform, List partitionColumns) + throws AnalysisException { + // For NULL value, create a minimum partition for it. + if (value == null) { + PartitionKey nullLowKey = PartitionKey.createPartitionKey( + Lists.newArrayList(new PartitionValue("0000-01-01")), partitionColumns); + PartitionKey nullUpKey = nullLowKey.successor(); + return Range.closedOpen(nullLowKey, nullUpKey); + } + LocalDateTime epoch = Instant.EPOCH.atZone(ZoneId.of("UTC")).toLocalDateTime(); + LocalDateTime target; + LocalDateTime lower; + LocalDateTime upper; + long longValue = Long.parseLong(value); + switch (transform) { + case HOUR: + target = epoch.plusHours(longValue); + lower = LocalDateTime.of(target.getYear(), target.getMonth(), target.getDayOfMonth(), + target.getHour(), 0, 0); + upper = lower.plusHours(1); + break; + case DAY: + target = epoch.plusDays(longValue); + lower = LocalDateTime.of(target.getYear(), target.getMonth(), target.getDayOfMonth(), 0, 0, 0); + upper = lower.plusDays(1); + break; + case MONTH: + target = epoch.plusMonths(longValue); + lower = LocalDateTime.of(target.getYear(), target.getMonth(), 1, 0, 0, 0); + upper = lower.plusMonths(1); + break; + case YEAR: + target = epoch.plusYears(longValue); + lower = LocalDateTime.of(target.getYear(), Month.JANUARY, 1, 0, 0, 0); + upper = lower.plusYears(1); + break; + default: + throw new RuntimeException("Unsupported transform " + transform); + } + DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); + Column c = partitionColumns.get(0); + Preconditions.checkState(c.getDataType().isDateType(), "Only support date type partition column"); + if (c.getType().isDate() || c.getType().isDateV2()) { + formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd"); + } + PartitionValue lowerValue = new PartitionValue(lower.format(formatter)); + PartitionValue upperValue = new PartitionValue(upper.format(formatter)); + PartitionKey lowKey = PartitionKey.createPartitionKey(Lists.newArrayList(lowerValue), partitionColumns); + PartitionKey upperKey = PartitionKey.createPartitionKey(Lists.newArrayList(upperValue), partitionColumns); + return Range.closedOpen(lowKey, upperKey); + } + + /** + * Merge overlapped iceberg partitions into one Doris partition. 
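+     * The returned map records, for each retained Doris partition, the names of the Iceberg
+     * partitions it encloses (including itself).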
+     */
+    @VisibleForTesting
+    public static Map<String, Set<String>> mergeOverlapPartitions(Map<String, PartitionItem> originPartitions) {
+        List<Map.Entry<String, PartitionItem>> entries = sortPartitionMap(originPartitions);
+        Map<String, Set<String>> map = Maps.newHashMap();
+        for (int i = 0; i < entries.size() - 1; i++) {
+            Range<PartitionKey> firstValue = entries.get(i).getValue().getItems();
+            String firstKey = entries.get(i).getKey();
+            Range<PartitionKey> secondValue = entries.get(i + 1).getValue().getItems();
+            String secondKey = entries.get(i + 1).getKey();
+            // If the first entry encloses the second one, remove the second entry and keep a record
+            // in the return map, so we can track the Iceberg partitions that are contained
+            // by one Doris partition.
+            while (i < entries.size() && firstValue.encloses(secondValue)) {
+                originPartitions.remove(secondKey);
+                map.putIfAbsent(firstKey, Sets.newHashSet(firstKey));
+                String finalSecondKey = secondKey;
+                map.computeIfPresent(firstKey, (key, value) -> {
+                    value.add(finalSecondKey);
+                    return value;
+                });
+                i++;
+                if (i >= entries.size() - 1) {
+                    break;
+                }
+                secondValue = entries.get(i + 1).getValue().getItems();
+                secondKey = entries.get(i + 1).getKey();
+            }
+        }
+        return map;
+    }
+
+    /**
+     * Sort the given map entries by PartitionItem Range(LOW, HIGH).
+     * When comparing two ranges, the one with the smaller LOW value is smaller than the other one.
+     * If two ranges have the same LOW value, the one with the larger HIGH value is smaller.
+     *
+     * For now, we only support year, month, day and hour,
+     * so it is impossible to have two partially intersecting partitions.
+     * One range is either enclosed by another or has no intersection with another at all.
+     *
+     * For example, we have these 4 ranges:
+     * [10, 20), [30, 40), [0, 30), [10, 15)
+     *
+     * After sorting, they become:
+     * [0, 30), [10, 20), [10, 15), [30, 40)
+     */
+    @VisibleForTesting
+    public static List<Map.Entry<String, PartitionItem>> sortPartitionMap(Map<String, PartitionItem> originPartitions) {
+        List<Map.Entry<String, PartitionItem>> entries = new ArrayList<>(originPartitions.entrySet());
+        entries.sort(new RangeComparator());
+        return entries;
+    }
+
+    public static class RangeComparator implements Comparator<Map.Entry<String, PartitionItem>> {
+        @Override
+        public int compare(Map.Entry<String, PartitionItem> p1, Map.Entry<String, PartitionItem> p2) {
+            PartitionItem value1 = p1.getValue();
+            PartitionItem value2 = p2.getValue();
+            if (value1 instanceof RangePartitionItem && value2 instanceof RangePartitionItem) {
+                Range<PartitionKey> items1 = value1.getItems();
+                Range<PartitionKey> items2 = value2.getItems();
+                if (!items1.hasLowerBound()) {
+                    return -1;
+                }
+                if (!items2.hasLowerBound()) {
+                    return 1;
+                }
+                PartitionKey upper1 = items1.upperEndpoint();
+                PartitionKey lower1 = items1.lowerEndpoint();
+                PartitionKey upper2 = items2.upperEndpoint();
+                PartitionKey lower2 = items2.lowerEndpoint();
+                int compareLow = lower1.compareTo(lower2);
+                return compareLow == 0 ? upper2.compareTo(upper1) : compareLow;
+            }
+            return 0;
+        }
+    }
+
+    public static IcebergSnapshotCacheValue getIcebergSnapshotCacheValue(
+            Optional<TableSnapshot> tableSnapshot,
+            ExternalCatalog catalog,
+            String dbName,
+            String tbName) {
+        IcebergSnapshotCacheValue snapshotCache = Env.getCurrentEnv().getExtMetaCacheMgr().getIcebergMetadataCache()
+                .getSnapshotCache(catalog, dbName, tbName);
+        if (tableSnapshot.isPresent()) {
+            // If a snapshot is specified,
+            // use the specified snapshot and the corresponding schema (not the latest schema).
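+            // The snapshot id is resolved from the FOR VERSION/TIME AS OF clause below, and the
+            // schema id is taken from that snapshot. The partition info is left empty because it
+            // is only maintained for the latest snapshot in the cache.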
+ Table icebergTable = getIcebergTable(catalog, dbName, tbName); + TableSnapshot snapshot = tableSnapshot.get(); + long querySpecSnapshot = getQuerySpecSnapshot(icebergTable, snapshot); + return new IcebergSnapshotCacheValue( + IcebergPartitionInfo.empty(), + new IcebergSnapshot(querySpecSnapshot, icebergTable.snapshot(querySpecSnapshot).schemaId())); + } else { + // Otherwise, use the latest snapshot and the latest schema. + return snapshotCache; + } + } + + // load table schema from iceberg API to external schema cache. + public static Optional loadSchemaCacheValue( + ExternalCatalog catalog, String dbName, String tbName, long schemaId) { + Table table = IcebergUtils.getIcebergTable(catalog, dbName, tbName); + List schema = IcebergUtils.getSchema(catalog, dbName, tbName, schemaId); + List tmpColumns = Lists.newArrayList(); + PartitionSpec spec = table.spec(); + for (PartitionField field : spec.fields()) { + Types.NestedField col = table.schema().findField(field.sourceId()); + for (Column c : schema) { + if (c.getName().equalsIgnoreCase(col.name())) { + tmpColumns.add(c); + break; + } + } + } + return Optional.of(new IcebergSchemaCacheValue(schema, tmpColumns)); + } + + public static List getIcebergSchema( + TableIf tableIf, + ExternalCatalog catalog, + String dbName, + String tbName) { + Optional snapshotFromContext = MvccUtil.getSnapshotFromContext(tableIf); + IcebergSnapshotCacheValue cacheValue = + IcebergUtils.getOrFetchSnapshotCacheValue(snapshotFromContext, catalog, dbName, tbName); + return IcebergUtils.getSchemaCacheValue( + catalog, dbName, tbName, cacheValue.getSnapshot().getSchemaId()) + .getSchema(); + } + + public static IcebergSnapshotCacheValue getOrFetchSnapshotCacheValue( + Optional snapshot, + ExternalCatalog catalog, + String dbName, + String tbName) { + if (snapshot.isPresent()) { + return ((IcebergMvccSnapshot) snapshot.get()).getSnapshotCacheValue(); + } else { + return IcebergUtils.getIcebergSnapshotCacheValue(Optional.empty(), catalog, dbName, tbName); + } + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java index 34efa59a459e00..d2f31214b0bb1b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java @@ -28,7 +28,6 @@ import org.apache.doris.common.UserException; import org.apache.doris.common.security.authentication.PreExecutionAuthenticator; import org.apache.doris.common.util.LocationPath; -import org.apache.doris.common.util.TimeUtils; import org.apache.doris.datasource.ExternalTable; import org.apache.doris.datasource.FileQueryScanNode; import org.apache.doris.datasource.TableFormatType; @@ -68,13 +67,11 @@ import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.io.CloseableIterator; import org.apache.iceberg.types.Conversions; -import org.apache.iceberg.util.SnapshotUtil; import org.apache.iceberg.util.TableScanUtil; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import java.io.IOException; -import java.time.DateTimeException; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -446,16 +443,7 @@ public boolean isBatchMode() { public Long getSpecifiedSnapshot() { TableSnapshot tableSnapshot = getQueryTableSnapshot(); if (tableSnapshot != null) { - TableSnapshot.VersionType type = 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
index e68ab2476d92bd..edc54e230be5b9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
@@ -110,6 +110,7 @@ public abstract class ScanNode extends PlanNode implements SplitGenerator {
     protected final List<SortNode> topnFilterSortNodes = Lists.newArrayList();
 
     protected TableSnapshot tableSnapshot;
+    protected List<Column> columns;
 
     // Save the id of backends which this scan node will be executed on.
     // This is also important for local shuffle logic.
@@ -141,6 +142,13 @@ protected static TNetworkAddress addressToTNetworkAddress(String address) {
         return result;
     }
 
+    protected List<Column> getColumns() {
+        if (columns == null && desc.getTable() != null) {
+            columns = desc.getTable().getBaseSchema();
+        }
+        return columns;
+    }
+
     public TupleDescriptor getTupleDesc() {
         return desc;
     }
@@ -233,7 +241,7 @@ public void computeColumnsFilter() {
         // for load scan node, table is null
         // partitionsInfo maybe null for other scan node, eg: ExternalScanNode...
         if (desc.getTable() != null) {
-            computeColumnsFilter(desc.getTable().getBaseSchema(), partitionsInfo);
+            computeColumnsFilter(getColumns(), partitionsInfo);
         }
     }
diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/IcebergExternalTableTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/IcebergExternalTableTest.java
index 251f4d8f6b6757..7408898ab74db6 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/IcebergExternalTableTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/IcebergExternalTableTest.java
@@ -17,8 +17,16 @@
 
 package org.apache.doris.datasource.iceberg;
 
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.PartitionItem;
+import org.apache.doris.catalog.PartitionKey;
+import org.apache.doris.catalog.PrimitiveType;
+import org.apache.doris.catalog.RangePartitionItem;
+import org.apache.doris.common.AnalysisException;
+
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import com.google.common.collect.Range;
 import org.apache.iceberg.PartitionField;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
@@ -32,8 +40,10 @@
 import org.mockito.Mockito;
 import org.mockito.MockitoAnnotations;
 
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 public class IcebergExternalTableTest {
 
@@ -136,4 +146,108 @@ public void testIsSupportedPartitionTable() {
         Assertions.assertTrue(spyTable.isValidRelatedTableCached());
         Assertions.assertTrue(spyTable.validRelatedTableCache());
     }
+
+    @Test
+    public void testGetPartitionRange() throws AnalysisException {
+        Column c = new Column("ts", PrimitiveType.DATETIMEV2);
+        List<Column> partitionColumns = Lists.newArrayList(c);
+
+        // Test null partition value
+        Range<PartitionKey> nullRange = IcebergUtils.getPartitionRange(null, "hour", partitionColumns);
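+        // A null partition value maps to a one-second sentinel range starting at 0000-01-01 00:00:00,
+        // as the assertions below verify.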
Assertions.assertEquals("0000-01-01 00:00:00", + nullRange.lowerEndpoint().getPartitionValuesAsStringList().get(0)); + Assertions.assertEquals("0000-01-01 00:00:01", + nullRange.upperEndpoint().getPartitionValuesAsStringList().get(0)); + + // Test hour transform. + Range hour = IcebergUtils.getPartitionRange("100", "hour", partitionColumns); + PartitionKey lowKey = hour.lowerEndpoint(); + PartitionKey upKey = hour.upperEndpoint(); + Assertions.assertEquals("1970-01-05 04:00:00", lowKey.getPartitionValuesAsStringList().get(0)); + Assertions.assertEquals("1970-01-05 05:00:00", upKey.getPartitionValuesAsStringList().get(0)); + + // Test day transform. + Range day = IcebergUtils.getPartitionRange("100", "day", partitionColumns); + lowKey = day.lowerEndpoint(); + upKey = day.upperEndpoint(); + Assertions.assertEquals("1970-04-11 00:00:00", lowKey.getPartitionValuesAsStringList().get(0)); + Assertions.assertEquals("1970-04-12 00:00:00", upKey.getPartitionValuesAsStringList().get(0)); + + // Test month transform. + Range month = IcebergUtils.getPartitionRange("100", "month", partitionColumns); + lowKey = month.lowerEndpoint(); + upKey = month.upperEndpoint(); + Assertions.assertEquals("1978-05-01 00:00:00", lowKey.getPartitionValuesAsStringList().get(0)); + Assertions.assertEquals("1978-06-01 00:00:00", upKey.getPartitionValuesAsStringList().get(0)); + + // Test year transform. + Range year = IcebergUtils.getPartitionRange("100", "year", partitionColumns); + lowKey = year.lowerEndpoint(); + upKey = year.upperEndpoint(); + Assertions.assertEquals("2070-01-01 00:00:00", lowKey.getPartitionValuesAsStringList().get(0)); + Assertions.assertEquals("2071-01-01 00:00:00", upKey.getPartitionValuesAsStringList().get(0)); + + // Test unsupported transform + Exception exception = Assertions.assertThrows(RuntimeException.class, () -> { + IcebergUtils.getPartitionRange("100", "bucket", partitionColumns); + }); + Assertions.assertEquals("Unsupported transform bucket", exception.getMessage()); + } + + @Test + public void testSortRange() throws AnalysisException { + Column c = new Column("c", PrimitiveType.DATETIMEV2); + ArrayList columns = Lists.newArrayList(c); + PartitionItem nullRange = new RangePartitionItem(IcebergUtils.getPartitionRange(null, "hour", columns)); + PartitionItem year1970 = new RangePartitionItem(IcebergUtils.getPartitionRange("0", "year", columns)); + PartitionItem year1971 = new RangePartitionItem(IcebergUtils.getPartitionRange("1", "year", columns)); + PartitionItem month197002 = new RangePartitionItem(IcebergUtils.getPartitionRange("1", "month", columns)); + PartitionItem month197103 = new RangePartitionItem(IcebergUtils.getPartitionRange("14", "month", columns)); + PartitionItem month197204 = new RangePartitionItem(IcebergUtils.getPartitionRange("27", "month", columns)); + PartitionItem day19700202 = new RangePartitionItem(IcebergUtils.getPartitionRange("32", "day", columns)); + PartitionItem day19730101 = new RangePartitionItem(IcebergUtils.getPartitionRange("1096", "day", columns)); + Map map = Maps.newHashMap(); + map.put("nullRange", nullRange); + map.put("year1970", year1970); + map.put("year1971", year1971); + map.put("month197002", month197002); + map.put("month197103", month197103); + map.put("month197204", month197204); + map.put("day19700202", day19700202); + map.put("day19730101", day19730101); + List> entries = IcebergUtils.sortPartitionMap(map); + Assertions.assertEquals(8, entries.size()); + Assertions.assertEquals("nullRange", entries.get(0).getKey()); + 
Assertions.assertEquals("year1970", entries.get(1).getKey()); + Assertions.assertEquals("month197002", entries.get(2).getKey()); + Assertions.assertEquals("day19700202", entries.get(3).getKey()); + Assertions.assertEquals("year1971", entries.get(4).getKey()); + Assertions.assertEquals("month197103", entries.get(5).getKey()); + Assertions.assertEquals("month197204", entries.get(6).getKey()); + Assertions.assertEquals("day19730101", entries.get(7).getKey()); + + Map> stringSetMap = IcebergUtils.mergeOverlapPartitions(map); + Assertions.assertEquals(2, stringSetMap.size()); + Assertions.assertTrue(stringSetMap.containsKey("year1970")); + Assertions.assertTrue(stringSetMap.containsKey("year1971")); + + Set names1970 = stringSetMap.get("year1970"); + Assertions.assertEquals(3, names1970.size()); + Assertions.assertTrue(names1970.contains("year1970")); + Assertions.assertTrue(names1970.contains("month197002")); + Assertions.assertTrue(names1970.contains("day19700202")); + + Set names1971 = stringSetMap.get("year1971"); + Assertions.assertEquals(2, names1971.size()); + Assertions.assertTrue(names1971.contains("year1971")); + Assertions.assertTrue(names1971.contains("month197103")); + + Assertions.assertEquals(5, map.size()); + Assertions.assertTrue(map.containsKey("nullRange")); + Assertions.assertTrue(map.containsKey("year1970")); + Assertions.assertTrue(map.containsKey("year1971")); + Assertions.assertTrue(map.containsKey("month197204")); + Assertions.assertTrue(map.containsKey("day19730101")); + } } + diff --git a/regression-test/data/external_table_p0/iceberg/test_iceberg_schema_change_with_timetravel.out b/regression-test/data/external_table_p0/iceberg/test_iceberg_schema_change_with_timetravel.out new file mode 100644 index 00000000000000..27e9359b71346c --- /dev/null +++ b/regression-test/data/external_table_p0/iceberg/test_iceberg_schema_change_with_timetravel.out @@ -0,0 +1,77 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !q0 -- +c1 int Yes true \N +c3 int Yes true \N +c2 int Yes true \N +c4 int Yes true \N + +-- !q1 -- +1 \N \N \N +2 \N \N \N +4 6 \N \N +7 8 \N \N +9 10 11 \N + +-- !q2 -- +1 + +-- !q3 -- +1 \N +2 3 + +-- !q4 -- +1 \N \N +2 3 \N +4 5 6 + +-- !q5 -- +1 \N +2 \N +4 6 +7 8 + +-- !q6 -- +1 \N \N +2 \N \N +4 6 \N +7 8 \N +9 10 11 + +-- !q0 -- +c1 int Yes true \N +c3 int Yes true \N +c2 int Yes true \N +c4 int Yes true \N + +-- !q1 -- +1 \N \N \N +2 \N \N \N +4 6 \N \N +7 8 \N \N +9 10 11 \N + +-- !q2 -- +1 + +-- !q3 -- +1 \N +2 3 + +-- !q4 -- +1 \N \N +2 3 \N +4 5 6 + +-- !q5 -- +1 \N +2 \N +4 6 +7 8 + +-- !q6 -- +1 \N \N +2 \N \N +4 6 \N +7 8 \N +9 10 11 + diff --git a/regression-test/suites/external_table_p0/iceberg/test_iceberg_schema_change_with_timetravel.groovy b/regression-test/suites/external_table_p0/iceberg/test_iceberg_schema_change_with_timetravel.groovy new file mode 100644 index 00000000000000..a376dfc210d41f --- /dev/null +++ b/regression-test/suites/external_table_p0/iceberg/test_iceberg_schema_change_with_timetravel.groovy @@ -0,0 +1,87 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("iceberg_schema_change_with_timetravel", "p0,external,doris,external_docker,external_docker_doris") {
+
+    String enabled = context.config.otherConfigs.get("enableIcebergTest")
+    if (enabled == null || !enabled.equalsIgnoreCase("true")) {
+        logger.info("disable iceberg test.")
+        return
+    }
+
+    String rest_port = context.config.otherConfigs.get("iceberg_rest_uri_port")
+    String minio_port = context.config.otherConfigs.get("iceberg_minio_port")
+    String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+    String catalog_name = "iceberg_schema_change_with_timetravel"
+
+    sql """drop catalog if exists ${catalog_name}"""
+    sql """
+        CREATE CATALOG ${catalog_name} PROPERTIES (
+            'type'='iceberg',
+            'iceberg.catalog.type'='rest',
+            'uri' = 'http://${externalEnvIp}:${rest_port}',
+            "s3.access_key" = "admin",
+            "s3.secret_key" = "password",
+            "s3.endpoint" = "http://${externalEnvIp}:${minio_port}",
+            "s3.region" = "us-east-1"
+        );"""
+
+    logger.info("catalog " + catalog_name + " created")
+    sql """switch ${catalog_name};"""
+    logger.info("switched to catalog " + catalog_name)
+    sql """use test_db;"""
+
+    def executeTimeTravelingQueries = { String tableName ->
+        def snapshots = sql """ select snapshot_id from iceberg_meta("table" = "${catalog_name}.test_db.${tableName}", "query_type" = "snapshots") order by committed_at; """
+        def snapshotIds = [
+            s0: snapshots.get(0)[0],
+            s1: snapshots.get(1)[0],
+            s2: snapshots.get(2)[0],
+            s3: snapshots.get(3)[0],
+            s4: snapshots.get(4)[0]
+        ]
+
+        qt_q0 """ desc ${tableName} """
+        qt_q1 """ select * from ${tableName} order by c1 """
+        qt_q2 """ select * from ${tableName} for version as of ${snapshotIds.s0} order by c1 """
+        qt_q3 """ select * from ${tableName} for version as of ${snapshotIds.s1} order by c1 """
+        qt_q4 """ select * from ${tableName} for version as of ${snapshotIds.s2} order by c1 """
+        qt_q5 """ select * from ${tableName} for version as of ${snapshotIds.s3} order by c1 """
+        qt_q6 """ select * from ${tableName} for version as of ${snapshotIds.s4} order by c1 """
+    }
+
+    executeTimeTravelingQueries("schema_change_with_time_travel")
+    executeTimeTravelingQueries("schema_change_with_time_travel_orc")
+}
+
+/*
+create table schema_change_with_time_travel (c1 int);
+insert into schema_change_with_time_travel values (1);
+alter table schema_change_with_time_travel add column c2 int;
+insert into schema_change_with_time_travel values (2,3);
+alter table schema_change_with_time_travel add column c3 int;
+insert into schema_change_with_time_travel values (4,5,6);
+alter table schema_change_with_time_travel drop column c2;
+insert into schema_change_with_time_travel values (7,8);
+alter table schema_change_with_time_travel add column c2 int;
+insert into schema_change_with_time_travel values (9,10,11);
+alter table schema_change_with_time_travel add column c4 int;
+*/
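The behavior exercised by this suite hinges on Iceberg recording a schema id on each snapshot, which is what lets the fix pin a time-travel query to the schema that was current at commit time. A small illustrative sketch (the class and method names are hypothetical; the Iceberg API calls are real):

    import org.apache.iceberg.Schema;
    import org.apache.iceberg.Snapshot;
    import org.apache.iceberg.Table;

    public class SnapshotSchemaDemo {
        // Returns the schema that was current when the given snapshot was committed.
        static Schema schemaAsOf(Table table, long snapshotId) {
            Snapshot snapshot = table.snapshot(snapshotId);
            // schemaId() is nullable for metadata written before schema ids were tracked;
            // fall back to the table's current schema in that case.
            Integer schemaId = snapshot.schemaId();
            return schemaId == null ? table.schema() : table.schemas().get(schemaId);
        }
    }

For example, querying schema_change_with_time_travel "for version as of" its fourth snapshot resolves the schema id written by "drop column c2", which is why q5 above returns only columns (c1, c3).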