From 5838337dd0db66b6ebcaae15e348dd1bd4076e2d Mon Sep 17 00:00:00 2001 From: Andrew Wong Date: Fri, 9 Aug 2024 12:12:20 -0700 Subject: [PATCH] storage: convert some vasserts to exceptions We've seen at least one instance where empty batches are pushed to the log. Rather than bringing down the process, this updates a couple vasserts to throw instead (converted a couple that seemed related to data inputs, and left in a vassert that seems much more problematic related to being on the correct shard). --- src/v/raft/tests/basic_raft_fixture_test.cc | 12 ++++++-- src/v/storage/segment.cc | 32 ++++++++++++--------- 2 files changed, 28 insertions(+), 16 deletions(-) diff --git a/src/v/raft/tests/basic_raft_fixture_test.cc b/src/v/raft/tests/basic_raft_fixture_test.cc index 69cc91c77457..475b587a9b66 100644 --- a/src/v/raft/tests/basic_raft_fixture_test.cc +++ b/src/v/raft/tests/basic_raft_fixture_test.cc @@ -74,8 +74,16 @@ TEST_F(raft_fixture, test_empty_writes) { auto reader = model::make_memory_record_batch_reader( std::move(builder).build()); - EXPECT_DEATH( - replicate(std::move(reader)).get(), "Assert failure.+Empty batch"); + // Catch the error when appending. + auto res = replicate(std::move(reader)).get(); + ASSERT_TRUE(res.has_error()); + ASSERT_EQ(res.error(), errc::leader_append_failed); + + // In this case there are no batches at all so we don't go to storage, and + // catch the error in Raft. + res = replicate(make_batches({})).get(); + ASSERT_TRUE(res.has_error()); + ASSERT_EQ(res.error(), errc::invalid_input_records); } TEST_F_CORO(raft_fixture, test_stuck_append_entries) { diff --git a/src/v/storage/segment.cc b/src/v/storage/segment.cc index 38c144902bed..274b35452e78 100644 --- a/src/v/storage/segment.cc +++ b/src/v/storage/segment.cc @@ -503,25 +503,29 @@ ss::future<> segment::compaction_index_batch(const model::record_batch& b) { ss::future segment::do_append(const model::record_batch& b) { check_segment_not_closed("append()"); - vassert( - b.base_offset() <= b.last_offset(), - "Empty batch written to {}. Batch header: {}", - path(), - b.header()); - vassert( - b.base_offset() >= _tracker.get_base_offset(), - "Invalid state. Attempted to append a batch with base_offset:{}, but " - "would invalidate our initial state base offset of:{}. Actual batch " - "header:{}, self:{}", - b.base_offset(), - _tracker.get_base_offset(), - b.header(), - *this); vassert( b.header().ctx.owner_shard, "Shard not set when writing to: {} - header: {}", *this, b.header()); + if (unlikely(b.base_offset() > b.last_offset())) { + return ss::make_exception_future( + std::runtime_error(fmt::format( + "Empty batch written to {}. Batch header: {}", + path(), + b.header()))); + } + if (unlikely(b.base_offset() < _tracker.get_base_offset())) { + return ss::make_exception_future< + append_result>(std::runtime_error(fmt::format( + "Invalid state. Attempted to append a batch with base_offset:{}, but " + "would invalidate our initial state base offset of:{}. Actual batch " + "header:{}, self:{}", + b.base_offset(), + _tracker.get_base_offset(), + b.header(), + *this))); + } if (unlikely(b.compressed() && !b.header().attrs.is_valid_compression())) { return ss::make_exception_future< append_result>(std::runtime_error(fmt::format(