commit c5eb52d
Author: nvartolomei
Date:   May 17, 2024
Parent: 41eed62

cloud_storage: search for query from archive start not clean offset
Starting the cursor from the clean offset is only required when
computing retention, because of an implementation detail that is
documented in the added comment and in the referenced commits.

In all other cases we must start searching from the archive start
offset. This is particularly important for timequeries, which must
return the first visible batch above the timestamp.
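
To make the offset relationship concrete, below is a minimal standalone
sketch (plain C++, not Redpanda code; the enum names and the
clean-offset invariant are taken from this diff, everything else is
simplified) of how the begin offset is chosen for each cursor base and
why a timequery must not start below the archive start offset:

    #include <cstdint>
    #include <iostream>

    // Mirrors the enum added in async_manifest_view.h.
    enum class cursor_base_t { archive_start_offset, archive_clean_offset };

    // Invariant: clean <= start. Offsets in [clean, start) are no longer
    // visible to readers but may still be awaiting garbage collection.
    struct manifest_model {
        int64_t archive_clean_offset; // e.g. 0
        int64_t archive_start_offset; // e.g. 100
    };

    int64_t select_begin(const manifest_model& m, cursor_base_t base) {
        switch (base) {
        case cursor_base_t::archive_start_offset:
            return m.archive_start_offset; // queries, incl. timequeries
        case cursor_base_t::archive_clean_offset:
            return m.archive_clean_offset; // retention only
        }
        return m.archive_start_offset; // unreachable; silences warnings
    }

    int main() {
        manifest_model m{.archive_clean_offset = 0, .archive_start_offset = 100};
        // A cursor based at the clean offset could surface a batch in
        // [0, 100) that readers must never see; basing it at the archive
        // start offset yields the first *visible* batch above a timestamp.
        std::cout << select_begin(m, cursor_base_t::archive_start_offset)
                  << "\n"; // 100
        std::cout << select_begin(m, cursor_base_t::archive_clean_offset)
                  << "\n"; // 0
    }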
Showing 4 changed files with 73 additions and 15 deletions.
src/v/cloud_storage/async_manifest_view.cc (17 additions, 5 deletions)
@@ -44,6 +44,7 @@
 #include <exception>
 #include <functional>
 #include <iterator>
+#include <optional>
 #include <system_error>
 #include <variant>

@@ -561,7 +562,8 @@ ss::future<> async_manifest_view::run_bg_loop() {
 ss::future<result<std::unique_ptr<async_manifest_view_cursor>, error_outcome>>
 async_manifest_view::get_cursor(
   async_view_search_query_t query,
-  std::optional<model::offset> end_inclusive) noexcept {
+  std::optional<model::offset> end_inclusive,
+  cursor_base_t cursor_base) noexcept {
     try {
         ss::gate::holder h(_gate);
         if (
@@ -581,7 +583,14 @@ async_manifest_view::get_cursor(
         if (_stm_manifest.get_archive_start_offset() == model::offset{}) {
             begin = _stm_manifest.get_start_offset().value_or(begin);
         } else {
-            begin = _stm_manifest.get_archive_clean_offset();
+            switch (cursor_base) {
+            case cursor_base_t::archive_start_offset:
+                begin = _stm_manifest.get_archive_start_offset();
+                break;
+            case cursor_base_t::archive_clean_offset:
+                begin = _stm_manifest.get_archive_clean_offset();
+                break;
+            }
         }
 
         if (end < begin) {
@@ -959,7 +968,8 @@ async_manifest_view::offset_based_retention() noexcept {
     archive_start_offset_advance result;
     try {
         auto boundary = _stm_manifest.get_start_kafka_offset_override();
-        auto res = co_await get_cursor(boundary);
+        auto res = co_await get_cursor(
+          boundary, std::nullopt, cursor_base_t::archive_clean_offset);
         if (res.has_failure()) {
             if (res.error() == error_outcome::out_of_range) {
                 vlog(
@@ -1023,7 +1033,8 @@ async_manifest_view::time_based_retention(
 
         auto res = co_await get_cursor(
           _stm_manifest.get_archive_start_offset(),
-          model::prev_offset(_stm_manifest.get_start_offset().value()));
+          model::prev_offset(_stm_manifest.get_start_offset().value()),
+          cursor_base_t::archive_clean_offset);
         if (res.has_failure()) {
             if (res.error() == error_outcome::out_of_range) {
                 // The cutoff point is outside of the offset range, no need to
@@ -1149,7 +1160,8 @@ async_manifest_view::size_based_retention(size_t size_limit) noexcept {
 
         auto res = co_await get_cursor(
           _stm_manifest.get_archive_clean_offset(),
-          model::prev_offset(_stm_manifest.get_start_offset().value()));
+          model::prev_offset(_stm_manifest.get_start_offset().value()),
+          cursor_base_t::archive_clean_offset);
         if (res.has_failure()) {
             vlogl(
               _ctxlog,
src/v/cloud_storage/async_manifest_view.h (13 additions, 1 deletion)
@@ -82,6 +82,17 @@ class async_manifest_view {
     ss::future<> start();
     ss::future<> stop();
 
+    enum class cursor_base_t {
+        archive_start_offset,
+
+        /// Special case that is used when computing retention.
+        ///
+        /// For details, see:
+        /// GitHub: https://github.com/redpanda-data/redpanda/pull/12177
+        /// Commit: 1b6ab7be8818e3878a32f9037694ae5c4cf4fea2
+        archive_clean_offset,
+    };
+
     /// Get active spillover manifests asynchronously
     ///
     /// \note the method may hydrate manifests in the cache or
@@ -91,7 +102,8 @@
       result<std::unique_ptr<async_manifest_view_cursor>, error_outcome>>
     get_cursor(
       async_view_search_query_t q,
-      std::optional<model::offset> end_inclusive = std::nullopt) noexcept;
+      std::optional<model::offset> end_inclusive = std::nullopt,
+      cursor_base_t cursor_base = cursor_base_t::archive_start_offset) noexcept;
 
     /// Get inactive spillover manifests which are waiting for
     /// retention
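
Because the new parameter defaults to archive_start_offset, every
existing get_cursor() caller keeps compiling and now gets the corrected
search behavior, while the three retention paths in this diff opt in to
the clean offset explicitly. A hypothetical sketch of the two call
shapes (simplified signature, not the real class):

    #include <iostream>

    enum class cursor_base_t { archive_start_offset, archive_clean_offset };

    // Simplified stand-in for async_manifest_view::get_cursor(); the real
    // method also takes a query and an optional inclusive end offset.
    const char* get_cursor(
      cursor_base_t base = cursor_base_t::archive_start_offset) {
        return base == cursor_base_t::archive_clean_offset
                 ? "begin = archive clean offset (retention)"
                 : "begin = archive start offset (queries)";
    }

    int main() {
        std::cout << get_cursor() << "\n"; // fetch/timequery path, unchanged
        std::cout << get_cursor(cursor_base_t::archive_clean_offset)
                  << "\n"; // retention path
    }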
src/v/cloud_storage/tests/async_manifest_view_test.cc (11 additions, 0 deletions)
@@ -455,8 +455,19 @@ FIXTURE_TEST(test_async_manifest_view_truncate, async_manifest_view_fixture) {
 
     model::offset so = model::offset{0};
     auto maybe_cursor = view.get_cursor(so).get();
     BOOST_REQUIRE(
       maybe_cursor.has_error()
       && maybe_cursor.error() == cloud_storage::error_outcome::out_of_range);
 
+    // The clean offset should still be accessible such that retention
+    // can operate above it.
+    maybe_cursor = view
+                     .get_cursor(
+                       so,
+                       std::nullopt,
+                       cloud_storage::async_manifest_view::cursor_base_t::
+                         archive_clean_offset)
+                     .get();
+    BOOST_REQUIRE(!maybe_cursor.has_failure());
+
     maybe_cursor = view.get_cursor(new_so).get();
src/v/cloud_storage/tests/remote_partition_test.cc (32 additions, 9 deletions)
@@ -438,7 +438,14 @@ FIXTURE_TEST(test_scan_by_kafka_offset_truncated, cloud_storage_fixture) {
       *this, model::offset(6), model::offset_delta(3), batch_types);
     print_segments(segments);
     for (int i = 0; i <= 2; i++) {
-        BOOST_REQUIRE(check_fetch(*this, kafka::offset(i), false));
+        BOOST_REQUIRE_EXCEPTION(
+          check_fetch(*this, kafka::offset(i), false),
+          std::runtime_error,
+          [](const std::runtime_error& e) {
+              return std::string(e.what()).find(
+                       "Failed to query spillover manifests")
+                     != std::string::npos;
+          });
     }
     for (int i = 3; i <= 8; i++) {
         BOOST_REQUIRE(check_scan(*this, kafka::offset(i), 9 - i));
@@ -490,7 +497,14 @@ FIXTURE_TEST(
     auto segments = setup_s3_imposter(
       *this, model::offset(6), model::offset_delta(3), batch_types);
     print_segments(segments);
-    BOOST_REQUIRE(check_fetch(*this, kafka::offset(2), false));
+    BOOST_REQUIRE_EXCEPTION(
+      check_fetch(*this, kafka::offset(2), false),
+      std::runtime_error,
+      [](const std::runtime_error& e) {
+          return std::string(e.what()).find(
+                   "Failed to query spillover manifests")
+                 != std::string::npos;
+      });
     BOOST_REQUIRE(check_scan(*this, kafka::offset(3), 1));
     BOOST_REQUIRE(check_fetch(*this, kafka::offset(3), true));
     BOOST_REQUIRE(check_scan(*this, kafka::offset(4), 0));
@@ -538,7 +552,14 @@ FIXTURE_TEST(
       *this, model::offset(6), model::offset_delta(0), batch_types);
     print_segments(segments);
     for (int i = 0; i < 6; i++) {
-        BOOST_REQUIRE(check_fetch(*this, kafka::offset(i), false));
+        BOOST_REQUIRE_EXCEPTION(
+          check_fetch(*this, kafka::offset(i), false),
+          std::runtime_error,
+          [](const std::runtime_error& e) {
+              return std::string(e.what()).find(
+                       "Failed to query spillover manifests")
+                     != std::string::npos;
+          });
     }
     BOOST_REQUIRE(check_scan(*this, kafka::offset(6), 1));
     BOOST_REQUIRE(check_fetch(*this, kafka::offset(6), true));
@@ -1243,12 +1264,14 @@ FIXTURE_TEST(
         vlog(test_log.debug, "Creating new reader {}", reader_config);
 
         // After truncation reading from the old end should be impossible
-        auto reader = partition->make_reader(reader_config).get().reader;
-        auto headers_read
-          = reader.consume(counting_batch_consumer(100), model::no_timeout)
-              .get();
-
-        BOOST_REQUIRE(headers_read.size() == 0);
+        BOOST_REQUIRE_EXCEPTION(
+          partition->make_reader(reader_config).get(),
+          std::runtime_error,
+          [](const std::runtime_error& e) {
+              return std::string(e.what()).find(
+                       "Failed to query spillover manifests")
+                     != std::string::npos;
+          });
     }
 }
