Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions be/src/cloud/cloud_backend_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,16 @@ void CloudBackendService::warm_up_tablets(TWarmUpTabletsResponse& response,

void CloudBackendService::warm_up_cache_async(TWarmUpCacheAsyncResponse& response,
const TWarmUpCacheAsyncRequest& request) {
std::ostringstream oss;
oss << "[";
for (size_t i = 0; i < request.tablet_ids.size() && i < 10; ++i) {
if (i > 0) oss << ",";
oss << request.tablet_ids[i];
}
oss << "]";
LOG(INFO) << "warm_up_cache_async: enter, request=" << request.host << ":" << request.brpc_port
<< ", tablets num=" << request.tablet_ids.size() << ", tablet_ids=" << oss.str();

std::string host = request.host;
auto dns_cache = ExecEnv::GetInstance()->dns_cache();
if (dns_cache == nullptr) {
Expand All @@ -174,26 +184,41 @@ void CloudBackendService::warm_up_cache_async(TWarmUpCacheAsyncResponse& respons
_exec_env->brpc_internal_client_cache()->get_new_client_no_cache(brpc_addr);
if (!brpc_stub) {
st = Status::RpcError("Address {} is wrong", brpc_addr);
LOG(WARNING) << "warm_up_cache_async: failed to get brpc_stub for addr " << brpc_addr;
return;
}
brpc::Controller cntl;
PGetFileCacheMetaRequest brpc_request;
std::for_each(request.tablet_ids.cbegin(), request.tablet_ids.cend(),
[&](int64_t tablet_id) { brpc_request.add_tablet_ids(tablet_id); });
PGetFileCacheMetaResponse brpc_response;

brpc_stub->get_file_cache_meta_by_tablet_id(&cntl, &brpc_request, &brpc_response, nullptr);
VLOG_DEBUG << "warm_up_cache_async: request=" << brpc_request.DebugString()
<< ", response=" << brpc_response.DebugString();
if (!cntl.Failed()) {
_engine.file_cache_block_downloader().submit_download_task(
std::move(*brpc_response.mutable_file_cache_block_metas()));
} else {
st = Status::RpcError("{} isn't connected", brpc_addr);
LOG(WARNING) << "warm_up_cache_async: brpc call failed, addr=" << brpc_addr
<< ", error=" << cntl.ErrorText();
}
st.to_thrift(&t_status);
response.status = t_status;
}

void CloudBackendService::check_warm_up_cache_async(TCheckWarmUpCacheAsyncResponse& response,
const TCheckWarmUpCacheAsyncRequest& request) {
std::ostringstream oss;
oss << "[";
for (size_t i = 0; i < request.tablets.size() && i < 10; ++i) {
if (i > 0) oss << ",";
oss << request.tablets[i];
}
oss << "]";
LOG(INFO) << "check_warm_up_cache_async: enter, request tablets num=" << request.tablets.size()
<< ", tablet_ids=" << oss.str();
std::map<int64_t, bool> task_done;
_engine.file_cache_block_downloader().check_download_task(request.tablets, &task_done);
DBUG_EXECUTE_IF("CloudBackendService.check_warm_up_cache_async.return_task_false", {
Expand All @@ -203,6 +228,10 @@ void CloudBackendService::check_warm_up_cache_async(TCheckWarmUpCacheAsyncRespon
});
response.__set_task_done(task_done);

for (const auto& [tablet_id, done] : task_done) {
VLOG_DEBUG << "check_warm_up_cache_async: tablet_id=" << tablet_id << ", done=" << done;
}

Status st = Status::OK();
TStatus t_status;
st.to_thrift(&t_status);
Expand Down
3 changes: 3 additions & 0 deletions be/src/cloud/cloud_internal_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ void CloudInternalServiceImpl::get_file_cache_meta_by_tablet_id(
LOG_WARNING("try to access tablet file cache meta, but file cache not enabled");
return;
}
LOG(INFO) << "warm up get meta from this be, tablets num=" << request->tablet_ids().size();
for (const auto& tablet_id : request->tablet_ids()) {
auto res = _engine.tablet_mgr().get_tablet(tablet_id);
if (!res.has_value()) {
Expand Down Expand Up @@ -103,6 +104,8 @@ void CloudInternalServiceImpl::get_file_cache_meta_by_tablet_id(
}
});
}
VLOG_DEBUG << "warm up get meta request=" << request->DebugString()
<< ", response=" << response->DebugString();
}

} // namespace doris
36 changes: 33 additions & 3 deletions be/src/io/cache/block_file_cache_downloader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@

namespace doris::io {

bvar::Adder<uint64_t> block_file_cache_downloader_task_total("file_cache_downloader_queue_total");

FileCacheBlockDownloader::FileCacheBlockDownloader(CloudStorageEngine& engine) : _engine(engine) {
_poller = std::thread(&FileCacheBlockDownloader::polling_download_task, this);
auto st = ThreadPoolBuilder("FileCacheBlockDownloader")
Expand Down Expand Up @@ -75,6 +77,8 @@ void FileCacheBlockDownloader::submit_download_task(DownloadTask task) {
std::lock_guard lock(_inflight_mtx);
for (auto& meta : std::get<0>(task.task_message)) {
++_inflight_tablets[meta.tablet_id()];
LOG(INFO) << "submit_download_task: inflight_tablets[" << meta.tablet_id()
<< "] = " << _inflight_tablets[meta.tablet_id()];
}
}

Expand All @@ -88,9 +92,14 @@ void FileCacheBlockDownloader::submit_download_task(DownloadTask task) {
Status::InternalError("The downloader queue is full"));
}
}
LOG(INFO) << "submit_download_task: task queue full, pop front";
_task_queue.pop_front(); // Eliminate the earliest task in the queue
block_file_cache_downloader_task_total << -1;
}
VLOG_DEBUG << "submit_download_task: push task, queue size before push: "
<< _task_queue.size();
_task_queue.push_back(std::move(task));
block_file_cache_downloader_task_total << 1;
_empty.notify_all();
}
}
Expand All @@ -103,16 +112,21 @@ void FileCacheBlockDownloader::polling_download_task() {
std::unique_lock lock(_mtx);
_empty.wait(lock, [this]() { return !_task_queue.empty() || _closed; });
if (_closed) {
LOG(INFO) << "polling_download_task: downloader closed, exit polling";
break;
}

task = std::move(_task_queue.front());
_task_queue.pop_front();
block_file_cache_downloader_task_total << -1;
VLOG_DEBUG << "polling_download_task: pop task, queue size after pop: "
<< _task_queue.size();
}

if (std::chrono::duration_cast<std::chrono::seconds>(std::chrono::steady_clock::now() -
task.atime)
.count() < hot_interval) {
VLOG_DEBUG << "polling_download_task: submit download_blocks to thread pool";
auto st = _workers->submit_func(
[this, task_ = std::move(task)]() mutable { download_blocks(task_); });
if (!st.ok()) {
Expand Down Expand Up @@ -143,6 +157,9 @@ std::unordered_map<std::string, RowsetMetaSharedPtr> snapshot_rs_metas(BaseTable
void FileCacheBlockDownloader::download_file_cache_block(
const DownloadTask::FileCacheBlockMetaVec& metas) {
std::ranges::for_each(metas, [&](const FileCacheBlockMeta& meta) {
VLOG_DEBUG << "download_file_cache_block: start, tablet_id=" << meta.tablet_id()
<< ", rowset_id=" << meta.rowset_id() << ", segment_id=" << meta.segment_id()
<< ", offset=" << meta.offset() << ", size=" << meta.size();
CloudTabletSPtr tablet;
if (auto res = _engine.tablet_mgr().get_tablet(meta.tablet_id(), false); !res.has_value()) {
LOG(INFO) << "failed to find tablet " << meta.tablet_id() << " : " << res.error();
Expand All @@ -154,6 +171,8 @@ void FileCacheBlockDownloader::download_file_cache_block(
auto id_to_rowset_meta_map = snapshot_rs_metas(tablet.get());
auto find_it = id_to_rowset_meta_map.find(meta.rowset_id());
if (find_it == id_to_rowset_meta_map.end()) {
LOG(WARNING) << "download_file_cache_block: tablet_id=" << meta.tablet_id()
<< "rowset_id not found, rowset_id=" << meta.rowset_id();
return;
}

Expand All @@ -163,19 +182,25 @@ void FileCacheBlockDownloader::download_file_cache_block(
return;
}

auto download_done = [&, tablet_id = meta.tablet_id()](Status) {
auto download_done = [&, tablet_id = meta.tablet_id()](Status st) {
std::lock_guard lock(_inflight_mtx);
auto it = _inflight_tablets.find(tablet_id);
TEST_SYNC_POINT_CALLBACK("FileCacheBlockDownloader::download_file_cache_block");
if (it == _inflight_tablets.end()) {
LOG(WARNING) << "inflight ref cnt not exist, tablet id " << tablet_id;
} else {
it->second--;
VLOG_DEBUG << "download_file_cache_block: inflight_tablets[" << tablet_id
<< "] = " << it->second;
if (it->second <= 0) {
DCHECK_EQ(it->second, 0) << it->first;
_inflight_tablets.erase(it);
VLOG_DEBUG << "download_file_cache_block: erase inflight_tablets[" << tablet_id
<< "]";
}
}
LOG(INFO) << "download_file_cache_block: download_done, tablet_Id=" << tablet_id
<< "status=" << st.to_string();
};

DownloadFileMeta download_meta {
Expand All @@ -199,6 +224,8 @@ void FileCacheBlockDownloader::download_file_cache_block(
}

void FileCacheBlockDownloader::download_segment_file(const DownloadFileMeta& meta) {
LOG(INFO) << "download_segment_file: start, path=" << meta.path << ", offset=" << meta.offset
<< ", download_size=" << meta.download_size << ", file_size=" << meta.file_size;
FileReaderSPtr file_reader;
FileReaderOptions opts {
.cache_type = FileCachePolicy::FILE_BLOCK_CACHE,
Expand All @@ -208,7 +235,7 @@ void FileCacheBlockDownloader::download_segment_file(const DownloadFileMeta& met
};
auto st = meta.file_system->open_file(meta.path, &file_reader, &opts);
if (!st.ok()) {
LOG(WARNING) << "failed to download file: " << st;
LOG(WARNING) << "failed to download file path=" << meta.path << ", st=" << st;
if (meta.download_done) {
meta.download_done(std::move(st));
}
Expand All @@ -227,13 +254,15 @@ void FileCacheBlockDownloader::download_segment_file(const DownloadFileMeta& met
size_t size =
std::min(one_single_task_size, static_cast<size_t>(meta.download_size - offset));
size_t bytes_read;
VLOG_DEBUG << "download_segment_file, path=" << meta.path << ", read_at offset=" << offset
<< ", size=" << size;
// TODO(plat1ko):
// 1. Directly append buffer data to file cache
// 2. Provide `FileReader::async_read()` interface
DCHECK(meta.ctx.is_dryrun == config::enable_reader_dryrun_when_download_file_cache);
auto st = file_reader->read_at(offset, {buffer.get(), size}, &bytes_read, &meta.ctx);
if (!st.ok()) {
LOG(WARNING) << "failed to download file: " << st;
LOG(WARNING) << "failed to download file path=" << meta.path << ", st=" << st;
if (meta.download_done) {
meta.download_done(std::move(st));
}
Expand All @@ -242,6 +271,7 @@ void FileCacheBlockDownloader::download_segment_file(const DownloadFileMeta& met
}

if (meta.download_done) {
LOG(INFO) << "download_segment_file: download finished, path=" << meta.path;
meta.download_done(Status::OK());
}
}
Expand Down
Loading