From 379724f000d1688308b995f31bb8c28162317d0c Mon Sep 17 00:00:00 2001 From: Willem Kaufmann Date: Mon, 9 Dec 2024 14:37:56 -0500 Subject: [PATCH 1/4] `storage`: fix race condition in `segment::release_appender_in_background()` This function caused race conditions between `segment::close()` and a segment roll. Consider the following sequence of events: 1. `_gate.close()` called in `segment::close()` 2. `auto a = std::exchange(_appender, nullptr)` called in `segment::release_appender_in_background()` 3. `ssx::spawn_with_gate()` called in `segment::release_appender_in_background()` 4. `return ignore_shutdown_exceptions()` in `ssx::spawn_with_gate()` 5. rest of `release_appender_in_background()` is ignored 6. `a` goes out of scope in `release_appender_in_background()` without `close()`ing the `appender` 7. one sad panda Add an explicit check to `_gate.check()` in `release_appender_in_background()` to throw an exception in case the gate is closed, and defer the closing of the appender to `segment::close()` in order to avoid the potential race condition here. (cherry picked from commit edc0ab3c101ce93d0298d3dc8ac20d66c69f356a) --- src/v/storage/segment.cc | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/v/storage/segment.cc b/src/v/storage/segment.cc index ec731353f0b89..93d37e7800367 100644 --- a/src/v/storage/segment.cc +++ b/src/v/storage/segment.cc @@ -297,6 +297,8 @@ ss::future<> segment::release_appender(readers_cache* readers_cache) { } void segment::release_appender_in_background(readers_cache* readers_cache) { + _gate.check(); + auto a = std::exchange(_appender, nullptr); auto c = config::shard_local_cfg().release_cache_on_segment_roll() ? std::exchange(_cache, std::nullopt) From af6c13ddac1c5e6f01668f6f7c8e39ef8ed72d71 Mon Sep 17 00:00:00 2001 From: Willem Kaufmann Date: Mon, 9 Dec 2024 15:49:32 -0500 Subject: [PATCH 2/4] `storage`: use `model::next_offset()` in `segment::force_roll()` If the dirty offset is default initialized to `int64_t::min()` and not updated before a segment is force rolled, an assert will fail in `disk_log_impl::new_segment()` for the offset being < 0. Use `model::next_offset()` instead of simply adding `1` to the dirty offset to avoid this case. (cherry picked from commit 7a29f24eaf13735b96c65fe05941d9062d9a1f03) --- src/v/storage/disk_log_impl.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/v/storage/disk_log_impl.cc b/src/v/storage/disk_log_impl.cc index 79c6c1b5a0ca7..7b19916625df6 100644 --- a/src/v/storage/disk_log_impl.cc +++ b/src/v/storage/disk_log_impl.cc @@ -1748,7 +1748,7 @@ void disk_log_impl::bg_checkpoint_offset_translator() { ss::future<> disk_log_impl::force_roll(ss::io_priority_class iopc) { auto roll_lock_holder = co_await _segments_rolling_lock.get_units(); auto t = term(); - auto next_offset = offsets().dirty_offset + model::offset(1); + auto next_offset = model::next_offset(offsets().dirty_offset); if (_segs.empty()) { co_return co_await new_segment(next_offset, t, iopc); } From 0f4fe9e3d7b0704f66e904f1e9610f2d6a144076 Mon Sep 17 00:00:00 2001 From: Willem Kaufmann Date: Fri, 6 Dec 2024 20:05:02 -0500 Subject: [PATCH 3/4] `storage`: move `storage_e2e_fixture` to its own header file (cherry picked from commit c0d76d1afbccecc6762efc365d185b70feb3fdb6) --- src/v/storage/disk_log_impl.h | 2 + src/v/storage/tests/storage_e2e_fixture.h | 50 +++++++++++++++++++ .../storage/tests/storage_e2e_fixture_test.cc | 31 ++---------- 3 files changed, 57 insertions(+), 26 deletions(-) create mode 100644 src/v/storage/tests/storage_e2e_fixture.h diff --git a/src/v/storage/disk_log_impl.h b/src/v/storage/disk_log_impl.h index 9a32445c80d8f..00b6f3ed511ee 100644 --- a/src/v/storage/disk_log_impl.h +++ b/src/v/storage/disk_log_impl.h @@ -31,6 +31,7 @@ #include +struct storage_e2e_fixture; namespace storage { /// \brief offset boundary type @@ -248,6 +249,7 @@ class disk_log_impl final : public log { private: friend class disk_log_appender; // for multi-term appends friend class disk_log_builder; // for tests + friend ::storage_e2e_fixture; friend std::ostream& operator<<(std::ostream& o, const disk_log_impl& d); /// Compute file offset of the batch inside the segment diff --git a/src/v/storage/tests/storage_e2e_fixture.h b/src/v/storage/tests/storage_e2e_fixture.h new file mode 100644 index 0000000000000..8f7e3352e2f94 --- /dev/null +++ b/src/v/storage/tests/storage_e2e_fixture.h @@ -0,0 +1,50 @@ +// Copyright 2024 Redpanda Data, Inc. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.md +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0 + +#include "redpanda/tests/fixture.h" +#include "storage/disk_log_impl.h" +#include "storage/segment.h" +#include "test_utils/scoped_config.h" + +#include +#include + +struct storage_e2e_fixture : public redpanda_thread_fixture { + scoped_config test_local_cfg; + + // Produces to the given fixture's partition for 10 seconds. + ss::future<> produce_to_fixture(model::topic topic_name, int* incomplete) { + tests::kafka_produce_transport producer(co_await make_kafka_client()); + co_await producer.start(); + const int cardinality = 10; + auto now = ss::lowres_clock::now(); + while (ss::lowres_clock::now() < now + 5s) { + for (int i = 0; i < cardinality; i++) { + co_await producer.produce_to_partition( + topic_name, + model::partition_id(0), + tests::kv_t::sequence(i, 1)); + } + } + *incomplete -= 1; + } + + ss::future<> remove_segment_permanently( + storage::disk_log_impl* log, ss::lw_shared_ptr seg) { + return log->remove_segment_permanently(seg, "storage_e2e_fixture") + .then([&, log, seg]() { + auto& segs = log->segments(); + auto it = std::find(segs.begin(), segs.end(), seg); + if (it == segs.end()) { + return; + } + segs.erase(it, std::next(it)); + }); + } +}; diff --git a/src/v/storage/tests/storage_e2e_fixture_test.cc b/src/v/storage/tests/storage_e2e_fixture_test.cc index 6c3a8c38ba174..2ccc1a52400f7 100644 --- a/src/v/storage/tests/storage_e2e_fixture_test.cc +++ b/src/v/storage/tests/storage_e2e_fixture_test.cc @@ -10,10 +10,12 @@ #include "kafka/server/tests/produce_consume_utils.h" #include "model/fundamental.h" #include "random/generators.h" -#include "redpanda/tests/fixture.h" +#include "storage/disk_log_impl.h" +#include "storage/segment.h" +#include "storage/tests/storage_e2e_fixture.h" #include "test_utils/fixture.h" -#include "test_utils/scoped_config.h" +#include #include #include @@ -23,29 +25,6 @@ using namespace std::chrono_literals; -struct storage_e2e_fixture : public redpanda_thread_fixture { - scoped_config test_local_cfg; -}; - -namespace { - -// Produces to the given fixture's partition for 10 seconds. -ss::future<> produce_to_fixture( - storage_e2e_fixture* fix, model::topic topic_name, int* incomplete) { - tests::kafka_produce_transport producer(co_await fix->make_kafka_client()); - co_await producer.start(); - const int cardinality = 10; - auto now = ss::lowres_clock::now(); - while (ss::lowres_clock::now() < now + 5s) { - for (int i = 0; i < cardinality; i++) { - co_await producer.produce_to_partition( - topic_name, model::partition_id(0), tests::kv_t::sequence(i, 1)); - } - } - *incomplete -= 1; -} -} // namespace - FIXTURE_TEST(test_compaction_segment_ms, storage_e2e_fixture) { test_local_cfg.get("log_segment_ms_min") .set_value(std::chrono::duration_cast(1ms)); @@ -69,7 +48,7 @@ FIXTURE_TEST(test_compaction_segment_ms, storage_e2e_fixture) { produces.reserve(5); int incomplete = 5; for (int i = 0; i < 5; i++) { - auto fut = produce_to_fixture(this, topic_name, &incomplete); + auto fut = produce_to_fixture(topic_name, &incomplete); produces.emplace_back(std::move(fut)); } auto partition = app.partition_manager.local().get(ntp); From 975fe91aef2a1d9040a10fc50b54980ec2d1f407 Mon Sep 17 00:00:00 2001 From: Willem Kaufmann Date: Fri, 6 Dec 2024 20:10:01 -0500 Subject: [PATCH 4/4] `storage`: add `test_concurrent_segment_roll_and_close` To test race conditions between `segment::close()` and a segment roll, particularly one which goes through `release_appender_in_background()`. (cherry picked from commit 24ec83438505276dcba709beac3677a50212e55c) --- .../storage/tests/storage_e2e_fixture_test.cc | 39 +++++++++++++++++++ 1 file changed, 39 insertions(+) diff --git a/src/v/storage/tests/storage_e2e_fixture_test.cc b/src/v/storage/tests/storage_e2e_fixture_test.cc index 2ccc1a52400f7..a9299cbfece8a 100644 --- a/src/v/storage/tests/storage_e2e_fixture_test.cc +++ b/src/v/storage/tests/storage_e2e_fixture_test.cc @@ -15,6 +15,7 @@ #include "storage/tests/storage_e2e_fixture.h" #include "test_utils/fixture.h" +#include #include #include @@ -25,6 +26,16 @@ using namespace std::chrono_literals; +namespace { +ss::future<> force_roll_log(storage::disk_log_impl* log) { + try { + co_await log->force_roll(ss::default_priority_class()); + } catch (...) { + } +} + +} // namespace + FIXTURE_TEST(test_compaction_segment_ms, storage_e2e_fixture) { test_local_cfg.get("log_segment_ms_min") .set_value(std::chrono::duration_cast(1ms)); @@ -156,3 +167,31 @@ FIXTURE_TEST(test_concurrent_log_eviction_and_append, storage_e2e_fixture) { // final round of eviction. BOOST_REQUIRE_LE(log->segment_count(), 1); } + +FIXTURE_TEST(test_concurrent_segment_roll_and_close, storage_e2e_fixture) { + const auto topic_name = model::topic("tapioca"); + const auto ntp = model::ntp(model::kafka_namespace, topic_name, 0); + + cluster::topic_properties props; + add_topic({model::kafka_namespace, topic_name}, 1, props).get(); + wait_for_leader(ntp).get(); + + auto partition = app.partition_manager.local().get(ntp); + auto* log = dynamic_cast(partition->log().get()); + auto seg = log->segments().back(); + + // Hold a read lock, which will force release_appender() to go through + // release_appender_in_background() + auto read_lock_holder = seg->read_lock().get(); + + auto roll_fut = force_roll_log(log); + auto release_holder_fut = ss::sleep(100ms).then( + [read_locker_holder = std::move(read_lock_holder)] {}); + auto remove_segment_fut = remove_segment_permanently(log, seg); + + ss::when_all( + std::move(roll_fut), + std::move(remove_segment_fut), + std::move(release_holder_fut)) + .get(); +}