From b6061bada22f5a78377d84a08af09032cbed9c7a Mon Sep 17 00:00:00 2001 From: zhengyu Date: Mon, 30 Jun 2025 22:59:03 +0800 Subject: [PATCH] [optimization](filecache) speed up filecache warm up pick #51776 pick #51776 pick #51776 this pr does the following: make file cache downloader worker pool thread num configurable make warm up job split batch size configurable split large file downloading task to smaller ones to maintain load balance between threads, thus improve concurrency use meta info to deduce size of inverted idx file size to reduce S3 HEAD ops some log print optimization in our test, this opt can improve more than 3x file cache warm up performance Signed-off-by: zhengyu --- be/src/cloud/cloud_backend_service.cpp | 4 +- be/src/cloud/cloud_tablet.cpp | 12 +- be/src/cloud/cloud_warm_up_manager.cpp | 138 +++++++++++------- be/src/cloud/cloud_warm_up_manager.h | 6 +- be/src/common/config.cpp | 3 + be/src/common/config.h | 3 + .../io/cache/block_file_cache_downloader.cpp | 4 +- be/src/io/cache/block_file_cache_downloader.h | 2 +- .../java/org/apache/doris/common/Config.java | 3 + .../doris/cloud/CacheHotspotManager.java | 2 +- 10 files changed, 116 insertions(+), 61 deletions(-) diff --git a/be/src/cloud/cloud_backend_service.cpp b/be/src/cloud/cloud_backend_service.cpp index 265e6c44aac9ab..f94807282b380c 100644 --- a/be/src/cloud/cloud_backend_service.cpp +++ b/be/src/cloud/cloud_backend_service.cpp @@ -105,7 +105,9 @@ void CloudBackendService::warm_up_tablets(TWarmUpTabletsResponse& response, .tag("request_type", "SET_BATCH") .tag("job_id", request.job_id) .tag("batch_id", request.batch_id) - .tag("jobs size", request.job_metas.size()); + .tag("jobs size", request.job_metas.size()) + .tag("tablet num of first meta", + request.job_metas.empty() ? 0 : request.job_metas[0].tablet_ids.size()); bool retry = false; st = manager.check_and_set_batch_id(request.job_id, request.batch_id, &retry); if (!retry && st) { diff --git a/be/src/cloud/cloud_tablet.cpp b/be/src/cloud/cloud_tablet.cpp index 4344b085a838dc..78663de6ed86d6 100644 --- a/be/src/cloud/cloud_tablet.cpp +++ b/be/src/cloud/cloud_tablet.cpp @@ -279,7 +279,11 @@ void CloudTablet::add_rowsets(std::vector to_add, bool version_ .expiration_time = expiration_time, .is_dryrun = config::enable_reader_dryrun_when_download_file_cache, }, - .download_done {}, + .download_done {[](Status st) { + if (!st) { + LOG_WARNING("add rowset warm up error ").error(st); + } + }}, }); auto download_idx_file = [&](const io::Path& idx_path) { @@ -292,7 +296,11 @@ void CloudTablet::add_rowsets(std::vector to_add, bool version_ .expiration_time = expiration_time, .is_dryrun = config::enable_reader_dryrun_when_download_file_cache, }, - .download_done {}, + .download_done {[](Status st) { + if (!st) { + LOG_WARNING("add rowset warm up error ").error(st); + } + }}, }; _engine.file_cache_block_downloader().submit_download_task(std::move(meta)); }; diff --git a/be/src/cloud/cloud_warm_up_manager.cpp b/be/src/cloud/cloud_warm_up_manager.cpp index d8bce097465dde..510c677f06f34c 100644 --- a/be/src/cloud/cloud_warm_up_manager.cpp +++ b/be/src/cloud/cloud_warm_up_manager.cpp @@ -17,7 +17,8 @@ #include "cloud/cloud_warm_up_manager.h" -#include +#include +#include #include #include @@ -34,6 +35,8 @@ namespace doris { +bvar::Adder file_cache_warm_up_failed_task_num("file_cache_warm_up", "failed_task_num"); + CloudWarmUpManager::CloudWarmUpManager(CloudStorageEngine& engine) : _engine(engine) { _download_thread = std::thread(&CloudWarmUpManager::handle_jobs, this); } @@ -59,6 +62,52 @@ std::unordered_map snapshot_rs_metas(BaseTable return id_to_rowset_meta_map; } +void CloudWarmUpManager::submit_download_tasks(io::Path path, int64_t file_size, + io::FileSystemSPtr file_system, + int64_t expiration_time, + std::shared_ptr wait) { + if (file_size < 0) { + auto st = file_system->file_size(path, &file_size); + if (!st.ok()) [[unlikely]] { + LOG(WARNING) << "get file size failed: " << path; + file_cache_warm_up_failed_task_num << 1; + return; + } + } + + const int64_t chunk_size = 10 * 1024 * 1024; // 10MB + int64_t offset = 0; + int64_t remaining_size = file_size; + + while (remaining_size > 0) { + int64_t current_chunk_size = std::min(chunk_size, remaining_size); + wait->add_count(); + + _engine.file_cache_block_downloader().submit_download_task(io::DownloadFileMeta { + .path = path, + .file_size = file_size, + .offset = offset, + .download_size = current_chunk_size, + .file_system = file_system, + .ctx = + { + .expiration_time = expiration_time, + .is_dryrun = config::enable_reader_dryrun_when_download_file_cache, + }, + .download_done = + [wait](Status st) { + if (!st) { + LOG_WARNING("Warm up error ").error(st); + } + wait->signal(); + }, + }); + + offset += current_chunk_size; + remaining_size -= current_chunk_size; + } +} + void CloudWarmUpManager::handle_jobs() { #ifndef BE_TEST constexpr int WAIT_TIME_SECONDS = 600; @@ -77,6 +126,10 @@ void CloudWarmUpManager::handle_jobs() { LOG_WARNING("Warm up job is null"); continue; } + + std::shared_ptr wait = + std::make_shared(0); + for (int64_t tablet_id : cur_job->tablet_ids) { if (_cur_job_id == 0) { // The job is canceled break; @@ -92,8 +145,7 @@ void CloudWarmUpManager::handle_jobs() { LOG_WARNING("Warm up error ").tag("tablet_id", tablet_id).error(st); continue; } - std::shared_ptr wait = - std::make_shared(0); + auto tablet_meta = tablet->tablet_meta(); auto rs_metas = snapshot_rs_metas(tablet.get()); for (auto& [_, rs] : rs_metas) { @@ -112,71 +164,51 @@ void CloudWarmUpManager::handle_jobs() { expiration_time = 0; } - wait->add_count(); - // clang-format off - _engine.file_cache_block_downloader().submit_download_task(io::DownloadFileMeta { - .path = storage_resource.value()->remote_segment_path(*rs, seg_id), - .file_size = rs->segment_file_size(seg_id), - .file_system = storage_resource.value()->fs, - .ctx = - { - .expiration_time = expiration_time, - .is_dryrun = config::enable_reader_dryrun_when_download_file_cache, - }, - .download_done = - [wait](Status st) { - if (!st) { - LOG_WARNING("Warm up error ").error(st); - } - wait->signal(); - }, - }); - - auto download_idx_file = [&](const io::Path& idx_path) { - io::DownloadFileMeta meta { - .path = idx_path, - .file_size = -1, - .file_system = storage_resource.value()->fs, - .ctx = - { - .expiration_time = expiration_time, - .is_dryrun = config::enable_reader_dryrun_when_download_file_cache, - }, - .download_done = - [wait](Status st) { - if (!st) { - LOG_WARNING("Warm up error ").error(st); - } - wait->signal(); - }, - }; - // clang-format on - _engine.file_cache_block_downloader().submit_download_task(std::move(meta)); - }; + // 1st. download segment files + submit_download_tasks( + storage_resource.value()->remote_segment_path(*rs, seg_id), + rs->segment_file_size(seg_id), storage_resource.value()->fs, + expiration_time, wait); + + // 2nd. download inverted index files + int64_t file_size = -1; auto schema_ptr = rs->tablet_schema(); auto idx_version = schema_ptr->get_inverted_index_storage_format(); + const auto& idx_file_info = rs->inverted_index_file_info(seg_id); if (idx_version == InvertedIndexStorageFormatPB::V1) { for (const auto& index : schema_ptr->inverted_indexes()) { - wait->add_count(); auto idx_path = storage_resource.value()->remote_idx_v1_path( *rs, seg_id, index->index_id(), index->get_index_suffix()); - download_idx_file(idx_path); + if (idx_file_info.index_info_size() > 0) { + for (const auto& idx_info : idx_file_info.index_info()) { + if (index->index_id() == idx_info.index_id() && + index->get_index_suffix() == idx_info.index_suffix()) { + file_size = idx_info.index_file_size(); + break; + } + } + } + submit_download_tasks(idx_path, file_size, storage_resource.value()->fs, + expiration_time, wait); } } else { if (schema_ptr->has_inverted_index()) { - wait->add_count(); auto idx_path = storage_resource.value()->remote_idx_v2_path(*rs, seg_id); - download_idx_file(idx_path); + file_size = idx_file_info.has_index_size() ? idx_file_info.index_size() + : -1; + submit_download_tasks(idx_path, file_size, storage_resource.value()->fs, + expiration_time, wait); } } } } - timespec time; - time.tv_sec = UnixSeconds() + WAIT_TIME_SECONDS; - if (!wait->timed_wait(time)) { - LOG_WARNING("Warm up tablet {} take a long time", tablet_meta->tablet_id()); - } + } + + timespec time; + time.tv_sec = UnixSeconds() + WAIT_TIME_SECONDS; + if (wait->timed_wait(time)) { + LOG_WARNING("Warm up {} tablets take a long time", cur_job->tablet_ids.size()); } { std::unique_lock lock(_mtx); diff --git a/be/src/cloud/cloud_warm_up_manager.h b/be/src/cloud/cloud_warm_up_manager.h index 219dedc58065a6..356d7284f6f3ee 100644 --- a/be/src/cloud/cloud_warm_up_manager.h +++ b/be/src/cloud/cloud_warm_up_manager.h @@ -17,6 +17,8 @@ #pragma once +#include + #include #include #include @@ -69,7 +71,9 @@ class CloudWarmUpManager { private: void handle_jobs(); - + void submit_download_tasks(io::Path path, int64_t file_size, io::FileSystemSPtr file_system, + int64_t expiration_time, + std::shared_ptr wait); std::mutex _mtx; std::condition_variable _cond; int64_t _cur_job_id {0}; diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index 3df7058c12d1ce..2fcedc2fd235d3 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -1128,6 +1128,9 @@ DEFINE_mInt64(file_cache_background_monitor_interval_ms, "5000"); DEFINE_mInt64(file_cache_background_ttl_gc_interval_ms, "3000"); DEFINE_mInt64(file_cache_background_ttl_gc_batch, "1000"); +DEFINE_Int32(file_cache_downloader_thread_num_min, "32"); +DEFINE_Int32(file_cache_downloader_thread_num_max, "32"); + DEFINE_mInt32(index_cache_entry_stay_time_after_lookup_s, "1800"); DEFINE_mInt32(inverted_index_cache_stale_sweep_time_sec, "600"); // inverted index searcher cache size diff --git a/be/src/common/config.h b/be/src/common/config.h index 0fa93d0f8f8ef1..5f5a8f14c2dd81 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -1152,6 +1152,7 @@ DECLARE_mBool(enbale_dump_error_file); DECLARE_mInt64(file_cache_error_log_limit_bytes); DECLARE_mInt64(cache_lock_wait_long_tail_threshold_us); DECLARE_mInt64(cache_lock_held_long_tail_threshold_us); + // Base compaction may retrieve and produce some less frequently accessed data, // potentially affecting the file cache hit rate. // This configuration determines whether to retain the output within the file cache. @@ -1165,6 +1166,8 @@ DECLARE_mBool(enable_reader_dryrun_when_download_file_cache); DECLARE_mInt64(file_cache_background_monitor_interval_ms); DECLARE_mInt64(file_cache_background_ttl_gc_interval_ms); DECLARE_mInt64(file_cache_background_ttl_gc_batch); +DECLARE_Int32(file_cache_downloader_thread_num_min); +DECLARE_Int32(file_cache_downloader_thread_num_max); // inverted index searcher cache // cache entry stay time after lookup diff --git a/be/src/io/cache/block_file_cache_downloader.cpp b/be/src/io/cache/block_file_cache_downloader.cpp index b9944e39989d2b..05c18e0b945ce3 100644 --- a/be/src/io/cache/block_file_cache_downloader.cpp +++ b/be/src/io/cache/block_file_cache_downloader.cpp @@ -43,8 +43,8 @@ namespace doris::io { FileCacheBlockDownloader::FileCacheBlockDownloader(CloudStorageEngine& engine) : _engine(engine) { _poller = std::thread(&FileCacheBlockDownloader::polling_download_task, this); auto st = ThreadPoolBuilder("FileCacheBlockDownloader") - .set_min_threads(4) - .set_max_threads(16) + .set_min_threads(config::file_cache_downloader_thread_num_min) + .set_max_threads(config::file_cache_downloader_thread_num_max) .build(&_workers); CHECK(st.ok()) << "failed to create FileCacheBlockDownloader"; } diff --git a/be/src/io/cache/block_file_cache_downloader.h b/be/src/io/cache/block_file_cache_downloader.h index 30827b69580553..c9a4689167363f 100644 --- a/be/src/io/cache/block_file_cache_downloader.h +++ b/be/src/io/cache/block_file_cache_downloader.h @@ -92,7 +92,7 @@ class FileCacheBlockDownloader { // tablet id -> inflight block num of tablet std::unordered_map _inflight_tablets; - static inline constexpr size_t _max_size {10240}; + static inline constexpr size_t _max_size {102400}; }; } // namespace doris::io diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index de2604ad011a13..b3d92cad6f75ac 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -3210,6 +3210,9 @@ public static int metaServiceRpcRetryTimes() { @ConfField(mutable = true, masterOnly = true) public static int cloud_warm_up_job_scheduler_interval_millisecond = 1000; // 1 seconds + @ConfField(mutable = true, masterOnly = true) + public static long cloud_warm_up_job_max_bytes_per_batch = 21474836480L; // 20GB + @ConfField(mutable = true, masterOnly = true) public static boolean enable_fetch_cluster_cache_hotspot = true; diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/CacheHotspotManager.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/CacheHotspotManager.java index b73e467836d91c..4e073b21473566 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/CacheHotspotManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/CacheHotspotManager.java @@ -365,7 +365,7 @@ Long getFileCacheCapacity(String clusterName) throws RuntimeException { } private Map>> splitBatch(Map> beToWarmUpTablets) { - final Long maxSizePerBatch = 10737418240L; // 10G + final Long maxSizePerBatch = Config.cloud_warm_up_job_max_bytes_per_batch; Map>> beToTabletIdBatches = new HashMap<>(); for (Map.Entry> entry : beToWarmUpTablets.entrySet()) { List> batches = new ArrayList<>();