Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[pipelineX](bug) Fix core dump if cancelled #27449

Merged
merged 3 commits into from
Nov 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 1 addition & 43 deletions be/src/pipeline/exec/aggregation_source_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,6 @@ Status AggLocalState::init(RuntimeState* state, LocalStateInfo& info) {
std::placeholders::_1, std::placeholders::_2,
std::placeholders::_3);
}

_executor.close = std::bind<void>(&AggLocalState::_close_without_key, this);
} else {
if (p._needs_finalize) {
_executor.get_result = std::bind<Status>(
Expand All @@ -76,10 +74,9 @@ Status AggLocalState::init(RuntimeState* state, LocalStateInfo& info) {
&AggLocalState::_serialize_with_serialized_key_result, this,
std::placeholders::_1, std::placeholders::_2, std::placeholders::_3);
}
_executor.close = std::bind<void>(&AggLocalState::_close_with_serialized_key, this);
}

_agg_data_created_without_key = p._without_key;
_shared_state->agg_data_created_without_key = p._without_key;
return Status::OK();
}

Expand All @@ -92,39 +89,6 @@ Status AggLocalState::_destroy_agg_status(vectorized::AggregateDataPtr data) {
return Status::OK();
}

// Destroys every aggregate state held by the keyed hash table in
// `_agg_data->method_variant`, then releases the tracked memory.
//
// Throws Exception if destroying the null-key state reports a failure; the
// status returned for ordinary mapped states is deliberately discarded.
void AggLocalState::_close_with_serialized_key() {
    auto destroy_states = [&](auto&& method) -> void {
        auto& table = *method.hash_table;

        // Destroy each live mapped state and clear its slot so it cannot be
        // destroyed a second time.
        table.for_each_mapped([&](auto& state) {
            if (!state) {
                return;
            }
            static_cast<void>(_destroy_agg_status(state));
            state = nullptr;
        });

        // The state for the null key is stored separately from the mapped values.
        if (!table.has_null_key_data()) {
            return;
        }
        auto status = _destroy_agg_status(
                table.template get_null_key_data<vectorized::AggregateDataPtr>());
        if (!status) {
            throw Exception(status.code(), status.to_string());
        }
    };

    std::visit(destroy_states, _agg_data->method_variant);
    _release_tracker();
}

// Destroys the single aggregate state used when aggregating without a key,
// then releases the tracked memory through _release_tracker().
void AggLocalState::_close_without_key() {
// prepare() may have failed before the aggregate data was created, yet close()
// is still called to destroy it. Destroying data that was never initialized
// (for example a state holding a BitmapValue) would core dump, so the state is
// destroyed only when the flag records that it was created.
if (_agg_data_created_without_key) {
static_cast<void>(_destroy_agg_status(_agg_data->without_key));
_agg_data_created_without_key = false;
}
_release_tracker();
}

Status AggLocalState::_serialize_with_serialized_key_result(RuntimeState* state,
vectorized::Block* block,
SourceState& source_state) {
Expand Down Expand Up @@ -598,12 +562,6 @@ Status AggLocalState::close(RuntimeState* state) {
if (_closed) {
return Status::OK();
}
for (auto* aggregate_evaluator : _shared_state->aggregate_evaluators) {
aggregate_evaluator->close(state);
}
if (_executor.close) {
_executor.close();
}

/// _hash_table_size_counter may be null if prepare failed.
if (_hash_table_size_counter) {
Expand Down
10 changes: 0 additions & 10 deletions be/src/pipeline/exec/aggregation_source_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,6 @@ class AggLocalState final : public PipelineXLocalState<AggSourceDependency> {
friend class DistinctStreamingAggSourceOperatorX;
friend class DistinctStreamingAggSinkOperatorX;

void _close_without_key();
void _close_with_serialized_key();
Status _get_without_key_result(RuntimeState* state, vectorized::Block* block,
SourceState& source_state);
Status _serialize_without_key(RuntimeState* state, vectorized::Block* block,
Expand Down Expand Up @@ -122,11 +120,6 @@ class AggLocalState final : public PipelineXLocalState<AggSourceDependency> {
}
}
}
// Releases from the shared state's memory tracker the sum of the amounts
// recorded in `mem_usage_record` (`used_in_state` + `used_in_arena`).
void _release_tracker() {
    auto* shared = Base::_shared_state;
    const auto& usage = shared->mem_usage_record;
    shared->mem_tracker->release(usage.used_in_state + usage.used_in_arena);
}

RuntimeProfile::Counter* _get_results_timer;
RuntimeProfile::Counter* _serialize_result_timer;
Expand All @@ -137,17 +130,14 @@ class AggLocalState final : public PipelineXLocalState<AggSourceDependency> {

using vectorized_get_result = std::function<Status(
RuntimeState* state, vectorized::Block* block, SourceState& source_state)>;
using vectorized_closer = std::function<void()>;

// Callbacks bound by AggLocalState::init, chosen by whether the aggregation
// runs without a key or with a serialized key.
struct executor {
// Fills `block` with result rows and reports progress through `source_state`.
vectorized_get_result get_result;
// Cleanup hook; AggLocalState::close invokes it only when it has been set.
vectorized_closer close;
};

executor _executor;

vectorized::AggregatedDataVariants* _agg_data;
bool _agg_data_created_without_key = false;
};

class AggSourceOperatorX : public OperatorX<AggLocalState> {
Expand Down
77 changes: 64 additions & 13 deletions be/src/pipeline/pipeline_x/dependency.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,12 @@ static_assert(TIME_UNIT_DEPENDENCY_LOG < SLOW_DEPENDENCY_THRESHOLD);
// Base class for state shared between the local states of a pipeline.
//
// Local states call ref() when they attach to the shared state in init() and
// close() when they are closed. Subclasses may override close() to release
// resources once the reference count shows the last user has gone
// (AggSharedState does this).
struct BasicSharedState {
// Non-owning pointers to the dependencies on the source and sink side.
Dependency* source_dep;
Dependency* sink_dep;

// Number of local states currently attached; atomic because it is modified
// through ref() and by subclass close() implementations.
std::atomic<int> ref_count = 0;

// Registers one more user of this shared state.
void ref() { ref_count++; }
// Called by each local state on close. The base implementation does nothing
// and returns OK; it does not touch ref_count.
virtual Status close(RuntimeState* state) { return Status::OK(); }
// Virtual so that subclasses are destroyed correctly through a base pointer.
virtual ~BasicSharedState() = default;
};

class Dependency : public std::enable_shared_from_this<Dependency> {
Expand Down Expand Up @@ -110,16 +116,6 @@ class Dependency : public std::enable_shared_from_this<Dependency> {
// Marks this dependency as not ready by clearing `_ready`.
virtual void block() { _ready = false; }

protected:
// Decides whether a slow-dependency message should be logged at `cur_time`.
//
// Returns false while `cur_time` is below SLOW_DEPENDENCY_THRESHOLD, or when
// less than TIME_UNIT_DEPENDENCY_LOG has passed since the last accepted call.
// Otherwise records `cur_time` in `_last_log_time` and returns true.
bool _should_log(uint64_t cur_time) {
    const bool below_threshold = cur_time < SLOW_DEPENDENCY_THRESHOLD;
    // Short-circuit keeps the original evaluation order: the interval is only
    // compared once the threshold has been reached.
    if (below_threshold || (cur_time - _last_log_time) < TIME_UNIT_DEPENDENCY_LOG) {
        return false;
    }
    _last_log_time = cur_time;
    return true;
}
void _add_block_task(PipelineXTask* task);
bool _is_cancelled() const {
return push_to_blocking_queue() ? false : _query_ctx->is_cancelled();
Expand All @@ -134,10 +130,8 @@ class Dependency : public std::enable_shared_from_this<Dependency> {

std::shared_ptr<BasicSharedState> _shared_state {nullptr};
MonotonicStopWatch _watcher;
std::weak_ptr<Dependency> _parent;
std::list<std::shared_ptr<Dependency>> _children;

uint64_t _last_log_time = 0;
std::mutex _task_lock;
std::vector<PipelineXTask*> _blocked_task;
};
Expand Down Expand Up @@ -249,11 +243,25 @@ struct AggSharedState : public BasicSharedState {
agg_data = std::make_unique<vectorized::AggregatedDataVariants>();
agg_arena_pool = std::make_unique<vectorized::Arena>();
}
virtual ~AggSharedState() = default;
~AggSharedState() override = default;
// Creates the spill partition helper, replacing any previously held instance.
// `spill_partition_count_bits` is forwarded unchanged to the
// vectorized::SpillPartitionHelper constructor.
void init_spill_partition_helper(size_t spill_partition_count_bits) {
spill_partition_helper =
std::make_unique<vectorized::SpillPartitionHelper>(spill_partition_count_bits);
}
// Drops one reference to this shared state and, when the caller held the last
// one, closes the aggregate evaluators and destroys the aggregate data.
//
// Always returns Status::OK(). The keyed cleanup path
// (_close_with_serialized_key) may throw Exception.
Status close(RuntimeState* state) override {
    // fetch_sub returns the value held before the decrement, so 1 means this
    // caller is releasing the final reference.
    const bool last_reference = ref_count.fetch_sub(1) == 1;
    if (!last_reference) {
        return Status::OK();
    }

    for (auto* evaluator : aggregate_evaluators) {
        evaluator->close(state);
    }

    // No probe expressions means the aggregation ran without a key.
    if (probe_expr_ctxs.empty()) {
        _close_without_key();
    } else {
        _close_with_serialized_key();
    }
    return Status::OK();
}

vectorized::AggregatedDataVariantsUPtr agg_data;
std::unique_ptr<vectorized::AggregateDataContainer> aggregate_data_container;
vectorized::AggSpillContext spill_context;
Expand All @@ -280,6 +288,49 @@ struct AggSharedState : public BasicSharedState {
};
MemoryRecord mem_usage_record;
std::unique_ptr<MemTracker> mem_tracker = std::make_unique<MemTracker>("AggregateOperator");
bool agg_data_created_without_key = false;

private:
// Releases from `mem_tracker` the sum of the amounts recorded in
// `mem_usage_record` (`used_in_state` + `used_in_arena`).
void _release_tracker() {
    const auto& usage = mem_usage_record;
    mem_tracker->release(usage.used_in_state + usage.used_in_arena);
}
// Destroys every aggregate state held by the keyed hash table in
// `agg_data->method_variant`, then releases the tracked memory.
//
// Throws Exception if destroying the null-key state reports a failure; the
// status returned for ordinary mapped states is deliberately discarded.
void _close_with_serialized_key() {
    auto destroy_states = [&](auto&& method) -> void {
        auto& table = *method.hash_table;

        // Destroy each live mapped state and clear its slot so it cannot be
        // destroyed a second time.
        table.for_each_mapped([&](auto& state) {
            if (!state) {
                return;
            }
            static_cast<void>(_destroy_agg_status(state));
            state = nullptr;
        });

        // The state for the null key is stored separately from the mapped values.
        if (!table.has_null_key_data()) {
            return;
        }
        auto status = _destroy_agg_status(
                table.template get_null_key_data<vectorized::AggregateDataPtr>());
        if (!status) {
            throw Exception(status.code(), status.to_string());
        }
    };

    std::visit(destroy_states, agg_data->method_variant);
    _release_tracker();
}
// Destroys the single aggregate state used when aggregating without a key,
// then releases the tracked memory through _release_tracker().
void _close_without_key() {
// prepare() may have failed before the aggregate data was created, yet close()
// is still called to destroy it. Destroying data that was never initialized
// (for example a state holding a BitmapValue) would core dump, so the state is
// destroyed only when the flag records that it was created.
if (agg_data_created_without_key) {
static_cast<void>(_destroy_agg_status(agg_data->without_key));
agg_data_created_without_key = false;
}
_release_tracker();
}
Status _destroy_agg_status(vectorized::AggregateDataPtr data) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

warning: method '_destroy_agg_status' can be made static [readability-convert-member-functions-to-static]

Suggested change
Status _destroy_agg_status(vectorized::AggregateDataPtr data) {
static Status _destroy_agg_status(vectorized::AggregateDataPtr data) {

for (int i = 0; i < aggregate_evaluators.size(); ++i) {
aggregate_evaluators[i]->function()->destroy(data + offsets_of_aggregate_states[i]);
}
return Status::OK();
}
};

struct SortSharedState : public BasicSharedState {
Expand Down
8 changes: 8 additions & 0 deletions be/src/pipeline/pipeline_x/operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,7 @@ Status PipelineXLocalState<DependencyType>::init(RuntimeState* state, LocalState
auto& deps = info.upstream_dependencies;
_dependency->set_shared_state(deps.front()->shared_state());
_shared_state = (typename DependencyType::SharedState*)_dependency->shared_state().get();
_shared_state->ref();
_wait_for_dependency_timer =
ADD_TIMER(_runtime_profile, "WaitForDependency[" + _dependency->name() + "]Time");
_shared_state->source_dep = _dependency;
Expand Down Expand Up @@ -382,6 +383,9 @@ Status PipelineXLocalState<DependencyType>::close(RuntimeState* state) {
if (_closed) {
return Status::OK();
}
if (_shared_state) {
RETURN_IF_ERROR(_shared_state->close(state));
}
if constexpr (!std::is_same_v<DependencyType, FakeDependency>) {
COUNTER_SET(_wait_for_dependency_timer, _dependency->watcher_elapse_time());
}
Expand Down Expand Up @@ -410,6 +414,7 @@ Status PipelineXSinkLocalState<DependencyType>::init(RuntimeState* state,
_wait_for_dependency_timer =
ADD_TIMER(_profile, "WaitForDependency[" + _dependency->name() + "]Time");
}
_shared_state->ref();
} else {
auto& deps = info.dependencys;
deps.front() = std::make_shared<FakeDependency>(0, 0, state->get_query_ctx());
Expand All @@ -429,6 +434,9 @@ Status PipelineXSinkLocalState<DependencyType>::close(RuntimeState* state, Statu
if (_closed) {
return Status::OK();
}
if (_shared_state) {
RETURN_IF_ERROR(_shared_state->close(state));
}
if constexpr (!std::is_same_v<DependencyType, FakeDependency>) {
COUNTER_SET(_wait_for_dependency_timer, _dependency->watcher_elapse_time());
}
Expand Down
2 changes: 1 addition & 1 deletion be/src/pipeline/pipeline_x/operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,7 @@ class PipelineXLocalState : public PipelineXLocalStateBase {

protected:
DependencyType* _dependency;
typename DependencyType::SharedState* _shared_state;
typename DependencyType::SharedState* _shared_state = nullptr;
};

class DataSinkOperatorXBase;
Expand Down
3 changes: 3 additions & 0 deletions be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,9 @@ void PipelineXFragmentContext::cancel(const PPlanFragmentCancelReason& reason,
// TODO pipeline incomp
// _exec_env->result_queue_mgr()->update_queue_status(id, Status::Aborted(msg));
}
if (reason == PPlanFragmentCancelReason::TIMEOUT) {
LOG(WARNING) << "PipelineXFragmentContext is cancelled due to timeout : " << debug_string();
}
for (auto& tasks : _tasks) {
for (auto& task : tasks) {
task->clear_blocking_state();
Expand Down
Loading