diff --git a/be/src/olap/rowset/beta_rowset_writer_v2.cpp b/be/src/olap/rowset/beta_rowset_writer_v2.cpp index 3ebe331cfc12f89..95adf3d6e50cee7 100644 --- a/be/src/olap/rowset/beta_rowset_writer_v2.cpp +++ b/be/src/olap/rowset/beta_rowset_writer_v2.cpp @@ -83,9 +83,19 @@ Status BetaRowsetWriterV2::create_file_writer(uint32_t segment_id, io::FileWrite Status BetaRowsetWriterV2::add_segment(uint32_t segment_id, const SegmentStatistics& segstat, TabletSchemaSPtr flush_schema) { + bool ok = false; for (const auto& stream : _streams) { - RETURN_IF_ERROR(stream->add_segment(_context.partition_id, _context.index_id, - _context.tablet_id, segment_id, segstat, flush_schema)); + auto st = stream->add_segment(_context.partition_id, _context.index_id, _context.tablet_id, + segment_id, segstat, flush_schema); + if (!st.ok()) { + LOG(WARNING) << "failed to add segment " << segment_id << " to stream " + << stream->stream_id(); + } + ok = ok || st.ok(); + } + if (!ok) { + return Status::InternalError("failed to add segment {} of tablet {} to any replicas", + segment_id, _context.tablet_id); } return Status::OK(); } diff --git a/be/src/runtime/load_stream.cpp b/be/src/runtime/load_stream.cpp index 8adc90364e15e74..b7005e6d7178326 100644 --- a/be/src/runtime/load_stream.cpp +++ b/be/src/runtime/load_stream.cpp @@ -61,7 +61,7 @@ TabletStream::TabletStream(PUniqueId load_id, int64_t id, int64_t txn_id, _txn_id(txn_id), _load_stream_mgr(load_stream_mgr) { load_stream_mgr->create_tokens(_flush_tokens); - _failed_st = std::make_shared(); + _status = Status::OK(); _profile = profile->create_child(fmt::format("TabletStream {}", id), true, true); _append_data_timer = ADD_TIMER(_profile, "AppendDataTime"); _add_segment_timer = ADD_TIMER(_profile, "AddSegmentTime"); @@ -70,7 +70,7 @@ TabletStream::TabletStream(PUniqueId load_id, int64_t id, int64_t txn_id, inline std::ostream& operator<<(std::ostream& ostr, const TabletStream& tablet_stream) { ostr << "load_id=" << tablet_stream._load_id << ", txn_id=" << tablet_stream._txn_id - << ", tablet_id=" << tablet_stream._id << ", status=" << *tablet_stream._failed_st; + << ", tablet_id=" << tablet_stream._id << ", status=" << tablet_stream._status; return ostr; } @@ -88,17 +88,16 @@ Status TabletStream::init(std::shared_ptr schema, int64_t }; _load_stream_writer = std::make_shared(&req, _profile); - auto st = _load_stream_writer->init(); - if (!st.ok()) { - _failed_st = std::make_shared(st); + _status = _load_stream_writer->init(); + if (!_status.ok()) { LOG(INFO) << "failed to init rowset builder due to " << *this; } - return st; + return _status; } Status TabletStream::append_data(const PStreamHeader& header, butil::IOBuf* data) { - if (!_failed_st->ok()) { - return *_failed_st; + if (!_status.ok()) { + return _status; } // dispatch add_segment request @@ -155,9 +154,9 @@ Status TabletStream::append_data(const PStreamHeader& header, butil::IOBuf* data file_type, new_segid); } } - if (!st.ok() && _failed_st->ok()) { - _failed_st = std::make_shared(st); - LOG(INFO) << "write data failed " << *this; + if (!st.ok() && _status.ok()) { + _status = st; + LOG(WARNING) << "write data failed " << st << ", " << *this; } }; auto& flush_token = _flush_tokens[new_segid % _flush_tokens.size()]; @@ -172,10 +171,11 @@ Status TabletStream::append_data(const PStreamHeader& header, butil::IOBuf* data timer.start(); while (flush_token->num_tasks() >= load_stream_flush_token_max_tasks) { if (timer.elapsed_time() / 1000 / 1000 >= load_stream_max_wait_flush_token_time_ms) { - return Status::Error( + _status = Status::Error( "wait flush token back pressure time is more than " "load_stream_max_wait_flush_token_time {}", load_stream_max_wait_flush_token_time_ms); + return _status; } bthread_usleep(2 * 1000); // 2ms } @@ -183,10 +183,18 @@ Status TabletStream::append_data(const PStreamHeader& header, butil::IOBuf* data int64_t time_ms = timer.elapsed_time() / 1000 / 1000; g_load_stream_flush_wait_ms << time_ms; g_load_stream_flush_running_threads << 1; - return flush_token->submit_func(flush_func); + auto st = flush_token->submit_func(flush_func); + if (!st.ok()) { + _status = st; + } + return _status; } Status TabletStream::add_segment(const PStreamHeader& header, butil::IOBuf* data) { + if (!_status.ok()) { + return _status; + } + SCOPED_TIMER(_add_segment_timer); DCHECK(header.has_segment_statistics()); SegmentStatistics stat(header.segment_statistics()); @@ -203,15 +211,17 @@ Status TabletStream::add_segment(const PStreamHeader& header, butil::IOBuf* data { std::lock_guard lock_guard(_lock); if (!_segids_mapping.contains(src_id)) { - return Status::InternalError( + _status = Status::InternalError( "add segment failed, no segment written by this src be yet, src_id={}, " "segment_id={}", src_id, segid); + return _status; } if (segid >= _segids_mapping[src_id]->size()) { - return Status::InternalError( + _status = Status::InternalError( "add segment failed, segment is never written, src_id={}, segment_id={}", src_id, segid); + return _status; } new_segid = _segids_mapping[src_id]->at(segid); } @@ -220,16 +230,24 @@ Status TabletStream::add_segment(const PStreamHeader& header, butil::IOBuf* data auto add_segment_func = [this, new_segid, stat, flush_schema]() { signal::set_signal_task_id(_load_id); auto st = _load_stream_writer->add_segment(new_segid, stat, flush_schema); - if (!st.ok() && _failed_st->ok()) { - _failed_st = std::make_shared(st); + if (!st.ok() && _status.ok()) { + _status = st; LOG(INFO) << "add segment failed " << *this; } }; auto& flush_token = _flush_tokens[new_segid % _flush_tokens.size()]; - return flush_token->submit_func(add_segment_func); + auto st = flush_token->submit_func(add_segment_func); + if (!st.ok()) { + _status = st; + } + return _status; } Status TabletStream::close() { + if (!_status.ok()) { + return _status; + } + SCOPED_TIMER(_close_wait_timer); bthread::Mutex mu; std::unique_lock lock(mu); @@ -246,23 +264,24 @@ Status TabletStream::close() { if (ret) { cv.wait(lock); } else { - return Status::Error( + _status = Status::Error( "there is not enough thread resource for close load"); + return _status; } - if (!_failed_st->ok()) { - return *_failed_st; - } if (_next_segid.load() != _num_segments) { - return Status::Corruption( + _status = Status::Corruption( "segment num mismatch in tablet {}, expected: {}, actual: {}, load_id: {}", _id, _num_segments, _next_segid.load(), print_id(_load_id)); + return _status; } - Status st = Status::OK(); - auto close_func = [this, &mu, &cv, &st]() { + auto close_func = [this, &mu, &cv]() { signal::set_signal_task_id(_load_id); - st = _load_stream_writer->close(); + auto st = _load_stream_writer->close(); + if (!st.ok() && _status.ok()) { + _status = st; + } std::lock_guard lock(mu); cv.notify_one(); }; @@ -270,10 +289,10 @@ Status TabletStream::close() { if (ret) { cv.wait(lock); } else { - return Status::Error( + _status = Status::Error( "there is not enough thread resource for close load"); } - return st; + return _status; } IndexStream::IndexStream(PUniqueId load_id, int64_t id, int64_t txn_id, @@ -297,7 +316,7 @@ Status IndexStream::append_data(const PStreamHeader& header, butil::IOBuf* data) std::lock_guard lock_guard(_lock); auto it = _tablet_streams_map.find(tablet_id); if (it == _tablet_streams_map.end()) { - RETURN_IF_ERROR(_init_tablet_stream(tablet_stream, tablet_id, header.partition_id())); + _init_tablet_stream(tablet_stream, tablet_id, header.partition_id()); } else { tablet_stream = it->second; } @@ -306,17 +325,19 @@ Status IndexStream::append_data(const PStreamHeader& header, butil::IOBuf* data) return tablet_stream->append_data(header, data); } -Status IndexStream::_init_tablet_stream(TabletStreamSharedPtr& tablet_stream, int64_t tablet_id, - int64_t partition_id) { +void IndexStream::_init_tablet_stream(TabletStreamSharedPtr& tablet_stream, int64_t tablet_id, + int64_t partition_id) { tablet_stream = std::make_shared(_load_id, tablet_id, _txn_id, _load_stream_mgr, _profile); _tablet_streams_map[tablet_id] = tablet_stream; - RETURN_IF_ERROR(tablet_stream->init(_schema, _id, partition_id)); - return Status::OK(); + auto st = tablet_stream->init(_schema, _id, partition_id); + if (!st.ok()) { + LOG(WARNING) << "tablet stream init failed " << *tablet_stream; + } } -Status IndexStream::close(const std::vector& tablets_to_commit, - std::vector* success_tablet_ids, FailedTablets* failed_tablets) { +void IndexStream::close(const std::vector& tablets_to_commit, + std::vector* success_tablet_ids, FailedTablets* failed_tablets) { std::lock_guard lock_guard(_lock); SCOPED_TIMER(_close_wait_timer); // open all need commit tablets @@ -327,8 +348,7 @@ Status IndexStream::close(const std::vector& tablets_to_commit, TabletStreamSharedPtr tablet_stream; auto it = _tablet_streams_map.find(tablet.tablet_id()); if (it == _tablet_streams_map.end()) { - RETURN_IF_ERROR( - _init_tablet_stream(tablet_stream, tablet.tablet_id(), tablet.partition_id())); + _init_tablet_stream(tablet_stream, tablet.tablet_id(), tablet.partition_id()); tablet_stream->add_num_segments(tablet.num_segments()); } else { it->second->add_num_segments(tablet.num_segments()); @@ -344,7 +364,6 @@ Status IndexStream::close(const std::vector& tablets_to_commit, failed_tablets->emplace_back(tablet_stream->id(), st); } } - return Status::OK(); } // TODO: Profile is temporary disabled, because: @@ -396,8 +415,8 @@ Status LoadStream::init(const POpenLoadStreamRequest* request) { return Status::OK(); } -Status LoadStream::close(int64_t src_id, const std::vector& tablets_to_commit, - std::vector* success_tablet_ids, FailedTablets* failed_tablets) { +void LoadStream::close(int64_t src_id, const std::vector& tablets_to_commit, + std::vector* success_tablet_ids, FailedTablets* failed_tablets) { std::lock_guard lock_guard(_lock); SCOPED_TIMER(_close_wait_timer); @@ -415,16 +434,14 @@ Status LoadStream::close(int64_t src_id, const std::vector& tablets_t if (_close_load_cnt < _total_streams) { // do not return commit info if there is remaining streams. - return Status::OK(); + return; } for (auto& [_, index_stream] : _index_streams_map) { - RETURN_IF_ERROR( - index_stream->close(_tablets_to_commit, success_tablet_ids, failed_tablets)); + index_stream->close(_tablets_to_commit, success_tablet_ids, failed_tablets); } LOG(INFO) << "close load " << *this << ", success_tablet_num=" << success_tablet_ids->size() << ", failed_tablet_num=" << failed_tablets->size(); - return Status::OK(); } void LoadStream::_report_result(StreamId stream, const Status& status, @@ -610,8 +627,8 @@ void LoadStream::_dispatch(StreamId id, const PStreamHeader& hdr, butil::IOBuf* std::vector success_tablet_ids; FailedTablets failed_tablets; std::vector tablets_to_commit(hdr.tablets().begin(), hdr.tablets().end()); - auto st = close(hdr.src_id(), tablets_to_commit, &success_tablet_ids, &failed_tablets); - _report_result(id, st, success_tablet_ids, failed_tablets, true); + close(hdr.src_id(), tablets_to_commit, &success_tablet_ids, &failed_tablets); + _report_result(id, Status::OK(), success_tablet_ids, failed_tablets, true); brpc::StreamClose(id); } break; case PStreamHeader::GET_SCHEMA: { diff --git a/be/src/runtime/load_stream.h b/be/src/runtime/load_stream.h index 80e69c784ad789c..427bc2dbb62cc83 100644 --- a/be/src/runtime/load_stream.h +++ b/be/src/runtime/load_stream.h @@ -66,7 +66,7 @@ class TabletStream { std::atomic _next_segid; int64_t _num_segments = 0; bthread::Mutex _lock; - std::shared_ptr _failed_st; + Status _status; PUniqueId _load_id; int64_t _txn_id; RuntimeProfile* _profile = nullptr; @@ -86,12 +86,12 @@ class IndexStream { Status append_data(const PStreamHeader& header, butil::IOBuf* data); - Status close(const std::vector& tablets_to_commit, - std::vector* success_tablet_ids, FailedTablets* failed_tablet_ids); + void close(const std::vector& tablets_to_commit, + std::vector* success_tablet_ids, FailedTablets* failed_tablet_ids); private: - Status _init_tablet_stream(TabletStreamSharedPtr& tablet_stream, int64_t tablet_id, - int64_t partition_id); + void _init_tablet_stream(TabletStreamSharedPtr& tablet_stream, int64_t tablet_id, + int64_t partition_id); private: int64_t _id; @@ -124,8 +124,8 @@ class LoadStream : public brpc::StreamInputHandler { } } - Status close(int64_t src_id, const std::vector& tablets_to_commit, - std::vector* success_tablet_ids, FailedTablets* failed_tablet_ids); + void close(int64_t src_id, const std::vector& tablets_to_commit, + std::vector* success_tablet_ids, FailedTablets* failed_tablet_ids); // callbacks called by brpc int on_received_messages(StreamId id, butil::IOBuf* const messages[], size_t size) override; diff --git a/be/src/vec/sink/load_stream_map_pool.cpp b/be/src/vec/sink/load_stream_map_pool.cpp index 2fcb8deaeb2c85b..ba69efd9fe38f03 100644 --- a/be/src/vec/sink/load_stream_map_pool.cpp +++ b/be/src/vec/sink/load_stream_map_pool.cpp @@ -77,10 +77,14 @@ Status LoadStreamMap::for_each_st(std::function std::lock_guard lock(_mutex); snapshot = _streams_for_node; } + Status status = Status::OK(); for (auto& [dst_id, streams] : snapshot) { - RETURN_IF_ERROR(fn(dst_id, *streams)); + auto st = fn(dst_id, *streams); + if (!st.ok() && status.ok()) { + status = st; + } } - return Status::OK(); + return status; } void LoadStreamMap::save_tablets_to_commit(int64_t dst_id, @@ -112,19 +116,26 @@ Status LoadStreamMap::close_load(bool incremental) { tablets_to_commit.push_back(tablet); tablets_to_commit.back().set_num_segments(_segments_for_tablet[tablet_id]); } + Status status = Status::OK(); bool first = true; for (auto& stream : streams) { if (stream->is_incremental() != incremental) { continue; } if (first) { - RETURN_IF_ERROR(stream->close_load(tablets_to_commit)); + auto st = stream->close_load(tablets_to_commit); + if (!st.ok() && status.ok()) { + status = st; + } first = false; } else { - RETURN_IF_ERROR(stream->close_load({})); + auto st = stream->close_load({}); + if (!st.ok() && status.ok()) { + status = st; + } } } - return Status::OK(); + return status; }); } diff --git a/be/src/vec/sink/load_stream_stub.cpp b/be/src/vec/sink/load_stream_stub.cpp index f322d67ceaf9c34..416276bc662a1d3 100644 --- a/be/src/vec/sink/load_stream_stub.cpp +++ b/be/src/vec/sink/load_stream_stub.cpp @@ -149,7 +149,7 @@ Status LoadStreamStub::open(BrpcClientCache* client_cache, int64_t idle_timeout_ms, bool enable_profile) { std::unique_lock lock(_open_mutex); if (_is_init.load()) { - return Status::OK(); + return _init_st; } _dst_id = node_info.id; std::string host_port = get_host_port(node_info.host, node_info.brpc_port); @@ -161,7 +161,8 @@ Status LoadStreamStub::open(BrpcClientCache* client_cache, brpc::Controller cntl; if (int ret = brpc::StreamCreate(&_stream_id, cntl, &opt)) { delete opt.handler; - return Status::Error(ret, "Failed to create stream"); + _init_st = Status::Error(ret, "Failed to create stream"); + return _init_st; } cntl.set_timeout_ms(config::open_load_stream_timeout_ms); POpenLoadStreamRequest request; @@ -174,7 +175,8 @@ Status LoadStreamStub::open(BrpcClientCache* client_cache, } else if (total_streams > 0) { request.set_total_streams(total_streams); } else { - return Status::InternalError("total_streams should be greator than 0"); + _init_st = Status::InternalError("total_streams should be greator than 0"); + return _init_st; } request.set_idle_timeout_ms(idle_timeout_ms); schema.to_protobuf(request.mutable_schema()); @@ -195,8 +197,9 @@ Status LoadStreamStub::open(BrpcClientCache* client_cache, } if (cntl.Failed()) { brpc::StreamClose(_stream_id); - return Status::InternalError("Failed to connect to backend {}: {}", _dst_id, - cntl.ErrorText()); + _init_st = Status::InternalError("Failed to connect to backend {}: {}", _dst_id, + cntl.ErrorText()); + return _init_st; } LOG(INFO) << "open load stream to " << host_port << ", " << *this; _is_init.store(true); @@ -207,6 +210,10 @@ Status LoadStreamStub::open(BrpcClientCache* client_cache, Status LoadStreamStub::append_data(int64_t partition_id, int64_t index_id, int64_t tablet_id, int64_t segment_id, uint64_t offset, std::span data, bool segment_eos, FileType file_type) { + if (!_is_init.load()) { + add_failed_tablet(tablet_id, _init_st); + return _init_st; + } DBUG_EXECUTE_IF("LoadStreamStub.only_send_segment_0", { if (segment_id != 0) { return Status::OK(); @@ -230,6 +237,10 @@ Status LoadStreamStub::append_data(int64_t partition_id, int64_t index_id, int64 Status LoadStreamStub::add_segment(int64_t partition_id, int64_t index_id, int64_t tablet_id, int64_t segment_id, const SegmentStatistics& segment_stat, TabletSchemaSPtr flush_schema) { + if (!_is_init.load()) { + add_failed_tablet(tablet_id, _init_st); + return _init_st; + } DBUG_EXECUTE_IF("LoadStreamStub.only_send_segment_0", { if (segment_id != 0) { return Status::OK(); @@ -252,6 +263,9 @@ Status LoadStreamStub::add_segment(int64_t partition_id, int64_t index_id, int64 // CLOSE_LOAD Status LoadStreamStub::close_load(const std::vector& tablets_to_commit) { + if (!_is_init.load()) { + return _init_st; + } PStreamHeader header; *header.mutable_load_id() = _load_id; header.set_src_id(_src_id); @@ -259,11 +273,20 @@ Status LoadStreamStub::close_load(const std::vector& tablets_to_commi for (const auto& tablet : tablets_to_commit) { *header.add_tablets() = tablet; } - return _encode_and_send(header); + _close_st = _encode_and_send(header); + if (!_close_st.ok()) { + LOG(WARNING) << "stream " << _stream_id << " close failed: " << _close_st; + return _close_st; + } + _is_closing.store(true); + return Status::OK(); } // GET_SCHEMA Status LoadStreamStub::get_schema(const std::vector& tablets) { + if (!_is_init.load()) { + return _init_st; + } PStreamHeader header; *header.mutable_load_id() = _load_id; header.set_src_id(_src_id); @@ -284,6 +307,9 @@ Status LoadStreamStub::get_schema(const std::vector& tablets) { Status LoadStreamStub::wait_for_schema(int64_t partition_id, int64_t index_id, int64_t tablet_id, int64_t timeout_ms) { + if (!_is_init.load()) { + return _init_st; + } if (_tablet_schema_for_index->contains(index_id)) { return Status::OK(); } @@ -310,7 +336,10 @@ Status LoadStreamStub::wait_for_schema(int64_t partition_id, int64_t index_id, i Status LoadStreamStub::close_wait(RuntimeState* state, int64_t timeout_ms) { DBUG_EXECUTE_IF("LoadStreamStub::close_wait.long_wait", DBUG_BLOCK); if (!_is_init.load()) { - return Status::InternalError("stream {} is not opened, {}", _stream_id, to_string()); + return _init_st; + } + if (!_is_closing.load()) { + return _close_st; } if (_is_closed.load()) { return _check_cancel(); @@ -341,7 +370,7 @@ void LoadStreamStub::cancel(Status reason) { LOG(WARNING) << *this << " is cancelled because of " << reason; { std::lock_guard lock(_cancel_mutex); - _cancel_reason = reason; + _cancel_st = reason; _is_cancelled.store(true); } { diff --git a/be/src/vec/sink/load_stream_stub.h b/be/src/vec/sink/load_stream_stub.h index b6436a4b81a4e49..b06b19e511d99dc 100644 --- a/be/src/vec/sink/load_stream_stub.h +++ b/be/src/vec/sink/load_stream_stub.h @@ -223,11 +223,12 @@ class LoadStreamStub : public std::enable_shared_from_this { } std::lock_guard lock(_cancel_mutex); return Status::Cancelled("load_id={}, reason: {}", print_id(_load_id), - _cancel_reason.to_string_no_stack()); + _cancel_st.to_string_no_stack()); } protected: std::atomic _is_init; + std::atomic _is_closing; std::atomic _is_closed; std::atomic _is_cancelled; std::atomic _is_eos; @@ -236,7 +237,9 @@ class LoadStreamStub : public std::enable_shared_from_this { brpc::StreamId _stream_id; int64_t _src_id = -1; // source backend_id int64_t _dst_id = -1; // destination backend_id - Status _cancel_reason; + Status _init_st; + Status _close_st; + Status _cancel_st; bthread::Mutex _open_mutex; bthread::Mutex _close_mutex; diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.cpp b/be/src/vec/sink/writer/vtablet_writer_v2.cpp index 8bf0520aba09ecb..16589482817640e 100644 --- a/be/src/vec/sink/writer/vtablet_writer_v2.cpp +++ b/be/src/vec/sink/writer/vtablet_writer_v2.cpp @@ -279,18 +279,18 @@ Status VTabletWriterV2::_open_streams_to_backend(int64_t dst_id, Streams& stream return Status::InternalError("Unknown node {} in tablet location", dst_id); } auto idle_timeout_ms = _state->execution_timeout() * 1000; - // get tablet schema from each backend only in the 1st stream - for (auto& stream : streams | std::ranges::views::take(1)) { - const std::vector& tablets_for_schema = _indexes_from_node[node_info->id]; - RETURN_IF_ERROR(stream->open(_state->exec_env()->brpc_internal_client_cache(), *node_info, - _txn_id, *_schema, tablets_for_schema, _total_streams, - idle_timeout_ms, _state->enable_profile())); - } - // for the rest streams, open without getting tablet schema - for (auto& stream : streams | std::ranges::views::drop(1)) { - RETURN_IF_ERROR(stream->open(_state->exec_env()->brpc_internal_client_cache(), *node_info, - _txn_id, *_schema, {}, _total_streams, idle_timeout_ms, - _state->enable_profile())); + std::vector& tablets_for_schema = _indexes_from_node[node_info->id]; + for (auto& stream : streams) { + auto st = stream->open(_state->exec_env()->brpc_internal_client_cache(), *node_info, + _txn_id, *_schema, tablets_for_schema, _total_streams, + idle_timeout_ms, _state->enable_profile()); + if (st.ok()) { + // get tablet schema from each backend only in the 1st stream + tablets_for_schema.clear(); + } else { + LOG(WARNING) << "failed to open stream to backend " << dst_id + << ", load_id=" << print_id(_load_id); + } } return Status::OK(); } @@ -568,23 +568,39 @@ Status VTabletWriterV2::close(Status exec_status) { // send CLOSE_LOAD on all non-incremental streams if this is the last sink if (is_last_sink) { - RETURN_IF_ERROR(_load_stream_map->close_load(false)); + auto st = _load_stream_map->close_load(false); + if (!st.ok()) { + LOG(WARNING) << "close_load failed: " << st; + } } // close_wait on all non-incremental streams, even if this is not the last sink. // because some per-instance data structures are now shared among all sinks // due to sharing delta writers and load stream stubs. - RETURN_IF_ERROR(_close_wait(false)); + { + auto st = _close_wait(false); + if (!st.ok()) { + LOG(WARNING) << "close_wait failed: " << st; + } + } // send CLOSE_LOAD on all incremental streams if this is the last sink. // this must happen after all non-incremental streams are closed, // so we can ensure all sinks are in close phase before closing incremental streams. if (is_last_sink) { - RETURN_IF_ERROR(_load_stream_map->close_load(true)); + auto st = _load_stream_map->close_load(true); + if (!st.ok()) { + LOG(WARNING) << "close_load failed: " << st; + } } // close_wait on all incremental streams, even if this is not the last sink. - RETURN_IF_ERROR(_close_wait(true)); + { + auto st = _close_wait(true); + if (!st.ok()) { + LOG(WARNING) << "close_wait failed: " << st; + } + } // calculate and submit commit info if (is_last_sink) { @@ -640,6 +656,7 @@ Status VTabletWriterV2::_close_wait(bool incremental) { SCOPED_TIMER(_close_load_timer); return _load_stream_map->for_each_st( [this, incremental](int64_t dst_id, const Streams& streams) -> Status { + Status status = Status::OK(); for (auto& stream : streams) { if (stream->is_incremental() != incremental) { continue; @@ -651,9 +668,12 @@ Status VTabletWriterV2::_close_wait(bool incremental) { << print_id(_load_id); return Status::TimedOut("load timed out before close waiting"); } - RETURN_IF_ERROR(stream->close_wait(_state, remain_ms)); + auto st = stream->close_wait(_state, remain_ms); + if (!st.ok() && status.ok()) { + status = st; + } } - return Status::OK(); + return status; }); } @@ -692,6 +712,9 @@ Status VTabletWriterV2::_create_commit_info(std::vector& tabl load_stream_map->for_each([&](int64_t dst_id, const Streams& streams) { std::unordered_set known_tablets; for (const auto& stream : streams) { + LOG(INFO) << "stream " << stream->stream_id() + << " success tablets: " << stream->success_tablets().size() + << ", failed tablets: " << stream->failed_tablets().size(); for (auto [tablet_id, reason] : stream->failed_tablets()) { if (known_tablets.contains(tablet_id)) { continue; @@ -717,7 +740,8 @@ Status VTabletWriterV2::_create_commit_info(std::vector& tabl if (replicas > (num_replicas - 1) / 2) { LOG(INFO) << "tablet " << tablet_id << " failed on majority backends: " << failed_reason[tablet_id]; - return failed_reason.at(tablet_id); + return Status::InternalError("tablet {} failed on majority backends: {}", tablet_id, + failed_reason[tablet_id]); } } return Status::OK();