Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 15 additions & 1 deletion fe/fe-core/src/main/java/org/apache/doris/catalog/MTMV.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@

import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
Expand Down Expand Up @@ -208,7 +209,7 @@ public void addTaskResult(MTMVTask task, MTMVRelation relation,
writeMvLock();
try {
if (task.getStatus() == TaskStatus.SUCCESS) {
this.status.setState(MTMVState.NORMAL);
this.status.setState(determineStateFrom(relation));
this.status.setSchemaChangeDetail(null);
this.status.setRefreshState(MTMVRefreshState.SUCCESS);
this.relation = relation;
Expand All @@ -227,6 +228,19 @@ public void addTaskResult(MTMVTask task, MTMVRelation relation,
}
}

private MTMVState determineStateFrom(MTMVRelation relation) {
// If any base table is in schema change state, the mv should be in schema change state
boolean hasChange = Optional.ofNullable(relation)
.map(MTMVRelation::getBaseTablesOneLevel)
.map(tables -> tables.stream()
.filter(Objects::nonNull)
.anyMatch(t -> t.getOlapTableState() == OlapTable.OlapTableState.SCHEMA_CHANGE)
)
.orElse(false);

return hasChange ? MTMVState.SCHEMA_CHANGE : MTMVState.NORMAL;
}

public Map<String, String> alterMvProperties(Map<String, String> mvProperties) {
writeMvLock();
try {
Expand Down
14 changes: 14 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/mtmv/BaseTableInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@ public class BaseTableInfo {
private String dbName;
@SerializedName("cn")
private String ctlName;
// schema change is an asynchronous task, so it is necessary to determine the olapTable status
@SerializedName("ts")
private OlapTable.OlapTableState olapTableState;

public BaseTableInfo(TableIf table) {
java.util.Objects.requireNonNull(table, "table is null");
Expand All @@ -71,6 +74,9 @@ public BaseTableInfo(TableIf table) {
this.tableName = table.getName();
this.dbName = database.getFullName();
this.ctlName = catalog.getName();
if (table instanceof OlapTable) {
this.olapTableState = ((OlapTable) table).getState();
}
}

// for replay MTMV, can not use `table.getDatabase();`,because database not added to catalog
Expand All @@ -82,6 +88,7 @@ public BaseTableInfo(OlapTable table, long dbId) {
this.tableName = table.getName();
this.dbName = table.getDBName();
this.ctlName = InternalCatalog.INTERNAL_CATALOG_NAME;
this.olapTableState = table.getState();
}

public String getTableName() {
Expand Down Expand Up @@ -115,6 +122,10 @@ public void setTableName(String tableName) {
this.tableName = tableName;
}

public OlapTable.OlapTableState getOlapTableState() {
return olapTableState;
}

// if compatible failed due catalog dropped, ctlName will be null
public boolean isInternalTable() {
if (!StringUtils.isEmpty(ctlName)) {
Expand Down Expand Up @@ -194,6 +205,9 @@ public void compatible(CatalogMgr catalogMgr) throws Exception {
this.ctlName = catalog.getName();
this.dbName = db.getFullName();
this.tableName = table.getName();
if (table instanceof OlapTable) {
this.olapTableState = ((OlapTable) table).getState();
}
} catch (AnalysisException e) {
String msg = String.format(
"Failed to get name based on id during compatibility process, ctlId: %s, dbId: %s, tableId: %s",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

suite("schema_change_check") {
sql """ drop database if exists schema_change_check;"""
sql """ create database schema_change_check
PROPERTIES (
"replication_allocation" = "tag.location.default:1"
);
"""
sql """ use schema_change_check;"""

sql """ drop table if exists test_base; """
// build base table
sql """
CREATE TABLE `test_base` (
`k1` int NULL,
`k2` varchar(20) NULL,
`v1` int NULL,
`v2` int NULL
) ENGINE=OLAP
DUPLICATE KEY(`k1`, `k2`);
"""

sql """ insert into test_base values(1,2,3,4),(2,3,5,6);"""
// build MTMV
sql """ CREATE MATERIALIZED VIEW test_asy_mv BUILD IMMEDIATE REFRESH AUTO ON SCHEDULE EVERY 1 MINUTE AS select * from test_base where k1 > 1; """
sql """ REFRESH MATERIALIZED VIEW test_asy_mv complete;"""
def job_name = getJobName("schema_change_check", "test_asy_mv");
waitingMTMVTaskFinished(job_name)
explain {
sql("select * from test_base where k1 > 1")
// hit mtmv
contains "test_asy_mv"
}

// schema change
sql """ alter table test_base modify column k2 int key;"""
sql """ REFRESH MATERIALIZED VIEW test_asy_mv complete;"""
waitingMTMVTaskFinished(job_name)
explain {
sql("select * from test_base where k1 > 1")
// should be failed to hit mtmv
notContains "test_asy_mv"
}
// make sure the schema_change job is finished
sleep(10000)
sql """ REFRESH MATERIALIZED VIEW test_asy_mv complete;"""
sleep(10000)
explain {
sql("select * from test_base where k1 > 1")
// should be failed to hit mtmv
notContains "test_asy_mv"
}

sql """ drop MATERIALIZED VIEW test_asy_mv; """
sql """ drop table test_base;"""
sql """ drop database schema_change_check;"""
}