Skip to content

Commit

Permalink
[test](move-memtable) add more injection test for load stream
Browse files Browse the repository at this point in the history
  • Loading branch information
kaijchen committed Aug 15, 2024
1 parent 661fe4a commit bb24dd9
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 2 deletions.
25 changes: 23 additions & 2 deletions be/src/runtime/load_stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -145,11 +145,14 @@ Status TabletStream::append_data(const PStreamHeader& header, butil::IOBuf* data
uint32_t new_segid = mapping->at(segid);
DCHECK(new_segid != std::numeric_limits<uint32_t>::max());
butil::IOBuf buf = data->movable();
auto flush_func = [this, new_segid, eos, buf, header, file_type]() {
auto flush_func = [this, new_segid, eos, buf, header, &file_type]() {
signal::set_signal_task_id(_load_id);
g_load_stream_flush_running_threads << -1;
auto st = _load_stream_writer->append_data(new_segid, header.offset(), buf, file_type);
if (eos && st.ok()) {
DBUG_EXECUTE_IF("TabletStream.append_data.unknown_file_type", {
file_type = static_cast<FileType>(-1);
});
if (file_type == FileType::SEGMENT_FILE || file_type == FileType::INVERTED_INDEX_FILE) {
st = _load_stream_writer->close_writer(new_segid, file_type);
} else {
Expand All @@ -159,6 +162,8 @@ Status TabletStream::append_data(const PStreamHeader& header, butil::IOBuf* data
file_type, new_segid);
}
}
DBUG_EXECUTE_IF("TabletStream.append_data.append_failed",
{ st = Status::InternalError("fault injection"); });
if (!st.ok() && _status.ok()) {
_status = st;
LOG(WARNING) << "write data failed " << st << ", " << *this;
Expand Down Expand Up @@ -189,6 +194,9 @@ Status TabletStream::append_data(const PStreamHeader& header, butil::IOBuf* data
g_load_stream_flush_wait_ms << time_ms;
g_load_stream_flush_running_threads << 1;
auto st = flush_token->submit_func(flush_func);
DBUG_EXECUTE_IF("TabletStream.append_data.submit_func_failed", {
st = Status::InternalError("fault injection");
});
if (!st.ok()) {
_status = st;
}
Expand Down Expand Up @@ -222,6 +230,8 @@ Status TabletStream::add_segment(const PStreamHeader& header, butil::IOBuf* data
src_id, segid);
return _status;
}
DBUG_EXECUTE_IF("TabletStream.add_segment.segid_never_written",
{ segid = _segids_mapping[src_id]->size(); });
if (segid >= _segids_mapping[src_id]->size()) {
_status = Status::InternalError(
"add segment failed, segment is never written, src_id={}, segment_id={}",
Expand All @@ -235,13 +245,17 @@ Status TabletStream::add_segment(const PStreamHeader& header, butil::IOBuf* data
auto add_segment_func = [this, new_segid, stat, flush_schema]() {
signal::set_signal_task_id(_load_id);
auto st = _load_stream_writer->add_segment(new_segid, stat, flush_schema);
DBUG_EXECUTE_IF("TabletStream.add_segment.add_segment_failed",
{ st = Status::InternalError("fault injection"); });
if (!st.ok() && _status.ok()) {
_status = st;
LOG(INFO) << "add segment failed " << *this;
}
};
auto& flush_token = _flush_tokens[new_segid % _flush_tokens.size()];
auto st = flush_token->submit_func(add_segment_func);
DBUG_EXECUTE_IF("TabletStream.add_segment.submit_func_failed",
{ st = Status::InternalError("fault injection"); });
if (!st.ok()) {
_status = st;
}
Expand Down Expand Up @@ -274,6 +288,8 @@ Status TabletStream::close() {
return _status;
}

DBUG_EXECUTE_IF("TabletStream.close.segment_num_mismatch",
{ _num_segments++; });
if (_next_segid.load() != _num_segments) {
_status = Status::Corruption(
"segment num mismatch in tablet {}, expected: {}, actual: {}, load_id: {}", _id,
Expand Down Expand Up @@ -519,7 +535,12 @@ void LoadStream::_report_schema(StreamId stream, const PStreamHeader& hdr) {

Status LoadStream::_write_stream(StreamId stream, butil::IOBuf& buf) {
for (;;) {
int ret = brpc::StreamWrite(stream, buf);
int ret = 0;
DBUG_EXECUTE_IF("LoadStream._write_stream.EAGAIN",
{ ret = EAGAIN; });
if (ret == 0) {
ret = brpc::StreamWrite(stream, buf);
}
switch (ret) {
case 0:
return Status::OK();
Expand Down
3 changes: 3 additions & 0 deletions be/src/runtime/load_stream_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,9 @@ LoadStreamWriter::~LoadStreamWriter() {
}

Status LoadStreamWriter::init() {
DBUG_EXECUTE_IF("LoadStreamWriter.init.failure", {
return Status::InternalError("fault injection");
});
RETURN_IF_ERROR(_rowset_builder->init());
_rowset_writer = _rowset_builder->rowset_writer();
_is_init = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,8 @@ suite("load_stream_fault_injection", "nonConcurrent") {
load_with_injection("FileWriter.close_writer.zero_bytes_appended", "")
// LoadStreamWriter close_writer/add_segment meet not inited error
load_with_injection("TabletStream.init.uninited_writer", "")
// LoadStreamWriter init failure
load_with_injection("LoadStreamWriter.init.failure", "")
// LoadStreamWriter add_segment meet not bad segid error
load_with_injection("LoadStreamWriter.add_segment.bad_segid", "")
// LoadStreamWriter add_segment meet null file writer error
Expand All @@ -163,8 +165,22 @@ suite("load_stream_fault_injection", "nonConcurrent") {
load_with_injection("RowsetBuilder.check_tablet_version_count.too_many_version", "")
// LoadStream add_segment meet unknown segid in request header
load_with_injection("TabletStream.add_segment.unknown_segid", "")
// LoadStream add_segment meet unknown segid in request header
load_with_injection("TabletStream.add_segment.segid_never_written", "")
// LoadStream add_segment meet LoadStreamWriter add segment failure
load_with_injection("TabletStream.add_segment.add_segment_failed", "")
// LoadStream add_segment meet submit function failure
load_with_injection("TabletStream.add_segment.submit_func_failed", "")
// LoadStream append_data meet unknown file type
load_with_injection("TabletStream.append_data.unknown_file_type", "")
// LoadStream append_data meet LoadStremWriter append or close failed
load_with_injection("TabletStream.append_data.append_failed", "")
// LoadStream append_data meet submit function failure
load_with_injection("TabletStream.append_data.submit_func_failed", "")
// LoadStream append_data meet unknown index id in request header
load_with_injection("TabletStream._append_data.unknown_indexid", "")
// LoadStream close meet segment num mismatch
load_with_injection("TabletStream.close.segment_num_mismatch", "")
// LoadStream dispatch meet unknown load id
load_with_injection("LoadStream._dispatch.unknown_loadid", "")
// LoadStream dispatch meet unknown src id
Expand Down

0 comments on commit bb24dd9

Please sign in to comment.