diff --git a/src/v/storage/disk_log_impl.cc b/src/v/storage/disk_log_impl.cc index 79c6c1b5a0ca7..7b19916625df6 100644 --- a/src/v/storage/disk_log_impl.cc +++ b/src/v/storage/disk_log_impl.cc @@ -1748,7 +1748,7 @@ void disk_log_impl::bg_checkpoint_offset_translator() { ss::future<> disk_log_impl::force_roll(ss::io_priority_class iopc) { auto roll_lock_holder = co_await _segments_rolling_lock.get_units(); auto t = term(); - auto next_offset = offsets().dirty_offset + model::offset(1); + auto next_offset = model::next_offset(offsets().dirty_offset); if (_segs.empty()) { co_return co_await new_segment(next_offset, t, iopc); } diff --git a/src/v/storage/disk_log_impl.h b/src/v/storage/disk_log_impl.h index 9a32445c80d8f..00b6f3ed511ee 100644 --- a/src/v/storage/disk_log_impl.h +++ b/src/v/storage/disk_log_impl.h @@ -31,6 +31,7 @@ #include +struct storage_e2e_fixture; namespace storage { /// \brief offset boundary type @@ -248,6 +249,7 @@ class disk_log_impl final : public log { private: friend class disk_log_appender; // for multi-term appends friend class disk_log_builder; // for tests + friend ::storage_e2e_fixture; friend std::ostream& operator<<(std::ostream& o, const disk_log_impl& d); /// Compute file offset of the batch inside the segment diff --git a/src/v/storage/segment.cc b/src/v/storage/segment.cc index ec731353f0b89..93d37e7800367 100644 --- a/src/v/storage/segment.cc +++ b/src/v/storage/segment.cc @@ -297,6 +297,8 @@ ss::future<> segment::release_appender(readers_cache* readers_cache) { } void segment::release_appender_in_background(readers_cache* readers_cache) { + _gate.check(); + auto a = std::exchange(_appender, nullptr); auto c = config::shard_local_cfg().release_cache_on_segment_roll() ? std::exchange(_cache, std::nullopt) diff --git a/src/v/storage/tests/storage_e2e_fixture.h b/src/v/storage/tests/storage_e2e_fixture.h new file mode 100644 index 0000000000000..8f7e3352e2f94 --- /dev/null +++ b/src/v/storage/tests/storage_e2e_fixture.h @@ -0,0 +1,50 @@ +// Copyright 2024 Redpanda Data, Inc. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.md +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0 + +#include "redpanda/tests/fixture.h" +#include "storage/disk_log_impl.h" +#include "storage/segment.h" +#include "test_utils/scoped_config.h" + +#include +#include + +struct storage_e2e_fixture : public redpanda_thread_fixture { + scoped_config test_local_cfg; + + // Produces to the given fixture's partition for 10 seconds. + ss::future<> produce_to_fixture(model::topic topic_name, int* incomplete) { + tests::kafka_produce_transport producer(co_await make_kafka_client()); + co_await producer.start(); + const int cardinality = 10; + auto now = ss::lowres_clock::now(); + while (ss::lowres_clock::now() < now + 5s) { + for (int i = 0; i < cardinality; i++) { + co_await producer.produce_to_partition( + topic_name, + model::partition_id(0), + tests::kv_t::sequence(i, 1)); + } + } + *incomplete -= 1; + } + + ss::future<> remove_segment_permanently( + storage::disk_log_impl* log, ss::lw_shared_ptr seg) { + return log->remove_segment_permanently(seg, "storage_e2e_fixture") + .then([&, log, seg]() { + auto& segs = log->segments(); + auto it = std::find(segs.begin(), segs.end(), seg); + if (it == segs.end()) { + return; + } + segs.erase(it, std::next(it)); + }); + } +}; diff --git a/src/v/storage/tests/storage_e2e_fixture_test.cc b/src/v/storage/tests/storage_e2e_fixture_test.cc index 6c3a8c38ba174..a9299cbfece8a 100644 --- a/src/v/storage/tests/storage_e2e_fixture_test.cc +++ b/src/v/storage/tests/storage_e2e_fixture_test.cc @@ -10,10 +10,13 @@ #include "kafka/server/tests/produce_consume_utils.h" #include "model/fundamental.h" #include "random/generators.h" -#include "redpanda/tests/fixture.h" +#include "storage/disk_log_impl.h" +#include "storage/segment.h" +#include "storage/tests/storage_e2e_fixture.h" #include "test_utils/fixture.h" -#include "test_utils/scoped_config.h" +#include +#include #include #include @@ -23,27 +26,14 @@ using namespace std::chrono_literals; -struct storage_e2e_fixture : public redpanda_thread_fixture { - scoped_config test_local_cfg; -}; - namespace { - -// Produces to the given fixture's partition for 10 seconds. -ss::future<> produce_to_fixture( - storage_e2e_fixture* fix, model::topic topic_name, int* incomplete) { - tests::kafka_produce_transport producer(co_await fix->make_kafka_client()); - co_await producer.start(); - const int cardinality = 10; - auto now = ss::lowres_clock::now(); - while (ss::lowres_clock::now() < now + 5s) { - for (int i = 0; i < cardinality; i++) { - co_await producer.produce_to_partition( - topic_name, model::partition_id(0), tests::kv_t::sequence(i, 1)); - } +ss::future<> force_roll_log(storage::disk_log_impl* log) { + try { + co_await log->force_roll(ss::default_priority_class()); + } catch (...) { } - *incomplete -= 1; } + } // namespace FIXTURE_TEST(test_compaction_segment_ms, storage_e2e_fixture) { @@ -69,7 +59,7 @@ FIXTURE_TEST(test_compaction_segment_ms, storage_e2e_fixture) { produces.reserve(5); int incomplete = 5; for (int i = 0; i < 5; i++) { - auto fut = produce_to_fixture(this, topic_name, &incomplete); + auto fut = produce_to_fixture(topic_name, &incomplete); produces.emplace_back(std::move(fut)); } auto partition = app.partition_manager.local().get(ntp); @@ -177,3 +167,31 @@ FIXTURE_TEST(test_concurrent_log_eviction_and_append, storage_e2e_fixture) { // final round of eviction. BOOST_REQUIRE_LE(log->segment_count(), 1); } + +FIXTURE_TEST(test_concurrent_segment_roll_and_close, storage_e2e_fixture) { + const auto topic_name = model::topic("tapioca"); + const auto ntp = model::ntp(model::kafka_namespace, topic_name, 0); + + cluster::topic_properties props; + add_topic({model::kafka_namespace, topic_name}, 1, props).get(); + wait_for_leader(ntp).get(); + + auto partition = app.partition_manager.local().get(ntp); + auto* log = dynamic_cast(partition->log().get()); + auto seg = log->segments().back(); + + // Hold a read lock, which will force release_appender() to go through + // release_appender_in_background() + auto read_lock_holder = seg->read_lock().get(); + + auto roll_fut = force_roll_log(log); + auto release_holder_fut = ss::sleep(100ms).then( + [read_locker_holder = std::move(read_lock_holder)] {}); + auto remove_segment_fut = remove_segment_permanently(log, seg); + + ss::when_all( + std::move(roll_fut), + std::move(remove_segment_fut), + std::move(release_holder_fut)) + .get(); +}