From 301e80d6aafb1584c8421ac9d6b47ded3cdea22c Mon Sep 17 00:00:00 2001 From: bobhan1 Date: Thu, 13 Mar 2025 14:10:02 +0800 Subject: [PATCH] [Fix](cloud) Fix dup key problem when `enable_new_tablet_do_compaction=true` (#48399) https://github.com/apache/doris/pull/39558 added a config that lets the shadow tablet do cumulative compaction during schema change in cloud mode, to avoid the -235 error on the new tablet when there is a large number of loads. However, this introduces a correctness problem on merge-on-write tables: some rowsets' delete bitmaps are wrong if cumulative compactions run on the new tablet while SC calculates delete bitmaps for incremental rowsets after converting historical data. This PR introduces a new compaction type, `STOP_TOKEN`, which fails all existing compaction jobs and disallows any compaction on the tablet, and changes the SC process as follows: 1. convert historical data 2. register a stop token on the new tablet to fail all existing compaction jobs and disallow any compaction on the new tablet 3. calculate delete bitmaps for incremental rowsets without lock 4. calculate delete bitmaps for incremental rowsets with lock 5. 
commit SC job and remove stop token ---- ref: https://github.com/apache/doris/pull/29386 --- be/src/cloud/cloud_compaction_stop_token.cpp | 125 +++++++++++++++ be/src/cloud/cloud_compaction_stop_token.h | 45 ++++++ .../cloud_cumulative_compaction_policy.cpp | 7 +- be/src/cloud/cloud_schema_change_job.cpp | 7 + be/src/cloud/cloud_storage_engine.cpp | 65 ++++++++ be/src/cloud/cloud_storage_engine.h | 8 + cloud/src/meta-service/meta_service_job.cpp | 79 +++++++++- gensrc/proto/cloud.proto | 1 + .../test_cloud_mow_new_tablet_compaction.out | 14 ++ ...est_cloud_mow_new_tablet_compaction.groovy | 143 ++++++++++++++++++ 10 files changed, 491 insertions(+), 3 deletions(-) create mode 100644 be/src/cloud/cloud_compaction_stop_token.cpp create mode 100644 be/src/cloud/cloud_compaction_stop_token.h create mode 100644 regression-test/data/fault_injection_p0/cloud/test_cloud_mow_new_tablet_compaction.out create mode 100644 regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_new_tablet_compaction.groovy diff --git a/be/src/cloud/cloud_compaction_stop_token.cpp b/be/src/cloud/cloud_compaction_stop_token.cpp new file mode 100644 index 00000000000000..9d6f1b614a61dc --- /dev/null +++ b/be/src/cloud/cloud_compaction_stop_token.cpp @@ -0,0 +1,125 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "cloud/cloud_compaction_stop_token.h" + +#include "cloud/cloud_meta_mgr.h" +#include "cloud/config.h" +#include "common/logging.h" +#include "gen_cpp/cloud.pb.h" + +namespace doris { + +CloudCompactionStopToken::CloudCompactionStopToken(CloudStorageEngine& engine, + CloudTabletSPtr tablet, int64_t initiator) + : _engine {engine}, _tablet {std::move(tablet)}, _initiator(initiator) { + auto uuid = UUIDGenerator::instance()->next_uuid(); + std::stringstream ss; + ss << uuid; + _uuid = ss.str(); +} + +void CloudCompactionStopToken::do_lease() { + cloud::TabletJobInfoPB job; + auto* idx = job.mutable_idx(); + idx->set_tablet_id(_tablet->tablet_id()); + idx->set_table_id(_tablet->table_id()); + idx->set_index_id(_tablet->index_id()); + idx->set_partition_id(_tablet->partition_id()); + auto* compaction_job = job.add_compaction(); + compaction_job->set_id(_uuid); + using namespace std::chrono; + int64_t lease_time = duration_cast(system_clock::now().time_since_epoch()).count() + + (config::lease_compaction_interval_seconds * 4); + compaction_job->set_lease(lease_time); + auto st = _engine.meta_mgr().lease_tablet_job(job); + if (!st.ok()) { + LOG_WARNING("failed to lease compaction stop token") + .tag("job_id", _uuid) + .tag("delete_bitmap_lock_initiator", _initiator) + .tag("tablet_id", _tablet->tablet_id()) + .error(st); + } +} + +Status CloudCompactionStopToken::do_register() { + int64_t base_compaction_cnt = 0; + int64_t cumulative_compaction_cnt = 0; + { + std::lock_guard lock {_tablet->get_header_lock()}; + base_compaction_cnt = _tablet->base_compaction_cnt(); + cumulative_compaction_cnt = _tablet->cumulative_compaction_cnt(); + } + cloud::TabletJobInfoPB job; + auto* idx = job.mutable_idx(); + idx->set_tablet_id(_tablet->tablet_id()); + idx->set_table_id(_tablet->table_id()); + idx->set_index_id(_tablet->index_id()); + 
idx->set_partition_id(_tablet->partition_id()); + auto* compaction_job = job.add_compaction(); + compaction_job->set_id(_uuid); + compaction_job->set_delete_bitmap_lock_initiator(_initiator); + compaction_job->set_initiator(BackendOptions::get_localhost() + ':' + + std::to_string(config::heartbeat_service_port)); + compaction_job->set_type(cloud::TabletCompactionJobPB::STOP_TOKEN); + // required by MS to check if it's a valid compaction job + compaction_job->set_base_compaction_cnt(base_compaction_cnt); + compaction_job->set_cumulative_compaction_cnt(cumulative_compaction_cnt); + using namespace std::chrono; + int64_t now = duration_cast(system_clock::now().time_since_epoch()).count(); + compaction_job->set_expiration(now + config::compaction_timeout_seconds); + compaction_job->set_lease(now + (config::lease_compaction_interval_seconds * 4)); + cloud::StartTabletJobResponse resp; + auto st = _engine.meta_mgr().prepare_tablet_job(job, &resp); + if (!st.ok()) { + LOG_WARNING("failed to register compaction stop token") + .tag("job_id", _uuid) + .tag("delete_bitmap_lock_initiator", _initiator) + .tag("tablet_id", _tablet->tablet_id()) + .error(st); + } + return st; +} + +Status CloudCompactionStopToken::do_unregister() { + cloud::TabletJobInfoPB job; + auto* idx = job.mutable_idx(); + idx->set_tablet_id(_tablet->tablet_id()); + idx->set_table_id(_tablet->table_id()); + idx->set_index_id(_tablet->index_id()); + idx->set_partition_id(_tablet->partition_id()); + auto* compaction_job = job.add_compaction(); + compaction_job->set_id(_uuid); + compaction_job->set_delete_bitmap_lock_initiator(_initiator); + compaction_job->set_initiator(BackendOptions::get_localhost() + ':' + + std::to_string(config::heartbeat_service_port)); + compaction_job->set_type(cloud::TabletCompactionJobPB::STOP_TOKEN); + auto st = _engine.meta_mgr().abort_tablet_job(job); + if (!st.ok()) { + LOG_WARNING("failed to unregister compaction stop token") + .tag("job_id", _uuid) + 
.tag("delete_bitmap_lock_initiator", _initiator) + .tag("tablet_id", _tablet->tablet_id()) + .error(st); + } + return st; +} + +int64_t CloudCompactionStopToken::initiator() const { + return _initiator; +} +} // namespace doris diff --git a/be/src/cloud/cloud_compaction_stop_token.h b/be/src/cloud/cloud_compaction_stop_token.h new file mode 100644 index 00000000000000..ce61ebc37472c7 --- /dev/null +++ b/be/src/cloud/cloud_compaction_stop_token.h @@ -0,0 +1,45 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#pragma once + +#include + +#include "cloud/cloud_storage_engine.h" +#include "cloud/cloud_tablet.h" + +namespace doris { + +class CloudCompactionStopToken { +public: + CloudCompactionStopToken(CloudStorageEngine& engine, CloudTabletSPtr tablet, int64_t initiator); + ~CloudCompactionStopToken() = default; + + void do_lease(); + Status do_register(); + Status do_unregister(); + + int64_t initiator() const; + +private: + CloudStorageEngine& _engine; + CloudTabletSPtr _tablet; + std::string _uuid; + int64_t _initiator; +}; + +} // namespace doris diff --git a/be/src/cloud/cloud_cumulative_compaction_policy.cpp b/be/src/cloud/cloud_cumulative_compaction_policy.cpp index 9e3ca3eb3db7ce..c49448e1998517 100644 --- a/be/src/cloud/cloud_cumulative_compaction_policy.cpp +++ b/be/src/cloud/cloud_cumulative_compaction_policy.cpp @@ -66,8 +66,13 @@ int64_t CloudSizeBasedCumulativeCompactionPolicy::pick_input_rowsets( input_rowsets->push_back(rowset); } } + LOG_INFO( + "[CloudSizeBasedCumulativeCompactionPolicy::pick_input_rowsets.set_" + "input_rowsets] tablet_id={}, start={}, end={}, " + "input_rowsets->size()={}", + target_tablet_id, start_version, end_version, input_rowsets->size()); + return input_rowsets->size(); } - return input_rowsets->size(); }) size_t promotion_size = cloud_promotion_size(tablet); diff --git a/be/src/cloud/cloud_schema_change_job.cpp b/be/src/cloud/cloud_schema_change_job.cpp index b98dfb33a24848..6d84cce7cbaf72 100644 --- a/be/src/cloud/cloud_schema_change_job.cpp +++ b/be/src/cloud/cloud_schema_change_job.cpp @@ -425,6 +425,11 @@ Status CloudSchemaChangeJob::_process_delete_bitmap(int64_t alter_version, .tag("out_rowset_size", _output_rowsets.size()) .tag("start_calc_delete_bitmap_version", start_calc_delete_bitmap_version) .tag("alter_version", alter_version); + RETURN_IF_ERROR(_cloud_storage_engine.register_compaction_stop_token(_new_tablet, initiator)); + Defer defer {[&]() { + 
static_cast(_cloud_storage_engine.unregister_compaction_stop_token(_new_tablet)); + }}; + TabletMetaSharedPtr tmp_meta = std::make_shared(*(_new_tablet->tablet_meta())); tmp_meta->delete_bitmap().delete_bitmap.clear(); std::shared_ptr tmp_tablet = @@ -444,6 +449,8 @@ Status CloudSchemaChangeJob::_process_delete_bitmap(int64_t alter_version, if (max_version >= start_calc_delete_bitmap_version) { RETURN_IF_ERROR(tmp_tablet->capture_consistent_rowsets_unlocked( {start_calc_delete_bitmap_version, max_version}, &incremental_rowsets)); + DBUG_EXECUTE_IF("CloudSchemaChangeJob::_process_delete_bitmap.after.capture_without_lock", + DBUG_BLOCK); for (auto rowset : incremental_rowsets) { RETURN_IF_ERROR(CloudTablet::update_delete_bitmap_without_lock(tmp_tablet, rowset)); } diff --git a/be/src/cloud/cloud_storage_engine.cpp b/be/src/cloud/cloud_storage_engine.cpp index 9c403ac8e3b97f..52ab28b52c3109 100644 --- a/be/src/cloud/cloud_storage_engine.cpp +++ b/be/src/cloud/cloud_storage_engine.cpp @@ -25,9 +25,11 @@ #include #include +#include #include #include "cloud/cloud_base_compaction.h" +#include "cloud/cloud_compaction_stop_token.h" #include "cloud/cloud_cumulative_compaction.h" #include "cloud/cloud_cumulative_compaction_policy.h" #include "cloud/cloud_full_compaction.h" @@ -37,6 +39,8 @@ #include "cloud/cloud_txn_delete_bitmap_cache.h" #include "cloud/cloud_warm_up_manager.h" #include "cloud/config.h" +#include "common/config.h" +#include "common/status.h" #include "io/cache/block_file_cache_downloader.h" #include "io/cache/block_file_cache_factory.h" #include "io/cache/file_cache_common.h" @@ -758,6 +762,7 @@ void CloudStorageEngine::_lease_compaction_thread_callback() { std::vector> full_compactions; std::vector> base_compactions; std::vector> cumu_compactions; + std::vector> compation_stop_tokens; { std::lock_guard lock(_compaction_mtx); for (auto& [_, base] : _submitted_base_compactions) { @@ -775,8 +780,16 @@ void 
CloudStorageEngine::_lease_compaction_thread_callback() { full_compactions.push_back(full); } } + for (auto& [_, stop_token] : _active_compaction_stop_tokens) { + if (stop_token) { + compation_stop_tokens.push_back(stop_token); + } + } } // TODO(plat1ko): Support batch lease rpc + for (auto& stop_token : compation_stop_tokens) { + stop_token->do_lease(); + } for (auto& comp : full_compactions) { comp->do_lease(); } @@ -854,5 +867,57 @@ std::shared_ptr CloudStorageEngine::cumu_compac return _cumulative_compaction_policies.at(compaction_policy); } +Status CloudStorageEngine::register_compaction_stop_token(CloudTabletSPtr tablet, + int64_t initiator) { + { + std::lock_guard lock(_compaction_mtx); + auto [_, success] = _active_compaction_stop_tokens.emplace(tablet->tablet_id(), nullptr); + if (!success) { + return Status::AlreadyExist("stop token already exists for tablet_id={}", + tablet->tablet_id()); + } + } + + auto stop_token = std::make_shared(*this, tablet, initiator); + auto st = stop_token->do_register(); + + if (!st.ok()) { + std::lock_guard lock(_compaction_mtx); + _active_compaction_stop_tokens.erase(tablet->tablet_id()); + return st; + } + + { + std::lock_guard lock(_compaction_mtx); + _active_compaction_stop_tokens[tablet->tablet_id()] = stop_token; + } + LOG_INFO( + "successfully register compaction stop token for tablet_id={}, " + "delete_bitmap_lock_initiator={}", + tablet->tablet_id(), initiator); + return st; +} + +Status CloudStorageEngine::unregister_compaction_stop_token(CloudTabletSPtr tablet) { + std::shared_ptr stop_token; + { + std::lock_guard lock(_compaction_mtx); + if (auto it = _active_compaction_stop_tokens.find(tablet->tablet_id()); + it != _active_compaction_stop_tokens.end()) { + stop_token = it->second; + } else { + return Status::NotFound("stop token not found for tablet_id={}", tablet->tablet_id()); + } + _active_compaction_stop_tokens.erase(tablet->tablet_id()); + } + // stop token will be removed when SC commit or abort + // 
RETURN_IF_ERROR(stop_token->do_unregister()); + LOG_INFO( + "successfully unregister compaction stop token for tablet_id={}, " + "delete_bitmap_lock_initiator={}", + tablet->tablet_id(), stop_token->initiator()); + return Status::OK(); +} + #include "common/compile_check_end.h" } // namespace doris diff --git a/be/src/cloud/cloud_storage_engine.h b/be/src/cloud/cloud_storage_engine.h index 5e51285d93a8ff..6381fbe60012db 100644 --- a/be/src/cloud/cloud_storage_engine.h +++ b/be/src/cloud/cloud_storage_engine.h @@ -44,6 +44,7 @@ class CloudBaseCompaction; class CloudFullCompaction; class TabletHotspot; class CloudWarmUpManager; +class CloudCompactionStopToken; class CloudStorageEngine final : public BaseStorageEngine { public: @@ -143,6 +144,10 @@ class CloudStorageEngine final : public BaseStorageEngine { return *_sync_load_for_tablets_thread_pool; } + Status register_compaction_stop_token(CloudTabletSPtr tablet, int64_t initiator); + + Status unregister_compaction_stop_token(CloudTabletSPtr tablet); + private: void _refresh_storage_vault_info_thread_callback(); void _vacuum_stale_rowsets_thread_callback(); @@ -188,6 +193,9 @@ class CloudStorageEngine final : public BaseStorageEngine { // tablet_id -> submitted cumu compactions, guarded by `_compaction_mtx` std::unordered_map>> _submitted_cumu_compactions; + // tablet_id -> active compaction stop tokens + std::unordered_map> + _active_compaction_stop_tokens; std::unique_ptr _base_compaction_thread_pool; std::unique_ptr _cumu_compaction_thread_pool; diff --git a/cloud/src/meta-service/meta_service_job.cpp b/cloud/src/meta-service/meta_service_job.cpp index 5299b85f41d9a9..99b61e7ebf01e2 100644 --- a/cloud/src/meta-service/meta_service_job.cpp +++ b/cloud/src/meta-service/meta_service_job.cpp @@ -20,6 +20,7 @@ #include #include +#include #include #include #include @@ -61,7 +62,8 @@ bool check_compaction_input_verions(const TabletCompactionJobPB& compaction, if (!job_pb.has_schema_change() || 
!job_pb.schema_change().has_alter_version()) { return true; } - if (compaction.type() == TabletCompactionJobPB::EMPTY_CUMULATIVE) { + if (compaction.type() == TabletCompactionJobPB::EMPTY_CUMULATIVE || + compaction.type() == TabletCompactionJobPB::STOP_TOKEN) { return true; } if (compaction.input_versions_size() != 2 || @@ -192,11 +194,27 @@ void start_compaction_job(MetaServiceCode& code, std::string& msg, std::stringst }), compactions.end()); // clang-format on // Check conflict job + if (std::ranges::any_of(compactions, [](const auto& c) { + return c.type() == TabletCompactionJobPB::STOP_TOKEN; + })) { + auto it = std::ranges::find_if(compactions, [](const auto& c) { + return c.type() == TabletCompactionJobPB::STOP_TOKEN; + }); + msg = fmt::format( + "compactions are not allowed on tablet_id={} currently, blocked by schema " + "change job delete_bitmap_initiator={}", + tablet_id, it->delete_bitmap_lock_initiator()); + code = MetaServiceCode::JOB_TABLET_BUSY; + return; + } if (compaction.type() == TabletCompactionJobPB::FULL) { // Full compaction is generally used for data correctness repair // for MOW table, so priority should be given to performing full // compaction operations and canceling other types of compaction. 
compactions.Clear(); + } else if (compaction.type() == TabletCompactionJobPB::STOP_TOKEN) { + // fail all existing compactions + compactions.Clear(); } else if ((!compaction.has_check_input_versions_range() && compaction.input_versions().empty()) || (compaction.has_check_input_versions_range() && @@ -1111,6 +1129,25 @@ void process_schema_change_job(MetaServiceCode& code, std::string& msg, std::str auto job_val = recorded_job.SerializeAsString(); txn->put(job_key, job_val); if (!new_tablet_job_val.empty()) { + auto& compactions = *new_recorded_job.mutable_compaction(); + auto origin_size = compactions.size(); + compactions.erase( + std::remove_if( + compactions.begin(), compactions.end(), + [&](auto& c) { + return c.has_delete_bitmap_lock_initiator() && + c.delete_bitmap_lock_initiator() == + schema_change.delete_bitmap_lock_initiator(); + }), + compactions.end()); + if (compactions.size() < origin_size) { + INSTANCE_LOG(INFO) + << "remove " << (origin_size - compactions.size()) + << " STOP_TOKEN for schema_change job tablet_id=" << tablet_id + << " delete_bitmap_lock_initiator=" + << schema_change.delete_bitmap_lock_initiator() + << " key=" << hex(job_key); + } new_recorded_job.clear_schema_change(); new_tablet_job_val = new_recorded_job.SerializeAsString(); txn->put(new_tablet_job_key, new_tablet_job_val); @@ -1150,7 +1187,28 @@ void process_schema_change_job(MetaServiceCode& code, std::string& msg, std::str return; } if (schema_change.alter_version() < 2) { // no need to update stats - // TODO(cyx): clear schema_change job? + // TODO(cyx): clear schema_change job? 
+ if (!new_tablet_job_val.empty()) { + auto& compactions = *new_recorded_job.mutable_compaction(); + auto origin_size = compactions.size(); + compactions.erase( + std::remove_if(compactions.begin(), compactions.end(), + [&](auto& c) { + return c.has_delete_bitmap_lock_initiator() && + c.delete_bitmap_lock_initiator() == + schema_change.delete_bitmap_lock_initiator(); + }), + compactions.end()); + if (compactions.size() < origin_size) { + INSTANCE_LOG(INFO) + << "remove " << (origin_size - compactions.size()) + << " STOP_TOKEN for schema_change job tablet_id=" << tablet_id + << " delete_bitmap_lock_initiator=" + << schema_change.delete_bitmap_lock_initiator() << " key=" << hex(job_key); + } + new_tablet_job_val = new_recorded_job.SerializeAsString(); + txn->put(new_tablet_job_key, new_tablet_job_val); + } need_commit = true; return; } @@ -1287,6 +1345,23 @@ void process_schema_change_job(MetaServiceCode& code, std::string& msg, std::str auto job_val = recorded_job.SerializeAsString(); txn->put(job_key, job_val); if (!new_tablet_job_val.empty()) { + auto& compactions = *new_recorded_job.mutable_compaction(); + auto origin_size = compactions.size(); + compactions.erase( + std::remove_if(compactions.begin(), compactions.end(), + [&](auto& c) { + return c.has_delete_bitmap_lock_initiator() && + c.delete_bitmap_lock_initiator() == + schema_change.delete_bitmap_lock_initiator(); + }), + compactions.end()); + if (compactions.size() < origin_size) { + INSTANCE_LOG(INFO) << "remove " << (origin_size - compactions.size()) + << " STOP_TOKEN for schema_change job tablet_id=" << tablet_id + << " delete_bitmap_lock_initiator=" + << schema_change.delete_bitmap_lock_initiator() + << " key=" << hex(job_key); + } new_recorded_job.clear_schema_change(); new_tablet_job_val = new_recorded_job.SerializeAsString(); txn->put(new_tablet_job_key, new_tablet_job_val); diff --git a/gensrc/proto/cloud.proto b/gensrc/proto/cloud.proto index ff0279990ee10d..28ec3ba67d8189 100644 --- 
a/gensrc/proto/cloud.proto +++ b/gensrc/proto/cloud.proto @@ -514,6 +514,7 @@ message TabletCompactionJobPB { CUMULATIVE = 2; EMPTY_CUMULATIVE = 3; // just update cumulative point FULL = 4; + STOP_TOKEN = 5; // fail existing compactions and deny newly incoming compactions } // IP and port of the node which initiates this job optional string initiator = 1; // prepare diff --git a/regression-test/data/fault_injection_p0/cloud/test_cloud_mow_new_tablet_compaction.out b/regression-test/data/fault_injection_p0/cloud/test_cloud_mow_new_tablet_compaction.out new file mode 100644 index 00000000000000..4f02f6683a21f4 --- /dev/null +++ b/regression-test/data/fault_injection_p0/cloud/test_cloud_mow_new_tablet_compaction.out @@ -0,0 +1,14 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !sql -- +1 1 1 1 +2 2 2 2 +3 3 3 2 + +-- !dup_key_count -- + +-- !sql -- +1 \N 88 88 +2 2 2 2 +3 \N 77 77 +5 \N 77 77 + diff --git a/regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_new_tablet_compaction.groovy b/regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_new_tablet_compaction.groovy new file mode 100644 index 00000000000000..467e9fddb4369e --- /dev/null +++ b/regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_new_tablet_compaction.groovy @@ -0,0 +1,143 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import java.util.concurrent.TimeUnit +import org.awaitility.Awaitility + +suite("test_cloud_mow_new_tablet_compaction", "nonConcurrent") { + if (!isCloudMode()) { + return + } + + GetDebugPoint().clearDebugPointsForAllFEs() + GetDebugPoint().clearDebugPointsForAllBEs() + + def customBeConfig = [ + enable_new_tablet_do_compaction : true + ] + + setBeConfigTemporary(customBeConfig) { + def table1 = "test_cloud_mow_new_tablet_compaction" + sql "DROP TABLE IF EXISTS ${table1} FORCE;" + sql """ CREATE TABLE IF NOT EXISTS ${table1} ( + `k1` int NOT NULL, + `c1` int, + `c2` int, + `c3` int + )UNIQUE KEY(k1) + DISTRIBUTED BY HASH(k1) BUCKETS 1 + PROPERTIES ( + "enable_unique_key_merge_on_write" = "true", + "disable_auto_compaction" = "true", + "replication_num" = "1"); """ + + sql "insert into ${table1} values(1,1,1,1);" + sql "insert into ${table1} values(2,2,2,2);" + sql "insert into ${table1} values(3,3,3,2);" + sql "sync;" + qt_sql "select * from ${table1} order by k1;" + + def backends = sql_return_maparray('show backends') + def tabletStats = sql_return_maparray("show tablets from ${table1};") + assert tabletStats.size() == 1 + def tabletId = tabletStats[0].TabletId + def tabletBackendId = tabletStats[0].BackendId + def tabletBackend + for (def be : backends) { + if (be.BackendId == tabletBackendId) { + tabletBackend = be + break; + } + } + logger.info("tablet ${tabletId} on backend ${tabletBackend.Host} with backendId=${tabletBackend.BackendId}"); + + try { + 
GetDebugPoint().enableDebugPointForAllBEs("CloudSchemaChangeJob::_convert_historical_rowsets.block") + GetDebugPoint().enableDebugPointForAllBEs("CloudSchemaChangeJob::_process_delete_bitmap.after.capture_without_lock") + sql "alter table ${table1} modify column c1 varchar(100);" + + Thread.sleep(3000) + + tabletStats = sql_return_maparray("show tablets from ${table1};") + def newTabletId = "-1" + for (def stat : tabletStats) { + if (stat.TabletId != tabletId) { + newTabletId = stat.TabletId + break + } + } + + logger.info("new_tablet_id=${newTabletId}") + + int start_ver = 5 + int end_ver = 4 + + // these load will skip to calculate bitmaps in publish phase on new tablet because it's in NOT_READY state + GetDebugPoint().enableDebugPointForAllFEs("CloudGlobalTransactionMgr.getDeleteBitmapUpdateLock.enable_spin_wait") + GetDebugPoint().enableDebugPointForAllFEs("CloudGlobalTransactionMgr.getDeleteBitmapUpdateLock.block") + def threads = [] + threads << Thread.start { sql "insert into ${table1} values(1,99,99,99),(3,99,99,99);"} + ++end_ver + Thread.sleep(200) + threads << Thread.start { sql "insert into ${table1} values(5,88,88,88),(1,88,88,88);" } + ++end_ver + Thread.sleep(200) + threads << Thread.start { sql "insert into ${table1} values(3,77,77,77),(5,77,77,77);" } + ++end_ver + Thread.sleep(2000) + GetDebugPoint().disableDebugPointForAllFEs("CloudGlobalTransactionMgr.getDeleteBitmapUpdateLock.enable_spin_wait") + GetDebugPoint().disableDebugPointForAllFEs("CloudGlobalTransactionMgr.getDeleteBitmapUpdateLock.block") + threads.each { it.join() } + + + // let sc capture these rowsets when calculating increment rowsets without lock + GetDebugPoint().disableDebugPointForAllBEs("CloudSchemaChangeJob::_convert_historical_rowsets.block") + Thread.sleep(1000) + + // do cumu compaction on these rowsets on new tablet + // this can happen when enable_new_tablet_do_compaction=true + 
GetDebugPoint().enableDebugPointForAllBEs("CloudSizeBasedCumulativeCompactionPolicy::pick_input_rowsets.set_input_rowsets", + [tablet_id:"${newTabletId}", start_version: start_ver, end_version: end_ver]); + { + // trigger cumu compaction, should fail + logger.info("trigger cumu compaction on tablet=${newTabletId} BE.Host=${tabletBackend.Host} with backendId=${tabletBackend.BackendId}") + def (code, out, err) = be_run_cumulative_compaction(tabletBackend.Host, tabletBackend.HttpPort, newTabletId) + logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err) + assert code == 0 + def compactJson = parseJson(out.trim()) + assert "success" != compactJson.status.toLowerCase() + } + + GetDebugPoint().disableDebugPointForAllBEs("CloudSchemaChangeJob::_process_delete_bitmap.after.capture_without_lock") + // wait for sc to finish + waitForSchemaChangeDone { + sql """ SHOW ALTER TABLE COLUMN WHERE TableName='${table1}' ORDER BY createtime DESC LIMIT 1 """ + time 1000 + } + + qt_dup_key_count "select k1,count() as cnt from ${table1} group by k1 having cnt>1;" + order_qt_sql "select * from ${table1};" + + } catch(Exception e) { + logger.info(e.getMessage()) + throw e + } finally { + GetDebugPoint().clearDebugPointsForAllBEs() + GetDebugPoint().clearDebugPointsForAllFEs() + } + } +}