Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 18 additions & 18 deletions be/src/cloud/cloud_internal_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,7 @@ void CloudInternalServiceImpl::warm_up_rowset(google::protobuf::RpcController* c
continue;
}
int64_t tablet_id = rs_meta.tablet_id();
auto rowset_id = rs_meta.rowset_id();
bool local_only = !(request->has_skip_existence_check() && request->skip_existence_check());
auto res = _engine.tablet_mgr().get_tablet(tablet_id, /* warmup_data = */ false,
/* sync_delete_bitmap = */ true,
Expand All @@ -212,7 +213,7 @@ void CloudInternalServiceImpl::warm_up_rowset(google::protobuf::RpcController* c
g_file_cache_warm_up_rowset_request_to_handle_slow_count << 1;
LOG(INFO) << "warm up rowset (request to handle) took " << handle_ts - request_ts
<< " us, tablet_id: " << rs_meta.tablet_id()
<< ", rowset_id: " << rs_meta.rowset_id().to_string();
<< ", rowset_id: " << rowset_id.to_string();
}
int64_t expiration_time =
tablet_meta->ttl_seconds() == 0 || rs_meta.newest_write_timestamp() <= 0
Expand All @@ -223,10 +224,8 @@ void CloudInternalServiceImpl::warm_up_rowset(google::protobuf::RpcController* c
}

for (int64_t segment_id = 0; segment_id < rs_meta.num_segments(); segment_id++) {
auto download_done = [&, tablet_id = rs_meta.tablet_id(),
rowset_id = rs_meta.rowset_id().to_string(),
segment_size = rs_meta.segment_file_size(segment_id),
wait](Status st) {
auto segment_size = rs_meta.segment_file_size(segment_id);
auto download_done = [=](Status st) {
if (st.ok()) {
g_file_cache_event_driven_warm_up_finished_segment_num << 1;
g_file_cache_event_driven_warm_up_finished_segment_size << segment_size;
Expand All @@ -240,20 +239,22 @@ void CloudInternalServiceImpl::warm_up_rowset(google::protobuf::RpcController* c
now_ts - request_ts > config::warm_up_rowset_slow_log_ms * 1000) {
g_file_cache_warm_up_rowset_slow_count << 1;
LOG(INFO) << "warm up rowset took " << now_ts - request_ts
<< " us, tablet_id: " << tablet_id << ", rowset_id: " << rowset_id
<< " us, tablet_id: " << tablet_id
<< ", rowset_id: " << rowset_id.to_string()
<< ", segment_id: " << segment_id;
}
if (now_ts - handle_ts > config::warm_up_rowset_slow_log_ms * 1000) {
g_file_cache_warm_up_rowset_handle_to_finish_slow_count << 1;
LOG(INFO) << "warm up rowset (handle to finish) took " << now_ts - handle_ts
<< " us, tablet_id: " << tablet_id << ", rowset_id: " << rowset_id
<< " us, tablet_id: " << tablet_id
<< ", rowset_id: " << rowset_id.to_string()
<< ", segment_id: " << segment_id;
}
} else {
g_file_cache_event_driven_warm_up_failed_segment_num << 1;
g_file_cache_event_driven_warm_up_failed_segment_size << segment_size;
LOG(WARNING) << "download segment failed, tablet_id: " << tablet_id
<< " rowset_id: " << rowset_id << ", error: " << st;
<< " rowset_id: " << rowset_id.to_string() << ", error: " << st;
}
if (wait) {
wait->signal();
Expand All @@ -262,9 +263,9 @@ void CloudInternalServiceImpl::warm_up_rowset(google::protobuf::RpcController* c

io::DownloadFileMeta download_meta {
.path = storage_resource.value()->remote_segment_path(rs_meta, segment_id),
.file_size = rs_meta.segment_file_size(segment_id),
.file_size = segment_size,
.offset = 0,
.download_size = rs_meta.segment_file_size(segment_id),
.download_size = segment_size,
.file_system = storage_resource.value()->fs,
.ctx =
{
Expand All @@ -276,17 +277,15 @@ void CloudInternalServiceImpl::warm_up_rowset(google::protobuf::RpcController* c
.download_done = std::move(download_done),
};
g_file_cache_event_driven_warm_up_submitted_segment_num << 1;
g_file_cache_event_driven_warm_up_submitted_segment_size
<< rs_meta.segment_file_size(segment_id);
g_file_cache_event_driven_warm_up_submitted_segment_size << segment_size;
if (wait) {
wait->add_count();
}
_engine.file_cache_block_downloader().submit_download_task(download_meta);

auto download_inverted_index = [&](std::string index_path, uint64_t idx_size) {
auto storage_resource = rs_meta.remote_storage_resource();
auto download_done = [=, tablet_id = rs_meta.tablet_id(),
rowset_id = rs_meta.rowset_id().to_string()](Status st) {
auto download_done = [=](Status st) {
if (st.ok()) {
g_file_cache_event_driven_warm_up_finished_index_num << 1;
g_file_cache_event_driven_warm_up_finished_index_size << idx_size;
Expand All @@ -303,21 +302,22 @@ void CloudInternalServiceImpl::warm_up_rowset(google::protobuf::RpcController* c
g_file_cache_warm_up_rowset_slow_count << 1;
LOG(INFO) << "warm up rowset took " << now_ts - request_ts
<< " us, tablet_id: " << tablet_id
<< ", rowset_id: " << rowset_id
<< ", rowset_id: " << rowset_id.to_string()
<< ", segment_id: " << segment_id;
}
if (now_ts - handle_ts > config::warm_up_rowset_slow_log_ms * 1000) {
g_file_cache_warm_up_rowset_handle_to_finish_slow_count << 1;
LOG(INFO) << "warm up rowset (handle to finish) took "
<< now_ts - handle_ts << " us, tablet_id: " << tablet_id
<< ", rowset_id: " << rowset_id
<< ", rowset_id: " << rowset_id.to_string()
<< ", segment_id: " << segment_id;
}
} else {
g_file_cache_event_driven_warm_up_failed_index_num << 1;
g_file_cache_event_driven_warm_up_failed_index_size << idx_size;
LOG(WARNING) << "download inverted index failed, tablet_id: " << tablet_id
<< " rowset_id: " << rowset_id << ", error: " << st;
LOG(WARNING)
<< "download inverted index failed, tablet_id: " << tablet_id
<< " rowset_id: " << rowset_id.to_string() << ", error: " << st;
}
if (wait) {
wait->signal();
Expand Down
Loading