Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
125 changes: 125 additions & 0 deletions be/src/cloud/cloud_compaction_stop_token.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "cloud/cloud_compaction_stop_token.h"

#include "cloud/cloud_meta_mgr.h"
#include "cloud/config.h"
#include "common/logging.h"
#include "gen_cpp/cloud.pb.h"

namespace doris {

CloudCompactionStopToken::CloudCompactionStopToken(CloudStorageEngine& engine,
CloudTabletSPtr tablet, int64_t initiator)
: _engine {engine}, _tablet {std::move(tablet)}, _initiator(initiator) {
auto uuid = UUIDGenerator::instance()->next_uuid();
std::stringstream ss;
ss << uuid;
_uuid = ss.str();
}

void CloudCompactionStopToken::do_lease() {
cloud::TabletJobInfoPB job;
auto* idx = job.mutable_idx();
idx->set_tablet_id(_tablet->tablet_id());
idx->set_table_id(_tablet->table_id());
idx->set_index_id(_tablet->index_id());
idx->set_partition_id(_tablet->partition_id());
auto* compaction_job = job.add_compaction();
compaction_job->set_id(_uuid);
using namespace std::chrono;
int64_t lease_time = duration_cast<seconds>(system_clock::now().time_since_epoch()).count() +
(config::lease_compaction_interval_seconds * 4);
compaction_job->set_lease(lease_time);
auto st = _engine.meta_mgr().lease_tablet_job(job);
if (!st.ok()) {
LOG_WARNING("failed to lease compaction stop token")
.tag("job_id", _uuid)
.tag("delete_bitmap_lock_initiator", _initiator)
.tag("tablet_id", _tablet->tablet_id())
.error(st);
}
}

// Registers this stop token as a compaction job of type STOP_TOKEN via
// `prepare_tablet_job`. Returns the status of that call; a failure is also logged.
Status CloudCompactionStopToken::do_register() {
    using namespace std::chrono;

    // Snapshot both compaction counters while holding the tablet header lock.
    int64_t base_cnt = 0;
    int64_t cumu_cnt = 0;
    {
        std::lock_guard header_guard {_tablet->get_header_lock()};
        base_cnt = _tablet->base_compaction_cnt();
        cumu_cnt = _tablet->cumulative_compaction_cnt();
    }

    cloud::TabletJobInfoPB job_info;

    // Identify the tablet the job belongs to.
    auto* tablet_idx = job_info.mutable_idx();
    tablet_idx->set_tablet_id(_tablet->tablet_id());
    tablet_idx->set_table_id(_tablet->table_id());
    tablet_idx->set_index_id(_tablet->index_id());
    tablet_idx->set_partition_id(_tablet->partition_id());

    auto* stop_token_job = job_info.add_compaction();
    stop_token_job->set_id(_uuid);
    stop_token_job->set_type(cloud::TabletCompactionJobPB::STOP_TOKEN);
    stop_token_job->set_delete_bitmap_lock_initiator(_initiator);
    // The job initiator is "<localhost>:<heartbeat_service_port>".
    stop_token_job->set_initiator(BackendOptions::get_localhost() + ':' +
                                  std::to_string(config::heartbeat_service_port));
    // required by MS to check if it's a valid compaction job
    stop_token_job->set_base_compaction_cnt(base_cnt);
    stop_token_job->set_cumulative_compaction_cnt(cumu_cnt);

    // Expiration and lease are both expressed in seconds since epoch.
    int64_t now_sec = duration_cast<seconds>(system_clock::now().time_since_epoch()).count();
    stop_token_job->set_expiration(now_sec + config::compaction_timeout_seconds);
    stop_token_job->set_lease(now_sec + (config::lease_compaction_interval_seconds * 4));

    cloud::StartTabletJobResponse response;
    auto register_st = _engine.meta_mgr().prepare_tablet_job(job_info, &response);
    if (register_st.ok()) {
        return register_st;
    }
    LOG_WARNING("failed to register compaction stop token")
            .tag("job_id", _uuid)
            .tag("delete_bitmap_lock_initiator", _initiator)
            .tag("tablet_id", _tablet->tablet_id())
            .error(register_st);
    return register_st;
}

// Aborts this stop token job via `abort_tablet_job`.
// Returns the status of that call; a failure is also logged.
Status CloudCompactionStopToken::do_unregister() {
    cloud::TabletJobInfoPB job_info;

    // Identify the tablet the job belongs to.
    auto* tablet_idx = job_info.mutable_idx();
    tablet_idx->set_tablet_id(_tablet->tablet_id());
    tablet_idx->set_table_id(_tablet->table_id());
    tablet_idx->set_index_id(_tablet->index_id());
    tablet_idx->set_partition_id(_tablet->partition_id());

    // Describe the job to abort: same id / initiators / type as used at registration.
    auto* stop_token_job = job_info.add_compaction();
    stop_token_job->set_id(_uuid);
    stop_token_job->set_type(cloud::TabletCompactionJobPB::STOP_TOKEN);
    stop_token_job->set_delete_bitmap_lock_initiator(_initiator);
    stop_token_job->set_initiator(BackendOptions::get_localhost() + ':' +
                                  std::to_string(config::heartbeat_service_port));

    auto abort_st = _engine.meta_mgr().abort_tablet_job(job_info);
    if (abort_st.ok()) {
        return abort_st;
    }
    LOG_WARNING("failed to unregister compaction stop token")
            .tag("job_id", _uuid)
            .tag("delete_bitmap_lock_initiator", _initiator)
            .tag("tablet_id", _tablet->tablet_id())
            .error(abort_st);
    return abort_st;
}

// Returns the initiator value passed to the constructor; it is the value sent as
// `delete_bitmap_lock_initiator` when the token is registered or unregistered.
int64_t CloudCompactionStopToken::initiator() const {
return _initiator;
}
} // namespace doris
45 changes: 45 additions & 0 deletions be/src/cloud/cloud_compaction_stop_token.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include <string>

#include "cloud/cloud_storage_engine.h"
#include "cloud/cloud_tablet.h"

namespace doris {

// A compaction job of type STOP_TOKEN for a single tablet, identified by a UUID
// generated at construction time.
// NOTE(review): presumably the meta service refuses other compaction jobs on the
// tablet while this token is registered — confirm against the meta-service side.
class CloudCompactionStopToken {
public:
// Binds the token to `tablet`; `initiator` is later sent as the job's
// `delete_bitmap_lock_initiator`.
CloudCompactionStopToken(CloudStorageEngine& engine, CloudTabletSPtr tablet, int64_t initiator);
~CloudCompactionStopToken() = default;

// Renews the job lease via the meta manager; a failure is logged, not returned.
void do_lease();
// Submits the stop token job via `prepare_tablet_job`; returns that call's status.
Status do_register();
// Aborts the stop token job via `abort_tablet_job`; returns that call's status.
Status do_unregister();

// Returns the initiator value given at construction.
int64_t initiator() const;

private:
// Engine whose meta manager is used for all RPCs; held by reference, so the
// engine must outlive this object.
CloudStorageEngine& _engine;
// Tablet this token applies to.
CloudTabletSPtr _tablet;
// Job id, generated once in the constructor.
std::string _uuid;
// Value reported as `delete_bitmap_lock_initiator`.
int64_t _initiator;
};

} // namespace doris
7 changes: 6 additions & 1 deletion be/src/cloud/cloud_cumulative_compaction_policy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,13 @@ int64_t CloudSizeBasedCumulativeCompactionPolicy::pick_input_rowsets(
input_rowsets->push_back(rowset);
}
}
LOG_INFO(
"[CloudSizeBasedCumulativeCompactionPolicy::pick_input_rowsets.set_"
"input_rowsets] tablet_id={}, start={}, end={}, "
"input_rowsets->size()={}",
target_tablet_id, start_version, end_version, input_rowsets->size());
return input_rowsets->size();
}
return input_rowsets->size();
})

size_t promotion_size = cloud_promotion_size(tablet);
Expand Down
7 changes: 7 additions & 0 deletions be/src/cloud/cloud_schema_change_job.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -440,6 +440,11 @@ Status CloudSchemaChangeJob::_process_delete_bitmap(int64_t alter_version,
.tag("out_rowset_size", _output_rowsets.size())
.tag("start_calc_delete_bitmap_version", start_calc_delete_bitmap_version)
.tag("alter_version", alter_version);
RETURN_IF_ERROR(_cloud_storage_engine.register_compaction_stop_token(_new_tablet, initiator));
Defer defer {[&]() {
static_cast<void>(_cloud_storage_engine.unregister_compaction_stop_token(_new_tablet));
}};

TabletMetaSharedPtr tmp_meta = std::make_shared<TabletMeta>(*(_new_tablet->tablet_meta()));
tmp_meta->delete_bitmap().delete_bitmap.clear();
std::shared_ptr<CloudTablet> tmp_tablet =
Expand All @@ -459,6 +464,8 @@ Status CloudSchemaChangeJob::_process_delete_bitmap(int64_t alter_version,
if (max_version >= start_calc_delete_bitmap_version) {
RETURN_IF_ERROR(tmp_tablet->capture_consistent_rowsets_unlocked(
{start_calc_delete_bitmap_version, max_version}, &incremental_rowsets));
DBUG_EXECUTE_IF("CloudSchemaChangeJob::_process_delete_bitmap.after.capture_without_lock",
DBUG_BLOCK);
for (auto rowset : incremental_rowsets) {
RETURN_IF_ERROR(CloudTablet::update_delete_bitmap_without_lock(tmp_tablet, rowset));
}
Expand Down
64 changes: 64 additions & 0 deletions be/src/cloud/cloud_storage_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,11 @@
#include <rapidjson/stringbuffer.h>

#include <algorithm>
#include <mutex>
#include <variant>

#include "cloud/cloud_base_compaction.h"
#include "cloud/cloud_compaction_stop_token.h"
#include "cloud/cloud_cumulative_compaction.h"
#include "cloud/cloud_cumulative_compaction_policy.h"
#include "cloud/cloud_full_compaction.h"
Expand All @@ -39,6 +41,7 @@
#include "cloud/cloud_warm_up_manager.h"
#include "cloud/config.h"
#include "common/config.h"
#include "common/status.h"
#include "io/cache/block_file_cache_downloader.h"
#include "io/cache/block_file_cache_factory.h"
#include "io/cache/file_cache_common.h"
Expand Down Expand Up @@ -764,6 +767,7 @@ void CloudStorageEngine::_lease_compaction_thread_callback() {
std::vector<std::shared_ptr<CloudFullCompaction>> full_compactions;
std::vector<std::shared_ptr<CloudBaseCompaction>> base_compactions;
std::vector<std::shared_ptr<CloudCumulativeCompaction>> cumu_compactions;
std::vector<std::shared_ptr<CloudCompactionStopToken>> compation_stop_tokens;
{
std::lock_guard lock(_compaction_mtx);
for (auto& [_, base] : _submitted_base_compactions) {
Expand All @@ -781,8 +785,16 @@ void CloudStorageEngine::_lease_compaction_thread_callback() {
full_compactions.push_back(full);
}
}
for (auto& [_, stop_token] : _active_compaction_stop_tokens) {
if (stop_token) {
compation_stop_tokens.push_back(stop_token);
}
}
}
// TODO(plat1ko): Support batch lease rpc
for (auto& stop_token : compation_stop_tokens) {
stop_token->do_lease();
}
for (auto& comp : full_compactions) {
comp->do_lease();
}
Expand Down Expand Up @@ -860,5 +872,57 @@ std::shared_ptr<CloudCumulativeCompactionPolicy> CloudStorageEngine::cumu_compac
return _cumulative_compaction_policies.at(compaction_policy);
}

// Creates and registers a compaction stop token for `tablet`.
//
// Steps:
//   1. Reserve the tablet's slot in `_active_compaction_stop_tokens` with a nullptr
//      placeholder; fails with AlreadyExist if a slot is already present.
//   2. Run `CloudCompactionStopToken::do_register()` without holding `_compaction_mtx`.
//   3. On failure drop the placeholder and return the error; on success replace the
//      placeholder with the real token.
Status CloudStorageEngine::register_compaction_stop_token(CloudTabletSPtr tablet,
                                                          int64_t initiator) {
    const auto tablet_id = tablet->tablet_id();

    {
        std::lock_guard guard(_compaction_mtx);
        const bool reserved = _active_compaction_stop_tokens.emplace(tablet_id, nullptr).second;
        if (!reserved) {
            return Status::AlreadyExist("stop token already exists for tablet_id={}", tablet_id);
        }
    }

    // The registration RPC is issued outside the lock.
    auto token = std::make_shared<CloudCompactionStopToken>(*this, tablet, initiator);
    auto register_st = token->do_register();

    {
        std::lock_guard guard(_compaction_mtx);
        if (!register_st.ok()) {
            // Roll back the reservation made above.
            _active_compaction_stop_tokens.erase(tablet_id);
            return register_st;
        }
        _active_compaction_stop_tokens[tablet_id] = token;
    }

    LOG_INFO(
            "successfully register compaction stop token for tablet_id={}, "
            "delete_bitmap_lock_initiator={}",
            tablet_id, initiator);
    return register_st;
}

// Removes the in-memory entry of the compaction stop token registered for `tablet`.
//
// Only `_active_compaction_stop_tokens` is updated here; no RPC is sent (see the note
// before the log statement below).
//
// Returns:
//   - OK        when a fully registered token was found and removed;
//   - NotFound  when the tablet has no entry, or when its entry is still the nullptr
//               placeholder that `register_compaction_stop_token()` inserts while its
//               registration RPC is in flight. The placeholder is left untouched in
//               that case so the in-flight registration keeps ownership of the slot.
Status CloudStorageEngine::unregister_compaction_stop_token(CloudTabletSPtr tablet) {
    const auto tablet_id = tablet->tablet_id();
    std::shared_ptr<CloudCompactionStopToken> stop_token;
    {
        std::lock_guard lock(_compaction_mtx);
        auto it = _active_compaction_stop_tokens.find(tablet_id);
        if (it == _active_compaction_stop_tokens.end()) {
            return Status::NotFound("stop token not found for tablet_id={}", tablet_id);
        }
        if (it->second == nullptr) {
            // A null entry is a reservation, not a token. Dereferencing it below would
            // crash, and erasing it would let the in-flight registration re-insert a
            // token that nobody unregisters afterwards.
            return Status::NotFound("stop token is still being registered for tablet_id={}",
                                    tablet_id);
        }
        stop_token = it->second;
        // Erase through the iterator to avoid a second hash lookup.
        _active_compaction_stop_tokens.erase(it);
    }
    // stop token will be removed when SC commit or abort
    // RETURN_IF_ERROR(stop_token->do_unregister());
    LOG_INFO(
            "successfully unregister compaction stop token for tablet_id={}, "
            "delete_bitmap_lock_initiator={}",
            tablet_id, stop_token->initiator());
    return Status::OK();
}

#include "common/compile_check_end.h"
} // namespace doris
8 changes: 8 additions & 0 deletions be/src/cloud/cloud_storage_engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ class CloudBaseCompaction;
class CloudFullCompaction;
class TabletHotspot;
class CloudWarmUpManager;
class CloudCompactionStopToken;

class CloudStorageEngine final : public BaseStorageEngine {
public:
Expand Down Expand Up @@ -147,6 +148,10 @@ class CloudStorageEngine final : public BaseStorageEngine {
return *_sync_load_for_tablets_thread_pool;
}

Status register_compaction_stop_token(CloudTabletSPtr tablet, int64_t initiator);

Status unregister_compaction_stop_token(CloudTabletSPtr tablet);

private:
void _refresh_storage_vault_info_thread_callback();
void _vacuum_stale_rowsets_thread_callback();
Expand Down Expand Up @@ -193,6 +198,9 @@ class CloudStorageEngine final : public BaseStorageEngine {
// tablet_id -> submitted cumu compactions, guarded by `_compaction_mtx`
std::unordered_map<int64_t, std::vector<std::shared_ptr<CloudCumulativeCompaction>>>
_submitted_cumu_compactions;
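// tablet_id -> active compaction stop token, guarded by `_compaction_mtx`; a nullptr value
// is a placeholder for a registration that is still in progress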
std::unordered_map<int64_t, std::shared_ptr<CloudCompactionStopToken>>
_active_compaction_stop_tokens;

std::unique_ptr<ThreadPool> _base_compaction_thread_pool;
std::unique_ptr<ThreadPool> _cumu_compaction_thread_pool;
Expand Down
Loading
Loading