diff --git a/be/src/cloud/cloud_backend_service.cpp b/be/src/cloud/cloud_backend_service.cpp index de5c77208c5e89..8e32af1cf9189b 100644 --- a/be/src/cloud/cloud_backend_service.cpp +++ b/be/src/cloud/cloud_backend_service.cpp @@ -155,6 +155,16 @@ void CloudBackendService::warm_up_tablets(TWarmUpTabletsResponse& response, void CloudBackendService::warm_up_cache_async(TWarmUpCacheAsyncResponse& response, const TWarmUpCacheAsyncRequest& request) { + std::ostringstream oss; + oss << "["; + for (size_t i = 0; i < request.tablet_ids.size() && i < 10; ++i) { + if (i > 0) oss << ","; + oss << request.tablet_ids[i]; + } + oss << "]"; + LOG(INFO) << "warm_up_cache_async: enter, request=" << request.host << ":" << request.brpc_port + << ", tablets num=" << request.tablet_ids.size() << ", tablet_ids=" << oss.str(); + std::string host = request.host; auto dns_cache = ExecEnv::GetInstance()->dns_cache(); if (dns_cache == nullptr) { @@ -174,6 +184,7 @@ void CloudBackendService::warm_up_cache_async(TWarmUpCacheAsyncResponse& respons _exec_env->brpc_internal_client_cache()->get_new_client_no_cache(brpc_addr); if (!brpc_stub) { st = Status::RpcError("Address {} is wrong", brpc_addr); + LOG(WARNING) << "warm_up_cache_async: failed to get brpc_stub for addr " << brpc_addr; return; } brpc::Controller cntl; @@ -181,12 +192,17 @@ void CloudBackendService::warm_up_cache_async(TWarmUpCacheAsyncResponse& respons std::for_each(request.tablet_ids.cbegin(), request.tablet_ids.cend(), [&](int64_t tablet_id) { brpc_request.add_tablet_ids(tablet_id); }); PGetFileCacheMetaResponse brpc_response; + brpc_stub->get_file_cache_meta_by_tablet_id(&cntl, &brpc_request, &brpc_response, nullptr); + VLOG_DEBUG << "warm_up_cache_async: request=" << brpc_request.DebugString() + << ", response=" << brpc_response.DebugString(); if (!cntl.Failed()) { _engine.file_cache_block_downloader().submit_download_task( std::move(*brpc_response.mutable_file_cache_block_metas())); } else { st = Status::RpcError("{} isn't connected", brpc_addr); + LOG(WARNING) << "warm_up_cache_async: brpc call failed, addr=" << brpc_addr + << ", error=" << cntl.ErrorText(); } st.to_thrift(&t_status); response.status = t_status; @@ -194,6 +210,15 @@ void CloudBackendService::warm_up_cache_async(TWarmUpCacheAsyncResponse& respons void CloudBackendService::check_warm_up_cache_async(TCheckWarmUpCacheAsyncResponse& response, const TCheckWarmUpCacheAsyncRequest& request) { + std::ostringstream oss; + oss << "["; + for (size_t i = 0; i < request.tablets.size() && i < 10; ++i) { + if (i > 0) oss << ","; + oss << request.tablets[i]; + } + oss << "]"; + LOG(INFO) << "check_warm_up_cache_async: enter, request tablets num=" << request.tablets.size() + << ", tablet_ids=" << oss.str(); std::map task_done; _engine.file_cache_block_downloader().check_download_task(request.tablets, &task_done); DBUG_EXECUTE_IF("CloudBackendService.check_warm_up_cache_async.return_task_false", { @@ -203,6 +228,10 @@ void CloudBackendService::check_warm_up_cache_async(TCheckWarmUpCacheAsyncRespon }); response.__set_task_done(task_done); + for (const auto& [tablet_id, done] : task_done) { + VLOG_DEBUG << "check_warm_up_cache_async: tablet_id=" << tablet_id << ", done=" << done; + } + Status st = Status::OK(); TStatus t_status; st.to_thrift(&t_status); diff --git a/be/src/cloud/cloud_internal_service.cpp b/be/src/cloud/cloud_internal_service.cpp index 66e089c22e9d53..0144be6eed6fa9 100644 --- a/be/src/cloud/cloud_internal_service.cpp +++ b/be/src/cloud/cloud_internal_service.cpp @@ -71,6 +71,7 @@ void CloudInternalServiceImpl::get_file_cache_meta_by_tablet_id( LOG_WARNING("try to access tablet file cache meta, but file cache not enabled"); return; } + LOG(INFO) << "warm up get meta from this be, tablets num=" << request->tablet_ids().size(); for (const auto& tablet_id : request->tablet_ids()) { auto res = _engine.tablet_mgr().get_tablet(tablet_id); if (!res.has_value()) { @@ -103,6 +104,8 @@ void CloudInternalServiceImpl::get_file_cache_meta_by_tablet_id( } }); } + VLOG_DEBUG << "warm up get meta request=" << request->DebugString() + << ", response=" << response->DebugString(); } } // namespace doris diff --git a/be/src/io/cache/block_file_cache_downloader.cpp b/be/src/io/cache/block_file_cache_downloader.cpp index 05c18e0b945ce3..fa5d068013a1f1 100644 --- a/be/src/io/cache/block_file_cache_downloader.cpp +++ b/be/src/io/cache/block_file_cache_downloader.cpp @@ -40,6 +40,8 @@ namespace doris::io { +bvar::Adder block_file_cache_downloader_task_total("file_cache_downloader_queue_total"); + FileCacheBlockDownloader::FileCacheBlockDownloader(CloudStorageEngine& engine) : _engine(engine) { _poller = std::thread(&FileCacheBlockDownloader::polling_download_task, this); auto st = ThreadPoolBuilder("FileCacheBlockDownloader") @@ -75,6 +77,8 @@ void FileCacheBlockDownloader::submit_download_task(DownloadTask task) { std::lock_guard lock(_inflight_mtx); for (auto& meta : std::get<0>(task.task_message)) { ++_inflight_tablets[meta.tablet_id()]; + LOG(INFO) << "submit_download_task: inflight_tablets[" << meta.tablet_id() + << "] = " << _inflight_tablets[meta.tablet_id()]; } } @@ -88,9 +92,14 @@ void FileCacheBlockDownloader::submit_download_task(DownloadTask task) { Status::InternalError("The downloader queue is full")); } } + LOG(INFO) << "submit_download_task: task queue full, pop front"; _task_queue.pop_front(); // Eliminate the earliest task in the queue + block_file_cache_downloader_task_total << -1; } + VLOG_DEBUG << "submit_download_task: push task, queue size before push: " + << _task_queue.size(); _task_queue.push_back(std::move(task)); + block_file_cache_downloader_task_total << 1; _empty.notify_all(); } } @@ -103,16 +112,21 @@ void FileCacheBlockDownloader::polling_download_task() { std::unique_lock lock(_mtx); _empty.wait(lock, [this]() { return !_task_queue.empty() || _closed; }); if (_closed) { + LOG(INFO) << "polling_download_task: downloader closed, exit polling"; break; } task = std::move(_task_queue.front()); _task_queue.pop_front(); + block_file_cache_downloader_task_total << -1; + VLOG_DEBUG << "polling_download_task: pop task, queue size after pop: " + << _task_queue.size(); } if (std::chrono::duration_cast(std::chrono::steady_clock::now() - task.atime) .count() < hot_interval) { + VLOG_DEBUG << "polling_download_task: submit download_blocks to thread pool"; auto st = _workers->submit_func( [this, task_ = std::move(task)]() mutable { download_blocks(task_); }); if (!st.ok()) { @@ -143,6 +157,9 @@ std::unordered_map snapshot_rs_metas(BaseTable void FileCacheBlockDownloader::download_file_cache_block( const DownloadTask::FileCacheBlockMetaVec& metas) { std::ranges::for_each(metas, [&](const FileCacheBlockMeta& meta) { + VLOG_DEBUG << "download_file_cache_block: start, tablet_id=" << meta.tablet_id() + << ", rowset_id=" << meta.rowset_id() << ", segment_id=" << meta.segment_id() + << ", offset=" << meta.offset() << ", size=" << meta.size(); CloudTabletSPtr tablet; if (auto res = _engine.tablet_mgr().get_tablet(meta.tablet_id(), false); !res.has_value()) { LOG(INFO) << "failed to find tablet " << meta.tablet_id() << " : " << res.error(); @@ -154,6 +171,8 @@ void FileCacheBlockDownloader::download_file_cache_block( auto id_to_rowset_meta_map = snapshot_rs_metas(tablet.get()); auto find_it = id_to_rowset_meta_map.find(meta.rowset_id()); if (find_it == id_to_rowset_meta_map.end()) { + LOG(WARNING) << "download_file_cache_block: tablet_id=" << meta.tablet_id() + << "rowset_id not found, rowset_id=" << meta.rowset_id(); return; } @@ -163,7 +182,7 @@ void FileCacheBlockDownloader::download_file_cache_block( return; } - auto download_done = [&, tablet_id = meta.tablet_id()](Status) { + auto download_done = [&, tablet_id = meta.tablet_id()](Status st) { std::lock_guard lock(_inflight_mtx); auto it = _inflight_tablets.find(tablet_id); TEST_SYNC_POINT_CALLBACK("FileCacheBlockDownloader::download_file_cache_block"); @@ -171,11 +190,17 @@ void FileCacheBlockDownloader::download_file_cache_block( LOG(WARNING) << "inflight ref cnt not exist, tablet id " << tablet_id; } else { it->second--; + VLOG_DEBUG << "download_file_cache_block: inflight_tablets[" << tablet_id + << "] = " << it->second; if (it->second <= 0) { DCHECK_EQ(it->second, 0) << it->first; _inflight_tablets.erase(it); + VLOG_DEBUG << "download_file_cache_block: erase inflight_tablets[" << tablet_id + << "]"; } } + LOG(INFO) << "download_file_cache_block: download_done, tablet_Id=" << tablet_id + << "status=" << st.to_string(); }; DownloadFileMeta download_meta { @@ -199,6 +224,8 @@ void FileCacheBlockDownloader::download_file_cache_block( } void FileCacheBlockDownloader::download_segment_file(const DownloadFileMeta& meta) { + LOG(INFO) << "download_segment_file: start, path=" << meta.path << ", offset=" << meta.offset + << ", download_size=" << meta.download_size << ", file_size=" << meta.file_size; FileReaderSPtr file_reader; FileReaderOptions opts { .cache_type = FileCachePolicy::FILE_BLOCK_CACHE, @@ -208,7 +235,7 @@ void FileCacheBlockDownloader::download_segment_file(const DownloadFileMeta& met }; auto st = meta.file_system->open_file(meta.path, &file_reader, &opts); if (!st.ok()) { - LOG(WARNING) << "failed to download file: " << st; + LOG(WARNING) << "failed to download file path=" << meta.path << ", st=" << st; if (meta.download_done) { meta.download_done(std::move(st)); } @@ -227,13 +254,15 @@ void FileCacheBlockDownloader::download_segment_file(const DownloadFileMeta& met size_t size = std::min(one_single_task_size, static_cast(meta.download_size - offset)); size_t bytes_read; + VLOG_DEBUG << "download_segment_file, path=" << meta.path << ", read_at offset=" << offset + << ", size=" << size; // TODO(plat1ko): // 1. Directly append buffer data to file cache // 2. Provide `FileReader::async_read()` interface DCHECK(meta.ctx.is_dryrun == config::enable_reader_dryrun_when_download_file_cache); auto st = file_reader->read_at(offset, {buffer.get(), size}, &bytes_read, &meta.ctx); if (!st.ok()) { - LOG(WARNING) << "failed to download file: " << st; + LOG(WARNING) << "failed to download file path=" << meta.path << ", st=" << st; if (meta.download_done) { meta.download_done(std::move(st)); } @@ -242,6 +271,7 @@ void FileCacheBlockDownloader::download_segment_file(const DownloadFileMeta& met } if (meta.download_done) { + LOG(INFO) << "download_segment_file: download finished, path=" << meta.path; meta.download_done(Status::OK()); } }