From 04e8829966675bd86233c537e69d51714b5c868b Mon Sep 17 00:00:00 2001 From: yoruet <1559650411@qq.com> Date: Fri, 18 Oct 2024 10:58:41 +0800 Subject: [PATCH 1/2] add compaction status info --- be/src/cloud/cloud_base_compaction.cpp | 40 +++-- be/src/cloud/cloud_compaction_action.cpp | 8 + be/src/cloud/cloud_cumulative_compaction.cpp | 35 +++- be/src/cloud/cloud_full_compaction.cpp | 20 ++- be/src/cloud/cloud_tablet.cpp | 51 ++++++ be/src/cloud/cloud_tablet.h | 36 +++++ be/src/http/action/compaction_action.cpp | 10 ++ be/src/olap/base_compaction.cpp | 45 ++++-- be/src/olap/cumulative_compaction.cpp | 49 ++++-- be/src/olap/full_compaction.cpp | 33 +++- be/src/olap/olap_server.cpp | 4 + be/src/olap/tablet.cpp | 18 +++ be/src/olap/tablet.h | 28 ++++ .../test_full_compaction_status.groovy | 153 ++++++++++++++++++ 14 files changed, 481 insertions(+), 49 deletions(-) create mode 100644 regression-test/suites/compaction/test_full_compaction_status.groovy diff --git a/be/src/cloud/cloud_base_compaction.cpp b/be/src/cloud/cloud_base_compaction.cpp index 84eb921b91ffb5..dbc6f81c02c584 100644 --- a/be/src/cloud/cloud_base_compaction.cpp +++ b/be/src/cloud/cloud_base_compaction.cpp @@ -45,8 +45,16 @@ CloudBaseCompaction::CloudBaseCompaction(CloudStorageEngine& engine, CloudTablet CloudBaseCompaction::~CloudBaseCompaction() = default; Status CloudBaseCompaction::prepare_compact() { + Status st; + Defer defer_set_st([&] { + if (!st.ok()) { + cloud_tablet()->set_last_base_compaction_status(st.to_string()); + cloud_tablet()->set_last_base_compaction_failure_time(UnixMillis()); + } + }); if (_tablet->tablet_state() != TABLET_RUNNING) { - return Status::InternalError("invalid tablet state. tablet_id={}", _tablet->tablet_id()); + st = Status::InternalError("invalid tablet state. tablet_id={}", _tablet->tablet_id()); + return st; } bool need_sync_tablet = true; @@ -61,10 +69,12 @@ Status CloudBaseCompaction::prepare_compact() { } } if (need_sync_tablet) { - RETURN_IF_ERROR(cloud_tablet()->sync_rowsets()); + st = cloud_tablet()->sync_rowsets(); + RETURN_IF_ERROR(st); } - RETURN_IF_ERROR(pick_rowsets_to_compact()); + st = pick_rowsets_to_compact(); + RETURN_IF_ERROR(st); for (auto& rs : _input_rowsets) { _input_row_num += rs->num_rows(); @@ -108,11 +118,13 @@ Status CloudBaseCompaction::request_global_lock() { compaction_job->set_expiration(_expiration); compaction_job->set_lease(now + config::lease_compaction_interval_seconds * 4); cloud::StartTabletJobResponse resp; - auto st = _engine.meta_mgr().prepare_tablet_job(job, &resp); + st = _engine.meta_mgr().prepare_tablet_job(job, &resp); + cloud_tablet()->set_last_base_compaction_status(st.to_string()); if (resp.has_alter_version()) { (static_cast(_tablet.get()))->set_alter_version(resp.alter_version()); } if (!st.ok()) { + cloud_tablet()->set_last_base_compaction_failure_time(UnixMillis()); if (resp.status().code() == cloud::STALE_TABLET_CACHE) { // set last_sync_time to 0 to force sync tablet next time cloud_tablet()->last_sync_time_s = 0; @@ -260,12 +272,21 @@ Status CloudBaseCompaction::execute_compact() { using namespace std::chrono; auto start = steady_clock::now(); - auto res = CloudCompactionMixin::execute_compact(); - if (!res.ok()) { - LOG(WARNING) << "fail to do " << compaction_name() << ". res=" << res + Status st; + Defer defer_set_st([&] { + cloud_tablet()->set_last_base_compaction_status(st.to_string()); + if (!st.ok()) { + cloud_tablet()->set_last_base_compaction_failure_time(UnixMillis()); + } else { + cloud_tablet()->set_last_base_compaction_success_time(UnixMillis()); + } + }); + st = CloudCompactionMixin::execute_compact(); + if (!st.ok()) { + LOG(WARNING) << "fail to do " << compaction_name() << ". res=" << st << ", tablet=" << _tablet->tablet_id() << ", output_version=" << _output_version; - return res; + return st; } LOG_INFO("finish CloudBaseCompaction, tablet_id={}, cost={}ms range=[{}-{}]", _tablet->tablet_id(), duration_cast(steady_clock::now() - start).count(), @@ -290,7 +311,8 @@ Status CloudBaseCompaction::execute_compact() { DorisMetrics::instance()->base_compaction_bytes_total->increment(_input_rowsets_total_size); base_output_size << _output_rowset->total_disk_size(); - return Status::OK(); + st = Status::OK(); + return st; } Status CloudBaseCompaction::modify_rowsets() { diff --git a/be/src/cloud/cloud_compaction_action.cpp b/be/src/cloud/cloud_compaction_action.cpp index f194a433494a0d..ddc2292b41de6a 100644 --- a/be/src/cloud/cloud_compaction_action.cpp +++ b/be/src/cloud/cloud_compaction_action.cpp @@ -156,6 +156,14 @@ Status CloudCompactionAction::_handle_run_compaction(HttpRequest* req, std::stri return Status::NotFound("Tablet not found. tablet_id={}", tablet_id); } + if (compaction_type == PARAM_COMPACTION_BASE) { + tablet->set_last_base_compaction_schedule_time(UnixMillis()); + } else if (compaction_type == PARAM_COMPACTION_CUMULATIVE) { + tablet->set_last_cumu_compaction_schedule_time(UnixMillis()); + } else if (compaction_type == PARAM_COMPACTION_FULL) { + tablet->set_last_full_compaction_schedule_time(UnixMillis()); + } + LOG(INFO) << "manual submit compaction task, tablet id: " << tablet_id << " table id: " << table_id; // 3. submit compaction task diff --git a/be/src/cloud/cloud_cumulative_compaction.cpp b/be/src/cloud/cloud_cumulative_compaction.cpp index 75a22c84fee09f..379f234a7a5b28 100644 --- a/be/src/cloud/cloud_cumulative_compaction.cpp +++ b/be/src/cloud/cloud_cumulative_compaction.cpp @@ -49,10 +49,18 @@ CloudCumulativeCompaction::~CloudCumulativeCompaction() = default; Status CloudCumulativeCompaction::prepare_compact() { DBUG_EXECUTE_IF("CloudCumulativeCompaction.prepare_compact.sleep", { sleep(5); }) + Status st; + Defer defer_set_st([&] { + if (!st.ok()) { + cloud_tablet()->set_last_cumu_compaction_status(st.to_string()); + cloud_tablet()->set_last_cumu_compaction_failure_time(UnixMillis()); + } + }); if (_tablet->tablet_state() != TABLET_RUNNING && (!config::enable_new_tablet_do_compaction || static_cast(_tablet.get())->alter_version() == -1)) { - return Status::InternalError("invalid tablet state. tablet_id={}", _tablet->tablet_id()); + st = Status::InternalError("invalid tablet state. tablet_id={}", _tablet->tablet_id()); + return st; } std::vector> cumu_compactions; @@ -76,11 +84,12 @@ Status CloudCumulativeCompaction::prepare_compact() { } } if (need_sync_tablet) { - RETURN_IF_ERROR(cloud_tablet()->sync_rowsets()); + st = cloud_tablet()->sync_rowsets(); + RETURN_IF_ERROR(st); } // pick rowsets to compact - auto st = pick_rowsets_to_compact(); + st = pick_rowsets_to_compact(); if (!st.ok()) { if (_last_delete_version.first != -1) { // we meet a delete version, should increase the cumulative point to let base compaction handle the delete version. @@ -182,12 +191,21 @@ Status CloudCumulativeCompaction::execute_compact() { using namespace std::chrono; auto start = steady_clock::now(); - auto res = CloudCompactionMixin::execute_compact(); - if (!res.ok()) { - LOG(WARNING) << "fail to do " << compaction_name() << ". res=" << res + Status st; + Defer defer_set_st([&] { + cloud_tablet()->set_last_cumu_compaction_status(st.to_string()); + if (!st.ok()) { + cloud_tablet()->set_last_cumu_compaction_failure_time(UnixMillis()); + } else { + cloud_tablet()->set_last_cumu_compaction_success_time(UnixMillis()); + } + }); + st = CloudCompactionMixin::execute_compact(); + if (!st.ok()) { + LOG(WARNING) << "fail to do " << compaction_name() << ". res=" << st << ", tablet=" << _tablet->tablet_id() << ", output_version=" << _output_version; - return res; + return st; } LOG_INFO("finish CloudCumulativeCompaction, tablet_id={}, cost={}ms, range=[{}-{}]", _tablet->tablet_id(), duration_cast(steady_clock::now() - start).count(), @@ -216,7 +234,8 @@ Status CloudCumulativeCompaction::execute_compact() { _input_rowsets_total_size); cumu_output_size << _output_rowset->total_disk_size(); - return Status::OK(); + st = Status::OK(); + return st; } Status CloudCumulativeCompaction::modify_rowsets() { diff --git a/be/src/cloud/cloud_full_compaction.cpp b/be/src/cloud/cloud_full_compaction.cpp index 0b6810a414c852..809590303d97f6 100644 --- a/be/src/cloud/cloud_full_compaction.cpp +++ b/be/src/cloud/cloud_full_compaction.cpp @@ -47,14 +47,24 @@ CloudFullCompaction::CloudFullCompaction(CloudStorageEngine& engine, CloudTablet CloudFullCompaction::~CloudFullCompaction() = default; Status CloudFullCompaction::prepare_compact() { + Status st; + Defer defer_set_st([&] { + if (!st.ok()) { + cloud_tablet()->set_last_full_compaction_status(st.to_string()); + cloud_tablet()->set_last_full_compaction_failure_time(UnixMillis()); + } + }); if (_tablet->tablet_state() != TABLET_RUNNING) { - return Status::InternalError("invalid tablet state. tablet_id={}", _tablet->tablet_id()); + st = Status::InternalError("invalid tablet state. tablet_id={}", _tablet->tablet_id()); + return st; } // always sync latest rowset for full compaction - RETURN_IF_ERROR(cloud_tablet()->sync_rowsets()); + st = cloud_tablet()->sync_rowsets(); + RETURN_IF_ERROR(st); - RETURN_IF_ERROR(pick_rowsets_to_compact()); + st = pick_rowsets_to_compact(); + RETURN_IF_ERROR(st); for (auto& rs : _input_rowsets) { _input_row_num += rs->num_rows(); @@ -151,7 +161,9 @@ Status CloudFullCompaction::execute_compact() { using namespace std::chrono; auto start = steady_clock::now(); auto res = CloudCompactionMixin::execute_compact(); + cloud_tablet()->set_last_full_compaction_status(res.to_string()); if (!res.ok()) { + cloud_tablet()->set_last_full_compaction_failure_time(UnixMillis()); LOG(WARNING) << "fail to do " << compaction_name() << ". res=" << res << ", tablet=" << _tablet->tablet_id() << ", output_version=" << _output_version; @@ -178,6 +190,8 @@ Status CloudFullCompaction::execute_compact() { DorisMetrics::instance()->full_compaction_bytes_total->increment(_input_rowsets_total_size); full_output_size << _output_rowset->total_disk_size(); + cloud_tablet()->set_last_full_compaction_success_time(UnixMillis()); + cloud_tablet()->set_last_full_compaction_status(Status::OK().to_string()); return Status::OK(); } diff --git a/be/src/cloud/cloud_tablet.cpp b/be/src/cloud/cloud_tablet.cpp index 916caf1aa06bf8..225dbf1746f741 100644 --- a/be/src/cloud/cloud_tablet.cpp +++ b/be/src/cloud/cloud_tablet.cpp @@ -613,6 +613,57 @@ void CloudTablet::get_compaction_status(std::string* json_result) { // get snapshot version path json_doc _timestamped_version_tracker.get_stale_version_path_json_doc(path_arr); root.AddMember("cumulative point", _cumulative_point.load(), root.GetAllocator()); + rapidjson::Value cumu_value; + std::string format_str = ToStringFromUnixMillis(_last_cumu_compaction_failure_millis.load()); + cumu_value.SetString(format_str.c_str(), format_str.length(), root.GetAllocator()); + root.AddMember("last cumulative failure time", cumu_value, root.GetAllocator()); + rapidjson::Value base_value; + format_str = ToStringFromUnixMillis(_last_base_compaction_failure_millis.load()); + base_value.SetString(format_str.c_str(), format_str.length(), root.GetAllocator()); + root.AddMember("last base failure time", base_value, root.GetAllocator()); + rapidjson::Value full_value; + format_str = ToStringFromUnixMillis(_last_full_compaction_failure_millis.load()); + full_value.SetString(format_str.c_str(), format_str.length(), root.GetAllocator()); + root.AddMember("last full failure time", full_value, root.GetAllocator()); + rapidjson::Value cumu_success_value; + format_str = ToStringFromUnixMillis(_last_cumu_compaction_success_millis.load()); + cumu_success_value.SetString(format_str.c_str(), format_str.length(), root.GetAllocator()); + root.AddMember("last cumulative success time", cumu_success_value, root.GetAllocator()); + rapidjson::Value base_success_value; + format_str = ToStringFromUnixMillis(_last_base_compaction_success_millis.load()); + base_success_value.SetString(format_str.c_str(), format_str.length(), root.GetAllocator()); + root.AddMember("last base success time", base_success_value, root.GetAllocator()); + rapidjson::Value full_success_value; + format_str = ToStringFromUnixMillis(_last_full_compaction_success_millis.load()); + full_success_value.SetString(format_str.c_str(), format_str.length(), root.GetAllocator()); + root.AddMember("last full success time", full_success_value, root.GetAllocator()); + rapidjson::Value cumu_schedule_value; + format_str = ToStringFromUnixMillis(_last_cumu_compaction_schedule_millis.load()); + cumu_schedule_value.SetString(format_str.c_str(), format_str.length(), root.GetAllocator()); + root.AddMember("last cumulative schedule time", cumu_schedule_value, root.GetAllocator()); + rapidjson::Value base_schedule_value; + format_str = ToStringFromUnixMillis(_last_base_compaction_schedule_millis.load()); + base_schedule_value.SetString(format_str.c_str(), format_str.length(), root.GetAllocator()); + root.AddMember("last base schedule time", base_schedule_value, root.GetAllocator()); + rapidjson::Value full_schedule_value; + format_str = ToStringFromUnixMillis(_last_full_compaction_schedule_millis.load()); + full_schedule_value.SetString(format_str.c_str(), format_str.length(), root.GetAllocator()); + root.AddMember("last full schedule time", full_schedule_value, root.GetAllocator()); + rapidjson::Value cumu_compaction_status_value; + cumu_compaction_status_value.SetString(_last_cumu_compaction_status.c_str(), + _last_cumu_compaction_status.length(), + root.GetAllocator()); + root.AddMember("last cumulative status", cumu_compaction_status_value, root.GetAllocator()); + rapidjson::Value base_compaction_status_value; + base_compaction_status_value.SetString(_last_base_compaction_status.c_str(), + _last_base_compaction_status.length(), + root.GetAllocator()); + root.AddMember("last base status", base_compaction_status_value, root.GetAllocator()); + rapidjson::Value full_compaction_status_value; + full_compaction_status_value.SetString(_last_full_compaction_status.c_str(), + _last_full_compaction_status.length(), + root.GetAllocator()); + root.AddMember("last full status", full_compaction_status_value, root.GetAllocator()); // print all rowsets' version as an array rapidjson::Document versions_arr; diff --git a/be/src/cloud/cloud_tablet.h b/be/src/cloud/cloud_tablet.h index ca05ea15029dd1..22cd0bf85af94a 100644 --- a/be/src/cloud/cloud_tablet.h +++ b/be/src/cloud/cloud_tablet.h @@ -163,11 +163,39 @@ class CloudTablet final : public BaseTablet { _last_full_compaction_success_millis = millis; } + int64_t last_cumu_compaction_schedule_time() { return _last_cumu_compaction_schedule_millis; } + void set_last_cumu_compaction_schedule_time(int64_t millis) { + _last_cumu_compaction_schedule_millis = millis; + } + int64_t last_base_compaction_schedule_time() { return _last_base_compaction_schedule_millis; } void set_last_base_compaction_schedule_time(int64_t millis) { _last_base_compaction_schedule_millis = millis; } + int64_t last_full_compaction_schedule_time() { return _last_full_compaction_schedule_millis; } + void set_last_full_compaction_schedule_time(int64_t millis) { + _last_full_compaction_schedule_millis = millis; + } + + void set_last_cumu_compaction_status(std::string status) { + _last_cumu_compaction_status = std::move(status); + } + + std::string get_last_cumu_compaction_status() { return _last_cumu_compaction_status; } + + void set_last_base_compaction_status(std::string status) { + _last_base_compaction_status = std::move(status); + } + + std::string get_last_base_compaction_status() { return _last_base_compaction_status; } + + void set_last_full_compaction_status(std::string status) { + _last_full_compaction_status = std::move(status); + } + + std::string get_last_full_compaction_status() { return _last_full_compaction_status; } + int64_t alter_version() const { return _alter_version; } void set_alter_version(int64_t alter_version) { _alter_version = alter_version; } @@ -275,8 +303,16 @@ class CloudTablet final : public BaseTablet { std::atomic _last_base_compaction_success_millis; // timestamp of last full compaction success std::atomic _last_full_compaction_success_millis; + // timestamp of last cumu compaction schedule time + std::atomic _last_cumu_compaction_schedule_millis; // timestamp of last base compaction schedule time std::atomic _last_base_compaction_schedule_millis; + // timestamp of last full compaction schedule time + std::atomic _last_full_compaction_schedule_millis; + + std::string _last_cumu_compaction_status; + std::string _last_base_compaction_status; + std::string _last_full_compaction_status; int64_t _base_compaction_cnt = 0; int64_t _cumulative_compaction_cnt = 0; diff --git a/be/src/http/action/compaction_action.cpp b/be/src/http/action/compaction_action.cpp index 7769a0ea1f4522..324d9a968839f3 100644 --- a/be/src/http/action/compaction_action.cpp +++ b/be/src/http/action/compaction_action.cpp @@ -153,6 +153,7 @@ Status CompactionAction::_handle_run_compaction(HttpRequest* req, std::string* j std::vector tablet_vec = _engine.tablet_manager()->get_all_tablet( [table_id](Tablet* tablet) -> bool { return tablet->get_table_id() == table_id; }); for (const auto& tablet : tablet_vec) { + tablet->set_last_full_compaction_schedule_time(UnixMillis()); RETURN_IF_ERROR( _engine.submit_compaction_task(tablet, CompactionType::FULL_COMPACTION, false)); } @@ -183,6 +184,15 @@ Status CompactionAction::_handle_run_compaction(HttpRequest* req, std::string* j std::future future_obj = task.get_future(); std::thread(std::move(task)).detach(); + // 更新schedule_time + if (compaction_type == PARAM_COMPACTION_BASE) { + tablet->set_last_base_compaction_schedule_time(UnixMillis()); + } else if (compaction_type == PARAM_COMPACTION_CUMULATIVE) { + tablet->set_last_cumu_compaction_schedule_time(UnixMillis()); + } else if (compaction_type == PARAM_COMPACTION_FULL) { + tablet->set_last_full_compaction_schedule_time(UnixMillis()); + } + // 4. wait for result for 2 seconds by async std::future_status status = future_obj.wait_for(std::chrono::seconds(2)); if (status == std::future_status::ready) { diff --git a/be/src/olap/base_compaction.cpp b/be/src/olap/base_compaction.cpp index 164288241b33d5..59ef901f3533ec 100644 --- a/be/src/olap/base_compaction.cpp +++ b/be/src/olap/base_compaction.cpp @@ -47,21 +47,33 @@ BaseCompaction::BaseCompaction(StorageEngine& engine, const TabletSharedPtr& tab BaseCompaction::~BaseCompaction() = default; Status BaseCompaction::prepare_compact() { + Status st; + Defer defer_set_st([&] { + if (!st.ok()) { + tablet()->set_last_base_compaction_status(st.to_string()); + tablet()->set_last_base_compaction_failure_time(UnixMillis()); + } + }); + if (!tablet()->init_succeeded()) { - return Status::Error("_tablet init failed"); + st = Status::Error("_tablet init failed"); + return st; } std::unique_lock lock(tablet()->get_base_compaction_lock(), std::try_to_lock); if (!lock.owns_lock()) { - return Status::Error( - "another base compaction is running. tablet={}", _tablet->tablet_id()); + st = Status::Error("another base compaction is running. tablet={}", + _tablet->tablet_id()); + return st; } // 1. pick rowsets to compact - RETURN_IF_ERROR(pick_rowsets_to_compact()); + st = pick_rowsets_to_compact(); + RETURN_IF_ERROR(st); COUNTER_UPDATE(_input_rowsets_counter, _input_rowsets.size()); - return Status::OK(); + st = Status::OK(); + return st; } Status BaseCompaction::execute_compact() { @@ -70,22 +82,35 @@ Status BaseCompaction::execute_compact() { Thread::set_idle_sched(); } #endif + Status st; + Defer defer_set_st([&] { + tablet()->set_last_base_compaction_status(st.to_string()); + if (!st.ok()) { + tablet()->set_last_base_compaction_failure_time(UnixMillis()); + } else { + tablet()->set_last_base_compaction_success_time(UnixMillis()); + } + }); + std::unique_lock lock(tablet()->get_base_compaction_lock(), std::try_to_lock); if (!lock.owns_lock()) { - return Status::Error( - "another base compaction is running. tablet={}", _tablet->tablet_id()); + st = Status::Error("another base compaction is running. tablet={}", + _tablet->tablet_id()); + return st; } SCOPED_ATTACH_TASK(_mem_tracker); - RETURN_IF_ERROR(CompactionMixin::execute_compact()); + st = CompactionMixin::execute_compact(); + RETURN_IF_ERROR(st); + DCHECK_EQ(_state, CompactionState::SUCCESS); - tablet()->set_last_base_compaction_success_time(UnixMillis()); DorisMetrics::instance()->base_compaction_deltas_total->increment(_input_rowsets.size()); DorisMetrics::instance()->base_compaction_bytes_total->increment(_input_rowsets_total_size); - return Status::OK(); + st = Status::OK(); + return st; } void BaseCompaction::_filter_input_rowset() { diff --git a/be/src/olap/cumulative_compaction.cpp b/be/src/olap/cumulative_compaction.cpp index 1df669cfe6fc2b..c29d1cc064301c 100644 --- a/be/src/olap/cumulative_compaction.cpp +++ b/be/src/olap/cumulative_compaction.cpp @@ -79,24 +79,37 @@ CumulativeCompaction::CumulativeCompaction(StorageEngine& engine, const TabletSh CumulativeCompaction::~CumulativeCompaction() = default; Status CumulativeCompaction::prepare_compact() { + Status st; + Defer defer_set_st([&] { + if (!st.ok()) { + tablet()->set_last_cumu_compaction_status(st.to_string()); + tablet()->set_last_cumu_compaction_failure_time(UnixMillis()); + } + }); + if (!tablet()->init_succeeded()) { - return Status::Error("_tablet init failed"); + st = Status::Error("_tablet init failed"); + return st; } std::unique_lock lock(tablet()->get_cumulative_compaction_lock(), std::try_to_lock); if (!lock.owns_lock()) { - return Status::Error( + st = Status::Error( "The tablet is under cumulative compaction. tablet={}", _tablet->tablet_id()); + return st; } tablet()->calculate_cumulative_point(); VLOG_CRITICAL << "after calculate, current cumulative point is " << tablet()->cumulative_layer_point() << ", tablet=" << _tablet->tablet_id(); - RETURN_IF_ERROR(pick_rowsets_to_compact()); + st = pick_rowsets_to_compact(); + RETURN_IF_ERROR(st); + COUNTER_UPDATE(_input_rowsets_counter, _input_rowsets.size()); - return Status::OK(); + st = Status::OK(); + return st; } Status CumulativeCompaction::execute_compact() { @@ -114,31 +127,43 @@ Status CumulativeCompaction::execute_compact() { } }) + Status st; + Defer defer_set_st([&] { + tablet()->set_last_cumu_compaction_status(st.to_string()); + if (!st.ok()) { + tablet()->set_last_cumu_compaction_failure_time(UnixMillis()); + } else { + // TIME_SERIES_POLICY, generating an empty rowset doesn't need to update the timestamp. + if (!(tablet()->tablet_meta()->compaction_policy() == CUMULATIVE_TIME_SERIES_POLICY && + _output_rowset->num_segments() == 0)) { + tablet()->set_last_cumu_compaction_success_time(UnixMillis()); + } + } + }); std::unique_lock lock(tablet()->get_cumulative_compaction_lock(), std::try_to_lock); if (!lock.owns_lock()) { - return Status::Error( + st = Status::Error( "The tablet is under cumulative compaction. tablet={}", _tablet->tablet_id()); + return st; } SCOPED_ATTACH_TASK(_mem_tracker); - RETURN_IF_ERROR(CompactionMixin::execute_compact()); + st = CompactionMixin::execute_compact(); + RETURN_IF_ERROR(st); + DCHECK_EQ(_state, CompactionState::SUCCESS); tablet()->cumulative_compaction_policy()->update_cumulative_point( tablet(), _input_rowsets, _output_rowset, _last_delete_version); VLOG_CRITICAL << "after cumulative compaction, current cumulative point is " << tablet()->cumulative_layer_point() << ", tablet=" << _tablet->tablet_id(); - // TIME_SERIES_POLICY, generating an empty rowset doesn't need to update the timestamp. - if (!(tablet()->tablet_meta()->compaction_policy() == CUMULATIVE_TIME_SERIES_POLICY && - _output_rowset->num_segments() == 0)) { - tablet()->set_last_cumu_compaction_success_time(UnixMillis()); - } DorisMetrics::instance()->cumulative_compaction_deltas_total->increment(_input_rowsets.size()); DorisMetrics::instance()->cumulative_compaction_bytes_total->increment( _input_rowsets_total_size); - return Status::OK(); + st = Status::OK(); + return st; } Status CumulativeCompaction::pick_rowsets_to_compact() { diff --git a/be/src/olap/full_compaction.cpp b/be/src/olap/full_compaction.cpp index 1060193a20bdca..2994830eb23244 100644 --- a/be/src/olap/full_compaction.cpp +++ b/be/src/olap/full_compaction.cpp @@ -52,8 +52,16 @@ FullCompaction::~FullCompaction() { } Status FullCompaction::prepare_compact() { + Status st; + Defer defer_set_st([&] { + if (!st.ok()) { + tablet()->set_last_full_compaction_status(st.to_string()); + tablet()->set_last_full_compaction_failure_time(UnixMillis()); + } + }); if (!tablet()->init_succeeded()) { - return Status::Error("Full compaction init failed"); + st = Status::Error("Full compaction init failed"); + return st; } std::unique_lock base_lock(tablet()->get_base_compaction_lock()); @@ -64,18 +72,30 @@ Status FullCompaction::prepare_compact() { { tablet()->set_cumulative_layer_point(tablet()->max_version_unlocked() + 1); }) // 1. pick rowsets to compact - RETURN_IF_ERROR(pick_rowsets_to_compact()); + st = pick_rowsets_to_compact(); + RETURN_IF_ERROR(st); - return Status::OK(); + st = Status::OK(); + return st; } Status FullCompaction::execute_compact() { + Status st; + Defer defer_set_st([&] { + tablet()->set_last_full_compaction_status(st.to_string()); + if (!st.ok()) { + tablet()->set_last_full_compaction_failure_time(UnixMillis()); + } else { + tablet()->set_last_full_compaction_success_time(UnixMillis()); + } + }); std::unique_lock base_lock(tablet()->get_base_compaction_lock()); std::unique_lock cumu_lock(tablet()->get_cumulative_compaction_lock()); SCOPED_ATTACH_TASK(_mem_tracker); - RETURN_IF_ERROR(CompactionMixin::execute_compact()); + st = CompactionMixin::execute_compact(); + RETURN_IF_ERROR(st); Version last_version = _input_rowsets.back()->version(); tablet()->cumulative_compaction_policy()->update_cumulative_point(tablet(), _input_rowsets, @@ -83,9 +103,8 @@ Status FullCompaction::execute_compact() { VLOG_CRITICAL << "after cumulative compaction, current cumulative point is " << tablet()->cumulative_layer_point() << ", tablet=" << _tablet->tablet_id(); - tablet()->set_last_full_compaction_success_time(UnixMillis()); - - return Status::OK(); + st = Status::OK(); + return st; } Status FullCompaction::pick_rowsets_to_compact() { diff --git a/be/src/olap/olap_server.cpp b/be/src/olap/olap_server.cpp index 708e7c49e0b935..715c0e94e0a078 100644 --- a/be/src/olap/olap_server.cpp +++ b/be/src/olap/olap_server.cpp @@ -700,6 +700,10 @@ void StorageEngine::_compaction_tasks_producer_callback() { for (const auto& tablet : tablets_compaction) { if (compaction_type == CompactionType::BASE_COMPACTION) { tablet->set_last_base_compaction_schedule_time(UnixMillis()); + } else if (compaction_type == CompactionType::CUMULATIVE_COMPACTION) { + tablet->set_last_cumu_compaction_schedule_time(UnixMillis()); + } else if (compaction_type == CompactionType::FULL_COMPACTION) { + tablet->set_last_full_compaction_schedule_time(UnixMillis()); } Status st = _submit_compaction_task(tablet, compaction_type, false); if (!st.ok()) { diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp index 437984ccde0cd5..7551131056c397 100644 --- a/be/src/olap/tablet.cpp +++ b/be/src/olap/tablet.cpp @@ -1402,15 +1402,33 @@ void Tablet::get_compaction_status(std::string* json_result) { format_str = ToStringFromUnixMillis(_last_full_compaction_success_millis.load()); full_success_value.SetString(format_str.c_str(), format_str.length(), root.GetAllocator()); root.AddMember("last full success time", full_success_value, root.GetAllocator()); + rapidjson::Value cumu_schedule_value; + format_str = ToStringFromUnixMillis(_last_cumu_compaction_schedule_millis.load()); + cumu_schedule_value.SetString(format_str.c_str(), format_str.length(), root.GetAllocator()); + root.AddMember("last cumulative schedule time", cumu_schedule_value, root.GetAllocator()); rapidjson::Value base_schedule_value; format_str = ToStringFromUnixMillis(_last_base_compaction_schedule_millis.load()); base_schedule_value.SetString(format_str.c_str(), format_str.length(), root.GetAllocator()); root.AddMember("last base schedule time", base_schedule_value, root.GetAllocator()); + rapidjson::Value full_schedule_value; + format_str = ToStringFromUnixMillis(_last_full_compaction_schedule_millis.load()); + full_schedule_value.SetString(format_str.c_str(), format_str.length(), root.GetAllocator()); + root.AddMember("last full schedule time", full_schedule_value, root.GetAllocator()); + rapidjson::Value cumu_compaction_status_value; + cumu_compaction_status_value.SetString(_last_cumu_compaction_status.c_str(), + _last_cumu_compaction_status.length(), + root.GetAllocator()); + root.AddMember("last cumulative status", cumu_compaction_status_value, root.GetAllocator()); rapidjson::Value base_compaction_status_value; base_compaction_status_value.SetString(_last_base_compaction_status.c_str(), _last_base_compaction_status.length(), root.GetAllocator()); root.AddMember("last base status", base_compaction_status_value, root.GetAllocator()); + rapidjson::Value full_compaction_status_value; + full_compaction_status_value.SetString(_last_full_compaction_status.c_str(), + _last_full_compaction_status.length(), + root.GetAllocator()); + root.AddMember("last full status", full_compaction_status_value, root.GetAllocator()); // last single replica compaction status // "single replica compaction status": { diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h index 77768cf821d742..916745f0922295 100644 --- a/be/src/olap/tablet.h +++ b/be/src/olap/tablet.h @@ -265,11 +265,21 @@ class Tablet final : public BaseTablet { _last_full_compaction_success_millis = millis; } + int64_t last_cumu_compaction_schedule_time() { return _last_cumu_compaction_schedule_millis; } + void set_last_cumu_compaction_schedule_time(int64_t millis) { + _last_cumu_compaction_schedule_millis = millis; + } + int64_t last_base_compaction_schedule_time() { return _last_base_compaction_schedule_millis; } void set_last_base_compaction_schedule_time(int64_t millis) { _last_base_compaction_schedule_millis = millis; } + int64_t last_full_compaction_schedule_time() { return _last_full_compaction_schedule_millis; } + void set_last_full_compaction_schedule_time(int64_t millis) { + _last_full_compaction_schedule_millis = millis; + } + void set_last_single_compaction_failure_status(std::string status) { _last_single_compaction_failure_status = std::move(status); } @@ -326,12 +336,24 @@ class Tablet final : public BaseTablet { return _cumulative_compaction_policy; } + void set_last_cumu_compaction_status(std::string status) { + _last_cumu_compaction_status = std::move(status); + } + + std::string get_last_cumu_compaction_status() { return _last_cumu_compaction_status; } + void set_last_base_compaction_status(std::string status) { _last_base_compaction_status = std::move(status); } std::string get_last_base_compaction_status() { return _last_base_compaction_status; } + void set_last_full_compaction_status(std::string status) { + _last_full_compaction_status = std::move(status); + } + + std::string get_last_full_compaction_status() { return _last_full_compaction_status; } + std::tuple get_visible_version_and_time() const; void set_visible_version(const std::shared_ptr& visible_version) { @@ -573,13 +595,19 @@ class Tablet final : public BaseTablet { std::atomic _last_base_compaction_success_millis; // timestamp of last full compaction success std::atomic _last_full_compaction_success_millis; + // timestamp of last cumu compaction schedule time + std::atomic _last_cumu_compaction_schedule_millis; // timestamp of last base compaction schedule time std::atomic _last_base_compaction_schedule_millis; + // timestamp of last full compaction schedule time + std::atomic _last_full_compaction_schedule_millis; std::atomic _cumulative_point; std::atomic _cumulative_promotion_size; std::atomic _newly_created_rowset_num; std::atomic _last_checkpoint_time; + std::string _last_cumu_compaction_status; std::string _last_base_compaction_status; + std::string _last_full_compaction_status; // single replica compaction status std::string _last_single_compaction_failure_status; diff --git a/regression-test/suites/compaction/test_full_compaction_status.groovy b/regression-test/suites/compaction/test_full_compaction_status.groovy new file mode 100644 index 00000000000000..1a30ea43466e4c --- /dev/null +++ b/regression-test/suites/compaction/test_full_compaction_status.groovy @@ -0,0 +1,153 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_full_compaction_status") { + def tableName = "test_full_compaction_status" + + try { + def backendId_to_backendIP = [:] + def backendId_to_backendHttpPort = [:] + getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort) + + sql """ DROP TABLE IF EXISTS ${tableName} """ + sql """ + CREATE TABLE ${tableName} ( + `user_id` INT NOT NULL, + `value` INT NOT NULL) + UNIQUE KEY(`user_id`) + DISTRIBUTED BY HASH(`user_id`) + BUCKETS 1 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1", + "disable_auto_compaction" = "true", + "enable_unique_key_merge_on_write" = "true" + );""" + + sql """ INSERT INTO ${tableName} VALUES (1,1),(2,2) """ + sql """ INSERT INTO ${tableName} VALUES (1,10),(2,20) """ + sql """ INSERT INTO ${tableName} VALUES (1,100),(2,200) """ + + def tablets = sql_return_maparray """ show tablets from ${tableName}; """ + + for (def tablet in tablets) { + String tablet_id = tablet.TabletId + def (code, out, err) = curl("GET", tablet.CompactionStatus) + logger.info("Initial compaction status: code=" + code + ", out=" + out + ", err=" + err) + assertEquals(code, 0) + def tabletJson = parseJson(out.trim()) + assert tabletJson.rowsets instanceof List + assert tabletJson.rowsets.size() > 1 + + assertTrue(out.contains("last cumulative failure time")) + assertTrue(out.contains("last base failure time")) + assertTrue(out.contains("last full failure time")) + assertTrue(out.contains("last cumulative success time")) + assertTrue(out.contains("last base success time")) + assertTrue(out.contains("last full success time")) + assertTrue(out.contains("last cumulative schedule time")) + assertTrue(out.contains("last base schedule time")) + assertTrue(out.contains("last full schedule time")) + + assertTrue(out.contains("last cumulative status")) + assertTrue(out.contains("last base status")) + assertTrue(out.contains("last full status")) + + assertTrue(out.contains("\"last cumulative failure time\": \"1970-01-01")) + assertTrue(out.contains("\"last base failure time\": \"1970-01-01")) + assertTrue(out.contains("\"last full failure time\": \"1970-01-01")) + assertTrue(out.contains("\"last cumulative success time\": \"1970-01-01")) + assertTrue(out.contains("\"last base success time\": \"1970-01-01")) + assertTrue(out.contains("\"last full success time\": \"1970-01-01")) + assertTrue(out.contains("\"last cumulative schedule time\": \"1970-01-01")) + assertTrue(out.contains("\"last base schedule time\": \"1970-01-01")) + assertTrue(out.contains("\"last full schedule time\": \"1970-01-01")) + } + + for (def tablet in tablets) { + String tablet_id = tablet.TabletId + String backend_id = tablet.BackendId + + def (code, out, err) = be_run_full_compaction( + backendId_to_backendIP.get(backend_id), + backendId_to_backendHttpPort.get(backend_id), + tablet_id + ) + logger.info("Trigger full compaction: code=" + code + ", out=" + out + ", err=" + err) + assertEquals(code, 0) + def triggerJson = parseJson(out.trim()) + assertEquals("success", triggerJson.status.toLowerCase()) + + boolean running = true + int checkCount = 0 + while (running && checkCount < 10) { + Thread.sleep(1000) + (code, out, err) = be_get_compaction_status( + backendId_to_backendIP.get(backend_id), + backendId_to_backendHttpPort.get(backend_id), + tablet_id + ) + logger.info("Check compaction status: code=" + code + ", out=" + out + ", err=" + err) + assertEquals(code, 0) + def statusJson = parseJson(out.trim()) + assertEquals("success", statusJson.status.toLowerCase()) + running = statusJson.run_status + checkCount++ + } + assert checkCount < 10, "Full compaction didn't complete within expected time" + + (code, out, err) = curl("GET", tablet.CompactionStatus) + logger.info("Detailed compaction status: code=" + code + ", out=" + out + ", err=" + err) + + assertTrue(out.contains("last cumulative failure time")) + assertTrue(out.contains("last base failure time")) + assertTrue(out.contains("last full failure time")) + assertTrue(out.contains("last cumulative success time")) + assertTrue(out.contains("last base success time")) + assertTrue(out.contains("last full success time")) + assertTrue(out.contains("last cumulative schedule time")) + assertTrue(out.contains("last base schedule time")) + assertTrue(out.contains("last full schedule time")) + + assertTrue(out.contains("last cumulative status")) + assertTrue(out.contains("last base status")) + assertTrue(out.contains("last full status")) + + assertTrue(out.contains("last full status\": \"[OK]\"")) + assertFalse(out.contains("last full success time\": \"1970-01-01")) + assertFalse(out.contains("last full schedule time\": \"1970-01-01")) + } + + for (def tablet in tablets) { + String tablet_id = tablet.TabletId + def (code, out, err) = curl("GET", tablet.CompactionStatus) + logger.info("Final compaction status: code=" + code + ", out=" + out + ", err=" + err) + assertEquals(code, 0) + def tabletJson = parseJson(out.trim()) + assert tabletJson.rowsets instanceof List + + def cloudMode = isCloudMode() + if (cloudMode) { + assert tabletJson.rowsets.size() == 2 + } else { + assert tabletJson.rowsets.size() == 1 + } + } + + } finally { + try_sql("DROP TABLE IF EXISTS ${tableName}") + } +} \ No newline at end of file From e088da1289627faf91fbe4fea1f753b9c2e8315e Mon Sep 17 00:00:00 2001 From: Yukang-Lian Date: Fri, 16 May 2025 17:39:28 +0800 Subject: [PATCH 2/2] 1 --- be/src/cloud/cloud_base_compaction.cpp | 2 +- be/src/cloud/cloud_tablet.cpp | 34 +++++++++++++++++--------- 2 files changed, 23 insertions(+), 13 deletions(-) diff --git a/be/src/cloud/cloud_base_compaction.cpp b/be/src/cloud/cloud_base_compaction.cpp index dbc6f81c02c584..af5f1ca6c93b19 100644 --- a/be/src/cloud/cloud_base_compaction.cpp +++ b/be/src/cloud/cloud_base_compaction.cpp @@ -118,7 +118,7 @@ Status CloudBaseCompaction::request_global_lock() { compaction_job->set_expiration(_expiration); compaction_job->set_lease(now + config::lease_compaction_interval_seconds * 4); cloud::StartTabletJobResponse resp; - st = _engine.meta_mgr().prepare_tablet_job(job, &resp); + auto st = _engine.meta_mgr().prepare_tablet_job(job, &resp); cloud_tablet()->set_last_base_compaction_status(st.to_string()); if (resp.has_alter_version()) { (static_cast(_tablet.get()))->set_alter_version(resp.alter_version()); diff --git a/be/src/cloud/cloud_tablet.cpp b/be/src/cloud/cloud_tablet.cpp index 225dbf1746f741..6663796430297d 100644 --- a/be/src/cloud/cloud_tablet.cpp +++ b/be/src/cloud/cloud_tablet.cpp @@ -33,6 +33,7 @@ #include "cloud/cloud_meta_mgr.h" #include "cloud/cloud_storage_engine.h" #include "cloud/cloud_tablet_mgr.h" +#include "common/cast_set.h" #include "common/config.h" #include "common/logging.h" #include "io/cache/block_file_cache_downloader.h" @@ -615,53 +616,62 @@ void CloudTablet::get_compaction_status(std::string* json_result) { root.AddMember("cumulative point", _cumulative_point.load(), root.GetAllocator()); rapidjson::Value cumu_value; std::string format_str = ToStringFromUnixMillis(_last_cumu_compaction_failure_millis.load()); - cumu_value.SetString(format_str.c_str(), format_str.length(), root.GetAllocator()); + cumu_value.SetString(format_str.c_str(), cast_set(format_str.length()), + root.GetAllocator()); root.AddMember("last cumulative failure time", cumu_value, root.GetAllocator()); rapidjson::Value base_value; format_str = ToStringFromUnixMillis(_last_base_compaction_failure_millis.load()); - base_value.SetString(format_str.c_str(), format_str.length(), root.GetAllocator()); + base_value.SetString(format_str.c_str(), cast_set(format_str.length()), + root.GetAllocator()); root.AddMember("last base failure time", base_value, root.GetAllocator()); rapidjson::Value full_value; format_str = ToStringFromUnixMillis(_last_full_compaction_failure_millis.load()); - full_value.SetString(format_str.c_str(), format_str.length(), root.GetAllocator()); + full_value.SetString(format_str.c_str(), cast_set(format_str.length()), + root.GetAllocator()); root.AddMember("last full failure time", full_value, root.GetAllocator()); rapidjson::Value cumu_success_value; format_str = ToStringFromUnixMillis(_last_cumu_compaction_success_millis.load()); - cumu_success_value.SetString(format_str.c_str(), format_str.length(), root.GetAllocator()); + cumu_success_value.SetString(format_str.c_str(), cast_set(format_str.length()), + root.GetAllocator()); root.AddMember("last cumulative success time", cumu_success_value, root.GetAllocator()); rapidjson::Value base_success_value; format_str = ToStringFromUnixMillis(_last_base_compaction_success_millis.load()); - base_success_value.SetString(format_str.c_str(), format_str.length(), root.GetAllocator()); + base_success_value.SetString(format_str.c_str(), cast_set(format_str.length()), + root.GetAllocator()); root.AddMember("last base success time", base_success_value, root.GetAllocator()); rapidjson::Value full_success_value; format_str = ToStringFromUnixMillis(_last_full_compaction_success_millis.load()); - full_success_value.SetString(format_str.c_str(), format_str.length(), root.GetAllocator()); + full_success_value.SetString(format_str.c_str(), cast_set(format_str.length()), + root.GetAllocator()); root.AddMember("last full success time", full_success_value, root.GetAllocator()); rapidjson::Value cumu_schedule_value; format_str = ToStringFromUnixMillis(_last_cumu_compaction_schedule_millis.load()); - cumu_schedule_value.SetString(format_str.c_str(), format_str.length(), root.GetAllocator()); + cumu_schedule_value.SetString(format_str.c_str(), cast_set(format_str.length()), + root.GetAllocator()); root.AddMember("last cumulative schedule time", cumu_schedule_value, root.GetAllocator()); rapidjson::Value base_schedule_value; format_str = ToStringFromUnixMillis(_last_base_compaction_schedule_millis.load()); - base_schedule_value.SetString(format_str.c_str(), format_str.length(), root.GetAllocator()); + base_schedule_value.SetString(format_str.c_str(), cast_set(format_str.length()), + root.GetAllocator()); root.AddMember("last base schedule time", base_schedule_value, root.GetAllocator()); rapidjson::Value full_schedule_value; format_str = ToStringFromUnixMillis(_last_full_compaction_schedule_millis.load()); - full_schedule_value.SetString(format_str.c_str(), format_str.length(), root.GetAllocator()); + full_schedule_value.SetString(format_str.c_str(), cast_set(format_str.length()), + root.GetAllocator()); root.AddMember("last full schedule time", full_schedule_value, root.GetAllocator()); rapidjson::Value cumu_compaction_status_value; cumu_compaction_status_value.SetString(_last_cumu_compaction_status.c_str(), - _last_cumu_compaction_status.length(), + cast_set(_last_cumu_compaction_status.length()), root.GetAllocator()); root.AddMember("last cumulative status", cumu_compaction_status_value, root.GetAllocator()); rapidjson::Value base_compaction_status_value; base_compaction_status_value.SetString(_last_base_compaction_status.c_str(), - _last_base_compaction_status.length(), + cast_set(_last_base_compaction_status.length()), root.GetAllocator()); root.AddMember("last base status", base_compaction_status_value, root.GetAllocator()); rapidjson::Value full_compaction_status_value; full_compaction_status_value.SetString(_last_full_compaction_status.c_str(), - _last_full_compaction_status.length(), + cast_set(_last_full_compaction_status.length()), root.GetAllocator()); root.AddMember("last full status", full_compaction_status_value, root.GetAllocator());