Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion be/src/cloud/cloud_backend_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,9 @@ void CloudBackendService::warm_up_tablets(TWarmUpTabletsResponse& response,
.tag("request_type", "SET_BATCH")
.tag("job_id", request.job_id)
.tag("batch_id", request.batch_id)
.tag("jobs size", request.job_metas.size());
.tag("jobs size", request.job_metas.size())
.tag("tablet num of first meta",
request.job_metas.empty() ? 0 : request.job_metas[0].tablet_ids.size());
bool retry = false;
st = manager.check_and_set_batch_id(request.job_id, request.batch_id, &retry);
if (!retry && st) {
Expand Down
159 changes: 88 additions & 71 deletions be/src/cloud/cloud_warm_up_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@

#include "cloud/cloud_warm_up_manager.h"

#include <bthread/countdown_event.h>
#include <bvar/bvar.h>
#include <bvar/reducer.h>

#include <algorithm>
#include <cstddef>
Expand Down Expand Up @@ -69,6 +70,7 @@ bvar::Adder<uint64_t> g_file_cache_recycle_cache_requested_index_num(
"file_cache_recycle_cache_requested_index_num");
bvar::Status<int64_t> g_file_cache_warm_up_rowset_last_call_unix_ts(
"file_cache_warm_up_rowset_last_call_unix_ts", 0);
bvar::Adder<uint64_t> file_cache_warm_up_failed_task_num("file_cache_warm_up", "failed_task_num");

CloudWarmUpManager::CloudWarmUpManager(CloudStorageEngine& engine) : _engine(engine) {
_download_thread = std::thread(&CloudWarmUpManager::handle_jobs, this);
Expand All @@ -95,6 +97,52 @@ std::unordered_map<std::string, RowsetMetaSharedPtr> snapshot_rs_metas(BaseTable
return id_to_rowset_meta_map;
}

// Queues file-cache download tasks for one remote file, one task per byte range of at
// most 10MB.
//
// path:            remote file to download.
// file_size:       size of the file in bytes; a negative value means the size is not known
//                  and is looked up through file_system->file_size().
// file_system:     used for the size lookup and handed to every queued task.
// expiration_time: copied into each task's ctx.
// wait:            countdown event; add_count() is called once per queued task and the
//                  task's download_done callback calls signal() on it.
//
// When the size lookup fails, a warning is logged, file_cache_warm_up_failed_task_num is
// incremented and no task is queued. A file whose size is 0 queues no task either.
void CloudWarmUpManager::submit_download_tasks(io::Path path, int64_t file_size,
                                               io::FileSystemSPtr file_system,
                                               int64_t expiration_time,
                                               std::shared_ptr<bthread::CountdownEvent> wait) {
    // Only consult the file system when the caller could not supply a size.
    if (file_size < 0 && !file_system->file_size(path, &file_size).ok()) [[unlikely]] {
        LOG(WARNING) << "get file size failed: " << path;
        file_cache_warm_up_failed_task_num << 1;
        return;
    }

    constexpr int64_t kMaxRangeBytes = 10 * 1024 * 1024; // 10MB

    // `submitted_bytes` is both the number of bytes already covered by queued tasks and
    // the offset of the next range.
    for (int64_t submitted_bytes = 0; submitted_bytes < file_size;) {
        const int64_t range_bytes = std::min(kMaxRangeBytes, file_size - submitted_bytes);

        // Register the task on the countdown event before handing it to the downloader.
        wait->add_count();
        _engine.file_cache_block_downloader().submit_download_task(io::DownloadFileMeta {
                .path = path,
                .file_size = file_size,
                .offset = submitted_bytes,
                .download_size = range_bytes,
                .file_system = file_system,
                .ctx =
                        {
                                .expiration_time = expiration_time,
                                .is_dryrun = config::enable_reader_dryrun_when_download_file_cache,
                        },
                .download_done =
                        [wait](Status st) {
                            // A failed range is only logged; the event is signalled either way.
                            if (!st) {
                                LOG_WARNING("Warm up error ").error(st);
                            }
                            wait->signal();
                        },
        });

        submitted_bytes += range_bytes;
    }
}

void CloudWarmUpManager::handle_jobs() {
#ifndef BE_TEST
constexpr int WAIT_TIME_SECONDS = 600;
Expand All @@ -116,6 +164,10 @@ void CloudWarmUpManager::handle_jobs() {
LOG_WARNING("Warm up job is null");
continue;
}

std::shared_ptr<bthread::CountdownEvent> wait =
std::make_shared<bthread::CountdownEvent>(0);

for (int64_t tablet_id : cur_job->tablet_ids) {
if (_cur_job_id == 0) { // The job is canceled
break;
Expand All @@ -131,8 +183,7 @@ void CloudWarmUpManager::handle_jobs() {
LOG_WARNING("Warm up error ").tag("tablet_id", tablet_id).error(st);
continue;
}
std::shared_ptr<bthread::CountdownEvent> wait =
std::make_shared<bthread::CountdownEvent>(0);

auto tablet_meta = tablet->tablet_meta();
auto rs_metas = snapshot_rs_metas(tablet.get());
for (auto& [_, rs] : rs_metas) {
Expand All @@ -151,96 +202,62 @@ void CloudWarmUpManager::handle_jobs() {
expiration_time = 0;
}

wait->add_count();
g_file_cache_once_or_periodic_warm_up_submitted_segment_num << 1;
if (rs->segment_file_size(seg_id) > 0) {
g_file_cache_once_or_periodic_warm_up_submitted_segment_size
<< rs->segment_file_size(seg_id);
}
_engine.file_cache_block_downloader().submit_download_task(io::DownloadFileMeta {
.path = storage_resource.value()->remote_segment_path(*rs, seg_id),
.file_size = rs->segment_file_size(seg_id),
.file_system = storage_resource.value()->fs,
.ctx =
{
.expiration_time = expiration_time,
.is_dryrun = config::
enable_reader_dryrun_when_download_file_cache,
},
.download_done =
[wait](Status st) {
if (!st) {
LOG_WARNING("Warm up error ").error(st);
}
wait->signal();
},
});

auto download_idx_file = [&](const io::Path& idx_path, int64_t idx_size) {
io::DownloadFileMeta meta {
.path = idx_path,
.file_size = idx_size,
.file_system = storage_resource.value()->fs,
.ctx =
{
.expiration_time = expiration_time,
.is_dryrun = config::
enable_reader_dryrun_when_download_file_cache,
},
.download_done =
[wait](Status st) {
if (!st) {
LOG_WARNING("Warm up error ").error(st);
}
wait->signal();
},
};
// clang-format on
g_file_cache_once_or_periodic_warm_up_submitted_index_num << 1;
g_file_cache_once_or_periodic_warm_up_submitted_index_size << idx_size;
_engine.file_cache_block_downloader().submit_download_task(std::move(meta));
};
// 1st. download segment files
submit_download_tasks(
storage_resource.value()->remote_segment_path(*rs, seg_id),
rs->segment_file_size(seg_id), storage_resource.value()->fs,
expiration_time, wait);

// 2nd. download inverted index files
int64_t file_size = -1;
auto schema_ptr = rs->tablet_schema();
auto idx_version = schema_ptr->get_inverted_index_storage_format();
const auto& idx_file_info = rs->inverted_index_file_info(seg_id);
if (idx_version == InvertedIndexStorageFormatPB::V1) {
auto&& inverted_index_info = rs->inverted_index_file_info(seg_id);
std::unordered_map<int64_t, int64_t> index_size_map;
for (const auto& info : inverted_index_info.index_info()) {
if (info.index_file_size() != -1) {
index_size_map[info.index_id()] = info.index_file_size();
} else {
VLOG_DEBUG << "Invalid index_file_size for segment_id " << seg_id
<< ", index_id " << info.index_id();
}
}
for (const auto& index : schema_ptr->inverted_indexes()) {
wait->add_count();
auto idx_path = storage_resource.value()->remote_idx_v1_path(
*rs, seg_id, index->index_id(), index->get_index_suffix());
download_idx_file(idx_path, index_size_map[index->index_id()]);
if (idx_file_info.index_info_size() > 0) {
for (const auto& idx_info : idx_file_info.index_info()) {
if (index->index_id() == idx_info.index_id() &&
index->get_index_suffix() == idx_info.index_suffix()) {
file_size = idx_info.index_file_size();
break;
}
}
}
submit_download_tasks(idx_path, file_size, storage_resource.value()->fs,
expiration_time, wait);
g_file_cache_once_or_periodic_warm_up_submitted_index_num << 1;
g_file_cache_once_or_periodic_warm_up_submitted_index_size << file_size;
}
} else {
if (schema_ptr->has_inverted_index()) {
auto&& inverted_index_info = rs->inverted_index_file_info(seg_id);
int64_t idx_size = 0;
if (inverted_index_info.has_index_size()) {
idx_size = inverted_index_info.index_size();
} else {
VLOG_DEBUG << "index_size is not set for segment " << seg_id;
}
wait->add_count();
auto idx_path =
storage_resource.value()->remote_idx_v2_path(*rs, seg_id);
download_idx_file(idx_path, idx_size);
file_size = idx_file_info.has_index_size() ? idx_file_info.index_size()
: -1;
submit_download_tasks(idx_path, file_size, storage_resource.value()->fs,
expiration_time, wait);
g_file_cache_once_or_periodic_warm_up_submitted_index_num << 1;
g_file_cache_once_or_periodic_warm_up_submitted_index_size << file_size;
}
}
}
}
timespec time;
time.tv_sec = UnixSeconds() + WAIT_TIME_SECONDS;
if (!wait->timed_wait(time)) {
LOG_WARNING("Warm up {} tablets take a long time", cur_job->tablet_ids.size());
}
}

timespec time;
time.tv_sec = UnixSeconds() + WAIT_TIME_SECONDS;
if (wait->timed_wait(time)) {
LOG_WARNING("Warm up {} tablets take a long time", cur_job->tablet_ids.size());
}
{
std::unique_lock lock(_mtx);
Expand Down
5 changes: 5 additions & 0 deletions be/src/cloud/cloud_warm_up_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

#pragma once

#include <bthread/countdown_event.h>

#include <condition_variable>
#include <deque>
#include <mutex>
Expand Down Expand Up @@ -79,6 +81,9 @@ class CloudWarmUpManager {
void handle_jobs();
std::vector<TReplicaInfo> get_replica_info(int64_t tablet_id);

void submit_download_tasks(io::Path path, int64_t file_size, io::FileSystemSPtr file_system,
int64_t expiration_time,
std::shared_ptr<bthread::CountdownEvent> wait);
std::mutex _mtx;
std::condition_variable _cond;
int64_t _cur_job_id {0};
Expand Down
3 changes: 3 additions & 0 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1130,6 +1130,9 @@ DEFINE_mInt64(file_cache_background_monitor_interval_ms, "5000");
DEFINE_mInt64(file_cache_background_ttl_gc_interval_ms, "3000");
DEFINE_mInt64(file_cache_background_ttl_gc_batch, "1000");

DEFINE_Int32(file_cache_downloader_thread_num_min, "32");
DEFINE_Int32(file_cache_downloader_thread_num_max, "32");

DEFINE_mInt32(index_cache_entry_stay_time_after_lookup_s, "1800");
DEFINE_mInt32(inverted_index_cache_stale_sweep_time_sec, "600");
// inverted index searcher cache size
Expand Down
3 changes: 3 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -1154,6 +1154,7 @@ DECLARE_mBool(enbale_dump_error_file);
DECLARE_mInt64(file_cache_error_log_limit_bytes);
DECLARE_mInt64(cache_lock_wait_long_tail_threshold_us);
DECLARE_mInt64(cache_lock_held_long_tail_threshold_us);

// Base compaction may retrieve and produce some less frequently accessed data,
// potentially affecting the file cache hit rate.
// This configuration determines whether to retain the output within the file cache.
Expand All @@ -1167,6 +1168,8 @@ DECLARE_mBool(enable_reader_dryrun_when_download_file_cache);
DECLARE_mInt64(file_cache_background_monitor_interval_ms);
DECLARE_mInt64(file_cache_background_ttl_gc_interval_ms);
DECLARE_mInt64(file_cache_background_ttl_gc_batch);
DECLARE_Int32(file_cache_downloader_thread_num_min);
DECLARE_Int32(file_cache_downloader_thread_num_max);

DECLARE_mBool(enable_reader_dryrun_when_download_file_cache);

Expand Down
4 changes: 2 additions & 2 deletions be/src/io/cache/block_file_cache_downloader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ bvar::Adder<uint64_t> g_file_cache_download_failed_num("file_cache_download_fail
// Constructs the downloader for the given storage engine.
// Starts the _poller thread running polling_download_task() and builds the
// "FileCacheBlockDownloader" thread pool into _workers; the build status is
// asserted with CHECK.
FileCacheBlockDownloader::FileCacheBlockDownloader(CloudStorageEngine& engine) : _engine(engine) {
_poller = std::thread(&FileCacheBlockDownloader::polling_download_task, this);
auto st = ThreadPoolBuilder("FileCacheBlockDownloader")
// NOTE(review): min/max threads are set twice below — first with the literals 4/16,
// then with config::file_cache_downloader_thread_num_min/max. Presumably the later,
// config-driven calls are the intended ones and the literals are leftovers — confirm
// and drop the literal calls.
.set_min_threads(4)
.set_max_threads(16)
.set_min_threads(config::file_cache_downloader_thread_num_min)
.set_max_threads(config::file_cache_downloader_thread_num_max)
.build(&_workers);
CHECK(st.ok()) << "failed to create FileCacheBlockDownloader";
}
Expand Down
2 changes: 1 addition & 1 deletion be/src/io/cache/block_file_cache_downloader.h
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ class FileCacheBlockDownloader {
// tablet id -> inflight block num of tablet
std::unordered_map<int64_t, int64_t> _inflight_tablets;

static inline constexpr size_t _max_size {10240};
static inline constexpr size_t _max_size {102400};
};

} // namespace doris::io
Loading