cloud_storage: fix segment materialisation race #13796

Merged
71 changes: 41 additions & 30 deletions src/v/cloud_storage/remote_partition.cc
@@ -53,27 +53,44 @@ namespace cloud_storage {
using data_t = model::record_batch_reader::data_t;
using storage_t = model::record_batch_reader::storage_t;

remote_partition::iterator remote_partition::materialize_segment(
remote_partition::iterator remote_partition::get_or_materialize_segment(
const remote_segment_path& path,
const segment_meta& meta,
segment_units unit) {
_as.check();
auto base_kafka_offset = meta.base_offset - meta.delta_offset;

if (auto iter = _segments.find(meta.base_offset); iter != _segments.end()) {
vlog(
_ctxlog.debug,
"Reusing materialized segment for meta {} with path {}",
meta,
path);

return iter;
}

auto st = std::make_unique<materialized_segment_state>(
meta, path, *this, std::move(unit));
auto [iter, ok] = _segments.insert(
auto [new_iter, ok] = _segments.insert(
std::make_pair(meta.base_offset, std::move(st)));

// Should never fire since there's no scheduling point between
// the existence check and the insertion.
vassert(
ok,
"Segment with base log offset {} and base kafka offset {} is already "
"materialized, max offset of the new segment {}, max offset of the "
"existing segment {}",
meta.base_offset,
base_kafka_offset,
meta.committed_offset,
iter->second->segment->get_max_rp_offset());
"Segment with base offset {} is already materialized",
meta.base_offset);

_ts_probe.segment_materialized();
return iter;

vlog(
_ctxlog.debug,
"Materialized new segment for meta {} with path {}",
meta,
path);
_ts_probe.segment_materialized();

return new_iter;
}

remote_partition::borrow_result_t remote_partition::borrow_next_segment_reader(
@@ -163,7 +180,7 @@ remote_partition::borrow_result_t remote_partition::borrow_next_segment_reader(
}
if (iter == _segments.end()) {
auto path = manifest.generate_segment_path(*mit);
iter = materialize_segment(path, *mit, std::move(segment_unit));
iter = get_or_materialize_segment(path, *mit, std::move(segment_unit));
Contributor:
Maybe we should have a separate call that expects callers to have made the segment check in the same task. Or maybe we can tweak the above find() to use get_or_materialize_segment()?

Just noting that this is seeking twice for the same offset with no scheduling points in between.

Contributor (Author):
I think the offloading logic above should move into get_or_materialize_segment, but I chose not to do it in this PR to keep the change less intrusive.
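
For context, a minimal sketch of the check-then-insert pattern being discussed, using simplified stand-in types rather than the actual remote_partition members:

// Simplified stand-ins; the real code keys the map by model::offset and
// stores materialized_segment_state.
#include <cstdint>
#include <map>
#include <memory>

struct segment_state {
    int64_t base_offset;
};

using segment_map = std::map<int64_t, std::unique_ptr<segment_state>>;

// Return an iterator to an existing entry, or insert a new one. Because
// there is no scheduling point between find() and emplace(), the insert is
// expected to always succeed (hence the vassert in the real implementation).
segment_map::iterator
get_or_materialize(segment_map& segments, int64_t base_offset) {
    if (auto it = segments.find(base_offset); it != segments.end()) {
        return it; // reuse the already-materialized segment
    }
    auto [it, inserted] = segments.emplace(
      base_offset, std::make_unique<segment_state>(segment_state{base_offset}));
    return it;
}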

}
auto mit_committed_offset = mit->committed_offset;
auto next_it = std::next(std::move(mit));
@@ -913,14 +930,15 @@ remote_partition::aborted_transactions(offset_range offsets) {
break;
}

// Segment might be materialized, we need a
// second map lookup to learn if this is the case.
auto m = _segments.find(it->base_offset);
if (m == _segments.end()) {
auto path = stm_manifest.generate_segment_path(*it);
auto segment_unit = co_await materialized().get_segment_units();
m = materialize_segment(path, *it, std::move(segment_unit));
}
// Note: This is not bulletproof: the segment might be
// re-uploaded/merged while waiting for the units which may result
// in a failure to materialise. This should be transient however.
// One solution for this is to grab all the required segment units
// up front at the start of the function.
auto segment_unit = co_await materialized().get_segment_units();
auto path = stm_manifest.generate_segment_path(*it);
auto m = get_or_materialize_segment(
path, *it, std::move(segment_unit));
remote_segs.emplace_back(m->second->segment);
}
for (const auto& segment : remote_segs) {
@@ -961,16 +979,9 @@ remote_partition::aborted_transactions(offset_range offsets) {
});

for (const auto& [meta, path] : meta_to_materialize) {
// Segment might be materialized, we need a
// second map lookup to learn if this is the case.
auto m = _segments.find(meta.base_offset);
if (m == _segments.end()) {
// Here the 'manifest' might not be the one that contain 'meta'
// but it doesn't matter because 'materialize_segment' method is
// only used to generate a segment path.
auto segment_unit = co_await materialized().get_segment_units();
m = materialize_segment(path, meta, std::move(segment_unit));
}
auto segment_unit = co_await materialized().get_segment_units();
auto m = get_or_materialize_segment(
path, meta, std::move(segment_unit));
auto tx = co_await m->second->segment->aborted_transactions(
offsets.begin_rp, offsets.end_rp);
std::copy(tx.begin(), tx.end(), std::back_inserter(result));
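
The note added to aborted_transactions() mentions that grabbing all required segment units up front would close the remaining window while awaiting units. A rough sketch of that idea, assuming a Seastar counting semaphore stands in for the units; the batch API shown here is a stand-in, not the real materialized_resources interface:

#include <seastar/core/coroutine.hh>
#include <seastar/core/semaphore.hh>
#include <map>
#include <vector>

// Acquire one unit per segment before touching the map. Holding units for
// the whole batch means no co_await is needed inside the loop, so the state
// observed by find() cannot change before the matching emplace().
seastar::future<> materialize_batch(
  seastar::semaphore& segment_sem,
  std::vector<int64_t> base_offsets,
  std::map<int64_t, bool>& segments) {
    auto units = co_await seastar::get_units(
      segment_sem, base_offsets.size());
    for (auto offset : base_offsets) {
        // No scheduling point between find() and emplace().
        if (segments.find(offset) == segments.end()) {
            segments.emplace(offset, true);
        }
    }
    co_return;
}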
6 changes: 3 additions & 3 deletions src/v/cloud_storage/remote_partition.h
@@ -211,10 +211,10 @@ class remote_partition
segment_reader_units segment_reader_unit,
model::offset hint = {});

/// Materialize new segment
/// @return iterator that points to newly added segment (always valid
/// Materialize a new segment or grab one if it already exists
/// @return iterator that points to a materialized segment (always valid
/// iterator)
iterator materialize_segment(
iterator get_or_materialize_segment(
const remote_segment_path& path, const segment_meta&, segment_units);

retry_chain_node _rtc;