Skip to content

Commit

Permalink
storage: convert some vasserts to exceptions
Browse files Browse the repository at this point in the history
We've seen at least one instance where empty batches are pushed to the
log. Rather than bringing down the process, this updates a couple
vasserts to throw instead (converted a couple that seemed related to
data inputs, and left in a vassert that seems much more problematic
related to being on the correct shard).
  • Loading branch information
andrwng committed Aug 9, 2024
1 parent 2c24041 commit 5838337
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 16 deletions.
12 changes: 10 additions & 2 deletions src/v/raft/tests/basic_raft_fixture_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,16 @@ TEST_F(raft_fixture, test_empty_writes) {
auto reader = model::make_memory_record_batch_reader(
std::move(builder).build());

EXPECT_DEATH(
replicate(std::move(reader)).get(), "Assert failure.+Empty batch");
// Catch the error when appending.
auto res = replicate(std::move(reader)).get();
ASSERT_TRUE(res.has_error());
ASSERT_EQ(res.error(), errc::leader_append_failed);

// In this case there are no batches at all so we don't go to storage, and
// catch the error in Raft.
res = replicate(make_batches({})).get();
ASSERT_TRUE(res.has_error());
ASSERT_EQ(res.error(), errc::invalid_input_records);
}

TEST_F_CORO(raft_fixture, test_stuck_append_entries) {
Expand Down
32 changes: 18 additions & 14 deletions src/v/storage/segment.cc
Original file line number Diff line number Diff line change
Expand Up @@ -503,25 +503,29 @@ ss::future<> segment::compaction_index_batch(const model::record_batch& b) {

ss::future<append_result> segment::do_append(const model::record_batch& b) {
check_segment_not_closed("append()");
vassert(
b.base_offset() <= b.last_offset(),
"Empty batch written to {}. Batch header: {}",
path(),
b.header());
vassert(
b.base_offset() >= _tracker.get_base_offset(),
"Invalid state. Attempted to append a batch with base_offset:{}, but "
"would invalidate our initial state base offset of:{}. Actual batch "
"header:{}, self:{}",
b.base_offset(),
_tracker.get_base_offset(),
b.header(),
*this);
vassert(
b.header().ctx.owner_shard,
"Shard not set when writing to: {} - header: {}",
*this,
b.header());
if (unlikely(b.base_offset() > b.last_offset())) {
return ss::make_exception_future<append_result>(
std::runtime_error(fmt::format(
"Empty batch written to {}. Batch header: {}",
path(),
b.header())));
}
if (unlikely(b.base_offset() < _tracker.get_base_offset())) {
return ss::make_exception_future<
append_result>(std::runtime_error(fmt::format(
"Invalid state. Attempted to append a batch with base_offset:{}, but "
"would invalidate our initial state base offset of:{}. Actual batch "
"header:{}, self:{}",
b.base_offset(),
_tracker.get_base_offset(),
b.header(),
*this)));
}
if (unlikely(b.compressed() && !b.header().attrs.is_valid_compression())) {
return ss::make_exception_future<
append_result>(std::runtime_error(fmt::format(
Expand Down

0 comments on commit 5838337

Please sign in to comment.