From 4d031de3ec01da12958d7d31d8523f871508ec86 Mon Sep 17 00:00:00 2001 From: deardeng <565620795@qq.com> Date: Fri, 20 Jun 2025 13:57:55 +0800 Subject: [PATCH 1/4] (selectdb-cloud) add log for balance warm up (#4060) --- be/src/cloud/cloud_backend_service.cpp | 28 +++++++++++++++++++ .../io/cache/block_file_cache_downloader.cpp | 25 ++++++++++++++++- 2 files changed, 52 insertions(+), 1 deletion(-) diff --git a/be/src/cloud/cloud_backend_service.cpp b/be/src/cloud/cloud_backend_service.cpp index de5c77208c5e89..e1d76dd5202e6c 100644 --- a/be/src/cloud/cloud_backend_service.cpp +++ b/be/src/cloud/cloud_backend_service.cpp @@ -155,6 +155,16 @@ void CloudBackendService::warm_up_tablets(TWarmUpTabletsResponse& response, void CloudBackendService::warm_up_cache_async(TWarmUpCacheAsyncResponse& response, const TWarmUpCacheAsyncRequest& request) { + std::ostringstream oss; + oss << "["; + for (size_t i = 0; i < request.tablet_ids.size(); ++i) { + if (i > 0) oss << ","; + oss << request.tablet_ids[i]; + } + oss << "]"; + LOG(INFO) << "warm_up_cache_async: enter, request=" << request.host << ":" << request.brpc_port + << ", tablet_ids=" << oss.str(); + std::string host = request.host; auto dns_cache = ExecEnv::GetInstance()->dns_cache(); if (dns_cache == nullptr) { @@ -174,6 +184,7 @@ void CloudBackendService::warm_up_cache_async(TWarmUpCacheAsyncResponse& respons _exec_env->brpc_internal_client_cache()->get_new_client_no_cache(brpc_addr); if (!brpc_stub) { st = Status::RpcError("Address {} is wrong", brpc_addr); + LOG(WARNING) << "warm_up_cache_async: failed to get brpc_stub for addr " << brpc_addr; return; } brpc::Controller cntl; @@ -181,12 +192,17 @@ void CloudBackendService::warm_up_cache_async(TWarmUpCacheAsyncResponse& respons std::for_each(request.tablet_ids.cbegin(), request.tablet_ids.cend(), [&](int64_t tablet_id) { brpc_request.add_tablet_ids(tablet_id); }); PGetFileCacheMetaResponse brpc_response; + brpc_stub->get_file_cache_meta_by_tablet_id(&cntl, &brpc_request, &brpc_response, nullptr); + LOG(INFO) << "warm_up_cache_async: request=" << brpc_request.DebugString() + << ", response=" << brpc_response.DebugString(); if (!cntl.Failed()) { _engine.file_cache_block_downloader().submit_download_task( std::move(*brpc_response.mutable_file_cache_block_metas())); } else { st = Status::RpcError("{} isn't connected", brpc_addr); + LOG(WARNING) << "warm_up_cache_async: brpc call failed, addr=" << brpc_addr + << ", error=" << cntl.ErrorText(); } st.to_thrift(&t_status); response.status = t_status; @@ -194,6 +210,14 @@ void CloudBackendService::warm_up_cache_async(TWarmUpCacheAsyncResponse& respons void CloudBackendService::check_warm_up_cache_async(TCheckWarmUpCacheAsyncResponse& response, const TCheckWarmUpCacheAsyncRequest& request) { + std::ostringstream oss; + oss << "["; + for (size_t i = 0; i < request.tablets.size(); ++i) { + if (i > 0) oss << ","; + oss << request.tablets[i]; + } + oss << "]"; + LOG(INFO) << "check_warm_up_cache_async: enter, request tablet_ids=" << oss.str(); std::map task_done; _engine.file_cache_block_downloader().check_download_task(request.tablets, &task_done); DBUG_EXECUTE_IF("CloudBackendService.check_warm_up_cache_async.return_task_false", { @@ -203,6 +227,10 @@ void CloudBackendService::check_warm_up_cache_async(TCheckWarmUpCacheAsyncRespon }); response.__set_task_done(task_done); + for (const auto& [tablet_id, done] : task_done) { + LOG(INFO) << "check_warm_up_cache_async: tablet_id=" << tablet_id << ", done=" << done; + } + Status st = Status::OK(); TStatus t_status; st.to_thrift(&t_status); diff --git a/be/src/io/cache/block_file_cache_downloader.cpp b/be/src/io/cache/block_file_cache_downloader.cpp index 05c18e0b945ce3..38fe094216ae54 100644 --- a/be/src/io/cache/block_file_cache_downloader.cpp +++ b/be/src/io/cache/block_file_cache_downloader.cpp @@ -75,6 +75,8 @@ void FileCacheBlockDownloader::submit_download_task(DownloadTask task) { std::lock_guard lock(_inflight_mtx); for (auto& meta : std::get<0>(task.task_message)) { ++_inflight_tablets[meta.tablet_id()]; + LOG(INFO) << "submit_download_task: inflight_tablets[" << meta.tablet_id() + << "] = " << _inflight_tablets[meta.tablet_id()]; } } @@ -88,8 +90,11 @@ void FileCacheBlockDownloader::submit_download_task(DownloadTask task) { Status::InternalError("The downloader queue is full")); } } + LOG(INFO) << "submit_download_task: task queue full, pop front"; _task_queue.pop_front(); // Eliminate the earliest task in the queue } + LOG(INFO) << "submit_download_task: push task, queue size before push: " + << _task_queue.size(); _task_queue.push_back(std::move(task)); _empty.notify_all(); } @@ -103,16 +108,20 @@ void FileCacheBlockDownloader::polling_download_task() { std::unique_lock lock(_mtx); _empty.wait(lock, [this]() { return !_task_queue.empty() || _closed; }); if (_closed) { + LOG(INFO) << "polling_download_task: downloader closed, exit polling"; break; } task = std::move(_task_queue.front()); _task_queue.pop_front(); + LOG(INFO) << "polling_download_task: pop task, queue size after pop: " + << _task_queue.size(); } if (std::chrono::duration_cast(std::chrono::steady_clock::now() - task.atime) .count() < hot_interval) { + LOG(INFO) << "polling_download_task: submit download_blocks to thread pool"; auto st = _workers->submit_func( [this, task_ = std::move(task)]() mutable { download_blocks(task_); }); if (!st.ok()) { @@ -143,6 +152,9 @@ std::unordered_map snapshot_rs_metas(BaseTable void FileCacheBlockDownloader::download_file_cache_block( const DownloadTask::FileCacheBlockMetaVec& metas) { std::ranges::for_each(metas, [&](const FileCacheBlockMeta& meta) { + LOG(INFO) << "download_file_cache_block: start, tablet_id=" << meta.tablet_id() + << ", rowset_id=" << meta.rowset_id() << ", segment_id=" << meta.segment_id() + << ", offset=" << meta.offset() << ", size=" << meta.size(); CloudTabletSPtr tablet; if (auto res = _engine.tablet_mgr().get_tablet(meta.tablet_id(), false); !res.has_value()) { LOG(INFO) << "failed to find tablet " << meta.tablet_id() << " : " << res.error(); @@ -154,6 +166,8 @@ void FileCacheBlockDownloader::download_file_cache_block( auto id_to_rowset_meta_map = snapshot_rs_metas(tablet.get()); auto find_it = id_to_rowset_meta_map.find(meta.rowset_id()); if (find_it == id_to_rowset_meta_map.end()) { + LOG(WARNING) << "download_file_cache_block: rowset_id not found, rowset_id=" + << meta.rowset_id(); return; } @@ -163,7 +177,7 @@ void FileCacheBlockDownloader::download_file_cache_block( return; } - auto download_done = [&, tablet_id = meta.tablet_id()](Status) { + auto download_done = [&, tablet_id = meta.tablet_id()](Status st) { std::lock_guard lock(_inflight_mtx); auto it = _inflight_tablets.find(tablet_id); TEST_SYNC_POINT_CALLBACK("FileCacheBlockDownloader::download_file_cache_block"); @@ -171,11 +185,16 @@ void FileCacheBlockDownloader::download_file_cache_block( LOG(WARNING) << "inflight ref cnt not exist, tablet id " << tablet_id; } else { it->second--; + LOG(INFO) << "download_file_cache_block: inflight_tablets[" << tablet_id + << "] = " << it->second; if (it->second <= 0) { DCHECK_EQ(it->second, 0) << it->first; _inflight_tablets.erase(it); + LOG(INFO) << "download_file_cache_block: erase inflight_tablets[" << tablet_id + << "]"; } } + LOG(INFO) << "download_file_cache_block: download_done, status=" << st.to_string(); }; DownloadFileMeta download_meta { @@ -199,6 +218,8 @@ void FileCacheBlockDownloader::download_file_cache_block( } void FileCacheBlockDownloader::download_segment_file(const DownloadFileMeta& meta) { + LOG(INFO) << "download_segment_file: start, path=" << meta.path << ", offset=" << meta.offset + << ", download_size=" << meta.download_size << ", file_size=" << meta.file_size; FileReaderSPtr file_reader; FileReaderOptions opts { .cache_type = FileCachePolicy::FILE_BLOCK_CACHE, @@ -227,6 +248,7 @@ void FileCacheBlockDownloader::download_segment_file(const DownloadFileMeta& met size_t size = std::min(one_single_task_size, static_cast(meta.download_size - offset)); size_t bytes_read; + LOG(INFO) << "download_segment_file: read_at offset=" << offset << ", size=" << size; // TODO(plat1ko): // 1. Directly append buffer data to file cache // 2. Provide `FileReader::async_read()` interface @@ -242,6 +264,7 @@ void FileCacheBlockDownloader::download_segment_file(const DownloadFileMeta& met } if (meta.download_done) { + LOG(INFO) << "download_segment_file: download finished, path=" << meta.path; meta.download_done(Status::OK()); } } From 427801f1963e583f6e6d958495b22d1349bd266a Mon Sep 17 00:00:00 2001 From: deardeng Date: Wed, 9 Jul 2025 11:19:52 +0800 Subject: [PATCH 2/4] fix --- be/src/cloud/cloud_backend_service.cpp | 9 +++++---- .../io/cache/block_file_cache_downloader.cpp | 18 ++++++++++-------- 2 files changed, 15 insertions(+), 12 deletions(-) diff --git a/be/src/cloud/cloud_backend_service.cpp b/be/src/cloud/cloud_backend_service.cpp index e1d76dd5202e6c..b01d223a5fb526 100644 --- a/be/src/cloud/cloud_backend_service.cpp +++ b/be/src/cloud/cloud_backend_service.cpp @@ -157,13 +157,13 @@ void CloudBackendService::warm_up_cache_async(TWarmUpCacheAsyncResponse& respons const TWarmUpCacheAsyncRequest& request) { std::ostringstream oss; oss << "["; - for (size_t i = 0; i < request.tablet_ids.size(); ++i) { + for (size_t i = 0; i < request.tablet_ids.size() && i < 10; ++i) { if (i > 0) oss << ","; oss << request.tablet_ids[i]; } oss << "]"; LOG(INFO) << "warm_up_cache_async: enter, request=" << request.host << ":" << request.brpc_port - << ", tablet_ids=" << oss.str(); + << ", tablets num=" << request.tablet_ids.size() << ", tablet_ids=" << oss.str(); std::string host = request.host; auto dns_cache = ExecEnv::GetInstance()->dns_cache(); @@ -212,12 +212,13 @@ void CloudBackendService::check_warm_up_cache_async(TCheckWarmUpCacheAsyncRespon const TCheckWarmUpCacheAsyncRequest& request) { std::ostringstream oss; oss << "["; - for (size_t i = 0; i < request.tablets.size(); ++i) { + for (size_t i = 0; i < request.tablets.size() && i < 10; ++i) { if (i > 0) oss << ","; oss << request.tablets[i]; } oss << "]"; - LOG(INFO) << "check_warm_up_cache_async: enter, request tablet_ids=" << oss.str(); + LOG(INFO) << "check_warm_up_cache_async: enter, request tablets num=" << request.tablets.size() + << ", tablet_ids=" << oss.str(); std::map task_done; _engine.file_cache_block_downloader().check_download_task(request.tablets, &task_done); DBUG_EXECUTE_IF("CloudBackendService.check_warm_up_cache_async.return_task_false", { diff --git a/be/src/io/cache/block_file_cache_downloader.cpp b/be/src/io/cache/block_file_cache_downloader.cpp index 38fe094216ae54..432501a5cda1e1 100644 --- a/be/src/io/cache/block_file_cache_downloader.cpp +++ b/be/src/io/cache/block_file_cache_downloader.cpp @@ -93,8 +93,8 @@ void FileCacheBlockDownloader::submit_download_task(DownloadTask task) { LOG(INFO) << "submit_download_task: task queue full, pop front"; _task_queue.pop_front(); // Eliminate the earliest task in the queue } - LOG(INFO) << "submit_download_task: push task, queue size before push: " - << _task_queue.size(); + VLOG_DEBUG << "submit_download_task: push task, queue size before push: " + << _task_queue.size(); _task_queue.push_back(std::move(task)); _empty.notify_all(); } @@ -166,8 +166,8 @@ void FileCacheBlockDownloader::download_file_cache_block( auto id_to_rowset_meta_map = snapshot_rs_metas(tablet.get()); auto find_it = id_to_rowset_meta_map.find(meta.rowset_id()); if (find_it == id_to_rowset_meta_map.end()) { - LOG(WARNING) << "download_file_cache_block: rowset_id not found, rowset_id=" - << meta.rowset_id(); + LOG(WARNING) << "download_file_cache_block: tablet_id=" << meta.tablet_id() + << "rowset_id not found, rowset_id=" << meta.rowset_id(); return; } @@ -194,7 +194,8 @@ void FileCacheBlockDownloader::download_file_cache_block( << "]"; } } - LOG(INFO) << "download_file_cache_block: download_done, status=" << st.to_string(); + LOG(INFO) << "download_file_cache_block: download_done, tablet_Id=" << tablet_id + << "status=" << st.to_string(); }; DownloadFileMeta download_meta { @@ -229,7 +230,7 @@ void FileCacheBlockDownloader::download_segment_file(const DownloadFileMeta& met }; auto st = meta.file_system->open_file(meta.path, &file_reader, &opts); if (!st.ok()) { - LOG(WARNING) << "failed to download file: " << st; + LOG(WARNING) << "failed to download file path=" << meta.path << ", st=" << st; if (meta.download_done) { meta.download_done(std::move(st)); } @@ -248,14 +249,15 @@ void FileCacheBlockDownloader::download_segment_file(const DownloadFileMeta& met size_t size = std::min(one_single_task_size, static_cast(meta.download_size - offset)); size_t bytes_read; - LOG(INFO) << "download_segment_file: read_at offset=" << offset << ", size=" << size; + VLOG_DEBUG << "download_segment_file, path=" << meta.path << ", read_at offset=" << offset + << ", size=" << size; // TODO(plat1ko): // 1. Directly append buffer data to file cache // 2. Provide `FileReader::async_read()` interface DCHECK(meta.ctx.is_dryrun == config::enable_reader_dryrun_when_download_file_cache); auto st = file_reader->read_at(offset, {buffer.get(), size}, &bytes_read, &meta.ctx); if (!st.ok()) { - LOG(WARNING) << "failed to download file: " << st; + LOG(WARNING) << "failed to download file path=" << meta.path << ", st=" << st; if (meta.download_done) { meta.download_done(std::move(st)); } From c1b82bce7b73c4ed835697bcf692031abb19fc86 Mon Sep 17 00:00:00 2001 From: deardeng Date: Wed, 9 Jul 2025 11:38:09 +0800 Subject: [PATCH 3/4] fix --- be/src/cloud/cloud_internal_service.cpp | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/be/src/cloud/cloud_internal_service.cpp b/be/src/cloud/cloud_internal_service.cpp index 66e089c22e9d53..7ac94154cc625b 100644 --- a/be/src/cloud/cloud_internal_service.cpp +++ b/be/src/cloud/cloud_internal_service.cpp @@ -80,6 +80,8 @@ void CloudInternalServiceImpl::get_file_cache_meta_by_tablet_id( } CloudTabletSPtr tablet = std::move(res.value()); auto rowsets = tablet->get_snapshot_rowset(); + LOG(INFO) << "warm up get meta from tablet_id=" << tablet_id + << "rowsets size=" << rowsets.size(); std::for_each(rowsets.cbegin(), rowsets.cend(), [&](const RowsetSharedPtr& rowset) { std::string rowset_id = rowset->rowset_id().to_string(); for (int64_t segment_id = 0; segment_id < rowset->num_segments(); segment_id++) { @@ -103,6 +105,8 @@ void CloudInternalServiceImpl::get_file_cache_meta_by_tablet_id( } }); } + VLOG_DEBUG << "warm up get meta request=" << request->DebugString() + << ", response=" << response->DebugString(); } } // namespace doris From 3119ec2f8344c7972eda0a0c410740cf657dcf18 Mon Sep 17 00:00:00 2001 From: deardeng Date: Wed, 9 Jul 2025 14:31:29 +0800 Subject: [PATCH 4/4] fix review --- be/src/cloud/cloud_backend_service.cpp | 6 ++--- be/src/cloud/cloud_internal_service.cpp | 3 +-- .../io/cache/block_file_cache_downloader.cpp | 25 +++++++++++-------- 3 files changed, 19 insertions(+), 15 deletions(-) diff --git a/be/src/cloud/cloud_backend_service.cpp b/be/src/cloud/cloud_backend_service.cpp index b01d223a5fb526..8e32af1cf9189b 100644 --- a/be/src/cloud/cloud_backend_service.cpp +++ b/be/src/cloud/cloud_backend_service.cpp @@ -194,8 +194,8 @@ void CloudBackendService::warm_up_cache_async(TWarmUpCacheAsyncResponse& respons PGetFileCacheMetaResponse brpc_response; brpc_stub->get_file_cache_meta_by_tablet_id(&cntl, &brpc_request, &brpc_response, nullptr); - LOG(INFO) << "warm_up_cache_async: request=" << brpc_request.DebugString() - << ", response=" << brpc_response.DebugString(); + VLOG_DEBUG << "warm_up_cache_async: request=" << brpc_request.DebugString() + << ", response=" << brpc_response.DebugString(); if (!cntl.Failed()) { _engine.file_cache_block_downloader().submit_download_task( std::move(*brpc_response.mutable_file_cache_block_metas())); @@ -229,7 +229,7 @@ void CloudBackendService::check_warm_up_cache_async(TCheckWarmUpCacheAsyncRespon response.__set_task_done(task_done); for (const auto& [tablet_id, done] : task_done) { - LOG(INFO) << "check_warm_up_cache_async: tablet_id=" << tablet_id << ", done=" << done; + VLOG_DEBUG << "check_warm_up_cache_async: tablet_id=" << tablet_id << ", done=" << done; } Status st = Status::OK(); diff --git a/be/src/cloud/cloud_internal_service.cpp b/be/src/cloud/cloud_internal_service.cpp index 7ac94154cc625b..0144be6eed6fa9 100644 --- a/be/src/cloud/cloud_internal_service.cpp +++ b/be/src/cloud/cloud_internal_service.cpp @@ -71,6 +71,7 @@ void CloudInternalServiceImpl::get_file_cache_meta_by_tablet_id( LOG_WARNING("try to access tablet file cache meta, but file cache not enabled"); return; } + LOG(INFO) << "warm up get meta from this be, tablets num=" << request->tablet_ids().size(); for (const auto& tablet_id : request->tablet_ids()) { auto res = _engine.tablet_mgr().get_tablet(tablet_id); if (!res.has_value()) { @@ -80,8 +81,6 @@ void CloudInternalServiceImpl::get_file_cache_meta_by_tablet_id( } CloudTabletSPtr tablet = std::move(res.value()); auto rowsets = tablet->get_snapshot_rowset(); - LOG(INFO) << "warm up get meta from tablet_id=" << tablet_id - << "rowsets size=" << rowsets.size(); std::for_each(rowsets.cbegin(), rowsets.cend(), [&](const RowsetSharedPtr& rowset) { std::string rowset_id = rowset->rowset_id().to_string(); for (int64_t segment_id = 0; segment_id < rowset->num_segments(); segment_id++) { diff --git a/be/src/io/cache/block_file_cache_downloader.cpp b/be/src/io/cache/block_file_cache_downloader.cpp index 432501a5cda1e1..fa5d068013a1f1 100644 --- a/be/src/io/cache/block_file_cache_downloader.cpp +++ b/be/src/io/cache/block_file_cache_downloader.cpp @@ -40,6 +40,8 @@ namespace doris::io { +bvar::Adder block_file_cache_downloader_task_total("file_cache_downloader_queue_total"); + FileCacheBlockDownloader::FileCacheBlockDownloader(CloudStorageEngine& engine) : _engine(engine) { _poller = std::thread(&FileCacheBlockDownloader::polling_download_task, this); auto st = ThreadPoolBuilder("FileCacheBlockDownloader") @@ -92,10 +94,12 @@ void FileCacheBlockDownloader::submit_download_task(DownloadTask task) { } LOG(INFO) << "submit_download_task: task queue full, pop front"; _task_queue.pop_front(); // Eliminate the earliest task in the queue + block_file_cache_downloader_task_total << -1; } VLOG_DEBUG << "submit_download_task: push task, queue size before push: " << _task_queue.size(); _task_queue.push_back(std::move(task)); + block_file_cache_downloader_task_total << 1; _empty.notify_all(); } } @@ -114,14 +118,15 @@ void FileCacheBlockDownloader::polling_download_task() { task = std::move(_task_queue.front()); _task_queue.pop_front(); - LOG(INFO) << "polling_download_task: pop task, queue size after pop: " - << _task_queue.size(); + block_file_cache_downloader_task_total << -1; + VLOG_DEBUG << "polling_download_task: pop task, queue size after pop: " + << _task_queue.size(); } if (std::chrono::duration_cast(std::chrono::steady_clock::now() - task.atime) .count() < hot_interval) { - LOG(INFO) << "polling_download_task: submit download_blocks to thread pool"; + VLOG_DEBUG << "polling_download_task: submit download_blocks to thread pool"; auto st = _workers->submit_func( [this, task_ = std::move(task)]() mutable { download_blocks(task_); }); if (!st.ok()) { @@ -152,9 +157,9 @@ std::unordered_map snapshot_rs_metas(BaseTable void FileCacheBlockDownloader::download_file_cache_block( const DownloadTask::FileCacheBlockMetaVec& metas) { std::ranges::for_each(metas, [&](const FileCacheBlockMeta& meta) { - LOG(INFO) << "download_file_cache_block: start, tablet_id=" << meta.tablet_id() - << ", rowset_id=" << meta.rowset_id() << ", segment_id=" << meta.segment_id() - << ", offset=" << meta.offset() << ", size=" << meta.size(); + VLOG_DEBUG << "download_file_cache_block: start, tablet_id=" << meta.tablet_id() + << ", rowset_id=" << meta.rowset_id() << ", segment_id=" << meta.segment_id() + << ", offset=" << meta.offset() << ", size=" << meta.size(); CloudTabletSPtr tablet; if (auto res = _engine.tablet_mgr().get_tablet(meta.tablet_id(), false); !res.has_value()) { LOG(INFO) << "failed to find tablet " << meta.tablet_id() << " : " << res.error(); @@ -185,13 +190,13 @@ void FileCacheBlockDownloader::download_file_cache_block( LOG(WARNING) << "inflight ref cnt not exist, tablet id " << tablet_id; } else { it->second--; - LOG(INFO) << "download_file_cache_block: inflight_tablets[" << tablet_id - << "] = " << it->second; + VLOG_DEBUG << "download_file_cache_block: inflight_tablets[" << tablet_id + << "] = " << it->second; if (it->second <= 0) { DCHECK_EQ(it->second, 0) << it->first; _inflight_tablets.erase(it); - LOG(INFO) << "download_file_cache_block: erase inflight_tablets[" << tablet_id - << "]"; + VLOG_DEBUG << "download_file_cache_block: erase inflight_tablets[" << tablet_id + << "]"; } } LOG(INFO) << "download_file_cache_block: download_done, tablet_Id=" << tablet_id