Skip to content

Commit

Permalink
[bugfix](scannercore) scanner will core in destructor during colle…
Browse files Browse the repository at this point in the history
…ct profile (apache#28727)
  • Loading branch information
yiguolei authored and HappenLee committed Jan 12, 2024
1 parent 84a6998 commit 0b4e275
Show file tree
Hide file tree
Showing 11 changed files with 213 additions and 336 deletions.
4 changes: 4 additions & 0 deletions be/src/exec/exec_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,10 @@ class ExecNode {

size_t children_count() const { return _children.size(); }

// Called when the fragment finishes normally so the node can do its final work,
// such as sending the last buffer to the remote side.
// The default implementation does nothing and returns Status::OK().
virtual Status try_close(RuntimeState* state) { return Status::OK(); }

protected:
friend class DataSink;

Expand Down
64 changes: 19 additions & 45 deletions be/src/pipeline/exec/scan_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,14 +64,6 @@ bool ScanOperator::can_read() {
}
}

// Returns true only when the node has a scanner context and that context's
// no_schedule() reports false; returns false when there is no scanner context.
bool ScanOperator::is_pending_finish() const {
    // Without a scanner context there is nothing to query.
    if (!_node->_scanner_ctx) {
        return false;
    }
    return !_node->_scanner_ctx->no_schedule();
}

// Forwards the try_close request to the underlying scan node and returns its status.
Status ScanOperator::try_close(RuntimeState* state) {
return _node->try_close(state);
}

// Delegates to the scan node's runtime_filters_are_ready_or_timeout() and returns its result.
bool ScanOperator::runtime_filters_are_ready_or_timeout() {
return _node->runtime_filters_are_ready_or_timeout();
}
Expand All @@ -81,9 +73,8 @@ std::string ScanOperator::debug_string() const {
fmt::format_to(debug_string_buffer, "{}, scanner_ctx is null: {} ",
SourceOperator::debug_string(), _node->_scanner_ctx == nullptr);
if (_node->_scanner_ctx) {
fmt::format_to(debug_string_buffer, ", num_running_scanners = {}, num_scheduling_ctx = {} ",
_node->_scanner_ctx->get_num_running_scanners(),
_node->_scanner_ctx->get_num_scheduling_ctx());
fmt::format_to(debug_string_buffer, ", num_running_scanners = {}",
_node->_scanner_ctx->get_num_running_scanners());
}
return fmt::to_string(debug_string_buffer);
}
Expand All @@ -101,9 +92,6 @@ std::string ScanOperator::debug_string() const {
template <typename Derived>
ScanLocalState<Derived>::ScanLocalState(RuntimeState* state, OperatorXBase* parent)
: ScanLocalStateBase(state, parent) {
_finish_dependency = std::make_shared<FinishDependency>(
parent->operator_id(), parent->node_id(), parent->get_name() + "_FINISH_DEPENDENCY",
state->get_query_ctx());
_filter_dependency = std::make_shared<RuntimeFilterDependency>(
parent->operator_id(), parent->node_id(), parent->get_name() + "_FILTER_DEPENDENCY",
state->get_query_ctx());
Expand Down Expand Up @@ -177,7 +165,6 @@ Status ScanLocalState<Derived>::open(RuntimeState* state) {

auto status = _eos ? Status::OK() : _prepare_scanners();
if (_scanner_ctx) {
_finish_dependency->block();
DCHECK(!_eos && _num_scanners->value() > 0);
RETURN_IF_ERROR(_scanner_ctx->init());
RETURN_IF_ERROR(state->exec_env()->scanner_scheduler()->submit(_scanner_ctx));
Expand Down Expand Up @@ -570,15 +557,14 @@ std::string ScanLocalState<Derived>::debug_string(int indentation_level) const {
PipelineXLocalState<>::debug_string(indentation_level), _eos.load());
if (_scanner_ctx) {
fmt::format_to(debug_string_buffer, "");
fmt::format_to(
debug_string_buffer,
", Scanner Context: (_is_finished = {}, _should_stop = {}, "
"_num_running_scanners={}, "
"_num_scheduling_ctx = {}, _num_unfinished_scanners = {}, status = {}, error = {})",
_scanner_ctx->is_finished(), _scanner_ctx->should_stop(),
_scanner_ctx->get_num_running_scanners(), _scanner_ctx->get_num_scheduling_ctx(),
_scanner_ctx->get_num_unfinished_scanners(), _scanner_ctx->status().to_string(),
_scanner_ctx->status_error());
fmt::format_to(debug_string_buffer,
", Scanner Context: (_is_finished = {}, _should_stop = {}, "
"_num_running_scanners={}, "
" _num_unfinished_scanners = {}, status = {}, error = {})",
_scanner_ctx->is_finished(), _scanner_ctx->should_stop(),
_scanner_ctx->get_num_running_scanners(),
_scanner_ctx->get_num_unfinished_scanners(),
_scanner_ctx->status().to_string(), _scanner_ctx->status_error());
}

return fmt::to_string(debug_string_buffer);
Expand Down Expand Up @@ -1226,24 +1212,27 @@ template <typename Derived>
Status ScanLocalState<Derived>::_prepare_scanners() {
std::list<vectorized::VScannerSPtr> scanners;
RETURN_IF_ERROR(_init_scanners(&scanners));
// Init scanner wrapper
for (auto it = scanners.begin(); it != scanners.end(); ++it) {
_scanners.emplace_back(std::make_shared<vectorized::ScannerDelegate>(*it));
}
if (scanners.empty()) {
_eos = true;
_scan_dependency->set_ready();
} else {
COUNTER_SET(_num_scanners, static_cast<int64_t>(scanners.size()));
RETURN_IF_ERROR(_start_scanners(scanners));
RETURN_IF_ERROR(_start_scanners(_scanners));
}
return Status::OK();
}

template <typename Derived>
Status ScanLocalState<Derived>::_start_scanners(
const std::list<vectorized::VScannerSPtr>& scanners) {
const std::list<std::shared_ptr<vectorized::ScannerDelegate>>& scanners) {
auto& p = _parent->cast<typename Derived::Parent>();
_scanner_ctx = PipScannerContext::create_shared(state(), this, p._output_tuple_desc, scanners,
p.limit(), state()->scan_queue_mem_limit(),
p._col_distribute_ids, 1, _scan_dependency,
_finish_dependency);
p._col_distribute_ids, 1, _scan_dependency);
return Status::OK();
}

Expand Down Expand Up @@ -1319,9 +1308,6 @@ Status ScanLocalState<Derived>::_init_profile() {

_max_scanner_thread_num = ADD_COUNTER(_runtime_profile, "MaxScannerThreadNum", TUnit::UNIT);

_wait_for_finish_dependency_timer =
ADD_TIMER(_runtime_profile, "WaitForPendingFinishDependency");

return Status::OK();
}

Expand Down Expand Up @@ -1429,17 +1415,6 @@ Status ScanOperatorX<LocalStateType>::open(RuntimeState* state) {
return Status::OK();
}

// Marks this operator's scanner context (if it has one) as should_stop.
// Always returns Status::OK().
template <typename LocalStateType>
Status ScanOperatorX<LocalStateType>::try_close(RuntimeState* state) {
    // Flag the scanner context as should_stop to make sure its scanners will not be
    // scheduled anymore.
    // TODO: `set_should_stop` takes a lock, which may cause some slight impact.
    if (auto& scanner_ctx = get_local_state(state)._scanner_ctx; scanner_ctx) {
        scanner_ctx->set_should_stop();
    }
    return Status::OK();
}

template <typename Derived>
Status ScanLocalState<Derived>::close(RuntimeState* state) {
if (_closed) {
Expand All @@ -1451,10 +1426,9 @@ Status ScanLocalState<Derived>::close(RuntimeState* state) {

SCOPED_TIMER(exec_time_counter());
if (_scanner_ctx) {
_scanner_ctx->clear_and_join(reinterpret_cast<ScanLocalStateBase*>(this), state);
_scanner_ctx->stop_scanners(state);
}
COUNTER_SET(_wait_for_dependency_timer, _scan_dependency->watcher_elapse_time());
COUNTER_SET(_wait_for_finish_dependency_timer, _finish_dependency->watcher_elapse_time());
COUNTER_SET(_wait_for_rf_timer, _filter_dependency->watcher_elapse_time());

return PipelineXLocalState<>::close(state);
Expand Down Expand Up @@ -1511,7 +1485,7 @@ Status ScanOperatorX<LocalStateType>::get_block(RuntimeState* state, vectorized:
if (eos) {
source_state = SourceState::FINISHED;
// reach limit, stop the scanners.
local_state._scanner_ctx->set_should_stop();
local_state._scanner_ctx->stop_scanners(state);
}

return Status::OK();
Expand Down
16 changes: 6 additions & 10 deletions be/src/pipeline/exec/scan_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@
namespace doris {
class ExecNode;
} // namespace doris
namespace doris::vectorized {
class ScannerDelegate;
}

namespace doris::pipeline {
class PipScannerContext;
Expand All @@ -48,13 +51,9 @@ class ScanOperator : public SourceOperator<ScanOperatorBuilder> {

bool can_read() override; // for source

bool is_pending_finish() const override;

bool runtime_filters_are_ready_or_timeout() override;

std::string debug_string() const override;

Status try_close(RuntimeState* state) override;
};

class ScanDependency final : public Dependency {
Expand Down Expand Up @@ -171,7 +170,6 @@ class ScanLocalStateBase : public PipelineXLocalState<>, public vectorized::Runt
RuntimeProfile::Counter* _wait_for_scanner_done_timer = nullptr;
// time of prefilter input block from scanner
RuntimeProfile::Counter* _wait_for_eos_timer = nullptr;
RuntimeProfile::Counter* _wait_for_finish_dependency_timer = nullptr;
RuntimeProfile::Counter* _wait_for_rf_timer = nullptr;
};

Expand Down Expand Up @@ -214,7 +212,6 @@ class ScanLocalState : public ScanLocalStateBase {
Dependency* dependency() override { return _scan_dependency.get(); }

RuntimeFilterDependency* filterdependency() override { return _filter_dependency.get(); };
Dependency* finishdependency() override { return _finish_dependency.get(); }

protected:
template <typename LocalStateType>
Expand Down Expand Up @@ -350,7 +347,7 @@ class ScanLocalState : public ScanLocalStateBase {
Status _prepare_scanners();

// Submit the scanner to the thread pool and start execution
Status _start_scanners(const std::list<vectorized::VScannerSPtr>& scanners);
Status _start_scanners(const std::list<std::shared_ptr<vectorized::ScannerDelegate>>& scanners);

// For some conjuncts there is a chance to eliminate the cast operator
// Eg. Variant's sub column could eliminate cast in storage layer if
Expand Down Expand Up @@ -413,14 +410,13 @@ class ScanLocalState : public ScanLocalStateBase {

std::shared_ptr<RuntimeFilterDependency> _filter_dependency;

std::shared_ptr<Dependency> _finish_dependency;
// ScanLocalState owns the scanners; the scanner context only holds weak pointers to them
std::list<std::shared_ptr<vectorized::ScannerDelegate>> _scanners;
};

template <typename LocalStateType>
class ScanOperatorX : public OperatorX<LocalStateType> {
public:
Status try_close(RuntimeState* state) override;

Status init(const TPlanNode& tnode, RuntimeState* state) override;
Status prepare(RuntimeState* state) override { return OperatorXBase::prepare(state); }
Status open(RuntimeState* state) override;
Expand Down
20 changes: 8 additions & 12 deletions be/src/vec/exec/scan/pip_scanner_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,24 +31,23 @@ class PipScannerContext : public vectorized::ScannerContext {
public:
PipScannerContext(RuntimeState* state, vectorized::VScanNode* parent,
const TupleDescriptor* output_tuple_desc,
const std::list<vectorized::VScannerSPtr>& scanners, int64_t limit_,
int64_t max_bytes_in_blocks_queue, const std::vector<int>& col_distribute_ids,
const int num_parallel_instances)
const std::list<std::shared_ptr<vectorized::ScannerDelegate>>& scanners,
int64_t limit_, int64_t max_bytes_in_blocks_queue,
const std::vector<int>& col_distribute_ids, const int num_parallel_instances)
: vectorized::ScannerContext(state, parent, output_tuple_desc, scanners, limit_,
max_bytes_in_blocks_queue, num_parallel_instances),
_col_distribute_ids(col_distribute_ids),
_need_colocate_distribute(!_col_distribute_ids.empty()) {}

PipScannerContext(RuntimeState* state, ScanLocalStateBase* local_state,
const TupleDescriptor* output_tuple_desc,
const std::list<vectorized::VScannerSPtr>& scanners, int64_t limit_,
int64_t max_bytes_in_blocks_queue, const std::vector<int>& col_distribute_ids,
const int num_parallel_instances,
std::shared_ptr<pipeline::ScanDependency> dependency,
std::shared_ptr<pipeline::Dependency> finish_dependency)
const std::list<std::shared_ptr<vectorized::ScannerDelegate>>& scanners,
int64_t limit_, int64_t max_bytes_in_blocks_queue,
const std::vector<int>& col_distribute_ids, const int num_parallel_instances,
std::shared_ptr<pipeline::ScanDependency> dependency)
: vectorized::ScannerContext(state, output_tuple_desc, scanners, limit_,
max_bytes_in_blocks_queue, num_parallel_instances,
local_state, dependency, finish_dependency),
local_state, dependency),
_need_colocate_distribute(false) {}

Status get_block_from_queue(RuntimeState* state, vectorized::BlockUPtr* block, bool* eos,
Expand Down Expand Up @@ -111,9 +110,6 @@ class PipScannerContext : public vectorized::ScannerContext {
return Status::OK();
}

// Returns true when either _is_finished or _should_stop is set.
// NOTE: these methods should be made lock-free.
bool done() override { return _is_finished || _should_stop; }

void append_blocks_to_queue(std::vector<vectorized::BlockUPtr>& blocks) override {
const int queue_size = _blocks_queues.size();
const int block_size = blocks.size();
Expand Down
Loading

0 comments on commit 0b4e275

Please sign in to comment.