Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[bugfix](scannercore) scanner will core dump in destructor while collecting profile #28727

Merged
merged 36 commits into from
Dec 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions be/src/exec/exec_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,10 @@ class ExecNode {

size_t children_count() const { return _children.size(); }

// Called when the fragment finishes normally, so that the node can do its final
// work, such as sending the last buffer to the remote side.
// This default implementation does nothing and returns Status::OK().
virtual Status try_close(RuntimeState* state) { return Status::OK(); }

protected:
friend class DataSink;

Expand Down
64 changes: 19 additions & 45 deletions be/src/pipeline/exec/scan_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,14 +64,6 @@ bool ScanOperator::can_read() {
}
}

// Returns true only when the node has a scanner context and that context's
// no_schedule() reports false; returns false when there is no scanner context.
bool ScanOperator::is_pending_finish() const {
    const auto& scanner_ctx = _node->_scanner_ctx;
    if (!scanner_ctx) {
        return false;
    }
    return !scanner_ctx->no_schedule();
}

// Forwards the request to the wrapped node's try_close() and returns its status.
Status ScanOperator::try_close(RuntimeState* state) {
return _node->try_close(state);
}

// Forwards to the wrapped node's runtime_filters_are_ready_or_timeout() and
// returns its answer unchanged.
bool ScanOperator::runtime_filters_are_ready_or_timeout() {
return _node->runtime_filters_are_ready_or_timeout();
}
Expand All @@ -81,9 +73,8 @@ std::string ScanOperator::debug_string() const {
fmt::format_to(debug_string_buffer, "{}, scanner_ctx is null: {} ",
SourceOperator::debug_string(), _node->_scanner_ctx == nullptr);
if (_node->_scanner_ctx) {
fmt::format_to(debug_string_buffer, ", num_running_scanners = {}, num_scheduling_ctx = {} ",
_node->_scanner_ctx->get_num_running_scanners(),
_node->_scanner_ctx->get_num_scheduling_ctx());
fmt::format_to(debug_string_buffer, ", num_running_scanners = {}",
_node->_scanner_ctx->get_num_running_scanners());
}
return fmt::to_string(debug_string_buffer);
}
Expand All @@ -101,9 +92,6 @@ std::string ScanOperator::debug_string() const {
template <typename Derived>
ScanLocalState<Derived>::ScanLocalState(RuntimeState* state, OperatorXBase* parent)
: ScanLocalStateBase(state, parent) {
_finish_dependency = std::make_shared<FinishDependency>(
parent->operator_id(), parent->node_id(), parent->get_name() + "_FINISH_DEPENDENCY",
state->get_query_ctx());
_filter_dependency = std::make_shared<RuntimeFilterDependency>(
parent->operator_id(), parent->node_id(), parent->get_name() + "_FILTER_DEPENDENCY",
state->get_query_ctx());
Expand Down Expand Up @@ -177,7 +165,6 @@ Status ScanLocalState<Derived>::open(RuntimeState* state) {

auto status = _eos ? Status::OK() : _prepare_scanners();
if (_scanner_ctx) {
_finish_dependency->block();
DCHECK(!_eos && _num_scanners->value() > 0);
RETURN_IF_ERROR(_scanner_ctx->init());
RETURN_IF_ERROR(state->exec_env()->scanner_scheduler()->submit(_scanner_ctx));
Expand Down Expand Up @@ -570,15 +557,14 @@ std::string ScanLocalState<Derived>::debug_string(int indentation_level) const {
PipelineXLocalState<>::debug_string(indentation_level), _eos.load());
if (_scanner_ctx) {
fmt::format_to(debug_string_buffer, "");
fmt::format_to(
debug_string_buffer,
", Scanner Context: (_is_finished = {}, _should_stop = {}, "
"_num_running_scanners={}, "
"_num_scheduling_ctx = {}, _num_unfinished_scanners = {}, status = {}, error = {})",
_scanner_ctx->is_finished(), _scanner_ctx->should_stop(),
_scanner_ctx->get_num_running_scanners(), _scanner_ctx->get_num_scheduling_ctx(),
_scanner_ctx->get_num_unfinished_scanners(), _scanner_ctx->status().to_string(),
_scanner_ctx->status_error());
fmt::format_to(debug_string_buffer,
", Scanner Context: (_is_finished = {}, _should_stop = {}, "
"_num_running_scanners={}, "
" _num_unfinished_scanners = {}, status = {}, error = {})",
_scanner_ctx->is_finished(), _scanner_ctx->should_stop(),
_scanner_ctx->get_num_running_scanners(),
_scanner_ctx->get_num_unfinished_scanners(),
_scanner_ctx->status().to_string(), _scanner_ctx->status_error());
}

return fmt::to_string(debug_string_buffer);
Expand Down Expand Up @@ -1226,24 +1212,27 @@ template <typename Derived>
// Builds the scanners for this local state.
// - Asks _init_scanners() to fill a local list of scanners.
// - Wraps every scanner in a ScannerDelegate and appends it to _scanners.
// - If no scanner was produced, marks end-of-stream (_eos) and sets
//   _scan_dependency ready; otherwise records the scanner count in
//   _num_scanners and calls _start_scanners().
// Returns the first error reported by _init_scanners() or _start_scanners(),
// otherwise Status::OK().
Status ScanLocalState<Derived>::_prepare_scanners() {
std::list<vectorized::VScannerSPtr> scanners;
RETURN_IF_ERROR(_init_scanners(&scanners));
// Wrap each scanner in a ScannerDelegate held by _scanners.
for (auto it = scanners.begin(); it != scanners.end(); ++it) {
_scanners.emplace_back(std::make_shared<vectorized::ScannerDelegate>(*it));
}
if (scanners.empty()) {
_eos = true;
_scan_dependency->set_ready();
} else {
COUNTER_SET(_num_scanners, static_cast<int64_t>(scanners.size()));
// NOTE(review): _start_scanners() is invoked twice below, once with the raw
// scanner list and once with the delegate list. This looks like the old and
// new line of a change shown together - confirm that only one call is intended.
RETURN_IF_ERROR(_start_scanners(scanners));
RETURN_IF_ERROR(_start_scanners(_scanners));
}
return Status::OK();
}

// Creates the PipScannerContext for the given scanners and stores it in
// _scanner_ctx. Always returns Status::OK().
// NOTE(review): this span carries two alternative parameter lines and two
// alternative endings of the create_shared() argument list (one that also passes
// _finish_dependency, one that stops at _scan_dependency). It looks like the old
// and new version of a change shown together - confirm which one is intended.
template <typename Derived>
Status ScanLocalState<Derived>::_start_scanners(
const std::list<vectorized::VScannerSPtr>& scanners) {
const std::list<std::shared_ptr<vectorized::ScannerDelegate>>& scanners) {
auto& p = _parent->cast<typename Derived::Parent>();
_scanner_ctx = PipScannerContext::create_shared(state(), this, p._output_tuple_desc, scanners,
p.limit(), state()->scan_queue_mem_limit(),
p._col_distribute_ids, 1, _scan_dependency,
_finish_dependency);
p._col_distribute_ids, 1, _scan_dependency);
return Status::OK();
}

Expand Down Expand Up @@ -1319,9 +1308,6 @@ Status ScanLocalState<Derived>::_init_profile() {

_max_scanner_thread_num = ADD_COUNTER(_runtime_profile, "MaxScannerThreadNum", TUnit::UNIT);

_wait_for_finish_dependency_timer =
ADD_TIMER(_runtime_profile, "WaitForPendingFinishDependency");

return Status::OK();
}

Expand Down Expand Up @@ -1429,17 +1415,6 @@ Status ScanOperatorX<LocalStateType>::open(RuntimeState* state) {
return Status::OK();
}

// Asks the scanner context of this operator's local state (if one exists) to stop.
// Always returns Status::OK().
template <typename LocalStateType>
Status ScanOperatorX<LocalStateType>::try_close(RuntimeState* state) {
    auto& scanner_ctx = get_local_state(state)._scanner_ctx;
    if (!scanner_ctx) {
        // No scanner context was created, so there is nothing to stop.
        return Status::OK();
    }
    // Flag the context as should_stop so that its scanners are not scheduled anymore.
    // TODO: `set_should_stop` takes a lock, which may have a slight impact.
    scanner_ctx->set_should_stop();
    return Status::OK();
}

template <typename Derived>
Status ScanLocalState<Derived>::close(RuntimeState* state) {
if (_closed) {
Expand All @@ -1451,10 +1426,9 @@ Status ScanLocalState<Derived>::close(RuntimeState* state) {

SCOPED_TIMER(exec_time_counter());
if (_scanner_ctx) {
_scanner_ctx->clear_and_join(reinterpret_cast<ScanLocalStateBase*>(this), state);
_scanner_ctx->stop_scanners(state);
}
COUNTER_SET(_wait_for_dependency_timer, _scan_dependency->watcher_elapse_time());
COUNTER_SET(_wait_for_finish_dependency_timer, _finish_dependency->watcher_elapse_time());
COUNTER_SET(_wait_for_rf_timer, _filter_dependency->watcher_elapse_time());

return PipelineXLocalState<>::close(state);
Expand Down Expand Up @@ -1511,7 +1485,7 @@ Status ScanOperatorX<LocalStateType>::get_block(RuntimeState* state, vectorized:
if (eos) {
source_state = SourceState::FINISHED;
// reach limit, stop the scanners.
local_state._scanner_ctx->set_should_stop();
local_state._scanner_ctx->stop_scanners(state);
}

return Status::OK();
Expand Down
16 changes: 6 additions & 10 deletions be/src/pipeline/exec/scan_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@
namespace doris {
class ExecNode;
} // namespace doris
namespace doris::vectorized {
class ScannerDelegate;
}

namespace doris::pipeline {
class PipScannerContext;
Expand All @@ -48,13 +51,9 @@ class ScanOperator : public SourceOperator<ScanOperatorBuilder> {

bool can_read() override; // for source

bool is_pending_finish() const override;

bool runtime_filters_are_ready_or_timeout() override;

std::string debug_string() const override;

Status try_close(RuntimeState* state) override;
};

class ScanDependency final : public Dependency {
Expand Down Expand Up @@ -171,7 +170,6 @@ class ScanLocalStateBase : public PipelineXLocalState<>, public vectorized::Runt
RuntimeProfile::Counter* _wait_for_scanner_done_timer = nullptr;
// time of prefilter input block from scanner
RuntimeProfile::Counter* _wait_for_eos_timer = nullptr;
RuntimeProfile::Counter* _wait_for_finish_dependency_timer = nullptr;
RuntimeProfile::Counter* _wait_for_rf_timer = nullptr;
};

Expand Down Expand Up @@ -214,7 +212,6 @@ class ScanLocalState : public ScanLocalStateBase {
// Returns the raw pointer held by _scan_dependency.
Dependency* dependency() override { return _scan_dependency.get(); }

// Returns the raw pointer held by _filter_dependency.
RuntimeFilterDependency* filterdependency() override { return _filter_dependency.get(); }
// Returns the raw pointer held by _finish_dependency.
Dependency* finishdependency() override { return _finish_dependency.get(); }

protected:
template <typename LocalStateType>
Expand Down Expand Up @@ -350,7 +347,7 @@ class ScanLocalState : public ScanLocalStateBase {
Status _prepare_scanners();

// Submit the scanner to the thread pool and start execution
Status _start_scanners(const std::list<vectorized::VScannerSPtr>& scanners);
Status _start_scanners(const std::list<std::shared_ptr<vectorized::ScannerDelegate>>& scanners);

// For some conjuncts there is a chance to eliminate the cast operator
// Eg. Variant's sub column could eliminate cast in storage layer if
Expand Down Expand Up @@ -413,14 +410,13 @@ class ScanLocalState : public ScanLocalStateBase {

std::shared_ptr<RuntimeFilterDependency> _filter_dependency;

std::shared_ptr<Dependency> _finish_dependency;
// ScanLocalState owns the scanners; the scanner context only holds weak pointers to them
std::list<std::shared_ptr<vectorized::ScannerDelegate>> _scanners;
};

template <typename LocalStateType>
class ScanOperatorX : public OperatorX<LocalStateType> {
public:
Status try_close(RuntimeState* state) override;

Status init(const TPlanNode& tnode, RuntimeState* state) override;
Status prepare(RuntimeState* state) override { return OperatorXBase::prepare(state); }
Status open(RuntimeState* state) override;
Expand Down
20 changes: 8 additions & 12 deletions be/src/vec/exec/scan/pip_scanner_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,24 +31,23 @@ class PipScannerContext : public vectorized::ScannerContext {
public:
PipScannerContext(RuntimeState* state, vectorized::VScanNode* parent,
const TupleDescriptor* output_tuple_desc,
const std::list<vectorized::VScannerSPtr>& scanners, int64_t limit_,
int64_t max_bytes_in_blocks_queue, const std::vector<int>& col_distribute_ids,
const int num_parallel_instances)
const std::list<std::shared_ptr<vectorized::ScannerDelegate>>& scanners,
int64_t limit_, int64_t max_bytes_in_blocks_queue,
const std::vector<int>& col_distribute_ids, const int num_parallel_instances)
: vectorized::ScannerContext(state, parent, output_tuple_desc, scanners, limit_,
max_bytes_in_blocks_queue, num_parallel_instances),
_col_distribute_ids(col_distribute_ids),
_need_colocate_distribute(!_col_distribute_ids.empty()) {}

PipScannerContext(RuntimeState* state, ScanLocalStateBase* local_state,
const TupleDescriptor* output_tuple_desc,
const std::list<vectorized::VScannerSPtr>& scanners, int64_t limit_,
int64_t max_bytes_in_blocks_queue, const std::vector<int>& col_distribute_ids,
const int num_parallel_instances,
std::shared_ptr<pipeline::ScanDependency> dependency,
std::shared_ptr<pipeline::Dependency> finish_dependency)
const std::list<std::shared_ptr<vectorized::ScannerDelegate>>& scanners,
int64_t limit_, int64_t max_bytes_in_blocks_queue,
const std::vector<int>& col_distribute_ids, const int num_parallel_instances,
std::shared_ptr<pipeline::ScanDependency> dependency)
: vectorized::ScannerContext(state, output_tuple_desc, scanners, limit_,
max_bytes_in_blocks_queue, num_parallel_instances,
local_state, dependency, finish_dependency),
local_state, dependency),
_need_colocate_distribute(false) {}

Status get_block_from_queue(RuntimeState* state, vectorized::BlockUPtr* block, bool* eos,
Expand Down Expand Up @@ -111,9 +110,6 @@ class PipScannerContext : public vectorized::ScannerContext {
return Status::OK();
}

// Returns true when either _is_finished or _should_stop is set.
// We should make these methods lock free.
bool done() override { return _is_finished || _should_stop; }

void append_blocks_to_queue(std::vector<vectorized::BlockUPtr>& blocks) override {
const int queue_size = _blocks_queues.size();
const int block_size = blocks.size();
Expand Down
Loading
Loading