Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[v24.1.x] cloud_storage: avoid possible double stop in segment reader #18241

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions src/v/cloud_storage/remote_partition.cc
Original file line number Diff line number Diff line change
Expand Up @@ -812,8 +812,13 @@ class partition_record_batch_reader_impl final
/// Transition reader to the completed state. Stop tracking state in
/// the 'remote_partition'.
///
/// Returns immediately when no segment reader is attached, so repeated or
/// racing calls never call stop() through an empty '_seg_reader'.
ss::future<> set_end_of_stream() {
    if (!_seg_reader) {
        co_return;
    }
    // It's critical that we swap out the reader before calling stop().
    // Otherwise, another fiber may swap it out while we're stopping!
    // std::exchange leaves '_seg_reader' value-initialized (empty), so any
    // fiber entering while we are suspended below takes the early return.
    auto reader = std::exchange(_seg_reader, {});
    co_await reader->stop();
}

retry_chain_node _rtc;
Expand Down
87 changes: 87 additions & 0 deletions src/v/cloud_storage/tests/remote_partition_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1449,6 +1449,93 @@ FIXTURE_TEST(
}
}

namespace {

/// Waits for a random delay (get_int(1, 50) milliseconds) and then requests
/// an abort on `as` with a `connection_aborted` system error. A hold on
/// `gate` is taken for the lifetime of the coroutine frame.
ss::future<> sleep_and_abort(ss::abort_source* as, ss::gate* gate) {
    auto gate_holder = gate->hold();
    const auto delay = random_generators::get_int(1, 50) * 1ms;
    co_await ss::sleep(delay);
    const auto aborted = std::make_error_code(std::errc::connection_aborted);
    as->request_abort_ex(std::system_error(aborted));
}

/// Reads from `partition` in a loop: each pass builds a reader from
/// `reader_config`, consumes it with a `test_consumer`, and moves the start
/// offset to one past the last offset of the final header consumed. Returns
/// once a pass consumes no headers.
ss::future<>
read(storage::log_reader_config reader_config, remote_partition* partition) {
    for (;;) {
        auto translating = co_await partition->make_reader(reader_config);
        auto batch_reader = std::move(translating.reader);
        auto headers = co_await batch_reader.consume(
          test_consumer(), model::no_timeout);
        if (headers.empty()) {
            co_return;
        }
        // Resume right after the last batch that was consumed.
        reader_config.start_offset = headers.back().last_offset()
                                     + model::offset(1);
    }
}

} // anonymous namespace

// Regression test for a crash seen when racing a client disconnect with the
// stopping of a reader. Relies on scheduling points/sleeps injected here and
// there to give the race a chance to occur.
FIXTURE_TEST(test_remote_partition_abort_eos_race, cloud_storage_fixture) {
    const batch_t data = {
      .num_records = 1, .type = model::record_batch_type::raft_data};
    const batch_t conf = {
      .num_records = 1, .type = model::record_batch_type::raft_configuration};
    const batch_t tx_fence = {
      .num_records = 1, .type = model::record_batch_type::tx_fence};

    // Three segments, all sharing the same batch layout.
    const std::vector<batch_t> segment_layout = {
      conf, data, data, data, data, data, data, tx_fence, data};
    const std::vector<std::vector<batch_t>> batch_types(3, segment_layout);

    auto segments = setup_s3_imposter(
      *this, batch_types, manifest_inconsistency::none);
    const auto& first_segment = segments[0];
    auto base = first_segment.base_offset;
    auto max = first_segment.max_offset;

    vlog(test_log.debug, "offset range: {}-{}", base, max);
    print_segments(segments);

    ss::lowres_clock::update();
    static auto bucket = cloud_storage_clients::bucket_name("bucket");
    auto manifest = hydrate_manifest(api.local(), bucket);
    partition_probe probe(manifest.get_ntp());
    auto manifest_view = ss::make_shared<async_manifest_view>(
      api, cache, manifest, bucket);
    auto partition = ss::make_shared<remote_partition>(
      manifest_view, api.local(), cache.local(), bucket, probe);
    // Stop the partition on every exit path from the test body.
    auto stop_partition_on_exit = ss::defer(
      [&partition] { partition->stop().get(); });
    partition->start().get();

    // Intentionally use max - 1 so the reader stops early and is forced to
    // handle it as an EOS.
    ss::abort_source as;
    const auto last_wanted = model::offset{max() - 1};
    storage::log_reader_config reader_config(
      base, last_wanted, ss::default_priority_class());
    reader_config.abort_source = as;

    std::vector<ss::future<>> read_futs;
    ss::gate bg_gate;

    // Explicitly abort, simulating the behavior when a client disconnects.
    ssx::background = sleep_and_abort(&as, &bg_gate);

    constexpr int num_readers = 10;
    for (int reader_idx = 0; reader_idx < num_readers; ++reader_idx) {
        read_futs.emplace_back(read(reader_config, partition.get()));
    }

    // Ignore exceptions from the readers -- we only care that we don't
    // crash.
    for (auto& read_fut : read_futs) {
        try {
            std::move(read_fut).get();
        } catch (...) {
        }
    }

    // Wait for the background abort fiber, which holds the gate.
    bg_gate.close().get();
}

/// This test scans the partition with overlapping segments
FIXTURE_TEST(
test_remote_partition_scan_translate_overlap_1, cloud_storage_fixture) {
Expand Down
Loading