diff --git a/be/src/cloud/cloud_cumulative_compaction_policy.cpp b/be/src/cloud/cloud_cumulative_compaction_policy.cpp index 24bd61db8fafe6..f4178320044ea6 100644 --- a/be/src/cloud/cloud_cumulative_compaction_policy.cpp +++ b/be/src/cloud/cloud_cumulative_compaction_policy.cpp @@ -219,6 +219,17 @@ int64_t CloudSizeBasedCumulativeCompactionPolicy::new_cumulative_point( int64_t last_cumulative_point) { TEST_INJECTION_POINT_RETURN_WITH_VALUE("new_cumulative_point", int64_t(0), output_rowset.get(), last_cumulative_point); + DBUG_EXECUTE_IF("CloudSizeBasedCumulativeCompactionPolicy::new_cumulative_point", { + auto target_tablet_id = dp->param("tablet_id", -1); + auto cumu_point = dp->param("cumu_point", -1); + if (target_tablet_id == tablet->tablet_id() && cumu_point != -1) { + LOG_INFO( + "[CloudSizeBasedCumulativeCompactionPolicy::new_cumulative_point] " + "tablet_id={}, cumu_point={}", + target_tablet_id, cumu_point); + return cumu_point; + } + }); // for MoW table, if there's too many versions, the delete bitmap will grow to // a very big size, which may cause the tablet meta too big and the `save_meta` // operation too slow. diff --git a/be/src/cloud/cloud_full_compaction.cpp b/be/src/cloud/cloud_full_compaction.cpp index 7358f6d19156a1..08e43ab2142ae9 100644 --- a/be/src/cloud/cloud_full_compaction.cpp +++ b/be/src/cloud/cloud_full_compaction.cpp @@ -149,6 +149,16 @@ Status CloudFullCompaction::pick_rowsets_to_compact() { } Status CloudFullCompaction::execute_compact() { + DBUG_EXECUTE_IF("CloudFullCompaction::execute_compact.block", { + auto target_tablet_id = dp->param("tablet_id", -1); + LOG_INFO( + "[verbose] CloudFullCompaction::execute_compact.block, target_tablet_id={}, " + "tablet_id={}", + target_tablet_id, cloud_tablet()->tablet_id()); + if (target_tablet_id == cloud_tablet()->tablet_id()) { + DBUG_BLOCK; + } + }); TEST_SYNC_POINT_RETURN_WITH_VALUE("CloudFullCompaction::execute_compact_impl", Status::OK(), this); #ifndef __APPLE__ @@ -272,6 +282,7 @@ Status CloudFullCompaction::modify_rowsets() { cloud_tablet()->delete_rowsets(_input_rowsets, wrlock); cloud_tablet()->add_rowsets({_output_rowset}, false, wrlock); cloud_tablet()->set_base_compaction_cnt(stats.base_compaction_cnt()); + cloud_tablet()->set_full_compaction_cnt(stats.full_compaction_cnt()); cloud_tablet()->set_cumulative_layer_point(stats.cumulative_point()); if (output_rowset_delete_bitmap) { _tablet->tablet_meta()->delete_bitmap()->merge(*output_rowset_delete_bitmap); diff --git a/be/src/cloud/cloud_meta_mgr.cpp b/be/src/cloud/cloud_meta_mgr.cpp index 904e8e2e099405..a7a560f67193c6 100644 --- a/be/src/cloud/cloud_meta_mgr.cpp +++ b/be/src/cloud/cloud_meta_mgr.cpp @@ -521,6 +521,7 @@ Status CloudMetaMgr::sync_tablet_rowsets_unlocked(CloudTablet* tablet, } req.set_base_compaction_cnt(tablet->base_compaction_cnt()); req.set_cumulative_compaction_cnt(tablet->cumulative_compaction_cnt()); + req.set_full_compaction_cnt(tablet->full_compaction_cnt()); req.set_cumulative_point(tablet->cumulative_layer_point()); } req.set_end_version(-1); @@ -772,6 +773,7 @@ Status CloudMetaMgr::sync_tablet_rowsets_unlocked(CloudTablet* tablet, tablet->last_cumu_compaction_success_time_ms = stats.last_cumu_compaction_time_ms(); tablet->set_base_compaction_cnt(stats.base_compaction_cnt()); tablet->set_cumulative_compaction_cnt(stats.cumulative_compaction_cnt()); + tablet->set_full_compaction_cnt(stats.full_compaction_cnt()); tablet->set_cumulative_layer_point(stats.cumulative_point()); tablet->reset_approximate_stats(stats.num_rowsets(), stats.num_segments(), stats.num_rows(), stats.data_size()); diff --git a/be/src/cloud/cloud_tablet.h b/be/src/cloud/cloud_tablet.h index d1ea7dec379684..1f3c202d06a6ef 100644 --- a/be/src/cloud/cloud_tablet.h +++ b/be/src/cloud/cloud_tablet.h @@ -123,12 +123,14 @@ class CloudTablet final : public BaseTablet { int64_t max_version_unlocked() const override { return _max_version; } int64_t base_compaction_cnt() const { return _base_compaction_cnt; } int64_t cumulative_compaction_cnt() const { return _cumulative_compaction_cnt; } + int64_t full_compaction_cnt() const { return _full_compaction_cnt; } int64_t cumulative_layer_point() const { return _cumulative_point.load(std::memory_order_relaxed); } void set_base_compaction_cnt(int64_t cnt) { _base_compaction_cnt = cnt; } void set_cumulative_compaction_cnt(int64_t cnt) { _cumulative_compaction_cnt = cnt; } + void set_full_compaction_cnt(int64_t cnt) { _full_compaction_cnt = cnt; } void set_cumulative_layer_point(int64_t new_point); int64_t last_cumu_compaction_failure_time() { return _last_cumu_compaction_failure_millis; } @@ -322,6 +324,7 @@ class CloudTablet final : public BaseTablet { int64_t _base_compaction_cnt = 0; int64_t _cumulative_compaction_cnt = 0; + int64_t _full_compaction_cnt = 0; int64_t _max_version = -1; int64_t _base_size = 0; int64_t _alter_version = -1; diff --git a/cloud/src/meta-service/meta_service.cpp b/cloud/src/meta-service/meta_service.cpp index 2fa746e5f69f18..68ee07cd2680c5 100644 --- a/cloud/src/meta-service/meta_service.cpp +++ b/cloud/src/meta-service/meta_service.cpp @@ -1603,6 +1603,7 @@ void internal_get_rowset(Transaction* txn, int64_t start, int64_t end, std::vector> calc_sync_versions(int64_t req_bc_cnt, int64_t bc_cnt, int64_t req_cc_cnt, int64_t cc_cnt, int64_t req_cp, int64_t cp, + int64_t req_fc_cnt, int64_t fc_cnt, int64_t req_start, int64_t req_end) { using Version = std::pair; // combine `v1` `v2` to `v1`, return true if success @@ -1628,8 +1629,8 @@ std::vector> calc_sync_versions(int64_t req_bc_cnt, if (req_cc_cnt < cc_cnt) { Version cc_version; - if (req_cp < cp && req_cc_cnt + 1 == cc_cnt) { - // * only one CC happened and CP changed + if (req_cp < cp && req_cc_cnt + 1 == cc_cnt && req_fc_cnt == fc_cnt) { + // * only one CC happened and CP changed, and no full compaction happened // BE [=][=][=][=][=====][=][=] // ^~~~~ req_cp // MS [=][=][=][=][xxxxxxxxxxxxxx][=======][=][=] @@ -1653,6 +1654,13 @@ std::vector> calc_sync_versions(int64_t req_bc_cnt, // ^_____________________^ related_versions: [req_cp, max] // there may be holes if we don't return all version // after ms_cp, however it can be optimized. + // * one CC happened and CP changed, and full compaction happened + // BE [=][=][=][=][=][=][=][=][=][=] + // ^~~~~ req_cp + // MS [xxxxxxxxxxxxxx][xxxxxxxxxxxxxx][=======][=][=] + // ^~~~~~~ ms_cp + // ^___________________________^ related_versions: [req_cp, max] + // cc_version = {req_cp, std::numeric_limits::max() - 1}; } if (versions.empty() || !combine_if_overlapping(versions.front(), cc_version)) { @@ -1723,6 +1731,7 @@ void MetaServiceImpl::get_rowset(::google::protobuf::RpcController* controller, } int64_t req_bc_cnt = request->base_compaction_cnt(); int64_t req_cc_cnt = request->cumulative_compaction_cnt(); + int64_t req_fc_cnt = request->has_full_compaction_cnt() ? request->full_compaction_cnt() : 0; int64_t req_cp = request->cumulative_point(); do { @@ -1807,6 +1816,8 @@ void MetaServiceImpl::get_rowset(::google::protobuf::RpcController* controller, int64_t bc_cnt = tablet_stat.base_compaction_cnt(); int64_t cc_cnt = tablet_stat.cumulative_compaction_cnt(); + int64_t fc_cnt = + tablet_stat.has_full_compaction_cnt() ? tablet_stat.full_compaction_cnt() : 0; int64_t cp = tablet_stat.cumulative_point(); response->mutable_stats()->CopyFrom(tablet_stat); @@ -1818,17 +1829,18 @@ void MetaServiceImpl::get_rowset(::google::protobuf::RpcController* controller, //========================================================================== // Find version ranges to be synchronized due to compaction //========================================================================== - if (req_bc_cnt > bc_cnt || req_cc_cnt > cc_cnt || req_cp > cp) { + if (req_bc_cnt > bc_cnt || req_cc_cnt > cc_cnt || req_cp > cp || req_fc_cnt > fc_cnt) { code = MetaServiceCode::INVALID_ARGUMENT; ss << "no valid compaction_cnt or cumulative_point given. req_bc_cnt=" << req_bc_cnt << ", bc_cnt=" << bc_cnt << ", req_cc_cnt=" << req_cc_cnt << ", cc_cnt=" << cc_cnt - << ", req_cp=" << req_cp << ", cp=" << cp << " tablet_id=" << tablet_id; + << " req_fc_cnt=" << req_fc_cnt << ", fc_cnt=" << fc_cnt << ", req_cp=" << req_cp + << ", cp=" << cp << " tablet_id=" << tablet_id; msg = ss.str(); LOG(WARNING) << msg; return; } auto versions = calc_sync_versions(req_bc_cnt, bc_cnt, req_cc_cnt, cc_cnt, req_cp, cp, - req_start, req_end); + req_fc_cnt, fc_cnt, req_start, req_end); for (auto [start, end] : versions) { internal_get_rowset(txn.get(), start, end, instance_id, tablet_id, code, msg, response); if (code != MetaServiceCode::OK) { diff --git a/cloud/src/meta-service/meta_service_job.cpp b/cloud/src/meta-service/meta_service_job.cpp index 7b1f13462037f1..76b6a8d0c432fc 100644 --- a/cloud/src/meta-service/meta_service_job.cpp +++ b/cloud/src/meta-service/meta_service_job.cpp @@ -755,6 +755,7 @@ void process_compaction_job(MetaServiceCode& code, std::string& msg, std::string } else if (compaction.type() == TabletCompactionJobPB::FULL) { // clang-format off stats->set_base_compaction_cnt(stats->base_compaction_cnt() + 1); + stats->set_full_compaction_cnt(stats->has_full_compaction_cnt() ? stats->full_compaction_cnt() + 1 : 1); if (compaction.output_cumulative_point() > stats->cumulative_point()) { // After supporting parallel cumu compaction, compaction with older cumu point may be committed after // new cumu point has been set, MUST NOT set cumu point back to old value diff --git a/cloud/test/meta_service_test.cpp b/cloud/test/meta_service_test.cpp index 8150dced5bd9d1..6f94ec3b46ae75 100644 --- a/cloud/test/meta_service_test.cpp +++ b/cloud/test/meta_service_test.cpp @@ -3781,7 +3781,7 @@ TEST(MetaServiceTest, FilterCopyFilesTest) { extern std::vector> calc_sync_versions( int64_t req_bc_cnt, int64_t bc_cnt, int64_t req_cc_cnt, int64_t cc_cnt, int64_t req_cp, - int64_t cp, int64_t req_start, int64_t req_end); + int64_t cp, int64_t req_fc_cnt, int64_t fc_cnt, int64_t req_start, int64_t req_end); TEST(MetaServiceTest, CalcSyncVersionsTest) { using Versions = std::vector>; @@ -3797,7 +3797,7 @@ TEST(MetaServiceTest, CalcSyncVersionsTest) { auto [req_cc_cnt, cc_cnt] = std::tuple {1, 1}; auto [req_cp, cp] = std::tuple {5, 5}; auto [req_start, req_end] = std::tuple {8, 12}; - auto versions = calc_sync_versions(req_bc_cnt, bc_cnt, req_cc_cnt, cc_cnt, req_cp, cp, + auto versions = calc_sync_versions(req_bc_cnt, bc_cnt, req_cc_cnt, cc_cnt, req_cp, cp, 0, 0, req_start, req_end); ASSERT_EQ(versions, (Versions {{8, 12}})); } @@ -3813,7 +3813,7 @@ TEST(MetaServiceTest, CalcSyncVersionsTest) { auto [req_cc_cnt, cc_cnt] = std::tuple {1, 2}; auto [req_cp, cp] = std::tuple {5, 10}; auto [req_start, req_end] = std::tuple {8, 12}; - auto versions = calc_sync_versions(req_bc_cnt, bc_cnt, req_cc_cnt, cc_cnt, req_cp, cp, + auto versions = calc_sync_versions(req_bc_cnt, bc_cnt, req_cc_cnt, cc_cnt, req_cp, cp, 0, 0, req_start, req_end); ASSERT_EQ(versions, (Versions {{5, 12}})); // [5, 9] v [8, 12] } @@ -3822,7 +3822,7 @@ TEST(MetaServiceTest, CalcSyncVersionsTest) { auto [req_cc_cnt, cc_cnt] = std::tuple {1, 2}; auto [req_cp, cp] = std::tuple {5, 15}; auto [req_start, req_end] = std::tuple {8, 12}; - auto versions = calc_sync_versions(req_bc_cnt, bc_cnt, req_cc_cnt, cc_cnt, req_cp, cp, + auto versions = calc_sync_versions(req_bc_cnt, bc_cnt, req_cc_cnt, cc_cnt, req_cp, cp, 0, 0, req_start, req_end); ASSERT_EQ(versions, (Versions {{5, 14}})); // [5, 14] v [8, 12] } @@ -3839,7 +3839,7 @@ TEST(MetaServiceTest, CalcSyncVersionsTest) { auto [req_cc_cnt, cc_cnt] = std::tuple {1, 2}; auto [req_cp, cp] = std::tuple {5, 5}; auto [req_start, req_end] = std::tuple {8, 12}; - auto versions = calc_sync_versions(req_bc_cnt, bc_cnt, req_cc_cnt, cc_cnt, req_cp, cp, + auto versions = calc_sync_versions(req_bc_cnt, bc_cnt, req_cc_cnt, cc_cnt, req_cp, cp, 0, 0, req_start, req_end); ASSERT_EQ(versions, (Versions {{5, INT64_MAX - 1}})); // [5, max] v [8, 12] } @@ -3855,7 +3855,7 @@ TEST(MetaServiceTest, CalcSyncVersionsTest) { auto [req_cc_cnt, cc_cnt] = std::tuple {1, 3}; auto [req_cp, cp] = std::tuple {5, 5}; auto [req_start, req_end] = std::tuple {8, 12}; - auto versions = calc_sync_versions(req_bc_cnt, bc_cnt, req_cc_cnt, cc_cnt, req_cp, cp, + auto versions = calc_sync_versions(req_bc_cnt, bc_cnt, req_cc_cnt, cc_cnt, req_cp, cp, 0, 0, req_start, req_end); ASSERT_EQ(versions, (Versions {{5, INT64_MAX - 1}})); // [5, max] v [8, 12] } @@ -3870,7 +3870,7 @@ TEST(MetaServiceTest, CalcSyncVersionsTest) { auto [req_cc_cnt, cc_cnt] = std::tuple {1, 3}; auto [req_cp, cp] = std::tuple {5, 15}; auto [req_start, req_end] = std::tuple {8, 12}; - auto versions = calc_sync_versions(req_bc_cnt, bc_cnt, req_cc_cnt, cc_cnt, req_cp, cp, + auto versions = calc_sync_versions(req_bc_cnt, bc_cnt, req_cc_cnt, cc_cnt, req_cp, cp, 0, 0, req_start, req_end); ASSERT_EQ(versions, (Versions {{5, INT64_MAX - 1}})); // [5, max] v [8, 12] } @@ -3886,7 +3886,7 @@ TEST(MetaServiceTest, CalcSyncVersionsTest) { auto [req_cc_cnt, cc_cnt] = std::tuple {1, 1}; auto [req_cp, cp] = std::tuple {5, 5}; auto [req_start, req_end] = std::tuple {8, 12}; - auto versions = calc_sync_versions(req_bc_cnt, bc_cnt, req_cc_cnt, cc_cnt, req_cp, cp, + auto versions = calc_sync_versions(req_bc_cnt, bc_cnt, req_cc_cnt, cc_cnt, req_cp, cp, 0, 0, req_start, req_end); ASSERT_EQ(versions, (Versions {{0, 4}, {8, 12}})); } @@ -3895,7 +3895,7 @@ TEST(MetaServiceTest, CalcSyncVersionsTest) { auto [req_cc_cnt, cc_cnt] = std::tuple {1, 1}; auto [req_cp, cp] = std::tuple {8, 8}; auto [req_start, req_end] = std::tuple {8, 12}; - auto versions = calc_sync_versions(req_bc_cnt, bc_cnt, req_cc_cnt, cc_cnt, req_cp, cp, + auto versions = calc_sync_versions(req_bc_cnt, bc_cnt, req_cc_cnt, cc_cnt, req_cp, cp, 0, 0, req_start, req_end); ASSERT_EQ(versions, (Versions {{0, 12}})); // [0, 7] v [8, 12] } @@ -3904,7 +3904,7 @@ TEST(MetaServiceTest, CalcSyncVersionsTest) { auto [req_cc_cnt, cc_cnt] = std::tuple {1, 2}; auto [req_cp, cp] = std::tuple {5, 10}; auto [req_start, req_end] = std::tuple {8, 12}; - auto versions = calc_sync_versions(req_bc_cnt, bc_cnt, req_cc_cnt, cc_cnt, req_cp, cp, + auto versions = calc_sync_versions(req_bc_cnt, bc_cnt, req_cc_cnt, cc_cnt, req_cp, cp, 0, 0, req_start, req_end); ASSERT_EQ(versions, (Versions {{0, 12}})); // [0, 4] v [5, 9] v [8, 12] } @@ -3913,7 +3913,7 @@ TEST(MetaServiceTest, CalcSyncVersionsTest) { auto [req_cc_cnt, cc_cnt] = std::tuple {1, 2}; auto [req_cp, cp] = std::tuple {5, 15}; auto [req_start, req_end] = std::tuple {8, 12}; - auto versions = calc_sync_versions(req_bc_cnt, bc_cnt, req_cc_cnt, cc_cnt, req_cp, cp, + auto versions = calc_sync_versions(req_bc_cnt, bc_cnt, req_cc_cnt, cc_cnt, req_cp, cp, 0, 0, req_start, req_end); ASSERT_EQ(versions, (Versions {{0, 14}})); // [0, 4] v [5, 14] v [8, 12] } @@ -3922,11 +3922,42 @@ TEST(MetaServiceTest, CalcSyncVersionsTest) { auto [req_cc_cnt, cc_cnt] = std::tuple {1, 2}; auto [req_cp, cp] = std::tuple {5, 5}; auto [req_start, req_end] = std::tuple {8, 12}; - auto versions = calc_sync_versions(req_bc_cnt, bc_cnt, req_cc_cnt, cc_cnt, req_cp, cp, + auto versions = calc_sync_versions(req_bc_cnt, bc_cnt, req_cc_cnt, cc_cnt, req_cp, cp, 0, 0, req_start, req_end); // [0, 4] v [5, max] v [8, 12] ASSERT_EQ(versions, (Versions {{0, INT64_MAX - 1}})); } + + { + // when there exists full compaction, we can't optimize by "* only one CC happened and CP changed" + + // * one CC happened and CP changed, and full compaction happened + // BE [=][=][=][=][=][=][=][=][=][=] + // ^~~~~ req_cp + // MS [xxxxxxxxxxxxxx][xxxxxxxxxxxxxx][=======][=][=] + // ^~~~~~~ ms_cp + // ^___________________________^ related_versions: [req_cp, max] + // + auto [req_bc_cnt, bc_cnt] = std::tuple {0, 1}; + auto [req_cc_cnt, cc_cnt] = std::tuple {1, 2}; + auto [req_cp, cp] = std::tuple {4, 7}; + auto [req_start, req_end] = std::tuple {9, 12}; + auto versions = calc_sync_versions(req_bc_cnt, bc_cnt, req_cc_cnt, cc_cnt, req_cp, cp, 0, 1, + req_start, req_end); + ASSERT_EQ(versions, (Versions {{0, INT64_MAX - 1}})); + } + + { + // abnormal case: + auto [req_bc_cnt, bc_cnt] = std::tuple {0, 1}; + auto [req_cc_cnt, cc_cnt] = std::tuple {1, 2}; + auto [req_cp, cp] = std::tuple {4, 7}; + auto [req_start, req_end] = std::tuple {9, 12}; + auto versions = calc_sync_versions(req_bc_cnt, bc_cnt, req_cc_cnt, cc_cnt, req_cp, cp, 0, 0, + req_start, req_end); + // when not considering full compaction, the returned versions is wrong becasue rowsets in [7-8] are missed + ASSERT_EQ(versions, (Versions {{0, 6}, {9, 12}})); + } } TEST(MetaServiceTest, StageTest) { diff --git a/gensrc/proto/cloud.proto b/gensrc/proto/cloud.proto index bc4a219d02691f..345e03e8574264 100644 --- a/gensrc/proto/cloud.proto +++ b/gensrc/proto/cloud.proto @@ -624,7 +624,7 @@ message TabletStatsPB { optional int64 cumulative_point = 9; optional int64 last_base_compaction_time_ms = 10; optional int64 last_cumu_compaction_time_ms = 11; - optional int64 full_compaction_cnt = 12; + optional int64 full_compaction_cnt = 12; // used by calc_sync_versions() only optional int64 last_full_compaction_time_ms = 13; optional int64 index_size = 14; optional int64 segment_size = 15; @@ -1043,6 +1043,7 @@ message GetRowsetRequest { // for compability reason we use FILL_WITH_DICT as default optional SchemaOp schema_op = 8 [default = FILL_WITH_DICT]; optional string request_ip = 9; + optional int64 full_compaction_cnt = 10; } message GetRowsetResponse { diff --git a/regression-test/data/fault_injection_p0/cloud/test_cloud_calc_sync_version.out b/regression-test/data/fault_injection_p0/cloud/test_cloud_calc_sync_version.out new file mode 100644 index 00000000000000..ed930452a5d142 --- /dev/null +++ b/regression-test/data/fault_injection_p0/cloud/test_cloud_calc_sync_version.out @@ -0,0 +1,55 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !sql -- +1 10 +2 20 +3 30 + +-- !sql -- +1 10 +2 20 +3 30 + +-- !write_cluster_new_write -- +1 60 +2 70 +3 30 +4 40 +5 50 + +-- !read_cluster_query -- +1 60 +2 70 +3 30 +4 40 +5 50 + +-- !write_cluster_full_compaction -- +1 60 +2 70 +3 30 +4 40 +5 50 + +-- !write_cluster_cumu_compaction -- +1 60 +2 70 +3 30 +4 40 +5 50 + +-- !write_cluster_new_write -- +1 80 +2 70 +3 30 +4 40 +5 50 + +-- !read_cluster_check_dup_key -- + +-- !read_cluster_res -- +1 80 +2 70 +3 30 +4 40 +5 50 + diff --git a/regression-test/suites/fault_injection_p0/cloud/test_cloud_calc_sync_version.groovy b/regression-test/suites/fault_injection_p0/cloud/test_cloud_calc_sync_version.groovy new file mode 100644 index 00000000000000..12bdd2dd007bf9 --- /dev/null +++ b/regression-test/suites/fault_injection_p0/cloud/test_cloud_calc_sync_version.groovy @@ -0,0 +1,179 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import org.apache.doris.regression.suite.ClusterOptions + +suite("test_cloud_calc_sync_version","docker") { + def options = new ClusterOptions() + options.feConfigs += [ + 'cloud_cluster_check_interval_second=1', + 'cloud_tablet_rebalancer_interval_second=1', + ] + options.enableDebugPoints() + options.cloudMode = true + + docker(options) { + def write_cluster = "write_cluster" + def read_cluster = "read_cluster" + + // Add two clusters + cluster.addBackend(1, write_cluster) + cluster.addBackend(1, read_cluster) + + sql "use @${write_cluster}" + logger.info("==== switch to write cluster") + def tableName = "test_cloud_calc_sync_version" + sql """ DROP TABLE IF EXISTS ${tableName} """ + sql """ + CREATE TABLE IF NOT EXISTS ${tableName} ( + `k` int , + `v` int , + ) engine=olap + UNIQUE KEY(k) + DISTRIBUTED BY HASH(`k`) BUCKETS 1 + properties( + "replication_num" = "1", + "disable_auto_compaction" = "true") + """ + + sql """ INSERT INTO ${tableName} VALUES (1,10)""" + sql """ INSERT INTO ${tableName} VALUES (2,20)""" + sql """ INSERT INTO ${tableName} VALUES (3,30)""" + qt_sql "select * from ${tableName} order by k;" + + def check_rs_metas = { tbl, check_func -> + def compactionUrl = sql_return_maparray("show tablets from ${tbl};").get(0).CompactionStatus + def (code, out, err) = curl("GET", compactionUrl) + assert code == 0 + def jsonMeta = parseJson(out.trim()) + logger.info("==== rowsets: ${jsonMeta.rowsets}, cumu point: ${jsonMeta["cumulative point"]}") + check_func(jsonMeta.rowsets, jsonMeta["cumulative point"]) + } + + def tabletStats = sql_return_maparray("show tablets from ${tableName};") + def tabletId = tabletStats[0].TabletId + def tabletBackendId = tabletStats[0].BackendId + def tabletBackend + def backends = sql_return_maparray('show backends') + for (def be : backends) { + if (be.BackendId == tabletBackendId) { + tabletBackend = be + break; + } + } + logger.info("==== tablet ${tabletId} on backend ${tabletBackend.Host} with backendId=${tabletBackend.BackendId}"); + + GetDebugPoint().clearDebugPointsForAllBEs() + GetDebugPoint().clearDebugPointsForAllFEs() + + def do_cumu_compaction = { def tbl, def tablet_id, int start, int end, int cp -> + GetDebugPoint().enableDebugPointForAllBEs("CloudSizeBasedCumulativeCompactionPolicy::new_cumulative_point", [tablet_id: "${tablet_id}", cumu_point: "${cp}"]) + GetDebugPoint().enableDebugPointForAllBEs("CloudSizeBasedCumulativeCompactionPolicy::pick_input_rowsets.set_input_rowsets", [tablet_id: "${tablet_id}", start_version: "${start}", end_version: "${end}"]) + + trigger_and_wait_compaction(tbl, "cumulative") + + GetDebugPoint().disableDebugPointForAllBEs("CloudSizeBasedCumulativeCompactionPolicy::new_cumulative_point") + GetDebugPoint().disableDebugPointForAllBEs("CloudSizeBasedCumulativeCompactionPolicy::pick_input_rowsets.set_input_rowsets") + } + + try { + // [2-2],[3-3],[4-4] -> [2,4] + do_cumu_compaction(tableName, tabletId, 2, 4, 5) + qt_sql "select * from ${tableName} order by k;" + check_rs_metas(tableName, {def rowsets, def cumu_point -> + assert rowsets.size() == 2 + assert cumu_point as int == 5 + assert rowsets[1].contains("[2-4]") + }) + + sql """ INSERT INTO ${tableName} VALUES (4,40)""" // ver=5 + sql """ INSERT INTO ${tableName} VALUES (5,50)""" // ver=6 + sql "sync;" + + GetDebugPoint().enableDebugPointForAllBEs("CloudFullCompaction::execute_compact.block", [tablet_id: "${tabletId}"]) + def t1 = thread("full compaction") { + // [2,4],[5-5],[6-6] -> [2,6] + sql "use @${write_cluster}" + trigger_and_wait_compaction(tableName, "full") + } + + sleep(1500) + sql """ INSERT INTO ${tableName} VALUES (1,60)""" // ver=7 + sql """ INSERT INTO ${tableName} VALUES (2,70)""" // ver=8 + sql "sync;" + qt_write_cluster_new_write "select * from ${tableName} order by k;" + + + // read cluster sync rowsets [2-4],[5-5],[6-6],[7-7],[8-8], bc_cnt=0, cc_cnt=1, cp=4 + sql "use @${read_cluster}" + logger.info("==== switch to read cluster") + qt_read_cluster_query "select * from ${tableName} order by k;" + check_rs_metas(tableName, {def rowsets, def cumu_point -> + assert rowsets.size() == 6 + assert cumu_point as int == 5 + assert rowsets[1].contains("[2-4]") + assert rowsets[2].contains("[5-5]") + assert rowsets[3].contains("[6-6]") + assert rowsets[4].contains("[7-7]") + assert rowsets[5].contains("[8-8]") + }) + + + sql "use @${write_cluster}" + logger.info("==== switch to write cluster") + GetDebugPoint().disableDebugPointForAllBEs("CloudFullCompaction::execute_compact.block") + t1.get() + qt_write_cluster_full_compaction "select * from ${tableName} order by k;" + check_rs_metas(tableName, {def rowsets, def cumu_point -> + assert rowsets.size() == 4 + assert cumu_point as int == 7 // updated by full compaction + assert rowsets[1].contains("[2-6]") + assert rowsets[2].contains("[7-7]") + assert rowsets[3].contains("[8-8]") + }) + + + do_cumu_compaction(tableName, tabletId, 7, 8, 7) + qt_write_cluster_cumu_compaction "select * from ${tableName} order by k;" + check_rs_metas(tableName, {def rowsets, def cumu_point -> + assert rowsets.size() == 3 + assert cumu_point as int == 7 + assert rowsets[1].contains("[2-6]") + assert rowsets[2].contains("[7-8]") + }) + + sql """ INSERT INTO ${tableName} VALUES (1,80)""" // ver=9 + sql "sync;" + qt_write_cluster_new_write "select * from ${tableName} order by k;" + + + // read cluster will read dup keys of ver=9 to ver=7 because it will not sync rowset [7-8] + sql "use @${read_cluster}" + logger.info("==== switch to read cluster") + sql "set disable_nereids_rules=ELIMINATE_GROUP_BY;" + qt_read_cluster_check_dup_key "select k,count() from ${tableName} group by k having count()>1;" + qt_read_cluster_res "select * from ${tableName} order by k;" + + } catch (Exception e) { + logger.info(e.getMessage()) + throw e + } finally { + GetDebugPoint().clearDebugPointsForAllBEs() + GetDebugPoint().clearDebugPointsForAllFEs() + } + } +}