Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix](move-memtable) multi replica tables should tolerate minority failures #38003

Merged
merged 6 commits into from
Aug 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 12 additions & 2 deletions be/src/olap/rowset/beta_rowset_writer_v2.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,19 @@ Status BetaRowsetWriterV2::create_file_writer(uint32_t segment_id, io::FileWrite

Status BetaRowsetWriterV2::add_segment(uint32_t segment_id, const SegmentStatistics& segstat,
TabletSchemaSPtr flush_schema) {
bool ok = false;
for (const auto& stream : _streams) {
RETURN_IF_ERROR(stream->add_segment(_context.partition_id, _context.index_id,
_context.tablet_id, segment_id, segstat, flush_schema));
auto st = stream->add_segment(_context.partition_id, _context.index_id, _context.tablet_id,
segment_id, segstat, flush_schema);
if (!st.ok()) {
LOG(WARNING) << "failed to add segment " << segment_id << " to stream "
<< stream->stream_id();
}
ok = ok || st.ok();
}
if (!ok) {
return Status::InternalError("failed to add segment {} of tablet {} to any replicas",
segment_id, _context.tablet_id);
}
return Status::OK();
}
Expand Down
113 changes: 67 additions & 46 deletions be/src/runtime/load_stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ TabletStream::TabletStream(PUniqueId load_id, int64_t id, int64_t txn_id,
_txn_id(txn_id),
_load_stream_mgr(load_stream_mgr) {
load_stream_mgr->create_tokens(_flush_tokens);
_failed_st = std::make_shared<Status>();
_status = Status::OK();
_profile = profile->create_child(fmt::format("TabletStream {}", id), true, true);
_append_data_timer = ADD_TIMER(_profile, "AppendDataTime");
_add_segment_timer = ADD_TIMER(_profile, "AddSegmentTime");
Expand All @@ -71,7 +71,7 @@ TabletStream::TabletStream(PUniqueId load_id, int64_t id, int64_t txn_id,

inline std::ostream& operator<<(std::ostream& ostr, const TabletStream& tablet_stream) {
ostr << "load_id=" << tablet_stream._load_id << ", txn_id=" << tablet_stream._txn_id
<< ", tablet_id=" << tablet_stream._id << ", status=" << *tablet_stream._failed_st;
<< ", tablet_id=" << tablet_stream._id << ", status=" << tablet_stream._status;
return ostr;
}

Expand All @@ -89,17 +89,20 @@ Status TabletStream::init(std::shared_ptr<OlapTableSchemaParam> schema, int64_t
};

_load_stream_writer = std::make_shared<LoadStreamWriter>(&req, _profile);
auto st = _load_stream_writer->init();
if (!st.ok()) {
_failed_st = std::make_shared<Status>(st);
DBUG_EXECUTE_IF("TabletStream.init.uninited_writer", {
_status = Status::Uninitialized("fault injection");
return _status;
});
_status = _load_stream_writer->init();
if (!_status.ok()) {
LOG(INFO) << "failed to init rowset builder due to " << *this;
}
return st;
return _status;
}

Status TabletStream::append_data(const PStreamHeader& header, butil::IOBuf* data) {
if (!_failed_st->ok()) {
return *_failed_st;
if (!_status.ok()) {
return _status;
}

// dispatch add_segment request
Expand Down Expand Up @@ -156,9 +159,9 @@ Status TabletStream::append_data(const PStreamHeader& header, butil::IOBuf* data
file_type, new_segid);
}
}
if (!st.ok() && _failed_st->ok()) {
_failed_st = std::make_shared<Status>(st);
LOG(INFO) << "write data failed " << *this;
if (!st.ok() && _status.ok()) {
_status = st;
LOG(WARNING) << "write data failed " << st << ", " << *this;
}
};
auto& flush_token = _flush_tokens[new_segid % _flush_tokens.size()];
Expand All @@ -173,21 +176,30 @@ Status TabletStream::append_data(const PStreamHeader& header, butil::IOBuf* data
timer.start();
while (flush_token->num_tasks() >= load_stream_flush_token_max_tasks) {
if (timer.elapsed_time() / 1000 / 1000 >= load_stream_max_wait_flush_token_time_ms) {
return Status::Error<true>(
_status = Status::Error<true>(
"wait flush token back pressure time is more than "
"load_stream_max_wait_flush_token_time {}",
load_stream_max_wait_flush_token_time_ms);
return _status;
}
bthread_usleep(2 * 1000); // 2ms
}
timer.stop();
int64_t time_ms = timer.elapsed_time() / 1000 / 1000;
g_load_stream_flush_wait_ms << time_ms;
g_load_stream_flush_running_threads << 1;
return flush_token->submit_func(flush_func);
auto st = flush_token->submit_func(flush_func);
if (!st.ok()) {
_status = st;
}
return _status;
}

Status TabletStream::add_segment(const PStreamHeader& header, butil::IOBuf* data) {
if (!_status.ok()) {
return _status;
}

SCOPED_TIMER(_add_segment_timer);
DCHECK(header.has_segment_statistics());
SegmentStatistics stat(header.segment_statistics());
Expand All @@ -204,15 +216,17 @@ Status TabletStream::add_segment(const PStreamHeader& header, butil::IOBuf* data
{
std::lock_guard lock_guard(_lock);
if (!_segids_mapping.contains(src_id)) {
return Status::InternalError(
_status = Status::InternalError(
"add segment failed, no segment written by this src be yet, src_id={}, "
"segment_id={}",
src_id, segid);
return _status;
}
if (segid >= _segids_mapping[src_id]->size()) {
return Status::InternalError(
_status = Status::InternalError(
"add segment failed, segment is never written, src_id={}, segment_id={}",
src_id, segid);
return _status;
}
new_segid = _segids_mapping[src_id]->at(segid);
}
Expand All @@ -221,16 +235,24 @@ Status TabletStream::add_segment(const PStreamHeader& header, butil::IOBuf* data
auto add_segment_func = [this, new_segid, stat, flush_schema]() {
signal::set_signal_task_id(_load_id);
auto st = _load_stream_writer->add_segment(new_segid, stat, flush_schema);
if (!st.ok() && _failed_st->ok()) {
_failed_st = std::make_shared<Status>(st);
if (!st.ok() && _status.ok()) {
_status = st;
LOG(INFO) << "add segment failed " << *this;
}
};
auto& flush_token = _flush_tokens[new_segid % _flush_tokens.size()];
return flush_token->submit_func(add_segment_func);
auto st = flush_token->submit_func(add_segment_func);
if (!st.ok()) {
_status = st;
}
return _status;
}

Status TabletStream::close() {
if (!_status.ok()) {
return _status;
}

SCOPED_TIMER(_close_wait_timer);
bthread::Mutex mu;
std::unique_lock<bthread::Mutex> lock(mu);
Expand All @@ -247,34 +269,35 @@ Status TabletStream::close() {
if (ret) {
cv.wait(lock);
} else {
return Status::Error<ErrorCode::INTERNAL_ERROR>(
_status = Status::Error<ErrorCode::INTERNAL_ERROR>(
"there is not enough thread resource for close load");
return _status;
}

if (!_failed_st->ok()) {
return *_failed_st;
}
if (_next_segid.load() != _num_segments) {
return Status::Corruption(
_status = Status::Corruption(
"segment num mismatch in tablet {}, expected: {}, actual: {}, load_id: {}", _id,
_num_segments, _next_segid.load(), print_id(_load_id));
return _status;
}

Status st = Status::OK();
auto close_func = [this, &mu, &cv, &st]() {
auto close_func = [this, &mu, &cv]() {
signal::set_signal_task_id(_load_id);
st = _load_stream_writer->close();
auto st = _load_stream_writer->close();
if (!st.ok() && _status.ok()) {
_status = st;
}
std::lock_guard<bthread::Mutex> lock(mu);
cv.notify_one();
};
ret = _load_stream_mgr->heavy_work_pool()->try_offer(close_func);
if (ret) {
cv.wait(lock);
} else {
return Status::Error<ErrorCode::INTERNAL_ERROR>(
_status = Status::Error<ErrorCode::INTERNAL_ERROR>(
"there is not enough thread resource for close load");
}
return st;
return _status;
}

IndexStream::IndexStream(PUniqueId load_id, int64_t id, int64_t txn_id,
Expand All @@ -298,7 +321,7 @@ Status IndexStream::append_data(const PStreamHeader& header, butil::IOBuf* data)
std::lock_guard lock_guard(_lock);
auto it = _tablet_streams_map.find(tablet_id);
if (it == _tablet_streams_map.end()) {
RETURN_IF_ERROR(_init_tablet_stream(tablet_stream, tablet_id, header.partition_id()));
_init_tablet_stream(tablet_stream, tablet_id, header.partition_id());
} else {
tablet_stream = it->second;
}
Expand All @@ -307,17 +330,19 @@ Status IndexStream::append_data(const PStreamHeader& header, butil::IOBuf* data)
return tablet_stream->append_data(header, data);
}

Status IndexStream::_init_tablet_stream(TabletStreamSharedPtr& tablet_stream, int64_t tablet_id,
int64_t partition_id) {
void IndexStream::_init_tablet_stream(TabletStreamSharedPtr& tablet_stream, int64_t tablet_id,
int64_t partition_id) {
tablet_stream = std::make_shared<TabletStream>(_load_id, tablet_id, _txn_id, _load_stream_mgr,
_profile);
_tablet_streams_map[tablet_id] = tablet_stream;
RETURN_IF_ERROR(tablet_stream->init(_schema, _id, partition_id));
return Status::OK();
auto st = tablet_stream->init(_schema, _id, partition_id);
if (!st.ok()) {
LOG(WARNING) << "tablet stream init failed " << *tablet_stream;
}
}

Status IndexStream::close(const std::vector<PTabletID>& tablets_to_commit,
std::vector<int64_t>* success_tablet_ids, FailedTablets* failed_tablets) {
void IndexStream::close(const std::vector<PTabletID>& tablets_to_commit,
std::vector<int64_t>* success_tablet_ids, FailedTablets* failed_tablets) {
std::lock_guard lock_guard(_lock);
SCOPED_TIMER(_close_wait_timer);
// open all need commit tablets
Expand All @@ -328,8 +353,7 @@ Status IndexStream::close(const std::vector<PTabletID>& tablets_to_commit,
TabletStreamSharedPtr tablet_stream;
auto it = _tablet_streams_map.find(tablet.tablet_id());
if (it == _tablet_streams_map.end()) {
RETURN_IF_ERROR(
_init_tablet_stream(tablet_stream, tablet.tablet_id(), tablet.partition_id()));
_init_tablet_stream(tablet_stream, tablet.tablet_id(), tablet.partition_id());
tablet_stream->add_num_segments(tablet.num_segments());
} else {
it->second->add_num_segments(tablet.num_segments());
Expand All @@ -345,7 +369,6 @@ Status IndexStream::close(const std::vector<PTabletID>& tablets_to_commit,
failed_tablets->emplace_back(tablet_stream->id(), st);
}
}
return Status::OK();
}

// TODO: Profile is temporary disabled, because:
Expand Down Expand Up @@ -398,8 +421,8 @@ Status LoadStream::init(const POpenLoadStreamRequest* request) {
return Status::OK();
}

Status LoadStream::close(int64_t src_id, const std::vector<PTabletID>& tablets_to_commit,
std::vector<int64_t>* success_tablet_ids, FailedTablets* failed_tablets) {
void LoadStream::close(int64_t src_id, const std::vector<PTabletID>& tablets_to_commit,
std::vector<int64_t>* success_tablet_ids, FailedTablets* failed_tablets) {
std::lock_guard<bthread::Mutex> lock_guard(_lock);
SCOPED_TIMER(_close_wait_timer);

Expand All @@ -417,16 +440,14 @@ Status LoadStream::close(int64_t src_id, const std::vector<PTabletID>& tablets_t

if (_close_load_cnt < _total_streams) {
// do not return commit info if there is remaining streams.
return Status::OK();
return;
}

for (auto& [_, index_stream] : _index_streams_map) {
RETURN_IF_ERROR(
index_stream->close(_tablets_to_commit, success_tablet_ids, failed_tablets));
index_stream->close(_tablets_to_commit, success_tablet_ids, failed_tablets);
}
LOG(INFO) << "close load " << *this << ", success_tablet_num=" << success_tablet_ids->size()
<< ", failed_tablet_num=" << failed_tablets->size();
return Status::OK();
}

void LoadStream::_report_result(StreamId stream, const Status& status,
Expand Down Expand Up @@ -612,8 +633,8 @@ void LoadStream::_dispatch(StreamId id, const PStreamHeader& hdr, butil::IOBuf*
std::vector<int64_t> success_tablet_ids;
FailedTablets failed_tablets;
std::vector<PTabletID> tablets_to_commit(hdr.tablets().begin(), hdr.tablets().end());
auto st = close(hdr.src_id(), tablets_to_commit, &success_tablet_ids, &failed_tablets);
_report_result(id, st, success_tablet_ids, failed_tablets, true);
close(hdr.src_id(), tablets_to_commit, &success_tablet_ids, &failed_tablets);
_report_result(id, Status::OK(), success_tablet_ids, failed_tablets, true);
brpc::StreamClose(id);
} break;
case PStreamHeader::GET_SCHEMA: {
Expand Down
14 changes: 7 additions & 7 deletions be/src/runtime/load_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ class TabletStream {
std::atomic<uint32_t> _next_segid;
int64_t _num_segments = 0;
bthread::Mutex _lock;
std::shared_ptr<Status> _failed_st;
Status _status;
PUniqueId _load_id;
int64_t _txn_id;
RuntimeProfile* _profile = nullptr;
Expand All @@ -86,12 +86,12 @@ class IndexStream {

Status append_data(const PStreamHeader& header, butil::IOBuf* data);

Status close(const std::vector<PTabletID>& tablets_to_commit,
std::vector<int64_t>* success_tablet_ids, FailedTablets* failed_tablet_ids);
void close(const std::vector<PTabletID>& tablets_to_commit,
std::vector<int64_t>* success_tablet_ids, FailedTablets* failed_tablet_ids);

private:
Status _init_tablet_stream(TabletStreamSharedPtr& tablet_stream, int64_t tablet_id,
int64_t partition_id);
void _init_tablet_stream(TabletStreamSharedPtr& tablet_stream, int64_t tablet_id,
int64_t partition_id);

private:
int64_t _id;
Expand Down Expand Up @@ -124,8 +124,8 @@ class LoadStream : public brpc::StreamInputHandler {
}
}

Status close(int64_t src_id, const std::vector<PTabletID>& tablets_to_commit,
std::vector<int64_t>* success_tablet_ids, FailedTablets* failed_tablet_ids);
void close(int64_t src_id, const std::vector<PTabletID>& tablets_to_commit,
std::vector<int64_t>* success_tablet_ids, FailedTablets* failed_tablet_ids);

// callbacks called by brpc
int on_received_messages(StreamId id, butil::IOBuf* const messages[], size_t size) override;
Expand Down
2 changes: 0 additions & 2 deletions be/src/runtime/load_stream_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,6 @@ Status LoadStreamWriter::close_writer(uint32_t segid, FileType file_type) {
file_type == FileType::SEGMENT_FILE ? _segment_file_writers : _inverted_file_writers;
{
std::lock_guard lock_guard(_lock);
DBUG_EXECUTE_IF("LoadStreamWriter.close_writer.uninited_writer", { _is_init = false; });
if (!_is_init) {
return Status::Corruption("close_writer failed, LoadStreamWriter is not inited");
}
Expand Down Expand Up @@ -183,7 +182,6 @@ Status LoadStreamWriter::add_segment(uint32_t segid, const SegmentStatistics& st
size_t inverted_file_size = 0;
{
std::lock_guard lock_guard(_lock);
DBUG_EXECUTE_IF("LoadStreamWriter.add_segment.uninited_writer", { _is_init = false; });
if (!_is_init) {
return Status::Corruption("add_segment failed, LoadStreamWriter is not inited");
}
Expand Down
Loading
Loading