Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 30 additions & 8 deletions be/src/cloud/cloud_base_compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,16 @@ CloudBaseCompaction::CloudBaseCompaction(CloudStorageEngine& engine, CloudTablet
CloudBaseCompaction::~CloudBaseCompaction() = default;

Status CloudBaseCompaction::prepare_compact() {
Status st;
Defer defer_set_st([&] {
if (!st.ok()) {
cloud_tablet()->set_last_base_compaction_status(st.to_string());
cloud_tablet()->set_last_base_compaction_failure_time(UnixMillis());
}
});
if (_tablet->tablet_state() != TABLET_RUNNING) {
return Status::InternalError("invalid tablet state. tablet_id={}", _tablet->tablet_id());
st = Status::InternalError("invalid tablet state. tablet_id={}", _tablet->tablet_id());
return st;
}

bool need_sync_tablet = true;
Expand All @@ -59,10 +67,12 @@ Status CloudBaseCompaction::prepare_compact() {
}
}
if (need_sync_tablet) {
RETURN_IF_ERROR(cloud_tablet()->sync_rowsets());
st = cloud_tablet()->sync_rowsets();
RETURN_IF_ERROR(st);
}

RETURN_IF_ERROR(pick_rowsets_to_compact());
st = pick_rowsets_to_compact();
RETURN_IF_ERROR(st);

for (auto& rs : _input_rowsets) {
_input_row_num += rs->num_rows();
Expand Down Expand Up @@ -107,10 +117,12 @@ Status CloudBaseCompaction::request_global_lock() {
compaction_job->set_lease(now + config::lease_compaction_interval_seconds * 4);
cloud::StartTabletJobResponse resp;
auto st = _engine.meta_mgr().prepare_tablet_job(job, &resp);
cloud_tablet()->set_last_base_compaction_status(st.to_string());
if (resp.has_alter_version()) {
(static_cast<CloudTablet*>(_tablet.get()))->set_alter_version(resp.alter_version());
}
if (!st.ok()) {
cloud_tablet()->set_last_base_compaction_failure_time(UnixMillis());
if (resp.status().code() == cloud::STALE_TABLET_CACHE) {
// set last_sync_time to 0 to force sync tablet next time
cloud_tablet()->last_sync_time_s = 0;
Expand Down Expand Up @@ -258,12 +270,21 @@ Status CloudBaseCompaction::execute_compact() {

using namespace std::chrono;
auto start = steady_clock::now();
auto res = CloudCompactionMixin::execute_compact();
if (!res.ok()) {
LOG(WARNING) << "fail to do " << compaction_name() << ". res=" << res
Status st;
Defer defer_set_st([&] {
cloud_tablet()->set_last_base_compaction_status(st.to_string());
if (!st.ok()) {
cloud_tablet()->set_last_base_compaction_failure_time(UnixMillis());
} else {
cloud_tablet()->set_last_base_compaction_success_time(UnixMillis());
}
});
st = CloudCompactionMixin::execute_compact();
if (!st.ok()) {
LOG(WARNING) << "fail to do " << compaction_name() << ". res=" << st
<< ", tablet=" << _tablet->tablet_id()
<< ", output_version=" << _output_version;
return res;
return st;
}
LOG_INFO("finish CloudBaseCompaction, tablet_id={}, cost={}ms range=[{}-{}]",
_tablet->tablet_id(), duration_cast<milliseconds>(steady_clock::now() - start).count(),
Expand All @@ -288,7 +309,8 @@ Status CloudBaseCompaction::execute_compact() {
DorisMetrics::instance()->base_compaction_bytes_total->increment(_input_rowsets_total_size);
base_output_size << _output_rowset->total_disk_size();

return Status::OK();
st = Status::OK();
return st;
}

Status CloudBaseCompaction::modify_rowsets() {
Expand Down
8 changes: 8 additions & 0 deletions be/src/cloud/cloud_compaction_action.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,14 @@ Status CloudCompactionAction::_handle_run_compaction(HttpRequest* req, std::stri
return Status::NotFound("Tablet not found. tablet_id={}", tablet_id);
}

if (compaction_type == PARAM_COMPACTION_BASE) {
tablet->set_last_base_compaction_schedule_time(UnixMillis());
} else if (compaction_type == PARAM_COMPACTION_CUMULATIVE) {
tablet->set_last_cumu_compaction_schedule_time(UnixMillis());
} else if (compaction_type == PARAM_COMPACTION_FULL) {
tablet->set_last_full_compaction_schedule_time(UnixMillis());
}

LOG(INFO) << "manual submit compaction task, tablet id: " << tablet_id
<< " table id: " << table_id;
// 3. submit compaction task
Expand Down
35 changes: 27 additions & 8 deletions be/src/cloud/cloud_cumulative_compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,18 @@ CloudCumulativeCompaction::~CloudCumulativeCompaction() = default;

Status CloudCumulativeCompaction::prepare_compact() {
DBUG_EXECUTE_IF("CloudCumulativeCompaction.prepare_compact.sleep", { sleep(5); })
Status st;
Defer defer_set_st([&] {
if (!st.ok()) {
cloud_tablet()->set_last_cumu_compaction_status(st.to_string());
cloud_tablet()->set_last_cumu_compaction_failure_time(UnixMillis());
}
});
if (_tablet->tablet_state() != TABLET_RUNNING &&
(!config::enable_new_tablet_do_compaction ||
static_cast<CloudTablet*>(_tablet.get())->alter_version() == -1)) {
return Status::InternalError("invalid tablet state. tablet_id={}", _tablet->tablet_id());
st = Status::InternalError("invalid tablet state. tablet_id={}", _tablet->tablet_id());
return st;
}

std::vector<std::shared_ptr<CloudCumulativeCompaction>> cumu_compactions;
Expand All @@ -74,11 +82,12 @@ Status CloudCumulativeCompaction::prepare_compact() {
}
}
if (need_sync_tablet) {
RETURN_IF_ERROR(cloud_tablet()->sync_rowsets());
st = cloud_tablet()->sync_rowsets();
RETURN_IF_ERROR(st);
}

// pick rowsets to compact
auto st = pick_rowsets_to_compact();
st = pick_rowsets_to_compact();
if (!st.ok()) {
if (_last_delete_version.first != -1) {
// we meet a delete version, should increase the cumulative point to let base compaction handle the delete version.
Expand Down Expand Up @@ -181,12 +190,21 @@ Status CloudCumulativeCompaction::execute_compact() {

using namespace std::chrono;
auto start = steady_clock::now();
auto res = CloudCompactionMixin::execute_compact();
if (!res.ok()) {
LOG(WARNING) << "fail to do " << compaction_name() << ". res=" << res
Status st;
Defer defer_set_st([&] {
cloud_tablet()->set_last_cumu_compaction_status(st.to_string());
if (!st.ok()) {
cloud_tablet()->set_last_cumu_compaction_failure_time(UnixMillis());
} else {
cloud_tablet()->set_last_cumu_compaction_success_time(UnixMillis());
}
});
st = CloudCompactionMixin::execute_compact();
if (!st.ok()) {
LOG(WARNING) << "fail to do " << compaction_name() << ". res=" << st
<< ", tablet=" << _tablet->tablet_id()
<< ", output_version=" << _output_version;
return res;
return st;
}
LOG_INFO("finish CloudCumulativeCompaction, tablet_id={}, cost={}ms, range=[{}-{}]",
_tablet->tablet_id(), duration_cast<milliseconds>(steady_clock::now() - start).count(),
Expand Down Expand Up @@ -215,7 +233,8 @@ Status CloudCumulativeCompaction::execute_compact() {
_input_rowsets_total_size);
cumu_output_size << _output_rowset->total_disk_size();

return Status::OK();
st = Status::OK();
return st;
}

Status CloudCumulativeCompaction::modify_rowsets() {
Expand Down
20 changes: 17 additions & 3 deletions be/src/cloud/cloud_full_compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,24 @@ CloudFullCompaction::CloudFullCompaction(CloudStorageEngine& engine, CloudTablet
CloudFullCompaction::~CloudFullCompaction() = default;

Status CloudFullCompaction::prepare_compact() {
Status st;
Defer defer_set_st([&] {
if (!st.ok()) {
cloud_tablet()->set_last_full_compaction_status(st.to_string());
cloud_tablet()->set_last_full_compaction_failure_time(UnixMillis());
}
});
if (_tablet->tablet_state() != TABLET_RUNNING) {
return Status::InternalError("invalid tablet state. tablet_id={}", _tablet->tablet_id());
st = Status::InternalError("invalid tablet state. tablet_id={}", _tablet->tablet_id());
return st;
}

// always sync latest rowset for full compaction
RETURN_IF_ERROR(cloud_tablet()->sync_rowsets());
st = cloud_tablet()->sync_rowsets();
RETURN_IF_ERROR(st);

RETURN_IF_ERROR(pick_rowsets_to_compact());
st = pick_rowsets_to_compact();
RETURN_IF_ERROR(st);

for (auto& rs : _input_rowsets) {
_input_row_num += rs->num_rows();
Expand Down Expand Up @@ -151,7 +161,9 @@ Status CloudFullCompaction::execute_compact() {
using namespace std::chrono;
auto start = steady_clock::now();
auto res = CloudCompactionMixin::execute_compact();
cloud_tablet()->set_last_full_compaction_status(res.to_string());
if (!res.ok()) {
cloud_tablet()->set_last_full_compaction_failure_time(UnixMillis());
LOG(WARNING) << "fail to do " << compaction_name() << ". res=" << res
<< ", tablet=" << _tablet->tablet_id()
<< ", output_version=" << _output_version;
Expand All @@ -178,6 +190,8 @@ Status CloudFullCompaction::execute_compact() {
DorisMetrics::instance()->full_compaction_bytes_total->increment(_input_rowsets_total_size);
full_output_size << _output_rowset->total_disk_size();

cloud_tablet()->set_last_full_compaction_success_time(UnixMillis());
cloud_tablet()->set_last_full_compaction_status(Status::OK().to_string());
return Status::OK();
}

Expand Down
60 changes: 60 additions & 0 deletions be/src/cloud/cloud_tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -615,6 +615,66 @@ void CloudTablet::get_compaction_status(std::string* json_result) {
// get snapshot version path json_doc
_timestamped_version_tracker.get_stale_version_path_json_doc(path_arr);
root.AddMember("cumulative point", _cumulative_point.load(), root.GetAllocator());
rapidjson::Value cumu_value;
std::string format_str = ToStringFromUnixMillis(_last_cumu_compaction_failure_millis.load());
cumu_value.SetString(format_str.c_str(), static_cast<uint>(format_str.length()),
root.GetAllocator());
root.AddMember("last cumulative failure time", cumu_value, root.GetAllocator());
rapidjson::Value base_value;
format_str = ToStringFromUnixMillis(_last_base_compaction_failure_millis.load());
base_value.SetString(format_str.c_str(), static_cast<uint>(format_str.length()),
root.GetAllocator());
root.AddMember("last base failure time", base_value, root.GetAllocator());
rapidjson::Value full_value;
format_str = ToStringFromUnixMillis(_last_full_compaction_failure_millis.load());
full_value.SetString(format_str.c_str(), static_cast<uint>(format_str.length()),
root.GetAllocator());
root.AddMember("last full failure time", full_value, root.GetAllocator());
rapidjson::Value cumu_success_value;
format_str = ToStringFromUnixMillis(_last_cumu_compaction_success_millis.load());
cumu_success_value.SetString(format_str.c_str(), static_cast<uint>(format_str.length()),
root.GetAllocator());
root.AddMember("last cumulative success time", cumu_success_value, root.GetAllocator());
rapidjson::Value base_success_value;
format_str = ToStringFromUnixMillis(_last_base_compaction_success_millis.load());
base_success_value.SetString(format_str.c_str(), static_cast<uint>(format_str.length()),
root.GetAllocator());
root.AddMember("last base success time", base_success_value, root.GetAllocator());
rapidjson::Value full_success_value;
format_str = ToStringFromUnixMillis(_last_full_compaction_success_millis.load());
full_success_value.SetString(format_str.c_str(), static_cast<uint>(format_str.length()),
root.GetAllocator());
root.AddMember("last full success time", full_success_value, root.GetAllocator());
rapidjson::Value cumu_schedule_value;
format_str = ToStringFromUnixMillis(_last_cumu_compaction_schedule_millis.load());
cumu_schedule_value.SetString(format_str.c_str(), static_cast<uint>(format_str.length()),
root.GetAllocator());
root.AddMember("last cumulative schedule time", cumu_schedule_value, root.GetAllocator());
rapidjson::Value base_schedule_value;
format_str = ToStringFromUnixMillis(_last_base_compaction_schedule_millis.load());
base_schedule_value.SetString(format_str.c_str(), static_cast<uint>(format_str.length()),
root.GetAllocator());
root.AddMember("last base schedule time", base_schedule_value, root.GetAllocator());
rapidjson::Value full_schedule_value;
format_str = ToStringFromUnixMillis(_last_full_compaction_schedule_millis.load());
full_schedule_value.SetString(format_str.c_str(), static_cast<uint>(format_str.length()),
root.GetAllocator());
root.AddMember("last full schedule time", full_schedule_value, root.GetAllocator());
rapidjson::Value cumu_compaction_status_value;
cumu_compaction_status_value.SetString(_last_cumu_compaction_status.c_str(),
static_cast<uint>(_last_cumu_compaction_status.length()),
root.GetAllocator());
root.AddMember("last cumulative status", cumu_compaction_status_value, root.GetAllocator());
rapidjson::Value base_compaction_status_value;
base_compaction_status_value.SetString(_last_base_compaction_status.c_str(),
static_cast<uint>(_last_base_compaction_status.length()),
root.GetAllocator());
root.AddMember("last base status", base_compaction_status_value, root.GetAllocator());
rapidjson::Value full_compaction_status_value;
full_compaction_status_value.SetString(_last_full_compaction_status.c_str(),
static_cast<uint>(_last_full_compaction_status.length()),
root.GetAllocator());
root.AddMember("last full status", full_compaction_status_value, root.GetAllocator());

// print all rowsets' version as an array
rapidjson::Document versions_arr;
Expand Down
36 changes: 36 additions & 0 deletions be/src/cloud/cloud_tablet.h
Original file line number Diff line number Diff line change
Expand Up @@ -156,11 +156,39 @@ class CloudTablet final : public BaseTablet {
_last_full_compaction_success_millis = millis;
}

int64_t last_cumu_compaction_schedule_time() { return _last_cumu_compaction_schedule_millis; }
void set_last_cumu_compaction_schedule_time(int64_t millis) {
_last_cumu_compaction_schedule_millis = millis;
}

int64_t last_base_compaction_schedule_time() { return _last_base_compaction_schedule_millis; }
void set_last_base_compaction_schedule_time(int64_t millis) {
_last_base_compaction_schedule_millis = millis;
}

int64_t last_full_compaction_schedule_time() { return _last_full_compaction_schedule_millis; }
void set_last_full_compaction_schedule_time(int64_t millis) {
_last_full_compaction_schedule_millis = millis;
}

void set_last_cumu_compaction_status(std::string status) {
_last_cumu_compaction_status = std::move(status);
}

std::string get_last_cumu_compaction_status() { return _last_cumu_compaction_status; }

void set_last_base_compaction_status(std::string status) {
_last_base_compaction_status = std::move(status);
}

std::string get_last_base_compaction_status() { return _last_base_compaction_status; }

void set_last_full_compaction_status(std::string status) {
_last_full_compaction_status = std::move(status);
}

std::string get_last_full_compaction_status() { return _last_full_compaction_status; }

int64_t alter_version() const { return _alter_version; }
void set_alter_version(int64_t alter_version) { _alter_version = alter_version; }

Expand Down Expand Up @@ -266,8 +294,16 @@ class CloudTablet final : public BaseTablet {
std::atomic<int64_t> _last_base_compaction_success_millis;
// timestamp of last full compaction success
std::atomic<int64_t> _last_full_compaction_success_millis;
// timestamp of last cumu compaction schedule time
std::atomic<int64_t> _last_cumu_compaction_schedule_millis;
// timestamp of last base compaction schedule time
std::atomic<int64_t> _last_base_compaction_schedule_millis;
// timestamp of last full compaction schedule time
std::atomic<int64_t> _last_full_compaction_schedule_millis;

std::string _last_cumu_compaction_status;
std::string _last_base_compaction_status;
std::string _last_full_compaction_status;

int64_t _base_compaction_cnt = 0;
int64_t _cumulative_compaction_cnt = 0;
Expand Down
10 changes: 10 additions & 0 deletions be/src/http/action/compaction_action.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ Status CompactionAction::_handle_run_compaction(HttpRequest* req, std::string* j
std::vector<TabletSharedPtr> tablet_vec = _engine.tablet_manager()->get_all_tablet(
[table_id](Tablet* tablet) -> bool { return tablet->get_table_id() == table_id; });
for (const auto& tablet : tablet_vec) {
tablet->set_last_full_compaction_schedule_time(UnixMillis());
RETURN_IF_ERROR(
_engine.submit_compaction_task(tablet, CompactionType::FULL_COMPACTION, false));
}
Expand Down Expand Up @@ -183,6 +184,15 @@ Status CompactionAction::_handle_run_compaction(HttpRequest* req, std::string* j
std::future<Status> future_obj = task.get_future();
std::thread(std::move(task)).detach();

// 更新schedule_time
if (compaction_type == PARAM_COMPACTION_BASE) {
tablet->set_last_base_compaction_schedule_time(UnixMillis());
} else if (compaction_type == PARAM_COMPACTION_CUMULATIVE) {
tablet->set_last_cumu_compaction_schedule_time(UnixMillis());
} else if (compaction_type == PARAM_COMPACTION_FULL) {
tablet->set_last_full_compaction_schedule_time(UnixMillis());
}

// 4. wait for result for 2 seconds by async
std::future_status status = future_obj.wait_for(std::chrono::seconds(2));
if (status == std::future_status::ready) {
Expand Down
Loading
Loading