Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 36 additions & 2 deletions be/src/cloud/cloud_internal_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

#include "cloud/cloud_internal_service.h"

#include <bthread/countdown_event.h>

#include "cloud/cloud_storage_engine.h"
#include "cloud/cloud_tablet_mgr.h"
#include "cloud/config.h"
Expand Down Expand Up @@ -149,13 +151,26 @@ bvar::Adder<uint64_t> g_file_cache_warm_up_rowset_request_to_handle_slow_count(
"file_cache_warm_up_rowset_request_to_handle_slow_count");
bvar::Adder<uint64_t> g_file_cache_warm_up_rowset_handle_to_finish_slow_count(
"file_cache_warm_up_rowset_handle_to_finish_slow_count");
bvar::Adder<uint64_t> g_file_cache_warm_up_rowset_wait_for_compaction_num(
"file_cache_warm_up_rowset_wait_for_compaction_num");
bvar::Adder<uint64_t> g_file_cache_warm_up_rowset_wait_for_compaction_timeout_num(
"file_cache_warm_up_rowset_wait_for_compaction_timeout_num");

void CloudInternalServiceImpl::warm_up_rowset(google::protobuf::RpcController* controller
[[maybe_unused]],
const PWarmUpRowsetRequest* request,
PWarmUpRowsetResponse* response,
google::protobuf::Closure* done) {
brpc::ClosureGuard closure_guard(done);
std::shared_ptr<bthread::CountdownEvent> wait = nullptr;
timespec due_time;
if (request->has_sync_wait_timeout_ms() && request->sync_wait_timeout_ms() > 0) {
g_file_cache_warm_up_rowset_wait_for_compaction_num << 1;
wait = std::make_shared<bthread::CountdownEvent>(0);
VLOG_DEBUG << "sync_wait_timeout: " << request->sync_wait_timeout_ms() << " ms";
due_time = butil::milliseconds_from_now(request->sync_wait_timeout_ms());
}

for (auto& rs_meta_pb : request->rowset_metas()) {
RowsetMeta rs_meta;
rs_meta.init_from_pb(rs_meta_pb);
Expand Down Expand Up @@ -197,9 +212,10 @@ void CloudInternalServiceImpl::warm_up_rowset(google::protobuf::RpcController* c
}

for (int64_t segment_id = 0; segment_id < rs_meta.num_segments(); segment_id++) {
auto download_done = [=, tablet_id = rs_meta.tablet_id(),
auto download_done = [&, tablet_id = rs_meta.tablet_id(),
rowset_id = rs_meta.rowset_id().to_string(),
segment_size = rs_meta.segment_file_size(segment_id)](Status st) {
segment_size = rs_meta.segment_file_size(segment_id),
wait](Status st) {
if (st.ok()) {
g_file_cache_event_driven_warm_up_finished_segment_num << 1;
g_file_cache_event_driven_warm_up_finished_segment_size << segment_size;
Expand Down Expand Up @@ -228,6 +244,9 @@ void CloudInternalServiceImpl::warm_up_rowset(google::protobuf::RpcController* c
LOG(WARNING) << "download segment failed, tablet_id: " << tablet_id
<< " rowset_id: " << rowset_id << ", error: " << st;
}
if (wait) {
wait->signal();
}
};

io::DownloadFileMeta download_meta {
Expand All @@ -248,6 +267,9 @@ void CloudInternalServiceImpl::warm_up_rowset(google::protobuf::RpcController* c
g_file_cache_event_driven_warm_up_submitted_segment_num << 1;
g_file_cache_event_driven_warm_up_submitted_segment_size
<< rs_meta.segment_file_size(segment_id);
if (wait) {
wait->add_count();
}
_engine.file_cache_block_downloader().submit_download_task(download_meta);

auto download_inverted_index = [&](std::string index_path, uint64_t idx_size) {
Expand Down Expand Up @@ -286,6 +308,9 @@ void CloudInternalServiceImpl::warm_up_rowset(google::protobuf::RpcController* c
LOG(WARNING) << "download inverted index failed, tablet_id: " << tablet_id
<< " rowset_id: " << rowset_id << ", error: " << st;
}
if (wait) {
wait->signal();
}
};
io::DownloadFileMeta download_meta {
.path = io::Path(index_path),
Expand All @@ -302,6 +327,10 @@ void CloudInternalServiceImpl::warm_up_rowset(google::protobuf::RpcController* c
};
g_file_cache_event_driven_warm_up_submitted_index_num << 1;
g_file_cache_event_driven_warm_up_submitted_index_size << idx_size;

if (wait) {
wait->add_count();
}
_engine.file_cache_block_downloader().submit_download_task(download_meta);
};

Expand Down Expand Up @@ -342,6 +371,11 @@ void CloudInternalServiceImpl::warm_up_rowset(google::protobuf::RpcController* c
}
}
}
if (wait && wait->timed_wait(due_time)) {
g_file_cache_warm_up_rowset_wait_for_compaction_timeout_num << 1;
LOG_WARNING("the time spent warming up {} rowsets exceeded {} ms",
request->rowset_metas().size(), request->sync_wait_timeout_ms());
}
}

bvar::Adder<uint64_t> g_file_cache_recycle_cache_finished_segment_num(
Expand Down
19 changes: 18 additions & 1 deletion be/src/cloud/cloud_meta_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1042,8 +1042,25 @@ Status CloudMetaMgr::commit_rowset(RowsetMeta& rs_meta, const std::string& job_i
RETURN_IF_ERROR(
engine.get_schema_cloud_dictionary_cache().refresh_dict(rs_meta_pb.index_id()));
}

int64_t timeout_ms = -1;
// if the `job_id` is not empty, it means this rowset was produced by a compaction job.
if (config::enable_compaction_delay_commit_for_warm_up && !job_id.empty()) {
// 1. assume the download speed is 100MB/s
// 2. we double the download time as timeout for safety
// 3. for small rowsets, the timeout we calculate maybe quite small, so we need a min_time_out
const double speed_mbps = 100.0; // 100MB/s
const double safety_factor = 2.0;
timeout_ms = std::min(
std::max(static_cast<int64_t>(static_cast<double>(rs_meta.data_disk_size()) /
(speed_mbps * 1024 * 1024) * safety_factor * 1000),
config::warm_up_rowset_sync_wait_min_timeout_ms),
config::warm_up_rowset_sync_wait_max_timeout_ms);
LOG(INFO) << "warm up rowset: " << rs_meta.version() << ", job_id: " << job_id
<< ", with timeout: " << timeout_ms << " ms";
}
auto& manager = ExecEnv::GetInstance()->storage_engine().to_cloud().cloud_warm_up_manager();
manager.warm_up_rowset(rs_meta);
manager.warm_up_rowset(rs_meta, timeout_ms);
return st;
}

Expand Down
29 changes: 27 additions & 2 deletions be/src/cloud/cloud_warm_up_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,9 @@ bvar::Status<int64_t> g_file_cache_warm_up_rowset_last_call_unix_ts(
"file_cache_warm_up_rowset_last_call_unix_ts", 0);
bvar::Adder<uint64_t> file_cache_warm_up_failed_task_num("file_cache_warm_up", "failed_task_num");

bvar::LatencyRecorder g_file_cache_warm_up_rowset_wait_for_compaction_latency(
"file_cache_warm_up_rowset_wait_for_compaction_latency");

CloudWarmUpManager::CloudWarmUpManager(CloudStorageEngine& engine) : _engine(engine) {
_download_thread = std::thread(&CloudWarmUpManager::handle_jobs, this);
}
Expand Down Expand Up @@ -489,8 +492,9 @@ std::vector<TReplicaInfo> CloudWarmUpManager::get_replica_info(int64_t tablet_id
return replicas;
}

void CloudWarmUpManager::warm_up_rowset(RowsetMeta& rs_meta) {
auto replicas = get_replica_info(rs_meta.tablet_id());
void CloudWarmUpManager::warm_up_rowset(RowsetMeta& rs_meta, int64_t sync_wait_timeout_ms) {
auto tablet_id = rs_meta.tablet_id();
auto replicas = get_replica_info(tablet_id);
if (replicas.empty()) {
LOG(INFO) << "There is no need to warmup tablet=" << rs_meta.tablet_id()
<< ", skipping rowset=" << rs_meta.rowset_id().to_string();
Expand All @@ -504,6 +508,7 @@ void CloudWarmUpManager::warm_up_rowset(RowsetMeta& rs_meta) {
PWarmUpRowsetRequest request;
request.add_rowset_metas()->CopyFrom(rs_meta.get_rowset_pb());
request.set_unix_ts_us(now_ts);
request.set_sync_wait_timeout_ms(sync_wait_timeout_ms);
for (auto& replica : replicas) {
// send sync request
std::string host = replica.host;
Expand Down Expand Up @@ -568,8 +573,28 @@ void CloudWarmUpManager::warm_up_rowset(RowsetMeta& rs_meta) {
}

brpc::Controller cntl;
if (sync_wait_timeout_ms > 0) {
cntl.set_timeout_ms(sync_wait_timeout_ms + 1000);
}
PWarmUpRowsetResponse response;
MonotonicStopWatch watch;
watch.start();
brpc_stub->warm_up_rowset(&cntl, &request, &response, nullptr);
if (cntl.Failed()) {
LOG_WARNING("warm up rowset {} for tablet {} failed, rpc error: {}",
rs_meta.rowset_id().to_string(), tablet_id, cntl.ErrorText());
return;
}
if (sync_wait_timeout_ms > 0) {
auto cost_us = watch.elapsed_time_microseconds();
VLOG_DEBUG << "warm up rowset wait for compaction: " << cost_us << " us";
if (cost_us / 1000 > sync_wait_timeout_ms) {
LOG_WARNING(
"Warm up rowset {} for tabelt {} wait for compaction timeout, takes {} ms",
rs_meta.rowset_id().to_string(), tablet_id, cost_us / 1000);
}
g_file_cache_warm_up_rowset_wait_for_compaction_latency << cost_us;
}
}
}

Expand Down
10 changes: 9 additions & 1 deletion be/src/cloud/cloud_warm_up_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,15 @@ class CloudWarmUpManager {

Status set_event(int64_t job_id, TWarmUpEventType::type event, bool clear = false);

void warm_up_rowset(RowsetMeta& rs_meta);
// If `sync_wait_timeout_ms` <= 0, the function will send the warm-up RPC
// and return immediately without waiting for the warm-up to complete.
// If `sync_wait_timeout_ms` > 0, the function will wait for the warm-up
// to finish or until the specified timeout (in milliseconds) is reached.
//
// @param rs_meta Metadata of the rowset to be warmed up.
// @param sync_wait_timeout_ms Timeout in milliseconds to wait for the warm-up
// to complete. Non-positive value means no waiting.
void warm_up_rowset(RowsetMeta& rs_meta, int64_t sync_wait_timeout_ms = -1);

void recycle_cache(int64_t tablet_id, const std::vector<RecycledRowsets>& rowsets);

Expand Down
6 changes: 6 additions & 0 deletions be/src/cloud/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -98,5 +98,11 @@ DEFINE_mBool(enable_standby_passive_compaction, "true");

DEFINE_mDouble(standby_compaction_version_ratio, "0.8");

DEFINE_mBool(enable_compaction_delay_commit_for_warm_up, "false");

DEFINE_mInt64(warm_up_rowset_sync_wait_min_timeout_ms, "10000");

DEFINE_mInt64(warm_up_rowset_sync_wait_max_timeout_ms, "120000");

#include "common/compile_check_end.h"
} // namespace doris::config
11 changes: 11 additions & 0 deletions be/src/cloud/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -134,5 +134,16 @@ DECLARE_mBool(enable_standby_passive_compaction);

DECLARE_mDouble(standby_compaction_version_ratio);

// When event driven warm-up is enabled by the user, turning on this option can help
// avoid file cache misses in the read cluster caused by compaction.
// If enabled, compaction will wait for the warm-up to complete before committing.
//
// ATTN: Enabling this option may slow down compaction due to the added wait.
DECLARE_mBool(enable_compaction_delay_commit_for_warm_up);

DECLARE_mInt64(warm_up_rowset_sync_wait_min_timeout_ms);

DECLARE_mInt64(warm_up_rowset_sync_wait_max_timeout_ms);

#include "common/compile_check_end.h"
} // namespace doris::config
9 changes: 9 additions & 0 deletions be/src/io/fs/s3_file_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
#include "runtime/thread_context.h"
#include "runtime/workload_management/io_throttle.h"
#include "util/bvar_helper.h"
#include "util/debug_points.h"
#include "util/doris_metrics.h"
#include "util/runtime_profile.h"
#include "util/s3_util.h"
Expand Down Expand Up @@ -130,6 +131,14 @@ Status S3FileReader::read_at_impl(size_t offset, Slice result, size_t* bytes_rea

LIMIT_REMOTE_SCAN_IO(bytes_read);

DBUG_EXECUTE_IF("S3FileReader::read_at_impl.io_slow", {
auto sleep_time = dp->param("sleep", 3);
LOG_INFO("S3FileReader::read_at_impl.io_slow inject sleep {} s", sleep_time)
.tag("bucket", _bucket)
.tag("key", _key);
std::this_thread::sleep_for(std::chrono::seconds(sleep_time));
});

int total_sleep_time = 0;
while (retry_count <= max_retries) {
*bytes_read = 0;
Expand Down
1 change: 1 addition & 0 deletions gensrc/proto/internal_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -840,6 +840,7 @@ message PGetFileCacheMetaResponse {
message PWarmUpRowsetRequest {
repeated RowsetMetaPB rowset_metas = 1;
optional int64 unix_ts_us = 2;
optional int64 sync_wait_timeout_ms = 3;
}

message PWarmUpRowsetResponse {
Expand Down
Loading
Loading