Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[v24.3.x] [CORE-7058]: storage: fix race condition in segment::release_appender_in_background() #24560

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/v/storage/disk_log_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1748,7 +1748,7 @@ void disk_log_impl::bg_checkpoint_offset_translator() {
ss::future<> disk_log_impl::force_roll(ss::io_priority_class iopc) {
auto roll_lock_holder = co_await _segments_rolling_lock.get_units();
auto t = term();
auto next_offset = offsets().dirty_offset + model::offset(1);
auto next_offset = model::next_offset(offsets().dirty_offset);
if (_segs.empty()) {
co_return co_await new_segment(next_offset, t, iopc);
}
Expand Down
2 changes: 2 additions & 0 deletions src/v/storage/disk_log_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@

#include <absl/container/flat_hash_map.h>

struct storage_e2e_fixture;
namespace storage {

/// \brief offset boundary type
Expand Down Expand Up @@ -248,6 +249,7 @@ class disk_log_impl final : public log {
private:
friend class disk_log_appender; // for multi-term appends
friend class disk_log_builder; // for tests
friend ::storage_e2e_fixture;
friend std::ostream& operator<<(std::ostream& o, const disk_log_impl& d);

/// Compute file offset of the batch inside the segment
Expand Down
2 changes: 2 additions & 0 deletions src/v/storage/segment.cc
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,8 @@ ss::future<> segment::release_appender(readers_cache* readers_cache) {
}

void segment::release_appender_in_background(readers_cache* readers_cache) {
_gate.check();

auto a = std::exchange(_appender, nullptr);
auto c = config::shard_local_cfg().release_cache_on_segment_roll()
? std::exchange(_cache, std::nullopt)
Expand Down
50 changes: 50 additions & 0 deletions src/v/storage/tests/storage_e2e_fixture.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
// Copyright 2024 Redpanda Data, Inc.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.md
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0

#include "redpanda/tests/fixture.h"
#include "storage/disk_log_impl.h"
#include "storage/segment.h"
#include "test_utils/scoped_config.h"

#include <seastar/core/future.hh>
#include <seastar/core/shared_ptr.hh>

struct storage_e2e_fixture : public redpanda_thread_fixture {
scoped_config test_local_cfg;

// Produces to the given fixture's partition for 10 seconds.
ss::future<> produce_to_fixture(model::topic topic_name, int* incomplete) {
tests::kafka_produce_transport producer(co_await make_kafka_client());
co_await producer.start();
const int cardinality = 10;
auto now = ss::lowres_clock::now();
while (ss::lowres_clock::now() < now + 5s) {
for (int i = 0; i < cardinality; i++) {
co_await producer.produce_to_partition(
topic_name,
model::partition_id(0),
tests::kv_t::sequence(i, 1));
}
}
*incomplete -= 1;
}

ss::future<> remove_segment_permanently(
storage::disk_log_impl* log, ss::lw_shared_ptr<storage::segment> seg) {
return log->remove_segment_permanently(seg, "storage_e2e_fixture")
.then([&, log, seg]() {
auto& segs = log->segments();
auto it = std::find(segs.begin(), segs.end(), seg);
if (it == segs.end()) {
return;
}
segs.erase(it, std::next(it));
});
}
};
60 changes: 39 additions & 21 deletions src/v/storage/tests/storage_e2e_fixture_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,13 @@
#include "kafka/server/tests/produce_consume_utils.h"
#include "model/fundamental.h"
#include "random/generators.h"
#include "redpanda/tests/fixture.h"
#include "storage/disk_log_impl.h"
#include "storage/segment.h"
#include "storage/tests/storage_e2e_fixture.h"
#include "test_utils/fixture.h"
#include "test_utils/scoped_config.h"

#include <seastar/core/future.hh>
#include <seastar/core/io_priority_class.hh>
#include <seastar/core/lowres_clock.hh>

#include <boost/test/tools/old/interface.hpp>
Expand All @@ -23,27 +26,14 @@

using namespace std::chrono_literals;

struct storage_e2e_fixture : public redpanda_thread_fixture {
scoped_config test_local_cfg;
};

namespace {

// Produces to the given fixture's partition for 10 seconds.
ss::future<> produce_to_fixture(
storage_e2e_fixture* fix, model::topic topic_name, int* incomplete) {
tests::kafka_produce_transport producer(co_await fix->make_kafka_client());
co_await producer.start();
const int cardinality = 10;
auto now = ss::lowres_clock::now();
while (ss::lowres_clock::now() < now + 5s) {
for (int i = 0; i < cardinality; i++) {
co_await producer.produce_to_partition(
topic_name, model::partition_id(0), tests::kv_t::sequence(i, 1));
}
ss::future<> force_roll_log(storage::disk_log_impl* log) {
try {
co_await log->force_roll(ss::default_priority_class());
} catch (...) {
}
*incomplete -= 1;
}

} // namespace

FIXTURE_TEST(test_compaction_segment_ms, storage_e2e_fixture) {
Expand All @@ -69,7 +59,7 @@ FIXTURE_TEST(test_compaction_segment_ms, storage_e2e_fixture) {
produces.reserve(5);
int incomplete = 5;
for (int i = 0; i < 5; i++) {
auto fut = produce_to_fixture(this, topic_name, &incomplete);
auto fut = produce_to_fixture(topic_name, &incomplete);
produces.emplace_back(std::move(fut));
}
auto partition = app.partition_manager.local().get(ntp);
Expand Down Expand Up @@ -177,3 +167,31 @@ FIXTURE_TEST(test_concurrent_log_eviction_and_append, storage_e2e_fixture) {
// final round of eviction.
BOOST_REQUIRE_LE(log->segment_count(), 1);
}

FIXTURE_TEST(test_concurrent_segment_roll_and_close, storage_e2e_fixture) {
const auto topic_name = model::topic("tapioca");
const auto ntp = model::ntp(model::kafka_namespace, topic_name, 0);

cluster::topic_properties props;
add_topic({model::kafka_namespace, topic_name}, 1, props).get();
wait_for_leader(ntp).get();

auto partition = app.partition_manager.local().get(ntp);
auto* log = dynamic_cast<storage::disk_log_impl*>(partition->log().get());
auto seg = log->segments().back();

// Hold a read lock, which will force release_appender() to go through
// release_appender_in_background()
auto read_lock_holder = seg->read_lock().get();

auto roll_fut = force_roll_log(log);
auto release_holder_fut = ss::sleep(100ms).then(
[read_locker_holder = std::move(read_lock_holder)] {});
auto remove_segment_fut = remove_segment_permanently(log, seg);

ss::when_all(
std::move(roll_fut),
std::move(remove_segment_fut),
std::move(release_holder_fut))
.get();
}
Loading