diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/MTMV.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/MTMV.java index 7637b569fcc773..ba551ac9c85df1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/MTMV.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/MTMV.java @@ -53,6 +53,7 @@ import java.util.Map; import java.util.Map.Entry; +import java.util.Objects; import java.util.Optional; import java.util.Set; import java.util.concurrent.ConcurrentLinkedQueue; @@ -208,7 +209,7 @@ public void addTaskResult(MTMVTask task, MTMVRelation relation, writeMvLock(); try { if (task.getStatus() == TaskStatus.SUCCESS) { - this.status.setState(MTMVState.NORMAL); + this.status.setState(determineStateFrom(relation)); this.status.setSchemaChangeDetail(null); this.status.setRefreshState(MTMVRefreshState.SUCCESS); this.relation = relation; @@ -227,6 +228,19 @@ public void addTaskResult(MTMVTask task, MTMVRelation relation, } } + private MTMVState determineStateFrom(MTMVRelation relation) { + // If any base table is in schema change state, the mv should be in schema change state + boolean hasChange = Optional.ofNullable(relation) + .map(MTMVRelation::getBaseTablesOneLevel) + .map(tables -> tables.stream() + .filter(Objects::nonNull) + .anyMatch(t -> t.getOlapTableState() == OlapTable.OlapTableState.SCHEMA_CHANGE) + ) + .orElse(false); + + return hasChange ? MTMVState.SCHEMA_CHANGE : MTMVState.NORMAL; + } + public Map alterMvProperties(Map mvProperties) { writeMvLock(); try { diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/BaseTableInfo.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/BaseTableInfo.java index 9e80b5ed6e4a40..fdca54c13784d4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/BaseTableInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/BaseTableInfo.java @@ -58,6 +58,9 @@ public class BaseTableInfo { private String dbName; @SerializedName("cn") private String ctlName; + // schema change is an asynchronous task, so it is necessary to determine the olapTable status + @SerializedName("ts") + private OlapTable.OlapTableState olapTableState; public BaseTableInfo(TableIf table) { java.util.Objects.requireNonNull(table, "table is null"); @@ -71,6 +74,9 @@ public BaseTableInfo(TableIf table) { this.tableName = table.getName(); this.dbName = database.getFullName(); this.ctlName = catalog.getName(); + if (table instanceof OlapTable) { + this.olapTableState = ((OlapTable) table).getState(); + } } // for replay MTMV, can not use `table.getDatabase();`,because database not added to catalog @@ -82,6 +88,7 @@ public BaseTableInfo(OlapTable table, long dbId) { this.tableName = table.getName(); this.dbName = table.getDBName(); this.ctlName = InternalCatalog.INTERNAL_CATALOG_NAME; + this.olapTableState = table.getState(); } public String getTableName() { @@ -115,6 +122,10 @@ public void setTableName(String tableName) { this.tableName = tableName; } + public OlapTable.OlapTableState getOlapTableState() { + return olapTableState; + } + // if compatible failed due catalog dropped, ctlName will be null public boolean isInternalTable() { if (!StringUtils.isEmpty(ctlName)) { @@ -194,6 +205,9 @@ public void compatible(CatalogMgr catalogMgr) throws Exception { this.ctlName = catalog.getName(); this.dbName = db.getFullName(); this.tableName = table.getName(); + if (table instanceof OlapTable) { + this.olapTableState = ((OlapTable) table).getState(); + } } catch (AnalysisException e) { String msg = String.format( "Failed to get name based on id during compatibility process, ctlId: %s, dbId: %s, tableId: %s", diff --git a/regression-test/suites/mtmv_p0/schema_change_check/schema_change_check.groovy b/regression-test/suites/mtmv_p0/schema_change_check/schema_change_check.groovy new file mode 100644 index 00000000000000..466fdd7062b73e --- /dev/null +++ b/regression-test/suites/mtmv_p0/schema_change_check/schema_change_check.groovy @@ -0,0 +1,73 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("schema_change_check") { + sql """ drop database if exists schema_change_check;""" + sql """ create database schema_change_check + PROPERTIES ( + "replication_allocation" = "tag.location.default:1" + ); + """ + sql """ use schema_change_check;""" + + sql """ drop table if exists test_base; """ + // build base table + sql """ + CREATE TABLE `test_base` ( + `k1` int NULL, + `k2` varchar(20) NULL, + `v1` int NULL, + `v2` int NULL + ) ENGINE=OLAP + DUPLICATE KEY(`k1`, `k2`); + """ + + sql """ insert into test_base values(1,2,3,4),(2,3,5,6);""" + // build MTMV + sql """ CREATE MATERIALIZED VIEW test_asy_mv BUILD IMMEDIATE REFRESH AUTO ON SCHEDULE EVERY 1 MINUTE AS select * from test_base where k1 > 1; """ + sql """ REFRESH MATERIALIZED VIEW test_asy_mv complete;""" + def job_name = getJobName("schema_change_check", "test_asy_mv"); + waitingMTMVTaskFinished(job_name) + explain { + sql("select * from test_base where k1 > 1") + // hit mtmv + contains "test_asy_mv" + } + + // schema change + sql """ alter table test_base modify column k2 int key;""" + sql """ REFRESH MATERIALIZED VIEW test_asy_mv complete;""" + waitingMTMVTaskFinished(job_name) + explain { + sql("select * from test_base where k1 > 1") + // should be failed to hit mtmv + notContains "test_asy_mv" + } + // make sure the schema_change job is finished + sleep(10000) + sql """ REFRESH MATERIALIZED VIEW test_asy_mv complete;""" + sleep(10000) + explain { + sql("select * from test_base where k1 > 1") + // should be failed to hit mtmv + notContains "test_asy_mv" + } + + sql """ drop MATERIALIZED VIEW test_asy_mv; """ + sql """ drop table test_base;""" + sql """ drop database schema_change_check;""" +}