diff --git a/be/src/cloud/cloud_internal_service.cpp b/be/src/cloud/cloud_internal_service.cpp
index 6bfe4d6f5b5da2..0576cbe4debcc3 100644
--- a/be/src/cloud/cloud_internal_service.cpp
+++ b/be/src/cloud/cloud_internal_service.cpp
@@ -17,6 +17,8 @@
 #include "cloud/cloud_internal_service.h"
 
+#include <bthread/countdown_event.h>
+
 #include "cloud/cloud_storage_engine.h"
 #include "cloud/cloud_tablet_mgr.h"
 #include "cloud/config.h"
@@ -149,6 +151,10 @@ bvar::Adder g_file_cache_warm_up_rowset_request_to_handle_slow_count(
         "file_cache_warm_up_rowset_request_to_handle_slow_count");
 bvar::Adder g_file_cache_warm_up_rowset_handle_to_finish_slow_count(
         "file_cache_warm_up_rowset_handle_to_finish_slow_count");
+bvar::Adder g_file_cache_warm_up_rowset_wait_for_compaction_num(
+        "file_cache_warm_up_rowset_wait_for_compaction_num");
+bvar::Adder g_file_cache_warm_up_rowset_wait_for_compaction_timeout_num(
+        "file_cache_warm_up_rowset_wait_for_compaction_timeout_num");
 
 void CloudInternalServiceImpl::warm_up_rowset(google::protobuf::RpcController* controller
                                               [[maybe_unused]],
@@ -156,6 +162,15 @@ void CloudInternalServiceImpl::warm_up_rowset(google::protobuf::RpcController* c
                                               PWarmUpRowsetResponse* response,
                                               google::protobuf::Closure* done) {
     brpc::ClosureGuard closure_guard(done);
+    std::shared_ptr<bthread::CountdownEvent> wait = nullptr;
+    timespec due_time;
+    if (request->has_sync_wait_timeout_ms() && request->sync_wait_timeout_ms() > 0) {
+        g_file_cache_warm_up_rowset_wait_for_compaction_num << 1;
+        wait = std::make_shared<bthread::CountdownEvent>(0);
+        VLOG_DEBUG << "sync_wait_timeout: " << request->sync_wait_timeout_ms() << " ms";
+        due_time = butil::milliseconds_from_now(request->sync_wait_timeout_ms());
+    }
+
     for (auto& rs_meta_pb : request->rowset_metas()) {
         RowsetMeta rs_meta;
         rs_meta.init_from_pb(rs_meta_pb);
@@ -197,9 +212,10 @@ void CloudInternalServiceImpl::warm_up_rowset(google::protobuf::RpcController* c
         }
 
         for (int64_t segment_id = 0; segment_id < rs_meta.num_segments(); segment_id++) {
-            auto download_done = [=, tablet_id = rs_meta.tablet_id(),
+            auto download_done = [&, tablet_id = rs_meta.tablet_id(),
                                   rowset_id = rs_meta.rowset_id().to_string(),
-                                  segment_size = rs_meta.segment_file_size(segment_id)](Status st) {
+                                  segment_size = rs_meta.segment_file_size(segment_id),
+                                  wait](Status st) {
                 if (st.ok()) {
                     g_file_cache_event_driven_warm_up_finished_segment_num << 1;
                     g_file_cache_event_driven_warm_up_finished_segment_size << segment_size;
@@ -228,6 +244,9 @@ void CloudInternalServiceImpl::warm_up_rowset(google::protobuf::RpcController* c
                     LOG(WARNING) << "download segment failed, tablet_id: " << tablet_id
                                  << " rowset_id: " << rowset_id << ", error: " << st;
                 }
+                if (wait) {
+                    wait->signal();
+                }
             };
 
             io::DownloadFileMeta download_meta {
@@ -248,6 +267,9 @@ void CloudInternalServiceImpl::warm_up_rowset(google::protobuf::RpcController* c
             g_file_cache_event_driven_warm_up_submitted_segment_num << 1;
             g_file_cache_event_driven_warm_up_submitted_segment_size
                     << rs_meta.segment_file_size(segment_id);
+            if (wait) {
+                wait->add_count();
+            }
             _engine.file_cache_block_downloader().submit_download_task(download_meta);
 
             auto download_inverted_index = [&](std::string index_path, uint64_t idx_size) {
@@ -286,6 +308,9 @@ void CloudInternalServiceImpl::warm_up_rowset(google::protobuf::RpcController* c
                     LOG(WARNING) << "download inverted index failed, tablet_id: " << tablet_id
                                  << " rowset_id: " << rowset_id << ", error: " << st;
                 }
+                if (wait) {
+                    wait->signal();
+                }
                 };
                 io::DownloadFileMeta download_meta {
                         .path = io::Path(index_path),
@@ -302,6 +327,10 @@ void CloudInternalServiceImpl::warm_up_rowset(google::protobuf::RpcController* c
                 };
                 g_file_cache_event_driven_warm_up_submitted_index_num << 1;
                 g_file_cache_event_driven_warm_up_submitted_index_size << idx_size;
+
+                if (wait) {
+                    wait->add_count();
+                }
                 _engine.file_cache_block_downloader().submit_download_task(download_meta);
             };
 
@@ -342,6 +371,11 @@ void CloudInternalServiceImpl::warm_up_rowset(google::protobuf::RpcController* c
             }
         }
     }
+    if (wait && wait->timed_wait(due_time)) {
+        g_file_cache_warm_up_rowset_wait_for_compaction_timeout_num << 1;
+        LOG_WARNING("the time spent warming up {} rowsets exceeded {} ms",
+                    request->rowset_metas().size(), request->sync_wait_timeout_ms());
+    }
 }
 
 bvar::Adder g_file_cache_recycle_cache_finished_segment_num(
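Note: the handler above fans out one asynchronous download per segment and index file, then blocks the RPC until every completion callback has fired or the deadline passes. A minimal standalone sketch of that countdown pattern, assuming bthread's CountdownEvent and butil's clock helper; the submit_and_wait helper and its submit parameter are illustrative, not part of the patch:

// Sketch only: wait for N asynchronously submitted tasks with a deadline.
#include <bthread/countdown_event.h>
#include <butil/time.h>
#include <functional>
#include <memory>

bool submit_and_wait(int num_tasks, int64_t timeout_ms,
                     const std::function<void(std::function<void()>)>& submit) {
    // Start at 0 and grow per submitted task, so skipped submissions are never counted.
    auto wait = std::make_shared<bthread::CountdownEvent>(0);
    timespec due_time = butil::milliseconds_from_now(timeout_ms);
    for (int i = 0; i < num_tasks; ++i) {
        wait->add_count();                  // one pending completion per submitted task
        submit([wait] { wait->signal(); }); // the completion callback decrements the count
    }
    // timed_wait() returns 0 once the count reaches zero, non-zero (ETIMEDOUT) on timeout.
    return wait->timed_wait(due_time) == 0;
}

Holding the event through a shared_ptr in every callback keeps it alive even when the waiter times out and returns first, which is why the patch captures `wait` by value in the download callbacks.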
diff --git a/be/src/cloud/cloud_meta_mgr.cpp b/be/src/cloud/cloud_meta_mgr.cpp
index e0440991727926..da626493d8a4ec 100644
--- a/be/src/cloud/cloud_meta_mgr.cpp
+++ b/be/src/cloud/cloud_meta_mgr.cpp
@@ -1042,8 +1042,25 @@ Status CloudMetaMgr::commit_rowset(RowsetMeta& rs_meta, const std::string& job_i
         RETURN_IF_ERROR(
                 engine.get_schema_cloud_dictionary_cache().refresh_dict(rs_meta_pb.index_id()));
     }
+
+    int64_t timeout_ms = -1;
+    // if the `job_id` is not empty, it means this rowset was produced by a compaction job.
+    if (config::enable_compaction_delay_commit_for_warm_up && !job_id.empty()) {
+        // 1. assume the download speed is 100MB/s
+        // 2. we double the download time as the timeout for safety
+        // 3. for small rowsets, the timeout we calculate may be quite small, so we need a min timeout
+        const double speed_mbps = 100.0; // 100MB/s
+        const double safety_factor = 2.0;
+        timeout_ms = std::min(
+                std::max(static_cast<int64_t>(static_cast<double>(rs_meta.data_disk_size()) /
                                               (speed_mbps * 1024 * 1024) * safety_factor * 1000),
+                         config::warm_up_rowset_sync_wait_min_timeout_ms),
+                config::warm_up_rowset_sync_wait_max_timeout_ms);
+        LOG(INFO) << "warm up rowset: " << rs_meta.version() << ", job_id: " << job_id
+                  << ", with timeout: " << timeout_ms << " ms";
+    }
     auto& manager = ExecEnv::GetInstance()->storage_engine().to_cloud().cloud_warm_up_manager();
-    manager.warm_up_rowset(rs_meta);
+    manager.warm_up_rowset(rs_meta, timeout_ms);
     return st;
 }
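For a concrete sense of the clamp above, here is the same arithmetic applied to a hypothetical 1 GiB rowset with the default bounds; this is a standalone illustration whose constants mirror the values in commit_rowset, not code from the patch:

// Illustrative arithmetic only: timeout estimate for a hypothetical 1 GiB rowset.
#include <algorithm>
#include <cstdint>
#include <cstdio>

int main() {
    const double speed_mbps = 100.0;    // assumed download speed, 100 MB/s
    const double safety_factor = 2.0;   // double the estimate for safety
    const int64_t min_timeout_ms = 10000, max_timeout_ms = 120000; // defaults from config.cpp
    const int64_t data_disk_size = 1073741824; // hypothetical 1 GiB rowset
    int64_t timeout_ms = std::min(
            std::max(static_cast<int64_t>(static_cast<double>(data_disk_size) /
                                          (speed_mbps * 1024 * 1024) * safety_factor * 1000),
                     min_timeout_ms),
            max_timeout_ms);
    // 1 GiB / 100 MB/s = 10.24 s, doubled to 20.48 s -> prints 20480
    std::printf("%lld ms\n", static_cast<long long>(timeout_ms));
    return 0;
}

A 10 MB rowset would compute only about 200 ms and be raised to the 10 s minimum, while anything with more than roughly a minute of estimated download time is capped at 120 s.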
diff --git a/be/src/cloud/cloud_warm_up_manager.cpp b/be/src/cloud/cloud_warm_up_manager.cpp
index bd661f46cb7837..833cb18d95caf1 100644
--- a/be/src/cloud/cloud_warm_up_manager.cpp
+++ b/be/src/cloud/cloud_warm_up_manager.cpp
@@ -76,6 +76,9 @@ bvar::Status g_file_cache_warm_up_rowset_last_call_unix_ts(
         "file_cache_warm_up_rowset_last_call_unix_ts", 0);
 bvar::Adder file_cache_warm_up_failed_task_num("file_cache_warm_up", "failed_task_num");
 
+bvar::LatencyRecorder g_file_cache_warm_up_rowset_wait_for_compaction_latency(
+        "file_cache_warm_up_rowset_wait_for_compaction_latency");
+
 CloudWarmUpManager::CloudWarmUpManager(CloudStorageEngine& engine) : _engine(engine) {
     _download_thread = std::thread(&CloudWarmUpManager::handle_jobs, this);
 }
@@ -489,8 +492,9 @@ std::vector CloudWarmUpManager::get_replica_info(int64_t tablet_id
     return replicas;
 }
 
-void CloudWarmUpManager::warm_up_rowset(RowsetMeta& rs_meta) {
-    auto replicas = get_replica_info(rs_meta.tablet_id());
+void CloudWarmUpManager::warm_up_rowset(RowsetMeta& rs_meta, int64_t sync_wait_timeout_ms) {
+    auto tablet_id = rs_meta.tablet_id();
+    auto replicas = get_replica_info(tablet_id);
     if (replicas.empty()) {
         LOG(INFO) << "There is no need to warmup tablet=" << rs_meta.tablet_id()
                   << ", skipping rowset=" << rs_meta.rowset_id().to_string();
@@ -504,6 +508,7 @@ void CloudWarmUpManager::warm_up_rowset(RowsetMeta& rs_meta) {
     PWarmUpRowsetRequest request;
     request.add_rowset_metas()->CopyFrom(rs_meta.get_rowset_pb());
     request.set_unix_ts_us(now_ts);
+    request.set_sync_wait_timeout_ms(sync_wait_timeout_ms);
     for (auto& replica : replicas) {
         // send sync request
         std::string host = replica.host;
@@ -568,8 +573,28 @@ void CloudWarmUpManager::warm_up_rowset(RowsetMeta& rs_meta) {
         }
 
         brpc::Controller cntl;
+        if (sync_wait_timeout_ms > 0) {
+            cntl.set_timeout_ms(sync_wait_timeout_ms + 1000);
+        }
         PWarmUpRowsetResponse response;
+        MonotonicStopWatch watch;
+        watch.start();
         brpc_stub->warm_up_rowset(&cntl, &request, &response, nullptr);
+        if (cntl.Failed()) {
+            LOG_WARNING("warm up rowset {} for tablet {} failed, rpc error: {}",
+                        rs_meta.rowset_id().to_string(), tablet_id, cntl.ErrorText());
+            return;
+        }
+        if (sync_wait_timeout_ms > 0) {
+            auto cost_us = watch.elapsed_time_microseconds();
+            VLOG_DEBUG << "warm up rowset wait for compaction: " << cost_us << " us";
+            if (cost_us / 1000 > sync_wait_timeout_ms) {
+                LOG_WARNING(
+                        "Warm up rowset {} for tablet {} wait for compaction timeout, takes {} ms",
+                        rs_meta.rowset_id().to_string(), tablet_id, cost_us / 1000);
+            }
+            g_file_cache_warm_up_rowset_wait_for_compaction_latency << cost_us;
+        }
     }
 }
diff --git a/be/src/cloud/cloud_warm_up_manager.h b/be/src/cloud/cloud_warm_up_manager.h
index 6feef0e9d421ea..8c64499dce7015 100644
--- a/be/src/cloud/cloud_warm_up_manager.h
+++ b/be/src/cloud/cloud_warm_up_manager.h
@@ -72,7 +72,15 @@ class CloudWarmUpManager {
 
     Status set_event(int64_t job_id, TWarmUpEventType::type event, bool clear = false);
 
-    void warm_up_rowset(RowsetMeta& rs_meta);
+    // If `sync_wait_timeout_ms` <= 0, the function will send the warm-up RPC
+    // and return immediately without waiting for the warm-up to complete.
+    // If `sync_wait_timeout_ms` > 0, the function will wait for the warm-up
+    // to finish or until the specified timeout (in milliseconds) is reached.
+    //
+    // @param rs_meta Metadata of the rowset to be warmed up.
+    // @param sync_wait_timeout_ms Timeout in milliseconds to wait for the warm-up
+    //        to complete. A non-positive value means no waiting.
+    void warm_up_rowset(RowsetMeta& rs_meta, int64_t sync_wait_timeout_ms = -1);
 
     void recycle_cache(int64_t tablet_id, const std::vector& rowsets);
diff --git a/be/src/cloud/config.cpp b/be/src/cloud/config.cpp
index 9edaeb8c652409..3db8a758b2780e 100644
--- a/be/src/cloud/config.cpp
+++ b/be/src/cloud/config.cpp
@@ -98,5 +98,11 @@ DEFINE_mBool(enable_standby_passive_compaction, "true");
 
 DEFINE_mDouble(standby_compaction_version_ratio, "0.8");
 
+DEFINE_mBool(enable_compaction_delay_commit_for_warm_up, "false");
+
+DEFINE_mInt64(warm_up_rowset_sync_wait_min_timeout_ms, "10000");
+
+DEFINE_mInt64(warm_up_rowset_sync_wait_max_timeout_ms, "120000");
+
 #include "common/compile_check_end.h"
 } // namespace doris::config
diff --git a/be/src/cloud/config.h b/be/src/cloud/config.h
index 4f7fee510bd1aa..18a6097c56b354 100644
--- a/be/src/cloud/config.h
+++ b/be/src/cloud/config.h
@@ -134,5 +134,16 @@ DECLARE_mBool(enable_standby_passive_compaction);
 
 DECLARE_mDouble(standby_compaction_version_ratio);
 
+// When event-driven warm-up is enabled by the user, turning on this option can help
+// avoid file cache misses in the read cluster caused by compaction.
+// If enabled, compaction will wait for the warm-up to complete before committing.
+//
+// ATTN: Enabling this option may slow down compaction due to the added wait.
+DECLARE_mBool(enable_compaction_delay_commit_for_warm_up);
+
+DECLARE_mInt64(warm_up_rowset_sync_wait_min_timeout_ms);
+
+DECLARE_mInt64(warm_up_rowset_sync_wait_max_timeout_ms);
+
 #include "common/compile_check_end.h"
 } // namespace doris::config
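If this behavior is wanted on the compaction (source) side, the new knobs would be set in be.conf roughly as follows; the min/max values shown are simply the defaults from config.cpp above, so only the first line changes anything:

enable_compaction_delay_commit_for_warm_up = true
warm_up_rowset_sync_wait_min_timeout_ms = 10000
warm_up_rowset_sync_wait_max_timeout_ms = 120000

The regression tests below override the same keys through options.beConfigs to exercise both the normal wait and the timeout path.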
diff --git a/be/src/io/fs/s3_file_reader.cpp b/be/src/io/fs/s3_file_reader.cpp
index 80c582353001f0..eede868468f663 100644
--- a/be/src/io/fs/s3_file_reader.cpp
+++ b/be/src/io/fs/s3_file_reader.cpp
@@ -38,6 +38,7 @@
 #include "runtime/thread_context.h"
 #include "runtime/workload_management/io_throttle.h"
 #include "util/bvar_helper.h"
+#include "util/debug_points.h"
 #include "util/doris_metrics.h"
 #include "util/runtime_profile.h"
 #include "util/s3_util.h"
@@ -130,6 +131,14 @@ Status S3FileReader::read_at_impl(size_t offset, Slice result, size_t* bytes_rea
 
     LIMIT_REMOTE_SCAN_IO(bytes_read);
 
+    DBUG_EXECUTE_IF("S3FileReader::read_at_impl.io_slow", {
+        auto sleep_time = dp->param("sleep", 3);
+        LOG_INFO("S3FileReader::read_at_impl.io_slow inject sleep {} s", sleep_time)
+                .tag("bucket", _bucket)
+                .tag("key", _key);
+        std::this_thread::sleep_for(std::chrono::seconds(sleep_time));
+    });
+
     int total_sleep_time = 0;
     while (retry_count <= max_retries) {
         *bytes_read = 0;
diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto
index 108f7d7613812d..2f23eeae453a2a 100644
--- a/gensrc/proto/internal_service.proto
+++ b/gensrc/proto/internal_service.proto
@@ -840,6 +840,7 @@ message PGetFileCacheMetaResponse {
 message PWarmUpRowsetRequest {
     repeated RowsetMetaPB rowset_metas = 1;
     optional int64 unix_ts_us = 2;
+    optional int64 sync_wait_timeout_ms = 3;
 }
 
 message PWarmUpRowsetResponse {
diff --git a/regression-test/suites/cloud_p0/cache/multi_cluster/warm_up/cluster/test_warm_up_cluster_event_compaction_sync_wait.groovy b/regression-test/suites/cloud_p0/cache/multi_cluster/warm_up/cluster/test_warm_up_cluster_event_compaction_sync_wait.groovy
new file mode 100644
index 00000000000000..4879f83aab82fc
--- /dev/null
+++ b/regression-test/suites/cloud_p0/cache/multi_cluster/warm_up/cluster/test_warm_up_cluster_event_compaction_sync_wait.groovy
@@ -0,0 +1,375 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+ +import org.apache.doris.regression.suite.ClusterOptions +import org.apache.doris.regression.util.NodeType +import groovy.json.JsonSlurper + +suite('test_warm_up_cluster_event_compaction_sync_wait', 'docker') { + def options = new ClusterOptions() + options.feConfigs += [ + 'cloud_cluster_check_interval_second=1', + 'cloud_tablet_rebalancer_interval_second=1', + ] + options.beConfigs += [ + 'file_cache_enter_disk_resource_limit_mode_percent=99', + 'enable_evict_file_cache_in_advance=false', + 'file_cache_background_monitor_interval_ms=1000', + 'warm_up_rowset_slow_log_ms=1', + 'enable_compaction_delay_commit_for_warm_up=true', + 'warm_up_rowset_sync_wait_min_timeout_ms=20000', + 'warm_up_rowset_sync_wait_max_timeout_ms=30000', + ] + options.enableDebugPoints() + options.cloudMode = true + + def clearFileCache = {ip, port -> + def url = "http://${ip}:${port}/api/file_cache?op=clear&sync=true" + def response = new URL(url).text + def json = new JsonSlurper().parseText(response) + + // Check the status + if (json.status != "OK") { + throw new RuntimeException("Clear cache on ${ip}:${port} failed: ${json.status}") + } + } + + def clearFileCacheOnAllBackends = { + def backends = sql """SHOW BACKENDS""" + + for (be in backends) { + def ip = be[1] + def port = be[4] + clearFileCache(ip, port) + } + + // clear file cache is async, wait it done + sleep(5000) + } + + def getBrpcMetrics = {ip, port, name -> + def url = "http://${ip}:${port}/brpc_metrics" + def metrics = new URL(url).text + def matcher = metrics =~ ~"${name}\\s+(\\d+)" + if (matcher.find()) { + return matcher[0][1] as long + } else { + throw new RuntimeException("${name} not found for ${ip}:${port}") + } + } + + def getBeIpAndPort = { cluster -> + def backends = sql """SHOW BACKENDS""" + def cluster_bes = backends.findAll { it[19].contains("""\"compute_group_name\" : \"${cluster}\"""") } + + if (cluster_bes.isEmpty()) { + throw new RuntimeException("No BE found for cluster: ${cluster}") + } + + def firstBe = cluster_bes[0] + return [ip: firstBe[1], http_port:firstBe[4], rpc_port: firstBe[5]] + } + + def logFileCacheDownloadMetrics = { cluster -> + def backends = sql """SHOW BACKENDS""" + def cluster_bes = backends.findAll { it[19].contains("""\"compute_group_name\" : \"${cluster}\"""") } + for (be in cluster_bes) { + def ip = be[1] + def port = be[5] + def submitted = getBrpcMetrics(ip, port, "file_cache_download_submitted_num") + def finished = getBrpcMetrics(ip, port, "file_cache_download_finished_num") + def failed = getBrpcMetrics(ip, port, "file_cache_download_failed_num") + logger.info("${cluster} be ${ip}:${port}, downloader submitted=${submitted}" + + ", finished=${finished}, failed=${failed}") + } + } + + def logWarmUpRowsetMetrics = { cluster -> + def backends = sql """SHOW BACKENDS""" + def cluster_bes = backends.findAll { it[19].contains("""\"compute_group_name\" : \"${cluster}\"""") } + for (be in cluster_bes) { + def ip = be[1] + def port = be[5] + def submitted_segment = getBrpcMetrics(ip, port, "file_cache_event_driven_warm_up_submitted_segment_num") + def finished_segment = getBrpcMetrics(ip, port, "file_cache_event_driven_warm_up_finished_segment_num") + def failed_segment = getBrpcMetrics(ip, port, "file_cache_event_driven_warm_up_failed_segment_num") + def submitted_index = getBrpcMetrics(ip, port, "file_cache_event_driven_warm_up_submitted_index_num") + def finished_index = getBrpcMetrics(ip, port, "file_cache_event_driven_warm_up_finished_index_num") + def failed_index = getBrpcMetrics(ip, port, 
"file_cache_event_driven_warm_up_failed_index_num") + def compaction_sync_wait = getBrpcMetrics(ip, port, "file_cache_warm_up_rowset_wait_for_compaction_num") + logger.info("${cluster} be ${ip}:${port}, submitted_segment=${submitted_segment}" + + ", finished_segment=${finished_segment}, failed_segment=${failed_segment}" + + ", submitted_index=${submitted_index}" + + ", finished_index=${finished_index}" + + ", failed_index=${failed_index}" + + ", compaction_sync_wait=${compaction_sync_wait}") + } + } + + def getTTLCacheSize = { ip, port -> + return getBrpcMetrics(ip, port, "ttl_cache_size") + } + + def checkTTLCacheSizeSumEqual = { cluster1, cluster2 -> + def backends = sql """SHOW BACKENDS""" + + def srcBes = backends.findAll { it[19].contains("""\"compute_group_name\" : \"${cluster1}\"""") } + def tgtBes = backends.findAll { it[19].contains("""\"compute_group_name\" : \"${cluster2}\"""") } + + long srcSum = 0 + for (src in srcBes) { + def ip = src[1] + def port = src[5] + srcSum += getTTLCacheSize(ip, port) + } + + long tgtSum = 0 + for (tgt in tgtBes) { + def ip = tgt[1] + def port = tgt[5] + tgtSum += getTTLCacheSize(ip, port) + } + + logger.info("ttl_cache_size: src=${srcSum} dst=${tgtSum}") + assertTrue(srcSum > 0, "ttl_cache_size should > 0") + assertEquals(srcSum, tgtSum) + } + + def waitForBrpcMetricValue = { ip, port, metricName, targetValue, timeoutMs -> + def delta_time = 100 + def useTime = 0 + + for(int t = delta_time; t <= timeoutMs; t += delta_time){ + try { + def currentValue = getBrpcMetrics(ip, port, metricName) + + if (currentValue == targetValue) { + logger.info("BE ${ip}:${port} metric ${metricName} reached target value: ${targetValue}") + return true + } + + logger.info("BE ${ip}:${port} metric ${metricName} current value: ${currentValue}, target: ${targetValue}") + + } catch (Exception e) { + logger.warn("Failed to get metric ${metricName} from BE ${ip}:${port}: ${e.message}") + } + + useTime = t + sleep(delta_time) + } + + assertTrue(useTime <= timeoutMs, "waitForBrpcMetricValue timeout") + } + + def getTabletStatus = { ip, port, tablet_id -> + StringBuilder sb = new StringBuilder(); + sb.append("curl -X GET http://${ip}:${port}") + sb.append("/api/compaction/show?tablet_id=") + sb.append(tablet_id) + + String command = sb.toString() + logger.info(command) + def process = command.execute() + def code = process.waitFor() + def out = process.getText() + logger.info("Get tablet status: =" + code + ", out=" + out) + assertEquals(code, 0) + def tabletStatus = parseJson(out.trim()) + return tabletStatus + } + + def checkFileCacheRecycle = { cluster, rowsets -> + def backends = sql """SHOW BACKENDS""" + def cluster_bes = backends.findAll { it[19].contains("""\"compute_group_name\" : \"${cluster}\"""") } + assert cluster_bes.size() > 0, "No backend found for cluster ${cluster}" + def be = cluster_bes[0] + def ip = be[1] + def port = be[4] + + for (int i = 0; i < rowsets.size(); i++) { + def rowsetStr = rowsets[i] + // [12-12] 1 DATA NONOVERLAPPING 02000000000000124843c92c13625daa8296c20957119893 1011.00 B + def start_version = rowsetStr.split(" ")[0].replace('[', '').replace(']', '').split("-")[0].toInteger() + def end_version = rowsetStr.split(" ")[0].replace('[', '').replace(']', '').split("-")[1].toInteger() + def rowset_id = rowsetStr.split(" ")[4] + + logger.info("rowset ${i}, start: ${start_version}, end: ${end_version}, id: ${rowset_id}") + def data = Http.GET("http://${ip}:${port}/api/file_cache?op=list_cache&value=${rowset_id}_0.dat", true) + logger.info("file 
cache data: ${data}") + // in this case only [2-11] and [12-12] should have data in cache + if ((start_version == 2 && end_version == 11) || (start_version == 12)) { + assertTrue(data.size() > 0) + } else { + assertTrue(data.size() == 0) + } + } + } + + docker(options) { + def clusterName1 = "warmup_source" + def clusterName2 = "warmup_target" + + // Add two clusters + cluster.addBackend(1, clusterName1) + cluster.addBackend(1, clusterName2) + + def tag1 = getCloudBeTagByName(clusterName1) + def tag2 = getCloudBeTagByName(clusterName2) + + logger.info("Cluster tag1: {}", tag1) + logger.info("Cluster tag2: {}", tag2) + + def jsonSlurper = new JsonSlurper() + + def getJobState = { jobId -> + def jobStateResult = sql """SHOW WARM UP JOB WHERE ID = ${jobId}""" + return jobStateResult[0][3] + } + + // Ensure we are in source cluster + sql """use @${clusterName1}""" + + // Start warm up job + def jobId_ = sql """ + WARM UP CLUSTER ${clusterName2} WITH CLUSTER ${clusterName1} + PROPERTIES ( + "sync_mode" = "event_driven", + "sync_event" = "load" + ) + """ + + def jobId = jobId_[0][0] + logger.info("Warm-up job ID: ${jobId}") + + sql """ + create table test ( + col0 int not null, + col1 variant NOT NULL + ) UNIQUE KEY(`col0`) + DISTRIBUTED BY HASH(col0) BUCKETS 1 + PROPERTIES ("file_cache_ttl_seconds" = "3600", "disable_auto_compaction" = "true"); + """ + + clearFileCacheOnAllBackends() + sleep(15000) + + sql """use @${clusterName1}""" + // load data + sql """insert into test values (1, '{"a" : 1.0}')""" + sql """insert into test values (2, '{"a" : 111.1111}')""" + sql """insert into test values (3, '{"a" : "11111"}')""" + sql """insert into test values (4, '{"a" : 1111111111}')""" + sql """insert into test values (5, '{"a" : 1111.11111}')""" + sql """insert into test values (6, '{"a" : "11111"}')""" + sql """insert into test values (7, '{"a" : 11111.11111}')""" + sql """insert into test values (7, '{"a" : 11111.11111}')""" + sleep(15000) + + def tablets = sql_return_maparray """ show tablets from test; """ + logger.info("tablets: " + tablets) + assertEquals(1, tablets.size()) + def tablet = tablets[0] + String tablet_id = tablet.TabletId + + def be = getBeIpAndPort(clusterName2) + + logFileCacheDownloadMetrics(clusterName2) + logWarmUpRowsetMetrics(clusterName2) + def num_submitted = getBrpcMetrics(be.ip, be.rpc_port, "file_cache_event_driven_warm_up_submitted_segment_num") + def num_finished = getBrpcMetrics(be.ip, be.rpc_port, "file_cache_event_driven_warm_up_finished_segment_num") + assertTrue(num_submitted >= 8) + assertEquals(num_finished, num_submitted) + + // inject slow io, which should cause the warmup takes longger than 10s + GetDebugPoint().enableDebugPoint(be.ip, be.http_port as int, NodeType.BE, "S3FileReader::read_at_impl.io_slow", [sleep:10]) + + // trigger and wait compaction async + def future = thread { + sql """use @${clusterName1}""" + trigger_and_wait_compaction("test", "cumulative") + } + // wait until the warmup for compaction started + waitForBrpcMetricValue(be.ip, be.rpc_port, "file_cache_warm_up_rowset_wait_for_compaction_num", 1, /*timeout*/10000) + sleep(1000) + logFileCacheDownloadMetrics(clusterName2) + logWarmUpRowsetMetrics(clusterName2) + assertEquals(num_submitted + 1, getBrpcMetrics(be.ip, be.rpc_port, "file_cache_event_driven_warm_up_submitted_segment_num")) + assertEquals(num_finished, getBrpcMetrics(be.ip, be.rpc_port, "file_cache_event_driven_warm_up_finished_segment_num")) + + // in this moment, compaction has completed, but not commited, it's waiting for 
warm up + // trigger a query on read cluster, can't read the compaction data + sql """use @${clusterName2}""" + sql "select * from test" + def tablet_status = getTabletStatus(be.ip, be.http_port, tablet_id) + def rowsets = tablet_status ["rowsets"] + for (int i = 0; i < rowsets.size(); i++) { + def rowsetStr = rowsets[i] + // [12-12] 1 DATA NONOVERLAPPING 02000000000000124843c92c13625daa8296c20957119893 1011.00 B + def start_version = rowsetStr.split(" ")[0].replace('[', '').replace(']', '').split("-")[0].toInteger() + def end_version = rowsetStr.split(" ")[0].replace('[', '').replace(']', '').split("-")[1].toInteger() + if (start_version != 0) { + assertEquals(start_version, end_version) + } + } + + // wait the compaction complete + future.get() + + assertEquals(num_finished + 1, getBrpcMetrics(be.ip, be.rpc_port, "file_cache_event_driven_warm_up_finished_segment_num")) + assertEquals(0, getBrpcMetrics(be.ip, be.rpc_port, "file_cache_warm_up_rowset_wait_for_compaction_timeout_num")) + + // a new insert will trigger the sync rowset operation in the following query + sql """insert into test values (9, '{"a" : 11111.11111}')""" + + // now the compaction rowsets it accessible + sql """use @${clusterName2}""" + sql "select * from test" + tablet_status = getTabletStatus(be.ip, be.http_port, tablet_id) + rowsets = tablet_status ["rowsets"] + def found_compaction_rowsets = false + for (int i = 0; i < rowsets.size(); i++) { + def rowsetStr = rowsets[i] + // [12-12] 1 DATA NONOVERLAPPING 02000000000000124843c92c13625daa8296c20957119893 1011.00 B + def start_version = rowsetStr.split(" ")[0].replace('[', '').replace(']', '').split("-")[0].toInteger() + def end_version = rowsetStr.split(" ")[0].replace('[', '').replace(']', '').split("-")[1].toInteger() + if (start_version != 0) { + if (start_version != end_version) { + found_compaction_rowsets = true; + } + } + } + assertTrue(found_compaction_rowsets) + + logFileCacheDownloadMetrics(clusterName2) + logWarmUpRowsetMetrics(clusterName2) + // checkTTLCacheSizeSumEqual(clusterName1, clusterName2) + + def jobInfo = sql """SHOW WARM UP JOB WHERE ID = ${jobId}""" + assertEquals(jobInfo[0][0], jobId) + assertEquals(jobInfo[0][1], clusterName1) + assertEquals(jobInfo[0][2], clusterName2) + assertEquals(jobInfo[0][4], "CLUSTER") + assertTrue(jobInfo[0][3] in ["RUNNING", "PENDING"], + "JobState is ${jobInfo[0][3]}, expected RUNNING or PENDING") + assertEquals(jobInfo[0][5], "EVENT_DRIVEN (LOAD)") + + // Cancel job and confirm + sql """CANCEL WARM UP JOB WHERE ID = ${jobId}""" + def cancelInfo = sql """SHOW WARM UP JOB WHERE ID = ${jobId}""" + assertEquals(cancelInfo[0][3], "CANCELLED") + } +} diff --git a/regression-test/suites/cloud_p0/cache/multi_cluster/warm_up/cluster/test_warm_up_cluster_event_compaction_sync_wait_timeout.groovy b/regression-test/suites/cloud_p0/cache/multi_cluster/warm_up/cluster/test_warm_up_cluster_event_compaction_sync_wait_timeout.groovy new file mode 100644 index 00000000000000..64b72dfcec5461 --- /dev/null +++ b/regression-test/suites/cloud_p0/cache/multi_cluster/warm_up/cluster/test_warm_up_cluster_event_compaction_sync_wait_timeout.groovy @@ -0,0 +1,377 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import org.apache.doris.regression.suite.ClusterOptions +import org.apache.doris.regression.util.NodeType +import groovy.json.JsonSlurper + +suite('test_warm_up_cluster_event_compaction_sync_wait_timeout', 'docker') { + def options = new ClusterOptions() + options.feConfigs += [ + 'cloud_cluster_check_interval_second=1', + 'cloud_tablet_rebalancer_interval_second=1', + ] + options.beConfigs += [ + 'file_cache_enter_disk_resource_limit_mode_percent=99', + 'enable_evict_file_cache_in_advance=false', + 'file_cache_background_monitor_interval_ms=1000', + 'warm_up_rowset_slow_log_ms=1', + 'enable_compaction_delay_commit_for_warm_up=true', + 'warm_up_rowset_sync_wait_min_timeout_ms=10000', + 'warm_up_rowset_sync_wait_max_timeout_ms=10000', + ] + options.enableDebugPoints() + options.cloudMode = true + + def clearFileCache = {ip, port -> + def url = "http://${ip}:${port}/api/file_cache?op=clear&sync=true" + def response = new URL(url).text + def json = new JsonSlurper().parseText(response) + + // Check the status + if (json.status != "OK") { + throw new RuntimeException("Clear cache on ${ip}:${port} failed: ${json.status}") + } + } + + def clearFileCacheOnAllBackends = { + def backends = sql """SHOW BACKENDS""" + + for (be in backends) { + def ip = be[1] + def port = be[4] + clearFileCache(ip, port) + } + + // clear file cache is async, wait it done + sleep(5000) + } + + def getBrpcMetrics = {ip, port, name -> + def url = "http://${ip}:${port}/brpc_metrics" + def metrics = new URL(url).text + def matcher = metrics =~ ~"${name}\\s+(\\d+)" + if (matcher.find()) { + return matcher[0][1] as long + } else { + throw new RuntimeException("${name} not found for ${ip}:${port}") + } + } + + def getBeIpAndPort = { cluster -> + def backends = sql """SHOW BACKENDS""" + def cluster_bes = backends.findAll { it[19].contains("""\"compute_group_name\" : \"${cluster}\"""") } + + if (cluster_bes.isEmpty()) { + throw new RuntimeException("No BE found for cluster: ${cluster}") + } + + def firstBe = cluster_bes[0] + return [ip: firstBe[1], http_port:firstBe[4], rpc_port: firstBe[5]] + } + + def logFileCacheDownloadMetrics = { cluster -> + def backends = sql """SHOW BACKENDS""" + def cluster_bes = backends.findAll { it[19].contains("""\"compute_group_name\" : \"${cluster}\"""") } + for (be in cluster_bes) { + def ip = be[1] + def port = be[5] + def submitted = getBrpcMetrics(ip, port, "file_cache_download_submitted_num") + def finished = getBrpcMetrics(ip, port, "file_cache_download_finished_num") + def failed = getBrpcMetrics(ip, port, "file_cache_download_failed_num") + logger.info("${cluster} be ${ip}:${port}, downloader submitted=${submitted}" + + ", finished=${finished}, failed=${failed}") + } + } + + def logWarmUpRowsetMetrics = { cluster -> + def backends = sql """SHOW BACKENDS""" + def cluster_bes = backends.findAll { it[19].contains("""\"compute_group_name\" : \"${cluster}\"""") } + for (be in cluster_bes) { + def ip = be[1] + 
def port = be[5] + def submitted_segment = getBrpcMetrics(ip, port, "file_cache_event_driven_warm_up_submitted_segment_num") + def finished_segment = getBrpcMetrics(ip, port, "file_cache_event_driven_warm_up_finished_segment_num") + def failed_segment = getBrpcMetrics(ip, port, "file_cache_event_driven_warm_up_failed_segment_num") + def submitted_index = getBrpcMetrics(ip, port, "file_cache_event_driven_warm_up_submitted_index_num") + def finished_index = getBrpcMetrics(ip, port, "file_cache_event_driven_warm_up_finished_index_num") + def failed_index = getBrpcMetrics(ip, port, "file_cache_event_driven_warm_up_failed_index_num") + def compaction_sync_wait = getBrpcMetrics(ip, port, "file_cache_warm_up_rowset_wait_for_compaction_num") + logger.info("${cluster} be ${ip}:${port}, submitted_segment=${submitted_segment}" + + ", finished_segment=${finished_segment}, failed_segment=${failed_segment}" + + ", submitted_index=${submitted_index}" + + ", finished_index=${finished_index}" + + ", failed_index=${failed_index}" + + ", compaction_sync_wait=${compaction_sync_wait}") + } + } + + def getTTLCacheSize = { ip, port -> + return getBrpcMetrics(ip, port, "ttl_cache_size") + } + + def checkTTLCacheSizeSumEqual = { cluster1, cluster2 -> + def backends = sql """SHOW BACKENDS""" + + def srcBes = backends.findAll { it[19].contains("""\"compute_group_name\" : \"${cluster1}\"""") } + def tgtBes = backends.findAll { it[19].contains("""\"compute_group_name\" : \"${cluster2}\"""") } + + long srcSum = 0 + for (src in srcBes) { + def ip = src[1] + def port = src[5] + srcSum += getTTLCacheSize(ip, port) + } + + long tgtSum = 0 + for (tgt in tgtBes) { + def ip = tgt[1] + def port = tgt[5] + tgtSum += getTTLCacheSize(ip, port) + } + + logger.info("ttl_cache_size: src=${srcSum} dst=${tgtSum}") + assertTrue(srcSum > 0, "ttl_cache_size should > 0") + assertEquals(srcSum, tgtSum) + } + + def waitForBrpcMetricValue = { ip, port, metricName, targetValue, timeoutMs -> + def delta_time = 100 + def useTime = 0 + + for(int t = delta_time; t <= timeoutMs; t += delta_time){ + try { + def currentValue = getBrpcMetrics(ip, port, metricName) + + if (currentValue == targetValue) { + logger.info("BE ${ip}:${port} metric ${metricName} reached target value: ${targetValue}") + return true + } + + logger.info("BE ${ip}:${port} metric ${metricName} current value: ${currentValue}, target: ${targetValue}") + + } catch (Exception e) { + logger.warn("Failed to get metric ${metricName} from BE ${ip}:${port}: ${e.message}") + } + + useTime = t + sleep(delta_time) + } + + assertTrue(useTime <= timeoutMs, "waitForBrpcMetricValue timeout") + } + + def getTabletStatus = { ip, port, tablet_id -> + StringBuilder sb = new StringBuilder(); + sb.append("curl -X GET http://${ip}:${port}") + sb.append("/api/compaction/show?tablet_id=") + sb.append(tablet_id) + + String command = sb.toString() + logger.info(command) + def process = command.execute() + def code = process.waitFor() + def out = process.getText() + logger.info("Get tablet status: =" + code + ", out=" + out) + assertEquals(code, 0) + def tabletStatus = parseJson(out.trim()) + return tabletStatus + } + + def checkFileCacheRecycle = { cluster, rowsets -> + def backends = sql """SHOW BACKENDS""" + def cluster_bes = backends.findAll { it[19].contains("""\"compute_group_name\" : \"${cluster}\"""") } + assert cluster_bes.size() > 0, "No backend found for cluster ${cluster}" + def be = cluster_bes[0] + def ip = be[1] + def port = be[4] + + for (int i = 0; i < rowsets.size(); i++) { + def rowsetStr 
= rowsets[i] + // [12-12] 1 DATA NONOVERLAPPING 02000000000000124843c92c13625daa8296c20957119893 1011.00 B + def start_version = rowsetStr.split(" ")[0].replace('[', '').replace(']', '').split("-")[0].toInteger() + def end_version = rowsetStr.split(" ")[0].replace('[', '').replace(']', '').split("-")[1].toInteger() + def rowset_id = rowsetStr.split(" ")[4] + + logger.info("rowset ${i}, start: ${start_version}, end: ${end_version}, id: ${rowset_id}") + def data = Http.GET("http://${ip}:${port}/api/file_cache?op=list_cache&value=${rowset_id}_0.dat", true) + logger.info("file cache data: ${data}") + // in this case only [2-11] and [12-12] should have data in cache + if ((start_version == 2 && end_version == 11) || (start_version == 12)) { + assertTrue(data.size() > 0) + } else { + assertTrue(data.size() == 0) + } + } + } + + docker(options) { + def clusterName1 = "warmup_source" + def clusterName2 = "warmup_target" + + // Add two clusters + cluster.addBackend(1, clusterName1) + cluster.addBackend(1, clusterName2) + + def tag1 = getCloudBeTagByName(clusterName1) + def tag2 = getCloudBeTagByName(clusterName2) + + logger.info("Cluster tag1: {}", tag1) + logger.info("Cluster tag2: {}", tag2) + + def jsonSlurper = new JsonSlurper() + + def getJobState = { jobId -> + def jobStateResult = sql """SHOW WARM UP JOB WHERE ID = ${jobId}""" + return jobStateResult[0][3] + } + + // Ensure we are in source cluster + sql """use @${clusterName1}""" + + // Start warm up job + def jobId_ = sql """ + WARM UP CLUSTER ${clusterName2} WITH CLUSTER ${clusterName1} + PROPERTIES ( + "sync_mode" = "event_driven", + "sync_event" = "load" + ) + """ + + def jobId = jobId_[0][0] + logger.info("Warm-up job ID: ${jobId}") + + sql """ + create table test ( + col0 int not null, + col1 variant NOT NULL + ) UNIQUE KEY(`col0`) + DISTRIBUTED BY HASH(col0) BUCKETS 1 + PROPERTIES ("file_cache_ttl_seconds" = "3600", "disable_auto_compaction" = "true"); + """ + + clearFileCacheOnAllBackends() + sleep(15000) + + sql """use @${clusterName1}""" + // load data + sql """insert into test values (1, '{"a" : 1.0}')""" + sql """insert into test values (2, '{"a" : 111.1111}')""" + sql """insert into test values (3, '{"a" : "11111"}')""" + sql """insert into test values (4, '{"a" : 1111111111}')""" + sql """insert into test values (5, '{"a" : 1111.11111}')""" + sql """insert into test values (6, '{"a" : "11111"}')""" + sql """insert into test values (7, '{"a" : 11111.11111}')""" + sql """insert into test values (7, '{"a" : 11111.11111}')""" + sleep(15000) + + def tablets = sql_return_maparray """ show tablets from test; """ + logger.info("tablets: " + tablets) + assertEquals(1, tablets.size()) + def tablet = tablets[0] + String tablet_id = tablet.TabletId + + def be = getBeIpAndPort(clusterName2) + + logFileCacheDownloadMetrics(clusterName2) + logWarmUpRowsetMetrics(clusterName2) + def num_submitted = getBrpcMetrics(be.ip, be.rpc_port, "file_cache_event_driven_warm_up_submitted_segment_num") + def num_finished = getBrpcMetrics(be.ip, be.rpc_port, "file_cache_event_driven_warm_up_finished_segment_num") + assertTrue(num_submitted >= 8) + assertEquals(num_finished, num_submitted) + + // inject slow io, which should cause the warmup takes longger than 10s + GetDebugPoint().enableDebugPoint(be.ip, be.http_port as int, NodeType.BE, "S3FileReader::read_at_impl.io_slow", [sleep:20]) + + // trigger and wait compaction async + def future = thread { + sql """use @${clusterName1}""" + trigger_and_wait_compaction("test", "cumulative") + } + // wait until the 
warmup for compaction started + waitForBrpcMetricValue(be.ip, be.rpc_port, "file_cache_warm_up_rowset_wait_for_compaction_num", 1, /*timeout*/10000) + sleep(1000) + logFileCacheDownloadMetrics(clusterName2) + logWarmUpRowsetMetrics(clusterName2) + assertEquals(num_submitted + 1, getBrpcMetrics(be.ip, be.rpc_port, "file_cache_event_driven_warm_up_submitted_segment_num")) + assertEquals(num_finished, getBrpcMetrics(be.ip, be.rpc_port, "file_cache_event_driven_warm_up_finished_segment_num")) + + // in this moment, compaction has completed, but not commited, it's waiting for warm up + // trigger a query on read cluster, can't read the compaction data + sql """use @${clusterName2}""" + sql "select * from test" + def tablet_status = getTabletStatus(be.ip, be.http_port, tablet_id) + def rowsets = tablet_status ["rowsets"] + for (int i = 0; i < rowsets.size(); i++) { + def rowsetStr = rowsets[i] + // [12-12] 1 DATA NONOVERLAPPING 02000000000000124843c92c13625daa8296c20957119893 1011.00 B + def start_version = rowsetStr.split(" ")[0].replace('[', '').replace(']', '').split("-")[0].toInteger() + def end_version = rowsetStr.split(" ")[0].replace('[', '').replace(']', '').split("-")[1].toInteger() + if (start_version != 0) { + assertEquals(start_version, end_version) + } + } + + // wait the compaction complete + // we inject 20s sleep on s3 file read, so the compaction will be timeout + future.get() + + // still not finished, so `num_finished` not change + assertEquals(num_finished, getBrpcMetrics(be.ip, be.rpc_port, "file_cache_event_driven_warm_up_finished_segment_num")) + assertEquals(1, getBrpcMetrics(be.ip, be.rpc_port, "file_cache_warm_up_rowset_wait_for_compaction_timeout_num")) + + // a new insert will trigger the sync rowset operation in the following query + sql """insert into test values (9, '{"a" : 11111.11111}')""" + + // now the compaction rowsets it accessible + sql """use @${clusterName2}""" + sql "select * from test" + tablet_status = getTabletStatus(be.ip, be.http_port, tablet_id) + rowsets = tablet_status ["rowsets"] + def found_compaction_rowsets = false + for (int i = 0; i < rowsets.size(); i++) { + def rowsetStr = rowsets[i] + // [12-12] 1 DATA NONOVERLAPPING 02000000000000124843c92c13625daa8296c20957119893 1011.00 B + def start_version = rowsetStr.split(" ")[0].replace('[', '').replace(']', '').split("-")[0].toInteger() + def end_version = rowsetStr.split(" ")[0].replace('[', '').replace(']', '').split("-")[1].toInteger() + if (start_version != 0) { + if (start_version != end_version) { + found_compaction_rowsets = true; + } + } + } + assertTrue(found_compaction_rowsets) + + logFileCacheDownloadMetrics(clusterName2) + logWarmUpRowsetMetrics(clusterName2) + // checkTTLCacheSizeSumEqual(clusterName1, clusterName2) + + def jobInfo = sql """SHOW WARM UP JOB WHERE ID = ${jobId}""" + assertEquals(jobInfo[0][0], jobId) + assertEquals(jobInfo[0][1], clusterName1) + assertEquals(jobInfo[0][2], clusterName2) + assertEquals(jobInfo[0][4], "CLUSTER") + assertTrue(jobInfo[0][3] in ["RUNNING", "PENDING"], + "JobState is ${jobInfo[0][3]}, expected RUNNING or PENDING") + assertEquals(jobInfo[0][5], "EVENT_DRIVEN (LOAD)") + + // Cancel job and confirm + sql """CANCEL WARM UP JOB WHERE ID = ${jobId}""" + def cancelInfo = sql """SHOW WARM UP JOB WHERE ID = ${jobId}""" + assertEquals(cancelInfo[0][3], "CANCELLED") + } +}