From b5703b2ab4396acdea42711dcf6cb661cdfd2ee1 Mon Sep 17 00:00:00 2001 From: zhangdong Date: Mon, 2 Dec 2024 20:05:55 +0800 Subject: [PATCH 01/16] 1 --- .../datasource/ExternalMetaCacheMgr.java | 12 ++ .../paimon/PaimonExternalTable.java | 133 ++++++---------- .../paimon/PaimonMetadataCache.java | 147 ++++++++++++++++++ .../paimon/PaimonMetadataCacheMgr.java | 49 ++++++ .../datasource/paimon/PaimonMvccSnapshot.java | 32 ++++ .../paimon/PaimonPartitionInfo.java | 4 +- .../paimon/PaimonSchemaCacheValue.java | 17 +- .../datasource/paimon/PaimonSnapshot.java | 36 +++++ .../paimon/PaimonSnapshotCacheKey.java | 75 +++++++++ .../paimon/PaimonSnapshotCacheValue.java | 37 +++++ .../paimon/source/PaimonSource.java | 7 +- .../doris/job/extensions/mtmv/MTMVTask.java | 14 ++ .../doris/nereids/StatementContext.java | 16 +- 13 files changed, 476 insertions(+), 103 deletions(-) create mode 100644 fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonMetadataCache.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonMetadataCacheMgr.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonMvccSnapshot.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSnapshot.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSnapshotCacheKey.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSnapshotCacheValue.java diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java index cc40ad292ce182..24f55e74266863 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java @@ -31,6 +31,8 @@ import org.apache.doris.datasource.maxcompute.MaxComputeMetadataCache; import 
org.apache.doris.datasource.maxcompute.MaxComputeMetadataCacheMgr; import org.apache.doris.datasource.metacache.MetaCache; +import org.apache.doris.datasource.paimon.PaimonMetadataCache; +import org.apache.doris.datasource.paimon.PaimonMetadataCacheMgr; import org.apache.doris.fs.FileSystemCache; import org.apache.doris.nereids.exceptions.NotSupportedException; @@ -92,6 +94,7 @@ public class ExternalMetaCacheMgr { private ExternalRowCountCache rowCountCache; private final IcebergMetadataCacheMgr icebergMetadataCacheMgr; private final MaxComputeMetadataCacheMgr maxComputeMetadataCacheMgr; + private final PaimonMetadataCacheMgr paimonMetadataCacheMgr; public ExternalMetaCacheMgr() { rowCountRefreshExecutor = ThreadPoolManager.newDaemonFixedThreadPool( @@ -122,6 +125,7 @@ public ExternalMetaCacheMgr() { hudiPartitionMgr = new HudiPartitionMgr(commonRefreshExecutor); icebergMetadataCacheMgr = new IcebergMetadataCacheMgr(commonRefreshExecutor); maxComputeMetadataCacheMgr = new MaxComputeMetadataCacheMgr(); + paimonMetadataCacheMgr = new PaimonMetadataCacheMgr(commonRefreshExecutor); } public ExecutorService getFileListingExecutor() { @@ -167,6 +171,10 @@ public IcebergMetadataCache getIcebergMetadataCache() { return icebergMetadataCacheMgr.getIcebergMetadataCache(); } + public PaimonMetadataCache getPaimonMetadataCache() { + return paimonMetadataCacheMgr.getPaimonMetadataCache(); + } + public MaxComputeMetadataCache getMaxComputeMetadataCache(long catalogId) { return maxComputeMetadataCacheMgr.getMaxComputeMetadataCache(catalogId); } @@ -189,6 +197,7 @@ public void removeCache(long catalogId) { hudiPartitionMgr.removePartitionProcessor(catalogId); icebergMetadataCacheMgr.removeCache(catalogId); maxComputeMetadataCacheMgr.removeCache(catalogId); + paimonMetadataCacheMgr.removeCache(catalogId); } public void invalidateTableCache(long catalogId, String dbName, String tblName) { @@ -204,6 +213,7 @@ public void invalidateTableCache(long catalogId, String dbName, String 
tblName) hudiPartitionMgr.cleanTablePartitions(catalogId, dbName, tblName); icebergMetadataCacheMgr.invalidateTableCache(catalogId, dbName, tblName); maxComputeMetadataCacheMgr.invalidateTableCache(catalogId, dbName, tblName); + paimonMetadataCacheMgr.invalidateTableCache(catalogId, dbName, tblName); if (LOG.isDebugEnabled()) { LOG.debug("invalid table cache for {}.{} in catalog {}", dbName, tblName, catalogId); } @@ -222,6 +232,7 @@ public void invalidateDbCache(long catalogId, String dbName) { hudiPartitionMgr.cleanDatabasePartitions(catalogId, dbName); icebergMetadataCacheMgr.invalidateDbCache(catalogId, dbName); maxComputeMetadataCacheMgr.invalidateDbCache(catalogId, dbName); + paimonMetadataCacheMgr.invalidateDbCache(catalogId, dbName); if (LOG.isDebugEnabled()) { LOG.debug("invalid db cache for {} in catalog {}", dbName, catalogId); } @@ -239,6 +250,7 @@ public void invalidateCatalogCache(long catalogId) { hudiPartitionMgr.cleanPartitionProcess(catalogId); icebergMetadataCacheMgr.invalidateCatalogCache(catalogId); maxComputeMetadataCacheMgr.invalidateCatalogCache(catalogId); + paimonMetadataCacheMgr.invalidateCatalogCache(catalogId); if (LOG.isDebugEnabled()) { LOG.debug("invalid catalog cache for {}", catalogId); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java index 7fe3c858448e3f..adcf0d32617075 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java @@ -26,9 +26,11 @@ import org.apache.doris.catalog.Type; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.DdlException; +import org.apache.doris.datasource.CacheException; import org.apache.doris.datasource.ExternalTable; import org.apache.doris.datasource.SchemaCacheValue; import 
org.apache.doris.datasource.mvcc.MvccSnapshot; +import org.apache.doris.datasource.mvcc.MvccTable; import org.apache.doris.mtmv.MTMVBaseTableIf; import org.apache.doris.mtmv.MTMVRefreshContext; import org.apache.doris.mtmv.MTMVRelatedTableIf; @@ -45,25 +47,21 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; -import org.apache.commons.collections.CollectionUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.apache.paimon.catalog.Catalog; -import org.apache.paimon.data.InternalRow; +import org.apache.paimon.CoreOptions; import org.apache.paimon.schema.TableSchema; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.Table; import org.apache.paimon.table.source.Split; -import org.apache.paimon.table.system.PartitionsTable; -import org.apache.paimon.table.system.SnapshotsTable; import org.apache.paimon.types.ArrayType; import org.apache.paimon.types.DataField; import org.apache.paimon.types.DecimalType; import org.apache.paimon.types.MapType; import org.apache.paimon.types.RowType; -import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -71,7 +69,7 @@ import java.util.Set; import java.util.stream.Collectors; -public class PaimonExternalTable extends ExternalTable implements MTMVRelatedTableIf, MTMVBaseTableIf { +public class PaimonExternalTable extends ExternalTable implements MTMVRelatedTableIf, MTMVBaseTableIf, MvccTable { private static final Logger LOG = LogManager.getLogger(PaimonExternalTable.class); @@ -90,37 +88,19 @@ protected synchronized void makeSureInitialized() { } } - public Table getPaimonTable() { - makeSureInitialized(); - Optional schemaCacheValue = getSchemaCacheValue(); - return schemaCacheValue.map(value -> ((PaimonSchemaCacheValue) value).getPaimonTable()).orElse(null); + public Table 
getPaimonTable(Optional snapshot) { + return getPaimonSchemaCacheValue().getPaimonTable().copy( + Collections.singletonMap(CoreOptions.SCAN_VERSION.key(), + String.valueOf(getPaimonSnapshotCacheValue(snapshot).getSnapshot().getSnapshotId()))); } - private PaimonPartitionInfo getPartitionInfoFromCache() { + private PaimonSchemaCacheValue getPaimonSchemaCacheValue() { makeSureInitialized(); Optional schemaCacheValue = getSchemaCacheValue(); if (!schemaCacheValue.isPresent()) { - return new PaimonPartitionInfo(); + throw new CacheException("SchemaCacheValue is empty: %s:%s:%s", null, catalog.getName(), dbName, getName()); } - return ((PaimonSchemaCacheValue) schemaCacheValue.get()).getPartitionInfo(); - } - - private List getPartitionColumnsFromCache() { - makeSureInitialized(); - Optional schemaCacheValue = getSchemaCacheValue(); - if (!schemaCacheValue.isPresent()) { - return Lists.newArrayList(); - } - return ((PaimonSchemaCacheValue) schemaCacheValue.get()).getPartitionColumns(); - } - - public long getLatestSnapshotIdFromCache() throws AnalysisException { - makeSureInitialized(); - Optional schemaCacheValue = getSchemaCacheValue(); - if (!schemaCacheValue.isPresent()) { - throw new AnalysisException("not present"); - } - return ((PaimonSchemaCacheValue) schemaCacheValue.get()).getSnapshootId(); + return (PaimonSchemaCacheValue) schemaCacheValue.get(); } @Override @@ -140,51 +120,7 @@ public Optional initSchema() { partitionColumns.add(column); } } - try { - // after 0.9.0 paimon will support table.getLatestSnapshotId() - long latestSnapshotId = loadLatestSnapshotId(); - PaimonPartitionInfo partitionInfo = loadPartitionInfo(partitionColumns); - return Optional.of(new PaimonSchemaCacheValue(tmpSchema, partitionColumns, paimonTable, latestSnapshotId, - partitionInfo)); - } catch (IOException | AnalysisException e) { - LOG.warn(e); - return Optional.empty(); - } - } - - private long loadLatestSnapshotId() throws IOException { - Table table = ((PaimonExternalCatalog) 
catalog).getPaimonTable(dbName, - name + Catalog.SYSTEM_TABLE_SPLITTER + SnapshotsTable.SNAPSHOTS); - // snapshotId - List rows = PaimonUtil.read(table, new int[][] {{0}}); - long latestSnapshotId = 0L; - for (InternalRow row : rows) { - long snapshotId = row.getLong(0); - if (snapshotId > latestSnapshotId) { - latestSnapshotId = snapshotId; - } - } - return latestSnapshotId; - } - - private PaimonPartitionInfo loadPartitionInfo(List partitionColumns) throws IOException, AnalysisException { - if (CollectionUtils.isEmpty(partitionColumns)) { - return new PaimonPartitionInfo(); - } - List paimonPartitions = loadPartitions(); - return PaimonUtil.generatePartitionInfo(partitionColumns, paimonPartitions); - } - - private List loadPartitions() - throws IOException { - Table table = ((PaimonExternalCatalog) catalog).getPaimonTable(dbName, - name + Catalog.SYSTEM_TABLE_SPLITTER + PartitionsTable.PARTITIONS); - List rows = PaimonUtil.read(table, null); - List res = Lists.newArrayListWithCapacity(rows.size()); - for (InternalRow row : rows) { - res.add(PaimonUtil.rowToPartition(row)); - } - return res; + return Optional.of(new PaimonSchemaCacheValue(tmpSchema, partitionColumns, paimonTable)); } private Type paimonPrimitiveTypeToDorisType(org.apache.paimon.types.DataType dataType) { @@ -314,30 +250,31 @@ public void beforeMTMVRefresh(MTMV mtmv) throws DdlException { @Override public Map getAndCopyPartitionItems(Optional snapshot) { - return Maps.newHashMap(getPartitionInfoFromCache().getNameToPartitionItem()); + return Maps.newHashMap(getNameToPartitionItems(snapshot)); } @Override public PartitionType getPartitionType(Optional snapshot) { - return getPartitionColumnsFromCache().size() > 0 ? PartitionType.LIST : PartitionType.UNPARTITIONED; + return getPartitionColumns(snapshot).size() > 0 ? 
PartitionType.LIST : PartitionType.UNPARTITIONED; } @Override public Set getPartitionColumnNames(Optional snapshot) { - return getPartitionColumnsFromCache().stream() + return getPartitionColumns(snapshot).stream() .map(c -> c.getName().toLowerCase()).collect(Collectors.toSet()); } @Override public List getPartitionColumns(Optional snapshot) { - return getPartitionColumnsFromCache(); + return getPaimonSchemaCacheValue().getPartitionColumns(); } @Override public MTMVSnapshotIf getPartitionSnapshot(String partitionName, MTMVRefreshContext context, Optional snapshot) throws AnalysisException { - PaimonPartition paimonPartition = getPartitionInfoFromCache().getNameToPartition().get(partitionName); + PaimonPartition paimonPartition = getPaimonSnapshotCacheValue(snapshot).getPartitionInfo().getNameToPartition() + .get(partitionName); if (paimonPartition == null) { throw new AnalysisException("can not find partition: " + partitionName); } @@ -347,7 +284,21 @@ public MTMVSnapshotIf getPartitionSnapshot(String partitionName, MTMVRefreshCont @Override public MTMVSnapshotIf getTableSnapshot(MTMVRefreshContext context, Optional snapshot) throws AnalysisException { - return new MTMVVersionSnapshot(getLatestSnapshotIdFromCache()); + PaimonSnapshotCacheValue paimonSnapshot = getPaimonSnapshotCacheValue(snapshot); + return new MTMVVersionSnapshot(paimonSnapshot.getSnapshot().getSnapshotId()); + } + + private PaimonSnapshotCacheValue getPaimonSnapshotCacheValue(Optional snapshot) { + if (snapshot.isPresent()) { + return ((PaimonMvccSnapshot) snapshot.get()).getSnapshotCacheValue(); + } else { + return getPaimonSnapshotCacheValue(); + } + } + + private PaimonSnapshotCacheValue getPaimonSnapshotCacheValue() { + return Env.getCurrentEnv().getExtMetaCacheMgr().getPaimonMetadataCache() + .getPaimonSnapshot(catalog, dbName, name); } @Override @@ -359,4 +310,20 @@ public boolean isPartitionColumnAllowNull() { // The cost is that Paimon partition writes a null value, and the materialized 
view cannot detect this data. return true; } + + @Override + public MvccSnapshot loadSnapshot() { + return new PaimonMvccSnapshot(getPaimonSnapshotCacheValue()); + } + + @Override + protected Map getNameToPartitionItems(Optional snapshot) { + return getPaimonSnapshotCacheValue(snapshot).getPartitionInfo().getNameToPartitionItem(); + } + + @Override + public boolean supportInternalPartitionPruned() { + return true; + } + } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonMetadataCache.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonMetadataCache.java new file mode 100644 index 00000000000000..27a60e2d50ab7b --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonMetadataCache.java @@ -0,0 +1,147 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.datasource.paimon; + +import org.apache.doris.catalog.Column; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.CacheFactory; +import org.apache.doris.common.Config; +import org.apache.doris.datasource.CacheException; +import org.apache.doris.datasource.CatalogIf; +import org.apache.doris.datasource.ExternalMetaCacheMgr; + +import com.github.benmanes.caffeine.cache.LoadingCache; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import org.apache.commons.collections.CollectionUtils; +import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.table.Table; +import org.apache.paimon.table.system.PartitionsTable; +import org.apache.paimon.table.system.SnapshotsTable; +import org.jetbrains.annotations.NotNull; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.OptionalLong; +import java.util.concurrent.ExecutorService; + +public class PaimonMetadataCache { + + private final LoadingCache snapshotCache; + + public PaimonMetadataCache(ExecutorService executor) { + CacheFactory snapshotCacheFactory = new CacheFactory( + OptionalLong.of(28800L), + OptionalLong.of(Config.external_cache_expire_time_minutes_after_access * 60), + Config.max_external_table_cache_num, + true, + null); + this.snapshotCache = snapshotCacheFactory.buildCache(key -> loadSnapshot(key), null, executor); + } + + @NotNull + private PaimonSnapshotCacheValue loadSnapshot(PaimonSnapshotCacheKey key) { + try { + PaimonSnapshot latestSnapshot = loadLatestSnapshot(key); + PaimonExternalTable paimonExternalTable + = (PaimonExternalTable) ((PaimonExternalCatalog) key.getCatalog()).getDbOrAnalysisException( + key.getDbName()) + .getTableOrAnalysisException( + key.getTableName()); + List partitionColumns = paimonExternalTable.getPartitionColumns(Optional.empty()); + PaimonPartitionInfo partitionInfo = 
loadPartitionInfo(key, partitionColumns); + return new PaimonSnapshotCacheValue(partitionInfo, latestSnapshot); + } catch (IOException | AnalysisException e) { + throw new CacheException("failed to loadSnapshot for: %s.%s.%s", + e, key.getCatalog().getName(), key.getDbName(), key.getTableName()); + } + } + + private PaimonPartitionInfo loadPartitionInfo(PaimonSnapshotCacheKey key, List partitionColumns) + throws IOException, AnalysisException { + if (CollectionUtils.isEmpty(partitionColumns)) { + return new PaimonPartitionInfo(); + } + List paimonPartitions = loadPartitions(key); + return PaimonUtil.generatePartitionInfo(partitionColumns, paimonPartitions); + } + + private List loadPartitions(PaimonSnapshotCacheKey key) + throws IOException { + Table table = ((PaimonExternalCatalog) key.getCatalog()).getPaimonTable(key.getDbName(), + key.getTableName() + Catalog.SYSTEM_TABLE_SPLITTER + PartitionsTable.PARTITIONS); + List rows = PaimonUtil.read(table, null); + List res = Lists.newArrayListWithCapacity(rows.size()); + for (InternalRow row : rows) { + res.add(PaimonUtil.rowToPartition(row)); + } + return res; + } + + private PaimonSnapshot loadLatestSnapshot(PaimonSnapshotCacheKey key) throws IOException { + Table table = ((PaimonExternalCatalog) key.getCatalog()).getPaimonTable(key.getDbName(), + key.getTableName() + Catalog.SYSTEM_TABLE_SPLITTER + SnapshotsTable.SNAPSHOTS); + // project columns 0 and 1: snapshotId, schemaId (schemaId is taken from the row with the largest snapshotId) + List rows = PaimonUtil.read(table, new int[][] {{0}, {1}}); + long latestSnapshotId = 0L; + long latestSchemaId = 0L; + for (InternalRow row : rows) { + long snapshotId = row.getLong(0); + if (snapshotId > latestSnapshotId) { + latestSnapshotId = snapshotId; + latestSchemaId = row.getLong(1); + } + } + return new PaimonSnapshot(latestSnapshotId, latestSchemaId); + } + + public void invalidateCatalogCache(long catalogId) { + snapshotCache.asMap().keySet().stream() + .filter(key -> key.getCatalog().getId() == catalogId) + .forEach(snapshotCache::invalidate); + } + + public void
invalidateTableCache(long catalogId, String dbName, String tblName) { + snapshotCache.asMap().keySet().stream() + .filter(key -> key.getCatalog().getId() == catalogId && key.getDbName().equals(dbName) + && key.getTableName().equals( + tblName)) + .forEach(snapshotCache::invalidate); + } + + public void invalidateDbCache(long catalogId, String dbName) { + snapshotCache.asMap().keySet().stream() + .filter(key -> key.getCatalog().getId() == catalogId && key.getDbName().equals(dbName)) + .forEach(snapshotCache::invalidate); + } + + public PaimonSnapshotCacheValue getPaimonSnapshot(CatalogIf catalog, String dbName, String tbName) { + PaimonSnapshotCacheKey key = new PaimonSnapshotCacheKey(catalog, dbName, tbName); + return snapshotCache.get(key); + } + + public Map> getCacheStats() { + Map> res = Maps.newHashMap(); + res.put("paimon_snapshot_cache", ExternalMetaCacheMgr.getCacheStats(snapshotCache.stats(), + snapshotCache.estimatedSize())); + return res; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonMetadataCacheMgr.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonMetadataCacheMgr.java new file mode 100644 index 00000000000000..a282fde665b197 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonMetadataCacheMgr.java @@ -0,0 +1,49 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource.paimon; + +import java.util.concurrent.ExecutorService; + +public class PaimonMetadataCacheMgr { + + private PaimonMetadataCache paimonMetadataCache; + + public PaimonMetadataCacheMgr(ExecutorService executor) { + this.paimonMetadataCache = new PaimonMetadataCache(executor); + } + + public PaimonMetadataCache getPaimonMetadataCache() { + return paimonMetadataCache; + } + + public void removeCache(long catalogId) { + paimonMetadataCache.invalidateCatalogCache(catalogId); + } + + public void invalidateCatalogCache(long catalogId) { + paimonMetadataCache.invalidateCatalogCache(catalogId); + } + + public void invalidateTableCache(long catalogId, String dbName, String tblName) { + paimonMetadataCache.invalidateTableCache(catalogId, dbName, tblName); + } + + public void invalidateDbCache(long catalogId, String dbName) { + paimonMetadataCache.invalidateDbCache(catalogId, dbName); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonMvccSnapshot.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonMvccSnapshot.java new file mode 100644 index 00000000000000..2307e91adb3911 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonMvccSnapshot.java @@ -0,0 +1,32 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource.paimon; + +import org.apache.doris.datasource.mvcc.MvccSnapshot; + +public class PaimonMvccSnapshot implements MvccSnapshot { + private final PaimonSnapshotCacheValue snapshotCacheValue; + + public PaimonMvccSnapshot(PaimonSnapshotCacheValue snapshotCacheValue) { + this.snapshotCacheValue = snapshotCacheValue; + } + + public PaimonSnapshotCacheValue getSnapshotCacheValue() { + return snapshotCacheValue; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonPartitionInfo.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonPartitionInfo.java index 8f54f0834e481b..4d3326f8e48376 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonPartitionInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonPartitionInfo.java @@ -24,8 +24,8 @@ import java.util.Map; public class PaimonPartitionInfo { - private Map nameToPartitionItem; - private Map nameToPartition; + private final Map nameToPartitionItem; + private final Map nameToPartition; public PaimonPartitionInfo() { this.nameToPartitionItem = Maps.newHashMap(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSchemaCacheValue.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSchemaCacheValue.java index 20d27b2425df24..e3db6673662651 100644 --- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSchemaCacheValue.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSchemaCacheValue.java @@ -28,18 +28,11 @@ public class PaimonSchemaCacheValue extends SchemaCacheValue { private Table paimonTable; private List partitionColumns; - private PaimonPartitionInfo partitionInfo; - private long snapshootId; - - public PaimonSchemaCacheValue(List schema, List partitionColumns, Table paimonTable, - long snapshootId, - PaimonPartitionInfo partitionInfo) { + public PaimonSchemaCacheValue(List schema, List partitionColumns, Table paimonTable) { super(schema); this.partitionColumns = partitionColumns; this.paimonTable = paimonTable; - this.snapshootId = snapshootId; - this.partitionInfo = partitionInfo; } public Table getPaimonTable() { @@ -49,12 +42,4 @@ public Table getPaimonTable() { public List getPartitionColumns() { return partitionColumns; } - - public PaimonPartitionInfo getPartitionInfo() { - return partitionInfo; - } - - public long getSnapshootId() { - return snapshootId; - } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSnapshot.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSnapshot.java new file mode 100644 index 00000000000000..4a536dd72cc901 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSnapshot.java @@ -0,0 +1,36 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource.paimon; + +public class PaimonSnapshot { + private final long snapshotId; + private final long schemaId; + + public PaimonSnapshot(long snapshotId, long schemaId) { + this.snapshotId = snapshotId; + this.schemaId = schemaId; + } + + public long getSnapshotId() { + return snapshotId; + } + + public long getSchemaId() { + return schemaId; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSnapshotCacheKey.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSnapshotCacheKey.java new file mode 100644 index 00000000000000..970f111a72133f --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSnapshotCacheKey.java @@ -0,0 +1,75 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.datasource.paimon; + +import org.apache.doris.datasource.CatalogIf; + +import java.util.Objects; +import java.util.StringJoiner; + +public class PaimonSnapshotCacheKey { + private final CatalogIf catalog; + private final String dbName; + private final String tableName; + + public PaimonSnapshotCacheKey(CatalogIf catalog, String dbName, String tableName) { + this.catalog = catalog; + this.dbName = dbName; + this.tableName = tableName; + } + + public CatalogIf getCatalog() { + return catalog; + } + + public String getDbName() { + return dbName; + } + + public String getTableName() { + return tableName; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + PaimonSnapshotCacheKey that = (PaimonSnapshotCacheKey) o; + return catalog.getId() == that.catalog.getId() + && Objects.equals(dbName, that.dbName) + && Objects.equals(tableName, that.tableName); + } + + @Override + public int hashCode() { + return Objects.hash(catalog.getId(), dbName, tableName); + } + + @Override + public String toString() { + return new StringJoiner(", ", PaimonSnapshotCacheKey.class.getSimpleName() + "[", "]") + .add("catalog=" + catalog) + .add("dbName='" + dbName + "'") + .add("tableName='" + tableName + "'") + .toString(); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSnapshotCacheValue.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSnapshotCacheValue.java new file mode 100644 index 00000000000000..c50ecdabfde3df --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSnapshotCacheValue.java @@ -0,0 +1,37 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource.paimon; + +public class PaimonSnapshotCacheValue { + + private final PaimonPartitionInfo partitionInfo; + private final PaimonSnapshot snapshot; + + public PaimonSnapshotCacheValue(PaimonPartitionInfo partitionInfo, PaimonSnapshot snapshot) { + this.partitionInfo = partitionInfo; + this.snapshot = snapshot; + } + + public PaimonPartitionInfo getPartitionInfo() { + return partitionInfo; + } + + public PaimonSnapshot getSnapshot() { + return snapshot; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonSource.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonSource.java index 885eba06ed956d..ee45b5c8ed1be9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonSource.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonSource.java @@ -21,12 +21,16 @@ import org.apache.doris.catalog.TableIf; import org.apache.doris.common.UserException; import org.apache.doris.datasource.ExternalCatalog; +import org.apache.doris.datasource.mvcc.MvccSnapshot; import org.apache.doris.datasource.paimon.PaimonExternalTable; import org.apache.doris.datasource.property.constants.PaimonProperties; +import org.apache.doris.qe.ConnectContext; import org.apache.doris.thrift.TFileAttributes; import org.apache.paimon.table.Table; +import 
java.util.Optional; + public class PaimonSource { private final PaimonExternalTable paimonExtTable; @@ -36,7 +40,8 @@ public class PaimonSource { public PaimonSource(TupleDescriptor desc) { this.desc = desc; this.paimonExtTable = (PaimonExternalTable) desc.getTable(); - this.originTable = paimonExtTable.getPaimonTable(); + MvccSnapshot snapshot = ConnectContext.get().getStatementContext().getSnapshot(paimonExtTable); + this.originTable = paimonExtTable.getPaimonTable(snapshot == null ? Optional.empty() : Optional.of(snapshot)); } public TupleDescriptor getDesc() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java index 68ed0cc9b23592..353e024ed97348 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java @@ -29,6 +29,9 @@ import org.apache.doris.common.UserException; import org.apache.doris.common.util.DebugUtil; import org.apache.doris.common.util.TimeUtils; +import org.apache.doris.datasource.mvcc.MvccSnapshot; +import org.apache.doris.datasource.mvcc.MvccTable; +import org.apache.doris.datasource.mvcc.MvccTableInfo; import org.apache.doris.job.common.TaskStatus; import org.apache.doris.job.exception.JobException; import org.apache.doris.job.task.AbstractTask; @@ -70,6 +73,7 @@ import java.math.RoundingMode; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.Objects; import java.util.Set; import java.util.UUID; @@ -141,6 +145,8 @@ public enum MTMVTaskRefreshMode { private StmtExecutor executor; private Map partitionSnapshots; + private final Map snapshots = Maps.newHashMap(); + public MTMVTask() { } @@ -218,6 +224,9 @@ private void exec(Set refreshPartitionNames, throws Exception { ConnectContext ctx = MTMVPlanUtil.createMTMVContext(mtmv); StatementContext statementContext = new 
StatementContext(); + for (Entry entry : snapshots.entrySet()) { + statementContext.setSnapshot(entry.getKey(), entry.getValue()); + } ctx.setStatementContext(statementContext); TUniqueId queryId = generateQueryId(); lastQueryId = DebugUtil.printId(queryId); @@ -305,6 +314,11 @@ private void beforeMTMVRefresh() throws AnalysisException, DdlException { MTMVBaseTableIf baseTableIf = (MTMVBaseTableIf) tableIf; baseTableIf.beforeMTMVRefresh(mtmv); } + if (tableIf instanceof MvccTable) { + MvccTable mvccTable = (MvccTable) tableIf; + MvccSnapshot mvccSnapshot = mvccTable.loadSnapshot(); + snapshots.put(new MvccTableInfo(mvccTable), mvccSnapshot); + } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/StatementContext.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/StatementContext.java index b172f9dc591bd9..594beb54565556 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/StatementContext.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/StatementContext.java @@ -526,7 +526,11 @@ public void loadSnapshots(Map, TableIf> tables) { } for (TableIf tableIf : tables.values()) { if (tableIf instanceof MvccTable) { - snapshots.put(new MvccTableInfo(tableIf), ((MvccTable) tableIf).loadSnapshot()); + MvccTableInfo mvccTableInfo = new MvccTableInfo(tableIf); + // may be set by MTMV, we can not load again + if (!snapshots.containsKey(mvccTableInfo)) { + snapshots.put(mvccTableInfo, ((MvccTable) tableIf).loadSnapshot()); + } } } } @@ -541,6 +545,16 @@ public MvccSnapshot getSnapshot(MvccTable mvccTable) { return snapshots.get(new MvccTableInfo(mvccTable)); } + /** + * Obtain snapshot information of mvcc + * + * @param mvccTableInfo mvccTableInfo + * @param snapshot snapshot + */ + public void setSnapshot(MvccTableInfo mvccTableInfo, MvccSnapshot snapshot) { + snapshots.put(mvccTableInfo, snapshot); + } + private static class CloseableResource implements Closeable { public final String resourceName; public final String threadName; From 
5674be52d389014dd23707c241df19c3d0f575cc Mon Sep 17 00:00:00 2001 From: zhangdong Date: Tue, 3 Dec 2024 14:09:08 +0800 Subject: [PATCH 02/16] 1 --- .../doris/datasource/paimon/source/PaimonSource.java | 7 ++----- .../org/apache/doris/nereids/StatementContext.java | 12 ++++++++---- .../nereids/trees/plans/logical/LogicalFileScan.java | 7 ++++--- 3 files changed, 14 insertions(+), 12 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonSource.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonSource.java index ee45b5c8ed1be9..9cf73572f98dd9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonSource.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonSource.java @@ -21,7 +21,6 @@ import org.apache.doris.catalog.TableIf; import org.apache.doris.common.UserException; import org.apache.doris.datasource.ExternalCatalog; -import org.apache.doris.datasource.mvcc.MvccSnapshot; import org.apache.doris.datasource.paimon.PaimonExternalTable; import org.apache.doris.datasource.property.constants.PaimonProperties; import org.apache.doris.qe.ConnectContext; @@ -29,8 +28,6 @@ import org.apache.paimon.table.Table; -import java.util.Optional; - public class PaimonSource { private final PaimonExternalTable paimonExtTable; @@ -40,8 +37,8 @@ public class PaimonSource { public PaimonSource(TupleDescriptor desc) { this.desc = desc; this.paimonExtTable = (PaimonExternalTable) desc.getTable(); - MvccSnapshot snapshot = ConnectContext.get().getStatementContext().getSnapshot(paimonExtTable); - this.originTable = paimonExtTable.getPaimonTable(snapshot == null ? 
Optional.empty() : Optional.of(snapshot)); + this.originTable = paimonExtTable.getPaimonTable( + ConnectContext.get().getStatementContext().getSnapshot(paimonExtTable)); } public TupleDescriptor getDesc() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/StatementContext.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/StatementContext.java index 594beb54565556..737ed0f5ec4ba0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/StatementContext.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/StatementContext.java @@ -538,11 +538,15 @@ public void loadSnapshots(Map, TableIf> tables) { /** * Obtain snapshot information of mvcc * - * @param mvccTable mvccTable - * @return MvccSnapshot + * @param tableIf tableIf + * @return Optional */ - public MvccSnapshot getSnapshot(MvccTable mvccTable) { - return snapshots.get(new MvccTableInfo(mvccTable)); + public Optional getSnapshot(TableIf tableIf) { + if (!(tableIf instanceof MvccTable)) { + return Optional.empty(); + } + MvccTableInfo mvccTableInfo = new MvccTableInfo(tableIf); + return snapshots.containsKey(mvccTableInfo) ? 
Optional.of(snapshots.get(mvccTableInfo)) : Optional.empty(); } /** diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalFileScan.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalFileScan.java index 96b8e032d11274..670a83b9e157e6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalFileScan.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalFileScan.java @@ -28,6 +28,7 @@ import org.apache.doris.nereids.trees.plans.RelationId; import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; import org.apache.doris.nereids.util.Utils; +import org.apache.doris.qe.ConnectContext; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableMap; @@ -60,10 +61,10 @@ protected LogicalFileScan(RelationId id, ExternalTable table, List quali } public LogicalFileScan(RelationId id, ExternalTable table, List qualifier, - Optional tableSample, Optional tableSnapshot) { - // todo: real snapshotId + Optional tableSample, Optional tableSnapshot) { this(id, table, qualifier, Optional.empty(), Optional.empty(), - table.initSelectedPartitions(Optional.empty()), tableSample, tableSnapshot); + table.initSelectedPartitions(ConnectContext.get().getStatementContext().getSnapshot(table)), + tableSample, tableSnapshot); } public SelectedPartitions getSelectedPartitions() { From 6ad30e346cbd3a493f47cbcd1471037f277e1d88 Mon Sep 17 00:00:00 2001 From: zhangdong Date: Tue, 3 Dec 2024 14:12:49 +0800 Subject: [PATCH 03/16] 1 --- .../main/java/org/apache/doris/nereids/StatementContext.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/StatementContext.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/StatementContext.java index 737ed0f5ec4ba0..91291b0cc85938 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/StatementContext.java +++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/StatementContext.java @@ -539,7 +539,7 @@ public void loadSnapshots(Map, TableIf> tables) { * Obtain snapshot information of mvcc * * @param tableIf tableIf - * @return Optional + * @return MvccSnapshot */ public Optional getSnapshot(TableIf tableIf) { if (!(tableIf instanceof MvccTable)) { From a8617f2b7ea543c17f32cda9cc8dee127bbf238d Mon Sep 17 00:00:00 2001 From: zhangdong Date: Tue, 3 Dec 2024 14:32:33 +0800 Subject: [PATCH 04/16] 1 --- .../nereids/rules/rewrite/PruneFileScanPartition.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PruneFileScanPartition.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PruneFileScanPartition.java index ba8b270d1f397d..dca0d4a6ecb429 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PruneFileScanPartition.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PruneFileScanPartition.java @@ -28,6 +28,7 @@ import org.apache.doris.nereids.trees.plans.logical.LogicalFileScan; import org.apache.doris.nereids.trees.plans.logical.LogicalFileScan.SelectedPartitions; import org.apache.doris.nereids.trees.plans.logical.LogicalFilter; +import org.apache.doris.qe.ConnectContext; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Maps; @@ -36,7 +37,6 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; -import java.util.Optional; import java.util.function.Function; import java.util.stream.Collectors; @@ -74,8 +74,8 @@ public Rule build() { private SelectedPartitions pruneExternalPartitions(ExternalTable externalTable, LogicalFilter filter, LogicalFileScan scan, CascadesContext ctx) { Map selectedPartitionItems = Maps.newHashMap(); - // todo: real snapshotId - if (CollectionUtils.isEmpty(externalTable.getPartitionColumns(Optional.empty()))) { + if 
(CollectionUtils.isEmpty(externalTable.getPartitionColumns( + ConnectContext.get().getStatementContext().getSnapshot(externalTable)))) { // non partitioned table, return NOT_PRUNED. // non partition table will be handled in HiveScanNode. return SelectedPartitions.NOT_PRUNED; @@ -83,8 +83,8 @@ private SelectedPartitions pruneExternalPartitions(ExternalTable externalTable, Map scanOutput = scan.getOutput() .stream() .collect(Collectors.toMap(slot -> slot.getName().toLowerCase(), Function.identity())); - // todo: real snapshotId - List partitionSlots = externalTable.getPartitionColumns(Optional.empty()) + List partitionSlots = externalTable.getPartitionColumns( + ConnectContext.get().getStatementContext().getSnapshot(externalTable)) .stream() .map(column -> scanOutput.get(column.getName().toLowerCase())) .collect(Collectors.toList()); From f28e835600e9669a41fc9edae71e3b89ba383fce Mon Sep 17 00:00:00 2001 From: zhangdong Date: Tue, 3 Dec 2024 18:38:54 +0800 Subject: [PATCH 05/16] 1 --- .../paimon/PaimonExternalTable.java | 175 ++++-------------- .../paimon/PaimonMetadataCache.java | 79 +++++++- .../doris/datasource/paimon/PaimonSchema.java | 46 +++++ .../paimon/PaimonSchemaCacheKey.java | 83 +++++++++ .../paimon/PaimonSchemaCacheValue.java | 10 +- .../doris/datasource/paimon/PaimonUtil.java | 130 ++++++++++++- 6 files changed, 368 insertions(+), 155 deletions(-) create mode 100644 fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSchema.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSchemaCacheKey.java diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java index adcf0d32617075..37e6da24354730 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java @@ 
-22,13 +22,9 @@ import org.apache.doris.catalog.MTMV; import org.apache.doris.catalog.PartitionItem; import org.apache.doris.catalog.PartitionType; -import org.apache.doris.catalog.ScalarType; -import org.apache.doris.catalog.Type; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.DdlException; -import org.apache.doris.datasource.CacheException; import org.apache.doris.datasource.ExternalTable; -import org.apache.doris.datasource.SchemaCacheValue; import org.apache.doris.datasource.mvcc.MvccSnapshot; import org.apache.doris.datasource.mvcc.MvccTable; import org.apache.doris.mtmv.MTMVBaseTableIf; @@ -37,6 +33,7 @@ import org.apache.doris.mtmv.MTMVSnapshotIf; import org.apache.doris.mtmv.MTMVTimestampSnapshot; import org.apache.doris.mtmv.MTMVVersionSnapshot; +import org.apache.doris.qe.ConnectContext; import org.apache.doris.statistics.AnalysisInfo; import org.apache.doris.statistics.BaseAnalysisTask; import org.apache.doris.statistics.ExternalAnalysisTask; @@ -44,23 +41,13 @@ import org.apache.doris.thrift.TTableDescriptor; import org.apache.doris.thrift.TTableType; -import com.google.common.collect.Lists; import com.google.common.collect.Maps; -import com.google.common.collect.Sets; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.paimon.CoreOptions; -import org.apache.paimon.schema.TableSchema; -import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.Table; import org.apache.paimon.table.source.Split; -import org.apache.paimon.types.ArrayType; -import org.apache.paimon.types.DataField; -import org.apache.paimon.types.DecimalType; -import org.apache.paimon.types.MapType; -import org.apache.paimon.types.RowType; -import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -73,8 +60,11 @@ public class PaimonExternalTable extends ExternalTable implements MTMVRelatedTab private static final Logger LOG = 
LogManager.getLogger(PaimonExternalTable.class); + private final Table paimonTable; + public PaimonExternalTable(long id, String name, String dbName, PaimonExternalCatalog catalog) { super(id, name, catalog, dbName, TableType.PAIMON_EXTERNAL_TABLE); + this.paimonTable = catalog.getPaimonTable(dbName, name); } public String getPaimonCatalogType() { @@ -89,113 +79,21 @@ protected synchronized void makeSureInitialized() { } public Table getPaimonTable(Optional snapshot) { - return getPaimonSchemaCacheValue().getPaimonTable().copy( + return paimonTable.copy( Collections.singletonMap(CoreOptions.SCAN_VERSION.key(), - String.valueOf(getPaimonSnapshotCacheValue(snapshot).getSnapshot().getSnapshotId()))); + String.valueOf(getOrFetchSnapshotCacheValue(snapshot).getSnapshot().getSnapshotId()))); } - private PaimonSchemaCacheValue getPaimonSchemaCacheValue() { + private PaimonSchemaCacheValue getPaimonSchemaCacheValue(long schemaId) { makeSureInitialized(); - Optional schemaCacheValue = getSchemaCacheValue(); - if (!schemaCacheValue.isPresent()) { - throw new CacheException("SchemaCacheValue is empty: %s:%s:%s", null, catalog.getName(), dbName, getName()); - } - return (PaimonSchemaCacheValue) schemaCacheValue.get(); - } - - @Override - public Optional initSchema() { - Table paimonTable = ((PaimonExternalCatalog) catalog).getPaimonTable(dbName, name); - TableSchema schema = ((FileStoreTable) paimonTable).schema(); - List columns = schema.fields(); - List tmpSchema = Lists.newArrayListWithCapacity(columns.size()); - Set partitionColumnNames = Sets.newHashSet(paimonTable.partitionKeys()); - List partitionColumns = Lists.newArrayList(); - for (DataField field : columns) { - Column column = new Column(field.name().toLowerCase(), - paimonTypeToDorisType(field.type()), true, null, true, field.description(), true, - field.id()); - tmpSchema.add(column); - if (partitionColumnNames.contains(field.name())) { - partitionColumns.add(column); - } - } - return Optional.of(new 
PaimonSchemaCacheValue(tmpSchema, partitionColumns, paimonTable)); - } - - private Type paimonPrimitiveTypeToDorisType(org.apache.paimon.types.DataType dataType) { - int tsScale = 3; // default - switch (dataType.getTypeRoot()) { - case BOOLEAN: - return Type.BOOLEAN; - case INTEGER: - return Type.INT; - case BIGINT: - return Type.BIGINT; - case FLOAT: - return Type.FLOAT; - case DOUBLE: - return Type.DOUBLE; - case SMALLINT: - return Type.SMALLINT; - case TINYINT: - return Type.TINYINT; - case VARCHAR: - case BINARY: - case CHAR: - case VARBINARY: - return Type.STRING; - case DECIMAL: - DecimalType decimal = (DecimalType) dataType; - return ScalarType.createDecimalV3Type(decimal.getPrecision(), decimal.getScale()); - case DATE: - return ScalarType.createDateV2Type(); - case TIMESTAMP_WITHOUT_TIME_ZONE: - if (dataType instanceof org.apache.paimon.types.TimestampType) { - tsScale = ((org.apache.paimon.types.TimestampType) dataType).getPrecision(); - if (tsScale > 6) { - tsScale = 6; - } - } else if (dataType instanceof org.apache.paimon.types.LocalZonedTimestampType) { - tsScale = ((org.apache.paimon.types.LocalZonedTimestampType) dataType).getPrecision(); - if (tsScale > 6) { - tsScale = 6; - } - } - return ScalarType.createDatetimeV2Type(tsScale); - case TIMESTAMP_WITH_LOCAL_TIME_ZONE: - if (dataType instanceof org.apache.paimon.types.LocalZonedTimestampType) { - tsScale = ((org.apache.paimon.types.LocalZonedTimestampType) dataType).getPrecision(); - if (tsScale > 6) { - tsScale = 6; - } - } - return ScalarType.createDatetimeV2Type(tsScale); - case ARRAY: - ArrayType arrayType = (ArrayType) dataType; - Type innerType = paimonPrimitiveTypeToDorisType(arrayType.getElementType()); - return org.apache.doris.catalog.ArrayType.create(innerType, true); - case MAP: - MapType mapType = (MapType) dataType; - return new org.apache.doris.catalog.MapType( - paimonTypeToDorisType(mapType.getKeyType()), paimonTypeToDorisType(mapType.getValueType())); - case ROW: - RowType 
rowType = (RowType) dataType; - List fields = rowType.getFields(); - return new org.apache.doris.catalog.StructType(fields.stream() - .map(field -> new org.apache.doris.catalog.StructField(field.name(), - paimonTypeToDorisType(field.type()))) - .collect(Collectors.toCollection(ArrayList::new))); - case TIME_WITHOUT_TIME_ZONE: - return Type.UNSUPPORTED; - default: - LOG.warn("Cannot transform unknown type: " + dataType.getTypeRoot()); - return Type.UNSUPPORTED; - } + return Env.getCurrentEnv().getExtMetaCacheMgr().getPaimonMetadataCache() + .getPaimonSchema(catalog, dbName, name, schemaId); } - protected Type paimonTypeToDorisType(org.apache.paimon.types.DataType type) { - return paimonPrimitiveTypeToDorisType(type); + private PaimonSnapshotCacheValue getPaimonSnapshotCacheValue() { + makeSureInitialized(); + return Env.getCurrentEnv().getExtMetaCacheMgr().getPaimonMetadataCache() + .getPaimonSnapshot(catalog, dbName, name); } @Override @@ -225,13 +123,6 @@ public BaseAnalysisTask createAnalysisTask(AnalysisInfo info) { public long fetchRowCount() { makeSureInitialized(); long rowCount = 0; - Optional schemaCacheValue = getSchemaCacheValue(); - Table paimonTable = schemaCacheValue.map(value -> ((PaimonSchemaCacheValue) value).getPaimonTable()) - .orElse(null); - if (paimonTable == null) { - LOG.info("Paimon table {} is null.", name); - return UNKNOWN_ROW_COUNT; - } List splits = paimonTable.newReadBuilder().newScan().plan().splits(); for (Split split : splits) { rowCount += split.rowCount(); @@ -266,14 +157,14 @@ public Set getPartitionColumnNames(Optional snapshot) { @Override public List getPartitionColumns(Optional snapshot) { - return getPaimonSchemaCacheValue().getPartitionColumns(); + return getPaimonSchemaCacheValue(snapshot).getPartitionColumns(); } @Override public MTMVSnapshotIf getPartitionSnapshot(String partitionName, MTMVRefreshContext context, Optional snapshot) throws AnalysisException { - PaimonPartition paimonPartition = 
getPaimonSnapshotCacheValue(snapshot).getPartitionInfo().getNameToPartition() + PaimonPartition paimonPartition = getOrFetchSnapshotCacheValue(snapshot).getPartitionInfo().getNameToPartition() .get(partitionName); if (paimonPartition == null) { throw new AnalysisException("can not find partition: " + partitionName); @@ -284,23 +175,10 @@ public MTMVSnapshotIf getPartitionSnapshot(String partitionName, MTMVRefreshCont @Override public MTMVSnapshotIf getTableSnapshot(MTMVRefreshContext context, Optional snapshot) throws AnalysisException { - PaimonSnapshotCacheValue paimonSnapshot = getPaimonSnapshotCacheValue(snapshot); + PaimonSnapshotCacheValue paimonSnapshot = getOrFetchSnapshotCacheValue(snapshot); return new MTMVVersionSnapshot(paimonSnapshot.getSnapshot().getSnapshotId()); } - private PaimonSnapshotCacheValue getPaimonSnapshotCacheValue(Optional snapshot) { - if (snapshot.isPresent()) { - return ((PaimonMvccSnapshot) snapshot.get()).getSnapshotCacheValue(); - } else { - return getPaimonSnapshotCacheValue(); - } - } - - private PaimonSnapshotCacheValue getPaimonSnapshotCacheValue() { - return Env.getCurrentEnv().getExtMetaCacheMgr().getPaimonMetadataCache() - .getPaimonSnapshot(catalog, dbName, name); - } - @Override public boolean isPartitionColumnAllowNull() { // Paimon will write to the 'null' partition regardless of whether it is' null or 'null'. 
@@ -318,7 +196,7 @@ public MvccSnapshot loadSnapshot() { @Override protected Map getNameToPartitionItems(Optional snapshot) { - return getPaimonSnapshotCacheValue(snapshot).getPartitionInfo().getNameToPartitionItem(); + return getOrFetchSnapshotCacheValue(snapshot).getPartitionInfo().getNameToPartitionItem(); } @Override @@ -326,4 +204,23 @@ public boolean supportInternalPartitionPruned() { return true; } + @Override + public List getFullSchema() { + Optional snapshot = ConnectContext.get().getStatementContext().getSnapshot(this); + return getPaimonSchemaCacheValue(snapshot).getSchema(); + } + + private PaimonSchemaCacheValue getPaimonSchemaCacheValue(Optional snapshot) { + PaimonSnapshotCacheValue snapshotCacheValue = getOrFetchSnapshotCacheValue(snapshot); + return getPaimonSchemaCacheValue(snapshotCacheValue.getSnapshot().getSchemaId()); + } + + private PaimonSnapshotCacheValue getOrFetchSnapshotCacheValue(Optional snapshot) { + if (snapshot.isPresent()) { + return ((PaimonMvccSnapshot) snapshot.get()).getSnapshotCacheValue(); + } else { + return getPaimonSnapshotCacheValue(); + } + } + } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonMetadataCache.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonMetadataCache.java index 27a60e2d50ab7b..fa31a518b32927 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonMetadataCache.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonMetadataCache.java @@ -28,12 +28,17 @@ import com.github.benmanes.caffeine.cache.LoadingCache; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import com.google.common.collect.Sets; import org.apache.commons.collections.CollectionUtils; import org.apache.paimon.catalog.Catalog; import org.apache.paimon.data.InternalRow; +import org.apache.paimon.predicate.Predicate; +import org.apache.paimon.predicate.PredicateBuilder; import org.apache.paimon.table.Table; 
import org.apache.paimon.table.system.PartitionsTable; +import org.apache.paimon.table.system.SchemasTable; import org.apache.paimon.table.system.SnapshotsTable; +import org.apache.paimon.types.DataField; import org.jetbrains.annotations.NotNull; import java.io.IOException; @@ -41,11 +46,13 @@ import java.util.Map; import java.util.Optional; import java.util.OptionalLong; +import java.util.Set; import java.util.concurrent.ExecutorService; public class PaimonMetadataCache { private final LoadingCache snapshotCache; + private final LoadingCache schemaCache; public PaimonMetadataCache(ExecutorService executor) { CacheFactory snapshotCacheFactory = new CacheFactory( @@ -55,6 +62,53 @@ public PaimonMetadataCache(ExecutorService executor) { true, null); this.snapshotCache = snapshotCacheFactory.buildCache(key -> loadSnapshot(key), null, executor); + + CacheFactory schemaCacheFactory = new CacheFactory( + OptionalLong.of(28800L), + OptionalLong.of(Config.external_cache_expire_time_minutes_after_access * 60), + Config.max_external_table_cache_num, + true, + null); + this.schemaCache = schemaCacheFactory.buildCache(key -> loadSchema(key), null, executor); + } + + @NotNull + private PaimonSchemaCacheValue loadSchema(PaimonSchemaCacheKey key) { + try { + PaimonSchema schema = loadPaimonSchemaBySchemaId(key); + List columns = schema.getFields(); + List dorisColumns = Lists.newArrayListWithCapacity(columns.size()); + Set partitionColumnNames = Sets.newHashSet(schema.getPartitionKeys()); + List partitionColumns = Lists.newArrayList(); + for (DataField field : columns) { + Column column = new Column(field.name().toLowerCase(), + PaimonUtil.paimonTypeToDorisType(field.type()), true, null, true, field.description(), true, + field.id()); + dorisColumns.add(column); + if (partitionColumnNames.contains(field.name())) { + partitionColumns.add(column); + } + } + return new PaimonSchemaCacheValue(dorisColumns, partitionColumns); + } catch (IOException e) { + throw new 
CacheException("failed to loadSchema for: %s.%s.%s.%s", + e, key.getCatalog().getName(), key.getDbName(), key.getTableName(), key.getSchemaId()); + } + } + + private PaimonSchema loadPaimonSchemaBySchemaId(PaimonSchemaCacheKey key) throws IOException { + Table table = ((PaimonExternalCatalog) key.getCatalog()).getPaimonTable(key.getDbName(), + key.getTableName() + Catalog.SYSTEM_TABLE_SPLITTER + SchemasTable.SCHEMAS); + PredicateBuilder builder = new PredicateBuilder(table.rowType()); + Predicate predicate = builder.equal(0, key.getSchemaId()); + List rows = PaimonUtil.read(table, new int[][] {{0}, {1}, {2}}, predicate); + if (rows.size() != 1) { + throw new CacheException("failed to loadPaimonSchemaBySchemaId for: %s.%s.%s.%s", + null, key.getCatalog().getName(), key.getDbName(), key.getTableName(), key.getSchemaId()); + } + InternalRow internalRow = rows.get(0); + return PaimonUtil.rowToSchema(internalRow); + } @NotNull @@ -88,7 +142,7 @@ private List loadPartitions(PaimonSnapshotCacheKey key) throws IOException { Table table = ((PaimonExternalCatalog) key.getCatalog()).getPaimonTable(key.getDbName(), key.getTableName() + Catalog.SYSTEM_TABLE_SPLITTER + PartitionsTable.PARTITIONS); - List rows = PaimonUtil.read(table, null); + List rows = PaimonUtil.read(table, null, null); List res = Lists.newArrayListWithCapacity(rows.size()); for (InternalRow row : rows) { res.add(PaimonUtil.rowToPartition(row)); @@ -100,7 +154,7 @@ private PaimonSnapshot loadLatestSnapshot(PaimonSnapshotCacheKey key) throws IOE Table table = ((PaimonExternalCatalog) key.getCatalog()).getPaimonTable(key.getDbName(), key.getTableName() + Catalog.SYSTEM_TABLE_SPLITTER + SnapshotsTable.SNAPSHOTS); // snapshotId - List rows = PaimonUtil.read(table, new int[][] {{0}, {1}}); + List rows = PaimonUtil.read(table, new int[][] {{0}, {1}}, null); long latestSnapshotId = 0L; long latestSchemaId = 0L; for (InternalRow row : rows) { @@ -117,6 +171,10 @@ public void invalidateCatalogCache(long catalogId) { 
snapshotCache.asMap().keySet().stream() .filter(key -> key.getCatalog().getId() == catalogId) .forEach(snapshotCache::invalidate); + + schemaCache.asMap().keySet().stream() + .filter(key -> key.getCatalog().getId() == catalogId) + .forEach(schemaCache::invalidate); } public void invalidateTableCache(long catalogId, String dbName, String tblName) { @@ -125,12 +183,22 @@ public void invalidateTableCache(long catalogId, String dbName, String tblName) && key.getTableName().equals( tblName)) .forEach(snapshotCache::invalidate); + + schemaCache.asMap().keySet().stream() + .filter(key -> key.getCatalog().getId() == catalogId && key.getDbName().equals(dbName) + && key.getTableName().equals( + tblName)) + .forEach(schemaCache::invalidate); } public void invalidateDbCache(long catalogId, String dbName) { snapshotCache.asMap().keySet().stream() .filter(key -> key.getCatalog().getId() == catalogId && key.getDbName().equals(dbName)) .forEach(snapshotCache::invalidate); + + schemaCache.asMap().keySet().stream() + .filter(key -> key.getCatalog().getId() == catalogId && key.getDbName().equals(dbName)) + .forEach(schemaCache::invalidate); } public PaimonSnapshotCacheValue getPaimonSnapshot(CatalogIf catalog, String dbName, String tbName) { @@ -138,10 +206,17 @@ public PaimonSnapshotCacheValue getPaimonSnapshot(CatalogIf catalog, String dbNa return snapshotCache.get(key); } + public PaimonSchemaCacheValue getPaimonSchema(CatalogIf catalog, String dbName, String tbName, long schemaId) { + PaimonSchemaCacheKey key = new PaimonSchemaCacheKey(catalog, dbName, tbName, schemaId); + return schemaCache.get(key); + } + public Map> getCacheStats() { Map> res = Maps.newHashMap(); res.put("paimon_snapshot_cache", ExternalMetaCacheMgr.getCacheStats(snapshotCache.stats(), snapshotCache.estimatedSize())); + res.put("paimon_schema_cache", ExternalMetaCacheMgr.getCacheStats(schemaCache.stats(), + schemaCache.estimatedSize())); return res; } } diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSchema.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSchema.java new file mode 100644 index 00000000000000..ef26e1ed20879d --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSchema.java @@ -0,0 +1,46 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.datasource.paimon; + +import org.apache.paimon.types.DataField; + +import java.util.List; + +public class PaimonSchema { + private final long schemaId; + private final List fields; + private final List partitionKeys; + + public PaimonSchema(long schemaId, List fields, List partitionKeys) { + this.schemaId = schemaId; + this.fields = fields; + this.partitionKeys = partitionKeys; + } + + public long getSchemaId() { + return schemaId; + } + + public List getFields() { + return fields; + } + + public List getPartitionKeys() { + return partitionKeys; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSchemaCacheKey.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSchemaCacheKey.java new file mode 100644 index 00000000000000..fd71d94f1c2988 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSchemaCacheKey.java @@ -0,0 +1,83 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.datasource.paimon; + +import org.apache.doris.datasource.CatalogIf; + +import java.util.Objects; +import java.util.StringJoiner; + +public class PaimonSchemaCacheKey { + private final CatalogIf catalog; + private final String dbName; + private final String tableName; + private final long schemaId; + + public PaimonSchemaCacheKey(CatalogIf catalog, String dbName, String tableName, long schemaId) { + this.catalog = catalog; + this.dbName = dbName; + this.tableName = tableName; + this.schemaId = schemaId; + } + + public CatalogIf getCatalog() { + return catalog; + } + + public String getDbName() { + return dbName; + } + + public String getTableName() { + return tableName; + } + + public long getSchemaId() { + return schemaId; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + PaimonSchemaCacheKey that = (PaimonSchemaCacheKey) o; + return catalog.getId() == that.catalog.getId() + && Objects.equals(dbName, that.dbName) + && Objects.equals(tableName, that.tableName) + && Objects.equals(schemaId, that.schemaId); + } + + @Override + public int hashCode() { + return Objects.hash(catalog.getId(), dbName, tableName, schemaId); + } + + @Override + public String toString() { + return new StringJoiner(", ", PaimonSchemaCacheKey.class.getSimpleName() + "[", "]") + .add("catalog=" + catalog) + .add("dbName='" + dbName + "'") + .add("tableName='" + tableName + "'") + .add("schemaId='" + schemaId + "'") + .toString(); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSchemaCacheValue.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSchemaCacheValue.java index e3db6673662651..ccb530a3cbccc7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSchemaCacheValue.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSchemaCacheValue.java @@ 
-20,23 +20,15 @@ import org.apache.doris.catalog.Column; import org.apache.doris.datasource.SchemaCacheValue; -import org.apache.paimon.table.Table; - import java.util.List; public class PaimonSchemaCacheValue extends SchemaCacheValue { - private Table paimonTable; private List partitionColumns; - public PaimonSchemaCacheValue(List schema, List partitionColumns, Table paimonTable) { + public PaimonSchemaCacheValue(List schema, List partitionColumns) { super(schema); this.partitionColumns = partitionColumns; - this.paimonTable = paimonTable; - } - - public Table getPaimonTable() { - return paimonTable; } public List getPartitionColumns() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonUtil.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonUtil.java index 8b7017cac29486..95ae27d3e4fdd8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonUtil.java @@ -22,6 +22,7 @@ import org.apache.doris.catalog.ListPartitionItem; import org.apache.doris.catalog.PartitionItem; import org.apache.doris.catalog.PartitionKey; +import org.apache.doris.catalog.ScalarType; import org.apache.doris.catalog.Type; import org.apache.doris.common.AnalysisException; import org.apache.doris.datasource.hive.HiveUtil; @@ -30,12 +31,22 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; import org.apache.commons.collections.CollectionUtils; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.apache.paimon.data.InternalRow; import org.apache.paimon.data.serializer.InternalRowSerializer; import org.apache.paimon.options.ConfigOption; +import org.apache.paimon.predicate.Predicate; import org.apache.paimon.reader.RecordReader; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.type.TypeReference; import org.apache.paimon.table.Table; import 
org.apache.paimon.table.source.ReadBuilder; +import org.apache.paimon.types.ArrayType; +import org.apache.paimon.types.DataField; +import org.apache.paimon.types.DecimalType; +import org.apache.paimon.types.MapType; +import org.apache.paimon.types.RowType; +import org.apache.paimon.utils.JsonSerdeUtil; import org.apache.paimon.utils.Pair; import org.apache.paimon.utils.Projection; @@ -48,8 +59,11 @@ import javax.annotation.Nullable; public class PaimonUtil { + private static final Logger LOG = LogManager.getLogger(PaimonUtil.class); + public static List read( - Table table, @Nullable int[][] projection, Pair, String>... dynamicOptions) + Table table, @Nullable int[][] projection, @Nullable Predicate predicate, + Pair, String>... dynamicOptions) throws IOException { Map options = new HashMap<>(); for (Pair, String> pair : dynamicOptions) { @@ -60,6 +74,9 @@ public static List read( if (projection != null) { readBuilder.withProjection(projection); } + if (predicate != null) { + readBuilder.withFilter(predicate); + } RecordReader reader = readBuilder.newRead().createReader(readBuilder.newScan().plan()); InternalRowSerializer serializer = @@ -117,15 +134,15 @@ public static PaimonPartitionInfo generatePartitionInfo(List partitionCo private static String getPartitionName(List partitionColumns, String partitionValueStr) { Preconditions.checkNotNull(partitionValueStr); - String[] partitionValues = partitionValueStr.replace("[", "").replace("]", "") - .split(","); - Preconditions.checkState(partitionColumns.size() == partitionValues.length); + List partitionValues = JsonSerdeUtil.fromJson(partitionValueStr, new TypeReference>() { + }); + Preconditions.checkState(partitionColumns.size() == partitionValues.size()); StringBuilder sb = new StringBuilder(); for (int i = 0; i < partitionColumns.size(); ++i) { if (i != 0) { sb.append("/"); } - sb.append(partitionColumns.get(i).getName()).append("=").append(partitionValues[i]); + 
sb.append(partitionColumns.get(i).getName()).append("=").append(partitionValues.get(i)); } return sb.toString(); } @@ -152,4 +169,107 @@ public static ListPartitionItem toListPartitionItem(String partitionName, List 6) { + tsScale = 6; + } + } else if (dataType instanceof org.apache.paimon.types.LocalZonedTimestampType) { + tsScale = ((org.apache.paimon.types.LocalZonedTimestampType) dataType).getPrecision(); + if (tsScale > 6) { + tsScale = 6; + } + } + return ScalarType.createDatetimeV2Type(tsScale); + case TIMESTAMP_WITH_LOCAL_TIME_ZONE: + if (dataType instanceof org.apache.paimon.types.LocalZonedTimestampType) { + tsScale = ((org.apache.paimon.types.LocalZonedTimestampType) dataType).getPrecision(); + if (tsScale > 6) { + tsScale = 6; + } + } + return ScalarType.createDatetimeV2Type(tsScale); + case ARRAY: + ArrayType arrayType = (ArrayType) dataType; + Type innerType = paimonPrimitiveTypeToDorisType(arrayType.getElementType()); + return org.apache.doris.catalog.ArrayType.create(innerType, true); + case MAP: + MapType mapType = (MapType) dataType; + return new org.apache.doris.catalog.MapType( + paimonTypeToDorisType(mapType.getKeyType()), paimonTypeToDorisType(mapType.getValueType())); + case ROW: + RowType rowType = (RowType) dataType; + List fields = rowType.getFields(); + return new org.apache.doris.catalog.StructType(fields.stream() + .map(field -> new org.apache.doris.catalog.StructField(field.name(), + paimonTypeToDorisType(field.type()))) + .collect(Collectors.toCollection(ArrayList::new))); + case TIME_WITHOUT_TIME_ZONE: + return Type.UNSUPPORTED; + default: + LOG.warn("Cannot transform unknown type: " + dataType.getTypeRoot()); + return Type.UNSUPPORTED; + } + } + + public static Type paimonTypeToDorisType(org.apache.paimon.types.DataType type) { + return paimonPrimitiveTypeToDorisType(type); + } + + /** + * https://paimon.apache.org/docs/0.9/maintenance/system-tables/#schemas-table + * demo: + * 0 + * [{"id":0,"name":"user_id","type":"BIGINT NOT 
NULL"}, + * {"id":1,"name":"item_id","type":"BIGINT"}, + * {"id":2,"name":"behavior","type":"STRING"}, + * {"id":3,"name":"dt","type":"STRING NOT NULL"}, + * {"id":4,"name":"hh","type":"STRING NOT NULL"}] + * ["dt"] + * ["dt","hh","user_id"] + * {"owner":"hadoop","provider":"paimon"} + * 2024-12-03 15:38:14.734 + * + * @param row + * @return + */ + public static PaimonSchema rowToSchema(InternalRow row) { + long schemaId = row.getLong(0); + String fieldsStr = row.getString(1).toString(); + String partitionKeysStr = row.getString(2).toString(); + List fields = JsonSerdeUtil.fromJson(fieldsStr, new TypeReference>() { + }); + List partitionKeys = JsonSerdeUtil.fromJson(partitionKeysStr, new TypeReference>() { + }); + return new PaimonSchema(schemaId, fields, partitionKeys); + } } From 3912664915c818c1a7a89c9a21ed7aa240353cf8 Mon Sep 17 00:00:00 2001 From: zhangdong Date: Tue, 3 Dec 2024 19:04:11 +0800 Subject: [PATCH 06/16] 1 --- .../doris/datasource/paimon/PaimonMetadataCache.java | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonMetadataCache.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonMetadataCache.java index fa31a518b32927..0ab95866d9114e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonMetadataCache.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonMetadataCache.java @@ -44,7 +44,6 @@ import java.io.IOException; import java.util.List; import java.util.Map; -import java.util.Optional; import java.util.OptionalLong; import java.util.Set; import java.util.concurrent.ExecutorService; @@ -115,12 +114,8 @@ private PaimonSchema loadPaimonSchemaBySchemaId(PaimonSchemaCacheKey key) throws private PaimonSnapshotCacheValue loadSnapshot(PaimonSnapshotCacheKey key) { try { PaimonSnapshot latestSnapshot = loadLatestSnapshot(key); - PaimonExternalTable paimonExternalTable - = (PaimonExternalTable) 
((PaimonExternalCatalog) key.getCatalog()).getDbOrAnalysisException( - key.getDbName()) - .getTableOrAnalysisException( - key.getTableName()); - List partitionColumns = paimonExternalTable.getPartitionColumns(Optional.empty()); + List partitionColumns = getPaimonSchema(key.getCatalog(), key.getDbName(), key.getTableName(), + latestSnapshot.getSchemaId()).getPartitionColumns(); PaimonPartitionInfo partitionInfo = loadPartitionInfo(key, partitionColumns); return new PaimonSnapshotCacheValue(partitionInfo, latestSnapshot); } catch (IOException | AnalysisException e) { From e45b91870061643148744b737c8386d16d3c009c Mon Sep 17 00:00:00 2001 From: zhangdong Date: Tue, 3 Dec 2024 19:19:54 +0800 Subject: [PATCH 07/16] 1 --- .../datasource/paimon/PaimonMetadataCache.java | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonMetadataCache.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonMetadataCache.java index 0ab95866d9114e..f25aa339d7e993 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonMetadataCache.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonMetadataCache.java @@ -100,14 +100,16 @@ private PaimonSchema loadPaimonSchemaBySchemaId(PaimonSchemaCacheKey key) throws key.getTableName() + Catalog.SYSTEM_TABLE_SPLITTER + SchemasTable.SCHEMAS); PredicateBuilder builder = new PredicateBuilder(table.rowType()); Predicate predicate = builder.equal(0, key.getSchemaId()); + // Adding predicates will also return excess data List rows = PaimonUtil.read(table, new int[][] {{0}, {1}, {2}}, predicate); - if (rows.size() != 1) { - throw new CacheException("failed to loadPaimonSchemaBySchemaId for: %s.%s.%s.%s", - null, key.getCatalog().getName(), key.getDbName(), key.getTableName(), key.getSchemaId()); + for (InternalRow row : rows) { + PaimonSchema schema = PaimonUtil.rowToSchema(row); + if (schema.getSchemaId() 
== key.getSchemaId()) { + return schema; + } } - InternalRow internalRow = rows.get(0); - return PaimonUtil.rowToSchema(internalRow); - + throw new CacheException("failed to loadSchema for: %s.%s.%s.%s", + null, key.getCatalog().getName(), key.getDbName(), key.getTableName(), key.getSchemaId()); } @NotNull From 54af29e11900b05ea6c19ef9de41599eee3e7cea Mon Sep 17 00:00:00 2001 From: zhangdong Date: Tue, 3 Dec 2024 19:27:34 +0800 Subject: [PATCH 08/16] 1 --- .../org/apache/doris/datasource/paimon/PaimonUtil.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonUtil.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonUtil.java index 95ae27d3e4fdd8..1f7576dca51d93 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonUtil.java @@ -134,15 +134,15 @@ public static PaimonPartitionInfo generatePartitionInfo(List partitionCo private static String getPartitionName(List partitionColumns, String partitionValueStr) { Preconditions.checkNotNull(partitionValueStr); - List partitionValues = JsonSerdeUtil.fromJson(partitionValueStr, new TypeReference>() { - }); - Preconditions.checkState(partitionColumns.size() == partitionValues.size()); + String[] partitionValues = partitionValueStr.replace("[", "").replace("]", "") + .split(","); + Preconditions.checkState(partitionColumns.size() == partitionValues.length); StringBuilder sb = new StringBuilder(); for (int i = 0; i < partitionColumns.size(); ++i) { if (i != 0) { sb.append("/"); } - sb.append(partitionColumns.get(i).getName()).append("=").append(partitionValues.get(i)); + sb.append(partitionColumns.get(i).getName()).append("=").append(partitionValues[i]); } return sb.toString(); } From a9aca03ba60549199b67638c1d5f0b7cdc611977 Mon Sep 17 00:00:00 2001 From: zhangdong Date: Tue, 3 Dec 2024 19:35:24 +0800 Subject: 
[PATCH 09/16] 1 --- .../apache/doris/datasource/paimon/PaimonExternalTable.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java index 37e6da24354730..a877052886b641 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java @@ -206,7 +206,10 @@ public boolean supportInternalPartitionPruned() { @Override public List getFullSchema() { - Optional snapshot = ConnectContext.get().getStatementContext().getSnapshot(this); + Optional snapshot = Optional.empty(); + if (ConnectContext.get() != null) { + snapshot = ConnectContext.get().getStatementContext().getSnapshot(this); + } return getPaimonSchemaCacheValue(snapshot).getSchema(); } From 027609485cad47c91b79b0bd6a04cc6fc8a4fefa Mon Sep 17 00:00:00 2001 From: zhangdong Date: Tue, 3 Dec 2024 20:10:28 +0800 Subject: [PATCH 10/16] 1 --- .../org/apache/doris/datasource/paimon/PaimonMetadataCache.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonMetadataCache.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonMetadataCache.java index f25aa339d7e993..1c57512461e124 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonMetadataCache.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonMetadataCache.java @@ -150,7 +150,7 @@ private List loadPartitions(PaimonSnapshotCacheKey key) private PaimonSnapshot loadLatestSnapshot(PaimonSnapshotCacheKey key) throws IOException { Table table = ((PaimonExternalCatalog) key.getCatalog()).getPaimonTable(key.getDbName(), key.getTableName() + Catalog.SYSTEM_TABLE_SPLITTER + SnapshotsTable.SNAPSHOTS); - // snapshotId + // 
snapshotId and schemaId List rows = PaimonUtil.read(table, new int[][] {{0}, {1}}, null); long latestSnapshotId = 0L; long latestSchemaId = 0L; From 17628ee6990aa95afd8d260284fb4a8cf6f0a247 Mon Sep 17 00:00:00 2001 From: zhangdong Date: Thu, 5 Dec 2024 15:07:45 +0800 Subject: [PATCH 11/16] resolve comment --- .../doris/datasource/mvcc/MvccUtil.java | 44 +++++++++++++++++++ .../paimon/PaimonExternalTable.java | 10 ++--- .../paimon/source/PaimonSource.java | 5 +-- .../doris/nereids/StatementContext.java | 2 +- .../rules/rewrite/PruneFileScanPartition.java | 5 +-- .../commands/UpdateMvByPartitionCommand.java | 7 ++- .../trees/plans/logical/LogicalFileScan.java | 4 +- 7 files changed, 57 insertions(+), 20 deletions(-) create mode 100644 fe/fe-core/src/main/java/org/apache/doris/datasource/mvcc/MvccUtil.java diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/mvcc/MvccUtil.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/mvcc/MvccUtil.java new file mode 100644 index 00000000000000..ffdaff770e21a3 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/mvcc/MvccUtil.java @@ -0,0 +1,44 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.datasource.mvcc; + +import org.apache.doris.catalog.TableIf; +import org.apache.doris.nereids.StatementContext; +import org.apache.doris.qe.ConnectContext; + +import java.util.Optional; + +public class MvccUtil { + /** + * get Snapshot From StatementContext + * + * @param tableIf + * @return MvccSnapshot + */ + public static Optional getSnapshotFromContext(TableIf tableIf) { + ConnectContext connectContext = ConnectContext.get(); + if (connectContext == null) { + return Optional.empty(); + } + StatementContext statementContext = connectContext.getStatementContext(); + if (statementContext == null) { + return Optional.empty(); + } + return statementContext.getSnapshot(tableIf); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java index a877052886b641..2a65588eeac5da 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java @@ -27,13 +27,13 @@ import org.apache.doris.datasource.ExternalTable; import org.apache.doris.datasource.mvcc.MvccSnapshot; import org.apache.doris.datasource.mvcc.MvccTable; +import org.apache.doris.datasource.mvcc.MvccUtil; import org.apache.doris.mtmv.MTMVBaseTableIf; import org.apache.doris.mtmv.MTMVRefreshContext; import org.apache.doris.mtmv.MTMVRelatedTableIf; import org.apache.doris.mtmv.MTMVSnapshotIf; import org.apache.doris.mtmv.MTMVTimestampSnapshot; import org.apache.doris.mtmv.MTMVVersionSnapshot; -import org.apache.doris.qe.ConnectContext; import org.apache.doris.statistics.AnalysisInfo; import org.apache.doris.statistics.BaseAnalysisTask; import org.apache.doris.statistics.ExternalAnalysisTask; @@ -195,7 +195,7 @@ public MvccSnapshot loadSnapshot() { } @Override - protected Map getNameToPartitionItems(Optional snapshot) { + public Map 
getNameToPartitionItems(Optional snapshot) { return getOrFetchSnapshotCacheValue(snapshot).getPartitionInfo().getNameToPartitionItem(); } @@ -206,11 +206,7 @@ public boolean supportInternalPartitionPruned() { @Override public List getFullSchema() { - Optional snapshot = Optional.empty(); - if (ConnectContext.get() != null) { - snapshot = ConnectContext.get().getStatementContext().getSnapshot(this); - } - return getPaimonSchemaCacheValue(snapshot).getSchema(); + return getPaimonSchemaCacheValue(MvccUtil.getSnapshotFromContext(this)).getSchema(); } private PaimonSchemaCacheValue getPaimonSchemaCacheValue(Optional snapshot) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonSource.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonSource.java index 9cf73572f98dd9..a8bb814f1d353b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonSource.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonSource.java @@ -21,9 +21,9 @@ import org.apache.doris.catalog.TableIf; import org.apache.doris.common.UserException; import org.apache.doris.datasource.ExternalCatalog; +import org.apache.doris.datasource.mvcc.MvccUtil; import org.apache.doris.datasource.paimon.PaimonExternalTable; import org.apache.doris.datasource.property.constants.PaimonProperties; -import org.apache.doris.qe.ConnectContext; import org.apache.doris.thrift.TFileAttributes; import org.apache.paimon.table.Table; @@ -37,8 +37,7 @@ public class PaimonSource { public PaimonSource(TupleDescriptor desc) { this.desc = desc; this.paimonExtTable = (PaimonExternalTable) desc.getTable(); - this.originTable = paimonExtTable.getPaimonTable( - ConnectContext.get().getStatementContext().getSnapshot(paimonExtTable)); + this.originTable = paimonExtTable.getPaimonTable(MvccUtil.getSnapshotFromContext(paimonExtTable)); } public TupleDescriptor getDesc() { diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/StatementContext.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/StatementContext.java index 36f1bfc7675a25..f21600ac585763 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/StatementContext.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/StatementContext.java @@ -575,7 +575,7 @@ public Optional getSnapshot(TableIf tableIf) { return Optional.empty(); } MvccTableInfo mvccTableInfo = new MvccTableInfo(tableIf); - return snapshots.containsKey(mvccTableInfo) ? Optional.of(snapshots.get(mvccTableInfo)) : Optional.empty(); + return Optional.ofNullable(snapshots.get(mvccTableInfo)); } /** diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PruneFileScanPartition.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PruneFileScanPartition.java index dca0d4a6ecb429..e99906f5e13dc4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PruneFileScanPartition.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PruneFileScanPartition.java @@ -28,7 +28,6 @@ import org.apache.doris.nereids.trees.plans.logical.LogicalFileScan; import org.apache.doris.nereids.trees.plans.logical.LogicalFileScan.SelectedPartitions; import org.apache.doris.nereids.trees.plans.logical.LogicalFilter; -import org.apache.doris.qe.ConnectContext; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Maps; @@ -75,7 +74,7 @@ private SelectedPartitions pruneExternalPartitions(ExternalTable externalTable, LogicalFilter filter, LogicalFileScan scan, CascadesContext ctx) { Map selectedPartitionItems = Maps.newHashMap(); if (CollectionUtils.isEmpty(externalTable.getPartitionColumns( - ConnectContext.get().getStatementContext().getSnapshot(externalTable)))) { + ctx.getStatementContext().getSnapshot(externalTable)))) { // non partitioned table, return NOT_PRUNED. 
// non partition table will be handled in HiveScanNode. return SelectedPartitions.NOT_PRUNED; @@ -84,7 +83,7 @@ private SelectedPartitions pruneExternalPartitions(ExternalTable externalTable, .stream() .collect(Collectors.toMap(slot -> slot.getName().toLowerCase(), Function.identity())); List partitionSlots = externalTable.getPartitionColumns( - ConnectContext.get().getStatementContext().getSnapshot(externalTable)) + ctx.getStatementContext().getSnapshot(externalTable)) .stream() .map(column -> scanOutput.get(column.getName().toLowerCase())) .collect(Collectors.toList()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateMvByPartitionCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateMvByPartitionCommand.java index 869c2d0b38bd4c..b5be72950cd47a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateMvByPartitionCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateMvByPartitionCommand.java @@ -29,6 +29,7 @@ import org.apache.doris.common.AnalysisException; import org.apache.doris.common.UserException; import org.apache.doris.datasource.ExternalTable; +import org.apache.doris.datasource.mvcc.MvccUtil; import org.apache.doris.mtmv.BaseTableInfo; import org.apache.doris.mtmv.MTMVRelatedTableIf; import org.apache.doris.nereids.analyzer.UnboundRelation; @@ -315,10 +316,8 @@ public Plan visitLogicalCatalogRelation(LogicalCatalogRelation catalogRelation, } if (targetTable instanceof ExternalTable) { // Add filter only when partition has data when external table - // TODO: 2024/12/4 real snapshot - partitionHasDataItems.add( - ((ExternalTable) targetTable).getNameToPartitionItems(Optional.empty()) - .get(partitionName)); + partitionHasDataItems.add(((ExternalTable) targetTable).getNameToPartitionItems( + MvccUtil.getSnapshotFromContext(targetTable)).get(partitionName)); } } if (partitionHasDataItems.isEmpty()) { diff 
--git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalFileScan.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalFileScan.java index 670a83b9e157e6..1f5f71f7bafe59 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalFileScan.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalFileScan.java @@ -20,6 +20,7 @@ import org.apache.doris.analysis.TableSnapshot; import org.apache.doris.catalog.PartitionItem; import org.apache.doris.datasource.ExternalTable; +import org.apache.doris.datasource.mvcc.MvccUtil; import org.apache.doris.nereids.memo.GroupExpression; import org.apache.doris.nereids.properties.LogicalProperties; import org.apache.doris.nereids.trees.TableSample; @@ -28,7 +29,6 @@ import org.apache.doris.nereids.trees.plans.RelationId; import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; import org.apache.doris.nereids.util.Utils; -import org.apache.doris.qe.ConnectContext; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableMap; @@ -63,7 +63,7 @@ protected LogicalFileScan(RelationId id, ExternalTable table, List quali public LogicalFileScan(RelationId id, ExternalTable table, List qualifier, Optional tableSample, Optional tableSnapshot) { this(id, table, qualifier, Optional.empty(), Optional.empty(), - table.initSelectedPartitions(ConnectContext.get().getStatementContext().getSnapshot(table)), + table.initSelectedPartitions(MvccUtil.getSnapshotFromContext(table)), tableSample, tableSnapshot); } From cd7ebaa7ab0b94e625314f0663f6225df9d301ba Mon Sep 17 00:00:00 2001 From: zhangdong Date: Thu, 5 Dec 2024 15:19:01 +0800 Subject: [PATCH 12/16] add case --- .../mtmv_p0/test_paimon_rewrite_mtmv.groovy | 95 +++++++++++++++++++ 1 file changed, 95 insertions(+) create mode 100644 regression-test/suites/mtmv_p0/test_paimon_rewrite_mtmv.groovy diff --git 
a/regression-test/suites/mtmv_p0/test_paimon_rewrite_mtmv.groovy b/regression-test/suites/mtmv_p0/test_paimon_rewrite_mtmv.groovy new file mode 100644 index 00000000000000..f546cf230822ea --- /dev/null +++ b/regression-test/suites/mtmv_p0/test_paimon_rewrite_mtmv.groovy @@ -0,0 +1,95 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +suite("test_paimon_rewrite_mtmv", "p0,external,mtmv,external_docker,external_docker_doris") { + String enabled = context.config.otherConfigs.get("enablePaimonTest") + if (enabled == null || !enabled.equalsIgnoreCase("true")) { + logger.info("disabled paimon test") + return + } + String suiteName = "test_paimon_rewrite_mtmv" + String catalogName = "${suiteName}_catalog" + String mvName = "${suiteName}_mv" + String dbName = context.config.getDbNameByFile(context.file) + + String minio_port = context.config.otherConfigs.get("iceberg_minio_port") + String externalEnvIp = context.config.otherConfigs.get("externalEnvIp") + + sql """set materialized_view_rewrite_enable_contain_external_table=true;""" + String mvSql = "SELECT par,count(*) as num FROM ${catalogName}.`test_paimon_spark`.test_tb_mix_format group by par;"; + + sql """drop catalog if exists ${catalogName}""" + sql """CREATE CATALOG ${catalogName} PROPERTIES ( + 'type'='paimon', + 'warehouse' = 's3://warehouse/wh/', + "s3.access_key" = "admin", + "s3.secret_key" = "password", + "s3.endpoint" = "http://${externalEnvIp}:${minio_port}", + "s3.region" = "us-east-1" + );""" + + sql """analyze table ${catalogName}.`test_paimon_spark`.test_tb_mix_format with sync""" + sql """alter table ${catalogName}.`test_paimon_spark`.test_tb_mix_format modify column col set stats ('row_count'='20');""" + + sql """drop materialized view if exists ${mvName};""" + + sql """ + CREATE MATERIALIZED VIEW ${mvName} + BUILD DEFERRED REFRESH AUTO ON MANUAL + partition by(`par`) + DISTRIBUTED BY RANDOM BUCKETS 2 + PROPERTIES ('replication_num' = '1') + AS + ${mvSql} + """ + def showPartitionsResult = sql """show partitions from ${mvName}""" + logger.info("showPartitionsResult: " + showPartitionsResult.toString()) + assertTrue(showPartitionsResult.toString().contains("p_a")) + assertTrue(showPartitionsResult.toString().contains("p_b")) + + // refresh one partitions + sql """ + REFRESH MATERIALIZED VIEW ${mvName} partitions(p_a); + """ + 
waitingMTMVTaskFinishedByMvName(mvName) + order_qt_refresh_one_partition "SELECT * FROM ${mvName} " + + def explainOnePartition = sql """ explain ${mvSql} """ + logger.info("explainOnePartition: " + explainOnePartition.toString()) + assertTrue(explainOnePartition.toString().contains("VUNION")) + order_qt_refresh_one_partition_rewrite "${mvSql}" + + mv_rewrite_success("${mvSql}", "${mvName}") + + //refresh auto + sql """ + REFRESH MATERIALIZED VIEW ${mvName} auto + """ + waitingMTMVTaskFinishedByMvName(mvName) + order_qt_refresh_auto "SELECT * FROM ${mvName} " + + def explainAllPartition = sql """ explain ${mvSql}; """ + logger.info("explainAllPartition: " + explainAllPartition.toString()) + assertTrue(explainAllPartition.toString().contains("VOlapScanNode")) + order_qt_refresh_all_partition_rewrite "${mvSql}" + + mv_rewrite_success("${mvSql}", "${mvName}") + + sql """drop materialized view if exists ${mvName};""" + sql """drop catalog if exists ${catalogName}""" +} + From a7faa916da13e217f786f69469c5abb5d601919a Mon Sep 17 00:00:00 2001 From: zhangdong Date: Thu, 5 Dec 2024 16:09:03 +0800 Subject: [PATCH 13/16] add case --- regression-test/suites/mtmv_p0/test_paimon_rewrite_mtmv.groovy | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/regression-test/suites/mtmv_p0/test_paimon_rewrite_mtmv.groovy b/regression-test/suites/mtmv_p0/test_paimon_rewrite_mtmv.groovy index f546cf230822ea..985443875c7b26 100644 --- a/regression-test/suites/mtmv_p0/test_paimon_rewrite_mtmv.groovy +++ b/regression-test/suites/mtmv_p0/test_paimon_rewrite_mtmv.groovy @@ -43,7 +43,7 @@ suite("test_paimon_rewrite_mtmv", "p0,external,mtmv,external_docker,external_doc );""" sql """analyze table ${catalogName}.`test_paimon_spark`.test_tb_mix_format with sync""" - sql """alter table ${catalogName}.`test_paimon_spark`.test_tb_mix_format modify column col set stats ('row_count'='20');""" + sql """alter table ${catalogName}.`test_paimon_spark`.test_tb_mix_format modify column par set 
stats ('row_count'='20');""" sql """drop materialized view if exists ${mvName};""" From 5e5211e47fd68c4437c10d1cf5162f921e8a82b9 Mon Sep 17 00:00:00 2001 From: zhangdong Date: Thu, 5 Dec 2024 16:12:41 +0800 Subject: [PATCH 14/16] case --- .../data/mtmv_p0/test_paimon_rewrite_mtmv.out | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) create mode 100644 regression-test/data/mtmv_p0/test_paimon_rewrite_mtmv.out diff --git a/regression-test/data/mtmv_p0/test_paimon_rewrite_mtmv.out b/regression-test/data/mtmv_p0/test_paimon_rewrite_mtmv.out new file mode 100644 index 00000000000000..63bda82c1db5bd --- /dev/null +++ b/regression-test/data/mtmv_p0/test_paimon_rewrite_mtmv.out @@ -0,0 +1,16 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !refresh_one_partition -- +a 10 + +-- !refresh_one_partition_rewrite -- +a 10 +b 10 + +-- !refresh_auto -- +a 10 +b 10 + +-- !refresh_all_partition_rewrite -- +a 10 +b 10 + From 0a1238835e93b0dddbea48ffadbd6d4c9790b24f Mon Sep 17 00:00:00 2001 From: zhangdong Date: Mon, 9 Dec 2024 17:18:31 +0800 Subject: [PATCH 15/16] move paimon schema cache to ExternalSchemaCache --- .../doris/datasource/ExternalCatalog.java | 9 +- .../doris/datasource/ExternalSchemaCache.java | 6 +- .../doris/datasource/ExternalTable.java | 7 +- .../datasource/hive/HMSExternalTable.java | 3 +- .../paimon/PaimonExternalTable.java | 71 +++++++++++++++- .../paimon/PaimonMetadataCache.java | 83 +------------------ .../paimon/PaimonSchemaCacheKey.java | 50 +++-------- .../doris/external/hms/HmsCatalogTest.java | 2 +- .../apache/doris/qe/HmsQueryCacheTest.java | 8 +- 9 files changed, 105 insertions(+), 134 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java index cde08113373aee..d7cbee18c74c7a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java +++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java @@ -41,6 +41,7 @@ import org.apache.doris.common.io.Writable; import org.apache.doris.common.security.authentication.PreExecutionAuthenticator; import org.apache.doris.common.util.Util; +import org.apache.doris.datasource.ExternalSchemaCache.SchemaCacheKey; import org.apache.doris.datasource.es.EsExternalDatabase; import org.apache.doris.datasource.hive.HMSExternalCatalog; import org.apache.doris.datasource.hive.HMSExternalDatabase; @@ -432,13 +433,13 @@ private void refreshOnlyCatalogCache(boolean invalidCache) { } } - public final Optional getSchema(String dbName, String tblName) { + public final Optional getSchema(SchemaCacheKey key) { makeSureInitialized(); - Optional> db = getDb(dbName); + Optional> db = getDb(key.getDbName()); if (db.isPresent()) { - Optional table = db.get().getTable(tblName); + Optional table = db.get().getTable(key.getTblName()); if (table.isPresent()) { - return table.get().initSchemaAndUpdateTime(); + return table.get().initSchemaAndUpdateTime(key); } } return Optional.empty(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalSchemaCache.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalSchemaCache.java index a0558766e81400..de3eeff75d97fa 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalSchemaCache.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalSchemaCache.java @@ -74,7 +74,7 @@ public Long getValue() { } private Optional loadSchema(SchemaCacheKey key) { - Optional schema = catalog.getSchema(key.dbName, key.tblName); + Optional schema = catalog.getSchema(key); if (LOG.isDebugEnabled()) { LOG.debug("load schema for {} in catalog {}", key, catalog.getName()); } @@ -83,6 +83,10 @@ private Optional loadSchema(SchemaCacheKey key) { public Optional getSchemaValue(String dbName, String tblName) { SchemaCacheKey key = new SchemaCacheKey(dbName, tblName); + return 
getSchemaValue(key); + } + + public Optional getSchemaValue(SchemaCacheKey key) { return schemaCache.get(key); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalTable.java index 5451a219edfd3c..91df061678f154 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalTable.java @@ -31,6 +31,7 @@ import org.apache.doris.common.io.Writable; import org.apache.doris.common.util.PropertyAnalyzer; import org.apache.doris.common.util.Util; +import org.apache.doris.datasource.ExternalSchemaCache.SchemaCacheKey; import org.apache.doris.datasource.mvcc.MvccSnapshot; import org.apache.doris.nereids.trees.plans.logical.LogicalFileScan.SelectedPartitions; import org.apache.doris.persist.gson.GsonPostProcessable; @@ -317,8 +318,12 @@ public Optional getColumnStatistic(String colName) { * * @return */ - public Optional initSchemaAndUpdateTime() { + public Optional initSchemaAndUpdateTime(SchemaCacheKey key) { schemaUpdateTime = System.currentTimeMillis(); + return initSchema(key); + } + + public Optional initSchema(SchemaCacheKey key) { return initSchema(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java index 2115f47d777b80..ae331ba54cba55 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java @@ -29,6 +29,7 @@ import org.apache.doris.common.AnalysisException; import org.apache.doris.common.Config; import org.apache.doris.common.DdlException; +import org.apache.doris.datasource.ExternalSchemaCache.SchemaCacheKey; import org.apache.doris.datasource.ExternalTable; import org.apache.doris.datasource.SchemaCacheValue; import 
org.apache.doris.datasource.hudi.HudiUtils; @@ -477,7 +478,7 @@ public Set getPartitionNames() { } @Override - public Optional initSchemaAndUpdateTime() { + public Optional initSchemaAndUpdateTime(SchemaCacheKey key) { org.apache.hadoop.hive.metastore.api.Table table = ((HMSExternalCatalog) catalog).getClient() .getTable(dbName, name); // try to use transient_lastDdlTime from hms client diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java index 2a65588eeac5da..8ddfca886d9fe9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java @@ -24,7 +24,11 @@ import org.apache.doris.catalog.PartitionType; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.DdlException; +import org.apache.doris.datasource.CacheException; +import org.apache.doris.datasource.ExternalSchemaCache; +import org.apache.doris.datasource.ExternalSchemaCache.SchemaCacheKey; import org.apache.doris.datasource.ExternalTable; +import org.apache.doris.datasource.SchemaCacheValue; import org.apache.doris.datasource.mvcc.MvccSnapshot; import org.apache.doris.datasource.mvcc.MvccTable; import org.apache.doris.datasource.mvcc.MvccUtil; @@ -41,13 +45,22 @@ import org.apache.doris.thrift.TTableDescriptor; import org.apache.doris.thrift.TTableType; +import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import com.google.common.collect.Sets; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.paimon.CoreOptions; +import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.predicate.Predicate; +import org.apache.paimon.predicate.PredicateBuilder; import org.apache.paimon.table.Table; import 
org.apache.paimon.table.source.Split; +import org.apache.paimon.table.system.SchemasTable; +import org.apache.paimon.types.DataField; +import java.io.IOException; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -84,10 +97,15 @@ public Table getPaimonTable(Optional snapshot) { String.valueOf(getOrFetchSnapshotCacheValue(snapshot).getSnapshot().getSnapshotId()))); } - private PaimonSchemaCacheValue getPaimonSchemaCacheValue(long schemaId) { - makeSureInitialized(); - return Env.getCurrentEnv().getExtMetaCacheMgr().getPaimonMetadataCache() - .getPaimonSchema(catalog, dbName, name, schemaId); + public PaimonSchemaCacheValue getPaimonSchemaCacheValue(long schemaId) { + ExternalSchemaCache cache = Env.getCurrentEnv().getExtMetaCacheMgr().getSchemaCache(catalog); + Optional schemaCacheValue = cache.getSchemaValue( + new PaimonSchemaCacheKey(dbName, name, schemaId)); + if (!schemaCacheValue.isPresent()) { + throw new CacheException("failed to getSchema for: %s.%s.%s.%s", + null, catalog.getName(), dbName, name, schemaId); + } + return (PaimonSchemaCacheValue) schemaCacheValue.get(); } private PaimonSnapshotCacheValue getPaimonSnapshotCacheValue() { @@ -209,6 +227,51 @@ public List getFullSchema() { return getPaimonSchemaCacheValue(MvccUtil.getSnapshotFromContext(this)).getSchema(); } + @Override + public Optional initSchema(SchemaCacheKey key) { + makeSureInitialized(); + PaimonSchemaCacheKey paimonSchemaCacheKey = (PaimonSchemaCacheKey) key; + try { + PaimonSchema schema = loadPaimonSchemaBySchemaId(paimonSchemaCacheKey); + List columns = schema.getFields(); + List dorisColumns = Lists.newArrayListWithCapacity(columns.size()); + Set partitionColumnNames = Sets.newHashSet(schema.getPartitionKeys()); + List partitionColumns = Lists.newArrayList(); + for (DataField field : columns) { + Column column = new Column(field.name().toLowerCase(), + PaimonUtil.paimonTypeToDorisType(field.type()), true, null, true, field.description(), true, + 
field.id()); + dorisColumns.add(column); + if (partitionColumnNames.contains(field.name())) { + partitionColumns.add(column); + } + } + return Optional.of(new PaimonSchemaCacheValue(dorisColumns, partitionColumns)); + } catch (Exception e) { + throw new CacheException("failed to initSchema for: %s.%s.%s.%s", + null, getCatalog().getName(), key.getDbName(), key.getTblName(), + paimonSchemaCacheKey.getSchemaId()); + } + + } + + private PaimonSchema loadPaimonSchemaBySchemaId(PaimonSchemaCacheKey key) throws IOException { + Table table = ((PaimonExternalCatalog) getCatalog()).getPaimonTable(key.getDbName(), + name + Catalog.SYSTEM_TABLE_SPLITTER + SchemasTable.SCHEMAS); + PredicateBuilder builder = new PredicateBuilder(table.rowType()); + Predicate predicate = builder.equal(0, key.getSchemaId()); + // Adding predicates will also return excess data + List rows = PaimonUtil.read(table, new int[][] {{0}, {1}, {2}}, predicate); + for (InternalRow row : rows) { + PaimonSchema schema = PaimonUtil.rowToSchema(row); + if (schema.getSchemaId() == key.getSchemaId()) { + return schema; + } + } + throw new CacheException("failed to initSchema for: %s.%s.%s.%s", + null, getCatalog().getName(), key.getDbName(), key.getTblName(), key.getSchemaId()); + } + private PaimonSchemaCacheValue getPaimonSchemaCacheValue(Optional snapshot) { PaimonSnapshotCacheValue snapshotCacheValue = getOrFetchSnapshotCacheValue(snapshot); return getPaimonSchemaCacheValue(snapshotCacheValue.getSnapshot().getSchemaId()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonMetadataCache.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonMetadataCache.java index 1c57512461e124..5b711e070667b7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonMetadataCache.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonMetadataCache.java @@ -28,30 +28,23 @@ import com.github.benmanes.caffeine.cache.LoadingCache; import 
com.google.common.collect.Lists; import com.google.common.collect.Maps; -import com.google.common.collect.Sets; import org.apache.commons.collections.CollectionUtils; import org.apache.paimon.catalog.Catalog; import org.apache.paimon.data.InternalRow; -import org.apache.paimon.predicate.Predicate; -import org.apache.paimon.predicate.PredicateBuilder; import org.apache.paimon.table.Table; import org.apache.paimon.table.system.PartitionsTable; -import org.apache.paimon.table.system.SchemasTable; import org.apache.paimon.table.system.SnapshotsTable; -import org.apache.paimon.types.DataField; import org.jetbrains.annotations.NotNull; import java.io.IOException; import java.util.List; import java.util.Map; import java.util.OptionalLong; -import java.util.Set; import java.util.concurrent.ExecutorService; public class PaimonMetadataCache { private final LoadingCache snapshotCache; - private final LoadingCache schemaCache; public PaimonMetadataCache(ExecutorService executor) { CacheFactory snapshotCacheFactory = new CacheFactory( @@ -61,63 +54,16 @@ public PaimonMetadataCache(ExecutorService executor) { true, null); this.snapshotCache = snapshotCacheFactory.buildCache(key -> loadSnapshot(key), null, executor); - - CacheFactory schemaCacheFactory = new CacheFactory( - OptionalLong.of(28800L), - OptionalLong.of(Config.external_cache_expire_time_minutes_after_access * 60), - Config.max_external_table_cache_num, - true, - null); - this.schemaCache = schemaCacheFactory.buildCache(key -> loadSchema(key), null, executor); - } - - @NotNull - private PaimonSchemaCacheValue loadSchema(PaimonSchemaCacheKey key) { - try { - PaimonSchema schema = loadPaimonSchemaBySchemaId(key); - List columns = schema.getFields(); - List dorisColumns = Lists.newArrayListWithCapacity(columns.size()); - Set partitionColumnNames = Sets.newHashSet(schema.getPartitionKeys()); - List partitionColumns = Lists.newArrayList(); - for (DataField field : columns) { - Column column = new 
Column(field.name().toLowerCase(), - PaimonUtil.paimonTypeToDorisType(field.type()), true, null, true, field.description(), true, - field.id()); - dorisColumns.add(column); - if (partitionColumnNames.contains(field.name())) { - partitionColumns.add(column); - } - } - return new PaimonSchemaCacheValue(dorisColumns, partitionColumns); - } catch (IOException e) { - throw new CacheException("failed to loadSchema for: %s.%s.%s.%s", - e, key.getCatalog().getName(), key.getDbName(), key.getTableName(), key.getSchemaId()); - } - } - - private PaimonSchema loadPaimonSchemaBySchemaId(PaimonSchemaCacheKey key) throws IOException { - Table table = ((PaimonExternalCatalog) key.getCatalog()).getPaimonTable(key.getDbName(), - key.getTableName() + Catalog.SYSTEM_TABLE_SPLITTER + SchemasTable.SCHEMAS); - PredicateBuilder builder = new PredicateBuilder(table.rowType()); - Predicate predicate = builder.equal(0, key.getSchemaId()); - // Adding predicates will also return excess data - List rows = PaimonUtil.read(table, new int[][] {{0}, {1}, {2}}, predicate); - for (InternalRow row : rows) { - PaimonSchema schema = PaimonUtil.rowToSchema(row); - if (schema.getSchemaId() == key.getSchemaId()) { - return schema; - } - } - throw new CacheException("failed to loadSchema for: %s.%s.%s.%s", - null, key.getCatalog().getName(), key.getDbName(), key.getTableName(), key.getSchemaId()); } @NotNull private PaimonSnapshotCacheValue loadSnapshot(PaimonSnapshotCacheKey key) { try { PaimonSnapshot latestSnapshot = loadLatestSnapshot(key); - List partitionColumns = getPaimonSchema(key.getCatalog(), key.getDbName(), key.getTableName(), - latestSnapshot.getSchemaId()).getPartitionColumns(); + PaimonExternalTable table = (PaimonExternalTable) key.getCatalog().getDbOrAnalysisException(key.getDbName()) + .getTableOrAnalysisException(key.getTableName()); + List partitionColumns = table.getPaimonSchemaCacheValue(latestSnapshot.getSchemaId()) + .getPartitionColumns(); PaimonPartitionInfo partitionInfo = 
loadPartitionInfo(key, partitionColumns); return new PaimonSnapshotCacheValue(partitionInfo, latestSnapshot); } catch (IOException | AnalysisException e) { @@ -168,10 +114,6 @@ public void invalidateCatalogCache(long catalogId) { snapshotCache.asMap().keySet().stream() .filter(key -> key.getCatalog().getId() == catalogId) .forEach(snapshotCache::invalidate); - - schemaCache.asMap().keySet().stream() - .filter(key -> key.getCatalog().getId() == catalogId) - .forEach(schemaCache::invalidate); } public void invalidateTableCache(long catalogId, String dbName, String tblName) { @@ -180,22 +122,12 @@ public void invalidateTableCache(long catalogId, String dbName, String tblName) && key.getTableName().equals( tblName)) .forEach(snapshotCache::invalidate); - - schemaCache.asMap().keySet().stream() - .filter(key -> key.getCatalog().getId() == catalogId && key.getDbName().equals(dbName) - && key.getTableName().equals( - tblName)) - .forEach(schemaCache::invalidate); } public void invalidateDbCache(long catalogId, String dbName) { snapshotCache.asMap().keySet().stream() .filter(key -> key.getCatalog().getId() == catalogId && key.getDbName().equals(dbName)) .forEach(snapshotCache::invalidate); - - schemaCache.asMap().keySet().stream() - .filter(key -> key.getCatalog().getId() == catalogId && key.getDbName().equals(dbName)) - .forEach(schemaCache::invalidate); } public PaimonSnapshotCacheValue getPaimonSnapshot(CatalogIf catalog, String dbName, String tbName) { @@ -203,17 +135,10 @@ public PaimonSnapshotCacheValue getPaimonSnapshot(CatalogIf catalog, String dbNa return snapshotCache.get(key); } - public PaimonSchemaCacheValue getPaimonSchema(CatalogIf catalog, String dbName, String tbName, long schemaId) { - PaimonSchemaCacheKey key = new PaimonSchemaCacheKey(catalog, dbName, tbName, schemaId); - return schemaCache.get(key); - } - public Map> getCacheStats() { Map> res = Maps.newHashMap(); res.put("paimon_snapshot_cache", 
ExternalMetaCacheMgr.getCacheStats(snapshotCache.stats(), snapshotCache.estimatedSize())); - res.put("paimon_schema_cache", ExternalMetaCacheMgr.getCacheStats(schemaCache.stats(), - schemaCache.estimatedSize())); return res; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSchemaCacheKey.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSchemaCacheKey.java index fd71d94f1c2988..f74555b369b380 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSchemaCacheKey.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSchemaCacheKey.java @@ -17,36 +17,18 @@ package org.apache.doris.datasource.paimon; -import org.apache.doris.datasource.CatalogIf; +import org.apache.doris.datasource.ExternalSchemaCache.SchemaCacheKey; -import java.util.Objects; -import java.util.StringJoiner; +import com.google.common.base.Objects; -public class PaimonSchemaCacheKey { - private final CatalogIf catalog; - private final String dbName; - private final String tableName; +public class PaimonSchemaCacheKey extends SchemaCacheKey { private final long schemaId; - public PaimonSchemaCacheKey(CatalogIf catalog, String dbName, String tableName, long schemaId) { - this.catalog = catalog; - this.dbName = dbName; - this.tableName = tableName; + public PaimonSchemaCacheKey(String dbName, String tableName, long schemaId) { + super(dbName, tableName); this.schemaId = schemaId; } - public CatalogIf getCatalog() { - return catalog; - } - - public String getDbName() { - return dbName; - } - - public String getTableName() { - return tableName; - } - public long getSchemaId() { return schemaId; } @@ -56,28 +38,18 @@ public boolean equals(Object o) { if (this == o) { return true; } - if (o == null || getClass() != o.getClass()) { + if (!(o instanceof PaimonSchemaCacheKey)) { + return false; + } + if (!super.equals(o)) { return false; } PaimonSchemaCacheKey that = (PaimonSchemaCacheKey) o; - return 
catalog.getId() == that.catalog.getId() - && Objects.equals(dbName, that.dbName) - && Objects.equals(tableName, that.tableName) - && Objects.equals(schemaId, that.schemaId); + return schemaId == that.schemaId; } @Override public int hashCode() { - return Objects.hash(catalog.getId(), dbName, tableName, schemaId); - } - - @Override - public String toString() { - return new StringJoiner(", ", PaimonSchemaCacheKey.class.getSimpleName() + "[", "]") - .add("catalog=" + catalog) - .add("dbName='" + dbName + "'") - .add("tableName='" + tableName + "'") - .add("schemaId='" + schemaId + "'") - .toString(); + return Objects.hashCode(super.hashCode(), schemaId); } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/external/hms/HmsCatalogTest.java b/fe/fe-core/src/test/java/org/apache/doris/external/hms/HmsCatalogTest.java index c875ef6bc2f141..c43117f2e36d4f 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/external/hms/HmsCatalogTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/external/hms/HmsCatalogTest.java @@ -138,7 +138,7 @@ private void createDbAndTableForHmsCatalog(HMSExternalCatalog hmsCatalog) { result = TableIf.TableType.HMS_EXTERNAL_TABLE; // mock initSchemaAndUpdateTime and do nothing - tbl.initSchemaAndUpdateTime(); + tbl.initSchemaAndUpdateTime(null); minTimes = 0; tbl.getDatabase(); diff --git a/fe/fe-core/src/test/java/org/apache/doris/qe/HmsQueryCacheTest.java b/fe/fe-core/src/test/java/org/apache/doris/qe/HmsQueryCacheTest.java index bddb3c8185ae72..af6588b84fc85b 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/qe/HmsQueryCacheTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/qe/HmsQueryCacheTest.java @@ -158,7 +158,7 @@ private void init(HMSExternalCatalog hmsCatalog) { result = DLAType.HIVE; // mock initSchemaAndUpdateTime and do nothing - tbl.initSchemaAndUpdateTime(); + tbl.initSchemaAndUpdateTime(null); minTimes = 0; tbl.getDatabase(); @@ -208,7 +208,7 @@ private void init(HMSExternalCatalog hmsCatalog) { result = 
DLAType.HIVE; // mock initSchemaAndUpdateTime and do nothing - tbl2.initSchemaAndUpdateTime(); + tbl2.initSchemaAndUpdateTime(null); minTimes = 0; tbl2.getDatabase(); @@ -386,7 +386,7 @@ public void testHitSqlCacheAfterPartitionUpdateTimeChanged() throws Exception { List scanNodes = Arrays.asList(hiveScanNode4); // invoke initSchemaAndUpdateTime first and init schemaUpdateTime - tbl2.initSchemaAndUpdateTime(); + tbl2.initSchemaAndUpdateTime(null); CacheAnalyzer ca = new CacheAnalyzer(connectContext, parseStmt, scanNodes); ca.checkCacheMode(System.currentTimeMillis() + Config.cache_last_version_interval_second * 1000L * 2); @@ -434,7 +434,7 @@ public void testHitSqlCacheByNereidsAfterPartitionUpdateTimeChanged() { List scanNodes = Arrays.asList(hiveScanNode4); // invoke initSchemaAndUpdateTime first and init schemaUpdateTime - tbl2.initSchemaAndUpdateTime(); + tbl2.initSchemaAndUpdateTime(null); CacheAnalyzer ca = new CacheAnalyzer(connectContext, parseStmt, scanNodes); ca.checkCacheModeForNereids(System.currentTimeMillis() + Config.cache_last_version_interval_second * 1000L * 2); From b1da13804a9c8a2d149de0ca7640a83ea6e517bf Mon Sep 17 00:00:00 2001 From: zhangdong Date: Tue, 10 Dec 2024 10:58:06 +0800 Subject: [PATCH 16/16] move paimon schema cache to ExternalSchemaCache --- .../apache/doris/datasource/hive/HMSExternalTable.java | 4 ++++ .../org/apache/doris/external/hms/HmsCatalogTest.java | 2 +- .../test/java/org/apache/doris/qe/HmsQueryCacheTest.java | 8 ++++---- 3 files changed, 9 insertions(+), 5 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java index cfeb1dbf981e9d..da4670d6d0589d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java @@ -503,6 +503,10 @@ public Set getPartitionNames() { @Override 
public Optional initSchemaAndUpdateTime(SchemaCacheKey key) { + return initSchemaAndUpdateTime(); + } + + public Optional initSchemaAndUpdateTime() { org.apache.hadoop.hive.metastore.api.Table table = ((HMSExternalCatalog) catalog).getClient() .getTable(dbName, name); // try to use transient_lastDdlTime from hms client diff --git a/fe/fe-core/src/test/java/org/apache/doris/external/hms/HmsCatalogTest.java b/fe/fe-core/src/test/java/org/apache/doris/external/hms/HmsCatalogTest.java index c43117f2e36d4f..c875ef6bc2f141 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/external/hms/HmsCatalogTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/external/hms/HmsCatalogTest.java @@ -138,7 +138,7 @@ private void createDbAndTableForHmsCatalog(HMSExternalCatalog hmsCatalog) { result = TableIf.TableType.HMS_EXTERNAL_TABLE; // mock initSchemaAndUpdateTime and do nothing - tbl.initSchemaAndUpdateTime(null); + tbl.initSchemaAndUpdateTime(); minTimes = 0; tbl.getDatabase(); diff --git a/fe/fe-core/src/test/java/org/apache/doris/qe/HmsQueryCacheTest.java b/fe/fe-core/src/test/java/org/apache/doris/qe/HmsQueryCacheTest.java index af6588b84fc85b..bddb3c8185ae72 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/qe/HmsQueryCacheTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/qe/HmsQueryCacheTest.java @@ -158,7 +158,7 @@ private void init(HMSExternalCatalog hmsCatalog) { result = DLAType.HIVE; // mock initSchemaAndUpdateTime and do nothing - tbl.initSchemaAndUpdateTime(null); + tbl.initSchemaAndUpdateTime(); minTimes = 0; tbl.getDatabase(); @@ -208,7 +208,7 @@ private void init(HMSExternalCatalog hmsCatalog) { result = DLAType.HIVE; // mock initSchemaAndUpdateTime and do nothing - tbl2.initSchemaAndUpdateTime(null); + tbl2.initSchemaAndUpdateTime(); minTimes = 0; tbl2.getDatabase(); @@ -386,7 +386,7 @@ public void testHitSqlCacheAfterPartitionUpdateTimeChanged() throws Exception { List scanNodes = Arrays.asList(hiveScanNode4); // invoke initSchemaAndUpdateTime 
first and init schemaUpdateTime - tbl2.initSchemaAndUpdateTime(null); + tbl2.initSchemaAndUpdateTime(); CacheAnalyzer ca = new CacheAnalyzer(connectContext, parseStmt, scanNodes); ca.checkCacheMode(System.currentTimeMillis() + Config.cache_last_version_interval_second * 1000L * 2); @@ -434,7 +434,7 @@ public void testHitSqlCacheByNereidsAfterPartitionUpdateTimeChanged() { List scanNodes = Arrays.asList(hiveScanNode4); // invoke initSchemaAndUpdateTime first and init schemaUpdateTime - tbl2.initSchemaAndUpdateTime(null); + tbl2.initSchemaAndUpdateTime(); CacheAnalyzer ca = new CacheAnalyzer(connectContext, parseStmt, scanNodes); ca.checkCacheModeForNereids(System.currentTimeMillis() + Config.cache_last_version_interval_second * 1000L * 2);