diff --git a/be/src/olap/cumulative_compaction.cpp b/be/src/olap/cumulative_compaction.cpp index 8db397aa577ccd..ca33cbc887d361 100644 --- a/be/src/olap/cumulative_compaction.cpp +++ b/be/src/olap/cumulative_compaction.cpp @@ -197,18 +197,19 @@ Status CumulativeCompaction::pick_rowsets_to_compact() { << ", first missed version next rowset version=" << missing_versions[1] << ", tablet=" << _tablet->tablet_id(); if (config::enable_auto_clone_on_compaction_missing_version) { + int64_t max_version = tablet()->max_version_unlocked(); LOG_INFO("cumulative compaction submit missing rowset clone task.") .tag("tablet_id", _tablet->tablet_id()) - .tag("version", missing_versions.back().first) + .tag("max_version", max_version) .tag("replica_id", tablet()->replica_id()) .tag("partition_id", _tablet->partition_id()) .tag("table_id", _tablet->table_id()); - Status st = _engine.submit_clone_task(tablet(), missing_versions.back().first); + Status st = _engine.submit_clone_task(tablet(), max_version); if (!st) { LOG_WARNING("cumulative compaction failed to submit missing rowset clone task.") .tag("st", st.msg()) .tag("tablet_id", _tablet->tablet_id()) - .tag("version", missing_versions.back().first) + .tag("max_version", max_version) .tag("replica_id", tablet()->replica_id()) .tag("partition_id", _tablet->partition_id()) .tag("table_id", _tablet->table_id()); diff --git a/be/src/olap/task/engine_publish_version_task.cpp b/be/src/olap/task/engine_publish_version_task.cpp index 13e7dcd97aab4a..992e74109cced6 100644 --- a/be/src/olap/task/engine_publish_version_task.cpp +++ b/be/src/olap/task/engine_publish_version_task.cpp @@ -220,17 +220,17 @@ Status EnginePublishVersionTask::execute() { if (config::enable_auto_clone_on_mow_publish_missing_version) { LOG_INFO("mow publish submit missing rowset clone task.") .tag("tablet_id", tablet->tablet_id()) - .tag("version", version.first - 1) + .tag("version", version.second) .tag("replica_id", tablet->replica_id()) .tag("partition_id", tablet->partition_id()) .tag("table_id", tablet->table_id()); - Status st = _engine.submit_clone_task(tablet.get(), version.first - 1); + Status st = _engine.submit_clone_task(tablet.get(), version.second); if (!st) { LOG_WARNING( "mow publish failed to submit missing rowset clone task.") .tag("st", st.msg()) .tag("tablet_id", tablet->tablet_id()) - .tag("version", version.first - 1) + .tag("version", version.second) .tag("replica_id", tablet->replica_id()) .tag("partition_id", tablet->partition_id()) .tag("table_id", tablet->table_id()); diff --git a/regression-test/suites/fault_injection_p0/test_compaction_clone_missing_rowset_fault_injection.groovy b/regression-test/suites/fault_injection_p0/test_compaction_clone_missing_rowset_fault_injection.groovy index a7f060a110888b..f15de709b3da96 100644 --- a/regression-test/suites/fault_injection_p0/test_compaction_clone_missing_rowset_fault_injection.groovy +++ b/regression-test/suites/fault_injection_p0/test_compaction_clone_missing_rowset_fault_injection.groovy @@ -39,7 +39,7 @@ suite('test_compaction_clone_missing_rowset_fault_injection', 'docker') { assertNotNull(normalBe) try { - def tableName = "test_compaction_clone_missing_rowset" + def tableName = "test_compaction_clone_missing_rowset_fault_injection" sql """ DROP TABLE IF EXISTS ${tableName} force""" sql """ CREATE TABLE IF NOT EXISTS ${tableName} ( diff --git a/regression-test/suites/fault_injection_p0/test_compaction_full_clone_missing_rowset_fault_injection.groovy b/regression-test/suites/fault_injection_p0/test_compaction_full_clone_missing_rowset_fault_injection.groovy new file mode 100644 index 00000000000000..1e8bdd4f37342a --- /dev/null +++ b/regression-test/suites/fault_injection_p0/test_compaction_full_clone_missing_rowset_fault_injection.groovy @@ -0,0 +1,125 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import org.apache.doris.regression.suite.ClusterOptions +import org.apache.http.NoHttpResponseException +import org.apache.doris.regression.util.DebugPoint +import org.apache.doris.regression.util.NodeType + +suite('test_compaction_full_clone_missing_rowset_fault_injection', 'docker') { + def options = new ClusterOptions() + options.cloudMode = false + options.enableDebugPoints() + options.feConfigs += [ "disable_tablet_scheduler=true" ] + options.beConfigs += [ "enable_auto_clone_on_compaction_missing_version=true" ] + options.beConfigs += [ "tablet_rowset_stale_sweep_time_sec=0" ] + options.beConfigs += [ "tablet_rowset_stale_sweep_by_size=true" ] + options.beConfigs += [ "tablet_rowset_stale_sweep_threshold_size=0" ] + options.beNum = 3 + docker(options) { + + def injectBe = null + def normalBe1 = null + def normalBe2 = null + def backends = sql_return_maparray('show backends') + + injectBe = backends[0] + assertNotNull(injectBe) + normalBe1 = backends[1] + assertNotNull(normalBe1) + normalBe2 = backends[2] + assertNotNull(normalBe2) + + try { + def tableName = "test_compaction_full_clone_missing_rowset_fault_injection" + sql """ DROP TABLE IF EXISTS ${tableName} force""" + sql """ + CREATE TABLE IF NOT EXISTS ${tableName} ( + `k` int , + `v` int , + ) engine=olap + DUPLICATE KEY(k) + DISTRIBUTED BY HASH(k) + BUCKETS 1 + properties( + "replication_num" = "3", + "disable_auto_compaction" = "true") + """ + sql """ INSERT INTO ${tableName} VALUES (1,0)""" + DebugPoint.enableDebugPoint(injectBe.Host, injectBe.HttpPort.toInteger(), NodeType.BE, "EnginePublishVersionTask.finish.random", [percent:"1.0"]) + sql """ INSERT INTO ${tableName} VALUES (2,0)""" + sql """ INSERT INTO ${tableName} VALUES (3,0)""" + sql """ INSERT INTO ${tableName} VALUES (4,0)""" + DebugPoint.disableDebugPoint(injectBe.Host, injectBe.HttpPort.toInteger(), NodeType.BE, "EnginePublishVersionTask.finish.random") + sql """ INSERT INTO ${tableName} VALUES (5,0)""" + sql """ INSERT INTO ${tableName} VALUES (6,0)""" + sql """ INSERT INTO ${tableName} VALUES (7,0)""" + sql """ INSERT INTO ${tableName} VALUES (8,0)""" + sql """ INSERT INTO ${tableName} VALUES (9,0)""" + + def array = sql_return_maparray("SHOW TABLETS FROM ${tableName}") + def tabletId = array[0].TabletId + + // normal BEs compaction + logger.info("normal BE run cumu compaction:" + tabletId) + def (code, out, err) = be_run_cumulative_compaction(normalBe1.Host, normalBe1.HttpPort, tabletId) + logger.info("normal BE1 Run cumu compaction: code=" + code + ", out=" + out + ", err=" + err) + (code, out, err) = be_run_cumulative_compaction(normalBe2.Host, normalBe2.HttpPort, tabletId) + logger.info("normal BE2 Run cumu compaction: code=" + code + ", out=" + out + ", err=" + err) + + logger.info("normal BE show:" + tabletId) + (code, out, err) = be_show_tablet_status(normalBe1.Host, normalBe1.HttpPort, tabletId) + logger.info("normal BE1 show: code=" + code + ", out=" + out + ", err=" + err) + (code, out, err) = be_show_tablet_status(normalBe2.Host, normalBe2.HttpPort, tabletId) + logger.info("normal BE2 show: code=" + code + ", out=" + out + ", err=" + err) + + sleep(10000) + + // 1st check rowsets + logger.info("1st show:" + tabletId) + (code, out, err) = be_show_tablet_status(injectBe.Host, injectBe.HttpPort, tabletId) + logger.info("1st show: code=" + code + ", out=" + out + ", err=" + err) + assertTrue(out.contains("[0-1]")) + assertTrue(out.contains("[2-2]")) + // missing rowset [3-5] + assertTrue(out.contains("[3-5]")) + assertTrue(out.contains("[6-6]")) + assertTrue(out.contains("[7-7]")) + assertTrue(out.contains("[8-8]")) + assertTrue(out.contains("[9-9]")) + assertTrue(out.contains("[10-10]")) + + logger.info("1st run cumu compaction:" + tabletId) + (code, out, err) = be_run_cumulative_compaction(injectBe.Host, injectBe.HttpPort, tabletId) + logger.info("1st Run cumu compaction: code=" + code + ", out=" + out + ", err=" + err) + + sleep(30000) + + // 2nd check rowsets + logger.info("2nd show:" + tabletId) + (code, out, err) = be_show_tablet_status(injectBe.Host, injectBe.HttpPort, tabletId) + logger.info("2nd show: code=" + code + ", out=" + out + ", err=" + err) + assertTrue(out.contains("[0-1]")) + assertTrue(out.contains("[2-10]")) + + } finally { + if (injectBe != null) { + DebugPoint.disableDebugPoint(injectBe.Host, injectBe.HttpPort.toInteger(), NodeType.BE, "EnginePublishVersionTask.finish.random") + } + } + } +} diff --git a/regression-test/suites/fault_injection_p0/test_mow_publish_clone_missing_rowset.groovy b/regression-test/suites/fault_injection_p0/test_mow_publish_clone_missing_rowset_fault_injection.groovy similarity index 100% rename from regression-test/suites/fault_injection_p0/test_mow_publish_clone_missing_rowset.groovy rename to regression-test/suites/fault_injection_p0/test_mow_publish_clone_missing_rowset_fault_injection.groovy diff --git a/regression-test/suites/fault_injection_p0/test_mow_publish_full_clone_missing_rowset_fault_injection.groovy b/regression-test/suites/fault_injection_p0/test_mow_publish_full_clone_missing_rowset_fault_injection.groovy new file mode 100644 index 00000000000000..e582d3339bf97a --- /dev/null +++ b/regression-test/suites/fault_injection_p0/test_mow_publish_full_clone_missing_rowset_fault_injection.groovy @@ -0,0 +1,139 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import org.apache.doris.regression.suite.ClusterOptions +import org.apache.http.NoHttpResponseException +import org.apache.doris.regression.util.DebugPoint +import org.apache.doris.regression.util.NodeType + +suite('test_mow_publish_full_clone_missing_rowset_fault_injection', 'docker') { + + def set_be_param = { paramName, paramValue, beIp, bePort -> + def (code, out, err) = curl("POST", String.format("http://%s:%s/api/update_config?%s=%s", beIp, bePort, paramName, paramValue)) + assertTrue(out.contains("OK")) + } + + def options = new ClusterOptions() + options.cloudMode = false + options.enableDebugPoints() + options.feConfigs += [ "disable_tablet_scheduler=true" ] + options.beConfigs += [ "enable_auto_clone_on_compaction_missing_version=true" ] + options.beConfigs += [ "tablet_rowset_stale_sweep_time_sec=0" ] + options.beConfigs += [ "tablet_rowset_stale_sweep_by_size=true" ] + options.beConfigs += [ "tablet_rowset_stale_sweep_threshold_size=0" ] + options.beNum = 3 + docker(options) { + + def injectBe = null + def normalBe1 = null + def normalBe2 = null + def backends = sql_return_maparray('show backends') + + injectBe = backends[0] + assertNotNull(injectBe) + normalBe1 = backends[1] + assertNotNull(normalBe1) + normalBe2 = backends[2] + assertNotNull(normalBe2) + + try { + def tableName = "test_mow_publish_full_clone_missing_rowset_fault_injection" + sql """ DROP TABLE IF EXISTS ${tableName} force""" + sql """ + CREATE TABLE IF NOT EXISTS ${tableName} ( + `k` int , + `v` int , + ) engine=olap + UNIQUE KEY(k) + DISTRIBUTED BY HASH(k) + BUCKETS 1 + properties( + "replication_num" = "3", + "disable_auto_compaction" = "true") + """ + sql """ INSERT INTO ${tableName} VALUES (1,0)""" + DebugPoint.enableDebugPoint(injectBe.Host, injectBe.HttpPort.toInteger(), NodeType.BE, "EnginePublishVersionTask.finish.random", [percent:"1.0"]) + sql """ INSERT INTO ${tableName} VALUES (2,0)""" + DebugPoint.disableDebugPoint(injectBe.Host, injectBe.HttpPort.toInteger(), NodeType.BE, "EnginePublishVersionTask.finish.random") + sql """ INSERT INTO ${tableName} VALUES (3,0)""" + sql """ INSERT INTO ${tableName} VALUES (4,0)""" + sql """ INSERT INTO ${tableName} VALUES (5,0)""" + sql """ INSERT INTO ${tableName} VALUES (6,0)""" + sql """ INSERT INTO ${tableName} VALUES (7,0)""" + + def array = sql_return_maparray("SHOW TABLETS FROM ${tableName}") + def tabletId = array[0].TabletId + + // normal BEs compaction + logger.info("normal BE run cumu compaction:" + tabletId) + def (code, out, err) = be_run_cumulative_compaction(normalBe1.Host, normalBe1.HttpPort, tabletId) + logger.info("normal BE1 Run cumu compaction: code=" + code + ", out=" + out + ", err=" + err) + (code, out, err) = be_run_cumulative_compaction(normalBe2.Host, normalBe2.HttpPort, tabletId) + logger.info("normal BE2 Run cumu compaction: code=" + code + ", out=" + out + ", err=" + err) + + logger.info("normal BE show:" + tabletId) + (code, out, err) = be_show_tablet_status(normalBe1.Host, normalBe1.HttpPort, tabletId) + logger.info("normal BE1 show: code=" + code + ", out=" + out + ", err=" + err) + (code, out, err) = be_show_tablet_status(normalBe2.Host, normalBe2.HttpPort, tabletId) + logger.info("normal BE2 show: code=" + code + ", out=" + out + ", err=" + err) + + sleep(10000) + + // 1st inject be check rowsets + logger.info("1st inject be show:" + tabletId) + (code, out, err) = be_show_tablet_status(injectBe.Host, injectBe.HttpPort, tabletId) + logger.info("1st inject be show: code=" + code + ", out=" + out + ", err=" + err) + assertTrue(out.contains("[0-1]")) + assertTrue(out.contains("[2-2]")) + assertFalse(out.contains("[3-3]")) + assertFalse(out.contains("[4-4]")) + assertFalse(out.contains("[5-5]")) + assertFalse(out.contains("[6-6]")) + assertFalse(out.contains("[7-7]")) + + set_be_param("enable_auto_clone_on_mow_publish_missing_version", "true", injectBe.Host, injectBe.HttpPort); + Thread.sleep(10000) + // submit clone task + sql """ INSERT INTO ${tableName} VALUES (8,0)""" + + sleep(30000) + + // 2nd inject be check rowsets + logger.info("2nd inject be show:" + tabletId) + (code, out, err) = be_show_tablet_status(injectBe.Host, injectBe.HttpPort, tabletId) + logger.info("2nd inject be show: code=" + code + ", out=" + out + ", err=" + err) + assertTrue(out.contains("[0-1]")) + assertTrue(out.contains("[2-8]")) + assertTrue(out.contains("[9-9]")) + + // inject be compaction + logger.info("run cumu compaction:" + tabletId) + (code, out, err) = be_run_cumulative_compaction(injectBe.Host, injectBe.HttpPort, tabletId) + logger.info("Run cumu compaction: code=" + code + ", out=" + out + ", err=" + err) + + logger.info("3rd inject be show:" + tabletId) + (code, out, err) = be_show_tablet_status(injectBe.Host, injectBe.HttpPort, tabletId) + logger.info("3rd inject be show: code=" + code + ", out=" + out + ", err=" + err) + assertTrue(out.contains("[0-1]")) + assertTrue(out.contains("[2-8]")) + } finally { + if (injectBe != null) { + DebugPoint.disableDebugPoint(injectBe.Host, injectBe.HttpPort.toInteger(), NodeType.BE, "EnginePublishVersionTask.finish.random") + } + } + } +} \ No newline at end of file