diff --git a/be/src/runtime/load_stream.cpp b/be/src/runtime/load_stream.cpp index aa1749caace3e6..2472d422c41522 100644 --- a/be/src/runtime/load_stream.cpp +++ b/be/src/runtime/load_stream.cpp @@ -145,11 +145,14 @@ Status TabletStream::append_data(const PStreamHeader& header, butil::IOBuf* data uint32_t new_segid = mapping->at(segid); DCHECK(new_segid != std::numeric_limits::max()); butil::IOBuf buf = data->movable(); - auto flush_func = [this, new_segid, eos, buf, header, file_type]() { + auto flush_func = [this, new_segid, eos, buf, header, &file_type]() { signal::set_signal_task_id(_load_id); g_load_stream_flush_running_threads << -1; auto st = _load_stream_writer->append_data(new_segid, header.offset(), buf, file_type); if (eos && st.ok()) { + DBUG_EXECUTE_IF("TabletStream.append_data.unknown_file_type", { + file_type = static_cast(-1); + }); if (file_type == FileType::SEGMENT_FILE || file_type == FileType::INVERTED_INDEX_FILE) { st = _load_stream_writer->close_writer(new_segid, file_type); } else { @@ -159,6 +162,8 @@ Status TabletStream::append_data(const PStreamHeader& header, butil::IOBuf* data file_type, new_segid); } } + DBUG_EXECUTE_IF("TabletStream.append_data.append_failed", + { st = Status::InternalError("fault injection"); }); if (!st.ok() && _status.ok()) { _status = st; LOG(WARNING) << "write data failed " << st << ", " << *this; @@ -189,6 +194,9 @@ Status TabletStream::append_data(const PStreamHeader& header, butil::IOBuf* data g_load_stream_flush_wait_ms << time_ms; g_load_stream_flush_running_threads << 1; auto st = flush_token->submit_func(flush_func); + DBUG_EXECUTE_IF("TabletStream.append_data.submit_func_failed", { + st = Status::InternalError("fault injection"); + }); if (!st.ok()) { _status = st; } @@ -222,6 +230,8 @@ Status TabletStream::add_segment(const PStreamHeader& header, butil::IOBuf* data src_id, segid); return _status; } + DBUG_EXECUTE_IF("TabletStream.add_segment.segid_never_written", + { segid = _segids_mapping[src_id]->size(); }); if (segid >= _segids_mapping[src_id]->size()) { _status = Status::InternalError( "add segment failed, segment is never written, src_id={}, segment_id={}", @@ -235,6 +245,8 @@ Status TabletStream::add_segment(const PStreamHeader& header, butil::IOBuf* data auto add_segment_func = [this, new_segid, stat, flush_schema]() { signal::set_signal_task_id(_load_id); auto st = _load_stream_writer->add_segment(new_segid, stat, flush_schema); + DBUG_EXECUTE_IF("TabletStream.add_segment.add_segment_failed", + { st = Status::InternalError("fault injection"); }); if (!st.ok() && _status.ok()) { _status = st; LOG(INFO) << "add segment failed " << *this; @@ -242,6 +254,8 @@ Status TabletStream::add_segment(const PStreamHeader& header, butil::IOBuf* data }; auto& flush_token = _flush_tokens[new_segid % _flush_tokens.size()]; auto st = flush_token->submit_func(add_segment_func); + DBUG_EXECUTE_IF("TabletStream.add_segment.submit_func_failed", + { st = Status::InternalError("fault injection"); }); if (!st.ok()) { _status = st; } @@ -274,6 +288,8 @@ Status TabletStream::close() { return _status; } + DBUG_EXECUTE_IF("TabletStream.close.segment_num_mismatch", + { _num_segments++; }); if (_next_segid.load() != _num_segments) { _status = Status::Corruption( "segment num mismatch in tablet {}, expected: {}, actual: {}, load_id: {}", _id, @@ -519,7 +535,12 @@ void LoadStream::_report_schema(StreamId stream, const PStreamHeader& hdr) { Status LoadStream::_write_stream(StreamId stream, butil::IOBuf& buf) { for (;;) { - int ret = brpc::StreamWrite(stream, buf); + int ret = 0; + DBUG_EXECUTE_IF("LoadStream._write_stream.EAGAIN", + { ret = EAGAIN; }); + if (ret == 0) { + ret = brpc::StreamWrite(stream, buf); + } switch (ret) { case 0: return Status::OK(); diff --git a/be/src/runtime/load_stream_writer.cpp b/be/src/runtime/load_stream_writer.cpp index ca78311b8ea250..f70434bb8a4905 100644 --- a/be/src/runtime/load_stream_writer.cpp +++ b/be/src/runtime/load_stream_writer.cpp @@ -90,6 +90,9 @@ LoadStreamWriter::~LoadStreamWriter() { } Status LoadStreamWriter::init() { + DBUG_EXECUTE_IF("LoadStreamWriter.init.failure", { + return Status::InternalError("fault injection"); + }); RETURN_IF_ERROR(_rowset_builder->init()); _rowset_writer = _rowset_builder->rowset_writer(); _is_init = true; diff --git a/regression-test/suites/fault_injection_p0/test_load_stream_fault_injection.groovy b/regression-test/suites/fault_injection_p0/test_load_stream_fault_injection.groovy index 2cace455c167ff..0f6a264885dbba 100644 --- a/regression-test/suites/fault_injection_p0/test_load_stream_fault_injection.groovy +++ b/regression-test/suites/fault_injection_p0/test_load_stream_fault_injection.groovy @@ -153,6 +153,8 @@ suite("load_stream_fault_injection", "nonConcurrent") { load_with_injection("FileWriter.close_writer.zero_bytes_appended", "") // LoadStreamWriter close_writer/add_segment meet not inited error load_with_injection("TabletStream.init.uninited_writer", "") + // LoadStreamWriter init failure + load_with_injection("LoadStreamWriter.init.failure", "") // LoadStreamWriter add_segment meet not bad segid error load_with_injection("LoadStreamWriter.add_segment.bad_segid", "") // LoadStreamWriter add_segment meet null file writer error @@ -163,8 +165,22 @@ suite("load_stream_fault_injection", "nonConcurrent") { load_with_injection("RowsetBuilder.check_tablet_version_count.too_many_version", "") // LoadStream add_segment meet unknown segid in request header load_with_injection("TabletStream.add_segment.unknown_segid", "") + // LoadStream add_segment meet unknown segid in request header + load_with_injection("TabletStream.add_segment.segid_never_written", "") + // LoadStream add_segment meet LoadStreamWriter add segment failure + load_with_injection("TabletStream.add_segment.add_segment_failed", "") + // LoadStream add_segment meet submit function failure + load_with_injection("TabletStream.add_segment.submit_func_failed", "") + // LoadStream append_data meet unknown file type + load_with_injection("TabletStream.append_data.unknown_file_type", "") + // LoadStream append_data meet LoadStremWriter append or close failed + load_with_injection("TabletStream.append_data.append_failed", "") + // LoadStream append_data meet submit function failure + load_with_injection("TabletStream.append_data.submit_func_failed", "") // LoadStream append_data meet unknown index id in request header load_with_injection("TabletStream._append_data.unknown_indexid", "") + // LoadStream close meet segment num mismatch + load_with_injection("TabletStream.close.segment_num_mismatch", "") // LoadStream dispatch meet unknown load id load_with_injection("LoadStream._dispatch.unknown_loadid", "") // LoadStream dispatch meet unknown src id