Skip to content

Commit

Permalink
Merge pull request #16099 from vbotbuildovich/backport-pr-15089-v23.2…
Browse files Browse the repository at this point in the history
….x-449

[v23.2.x] archival: Use explicit types to encode upload candidate creation result
  • Loading branch information
piyushredpanda authored Jan 15, 2024
2 parents 0ed4cd5 + cbfdb12 commit 2afbf0c
Show file tree
Hide file tree
Showing 9 changed files with 428 additions and 229 deletions.
80 changes: 70 additions & 10 deletions src/v/archival/archival_policy.cc
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,60 @@ std::ostream& operator<<(std::ostream& s, const upload_candidate& c) {
return s;
}

std::ostream& operator<<(std::ostream& os, candidate_creation_error err) {
os << "compacted candidate creation error: ";
switch (err) {
case candidate_creation_error::no_segments_collected:
return os << "no segments collected";
case candidate_creation_error::begin_offset_seek_error:
return os << "failed to seek begin offset";
case candidate_creation_error::end_offset_seek_error:
return os << "failed to seek end offset";
case candidate_creation_error::offset_inside_batch:
return os << "offset inside batch";
case candidate_creation_error::upload_size_unchanged:
return os << "size of candidate unchanged";
case candidate_creation_error::cannot_replace_manifest_entry:
return os << "candidate cannot replace manifest entry";
case candidate_creation_error::no_segment_for_begin_offset:
return os << "no segment for begin offset";
case candidate_creation_error::missing_ntp_config:
return os << "missing config for NTP";
case candidate_creation_error::failed_to_get_file_range:
return os << "failed to get file range for candidate";
case candidate_creation_error::zero_content_length:
return os << "candidate has no content";
}
}

ss::log_level log_level_for_error(const candidate_creation_error& error) {
switch (error) {
case candidate_creation_error::no_segments_collected:
case candidate_creation_error::begin_offset_seek_error:
case candidate_creation_error::end_offset_seek_error:
case candidate_creation_error::upload_size_unchanged:
case candidate_creation_error::cannot_replace_manifest_entry:
case candidate_creation_error::no_segment_for_begin_offset:
case candidate_creation_error::failed_to_get_file_range:
case candidate_creation_error::zero_content_length:
return ss::log_level::debug;
case candidate_creation_error::offset_inside_batch:
case candidate_creation_error::missing_ntp_config:
return ss::log_level::warn;
}
}

std::ostream&
operator<<(std::ostream& os, const skip_offset_range& skip_range) {
fmt::print(
os,
"skip_offset_range{{begin: {}, end: {}, error: {}}}",
skip_range.begin_offset,
skip_range.end_offset,
skip_range.reason);
return os;
}

archival_policy::archival_policy(
model::ntp ntp,
std::optional<segment_time_limit> limit,
Expand Down Expand Up @@ -308,7 +362,7 @@ static ss::future<std::optional<std::error_code>> get_file_range(
/// a name '1000-1-v1.log'. If we were only able to find offset
/// 990 instead of 1000, we will upload starting from it and
/// the name will be '990-1-v1.log'.
static ss::future<upload_candidate_with_locks> create_upload_candidate(
static ss::future<candidate_creation_result> create_upload_candidate(
model::offset begin_inclusive,
std::optional<model::offset> end_inclusive,
ss::lw_shared_ptr<storage::segment> segment,
Expand Down Expand Up @@ -344,7 +398,7 @@ static ss::future<upload_candidate_with_locks> create_upload_candidate(
archival_log.error,
"Upload candidate not created, failed to get file range: {}",
file_range_result.value().message());
co_return upload_candidate_with_locks{upload_candidate{}, {}};
co_return candidate_creation_error::failed_to_get_file_range;
}
if (result->starting_offset != segment->offsets().base_offset) {
// We need to generate new name for the segment
Expand All @@ -367,7 +421,7 @@ static ss::future<upload_candidate_with_locks> create_upload_candidate(
co_return upload_candidate_with_locks{*result, std::move(locks)};
}

ss::future<upload_candidate_with_locks> archival_policy::get_next_candidate(
ss::future<candidate_creation_result> archival_policy::get_next_candidate(
model::offset begin_inclusive,
model::offset end_exclusive,
ss::shared_ptr<storage::log> log,
Expand All @@ -378,8 +432,12 @@ ss::future<upload_candidate_with_locks> archival_policy::get_next_candidate(
auto adjusted_lso = end_exclusive - model::offset(1);
auto [segment, ntp_conf, forced] = find_segment(
begin_inclusive, adjusted_lso, std::move(log), ot_state);
if (segment.get() == nullptr || ntp_conf == nullptr) {
co_return upload_candidate_with_locks{upload_candidate{}, {}};
if (segment.get() == nullptr) {
co_return candidate_creation_error::no_segment_for_begin_offset;
}

if (ntp_conf == nullptr) {
co_return candidate_creation_error::missing_ntp_config;
}
// We need to adjust LSO since it points to the first
// recordbatch with uncommitted transactions data
Expand All @@ -399,13 +457,15 @@ ss::future<upload_candidate_with_locks> archival_policy::get_next_candidate(
ntp_conf,
_io_priority,
segment_lock_duration);
if (upload.candidate.content_length == 0) {
co_return upload_candidate_with_locks{upload_candidate{}, {}};
if (const auto* u = std::get_if<upload_candidate_with_locks>(&upload); u) {
if (u->candidate.content_length == 0) {
co_return candidate_creation_error::zero_content_length;
}
}
co_return upload;
}

ss::future<upload_candidate_with_locks>
ss::future<candidate_creation_result>
archival_policy::get_next_compacted_segment(
model::offset begin_inclusive,
ss::shared_ptr<storage::log> log,
Expand All @@ -416,7 +476,7 @@ archival_policy::get_next_compacted_segment(
archival_log.warn,
"Upload policy find next compacted segment: no segments ntp: {}",
_ntp);
co_return upload_candidate_with_locks{upload_candidate{}, {}};
co_return candidate_creation_error::no_segments_collected;
}
segment_collector compacted_segment_collector{
begin_inclusive,
Expand All @@ -427,7 +487,7 @@ archival_policy::get_next_compacted_segment(

compacted_segment_collector.collect_segments();
if (!compacted_segment_collector.should_replace_manifest_segment()) {
co_return upload_candidate_with_locks{upload_candidate{}, {}};
co_return candidate_creation_error::cannot_replace_manifest_entry;
}

co_return co_await compacted_segment_collector.make_upload_candidate(
Expand Down
37 changes: 35 additions & 2 deletions src/v/archival/archival_policy.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,23 @@

namespace archival {

enum class candidate_creation_error {
no_segments_collected,
begin_offset_seek_error,
end_offset_seek_error,
offset_inside_batch,
upload_size_unchanged,
cannot_replace_manifest_entry,
no_segment_for_begin_offset,
missing_ntp_config,
failed_to_get_file_range,
zero_content_length,
};

std::ostream& operator<<(std::ostream&, candidate_creation_error);

ss::log_level log_level_for_error(const candidate_creation_error& error);

struct upload_candidate {
segment_name exposed_name;
model::offset starting_offset;
Expand All @@ -45,6 +62,22 @@ struct upload_candidate_with_locks {
std::vector<ss::rwlock::holder> read_locks;
};

/// Wraps an error with an offset range, so that no
/// further upload candidates are created from this offset range.
struct skip_offset_range {
model::offset begin_offset;
model::offset end_offset;
candidate_creation_error reason;

friend std::ostream& operator<<(std::ostream&, const skip_offset_range&);
};

using candidate_creation_result = std::variant<
std::monostate,
upload_candidate_with_locks,
skip_offset_range,
candidate_creation_error>;

/// Archival policy is responsible for extracting segments from
/// log_manager in right order.
///
Expand All @@ -63,14 +96,14 @@ class archival_policy {
/// \param end_exclusive is an exclusive end of the range
/// \param lm is a log manager
/// \return initializd struct on success, empty struct on failure
ss::future<upload_candidate_with_locks> get_next_candidate(
ss::future<candidate_creation_result> get_next_candidate(
model::offset begin_inclusive,
model::offset end_exclusive,
ss::shared_ptr<storage::log>,
const storage::offset_translator_state&,
ss::lowres_clock::duration segment_lock_duration);

ss::future<upload_candidate_with_locks> get_next_compacted_segment(
ss::future<candidate_creation_result> get_next_compacted_segment(
model::offset begin_inclusive,
ss::shared_ptr<storage::log> log,
const cloud_storage::partition_manifest& manifest,
Expand Down
Loading

0 comments on commit 2afbf0c

Please sign in to comment.