Skip to content

Commit

Permalink
[pipelineX](bug) Fix core dump if cancelled (apache#27449)
Browse files Browse the repository at this point in the history
  • Loading branch information
Gabriel39 authored and seawinde committed Nov 28, 2023
1 parent f65c6e9 commit 91ede00
Show file tree
Hide file tree
Showing 6 changed files with 77 additions and 67 deletions.
44 changes: 1 addition & 43 deletions be/src/pipeline/exec/aggregation_source_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,6 @@ Status AggLocalState::init(RuntimeState* state, LocalStateInfo& info) {
std::placeholders::_1, std::placeholders::_2,
std::placeholders::_3);
}

_executor.close = std::bind<void>(&AggLocalState::_close_without_key, this);
} else {
if (p._needs_finalize) {
_executor.get_result = std::bind<Status>(
Expand All @@ -75,10 +73,9 @@ Status AggLocalState::init(RuntimeState* state, LocalStateInfo& info) {
&AggLocalState::_serialize_with_serialized_key_result, this,
std::placeholders::_1, std::placeholders::_2, std::placeholders::_3);
}
_executor.close = std::bind<void>(&AggLocalState::_close_with_serialized_key, this);
}

_agg_data_created_without_key = p._without_key;
_shared_state->agg_data_created_without_key = p._without_key;
return Status::OK();
}

Expand All @@ -91,39 +88,6 @@ Status AggLocalState::_destroy_agg_status(vectorized::AggregateDataPtr data) {
return Status::OK();
}

// Tears down the aggregate states stored in the serialized-key hash table, then releases
// the memory recorded against the tracker.
//
// Every non-null mapped state is destroyed and its slot is reset to nullptr. If the table
// also holds null-key data, that state is destroyed too; a failed status from that step is
// rethrown as an Exception.
void AggLocalState::_close_with_serialized_key() {
    auto destroy_states = [&](auto&& method) -> void {
        auto& table = *method.hash_table;

        auto destroy_slot = [&](auto& slot) {
            if (!slot) {
                return;
            }
            // The status of per-slot destruction is deliberately discarded.
            static_cast<void>(_destroy_agg_status(slot));
            slot = nullptr;
        };
        table.for_each_mapped(destroy_slot);

        if (!table.has_null_key_data()) {
            return;
        }
        auto null_key_status = _destroy_agg_status(
                table.template get_null_key_data<vectorized::AggregateDataPtr>());
        if (!null_key_status) {
            throw Exception(null_key_status.code(), null_key_status.to_string());
        }
    };
    std::visit(destroy_states, _agg_data->method_variant);
    _release_tracker();
}

// Destroys the keyless aggregate state (if it was created) and releases tracked memory.
//
// prepare may fail before the aggregate data is created, yet close still runs afterwards.
// Destroying aggregate data that was never initialized (for example one holding a
// BitmapValue) would core dump, so destruction only happens when the creation flag is set.
void AggLocalState::_close_without_key() {
    if (!_agg_data_created_without_key) {
        _release_tracker();
        return;
    }
    static_cast<void>(_destroy_agg_status(_agg_data->without_key));
    // Clear the flag so the state is not destroyed a second time.
    _agg_data_created_without_key = false;
    _release_tracker();
}

Status AggLocalState::_serialize_with_serialized_key_result(RuntimeState* state,
vectorized::Block* block,
SourceState& source_state) {
Expand Down Expand Up @@ -597,12 +561,6 @@ Status AggLocalState::close(RuntimeState* state) {
if (_closed) {
return Status::OK();
}
for (auto* aggregate_evaluator : _shared_state->aggregate_evaluators) {
aggregate_evaluator->close(state);
}
if (_executor.close) {
_executor.close();
}

/// _hash_table_size_counter may be null if prepare failed.
if (_hash_table_size_counter) {
Expand Down
10 changes: 0 additions & 10 deletions be/src/pipeline/exec/aggregation_source_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,6 @@ class AggLocalState final : public PipelineXLocalState<AggSourceDependency> {
friend class DistinctStreamingAggSourceOperatorX;
friend class DistinctStreamingAggSinkOperatorX;

void _close_without_key();
void _close_with_serialized_key();
Status _get_without_key_result(RuntimeState* state, vectorized::Block* block,
SourceState& source_state);
Status _serialize_without_key(RuntimeState* state, vectorized::Block* block,
Expand Down Expand Up @@ -122,11 +120,6 @@ class AggLocalState final : public PipelineXLocalState<AggSourceDependency> {
}
}
}
// Releases from the shared state's mem_tracker the sum of the bytes recorded in
// mem_usage_record (used_in_state + used_in_arena).
void _release_tracker() {
    auto& shared = *Base::_shared_state;
    const auto& usage = shared.mem_usage_record;
    shared.mem_tracker->release(usage.used_in_state + usage.used_in_arena);
}

RuntimeProfile::Counter* _get_results_timer;
RuntimeProfile::Counter* _serialize_result_timer;
Expand All @@ -137,17 +130,14 @@ class AggLocalState final : public PipelineXLocalState<AggSourceDependency> {

using vectorized_get_result = std::function<Status(
RuntimeState* state, vectorized::Block* block, SourceState& source_state)>;
using vectorized_closer = std::function<void()>;

// Holds the callables that AggLocalState::init binds according to the aggregation mode.
struct executor {
// Bound in init to one of the get-result / serialize member functions; takes
// (RuntimeState*, vectorized::Block*, SourceState&) and returns a Status.
vectorized_get_result get_result;
// Bound in init to _close_without_key or _close_with_serialized_key; AggLocalState::close
// invokes it only when it is non-empty.
vectorized_closer close;
};

executor _executor;

vectorized::AggregatedDataVariants* _agg_data;
bool _agg_data_created_without_key = false;
};

class AggSourceOperatorX : public OperatorX<AggLocalState> {
Expand Down
77 changes: 64 additions & 13 deletions be/src/pipeline/pipeline_x/dependency.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,12 @@ static_assert(TIME_UNIT_DEPENDENCY_LOG < SLOW_DEPENDENCY_THRESHOLD);
// Base type for state shared between the local states of connected operators.
//
// Local states call ref() when they attach to the shared state and close() when they are
// closed; derived types (e.g. AggSharedState) use ref_count inside close() to run their
// teardown only once the last holder has closed.
struct BasicSharedState {
    // Non-owning dependency pointers. Explicitly null until assigned so that a
    // default-initialized shared state never exposes indeterminate pointer values.
    Dependency* source_dep = nullptr;
    Dependency* sink_dep = nullptr;

    // Number of local states currently attached; incremented by ref().
    std::atomic<int> ref_count = 0;

    // Registers one more holder of this shared state.
    void ref() { ref_count++; }

    // Hook invoked when a holder closes. The base implementation does nothing and
    // returns Status::OK(); derived types override it to release their resources.
    virtual Status close(RuntimeState* state) { return Status::OK(); }

    virtual ~BasicSharedState() = default;
};

class Dependency : public std::enable_shared_from_this<Dependency> {
Expand Down Expand Up @@ -110,16 +116,6 @@ class Dependency : public std::enable_shared_from_this<Dependency> {
virtual void block() { _ready = false; }

protected:
// Decides whether a log entry should be emitted for `cur_time`.
//
// Returns false while `cur_time` is below SLOW_DEPENDENCY_THRESHOLD, or when less than
// TIME_UNIT_DEPENDENCY_LOG has elapsed since _last_log_time. Otherwise it stores
// `cur_time` in _last_log_time and returns true.
bool _should_log(uint64_t cur_time) {
    const bool below_threshold = cur_time < SLOW_DEPENDENCY_THRESHOLD;
    // Short-circuit keeps the elapsed-time check from running below the threshold.
    if (below_threshold || (cur_time - _last_log_time) < TIME_UNIT_DEPENDENCY_LOG) {
        return false;
    }
    _last_log_time = cur_time;
    return true;
}
void _add_block_task(PipelineXTask* task);
bool _is_cancelled() const {
return push_to_blocking_queue() ? false : _query_ctx->is_cancelled();
Expand All @@ -134,10 +130,8 @@ class Dependency : public std::enable_shared_from_this<Dependency> {

std::shared_ptr<BasicSharedState> _shared_state {nullptr};
MonotonicStopWatch _watcher;
std::weak_ptr<Dependency> _parent;
std::list<std::shared_ptr<Dependency>> _children;

uint64_t _last_log_time = 0;
std::mutex _task_lock;
std::vector<PipelineXTask*> _blocked_task;
};
Expand Down Expand Up @@ -249,11 +243,25 @@ struct AggSharedState : public BasicSharedState {
agg_data = std::make_unique<vectorized::AggregatedDataVariants>();
agg_arena_pool = std::make_unique<vectorized::Arena>();
}
virtual ~AggSharedState() = default;
~AggSharedState() override = default;
// Creates a new vectorized::SpillPartitionHelper constructed from
// `spill_partition_count_bits` and stores it in spill_partition_helper, replacing any
// helper that was held before.
void init_spill_partition_helper(size_t spill_partition_count_bits) {
spill_partition_helper =
std::make_unique<vectorized::SpillPartitionHelper>(spill_partition_count_bits);
}
// Drops one reference to this shared state.
//
// Only the call that releases the last reference (the counter was 1 before the decrement)
// performs teardown: it closes every aggregate evaluator and then destroys the aggregate
// data, using _close_without_key() when probe_expr_ctxs is empty and
// _close_with_serialized_key() otherwise. Always returns Status::OK().
Status close(RuntimeState* state) override {
    // fetch_sub yields the value held before the decrement.
    if (ref_count.fetch_sub(1) != 1) {
        return Status::OK();
    }

    for (auto* evaluator : aggregate_evaluators) {
        evaluator->close(state);
    }

    if (probe_expr_ctxs.empty()) {
        _close_without_key();
    } else {
        _close_with_serialized_key();
    }
    return Status::OK();
}

vectorized::AggregatedDataVariantsUPtr agg_data;
std::unique_ptr<vectorized::AggregateDataContainer> aggregate_data_container;
vectorized::AggSpillContext spill_context;
Expand All @@ -280,6 +288,49 @@ struct AggSharedState : public BasicSharedState {
};
MemoryRecord mem_usage_record;
std::unique_ptr<MemTracker> mem_tracker = std::make_unique<MemTracker>("AggregateOperator");
bool agg_data_created_without_key = false;

private:
// Releases from mem_tracker the total recorded in mem_usage_record
// (used_in_state + used_in_arena).
void _release_tracker() {
    const auto tracked_bytes = mem_usage_record.used_in_state + mem_usage_record.used_in_arena;
    mem_tracker->release(tracked_bytes);
}
// Destroys all aggregate states owned by the serialized-key hash table and releases the
// tracked memory afterwards.
//
// Non-null mapped states are destroyed and their slots cleared. Null-key data, when
// present, is destroyed as well; if that destruction reports a failed status the status is
// rethrown as an Exception.
void _close_with_serialized_key() {
    const auto destroy_and_clear = [&](auto& state) {
        if (!state) {
            return;
        }
        // Result of destroying an individual state is deliberately ignored.
        static_cast<void>(_destroy_agg_status(state));
        state = nullptr;
    };

    std::visit(
            [&](auto&& method) -> void {
                auto& table = *method.hash_table;
                table.for_each_mapped(destroy_and_clear);
                if (!table.has_null_key_data()) {
                    return;
                }
                auto status = _destroy_agg_status(
                        table.template get_null_key_data<vectorized::AggregateDataPtr>());
                if (!status) {
                    throw Exception(status.code(), status.to_string());
                }
            },
            agg_data->method_variant);
    _release_tracker();
}
// Destroys the keyless aggregate state when it exists, then releases tracked memory.
//
// prepare may have failed before the aggregate data was created, but close is still
// called. Destroying data that was never initialized (e.g. one containing a BitmapValue)
// would core dump, so the creation flag gates the destruction.
void _close_without_key() {
    const bool state_was_created = agg_data_created_without_key;
    if (state_was_created) {
        static_cast<void>(_destroy_agg_status(agg_data->without_key));
        // Reset the flag so the state cannot be destroyed twice.
        agg_data_created_without_key = false;
    }
    _release_tracker();
}
// Destroys the per-function aggregate states laid out inside `data`.
//
// For each aggregate evaluator i, calls destroy on its function for the state located at
// data + offsets_of_aggregate_states[i]. Always returns Status::OK().
Status _destroy_agg_status(vectorized::AggregateDataPtr data) {
    // size_t index: matches the unsigned type returned by size() and avoids a
    // signed/unsigned comparison in the loop condition.
    for (size_t i = 0; i < aggregate_evaluators.size(); ++i) {
        aggregate_evaluators[i]->function()->destroy(data + offsets_of_aggregate_states[i]);
    }
    return Status::OK();
}
};

struct SortSharedState : public BasicSharedState {
Expand Down
8 changes: 8 additions & 0 deletions be/src/pipeline/pipeline_x/operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,7 @@ Status PipelineXLocalState<DependencyType>::init(RuntimeState* state, LocalState
auto& deps = info.upstream_dependencies;
_dependency->set_shared_state(deps.front()->shared_state());
_shared_state = (typename DependencyType::SharedState*)_dependency->shared_state().get();
_shared_state->ref();
_wait_for_dependency_timer =
ADD_TIMER(_runtime_profile, "WaitForDependency[" + _dependency->name() + "]Time");
_shared_state->source_dep = _dependency;
Expand Down Expand Up @@ -382,6 +383,9 @@ Status PipelineXLocalState<DependencyType>::close(RuntimeState* state) {
if (_closed) {
return Status::OK();
}
if (_shared_state) {
RETURN_IF_ERROR(_shared_state->close(state));
}
if constexpr (!std::is_same_v<DependencyType, FakeDependency>) {
COUNTER_SET(_wait_for_dependency_timer, _dependency->watcher_elapse_time());
}
Expand Down Expand Up @@ -410,6 +414,7 @@ Status PipelineXSinkLocalState<DependencyType>::init(RuntimeState* state,
_wait_for_dependency_timer =
ADD_TIMER(_profile, "WaitForDependency[" + _dependency->name() + "]Time");
}
_shared_state->ref();
} else {
auto& deps = info.dependencys;
deps.front() = std::make_shared<FakeDependency>(0, 0, state->get_query_ctx());
Expand All @@ -429,6 +434,9 @@ Status PipelineXSinkLocalState<DependencyType>::close(RuntimeState* state, Statu
if (_closed) {
return Status::OK();
}
if (_shared_state) {
RETURN_IF_ERROR(_shared_state->close(state));
}
if constexpr (!std::is_same_v<DependencyType, FakeDependency>) {
COUNTER_SET(_wait_for_dependency_timer, _dependency->watcher_elapse_time());
}
Expand Down
2 changes: 1 addition & 1 deletion be/src/pipeline/pipeline_x/operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,7 @@ class PipelineXLocalState : public PipelineXLocalStateBase {

protected:
DependencyType* _dependency;
typename DependencyType::SharedState* _shared_state;
typename DependencyType::SharedState* _shared_state = nullptr;
};

class DataSinkOperatorXBase;
Expand Down
3 changes: 3 additions & 0 deletions be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,9 @@ void PipelineXFragmentContext::cancel(const PPlanFragmentCancelReason& reason,
// TODO pipeline incomp
// _exec_env->result_queue_mgr()->update_queue_status(id, Status::Aborted(msg));
}
if (reason == PPlanFragmentCancelReason::TIMEOUT) {
LOG(WARNING) << "PipelineXFragmentContext is cancelled due to timeout : " << debug_string();
}
for (auto& tasks : _tasks) {
for (auto& task : tasks) {
task->clear_blocking_state();
Expand Down

0 comments on commit 91ede00

Please sign in to comment.