Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -322,8 +322,9 @@ namespace config {
CONF_mInt32(olap_table_sink_send_interval_ms, "10");

// Fragment thread pool
CONF_Int32(fragment_pool_thread_num, "64");
CONF_Int32(fragment_pool_queue_size, "1024");
CONF_Int32(fragment_pool_thread_num_min, "64");
CONF_Int32(fragment_pool_thread_num_max, "512");
CONF_Int32(fragment_pool_queue_size, "2048");

//for cast
// CONF_Bool(cast, "true");
Expand Down
114 changes: 52 additions & 62 deletions be/src/runtime/fragment_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,30 +23,32 @@
#include <gperftools/profiler.h>
#include <boost/bind.hpp>

#include <thrift/protocol/TDebugProtocol.h>
#include "agent/cgroups_mgr.h"
#include "common/object_pool.h"
#include "common/resource_tls.h"
#include "gen_cpp/DataSinks_types.h"
#include "gen_cpp/FrontendService.h"
#include "gen_cpp/HeartbeatService.h"
#include "gen_cpp/PaloInternalService_types.h"
#include "gen_cpp/PlanNodes_types.h"
#include "gen_cpp/QueryPlanExtra_types.h"
#include "gen_cpp/Types_types.h"
#include "gutil/strings/substitute.h"
#include "runtime/client_cache.h"
#include "runtime/datetime_value.h"
#include "runtime/descriptors.h"
#include "runtime/exec_env.h"
#include "runtime/plan_fragment_executor.h"
#include "service/backend_options.h"
#include "runtime/plan_fragment_executor.h"
#include "runtime/exec_env.h"
#include "runtime/datetime_value.h"
#include "util/stopwatch.hpp"
#include "util/debug_util.h"
#include "util/doris_metrics.h"
#include "util/stopwatch.hpp"
#include "util/thrift_util.h"
#include "util/uid_util.h"
#include "util/url_coding.h"
#include "runtime/client_cache.h"
#include "runtime/descriptors.h"
#include "gen_cpp/HeartbeatService.h"
#include "gen_cpp/PaloInternalService_types.h"
#include "gen_cpp/PlanNodes_types.h"
#include "gen_cpp/Types_types.h"
#include "gen_cpp/DataSinks_types.h"
#include "gen_cpp/Types_types.h"
#include "gen_cpp/FrontendService.h"
#include "gen_cpp/QueryPlanExtra_types.h"
#include <thrift/protocol/TDebugProtocol.h>
#include "util/threadpool.h"
#include "gutil/strings/substitute.h"

namespace doris {

Expand Down Expand Up @@ -87,6 +89,8 @@ class FragmentExecState {

Status execute();

Status cancel_before_execute();

Status cancel(const PPlanFragmentCancelReason& reason);

TUniqueId fragment_instance_id() const {
Expand Down Expand Up @@ -219,6 +223,13 @@ Status FragmentExecState::execute() {
return Status::OK();
}

Status FragmentExecState::cancel_before_execute() {
// set status as 'abort', cuz cancel() won't effect the status arg of DataSink::close().
_executor.set_abort();
_executor.cancel();
return Status::OK();
}

Status FragmentExecState::cancel(const PPlanFragmentCancelReason& reason) {
std::lock_guard<std::mutex> l(_status_lock);
RETURN_IF_ERROR(_exec_status);
Expand Down Expand Up @@ -364,26 +375,31 @@ void FragmentExecState::coordinator_callback(
}
}

FragmentMgr::FragmentMgr(ExecEnv* exec_env) :
_exec_env(exec_env),
_fragment_map(),
_stop(false),
_cancel_thread(std::bind<void>(&FragmentMgr::cancel_worker, this)),
// TODO(zc): we need a better thread-pool
// now one user can use all the thread pool, others have no resource.
_thread_pool(config::fragment_pool_thread_num, config::fragment_pool_queue_size) {
FragmentMgr::FragmentMgr(ExecEnv* exec_env)
: _exec_env(exec_env),
_fragment_map(),
_stop(false),
_cancel_thread(std::bind<void>(&FragmentMgr::cancel_worker, this)) {
REGISTER_GAUGE_DORIS_METRIC(plan_fragment_count, [this]() {
std::lock_guard<std::mutex> lock(_lock);
return _fragment_map.size();
});
// TODO(zc): we need a better thread-pool
// now one user can use all the thread pool, others have no resource.
ThreadPoolBuilder("FragmentMgrThreadPool")
.set_min_threads(config::fragment_pool_thread_num_min)
.set_max_threads(config::fragment_pool_thread_num_max)
.set_max_queue_size(config::fragment_pool_queue_size)
.build(&_thread_pool);
}

FragmentMgr::~FragmentMgr() {
// stop thread
_stop = true;
_cancel_thread.join();
// Stop all the worker
_thread_pool.drain_and_shutdown();
// Stop all the worker, should wait for a while?
// _thread_pool->wait_for();
_thread_pool->shutdown();

// Only me can delete
{
Expand Down Expand Up @@ -421,13 +437,6 @@ Status FragmentMgr::exec_plan_fragment(
return exec_plan_fragment(params, std::bind<void>(&empty_function, std::placeholders::_1));
}

static void* fragment_executor(void* param) {
PriorityThreadPool::WorkFunction* func = (PriorityThreadPool::WorkFunction*)param;
(*func)();
delete func;
return nullptr;
}

Status FragmentMgr::exec_plan_fragment(
const TExecPlanFragmentParams& params,
FinishCallback cb) {
Expand All @@ -448,7 +457,7 @@ Status FragmentMgr::exec_plan_fragment(
_exec_env,
params.coord));
RETURN_IF_ERROR(exec_state->prepare(params));
bool use_pool = true;

{
std::lock_guard<std::mutex> lock(_lock);
auto iter = _fragment_map.find(fragment_instance_id);
Expand All @@ -458,38 +467,19 @@ Status FragmentMgr::exec_plan_fragment(
}
// register exec_state before starting exec thread
_fragment_map.insert(std::make_pair(fragment_instance_id, exec_state));

// Now, we the fragement is
if (_fragment_map.size() >= config::fragment_pool_thread_num) {
use_pool = false;
}
}

if (use_pool) {
if (!_thread_pool.offer(
boost::bind<void>(&FragmentMgr::exec_actual, this, exec_state, cb))) {
{
// Remove the exec state added
std::lock_guard<std::mutex> lock(_lock);
_fragment_map.erase(fragment_instance_id);
}
return Status::InternalError("Put planfragment to failed.");
}
} else {
pthread_t id;
int ret = pthread_create(&id,
nullptr,
fragment_executor,
new PriorityThreadPool::WorkFunction(
std::bind<void>(&FragmentMgr::exec_actual, this, exec_state, cb)));
if (ret != 0) {
std::string err_msg("Could not create thread.");
err_msg.append(strerror(ret));
err_msg.append(",");
err_msg.append(std::to_string(ret));
return Status::InternalError(err_msg);
auto st = _thread_pool->submit_func(
std::bind<void>(&FragmentMgr::exec_actual, this, exec_state, cb));
if (!st.ok()) {
{
// Remove the exec state added
std::lock_guard<std::mutex> lock(_lock);
_fragment_map.erase(fragment_instance_id);
}
pthread_detach(id);
exec_state->cancel_before_execute();
return Status::InternalError(strings::Substitute(
"Put planfragment to thread pool failed. err = $0", st.get_error_msg()));
}

return Status::OK();
Expand Down
4 changes: 2 additions & 2 deletions be/src/runtime/fragment_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
#include "gen_cpp/DorisExternalService_types.h"
#include "gen_cpp/Types_types.h"
#include "gen_cpp/internal_service.pb.h"
#include "util/priority_thread_pool.hpp"
#include "util/hash_util.hpp"
#include "http/rest_monitor_iface.h"

Expand All @@ -40,6 +39,7 @@ class FragmentExecState;
class TExecPlanFragmentParams;
class TUniqueId;
class PlanFragmentExecutor;
class ThreadPool;

std::string to_load_error_http_path(const std::string& file_name);

Expand Down Expand Up @@ -90,7 +90,7 @@ class FragmentMgr : public RestMonitorIface {
bool _stop;
std::thread _cancel_thread;
// every job is a pool
PriorityThreadPool _thread_pool;
std::unique_ptr<ThreadPool> _thread_pool;
};

}
Expand Down
4 changes: 4 additions & 0 deletions be/src/runtime/plan_fragment_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -510,6 +510,10 @@ void PlanFragmentExecutor::cancel() {
_runtime_state->exec_env()->result_mgr()->cancel(_runtime_state->fragment_instance_id());
}

void PlanFragmentExecutor::set_abort() {
update_status(Status::Aborted("Execution aborted before start"));
}

const RowDescriptor& PlanFragmentExecutor::row_desc() {
return _plan->row_desc();
}
Expand Down
5 changes: 5 additions & 0 deletions be/src/runtime/plan_fragment_executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,11 @@ class PlanFragmentExecutor {
// in open()/get_next().
void close();

// Abort this execution. Must be called if we skip running open().
// It will let DataSink node closed with error status, to avoid use resources which created in open() phase.
// DataSink node should distinguish Aborted status from other error status.
void set_abort();

// Initiate cancellation. Must not be called until after prepare() returned.
void cancel();

Expand Down
36 changes: 33 additions & 3 deletions be/test/runtime/fragment_mgr_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ namespace doris {

static Status s_prepare_status;
static Status s_open_status;
static int s_abort_cnt;
// Mock used for this unittest
PlanFragmentExecutor::PlanFragmentExecutor(ExecEnv* exec_env,
const report_status_callback& report_status_cb)
Expand All @@ -47,6 +48,11 @@ Status PlanFragmentExecutor::open() {

void PlanFragmentExecutor::cancel() {}

void PlanFragmentExecutor::set_abort() {
LOG(INFO) << "Plan Aborted";
s_abort_cnt++;
}

void PlanFragmentExecutor::close() {}

class FragmentMgrTest : public testing::Test {
Expand All @@ -57,9 +63,9 @@ class FragmentMgrTest : public testing::Test {
virtual void SetUp() {
s_prepare_status = Status::OK();
s_open_status = Status::OK();
LOG(INFO) << "fragment_pool_thread_num=" << config::fragment_pool_thread_num
<< ", pool_size=" << config::fragment_pool_queue_size;
config::fragment_pool_thread_num = 32;

config::fragment_pool_thread_num_min = 32;
config::fragment_pool_thread_num_max = 32;
config::fragment_pool_queue_size = 1024;
}
virtual void TearDown() {}
Expand Down Expand Up @@ -117,6 +123,30 @@ TEST_F(FragmentMgrTest, PrepareFailed) {
ASSERT_FALSE(mgr.exec_plan_fragment(params).ok());
}

TEST_F(FragmentMgrTest, OfferPoolFailed) {
config::fragment_pool_thread_num_min = 1;
config::fragment_pool_thread_num_max = 1;
config::fragment_pool_queue_size = 0;
s_abort_cnt = 0;
FragmentMgr mgr(nullptr);

TExecPlanFragmentParams params;
params.params.fragment_instance_id = TUniqueId();
params.params.fragment_instance_id.__set_hi(100);
params.params.fragment_instance_id.__set_lo(200);
ASSERT_TRUE(mgr.exec_plan_fragment(params).ok());

// the first plan open will cost 50ms, so the next 3 plans will be aborted.
for (int i = 1; i < 4; ++i) {
TExecPlanFragmentParams params;
params.params.fragment_instance_id = TUniqueId();
params.params.fragment_instance_id.__set_hi(100 + i);
params.params.fragment_instance_id.__set_lo(200);
ASSERT_FALSE(mgr.exec_plan_fragment(params).ok());
}
ASSERT_EQ(3, s_abort_cnt);
}

} // namespace doris

int main(int argc, char** argv) {
Expand Down