diff --git a/be/src/io/tools/file_cache_microbench.cpp b/be/src/io/tools/file_cache_microbench.cpp index fa6b74bbf24608..55dd302a624cac 100644 --- a/be/src/io/tools/file_cache_microbench.cpp +++ b/be/src/io/tools/file_cache_microbench.cpp @@ -51,9 +51,12 @@ #include "io/file_factory.h" #include "io/fs/s3_file_system.h" #include "io/fs/s3_file_writer.h" +#include "olap/utils.h" #include "rapidjson/document.h" #include "rapidjson/stringbuffer.h" #include "rapidjson/writer.h" +#include "runtime/memory/mem_tracker_limiter.h" +#include "runtime/thread_context.h" #ifdef __clang__ #pragma clang diagnostic push #pragma clang diagnostic ignored "-Wkeyword-macro" @@ -80,6 +83,7 @@ using doris::io::FileCacheFactory; using doris::io::BlockFileCache; +using namespace doris; bvar::LatencyRecorder microbench_write_latency("file_cache_microbench_append"); bvar::LatencyRecorder microbench_read_latency("file_cache_microbench_read_at"); @@ -339,12 +343,13 @@ class MicrobenchFileReader { std::shared_ptr _rate_limiter; }; -class ThreadPool { +class BenchThreadPool { public: - ThreadPool(size_t num_threads) : stop(false) { + BenchThreadPool(size_t num_threads) : stop(false) { try { for (size_t i = 0; i < num_threads; ++i) { workers.emplace_back([this] { + SCOPED_INIT_THREAD_CONTEXT(); try { while (true) { std::function task; @@ -415,7 +420,7 @@ class ThreadPool { } } - ~ThreadPool() { + ~BenchThreadPool() { if (!stop) { try { stop_and_wait(); @@ -1051,7 +1056,7 @@ class JobManager { std::atomic completed_writes(0); std::vector> write_futures; write_futures.reserve(keys.size()); - ThreadPool write_pool(config.num_threads); + BenchThreadPool write_pool(config.num_threads); // Start write tasks doris::MonotonicStopWatch write_stopwatch; @@ -1146,7 +1151,7 @@ class JobManager { } else { // default NORMAL // do nothing } - ThreadPool read_pool(config.num_threads); + BenchThreadPool read_pool(config.num_threads); std::atomic completed_reads(0); doris::MonotonicStopWatch read_stopwatch; // Add read task timer @@ -1369,7 +1374,7 @@ class JobManager { std::mutex _mutex; std::atomic _next_job_id; std::map> _jobs; - ThreadPool _job_executor_pool; + BenchThreadPool _job_executor_pool; }; namespace microbenchService { @@ -2236,7 +2241,78 @@ class HttpServer { }; void init_exec_env() { + SCOPED_INIT_THREAD_CONTEXT(); + std::vector paths; + auto olap_res = doris::parse_conf_store_paths(doris::config::storage_root_path, &paths); + if (!olap_res) { + LOG(ERROR) << "parse config storage path failed, path=" << doris::config::storage_root_path; + exit(-1); + } + + std::vector spill_paths; + if (doris::config::spill_storage_root_path.empty()) { + doris::config::spill_storage_root_path = doris::config::storage_root_path; + } + olap_res = doris::parse_conf_store_paths(doris::config::spill_storage_root_path, &spill_paths); + if (!olap_res) { + LOG(ERROR) << "parse config spill storage path failed, path=" + << doris::config::spill_storage_root_path; + exit(-1); + } + std::set broken_paths; + doris::parse_conf_broken_store_paths(doris::config::broken_storage_path, &broken_paths); + + auto it = paths.begin(); + for (; it != paths.end();) { + if (broken_paths.count(it->path) > 0) { + if (doris::config::ignore_broken_disk) { + LOG(WARNING) << "ignore broken disk, path = " << it->path; + it = paths.erase(it); + } else { + LOG(ERROR) << "a broken disk is found " << it->path; + exit(-1); + } + } else if (!doris::check_datapath_rw(it->path)) { + if (doris::config::ignore_broken_disk) { + LOG(WARNING) << "read write test file failed, path=" << it->path; + it = paths.erase(it); + } else { + LOG(ERROR) << "read write test file failed, path=" << it->path; + // if only one disk and the disk is full, also need exit because rocksdb will open failed + exit(-1); + } + } else { + ++it; + } + } + + if (paths.empty()) { + LOG(ERROR) << "All disks are broken, exit."; + exit(-1); + } + + it = spill_paths.begin(); + for (; it != spill_paths.end();) { + if (!doris::check_datapath_rw(it->path)) { + if (doris::config::ignore_broken_disk) { + LOG(WARNING) << "read write test file failed, path=" << it->path; + it = spill_paths.erase(it); + } else { + LOG(ERROR) << "read write test file failed, path=" << it->path; + exit(-1); + } + } else { + ++it; + } + } + if (spill_paths.empty()) { + LOG(ERROR) << "All spill disks are broken, exit."; + exit(-1); + } + auto* exec_env = doris::ExecEnv::GetInstance(); + auto status = exec_env->init_mem_env(); + std::unique_ptr s3_upload_pool; static_cast(doris::ThreadPoolBuilder("MicrobenchS3FileUploadThreadPool") .set_min_threads(256) diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h index 59e7af6df10114..66b32648902737 100644 --- a/be/src/runtime/exec_env.h +++ b/be/src/runtime/exec_env.h @@ -387,6 +387,7 @@ class ExecEnv { void clear_stream_mgr(); DeleteBitmapAggCache* delete_bitmap_agg_cache() { return _delete_bitmap_agg_cache; } + Status init_mem_env(); private: ExecEnv(); @@ -396,7 +397,6 @@ class ExecEnv { const std::set& broken_paths); void _destroy(); - Status _init_mem_env(); Status _check_deploy_mode(); Status _create_internal_workload_group(); diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp index a0cb2cfa223082..78bdfa34f7ec12 100644 --- a/be/src/runtime/exec_env_init.cpp +++ b/be/src/runtime/exec_env_init.cpp @@ -279,7 +279,7 @@ Status ExecEnv::_init(const std::vector& store_paths, .set_min_threads(config::min_s3_file_system_thread_num) .set_max_threads(config::max_s3_file_system_thread_num) .build(&_s3_file_system_thread_pool)); - RETURN_IF_ERROR(_init_mem_env()); + RETURN_IF_ERROR(init_mem_env()); // NOTE: runtime query statistics mgr could be visited by query and daemon thread // so it should be created before all query begin and deleted after all query and daemon thread stoppped @@ -473,7 +473,7 @@ void ExecEnv::init_file_cache_factory(std::vector& cache_paths } } -Status ExecEnv::_init_mem_env() { +Status ExecEnv::init_mem_env() { bool is_percent = false; std::stringstream ss; // 1. init mem tracker