Merged
2 changes: 1 addition & 1 deletion be/src/olap/push_handler.cpp
@@ -944,7 +944,7 @@ OLAPStatus PushBrokerReader::init(const Schema* schema,
fragment_params.protocol_version = PaloInternalServiceVersion::V1;
TQueryOptions query_options;
TQueryGlobals query_globals;
_runtime_state.reset(new RuntimeState(fragment_params, query_options, query_globals,
_runtime_state.reset(new RuntimeState(params, query_options, query_globals,
ExecEnv::GetInstance()));
DescriptorTbl* desc_tbl = NULL;
Status status = DescriptorTbl::create(_runtime_state->obj_pool(), t_desc_tbl, &desc_tbl);
193 changes: 138 additions & 55 deletions be/src/runtime/fragment_mgr.cpp
@@ -71,15 +71,22 @@ using apache::thrift::TProcessor;
using apache::thrift::transport::TTransportException;

class RuntimeProfile;

class FragmentExecState {
public:
// Constructor using QueryFragmentsCtx
FragmentExecState(
const TUniqueId& query_id,
const TUniqueId& instance_id,
int backend_num,
ExecEnv* exec_env,
std::shared_ptr<QueryFragmentsCtx> fragments_ctx);

FragmentExecState(
const TUniqueId& query_id,
const TUniqueId& instance_id,
int backend_num,
ExecEnv* exec_env,
const TNetworkAddress& coord_hostport);
const TNetworkAddress& coord_addr);

~FragmentExecState();

@@ -135,6 +142,10 @@ class FragmentExecState {

int get_timeout_second() const { return _timeout_second; }

std::shared_ptr<QueryFragmentsCtx> get_fragments_ctx() {
return _fragments_ctx;
}

private:
void coordinator_callback(const Status& status, RuntimeProfile* profile, bool done);

@@ -160,8 +171,29 @@ class FragmentExecState {
int _timeout_second;

std::unique_ptr<std::thread> _exec_thread;

// This context is shared by all fragments of a query that run on this host
std::shared_ptr<QueryFragmentsCtx> _fragments_ctx;
};
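
QueryFragmentsCtx itself is not part of this diff. Pieced together from how it is used below (query_id, coord_addr, query_globals, obj_pool/desc_tbl, resource info, timeout_second, countdown(), is_timeout()), a minimal sketch of the type might look as follows; the atomic countdown is an assumption for illustration, not necessarily the PR's actual implementation:

    // Sketch inferred from usage in this diff; not the actual header. Requires <atomic>.
    class QueryFragmentsCtx {
    public:
        explicit QueryFragmentsCtx(int total_fragment_num)
                : fragment_num(total_fragment_num) {}

        // Assumed semantics: returns true exactly once, when the last
        // fragment of this query on this host finishes.
        bool countdown() { return fragment_num.fetch_sub(1) == 1; }

        bool is_timeout(const DateTimeValue& now) const;

        TUniqueId query_id;
        TNetworkAddress coord_addr;
        TQueryGlobals query_globals;

        ObjectPool obj_pool;        // owns the DescriptorTbl below
        DescriptorTbl* desc_tbl = nullptr;

        bool set_rsc_info = false;
        std::string user;
        std::string group;

        int timeout_second = -1;

    private:
        std::atomic<int> fragment_num;  // unfinished fragments on this host
    };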

FragmentExecState::FragmentExecState(
const TUniqueId& query_id,
const TUniqueId& fragment_instance_id,
int backend_num,
ExecEnv* exec_env,
std::shared_ptr<QueryFragmentsCtx> fragments_ctx) :
_query_id(query_id),
_fragment_instance_id(fragment_instance_id),
_backend_num(backend_num),
_exec_env(exec_env),
_executor(exec_env, boost::bind<void>(
boost::mem_fn(&FragmentExecState::coordinator_callback), this, _1, _2, _3)),
_timeout_second(-1),
_fragments_ctx(fragments_ctx) {
_start_time = DateTimeValue::local_time();
_coord_addr = _fragments_ctx->coord_addr;
}

FragmentExecState::FragmentExecState(
const TUniqueId& query_id,
const TUniqueId& fragment_instance_id,
@@ -175,7 +207,6 @@ FragmentExecState::FragmentExecState(
_coord_addr(coord_addr),
_executor(exec_env, boost::bind<void>(
boost::mem_fn(&FragmentExecState::coordinator_callback), this, _1, _2, _3)),
_set_rsc_info(false),
_timeout_second(-1) {
_start_time = DateTimeValue::local_time();
}
@@ -188,35 +219,24 @@ Status FragmentExecState::prepare(const TExecPlanFragmentParams& params) {
_timeout_second = params.query_options.query_timeout;
}

if (params.__isset.resource_info) {
set_group(params.resource_info);
if (_fragments_ctx == nullptr) {
if (params.__isset.resource_info) {
set_group(params.resource_info);
}
}

return _executor.prepare(params);
}

static void register_cgroups(const std::string& user, const std::string& group) {
TResourceInfo* new_info = new TResourceInfo();
new_info->user = user;
new_info->group = group;
int ret = ResourceTls::set_resource_tls(new_info);
if (ret != 0) {
delete new_info;
return;
if (_fragments_ctx == nullptr) {
return _executor.prepare(params);
} else {
return _executor.prepare(params, _fragments_ctx.get());
}
CgroupsMgr::apply_cgroup(new_info->user, new_info->group);
}
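
The two-argument prepare call above implies that PlanFragmentExecutor::prepare gained an overload in this PR; its declaration is not shown in the diff. A plausible shape, where the extra parameter lets the executor reuse the shared descriptor table and query globals instead of rebuilding them per fragment, is:

    // Assumed declaration; defaulting the context keeps old call sites valid.
    Status prepare(const TExecPlanFragmentParams& request,
                   QueryFragmentsCtx* fragments_ctx = nullptr);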

Status FragmentExecState::execute() {
int64_t duration_ns = 0;
{
SCOPED_RAW_TIMER(&duration_ns);
if (_set_rsc_info) {
register_cgroups(_user, _group);
} else {
CgroupsMgr::apply_system_cgroup();
}

CgroupsMgr::apply_system_cgroup();
WARN_IF_ERROR(_executor.open(), strings::Substitute("Got error while opening fragment $0",
print_id(_fragment_instance_id)));
_executor.close();
@@ -381,6 +401,7 @@ void FragmentExecState::coordinator_callback(
FragmentMgr::FragmentMgr(ExecEnv* exec_env)
: _exec_env(exec_env),
_fragment_map(),
_fragments_ctx_map(),
_stop_background_threads_latch(1) {
_entity = DorisMetrics::instance()->metric_registry()->register_entity("FragmentMgr");
INT_UGAUGE_METRIC_REGISTER(_entity, timeout_canceled_fragment_count);
@@ -418,31 +439,37 @@ FragmentMgr::~FragmentMgr() {
{
std::lock_guard<std::mutex> lock(_lock);
_fragment_map.clear();
_fragments_ctx_map.clear();
}
}

static void empty_function(PlanFragmentExecutor* exec) {
}

void FragmentMgr::exec_actual(
void FragmentMgr::_exec_actual(
std::shared_ptr<FragmentExecState> exec_state,
FinishCallback cb) {

exec_state->execute();

std::shared_ptr<QueryFragmentsCtx> fragments_ctx = exec_state->get_fragments_ctx();
bool all_done = false;
if (fragments_ctx != nullptr) {
// decrease the number of unfinished fragments
all_done = fragments_ctx->countdown();
}

// remove exec state after this fragment finishes
{
std::lock_guard<std::mutex> lock(_lock);
auto iter = _fragment_map.find(exec_state->fragment_instance_id());
if (iter != _fragment_map.end()) {
_fragment_map.erase(iter);
} else {
// Impossible
LOG(WARNING) << "missing entry in fragment exec state map: instance_id="
<< exec_state->fragment_instance_id();
_fragment_map.erase(exec_state->fragment_instance_id());
if (all_done) {
_fragments_ctx_map.erase(fragments_ctx->query_id);
}
}

// Callback after removing the entry for this id
cb(exec_state->executor());
// NOTE: 'exec_state' is destructed here without the lock held
}
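
The all_done handshake in _exec_actual relies on countdown() returning true exactly once per query per host. A tiny illustration of that contract, using the QueryFragmentsCtx sketch given earlier (hypothetical values):

    // Three fragments of one query land on this host.
    auto ctx = std::make_shared<QueryFragmentsCtx>(3);
    bool a = ctx->countdown();  // false: two fragments still running
    bool b = ctx->countdown();  // false: one fragment still running
    bool c = ctx->countdown();  // true: last one out, safe to erase the map entry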

Status FragmentMgr::exec_plan_fragment(
@@ -454,7 +481,6 @@ Status FragmentMgr::exec_plan_fragment(
const TExecPlanFragmentParams& params,
FinishCallback cb) {
const TUniqueId& fragment_instance_id = params.params.fragment_instance_id;
std::shared_ptr<FragmentExecState> exec_state;
{
std::lock_guard<std::mutex> lock(_lock);
auto iter = _fragment_map.find(fragment_instance_id);
@@ -463,32 +489,82 @@ Status FragmentMgr::exec_plan_fragment(
return Status::OK();
}
}
exec_state.reset(new FragmentExecState(
params.params.query_id,
fragment_instance_id,
params.backend_num,
_exec_env,
params.coord));
RETURN_IF_ERROR(exec_state->prepare(params));

std::shared_ptr<FragmentExecState> exec_state;
if (!params.__isset.is_simplified_param) {
// These are old-style params: all common components are set directly in TExecPlanFragmentParams.
exec_state.reset(new FragmentExecState(
params.params.query_id,
params.params.fragment_instance_id,
params.backend_num,
_exec_env,
params.coord));
} else {
std::shared_ptr<QueryFragmentsCtx> fragments_ctx;
if (params.is_simplified_param) {
// Get common components from _fragments_ctx_map
std::lock_guard<std::mutex> lock(_lock);
auto search = _fragments_ctx_map.find(params.params.query_id);
if (search == _fragments_ctx_map.end()) {
return Status::InternalError(strings::Substitute(
"Failed to get query fragments context. Query may have timed out or been cancelled. host: $0",
BackendOptions::get_localhost()));
}
fragments_ctx = search->second;
} else {
// This may be the first fragment request of the query.
// Create the query fragments context.
fragments_ctx.reset(new QueryFragmentsCtx(params.fragment_num_on_host));
fragments_ctx->query_id = params.params.query_id;
RETURN_IF_ERROR(DescriptorTbl::create(&(fragments_ctx->obj_pool), params.desc_tbl, &(fragments_ctx->desc_tbl)));
fragments_ctx->coord_addr = params.coord;
fragments_ctx->query_globals = params.query_globals;

if (params.__isset.resource_info) {
fragments_ctx->user = params.resource_info.user;
fragments_ctx->group = params.resource_info.group;
fragments_ctx->set_rsc_info = true;
}

if (params.__isset.query_options) {
fragments_ctx->timeout_second = params.query_options.query_timeout;
}

{
// Find _fragments_ctx_map again, in case some other request has already
// created the query fragments context.
std::lock_guard<std::mutex> lock(_lock);
auto search = _fragments_ctx_map.find(params.params.query_id);
if (search == _fragments_ctx_map.end()) {
_fragments_ctx_map.insert(std::make_pair(fragments_ctx->query_id, fragments_ctx));
} else {
// A query fragments context already exists; use it
fragments_ctx = search->second;
}
}
}

exec_state.reset(new FragmentExecState(
fragments_ctx->query_id,
params.params.fragment_instance_id,
params.backend_num,
_exec_env,
fragments_ctx));
}

RETURN_IF_ERROR(exec_state->prepare(params));
{
std::lock_guard<std::mutex> lock(_lock);
auto iter = _fragment_map.find(fragment_instance_id);
if (iter != _fragment_map.end()) {
// Duplicated
return Status::InternalError("Double execute");
}
// register exec_state before starting exec thread
_fragment_map.insert(std::make_pair(fragment_instance_id, exec_state));
_fragment_map.insert(std::make_pair(params.params.fragment_instance_id, exec_state));
}

auto st = _thread_pool->submit_func(
std::bind<void>(&FragmentMgr::exec_actual, this, exec_state, cb));
std::bind<void>(&FragmentMgr::_exec_actual, this, exec_state, cb));
if (!st.ok()) {
{
// Remove the exec state added
std::lock_guard<std::mutex> lock(_lock);
_fragment_map.erase(fragment_instance_id);
_fragment_map.erase(params.params.fragment_instance_id);
}
exec_state->cancel_before_execute();
return Status::InternalError(strings::Substitute(
@@ -498,11 +574,11 @@ Status FragmentMgr::exec_plan_fragment(
return Status::OK();
}
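
The locked find-then-insert above can be expressed more compactly with the pair returned by unordered_map::emplace; a behavior-equivalent sketch (not the PR's code):

    {
        std::lock_guard<std::mutex> lock(_lock);
        auto res = _fragments_ctx_map.emplace(fragments_ctx->query_id, fragments_ctx);
        if (!res.second) {
            // Another fragment of this query created the context first; reuse it.
            fragments_ctx = res.first->second;
        }
    }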

Status FragmentMgr::cancel(const TUniqueId& id, const PPlanFragmentCancelReason& reason) {
Status FragmentMgr::cancel(const TUniqueId& fragment_id, const PPlanFragmentCancelReason& reason) {
std::shared_ptr<FragmentExecState> exec_state;
{
std::lock_guard<std::mutex> lock(_lock);
auto iter = _fragment_map.find(id);
auto iter = _fragment_map.find(fragment_id);
if (iter == _fragment_map.end()) {
// No match
return Status::OK();
@@ -517,18 +593,25 @@ Status FragmentMgr::cancel(const TUniqueId& fragment_id, const PPlanFragmentCancelReason&
void FragmentMgr::cancel_worker() {
LOG(INFO) << "FragmentMgr cancel worker start working.";
do {
std::vector<TUniqueId> to_delete;
std::vector<TUniqueId> to_cancel;
DateTimeValue now = DateTimeValue::local_time();
{
std::lock_guard<std::mutex> lock(_lock);
for (auto& it : _fragment_map) {
if (it.second->is_timeout(now)) {
to_delete.push_back(it.second->fragment_instance_id());
to_cancel.push_back(it.second->fragment_instance_id());
}
}
for (auto it = _fragments_ctx_map.begin(); it != _fragments_ctx_map.end();) {
if (it->second->is_timeout(now)) {
it = _fragments_ctx_map.erase(it);
} else {
++it;
}
}
}
timeout_canceled_fragment_count->increment(to_delete.size());
for (auto& id : to_delete) {
timeout_canceled_fragment_count->increment(to_cancel.size());
for (auto& id : to_cancel) {
cancel(id, PPlanFragmentCancelReason::TIMEOUT);
LOG(INFO) << "FragmentMgr cancel worker going to cancel timeout fragment " << print_id(id);
}
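
The sweep above relies on QueryFragmentsCtx::is_timeout, which this diff does not show. Assuming it mirrors FragmentExecState::is_timeout and that the context records a start time at construction, it would look roughly like:

    // Assumed implementation; _start_time is presumed captured at construction.
    // A non-positive timeout_second means the query never times out.
    bool QueryFragmentsCtx::is_timeout(const DateTimeValue& now) const {
        if (timeout_second <= 0) {
            return false;
        }
        return now.second_diff(_start_time) > timeout_second;
    }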
10 changes: 7 additions & 3 deletions be/src/runtime/fragment_mgr.h
@@ -38,12 +38,14 @@

namespace doris {

class QueryFragmentsCtx;
class ExecEnv;
class FragmentExecState;
class TExecPlanFragmentParams;
class TUniqueId;
class PlanFragmentExecutor;
class ThreadPool;
class TExecPlanFragmentParams;
class TExecPlanFragmentParamsList;
class TUniqueId;

std::string to_load_error_http_path(const std::string& file_name);

@@ -79,7 +81,7 @@ class FragmentMgr : public RestMonitorIface {
Status exec_external_plan_fragment(const TScanOpenParams& params, const TUniqueId& fragment_instance_id, std::vector<TScanColumnDesc>* selected_columns);

private:
void exec_actual(std::shared_ptr<FragmentExecState> exec_state,
void _exec_actual(std::shared_ptr<FragmentExecState> exec_state,
FinishCallback cb);

// This is input params
@@ -89,6 +91,8 @@ class FragmentMgr : public RestMonitorIface {

// Make sure the entry is removed from this map before the last reference to the FragmentExecState goes away
std::unordered_map<TUniqueId, std::shared_ptr<FragmentExecState>> _fragment_map;
// query id -> QueryFragmentsCtx
std::unordered_map<TUniqueId, std::shared_ptr<QueryFragmentsCtx>> _fragments_ctx_map;

CountDownLatch _stop_background_threads_latch;
scoped_refptr<Thread> _cancel_thread;
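
Both maps key on TUniqueId, which compiles only because a std::hash specialization for it exists elsewhere in the tree. If one did not, a minimal version would look like this (Doris already ships an equivalent, so this is purely illustrative):

    namespace std {
    template <>
    struct hash<doris::TUniqueId> {
        size_t operator()(const doris::TUniqueId& id) const {
            // Combine the two 64-bit halves of the id.
            return hash<int64_t>()(id.lo) ^ (hash<int64_t>()(id.hi) << 1);
        }
    };
    }  // namespace std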