diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java index cde08113373aee..d7cbee18c74c7a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java @@ -41,6 +41,7 @@ import org.apache.doris.common.io.Writable; import org.apache.doris.common.security.authentication.PreExecutionAuthenticator; import org.apache.doris.common.util.Util; +import org.apache.doris.datasource.ExternalSchemaCache.SchemaCacheKey; import org.apache.doris.datasource.es.EsExternalDatabase; import org.apache.doris.datasource.hive.HMSExternalCatalog; import org.apache.doris.datasource.hive.HMSExternalDatabase; @@ -432,13 +433,13 @@ private void refreshOnlyCatalogCache(boolean invalidCache) { } } - public final Optional getSchema(String dbName, String tblName) { + public final Optional getSchema(SchemaCacheKey key) { makeSureInitialized(); - Optional> db = getDb(dbName); + Optional> db = getDb(key.getDbName()); if (db.isPresent()) { - Optional table = db.get().getTable(tblName); + Optional table = db.get().getTable(key.getTblName()); if (table.isPresent()) { - return table.get().initSchemaAndUpdateTime(); + return table.get().initSchemaAndUpdateTime(key); } } return Optional.empty(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java index cc40ad292ce182..24f55e74266863 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java @@ -31,6 +31,8 @@ import org.apache.doris.datasource.maxcompute.MaxComputeMetadataCache; import org.apache.doris.datasource.maxcompute.MaxComputeMetadataCacheMgr; import org.apache.doris.datasource.metacache.MetaCache; 
+import org.apache.doris.datasource.paimon.PaimonMetadataCache; +import org.apache.doris.datasource.paimon.PaimonMetadataCacheMgr; import org.apache.doris.fs.FileSystemCache; import org.apache.doris.nereids.exceptions.NotSupportedException; @@ -92,6 +94,7 @@ public class ExternalMetaCacheMgr { private ExternalRowCountCache rowCountCache; private final IcebergMetadataCacheMgr icebergMetadataCacheMgr; private final MaxComputeMetadataCacheMgr maxComputeMetadataCacheMgr; + private final PaimonMetadataCacheMgr paimonMetadataCacheMgr; public ExternalMetaCacheMgr() { rowCountRefreshExecutor = ThreadPoolManager.newDaemonFixedThreadPool( @@ -122,6 +125,7 @@ public ExternalMetaCacheMgr() { hudiPartitionMgr = new HudiPartitionMgr(commonRefreshExecutor); icebergMetadataCacheMgr = new IcebergMetadataCacheMgr(commonRefreshExecutor); maxComputeMetadataCacheMgr = new MaxComputeMetadataCacheMgr(); + paimonMetadataCacheMgr = new PaimonMetadataCacheMgr(commonRefreshExecutor); } public ExecutorService getFileListingExecutor() { @@ -167,6 +171,10 @@ public IcebergMetadataCache getIcebergMetadataCache() { return icebergMetadataCacheMgr.getIcebergMetadataCache(); } + public PaimonMetadataCache getPaimonMetadataCache() { + return paimonMetadataCacheMgr.getPaimonMetadataCache(); + } + public MaxComputeMetadataCache getMaxComputeMetadataCache(long catalogId) { return maxComputeMetadataCacheMgr.getMaxComputeMetadataCache(catalogId); } @@ -189,6 +197,7 @@ public void removeCache(long catalogId) { hudiPartitionMgr.removePartitionProcessor(catalogId); icebergMetadataCacheMgr.removeCache(catalogId); maxComputeMetadataCacheMgr.removeCache(catalogId); + paimonMetadataCacheMgr.removeCache(catalogId); } public void invalidateTableCache(long catalogId, String dbName, String tblName) { @@ -204,6 +213,7 @@ public void invalidateTableCache(long catalogId, String dbName, String tblName) hudiPartitionMgr.cleanTablePartitions(catalogId, dbName, tblName); 
icebergMetadataCacheMgr.invalidateTableCache(catalogId, dbName, tblName); maxComputeMetadataCacheMgr.invalidateTableCache(catalogId, dbName, tblName); + paimonMetadataCacheMgr.invalidateTableCache(catalogId, dbName, tblName); if (LOG.isDebugEnabled()) { LOG.debug("invalid table cache for {}.{} in catalog {}", dbName, tblName, catalogId); } @@ -222,6 +232,7 @@ public void invalidateDbCache(long catalogId, String dbName) { hudiPartitionMgr.cleanDatabasePartitions(catalogId, dbName); icebergMetadataCacheMgr.invalidateDbCache(catalogId, dbName); maxComputeMetadataCacheMgr.invalidateDbCache(catalogId, dbName); + paimonMetadataCacheMgr.invalidateDbCache(catalogId, dbName); if (LOG.isDebugEnabled()) { LOG.debug("invalid db cache for {} in catalog {}", dbName, catalogId); } @@ -239,6 +250,7 @@ public void invalidateCatalogCache(long catalogId) { hudiPartitionMgr.cleanPartitionProcess(catalogId); icebergMetadataCacheMgr.invalidateCatalogCache(catalogId); maxComputeMetadataCacheMgr.invalidateCatalogCache(catalogId); + paimonMetadataCacheMgr.invalidateCatalogCache(catalogId); if (LOG.isDebugEnabled()) { LOG.debug("invalid catalog cache for {}", catalogId); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalSchemaCache.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalSchemaCache.java index a0558766e81400..de3eeff75d97fa 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalSchemaCache.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalSchemaCache.java @@ -74,7 +74,7 @@ public Long getValue() { } private Optional loadSchema(SchemaCacheKey key) { - Optional schema = catalog.getSchema(key.dbName, key.tblName); + Optional schema = catalog.getSchema(key); if (LOG.isDebugEnabled()) { LOG.debug("load schema for {} in catalog {}", key, catalog.getName()); } @@ -83,6 +83,10 @@ private Optional loadSchema(SchemaCacheKey key) { public Optional getSchemaValue(String dbName, String tblName) { 
SchemaCacheKey key = new SchemaCacheKey(dbName, tblName); + return getSchemaValue(key); + } + + public Optional getSchemaValue(SchemaCacheKey key) { return schemaCache.get(key); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalTable.java index 5451a219edfd3c..91df061678f154 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalTable.java @@ -31,6 +31,7 @@ import org.apache.doris.common.io.Writable; import org.apache.doris.common.util.PropertyAnalyzer; import org.apache.doris.common.util.Util; +import org.apache.doris.datasource.ExternalSchemaCache.SchemaCacheKey; import org.apache.doris.datasource.mvcc.MvccSnapshot; import org.apache.doris.nereids.trees.plans.logical.LogicalFileScan.SelectedPartitions; import org.apache.doris.persist.gson.GsonPostProcessable; @@ -317,8 +318,12 @@ public Optional getColumnStatistic(String colName) { * * @return */ - public Optional initSchemaAndUpdateTime() { + public Optional initSchemaAndUpdateTime(SchemaCacheKey key) { schemaUpdateTime = System.currentTimeMillis(); + return initSchema(key); + } + + public Optional initSchema(SchemaCacheKey key) { return initSchema(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java index 7584b5b392feb5..da4670d6d0589d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java @@ -30,6 +30,7 @@ import org.apache.doris.common.AnalysisException; import org.apache.doris.common.Config; import org.apache.doris.common.DdlException; +import org.apache.doris.datasource.ExternalSchemaCache.SchemaCacheKey; import org.apache.doris.datasource.ExternalTable; import 
org.apache.doris.datasource.SchemaCacheValue; import org.apache.doris.datasource.TablePartitionValues; @@ -501,6 +502,10 @@ public Set getPartitionNames() { } @Override + public Optional initSchemaAndUpdateTime(SchemaCacheKey key) { + return initSchemaAndUpdateTime(); + } + public Optional initSchemaAndUpdateTime() { org.apache.hadoop.hive.metastore.api.Table table = ((HMSExternalCatalog) catalog).getClient() .getTable(dbName, name); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/mvcc/MvccUtil.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/mvcc/MvccUtil.java new file mode 100644 index 00000000000000..ffdaff770e21a3 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/mvcc/MvccUtil.java @@ -0,0 +1,44 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.datasource.mvcc; + +import org.apache.doris.catalog.TableIf; +import org.apache.doris.nereids.StatementContext; +import org.apache.doris.qe.ConnectContext; + +import java.util.Optional; + +public class MvccUtil { + /** + * get Snapshot From StatementContext + * + * @param tableIf + * @return MvccSnapshot + */ + public static Optional getSnapshotFromContext(TableIf tableIf) { + ConnectContext connectContext = ConnectContext.get(); + if (connectContext == null) { + return Optional.empty(); + } + StatementContext statementContext = connectContext.getStatementContext(); + if (statementContext == null) { + return Optional.empty(); + } + return statementContext.getSnapshot(tableIf); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java index 7fe3c858448e3f..8ddfca886d9fe9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java @@ -22,13 +22,16 @@ import org.apache.doris.catalog.MTMV; import org.apache.doris.catalog.PartitionItem; import org.apache.doris.catalog.PartitionType; -import org.apache.doris.catalog.ScalarType; -import org.apache.doris.catalog.Type; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.DdlException; +import org.apache.doris.datasource.CacheException; +import org.apache.doris.datasource.ExternalSchemaCache; +import org.apache.doris.datasource.ExternalSchemaCache.SchemaCacheKey; import org.apache.doris.datasource.ExternalTable; import org.apache.doris.datasource.SchemaCacheValue; import org.apache.doris.datasource.mvcc.MvccSnapshot; +import org.apache.doris.datasource.mvcc.MvccTable; +import org.apache.doris.datasource.mvcc.MvccUtil; import org.apache.doris.mtmv.MTMVBaseTableIf; import org.apache.doris.mtmv.MTMVRefreshContext; 
import org.apache.doris.mtmv.MTMVRelatedTableIf; @@ -45,25 +48,20 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; -import org.apache.commons.collections.CollectionUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.apache.paimon.CoreOptions; import org.apache.paimon.catalog.Catalog; import org.apache.paimon.data.InternalRow; -import org.apache.paimon.schema.TableSchema; -import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.predicate.Predicate; +import org.apache.paimon.predicate.PredicateBuilder; import org.apache.paimon.table.Table; import org.apache.paimon.table.source.Split; -import org.apache.paimon.table.system.PartitionsTable; -import org.apache.paimon.table.system.SnapshotsTable; -import org.apache.paimon.types.ArrayType; +import org.apache.paimon.table.system.SchemasTable; import org.apache.paimon.types.DataField; -import org.apache.paimon.types.DecimalType; -import org.apache.paimon.types.MapType; -import org.apache.paimon.types.RowType; import java.io.IOException; -import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -71,12 +69,15 @@ import java.util.Set; import java.util.stream.Collectors; -public class PaimonExternalTable extends ExternalTable implements MTMVRelatedTableIf, MTMVBaseTableIf { +public class PaimonExternalTable extends ExternalTable implements MTMVRelatedTableIf, MTMVBaseTableIf, MvccTable { private static final Logger LOG = LogManager.getLogger(PaimonExternalTable.class); + private final Table paimonTable; + public PaimonExternalTable(long id, String name, String dbName, PaimonExternalCatalog catalog) { super(id, name, catalog, dbName, TableType.PAIMON_EXTERNAL_TABLE); + this.paimonTable = catalog.getPaimonTable(dbName, name); } public String getPaimonCatalogType() { @@ -90,176 +91,27 @@ protected synchronized void 
makeSureInitialized() { } } - public Table getPaimonTable() { - makeSureInitialized(); - Optional schemaCacheValue = getSchemaCacheValue(); - return schemaCacheValue.map(value -> ((PaimonSchemaCacheValue) value).getPaimonTable()).orElse(null); + public Table getPaimonTable(Optional snapshot) { + return paimonTable.copy( + Collections.singletonMap(CoreOptions.SCAN_VERSION.key(), + String.valueOf(getOrFetchSnapshotCacheValue(snapshot).getSnapshot().getSnapshotId()))); } - private PaimonPartitionInfo getPartitionInfoFromCache() { - makeSureInitialized(); - Optional schemaCacheValue = getSchemaCacheValue(); + public PaimonSchemaCacheValue getPaimonSchemaCacheValue(long schemaId) { + ExternalSchemaCache cache = Env.getCurrentEnv().getExtMetaCacheMgr().getSchemaCache(catalog); + Optional schemaCacheValue = cache.getSchemaValue( + new PaimonSchemaCacheKey(dbName, name, schemaId)); if (!schemaCacheValue.isPresent()) { - return new PaimonPartitionInfo(); + throw new CacheException("failed to getSchema for: %s.%s.%s.%s", + null, catalog.getName(), dbName, name, schemaId); } - return ((PaimonSchemaCacheValue) schemaCacheValue.get()).getPartitionInfo(); + return (PaimonSchemaCacheValue) schemaCacheValue.get(); } - private List getPartitionColumnsFromCache() { + private PaimonSnapshotCacheValue getPaimonSnapshotCacheValue() { makeSureInitialized(); - Optional schemaCacheValue = getSchemaCacheValue(); - if (!schemaCacheValue.isPresent()) { - return Lists.newArrayList(); - } - return ((PaimonSchemaCacheValue) schemaCacheValue.get()).getPartitionColumns(); - } - - public long getLatestSnapshotIdFromCache() throws AnalysisException { - makeSureInitialized(); - Optional schemaCacheValue = getSchemaCacheValue(); - if (!schemaCacheValue.isPresent()) { - throw new AnalysisException("not present"); - } - return ((PaimonSchemaCacheValue) schemaCacheValue.get()).getSnapshootId(); - } - - @Override - public Optional initSchema() { - Table paimonTable = ((PaimonExternalCatalog) 
catalog).getPaimonTable(dbName, name); - TableSchema schema = ((FileStoreTable) paimonTable).schema(); - List columns = schema.fields(); - List tmpSchema = Lists.newArrayListWithCapacity(columns.size()); - Set partitionColumnNames = Sets.newHashSet(paimonTable.partitionKeys()); - List partitionColumns = Lists.newArrayList(); - for (DataField field : columns) { - Column column = new Column(field.name().toLowerCase(), - paimonTypeToDorisType(field.type()), true, null, true, field.description(), true, - field.id()); - tmpSchema.add(column); - if (partitionColumnNames.contains(field.name())) { - partitionColumns.add(column); - } - } - try { - // after 0.9.0 paimon will support table.getLatestSnapshotId() - long latestSnapshotId = loadLatestSnapshotId(); - PaimonPartitionInfo partitionInfo = loadPartitionInfo(partitionColumns); - return Optional.of(new PaimonSchemaCacheValue(tmpSchema, partitionColumns, paimonTable, latestSnapshotId, - partitionInfo)); - } catch (IOException | AnalysisException e) { - LOG.warn(e); - return Optional.empty(); - } - } - - private long loadLatestSnapshotId() throws IOException { - Table table = ((PaimonExternalCatalog) catalog).getPaimonTable(dbName, - name + Catalog.SYSTEM_TABLE_SPLITTER + SnapshotsTable.SNAPSHOTS); - // snapshotId - List rows = PaimonUtil.read(table, new int[][] {{0}}); - long latestSnapshotId = 0L; - for (InternalRow row : rows) { - long snapshotId = row.getLong(0); - if (snapshotId > latestSnapshotId) { - latestSnapshotId = snapshotId; - } - } - return latestSnapshotId; - } - - private PaimonPartitionInfo loadPartitionInfo(List partitionColumns) throws IOException, AnalysisException { - if (CollectionUtils.isEmpty(partitionColumns)) { - return new PaimonPartitionInfo(); - } - List paimonPartitions = loadPartitions(); - return PaimonUtil.generatePartitionInfo(partitionColumns, paimonPartitions); - } - - private List loadPartitions() - throws IOException { - Table table = ((PaimonExternalCatalog) 
catalog).getPaimonTable(dbName, - name + Catalog.SYSTEM_TABLE_SPLITTER + PartitionsTable.PARTITIONS); - List rows = PaimonUtil.read(table, null); - List res = Lists.newArrayListWithCapacity(rows.size()); - for (InternalRow row : rows) { - res.add(PaimonUtil.rowToPartition(row)); - } - return res; - } - - private Type paimonPrimitiveTypeToDorisType(org.apache.paimon.types.DataType dataType) { - int tsScale = 3; // default - switch (dataType.getTypeRoot()) { - case BOOLEAN: - return Type.BOOLEAN; - case INTEGER: - return Type.INT; - case BIGINT: - return Type.BIGINT; - case FLOAT: - return Type.FLOAT; - case DOUBLE: - return Type.DOUBLE; - case SMALLINT: - return Type.SMALLINT; - case TINYINT: - return Type.TINYINT; - case VARCHAR: - case BINARY: - case CHAR: - case VARBINARY: - return Type.STRING; - case DECIMAL: - DecimalType decimal = (DecimalType) dataType; - return ScalarType.createDecimalV3Type(decimal.getPrecision(), decimal.getScale()); - case DATE: - return ScalarType.createDateV2Type(); - case TIMESTAMP_WITHOUT_TIME_ZONE: - if (dataType instanceof org.apache.paimon.types.TimestampType) { - tsScale = ((org.apache.paimon.types.TimestampType) dataType).getPrecision(); - if (tsScale > 6) { - tsScale = 6; - } - } else if (dataType instanceof org.apache.paimon.types.LocalZonedTimestampType) { - tsScale = ((org.apache.paimon.types.LocalZonedTimestampType) dataType).getPrecision(); - if (tsScale > 6) { - tsScale = 6; - } - } - return ScalarType.createDatetimeV2Type(tsScale); - case TIMESTAMP_WITH_LOCAL_TIME_ZONE: - if (dataType instanceof org.apache.paimon.types.LocalZonedTimestampType) { - tsScale = ((org.apache.paimon.types.LocalZonedTimestampType) dataType).getPrecision(); - if (tsScale > 6) { - tsScale = 6; - } - } - return ScalarType.createDatetimeV2Type(tsScale); - case ARRAY: - ArrayType arrayType = (ArrayType) dataType; - Type innerType = paimonPrimitiveTypeToDorisType(arrayType.getElementType()); - return 
org.apache.doris.catalog.ArrayType.create(innerType, true); - case MAP: - MapType mapType = (MapType) dataType; - return new org.apache.doris.catalog.MapType( - paimonTypeToDorisType(mapType.getKeyType()), paimonTypeToDorisType(mapType.getValueType())); - case ROW: - RowType rowType = (RowType) dataType; - List fields = rowType.getFields(); - return new org.apache.doris.catalog.StructType(fields.stream() - .map(field -> new org.apache.doris.catalog.StructField(field.name(), - paimonTypeToDorisType(field.type()))) - .collect(Collectors.toCollection(ArrayList::new))); - case TIME_WITHOUT_TIME_ZONE: - return Type.UNSUPPORTED; - default: - LOG.warn("Cannot transform unknown type: " + dataType.getTypeRoot()); - return Type.UNSUPPORTED; - } - } - - protected Type paimonTypeToDorisType(org.apache.paimon.types.DataType type) { - return paimonPrimitiveTypeToDorisType(type); + return Env.getCurrentEnv().getExtMetaCacheMgr().getPaimonMetadataCache() + .getPaimonSnapshot(catalog, dbName, name); } @Override @@ -289,13 +141,6 @@ public BaseAnalysisTask createAnalysisTask(AnalysisInfo info) { public long fetchRowCount() { makeSureInitialized(); long rowCount = 0; - Optional schemaCacheValue = getSchemaCacheValue(); - Table paimonTable = schemaCacheValue.map(value -> ((PaimonSchemaCacheValue) value).getPaimonTable()) - .orElse(null); - if (paimonTable == null) { - LOG.info("Paimon table {} is null.", name); - return UNKNOWN_ROW_COUNT; - } List splits = paimonTable.newReadBuilder().newScan().plan().splits(); for (Split split : splits) { rowCount += split.rowCount(); @@ -314,30 +159,31 @@ public void beforeMTMVRefresh(MTMV mtmv) throws DdlException { @Override public Map getAndCopyPartitionItems(Optional snapshot) { - return Maps.newHashMap(getPartitionInfoFromCache().getNameToPartitionItem()); + return Maps.newHashMap(getNameToPartitionItems(snapshot)); } @Override public PartitionType getPartitionType(Optional snapshot) { - return getPartitionColumnsFromCache().size() > 0 ? 
PartitionType.LIST : PartitionType.UNPARTITIONED; + return getPartitionColumns(snapshot).size() > 0 ? PartitionType.LIST : PartitionType.UNPARTITIONED; } @Override public Set getPartitionColumnNames(Optional snapshot) { - return getPartitionColumnsFromCache().stream() + return getPartitionColumns(snapshot).stream() .map(c -> c.getName().toLowerCase()).collect(Collectors.toSet()); } @Override public List getPartitionColumns(Optional snapshot) { - return getPartitionColumnsFromCache(); + return getPaimonSchemaCacheValue(snapshot).getPartitionColumns(); } @Override public MTMVSnapshotIf getPartitionSnapshot(String partitionName, MTMVRefreshContext context, Optional snapshot) throws AnalysisException { - PaimonPartition paimonPartition = getPartitionInfoFromCache().getNameToPartition().get(partitionName); + PaimonPartition paimonPartition = getOrFetchSnapshotCacheValue(snapshot).getPartitionInfo().getNameToPartition() + .get(partitionName); if (paimonPartition == null) { throw new AnalysisException("can not find partition: " + partitionName); } @@ -347,7 +193,8 @@ public MTMVSnapshotIf getPartitionSnapshot(String partitionName, MTMVRefreshCont @Override public MTMVSnapshotIf getTableSnapshot(MTMVRefreshContext context, Optional snapshot) throws AnalysisException { - return new MTMVVersionSnapshot(getLatestSnapshotIdFromCache()); + PaimonSnapshotCacheValue paimonSnapshot = getOrFetchSnapshotCacheValue(snapshot); + return new MTMVVersionSnapshot(paimonSnapshot.getSnapshot().getSnapshotId()); } @Override @@ -359,4 +206,83 @@ public boolean isPartitionColumnAllowNull() { // The cost is that Paimon partition writes a null value, and the materialized view cannot detect this data. 
return true; } + + @Override + public MvccSnapshot loadSnapshot() { + return new PaimonMvccSnapshot(getPaimonSnapshotCacheValue()); + } + + @Override + public Map getNameToPartitionItems(Optional snapshot) { + return getOrFetchSnapshotCacheValue(snapshot).getPartitionInfo().getNameToPartitionItem(); + } + + @Override + public boolean supportInternalPartitionPruned() { + return true; + } + + @Override + public List getFullSchema() { + return getPaimonSchemaCacheValue(MvccUtil.getSnapshotFromContext(this)).getSchema(); + } + + @Override + public Optional initSchema(SchemaCacheKey key) { + makeSureInitialized(); + PaimonSchemaCacheKey paimonSchemaCacheKey = (PaimonSchemaCacheKey) key; + try { + PaimonSchema schema = loadPaimonSchemaBySchemaId(paimonSchemaCacheKey); + List columns = schema.getFields(); + List dorisColumns = Lists.newArrayListWithCapacity(columns.size()); + Set partitionColumnNames = Sets.newHashSet(schema.getPartitionKeys()); + List partitionColumns = Lists.newArrayList(); + for (DataField field : columns) { + Column column = new Column(field.name().toLowerCase(), + PaimonUtil.paimonTypeToDorisType(field.type()), true, null, true, field.description(), true, + field.id()); + dorisColumns.add(column); + if (partitionColumnNames.contains(field.name())) { + partitionColumns.add(column); + } + } + return Optional.of(new PaimonSchemaCacheValue(dorisColumns, partitionColumns)); + } catch (Exception e) { + throw new CacheException("failed to initSchema for: %s.%s.%s.%s", + null, getCatalog().getName(), key.getDbName(), key.getTblName(), + paimonSchemaCacheKey.getSchemaId()); + } + + } + + private PaimonSchema loadPaimonSchemaBySchemaId(PaimonSchemaCacheKey key) throws IOException { + Table table = ((PaimonExternalCatalog) getCatalog()).getPaimonTable(key.getDbName(), + name + Catalog.SYSTEM_TABLE_SPLITTER + SchemasTable.SCHEMAS); + PredicateBuilder builder = new PredicateBuilder(table.rowType()); + Predicate predicate = builder.equal(0, key.getSchemaId()); + 
// Adding predicates will also return excess data + List rows = PaimonUtil.read(table, new int[][] {{0}, {1}, {2}}, predicate); + for (InternalRow row : rows) { + PaimonSchema schema = PaimonUtil.rowToSchema(row); + if (schema.getSchemaId() == key.getSchemaId()) { + return schema; + } + } + throw new CacheException("failed to initSchema for: %s.%s.%s.%s", + null, getCatalog().getName(), key.getDbName(), key.getTblName(), key.getSchemaId()); + } + + private PaimonSchemaCacheValue getPaimonSchemaCacheValue(Optional snapshot) { + PaimonSnapshotCacheValue snapshotCacheValue = getOrFetchSnapshotCacheValue(snapshot); + return getPaimonSchemaCacheValue(snapshotCacheValue.getSnapshot().getSchemaId()); + } + + private PaimonSnapshotCacheValue getOrFetchSnapshotCacheValue(Optional snapshot) { + if (snapshot.isPresent()) { + return ((PaimonMvccSnapshot) snapshot.get()).getSnapshotCacheValue(); + } else { + return getPaimonSnapshotCacheValue(); + } + } + } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonMetadataCache.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonMetadataCache.java new file mode 100644 index 00000000000000..5b711e070667b7 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonMetadataCache.java @@ -0,0 +1,144 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource.paimon; + +import org.apache.doris.catalog.Column; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.CacheFactory; +import org.apache.doris.common.Config; +import org.apache.doris.datasource.CacheException; +import org.apache.doris.datasource.CatalogIf; +import org.apache.doris.datasource.ExternalMetaCacheMgr; + +import com.github.benmanes.caffeine.cache.LoadingCache; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import org.apache.commons.collections.CollectionUtils; +import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.table.Table; +import org.apache.paimon.table.system.PartitionsTable; +import org.apache.paimon.table.system.SnapshotsTable; +import org.jetbrains.annotations.NotNull; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.OptionalLong; +import java.util.concurrent.ExecutorService; + +public class PaimonMetadataCache { + + private final LoadingCache snapshotCache; + + public PaimonMetadataCache(ExecutorService executor) { + CacheFactory snapshotCacheFactory = new CacheFactory( + OptionalLong.of(28800L), + OptionalLong.of(Config.external_cache_expire_time_minutes_after_access * 60), + Config.max_external_table_cache_num, + true, + null); + this.snapshotCache = snapshotCacheFactory.buildCache(key -> loadSnapshot(key), null, executor); + } + + @NotNull + private PaimonSnapshotCacheValue loadSnapshot(PaimonSnapshotCacheKey key) { + 
try { + PaimonSnapshot latestSnapshot = loadLatestSnapshot(key); + PaimonExternalTable table = (PaimonExternalTable) key.getCatalog().getDbOrAnalysisException(key.getDbName()) + .getTableOrAnalysisException(key.getTableName()); + List partitionColumns = table.getPaimonSchemaCacheValue(latestSnapshot.getSchemaId()) + .getPartitionColumns(); + PaimonPartitionInfo partitionInfo = loadPartitionInfo(key, partitionColumns); + return new PaimonSnapshotCacheValue(partitionInfo, latestSnapshot); + } catch (IOException | AnalysisException e) { + throw new CacheException("failed to loadSnapshot for: %s.%s.%s", + e, key.getCatalog().getName(), key.getDbName(), key.getTableName()); + } + } + + private PaimonPartitionInfo loadPartitionInfo(PaimonSnapshotCacheKey key, List partitionColumns) + throws IOException, AnalysisException { + if (CollectionUtils.isEmpty(partitionColumns)) { + return new PaimonPartitionInfo(); + } + List paimonPartitions = loadPartitions(key); + return PaimonUtil.generatePartitionInfo(partitionColumns, paimonPartitions); + } + + private List loadPartitions(PaimonSnapshotCacheKey key) + throws IOException { + Table table = ((PaimonExternalCatalog) key.getCatalog()).getPaimonTable(key.getDbName(), + key.getTableName() + Catalog.SYSTEM_TABLE_SPLITTER + PartitionsTable.PARTITIONS); + List rows = PaimonUtil.read(table, null, null); + List res = Lists.newArrayListWithCapacity(rows.size()); + for (InternalRow row : rows) { + res.add(PaimonUtil.rowToPartition(row)); + } + return res; + } + + private PaimonSnapshot loadLatestSnapshot(PaimonSnapshotCacheKey key) throws IOException { + Table table = ((PaimonExternalCatalog) key.getCatalog()).getPaimonTable(key.getDbName(), + key.getTableName() + Catalog.SYSTEM_TABLE_SPLITTER + SnapshotsTable.SNAPSHOTS); + // snapshotId and schemaId + List rows = PaimonUtil.read(table, new int[][] {{0}, {1}}, null); + long latestSnapshotId = 0L; + long latestSchemaId = 0L; + for (InternalRow row : rows) { + long snapshotId = 
row.getLong(0); + if (snapshotId > latestSnapshotId) { + latestSnapshotId = snapshotId; + latestSchemaId = row.getLong(1); + } + } + return new PaimonSnapshot(latestSnapshotId, latestSchemaId); + } + + public void invalidateCatalogCache(long catalogId) { + snapshotCache.asMap().keySet().stream() + .filter(key -> key.getCatalog().getId() == catalogId) + .forEach(snapshotCache::invalidate); + } + + public void invalidateTableCache(long catalogId, String dbName, String tblName) { + snapshotCache.asMap().keySet().stream() + .filter(key -> key.getCatalog().getId() == catalogId && key.getDbName().equals(dbName) + && key.getTableName().equals( + tblName)) + .forEach(snapshotCache::invalidate); + } + + public void invalidateDbCache(long catalogId, String dbName) { + snapshotCache.asMap().keySet().stream() + .filter(key -> key.getCatalog().getId() == catalogId && key.getDbName().equals(dbName)) + .forEach(snapshotCache::invalidate); + } + + public PaimonSnapshotCacheValue getPaimonSnapshot(CatalogIf catalog, String dbName, String tbName) { + PaimonSnapshotCacheKey key = new PaimonSnapshotCacheKey(catalog, dbName, tbName); + return snapshotCache.get(key); + } + + public Map> getCacheStats() { + Map> res = Maps.newHashMap(); + res.put("paimon_snapshot_cache", ExternalMetaCacheMgr.getCacheStats(snapshotCache.stats(), + snapshotCache.estimatedSize())); + return res; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonMetadataCacheMgr.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonMetadataCacheMgr.java new file mode 100644 index 00000000000000..a282fde665b197 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonMetadataCacheMgr.java @@ -0,0 +1,49 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource.paimon; + +import java.util.concurrent.ExecutorService; + +public class PaimonMetadataCacheMgr { + + private PaimonMetadataCache paimonMetadataCache; + + public PaimonMetadataCacheMgr(ExecutorService executor) { + this.paimonMetadataCache = new PaimonMetadataCache(executor); + } + + public PaimonMetadataCache getPaimonMetadataCache() { + return paimonMetadataCache; + } + + public void removeCache(long catalogId) { + paimonMetadataCache.invalidateCatalogCache(catalogId); + } + + public void invalidateCatalogCache(long catalogId) { + paimonMetadataCache.invalidateCatalogCache(catalogId); + } + + public void invalidateTableCache(long catalogId, String dbName, String tblName) { + paimonMetadataCache.invalidateTableCache(catalogId, dbName, tblName); + } + + public void invalidateDbCache(long catalogId, String dbName) { + paimonMetadataCache.invalidateDbCache(catalogId, dbName); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonMvccSnapshot.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonMvccSnapshot.java new file mode 100644 index 00000000000000..2307e91adb3911 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonMvccSnapshot.java @@ -0,0 +1,32 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource.paimon; + +import org.apache.doris.datasource.mvcc.MvccSnapshot; + +public class PaimonMvccSnapshot implements MvccSnapshot { + private final PaimonSnapshotCacheValue snapshotCacheValue; + + public PaimonMvccSnapshot(PaimonSnapshotCacheValue snapshotCacheValue) { + this.snapshotCacheValue = snapshotCacheValue; + } + + public PaimonSnapshotCacheValue getSnapshotCacheValue() { + return snapshotCacheValue; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonPartitionInfo.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonPartitionInfo.java index 8f54f0834e481b..4d3326f8e48376 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonPartitionInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonPartitionInfo.java @@ -24,8 +24,8 @@ import java.util.Map; public class PaimonPartitionInfo { - private Map nameToPartitionItem; - private Map nameToPartition; + private final Map nameToPartitionItem; + private final Map nameToPartition; public PaimonPartitionInfo() { this.nameToPartitionItem = Maps.newHashMap(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSchema.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSchema.java new file mode 100644 index 00000000000000..ef26e1ed20879d --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSchema.java @@ -0,0 +1,46 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.datasource.paimon; + +import org.apache.paimon.types.DataField; + +import java.util.List; + +public class PaimonSchema { + private final long schemaId; + private final List fields; + private final List partitionKeys; + + public PaimonSchema(long schemaId, List fields, List partitionKeys) { + this.schemaId = schemaId; + this.fields = fields; + this.partitionKeys = partitionKeys; + } + + public long getSchemaId() { + return schemaId; + } + + public List getFields() { + return fields; + } + + public List getPartitionKeys() { + return partitionKeys; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSchemaCacheKey.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSchemaCacheKey.java new file mode 100644 index 00000000000000..f74555b369b380 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSchemaCacheKey.java @@ -0,0 +1,55 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.datasource.paimon; + +import org.apache.doris.datasource.ExternalSchemaCache.SchemaCacheKey; + +import com.google.common.base.Objects; + +public class PaimonSchemaCacheKey extends SchemaCacheKey { + private final long schemaId; + + public PaimonSchemaCacheKey(String dbName, String tableName, long schemaId) { + super(dbName, tableName); + this.schemaId = schemaId; + } + + public long getSchemaId() { + return schemaId; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof PaimonSchemaCacheKey)) { + return false; + } + if (!super.equals(o)) { + return false; + } + PaimonSchemaCacheKey that = (PaimonSchemaCacheKey) o; + return schemaId == that.schemaId; + } + + @Override + public int hashCode() { + return Objects.hashCode(super.hashCode(), schemaId); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSchemaCacheValue.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSchemaCacheValue.java index 20d27b2425df24..ccb530a3cbccc7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSchemaCacheValue.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSchemaCacheValue.java @@ -20,41 +20,18 @@ import org.apache.doris.catalog.Column; import org.apache.doris.datasource.SchemaCacheValue; -import org.apache.paimon.table.Table; - import java.util.List; public class PaimonSchemaCacheValue extends SchemaCacheValue { - private Table paimonTable; private List partitionColumns; - private PaimonPartitionInfo partitionInfo; - - private long snapshootId; - public PaimonSchemaCacheValue(List schema, List partitionColumns, Table paimonTable, - long snapshootId, - PaimonPartitionInfo partitionInfo) { + public PaimonSchemaCacheValue(List schema, List partitionColumns) { super(schema); this.partitionColumns = partitionColumns; - this.paimonTable = paimonTable; - this.snapshootId = snapshootId; - 
this.partitionInfo = partitionInfo; - } - - public Table getPaimonTable() { - return paimonTable; } public List getPartitionColumns() { return partitionColumns; } - - public PaimonPartitionInfo getPartitionInfo() { - return partitionInfo; - } - - public long getSnapshootId() { - return snapshootId; - } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSnapshot.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSnapshot.java new file mode 100644 index 00000000000000..4a536dd72cc901 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSnapshot.java @@ -0,0 +1,36 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.datasource.paimon; + +public class PaimonSnapshot { + private final long snapshotId; + private final long schemaId; + + public PaimonSnapshot(long snapshotId, long schemaId) { + this.snapshotId = snapshotId; + this.schemaId = schemaId; + } + + public long getSnapshotId() { + return snapshotId; + } + + public long getSchemaId() { + return schemaId; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSnapshotCacheKey.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSnapshotCacheKey.java new file mode 100644 index 00000000000000..970f111a72133f --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSnapshotCacheKey.java @@ -0,0 +1,75 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.datasource.paimon; + +import org.apache.doris.datasource.CatalogIf; + +import java.util.Objects; +import java.util.StringJoiner; + +public class PaimonSnapshotCacheKey { + private final CatalogIf catalog; + private final String dbName; + private final String tableName; + + public PaimonSnapshotCacheKey(CatalogIf catalog, String dbName, String tableName) { + this.catalog = catalog; + this.dbName = dbName; + this.tableName = tableName; + } + + public CatalogIf getCatalog() { + return catalog; + } + + public String getDbName() { + return dbName; + } + + public String getTableName() { + return tableName; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + PaimonSnapshotCacheKey that = (PaimonSnapshotCacheKey) o; + return catalog.getId() == that.catalog.getId() + && Objects.equals(dbName, that.dbName) + && Objects.equals(tableName, that.tableName); + } + + @Override + public int hashCode() { + return Objects.hash(catalog.getId(), dbName, tableName); + } + + @Override + public String toString() { + return new StringJoiner(", ", PaimonSnapshotCacheKey.class.getSimpleName() + "[", "]") + .add("catalog=" + catalog) + .add("dbName='" + dbName + "'") + .add("tableName='" + tableName + "'") + .toString(); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSnapshotCacheValue.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSnapshotCacheValue.java new file mode 100644 index 00000000000000..c50ecdabfde3df --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSnapshotCacheValue.java @@ -0,0 +1,37 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource.paimon; + +public class PaimonSnapshotCacheValue { + + private final PaimonPartitionInfo partitionInfo; + private final PaimonSnapshot snapshot; + + public PaimonSnapshotCacheValue(PaimonPartitionInfo partitionInfo, PaimonSnapshot snapshot) { + this.partitionInfo = partitionInfo; + this.snapshot = snapshot; + } + + public PaimonPartitionInfo getPartitionInfo() { + return partitionInfo; + } + + public PaimonSnapshot getSnapshot() { + return snapshot; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonUtil.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonUtil.java index 8b7017cac29486..1f7576dca51d93 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonUtil.java @@ -22,6 +22,7 @@ import org.apache.doris.catalog.ListPartitionItem; import org.apache.doris.catalog.PartitionItem; import org.apache.doris.catalog.PartitionKey; +import org.apache.doris.catalog.ScalarType; import org.apache.doris.catalog.Type; import org.apache.doris.common.AnalysisException; import org.apache.doris.datasource.hive.HiveUtil; @@ -30,12 +31,22 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; import org.apache.commons.collections.CollectionUtils; +import 
org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.apache.paimon.data.InternalRow; import org.apache.paimon.data.serializer.InternalRowSerializer; import org.apache.paimon.options.ConfigOption; +import org.apache.paimon.predicate.Predicate; import org.apache.paimon.reader.RecordReader; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.type.TypeReference; import org.apache.paimon.table.Table; import org.apache.paimon.table.source.ReadBuilder; +import org.apache.paimon.types.ArrayType; +import org.apache.paimon.types.DataField; +import org.apache.paimon.types.DecimalType; +import org.apache.paimon.types.MapType; +import org.apache.paimon.types.RowType; +import org.apache.paimon.utils.JsonSerdeUtil; import org.apache.paimon.utils.Pair; import org.apache.paimon.utils.Projection; @@ -48,8 +59,11 @@ import javax.annotation.Nullable; public class PaimonUtil { + private static final Logger LOG = LogManager.getLogger(PaimonUtil.class); + public static List read( - Table table, @Nullable int[][] projection, Pair, String>... dynamicOptions) + Table table, @Nullable int[][] projection, @Nullable Predicate predicate, + Pair, String>... 
dynamicOptions) throws IOException { Map options = new HashMap<>(); for (Pair, String> pair : dynamicOptions) { @@ -60,6 +74,9 @@ public static List read( if (projection != null) { readBuilder.withProjection(projection); } + if (predicate != null) { + readBuilder.withFilter(predicate); + } RecordReader reader = readBuilder.newRead().createReader(readBuilder.newScan().plan()); InternalRowSerializer serializer = @@ -152,4 +169,107 @@ public static ListPartitionItem toListPartitionItem(String partitionName, List 6) { + tsScale = 6; + } + } else if (dataType instanceof org.apache.paimon.types.LocalZonedTimestampType) { + tsScale = ((org.apache.paimon.types.LocalZonedTimestampType) dataType).getPrecision(); + if (tsScale > 6) { + tsScale = 6; + } + } + return ScalarType.createDatetimeV2Type(tsScale); + case TIMESTAMP_WITH_LOCAL_TIME_ZONE: + if (dataType instanceof org.apache.paimon.types.LocalZonedTimestampType) { + tsScale = ((org.apache.paimon.types.LocalZonedTimestampType) dataType).getPrecision(); + if (tsScale > 6) { + tsScale = 6; + } + } + return ScalarType.createDatetimeV2Type(tsScale); + case ARRAY: + ArrayType arrayType = (ArrayType) dataType; + Type innerType = paimonPrimitiveTypeToDorisType(arrayType.getElementType()); + return org.apache.doris.catalog.ArrayType.create(innerType, true); + case MAP: + MapType mapType = (MapType) dataType; + return new org.apache.doris.catalog.MapType( + paimonTypeToDorisType(mapType.getKeyType()), paimonTypeToDorisType(mapType.getValueType())); + case ROW: + RowType rowType = (RowType) dataType; + List fields = rowType.getFields(); + return new org.apache.doris.catalog.StructType(fields.stream() + .map(field -> new org.apache.doris.catalog.StructField(field.name(), + paimonTypeToDorisType(field.type()))) + .collect(Collectors.toCollection(ArrayList::new))); + case TIME_WITHOUT_TIME_ZONE: + return Type.UNSUPPORTED; + default: + LOG.warn("Cannot transform unknown type: " + dataType.getTypeRoot()); + return Type.UNSUPPORTED; + } 
+ } + + public static Type paimonTypeToDorisType(org.apache.paimon.types.DataType type) { + return paimonPrimitiveTypeToDorisType(type); + } + + /** + * https://paimon.apache.org/docs/0.9/maintenance/system-tables/#schemas-table + * demo: + * 0 + * [{"id":0,"name":"user_id","type":"BIGINT NOT NULL"}, + * {"id":1,"name":"item_id","type":"BIGINT"}, + * {"id":2,"name":"behavior","type":"STRING"}, + * {"id":3,"name":"dt","type":"STRING NOT NULL"}, + * {"id":4,"name":"hh","type":"STRING NOT NULL"}] + * ["dt"] + * ["dt","hh","user_id"] + * {"owner":"hadoop","provider":"paimon"} + * 2024-12-03 15:38:14.734 + * + * @param row schemas-table row: schema id, fields JSON, partition keys JSON (columns 0-2) + * @return the PaimonSchema built from the schema id, fields and partition keys of the row + */ + public static PaimonSchema rowToSchema(InternalRow row) { + long schemaId = row.getLong(0); + String fieldsStr = row.getString(1).toString(); + String partitionKeysStr = row.getString(2).toString(); + List fields = JsonSerdeUtil.fromJson(fieldsStr, new TypeReference>() { + }); + List partitionKeys = JsonSerdeUtil.fromJson(partitionKeysStr, new TypeReference>() { + }); + return new PaimonSchema(schemaId, fields, partitionKeys); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonSource.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonSource.java index 885eba06ed956d..a8bb814f1d353b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonSource.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonSource.java @@ -21,6 +21,7 @@ import org.apache.doris.catalog.TableIf; import org.apache.doris.common.UserException; import org.apache.doris.datasource.ExternalCatalog; +import org.apache.doris.datasource.mvcc.MvccUtil; import org.apache.doris.datasource.paimon.PaimonExternalTable; import org.apache.doris.datasource.property.constants.PaimonProperties; import org.apache.doris.thrift.TFileAttributes; @@ -36,7 +37,7 @@ public class PaimonSource { public PaimonSource(TupleDescriptor desc) { this.desc = desc; 
this.paimonExtTable = (PaimonExternalTable) desc.getTable(); - this.originTable = paimonExtTable.getPaimonTable(); + this.originTable = paimonExtTable.getPaimonTable(MvccUtil.getSnapshotFromContext(paimonExtTable)); } public TupleDescriptor getDesc() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java index 68ed0cc9b23592..353e024ed97348 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java @@ -29,6 +29,9 @@ import org.apache.doris.common.UserException; import org.apache.doris.common.util.DebugUtil; import org.apache.doris.common.util.TimeUtils; +import org.apache.doris.datasource.mvcc.MvccSnapshot; +import org.apache.doris.datasource.mvcc.MvccTable; +import org.apache.doris.datasource.mvcc.MvccTableInfo; import org.apache.doris.job.common.TaskStatus; import org.apache.doris.job.exception.JobException; import org.apache.doris.job.task.AbstractTask; @@ -70,6 +73,7 @@ import java.math.RoundingMode; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.Objects; import java.util.Set; import java.util.UUID; @@ -141,6 +145,8 @@ public enum MTMVTaskRefreshMode { private StmtExecutor executor; private Map partitionSnapshots; + private final Map snapshots = Maps.newHashMap(); + public MTMVTask() { } @@ -218,6 +224,9 @@ private void exec(Set refreshPartitionNames, throws Exception { ConnectContext ctx = MTMVPlanUtil.createMTMVContext(mtmv); StatementContext statementContext = new StatementContext(); + for (Entry entry : snapshots.entrySet()) { + statementContext.setSnapshot(entry.getKey(), entry.getValue()); + } ctx.setStatementContext(statementContext); TUniqueId queryId = generateQueryId(); lastQueryId = DebugUtil.printId(queryId); @@ -305,6 +314,11 @@ private void beforeMTMVRefresh() throws 
AnalysisException, DdlException { MTMVBaseTableIf baseTableIf = (MTMVBaseTableIf) tableIf; baseTableIf.beforeMTMVRefresh(mtmv); } + if (tableIf instanceof MvccTable) { + MvccTable mvccTable = (MvccTable) tableIf; + MvccSnapshot mvccSnapshot = mvccTable.loadSnapshot(); + snapshots.put(new MvccTableInfo(mvccTable), mvccSnapshot); + } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/StatementContext.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/StatementContext.java index 008a2c8ac70da2..f21600ac585763 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/StatementContext.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/StatementContext.java @@ -555,7 +555,11 @@ public void loadSnapshots(Map, TableIf> tables) { } for (TableIf tableIf : tables.values()) { if (tableIf instanceof MvccTable) { - snapshots.put(new MvccTableInfo(tableIf), ((MvccTable) tableIf).loadSnapshot()); + MvccTableInfo mvccTableInfo = new MvccTableInfo(tableIf); + // the snapshot may already have been set by MTMV; do not load it again + if (!snapshots.containsKey(mvccTableInfo)) { + snapshots.put(mvccTableInfo, ((MvccTable) tableIf).loadSnapshot()); + } } } } @@ -563,11 +567,25 @@ public void loadSnapshots(Map, TableIf> tables) { /** * Obtain snapshot information of mvcc * - * @param mvccTable mvccTable + * @param tableIf tableIf * @return MvccSnapshot */ - public MvccSnapshot getSnapshot(MvccTable mvccTable) { - return snapshots.get(new MvccTableInfo(mvccTable)); + public Optional getSnapshot(TableIf tableIf) { + if (!(tableIf instanceof MvccTable)) { + return Optional.empty(); + } + MvccTableInfo mvccTableInfo = new MvccTableInfo(tableIf); + return Optional.ofNullable(snapshots.get(mvccTableInfo)); + } + + /** + * Set snapshot information of mvcc + * + * @param mvccTableInfo mvccTableInfo + * @param snapshot snapshot + */ + public void setSnapshot(MvccTableInfo mvccTableInfo, MvccSnapshot snapshot) { + snapshots.put(mvccTableInfo, snapshot); } private static class 
CloseableResource implements Closeable { diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PruneFileScanPartition.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PruneFileScanPartition.java index ba8b270d1f397d..e99906f5e13dc4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PruneFileScanPartition.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PruneFileScanPartition.java @@ -36,7 +36,6 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; -import java.util.Optional; import java.util.function.Function; import java.util.stream.Collectors; @@ -74,8 +73,8 @@ public Rule build() { private SelectedPartitions pruneExternalPartitions(ExternalTable externalTable, LogicalFilter filter, LogicalFileScan scan, CascadesContext ctx) { Map selectedPartitionItems = Maps.newHashMap(); - // todo: real snapshotId - if (CollectionUtils.isEmpty(externalTable.getPartitionColumns(Optional.empty()))) { + if (CollectionUtils.isEmpty(externalTable.getPartitionColumns( + ctx.getStatementContext().getSnapshot(externalTable)))) { // non partitioned table, return NOT_PRUNED. // non partition table will be handled in HiveScanNode. 
return SelectedPartitions.NOT_PRUNED; @@ -83,8 +82,8 @@ private SelectedPartitions pruneExternalPartitions(ExternalTable externalTable, Map scanOutput = scan.getOutput() .stream() .collect(Collectors.toMap(slot -> slot.getName().toLowerCase(), Function.identity())); - // todo: real snapshotId - List partitionSlots = externalTable.getPartitionColumns(Optional.empty()) + List partitionSlots = externalTable.getPartitionColumns( + ctx.getStatementContext().getSnapshot(externalTable)) .stream() .map(column -> scanOutput.get(column.getName().toLowerCase())) .collect(Collectors.toList()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateMvByPartitionCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateMvByPartitionCommand.java index 869c2d0b38bd4c..b5be72950cd47a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateMvByPartitionCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateMvByPartitionCommand.java @@ -29,6 +29,7 @@ import org.apache.doris.common.AnalysisException; import org.apache.doris.common.UserException; import org.apache.doris.datasource.ExternalTable; +import org.apache.doris.datasource.mvcc.MvccUtil; import org.apache.doris.mtmv.BaseTableInfo; import org.apache.doris.mtmv.MTMVRelatedTableIf; import org.apache.doris.nereids.analyzer.UnboundRelation; @@ -315,10 +316,8 @@ public Plan visitLogicalCatalogRelation(LogicalCatalogRelation catalogRelation, } if (targetTable instanceof ExternalTable) { // Add filter only when partition has data when external table - // TODO: 2024/12/4 real snapshot - partitionHasDataItems.add( - ((ExternalTable) targetTable).getNameToPartitionItems(Optional.empty()) - .get(partitionName)); + partitionHasDataItems.add(((ExternalTable) targetTable).getNameToPartitionItems( + MvccUtil.getSnapshotFromContext(targetTable)).get(partitionName)); } } if 
(partitionHasDataItems.isEmpty()) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalFileScan.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalFileScan.java index 96b8e032d11274..1f5f71f7bafe59 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalFileScan.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalFileScan.java @@ -20,6 +20,7 @@ import org.apache.doris.analysis.TableSnapshot; import org.apache.doris.catalog.PartitionItem; import org.apache.doris.datasource.ExternalTable; +import org.apache.doris.datasource.mvcc.MvccUtil; import org.apache.doris.nereids.memo.GroupExpression; import org.apache.doris.nereids.properties.LogicalProperties; import org.apache.doris.nereids.trees.TableSample; @@ -60,10 +61,10 @@ protected LogicalFileScan(RelationId id, ExternalTable table, List quali } public LogicalFileScan(RelationId id, ExternalTable table, List qualifier, - Optional tableSample, Optional tableSnapshot) { - // todo: real snapshotId + Optional tableSample, Optional tableSnapshot) { this(id, table, qualifier, Optional.empty(), Optional.empty(), - table.initSelectedPartitions(Optional.empty()), tableSample, tableSnapshot); + table.initSelectedPartitions(MvccUtil.getSnapshotFromContext(table)), + tableSample, tableSnapshot); } public SelectedPartitions getSelectedPartitions() { diff --git a/regression-test/data/mtmv_p0/test_paimon_rewrite_mtmv.out b/regression-test/data/mtmv_p0/test_paimon_rewrite_mtmv.out new file mode 100644 index 00000000000000..63bda82c1db5bd --- /dev/null +++ b/regression-test/data/mtmv_p0/test_paimon_rewrite_mtmv.out @@ -0,0 +1,16 @@ +-- This file is automatically generated. 
You should know what you did if you want to edit this +-- !refresh_one_partition -- +a 10 + +-- !refresh_one_partition_rewrite -- +a 10 +b 10 + +-- !refresh_auto -- +a 10 +b 10 + +-- !refresh_all_partition_rewrite -- +a 10 +b 10 + diff --git a/regression-test/suites/mtmv_p0/test_paimon_rewrite_mtmv.groovy b/regression-test/suites/mtmv_p0/test_paimon_rewrite_mtmv.groovy new file mode 100644 index 00000000000000..985443875c7b26 --- /dev/null +++ b/regression-test/suites/mtmv_p0/test_paimon_rewrite_mtmv.groovy @@ -0,0 +1,95 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +suite("test_paimon_rewrite_mtmv", "p0,external,mtmv,external_docker,external_docker_doris") { + String enabled = context.config.otherConfigs.get("enablePaimonTest") + if (enabled == null || !enabled.equalsIgnoreCase("true")) { + logger.info("disabled paimon test") + return + } + String suiteName = "test_paimon_rewrite_mtmv" + String catalogName = "${suiteName}_catalog" + String mvName = "${suiteName}_mv" + String dbName = context.config.getDbNameByFile(context.file) + + String minio_port = context.config.otherConfigs.get("iceberg_minio_port") + String externalEnvIp = context.config.otherConfigs.get("externalEnvIp") + + sql """set materialized_view_rewrite_enable_contain_external_table=true;""" + String mvSql = "SELECT par,count(*) as num FROM ${catalogName}.`test_paimon_spark`.test_tb_mix_format group by par;"; + + sql """drop catalog if exists ${catalogName}""" + sql """CREATE CATALOG ${catalogName} PROPERTIES ( + 'type'='paimon', + 'warehouse' = 's3://warehouse/wh/', + "s3.access_key" = "admin", + "s3.secret_key" = "password", + "s3.endpoint" = "http://${externalEnvIp}:${minio_port}", + "s3.region" = "us-east-1" + );""" + + sql """analyze table ${catalogName}.`test_paimon_spark`.test_tb_mix_format with sync""" + sql """alter table ${catalogName}.`test_paimon_spark`.test_tb_mix_format modify column par set stats ('row_count'='20');""" + + sql """drop materialized view if exists ${mvName};""" + + sql """ + CREATE MATERIALIZED VIEW ${mvName} + BUILD DEFERRED REFRESH AUTO ON MANUAL + partition by(`par`) + DISTRIBUTED BY RANDOM BUCKETS 2 + PROPERTIES ('replication_num' = '1') + AS + ${mvSql} + """ + def showPartitionsResult = sql """show partitions from ${mvName}""" + logger.info("showPartitionsResult: " + showPartitionsResult.toString()) + assertTrue(showPartitionsResult.toString().contains("p_a")) + assertTrue(showPartitionsResult.toString().contains("p_b")) + + // refresh one partitions + sql """ + REFRESH MATERIALIZED VIEW ${mvName} partitions(p_a); + """ + 
waitingMTMVTaskFinishedByMvName(mvName) + order_qt_refresh_one_partition "SELECT * FROM ${mvName} " + + def explainOnePartition = sql """ explain ${mvSql} """ + logger.info("explainOnePartition: " + explainOnePartition.toString()) + assertTrue(explainOnePartition.toString().contains("VUNION")) + order_qt_refresh_one_partition_rewrite "${mvSql}" + + mv_rewrite_success("${mvSql}", "${mvName}") + + //refresh auto + sql """ + REFRESH MATERIALIZED VIEW ${mvName} auto + """ + waitingMTMVTaskFinishedByMvName(mvName) + order_qt_refresh_auto "SELECT * FROM ${mvName} " + + def explainAllPartition = sql """ explain ${mvSql}; """ + logger.info("explainAllPartition: " + explainAllPartition.toString()) + assertTrue(explainAllPartition.toString().contains("VOlapScanNode")) + order_qt_refresh_all_partition_rewrite "${mvSql}" + + mv_rewrite_success("${mvSql}", "${mvName}") + + sql """drop materialized view if exists ${mvName};""" + sql """drop catalog if exists ${catalogName}""" +} +