[fix](move-memtable) multi replica tables should tolerate minority failures (apache#38003)

A load job for multi-replica tables should not fail immediately on any single replica error.
Errors should be recorded and reported per tablet replica, and checked against the commit info.
kaijchen committed Sep 6, 2024
1 parent cb0613e commit b0328b4
Showing 12 changed files with 228 additions and 110 deletions.
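
The fix follows a simple pattern, visible in the first hunk below: push each segment to every replica's stream, log per-replica failures instead of aborting, and fail only when no replica accepted the segment (the replica quorum itself is checked later, on the commit info). A minimal sketch of that idea, using simplified stand-ins for the Doris Status and stream types rather than the real classes:

```cpp
#include <iostream>
#include <string>
#include <vector>

// Simplified stand-ins for the Doris Status and replica stream types.
struct Status {
    bool is_ok = true;
    std::string msg;
    static Status OK() { return {}; }
    static Status Error(std::string m) { return {false, std::move(m)}; }
    bool ok() const { return is_ok; }
};

struct ReplicaStream {
    bool healthy = true;  // pretend a minority of replicas may be down
    Status add_segment(int segment_id) {
        return healthy ? Status::OK() : Status::Error("replica unreachable");
    }
};

// Mirrors the new BetaRowsetWriterV2::add_segment logic: record per-replica
// failures, succeed as long as at least one replica accepted the segment.
Status add_segment_to_replicas(std::vector<ReplicaStream>& streams, int segment_id) {
    bool ok = false;
    for (auto& stream : streams) {
        Status st = stream.add_segment(segment_id);
        if (!st.ok()) {
            // Record and keep going instead of returning immediately.
            std::cerr << "failed to add segment " << segment_id << ": " << st.msg << "\n";
        }
        ok = ok || st.ok();
    }
    if (!ok) {
        return Status::Error("no replica accepted segment " + std::to_string(segment_id));
    }
    return Status::OK();  // replica quorum is still enforced later, on commit info
}

int main() {
    std::vector<ReplicaStream> streams{{true}, {false}, {true}};  // one failing replica
    std::cout << (add_segment_to_replicas(streams, 0).ok() ? "ok" : "failed") << "\n";
    return 0;
}
```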
14 changes: 12 additions & 2 deletions be/src/olap/rowset/beta_rowset_writer_v2.cpp
@@ -84,9 +84,19 @@ Status BetaRowsetWriterV2::create_file_writer(uint32_t segment_id, io::FileWrite

Status BetaRowsetWriterV2::add_segment(uint32_t segment_id, const SegmentStatistics& segstat,
TabletSchemaSPtr flush_schema) {
bool ok = false;
for (const auto& stream : _streams) {
RETURN_IF_ERROR(stream->add_segment(_context.partition_id, _context.index_id,
_context.tablet_id, segment_id, segstat, flush_schema));
auto st = stream->add_segment(_context.partition_id, _context.index_id, _context.tablet_id,
segment_id, segstat, flush_schema);
if (!st.ok()) {
LOG(WARNING) << "failed to add segment " << segment_id << " to stream "
<< stream->stream_id();
}
ok = ok || st.ok();
}
if (!ok) {
return Status::InternalError("failed to add segment {} of tablet {} to any replicas",
segment_id, _context.tablet_id);
}
return Status::OK();
}
113 changes: 67 additions & 46 deletions be/src/runtime/load_stream.cpp
@@ -62,7 +62,7 @@ TabletStream::TabletStream(PUniqueId load_id, int64_t id, int64_t txn_id,
_txn_id(txn_id),
_load_stream_mgr(load_stream_mgr) {
load_stream_mgr->create_tokens(_flush_tokens);
_failed_st = std::make_shared<Status>();
_status = Status::OK();
_profile = profile->create_child(fmt::format("TabletStream {}", id), true, true);
_append_data_timer = ADD_TIMER(_profile, "AppendDataTime");
_add_segment_timer = ADD_TIMER(_profile, "AddSegmentTime");
@@ -71,7 +71,7 @@ TabletStream::TabletStream(PUniqueId load_id, int64_t id, int64_t txn_id,

inline std::ostream& operator<<(std::ostream& ostr, const TabletStream& tablet_stream) {
ostr << "load_id=" << tablet_stream._load_id << ", txn_id=" << tablet_stream._txn_id
<< ", tablet_id=" << tablet_stream._id << ", status=" << *tablet_stream._failed_st;
<< ", tablet_id=" << tablet_stream._id << ", status=" << tablet_stream._status;
return ostr;
}

@@ -88,17 +88,20 @@ Status TabletStream::init(std::shared_ptr<OlapTableSchemaParam> schema, int64_t
};

_load_stream_writer = std::make_shared<LoadStreamWriter>(&req, _profile);
auto st = _load_stream_writer->init();
if (!st.ok()) {
_failed_st = std::make_shared<Status>(st);
DBUG_EXECUTE_IF("TabletStream.init.uninited_writer", {
_status = Status::Uninitialized("fault injection");
return _status;
});
_status = _load_stream_writer->init();
if (!_status.ok()) {
LOG(INFO) << "failed to init rowset builder due to " << *this;
}
return st;
return _status;
}

Status TabletStream::append_data(const PStreamHeader& header, butil::IOBuf* data) {
if (!_failed_st->ok()) {
return *_failed_st;
if (!_status.ok()) {
return _status;
}

// dispatch add_segment request
@@ -147,9 +150,9 @@ Status TabletStream::append_data(const PStreamHeader& header, butil::IOBuf* data
if (eos && st.ok()) {
st = _load_stream_writer->close_segment(new_segid);
}
if (!st.ok() && _failed_st->ok()) {
_failed_st = std::make_shared<Status>(st);
LOG(INFO) << "write data failed " << *this;
if (!st.ok() && _status.ok()) {
_status = st;
LOG(WARNING) << "write data failed " << st << ", " << *this;
}
};
auto& flush_token = _flush_tokens[new_segid % _flush_tokens.size()];
@@ -164,21 +167,30 @@ Status TabletStream::append_data(const PStreamHeader& header, butil::IOBuf* data
timer.start();
while (flush_token->num_tasks() >= load_stream_flush_token_max_tasks) {
if (timer.elapsed_time() / 1000 / 1000 >= load_stream_max_wait_flush_token_time_ms) {
return Status::Error<true>(
_status = Status::Error<true>(
"wait flush token back pressure time is more than "
"load_stream_max_wait_flush_token_time {}",
load_stream_max_wait_flush_token_time_ms);
return _status;
}
bthread_usleep(2 * 1000); // 2ms
}
timer.stop();
int64_t time_ms = timer.elapsed_time() / 1000 / 1000;
g_load_stream_flush_wait_ms << time_ms;
g_load_stream_flush_running_threads << 1;
return flush_token->submit_func(flush_func);
auto st = flush_token->submit_func(flush_func);
if (!st.ok()) {
_status = st;
}
return _status;
}

Status TabletStream::add_segment(const PStreamHeader& header, butil::IOBuf* data) {
if (!_status.ok()) {
return _status;
}

SCOPED_TIMER(_add_segment_timer);
DCHECK(header.has_segment_statistics());
SegmentStatistics stat(header.segment_statistics());
@@ -195,15 +207,17 @@ Status TabletStream::add_segment(const PStreamHeader& header, butil::IOBuf* data
{
std::lock_guard lock_guard(_lock);
if (!_segids_mapping.contains(src_id)) {
return Status::InternalError(
_status = Status::InternalError(
"add segment failed, no segment written by this src be yet, src_id={}, "
"segment_id={}",
src_id, segid);
return _status;
}
if (segid >= _segids_mapping[src_id]->size()) {
return Status::InternalError(
_status = Status::InternalError(
"add segment failed, segment is never written, src_id={}, segment_id={}",
src_id, segid);
return _status;
}
new_segid = _segids_mapping[src_id]->at(segid);
}
@@ -212,16 +226,24 @@ Status TabletStream::add_segment(const PStreamHeader& header, butil::IOBuf* data
auto add_segment_func = [this, new_segid, stat, flush_schema]() {
signal::set_signal_task_id(_load_id);
auto st = _load_stream_writer->add_segment(new_segid, stat, flush_schema);
if (!st.ok() && _failed_st->ok()) {
_failed_st = std::make_shared<Status>(st);
if (!st.ok() && _status.ok()) {
_status = st;
LOG(INFO) << "add segment failed " << *this;
}
};
auto& flush_token = _flush_tokens[new_segid % _flush_tokens.size()];
return flush_token->submit_func(add_segment_func);
auto st = flush_token->submit_func(add_segment_func);
if (!st.ok()) {
_status = st;
}
return _status;
}

Status TabletStream::close() {
if (!_status.ok()) {
return _status;
}

SCOPED_TIMER(_close_wait_timer);
bthread::Mutex mu;
std::unique_lock<bthread::Mutex> lock(mu);
@@ -238,34 +260,35 @@ Status TabletStream::close() {
if (ret) {
cv.wait(lock);
} else {
return Status::Error<ErrorCode::INTERNAL_ERROR>(
_status = Status::Error<ErrorCode::INTERNAL_ERROR>(
"there is not enough thread resource for close load");
return _status;
}

if (!_failed_st->ok()) {
return *_failed_st;
}
if (_next_segid.load() != _num_segments) {
return Status::Corruption(
_status = Status::Corruption(
"segment num mismatch in tablet {}, expected: {}, actual: {}, load_id: {}", _id,
_num_segments, _next_segid.load(), print_id(_load_id));
return _status;
}

Status st = Status::OK();
auto close_func = [this, &mu, &cv, &st]() {
auto close_func = [this, &mu, &cv]() {
signal::set_signal_task_id(_load_id);
st = _load_stream_writer->close();
auto st = _load_stream_writer->close();
if (!st.ok() && _status.ok()) {
_status = st;
}
std::lock_guard<bthread::Mutex> lock(mu);
cv.notify_one();
};
ret = _load_stream_mgr->heavy_work_pool()->try_offer(close_func);
if (ret) {
cv.wait(lock);
} else {
return Status::Error<ErrorCode::INTERNAL_ERROR>(
_status = Status::Error<ErrorCode::INTERNAL_ERROR>(
"there is not enough thread resource for close load");
}
return st;
return _status;
}

IndexStream::IndexStream(PUniqueId load_id, int64_t id, int64_t txn_id,
Expand All @@ -289,7 +312,7 @@ Status IndexStream::append_data(const PStreamHeader& header, butil::IOBuf* data)
std::lock_guard lock_guard(_lock);
auto it = _tablet_streams_map.find(tablet_id);
if (it == _tablet_streams_map.end()) {
RETURN_IF_ERROR(_init_tablet_stream(tablet_stream, tablet_id, header.partition_id()));
_init_tablet_stream(tablet_stream, tablet_id, header.partition_id());
} else {
tablet_stream = it->second;
}
@@ -298,17 +321,19 @@ Status IndexStream::append_data(const PStreamHeader& header, butil::IOBuf* data)
return tablet_stream->append_data(header, data);
}

Status IndexStream::_init_tablet_stream(TabletStreamSharedPtr& tablet_stream, int64_t tablet_id,
int64_t partition_id) {
void IndexStream::_init_tablet_stream(TabletStreamSharedPtr& tablet_stream, int64_t tablet_id,
int64_t partition_id) {
tablet_stream = std::make_shared<TabletStream>(_load_id, tablet_id, _txn_id, _load_stream_mgr,
_profile);
_tablet_streams_map[tablet_id] = tablet_stream;
RETURN_IF_ERROR(tablet_stream->init(_schema, _id, partition_id));
return Status::OK();
auto st = tablet_stream->init(_schema, _id, partition_id);
if (!st.ok()) {
LOG(WARNING) << "tablet stream init failed " << *tablet_stream;
}
}

Status IndexStream::close(const std::vector<PTabletID>& tablets_to_commit,
std::vector<int64_t>* success_tablet_ids, FailedTablets* failed_tablets) {
void IndexStream::close(const std::vector<PTabletID>& tablets_to_commit,
std::vector<int64_t>* success_tablet_ids, FailedTablets* failed_tablets) {
std::lock_guard lock_guard(_lock);
SCOPED_TIMER(_close_wait_timer);
// open all need commit tablets
@@ -319,8 +344,7 @@ Status IndexStream::close(const std::vector<PTabletID>& tablets_to_commit,
TabletStreamSharedPtr tablet_stream;
auto it = _tablet_streams_map.find(tablet.tablet_id());
if (it == _tablet_streams_map.end()) {
RETURN_IF_ERROR(
_init_tablet_stream(tablet_stream, tablet.tablet_id(), tablet.partition_id()));
_init_tablet_stream(tablet_stream, tablet.tablet_id(), tablet.partition_id());
tablet_stream->add_num_segments(tablet.num_segments());
} else {
it->second->add_num_segments(tablet.num_segments());
@@ -336,7 +360,6 @@ Status IndexStream::close(const std::vector<PTabletID>& tablets_to_commit,
failed_tablets->emplace_back(tablet_stream->id(), st);
}
}
return Status::OK();
}

// TODO: Profile is temporary disabled, because:
@@ -389,8 +412,8 @@ Status LoadStream::init(const POpenLoadStreamRequest* request) {
return Status::OK();
}

Status LoadStream::close(int64_t src_id, const std::vector<PTabletID>& tablets_to_commit,
std::vector<int64_t>* success_tablet_ids, FailedTablets* failed_tablets) {
void LoadStream::close(int64_t src_id, const std::vector<PTabletID>& tablets_to_commit,
std::vector<int64_t>* success_tablet_ids, FailedTablets* failed_tablets) {
std::lock_guard<bthread::Mutex> lock_guard(_lock);
SCOPED_TIMER(_close_wait_timer);

@@ -408,16 +431,14 @@ Status LoadStream::close(int64_t src_id, const std::vector<PTabletID>& tablets_t

if (_close_load_cnt < _total_streams) {
// do not return commit info if there is remaining streams.
return Status::OK();
return;
}

for (auto& [_, index_stream] : _index_streams_map) {
RETURN_IF_ERROR(
index_stream->close(_tablets_to_commit, success_tablet_ids, failed_tablets));
index_stream->close(_tablets_to_commit, success_tablet_ids, failed_tablets);
}
LOG(INFO) << "close load " << *this << ", success_tablet_num=" << success_tablet_ids->size()
<< ", failed_tablet_num=" << failed_tablets->size();
return Status::OK();
}

void LoadStream::_report_result(StreamId stream, const Status& status,
@@ -602,8 +623,8 @@ void LoadStream::_dispatch(StreamId id, const PStreamHeader& hdr, butil::IOBuf*
std::vector<int64_t> success_tablet_ids;
FailedTablets failed_tablets;
std::vector<PTabletID> tablets_to_commit(hdr.tablets().begin(), hdr.tablets().end());
auto st = close(hdr.src_id(), tablets_to_commit, &success_tablet_ids, &failed_tablets);
_report_result(id, st, success_tablet_ids, failed_tablets, true);
close(hdr.src_id(), tablets_to_commit, &success_tablet_ids, &failed_tablets);
_report_result(id, Status::OK(), success_tablet_ids, failed_tablets, true);
brpc::StreamClose(id);
} break;
case PStreamHeader::GET_SCHEMA: {
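
The load_stream.cpp changes above replace the shared_ptr<Status> _failed_st with a plain _status that records the first failure; later append_data/add_segment/close calls short-circuit on it, so a bad tablet replica reports one error at close time instead of tearing down the whole load stream. A rough sketch of that sticky-status pattern, with hypothetical do_write/flush_and_close helpers standing in for the real LoadStreamWriter calls:

```cpp
#include <mutex>
#include <string>

// Simplified stand-in for the Doris Status type.
struct Status {
    bool is_ok = true;
    std::string msg;
    static Status OK() { return {}; }
    static Status Error(std::string m) { return {false, std::move(m)}; }
    bool ok() const { return is_ok; }
};

// Sketch of the sticky-status pattern: the first failure is cached in _status,
// later calls short-circuit on it, and the caller reads the final status once,
// at close time.
class TabletStreamSketch {
public:
    Status append_data() {
        std::lock_guard<std::mutex> guard(_lock);
        if (!_status.ok()) {
            return _status;  // short-circuit: this tablet replica already failed
        }
        Status st = do_write();
        if (!st.ok() && _status.ok()) {
            _status = st;  // remember only the first error
        }
        return _status;
    }

    Status close() {
        std::lock_guard<std::mutex> guard(_lock);
        if (!_status.ok()) {
            return _status;  // report the recorded failure instead of closing
        }
        return flush_and_close();
    }

private:
    Status do_write() { return Status::OK(); }
    Status flush_and_close() { return Status::OK(); }

    std::mutex _lock;
    Status _status = Status::OK();
};
```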
14 changes: 7 additions & 7 deletions be/src/runtime/load_stream.h
@@ -66,7 +66,7 @@ class TabletStream {
std::atomic<uint32_t> _next_segid;
int64_t _num_segments = 0;
bthread::Mutex _lock;
std::shared_ptr<Status> _failed_st;
Status _status;
PUniqueId _load_id;
int64_t _txn_id;
RuntimeProfile* _profile = nullptr;
@@ -86,12 +86,12 @@ class IndexStream {

Status append_data(const PStreamHeader& header, butil::IOBuf* data);

Status close(const std::vector<PTabletID>& tablets_to_commit,
std::vector<int64_t>* success_tablet_ids, FailedTablets* failed_tablet_ids);
void close(const std::vector<PTabletID>& tablets_to_commit,
std::vector<int64_t>* success_tablet_ids, FailedTablets* failed_tablet_ids);

private:
Status _init_tablet_stream(TabletStreamSharedPtr& tablet_stream, int64_t tablet_id,
int64_t partition_id);
void _init_tablet_stream(TabletStreamSharedPtr& tablet_stream, int64_t tablet_id,
int64_t partition_id);

private:
int64_t _id;
@@ -124,8 +124,8 @@ class LoadStream : public brpc::StreamInputHandler {
}
}

Status close(int64_t src_id, const std::vector<PTabletID>& tablets_to_commit,
std::vector<int64_t>* success_tablet_ids, FailedTablets* failed_tablet_ids);
void close(int64_t src_id, const std::vector<PTabletID>& tablets_to_commit,
std::vector<int64_t>* success_tablet_ids, FailedTablets* failed_tablet_ids);

// callbacks called by brpc
int on_received_messages(StreamId id, butil::IOBuf* const messages[], size_t size) override;
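
With the header changes above, IndexStream::close and LoadStream::close return void and report per-tablet outcomes through the success_tablet_ids and failed_tablets out-parameters instead of a single Status. A hedged sketch of how a caller might consume that shape; close_index_stream and the tablet ids are made up for illustration, and FailedTablets is modeled as tablet-id/Status pairs as in the diff:

```cpp
#include <cstdint>
#include <iostream>
#include <string>
#include <utility>
#include <vector>

// Simplified stand-in for the Doris Status type.
struct Status {
    bool is_ok = true;
    std::string msg;
    bool ok() const { return is_ok; }
};
using FailedTablets = std::vector<std::pair<int64_t, Status>>;

// Hypothetical index-stream close: fills both out-params, never aborts early.
void close_index_stream(std::vector<int64_t>* success_tablet_ids,
                        FailedTablets* failed_tablets) {
    success_tablet_ids->push_back(10001);
    failed_tablets->emplace_back(10002, Status{false, "replica write failed"});
}

int main() {
    std::vector<int64_t> success;
    FailedTablets failed;
    close_index_stream(&success, &failed);

    // Both lists go back in the commit info; the coordinator decides whether
    // enough replicas succeeded (the "tolerate minority failures" part).
    std::cout << "success=" << success.size() << " failed=" << failed.size() << "\n";
    for (auto& [tablet_id, st] : failed) {
        std::cout << "tablet " << tablet_id << " failed: " << st.msg << "\n";
    }
    return 0;
}
```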
2 changes: 0 additions & 2 deletions be/src/runtime/load_stream_writer.cpp
@@ -133,7 +133,6 @@ Status LoadStreamWriter::close_segment(uint32_t segid) {
io::FileWriter* file_writer = nullptr;
{
std::lock_guard lock_guard(_lock);
DBUG_EXECUTE_IF("LoadStreamWriter.close_segment.uninited_writer", { _is_init = false; });
if (!_is_init) {
return Status::Corruption("close_segment failed, LoadStreamWriter is not inited");
}
@@ -168,7 +167,6 @@ Status LoadStreamWriter::add_segment(uint32_t segid, const SegmentStatistics& st
io::FileWriter* file_writer = nullptr;
{
std::lock_guard lock_guard(_lock);
DBUG_EXECUTE_IF("LoadStreamWriter.add_segment.uninited_writer", { _is_init = false; });
if (!_is_init) {
return Status::Corruption("add_segment failed, LoadStreamWriter is not inited");
}
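
The DBUG_EXECUTE_IF hooks removed here were relocated to TabletStream::init (see the load_stream.cpp hunk above), so an injected failure now lands in _status and is reported like any real replica error. The sketch below only illustrates how such a named debug point behaves; it is not the actual Doris DBUG_EXECUTE_IF implementation:

```cpp
#include <iostream>
#include <set>
#include <string>

// Illustration only: a named debug point runs its block when enabled.
static std::set<std::string> g_active_debug_points;

#define SKETCH_EXECUTE_IF(name, block)                     \
    do {                                                   \
        if (g_active_debug_points.count(name)) { block; }  \
    } while (0)

bool init_tablet_stream() {
    // Mirrors TabletStream::init in the diff: the injected failure is
    // recorded exactly like a real one, then surfaced at close time.
    SKETCH_EXECUTE_IF("TabletStream.init.uninited_writer", {
        std::cerr << "fault injection: writer left uninitialized\n";
        return false;
    });
    return true;
}

int main() {
    g_active_debug_points.insert("TabletStream.init.uninited_writer");
    std::cout << (init_tablet_stream() ? "init ok" : "init failed") << "\n";
    return 0;
}
```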