Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
1b04f4e
[feature](cloud) support event driven or periodic warm up (#4101)
kaijchen Jun 17, 2025
1b847b9
[fix](warmup) recycle_cache should also recycle index files (#4104)
kaijchen Jun 18, 2025
e84e7f2
[fix](warmup) call recycle_cache in remove_unused_rowsets (#4105)
kaijchen Jun 18, 2025
fed5347
[fix](warmup) fix use-after-free
kaijchen Jun 18, 2025
6d35db5
[fix](warmup) a job should not potentially block itself
kaijchen Jun 18, 2025
c7ce20f
[fix](metrics) prevent NPE when processing null values
kaijchen Jun 18, 2025
865d7fb
[fix](warmup) safely remove unused rowsets and avoid RPC under lock (…
kaijchen Jun 19, 2025
d5437dd
[opt](warmup) inverted index should not be added to the index queue
kaijchen Jun 19, 2025
aaa0b99
[metrics](warmup) Add metric to record last call timestamp of warm_up…
kaijchen Jun 22, 2025
b60e6fc
[metrics](warmup) change timestamp to microseconds
kaijchen Jun 25, 2025
90ccabb
fixup
kaijchen Jun 26, 2025
0391161
fixup
kaijchen Jun 26, 2025
00654db
fixup
kaijchen Jun 26, 2025
15823e9
Merge remote-tracking branch 'origin/master' into warmup-merge
kaijchen Jun 27, 2025
7a06d9b
[metrics](warmup) enhance warm_up_rowset bvars
kaijchen Jun 26, 2025
6c808bc
[config](warmup) add warm_up_rowset_slow_log_ms = 1000
kaijchen Jun 26, 2025
c8f6224
[metrics](warmup) add warm up slow count bvars
kaijchen Jun 26, 2025
0559ea6
fix coredump
kaijchen Jul 11, 2025
70f0cc1
Merge remote-tracking branch 'origin/master' into warmup-merge
kaijchen Jul 22, 2025
81ff757
Merge branch 'master' into warmup-merge
kaijchen Jul 22, 2025
feba5b3
Merge branch 'master' into warmup-merge
kaijchen Jul 23, 2025
5dd430a
Merge branch 'master' into warmup-merge
kaijchen Jul 23, 2025
47b25d7
Merge branch 'master' into warmup-merge
kaijchen Jul 23, 2025
8e1f9c4
Merge branch 'master' into warmup-merge
kaijchen Jul 23, 2025
63c5ca7
Merge remote-tracking branch 'origin/master' into warmup-merge
kaijchen Jul 30, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 19 additions & 3 deletions be/src/cloud/cloud_backend_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@

namespace doris {

// Running total of the file-cache block metas that warm_up_cache_async hands to the
// file cache block downloader. It is bumped with file_cache_block_metas().size() of
// each brpc response whose controller did not fail.
// NOTE(review): the name says "segment_num" but the value added is a block-meta count;
// confirm the two are meant to be the same thing.
bvar::Adder<uint64_t> g_file_cache_warm_up_cache_async_submitted_segment_num(
"file_cache_warm_up_cache_async_submitted_segment_num");

// Constructs the cloud backend service: exec_env is forwarded to the
// BaseBackendService base class and _engine is initialized from `engine`.
CloudBackendService::CloudBackendService(CloudStorageEngine& engine, ExecEnv* exec_env)
: BaseBackendService(exec_env), _engine(engine) {}

Expand Down Expand Up @@ -95,8 +98,15 @@ void CloudBackendService::warm_up_tablets(TWarmUpTabletsResponse& response,
LOG_INFO("receive the warm up request.")
.tag("request_type", "SET_JOB")
.tag("job_id", request.job_id);
st = manager.check_and_set_job_id(request.job_id);
if (!st) {
if (request.__isset.event) {
st = manager.set_event(request.job_id, request.event);
if (st.ok()) {
break;
}
} else {
st = manager.check_and_set_job_id(request.job_id);
}
if (!st.ok()) {
LOG_WARNING("SET_JOB failed.").error(st);
break;
}
Expand Down Expand Up @@ -144,7 +154,11 @@ void CloudBackendService::warm_up_tablets(TWarmUpTabletsResponse& response,
LOG_INFO("receive the warm up request.")
.tag("request_type", "CLEAR_JOB")
.tag("job_id", request.job_id);
st = manager.clear_job(request.job_id);
if (request.__isset.event) {
st = manager.set_event(request.job_id, request.event, /* clear: */ true);
} else {
st = manager.clear_job(request.job_id);
}
break;
}
default:
Expand Down Expand Up @@ -197,6 +211,8 @@ void CloudBackendService::warm_up_cache_async(TWarmUpCacheAsyncResponse& respons
VLOG_DEBUG << "warm_up_cache_async: request=" << brpc_request.DebugString()
<< ", response=" << brpc_response.DebugString();
if (!cntl.Failed()) {
g_file_cache_warm_up_cache_async_submitted_segment_num
<< brpc_response.file_cache_block_metas().size();
_engine.file_cache_block_downloader().submit_download_task(
std::move(*brpc_response.mutable_file_cache_block_metas()));
} else {
Expand Down
266 changes: 266 additions & 0 deletions be/src/cloud/cloud_internal_service.cpp

Large diffs are not rendered by default.

8 changes: 8 additions & 0 deletions be/src/cloud/cloud_internal_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,14 @@ class CloudInternalServiceImpl final : public PInternalService {
PGetFileCacheMetaResponse* response,
google::protobuf::Closure* done) override;

// Handler for the warm_up_rowset RPC, overriding the base-class method.
// Receives the request/response protobuf messages and the completion closure `done`.
// NOTE(review): the definition is presumably in cloud_internal_service.cpp, whose diff
// is not rendered here, so the handler's behavior is not described — confirm there.
void warm_up_rowset(google::protobuf::RpcController* controller,
const PWarmUpRowsetRequest* request, PWarmUpRowsetResponse* response,
google::protobuf::Closure* done) override;

// Handler for the recycle_cache RPC, overriding the base-class method.
// Receives the request/response protobuf messages and the completion closure `done`.
// NOTE(review): the definition is presumably in cloud_internal_service.cpp, whose diff
// is not rendered here, so the handler's behavior is not described — confirm there.
void recycle_cache(google::protobuf::RpcController* controller,
const PRecycleCacheRequest* request, PRecycleCacheResponse* response,
google::protobuf::Closure* done) override;

private:
CloudStorageEngine& _engine;
};
Expand Down
5 changes: 4 additions & 1 deletion be/src/cloud/cloud_meta_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@

#include "cloud/cloud_storage_engine.h"
#include "cloud/cloud_tablet.h"
#include "cloud/cloud_warm_up_manager.h"
#include "cloud/config.h"
#include "cloud/pb_convert.h"
#include "cloud/schema_cloud_dictionary_cache.h"
Expand Down Expand Up @@ -1031,7 +1032,7 @@ Status CloudMetaMgr::prepare_rowset(const RowsetMeta& rs_meta, const std::string
return st;
}

Status CloudMetaMgr::commit_rowset(const RowsetMeta& rs_meta, const std::string& job_id,
Status CloudMetaMgr::commit_rowset(RowsetMeta& rs_meta, const std::string& job_id,
RowsetMetaSharedPtr* existed_rs_meta) {
VLOG_DEBUG << "commit rowset, tablet_id: " << rs_meta.tablet_id()
<< ", rowset_id: " << rs_meta.rowset_id() << " txn_id: " << rs_meta.txn_id();
Expand Down Expand Up @@ -1080,6 +1081,8 @@ Status CloudMetaMgr::commit_rowset(const RowsetMeta& rs_meta, const std::string&
RETURN_IF_ERROR(
engine.get_schema_cloud_dictionary_cache().refresh_dict(rs_meta_pb.index_id()));
}
auto& manager = ExecEnv::GetInstance()->storage_engine().to_cloud().cloud_warm_up_manager();
manager.warm_up_rowset(rs_meta);
return st;
}

Expand Down
2 changes: 1 addition & 1 deletion be/src/cloud/cloud_meta_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ class CloudMetaMgr {
Status prepare_rowset(const RowsetMeta& rs_meta, const std::string& job_id,
std::shared_ptr<RowsetMeta>* existed_rs_meta = nullptr);

Status commit_rowset(const RowsetMeta& rs_meta, const std::string& job_id,
Status commit_rowset(RowsetMeta& rs_meta, const std::string& job_id,
std::shared_ptr<RowsetMeta>* existed_rs_meta = nullptr);

Status update_tmp_rowset(const RowsetMeta& rs_meta);
Expand Down
180 changes: 140 additions & 40 deletions be/src/cloud/cloud_tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
#include "cloud/cloud_meta_mgr.h"
#include "cloud/cloud_storage_engine.h"
#include "cloud/cloud_tablet_mgr.h"
#include "cloud/cloud_warm_up_manager.h"
#include "common/cast_set.h"
#include "common/config.h"
#include "common/logging.h"
Expand Down Expand Up @@ -69,6 +70,30 @@ bvar::Adder<int64_t> g_unused_rowsets_bytes("unused_rowsets_bytes");

static constexpr int LOAD_INITIATOR_ID = -1;

// Counters for file-cache download tasks submitted from CloudTablet::add_rowsets.
// submitted_segment_num grows by one per segment download task; submitted_segment_size
// grows by the segment's file size, but only when the rowset meta reports a size > 0.
bvar::Adder<uint64_t> g_file_cache_cloud_tablet_submitted_segment_size(
"file_cache_cloud_tablet_submitted_segment_size");
bvar::Adder<uint64_t> g_file_cache_cloud_tablet_submitted_segment_num(
"file_cache_cloud_tablet_submitted_segment_num");
// Inverted-index counterparts: each submitted index-file download adds one to
// submitted_index_num and the idx_size passed to the download helper to
// submitted_index_size.
bvar::Adder<uint64_t> g_file_cache_cloud_tablet_submitted_index_size(
"file_cache_cloud_tablet_submitted_index_size");
bvar::Adder<uint64_t> g_file_cache_cloud_tablet_submitted_index_num(
"file_cache_cloud_tablet_submitted_index_num");
// NOTE(review): no update site for the finished_* counters is visible in this excerpt;
// presumably they are bumped when a submitted download completes — confirm.
bvar::Adder<uint64_t> g_file_cache_cloud_tablet_finished_segment_size(
"file_cache_cloud_tablet_finished_segment_size");
bvar::Adder<uint64_t> g_file_cache_cloud_tablet_finished_segment_num(
"file_cache_cloud_tablet_finished_segment_num");
bvar::Adder<uint64_t> g_file_cache_cloud_tablet_finished_index_size(
"file_cache_cloud_tablet_finished_index_size");
bvar::Adder<uint64_t> g_file_cache_cloud_tablet_finished_index_num(
"file_cache_cloud_tablet_finished_index_num");

// Counters for rowsets whose cached data is recycled (updated in remove_unused_rowsets
// and recycle_cached_data): per rowset they add its segment count, the sum of its
// per-segment file sizes, and the number of its index file names.
bvar::Adder<uint64_t> g_file_cache_recycle_cached_data_segment_num(
"file_cache_recycle_cached_data_segment_num");
bvar::Adder<uint64_t> g_file_cache_recycle_cached_data_segment_size(
"file_cache_recycle_cached_data_segment_size");
bvar::Adder<uint64_t> g_file_cache_recycle_cached_data_index_num(
"file_cache_recycle_cached_data_index_num");

// Constructs a cloud tablet: the tablet meta is moved into the BaseTablet base class
// and _engine is initialized from `engine`.
CloudTablet::CloudTablet(CloudStorageEngine& engine, TabletMetaSharedPtr tablet_meta)
: BaseTablet(std::move(tablet_meta)), _engine(engine) {}

Expand Down Expand Up @@ -276,6 +301,11 @@ void CloudTablet::add_rowsets(std::vector<RowsetSharedPtr> to_add, bool version_
? 0
: rowset_meta->newest_write_timestamp() +
_tablet_meta->ttl_seconds();
g_file_cache_cloud_tablet_submitted_segment_num << 1;
if (rs->rowset_meta()->segment_file_size(seg_id) > 0) {
g_file_cache_cloud_tablet_submitted_segment_size
<< rs->rowset_meta()->segment_file_size(seg_id);
}
// clang-format off
_engine.file_cache_block_downloader().submit_download_task(io::DownloadFileMeta {
.path = storage_resource.value()->remote_segment_path(*rowset_meta, seg_id),
Expand All @@ -293,10 +323,10 @@ void CloudTablet::add_rowsets(std::vector<RowsetSharedPtr> to_add, bool version_
}},
});

auto download_idx_file = [&](const io::Path& idx_path) {
auto download_idx_file = [&](const io::Path& idx_path, int64_t idx_size) {
io::DownloadFileMeta meta {
.path = idx_path,
.file_size = -1,
.file_size = idx_size,
.file_system = storage_resource.value()->fs,
.ctx =
{
Expand All @@ -310,22 +340,42 @@ void CloudTablet::add_rowsets(std::vector<RowsetSharedPtr> to_add, bool version_
}},
};
_engine.file_cache_block_downloader().submit_download_task(std::move(meta));
g_file_cache_cloud_tablet_submitted_index_num << 1;
g_file_cache_cloud_tablet_submitted_index_size << idx_size;
};
// clang-format on
auto schema_ptr = rowset_meta->tablet_schema();
auto idx_version = schema_ptr->get_inverted_index_storage_format();
if (idx_version == InvertedIndexStorageFormatPB::V1) {
std::unordered_map<int64_t, int64_t> index_size_map;
auto&& inverted_index_info = rowset_meta->inverted_index_file_info(seg_id);
for (const auto& info : inverted_index_info.index_info()) {
if (info.index_file_size() != -1) {
index_size_map[info.index_id()] = info.index_file_size();
} else {
VLOG_DEBUG << "Invalid index_file_size for segment_id " << seg_id
<< ", index_id " << info.index_id();
}
}
for (const auto& index : schema_ptr->inverted_indexes()) {
auto idx_path = storage_resource.value()->remote_idx_v1_path(
*rowset_meta, seg_id, index->index_id(),
index->get_index_suffix());
download_idx_file(idx_path);
download_idx_file(idx_path, index_size_map[index->index_id()]);
}
} else {
if (schema_ptr->has_inverted_index()) {
auto&& inverted_index_info =
rowset_meta->inverted_index_file_info(seg_id);
int64_t idx_size = 0;
if (inverted_index_info.has_index_size()) {
idx_size = inverted_index_info.index_size();
} else {
VLOG_DEBUG << "index_size is not set for segment " << seg_id;
}
auto idx_path = storage_resource.value()->remote_idx_v2_path(
*rowset_meta, seg_id);
download_idx_file(idx_path);
download_idx_file(idx_path, idx_size);
}
}
}
Expand Down Expand Up @@ -535,53 +585,84 @@ void CloudTablet::add_unused_rowsets(const std::vector<RowsetSharedPtr>& rowsets
}

void CloudTablet::remove_unused_rowsets() {
int64_t removed_rowsets_num = 0;
std::vector<std::shared_ptr<Rowset>> removed_rowsets;
int64_t removed_delete_bitmap_num = 0;
OlapStopWatch watch;
std::lock_guard<std::mutex> lock(_gc_mutex);
// 1. remove unused rowsets's cache data and delete bitmap
for (auto it = _unused_rowsets.begin(); it != _unused_rowsets.end();) {
// it->second is std::shared_ptr<Rowset>
auto&& rs = it->second;
if (rs.use_count() > 1) {
LOG(WARNING) << "tablet_id:" << tablet_id() << " rowset: " << rs->rowset_id() << " has "
<< rs.use_count() << " references, it cannot be removed";
++it;
continue;
{
std::lock_guard<std::mutex> lock(_gc_mutex);
// 1. remove unused rowsets's cache data and delete bitmap
for (auto it = _unused_rowsets.begin(); it != _unused_rowsets.end();) {
auto& rs = it->second;
if (rs.use_count() > 1) {
LOG(WARNING) << "tablet_id:" << tablet_id() << " rowset: " << rs->rowset_id()
<< " has " << rs.use_count() << " references, it cannot be removed";
++it;
continue;
}
tablet_meta()->remove_rowset_delete_bitmap(rs->rowset_id(), rs->version());
rs->clear_cache();
g_unused_rowsets_count << -1;
g_unused_rowsets_bytes << -rs->total_disk_size();
removed_rowsets.push_back(std::move(rs));
it = _unused_rowsets.erase(it);
}
tablet_meta()->remove_rowset_delete_bitmap(rs->rowset_id(), rs->version());
rs->clear_cache();
g_unused_rowsets_count << -1;
g_unused_rowsets_bytes << -rs->total_disk_size();
it = _unused_rowsets.erase(it);
removed_rowsets_num++;
}

// 2. remove delete bitmap of pre rowsets
for (auto it = _unused_delete_bitmap.begin(); it != _unused_delete_bitmap.end();) {
auto& rowset_ids = std::get<0>(*it);
bool find_unused_rowset = false;
for (const auto& rowset_id : rowset_ids) {
if (_unused_rowsets.find(rowset_id) != _unused_rowsets.end()) {
LOG(INFO) << "can not remove pre rowset delete bitmap because rowset is in use"
<< ", tablet_id=" << tablet_id() << ", rowset_id=" << rowset_id;
find_unused_rowset = true;
break;
{
std::vector<RowsetId> rowset_ids;
std::vector<int64_t> num_segments;
std::vector<std::vector<std::string>> index_file_names;

for (auto& rs : removed_rowsets) {
rowset_ids.push_back(rs->rowset_id());
num_segments.push_back(rs->num_segments());
auto index_names = rs->get_index_file_names();
index_file_names.push_back(index_names);
int64_t segment_size_sum = 0;
for (int32_t i = 0; i < rs->num_segments(); i++) {
segment_size_sum += rs->rowset_meta()->segment_file_size(i);
}
g_file_cache_recycle_cached_data_segment_num << rs->num_segments();
g_file_cache_recycle_cached_data_segment_size << segment_size_sum;
g_file_cache_recycle_cached_data_index_num << index_names.size();
}
if (find_unused_rowset) {
++it;
continue;

if (removed_rowsets.size() > 0) {
auto& manager =
ExecEnv::GetInstance()->storage_engine().to_cloud().cloud_warm_up_manager();
manager.recycle_cache(tablet_id(), rowset_ids, num_segments, index_file_names);
}
}

{
std::lock_guard<std::mutex> lock(_gc_mutex);
// 2. remove delete bitmap of pre rowsets
for (auto it = _unused_delete_bitmap.begin(); it != _unused_delete_bitmap.end();) {
auto& rowset_ids = std::get<0>(*it);
bool find_unused_rowset = false;
for (const auto& rowset_id : rowset_ids) {
if (_unused_rowsets.find(rowset_id) != _unused_rowsets.end()) {
LOG(INFO) << "can not remove pre rowset delete bitmap because rowset is in use"
<< ", tablet_id=" << tablet_id() << ", rowset_id=" << rowset_id;
find_unused_rowset = true;
break;
}
}
if (find_unused_rowset) {
++it;
continue;
}
auto& key_ranges = std::get<1>(*it);
tablet_meta()->delete_bitmap().remove(key_ranges);
it = _unused_delete_bitmap.erase(it);
removed_delete_bitmap_num++;
// TODO(kaijie): recycle cache for unused delete bitmap
}
auto& key_ranges = std::get<1>(*it);
tablet_meta()->delete_bitmap().remove(key_ranges);
it = _unused_delete_bitmap.erase(it);
removed_delete_bitmap_num++;
}

LOG(INFO) << "tablet_id=" << tablet_id() << ", unused_rowset size=" << _unused_rowsets.size()
<< ", unused_delete_bitmap size=" << _unused_delete_bitmap.size()
<< ", removed_rowsets_num=" << removed_rowsets_num
<< ", removed_rowsets_num=" << removed_rowsets.size()
<< ", removed_delete_bitmap_num=" << removed_delete_bitmap_num
<< ", cost(us)=" << watch.get_elapse_time_us();
}
Expand All @@ -599,14 +680,33 @@ void CloudTablet::clear_cache() {
}

// Clears the cache of the given rowsets and reports them to the warm-up manager.
//
// For every rowset that is processed, this calls clear_cache() on it, records its
// rowset id, segment count and index file names (three parallel vectors, one entry per
// rowset) and bumps the g_file_cache_recycle_cached_data_* counters. After the loop, if
// `rowsets` is non-empty, the collected vectors are passed to recycle_cache() on the
// manager returned by cloud_warm_up_manager(), keyed by the first rowset's tablet id.
// Assumes every rowset in `rowsets` belongs to the same tablet — TODO confirm with callers.
void CloudTablet::recycle_cached_data(const std::vector<RowsetSharedPtr>& rowsets) {
std::vector<RowsetId> rowset_ids;
std::vector<int64_t> num_segments;
std::vector<std::vector<std::string>> index_file_names;
for (const auto& rs : rowsets) {
// rowsets and tablet._rs_version_map each hold a rowset shared_ptr, so at this point, the reference count of the shared_ptr is at least 2.
if (rs.use_count() > 2) {
LOG(WARNING) << "Rowset " << rs->rowset_id().to_string() << " has " << rs.use_count()
<< " references. File Cache won't be recycled when query is using it.";
// NOTE(review): this text comes from a diff view without +/- markers, so the
// `return;` and `continue;` below are most likely the removed and the added version
// of one statement. Read literally, `return;` wins and `continue;` is unreachable —
// confirm against the merged file.
return;
continue;
}
rs->clear_cache();
rowset_ids.push_back(rs->rowset_id());
num_segments.push_back(rs->num_segments());
auto index_names = rs->get_index_file_names();
index_file_names.push_back(index_names);
// The summed segment size feeds only the metric below; it is not sent to the manager.
int64_t segment_size_sum = 0;
for (int32_t i = 0; i < rs->num_segments(); i++) {
segment_size_sum += rs->rowset_meta()->segment_file_size(i);
}
g_file_cache_recycle_cached_data_segment_num << rs->num_segments();
g_file_cache_recycle_cached_data_segment_size << segment_size_sum;
g_file_cache_recycle_cached_data_index_num << index_names.size();
}
// The guard tests the input list, not rowset_ids. If in-use rowsets are skipped rather
// than aborting the function (see the note above), recycle_cache() can be called with
// empty vectors when every rowset was skipped. NOTE(review): confirm that is intended.
if (!rowsets.empty()) {
auto& manager = ExecEnv::GetInstance()->storage_engine().to_cloud().cloud_warm_up_manager();
manager.recycle_cache(rowsets.front()->rowset_meta()->tablet_id(), rowset_ids, num_segments,
index_file_names);
}
}

Expand Down
4 changes: 2 additions & 2 deletions be/src/cloud/cloud_tablet.h
Original file line number Diff line number Diff line change
Expand Up @@ -271,8 +271,6 @@ class CloudTablet final : public BaseTablet {

void build_tablet_report_info(TTabletInfo* tablet_info);

static void recycle_cached_data(const std::vector<RowsetSharedPtr>& rowsets);

// check that if the delete bitmap in delete bitmap cache has the same cardinality with the expected_delete_bitmap's
Status check_delete_bitmap_cache(int64_t txn_id, DeleteBitmap* expected_delete_bitmap) override;

Expand All @@ -286,6 +284,8 @@ class CloudTablet final : public BaseTablet {
void add_unused_rowsets(const std::vector<RowsetSharedPtr>& rowsets);
void remove_unused_rowsets();

static void recycle_cached_data(const std::vector<RowsetSharedPtr>& rowsets);

private:
// FIXME(plat1ko): No need to record base size if rowsets are ordered by version
void update_base_size(const Rowset& rs);
Expand Down
Loading
Loading