diff --git a/be/src/pipeline/dependency.h b/be/src/pipeline/dependency.h index e579da7528afc6..25c5f957bb975e 100644 --- a/be/src/pipeline/dependency.h +++ b/be/src/pipeline/dependency.h @@ -830,7 +830,8 @@ struct MaterializationSharedState : public BasicSharedState { void create_counter_dependency(int operator_id, int node_id, const std::string& name); bool rpc_struct_inited = false; - Status rpc_status = Status::OK(); + AtomicStatus rpc_status; + bool last_block = false; // empty materialization sink block not need to merge block bool need_merge_block = true; diff --git a/be/src/pipeline/exec/materialization_sink_operator.cpp b/be/src/pipeline/exec/materialization_sink_operator.cpp index 51d399328454e4..46c555b5926254 100644 --- a/be/src/pipeline/exec/materialization_sink_operator.cpp +++ b/be/src/pipeline/exec/materialization_sink_operator.cpp @@ -83,10 +83,10 @@ class MaterializationCallback : public ::doris::DummyBrpcCallback { ::doris::DummyBrpcCallback::cntl_->ErrorText(), BackendOptions::get_localhost(), ::doris::DummyBrpcCallback::cntl_->latency_us()); - _shared_state->rpc_status = Status::InternalError(err); + _shared_state->rpc_status.update(Status::InternalError(err)); } else { - _shared_state->rpc_status = - Status::create(doris::DummyBrpcCallback::response_->status()); + _shared_state->rpc_status.update( + Status::create(doris::DummyBrpcCallback::response_->status())); } ((CountedFinishDependency*)_shared_state->source_deps.back().get())->sub(); } diff --git a/be/src/pipeline/exec/materialization_source_operator.cpp b/be/src/pipeline/exec/materialization_source_operator.cpp index e9eb7a02d21243..972b96c4a0f150 100644 --- a/be/src/pipeline/exec/materialization_source_operator.cpp +++ b/be/src/pipeline/exec/materialization_source_operator.cpp @@ -29,7 +29,7 @@ Status MaterializationSourceOperatorX::get_block(RuntimeState* state, vectorized auto& local_state = get_local_state(state); SCOPED_TIMER(local_state.exec_time_counter()); if (!local_state._shared_state->rpc_status.ok()) { - return local_state._shared_state->rpc_status; + return local_state._shared_state->rpc_status.status(); } // clear origin block, do merge response to build a ret block