diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.cpp b/be/src/vec/sink/writer/vtablet_writer_v2.cpp index dbc85147fe78e0..ea877a803c950f 100644 --- a/be/src/vec/sink/writer/vtablet_writer_v2.cpp +++ b/be/src/vec/sink/writer/vtablet_writer_v2.cpp @@ -604,7 +604,7 @@ Status VTabletWriterV2::close(Status exec_status) { // close_wait on all non-incremental streams, even if this is not the last sink. // because some per-instance data structures are now shared among all sinks // due to sharing delta writers and load stream stubs. - _close_wait(false); + RETURN_IF_ERROR(_close_wait(false)); // send CLOSE_LOAD on all incremental streams if this is the last sink. // this must happen after all non-incremental streams are closed, @@ -614,7 +614,7 @@ Status VTabletWriterV2::close(Status exec_status) { } // close_wait on all incremental streams, even if this is not the last sink. - _close_wait(true); + RETURN_IF_ERROR(_close_wait(true)); // calculate and submit commit info if (is_last_sink) { @@ -663,7 +663,7 @@ Status VTabletWriterV2::close(Status exec_status) { return status; } -void VTabletWriterV2::_close_wait(bool incremental) { +Status VTabletWriterV2::_close_wait(bool incremental) { SCOPED_TIMER(_close_load_timer); auto st = _load_stream_map->for_each_st( [this, incremental](int64_t dst_id, LoadStreamStubs& streams) -> Status { @@ -688,6 +688,7 @@ void VTabletWriterV2::_close_wait(bool incremental) { if (!st.ok()) { LOG(WARNING) << "close_wait failed: " << st << ", load_id=" << print_id(_load_id); } + return st; } void VTabletWriterV2::_calc_tablets_to_commit() { diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.h b/be/src/vec/sink/writer/vtablet_writer_v2.h index b50044ede938c4..9f9743de3a2949 100644 --- a/be/src/vec/sink/writer/vtablet_writer_v2.h +++ b/be/src/vec/sink/writer/vtablet_writer_v2.h @@ -146,7 +146,7 @@ class VTabletWriterV2 final : public AsyncResultWriter { void _calc_tablets_to_commit(); - void _close_wait(bool incremental); + Status _close_wait(bool incremental); void _cancel(Status status);