diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java index c9eaf1b7df32ef..5645c4e89e726c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java @@ -18,10 +18,22 @@ package org.apache.doris.datasource.paimon; import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.MTMV; +import org.apache.doris.catalog.PartitionItem; +import org.apache.doris.catalog.PartitionType; import org.apache.doris.catalog.ScalarType; import org.apache.doris.catalog.Type; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.DdlException; import org.apache.doris.datasource.ExternalTable; import org.apache.doris.datasource.SchemaCacheValue; +import org.apache.doris.mtmv.MTMVBaseTableIf; +import org.apache.doris.mtmv.MTMVRefreshContext; +import org.apache.doris.mtmv.MTMVRelatedTableIf; +import org.apache.doris.mtmv.MTMVSnapshotIf; +import org.apache.doris.mtmv.MTMVTimestampSnapshot; +import org.apache.doris.mtmv.MTMVVersionSnapshot; import org.apache.doris.statistics.AnalysisInfo; import org.apache.doris.statistics.BaseAnalysisTask; import org.apache.doris.statistics.ExternalAnalysisTask; @@ -30,25 +42,35 @@ import org.apache.doris.thrift.TTableType; import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; +import org.apache.commons.collections.CollectionUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.data.InternalRow; import org.apache.paimon.schema.TableSchema; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.Table; import org.apache.paimon.table.source.Split; +import org.apache.paimon.table.system.PartitionsTable; +import org.apache.paimon.table.system.SnapshotsTable; import org.apache.paimon.types.ArrayType; import org.apache.paimon.types.DataField; import org.apache.paimon.types.DecimalType; import org.apache.paimon.types.MapType; import org.apache.paimon.types.RowType; +import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Optional; +import java.util.Set; import java.util.stream.Collectors; -public class PaimonExternalTable extends ExternalTable { +public class PaimonExternalTable extends ExternalTable implements MTMVRelatedTableIf, MTMVBaseTableIf { private static final Logger LOG = LogManager.getLogger(PaimonExternalTable.class); @@ -73,18 +95,95 @@ public Table getPaimonTable() { return schemaCacheValue.map(value -> ((PaimonSchemaCacheValue) value).getPaimonTable()).orElse(null); } + private PaimonPartitionInfo getPartitionInfoFromCache() { + makeSureInitialized(); + Optional schemaCacheValue = getSchemaCacheValue(); + if (!schemaCacheValue.isPresent()) { + return new PaimonPartitionInfo(); + } + return ((PaimonSchemaCacheValue) schemaCacheValue.get()).getPartitionInfo(); + } + + private List getPartitionColumnsFromCache() { + makeSureInitialized(); + Optional schemaCacheValue = getSchemaCacheValue(); + if (!schemaCacheValue.isPresent()) { + return Lists.newArrayList(); + } + return ((PaimonSchemaCacheValue) schemaCacheValue.get()).getPartitionColumns(); + } + + public long 
getLatestSnapshotIdFromCache() throws AnalysisException { + makeSureInitialized(); + Optional schemaCacheValue = getSchemaCacheValue(); + if (!schemaCacheValue.isPresent()) { + throw new AnalysisException("not present"); + } + return ((PaimonSchemaCacheValue) schemaCacheValue.get()).getSnapshootId(); + } + @Override public Optional initSchema() { Table paimonTable = ((PaimonExternalCatalog) catalog).getPaimonTable(dbName, name); TableSchema schema = ((FileStoreTable) paimonTable).schema(); List columns = schema.fields(); List tmpSchema = Lists.newArrayListWithCapacity(columns.size()); + Set partitionColumnNames = Sets.newHashSet(paimonTable.partitionKeys()); + List partitionColumns = Lists.newArrayList(); for (DataField field : columns) { - tmpSchema.add(new Column(field.name().toLowerCase(), + Column column = new Column(field.name().toLowerCase(), paimonTypeToDorisType(field.type()), true, null, true, field.description(), true, - field.id())); + field.id()); + tmpSchema.add(column); + if (partitionColumnNames.contains(field.name())) { + partitionColumns.add(column); + } + } + try { + // after 0.9.0 paimon will support table.getLatestSnapshotId() + long latestSnapshotId = loadLatestSnapshotId(); + PaimonPartitionInfo partitionInfo = loadPartitionInfo(partitionColumns); + return Optional.of(new PaimonSchemaCacheValue(tmpSchema, partitionColumns, paimonTable, latestSnapshotId, + partitionInfo)); + } catch (IOException | AnalysisException e) { + LOG.warn(e); + return Optional.empty(); + } + } + + private long loadLatestSnapshotId() throws IOException { + Table table = ((PaimonExternalCatalog) catalog).getPaimonTable(dbName, + name + Catalog.SYSTEM_TABLE_SPLITTER + SnapshotsTable.SNAPSHOTS); + // snapshotId + List rows = PaimonUtil.read(table, new int[][] {{0}}); + long latestSnapshotId = 0L; + for (InternalRow row : rows) { + long snapshotId = row.getLong(0); + if (snapshotId > latestSnapshotId) { + latestSnapshotId = snapshotId; + } + } + return latestSnapshotId; + } + + private PaimonPartitionInfo loadPartitionInfo(List partitionColumns) throws IOException, AnalysisException { + if (CollectionUtils.isEmpty(partitionColumns)) { + return new PaimonPartitionInfo(); + } + List paimonPartitions = loadPartitions(); + return PaimonUtil.generatePartitionInfo(partitionColumns, paimonPartitions); + } + + private List loadPartitions() + throws IOException { + Table table = ((PaimonExternalCatalog) catalog).getPaimonTable(dbName, + name + Catalog.SYSTEM_TABLE_SPLITTER + PartitionsTable.PARTITIONS); + List rows = PaimonUtil.read(table, null); + List res = Lists.newArrayListWithCapacity(rows.size()); + for (InternalRow row : rows) { + res.add(PaimonUtil.rowToPartition(row)); } - return Optional.of(new PaimonSchemaCacheValue(tmpSchema, paimonTable)); + return res; } private Type paimonPrimitiveTypeToDorisType(org.apache.paimon.types.DataType dataType) { @@ -205,4 +304,56 @@ public long fetchRowCount() { } return rowCount > 0 ? rowCount : UNKNOWN_ROW_COUNT; } + + @Override + public void beforeMTMVRefresh(MTMV mtmv) throws DdlException { + Env.getCurrentEnv().getRefreshManager() + .refreshTable(getCatalog().getName(), getDbName(), getName(), true); + } + + @Override + public Map getAndCopyPartitionItems() { + return Maps.newHashMap(getPartitionInfoFromCache().getNameToPartitionItem()); + } + + @Override + public PartitionType getPartitionType() { + return getPartitionColumnsFromCache().size() > 0 ? 
PartitionType.LIST : PartitionType.UNPARTITIONED;
+    }
+
+    @Override
+    public Set<String> getPartitionColumnNames() {
+        return getPartitionColumnsFromCache().stream()
+                .map(c -> c.getName().toLowerCase()).collect(Collectors.toSet());
+    }
+
+    @Override
+    public List<Column> getPartitionColumns() {
+        return getPartitionColumnsFromCache();
+    }
+
+    @Override
+    public MTMVSnapshotIf getPartitionSnapshot(String partitionName, MTMVRefreshContext context)
+            throws AnalysisException {
+        PaimonPartition paimonPartition = getPartitionInfoFromCache().getNameToPartition().get(partitionName);
+        if (paimonPartition == null) {
+            throw new AnalysisException("can not find partition: " + partitionName);
+        }
+        return new MTMVTimestampSnapshot(paimonPartition.getLastUpdateTime());
+    }
+
+    @Override
+    public MTMVSnapshotIf getTableSnapshot(MTMVRefreshContext context) throws AnalysisException {
+        return new MTMVVersionSnapshot(getLatestSnapshotIdFromCache());
+    }
+
+    @Override
+    public boolean isPartitionColumnAllowNull() {
+        // Ideally this should return false: Paimon writes both a real null and the string 'null'
+        // into the 'null' partition, which is inconsistent with Doris' handling of empty/null partitions.
+        // However, when Spark creates a Paimon table, 'not null' on partition columns does not take effect,
+        // so true is returned here to allow the materialized view to be created.
+        // The trade-off is that data written to Paimon's null partition cannot be detected by the materialized view.
+        return true;
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonPartition.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonPartition.java
new file mode 100644
index 00000000000000..545448199b3375
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonPartition.java
@@ -0,0 +1,61 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+ +package org.apache.doris.datasource.paimon; + +// https://paimon.apache.org/docs/0.9/maintenance/system-tables/#partitions-table +public class PaimonPartition { + // Partition values, for example: [1, dd] + private final String partitionValues; + // The amount of data in the partition + private final long recordCount; + // Partition file size + private final long fileSizeInBytes; + // Number of partition files + private final long fileCount; + // Last update time of partition + private final long lastUpdateTime; + + public PaimonPartition(String partitionValues, long recordCount, long fileSizeInBytes, long fileCount, + long lastUpdateTime) { + this.partitionValues = partitionValues; + this.recordCount = recordCount; + this.fileSizeInBytes = fileSizeInBytes; + this.fileCount = fileCount; + this.lastUpdateTime = lastUpdateTime; + } + + public String getPartitionValues() { + return partitionValues; + } + + public long getRecordCount() { + return recordCount; + } + + public long getFileSizeInBytes() { + return fileSizeInBytes; + } + + public long getFileCount() { + return fileCount; + } + + public long getLastUpdateTime() { + return lastUpdateTime; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonPartitionInfo.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonPartitionInfo.java new file mode 100644 index 00000000000000..8f54f0834e481b --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonPartitionInfo.java @@ -0,0 +1,48 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.datasource.paimon; + +import org.apache.doris.catalog.PartitionItem; + +import com.google.common.collect.Maps; + +import java.util.Map; + +public class PaimonPartitionInfo { + private Map nameToPartitionItem; + private Map nameToPartition; + + public PaimonPartitionInfo() { + this.nameToPartitionItem = Maps.newHashMap(); + this.nameToPartition = Maps.newHashMap(); + } + + public PaimonPartitionInfo(Map nameToPartitionItem, + Map nameToPartition) { + this.nameToPartitionItem = nameToPartitionItem; + this.nameToPartition = nameToPartition; + } + + public Map getNameToPartitionItem() { + return nameToPartitionItem; + } + + public Map getNameToPartition() { + return nameToPartition; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSchemaCacheValue.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSchemaCacheValue.java index aaaefe7f32db2b..20d27b2425df24 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSchemaCacheValue.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSchemaCacheValue.java @@ -27,13 +27,34 @@ public class PaimonSchemaCacheValue extends SchemaCacheValue { private Table paimonTable; + private List partitionColumns; + private PaimonPartitionInfo partitionInfo; - public PaimonSchemaCacheValue(List schema, Table paimonTable) { + private long snapshootId; + + public PaimonSchemaCacheValue(List schema, List partitionColumns, Table paimonTable, + long snapshootId, + PaimonPartitionInfo partitionInfo) { super(schema); + this.partitionColumns = partitionColumns; this.paimonTable = paimonTable; + this.snapshootId = snapshootId; + this.partitionInfo = partitionInfo; } public Table getPaimonTable() { return paimonTable; } + + public List getPartitionColumns() { + return partitionColumns; + } + + public PaimonPartitionInfo getPartitionInfo() { + return partitionInfo; + } + + public long getSnapshootId() { + return snapshootId; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonUtil.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonUtil.java new file mode 100644 index 00000000000000..8b7017cac29486 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonUtil.java @@ -0,0 +1,155 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
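+// Utility helpers for reading Paimon system tables (snapshots / partitions) and converting
+// their rows into Doris partition metadata (PaimonPartition, ListPartitionItem).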
+ +package org.apache.doris.datasource.paimon; + +import org.apache.doris.analysis.PartitionValue; +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.ListPartitionItem; +import org.apache.doris.catalog.PartitionItem; +import org.apache.doris.catalog.PartitionKey; +import org.apache.doris.catalog.Type; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.datasource.hive.HiveUtil; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import org.apache.commons.collections.CollectionUtils; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.data.serializer.InternalRowSerializer; +import org.apache.paimon.options.ConfigOption; +import org.apache.paimon.reader.RecordReader; +import org.apache.paimon.table.Table; +import org.apache.paimon.table.source.ReadBuilder; +import org.apache.paimon.utils.Pair; +import org.apache.paimon.utils.Projection; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import javax.annotation.Nullable; + +public class PaimonUtil { + public static List read( + Table table, @Nullable int[][] projection, Pair, String>... dynamicOptions) + throws IOException { + Map options = new HashMap<>(); + for (Pair, String> pair : dynamicOptions) { + options.put(pair.getKey().key(), pair.getValue()); + } + table = table.copy(options); + ReadBuilder readBuilder = table.newReadBuilder(); + if (projection != null) { + readBuilder.withProjection(projection); + } + RecordReader reader = + readBuilder.newRead().createReader(readBuilder.newScan().plan()); + InternalRowSerializer serializer = + new InternalRowSerializer( + projection == null + ? 
table.rowType() + : Projection.of(projection).project(table.rowType())); + List rows = new ArrayList<>(); + reader.forEachRemaining(row -> rows.add(serializer.copy(row))); + return rows; + } + + + /* + https://paimon.apache.org/docs/0.9/maintenance/system-tables/#partitions-table + +---------------+----------------+--------------------+--------------------+------------------------+ + | partition | record_count | file_size_in_bytes| file_count| last_update_time| + +---------------+----------------+--------------------+--------------------+------------------------+ + | [1] | 1 | 645 | 1 | 2024-06-24 10:25:57.400| + +---------------+----------------+--------------------+--------------------+------------------------+ + org.apache.paimon.table.system.PartitionsTable.TABLE_TYPE + public static final RowType TABLE_TYPE = + new RowType( + Arrays.asList( + new DataField(0, "partition", SerializationUtils.newStringType(true)), + new DataField(1, "record_count", new BigIntType(false)), + new DataField(2, "file_size_in_bytes", new BigIntType(false)), + new DataField(3, "file_count", new BigIntType(false)), + new DataField(4, "last_update_time", DataTypes.TIMESTAMP_MILLIS()))); + */ + public static PaimonPartition rowToPartition(InternalRow row) { + String partition = row.getString(0).toString(); + long recordCount = row.getLong(1); + long fileSizeInBytes = row.getLong(2); + long fileCount = row.getLong(3); + long lastUpdateTime = row.getTimestamp(4, 3).getMillisecond(); + return new PaimonPartition(partition, recordCount, fileSizeInBytes, fileCount, lastUpdateTime); + } + + public static PaimonPartitionInfo generatePartitionInfo(List partitionColumns, + List paimonPartitions) throws AnalysisException { + Map nameToPartitionItem = Maps.newHashMap(); + Map nameToPartition = Maps.newHashMap(); + PaimonPartitionInfo partitionInfo = new PaimonPartitionInfo(nameToPartitionItem, nameToPartition); + if (CollectionUtils.isEmpty(partitionColumns)) { + return partitionInfo; + } + for (PaimonPartition paimonPartition : paimonPartitions) { + String partitionName = getPartitionName(partitionColumns, paimonPartition.getPartitionValues()); + nameToPartition.put(partitionName, paimonPartition); + nameToPartitionItem.put(partitionName, toListPartitionItem(partitionName, partitionColumns)); + } + return partitionInfo; + } + + private static String getPartitionName(List partitionColumns, String partitionValueStr) { + Preconditions.checkNotNull(partitionValueStr); + String[] partitionValues = partitionValueStr.replace("[", "").replace("]", "") + .split(","); + Preconditions.checkState(partitionColumns.size() == partitionValues.length); + StringBuilder sb = new StringBuilder(); + for (int i = 0; i < partitionColumns.size(); ++i) { + if (i != 0) { + sb.append("/"); + } + sb.append(partitionColumns.get(i).getName()).append("=").append(partitionValues[i]); + } + return sb.toString(); + } + + public static ListPartitionItem toListPartitionItem(String partitionName, List partitionColumns) + throws AnalysisException { + List types = partitionColumns.stream() + .map(Column::getType) + .collect(Collectors.toList()); + // Partition name will be in format: nation=cn/city=beijing + // parse it to get values "cn" and "beijing" + List partitionValues = HiveUtil.toPartitionValues(partitionName); + Preconditions.checkState(partitionValues.size() == types.size(), partitionName + " vs. 
" + types); + List values = Lists.newArrayListWithExpectedSize(types.size()); + for (String partitionValue : partitionValues) { + // null will in partition 'null' + // "null" will in partition 'null' + // NULL will in partition 'null' + // "NULL" will in partition 'NULL' + // values.add(new PartitionValue(partitionValue, "null".equals(partitionValue))); + values.add(new PartitionValue(partitionValue, false)); + } + PartitionKey key = PartitionKey.createListPartitionKeyWithTypes(values, types, true); + ListPartitionItem listPartitionItem = new ListPartitionItem(Lists.newArrayList(key)); + return listPartitionItem; + } +} diff --git a/fe/fe-core/src/test/java/org/apache/doris/mtmv/PaimonUtilTest.java b/fe/fe-core/src/test/java/org/apache/doris/mtmv/PaimonUtilTest.java new file mode 100644 index 00000000000000..789af7bf8357ac --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/mtmv/PaimonUtilTest.java @@ -0,0 +1,71 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.mtmv; + +import org.apache.doris.analysis.LiteralExpr; +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.PartitionItem; +import org.apache.doris.catalog.PartitionKey; +import org.apache.doris.catalog.PrimitiveType; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.datasource.paimon.PaimonPartition; +import org.apache.doris.datasource.paimon.PaimonPartitionInfo; +import org.apache.doris.datasource.paimon.PaimonUtil; + +import com.google.common.collect.Lists; +import org.apache.paimon.data.BinaryString; +import org.apache.paimon.data.GenericRow; +import org.apache.paimon.data.Timestamp; +import org.junit.Assert; +import org.junit.Test; + +import java.util.List; + +public class PaimonUtilTest { + + @Test + public void testGeneratePartitionInfo() throws AnalysisException { + Column k1 = new Column("k1", PrimitiveType.INT); + Column k2 = new Column("k2", PrimitiveType.VARCHAR); + List partitionColumns = Lists.newArrayList(k1, k2); + PaimonPartition p1 = new PaimonPartition("[1,aa]", 2, 3, 4, 5); + List paimonPartitions = Lists.newArrayList(p1); + PaimonPartitionInfo partitionInfo = PaimonUtil.generatePartitionInfo(partitionColumns, paimonPartitions); + String expectPartitionName = "k1=1/k2=aa"; + Assert.assertTrue(partitionInfo.getNameToPartitionItem().containsKey(expectPartitionName)); + PartitionItem partitionItem = partitionInfo.getNameToPartitionItem().get(expectPartitionName); + List keys = partitionItem.getItems(); + Assert.assertEquals(1, keys.size()); + PartitionKey partitionKey = keys.get(0); + List exprs = partitionKey.getKeys(); + Assert.assertEquals(2, exprs.size()); + Assert.assertEquals(1, exprs.get(0).getLongValue()); + Assert.assertEquals("aa", exprs.get(1).getStringValue()); + } + + 
@Test + public void testRowToPartition() { + GenericRow row = GenericRow.of(BinaryString.fromString("[1,b]"), 2L, 3L, 4L, Timestamp.fromEpochMillis(5L)); + PaimonPartition paimonPartition = PaimonUtil.rowToPartition(row); + Assert.assertEquals("[1,b]", paimonPartition.getPartitionValues()); + Assert.assertEquals(2L, paimonPartition.getRecordCount()); + Assert.assertEquals(3L, paimonPartition.getFileSizeInBytes()); + Assert.assertEquals(4L, paimonPartition.getFileCount()); + Assert.assertEquals(5L, paimonPartition.getLastUpdateTime()); + } +} diff --git a/regression-test/data/mtmv_p0/test_paimon_mtmv.out b/regression-test/data/mtmv_p0/test_paimon_mtmv.out index c654cb01214f57..c28b7cb7baca22 100644 --- a/regression-test/data/mtmv_p0/test_paimon_mtmv.out +++ b/regression-test/data/mtmv_p0/test_paimon_mtmv.out @@ -1,9 +1,113 @@ -- This file is automatically generated. You should know what you did if you want to edit this --- !catalog -- -1 2 3 4 5 6 7 8 9.1 10.1 11.10 2020-02-02 13str 14varchar a true aaaa 2023-08-13T09:32:38.530 -10 20 30 40 50 60 70 80 90.1 100.1 110.10 2020-03-02 130str 140varchar b false bbbb 2023-08-14T08:32:52.821 +-- !base_table -- +1 2 a +1 2 b +10 1 a +10 1 b +2 2 a +2 2 b +3 2 a +3 2 b +4 2 a +4 2 b +5 2 a +5 2 b +6 1 a +6 1 b +7 1 a +7 1 b +8 1 a +8 1 b +9 1 a +9 1 b --- !mtmv -- -1 2 3 4 5 6 7 8 9.1 10.1 11.10 2020-02-02 13str 14varchar a true aaaa 2023-08-13T09:32:38.530 -10 20 30 40 50 60 70 80 90.1 100.1 110.10 2020-03-02 130str 140varchar b false bbbb 2023-08-14T08:32:52.821 +-- !refresh_one_partition -- +1 2 a +10 1 a +2 2 a +3 2 a +4 2 a +5 2 a +6 1 a +7 1 a +8 1 a +9 1 a + +-- !refresh_auto -- +1 2 a +1 2 b +10 1 a +10 1 b +2 2 a +2 2 b +3 2 a +3 2 b +4 2 a +4 2 b +5 2 a +5 2 b +6 1 a +6 1 b +7 1 a +7 1 b +8 1 a +8 1 b +9 1 a +9 1 b + +-- !is_sync_before_rebuild -- +true + +-- !is_sync_after_rebuild -- +true + +-- !refresh_complete_rebuild -- +1 2 a +1 2 b +10 1 a +10 1 b +2 2 a +2 2 b +3 2 a +3 2 b +4 2 a +4 2 b +5 2 a +5 2 b +6 1 a +6 1 b +7 1 a +7 1 b +8 1 a +8 1 b +9 1 a +9 1 b + +-- !not_partition_before -- +false + +-- !not_partition -- +1 2 a +1 2 b +10 1 a +10 1 b +2 2 a +2 2 b +3 2 a +3 2 b +4 2 a +4 2 b +5 2 a +5 2 b +6 1 a +6 1 b +7 1 a +7 1 b +8 1 a +8 1 b +9 1 a +9 1 b + +-- !not_partition_after -- +true diff --git a/regression-test/suites/mtmv_p0/test_paimon_mtmv.groovy b/regression-test/suites/mtmv_p0/test_paimon_mtmv.groovy index e84eb497b2c7b1..f2989edbf6dfd6 100644 --- a/regression-test/suites/mtmv_p0/test_paimon_mtmv.groovy +++ b/regression-test/suites/mtmv_p0/test_paimon_mtmv.groovy @@ -15,48 +15,105 @@ // specific language governing permissions and limitations // under the License. 
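+// Covers partitioned MTMV refresh, catalog rebuild, and an unpartitioned MTMV on the Paimon table
+// test_paimon_spark.test_tb_mix_format (partition column `par`), using the MinIO-backed docker environment.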
-suite("test_paimon_mtmv", "p0,external,paimon,external_docker,external_docker_hive") { +suite("test_paimon_mtmv", "p0,external,mtmv,external_docker,external_docker_doris") { String enabled = context.config.otherConfigs.get("enablePaimonTest") - logger.info("enabled: " + enabled) + if (enabled == null || !enabled.equalsIgnoreCase("true")) { + logger.info("disabled paimon test") + return + } + String suiteName = "test_paimon_mtmv" + String catalogName = "${suiteName}_catalog" + String mvName = "${suiteName}_mv" + String dbName = context.config.getDbNameByFile(context.file) + + String minio_port = context.config.otherConfigs.get("iceberg_minio_port") String externalEnvIp = context.config.otherConfigs.get("externalEnvIp") - logger.info("externalEnvIp: " + externalEnvIp) - String hdfs_port = context.config.otherConfigs.get("hive2HdfsPort") - logger.info("hdfs_port: " + hdfs_port) - if (enabled != null && enabled.equalsIgnoreCase("true")) { - String catalog_name = "paimon_mtmv_catalog"; - String mvName = "test_paimon_mtmv" - String dbName = "regression_test_mtmv_p0" - String paimonDb = "db1" - String paimonTable = "all_table" - sql """drop catalog if exists ${catalog_name} """ - - sql """create catalog if not exists ${catalog_name} properties ( - "type" = "paimon", - "paimon.catalog.type"="filesystem", - "warehouse" = "hdfs://${externalEnvIp}:${hdfs_port}/user/doris/paimon1" + + sql """drop catalog if exists ${catalogName}""" + sql """CREATE CATALOG ${catalogName} PROPERTIES ( + 'type'='paimon', + 'warehouse' = 's3://warehouse/wh/', + "s3.access_key" = "admin", + "s3.secret_key" = "password", + "s3.endpoint" = "http://${externalEnvIp}:${minio_port}", + "s3.region" = "us-east-1" );""" - order_qt_catalog """select * from ${catalog_name}.${paimonDb}.${paimonTable}""" - sql """drop materialized view if exists ${mvName};""" - - sql """ - CREATE MATERIALIZED VIEW ${mvName} - BUILD DEFERRED REFRESH AUTO ON MANUAL - DISTRIBUTED BY RANDOM BUCKETS 2 - PROPERTIES ('replication_num' = '1') - AS - SELECT * FROM ${catalog_name}.${paimonDb}.${paimonTable}; - """ - - sql """ - REFRESH MATERIALIZED VIEW ${mvName} complete - """ - def jobName = getJobName(dbName, mvName); - waitingMTMVTaskFinished(jobName) - order_qt_mtmv "SELECT * FROM ${mvName}" - - sql """drop materialized view if exists ${mvName};""" - sql """ drop catalog if exists ${catalog_name} """ - } + order_qt_base_table """ select * from ${catalogName}.test_paimon_spark.test_tb_mix_format ; """ + + sql """drop materialized view if exists ${mvName};""" + + sql """ + CREATE MATERIALIZED VIEW ${mvName} + BUILD DEFERRED REFRESH AUTO ON MANUAL + partition by(`par`) + DISTRIBUTED BY RANDOM BUCKETS 2 + PROPERTIES ('replication_num' = '1') + AS + SELECT * FROM ${catalogName}.`test_paimon_spark`.test_tb_mix_format; + """ + def showPartitionsResult = sql """show partitions from ${mvName}""" + logger.info("showPartitionsResult: " + showPartitionsResult.toString()) + assertTrue(showPartitionsResult.toString().contains("p_a")) + assertTrue(showPartitionsResult.toString().contains("p_b")) + + // refresh one partitions + sql """ + REFRESH MATERIALIZED VIEW ${mvName} partitions(p_a); + """ + waitingMTMVTaskFinishedByMvName(mvName) + order_qt_refresh_one_partition "SELECT * FROM ${mvName} " + + //refresh auto + sql """ + REFRESH MATERIALIZED VIEW ${mvName} auto + """ + waitingMTMVTaskFinishedByMvName(mvName) + order_qt_refresh_auto "SELECT * FROM ${mvName} " + order_qt_is_sync_before_rebuild "select SyncWithBaseTables from mv_infos('database'='${dbName}') where 
Name='${mvName}'"
+
+    // rebuilding the catalog should not affect the MTMV
+    sql """drop catalog if exists ${catalogName}"""
+    sql """
+        CREATE CATALOG ${catalogName} PROPERTIES (
+            'type'='paimon',
+            'warehouse' = 's3://warehouse/wh/',
+            "s3.access_key" = "admin",
+            "s3.secret_key" = "password",
+            "s3.endpoint" = "http://${externalEnvIp}:${minio_port}",
+            "s3.region" = "us-east-1"
+        );
+    """
+    order_qt_is_sync_after_rebuild "select SyncWithBaseTables from mv_infos('database'='${dbName}') where Name='${mvName}'"
+
+    // refresh should still work normally after the catalog is rebuilt
+    sql """
+        REFRESH MATERIALIZED VIEW ${mvName} complete
+    """
+    waitingMTMVTaskFinishedByMvName(mvName)
+    order_qt_refresh_complete_rebuild "SELECT * FROM ${mvName} "
+
+    sql """drop materialized view if exists ${mvName};"""
+
+    // create an MTMV without partitions
+    sql """
+        CREATE MATERIALIZED VIEW ${mvName}
+        BUILD DEFERRED REFRESH AUTO ON MANUAL
+        DISTRIBUTED BY RANDOM BUCKETS 2
+        PROPERTIES ('replication_num' = '1')
+        AS
+        SELECT * FROM ${catalogName}.`test_paimon_spark`.test_tb_mix_format;
+    """
+    order_qt_not_partition_before "select SyncWithBaseTables from mv_infos('database'='${dbName}') where Name='${mvName}'"
+    // auto refresh should still succeed for the unpartitioned MTMV
+    sql """
+        REFRESH MATERIALIZED VIEW ${mvName} auto
+    """
+    waitingMTMVTaskFinishedByMvName(mvName)
+    order_qt_not_partition "SELECT * FROM ${mvName} "
+    order_qt_not_partition_after "select SyncWithBaseTables from mv_infos('database'='${dbName}') where Name='${mvName}'"
+    sql """drop materialized view if exists ${mvName};"""
+    sql """drop catalog if exists ${catalogName}"""
+}