Skip to content

Commit

Permalink
cloud_storage: fix segment materialisation race
Browse files Browse the repository at this point in the history
In order to create a new materialised segment, one needs to grab units
from `materialized_resources` first. This is an async operation. By the
time units are acquired, said segment might have already been via a
different code path, resulting in the assertion in
`remote_partition::materialize_segment` triggering.
`remote_partition::aborted_transactions` was particularly susceptible to
this.

This patch fixes the issue by checking for the existence of the segment
and creating a segment (if needed) in the same scheduling task.
Functionally, for the read path, nothing should change.
  • Loading branch information
Vlad Lazar committed Sep 29, 2023
1 parent 5176601 commit fc1c08a
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 34 deletions.
61 changes: 30 additions & 31 deletions src/v/cloud_storage/remote_partition.cc
Original file line number Diff line number Diff line change
Expand Up @@ -53,26 +53,31 @@ namespace cloud_storage {
using data_t = model::record_batch_reader::data_t;
using storage_t = model::record_batch_reader::storage_t;

remote_partition::iterator remote_partition::materialize_segment(
remote_partition::iterator remote_partition::get_or_materialize_segment(
const remote_segment_path& path,
const segment_meta& meta,
segment_units unit) {
_as.check();
auto base_kafka_offset = meta.base_offset - meta.delta_offset;
auto st = std::make_unique<materialized_segment_state>(
meta, path, *this, std::move(unit));
auto [iter, ok] = _segments.insert(
std::make_pair(meta.base_offset, std::move(st)));
vassert(
ok,
"Segment with base log offset {} and base kafka offset {} is already "
"materialized, max offset of the new segment {}, max offset of the "
"existing segment {}",
meta.base_offset,
base_kafka_offset,
meta.committed_offset,
iter->second->segment->get_max_rp_offset());
_ts_probe.segment_materialized();

if (ok) {
vlog(
_ctxlog.trace,
"Materialized new segment for meta {} with path {}",
meta,
path);
_ts_probe.segment_materialized();
} else {
vlog(
_ctxlog.debug,
"Reusing materialized segment for meta {} with path {}",
meta,
path);
}

return iter;
}

Expand Down Expand Up @@ -163,7 +168,7 @@ remote_partition::borrow_result_t remote_partition::borrow_next_segment_reader(
}
if (iter == _segments.end()) {
auto path = manifest.generate_segment_path(*mit);
iter = materialize_segment(path, *mit, std::move(segment_unit));
iter = get_or_materialize_segment(path, *mit, std::move(segment_unit));
}
auto mit_committed_offset = mit->committed_offset;
auto next_it = std::next(std::move(mit));
Expand Down Expand Up @@ -913,14 +918,15 @@ remote_partition::aborted_transactions(offset_range offsets) {
break;
}

// Segment might be materialized, we need a
// second map lookup to learn if this is the case.
auto m = _segments.find(it->base_offset);
if (m == _segments.end()) {
auto path = stm_manifest.generate_segment_path(*it);
auto segment_unit = co_await materialized().get_segment_units();
m = materialize_segment(path, *it, std::move(segment_unit));
}
// Note: This is not buletproof: the segment might be
// re-uploaded/merged while waiting for the units which may result
// in a failure to materialise. This should be transient however.
// One solution for this is to grab all the required segment units
// up front at the start of the function.
auto segment_unit = co_await materialized().get_segment_units();
auto path = stm_manifest.generate_segment_path(*it);
auto m = get_or_materialize_segment(
path, *it, std::move(segment_unit));
remote_segs.emplace_back(m->second->segment);
}
for (const auto& segment : remote_segs) {
Expand Down Expand Up @@ -961,16 +967,9 @@ remote_partition::aborted_transactions(offset_range offsets) {
});

for (const auto& [meta, path] : meta_to_materialize) {
// Segment might be materialized, we need a
// second map lookup to learn if this is the case.
auto m = _segments.find(meta.base_offset);
if (m == _segments.end()) {
// Here the 'manifest' might not be the one that contain 'meta'
// but it doesn't matter because 'materialize_segment' method is
// only used to generate a segment path.
auto segment_unit = co_await materialized().get_segment_units();
m = materialize_segment(path, meta, std::move(segment_unit));
}
auto segment_unit = co_await materialized().get_segment_units();
auto m = get_or_materialize_segment(
path, meta, std::move(segment_unit));
auto tx = co_await m->second->segment->aborted_transactions(
offsets.begin_rp, offsets.end_rp);
std::copy(tx.begin(), tx.end(), std::back_inserter(result));
Expand Down
6 changes: 3 additions & 3 deletions src/v/cloud_storage/remote_partition.h
Original file line number Diff line number Diff line change
Expand Up @@ -211,10 +211,10 @@ class remote_partition
segment_reader_units segment_reader_unit,
model::offset hint = {});

/// Materialize new segment
/// @return iterator that points to newly added segment (always valid
/// Materialize a new segment or grab one if it already exists
/// @return iterator that points a materialized segment (always valid
/// iterator)
iterator materialize_segment(
iterator get_or_materialize_segment(
const remote_segment_path& path, const segment_meta&, segment_units);

retry_chain_node _rtc;
Expand Down

0 comments on commit fc1c08a

Please sign in to comment.