From eec259bcfa69cecc004db1f7c829cb40179fc8c8 Mon Sep 17 00:00:00 2001
From: zhangdong
Date: Thu, 21 Nov 2024 10:45:19 +0800
Subject: [PATCH 1/4] [feat](mtmv)mtmv support paimon partition refresh (#43959)

### What problem does this PR solve?

Previously, when creating an MTMV on a Paimon table, Doris could not detect changes to the table's partition list or data, so the only option was to force a full refresh with `refresh materialized view mv1 complete`.

This PR obtains the partition list of the Paimon table, the last update time of each partition, and the latest snapshotId of the table. As a result, an MTMV can be partitioned by a Paimon table, detect data changes, and refresh its partitions automatically.

### Release note

MTMV supports Paimon partition refresh.

---
 .../paimon/PaimonExternalTable.java | 159 +++++++++++++++++-
 .../datasource/paimon/PaimonPartition.java | 61 +++++++
 .../paimon/PaimonPartitionInfo.java | 48 ++++++
 .../paimon/PaimonSchemaCacheValue.java | 23 ++-
 .../doris/datasource/paimon/PaimonUtil.java | 155 +++++++++++++++++
 .../org/apache/doris/mtmv/PaimonUtilTest.java | 71 ++++++++
 .../data/mtmv_p0/test_paimon_mtmv.out | 116 ++++++++++++-
 .../suites/mtmv_p0/test_paimon_mtmv.groovy | 135 ++++++++++-----
 8 files changed, 718 insertions(+), 50 deletions(-)
 create mode 100644 fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonPartition.java
 create mode 100644 fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonPartitionInfo.java
 create mode 100644 fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonUtil.java
 create mode 100644 fe/fe-core/src/test/java/org/apache/doris/mtmv/PaimonUtilTest.java

diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java index c9eaf1b7df32ef..5645c4e89e726c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java @@ -18,10 +18,22 @@ package org.apache.doris.datasource.paimon; import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.MTMV; +import org.apache.doris.catalog.PartitionItem; +import org.apache.doris.catalog.PartitionType; import org.apache.doris.catalog.ScalarType; import org.apache.doris.catalog.Type; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.DdlException; import org.apache.doris.datasource.ExternalTable; import org.apache.doris.datasource.SchemaCacheValue; +import org.apache.doris.mtmv.MTMVBaseTableIf; +import org.apache.doris.mtmv.MTMVRefreshContext; +import org.apache.doris.mtmv.MTMVRelatedTableIf; +import org.apache.doris.mtmv.MTMVSnapshotIf; +import org.apache.doris.mtmv.MTMVTimestampSnapshot; +import org.apache.doris.mtmv.MTMVVersionSnapshot; import org.apache.doris.statistics.AnalysisInfo; import org.apache.doris.statistics.BaseAnalysisTask; import org.apache.doris.statistics.ExternalAnalysisTask; @@ -30,25 +42,35 @@ import org.apache.doris.thrift.TTableType; import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; +import org.apache.commons.collections.CollectionUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.data.InternalRow; import org.apache.paimon.schema.TableSchema; import
org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.Table; import org.apache.paimon.table.source.Split; +import org.apache.paimon.table.system.PartitionsTable; +import org.apache.paimon.table.system.SnapshotsTable; import org.apache.paimon.types.ArrayType; import org.apache.paimon.types.DataField; import org.apache.paimon.types.DecimalType; import org.apache.paimon.types.MapType; import org.apache.paimon.types.RowType; +import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Optional; +import java.util.Set; import java.util.stream.Collectors; -public class PaimonExternalTable extends ExternalTable { +public class PaimonExternalTable extends ExternalTable implements MTMVRelatedTableIf, MTMVBaseTableIf { private static final Logger LOG = LogManager.getLogger(PaimonExternalTable.class); @@ -73,18 +95,95 @@ public Table getPaimonTable() { return schemaCacheValue.map(value -> ((PaimonSchemaCacheValue) value).getPaimonTable()).orElse(null); } + private PaimonPartitionInfo getPartitionInfoFromCache() { + makeSureInitialized(); + Optional schemaCacheValue = getSchemaCacheValue(); + if (!schemaCacheValue.isPresent()) { + return new PaimonPartitionInfo(); + } + return ((PaimonSchemaCacheValue) schemaCacheValue.get()).getPartitionInfo(); + } + + private List getPartitionColumnsFromCache() { + makeSureInitialized(); + Optional schemaCacheValue = getSchemaCacheValue(); + if (!schemaCacheValue.isPresent()) { + return Lists.newArrayList(); + } + return ((PaimonSchemaCacheValue) schemaCacheValue.get()).getPartitionColumns(); + } + + public long getLatestSnapshotIdFromCache() throws AnalysisException { + makeSureInitialized(); + Optional schemaCacheValue = getSchemaCacheValue(); + if (!schemaCacheValue.isPresent()) { + throw new AnalysisException("not present"); + } + return ((PaimonSchemaCacheValue) schemaCacheValue.get()).getSnapshootId(); + } + @Override public Optional initSchema() { Table paimonTable = ((PaimonExternalCatalog) catalog).getPaimonTable(dbName, name); TableSchema schema = ((FileStoreTable) paimonTable).schema(); List columns = schema.fields(); List tmpSchema = Lists.newArrayListWithCapacity(columns.size()); + Set partitionColumnNames = Sets.newHashSet(paimonTable.partitionKeys()); + List partitionColumns = Lists.newArrayList(); for (DataField field : columns) { - tmpSchema.add(new Column(field.name().toLowerCase(), + Column column = new Column(field.name().toLowerCase(), paimonTypeToDorisType(field.type()), true, null, true, field.description(), true, - field.id())); + field.id()); + tmpSchema.add(column); + if (partitionColumnNames.contains(field.name())) { + partitionColumns.add(column); + } + } + try { + // after 0.9.0 paimon will support table.getLatestSnapshotId() + long latestSnapshotId = loadLatestSnapshotId(); + PaimonPartitionInfo partitionInfo = loadPartitionInfo(partitionColumns); + return Optional.of(new PaimonSchemaCacheValue(tmpSchema, partitionColumns, paimonTable, latestSnapshotId, + partitionInfo)); + } catch (IOException | AnalysisException e) { + LOG.warn(e); + return Optional.empty(); + } + } + + private long loadLatestSnapshotId() throws IOException { + Table table = ((PaimonExternalCatalog) catalog).getPaimonTable(dbName, + name + Catalog.SYSTEM_TABLE_SPLITTER + SnapshotsTable.SNAPSHOTS); + // snapshotId + List rows = PaimonUtil.read(table, new int[][] {{0}}); + long latestSnapshotId = 0L; + for (InternalRow row : rows) { + long snapshotId = row.getLong(0); + 
if (snapshotId > latestSnapshotId) { + latestSnapshotId = snapshotId; + } + } + return latestSnapshotId; + } + + private PaimonPartitionInfo loadPartitionInfo(List partitionColumns) throws IOException, AnalysisException { + if (CollectionUtils.isEmpty(partitionColumns)) { + return new PaimonPartitionInfo(); + } + List paimonPartitions = loadPartitions(); + return PaimonUtil.generatePartitionInfo(partitionColumns, paimonPartitions); + } + + private List loadPartitions() + throws IOException { + Table table = ((PaimonExternalCatalog) catalog).getPaimonTable(dbName, + name + Catalog.SYSTEM_TABLE_SPLITTER + PartitionsTable.PARTITIONS); + List rows = PaimonUtil.read(table, null); + List res = Lists.newArrayListWithCapacity(rows.size()); + for (InternalRow row : rows) { + res.add(PaimonUtil.rowToPartition(row)); } - return Optional.of(new PaimonSchemaCacheValue(tmpSchema, paimonTable)); + return res; } private Type paimonPrimitiveTypeToDorisType(org.apache.paimon.types.DataType dataType) { @@ -205,4 +304,56 @@ public long fetchRowCount() { } return rowCount > 0 ? rowCount : UNKNOWN_ROW_COUNT; } + + @Override + public void beforeMTMVRefresh(MTMV mtmv) throws DdlException { + Env.getCurrentEnv().getRefreshManager() + .refreshTable(getCatalog().getName(), getDbName(), getName(), true); + } + + @Override + public Map getAndCopyPartitionItems() { + return Maps.newHashMap(getPartitionInfoFromCache().getNameToPartitionItem()); + } + + @Override + public PartitionType getPartitionType() { + return getPartitionColumnsFromCache().size() > 0 ? PartitionType.LIST : PartitionType.UNPARTITIONED; + } + + @Override + public Set getPartitionColumnNames() { + return getPartitionColumnsFromCache().stream() + .map(c -> c.getName().toLowerCase()).collect(Collectors.toSet()); + } + + @Override + public List getPartitionColumns() { + return getPartitionColumnsFromCache(); + } + + @Override + public MTMVSnapshotIf getPartitionSnapshot(String partitionName, MTMVRefreshContext context) + throws AnalysisException { + PaimonPartition paimonPartition = getPartitionInfoFromCache().getNameToPartition().get(partitionName); + if (paimonPartition == null) { + throw new AnalysisException("can not find partition: " + partitionName); + } + return new MTMVTimestampSnapshot(paimonPartition.getLastUpdateTime()); + } + + @Override + public MTMVSnapshotIf getTableSnapshot(MTMVRefreshContext context) throws AnalysisException { + return new MTMVVersionSnapshot(getLatestSnapshotIdFromCache()); + } + + @Override + public boolean isPartitionColumnAllowNull() { + // Paimon will write to the 'null' partition regardless of whether it is' null or 'null'. + // The logic is inconsistent with Doris' empty partition logic, so it needs to return false. + // However, when Spark creates Paimon tables, specifying 'not null' does not take effect. + // In order to successfully create the materialized view, false is returned here. + // The cost is that Paimon partition writes a null value, and the materialized view cannot detect this data. + return true; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonPartition.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonPartition.java new file mode 100644 index 00000000000000..545448199b3375 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonPartition.java @@ -0,0 +1,61 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource.paimon; + +// https://paimon.apache.org/docs/0.9/maintenance/system-tables/#partitions-table +public class PaimonPartition { + // Partition values, for example: [1, dd] + private final String partitionValues; + // The amount of data in the partition + private final long recordCount; + // Partition file size + private final long fileSizeInBytes; + // Number of partition files + private final long fileCount; + // Last update time of partition + private final long lastUpdateTime; + + public PaimonPartition(String partitionValues, long recordCount, long fileSizeInBytes, long fileCount, + long lastUpdateTime) { + this.partitionValues = partitionValues; + this.recordCount = recordCount; + this.fileSizeInBytes = fileSizeInBytes; + this.fileCount = fileCount; + this.lastUpdateTime = lastUpdateTime; + } + + public String getPartitionValues() { + return partitionValues; + } + + public long getRecordCount() { + return recordCount; + } + + public long getFileSizeInBytes() { + return fileSizeInBytes; + } + + public long getFileCount() { + return fileCount; + } + + public long getLastUpdateTime() { + return lastUpdateTime; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonPartitionInfo.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonPartitionInfo.java new file mode 100644 index 00000000000000..8f54f0834e481b --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonPartitionInfo.java @@ -0,0 +1,48 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.datasource.paimon; + +import org.apache.doris.catalog.PartitionItem; + +import com.google.common.collect.Maps; + +import java.util.Map; + +public class PaimonPartitionInfo { + private Map nameToPartitionItem; + private Map nameToPartition; + + public PaimonPartitionInfo() { + this.nameToPartitionItem = Maps.newHashMap(); + this.nameToPartition = Maps.newHashMap(); + } + + public PaimonPartitionInfo(Map nameToPartitionItem, + Map nameToPartition) { + this.nameToPartitionItem = nameToPartitionItem; + this.nameToPartition = nameToPartition; + } + + public Map getNameToPartitionItem() { + return nameToPartitionItem; + } + + public Map getNameToPartition() { + return nameToPartition; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSchemaCacheValue.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSchemaCacheValue.java index aaaefe7f32db2b..20d27b2425df24 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSchemaCacheValue.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSchemaCacheValue.java @@ -27,13 +27,34 @@ public class PaimonSchemaCacheValue extends SchemaCacheValue { private Table paimonTable; + private List partitionColumns; + private PaimonPartitionInfo partitionInfo; - public PaimonSchemaCacheValue(List schema, Table paimonTable) { + private long snapshootId; + + public PaimonSchemaCacheValue(List schema, List partitionColumns, Table paimonTable, + long snapshootId, + PaimonPartitionInfo partitionInfo) { super(schema); + this.partitionColumns = partitionColumns; this.paimonTable = paimonTable; + this.snapshootId = snapshootId; + this.partitionInfo = partitionInfo; } public Table getPaimonTable() { return paimonTable; } + + public List getPartitionColumns() { + return partitionColumns; + } + + public PaimonPartitionInfo getPartitionInfo() { + return partitionInfo; + } + + public long getSnapshootId() { + return snapshootId; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonUtil.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonUtil.java new file mode 100644 index 00000000000000..8b7017cac29486 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonUtil.java @@ -0,0 +1,155 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.datasource.paimon; + +import org.apache.doris.analysis.PartitionValue; +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.ListPartitionItem; +import org.apache.doris.catalog.PartitionItem; +import org.apache.doris.catalog.PartitionKey; +import org.apache.doris.catalog.Type; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.datasource.hive.HiveUtil; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import org.apache.commons.collections.CollectionUtils; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.data.serializer.InternalRowSerializer; +import org.apache.paimon.options.ConfigOption; +import org.apache.paimon.reader.RecordReader; +import org.apache.paimon.table.Table; +import org.apache.paimon.table.source.ReadBuilder; +import org.apache.paimon.utils.Pair; +import org.apache.paimon.utils.Projection; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import javax.annotation.Nullable; + +public class PaimonUtil { + public static List read( + Table table, @Nullable int[][] projection, Pair, String>... dynamicOptions) + throws IOException { + Map options = new HashMap<>(); + for (Pair, String> pair : dynamicOptions) { + options.put(pair.getKey().key(), pair.getValue()); + } + table = table.copy(options); + ReadBuilder readBuilder = table.newReadBuilder(); + if (projection != null) { + readBuilder.withProjection(projection); + } + RecordReader reader = + readBuilder.newRead().createReader(readBuilder.newScan().plan()); + InternalRowSerializer serializer = + new InternalRowSerializer( + projection == null + ? 
table.rowType() + : Projection.of(projection).project(table.rowType())); + List rows = new ArrayList<>(); + reader.forEachRemaining(row -> rows.add(serializer.copy(row))); + return rows; + } + + + /* + https://paimon.apache.org/docs/0.9/maintenance/system-tables/#partitions-table + +---------------+----------------+--------------------+--------------------+------------------------+ + | partition | record_count | file_size_in_bytes| file_count| last_update_time| + +---------------+----------------+--------------------+--------------------+------------------------+ + | [1] | 1 | 645 | 1 | 2024-06-24 10:25:57.400| + +---------------+----------------+--------------------+--------------------+------------------------+ + org.apache.paimon.table.system.PartitionsTable.TABLE_TYPE + public static final RowType TABLE_TYPE = + new RowType( + Arrays.asList( + new DataField(0, "partition", SerializationUtils.newStringType(true)), + new DataField(1, "record_count", new BigIntType(false)), + new DataField(2, "file_size_in_bytes", new BigIntType(false)), + new DataField(3, "file_count", new BigIntType(false)), + new DataField(4, "last_update_time", DataTypes.TIMESTAMP_MILLIS()))); + */ + public static PaimonPartition rowToPartition(InternalRow row) { + String partition = row.getString(0).toString(); + long recordCount = row.getLong(1); + long fileSizeInBytes = row.getLong(2); + long fileCount = row.getLong(3); + long lastUpdateTime = row.getTimestamp(4, 3).getMillisecond(); + return new PaimonPartition(partition, recordCount, fileSizeInBytes, fileCount, lastUpdateTime); + } + + public static PaimonPartitionInfo generatePartitionInfo(List partitionColumns, + List paimonPartitions) throws AnalysisException { + Map nameToPartitionItem = Maps.newHashMap(); + Map nameToPartition = Maps.newHashMap(); + PaimonPartitionInfo partitionInfo = new PaimonPartitionInfo(nameToPartitionItem, nameToPartition); + if (CollectionUtils.isEmpty(partitionColumns)) { + return partitionInfo; + } + for (PaimonPartition paimonPartition : paimonPartitions) { + String partitionName = getPartitionName(partitionColumns, paimonPartition.getPartitionValues()); + nameToPartition.put(partitionName, paimonPartition); + nameToPartitionItem.put(partitionName, toListPartitionItem(partitionName, partitionColumns)); + } + return partitionInfo; + } + + private static String getPartitionName(List partitionColumns, String partitionValueStr) { + Preconditions.checkNotNull(partitionValueStr); + String[] partitionValues = partitionValueStr.replace("[", "").replace("]", "") + .split(","); + Preconditions.checkState(partitionColumns.size() == partitionValues.length); + StringBuilder sb = new StringBuilder(); + for (int i = 0; i < partitionColumns.size(); ++i) { + if (i != 0) { + sb.append("/"); + } + sb.append(partitionColumns.get(i).getName()).append("=").append(partitionValues[i]); + } + return sb.toString(); + } + + public static ListPartitionItem toListPartitionItem(String partitionName, List partitionColumns) + throws AnalysisException { + List types = partitionColumns.stream() + .map(Column::getType) + .collect(Collectors.toList()); + // Partition name will be in format: nation=cn/city=beijing + // parse it to get values "cn" and "beijing" + List partitionValues = HiveUtil.toPartitionValues(partitionName); + Preconditions.checkState(partitionValues.size() == types.size(), partitionName + " vs. 
" + types); + List values = Lists.newArrayListWithExpectedSize(types.size()); + for (String partitionValue : partitionValues) { + // null will in partition 'null' + // "null" will in partition 'null' + // NULL will in partition 'null' + // "NULL" will in partition 'NULL' + // values.add(new PartitionValue(partitionValue, "null".equals(partitionValue))); + values.add(new PartitionValue(partitionValue, false)); + } + PartitionKey key = PartitionKey.createListPartitionKeyWithTypes(values, types, true); + ListPartitionItem listPartitionItem = new ListPartitionItem(Lists.newArrayList(key)); + return listPartitionItem; + } +} diff --git a/fe/fe-core/src/test/java/org/apache/doris/mtmv/PaimonUtilTest.java b/fe/fe-core/src/test/java/org/apache/doris/mtmv/PaimonUtilTest.java new file mode 100644 index 00000000000000..789af7bf8357ac --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/mtmv/PaimonUtilTest.java @@ -0,0 +1,71 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.mtmv; + +import org.apache.doris.analysis.LiteralExpr; +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.PartitionItem; +import org.apache.doris.catalog.PartitionKey; +import org.apache.doris.catalog.PrimitiveType; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.datasource.paimon.PaimonPartition; +import org.apache.doris.datasource.paimon.PaimonPartitionInfo; +import org.apache.doris.datasource.paimon.PaimonUtil; + +import com.google.common.collect.Lists; +import org.apache.paimon.data.BinaryString; +import org.apache.paimon.data.GenericRow; +import org.apache.paimon.data.Timestamp; +import org.junit.Assert; +import org.junit.Test; + +import java.util.List; + +public class PaimonUtilTest { + + @Test + public void testGeneratePartitionInfo() throws AnalysisException { + Column k1 = new Column("k1", PrimitiveType.INT); + Column k2 = new Column("k2", PrimitiveType.VARCHAR); + List partitionColumns = Lists.newArrayList(k1, k2); + PaimonPartition p1 = new PaimonPartition("[1,aa]", 2, 3, 4, 5); + List paimonPartitions = Lists.newArrayList(p1); + PaimonPartitionInfo partitionInfo = PaimonUtil.generatePartitionInfo(partitionColumns, paimonPartitions); + String expectPartitionName = "k1=1/k2=aa"; + Assert.assertTrue(partitionInfo.getNameToPartitionItem().containsKey(expectPartitionName)); + PartitionItem partitionItem = partitionInfo.getNameToPartitionItem().get(expectPartitionName); + List keys = partitionItem.getItems(); + Assert.assertEquals(1, keys.size()); + PartitionKey partitionKey = keys.get(0); + List exprs = partitionKey.getKeys(); + Assert.assertEquals(2, exprs.size()); + Assert.assertEquals(1, exprs.get(0).getLongValue()); + Assert.assertEquals("aa", exprs.get(1).getStringValue()); + } + + 
@Test + public void testRowToPartition() { + GenericRow row = GenericRow.of(BinaryString.fromString("[1,b]"), 2L, 3L, 4L, Timestamp.fromEpochMillis(5L)); + PaimonPartition paimonPartition = PaimonUtil.rowToPartition(row); + Assert.assertEquals("[1,b]", paimonPartition.getPartitionValues()); + Assert.assertEquals(2L, paimonPartition.getRecordCount()); + Assert.assertEquals(3L, paimonPartition.getFileSizeInBytes()); + Assert.assertEquals(4L, paimonPartition.getFileCount()); + Assert.assertEquals(5L, paimonPartition.getLastUpdateTime()); + } +} diff --git a/regression-test/data/mtmv_p0/test_paimon_mtmv.out b/regression-test/data/mtmv_p0/test_paimon_mtmv.out index c654cb01214f57..c28b7cb7baca22 100644 --- a/regression-test/data/mtmv_p0/test_paimon_mtmv.out +++ b/regression-test/data/mtmv_p0/test_paimon_mtmv.out @@ -1,9 +1,113 @@ -- This file is automatically generated. You should know what you did if you want to edit this --- !catalog -- -1 2 3 4 5 6 7 8 9.1 10.1 11.10 2020-02-02 13str 14varchar a true aaaa 2023-08-13T09:32:38.530 -10 20 30 40 50 60 70 80 90.1 100.1 110.10 2020-03-02 130str 140varchar b false bbbb 2023-08-14T08:32:52.821 +-- !base_table -- +1 2 a +1 2 b +10 1 a +10 1 b +2 2 a +2 2 b +3 2 a +3 2 b +4 2 a +4 2 b +5 2 a +5 2 b +6 1 a +6 1 b +7 1 a +7 1 b +8 1 a +8 1 b +9 1 a +9 1 b --- !mtmv -- -1 2 3 4 5 6 7 8 9.1 10.1 11.10 2020-02-02 13str 14varchar a true aaaa 2023-08-13T09:32:38.530 -10 20 30 40 50 60 70 80 90.1 100.1 110.10 2020-03-02 130str 140varchar b false bbbb 2023-08-14T08:32:52.821 +-- !refresh_one_partition -- +1 2 a +10 1 a +2 2 a +3 2 a +4 2 a +5 2 a +6 1 a +7 1 a +8 1 a +9 1 a + +-- !refresh_auto -- +1 2 a +1 2 b +10 1 a +10 1 b +2 2 a +2 2 b +3 2 a +3 2 b +4 2 a +4 2 b +5 2 a +5 2 b +6 1 a +6 1 b +7 1 a +7 1 b +8 1 a +8 1 b +9 1 a +9 1 b + +-- !is_sync_before_rebuild -- +true + +-- !is_sync_after_rebuild -- +true + +-- !refresh_complete_rebuild -- +1 2 a +1 2 b +10 1 a +10 1 b +2 2 a +2 2 b +3 2 a +3 2 b +4 2 a +4 2 b +5 2 a +5 2 b +6 1 a +6 1 b +7 1 a +7 1 b +8 1 a +8 1 b +9 1 a +9 1 b + +-- !not_partition_before -- +false + +-- !not_partition -- +1 2 a +1 2 b +10 1 a +10 1 b +2 2 a +2 2 b +3 2 a +3 2 b +4 2 a +4 2 b +5 2 a +5 2 b +6 1 a +6 1 b +7 1 a +7 1 b +8 1 a +8 1 b +9 1 a +9 1 b + +-- !not_partition_after -- +true diff --git a/regression-test/suites/mtmv_p0/test_paimon_mtmv.groovy b/regression-test/suites/mtmv_p0/test_paimon_mtmv.groovy index e84eb497b2c7b1..f2989edbf6dfd6 100644 --- a/regression-test/suites/mtmv_p0/test_paimon_mtmv.groovy +++ b/regression-test/suites/mtmv_p0/test_paimon_mtmv.groovy @@ -15,48 +15,105 @@ // specific language governing permissions and limitations // under the License. 
-suite("test_paimon_mtmv", "p0,external,paimon,external_docker,external_docker_hive") { +suite("test_paimon_mtmv", "p0,external,mtmv,external_docker,external_docker_doris") { String enabled = context.config.otherConfigs.get("enablePaimonTest") - logger.info("enabled: " + enabled) + if (enabled == null || !enabled.equalsIgnoreCase("true")) { + logger.info("disabled paimon test") + return + } + String suiteName = "test_paimon_mtmv" + String catalogName = "${suiteName}_catalog" + String mvName = "${suiteName}_mv" + String dbName = context.config.getDbNameByFile(context.file) + + String minio_port = context.config.otherConfigs.get("iceberg_minio_port") String externalEnvIp = context.config.otherConfigs.get("externalEnvIp") - logger.info("externalEnvIp: " + externalEnvIp) - String hdfs_port = context.config.otherConfigs.get("hive2HdfsPort") - logger.info("hdfs_port: " + hdfs_port) - if (enabled != null && enabled.equalsIgnoreCase("true")) { - String catalog_name = "paimon_mtmv_catalog"; - String mvName = "test_paimon_mtmv" - String dbName = "regression_test_mtmv_p0" - String paimonDb = "db1" - String paimonTable = "all_table" - sql """drop catalog if exists ${catalog_name} """ - - sql """create catalog if not exists ${catalog_name} properties ( - "type" = "paimon", - "paimon.catalog.type"="filesystem", - "warehouse" = "hdfs://${externalEnvIp}:${hdfs_port}/user/doris/paimon1" + + sql """drop catalog if exists ${catalogName}""" + sql """CREATE CATALOG ${catalogName} PROPERTIES ( + 'type'='paimon', + 'warehouse' = 's3://warehouse/wh/', + "s3.access_key" = "admin", + "s3.secret_key" = "password", + "s3.endpoint" = "http://${externalEnvIp}:${minio_port}", + "s3.region" = "us-east-1" );""" - order_qt_catalog """select * from ${catalog_name}.${paimonDb}.${paimonTable}""" - sql """drop materialized view if exists ${mvName};""" - - sql """ - CREATE MATERIALIZED VIEW ${mvName} - BUILD DEFERRED REFRESH AUTO ON MANUAL - DISTRIBUTED BY RANDOM BUCKETS 2 - PROPERTIES ('replication_num' = '1') - AS - SELECT * FROM ${catalog_name}.${paimonDb}.${paimonTable}; - """ - - sql """ - REFRESH MATERIALIZED VIEW ${mvName} complete - """ - def jobName = getJobName(dbName, mvName); - waitingMTMVTaskFinished(jobName) - order_qt_mtmv "SELECT * FROM ${mvName}" - - sql """drop materialized view if exists ${mvName};""" - sql """ drop catalog if exists ${catalog_name} """ - } + order_qt_base_table """ select * from ${catalogName}.test_paimon_spark.test_tb_mix_format ; """ + + sql """drop materialized view if exists ${mvName};""" + + sql """ + CREATE MATERIALIZED VIEW ${mvName} + BUILD DEFERRED REFRESH AUTO ON MANUAL + partition by(`par`) + DISTRIBUTED BY RANDOM BUCKETS 2 + PROPERTIES ('replication_num' = '1') + AS + SELECT * FROM ${catalogName}.`test_paimon_spark`.test_tb_mix_format; + """ + def showPartitionsResult = sql """show partitions from ${mvName}""" + logger.info("showPartitionsResult: " + showPartitionsResult.toString()) + assertTrue(showPartitionsResult.toString().contains("p_a")) + assertTrue(showPartitionsResult.toString().contains("p_b")) + + // refresh one partitions + sql """ + REFRESH MATERIALIZED VIEW ${mvName} partitions(p_a); + """ + waitingMTMVTaskFinishedByMvName(mvName) + order_qt_refresh_one_partition "SELECT * FROM ${mvName} " + + //refresh auto + sql """ + REFRESH MATERIALIZED VIEW ${mvName} auto + """ + waitingMTMVTaskFinishedByMvName(mvName) + order_qt_refresh_auto "SELECT * FROM ${mvName} " + order_qt_is_sync_before_rebuild "select SyncWithBaseTables from mv_infos('database'='${dbName}') where 
Name='${mvName}'" + + // rebuild catalog, should not Affects MTMV + sql """drop catalog if exists ${catalogName}""" + sql """ + CREATE CATALOG ${catalogName} PROPERTIES ( + 'type'='paimon', + 'warehouse' = 's3://warehouse/wh/', + "s3.access_key" = "admin", + "s3.secret_key" = "password", + "s3.endpoint" = "http://${externalEnvIp}:${minio_port}", + "s3.region" = "us-east-1" + ); + """ + order_qt_is_sync_after_rebuild "select SyncWithBaseTables from mv_infos('database'='${dbName}') where Name='${mvName}'" + + // should refresh normal after catalog rebuild + sql """ + REFRESH MATERIALIZED VIEW ${mvName} complete + """ + waitingMTMVTaskFinishedByMvName(mvName) + order_qt_refresh_complete_rebuild "SELECT * FROM ${mvName} " + + sql """drop materialized view if exists ${mvName};""" + + // not have partition + sql """ + CREATE MATERIALIZED VIEW ${mvName} + BUILD DEFERRED REFRESH AUTO ON MANUAL + DISTRIBUTED BY RANDOM BUCKETS 2 + PROPERTIES ('replication_num' = '1') + AS + SELECT * FROM ${catalogName}.`test_paimon_spark`.test_tb_mix_format; + """ + order_qt_not_partition_before "select SyncWithBaseTables from mv_infos('database'='${dbName}') where Name='${mvName}'" + //should can refresh auto + sql """ + REFRESH MATERIALIZED VIEW ${mvName} auto + """ + waitingMTMVTaskFinishedByMvName(mvName) + order_qt_not_partition "SELECT * FROM ${mvName} " + order_qt_not_partition_after "select SyncWithBaseTables from mv_infos('database'='${dbName}') where Name='${mvName}'" + sql """drop materialized view if exists ${mvName};""" + sql """drop catalog if exists ${catalogName}""" + }

From 336645ff2ad85eb14532fcf16f30af7d55f5b1c8 Mon Sep 17 00:00:00 2001
From: zhangdong
Date: Wed, 11 Dec 2024 10:53:08 +0800
Subject: [PATCH 2/4] [feat](mtmv)Paimon queries the data in the cache instead of querying the latest data (#44911)

Problem Summary:
- add `PaimonMetadataCacheMgr` in `ExternalMetaCacheMgr` to manage the snapshot cache of Paimon tables
- move paimonSchemaCache into PaimonMetadataCacheMgr, and add schemaId as part of the cache key
- PaimonExternalTable overrides the methods in ExternalTable and supports partition pruning
- PaimonExternalTable implements the MvccTable interface, so queries retrieve snapshot data from the cache; this avoids a cache refresh causing different versions of metadata to be used within a single query
- MTMVTask retrieves the snapshot data of the mvccTable before the task starts, so a cache refresh cannot cause different versions of metadata to be used within a single refresh task

Paimon tables are now queried against the cached snapshot instead of always querying the latest data.

Behavior changes when querying a Paimon table:
- Right after FE starts, the latest data is queried
- After Paimon data changes, Doris still queries the previous data
- After the snapshot cache expires, Doris will query the latest data
- `desc` on a Paimon table displays the schema corresponding to the snapshotId in the snapshot cache

---
 .../doris/datasource/ExternalCatalog.java | 9 +-
 .../datasource/ExternalMetaCacheMgr.java | 12 +
 .../doris/datasource/ExternalSchemaCache.java | 6 +-
 .../doris/datasource/ExternalTable.java | 7 +-
 .../datasource/hive/HMSExternalTable.java | 5 +
 .../doris/datasource/mvcc/MvccUtil.java | 44 +++
 .../paimon/PaimonExternalTable.java | 313 +++++-------------
 .../paimon/PaimonMetadataCache.java | 144 ++++++++
 .../paimon/PaimonMetadataCacheMgr.java | 49 +++
 .../datasource/paimon/PaimonMvccSnapshot.java | 32 ++
 .../paimon/PaimonPartitionInfo.java | 4 +-
 .../doris/datasource/paimon/PaimonSchema.java | 46 +++
.../paimon/PaimonSchemaCacheKey.java | 55 +++ .../paimon/PaimonSchemaCacheValue.java | 25 +- .../datasource/paimon/PaimonSnapshot.java | 36 ++ .../paimon/PaimonSnapshotCacheKey.java | 75 +++++ .../paimon/PaimonSnapshotCacheValue.java | 37 +++ .../doris/datasource/paimon/PaimonUtil.java | 122 ++++++- .../paimon/source/PaimonSource.java | 3 +- .../doris/job/extensions/mtmv/MTMVTask.java | 14 + .../doris/nereids/StatementContext.java | 26 +- .../rules/rewrite/PruneFileScanPartition.java | 9 +- .../commands/UpdateMvByPartitionCommand.java | 7 + .../trees/plans/logical/LogicalFileScan.java | 7 +- .../data/mtmv_p0/test_paimon_rewrite_mtmv.out | 16 + .../mtmv_p0/test_paimon_rewrite_mtmv.groovy | 95 ++++++ 26 files changed, 928 insertions(+), 270 deletions(-) create mode 100644 fe/fe-core/src/main/java/org/apache/doris/datasource/mvcc/MvccUtil.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonMetadataCache.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonMetadataCacheMgr.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonMvccSnapshot.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSchema.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSchemaCacheKey.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSnapshot.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSnapshotCacheKey.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSnapshotCacheValue.java create mode 100644 regression-test/data/mtmv_p0/test_paimon_rewrite_mtmv.out create mode 100644 regression-test/suites/mtmv_p0/test_paimon_rewrite_mtmv.groovy diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java index cde08113373aee..d7cbee18c74c7a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java @@ -41,6 +41,7 @@ import org.apache.doris.common.io.Writable; import org.apache.doris.common.security.authentication.PreExecutionAuthenticator; import org.apache.doris.common.util.Util; +import org.apache.doris.datasource.ExternalSchemaCache.SchemaCacheKey; import org.apache.doris.datasource.es.EsExternalDatabase; import org.apache.doris.datasource.hive.HMSExternalCatalog; import org.apache.doris.datasource.hive.HMSExternalDatabase; @@ -432,13 +433,13 @@ private void refreshOnlyCatalogCache(boolean invalidCache) { } } - public final Optional getSchema(String dbName, String tblName) { + public final Optional getSchema(SchemaCacheKey key) { makeSureInitialized(); - Optional> db = getDb(dbName); + Optional> db = getDb(key.getDbName()); if (db.isPresent()) { - Optional table = db.get().getTable(tblName); + Optional table = db.get().getTable(key.getTblName()); if (table.isPresent()) { - return table.get().initSchemaAndUpdateTime(); + return table.get().initSchemaAndUpdateTime(key); } } return Optional.empty(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java index cc40ad292ce182..24f55e74266863 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java +++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java @@ -31,6 +31,8 @@ import org.apache.doris.datasource.maxcompute.MaxComputeMetadataCache; import org.apache.doris.datasource.maxcompute.MaxComputeMetadataCacheMgr; import org.apache.doris.datasource.metacache.MetaCache; +import org.apache.doris.datasource.paimon.PaimonMetadataCache; +import org.apache.doris.datasource.paimon.PaimonMetadataCacheMgr; import org.apache.doris.fs.FileSystemCache; import org.apache.doris.nereids.exceptions.NotSupportedException; @@ -92,6 +94,7 @@ public class ExternalMetaCacheMgr { private ExternalRowCountCache rowCountCache; private final IcebergMetadataCacheMgr icebergMetadataCacheMgr; private final MaxComputeMetadataCacheMgr maxComputeMetadataCacheMgr; + private final PaimonMetadataCacheMgr paimonMetadataCacheMgr; public ExternalMetaCacheMgr() { rowCountRefreshExecutor = ThreadPoolManager.newDaemonFixedThreadPool( @@ -122,6 +125,7 @@ public ExternalMetaCacheMgr() { hudiPartitionMgr = new HudiPartitionMgr(commonRefreshExecutor); icebergMetadataCacheMgr = new IcebergMetadataCacheMgr(commonRefreshExecutor); maxComputeMetadataCacheMgr = new MaxComputeMetadataCacheMgr(); + paimonMetadataCacheMgr = new PaimonMetadataCacheMgr(commonRefreshExecutor); } public ExecutorService getFileListingExecutor() { @@ -167,6 +171,10 @@ public IcebergMetadataCache getIcebergMetadataCache() { return icebergMetadataCacheMgr.getIcebergMetadataCache(); } + public PaimonMetadataCache getPaimonMetadataCache() { + return paimonMetadataCacheMgr.getPaimonMetadataCache(); + } + public MaxComputeMetadataCache getMaxComputeMetadataCache(long catalogId) { return maxComputeMetadataCacheMgr.getMaxComputeMetadataCache(catalogId); } @@ -189,6 +197,7 @@ public void removeCache(long catalogId) { hudiPartitionMgr.removePartitionProcessor(catalogId); icebergMetadataCacheMgr.removeCache(catalogId); maxComputeMetadataCacheMgr.removeCache(catalogId); + paimonMetadataCacheMgr.removeCache(catalogId); } public void invalidateTableCache(long catalogId, String dbName, String tblName) { @@ -204,6 +213,7 @@ public void invalidateTableCache(long catalogId, String dbName, String tblName) hudiPartitionMgr.cleanTablePartitions(catalogId, dbName, tblName); icebergMetadataCacheMgr.invalidateTableCache(catalogId, dbName, tblName); maxComputeMetadataCacheMgr.invalidateTableCache(catalogId, dbName, tblName); + paimonMetadataCacheMgr.invalidateTableCache(catalogId, dbName, tblName); if (LOG.isDebugEnabled()) { LOG.debug("invalid table cache for {}.{} in catalog {}", dbName, tblName, catalogId); } @@ -222,6 +232,7 @@ public void invalidateDbCache(long catalogId, String dbName) { hudiPartitionMgr.cleanDatabasePartitions(catalogId, dbName); icebergMetadataCacheMgr.invalidateDbCache(catalogId, dbName); maxComputeMetadataCacheMgr.invalidateDbCache(catalogId, dbName); + paimonMetadataCacheMgr.invalidateDbCache(catalogId, dbName); if (LOG.isDebugEnabled()) { LOG.debug("invalid db cache for {} in catalog {}", dbName, catalogId); } @@ -239,6 +250,7 @@ public void invalidateCatalogCache(long catalogId) { hudiPartitionMgr.cleanPartitionProcess(catalogId); icebergMetadataCacheMgr.invalidateCatalogCache(catalogId); maxComputeMetadataCacheMgr.invalidateCatalogCache(catalogId); + paimonMetadataCacheMgr.invalidateCatalogCache(catalogId); if (LOG.isDebugEnabled()) { LOG.debug("invalid catalog cache for {}", catalogId); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalSchemaCache.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalSchemaCache.java index a0558766e81400..de3eeff75d97fa 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalSchemaCache.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalSchemaCache.java @@ -74,7 +74,7 @@ public Long getValue() { } private Optional loadSchema(SchemaCacheKey key) { - Optional schema = catalog.getSchema(key.dbName, key.tblName); + Optional schema = catalog.getSchema(key); if (LOG.isDebugEnabled()) { LOG.debug("load schema for {} in catalog {}", key, catalog.getName()); } @@ -83,6 +83,10 @@ private Optional loadSchema(SchemaCacheKey key) { public Optional getSchemaValue(String dbName, String tblName) { SchemaCacheKey key = new SchemaCacheKey(dbName, tblName); + return getSchemaValue(key); + } + + public Optional getSchemaValue(SchemaCacheKey key) { return schemaCache.get(key); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalTable.java index bd1e36e7bc968b..e5b618b4d31708 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalTable.java @@ -31,6 +31,7 @@ import org.apache.doris.common.io.Writable; import org.apache.doris.common.util.PropertyAnalyzer; import org.apache.doris.common.util.Util; +import org.apache.doris.datasource.ExternalSchemaCache.SchemaCacheKey; import org.apache.doris.datasource.mvcc.MvccSnapshot; import org.apache.doris.nereids.trees.plans.logical.LogicalFileScan.SelectedPartitions; import org.apache.doris.persist.gson.GsonPostProcessable; @@ -317,8 +318,12 @@ public Optional getColumnStatistic(String colName) { * * @return */ - public Optional initSchemaAndUpdateTime() { + public Optional initSchemaAndUpdateTime(SchemaCacheKey key) { schemaUpdateTime = System.currentTimeMillis(); + return initSchema(key); + } + + public Optional initSchema(SchemaCacheKey key) { return initSchema(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java index ad685386ec9e89..7dac6435fcbc55 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java @@ -30,6 +30,7 @@ import org.apache.doris.common.AnalysisException; import org.apache.doris.common.Config; import org.apache.doris.common.DdlException; +import org.apache.doris.datasource.ExternalSchemaCache.SchemaCacheKey; import org.apache.doris.datasource.ExternalTable; import org.apache.doris.datasource.SchemaCacheValue; import org.apache.doris.datasource.TablePartitionValues; @@ -501,6 +502,10 @@ public Set getPartitionNames() { } @Override + public Optional initSchemaAndUpdateTime(SchemaCacheKey key) { + return initSchemaAndUpdateTime(); + } + public Optional initSchemaAndUpdateTime() { org.apache.hadoop.hive.metastore.api.Table table = ((HMSExternalCatalog) catalog).getClient() .getTable(dbName, name); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/mvcc/MvccUtil.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/mvcc/MvccUtil.java new file mode 100644 index 00000000000000..ffdaff770e21a3 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/mvcc/MvccUtil.java @@ -0,0 +1,44 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// 
or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource.mvcc; + +import org.apache.doris.catalog.TableIf; +import org.apache.doris.nereids.StatementContext; +import org.apache.doris.qe.ConnectContext; + +import java.util.Optional; + +public class MvccUtil { + /** + * get Snapshot From StatementContext + * + * @param tableIf + * @return MvccSnapshot + */ + public static Optional getSnapshotFromContext(TableIf tableIf) { + ConnectContext connectContext = ConnectContext.get(); + if (connectContext == null) { + return Optional.empty(); + } + StatementContext statementContext = connectContext.getStatementContext(); + if (statementContext == null) { + return Optional.empty(); + } + return statementContext.getSnapshot(tableIf); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java index 5645c4e89e726c..7b59d879d9301c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java @@ -19,21 +19,15 @@ import org.apache.doris.catalog.Column; import org.apache.doris.catalog.Env; -import org.apache.doris.catalog.MTMV; import org.apache.doris.catalog.PartitionItem; -import org.apache.doris.catalog.PartitionType; -import org.apache.doris.catalog.ScalarType; -import org.apache.doris.catalog.Type; -import org.apache.doris.common.AnalysisException; -import org.apache.doris.common.DdlException; +import org.apache.doris.datasource.CacheException; +import org.apache.doris.datasource.ExternalSchemaCache; +import org.apache.doris.datasource.ExternalSchemaCache.SchemaCacheKey; import org.apache.doris.datasource.ExternalTable; import org.apache.doris.datasource.SchemaCacheValue; -import org.apache.doris.mtmv.MTMVBaseTableIf; -import org.apache.doris.mtmv.MTMVRefreshContext; -import org.apache.doris.mtmv.MTMVRelatedTableIf; -import org.apache.doris.mtmv.MTMVSnapshotIf; -import org.apache.doris.mtmv.MTMVTimestampSnapshot; -import org.apache.doris.mtmv.MTMVVersionSnapshot; +import org.apache.doris.datasource.mvcc.MvccSnapshot; +import org.apache.doris.datasource.mvcc.MvccTable; +import org.apache.doris.datasource.mvcc.MvccUtil; import org.apache.doris.statistics.AnalysisInfo; import org.apache.doris.statistics.BaseAnalysisTask; import org.apache.doris.statistics.ExternalAnalysisTask; @@ -42,40 +36,36 @@ import org.apache.doris.thrift.TTableType; import com.google.common.collect.Lists; -import com.google.common.collect.Maps; import com.google.common.collect.Sets; -import org.apache.commons.collections.CollectionUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.apache.paimon.CoreOptions; import 
org.apache.paimon.catalog.Catalog; import org.apache.paimon.data.InternalRow; -import org.apache.paimon.schema.TableSchema; -import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.predicate.Predicate; +import org.apache.paimon.predicate.PredicateBuilder; import org.apache.paimon.table.Table; import org.apache.paimon.table.source.Split; -import org.apache.paimon.table.system.PartitionsTable; -import org.apache.paimon.table.system.SnapshotsTable; -import org.apache.paimon.types.ArrayType; +import org.apache.paimon.table.system.SchemasTable; import org.apache.paimon.types.DataField; -import org.apache.paimon.types.DecimalType; -import org.apache.paimon.types.MapType; -import org.apache.paimon.types.RowType; import java.io.IOException; -import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; -import java.util.stream.Collectors; -public class PaimonExternalTable extends ExternalTable implements MTMVRelatedTableIf, MTMVBaseTableIf { +public class PaimonExternalTable extends ExternalTable implements MvccTable { private static final Logger LOG = LogManager.getLogger(PaimonExternalTable.class); + private final Table paimonTable; + public PaimonExternalTable(long id, String name, String dbName, PaimonExternalCatalog catalog) { super(id, name, catalog, dbName, TableType.PAIMON_EXTERNAL_TABLE); + this.paimonTable = catalog.getPaimonTable(dbName, name); } public String getPaimonCatalogType() { @@ -89,176 +79,27 @@ protected synchronized void makeSureInitialized() { } } - public Table getPaimonTable() { - makeSureInitialized(); - Optional schemaCacheValue = getSchemaCacheValue(); - return schemaCacheValue.map(value -> ((PaimonSchemaCacheValue) value).getPaimonTable()).orElse(null); - } - - private PaimonPartitionInfo getPartitionInfoFromCache() { - makeSureInitialized(); - Optional schemaCacheValue = getSchemaCacheValue(); - if (!schemaCacheValue.isPresent()) { - return new PaimonPartitionInfo(); - } - return ((PaimonSchemaCacheValue) schemaCacheValue.get()).getPartitionInfo(); + public Table getPaimonTable(Optional snapshot) { + return paimonTable.copy( + Collections.singletonMap(CoreOptions.SCAN_VERSION.key(), + String.valueOf(getOrFetchSnapshotCacheValue(snapshot).getSnapshot().getSnapshotId()))); } - private List getPartitionColumnsFromCache() { - makeSureInitialized(); - Optional schemaCacheValue = getSchemaCacheValue(); + public PaimonSchemaCacheValue getPaimonSchemaCacheValue(long schemaId) { + ExternalSchemaCache cache = Env.getCurrentEnv().getExtMetaCacheMgr().getSchemaCache(catalog); + Optional schemaCacheValue = cache.getSchemaValue( + new PaimonSchemaCacheKey(dbName, name, schemaId)); if (!schemaCacheValue.isPresent()) { - return Lists.newArrayList(); + throw new CacheException("failed to getSchema for: %s.%s.%s.%s", + null, catalog.getName(), dbName, name, schemaId); } - return ((PaimonSchemaCacheValue) schemaCacheValue.get()).getPartitionColumns(); + return (PaimonSchemaCacheValue) schemaCacheValue.get(); } - public long getLatestSnapshotIdFromCache() throws AnalysisException { + private PaimonSnapshotCacheValue getPaimonSnapshotCacheValue() { makeSureInitialized(); - Optional schemaCacheValue = getSchemaCacheValue(); - if (!schemaCacheValue.isPresent()) { - throw new AnalysisException("not present"); - } - return ((PaimonSchemaCacheValue) schemaCacheValue.get()).getSnapshootId(); - } - - @Override - public Optional initSchema() { - Table paimonTable 
= ((PaimonExternalCatalog) catalog).getPaimonTable(dbName, name); - TableSchema schema = ((FileStoreTable) paimonTable).schema(); - List columns = schema.fields(); - List tmpSchema = Lists.newArrayListWithCapacity(columns.size()); - Set partitionColumnNames = Sets.newHashSet(paimonTable.partitionKeys()); - List partitionColumns = Lists.newArrayList(); - for (DataField field : columns) { - Column column = new Column(field.name().toLowerCase(), - paimonTypeToDorisType(field.type()), true, null, true, field.description(), true, - field.id()); - tmpSchema.add(column); - if (partitionColumnNames.contains(field.name())) { - partitionColumns.add(column); - } - } - try { - // after 0.9.0 paimon will support table.getLatestSnapshotId() - long latestSnapshotId = loadLatestSnapshotId(); - PaimonPartitionInfo partitionInfo = loadPartitionInfo(partitionColumns); - return Optional.of(new PaimonSchemaCacheValue(tmpSchema, partitionColumns, paimonTable, latestSnapshotId, - partitionInfo)); - } catch (IOException | AnalysisException e) { - LOG.warn(e); - return Optional.empty(); - } - } - - private long loadLatestSnapshotId() throws IOException { - Table table = ((PaimonExternalCatalog) catalog).getPaimonTable(dbName, - name + Catalog.SYSTEM_TABLE_SPLITTER + SnapshotsTable.SNAPSHOTS); - // snapshotId - List rows = PaimonUtil.read(table, new int[][] {{0}}); - long latestSnapshotId = 0L; - for (InternalRow row : rows) { - long snapshotId = row.getLong(0); - if (snapshotId > latestSnapshotId) { - latestSnapshotId = snapshotId; - } - } - return latestSnapshotId; - } - - private PaimonPartitionInfo loadPartitionInfo(List partitionColumns) throws IOException, AnalysisException { - if (CollectionUtils.isEmpty(partitionColumns)) { - return new PaimonPartitionInfo(); - } - List paimonPartitions = loadPartitions(); - return PaimonUtil.generatePartitionInfo(partitionColumns, paimonPartitions); - } - - private List loadPartitions() - throws IOException { - Table table = ((PaimonExternalCatalog) catalog).getPaimonTable(dbName, - name + Catalog.SYSTEM_TABLE_SPLITTER + PartitionsTable.PARTITIONS); - List rows = PaimonUtil.read(table, null); - List res = Lists.newArrayListWithCapacity(rows.size()); - for (InternalRow row : rows) { - res.add(PaimonUtil.rowToPartition(row)); - } - return res; - } - - private Type paimonPrimitiveTypeToDorisType(org.apache.paimon.types.DataType dataType) { - int tsScale = 3; // default - switch (dataType.getTypeRoot()) { - case BOOLEAN: - return Type.BOOLEAN; - case INTEGER: - return Type.INT; - case BIGINT: - return Type.BIGINT; - case FLOAT: - return Type.FLOAT; - case DOUBLE: - return Type.DOUBLE; - case SMALLINT: - return Type.SMALLINT; - case TINYINT: - return Type.TINYINT; - case VARCHAR: - case BINARY: - case CHAR: - case VARBINARY: - return Type.STRING; - case DECIMAL: - DecimalType decimal = (DecimalType) dataType; - return ScalarType.createDecimalV3Type(decimal.getPrecision(), decimal.getScale()); - case DATE: - return ScalarType.createDateV2Type(); - case TIMESTAMP_WITHOUT_TIME_ZONE: - if (dataType instanceof org.apache.paimon.types.TimestampType) { - tsScale = ((org.apache.paimon.types.TimestampType) dataType).getPrecision(); - if (tsScale > 6) { - tsScale = 6; - } - } else if (dataType instanceof org.apache.paimon.types.LocalZonedTimestampType) { - tsScale = ((org.apache.paimon.types.LocalZonedTimestampType) dataType).getPrecision(); - if (tsScale > 6) { - tsScale = 6; - } - } - return ScalarType.createDatetimeV2Type(tsScale); - case TIMESTAMP_WITH_LOCAL_TIME_ZONE: - if (dataType 
instanceof org.apache.paimon.types.LocalZonedTimestampType) { - tsScale = ((org.apache.paimon.types.LocalZonedTimestampType) dataType).getPrecision(); - if (tsScale > 6) { - tsScale = 6; - } - } - return ScalarType.createDatetimeV2Type(tsScale); - case ARRAY: - ArrayType arrayType = (ArrayType) dataType; - Type innerType = paimonPrimitiveTypeToDorisType(arrayType.getElementType()); - return org.apache.doris.catalog.ArrayType.create(innerType, true); - case MAP: - MapType mapType = (MapType) dataType; - return new org.apache.doris.catalog.MapType( - paimonTypeToDorisType(mapType.getKeyType()), paimonTypeToDorisType(mapType.getValueType())); - case ROW: - RowType rowType = (RowType) dataType; - List fields = rowType.getFields(); - return new org.apache.doris.catalog.StructType(fields.stream() - .map(field -> new org.apache.doris.catalog.StructField(field.name(), - paimonTypeToDorisType(field.type()))) - .collect(Collectors.toCollection(ArrayList::new))); - case TIME_WITHOUT_TIME_ZONE: - return Type.UNSUPPORTED; - default: - LOG.warn("Cannot transform unknown type: " + dataType.getTypeRoot()); - return Type.UNSUPPORTED; - } - } - - protected Type paimonTypeToDorisType(org.apache.paimon.types.DataType type) { - return paimonPrimitiveTypeToDorisType(type); + return Env.getCurrentEnv().getExtMetaCacheMgr().getPaimonMetadataCache() + .getPaimonSnapshot(catalog, dbName, name); } @Override @@ -288,13 +129,6 @@ public BaseAnalysisTask createAnalysisTask(AnalysisInfo info) { public long fetchRowCount() { makeSureInitialized(); long rowCount = 0; - Optional schemaCacheValue = getSchemaCacheValue(); - Table paimonTable = schemaCacheValue.map(value -> ((PaimonSchemaCacheValue) value).getPaimonTable()) - .orElse(null); - if (paimonTable == null) { - LOG.info("Paimon table {} is null.", name); - return UNKNOWN_ROW_COUNT; - } List splits = paimonTable.newReadBuilder().newScan().plan().splits(); for (Split split : splits) { rowCount += split.rowCount(); @@ -306,54 +140,85 @@ public long fetchRowCount() { } @Override - public void beforeMTMVRefresh(MTMV mtmv) throws DdlException { - Env.getCurrentEnv().getRefreshManager() - .refreshTable(getCatalog().getName(), getDbName(), getName(), true); + public List getPartitionColumns(Optional snapshot) { + return getPaimonSchemaCacheValue(snapshot).getPartitionColumns(); } @Override - public Map getAndCopyPartitionItems() { - return Maps.newHashMap(getPartitionInfoFromCache().getNameToPartitionItem()); + public MvccSnapshot loadSnapshot() { + return new PaimonMvccSnapshot(getPaimonSnapshotCacheValue()); } @Override - public PartitionType getPartitionType() { - return getPartitionColumnsFromCache().size() > 0 ? 
PartitionType.LIST : PartitionType.UNPARTITIONED; + public Map getNameToPartitionItems(Optional snapshot) { + return getOrFetchSnapshotCacheValue(snapshot).getPartitionInfo().getNameToPartitionItem(); } @Override - public Set getPartitionColumnNames() { - return getPartitionColumnsFromCache().stream() - .map(c -> c.getName().toLowerCase()).collect(Collectors.toSet()); + public boolean supportInternalPartitionPruned() { + return true; } @Override - public List getPartitionColumns() { - return getPartitionColumnsFromCache(); + public List getFullSchema() { + return getPaimonSchemaCacheValue(MvccUtil.getSnapshotFromContext(this)).getSchema(); } @Override - public MTMVSnapshotIf getPartitionSnapshot(String partitionName, MTMVRefreshContext context) - throws AnalysisException { - PaimonPartition paimonPartition = getPartitionInfoFromCache().getNameToPartition().get(partitionName); - if (paimonPartition == null) { - throw new AnalysisException("can not find partition: " + partitionName); + public Optional initSchema(SchemaCacheKey key) { + makeSureInitialized(); + PaimonSchemaCacheKey paimonSchemaCacheKey = (PaimonSchemaCacheKey) key; + try { + PaimonSchema schema = loadPaimonSchemaBySchemaId(paimonSchemaCacheKey); + List columns = schema.getFields(); + List dorisColumns = Lists.newArrayListWithCapacity(columns.size()); + Set partitionColumnNames = Sets.newHashSet(schema.getPartitionKeys()); + List partitionColumns = Lists.newArrayList(); + for (DataField field : columns) { + Column column = new Column(field.name().toLowerCase(), + PaimonUtil.paimonTypeToDorisType(field.type()), true, null, true, field.description(), true, + field.id()); + dorisColumns.add(column); + if (partitionColumnNames.contains(field.name())) { + partitionColumns.add(column); + } + } + return Optional.of(new PaimonSchemaCacheValue(dorisColumns, partitionColumns)); + } catch (Exception e) { + throw new CacheException("failed to initSchema for: %s.%s.%s.%s", + null, getCatalog().getName(), key.getDbName(), key.getTblName(), + paimonSchemaCacheKey.getSchemaId()); } - return new MTMVTimestampSnapshot(paimonPartition.getLastUpdateTime()); } - @Override - public MTMVSnapshotIf getTableSnapshot(MTMVRefreshContext context) throws AnalysisException { - return new MTMVVersionSnapshot(getLatestSnapshotIdFromCache()); + private PaimonSchema loadPaimonSchemaBySchemaId(PaimonSchemaCacheKey key) throws IOException { + Table table = ((PaimonExternalCatalog) getCatalog()).getPaimonTable(key.getDbName(), + name + Catalog.SYSTEM_TABLE_SPLITTER + SchemasTable.SCHEMAS); + PredicateBuilder builder = new PredicateBuilder(table.rowType()); + Predicate predicate = builder.equal(0, key.getSchemaId()); + // Adding predicates will also return excess data + List rows = PaimonUtil.read(table, new int[][] {{0}, {1}, {2}}, predicate); + for (InternalRow row : rows) { + PaimonSchema schema = PaimonUtil.rowToSchema(row); + if (schema.getSchemaId() == key.getSchemaId()) { + return schema; + } + } + throw new CacheException("failed to initSchema for: %s.%s.%s.%s", + null, getCatalog().getName(), key.getDbName(), key.getTblName(), key.getSchemaId()); } - @Override - public boolean isPartitionColumnAllowNull() { - // Paimon will write to the 'null' partition regardless of whether it is' null or 'null'. - // The logic is inconsistent with Doris' empty partition logic, so it needs to return false. - // However, when Spark creates Paimon tables, specifying 'not null' does not take effect. 
- // In order to successfully create the materialized view, false is returned here. - // The cost is that Paimon partition writes a null value, and the materialized view cannot detect this data. - return true; + private PaimonSchemaCacheValue getPaimonSchemaCacheValue(Optional snapshot) { + PaimonSnapshotCacheValue snapshotCacheValue = getOrFetchSnapshotCacheValue(snapshot); + return getPaimonSchemaCacheValue(snapshotCacheValue.getSnapshot().getSchemaId()); + } + + private PaimonSnapshotCacheValue getOrFetchSnapshotCacheValue(Optional snapshot) { + if (snapshot.isPresent()) { + return ((PaimonMvccSnapshot) snapshot.get()).getSnapshotCacheValue(); + } else { + return getPaimonSnapshotCacheValue(); + } } + } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonMetadataCache.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonMetadataCache.java new file mode 100644 index 00000000000000..5b711e070667b7 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonMetadataCache.java @@ -0,0 +1,144 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.datasource.paimon; + +import org.apache.doris.catalog.Column; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.CacheFactory; +import org.apache.doris.common.Config; +import org.apache.doris.datasource.CacheException; +import org.apache.doris.datasource.CatalogIf; +import org.apache.doris.datasource.ExternalMetaCacheMgr; + +import com.github.benmanes.caffeine.cache.LoadingCache; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import org.apache.commons.collections.CollectionUtils; +import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.table.Table; +import org.apache.paimon.table.system.PartitionsTable; +import org.apache.paimon.table.system.SnapshotsTable; +import org.jetbrains.annotations.NotNull; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.OptionalLong; +import java.util.concurrent.ExecutorService; + +public class PaimonMetadataCache { + + private final LoadingCache snapshotCache; + + public PaimonMetadataCache(ExecutorService executor) { + CacheFactory snapshotCacheFactory = new CacheFactory( + OptionalLong.of(28800L), + OptionalLong.of(Config.external_cache_expire_time_minutes_after_access * 60), + Config.max_external_table_cache_num, + true, + null); + this.snapshotCache = snapshotCacheFactory.buildCache(key -> loadSnapshot(key), null, executor); + } + + @NotNull + private PaimonSnapshotCacheValue loadSnapshot(PaimonSnapshotCacheKey key) { + try { + PaimonSnapshot latestSnapshot = loadLatestSnapshot(key); + PaimonExternalTable table = (PaimonExternalTable) key.getCatalog().getDbOrAnalysisException(key.getDbName()) + .getTableOrAnalysisException(key.getTableName()); + List partitionColumns = table.getPaimonSchemaCacheValue(latestSnapshot.getSchemaId()) + .getPartitionColumns(); + PaimonPartitionInfo partitionInfo = loadPartitionInfo(key, partitionColumns); + return new PaimonSnapshotCacheValue(partitionInfo, latestSnapshot); + } catch (IOException | AnalysisException e) { + throw new CacheException("failed to loadSnapshot for: %s.%s.%s", + e, key.getCatalog().getName(), key.getDbName(), key.getTableName()); + } + } + + private PaimonPartitionInfo loadPartitionInfo(PaimonSnapshotCacheKey key, List partitionColumns) + throws IOException, AnalysisException { + if (CollectionUtils.isEmpty(partitionColumns)) { + return new PaimonPartitionInfo(); + } + List paimonPartitions = loadPartitions(key); + return PaimonUtil.generatePartitionInfo(partitionColumns, paimonPartitions); + } + + private List loadPartitions(PaimonSnapshotCacheKey key) + throws IOException { + Table table = ((PaimonExternalCatalog) key.getCatalog()).getPaimonTable(key.getDbName(), + key.getTableName() + Catalog.SYSTEM_TABLE_SPLITTER + PartitionsTable.PARTITIONS); + List rows = PaimonUtil.read(table, null, null); + List res = Lists.newArrayListWithCapacity(rows.size()); + for (InternalRow row : rows) { + res.add(PaimonUtil.rowToPartition(row)); + } + return res; + } + + private PaimonSnapshot loadLatestSnapshot(PaimonSnapshotCacheKey key) throws IOException { + Table table = ((PaimonExternalCatalog) key.getCatalog()).getPaimonTable(key.getDbName(), + key.getTableName() + Catalog.SYSTEM_TABLE_SPLITTER + SnapshotsTable.SNAPSHOTS); + // snapshotId and schemaId + List rows = PaimonUtil.read(table, new int[][] {{0}, {1}}, null); + long latestSnapshotId = 0L; + long latestSchemaId = 0L; + for (InternalRow row : rows) { + long 
snapshotId = row.getLong(0); + if (snapshotId > latestSnapshotId) { + latestSnapshotId = snapshotId; + latestSchemaId = row.getLong(1); + } + } + return new PaimonSnapshot(latestSnapshotId, latestSchemaId); + } + + public void invalidateCatalogCache(long catalogId) { + snapshotCache.asMap().keySet().stream() + .filter(key -> key.getCatalog().getId() == catalogId) + .forEach(snapshotCache::invalidate); + } + + public void invalidateTableCache(long catalogId, String dbName, String tblName) { + snapshotCache.asMap().keySet().stream() + .filter(key -> key.getCatalog().getId() == catalogId && key.getDbName().equals(dbName) + && key.getTableName().equals( + tblName)) + .forEach(snapshotCache::invalidate); + } + + public void invalidateDbCache(long catalogId, String dbName) { + snapshotCache.asMap().keySet().stream() + .filter(key -> key.getCatalog().getId() == catalogId && key.getDbName().equals(dbName)) + .forEach(snapshotCache::invalidate); + } + + public PaimonSnapshotCacheValue getPaimonSnapshot(CatalogIf catalog, String dbName, String tbName) { + PaimonSnapshotCacheKey key = new PaimonSnapshotCacheKey(catalog, dbName, tbName); + return snapshotCache.get(key); + } + + public Map> getCacheStats() { + Map> res = Maps.newHashMap(); + res.put("paimon_snapshot_cache", ExternalMetaCacheMgr.getCacheStats(snapshotCache.stats(), + snapshotCache.estimatedSize())); + return res; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonMetadataCacheMgr.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonMetadataCacheMgr.java new file mode 100644 index 00000000000000..a282fde665b197 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonMetadataCacheMgr.java @@ -0,0 +1,49 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.datasource.paimon; + +import java.util.concurrent.ExecutorService; + +public class PaimonMetadataCacheMgr { + + private PaimonMetadataCache paimonMetadataCache; + + public PaimonMetadataCacheMgr(ExecutorService executor) { + this.paimonMetadataCache = new PaimonMetadataCache(executor); + } + + public PaimonMetadataCache getPaimonMetadataCache() { + return paimonMetadataCache; + } + + public void removeCache(long catalogId) { + paimonMetadataCache.invalidateCatalogCache(catalogId); + } + + public void invalidateCatalogCache(long catalogId) { + paimonMetadataCache.invalidateCatalogCache(catalogId); + } + + public void invalidateTableCache(long catalogId, String dbName, String tblName) { + paimonMetadataCache.invalidateTableCache(catalogId, dbName, tblName); + } + + public void invalidateDbCache(long catalogId, String dbName) { + paimonMetadataCache.invalidateDbCache(catalogId, dbName); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonMvccSnapshot.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonMvccSnapshot.java new file mode 100644 index 00000000000000..2307e91adb3911 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonMvccSnapshot.java @@ -0,0 +1,32 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.datasource.paimon; + +import org.apache.doris.datasource.mvcc.MvccSnapshot; + +public class PaimonMvccSnapshot implements MvccSnapshot { + private final PaimonSnapshotCacheValue snapshotCacheValue; + + public PaimonMvccSnapshot(PaimonSnapshotCacheValue snapshotCacheValue) { + this.snapshotCacheValue = snapshotCacheValue; + } + + public PaimonSnapshotCacheValue getSnapshotCacheValue() { + return snapshotCacheValue; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonPartitionInfo.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonPartitionInfo.java index 8f54f0834e481b..4d3326f8e48376 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonPartitionInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonPartitionInfo.java @@ -24,8 +24,8 @@ import java.util.Map; public class PaimonPartitionInfo { - private Map nameToPartitionItem; - private Map nameToPartition; + private final Map nameToPartitionItem; + private final Map nameToPartition; public PaimonPartitionInfo() { this.nameToPartitionItem = Maps.newHashMap(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSchema.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSchema.java new file mode 100644 index 00000000000000..ef26e1ed20879d --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSchema.java @@ -0,0 +1,46 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource.paimon; + +import org.apache.paimon.types.DataField; + +import java.util.List; + +public class PaimonSchema { + private final long schemaId; + private final List fields; + private final List partitionKeys; + + public PaimonSchema(long schemaId, List fields, List partitionKeys) { + this.schemaId = schemaId; + this.fields = fields; + this.partitionKeys = partitionKeys; + } + + public long getSchemaId() { + return schemaId; + } + + public List getFields() { + return fields; + } + + public List getPartitionKeys() { + return partitionKeys; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSchemaCacheKey.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSchemaCacheKey.java new file mode 100644 index 00000000000000..f74555b369b380 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSchemaCacheKey.java @@ -0,0 +1,55 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource.paimon; + +import org.apache.doris.datasource.ExternalSchemaCache.SchemaCacheKey; + +import com.google.common.base.Objects; + +public class PaimonSchemaCacheKey extends SchemaCacheKey { + private final long schemaId; + + public PaimonSchemaCacheKey(String dbName, String tableName, long schemaId) { + super(dbName, tableName); + this.schemaId = schemaId; + } + + public long getSchemaId() { + return schemaId; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof PaimonSchemaCacheKey)) { + return false; + } + if (!super.equals(o)) { + return false; + } + PaimonSchemaCacheKey that = (PaimonSchemaCacheKey) o; + return schemaId == that.schemaId; + } + + @Override + public int hashCode() { + return Objects.hashCode(super.hashCode(), schemaId); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSchemaCacheValue.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSchemaCacheValue.java index 20d27b2425df24..ccb530a3cbccc7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSchemaCacheValue.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSchemaCacheValue.java @@ -20,41 +20,18 @@ import org.apache.doris.catalog.Column; import org.apache.doris.datasource.SchemaCacheValue; -import org.apache.paimon.table.Table; - import java.util.List; public class PaimonSchemaCacheValue extends SchemaCacheValue { - private Table paimonTable; private List partitionColumns; - private PaimonPartitionInfo partitionInfo; - - private long snapshootId; - public PaimonSchemaCacheValue(List schema, List partitionColumns, Table paimonTable, - long snapshootId, - PaimonPartitionInfo partitionInfo) { + public PaimonSchemaCacheValue(List schema, List partitionColumns) { super(schema); this.partitionColumns = partitionColumns; - this.paimonTable = paimonTable; - this.snapshootId = snapshootId; - this.partitionInfo = partitionInfo; - } - - public Table getPaimonTable() { - return paimonTable; } public List getPartitionColumns() { return partitionColumns; } - - public PaimonPartitionInfo getPartitionInfo() { - return partitionInfo; - } - - public long getSnapshootId() { - return snapshootId; - } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSnapshot.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSnapshot.java new file mode 100644 index 00000000000000..4a536dd72cc901 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSnapshot.java @@ -0,0 +1,36 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource.paimon; + +public class PaimonSnapshot { + private final long snapshotId; + private final long schemaId; + + public PaimonSnapshot(long snapshotId, long schemaId) { + this.snapshotId = snapshotId; + this.schemaId = schemaId; + } + + public long getSnapshotId() { + return snapshotId; + } + + public long getSchemaId() { + return schemaId; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSnapshotCacheKey.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSnapshotCacheKey.java new file mode 100644 index 00000000000000..970f111a72133f --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSnapshotCacheKey.java @@ -0,0 +1,75 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.datasource.paimon; + +import org.apache.doris.datasource.CatalogIf; + +import java.util.Objects; +import java.util.StringJoiner; + +public class PaimonSnapshotCacheKey { + private final CatalogIf catalog; + private final String dbName; + private final String tableName; + + public PaimonSnapshotCacheKey(CatalogIf catalog, String dbName, String tableName) { + this.catalog = catalog; + this.dbName = dbName; + this.tableName = tableName; + } + + public CatalogIf getCatalog() { + return catalog; + } + + public String getDbName() { + return dbName; + } + + public String getTableName() { + return tableName; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + PaimonSnapshotCacheKey that = (PaimonSnapshotCacheKey) o; + return catalog.getId() == that.catalog.getId() + && Objects.equals(dbName, that.dbName) + && Objects.equals(tableName, that.tableName); + } + + @Override + public int hashCode() { + return Objects.hash(catalog.getId(), dbName, tableName); + } + + @Override + public String toString() { + return new StringJoiner(", ", PaimonSnapshotCacheKey.class.getSimpleName() + "[", "]") + .add("catalog=" + catalog) + .add("dbName='" + dbName + "'") + .add("tableName='" + tableName + "'") + .toString(); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSnapshotCacheValue.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSnapshotCacheValue.java new file mode 100644 index 00000000000000..c50ecdabfde3df --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSnapshotCacheValue.java @@ -0,0 +1,37 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.datasource.paimon; + +public class PaimonSnapshotCacheValue { + + private final PaimonPartitionInfo partitionInfo; + private final PaimonSnapshot snapshot; + + public PaimonSnapshotCacheValue(PaimonPartitionInfo partitionInfo, PaimonSnapshot snapshot) { + this.partitionInfo = partitionInfo; + this.snapshot = snapshot; + } + + public PaimonPartitionInfo getPartitionInfo() { + return partitionInfo; + } + + public PaimonSnapshot getSnapshot() { + return snapshot; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonUtil.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonUtil.java index 8b7017cac29486..1f7576dca51d93 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonUtil.java @@ -22,6 +22,7 @@ import org.apache.doris.catalog.ListPartitionItem; import org.apache.doris.catalog.PartitionItem; import org.apache.doris.catalog.PartitionKey; +import org.apache.doris.catalog.ScalarType; import org.apache.doris.catalog.Type; import org.apache.doris.common.AnalysisException; import org.apache.doris.datasource.hive.HiveUtil; @@ -30,12 +31,22 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; import org.apache.commons.collections.CollectionUtils; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.apache.paimon.data.InternalRow; import org.apache.paimon.data.serializer.InternalRowSerializer; import org.apache.paimon.options.ConfigOption; +import org.apache.paimon.predicate.Predicate; import org.apache.paimon.reader.RecordReader; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.type.TypeReference; import org.apache.paimon.table.Table; import org.apache.paimon.table.source.ReadBuilder; +import org.apache.paimon.types.ArrayType; +import org.apache.paimon.types.DataField; +import org.apache.paimon.types.DecimalType; +import org.apache.paimon.types.MapType; +import org.apache.paimon.types.RowType; +import org.apache.paimon.utils.JsonSerdeUtil; import org.apache.paimon.utils.Pair; import org.apache.paimon.utils.Projection; @@ -48,8 +59,11 @@ import javax.annotation.Nullable; public class PaimonUtil { + private static final Logger LOG = LogManager.getLogger(PaimonUtil.class); + public static List read( - Table table, @Nullable int[][] projection, Pair, String>... dynamicOptions) + Table table, @Nullable int[][] projection, @Nullable Predicate predicate, + Pair, String>... 
dynamicOptions) throws IOException { Map options = new HashMap<>(); for (Pair, String> pair : dynamicOptions) { @@ -60,6 +74,9 @@ public static List read( if (projection != null) { readBuilder.withProjection(projection); } + if (predicate != null) { + readBuilder.withFilter(predicate); + } RecordReader reader = readBuilder.newRead().createReader(readBuilder.newScan().plan()); InternalRowSerializer serializer = @@ -152,4 +169,107 @@ public static ListPartitionItem toListPartitionItem(String partitionName, List 6) { + tsScale = 6; + } + } else if (dataType instanceof org.apache.paimon.types.LocalZonedTimestampType) { + tsScale = ((org.apache.paimon.types.LocalZonedTimestampType) dataType).getPrecision(); + if (tsScale > 6) { + tsScale = 6; + } + } + return ScalarType.createDatetimeV2Type(tsScale); + case TIMESTAMP_WITH_LOCAL_TIME_ZONE: + if (dataType instanceof org.apache.paimon.types.LocalZonedTimestampType) { + tsScale = ((org.apache.paimon.types.LocalZonedTimestampType) dataType).getPrecision(); + if (tsScale > 6) { + tsScale = 6; + } + } + return ScalarType.createDatetimeV2Type(tsScale); + case ARRAY: + ArrayType arrayType = (ArrayType) dataType; + Type innerType = paimonPrimitiveTypeToDorisType(arrayType.getElementType()); + return org.apache.doris.catalog.ArrayType.create(innerType, true); + case MAP: + MapType mapType = (MapType) dataType; + return new org.apache.doris.catalog.MapType( + paimonTypeToDorisType(mapType.getKeyType()), paimonTypeToDorisType(mapType.getValueType())); + case ROW: + RowType rowType = (RowType) dataType; + List fields = rowType.getFields(); + return new org.apache.doris.catalog.StructType(fields.stream() + .map(field -> new org.apache.doris.catalog.StructField(field.name(), + paimonTypeToDorisType(field.type()))) + .collect(Collectors.toCollection(ArrayList::new))); + case TIME_WITHOUT_TIME_ZONE: + return Type.UNSUPPORTED; + default: + LOG.warn("Cannot transform unknown type: " + dataType.getTypeRoot()); + return Type.UNSUPPORTED; + } + } + + public static Type paimonTypeToDorisType(org.apache.paimon.types.DataType type) { + return paimonPrimitiveTypeToDorisType(type); + } + + /** + * https://paimon.apache.org/docs/0.9/maintenance/system-tables/#schemas-table + * demo: + * 0 + * [{"id":0,"name":"user_id","type":"BIGINT NOT NULL"}, + * {"id":1,"name":"item_id","type":"BIGINT"}, + * {"id":2,"name":"behavior","type":"STRING"}, + * {"id":3,"name":"dt","type":"STRING NOT NULL"}, + * {"id":4,"name":"hh","type":"STRING NOT NULL"}] + * ["dt"] + * ["dt","hh","user_id"] + * {"owner":"hadoop","provider":"paimon"} + * 2024-12-03 15:38:14.734 + * + * @param row + * @return + */ + public static PaimonSchema rowToSchema(InternalRow row) { + long schemaId = row.getLong(0); + String fieldsStr = row.getString(1).toString(); + String partitionKeysStr = row.getString(2).toString(); + List fields = JsonSerdeUtil.fromJson(fieldsStr, new TypeReference>() { + }); + List partitionKeys = JsonSerdeUtil.fromJson(partitionKeysStr, new TypeReference>() { + }); + return new PaimonSchema(schemaId, fields, partitionKeys); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonSource.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonSource.java index 885eba06ed956d..a8bb814f1d353b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonSource.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonSource.java @@ -21,6 +21,7 @@ import org.apache.doris.catalog.TableIf; 
import org.apache.doris.common.UserException; import org.apache.doris.datasource.ExternalCatalog; +import org.apache.doris.datasource.mvcc.MvccUtil; import org.apache.doris.datasource.paimon.PaimonExternalTable; import org.apache.doris.datasource.property.constants.PaimonProperties; import org.apache.doris.thrift.TFileAttributes; @@ -36,7 +37,7 @@ public class PaimonSource { public PaimonSource(TupleDescriptor desc) { this.desc = desc; this.paimonExtTable = (PaimonExternalTable) desc.getTable(); - this.originTable = paimonExtTable.getPaimonTable(); + this.originTable = paimonExtTable.getPaimonTable(MvccUtil.getSnapshotFromContext(paimonExtTable)); } public TupleDescriptor getDesc() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java index 68ed0cc9b23592..353e024ed97348 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java @@ -29,6 +29,9 @@ import org.apache.doris.common.UserException; import org.apache.doris.common.util.DebugUtil; import org.apache.doris.common.util.TimeUtils; +import org.apache.doris.datasource.mvcc.MvccSnapshot; +import org.apache.doris.datasource.mvcc.MvccTable; +import org.apache.doris.datasource.mvcc.MvccTableInfo; import org.apache.doris.job.common.TaskStatus; import org.apache.doris.job.exception.JobException; import org.apache.doris.job.task.AbstractTask; @@ -70,6 +73,7 @@ import java.math.RoundingMode; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.Objects; import java.util.Set; import java.util.UUID; @@ -141,6 +145,8 @@ public enum MTMVTaskRefreshMode { private StmtExecutor executor; private Map partitionSnapshots; + private final Map snapshots = Maps.newHashMap(); + public MTMVTask() { } @@ -218,6 +224,9 @@ private void exec(Set refreshPartitionNames, throws Exception { ConnectContext ctx = MTMVPlanUtil.createMTMVContext(mtmv); StatementContext statementContext = new StatementContext(); + for (Entry entry : snapshots.entrySet()) { + statementContext.setSnapshot(entry.getKey(), entry.getValue()); + } ctx.setStatementContext(statementContext); TUniqueId queryId = generateQueryId(); lastQueryId = DebugUtil.printId(queryId); @@ -305,6 +314,11 @@ private void beforeMTMVRefresh() throws AnalysisException, DdlException { MTMVBaseTableIf baseTableIf = (MTMVBaseTableIf) tableIf; baseTableIf.beforeMTMVRefresh(mtmv); } + if (tableIf instanceof MvccTable) { + MvccTable mvccTable = (MvccTable) tableIf; + MvccSnapshot mvccSnapshot = mvccTable.loadSnapshot(); + snapshots.put(new MvccTableInfo(mvccTable), mvccSnapshot); + } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/StatementContext.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/StatementContext.java index 022532a61bfe4b..2d2c2897c4245c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/StatementContext.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/StatementContext.java @@ -558,7 +558,11 @@ public void loadSnapshots(Map, TableIf> tables) { } for (TableIf tableIf : tables.values()) { if (tableIf instanceof MvccTable) { - snapshots.put(new MvccTableInfo(tableIf), ((MvccTable) tableIf).loadSnapshot()); + MvccTableInfo mvccTableInfo = new MvccTableInfo(tableIf); + // may be set by MTMV, we can not load again + if (!snapshots.containsKey(mvccTableInfo)) { + snapshots.put(mvccTableInfo, ((MvccTable) 
tableIf).loadSnapshot()); + } } } } @@ -566,11 +570,25 @@ public void loadSnapshots(Map, TableIf> tables) { /** * Obtain snapshot information of mvcc * - * @param mvccTable mvccTable + * @param tableIf tableIf * @return MvccSnapshot */ - public MvccSnapshot getSnapshot(MvccTable mvccTable) { - return snapshots.get(new MvccTableInfo(mvccTable)); + public Optional getSnapshot(TableIf tableIf) { + if (!(tableIf instanceof MvccTable)) { + return Optional.empty(); + } + MvccTableInfo mvccTableInfo = new MvccTableInfo(tableIf); + return Optional.ofNullable(snapshots.get(mvccTableInfo)); + } + + /** + * Obtain snapshot information of mvcc + * + * @param mvccTableInfo mvccTableInfo + * @param snapshot snapshot + */ + public void setSnapshot(MvccTableInfo mvccTableInfo, MvccSnapshot snapshot) { + snapshots.put(mvccTableInfo, snapshot); } private static class CloseableResource implements Closeable { diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PruneFileScanPartition.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PruneFileScanPartition.java index ba8b270d1f397d..e99906f5e13dc4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PruneFileScanPartition.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PruneFileScanPartition.java @@ -36,7 +36,6 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; -import java.util.Optional; import java.util.function.Function; import java.util.stream.Collectors; @@ -74,8 +73,8 @@ public Rule build() { private SelectedPartitions pruneExternalPartitions(ExternalTable externalTable, LogicalFilter filter, LogicalFileScan scan, CascadesContext ctx) { Map selectedPartitionItems = Maps.newHashMap(); - // todo: real snapshotId - if (CollectionUtils.isEmpty(externalTable.getPartitionColumns(Optional.empty()))) { + if (CollectionUtils.isEmpty(externalTable.getPartitionColumns( + ctx.getStatementContext().getSnapshot(externalTable)))) { // non partitioned table, return NOT_PRUNED. // non partition table will be handled in HiveScanNode. 
return SelectedPartitions.NOT_PRUNED; @@ -83,8 +82,8 @@ private SelectedPartitions pruneExternalPartitions(ExternalTable externalTable, Map scanOutput = scan.getOutput() .stream() .collect(Collectors.toMap(slot -> slot.getName().toLowerCase(), Function.identity())); - // todo: real snapshotId - List partitionSlots = externalTable.getPartitionColumns(Optional.empty()) + List partitionSlots = externalTable.getPartitionColumns( + ctx.getStatementContext().getSnapshot(externalTable)) .stream() .map(column -> scanOutput.get(column.getName().toLowerCase())) .collect(Collectors.toList()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateMvByPartitionCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateMvByPartitionCommand.java index 36cc0f95a77a8e..b0a95ffdd3aab1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateMvByPartitionCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateMvByPartitionCommand.java @@ -28,6 +28,8 @@ import org.apache.doris.catalog.Type; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.UserException; +import org.apache.doris.datasource.ExternalTable; +import org.apache.doris.datasource.mvcc.MvccUtil; import org.apache.doris.mtmv.BaseTableInfo; import org.apache.doris.mtmv.MTMVRelatedTableIf; import org.apache.doris.nereids.analyzer.UnboundRelation; @@ -316,6 +318,11 @@ public Plan visitLogicalCatalogRelation(LogicalCatalogRelation catalogRelation, partitionHasDataItems.add( ((OlapTable) targetTable).getPartitionInfo().getItem(partition.getId())); } + if (targetTable instanceof ExternalTable) { + // Add filter only when partition has data when external table + partitionHasDataItems.add(((ExternalTable) targetTable).getNameToPartitionItems( + MvccUtil.getSnapshotFromContext(targetTable)).get(partitionName)); + } } if (partitionHasDataItems.isEmpty()) { predicates.setNeedAddFilter(false); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalFileScan.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalFileScan.java index 96b8e032d11274..1f5f71f7bafe59 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalFileScan.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalFileScan.java @@ -20,6 +20,7 @@ import org.apache.doris.analysis.TableSnapshot; import org.apache.doris.catalog.PartitionItem; import org.apache.doris.datasource.ExternalTable; +import org.apache.doris.datasource.mvcc.MvccUtil; import org.apache.doris.nereids.memo.GroupExpression; import org.apache.doris.nereids.properties.LogicalProperties; import org.apache.doris.nereids.trees.TableSample; @@ -60,10 +61,10 @@ protected LogicalFileScan(RelationId id, ExternalTable table, List quali } public LogicalFileScan(RelationId id, ExternalTable table, List qualifier, - Optional tableSample, Optional tableSnapshot) { - // todo: real snapshotId + Optional tableSample, Optional tableSnapshot) { this(id, table, qualifier, Optional.empty(), Optional.empty(), - table.initSelectedPartitions(Optional.empty()), tableSample, tableSnapshot); + table.initSelectedPartitions(MvccUtil.getSnapshotFromContext(table)), + tableSample, tableSnapshot); } public SelectedPartitions getSelectedPartitions() { diff --git a/regression-test/data/mtmv_p0/test_paimon_rewrite_mtmv.out 
b/regression-test/data/mtmv_p0/test_paimon_rewrite_mtmv.out new file mode 100644 index 00000000000000..63bda82c1db5bd --- /dev/null +++ b/regression-test/data/mtmv_p0/test_paimon_rewrite_mtmv.out @@ -0,0 +1,16 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !refresh_one_partition -- +a 10 + +-- !refresh_one_partition_rewrite -- +a 10 +b 10 + +-- !refresh_auto -- +a 10 +b 10 + +-- !refresh_all_partition_rewrite -- +a 10 +b 10 + diff --git a/regression-test/suites/mtmv_p0/test_paimon_rewrite_mtmv.groovy b/regression-test/suites/mtmv_p0/test_paimon_rewrite_mtmv.groovy new file mode 100644 index 00000000000000..985443875c7b26 --- /dev/null +++ b/regression-test/suites/mtmv_p0/test_paimon_rewrite_mtmv.groovy @@ -0,0 +1,95 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_paimon_rewrite_mtmv", "p0,external,mtmv,external_docker,external_docker_doris") { + String enabled = context.config.otherConfigs.get("enablePaimonTest") + if (enabled == null || !enabled.equalsIgnoreCase("true")) { + logger.info("disabled paimon test") + return + } + String suiteName = "test_paimon_rewrite_mtmv" + String catalogName = "${suiteName}_catalog" + String mvName = "${suiteName}_mv" + String dbName = context.config.getDbNameByFile(context.file) + + String minio_port = context.config.otherConfigs.get("iceberg_minio_port") + String externalEnvIp = context.config.otherConfigs.get("externalEnvIp") + + sql """set materialized_view_rewrite_enable_contain_external_table=true;""" + String mvSql = "SELECT par,count(*) as num FROM ${catalogName}.`test_paimon_spark`.test_tb_mix_format group by par;"; + + sql """drop catalog if exists ${catalogName}""" + sql """CREATE CATALOG ${catalogName} PROPERTIES ( + 'type'='paimon', + 'warehouse' = 's3://warehouse/wh/', + "s3.access_key" = "admin", + "s3.secret_key" = "password", + "s3.endpoint" = "http://${externalEnvIp}:${minio_port}", + "s3.region" = "us-east-1" + );""" + + sql """analyze table ${catalogName}.`test_paimon_spark`.test_tb_mix_format with sync""" + sql """alter table ${catalogName}.`test_paimon_spark`.test_tb_mix_format modify column par set stats ('row_count'='20');""" + + sql """drop materialized view if exists ${mvName};""" + + sql """ + CREATE MATERIALIZED VIEW ${mvName} + BUILD DEFERRED REFRESH AUTO ON MANUAL + partition by(`par`) + DISTRIBUTED BY RANDOM BUCKETS 2 + PROPERTIES ('replication_num' = '1') + AS + ${mvSql} + """ + def showPartitionsResult = sql """show partitions from ${mvName}""" + logger.info("showPartitionsResult: " + showPartitionsResult.toString()) + assertTrue(showPartitionsResult.toString().contains("p_a")) + assertTrue(showPartitionsResult.toString().contains("p_b")) + + // refresh one partitions + sql """ + REFRESH MATERIALIZED 
VIEW ${mvName} partitions(p_a); + """ + waitingMTMVTaskFinishedByMvName(mvName) + order_qt_refresh_one_partition "SELECT * FROM ${mvName} " + + def explainOnePartition = sql """ explain ${mvSql} """ + logger.info("explainOnePartition: " + explainOnePartition.toString()) + assertTrue(explainOnePartition.toString().contains("VUNION")) + order_qt_refresh_one_partition_rewrite "${mvSql}" + + mv_rewrite_success("${mvSql}", "${mvName}") + + //refresh auto + sql """ + REFRESH MATERIALIZED VIEW ${mvName} auto + """ + waitingMTMVTaskFinishedByMvName(mvName) + order_qt_refresh_auto "SELECT * FROM ${mvName} " + + def explainAllPartition = sql """ explain ${mvSql}; """ + logger.info("explainAllPartition: " + explainAllPartition.toString()) + assertTrue(explainAllPartition.toString().contains("VOlapScanNode")) + order_qt_refresh_all_partition_rewrite "${mvSql}" + + mv_rewrite_success("${mvSql}", "${mvName}") + + sql """drop materialized view if exists ${mvName};""" + sql """drop catalog if exists ${catalogName}""" +} + From 178d29d46cbddac6ce12a7067997013ac4b495e6 Mon Sep 17 00:00:00 2001 From: zhangdong Date: Thu, 19 Dec 2024 19:12:11 +0800 Subject: [PATCH 3/4] 1 --- .../main/java/org/apache/doris/datasource/ExternalTable.java | 2 +- .../java/org/apache/doris/datasource/hive/HMSExternalTable.java | 2 +- .../doris/datasource/maxcompute/MaxComputeExternalTable.java | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalTable.java index e5b618b4d31708..91df061678f154 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalTable.java @@ -404,7 +404,7 @@ public SelectedPartitions initSelectedPartitions(Optional snapshot * @param snapshot if not support mvcc, ignore this * @return partitionName ==> PartitionItem */ - protected Map getNameToPartitionItems(Optional snapshot) { + public Map getNameToPartitionItems(Optional snapshot) { return Collections.emptyMap(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java index 7dac6435fcbc55..da4670d6d0589d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java @@ -331,7 +331,7 @@ public SelectedPartitions initHudiSelectedPartitions(Optional tab } @Override - protected Map getNameToPartitionItems(Optional snapshot) { + public Map getNameToPartitionItems(Optional snapshot) { return getNameToPartitionItems(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MaxComputeExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MaxComputeExternalTable.java index 0f748f59e927bc..dbbbcf2d6a1e5b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MaxComputeExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MaxComputeExternalTable.java @@ -92,7 +92,7 @@ public List getPartitionColumns() { } @Override - protected Map getNameToPartitionItems(Optional snapshot) { + public Map getNameToPartitionItems(Optional snapshot) { if (getPartitionColumns().isEmpty()) { return Collections.emptyMap(); } From 8f3949514837e7fb9883ecad8281e45de0292901 Mon Sep 17 00:00:00 2001 From: zhangdong 
Date: Thu, 19 Dec 2024 19:14:41 +0800 Subject: [PATCH 4/4] add case --- .../data/mtmv_p0/test_paimon_mtmv.out | 113 ----------------- .../data/mtmv_p0/test_paimon_rewrite_mtmv.out | 16 --- .../suites/mtmv_p0/test_paimon_mtmv.groovy | 119 ------------------ .../mtmv_p0/test_paimon_rewrite_mtmv.groovy | 95 -------------- 4 files changed, 343 deletions(-) delete mode 100644 regression-test/data/mtmv_p0/test_paimon_mtmv.out delete mode 100644 regression-test/data/mtmv_p0/test_paimon_rewrite_mtmv.out delete mode 100644 regression-test/suites/mtmv_p0/test_paimon_mtmv.groovy delete mode 100644 regression-test/suites/mtmv_p0/test_paimon_rewrite_mtmv.groovy diff --git a/regression-test/data/mtmv_p0/test_paimon_mtmv.out b/regression-test/data/mtmv_p0/test_paimon_mtmv.out deleted file mode 100644 index c28b7cb7baca22..00000000000000 --- a/regression-test/data/mtmv_p0/test_paimon_mtmv.out +++ /dev/null @@ -1,113 +0,0 @@ --- This file is automatically generated. You should know what you did if you want to edit this --- !base_table -- -1 2 a -1 2 b -10 1 a -10 1 b -2 2 a -2 2 b -3 2 a -3 2 b -4 2 a -4 2 b -5 2 a -5 2 b -6 1 a -6 1 b -7 1 a -7 1 b -8 1 a -8 1 b -9 1 a -9 1 b - --- !refresh_one_partition -- -1 2 a -10 1 a -2 2 a -3 2 a -4 2 a -5 2 a -6 1 a -7 1 a -8 1 a -9 1 a - --- !refresh_auto -- -1 2 a -1 2 b -10 1 a -10 1 b -2 2 a -2 2 b -3 2 a -3 2 b -4 2 a -4 2 b -5 2 a -5 2 b -6 1 a -6 1 b -7 1 a -7 1 b -8 1 a -8 1 b -9 1 a -9 1 b - --- !is_sync_before_rebuild -- -true - --- !is_sync_after_rebuild -- -true - --- !refresh_complete_rebuild -- -1 2 a -1 2 b -10 1 a -10 1 b -2 2 a -2 2 b -3 2 a -3 2 b -4 2 a -4 2 b -5 2 a -5 2 b -6 1 a -6 1 b -7 1 a -7 1 b -8 1 a -8 1 b -9 1 a -9 1 b - --- !not_partition_before -- -false - --- !not_partition -- -1 2 a -1 2 b -10 1 a -10 1 b -2 2 a -2 2 b -3 2 a -3 2 b -4 2 a -4 2 b -5 2 a -5 2 b -6 1 a -6 1 b -7 1 a -7 1 b -8 1 a -8 1 b -9 1 a -9 1 b - --- !not_partition_after -- -true - diff --git a/regression-test/data/mtmv_p0/test_paimon_rewrite_mtmv.out b/regression-test/data/mtmv_p0/test_paimon_rewrite_mtmv.out deleted file mode 100644 index 63bda82c1db5bd..00000000000000 --- a/regression-test/data/mtmv_p0/test_paimon_rewrite_mtmv.out +++ /dev/null @@ -1,16 +0,0 @@ --- This file is automatically generated. You should know what you did if you want to edit this --- !refresh_one_partition -- -a 10 - --- !refresh_one_partition_rewrite -- -a 10 -b 10 - --- !refresh_auto -- -a 10 -b 10 - --- !refresh_all_partition_rewrite -- -a 10 -b 10 - diff --git a/regression-test/suites/mtmv_p0/test_paimon_mtmv.groovy b/regression-test/suites/mtmv_p0/test_paimon_mtmv.groovy deleted file mode 100644 index f2989edbf6dfd6..00000000000000 --- a/regression-test/suites/mtmv_p0/test_paimon_mtmv.groovy +++ /dev/null @@ -1,119 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. 
See the License for the -// specific language governing permissions and limitations -// under the License. - -suite("test_paimon_mtmv", "p0,external,mtmv,external_docker,external_docker_doris") { - String enabled = context.config.otherConfigs.get("enablePaimonTest") - if (enabled == null || !enabled.equalsIgnoreCase("true")) { - logger.info("disabled paimon test") - return - } - String suiteName = "test_paimon_mtmv" - String catalogName = "${suiteName}_catalog" - String mvName = "${suiteName}_mv" - String dbName = context.config.getDbNameByFile(context.file) - - String minio_port = context.config.otherConfigs.get("iceberg_minio_port") - String externalEnvIp = context.config.otherConfigs.get("externalEnvIp") - - sql """drop catalog if exists ${catalogName}""" - sql """CREATE CATALOG ${catalogName} PROPERTIES ( - 'type'='paimon', - 'warehouse' = 's3://warehouse/wh/', - "s3.access_key" = "admin", - "s3.secret_key" = "password", - "s3.endpoint" = "http://${externalEnvIp}:${minio_port}", - "s3.region" = "us-east-1" - );""" - - order_qt_base_table """ select * from ${catalogName}.test_paimon_spark.test_tb_mix_format ; """ - - sql """drop materialized view if exists ${mvName};""" - - sql """ - CREATE MATERIALIZED VIEW ${mvName} - BUILD DEFERRED REFRESH AUTO ON MANUAL - partition by(`par`) - DISTRIBUTED BY RANDOM BUCKETS 2 - PROPERTIES ('replication_num' = '1') - AS - SELECT * FROM ${catalogName}.`test_paimon_spark`.test_tb_mix_format; - """ - def showPartitionsResult = sql """show partitions from ${mvName}""" - logger.info("showPartitionsResult: " + showPartitionsResult.toString()) - assertTrue(showPartitionsResult.toString().contains("p_a")) - assertTrue(showPartitionsResult.toString().contains("p_b")) - - // refresh one partitions - sql """ - REFRESH MATERIALIZED VIEW ${mvName} partitions(p_a); - """ - waitingMTMVTaskFinishedByMvName(mvName) - order_qt_refresh_one_partition "SELECT * FROM ${mvName} " - - //refresh auto - sql """ - REFRESH MATERIALIZED VIEW ${mvName} auto - """ - waitingMTMVTaskFinishedByMvName(mvName) - order_qt_refresh_auto "SELECT * FROM ${mvName} " - order_qt_is_sync_before_rebuild "select SyncWithBaseTables from mv_infos('database'='${dbName}') where Name='${mvName}'" - - // rebuild catalog, should not Affects MTMV - sql """drop catalog if exists ${catalogName}""" - sql """ - CREATE CATALOG ${catalogName} PROPERTIES ( - 'type'='paimon', - 'warehouse' = 's3://warehouse/wh/', - "s3.access_key" = "admin", - "s3.secret_key" = "password", - "s3.endpoint" = "http://${externalEnvIp}:${minio_port}", - "s3.region" = "us-east-1" - ); - """ - order_qt_is_sync_after_rebuild "select SyncWithBaseTables from mv_infos('database'='${dbName}') where Name='${mvName}'" - - // should refresh normal after catalog rebuild - sql """ - REFRESH MATERIALIZED VIEW ${mvName} complete - """ - waitingMTMVTaskFinishedByMvName(mvName) - order_qt_refresh_complete_rebuild "SELECT * FROM ${mvName} " - - sql """drop materialized view if exists ${mvName};""" - - // not have partition - sql """ - CREATE MATERIALIZED VIEW ${mvName} - BUILD DEFERRED REFRESH AUTO ON MANUAL - DISTRIBUTED BY RANDOM BUCKETS 2 - PROPERTIES ('replication_num' = '1') - AS - SELECT * FROM ${catalogName}.`test_paimon_spark`.test_tb_mix_format; - """ - order_qt_not_partition_before "select SyncWithBaseTables from mv_infos('database'='${dbName}') where Name='${mvName}'" - //should can refresh auto - sql """ - REFRESH MATERIALIZED VIEW ${mvName} auto - """ - waitingMTMVTaskFinishedByMvName(mvName) - order_qt_not_partition "SELECT * FROM 
${mvName} " - order_qt_not_partition_after "select SyncWithBaseTables from mv_infos('database'='${dbName}') where Name='${mvName}'" - sql """drop materialized view if exists ${mvName};""" - sql """drop catalog if exists ${catalogName}""" - -} - diff --git a/regression-test/suites/mtmv_p0/test_paimon_rewrite_mtmv.groovy b/regression-test/suites/mtmv_p0/test_paimon_rewrite_mtmv.groovy deleted file mode 100644 index 985443875c7b26..00000000000000 --- a/regression-test/suites/mtmv_p0/test_paimon_rewrite_mtmv.groovy +++ /dev/null @@ -1,95 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -suite("test_paimon_rewrite_mtmv", "p0,external,mtmv,external_docker,external_docker_doris") { - String enabled = context.config.otherConfigs.get("enablePaimonTest") - if (enabled == null || !enabled.equalsIgnoreCase("true")) { - logger.info("disabled paimon test") - return - } - String suiteName = "test_paimon_rewrite_mtmv" - String catalogName = "${suiteName}_catalog" - String mvName = "${suiteName}_mv" - String dbName = context.config.getDbNameByFile(context.file) - - String minio_port = context.config.otherConfigs.get("iceberg_minio_port") - String externalEnvIp = context.config.otherConfigs.get("externalEnvIp") - - sql """set materialized_view_rewrite_enable_contain_external_table=true;""" - String mvSql = "SELECT par,count(*) as num FROM ${catalogName}.`test_paimon_spark`.test_tb_mix_format group by par;"; - - sql """drop catalog if exists ${catalogName}""" - sql """CREATE CATALOG ${catalogName} PROPERTIES ( - 'type'='paimon', - 'warehouse' = 's3://warehouse/wh/', - "s3.access_key" = "admin", - "s3.secret_key" = "password", - "s3.endpoint" = "http://${externalEnvIp}:${minio_port}", - "s3.region" = "us-east-1" - );""" - - sql """analyze table ${catalogName}.`test_paimon_spark`.test_tb_mix_format with sync""" - sql """alter table ${catalogName}.`test_paimon_spark`.test_tb_mix_format modify column par set stats ('row_count'='20');""" - - sql """drop materialized view if exists ${mvName};""" - - sql """ - CREATE MATERIALIZED VIEW ${mvName} - BUILD DEFERRED REFRESH AUTO ON MANUAL - partition by(`par`) - DISTRIBUTED BY RANDOM BUCKETS 2 - PROPERTIES ('replication_num' = '1') - AS - ${mvSql} - """ - def showPartitionsResult = sql """show partitions from ${mvName}""" - logger.info("showPartitionsResult: " + showPartitionsResult.toString()) - assertTrue(showPartitionsResult.toString().contains("p_a")) - assertTrue(showPartitionsResult.toString().contains("p_b")) - - // refresh one partitions - sql """ - REFRESH MATERIALIZED VIEW ${mvName} partitions(p_a); - """ - waitingMTMVTaskFinishedByMvName(mvName) - order_qt_refresh_one_partition "SELECT * FROM ${mvName} " - - def explainOnePartition = sql """ explain ${mvSql} """ - logger.info("explainOnePartition: " + 
explainOnePartition.toString()) - assertTrue(explainOnePartition.toString().contains("VUNION")) - order_qt_refresh_one_partition_rewrite "${mvSql}" - - mv_rewrite_success("${mvSql}", "${mvName}") - - //refresh auto - sql """ - REFRESH MATERIALIZED VIEW ${mvName} auto - """ - waitingMTMVTaskFinishedByMvName(mvName) - order_qt_refresh_auto "SELECT * FROM ${mvName} " - - def explainAllPartition = sql """ explain ${mvSql}; """ - logger.info("explainAllPartition: " + explainAllPartition.toString()) - assertTrue(explainAllPartition.toString().contains("VOlapScanNode")) - order_qt_refresh_all_partition_rewrite "${mvSql}" - - mv_rewrite_success("${mvSql}", "${mvName}") - - sql """drop materialized view if exists ${mvName};""" - sql """drop catalog if exists ${catalogName}""" -} -
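
For reference, the end-to-end flow exercised by the regression suites in this patch series reduces to a few SQL statements. This is a minimal sketch only; the catalog, materialized-view, and table names below (paimon_ctl, paimon_mtmv, test_paimon_spark.test_tb_mix_format) follow the test fixtures above and are illustrative, not part of the patch, and the s3.* connection properties are elided.

    -- Sketch of the usage covered by the tests above (names are illustrative).
    CREATE CATALOG paimon_ctl PROPERTIES (
        'type' = 'paimon',
        'warehouse' = 's3://warehouse/wh/'
        -- s3.access_key / s3.secret_key / s3.endpoint / s3.region as in the suites above
    );

    -- Partition the MTMV by the Paimon partition column `par`.
    CREATE MATERIALIZED VIEW paimon_mtmv
    BUILD DEFERRED REFRESH AUTO ON MANUAL
    PARTITION BY (`par`)
    DISTRIBUTED BY RANDOM BUCKETS 2
    PROPERTIES ('replication_num' = '1')
    AS SELECT * FROM paimon_ctl.`test_paimon_spark`.test_tb_mix_format;

    -- Refresh a single partition, or let the AUTO mode refresh only partitions
    -- whose Paimon snapshot/last-update time has changed.
    REFRESH MATERIALIZED VIEW paimon_mtmv PARTITIONS (p_a);
    REFRESH MATERIALIZED VIEW paimon_mtmv AUTO;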