Skip to content

Commit

Permalink
[fix](move-memtable) multi replica tables should tolerate minority fa…
Browse files Browse the repository at this point in the history
…ilures
  • Loading branch information
kaijchen committed Jul 19, 2024
1 parent 40b3d58 commit 762616a
Show file tree
Hide file tree
Showing 7 changed files with 183 additions and 89 deletions.
14 changes: 12 additions & 2 deletions be/src/olap/rowset/beta_rowset_writer_v2.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,19 @@ Status BetaRowsetWriterV2::create_file_writer(uint32_t segment_id, io::FileWrite

Status BetaRowsetWriterV2::add_segment(uint32_t segment_id, const SegmentStatistics& segstat,
TabletSchemaSPtr flush_schema) {
bool ok = false;
for (const auto& stream : _streams) {
RETURN_IF_ERROR(stream->add_segment(_context.partition_id, _context.index_id,
_context.tablet_id, segment_id, segstat, flush_schema));
auto st = stream->add_segment(_context.partition_id, _context.index_id, _context.tablet_id,
segment_id, segstat, flush_schema);
if (!st.ok()) {
LOG(WARNING) << "failed to add segment " << segment_id << " to stream "
<< stream->stream_id();
}
ok = ok || st.ok();
}
if (!ok) {
return Status::InternalError("failed to add segment {} of tablet {} to any replicas",
segment_id, _context.tablet_id);
}
return Status::OK();
}
Expand Down
109 changes: 63 additions & 46 deletions be/src/runtime/load_stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ TabletStream::TabletStream(PUniqueId load_id, int64_t id, int64_t txn_id,
_txn_id(txn_id),
_load_stream_mgr(load_stream_mgr) {
load_stream_mgr->create_tokens(_flush_tokens);
_failed_st = std::make_shared<Status>();
_status = Status::OK();
_profile = profile->create_child(fmt::format("TabletStream {}", id), true, true);
_append_data_timer = ADD_TIMER(_profile, "AppendDataTime");
_add_segment_timer = ADD_TIMER(_profile, "AddSegmentTime");
Expand All @@ -70,7 +70,7 @@ TabletStream::TabletStream(PUniqueId load_id, int64_t id, int64_t txn_id,

inline std::ostream& operator<<(std::ostream& ostr, const TabletStream& tablet_stream) {
ostr << "load_id=" << tablet_stream._load_id << ", txn_id=" << tablet_stream._txn_id
<< ", tablet_id=" << tablet_stream._id << ", status=" << *tablet_stream._failed_st;
<< ", tablet_id=" << tablet_stream._id << ", status=" << tablet_stream._status;
return ostr;
}

Expand All @@ -88,17 +88,16 @@ Status TabletStream::init(std::shared_ptr<OlapTableSchemaParam> schema, int64_t
};

_load_stream_writer = std::make_shared<LoadStreamWriter>(&req, _profile);
auto st = _load_stream_writer->init();
if (!st.ok()) {
_failed_st = std::make_shared<Status>(st);
_status = _load_stream_writer->init();
if (!_status.ok()) {
LOG(INFO) << "failed to init rowset builder due to " << *this;
}
return st;
return _status;
}

Status TabletStream::append_data(const PStreamHeader& header, butil::IOBuf* data) {
if (!_failed_st->ok()) {
return *_failed_st;
if (!_status.ok()) {
return _status;
}

// dispatch add_segment request
Expand Down Expand Up @@ -155,9 +154,9 @@ Status TabletStream::append_data(const PStreamHeader& header, butil::IOBuf* data
file_type, new_segid);
}
}
if (!st.ok() && _failed_st->ok()) {
_failed_st = std::make_shared<Status>(st);
LOG(INFO) << "write data failed " << *this;
if (!st.ok() && _status.ok()) {
_status = st;
LOG(WARNING) << "write data failed " << st << ", " << *this;
}
};
auto& flush_token = _flush_tokens[new_segid % _flush_tokens.size()];
Expand All @@ -172,21 +171,30 @@ Status TabletStream::append_data(const PStreamHeader& header, butil::IOBuf* data
timer.start();
while (flush_token->num_tasks() >= load_stream_flush_token_max_tasks) {
if (timer.elapsed_time() / 1000 / 1000 >= load_stream_max_wait_flush_token_time_ms) {
return Status::Error<true>(
_status = Status::Error<true>(
"wait flush token back pressure time is more than "
"load_stream_max_wait_flush_token_time {}",
load_stream_max_wait_flush_token_time_ms);
return _status;
}
bthread_usleep(2 * 1000); // 2ms
}
timer.stop();
int64_t time_ms = timer.elapsed_time() / 1000 / 1000;
g_load_stream_flush_wait_ms << time_ms;
g_load_stream_flush_running_threads << 1;
return flush_token->submit_func(flush_func);
auto st = flush_token->submit_func(flush_func);
if (!st.ok()) {
_status = st;
}
return _status;
}

Status TabletStream::add_segment(const PStreamHeader& header, butil::IOBuf* data) {
if (!_status.ok()) {
return _status;
}

SCOPED_TIMER(_add_segment_timer);
DCHECK(header.has_segment_statistics());
SegmentStatistics stat(header.segment_statistics());
Expand All @@ -203,15 +211,17 @@ Status TabletStream::add_segment(const PStreamHeader& header, butil::IOBuf* data
{
std::lock_guard lock_guard(_lock);
if (!_segids_mapping.contains(src_id)) {
return Status::InternalError(
_status = Status::InternalError(
"add segment failed, no segment written by this src be yet, src_id={}, "
"segment_id={}",
src_id, segid);
return _status;
}
if (segid >= _segids_mapping[src_id]->size()) {
return Status::InternalError(
_status = Status::InternalError(
"add segment failed, segment is never written, src_id={}, segment_id={}",
src_id, segid);
return _status;
}
new_segid = _segids_mapping[src_id]->at(segid);
}
Expand All @@ -220,16 +230,24 @@ Status TabletStream::add_segment(const PStreamHeader& header, butil::IOBuf* data
auto add_segment_func = [this, new_segid, stat, flush_schema]() {
signal::set_signal_task_id(_load_id);
auto st = _load_stream_writer->add_segment(new_segid, stat, flush_schema);
if (!st.ok() && _failed_st->ok()) {
_failed_st = std::make_shared<Status>(st);
if (!st.ok() && _status.ok()) {
_status = st;
LOG(INFO) << "add segment failed " << *this;
}
};
auto& flush_token = _flush_tokens[new_segid % _flush_tokens.size()];
return flush_token->submit_func(add_segment_func);
auto st = flush_token->submit_func(add_segment_func);
if (!st.ok()) {
_status = st;
}
return _status;
}

Status TabletStream::close() {
if (!_status.ok()) {
return _status;
}

SCOPED_TIMER(_close_wait_timer);
bthread::Mutex mu;
std::unique_lock<bthread::Mutex> lock(mu);
Expand All @@ -246,34 +264,35 @@ Status TabletStream::close() {
if (ret) {
cv.wait(lock);
} else {
return Status::Error<ErrorCode::INTERNAL_ERROR>(
_status = Status::Error<ErrorCode::INTERNAL_ERROR>(
"there is not enough thread resource for close load");
return _status;
}

if (!_failed_st->ok()) {
return *_failed_st;
}
if (_next_segid.load() != _num_segments) {
return Status::Corruption(
_status = Status::Corruption(
"segment num mismatch in tablet {}, expected: {}, actual: {}, load_id: {}", _id,
_num_segments, _next_segid.load(), print_id(_load_id));
return _status;
}

Status st = Status::OK();
auto close_func = [this, &mu, &cv, &st]() {
auto close_func = [this, &mu, &cv]() {
signal::set_signal_task_id(_load_id);
st = _load_stream_writer->close();
auto st = _load_stream_writer->close();
if (!st.ok() && _status.ok()) {
_status = st;
}
std::lock_guard<bthread::Mutex> lock(mu);
cv.notify_one();
};
ret = _load_stream_mgr->heavy_work_pool()->try_offer(close_func);
if (ret) {
cv.wait(lock);
} else {
return Status::Error<ErrorCode::INTERNAL_ERROR>(
_status = Status::Error<ErrorCode::INTERNAL_ERROR>(
"there is not enough thread resource for close load");
}
return st;
return _status;
}

IndexStream::IndexStream(PUniqueId load_id, int64_t id, int64_t txn_id,
Expand All @@ -297,7 +316,7 @@ Status IndexStream::append_data(const PStreamHeader& header, butil::IOBuf* data)
std::lock_guard lock_guard(_lock);
auto it = _tablet_streams_map.find(tablet_id);
if (it == _tablet_streams_map.end()) {
RETURN_IF_ERROR(_init_tablet_stream(tablet_stream, tablet_id, header.partition_id()));
_init_tablet_stream(tablet_stream, tablet_id, header.partition_id());
} else {
tablet_stream = it->second;
}
Expand All @@ -306,17 +325,19 @@ Status IndexStream::append_data(const PStreamHeader& header, butil::IOBuf* data)
return tablet_stream->append_data(header, data);
}

Status IndexStream::_init_tablet_stream(TabletStreamSharedPtr& tablet_stream, int64_t tablet_id,
int64_t partition_id) {
void IndexStream::_init_tablet_stream(TabletStreamSharedPtr& tablet_stream, int64_t tablet_id,
int64_t partition_id) {
tablet_stream = std::make_shared<TabletStream>(_load_id, tablet_id, _txn_id, _load_stream_mgr,
_profile);
_tablet_streams_map[tablet_id] = tablet_stream;
RETURN_IF_ERROR(tablet_stream->init(_schema, _id, partition_id));
return Status::OK();
auto st = tablet_stream->init(_schema, _id, partition_id);
if (!st.ok()) {
LOG(WARNING) << "tablet stream init failed " << *tablet_stream;
}
}

Status IndexStream::close(const std::vector<PTabletID>& tablets_to_commit,
std::vector<int64_t>* success_tablet_ids, FailedTablets* failed_tablets) {
void IndexStream::close(const std::vector<PTabletID>& tablets_to_commit,
std::vector<int64_t>* success_tablet_ids, FailedTablets* failed_tablets) {
std::lock_guard lock_guard(_lock);
SCOPED_TIMER(_close_wait_timer);
// open all need commit tablets
Expand All @@ -327,8 +348,7 @@ Status IndexStream::close(const std::vector<PTabletID>& tablets_to_commit,
TabletStreamSharedPtr tablet_stream;
auto it = _tablet_streams_map.find(tablet.tablet_id());
if (it == _tablet_streams_map.end()) {
RETURN_IF_ERROR(
_init_tablet_stream(tablet_stream, tablet.tablet_id(), tablet.partition_id()));
_init_tablet_stream(tablet_stream, tablet.tablet_id(), tablet.partition_id());
tablet_stream->add_num_segments(tablet.num_segments());
} else {
it->second->add_num_segments(tablet.num_segments());
Expand All @@ -344,7 +364,6 @@ Status IndexStream::close(const std::vector<PTabletID>& tablets_to_commit,
failed_tablets->emplace_back(tablet_stream->id(), st);
}
}
return Status::OK();
}

// TODO: Profile is temporary disabled, because:
Expand Down Expand Up @@ -396,8 +415,8 @@ Status LoadStream::init(const POpenLoadStreamRequest* request) {
return Status::OK();
}

Status LoadStream::close(int64_t src_id, const std::vector<PTabletID>& tablets_to_commit,
std::vector<int64_t>* success_tablet_ids, FailedTablets* failed_tablets) {
void LoadStream::close(int64_t src_id, const std::vector<PTabletID>& tablets_to_commit,
std::vector<int64_t>* success_tablet_ids, FailedTablets* failed_tablets) {
std::lock_guard<bthread::Mutex> lock_guard(_lock);
SCOPED_TIMER(_close_wait_timer);

Expand All @@ -415,16 +434,14 @@ Status LoadStream::close(int64_t src_id, const std::vector<PTabletID>& tablets_t

if (_close_load_cnt < _total_streams) {
// do not return commit info if there is remaining streams.
return Status::OK();
return;
}

for (auto& [_, index_stream] : _index_streams_map) {
RETURN_IF_ERROR(
index_stream->close(_tablets_to_commit, success_tablet_ids, failed_tablets));
index_stream->close(_tablets_to_commit, success_tablet_ids, failed_tablets);
}
LOG(INFO) << "close load " << *this << ", success_tablet_num=" << success_tablet_ids->size()
<< ", failed_tablet_num=" << failed_tablets->size();
return Status::OK();
}

void LoadStream::_report_result(StreamId stream, const Status& status,
Expand Down Expand Up @@ -610,8 +627,8 @@ void LoadStream::_dispatch(StreamId id, const PStreamHeader& hdr, butil::IOBuf*
std::vector<int64_t> success_tablet_ids;
FailedTablets failed_tablets;
std::vector<PTabletID> tablets_to_commit(hdr.tablets().begin(), hdr.tablets().end());
auto st = close(hdr.src_id(), tablets_to_commit, &success_tablet_ids, &failed_tablets);
_report_result(id, st, success_tablet_ids, failed_tablets, true);
close(hdr.src_id(), tablets_to_commit, &success_tablet_ids, &failed_tablets);
_report_result(id, Status::OK(), success_tablet_ids, failed_tablets, true);
brpc::StreamClose(id);
} break;
case PStreamHeader::GET_SCHEMA: {
Expand Down
14 changes: 7 additions & 7 deletions be/src/runtime/load_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ class TabletStream {
std::atomic<uint32_t> _next_segid;
int64_t _num_segments = 0;
bthread::Mutex _lock;
std::shared_ptr<Status> _failed_st;
Status _status;
PUniqueId _load_id;
int64_t _txn_id;
RuntimeProfile* _profile = nullptr;
Expand All @@ -86,12 +86,12 @@ class IndexStream {

Status append_data(const PStreamHeader& header, butil::IOBuf* data);

Status close(const std::vector<PTabletID>& tablets_to_commit,
std::vector<int64_t>* success_tablet_ids, FailedTablets* failed_tablet_ids);
void close(const std::vector<PTabletID>& tablets_to_commit,
std::vector<int64_t>* success_tablet_ids, FailedTablets* failed_tablet_ids);

private:
Status _init_tablet_stream(TabletStreamSharedPtr& tablet_stream, int64_t tablet_id,
int64_t partition_id);
void _init_tablet_stream(TabletStreamSharedPtr& tablet_stream, int64_t tablet_id,
int64_t partition_id);

private:
int64_t _id;
Expand Down Expand Up @@ -124,8 +124,8 @@ class LoadStream : public brpc::StreamInputHandler {
}
}

Status close(int64_t src_id, const std::vector<PTabletID>& tablets_to_commit,
std::vector<int64_t>* success_tablet_ids, FailedTablets* failed_tablet_ids);
void close(int64_t src_id, const std::vector<PTabletID>& tablets_to_commit,
std::vector<int64_t>* success_tablet_ids, FailedTablets* failed_tablet_ids);

// callbacks called by brpc
int on_received_messages(StreamId id, butil::IOBuf* const messages[], size_t size) override;
Expand Down
21 changes: 16 additions & 5 deletions be/src/vec/sink/load_stream_map_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,10 +77,14 @@ Status LoadStreamMap::for_each_st(std::function<Status(int64_t, const Streams&)>
std::lock_guard<std::mutex> lock(_mutex);
snapshot = _streams_for_node;
}
Status status = Status::OK();
for (auto& [dst_id, streams] : snapshot) {
RETURN_IF_ERROR(fn(dst_id, *streams));
auto st = fn(dst_id, *streams);
if (!st.ok() && status.ok()) {
status = st;
}
}
return Status::OK();
return status;
}

void LoadStreamMap::save_tablets_to_commit(int64_t dst_id,
Expand Down Expand Up @@ -112,19 +116,26 @@ Status LoadStreamMap::close_load(bool incremental) {
tablets_to_commit.push_back(tablet);
tablets_to_commit.back().set_num_segments(_segments_for_tablet[tablet_id]);
}
Status status = Status::OK();
bool first = true;
for (auto& stream : streams) {
if (stream->is_incremental() != incremental) {
continue;
}
if (first) {
RETURN_IF_ERROR(stream->close_load(tablets_to_commit));
auto st = stream->close_load(tablets_to_commit);
if (!st.ok() && status.ok()) {
status = st;
}
first = false;
} else {
RETURN_IF_ERROR(stream->close_load({}));
auto st = stream->close_load({});
if (!st.ok() && status.ok()) {
status = st;
}
}
}
return Status::OK();
return status;
});
}

Expand Down
Loading

0 comments on commit 762616a

Please sign in to comment.