diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java b/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java index 1d09224e6fa38f..54b620418893b5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java @@ -64,6 +64,7 @@ import org.apache.doris.task.AgentTaskQueue; import org.apache.doris.task.AlterReplicaTask; import org.apache.doris.task.CreateReplicaTask; +import org.apache.doris.thrift.TStatusCode; import org.apache.doris.thrift.TStorageFormat; import org.apache.doris.thrift.TStorageMedium; import org.apache.doris.thrift.TStorageType; @@ -526,10 +527,18 @@ protected void runRunningJob() throws AlterCancelException { List tasks = rollupBatchTask.getUnfinishedTasks(2000); ensureCloudClusterExist(tasks); for (AgentTask task : tasks) { - if (task.getFailedTimes() > 0) { + int maxFailedTimes = 0; + if (Config.isCloudMode() && Config.enable_schema_change_retry_in_cloud_mode) { + if (task.getErrorCode() != null && task.getErrorCode() + .equals(TStatusCode.DELETE_BITMAP_LOCK_ERROR)) { + maxFailedTimes = Config.schema_change_max_retry_time; + } + } + if (task.getFailedTimes() > maxFailedTimes) { task.setFinished(true); AgentTaskQueue.removeTask(task.getBackendId(), TTaskType.ALTER, task.getSignature()); - LOG.warn("rollup task failed: " + task.getErrorMsg()); + LOG.warn("rollup task failed, failedTimes: {}, maxFailedTimes: {}, err: {}", + task.getFailedTimes(), maxFailedTimes, task.getErrorMsg()); List failedBackends = failedTabletBackends.get(task.getTabletId()); if (failedBackends == null) { failedBackends = Lists.newArrayList(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java index 89e3c6249c83a0..8141936033cc91 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java @@ -583,15 +583,14 @@ protected void runRunningJob() throws AlterCancelException { if (task.getErrorCode() != null && task.getErrorCode() .equals(TStatusCode.DELETE_BITMAP_LOCK_ERROR)) { maxFailedTimes = Config.schema_change_max_retry_time; - LOG.warn("schema change task failed: {}, set maxFailedTimes {}", task.getErrorMsg(), - maxFailedTimes); } } if (task.getFailedTimes() > maxFailedTimes) { task.setFinished(true); if (!FeConstants.runningUnitTest) { AgentTaskQueue.removeTask(task.getBackendId(), TTaskType.ALTER, task.getSignature()); - LOG.warn("schema change task failed: {}", task.getErrorMsg()); + LOG.warn("schema change task failed, failedTimes: {}, maxFailedTimes: {}, err: {}", + task.getFailedTimes(), maxFailedTimes, task.getErrorMsg()); List failedBackends = failedTabletBackends.get(task.getTabletId()); if (failedBackends == null) { failedBackends = Lists.newArrayList(); diff --git a/regression-test/data/fault_injection_p0/cloud/test_cloud_mow_sync_mv.out b/regression-test/data/fault_injection_p0/cloud/test_cloud_mow_sync_mv.out new file mode 100644 index 00000000000000..47a82e61109431 --- /dev/null +++ b/regression-test/data/fault_injection_p0/cloud/test_cloud_mow_sync_mv.out @@ -0,0 +1,19 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !query1_before -- +\N 1 2 3 2023-10-17 ok +1 \N 3 1 2023-10-17 ok +1 2 3 \N 2023-10-17 ok +1 3 2 2 2023-10-17 ok +2 3 2 1 2023-10-18 ok +3 1 1 2 2023-10-19 ko +3 3 \N 2 2023-10-19 ko + +-- !query1_after -- +\N 1 2 3 2023-10-17 ok +1 \N 3 1 2023-10-17 ok +1 2 3 \N 2023-10-17 ok +1 3 2 2 2023-10-17 ok +2 3 2 1 2023-10-18 ok +3 1 1 2 2023-10-19 ko +3 3 \N 2 2023-10-19 ko + diff --git a/regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_sync_mv.groovy b/regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_sync_mv.groovy new file mode 100644 index 00000000000000..eec2a0adb6fd1a --- /dev/null +++ b/regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_sync_mv.groovy @@ -0,0 +1,115 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_cloud_mow_sync_mv", "nonConcurrent") { + if (!isCloudMode()) { + return + } + + try { + GetDebugPoint().clearDebugPointsForAllFEs() + GetDebugPoint().clearDebugPointsForAllBEs() + + String db = context.config.getDbNameByFile(context.file) + sql "use ${db}" + + def table1 = "test_cloud_mow_sync_mv" + sql "DROP TABLE IF EXISTS ${table1} FORCE;" + sql """ + CREATE TABLE ${table1} ( + `l_orderkey` BIGINT NULL, + `l_linenumber` INT NULL, + `l_partkey` INT NULL, + `l_suppkey` INT NULL, + `l_shipdate` DATE not NULL, + `l_quantity` DECIMAL(15, 2) NULL, + `l_extendedprice` DECIMAL(15, 2) NULL, + `l_discount` DECIMAL(15, 2) NULL, + `l_tax` DECIMAL(15, 2) NULL, + `l_returnflag` VARCHAR(1) NULL, + `l_linestatus` VARCHAR(1) NULL, + `l_commitdate` DATE NULL, + `l_receiptdate` DATE NULL, + `l_shipinstruct` VARCHAR(25) NULL, + `l_shipmode` VARCHAR(10) NULL, + `l_comment` VARCHAR(44) NULL + ) ENGINE=OLAP + unique KEY(l_orderkey, l_linenumber, l_partkey, l_suppkey, l_shipdate ) + DISTRIBUTED BY HASH(`l_orderkey`) BUCKETS 1 + PROPERTIES ( + "enable_unique_key_merge_on_write" = "true", + "disable_auto_compaction" = "true"); + """ + + sql """ + insert into ${table1} values + (null, 1, 2, 3, '2023-10-17', 5.5, 6.5, 7.5, 8.5, 'o', 'k', '2023-10-17', '2023-10-17', 'a', 'b', 'yyyyyyyyy'), + (null, 1, 2, 3, '2023-10-17', 5.5, 6.5, 7.5, 8.5, 'o', 'k', '2023-10-17', '2023-10-17', 'a', 'b', 'yyyyyyyyy'), + (1, null, 3, 1, '2023-10-17', 5.5, 6.5, 7.5, 8.5, 'o', 'k', '2023-10-18', '2023-10-18', 'a', 'b', 'yyyyyyyyy'), + (1, null, 3, 1, '2023-10-17', 5.5, 6.5, 7.5, 8.5, 'o', 'k', '2023-10-18', '2023-10-18', 'a', 'b', 'yyyyyyyyy'), + (3, 3, null, 2, '2023-10-19', 7.5, 8.5, 9.5, 10.5, 'k', 'o', '2023-10-19', '2023-10-19', 'c', 'd', 'xxxxxxxxx'), + (3, 3, null, 2, '2023-10-19', 7.5, 8.5, 9.5, 10.5, 'k', 'o', '2023-10-19', '2023-10-19', 'c', 'd', 'xxxxxxxxx'), + (1, 2, 3, null, '2023-10-17', 5.5, 6.5, 7.5, 8.5, 'o', 'k', '2023-10-17', '2023-10-17', 'a', 'b', 'yyyyyyyyy'), + (1, 2, 3, null, '2023-10-17', 5.5, 6.5, 7.5, 8.5, 'o', 'k', '2023-10-17', '2023-10-17', 'a', 'b', 'yyyyyyyyy'), + (2, 3, 2, 1, '2023-10-18', 5.5, 6.5, 7.5, 8.5, 'o', 'k', null, '2023-10-18', 'a', 'b', 'yyyyyyyyy'), + (2, 3, 2, 1, '2023-10-18', 5.5, 6.5, 7.5, 8.5, 'o', 'k', null, '2023-10-18', 'a', 'b', 'yyyyyyyyy'), + (3, 1, 1, 2, '2023-10-19', 7.5, 8.5, 9.5, 10.5, 'k', 'o', '2023-10-19', null, 'c', 'd', 'xxxxxxxxx'), + (3, 1, 1, 2, '2023-10-19', 7.5, 8.5, 9.5, 10.5, 'k', 'o', '2023-10-19', null, 'c', 'd', 'xxxxxxxxx'), + (1, 3, 2, 2, '2023-10-17', 5.5, 6.5, 7.5, 8.5, 'o', 'k', '2023-10-17', '2023-10-17', 'a', 'b', 'yyyyyyyyy'), + (1, 3, 2, 2, '2023-10-17', 5.5, 6.5, 7.5, 8.5, 'o', 'k', '2023-10-17', '2023-10-17', 'a', 'b', 'yyyyyyyyy'); + """ + + def mv1 = """ + select l_orderkey, l_linenumber, l_partkey, l_suppkey, l_shipdate, + substring(concat(l_returnflag, l_linestatus), 1) + from ${table1}; + """ + + def query1 = """ + select l_orderkey, l_linenumber, l_partkey, l_suppkey, l_shipdate, + substring(concat(l_returnflag, l_linestatus), 1) + from ${table1}; + """ + order_qt_query1_before "${query1}" + + GetDebugPoint().enableDebugPointForAllBEs("CloudSchemaChangeJob::_convert_historical_rowsets.fail.before.commit_job") + Thread.sleep(1000) + + def t1 = Thread.start { + Thread.sleep(5000) + GetDebugPoint().disableDebugPointForAllBEs("CloudSchemaChangeJob::_convert_historical_rowsets.fail.before.commit_job") + } + + create_sync_mv(db, table1, "mv1", mv1) + t1.join() + + explain { + sql("""${query1}""") + check {result -> + result.contains("(mv1)") && result.contains("__DORIS_DELETE_SIGN__") + } + } + order_qt_query1_after "${query1}" + + } catch(Exception e) { + logger.info(e.getMessage()) + throw e + } finally { + GetDebugPoint().clearDebugPointsForAllBEs() + GetDebugPoint().clearDebugPointsForAllFEs() + } +}