diff --git a/be/src/cloud/cloud_backend_service.cpp b/be/src/cloud/cloud_backend_service.cpp
index d260e256afc450..a50d0e36419f0c 100644
--- a/be/src/cloud/cloud_backend_service.cpp
+++ b/be/src/cloud/cloud_backend_service.cpp
@@ -115,7 +115,9 @@ void CloudBackendService::warm_up_tablets(TWarmUpTabletsResponse& response,
                 .tag("request_type", "SET_BATCH")
                 .tag("job_id", request.job_id)
                 .tag("batch_id", request.batch_id)
-                .tag("jobs size", request.job_metas.size());
+                .tag("jobs size", request.job_metas.size())
+                .tag("tablet num of first meta",
+                     request.job_metas.empty() ? 0 : request.job_metas[0].tablet_ids.size());
         bool retry = false;
         st = manager.check_and_set_batch_id(request.job_id, request.batch_id, &retry);
         if (!retry && st) {
diff --git a/be/src/cloud/cloud_warm_up_manager.cpp b/be/src/cloud/cloud_warm_up_manager.cpp
index 15c346e465dc0a..7a304a872a496b 100644
--- a/be/src/cloud/cloud_warm_up_manager.cpp
+++ b/be/src/cloud/cloud_warm_up_manager.cpp
@@ -17,7 +17,8 @@
 
 #include "cloud/cloud_warm_up_manager.h"
 
-#include
+#include
+#include
 
 #include
 #include
@@ -69,6 +70,7 @@ bvar::Adder g_file_cache_recycle_cache_requested_index_num(
         "file_cache_recycle_cache_requested_index_num");
 bvar::Status g_file_cache_warm_up_rowset_last_call_unix_ts(
         "file_cache_warm_up_rowset_last_call_unix_ts", 0);
+bvar::Adder file_cache_warm_up_failed_task_num("file_cache_warm_up", "failed_task_num");
 
 CloudWarmUpManager::CloudWarmUpManager(CloudStorageEngine& engine) : _engine(engine) {
     _download_thread = std::thread(&CloudWarmUpManager::handle_jobs, this);
@@ -95,6 +97,57 @@ std::unordered_map snapshot_rs_metas(BaseTable
     return id_to_rowset_meta_map;
 }
 
+// Submits download tasks for one remote file, split into chunks of at most 10MB.
+// A negative `file_size` means "unknown": the size is then fetched from `file_system`,
+// and the file is skipped (and counted as failed) if that lookup fails.
+// `wait` is incremented once per submitted chunk and signaled from each chunk's
+// download_done callback.
+void CloudWarmUpManager::submit_download_tasks(io::Path path, int64_t file_size,
+                                               io::FileSystemSPtr file_system,
+                                               int64_t expiration_time,
+                                               std::shared_ptr wait) {
+    if (file_size < 0) {
+        auto st = file_system->file_size(path, &file_size);
+        if (!st.ok()) [[unlikely]] {
+            LOG(WARNING) << "get file size failed: " << path;
+            file_cache_warm_up_failed_task_num << 1;
+            return;
+        }
+    }
+
+    const int64_t chunk_size = 10 * 1024 * 1024; // 10MB
+    int64_t offset = 0;
+    int64_t remaining_size = file_size;
+
+    while (remaining_size > 0) {
+        int64_t current_chunk_size = std::min(chunk_size, remaining_size);
+        wait->add_count();
+
+        _engine.file_cache_block_downloader().submit_download_task(io::DownloadFileMeta {
+                .path = path,
+                .file_size = file_size,
+                .offset = offset,
+                .download_size = current_chunk_size,
+                .file_system = file_system,
+                .ctx =
+                        {
+                                .expiration_time = expiration_time,
+                                .is_dryrun = config::enable_reader_dryrun_when_download_file_cache,
+                        },
+                .download_done =
+                        [wait](Status st) {
+                            if (!st) {
+                                LOG_WARNING("Warm up error ").error(st);
+                            }
+                            wait->signal();
+                        },
+        });
+
+        offset += current_chunk_size;
+        remaining_size -= current_chunk_size;
+    }
+}
+
 void CloudWarmUpManager::handle_jobs() {
 #ifndef BE_TEST
     constexpr int WAIT_TIME_SECONDS = 600;
@@ -116,6 +169,10 @@ void CloudWarmUpManager::handle_jobs() {
             LOG_WARNING("Warm up job is null");
             continue;
         }
+
+        std::shared_ptr wait =
+                std::make_shared(0);
+
         for (int64_t tablet_id : cur_job->tablet_ids) {
             if (_cur_job_id == 0) { // The job is canceled
                 break;
@@ -131,8 +188,7 @@ void CloudWarmUpManager::handle_jobs() {
                 LOG_WARNING("Warm up error ").tag("tablet_id", tablet_id).error(st);
                 continue;
             }
-            std::shared_ptr wait =
-                    std::make_shared(0);
+
             auto tablet_meta = tablet->tablet_meta();
             auto rs_metas = snapshot_rs_metas(tablet.get());
             for (auto& [_, rs] : rs_metas) {
@@ -151,96 +207,71 @@ void CloudWarmUpManager::handle_jobs() {
                         expiration_time = 0;
                     }
 
-                    wait->add_count();
                     g_file_cache_once_or_periodic_warm_up_submitted_segment_num << 1;
                     if (rs->segment_file_size(seg_id) > 0) {
                         g_file_cache_once_or_periodic_warm_up_submitted_segment_size
                                 << rs->segment_file_size(seg_id);
                     }
-                    _engine.file_cache_block_downloader().submit_download_task(io::DownloadFileMeta {
-                            .path = storage_resource.value()->remote_segment_path(*rs, seg_id),
-                            .file_size = rs->segment_file_size(seg_id),
-                            .file_system = storage_resource.value()->fs,
-                            .ctx =
-                                    {
-                                            .expiration_time = expiration_time,
-                                            .is_dryrun = config::
-                                                    enable_reader_dryrun_when_download_file_cache,
-                                    },
-                            .download_done =
-                                    [wait](Status st) {
-                                        if (!st) {
-                                            LOG_WARNING("Warm up error ").error(st);
-                                        }
-                                        wait->signal();
-                                    },
-                    });
-
-                    auto download_idx_file = [&](const io::Path& idx_path, int64_t idx_size) {
-                        io::DownloadFileMeta meta {
-                                .path = idx_path,
-                                .file_size = idx_size,
-                                .file_system = storage_resource.value()->fs,
-                                .ctx =
-                                        {
-                                                .expiration_time = expiration_time,
-                                                .is_dryrun = config::
-                                                        enable_reader_dryrun_when_download_file_cache,
-                                        },
-                                .download_done =
-                                        [wait](Status st) {
-                                            if (!st) {
-                                                LOG_WARNING("Warm up error ").error(st);
-                                            }
-                                            wait->signal();
-                                        },
-                        };
-                        // clang-format on
-                        g_file_cache_once_or_periodic_warm_up_submitted_index_num << 1;
-                        g_file_cache_once_or_periodic_warm_up_submitted_index_size << idx_size;
-                        _engine.file_cache_block_downloader().submit_download_task(std::move(meta));
-                    };
+                    // 1st. download segment files
+                    submit_download_tasks(
+                            storage_resource.value()->remote_segment_path(*rs, seg_id),
+                            rs->segment_file_size(seg_id), storage_resource.value()->fs,
+                            expiration_time, wait);
+
+                    // 2nd. download inverted index files
                     auto schema_ptr = rs->tablet_schema();
                     auto idx_version = schema_ptr->get_inverted_index_storage_format();
+                    const auto& idx_file_info = rs->inverted_index_file_info(seg_id);
                     if (idx_version == InvertedIndexStorageFormatPB::V1) {
-                        auto&& inverted_index_info = rs->inverted_index_file_info(seg_id);
-                        std::unordered_map index_size_map;
-                        for (const auto& info : inverted_index_info.index_info()) {
-                            if (info.index_file_size() != -1) {
-                                index_size_map[info.index_id()] = info.index_file_size();
-                            } else {
-                                VLOG_DEBUG << "Invalid index_file_size for segment_id " << seg_id
-                                           << ", index_id " << info.index_id();
-                            }
-                        }
                         for (const auto& index : schema_ptr->inverted_indexes()) {
-                            wait->add_count();
                             auto idx_path = storage_resource.value()->remote_idx_v1_path(
                                     *rs, seg_id, index->index_id(), index->get_index_suffix());
-                            download_idx_file(idx_path, index_size_map[index->index_id()]);
+                            // Reset per index so a missing entry never reuses the previous
+                            // index's size; -1 makes submit_download_tasks() look the size up.
+                            int64_t file_size = -1;
+                            if (idx_file_info.index_info_size() > 0) {
+                                for (const auto& idx_info : idx_file_info.index_info()) {
+                                    if (index->index_id() == idx_info.index_id() &&
+                                        index->get_index_suffix() == idx_info.index_suffix()) {
+                                        file_size = idx_info.index_file_size();
+                                        break;
+                                    }
+                                }
+                            }
+                            submit_download_tasks(idx_path, file_size, storage_resource.value()->fs,
+                                                  expiration_time, wait);
+                            g_file_cache_once_or_periodic_warm_up_submitted_index_num << 1;
+                            // Skip unknown (-1) sizes, mirroring the segment size metric above.
+                            if (file_size > 0) {
+                                g_file_cache_once_or_periodic_warm_up_submitted_index_size
+                                        << file_size;
+                            }
                         }
                     } else {
                         if (schema_ptr->has_inverted_index()) {
-                            auto&& inverted_index_info = rs->inverted_index_file_info(seg_id);
-                            int64_t idx_size = 0;
-                            if (inverted_index_info.has_index_size()) {
-                                idx_size = inverted_index_info.index_size();
-                            } else {
-                                VLOG_DEBUG << "index_size is not set for segment " << seg_id;
-                            }
-                            wait->add_count();
                             auto idx_path =
                                     storage_resource.value()->remote_idx_v2_path(*rs, seg_id);
-                            download_idx_file(idx_path, idx_size);
+                            const int64_t file_size = idx_file_info.has_index_size()
+                                                              ? idx_file_info.index_size()
+                                                              : -1;
+                            submit_download_tasks(idx_path, file_size, storage_resource.value()->fs,
+                                                  expiration_time, wait);
+                            g_file_cache_once_or_periodic_warm_up_submitted_index_num << 1;
+                            if (file_size > 0) {
+                                g_file_cache_once_or_periodic_warm_up_submitted_index_size
+                                        << file_size;
+                            }
                         }
                     }
                 }
             }
-            timespec time;
-            time.tv_sec = UnixSeconds() + WAIT_TIME_SECONDS;
-            if (!wait->timed_wait(time)) {
-                LOG_WARNING("Warm up {} tablets take a long time", cur_job->tablet_ids.size());
-            }
+        }
+
+        // Zero-initialize so tv_nsec is not left indeterminate.
+        timespec time {};
+        time.tv_sec = UnixSeconds() + WAIT_TIME_SECONDS;
+        if (wait->timed_wait(time)) {
+            LOG_WARNING("Warm up {} tablets take a long time", cur_job->tablet_ids.size());
         }
         {
             std::unique_lock lock(_mtx);
diff --git a/be/src/cloud/cloud_warm_up_manager.h b/be/src/cloud/cloud_warm_up_manager.h
index 55fbcc476da25d..85a460fda1e4ad 100644
--- a/be/src/cloud/cloud_warm_up_manager.h
+++ b/be/src/cloud/cloud_warm_up_manager.h
@@ -17,6 +17,8 @@
 
 #pragma once
 
+#include
+
 #include
 #include
 #include
@@ -79,6 +81,9 @@ class CloudWarmUpManager {
     void handle_jobs();
     std::vector get_replica_info(int64_t tablet_id);
 
+    void submit_download_tasks(io::Path path, int64_t file_size, io::FileSystemSPtr file_system,
+                               int64_t expiration_time,
+                               std::shared_ptr wait);
     std::mutex _mtx;
     std::condition_variable _cond;
     int64_t _cur_job_id {0};
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 43bda8759741ae..b0bf713a28eae9 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1130,6 +1130,10 @@ DEFINE_mInt64(file_cache_background_monitor_interval_ms, "5000");
 DEFINE_mInt64(file_cache_background_ttl_gc_interval_ms, "3000");
 DEFINE_mInt64(file_cache_background_ttl_gc_batch, "1000");
 
+// Min/max worker threads of the FileCacheBlockDownloader thread pool.
+DEFINE_Int32(file_cache_downloader_thread_num_min, "32");
+DEFINE_Int32(file_cache_downloader_thread_num_max, "32");
+
 DEFINE_mInt32(index_cache_entry_stay_time_after_lookup_s, "1800");
 DEFINE_mInt32(inverted_index_cache_stale_sweep_time_sec, "600");
 // inverted index searcher cache size
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 037353fe0a4008..c1ff44b8d10b44 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1154,6 +1154,7 @@ DECLARE_mBool(enbale_dump_error_file);
 DECLARE_mInt64(file_cache_error_log_limit_bytes);
 DECLARE_mInt64(cache_lock_wait_long_tail_threshold_us);
 DECLARE_mInt64(cache_lock_held_long_tail_threshold_us);
+
 // Base compaction may retrieve and produce some less frequently accessed data,
 // potentially affecting the file cache hit rate.
 // This configuration determines whether to retain the output within the file cache.
@@ -1167,6 +1168,8 @@ DECLARE_mBool(enable_reader_dryrun_when_download_file_cache);
 DECLARE_mInt64(file_cache_background_monitor_interval_ms);
 DECLARE_mInt64(file_cache_background_ttl_gc_interval_ms);
 DECLARE_mInt64(file_cache_background_ttl_gc_batch);
+DECLARE_Int32(file_cache_downloader_thread_num_min);
+DECLARE_Int32(file_cache_downloader_thread_num_max);
 
 DECLARE_mBool(enable_reader_dryrun_when_download_file_cache);
 
diff --git a/be/src/io/cache/block_file_cache_downloader.cpp b/be/src/io/cache/block_file_cache_downloader.cpp
index dc4c622e807daf..1732197e5b407c 100644
--- a/be/src/io/cache/block_file_cache_downloader.cpp
+++ b/be/src/io/cache/block_file_cache_downloader.cpp
@@ -49,8 +49,8 @@ bvar::Adder g_file_cache_download_failed_num("file_cache_download_fail
 FileCacheBlockDownloader::FileCacheBlockDownloader(CloudStorageEngine& engine) : _engine(engine) {
     _poller = std::thread(&FileCacheBlockDownloader::polling_download_task, this);
     auto st = ThreadPoolBuilder("FileCacheBlockDownloader")
-                      .set_min_threads(4)
-                      .set_max_threads(16)
+                      .set_min_threads(config::file_cache_downloader_thread_num_min)
+                      .set_max_threads(config::file_cache_downloader_thread_num_max)
                       .build(&_workers);
     CHECK(st.ok()) << "failed to create FileCacheBlockDownloader";
 }
diff --git a/be/src/io/cache/block_file_cache_downloader.h b/be/src/io/cache/block_file_cache_downloader.h
index 30827b69580553..c9a4689167363f 100644
--- a/be/src/io/cache/block_file_cache_downloader.h
+++ b/be/src/io/cache/block_file_cache_downloader.h
@@ -92,7 +92,7 @@ class FileCacheBlockDownloader {
     // tablet id -> inflight block num of tablet
     std::unordered_map _inflight_tablets;
 
-    static inline constexpr size_t _max_size {10240};
+    static inline constexpr size_t _max_size {102400};
 };
 
 } // namespace doris::io