From 13f55371a6d23e64cf0729614c5eb715bcd3a49c Mon Sep 17 00:00:00 2001 From: Andrew Wong Date: Wed, 6 Mar 2024 23:42:52 -0800 Subject: [PATCH] storage: option to fail reader in deduplication An upcoming test will inject an exception to simulate a failure case seen in the wild that is otherwise hard to reproduce. I'd considered using the file_sanitizer-based error injection, but it seems that infra only really shines at making Redpanda hit asserts, so it's not quite what I'm looking for (my upcoming test will assert that we _don't_ crash in a particular error scenario). --- src/v/storage/compaction_reducers.cc | 3 +++ src/v/storage/compaction_reducers.h | 9 +++++++-- src/v/storage/segment_deduplication_utils.cc | 6 ++++-- src/v/storage/segment_deduplication_utils.h | 3 ++- 4 files changed, 16 insertions(+), 5 deletions(-) diff --git a/src/v/storage/compaction_reducers.cc b/src/v/storage/compaction_reducers.cc index ce912e27a4591..516f75f036799 100644 --- a/src/v/storage/compaction_reducers.cc +++ b/src/v/storage/compaction_reducers.cc @@ -325,6 +325,9 @@ ss::future copy_data_segment_reducer::filter_and_append( ss::future copy_data_segment_reducer::operator()(model::record_batch b) { + if (_inject_failure) { + throw std::runtime_error("injected error"); + } const auto comp = b.header().attrs.compression(); if (!b.compressed()) { co_return co_await filter_and_append(comp, std::move(b)); diff --git a/src/v/storage/compaction_reducers.h b/src/v/storage/compaction_reducers.h index 55e64b4c19702..226565181dc61 100644 --- a/src/v/storage/compaction_reducers.h +++ b/src/v/storage/compaction_reducers.h @@ -125,13 +125,15 @@ class copy_data_segment_reducer : public compaction_reducer { bool internal_topic, offset_delta_time apply_offset, model::offset segment_last_offset, - compacted_index_writer* cidx = nullptr) + compacted_index_writer* cidx = nullptr, + bool inject_failure = false) : _should_keep_fn(std::move(f)) , _segment_last_offset(segment_last_offset) , _appender(a) , _compacted_idx(cidx) , _idx(index_state::make_empty_index(apply_offset)) - , _internal_topic(internal_topic) {} + , _internal_topic(internal_topic) + , _inject_failure(inject_failure) {} ss::future operator()(model::record_batch); storage::index_state end_of_stream() { return std::move(_idx); } @@ -167,6 +169,9 @@ class copy_data_segment_reducer : public compaction_reducer { /// We need to know if this is an internal topic to inform whether to /// index on non-raft-data batches bool _internal_topic; + + /// If set to true, will throw an exception on operator(). + bool _inject_failure; }; class index_rebuilder_reducer : public compaction_reducer { diff --git a/src/v/storage/segment_deduplication_utils.cc b/src/v/storage/segment_deduplication_utils.cc index 88cb69d8b492e..038de5ffa2274 100644 --- a/src/v/storage/segment_deduplication_utils.cc +++ b/src/v/storage/segment_deduplication_utils.cc @@ -187,7 +187,8 @@ ss::future deduplicate_segment( compacted_index_writer& cmp_idx_writer, probe& probe, offset_delta_time should_offset_delta_times, - ss::sharded& feature_table) { + ss::sharded& feature_table, + bool inject_reader_failure) { auto read_holder = co_await seg->read_lock(); if (seg->is_closed()) { throw segment_closed_exception(); @@ -223,7 +224,8 @@ ss::future deduplicate_segment( seg->path().is_internal_topic(), should_offset_delta_times, seg->offsets().committed_offset, - &cmp_idx_writer); + &cmp_idx_writer, + inject_reader_failure); auto new_idx = co_await rdr.consume( std::move(copy_reducer), model::no_timeout); diff --git a/src/v/storage/segment_deduplication_utils.h b/src/v/storage/segment_deduplication_utils.h index 3bc8df66e0689..45d982457d786 100644 --- a/src/v/storage/segment_deduplication_utils.h +++ b/src/v/storage/segment_deduplication_utils.h @@ -57,6 +57,7 @@ ss::future deduplicate_segment( compacted_index_writer& cmp_idx_writer, storage::probe& probe, offset_delta_time should_offset_delta_times, - ss::sharded&); + ss::sharded&, + bool inject_reader_failure = false); } // namespace storage