Skip to content

Commit

Permalink
storage: option to fail reader in deduplication
Browse files Browse the repository at this point in the history
An upcoming test will inject an exception to simulate a failure case
seen in the wild that is otherwise hard to reproduce.

I'd considered using the file_sanitizer-based error injection, but it
seems that infra only really shines at making Redpanda hit asserts, so
it's not quite what I'm looking for (my upcoming test will assert that
we _don't_ crash in a particular error scenario).
  • Loading branch information
andrwng committed Mar 7, 2024
1 parent 62fd4f8 commit 13f5537
Show file tree
Hide file tree
Showing 4 changed files with 16 additions and 5 deletions.
3 changes: 3 additions & 0 deletions src/v/storage/compaction_reducers.cc
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,9 @@ ss::future<ss::stop_iteration> copy_data_segment_reducer::filter_and_append(

ss::future<ss::stop_iteration>
copy_data_segment_reducer::operator()(model::record_batch b) {
if (_inject_failure) {
throw std::runtime_error("injected error");
}
const auto comp = b.header().attrs.compression();
if (!b.compressed()) {
co_return co_await filter_and_append(comp, std::move(b));
Expand Down
9 changes: 7 additions & 2 deletions src/v/storage/compaction_reducers.h
Original file line number Diff line number Diff line change
Expand Up @@ -125,13 +125,15 @@ class copy_data_segment_reducer : public compaction_reducer {
bool internal_topic,
offset_delta_time apply_offset,
model::offset segment_last_offset,
compacted_index_writer* cidx = nullptr)
compacted_index_writer* cidx = nullptr,
bool inject_failure = false)
: _should_keep_fn(std::move(f))
, _segment_last_offset(segment_last_offset)
, _appender(a)
, _compacted_idx(cidx)
, _idx(index_state::make_empty_index(apply_offset))
, _internal_topic(internal_topic) {}
, _internal_topic(internal_topic)
, _inject_failure(inject_failure) {}

ss::future<ss::stop_iteration> operator()(model::record_batch);
storage::index_state end_of_stream() { return std::move(_idx); }
Expand Down Expand Up @@ -167,6 +169,9 @@ class copy_data_segment_reducer : public compaction_reducer {
/// We need to know if this is an internal topic to inform whether to
/// index on non-raft-data batches
bool _internal_topic;

/// If set to true, will throw an exception on operator().
bool _inject_failure;
};

class index_rebuilder_reducer : public compaction_reducer {
Expand Down
6 changes: 4 additions & 2 deletions src/v/storage/segment_deduplication_utils.cc
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,8 @@ ss::future<index_state> deduplicate_segment(
compacted_index_writer& cmp_idx_writer,
probe& probe,
offset_delta_time should_offset_delta_times,
ss::sharded<features::feature_table>& feature_table) {
ss::sharded<features::feature_table>& feature_table,
bool inject_reader_failure) {
auto read_holder = co_await seg->read_lock();
if (seg->is_closed()) {
throw segment_closed_exception();
Expand Down Expand Up @@ -223,7 +224,8 @@ ss::future<index_state> deduplicate_segment(
seg->path().is_internal_topic(),
should_offset_delta_times,
seg->offsets().committed_offset,
&cmp_idx_writer);
&cmp_idx_writer,
inject_reader_failure);

auto new_idx = co_await rdr.consume(
std::move(copy_reducer), model::no_timeout);
Expand Down
3 changes: 2 additions & 1 deletion src/v/storage/segment_deduplication_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ ss::future<index_state> deduplicate_segment(
compacted_index_writer& cmp_idx_writer,
storage::probe& probe,
offset_delta_time should_offset_delta_times,
ss::sharded<features::feature_table>&);
ss::sharded<features::feature_table>&,
bool inject_reader_failure = false);

} // namespace storage

0 comments on commit 13f5537

Please sign in to comment.