diff --git a/docker/thirdparties/docker-compose/iceberg/scripts/create_preinstalled_scripts/iceberg/run09.sql b/docker/thirdparties/docker-compose/iceberg/scripts/create_preinstalled_scripts/iceberg/run09.sql
new file mode 100644
index 00000000000000..c5795883c4defd
--- /dev/null
+++ b/docker/thirdparties/docker-compose/iceberg/scripts/create_preinstalled_scripts/iceberg/run09.sql
@@ -0,0 +1,37 @@
+use demo.test_db;
+
+create table schema_change_with_time_travel (c1 int);
+insert into schema_change_with_time_travel values (1);
+
+alter table schema_change_with_time_travel add column c2 int;
+insert into schema_change_with_time_travel values (2,3);
+
+alter table schema_change_with_time_travel add column c3 int;
+insert into schema_change_with_time_travel values (4,5,6);
+
+alter table schema_change_with_time_travel drop column c2;
+insert into schema_change_with_time_travel values (7,8);
+
+alter table schema_change_with_time_travel add column c2 int;
+insert into schema_change_with_time_travel values (9,10,11);
+
+alter table schema_change_with_time_travel add column c4 int;
+
+
+create table schema_change_with_time_travel_orc (c1 int) tblproperties ("write.format.default"="orc");
+insert into schema_change_with_time_travel_orc values (1);
+
+alter table schema_change_with_time_travel_orc add column c2 int;
+insert into schema_change_with_time_travel_orc values (2,3);
+
+alter table schema_change_with_time_travel_orc add column c3 int;
+insert into schema_change_with_time_travel_orc values (4,5,6);
+
+alter table schema_change_with_time_travel_orc drop column c2;
+insert into schema_change_with_time_travel_orc values (7,8);
+
+alter table schema_change_with_time_travel_orc add column c2 int;
+insert into schema_change_with_time_travel_orc values (9,10,11);
+
+alter table schema_change_with_time_travel_orc add column c4 int;
+
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java
index 78fbc77698d34f..7d95d959ba618d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java
@@ -143,17 +143,6 @@ default List<Column> getBaseSchemaOrEmpty() {
 
     Column getColumn(String name);
 
-    default int getBaseColumnIdxByName(String colName) {
-        int i = 0;
-        for (Column col : getBaseSchema()) {
-            if (col.getName().equalsIgnoreCase(colName)) {
-                return i;
-            }
-            ++i;
-        }
-        return -1;
-    }
-
     String getMysqlType();
 
     String getEngine();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
index 25b74bb6eff2a2..5c2857cc0b61f1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
@@ -133,6 +133,8 @@ public abstract class ExternalCatalog
             CREATE_TIME, USE_META_CACHE);
 
+    protected static final int ICEBERG_CATALOG_EXECUTOR_THREAD_NUM = Runtime.getRuntime().availableProcessors();
+
     // Unique id of this catalog, will be assigned after catalog is loaded.
     @SerializedName(value = "id")
     protected long id;
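Note on the new `ICEBERG_CATALOG_EXECUTOR_THREAD_NUM` constant above: it replaces the hard-coded pool size of 16 (removed from `IcebergExternalCatalog` below) with the host's core count. A minimal, hypothetical sketch of how a catalog might size an executor from such a constant — the class name, thread-factory choice, and pool type here are assumptions, not the PR's actual wiring:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical sketch only: size a catalog I/O pool from the host core count,
// as ICEBERG_CATALOG_EXECUTOR_THREAD_NUM now does, instead of a fixed 16.
public class CatalogExecutorSketch {
    private static final int THREAD_NUM = Runtime.getRuntime().availableProcessors();

    // A fixed pool; the real catalog may use a named thread factory and
    // explicit shutdown handling instead.
    public static final ExecutorService EXECUTOR = Executors.newFixedThreadPool(THREAD_NUM);
}
```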
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalSchemaCache.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalSchemaCache.java
index 0fb66c4e3f9e45..da9a25d71be7ef 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalSchemaCache.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalSchemaCache.java
@@ -100,11 +100,9 @@ public void addSchemaForTest(String dbName, String tblName, ImmutableList<Column> schema) {
 
     public void invalidateTableCache(String dbName, String tblName) {
-        SchemaCacheKey key = new SchemaCacheKey(dbName, tblName);
-        schemaCache.invalidate(key);
+        schemaCache.asMap().keySet().stream()
+                .filter(key -> key.dbName.equals(dbName) && key.tblName.equals(tblName))
+                .forEach(schemaCache::invalidate);
     }
 
     public void invalidateDbCache(String dbName) {
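The invalidation change above matters because one external table can now own several schema-cache entries (an `IcebergSchemaCacheKey` carries a schema id on top of db/table name), so invalidating a single constructed key would leave stale schema versions behind. A self-contained sketch of the predicate-based eviction with Caffeine — the `Key` record and values are hypothetical, not Doris types:

```java
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;

// Hypothetical standalone sketch: one table maps to many cache keys (one per
// schema id), so eviction filters the key set instead of building one key.
public class SchemaCacheEvictionSketch {
    record Key(String db, String tbl, long schemaId) {}

    public static void main(String[] args) {
        Cache<Key, String> cache = Caffeine.newBuilder().build();
        cache.put(new Key("db1", "t1", 1L), "schema-v1");
        cache.put(new Key("db1", "t1", 2L), "schema-v2");
        cache.put(new Key("db1", "t2", 1L), "unrelated");

        // Evict every entry belonging to db1.t1, regardless of schema id.
        cache.asMap().keySet().stream()
                .filter(k -> k.db().equals("db1") && k.tbl().equals("t1"))
                .forEach(cache::invalidate);

        // Only the unrelated entry survives.
        System.out.println(cache.asMap().keySet());
    }
}
```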
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java
index 56fecb24afcef3..fc1058105f102a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java
@@ -252,7 +252,14 @@ private void setColumnPositionMapping()
             }
             SlotDescriptor slotDesc = desc.getSlot(slot.getSlotId());
             String colName = slotDesc.getColumn().getName();
-            int idx = tbl.getBaseColumnIdxByName(colName);
+            int idx = -1;
+            List<Column> columns = getColumns();
+            for (int i = 0; i < columns.size(); i++) {
+                if (columns.get(i).getName().equals(colName)) {
+                    idx = i;
+                    break;
+                }
+            }
             if (idx == -1) {
                 throw new UserException("Column " + colName + " not found in table " + tbl.getName());
             }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java
index 8d3aeaa6a267a0..d2c6230ba6bba3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java
@@ -193,7 +193,7 @@ protected void setDefaultValueExprs(TableIf tbl,
         TExpr tExpr = new TExpr();
         tExpr.setNodes(Lists.newArrayList());
 
-        for (Column column : tbl.getBaseSchema()) {
+        for (Column column : getColumns()) {
             Expr expr;
             if (column.getDefaultValue() != null) {
                 if (column.getDefaultValueExprDef() != null) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSDlaTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSDlaTable.java
index 7894279b2950ac..034ce1ae443644 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSDlaTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSDlaTable.java
@@ -73,4 +73,13 @@ abstract MTMVSnapshotIf getTableSnapshot(MTMVRefreshContext context, Optional<MvccSnapshot> snapshot)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java
@@ … @@ public List<Column> getFullSchema() {
         if (getDlaType() == DLAType.HUDI) {
             return ((HudiDlaTable) dlaTable).getHudiSchemaCacheValue(MvccUtil.getSnapshotFromContext(this))
                     .getSchema();
+        } else if (getDlaType() == DLAType.ICEBERG) {
+            return IcebergUtils.getIcebergSchema(this, getCatalog(), getDbName(), getName());
         }
         Optional<SchemaCacheValue> schemaCacheValue = cache.getSchemaValue(dbName, name);
         return schemaCacheValue.map(SchemaCacheValue::getSchema).orElse(null);
@@ -1047,8 +1050,12 @@ public void beforeMTMVRefresh(MTMV mtmv) throws DdlException {
     public MvccSnapshot loadSnapshot(Optional<TableSnapshot> tableSnapshot) {
         if (getDlaType() == DLAType.HUDI) {
             return new HudiMvccSnapshot(HudiUtils.getPartitionValues(tableSnapshot, this));
+        } else if (getDlaType() == DLAType.ICEBERG) {
+            return new IcebergMvccSnapshot(
+                    IcebergUtils.getIcebergSnapshotCacheValue(tableSnapshot, getCatalog(), getDbName(), getName()));
+        } else {
+            return new EmptyMvccSnapshot();
         }
-        return new EmptyMvccSnapshot();
     }
 
     public boolean firstColumnIsString() {
@@ -1084,4 +1091,9 @@ public List<SysTable> getSupportedSysTables() {
             return Lists.newArrayList();
         }
     }
+
+    public boolean isValidRelatedTable() {
+        makeSureInitialized();
+        return dlaTable.isValidRelatedTable();
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/IcebergDlaTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/IcebergDlaTable.java
new file mode 100644
index 00000000000000..36b871282a97f7
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/IcebergDlaTable.java
@@ -0,0 +1,147 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.hive;
+
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.PartitionItem;
+import org.apache.doris.catalog.PartitionType;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.datasource.iceberg.IcebergSchemaCacheValue;
+import org.apache.doris.datasource.iceberg.IcebergSnapshotCacheValue;
+import org.apache.doris.datasource.iceberg.IcebergUtils;
+import org.apache.doris.datasource.mvcc.MvccSnapshot;
+import org.apache.doris.mtmv.MTMVRefreshContext;
+import org.apache.doris.mtmv.MTMVSnapshotIdSnapshot;
+import org.apache.doris.mtmv.MTMVSnapshotIf;
+
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.apache.iceberg.PartitionField;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Table;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class IcebergDlaTable extends HMSDlaTable {
+
+    private boolean isValidRelatedTableCached = false;
+    private boolean isValidRelatedTable = false;
+
+    public IcebergDlaTable(HMSExternalTable table) {
+        super(table);
+    }
+
+    @Override
+    public Map<String, PartitionItem> getAndCopyPartitionItems(Optional<MvccSnapshot> snapshot) {
+        return Maps.newHashMap(
+                IcebergUtils.getOrFetchSnapshotCacheValue(
+                        snapshot, hmsTable.getCatalog(), hmsTable.getDbName(), hmsTable.getName())
+                        .getPartitionInfo().getNameToPartitionItem());
+    }
+
+    @Override
+    public PartitionType getPartitionType(Optional<MvccSnapshot> snapshot) {
+        return isValidRelatedTable() ? PartitionType.RANGE : PartitionType.UNPARTITIONED;
+    }
+
+    @Override
+    public Set<String> getPartitionColumnNames(Optional<MvccSnapshot> snapshot) {
+        return getPartitionColumns(snapshot).stream().map(Column::getName).collect(Collectors.toSet());
+    }
+
+    @Override
+    public List<Column> getPartitionColumns(Optional<MvccSnapshot> snapshot) {
+        IcebergSnapshotCacheValue snapshotValue =
+                IcebergUtils.getOrFetchSnapshotCacheValue(
+                        snapshot, hmsTable.getCatalog(), hmsTable.getDbName(), hmsTable.getName());
+        IcebergSchemaCacheValue schemaValue = IcebergUtils.getSchemaCacheValue(
+                hmsTable.getCatalog(), hmsTable.getDbName(), hmsTable.getName(),
+                snapshotValue.getSnapshot().getSchemaId());
+        return schemaValue.getPartitionColumns();
+    }
+
+    @Override
+    public MTMVSnapshotIf getPartitionSnapshot(String partitionName, MTMVRefreshContext context,
+            Optional<MvccSnapshot> snapshot) throws AnalysisException {
+        IcebergSnapshotCacheValue snapshotValue =
+                IcebergUtils.getOrFetchSnapshotCacheValue(
+                        snapshot, hmsTable.getCatalog(), hmsTable.getDbName(), hmsTable.getName());
+        long latestSnapshotId = snapshotValue.getPartitionInfo().getLatestSnapshotId(partitionName);
+        if (latestSnapshotId <= 0) {
+            throw new AnalysisException("can not find partition: " + partitionName);
+        }
+        return new MTMVSnapshotIdSnapshot(latestSnapshotId);
+    }
+
+    @Override
+    public MTMVSnapshotIf getTableSnapshot(MTMVRefreshContext context, Optional<MvccSnapshot> snapshot)
+            throws AnalysisException {
+        hmsTable.makeSureInitialized();
+        IcebergSnapshotCacheValue snapshotValue =
+                IcebergUtils.getOrFetchSnapshotCacheValue(
+                        snapshot, hmsTable.getCatalog(), hmsTable.getDbName(), hmsTable.getName());
+        return new MTMVSnapshotIdSnapshot(snapshotValue.getSnapshot().getSnapshotId());
+    }
+
+    @Override
+    boolean isPartitionColumnAllowNull() {
+        return true;
+    }
+
+    @Override
+    protected boolean isValidRelatedTable() {
+        if (isValidRelatedTableCached) {
+            return isValidRelatedTable;
+        }
+        isValidRelatedTable = false;
+        Set<String> allFields = Sets.newHashSet();
+        Table table = IcebergUtils.getIcebergTable(
+                hmsTable.getCatalog(),
+                hmsTable.getDbName(),
+                hmsTable.getName()
+        );
+        for (PartitionSpec spec : table.specs().values()) {
+            if (spec == null) {
+                isValidRelatedTableCached = true;
+                return false;
+            }
+            List<PartitionField> fields = spec.fields();
+            if (fields.size() != 1) {
+                isValidRelatedTableCached = true;
+                return false;
+            }
+            PartitionField partitionField = spec.fields().get(0);
+            String transformName = partitionField.transform().toString();
+            if (!IcebergUtils.YEAR.equals(transformName)
+                    && !IcebergUtils.MONTH.equals(transformName)
+                    && !IcebergUtils.DAY.equals(transformName)
+                    && !IcebergUtils.HOUR.equals(transformName)) {
+                isValidRelatedTableCached = true;
+                return false;
+            }
+            allFields.add(table.schema().findColumnName(partitionField.sourceId()));
+        }
+        isValidRelatedTableCached = true;
+        isValidRelatedTable = allFields.size() == 1;
+        return isValidRelatedTable;
+    }
+}
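`isValidRelatedTable()` above accepts only tables whose every partition spec has exactly one field, transformed by `year`/`month`/`day`/`hour`, over a single source column. A hypothetical sketch with the Iceberg API showing which spec shapes pass and fail that check (schema and field ids are made up for illustration):

```java
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.types.Types;

// Hypothetical sketch of the spec shapes isValidRelatedTable() accepts:
// exactly one partition field, with a time-based transform.
public class RelatedTableSpecSketch {
    public static void main(String[] args) {
        Schema schema = new Schema(
                Types.NestedField.required(1, "id", Types.IntegerType.get()),
                Types.NestedField.required(2, "ts", Types.TimestampType.withoutZone()));

        // Accepted: one field, and transform().toString() yields "day".
        PartitionSpec accepted = PartitionSpec.builderFor(schema).day("ts").build();
        System.out.println(accepted.fields().get(0).transform());

        // Rejected: two fields, and "identity" is not a time transform.
        PartitionSpec rejected = PartitionSpec.builderFor(schema)
                .identity("id").day("ts").build();
        System.out.println(rejected.fields().size());
    }
}
```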
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java
index b695a268b0f649..411493700975b1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java
@@ -45,7 +45,6 @@ public abstract class IcebergExternalCatalog extends ExternalCatalog {
     public static final String EXTERNAL_CATALOG_NAME = "external_catalog.name";
     protected String icebergCatalogType;
     protected Catalog catalog;
-    private static final int ICEBERG_CATALOG_EXECUTOR_THREAD_NUM = 16;
 
     public IcebergExternalCatalog(long catalogId, String name, String comment) {
         super(catalogId, name, InitCatalogLog.Type.ICEBERG, comment);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalTable.java
index e9bffbf56e3623..6990e8a7449c6d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalTable.java
@@ -17,25 +17,18 @@
 
 package org.apache.doris.datasource.iceberg;
 
-import org.apache.doris.analysis.PartitionValue;
 import org.apache.doris.analysis.TableSnapshot;
 import org.apache.doris.catalog.Column;
-import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.MTMV;
 import org.apache.doris.catalog.PartitionItem;
-import org.apache.doris.catalog.PartitionKey;
 import org.apache.doris.catalog.PartitionType;
-import org.apache.doris.catalog.RangePartitionItem;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.DdlException;
-import org.apache.doris.datasource.CacheException;
-import org.apache.doris.datasource.ExternalSchemaCache;
 import org.apache.doris.datasource.ExternalSchemaCache.SchemaCacheKey;
 import org.apache.doris.datasource.ExternalTable;
 import org.apache.doris.datasource.SchemaCacheValue;
 import org.apache.doris.datasource.mvcc.MvccSnapshot;
 import org.apache.doris.datasource.mvcc.MvccTable;
-import org.apache.doris.datasource.mvcc.MvccUtil;
 import org.apache.doris.datasource.systable.SupportedSysTables;
 import org.apache.doris.datasource.systable.SysTable;
 import org.apache.doris.mtmv.MTMVBaseTableIf;
@@ -52,32 +45,12 @@
 import org.apache.doris.thrift.TTableType;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
-import com.google.common.collect.Range;
 import com.google.common.collect.Sets;
-import org.apache.iceberg.FileScanTask;
-import org.apache.iceberg.MetadataTableType;
-import org.apache.iceberg.MetadataTableUtils;
 import org.apache.iceberg.PartitionField;
 import org.apache.iceberg.PartitionSpec;
-import org.apache.iceberg.PartitionsTable;
-import org.apache.iceberg.Snapshot;
-import org.apache.iceberg.StructLike;
 import org.apache.iceberg.Table;
-import org.apache.iceberg.io.CloseableIterable;
-import org.apache.iceberg.types.Types;
-import org.apache.iceberg.util.StructProjection;
-
-import java.io.IOException;
-import java.time.Instant;
-import java.time.LocalDateTime;
-import java.time.Month;
-import java.time.ZoneId;
-import java.time.format.DateTimeFormatter;
-import java.util.ArrayList;
-import java.util.Comparator;
+
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -87,15 +60,7 @@
 public class IcebergExternalTable extends ExternalTable implements MTMVRelatedTableIf,
         MTMVBaseTableIf, MvccTable {
 
-    public static final String YEAR = "year";
-    public static final String MONTH = "month";
-    public static final String DAY = "day";
-    public static final String HOUR = "hour";
-    public static final String IDENTITY = "identity";
-    public static final int PARTITION_DATA_ID_START = 1000; // org.apache.iceberg.PartitionSpec
-
     private Table table;
-    private List<Column> partitionColumns;
     private boolean isValidRelatedTableCached = false;
    private boolean isValidRelatedTable = false;
@@ -120,29 +85,9 @@ public void setTable(Table table) {
         this.table = table;
     }
 
-    @VisibleForTesting
-    public void setPartitionColumns(List<Column> partitionColumns) {
-        this.partitionColumns = partitionColumns;
-    }
-
     @Override
     public Optional<SchemaCacheValue> initSchema(SchemaCacheKey key) {
-        table = getIcebergTable();
-        List<Column> schema = IcebergUtils.getSchema(catalog, dbName, name,
-                ((IcebergSchemaCacheKey) key).getSchemaId());
-        List<Column> tmpColumns = Lists.newArrayList();
-        PartitionSpec spec = table.spec();
-        for (PartitionField field : spec.fields()) {
-            Types.NestedField col = table.schema().findField(field.sourceId());
-            for (Column c : schema) {
-                if (c.getName().equalsIgnoreCase(col.name())) {
-                    tmpColumns.add(c);
-                    break;
-                }
-            }
-        }
-        partitionColumns = tmpColumns;
-        return Optional.of(new IcebergSchemaCacheValue(schema, partitionColumns));
+        return IcebergUtils.loadSchemaCacheValue(catalog, dbName, name, ((IcebergSchemaCacheKey) key).getSchemaId());
     }
 
     @Override
@@ -180,23 +125,21 @@ public Table getIcebergTable() {
         return IcebergUtils.getIcebergTable(getCatalog(), getDbName(), getName());
     }
 
-    private IcebergSnapshotCacheValue getIcebergSnapshotCacheValue() {
-        return Env.getCurrentEnv().getExtMetaCacheMgr().getIcebergMetadataCache()
-                .getSnapshotCache(catalog, dbName, name);
-    }
-
     @Override
     public void beforeMTMVRefresh(MTMV mtmv) throws DdlException {
     }
 
     @Override
     public Map<String, PartitionItem> getAndCopyPartitionItems(Optional<MvccSnapshot> snapshot) {
-        return Maps.newHashMap(getOrFetchSnapshotCacheValue(snapshot).getPartitionInfo().getNameToPartitionItem());
+        return Maps.newHashMap(
+                IcebergUtils.getOrFetchSnapshotCacheValue(snapshot, getCatalog(), getDbName(), getName())
+                        .getPartitionInfo().getNameToPartitionItem());
     }
 
     @Override
     public Map<String, PartitionItem> getNameToPartitionItems(Optional<MvccSnapshot> snapshot) {
-        return getOrFetchSnapshotCacheValue(snapshot).getPartitionInfo().getNameToPartitionItem();
+        return IcebergUtils.getOrFetchSnapshotCacheValue(snapshot, getCatalog(), getDbName(), getName())
+                .getPartitionInfo().getNameToPartitionItem();
     }
 
     @Override
@@ -211,15 +154,18 @@ public Set<String> getPartitionColumnNames(Optional<MvccSnapshot> snapshot) thro
 
     @Override
     public List<Column> getPartitionColumns(Optional<MvccSnapshot> snapshot) {
-        IcebergSnapshotCacheValue snapshotValue = getOrFetchSnapshotCacheValue(snapshot);
-        IcebergSchemaCacheValue schemaValue = getIcebergSchemaCacheValue(snapshotValue.getSnapshot().getSchemaId());
+        IcebergSnapshotCacheValue snapshotValue =
+                IcebergUtils.getOrFetchSnapshotCacheValue(snapshot, getCatalog(), getDbName(), getName());
+        IcebergSchemaCacheValue schemaValue = IcebergUtils.getSchemaCacheValue(
+                catalog, getDbName(), getName(), snapshotValue.getSnapshot().getSchemaId());
         return schemaValue.getPartitionColumns();
     }
 
     @Override
     public MTMVSnapshotIf getPartitionSnapshot(String partitionName, MTMVRefreshContext context,
             Optional<MvccSnapshot> snapshot) throws AnalysisException {
-        IcebergSnapshotCacheValue snapshotValue = getOrFetchSnapshotCacheValue(snapshot);
+        IcebergSnapshotCacheValue snapshotValue =
+                IcebergUtils.getOrFetchSnapshotCacheValue(snapshot, getCatalog(), getDbName(), getName());
         long latestSnapshotId = snapshotValue.getPartitionInfo().getLatestSnapshotId(partitionName);
         if (latestSnapshotId <= 0) {
             throw new AnalysisException("can not find partition: " + partitionName);
@@ -231,7 +177,8 @@ public MTMVSnapshotIf getPartitionSnapshot(String partitionName, MTMVRefreshCont
     public MTMVSnapshotIf getTableSnapshot(MTMVRefreshContext context, Optional<MvccSnapshot> snapshot)
             throws AnalysisException {
         makeSureInitialized();
-        IcebergSnapshotCacheValue snapshotValue = getOrFetchSnapshotCacheValue(snapshot);
+        IcebergSnapshotCacheValue snapshotValue =
+                IcebergUtils.getOrFetchSnapshotCacheValue(snapshot, getCatalog(), getDbName(), getName());
         return new MTMVSnapshotIdSnapshot(snapshotValue.getSnapshot().getSnapshotId());
     }
 
@@ -266,10 +213,10 @@ public boolean isValidRelatedTable() {
             }
             PartitionField partitionField = spec.fields().get(0);
             String transformName = partitionField.transform().toString();
-            if (!YEAR.equals(transformName)
-                    && !MONTH.equals(transformName)
-                    && !DAY.equals(transformName)
-                    && !HOUR.equals(transformName)) {
+            if (!IcebergUtils.YEAR.equals(transformName)
+                    && !IcebergUtils.MONTH.equals(transformName)
+                    && !IcebergUtils.DAY.equals(transformName)
+                    && !IcebergUtils.HOUR.equals(transformName)) {
                 isValidRelatedTableCached = true;
                 return false;
             }
@@ -282,27 +229,13 @@
     @Override
     public MvccSnapshot loadSnapshot(Optional<TableSnapshot> tableSnapshot) {
-        return new IcebergMvccSnapshot(getIcebergSnapshotCacheValue());
-    }
-
-    public long getLatestSnapshotId() {
-        table = getIcebergTable();
-        Snapshot snapshot = table.currentSnapshot();
-        return snapshot == null ? IcebergUtils.UNKNOWN_SNAPSHOT_ID : table.currentSnapshot().snapshotId();
-    }
-
-    public long getSchemaId(long snapshotId) {
-        table = getIcebergTable();
-        return snapshotId == IcebergUtils.UNKNOWN_SNAPSHOT_ID
-                ? IcebergUtils.UNKNOWN_SNAPSHOT_ID
-                : table.snapshot(snapshotId).schemaId();
+        return new IcebergMvccSnapshot(IcebergUtils.getIcebergSnapshotCacheValue(
+                tableSnapshot, getCatalog(), getDbName(), getName()));
     }
 
     @Override
     public List<Column> getFullSchema() {
-        Optional<MvccSnapshot> snapshotFromContext = MvccUtil.getSnapshotFromContext(this);
-        IcebergSnapshotCacheValue cacheValue = getOrFetchSnapshotCacheValue(snapshotFromContext);
-        return getIcebergSchemaCacheValue(cacheValue.getSnapshot().getSchemaId()).getSchema();
+        return IcebergUtils.getIcebergSchema(this, getCatalog(), getDbName(), getName());
     }
 
     @Override
@@ -310,239 +243,6 @@ public boolean supportInternalPartitionPruned() {
         return true;
     }
 
-    public IcebergSchemaCacheValue getIcebergSchemaCacheValue(long schemaId) {
-        ExternalSchemaCache cache = Env.getCurrentEnv().getExtMetaCacheMgr().getSchemaCache(catalog);
-        Optional<SchemaCacheValue> schemaCacheValue = cache.getSchemaValue(
-                new IcebergSchemaCacheKey(dbName, name, schemaId));
-        if (!schemaCacheValue.isPresent()) {
-            throw new CacheException("failed to getSchema for: %s.%s.%s.%s",
-                    null, catalog.getName(), dbName, name, schemaId);
-        }
-        return (IcebergSchemaCacheValue) schemaCacheValue.get();
-    }
-
-    public IcebergPartitionInfo loadPartitionInfo(long snapshotId) throws AnalysisException {
-        // snapshotId == UNKNOWN_SNAPSHOT_ID means this is an empty table, haven't contained any snapshot yet.
-        if (!isValidRelatedTable() || snapshotId == IcebergUtils.UNKNOWN_SNAPSHOT_ID) {
-            return new IcebergPartitionInfo();
-        }
-        List<IcebergPartition> icebergPartitions = loadIcebergPartition(snapshotId);
-        Map<String, IcebergPartition> nameToPartition = Maps.newHashMap();
-        Map<String, PartitionItem> nameToPartitionItem = Maps.newHashMap();
-        table = getIcebergTable();
-        partitionColumns = getIcebergSchemaCacheValue(table.snapshot(snapshotId).schemaId()).getPartitionColumns();
-        for (IcebergPartition partition : icebergPartitions) {
-            nameToPartition.put(partition.getPartitionName(), partition);
-            String transform = table.specs().get(partition.getSpecId()).fields().get(0).transform().toString();
-            Range<PartitionKey> partitionRange = getPartitionRange(
-                    partition.getPartitionValues().get(0), transform, partitionColumns);
-            PartitionItem item = new RangePartitionItem(partitionRange);
-            nameToPartitionItem.put(partition.getPartitionName(), item);
-        }
-        Map<String, Set<String>> partitionNameMap = mergeOverlapPartitions(nameToPartitionItem);
-        return new IcebergPartitionInfo(nameToPartitionItem, nameToPartition, partitionNameMap);
-    }
-
-    public List<IcebergPartition> loadIcebergPartition(long snapshotId) {
-        PartitionsTable partitionsTable = (PartitionsTable) MetadataTableUtils
-                .createMetadataTableInstance(table, MetadataTableType.PARTITIONS);
-        List<IcebergPartition> partitions = Lists.newArrayList();
-        try (CloseableIterable<FileScanTask> tasks = partitionsTable.newScan().useSnapshot(snapshotId).planFiles()) {
-            for (FileScanTask task : tasks) {
-                CloseableIterable<StructLike> rows = task.asDataTask().rows();
-                for (StructLike row : rows) {
-                    partitions.add(generateIcebergPartition(row));
-                }
-            }
-        } catch (IOException e) {
-            LOG.warn("Failed to get Iceberg table {} partition info.", name, e);
-        }
-        return partitions;
-    }
-
-    public IcebergPartition generateIcebergPartition(StructLike row) {
-        // row format :
-        // 0. partitionData,
-        // 1. spec_id,
-        // 2. record_count,
-        // 3. file_count,
-        // 4. total_data_file_size_in_bytes,
-        // 5. position_delete_record_count,
-        // 6. position_delete_file_count,
-        // 7. equality_delete_record_count,
-        // 8. equality_delete_file_count,
-        // 9. last_updated_at,
-        // 10. last_updated_snapshot_id
-        table = getIcebergTable();
-        Preconditions.checkState(!table.spec().fields().isEmpty(), table.name() + " is not a partition table.");
-        int specId = row.get(1, Integer.class);
-        PartitionSpec partitionSpec = table.specs().get(specId);
-        StructProjection partitionData = row.get(0, StructProjection.class);
-        StringBuilder sb = new StringBuilder();
-        List<String> partitionValues = Lists.newArrayList();
-        List<String> transforms = Lists.newArrayList();
-        for (int i = 0; i < partitionSpec.fields().size(); ++i) {
-            PartitionField partitionField = partitionSpec.fields().get(i);
-            Class<?> fieldClass = partitionSpec.javaClasses()[i];
-            int fieldId = partitionField.fieldId();
-            // Iceberg partition field id starts at PARTITION_DATA_ID_START,
-            // So we can get the field index in partitionData using fieldId - PARTITION_DATA_ID_START
-            int index = fieldId - PARTITION_DATA_ID_START;
-            Object o = partitionData.get(index, fieldClass);
-            String fieldValue = o == null ? null : o.toString();
-            String fieldName = partitionField.name();
-            sb.append(fieldName);
-            sb.append("=");
-            sb.append(fieldValue);
-            sb.append("/");
-            partitionValues.add(fieldValue);
-            transforms.add(partitionField.transform().toString());
-        }
-        if (sb.length() > 0) {
-            sb.delete(sb.length() - 1, sb.length());
-        }
-        String partitionName = sb.toString();
-        long recordCount = row.get(2, Long.class);
-        long fileCount = row.get(3, Integer.class);
-        long fileSizeInBytes = row.get(4, Long.class);
-        long lastUpdateTime = row.get(9, Long.class);
-        long lastUpdateSnapShotId = row.get(10, Long.class);
-        return new IcebergPartition(partitionName, specId, recordCount, fileSizeInBytes, fileCount,
-                lastUpdateTime, lastUpdateSnapShotId, partitionValues, transforms);
-    }
-
-    @VisibleForTesting
-    public Range<PartitionKey> getPartitionRange(String value, String transform, List<Column> partitionColumns)
-            throws AnalysisException {
-        // For NULL value, create a minimum partition for it.
-        if (value == null) {
-            PartitionKey nullLowKey = PartitionKey.createPartitionKey(
-                    Lists.newArrayList(new PartitionValue("0000-01-01")), partitionColumns);
-            PartitionKey nullUpKey = nullLowKey.successor();
-            return Range.closedOpen(nullLowKey, nullUpKey);
-        }
-        LocalDateTime epoch = Instant.EPOCH.atZone(ZoneId.of("UTC")).toLocalDateTime();
-        LocalDateTime target;
-        LocalDateTime lower;
-        LocalDateTime upper;
-        long longValue = Long.parseLong(value);
-        switch (transform) {
-            case HOUR:
-                target = epoch.plusHours(longValue);
-                lower = LocalDateTime.of(target.getYear(), target.getMonth(), target.getDayOfMonth(),
-                        target.getHour(), 0, 0);
-                upper = lower.plusHours(1);
-                break;
-            case DAY:
-                target = epoch.plusDays(longValue);
-                lower = LocalDateTime.of(target.getYear(), target.getMonth(), target.getDayOfMonth(), 0, 0, 0);
-                upper = lower.plusDays(1);
-                break;
-            case MONTH:
-                target = epoch.plusMonths(longValue);
-                lower = LocalDateTime.of(target.getYear(), target.getMonth(), 1, 0, 0, 0);
-                upper = lower.plusMonths(1);
-                break;
-            case YEAR:
-                target = epoch.plusYears(longValue);
-                lower = LocalDateTime.of(target.getYear(), Month.JANUARY, 1, 0, 0, 0);
-                upper = lower.plusYears(1);
-                break;
-            default:
-                throw new RuntimeException("Unsupported transform " + transform);
-        }
-        DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
-        Column c = partitionColumns.get(0);
-        Preconditions.checkState(c.getDataType().isDateType(), "Only support date type partition column");
-        if (c.getType().isDate() || c.getType().isDateV2()) {
-            formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd");
-        }
-        PartitionValue lowerValue = new PartitionValue(lower.format(formatter));
-        PartitionValue upperValue = new PartitionValue(upper.format(formatter));
-        PartitionKey lowKey = PartitionKey.createPartitionKey(Lists.newArrayList(lowerValue), partitionColumns);
-        PartitionKey upperKey = PartitionKey.createPartitionKey(Lists.newArrayList(upperValue), partitionColumns);
-        return Range.closedOpen(lowKey, upperKey);
-    }
-
-    /**
-     * Merge overlapped iceberg partitions into one Doris partition.
-     */
-    public Map<String, Set<String>> mergeOverlapPartitions(Map<String, PartitionItem> originPartitions) {
-        List<Map.Entry<String, PartitionItem>> entries = sortPartitionMap(originPartitions);
-        Map<String, Set<String>> map = Maps.newHashMap();
-        for (int i = 0; i < entries.size() - 1; i++) {
-            Range<PartitionKey> firstValue = entries.get(i).getValue().getItems();
-            String firstKey = entries.get(i).getKey();
-            Range<PartitionKey> secondValue = entries.get(i + 1).getValue().getItems();
-            String secondKey = entries.get(i + 1).getKey();
-            // If the first entry enclose the second one, remove the second entry and keep a record in the return map.
-            // So we can track the iceberg partitions those contained by one Doris partition.
-            while (i < entries.size() && firstValue.encloses(secondValue)) {
-                originPartitions.remove(secondKey);
-                map.putIfAbsent(firstKey, Sets.newHashSet(firstKey));
-                String finalSecondKey = secondKey;
-                map.computeIfPresent(firstKey, (key, value) -> {
-                    value.add(finalSecondKey);
-                    return value;
-                });
-                i++;
-                if (i >= entries.size() - 1) {
-                    break;
-                }
-                secondValue = entries.get(i + 1).getValue().getItems();
-                secondKey = entries.get(i + 1).getKey();
-            }
-        }
-        return map;
-    }
-
-    /**
-     * Sort the given map entries by PartitionItem Range(LOW, HIGH)
-     * When comparing two ranges, the one with smaller LOW value is smaller than the other one.
-     * If two ranges have same values of LOW, the one with larger HIGH value is smaller.
-     *
-     * For now, we only support year, month, day and hour,
-     * so it is impossible to have two partially intersect partitions.
-     * One range is either enclosed by another or has no intersection at all with another.
-     *
-     *
-     * For example, we have these 4 ranges:
-     * [10, 20), [30, 40), [0, 30), [10, 15)
-     *
-     * After sort, they become:
-     * [0, 30), [10, 20), [10, 15), [30, 40)
-     */
-    public List<Map.Entry<String, PartitionItem>> sortPartitionMap(Map<String, PartitionItem> originPartitions) {
-        List<Map.Entry<String, PartitionItem>> entries = new ArrayList<>(originPartitions.entrySet());
-        entries.sort(new RangeComparator());
-        return entries;
-    }
-
-    public static class RangeComparator implements Comparator<Map.Entry<String, PartitionItem>> {
-        @Override
-        public int compare(Map.Entry<String, PartitionItem> p1, Map.Entry<String, PartitionItem> p2) {
-            PartitionItem value1 = p1.getValue();
-            PartitionItem value2 = p2.getValue();
-            if (value1 instanceof RangePartitionItem && value2 instanceof RangePartitionItem) {
-                Range<PartitionKey> items1 = value1.getItems();
-                Range<PartitionKey> items2 = value2.getItems();
-                if (!items1.hasLowerBound()) {
-                    return -1;
-                }
-                if (!items2.hasLowerBound()) {
-                    return 1;
-                }
-                PartitionKey upper1 = items1.upperEndpoint();
-                PartitionKey lower1 = items1.lowerEndpoint();
-                PartitionKey upper2 = items2.upperEndpoint();
-                PartitionKey lower2 = items2.lowerEndpoint();
-                int compareLow = lower1.compareTo(lower2);
-                return compareLow == 0 ? upper2.compareTo(upper1) : compareLow;
-            }
-            return 0;
-        }
-    }
-
     @VisibleForTesting
     public boolean isValidRelatedTableCached() {
         return isValidRelatedTableCached;
@@ -557,14 +257,6 @@ public void setIsValidRelatedTableCached(boolean isCached) {
         this.isValidRelatedTableCached = isCached;
     }
 
-    public IcebergSnapshotCacheValue getOrFetchSnapshotCacheValue(Optional<MvccSnapshot> snapshot) {
-        if (snapshot.isPresent()) {
-            return ((IcebergMvccSnapshot) snapshot.get()).getSnapshotCacheValue();
-        } else {
-            return getIcebergSnapshotCacheValue();
-        }
-    }
-
     @Override
     public List<SysTable> getSupportedSysTables() {
         makeSureInitialized();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataCache.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataCache.java
index b776a8c3a472b3..f99b652b42dd16 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataCache.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataCache.java
@@ -24,6 +24,7 @@
 import org.apache.doris.datasource.ExternalCatalog;
 import org.apache.doris.datasource.ExternalMetaCacheMgr;
 import org.apache.doris.datasource.hive.HMSExternalCatalog;
+import org.apache.doris.mtmv.MTMVRelatedTableIf;
 
 import com.github.benmanes.caffeine.cache.LoadingCache;
 import com.google.common.collect.Iterables;
@@ -123,12 +124,18 @@ private Table loadTable(IcebergMetadataCacheKey key) {
 
     @NotNull
     private IcebergSnapshotCacheValue loadSnapshot(IcebergMetadataCacheKey key) throws AnalysisException {
-        IcebergExternalTable table = (IcebergExternalTable) key.catalog.getDbOrAnalysisException(key.dbName)
+        MTMVRelatedTableIf table = (MTMVRelatedTableIf) key.catalog.getDbOrAnalysisException(key.dbName)
                 .getTableOrAnalysisException(key.tableName);
-        long snapshotId = table.getLatestSnapshotId();
-        long schemaId = table.getSchemaId(snapshotId);
-        IcebergPartitionInfo icebergPartitionInfo = table.loadPartitionInfo(snapshotId);
-        return new IcebergSnapshotCacheValue(icebergPartitionInfo, new IcebergSnapshot(snapshotId, schemaId));
+        IcebergSnapshot lastedIcebergSnapshot = IcebergUtils.getLastedIcebergSnapshot(
+                (ExternalCatalog) key.catalog, key.dbName, key.tableName);
+        IcebergPartitionInfo icebergPartitionInfo;
+        if (!table.isValidRelatedTable()) {
+            icebergPartitionInfo = IcebergPartitionInfo.empty();
+        } else {
+            icebergPartitionInfo = IcebergUtils.loadPartitionInfo(
+                    (ExternalCatalog) key.catalog, key.dbName, key.tableName, lastedIcebergSnapshot.getSnapshotId());
+        }
+        return new IcebergSnapshotCacheValue(icebergPartitionInfo, lastedIcebergSnapshot);
     }
 
     public void invalidateCatalogCache(long catalogId) {
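`loadSnapshot` above now resolves the latest snapshot through `IcebergUtils.getLastedIcebergSnapshot` and skips partition loading entirely for tables that cannot serve as MTMV related tables. The edge case it must handle: a freshly created table has no snapshots, which maps to `UNKNOWN_SNAPSHOT_ID` (-1). A condensed, standalone sketch of that null-safe lookup (not the PR's exact code path):

```java
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;

// Condensed sketch of the null-safe "latest snapshot" lookup: an empty table
// has no current snapshot, which the loader treats as "nothing to load".
public final class LatestSnapshotSketch {
    public static final long UNKNOWN_SNAPSHOT_ID = -1;

    public static long latestSnapshotId(Table table) {
        Snapshot current = table.currentSnapshot();
        return current == null ? UNKNOWN_SNAPSHOT_ID : current.snapshotId();
    }
}
```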
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergPartitionInfo.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergPartitionInfo.java
index 9edb2137f4f389..bb46eb2122399d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergPartitionInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergPartitionInfo.java
@@ -29,12 +29,18 @@ public class IcebergPartitionInfo {
     private final Map<String, IcebergPartition> nameToIcebergPartition;
     private final Map<String, Set<String>> nameToIcebergPartitionNames;
 
+    private static final IcebergPartitionInfo EMPTY = new IcebergPartitionInfo();
+
     public IcebergPartitionInfo() {
         this.nameToPartitionItem = Maps.newHashMap();
         this.nameToIcebergPartition = Maps.newHashMap();
         this.nameToIcebergPartitionNames = Maps.newHashMap();
     }
 
+    static IcebergPartitionInfo empty() {
+        return EMPTY;
+    }
+
     public IcebergPartitionInfo(Map<String, PartitionItem> nameToPartitionItem,
             Map<String, IcebergPartition> nameToIcebergPartition,
             Map<String, Set<String>> nameToIcebergPartitionNames) {
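The `empty()` accessor above hands every caller the same shared instance instead of allocating a fresh `IcebergPartitionInfo` per empty or non-partitioned table; consumers such as `getAndCopyPartitionItems` copy before mutating, which keeps the sharing safe. The pattern in isolation — class and field names here are hypothetical:

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the shared-empty-instance pattern: one reusable
// empty value, with mutating callers copying before they write.
public final class EmptyInstanceSketch {
    private static final EmptyInstanceSketch EMPTY =
            new EmptyInstanceSketch(Collections.emptyMap());

    private final Map<String, String> items;

    private EmptyInstanceSketch(Map<String, String> items) {
        this.items = items;
    }

    public static EmptyInstanceSketch empty() {
        return EMPTY;
    }

    // Callers get a private copy, so the shared EMPTY stays untouched.
    public Map<String, String> getAndCopyItems() {
        return new HashMap<>(items);
    }
}
```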
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java
index f61c226ed752d3..ab91d9c3d90d59 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java
@@ -31,39 +31,58 @@
 import org.apache.doris.analysis.LiteralExpr;
 import org.apache.doris.analysis.NullLiteral;
 import org.apache.doris.analysis.PartitionDesc;
+import org.apache.doris.analysis.PartitionValue;
 import org.apache.doris.analysis.SlotRef;
 import org.apache.doris.analysis.StringLiteral;
 import org.apache.doris.analysis.Subquery;
+import org.apache.doris.analysis.TableSnapshot;
 import org.apache.doris.catalog.ArrayType;
 import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.MapType;
+import org.apache.doris.catalog.PartitionItem;
+import org.apache.doris.catalog.PartitionKey;
+import org.apache.doris.catalog.RangePartitionItem;
 import org.apache.doris.catalog.ScalarType;
 import org.apache.doris.catalog.StructField;
 import org.apache.doris.catalog.StructType;
 import org.apache.doris.catalog.TableIf;
 import org.apache.doris.catalog.Type;
+import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.info.SimpleTableInfo;
 import org.apache.doris.common.util.TimeUtils;
+import org.apache.doris.datasource.CacheException;
 import org.apache.doris.datasource.ExternalCatalog;
+import org.apache.doris.datasource.ExternalSchemaCache;
 import org.apache.doris.datasource.SchemaCacheValue;
+import org.apache.doris.datasource.mvcc.MvccSnapshot;
+import org.apache.doris.datasource.mvcc.MvccUtil;
 import org.apache.doris.datasource.property.constants.HMSProperties;
 import org.apache.doris.nereids.exceptions.NotSupportedException;
 import org.apache.doris.thrift.TExprOpcode;
 
 import com.github.benmanes.caffeine.cache.Caffeine;
 import com.github.benmanes.caffeine.cache.LoadingCache;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Range;
+import com.google.common.collect.Sets;
 import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.apache.iceberg.CatalogProperties;
 import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.MetadataTableType;
+import org.apache.iceberg.MetadataTableUtils;
 import org.apache.iceberg.PartitionField;
 import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.PartitionsTable;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.StructLike;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.expressions.And;
@@ -79,14 +98,25 @@
 import org.apache.iceberg.types.Type.TypeID;
 import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.LocationUtil;
+import org.apache.iceberg.util.SnapshotUtil;
+import org.apache.iceberg.util.StructProjection;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
+import java.io.IOException;
+import java.time.DateTimeException;
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.Month;
+import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
 import java.util.ArrayList;
+import java.util.Comparator;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
 import java.util.stream.Collectors;
 
 /**
@@ -117,7 +147,15 @@ public Integer initialValue() {
     // nickname in spark
     public static final String SPARK_SQL_COMPRESSION_CODEC = "spark.sql.iceberg.compression-codec";
 
-    public static final long UNKNOWN_SNAPSHOT_ID = -1;
+    public static final long UNKNOWN_SNAPSHOT_ID = -1; // means an empty table
+    public static final long NEWEST_SCHEMA_ID = -1;
+
+    public static final String YEAR = "year";
+    public static final String MONTH = "month";
+    public static final String DAY = "day";
+    public static final String HOUR = "hour";
+    public static final String IDENTITY = "identity";
+    public static final int PARTITION_DATA_ID_START = 1000; // org.apache.iceberg.PartitionSpec
 
     public static Expression convertToIcebergExpr(Expr expr, Schema schema) {
         if (expr == null) {
@@ -582,10 +620,6 @@ private static org.apache.iceberg.Table getIcebergTableInternal(ExternalCatalog
             : metadataCache.getIcebergTable(catalog, dbName, tblName);
     }
 
-    public static List<Column> getSchema(ExternalCatalog catalog, String dbName, String name) {
-        return getSchema(catalog, dbName, name, UNKNOWN_SNAPSHOT_ID);
-    }
-
     /**
      * Get iceberg schema from catalog and convert them to doris schema
     */
@@ -594,7 +628,7 @@ public static List<Column> getSchema(ExternalCatalog catalog, String dbName, Str
         return catalog.getPreExecutionAuthenticator().execute(() -> {
             org.apache.iceberg.Table icebergTable = getIcebergTable(catalog, dbName, name);
             Schema schema;
-            if (schemaId == UNKNOWN_SNAPSHOT_ID || icebergTable.currentSnapshot() == null) {
+            if (schemaId == NEWEST_SCHEMA_ID || icebergTable.currentSnapshot() == null) {
                 schema = icebergTable.schema();
             } else {
                 schema = icebergTable.schemas().get((int) schemaId);
@@ -725,25 +759,6 @@ public static HiveCatalog createIcebergHiveCatalog(ExternalCatalog externalCatal
         return hiveCatalog;
     }
 
-    // load table schema from iceberg API to external schema cache.
-    public static Optional<SchemaCacheValue> loadSchemaCacheValue(
-            ExternalCatalog catalog, String dbName, String tbName, long schemaId) {
-        Table table = IcebergUtils.getIcebergTable(catalog, dbName, tbName);
-        List<Column> schema = IcebergUtils.getSchema(catalog, dbName, tbName, schemaId);
-        List<Column> tmpColumns = Lists.newArrayList();
-        PartitionSpec spec = table.spec();
-        for (PartitionField field : spec.fields()) {
-            Types.NestedField col = table.schema().findField(field.sourceId());
-            for (Column c : schema) {
-                if (c.getName().equalsIgnoreCase(col.name())) {
-                    tmpColumns.add(c);
-                    break;
-                }
-            }
-        }
-        return Optional.of(new IcebergSchemaCacheValue(schema, tmpColumns));
-    }
-
     // Retrieve the manifest files that match the query based on partitions in filter
     public static CloseableIterable<ManifestFile> getMatchingManifest(
             List<ManifestFile> dataManifests,
@@ -772,4 +787,330 @@ public static CloseableIterable<ManifestFile> getMatchingManifest(
         return matchingManifests;
     }
+
+    // get snapshot id from query like 'for version/time as of'
+    public static long getQuerySpecSnapshot(Table table, TableSnapshot queryTableSnapshot) {
+        TableSnapshot.VersionType type = queryTableSnapshot.getType();
+        if (type == TableSnapshot.VersionType.VERSION) {
+            return queryTableSnapshot.getVersion();
+        } else {
+            long timestamp = TimeUtils.timeStringToLong(queryTableSnapshot.getTime(), TimeUtils.getTimeZone());
+            if (timestamp < 0) {
+                throw new DateTimeException("can't parse time: " + queryTableSnapshot.getTime());
+            }
+            return SnapshotUtil.snapshotIdAsOfTime(table, timestamp);
+        }
+    }
+
+    // read schema from external schema cache
+    public static IcebergSchemaCacheValue getSchemaCacheValue(
+            ExternalCatalog catalog, String dbName, String name, long schemaId) {
+        ExternalSchemaCache cache = Env.getCurrentEnv().getExtMetaCacheMgr().getSchemaCache(catalog);
+        Optional<SchemaCacheValue> schemaCacheValue = cache.getSchemaValue(
+                new IcebergSchemaCacheKey(dbName, name, schemaId));
+        if (!schemaCacheValue.isPresent()) {
+            throw new CacheException("failed to getSchema for: %s.%s.%s.%s",
+                    null, catalog.getName(), dbName, name, schemaId);
+        }
+        return (IcebergSchemaCacheValue) schemaCacheValue.get();
+    }
+
+    public static IcebergSnapshot getLastedIcebergSnapshot(ExternalCatalog catalog, String dbName, String tbName) {
+        Table table = IcebergUtils.getIcebergTable(catalog, dbName, tbName);
+        Snapshot snapshot = table.currentSnapshot();
+        long snapshotId = snapshot == null ? IcebergUtils.UNKNOWN_SNAPSHOT_ID : snapshot.snapshotId();
+        return new IcebergSnapshot(snapshotId, table.schema().schemaId());
+    }
+
+    public static IcebergPartitionInfo loadPartitionInfo(
+            ExternalCatalog catalog, String dbName, String tbName, long snapshotId) throws AnalysisException {
+        // snapshotId == UNKNOWN_SNAPSHOT_ID means this is an empty table, haven't contained any snapshot yet.
+        if (snapshotId == IcebergUtils.UNKNOWN_SNAPSHOT_ID) {
+            return IcebergPartitionInfo.empty();
+        }
+        Table table = getIcebergTable(catalog, dbName, tbName);
+        List<IcebergPartition> icebergPartitions = loadIcebergPartition(table, snapshotId);
+        Map<String, IcebergPartition> nameToPartition = Maps.newHashMap();
+        Map<String, PartitionItem> nameToPartitionItem = Maps.newHashMap();
+
+        List<Column> partitionColumns = IcebergUtils.getSchemaCacheValue(
+                catalog, dbName, tbName, table.snapshot(snapshotId).schemaId()).getPartitionColumns();
+        for (IcebergPartition partition : icebergPartitions) {
+            nameToPartition.put(partition.getPartitionName(), partition);
+            String transform = table.specs().get(partition.getSpecId()).fields().get(0).transform().toString();
+            Range<PartitionKey> partitionRange = getPartitionRange(
+                    partition.getPartitionValues().get(0), transform, partitionColumns);
+            PartitionItem item = new RangePartitionItem(partitionRange);
+            nameToPartitionItem.put(partition.getPartitionName(), item);
+        }
+        Map<String, Set<String>> partitionNameMap = mergeOverlapPartitions(nameToPartitionItem);
+        return new IcebergPartitionInfo(nameToPartitionItem, nameToPartition, partitionNameMap);
+    }
+
+    private static List<IcebergPartition> loadIcebergPartition(Table table, long snapshotId) {
+        PartitionsTable partitionsTable = (PartitionsTable) MetadataTableUtils
+                .createMetadataTableInstance(table, MetadataTableType.PARTITIONS);
+        List<IcebergPartition> partitions = Lists.newArrayList();
+        try (CloseableIterable<FileScanTask> tasks = partitionsTable.newScan().useSnapshot(snapshotId).planFiles()) {
+            for (FileScanTask task : tasks) {
+                CloseableIterable<StructLike> rows = task.asDataTask().rows();
+                for (StructLike row : rows) {
+                    partitions.add(generateIcebergPartition(table, row));
+                }
+            }
+        } catch (IOException e) {
+            LOG.warn("Failed to get Iceberg table {} partition info.", table.name(), e);
+        }
+        return partitions;
+    }
+
+    private static IcebergPartition generateIcebergPartition(Table table, StructLike row) {
+        // row format :
+        // 0. partitionData,
+        // 1. spec_id,
+        // 2. record_count,
+        // 3. file_count,
+        // 4. total_data_file_size_in_bytes,
+        // 5. position_delete_record_count,
+        // 6. position_delete_file_count,
+        // 7. equality_delete_record_count,
+        // 8. equality_delete_file_count,
+        // 9. last_updated_at,
+        // 10. last_updated_snapshot_id
+        Preconditions.checkState(!table.spec().fields().isEmpty(), table.name() + " is not a partition table.");
+        int specId = row.get(1, Integer.class);
+        PartitionSpec partitionSpec = table.specs().get(specId);
+        StructProjection partitionData = row.get(0, StructProjection.class);
+        StringBuilder sb = new StringBuilder();
+        List<String> partitionValues = Lists.newArrayList();
+        List<String> transforms = Lists.newArrayList();
+        for (int i = 0; i < partitionSpec.fields().size(); ++i) {
+            PartitionField partitionField = partitionSpec.fields().get(i);
+            Class<?> fieldClass = partitionSpec.javaClasses()[i];
+            int fieldId = partitionField.fieldId();
+            // Iceberg partition field id starts at PARTITION_DATA_ID_START,
+            // So we can get the field index in partitionData using fieldId - PARTITION_DATA_ID_START
+            int index = fieldId - PARTITION_DATA_ID_START;
+            Object o = partitionData.get(index, fieldClass);
+            String fieldValue = o == null ? null : o.toString();
+            String fieldName = partitionField.name();
+            sb.append(fieldName);
+            sb.append("=");
+            sb.append(fieldValue);
+            sb.append("/");
+            partitionValues.add(fieldValue);
+            transforms.add(partitionField.transform().toString());
+        }
+        if (sb.length() > 0) {
+            sb.delete(sb.length() - 1, sb.length());
+        }
+        String partitionName = sb.toString();
+        long recordCount = row.get(2, Long.class);
+        long fileCount = row.get(3, Integer.class);
+        long fileSizeInBytes = row.get(4, Long.class);
+        long lastUpdateTime = row.get(9, Long.class);
+        long lastUpdateSnapShotId = row.get(10, Long.class);
+        return new IcebergPartition(partitionName, specId, recordCount, fileSizeInBytes, fileCount,
+                lastUpdateTime, lastUpdateSnapShotId, partitionValues, transforms);
+    }
+
+    @VisibleForTesting
+    public static Range<PartitionKey> getPartitionRange(String value, String transform, List<Column> partitionColumns)
+            throws AnalysisException {
+        // For NULL value, create a minimum partition for it.
+        if (value == null) {
+            PartitionKey nullLowKey = PartitionKey.createPartitionKey(
+                    Lists.newArrayList(new PartitionValue("0000-01-01")), partitionColumns);
+            PartitionKey nullUpKey = nullLowKey.successor();
+            return Range.closedOpen(nullLowKey, nullUpKey);
+        }
+        LocalDateTime epoch = Instant.EPOCH.atZone(ZoneId.of("UTC")).toLocalDateTime();
+        LocalDateTime target;
+        LocalDateTime lower;
+        LocalDateTime upper;
+        long longValue = Long.parseLong(value);
+        switch (transform) {
+            case HOUR:
+                target = epoch.plusHours(longValue);
+                lower = LocalDateTime.of(target.getYear(), target.getMonth(), target.getDayOfMonth(),
+                        target.getHour(), 0, 0);
+                upper = lower.plusHours(1);
+                break;
+            case DAY:
+                target = epoch.plusDays(longValue);
+                lower = LocalDateTime.of(target.getYear(), target.getMonth(), target.getDayOfMonth(), 0, 0, 0);
+                upper = lower.plusDays(1);
+                break;
+            case MONTH:
+                target = epoch.plusMonths(longValue);
+                lower = LocalDateTime.of(target.getYear(), target.getMonth(), 1, 0, 0, 0);
+                upper = lower.plusMonths(1);
+                break;
+            case YEAR:
+                target = epoch.plusYears(longValue);
+                lower = LocalDateTime.of(target.getYear(), Month.JANUARY, 1, 0, 0, 0);
+                upper = lower.plusYears(1);
+                break;
+            default:
+                throw new RuntimeException("Unsupported transform " + transform);
+        }
+        DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
+        Column c = partitionColumns.get(0);
+        Preconditions.checkState(c.getDataType().isDateType(), "Only support date type partition column");
+        if (c.getType().isDate() || c.getType().isDateV2()) {
+            formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd");
+        }
+        PartitionValue lowerValue = new PartitionValue(lower.format(formatter));
+        PartitionValue upperValue = new PartitionValue(upper.format(formatter));
+        PartitionKey lowKey = PartitionKey.createPartitionKey(Lists.newArrayList(lowerValue), partitionColumns);
+        PartitionKey upperKey = PartitionKey.createPartitionKey(Lists.newArrayList(upperValue), partitionColumns);
+        return Range.closedOpen(lowKey, upperKey);
+    }
+
+    /**
+     * Merge overlapped iceberg partitions into one Doris partition.
+     */
+    @VisibleForTesting
+    public static Map<String, Set<String>> mergeOverlapPartitions(Map<String, PartitionItem> originPartitions) {
+        List<Map.Entry<String, PartitionItem>> entries = sortPartitionMap(originPartitions);
+        Map<String, Set<String>> map = Maps.newHashMap();
+        for (int i = 0; i < entries.size() - 1; i++) {
+            Range<PartitionKey> firstValue = entries.get(i).getValue().getItems();
+            String firstKey = entries.get(i).getKey();
+            Range<PartitionKey> secondValue = entries.get(i + 1).getValue().getItems();
+            String secondKey = entries.get(i + 1).getKey();
+            // If the first entry enclose the second one, remove the second entry and keep a record in the return map.
+            // So we can track the iceberg partitions those contained by one Doris partition.
+            while (i < entries.size() && firstValue.encloses(secondValue)) {
+                originPartitions.remove(secondKey);
+                map.putIfAbsent(firstKey, Sets.newHashSet(firstKey));
+                String finalSecondKey = secondKey;
+                map.computeIfPresent(firstKey, (key, value) -> {
+                    value.add(finalSecondKey);
+                    return value;
+                });
+                i++;
+                if (i >= entries.size() - 1) {
+                    break;
+                }
+                secondValue = entries.get(i + 1).getValue().getItems();
+                secondKey = entries.get(i + 1).getKey();
+            }
+        }
+        return map;
+    }
+
+    /**
+     * Sort the given map entries by PartitionItem Range(LOW, HIGH)
+     * When comparing two ranges, the one with smaller LOW value is smaller than the other one.
+     * If two ranges have same values of LOW, the one with larger HIGH value is smaller.
+     *
+     * For now, we only support year, month, day and hour,
+     * so it is impossible to have two partially intersect partitions.
+     * One range is either enclosed by another or has no intersection at all with another.
+     *
+     *
+     * For example, we have these 4 ranges:
+     * [10, 20), [30, 40), [0, 30), [10, 15)
+     *
+     * After sort, they become:
+     * [0, 30), [10, 20), [10, 15), [30, 40)
+     */
+    @VisibleForTesting
+    public static List<Map.Entry<String, PartitionItem>> sortPartitionMap(Map<String, PartitionItem> originPartitions) {
+        List<Map.Entry<String, PartitionItem>> entries = new ArrayList<>(originPartitions.entrySet());
+        entries.sort(new RangeComparator());
+        return entries;
+    }
+
+    public static class RangeComparator implements Comparator<Map.Entry<String, PartitionItem>> {
+        @Override
+        public int compare(Map.Entry<String, PartitionItem> p1, Map.Entry<String, PartitionItem> p2) {
+            PartitionItem value1 = p1.getValue();
+            PartitionItem value2 = p2.getValue();
+            if (value1 instanceof RangePartitionItem && value2 instanceof RangePartitionItem) {
+                Range<PartitionKey> items1 = value1.getItems();
+                Range<PartitionKey> items2 = value2.getItems();
+                if (!items1.hasLowerBound()) {
+                    return -1;
+                }
+                if (!items2.hasLowerBound()) {
+                    return 1;
+                }
+                PartitionKey upper1 = items1.upperEndpoint();
+                PartitionKey lower1 = items1.lowerEndpoint();
+                PartitionKey upper2 = items2.upperEndpoint();
+                PartitionKey lower2 = items2.lowerEndpoint();
+                int compareLow = lower1.compareTo(lower2);
+                return compareLow == 0 ? upper2.compareTo(upper1) : compareLow;
+            }
+            return 0;
+        }
+    }
+
+    public static IcebergSnapshotCacheValue getIcebergSnapshotCacheValue(
+            Optional<TableSnapshot> tableSnapshot,
+            ExternalCatalog catalog,
+            String dbName,
+            String tbName) {
+        IcebergSnapshotCacheValue snapshotCache = Env.getCurrentEnv().getExtMetaCacheMgr().getIcebergMetadataCache()
+                .getSnapshotCache(catalog, dbName, tbName);
+        if (tableSnapshot.isPresent()) {
+            // If a snapshot is specified,
+            // use the specified snapshot and the corresponding schema(not the latest schema).
+            Table icebergTable = getIcebergTable(catalog, dbName, tbName);
+            TableSnapshot snapshot = tableSnapshot.get();
+            long querySpecSnapshot = getQuerySpecSnapshot(icebergTable, snapshot);
+            return new IcebergSnapshotCacheValue(
+                    IcebergPartitionInfo.empty(),
+                    new IcebergSnapshot(querySpecSnapshot, icebergTable.snapshot(querySpecSnapshot).schemaId()));
+        } else {
+            // Otherwise, use the latest snapshot and the latest schema.
+            return snapshotCache;
+        }
+    }
+
+    // load table schema from iceberg API to external schema cache.
+    public static Optional<SchemaCacheValue> loadSchemaCacheValue(
+            ExternalCatalog catalog, String dbName, String tbName, long schemaId) {
+        Table table = IcebergUtils.getIcebergTable(catalog, dbName, tbName);
+        List<Column> schema = IcebergUtils.getSchema(catalog, dbName, tbName, schemaId);
+        List<Column> tmpColumns = Lists.newArrayList();
+        PartitionSpec spec = table.spec();
+        for (PartitionField field : spec.fields()) {
+            Types.NestedField col = table.schema().findField(field.sourceId());
+            for (Column c : schema) {
+                if (c.getName().equalsIgnoreCase(col.name())) {
+                    tmpColumns.add(c);
+                    break;
+                }
+            }
+        }
+        return Optional.of(new IcebergSchemaCacheValue(schema, tmpColumns));
+    }
+
+    public static List<Column> getIcebergSchema(
+            TableIf tableIf,
+            ExternalCatalog catalog,
+            String dbName,
+            String tbName) {
+        Optional<MvccSnapshot> snapshotFromContext = MvccUtil.getSnapshotFromContext(tableIf);
+        IcebergSnapshotCacheValue cacheValue =
+                IcebergUtils.getOrFetchSnapshotCacheValue(snapshotFromContext, catalog, dbName, tbName);
+        return IcebergUtils.getSchemaCacheValue(
+                catalog, dbName, tbName, cacheValue.getSnapshot().getSchemaId())
+                .getSchema();
+    }
+
+    public static IcebergSnapshotCacheValue getOrFetchSnapshotCacheValue(
+            Optional<MvccSnapshot> snapshot,
+            ExternalCatalog catalog,
+            String dbName,
+            String tbName) {
+        if (snapshot.isPresent()) {
+            return ((IcebergMvccSnapshot) snapshot.get()).getSnapshotCacheValue();
+        } else {
+            return IcebergUtils.getIcebergSnapshotCacheValue(Optional.empty(), catalog, dbName, tbName);
+        }
+    }
 }
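A worked example of the epoch-offset arithmetic in `getPartitionRange` above: Iceberg's time transforms store partition values as offsets from the Unix epoch (hours, days, months, or years), so a `day` value of 19000 decodes to 2022-01-08 and becomes the half-open Doris range [2022-01-08, 2022-01-09). The sketch below reproduces just that decoding step:

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;

// Worked example of the decoding in getPartitionRange(): a "day" transform
// stores days since 1970-01-01, so value 19000 means 2022-01-08, and the
// derived Doris partition range is [2022-01-08, 2022-01-09).
public class PartitionRangeMath {
    public static void main(String[] args) {
        LocalDateTime epoch = Instant.EPOCH.atZone(ZoneId.of("UTC")).toLocalDateTime();
        LocalDateTime lower = epoch.plusDays(19000);  // 2022-01-08T00:00
        LocalDateTime upper = lower.plusDays(1);      // exclusive upper bound
        System.out.println("[" + lower.toLocalDate() + ", " + upper.toLocalDate() + ")");
    }
}
```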
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
index 34efa59a459e00..d2f31214b0bb1b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
@@ -28,7 +28,6 @@
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.security.authentication.PreExecutionAuthenticator;
 import org.apache.doris.common.util.LocationPath;
-import org.apache.doris.common.util.TimeUtils;
 import org.apache.doris.datasource.ExternalTable;
 import org.apache.doris.datasource.FileQueryScanNode;
 import org.apache.doris.datasource.TableFormatType;
@@ -68,13 +67,11 @@
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.CloseableIterator;
 import org.apache.iceberg.types.Conversions;
-import org.apache.iceberg.util.SnapshotUtil;
 import org.apache.iceberg.util.TableScanUtil;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
 import java.io.IOException;
-import java.time.DateTimeException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -446,16 +443,7 @@ public boolean isBatchMode() {
     public Long getSpecifiedSnapshot() {
         TableSnapshot tableSnapshot = getQueryTableSnapshot();
         if (tableSnapshot != null) {
-            TableSnapshot.VersionType type = tableSnapshot.getType();
-            if (type == TableSnapshot.VersionType.VERSION) {
-                return tableSnapshot.getVersion();
-            } else {
-                long timestamp = TimeUtils.timeStringToLong(tableSnapshot.getTime(), TimeUtils.getTimeZone());
-                if (timestamp < 0) {
-                    throw new DateTimeException("can't parse time: " + tableSnapshot.getTime());
-                }
-                return SnapshotUtil.snapshotIdAsOfTime(icebergTable, timestamp);
-            }
+            return IcebergUtils.getQuerySpecSnapshot(icebergTable, tableSnapshot);
         }
         return null;
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
index e68ab2476d92bd..edc54e230be5b9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
@@ -110,6 +110,7 @@ public abstract class ScanNode extends PlanNode implements SplitGenerator {
     protected final List<SortNode> topnFilterSortNodes = Lists.newArrayList();
 
     protected TableSnapshot tableSnapshot;
+    protected List<Column> columns;
 
     // Save the id of backends which this scan node will be executed on.
     // This is also important for local shuffle logic.
@@ -141,6 +142,13 @@ protected static TNetworkAddress addressToTNetworkAddress(String address) {
         return result;
     }
 
+    protected List<Column> getColumns() {
+        if (columns == null && desc.getTable() != null) {
+            columns = desc.getTable().getBaseSchema();
+        }
+        return columns;
+    }
+
     public TupleDescriptor getTupleDesc() {
         return desc;
     }
@@ -233,7 +241,7 @@ public void computeColumnsFilter() {
         // for load scan node, table is null
         // partitionsInfo maybe null for other scan node, eg: ExternalScanNode...
         if (desc.getTable() != null) {
-            computeColumnsFilter(desc.getTable().getBaseSchema(), partitionsInfo);
+            computeColumnsFilter(getColumns(), partitionsInfo);
         }
     }
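Note: this small ScanNode change is what lets the rest of the patch work. Default-value exprs, column filters, and column-position mapping all read the schema through getColumns() now, so a scan node can carry a snapshot-specific column list instead of always using the latest base schema. The shape of the pattern, as a generic sketch (names are illustrative, not from the patch):

    import java.util.List;
    import java.util.function.Supplier;

    // Compute once, reuse everywhere: all consumers share one accessor,
    // so redirecting the source redirects every consumer at once.
    class LazySchema {
        private List<String> columns;
        private final Supplier<List<String>> source;

        LazySchema(Supplier<List<String>> source) {
            this.source = source;
        }

        List<String> getColumns() {
            if (columns == null) {
                columns = source.get(); // e.g. latest base schema, or a time-traveled schema
            }
            return columns;
        }
    }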
Assertions.assertEquals("0000-01-01 00:00:00", + nullRange.lowerEndpoint().getPartitionValuesAsStringList().get(0)); + Assertions.assertEquals("0000-01-01 00:00:01", + nullRange.upperEndpoint().getPartitionValuesAsStringList().get(0)); + + // Test hour transform. + Range hour = IcebergUtils.getPartitionRange("100", "hour", partitionColumns); + PartitionKey lowKey = hour.lowerEndpoint(); + PartitionKey upKey = hour.upperEndpoint(); + Assertions.assertEquals("1970-01-05 04:00:00", lowKey.getPartitionValuesAsStringList().get(0)); + Assertions.assertEquals("1970-01-05 05:00:00", upKey.getPartitionValuesAsStringList().get(0)); + + // Test day transform. + Range day = IcebergUtils.getPartitionRange("100", "day", partitionColumns); + lowKey = day.lowerEndpoint(); + upKey = day.upperEndpoint(); + Assertions.assertEquals("1970-04-11 00:00:00", lowKey.getPartitionValuesAsStringList().get(0)); + Assertions.assertEquals("1970-04-12 00:00:00", upKey.getPartitionValuesAsStringList().get(0)); + + // Test month transform. + Range month = IcebergUtils.getPartitionRange("100", "month", partitionColumns); + lowKey = month.lowerEndpoint(); + upKey = month.upperEndpoint(); + Assertions.assertEquals("1978-05-01 00:00:00", lowKey.getPartitionValuesAsStringList().get(0)); + Assertions.assertEquals("1978-06-01 00:00:00", upKey.getPartitionValuesAsStringList().get(0)); + + // Test year transform. + Range year = IcebergUtils.getPartitionRange("100", "year", partitionColumns); + lowKey = year.lowerEndpoint(); + upKey = year.upperEndpoint(); + Assertions.assertEquals("2070-01-01 00:00:00", lowKey.getPartitionValuesAsStringList().get(0)); + Assertions.assertEquals("2071-01-01 00:00:00", upKey.getPartitionValuesAsStringList().get(0)); + + // Test unsupported transform + Exception exception = Assertions.assertThrows(RuntimeException.class, () -> { + IcebergUtils.getPartitionRange("100", "bucket", partitionColumns); + }); + Assertions.assertEquals("Unsupported transform bucket", exception.getMessage()); + } + + @Test + public void testSortRange() throws AnalysisException { + Column c = new Column("c", PrimitiveType.DATETIMEV2); + ArrayList columns = Lists.newArrayList(c); + PartitionItem nullRange = new RangePartitionItem(IcebergUtils.getPartitionRange(null, "hour", columns)); + PartitionItem year1970 = new RangePartitionItem(IcebergUtils.getPartitionRange("0", "year", columns)); + PartitionItem year1971 = new RangePartitionItem(IcebergUtils.getPartitionRange("1", "year", columns)); + PartitionItem month197002 = new RangePartitionItem(IcebergUtils.getPartitionRange("1", "month", columns)); + PartitionItem month197103 = new RangePartitionItem(IcebergUtils.getPartitionRange("14", "month", columns)); + PartitionItem month197204 = new RangePartitionItem(IcebergUtils.getPartitionRange("27", "month", columns)); + PartitionItem day19700202 = new RangePartitionItem(IcebergUtils.getPartitionRange("32", "day", columns)); + PartitionItem day19730101 = new RangePartitionItem(IcebergUtils.getPartitionRange("1096", "day", columns)); + Map map = Maps.newHashMap(); + map.put("nullRange", nullRange); + map.put("year1970", year1970); + map.put("year1971", year1971); + map.put("month197002", month197002); + map.put("month197103", month197103); + map.put("month197204", month197204); + map.put("day19700202", day19700202); + map.put("day19730101", day19730101); + List> entries = IcebergUtils.sortPartitionMap(map); + Assertions.assertEquals(8, entries.size()); + Assertions.assertEquals("nullRange", entries.get(0).getKey()); + 
Assertions.assertEquals("year1970", entries.get(1).getKey()); + Assertions.assertEquals("month197002", entries.get(2).getKey()); + Assertions.assertEquals("day19700202", entries.get(3).getKey()); + Assertions.assertEquals("year1971", entries.get(4).getKey()); + Assertions.assertEquals("month197103", entries.get(5).getKey()); + Assertions.assertEquals("month197204", entries.get(6).getKey()); + Assertions.assertEquals("day19730101", entries.get(7).getKey()); + + Map> stringSetMap = IcebergUtils.mergeOverlapPartitions(map); + Assertions.assertEquals(2, stringSetMap.size()); + Assertions.assertTrue(stringSetMap.containsKey("year1970")); + Assertions.assertTrue(stringSetMap.containsKey("year1971")); + + Set names1970 = stringSetMap.get("year1970"); + Assertions.assertEquals(3, names1970.size()); + Assertions.assertTrue(names1970.contains("year1970")); + Assertions.assertTrue(names1970.contains("month197002")); + Assertions.assertTrue(names1970.contains("day19700202")); + + Set names1971 = stringSetMap.get("year1971"); + Assertions.assertEquals(2, names1971.size()); + Assertions.assertTrue(names1971.contains("year1971")); + Assertions.assertTrue(names1971.contains("month197103")); + + Assertions.assertEquals(5, map.size()); + Assertions.assertTrue(map.containsKey("nullRange")); + Assertions.assertTrue(map.containsKey("year1970")); + Assertions.assertTrue(map.containsKey("year1971")); + Assertions.assertTrue(map.containsKey("month197204")); + Assertions.assertTrue(map.containsKey("day19730101")); + } } + diff --git a/regression-test/data/external_table_p0/iceberg/test_iceberg_schema_change_with_timetravel.out b/regression-test/data/external_table_p0/iceberg/test_iceberg_schema_change_with_timetravel.out new file mode 100644 index 00000000000000..27e9359b71346c --- /dev/null +++ b/regression-test/data/external_table_p0/iceberg/test_iceberg_schema_change_with_timetravel.out @@ -0,0 +1,77 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !q0 -- +c1 int Yes true \N +c3 int Yes true \N +c2 int Yes true \N +c4 int Yes true \N + +-- !q1 -- +1 \N \N \N +2 \N \N \N +4 6 \N \N +7 8 \N \N +9 10 11 \N + +-- !q2 -- +1 + +-- !q3 -- +1 \N +2 3 + +-- !q4 -- +1 \N \N +2 3 \N +4 5 6 + +-- !q5 -- +1 \N +2 \N +4 6 +7 8 + +-- !q6 -- +1 \N \N +2 \N \N +4 6 \N +7 8 \N +9 10 11 + +-- !q0 -- +c1 int Yes true \N +c3 int Yes true \N +c2 int Yes true \N +c4 int Yes true \N + +-- !q1 -- +1 \N \N \N +2 \N \N \N +4 6 \N \N +7 8 \N \N +9 10 11 \N + +-- !q2 -- +1 + +-- !q3 -- +1 \N +2 3 + +-- !q4 -- +1 \N \N +2 3 \N +4 5 6 + +-- !q5 -- +1 \N +2 \N +4 6 +7 8 + +-- !q6 -- +1 \N \N +2 \N \N +4 6 \N +7 8 \N +9 10 11 + diff --git a/regression-test/suites/external_table_p0/iceberg/test_iceberg_schema_change_with_timetravel.groovy b/regression-test/suites/external_table_p0/iceberg/test_iceberg_schema_change_with_timetravel.groovy new file mode 100644 index 00000000000000..a376dfc210d41f --- /dev/null +++ b/regression-test/suites/external_table_p0/iceberg/test_iceberg_schema_change_with_timetravel.groovy @@ -0,0 +1,87 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("iceberg_schema_change_with_timetravel", "p0,external,doris,external_docker,external_docker_doris") {
+
+    String enabled = context.config.otherConfigs.get("enableIcebergTest")
+    if (enabled == null || !enabled.equalsIgnoreCase("true")) {
+        logger.info("disable iceberg test.")
+        return
+    }
+
+    String rest_port = context.config.otherConfigs.get("iceberg_rest_uri_port")
+    String minio_port = context.config.otherConfigs.get("iceberg_minio_port")
+    String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+    String catalog_name = "iceberg_schema_change_with_timetravel"
+
+    sql """drop catalog if exists ${catalog_name}"""
+    sql """
+    CREATE CATALOG ${catalog_name} PROPERTIES (
+        'type'='iceberg',
+        'iceberg.catalog.type'='rest',
+        'uri' = 'http://${externalEnvIp}:${rest_port}',
+        "s3.access_key" = "admin",
+        "s3.secret_key" = "password",
+        "s3.endpoint" = "http://${externalEnvIp}:${minio_port}",
+        "s3.region" = "us-east-1"
+    );"""
+
+    logger.info("catalog " + catalog_name + " created")
+    sql """switch ${catalog_name};"""
+    logger.info("switched to catalog " + catalog_name)
+    sql """use test_db;"""
+
+    def executeTimeTravelingQueries = { String tableName ->
+        def snapshots = sql """ select snapshot_id from iceberg_meta("table" = "${catalog_name}.test_db.${tableName}", "query_type" = "snapshots") order by committed_at; """
+        def snapshotIds = [
+            s0: snapshots.get(0)[0],
+            s1: snapshots.get(1)[0],
+            s2: snapshots.get(2)[0],
+            s3: snapshots.get(3)[0],
+            s4: snapshots.get(4)[0]
+        ]
+
+        qt_q0 """ desc ${tableName} """
+        qt_q1 """ select * from ${tableName} order by c1 """
+        qt_q2 """ select * from ${tableName} for version as of ${snapshotIds.s0} order by c1 """
+        qt_q3 """ select * from ${tableName} for version as of ${snapshotIds.s1} order by c1 """
+        qt_q4 """ select * from ${tableName} for version as of ${snapshotIds.s2} order by c1 """
+        qt_q5 """ select * from ${tableName} for version as of ${snapshotIds.s3} order by c1 """
+        qt_q6 """ select * from ${tableName} for version as of ${snapshotIds.s4} order by c1 """
+    }
+
+    executeTimeTravelingQueries("schema_change_with_time_travel")
+    executeTimeTravelingQueries("schema_change_with_time_travel_orc")
+}
+
+/*
+create table schema_change_with_time_travel (c1 int);
+insert into schema_change_with_time_travel values (1);
+alter table schema_change_with_time_travel add column c2 int;
+insert into schema_change_with_time_travel values (2,3);
+alter table schema_change_with_time_travel add column c3 int;
+insert into schema_change_with_time_travel values (4,5,6);
+alter table schema_change_with_time_travel drop column c2;
+insert into schema_change_with_time_travel values (7,8);
+alter table schema_change_with_time_travel add column c2 int;
+insert into schema_change_with_time_travel values (9,10,11);
+alter table schema_change_with_time_travel add column c4 int;
+*/
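Note for readers tracing the expected .out sections: the suite resolves snapshot ids s0..s4 from iceberg_meta ordered by committed_at, and each pinned query sees the schema current at that snapshot. s0 sees (c1); s1 sees (c1, c2); s2 sees (c1, c2, c3); s3 sees (c1, c3) after the drop; s4 sees (c1, c3, c2), where the re-added c2 is a new Iceberg field id, so q6 reports \N for rows written before it existed rather than values from the dropped column. The final ALTER adds c4 without a new insert, which is why the unpinned q1 shows four columns while no snapshot-pinned query does.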