Skip to content

Commit

Permalink
[fix](spill) inner local state should be initialized in init phase
Browse files Browse the repository at this point in the history
  • Loading branch information
mrhhsg committed Jan 23, 2025
1 parent 6c8412d commit a00a13e
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 15 deletions.
26 changes: 13 additions & 13 deletions be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ Status PartitionedHashJoinSinkLocalState::init(doris::RuntimeState* state,
_in_mem_rows_counter = ADD_COUNTER_WITH_LEVEL(profile(), "SpillInMemRow", TUnit::UNIT, 1);
_memory_usage_reserved =
ADD_COUNTER_WITH_LEVEL(profile(), "MemoryUsageReserved", TUnit::BYTES, 1);
RETURN_IF_ERROR(_setup_internal_operator(state));
return Status::OK();
}

Expand All @@ -70,7 +71,7 @@ Status PartitionedHashJoinSinkLocalState::open(RuntimeState* state) {
_shared_state->setup_shared_profile(_profile);
RETURN_IF_ERROR(PipelineXSpillSinkLocalState::open(state));
auto& p = _parent->cast<PartitionedHashJoinSinkOperatorX>();
RETURN_IF_ERROR(p._setup_internal_operator(state));
_shared_state->inner_runtime_state->set_task(state->get_task());
for (uint32_t i = 0; i != p._partition_count; ++i) {
auto& spilling_stream = _shared_state->spilled_streams[i];
RETURN_IF_ERROR(ExecEnv::GetInstance()->spill_stream_mgr()->register_spill_stream(
Expand Down Expand Up @@ -513,9 +514,7 @@ Status PartitionedHashJoinSinkOperatorX::open(RuntimeState* state) {
return _inner_sink_operator->open(state);
}

Status PartitionedHashJoinSinkOperatorX::_setup_internal_operator(RuntimeState* state) {
auto& local_state = get_local_state(state);

Status PartitionedHashJoinSinkLocalState::_setup_internal_operator(RuntimeState* state) {
auto inner_runtime_state = RuntimeState::create_unique(
state->fragment_instance_id(), state->query_id(), state->fragment_id(),
state->query_options(), TQueryGlobals {}, state->exec_env(), state->get_query_ctx());
Expand All @@ -527,29 +526,30 @@ Status PartitionedHashJoinSinkOperatorX::_setup_internal_operator(RuntimeState*
inner_runtime_state->resize_op_id_to_local_state(-1);
inner_runtime_state->set_runtime_filter_mgr(state->local_runtime_filter_mgr());

auto& p = _parent->cast<PartitionedHashJoinSinkOperatorX>();
auto inner_shared_state = std::dynamic_pointer_cast<HashJoinSharedState>(
_inner_sink_operator->create_shared_state());
LocalSinkStateInfo info {
0, local_state._internal_runtime_profile.get(), -1, inner_shared_state.get(), {}, {}};
p._inner_sink_operator->create_shared_state());
LocalSinkStateInfo info {0, _internal_runtime_profile.get(), -1, inner_shared_state.get(), {},
{}};

RETURN_IF_ERROR(_inner_sink_operator->setup_local_state(inner_runtime_state.get(), info));
RETURN_IF_ERROR(p._inner_sink_operator->setup_local_state(inner_runtime_state.get(), info));
auto* sink_local_state = inner_runtime_state->get_sink_local_state();
DCHECK(sink_local_state != nullptr);

LocalStateInfo state_info {
local_state._internal_runtime_profile.get(), {}, inner_shared_state.get(), {}, 0};
_internal_runtime_profile.get(), {}, inner_shared_state.get(), {}, 0};

RETURN_IF_ERROR(
_inner_probe_operator->setup_local_state(inner_runtime_state.get(), state_info));
p._inner_probe_operator->setup_local_state(inner_runtime_state.get(), state_info));
auto* probe_local_state =
inner_runtime_state->get_local_state(_inner_probe_operator->operator_id());
inner_runtime_state->get_local_state(p._inner_probe_operator->operator_id());
DCHECK(probe_local_state != nullptr);
RETURN_IF_ERROR(probe_local_state->open(state));
RETURN_IF_ERROR(sink_local_state->open(state));

/// Set these two values after all the work is ready.
local_state._shared_state->inner_shared_state = std::move(inner_shared_state);
local_state._shared_state->inner_runtime_state = std::move(inner_runtime_state);
_shared_state->inner_shared_state = std::move(inner_shared_state);
_shared_state->inner_runtime_state = std::move(inner_runtime_state);
return Status::OK();
}

Expand Down
4 changes: 2 additions & 2 deletions be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ class PartitionedHashJoinSinkLocalState

Status _finish_spilling();

Status _setup_internal_operator(RuntimeState* state);

friend class PartitionedHashJoinSinkOperatorX;

bool _child_eos {false};
Expand Down Expand Up @@ -140,8 +142,6 @@ class PartitionedHashJoinSinkOperatorX
private:
friend class PartitionedHashJoinSinkLocalState;

Status _setup_internal_operator(RuntimeState* state);

const TJoinDistributionType::type _join_distribution;

std::vector<TExpr> _build_exprs;
Expand Down

0 comments on commit a00a13e

Please sign in to comment.