Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix](mtmv)fix nested mtmv not refresh #40433

Merged
merged 5 commits into from
Sep 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3144,9 +3144,10 @@ public List<Column> getPartitionColumns() {
public MTMVSnapshotIf getPartitionSnapshot(String partitionName, MTMVRefreshContext context)
throws AnalysisException {
Map<String, Long> partitionVersions = context.getBaseVersions().getPartitionVersions();
long partitionId = getPartitionOrAnalysisException(partitionName).getId();
long visibleVersion = partitionVersions.containsKey(partitionName) ? partitionVersions.get(partitionName)
: getPartitionOrAnalysisException(partitionName).getVisibleVersion();
return new MTMVVersionSnapshot(visibleVersion);
return new MTMVVersionSnapshot(visibleVersion, partitionId);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
package org.apache.doris.mtmv;

import org.apache.doris.catalog.MTMV;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Partition;
import org.apache.doris.common.AnalysisException;

import com.google.common.collect.Maps;
import com.google.gson.annotations.SerializedName;
Expand Down Expand Up @@ -74,6 +77,46 @@ public String toString() {
}

public void compatible(MTMV mtmv) {
try {
// snapshot add partitionId resolve problem of insert overwrite
zddr marked this conversation as resolved.
Show resolved Hide resolved
compatiblePartitions(mtmv);
} catch (Throwable e) {
LOG.warn("MTMV compatiblePartitions failed, mtmv: {}", mtmv.getName(), e);
}
try {
// change table id to BaseTableInfo
compatibleTables(mtmv);
} catch (Throwable e) {
LOG.warn("MTMV compatibleTables failed, mtmv: {}", mtmv.getName(), e);
}
}

private void compatiblePartitions(MTMV mtmv) throws AnalysisException {
if (!checkHasDataWithoutPartitionId()) {
return;
}
OlapTable relatedTable = (OlapTable) mtmv.getMvPartitionInfo().getRelatedTable();
for (Entry<String, MTMVSnapshotIf> entry : partitions.entrySet()) {
MTMVVersionSnapshot versionSnapshot = (MTMVVersionSnapshot) entry.getValue();
if (versionSnapshot.getId() == 0) {
Partition partition = relatedTable.getPartition(entry.getKey());
if (partition != null) {
(versionSnapshot).setId(partition.getId());
}
}
}
}

private boolean checkHasDataWithoutPartitionId() {
for (MTMVSnapshotIf snapshot : partitions.values()) {
if (snapshot instanceof MTMVVersionSnapshot && ((MTMVVersionSnapshot) snapshot).getId() == 0) {
return true;
}
}
return false;
}

private void compatibleTables(MTMV mtmv) {
if (tables.size() == tablesInfo.size()) {
return;
}
Expand All @@ -87,7 +130,7 @@ public void compatible(MTMV mtmv) {
if (tableInfo.isPresent()) {
tablesInfo.put(tableInfo.get(), entry.getValue());
} else {
LOG.warn("MTMV compatible failed, tableId: {}, relationTables: {}", entry.getKey(),
LOG.warn("MTMV compatibleTables failed, tableId: {}, relationTables: {}", entry.getKey(),
relation.getBaseTablesOneLevel());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,30 @@ public class MTMVVersionSnapshot implements MTMVSnapshotIf {
@SerializedName("v")
private long version;

// The partition version after insert overwrite is 1,
// which may cause the upper level materialized view to be unaware of changes in the data at the bottom level.
// However, the partition ID after overwrite will change, so the partition ID should be added.
// only for partition, table will always 0
@SerializedName("id")
private long id;

public MTMVVersionSnapshot(long version) {
this.version = version;
}

public MTMVVersionSnapshot(long version, long id) {
this.version = version;
this.id = id;
}

public long getId() {
return id;
}

public void setId(long id) {
this.id = id;
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand All @@ -37,18 +57,19 @@ public boolean equals(Object o) {
return false;
}
MTMVVersionSnapshot that = (MTMVVersionSnapshot) o;
return version == that.version;
return version == that.version && id == that.id;
}

@Override
public int hashCode() {
return Objects.hashCode(version);
return Objects.hashCode(version, id);
}

@Override
public String toString() {
return "MTMVVersionSnapshot{"
+ "version=" + version
+ ", id=" + id
+ '}';
}
}
11 changes: 11 additions & 0 deletions regression-test/data/mtmv_p0/test_multi_level_mtmv.out
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,17 @@
-- !mv2_should_one_partition --
["p_2"]

-- !mv1_should_one_partition_again --
["p_2"]

-- !mv2_should_one_partition_again --
["p_2"]

-- !mv2_again --
1 1
2 2
2 3

-- !status1 --
multi_level_mtmv1 SCHEMA_CHANGE SUCCESS

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !refresh_init --
1 2017-01-15 1
2 2017-02-15 2
3 2017-03-15 3

-- !mtmv_sync --
true

16 changes: 16 additions & 0 deletions regression-test/suites/mtmv_p0/test_multi_level_mtmv.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,22 @@ suite("test_multi_level_mtmv") {
waitingMTMVTaskFinishedByMvName(mv2)
order_qt_mv2_should_one_partition "select NeedRefreshPartitions from tasks('type'='mv') where MvName = '${mv2}' order by CreateTime desc limit 1"

// insert into p2 again, check partition version if change
sql """
INSERT INTO ${tableName} VALUES(2,3);
"""
sql """
REFRESH MATERIALIZED VIEW ${mv1} AUTO
"""
waitingMTMVTaskFinishedByMvName(mv1)
order_qt_mv1_should_one_partition_again "select NeedRefreshPartitions from tasks('type'='mv') where MvName = '${mv1}' order by CreateTime desc limit 1"
sql """
REFRESH MATERIALIZED VIEW ${mv2} AUTO
"""
waitingMTMVTaskFinishedByMvName(mv2)
order_qt_mv2_should_one_partition_again "select NeedRefreshPartitions from tasks('type'='mv') where MvName = '${mv2}' order by CreateTime desc limit 1"
order_qt_mv2_again "select * from ${mv2}"

// drop table
sql """
drop table ${tableName}
Expand Down
71 changes: 71 additions & 0 deletions regression-test/suites/mtmv_up_down_olap_p0/load.groovy
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

suite("test_upgrade_downgrade_prepare_olap_mtmv","p0,mtmv,restart_fe") {
String suiteName = "mtmv_up_down_olap"
String mvName = "${suiteName}_mtmv"
String tableName = "${suiteName}_table"
String tableName2 = "${suiteName}_table2"

sql """drop materialized view if exists ${mvName};"""
sql """drop table if exists `${tableName}`"""
sql """drop table if exists `${tableName2}`"""

sql """
CREATE TABLE `${tableName}` (
`user_id` LARGEINT NOT NULL COMMENT '\"用户id\"',
`date` DATE NOT NULL COMMENT '\"数据灌入日期时间\"',
`num` SMALLINT NOT NULL COMMENT '\"数量\"'
) ENGINE=OLAP
DUPLICATE KEY(`user_id`, `date`, `num`)
COMMENT 'OLAP'
PARTITION BY RANGE(`date`)
(PARTITION p201701_1000 VALUES [('0000-01-01'), ('2017-02-01')),
PARTITION p201702_2000 VALUES [('2017-02-01'), ('2017-03-01')),
PARTITION p201703_all VALUES [('2017-03-01'), ('2017-04-01')))
DISTRIBUTED BY HASH(`user_id`) BUCKETS 2
PROPERTIES ('replication_num' = '1') ;
"""
sql """
insert into ${tableName} values(1,"2017-01-15",1),(2,"2017-02-15",2),(3,"2017-03-15",3);
"""

sql """
CREATE TABLE `${tableName2}` (
`user_id` LARGEINT NOT NULL COMMENT '\"用户id\"',
`age` SMALLINT NOT NULL COMMENT '\"年龄\"'
) ENGINE=OLAP
DUPLICATE KEY(`user_id`, `age`)
COMMENT 'OLAP'
DISTRIBUTED BY HASH(`user_id`) BUCKETS 2
PROPERTIES ('replication_num' = '1') ;
"""
sql """
insert into ${tableName2} values(1,1),(2,2),(3,3);
"""

sql """
CREATE MATERIALIZED VIEW ${mvName}
REFRESH AUTO ON MANUAL
partition by(`date`)
DISTRIBUTED BY RANDOM BUCKETS 2
PROPERTIES ('replication_num' = '1')
AS
SELECT a.* FROM ${tableName} a inner join ${tableName2} b on a.user_id=b.user_id;
"""
waitingMTMVTaskFinishedByMvName(mvName)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

suite("test_upgrade_downgrade_olap_mtmv","p0,mtmv,restart_fe") {
String suiteName = "mtmv_up_down_olap"
String dbName = context.config.getDbNameByFile(context.file)
String mvName = "${suiteName}_mtmv"
String tableName = "${suiteName}_table"
// test data is normal
order_qt_refresh_init "SELECT * FROM ${mvName}"
// test is sync
order_qt_mtmv_sync "select SyncWithBaseTables from mv_infos('database'='${dbName}') where Name='${mvName}'"
sql """
REFRESH MATERIALIZED VIEW ${mvName} complete
"""
// test can refresh success
waitingMTMVTaskFinishedByMvName(mvName)
}
Loading