Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion be/src/pipeline/dependency.h
Original file line number Diff line number Diff line change
Expand Up @@ -830,7 +830,8 @@ struct MaterializationSharedState : public BasicSharedState {
void create_counter_dependency(int operator_id, int node_id, const std::string& name);

bool rpc_struct_inited = false;
Status rpc_status = Status::OK();
AtomicStatus rpc_status;

bool last_block = false;
// empty materialization sink block not need to merge block
bool need_merge_block = true;
Expand Down
6 changes: 3 additions & 3 deletions be/src/pipeline/exec/materialization_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,10 +83,10 @@ class MaterializationCallback : public ::doris::DummyBrpcCallback<Response> {
::doris::DummyBrpcCallback<Response>::cntl_->ErrorText(),
BackendOptions::get_localhost(),
::doris::DummyBrpcCallback<Response>::cntl_->latency_us());
_shared_state->rpc_status = Status::InternalError(err);
_shared_state->rpc_status.update(Status::InternalError(err));
} else {
_shared_state->rpc_status =
Status::create(doris::DummyBrpcCallback<Response>::response_->status());
_shared_state->rpc_status.update(
Status::create(doris::DummyBrpcCallback<Response>::response_->status()));
}
((CountedFinishDependency*)_shared_state->source_deps.back().get())->sub();
}
Expand Down
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/materialization_source_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ Status MaterializationSourceOperatorX::get_block(RuntimeState* state, vectorized
auto& local_state = get_local_state(state);
SCOPED_TIMER(local_state.exec_time_counter());
if (!local_state._shared_state->rpc_status.ok()) {
return local_state._shared_state->rpc_status;
return local_state._shared_state->rpc_status.status();
}

// clear origin block, do merge response to build a ret block
Expand Down
Loading