Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 82 additions & 6 deletions be/src/io/tools/file_cache_microbench.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,12 @@
#include "io/file_factory.h"
#include "io/fs/s3_file_system.h"
#include "io/fs/s3_file_writer.h"
#include "olap/utils.h"
#include "rapidjson/document.h"
#include "rapidjson/stringbuffer.h"
#include "rapidjson/writer.h"
#include "runtime/memory/mem_tracker_limiter.h"
#include "runtime/thread_context.h"
#ifdef __clang__
#pragma clang diagnostic push
#pragma clang diagnostic ignored "-Wkeyword-macro"
Expand All @@ -80,6 +83,7 @@

using doris::io::FileCacheFactory;
using doris::io::BlockFileCache;
using namespace doris;

bvar::LatencyRecorder microbench_write_latency("file_cache_microbench_append");
bvar::LatencyRecorder microbench_read_latency("file_cache_microbench_read_at");
Expand Down Expand Up @@ -339,12 +343,13 @@ class MicrobenchFileReader {
std::shared_ptr<doris::S3RateLimiterHolder> _rate_limiter;
};

class ThreadPool {
class BenchThreadPool {
public:
ThreadPool(size_t num_threads) : stop(false) {
BenchThreadPool(size_t num_threads) : stop(false) {
try {
for (size_t i = 0; i < num_threads; ++i) {
workers.emplace_back([this] {
SCOPED_INIT_THREAD_CONTEXT();
try {
while (true) {
std::function<void()> task;
Expand Down Expand Up @@ -415,7 +420,7 @@ class ThreadPool {
}
}

~ThreadPool() {
~BenchThreadPool() {
if (!stop) {
try {
stop_and_wait();
Expand Down Expand Up @@ -1051,7 +1056,7 @@ class JobManager {
std::atomic<int> completed_writes(0);
std::vector<std::future<void>> write_futures;
write_futures.reserve(keys.size());
ThreadPool write_pool(config.num_threads);
BenchThreadPool write_pool(config.num_threads);

// Start write tasks
doris::MonotonicStopWatch write_stopwatch;
Expand Down Expand Up @@ -1146,7 +1151,7 @@ class JobManager {
} else { // default NORMAL
// do nothing
}
ThreadPool read_pool(config.num_threads);
BenchThreadPool read_pool(config.num_threads);
std::atomic<int> completed_reads(0);
doris::MonotonicStopWatch read_stopwatch; // Add read task timer

Expand Down Expand Up @@ -1369,7 +1374,7 @@ class JobManager {
std::mutex _mutex;
std::atomic<int> _next_job_id;
std::map<std::string, std::shared_ptr<Job>> _jobs;
ThreadPool _job_executor_pool;
BenchThreadPool _job_executor_pool;
};

namespace microbenchService {
Expand Down Expand Up @@ -2236,7 +2241,78 @@ class HttpServer {
};

void init_exec_env() {
SCOPED_INIT_THREAD_CONTEXT();
std::vector<doris::StorePath> paths;
auto olap_res = doris::parse_conf_store_paths(doris::config::storage_root_path, &paths);
if (!olap_res) {
LOG(ERROR) << "parse config storage path failed, path=" << doris::config::storage_root_path;
exit(-1);
}

std::vector<doris::StorePath> spill_paths;
if (doris::config::spill_storage_root_path.empty()) {
doris::config::spill_storage_root_path = doris::config::storage_root_path;
}
olap_res = doris::parse_conf_store_paths(doris::config::spill_storage_root_path, &spill_paths);
if (!olap_res) {
LOG(ERROR) << "parse config spill storage path failed, path="
<< doris::config::spill_storage_root_path;
exit(-1);
}
std::set<std::string> broken_paths;
doris::parse_conf_broken_store_paths(doris::config::broken_storage_path, &broken_paths);

auto it = paths.begin();
for (; it != paths.end();) {
if (broken_paths.count(it->path) > 0) {
if (doris::config::ignore_broken_disk) {
LOG(WARNING) << "ignore broken disk, path = " << it->path;
it = paths.erase(it);
} else {
LOG(ERROR) << "a broken disk is found " << it->path;
exit(-1);
}
} else if (!doris::check_datapath_rw(it->path)) {
if (doris::config::ignore_broken_disk) {
LOG(WARNING) << "read write test file failed, path=" << it->path;
it = paths.erase(it);
} else {
LOG(ERROR) << "read write test file failed, path=" << it->path;
// If there is only one disk and that disk is full, we must also exit, because RocksDB will fail to open.
exit(-1);
}
} else {
++it;
}
}

if (paths.empty()) {
LOG(ERROR) << "All disks are broken, exit.";
exit(-1);
}

it = spill_paths.begin();
for (; it != spill_paths.end();) {
if (!doris::check_datapath_rw(it->path)) {
if (doris::config::ignore_broken_disk) {
LOG(WARNING) << "read write test file failed, path=" << it->path;
it = spill_paths.erase(it);
} else {
LOG(ERROR) << "read write test file failed, path=" << it->path;
exit(-1);
}
} else {
++it;
}
}
if (spill_paths.empty()) {
LOG(ERROR) << "All spill disks are broken, exit.";
exit(-1);
}

auto* exec_env = doris::ExecEnv::GetInstance();
auto status = exec_env->init_mem_env();

std::unique_ptr<doris::ThreadPool> s3_upload_pool;
static_cast<void>(doris::ThreadPoolBuilder("MicrobenchS3FileUploadThreadPool")
.set_min_threads(256)
Expand Down
2 changes: 1 addition & 1 deletion be/src/runtime/exec_env.h
Original file line number Diff line number Diff line change
Expand Up @@ -387,6 +387,7 @@ class ExecEnv {
void clear_stream_mgr();

DeleteBitmapAggCache* delete_bitmap_agg_cache() { return _delete_bitmap_agg_cache; }
Status init_mem_env();

private:
ExecEnv();
Expand All @@ -396,7 +397,6 @@ class ExecEnv {
const std::set<std::string>& broken_paths);
void _destroy();

Status _init_mem_env();
Status _check_deploy_mode();

Status _create_internal_workload_group();
Expand Down
4 changes: 2 additions & 2 deletions be/src/runtime/exec_env_init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ Status ExecEnv::_init(const std::vector<StorePath>& store_paths,
.set_min_threads(config::min_s3_file_system_thread_num)
.set_max_threads(config::max_s3_file_system_thread_num)
.build(&_s3_file_system_thread_pool));
RETURN_IF_ERROR(_init_mem_env());
RETURN_IF_ERROR(init_mem_env());

// NOTE: the runtime query statistics mgr can be accessed by both query and daemon threads,
// so it must be created before any query begins and deleted after all query and daemon threads have stopped.
Expand Down Expand Up @@ -473,7 +473,7 @@ void ExecEnv::init_file_cache_factory(std::vector<doris::CachePath>& cache_paths
}
}

Status ExecEnv::_init_mem_env() {
Status ExecEnv::init_mem_env() {
bool is_percent = false;
std::stringstream ss;
// 1. init mem tracker
Expand Down
Loading