archival: clamp uploads to committed offset
Archival/tiered storage correctness builds on the (wrong) assumption
that the LSO is monotonic. Tiered storage doesn't have a concept of
suffix truncation, so truncating already-uploaded data would lead to
violations of correctness properties and diverging logs/undefined
behavior.

However, we have discovered that this property does not hold if there
are no in-progress transactions and acks=0/1 or write caching is in use,
because the LSO falls back to the "last visible index"[^1], which can get
truncated.
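
As a rough illustration of the clamp this commit introduces, the sketch below takes the minimum of the LSO and the offset following the committed offset. `offset_t`, `partition_view`, and the free functions are hypothetical stand-ins, not the Redpanda types. It assumes the LSO is treated as an exclusive bound and that committed data is never truncated.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>

// Hypothetical stand-ins for model::offset and model::next_offset.
using offset_t = std::int64_t;
constexpr offset_t next_offset(offset_t o) { return o + 1; }

// Hypothetical, simplified view of the partition state the archiver reads.
struct partition_view {
    // Assumed exclusive bound. Without in-progress transactions it may
    // track the last visible index, which can later be truncated.
    offset_t last_stable_offset;
    // Assumed inclusive and never truncated once advanced.
    offset_t committed_offset;
};

// Exclusive upper bound for uploads: never past the LSO and never past
// the first offset after the committed offset.
constexpr offset_t max_uploadable_offset_exclusive(const partition_view& p) {
    return std::min(p.last_stable_offset, next_offset(p.committed_offset));
}
```

Under these assumptions, an LSO of 120 ahead of a committed offset of 99 yields a bound of 100, while an open transaction pinning the LSO at 90 keeps the bound at 90.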

Ref #18244

[^1]: https://github.com/redpanda-data/redpanda/blob/88ac775f9f7954330732024abfa6e9ed5c9c11fd/src/v/cluster/rm_stm.cc#L1322

(cherry picked from commit ff358cc)
nvartolomei authored and vbotbuildovich committed May 10, 2024
1 parent 43b3520 commit 80cfe71
Showing 2 changed files with 35 additions and 10 deletions.
29 changes: 22 additions & 7 deletions src/v/archival/ntp_archiver_service.cc
@@ -1618,7 +1618,7 @@ ntp_archiver::schedule_single_upload(const upload_context& upload_ctx) {
 }

 ss::future<std::vector<ntp_archiver::scheduled_upload>>
-ntp_archiver::schedule_uploads(model::offset last_stable_offset) {
+ntp_archiver::schedule_uploads(model::offset max_offset_exclusive) {
 // We have to increment last offset to guarantee progress.
 // The manifest's last offset contains dirty_offset of the
 // latest uploaded segment but '_policy' requires offset that
@@ -1642,7 +1642,7 @@ ntp_archiver::schedule_uploads(model::offset last_stable_offset) {
 params.push_back({
 .upload_kind = segment_upload_kind::non_compacted,
 .start_offset = start_upload_offset,
-.last_offset = last_stable_offset,
+.last_offset = max_offset_exclusive,
 .allow_reuploads = allow_reuploads_t::no,
 .archiver_term = _start_term,
 });
@@ -2026,15 +2026,30 @@ ss::future<ntp_archiver::batch_result> ntp_archiver::wait_all_scheduled_uploads(
 .compacted_upload_result = compacted_result};
 }

+model::offset ntp_archiver::max_uploadable_offset_exclusive() const {
+// We impose an additional (LSO) constraint on the uploadable offset to
+// as we need to have a complete index of aborted transactions if any
+// before we can upload a segment.
+return std::min(
+_parent.last_stable_offset(),
+model::next_offset(_parent.committed_offset()));
+}
+
 ss::future<ntp_archiver::batch_result> ntp_archiver::upload_next_candidates(
-std::optional<model::offset> lso_override) {
-vlog(_rtclog.debug, "Uploading next candidates called for {}", _ntp);
-auto last_stable_offset = lso_override ? *lso_override
-: _parent.last_stable_offset();
+std::optional<model::offset> max_offset_override_exclusive) {
+auto max_offset_exclusive = max_offset_override_exclusive
+? *max_offset_override_exclusive
+: max_uploadable_offset_exclusive();
+vlog(
+_rtclog.debug,
+"Uploading next candidates called for {} with max_offset_exclusive={}",
+_ntp,
+max_offset_exclusive);
 ss::gate::holder holder(_gate);
 try {
 auto units = co_await ss::get_units(_mutex, 1, _as);
-auto scheduled_uploads = co_await schedule_uploads(last_stable_offset);
+auto scheduled_uploads = co_await schedule_uploads(
+max_offset_exclusive);
 co_return co_await wait_all_scheduled_uploads(
 std::move(scheduled_uploads));
 } catch (const ss::gate_closed_exception&) {
16 changes: 13 additions & 3 deletions src/v/archival/ntp_archiver_service.h
@@ -175,15 +175,25 @@ class ntp_archiver {
 auto operator<=>(const batch_result&) const = default;
 };

+/// Compute the maximum offset that is safe to be uploaded to the cloud.
+///
+/// It must be guaranteed that this offset is monotonically increasing/
+/// can never go backwards. Otherwise, the local and cloud logs will
+/// diverge leading to undefined behavior.
+model::offset max_uploadable_offset_exclusive() const;
+
 /// \brief Upload next set of segments to S3 (if any)
 /// The semaphore is used to track number of parallel uploads. The method
 /// will pick not more than '_concurrency' candidates and start
 /// uploading them.
 ///
-/// \param lso_override last stable offset override
+/// \param max_offset_override_exclusive Overrides the maximum offset
+/// that can be uploaded. If nullopt, the maximum offset is
+/// calculated automatically.
 /// \return future that returns number of uploaded/failed segments
 virtual ss::future<batch_result> upload_next_candidates(
-std::optional<model::offset> last_stable_offset_override = std::nullopt);
+std::optional<model::offset> max_offset_override_exclusive
+= std::nullopt);

 ss::future<cloud_storage::download_result> sync_manifest();

@@ -422,7 +432,7 @@ class ntp_archiver {

 /// Start all uploads
 ss::future<std::vector<scheduled_upload>>
-schedule_uploads(model::offset last_stable_offset);
+schedule_uploads(model::offset max_offset_exclusive);

 ss::future<std::vector<scheduled_upload>>
 schedule_uploads(std::vector<upload_context> loop_contexts);
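
The override handling in `upload_next_candidates` can be sketched in isolation roughly as follows. `offset_t` and `resolve_max_offset_exclusive` are hypothetical names used only for illustration. Unlike the ternary in the diff, this version takes the computed bound as an already-evaluated argument, so it is a simplification and not a drop-in replacement.

```cpp
#include <cassert>
#include <cstdint>
#include <optional>

// Hypothetical stand-in for model::offset.
using offset_t = std::int64_t;

// Prefer the caller-supplied exclusive bound. Fall back to the computed
// bound when no override is given (std::nullopt).
constexpr offset_t resolve_max_offset_exclusive(
  std::optional<offset_t> override_exclusive, offset_t computed_exclusive) {
    return override_exclusive.value_or(computed_exclusive);
}
```

Calling it with `std::nullopt` returns the computed bound, while passing an explicit offset returns that offset unchanged. Note that an override bypasses the committed-offset clamp, so a caller supplying one takes on the monotonicity requirement itself.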