Fix timequery returning wrong offset after trim-prefix which could lead to stuck consumers #18112

Merged · 8 commits · May 7, 2024
src/v/cloud_storage/remote_partition.cc (6 additions & 3 deletions)

@@ -1195,11 +1195,13 @@ remote_partition::timequery(storage::timequery_config cfg) {
         co_return std::nullopt;
     }

-    auto start_offset = stm_manifest.full_log_start_kafka_offset().value();
+    auto start_offset = std::max(
+      cfg.min_offset,
+      kafka::offset_cast(stm_manifest.full_log_start_kafka_offset().value()));

     // Synthesize a log_reader_config from our timequery_config
     storage::log_reader_config config(
-      kafka::offset_cast(start_offset),
+      start_offset,
       cfg.max_offset,
       0,
       2048, // We just need one record batch
@@ -1222,7 +1224,8 @@ remote_partition::timequery(storage::timequery_config cfg) {
     vlog(_ctxlog.debug, "timequery: {} batches", batches.size());

     if (batches.size()) {
-        co_return storage::batch_timequery(*(batches.begin()), cfg.time);
+        co_return storage::batch_timequery(
+          *(batches.begin()), cfg.min_offset, cfg.time, cfg.max_offset);
     } else {
         co_return std::nullopt;
     }
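For context, here is a minimal self-contained sketch of what the four-argument `batch_timequery` presumably does after this change: scan the batch's records and return the first one whose timestamp is at or after the query time and whose offset falls inside the inclusive `[min_offset, max_offset]` interval. The plain structs below are illustrative stand-ins for the real `model::record_batch` and `storage::timequery_result` types, not the actual storage API:

```cpp
#include <cstdint>
#include <optional>
#include <vector>

// Stand-ins for model::record_batch records and storage::timequery_result.
struct record { int64_t offset; int64_t timestamp; };
struct timequery_result { int64_t offset; int64_t timestamp; };

// First record with timestamp >= t whose offset lies in the inclusive
// interval [min_offset, max_offset]; nullopt if none qualifies.
std::optional<timequery_result> batch_timequery_sketch(
  const std::vector<record>& batch,
  int64_t min_offset,
  int64_t t,
  int64_t max_offset) {
    for (const auto& r : batch) {
        if (r.timestamp >= t && r.offset >= min_offset
            && r.offset <= max_offset) {
            return timequery_result{r.offset, r.timestamp};
        }
    }
    return std::nullopt;
}
```

The offset bounds are the point of the fix: without them, a record from before the trimmed prefix (or past the committed offset) could be returned.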
src/v/cloud_storage/tests/cloud_storage_e2e_test.cc (3 additions & 3 deletions)

@@ -711,7 +711,7 @@ FIXTURE_TEST(test_local_timequery, e2e_fixture) {
       bool expect_value = false,
       std::optional<model::offset> expected_o = std::nullopt) {
         auto timequery_conf = storage::timequery_config(
-          t, o, ss::default_priority_class(), std::nullopt);
+          model::offset(0), t, o, ss::default_priority_class(), std::nullopt);

         auto result = partition->timequery(timequery_conf).get();
@@ -798,7 +798,7 @@ FIXTURE_TEST(test_cloud_storage_timequery, e2e_fixture) {
       bool expect_value = false,
       std::optional<model::offset> expected_o = std::nullopt) {
         auto timequery_conf = storage::timequery_config(
-          t, o, ss::default_priority_class(), std::nullopt);
+          model::offset(0), t, o, ss::default_priority_class(), std::nullopt);

         auto result = partition->timequery(timequery_conf).get();
@@ -904,7 +904,7 @@ FIXTURE_TEST(test_mixed_timequery, e2e_fixture) {
       bool expect_value = false,
       std::optional<model::offset> expected_o = std::nullopt) {
         auto timequery_conf = storage::timequery_config(
-          t, o, ss::default_priority_class(), std::nullopt);
+          model::offset(0), t, o, ss::default_priority_class(), std::nullopt);

         auto result = partition->timequery(timequery_conf).get();
src/v/cluster/partition.cc (41 additions & 37 deletions)

@@ -506,14 +506,24 @@ partition::timequery(storage::timequery_config cfg) {

     const bool may_answer_from_cloud
       = may_read_from_cloud()
-      && _cloud_storage_partition->bounds_timestamp(cfg.time);
+      && _cloud_storage_partition->bounds_timestamp(cfg.time)
+      && cfg.min_offset < kafka::offset_cast(
+        _cloud_storage_partition->next_kafka_offset());
Review comment (Contributor), on lines +510 to +511:

> I'm wondering if there's a good place to unit test this bug, given it was masked by higher-level code.

Reply (Contributor, PR author):

> I will think about it. I'm proposing to remove the masking code in the next PR.


     if (_raft->log()->start_timestamp() <= cfg.time) {
         // The query is ahead of the local data's start_timestamp: this
         // means it _might_ hit on local data: start_timestamp is not
         // precise, so once we query we might still fall back to cloud
         // storage
-        auto result = co_await local_timequery(cfg);
+        //
+        // We also need to adjust the lower bound for the local query as the
+        // min_offset corresponds to the full log (including tiered storage).
+        auto local_query_cfg = cfg;
+        local_query_cfg.min_offset = std::max(
+          log()->from_log_offset(_raft->start_offset()),
+          local_query_cfg.min_offset);
+        auto result = co_await local_timequery(
+          local_query_cfg, may_answer_from_cloud);
         if (result.has_value()) {
             co_return result;
         } else {
@@ -526,9 +536,16 @@ partition::timequery(storage::timequery_config cfg) {
             // Timestamp is before local storage but within cloud storage
             co_return co_await cloud_storage_timequery(cfg);
         } else {
-            // No cloud data: queries earlier than the start of the log
-            // will hit on the start of the log.
-            co_return co_await local_timequery(cfg);
+            // No cloud data OR not allowed to read from cloud: queries earlier
+            // than the start of the log will hit on the start of the log.
+            //
+            // Adjust the lower bound for the local query as the min_offset
+            // corresponds to the full log (including tiered storage).
+            auto local_query_cfg = cfg;
+            local_query_cfg.min_offset = std::max(
+              log()->from_log_offset(_raft->start_offset()),
+              local_query_cfg.min_offset);
+            co_return co_await local_timequery(local_query_cfg, false);
         }
     }
 }
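As a reading aid (not part of the diff), here is a condensed, self-contained sketch of the routing that `partition::timequery` implements after this change. The struct, field names, and plain `int64_t` offsets are illustrative assumptions standing in for the real model/raft types, and the stubbed query paths stand in for the real implementations:

```cpp
#include <algorithm>
#include <cstdint>
#include <optional>

// Stand-in for storage::timequery_config.
struct timequery_config {
    int64_t min_offset;
    int64_t max_offset;
    int64_t time;
};

struct partition_sketch {
    int64_t local_start_offset;    // first offset still in the raft log
    int64_t local_start_timestamp; // timestamp at that offset
    int64_t cloud_next_offset;     // one past the last offset in cloud storage
    bool has_cloud_data;

    std::optional<int64_t> timequery(timequery_config cfg) {
        // Cloud can only answer if the query's lower bound is below the end
        // of the cloud log (the new condition added in this hunk).
        const bool may_answer_from_cloud
          = has_cloud_data && cfg.min_offset < cloud_next_offset;

        if (local_start_timestamp <= cfg.time) {
            // Might hit locally. Clamp min_offset to local data: the caller's
            // min_offset refers to the full log, including tiered storage.
            auto local_cfg = cfg;
            local_cfg.min_offset
              = std::max(local_start_offset, local_cfg.min_offset);
            if (auto r = local_timequery(local_cfg, may_answer_from_cloud)) {
                return r;
            }
        }
        if (may_answer_from_cloud) {
            // Timestamp is before local storage but within cloud storage.
            return cloud_storage_timequery(cfg);
        }
        // No cloud data, or cloud reads not allowed: answer locally.
        auto local_cfg = cfg;
        local_cfg.min_offset
          = std::max(local_start_offset, local_cfg.min_offset);
        return local_timequery(local_cfg, false);
    }

    // Stubs; the real implementations live in partition.cc.
    std::optional<int64_t> local_timequery(timequery_config, bool) {
        return std::nullopt;
    }
    std::optional<int64_t> cloud_storage_timequery(timequery_config) {
        return std::nullopt;
    }
};
```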
@@ -548,58 +565,46 @@ partition::cloud_storage_timequery(storage::timequery_config cfg) {
     // raft log is ahead of the query timestamp or the topic is a read
     // replica, so proceed to query the remote partition to try and
     // find the earliest data that has timestamp >= the query time.
-    vlog(
-      clusterlog.debug,
-      "timequery (cloud) {} t={} max_offset(k)={}",
-      _raft->ntp(),
-      cfg.time,
-      cfg.max_offset);
+    vlog(clusterlog.debug, "timequery (cloud) {} cfg(k)={}", _raft->ntp(), cfg);

     // remote_partition pre-translates offsets for us, so no call into
     // the offset translator here
     auto result = co_await _cloud_storage_partition->timequery(cfg);
     if (result.has_value()) {
         vlog(
           clusterlog.debug,
-          "timequery (cloud) {} t={} max_offset(r)={} result(r)={}",
+          "timequery (cloud) {} cfg(k)={} result(k)={}",
           _raft->ntp(),
-          cfg.time,
-          cfg.max_offset,
+          cfg,
           result->offset);
     }

     co_return result;
 }

-ss::future<std::optional<storage::timequery_result>>
-partition::local_timequery(storage::timequery_config cfg) {
-    vlog(
-      clusterlog.debug,
-      "timequery (raft) {} t={} max_offset(k)={}",
-      _raft->ntp(),
-      cfg.time,
-      cfg.max_offset);
+ss::future<std::optional<storage::timequery_result>> partition::local_timequery(
+  storage::timequery_config cfg, bool allow_cloud_fallback) {
+    vlog(clusterlog.debug, "timequery (raft) {} cfg(k)={}", _raft->ntp(), cfg);

+    cfg.min_offset = _raft->log()->to_log_offset(cfg.min_offset);
     cfg.max_offset = _raft->log()->to_log_offset(cfg.max_offset);

-    auto result = co_await _raft->timequery(cfg);
+    vlog(clusterlog.debug, "timequery (raft) {} cfg(r)={}", _raft->ntp(), cfg);

-    const bool may_answer_from_cloud
-      = may_read_from_cloud()
-      && _cloud_storage_partition->bounds_timestamp(cfg.time);
+    auto result = co_await _raft->timequery(cfg);

     if (result.has_value()) {
-        if (may_answer_from_cloud) {
+        if (allow_cloud_fallback) {
             // We need to test for cases in which we will fall back to querying
             // cloud storage.
             if (_raft->log()->start_timestamp() > cfg.time) {
                 // Query raced with prefix truncation
                 vlog(
                   clusterlog.debug,
-                  "timequery (raft) {} ts={} raced with truncation "
+                  "timequery (raft) {} cfg(r)={} raced with truncation "
                   "(start_timestamp {}, result {})",
                   _raft->ntp(),
-                  cfg.time,
+                  cfg,
                   _raft->log()->start_timestamp(),
                   result->time);
                 co_return std::nullopt;
@@ -618,11 +623,11 @@ partition::local_timequery(storage::timequery_config cfg) {
                 // https://github.com/redpanda-data/redpanda/issues/9669
                 vlog(
                   clusterlog.debug,
-                  "Timequery (raft) {} ts={} miss on local log "
+                  "Timequery (raft) {} cfg(r)={} miss on local log "
                   "(start_timestamp "
                   "{}, result {})",
                   _raft->ntp(),
-                  cfg.time,
+                  cfg,
                   _raft->log()->start_timestamp(),
                   result->time);
                 co_return std::nullopt;
@@ -635,15 +640,15 @@ partition::local_timequery(storage::timequery_config cfg) {
             // have the same timestamp and are present in cloud storage.
             vlog(
               clusterlog.debug,
-              "Timequery (raft) {} ts={} hit start_offset in local log "
+              "Timequery (raft) {} cfg(r)={} hit start_offset in local log "
               "(start_offset {} start_timestamp {}, result {})",
               _raft->ntp(),
+              cfg,
               _raft->log()->offsets().start_offset,
-              cfg.time,
               _raft->log()->start_timestamp(),
               cfg.time);

-            if (may_answer_from_cloud) {
+            if (allow_cloud_fallback) {
                 // Even though we hit data with the desired timestamp, we
                 // cannot be certain that this is the _first_ batch with
                 // the desired timestamp: return null so that the caller

@@ -654,10 +659,9 @@ partition::local_timequery(storage::timequery_config cfg) {

         vlog(
           clusterlog.debug,
-          "timequery (raft) {} t={} max_offset(r)={} result(r)={}",
+          "timequery (raft) {} cfg(r)={} result(r)={}",
           _raft->ntp(),
-          cfg.time,
-          cfg.max_offset,
+          cfg,
           result->offset);
         result->offset = _raft->log()->from_log_offset(result->offset);
     }
src/v/cluster/partition.h (1 addition & 1 deletion)

@@ -345,7 +345,7 @@ class partition : public ss::enable_lw_shared_from_this<partition> {
     bool may_read_from_cloud() const;

     ss::future<std::optional<storage::timequery_result>>
-    local_timequery(storage::timequery_config);
+    local_timequery(storage::timequery_config, bool allow_cloud_fallback);

     consensus_ptr _raft;
     ss::shared_ptr<cluster::log_eviction_stm> _log_eviction_stm;
src/v/kafka/server/handlers/list_offsets.cc (2 additions & 1 deletion)

@@ -130,8 +130,9 @@ static ss::future<list_offset_partition_response> list_offsets_partition(
           kafka_partition->leader_epoch());
     }
     auto res = co_await kafka_partition->timequery(storage::timequery_config{
+      kafka_partition->start_offset(),
       timestamp,
-      offset,
+      model::prev_offset(offset),
       kafka_read_priority(),
       {model::record_batch_type::raft_data},
       octx.rctx.abort_source().local()});

Review comment (Contributor):

> Yikes, nice catch!

Review comment (Contributor):

> I'm wondering what the implications of this are. Presumably if the offset doesn't exist, the timequery will just return early, and if it does exist, it won't affect the result of a timequery. Is that right?
>
> Maybe a good idea to add any implications (or lack thereof) to the commit message.

Reply (Contributor, PR author):

> > and if it does exist, it won't affect the result of a timequery. Is that right?
>
> You mean prior to this commit? It would have returned a potentially non-committed offset even with acks=all. Normally, Redpanda doesn't expose such offsets unless write caching is in use. Fetch would have failed with out-of-range too, causing the consumer to reset.

Review comment (Member), on the commit "kafka: fix off-by-one in timequery" ("Not easy to test that this is right so not going to for now"):

> @nvartolomei please include more detail in the commit message. The commit by inspection implies an off-by-one error, but there are critically useful pieces of information a reader in the future may want: for example, how it was discovered, what the symptoms of it being wrong are, etc.
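To make the off-by-one concrete, here is a tiny self-contained sketch. It assumes, as the review discussion suggests, that `offset` here is an exclusive upper bound such as the high watermark, so the inclusive `max_offset` handed to the timequery must be its predecessor (which is presumably what `model::prev_offset` computes, modulo edge-case handling):

```cpp
#include <cassert>
#include <cstdint>

// Inclusive upper bound for a timequery, given an exclusive bound such as
// the high watermark. Illustrative stand-in for model::prev_offset.
int64_t prev_offset(int64_t o) { return o - 1; }

int main() {
    // Committed, readable offsets are [0, high_watermark); offset 5 is not
    // yet exposed to consumers with acks=all.
    int64_t high_watermark = 5;

    // Before the fix: max_offset = 5 could surface a non-committed offset,
    // which a subsequent fetch would reject as out of range.
    // After the fix: max_offset = 4, the last committed offset.
    assert(prev_offset(high_watermark) == 4);
    return 0;
}
```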
src/v/kafka/server/replicated_partition.cc (2 additions & 1 deletion)

@@ -356,7 +356,8 @@ replicated_partition::timequery(storage::timequery_config cfg) {
     if (batches.empty()) {
         co_return std::nullopt;
     }
-    co_return storage::batch_timequery(*(batches.begin()), cfg.time);
+    co_return storage::batch_timequery(
+      *(batches.begin()), cfg.min_offset, cfg.time, cfg.max_offset);
 }

 ss::future<result<model::offset>> replicated_partition::replicate(
src/v/model/offset_interval.h (new file, 80 additions)

@@ -0,0 +1,80 @@
// Copyright 2024 Redpanda Data, Inc.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.md
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0

#pragma once

#include "base/vassert.h"
#include "model/fundamental.h"

namespace model {

/// A non-empty, bounded, closed interval of offsets [min offset, max offset].
///
/// This property helps to simplify the logic and the instructions required to
/// check for overlaps, containment, etc. It is the responsibility of the caller
/// to ensure these properties hold before constructing an instance of this
/// class.
///
/// To represent a potentially empty range, wrap it in an optional.
class bounded_offset_interval {
public:
    static bounded_offset_interval
    unchecked(model::offset min, model::offset max) noexcept {
        return {min, max};
    }

    static bounded_offset_interval
    checked(model::offset min, model::offset max) {
        if (min < model::offset(0) || max < model::offset(0) || min > max) {
            throw std::invalid_argument(fmt::format(
              "Invalid arguments for constructing a non-empty bounded offset "
              "interval: min({}) <= max({})",
              min,
              max));
        }

        return {min, max};
    }
Review comment (Contributor), on lines +26 to +42:

> nit: maybe consider create() and create_checked()? Or create_unchecked() and create() if the idea is to make checked be the default and only expose unchecked for the noexcept? "checked" doesn't quite convey that this is a constructor IMO.

Reply (Contributor, PR author):

> I'll continue iterating on this if we are to adopt it in more places. I intentionally named them unchecked/checked so that the name communicates to the caller that there is a "cost" involved. ::create/constructor would make it too easy to choose the "expensive" variant.
>
> I'm still not sure whether checked/unchecked are enough as constructor variants or whether they should be two separate structs that "propagate" the checked/unchecked property as a data-structure attribute, i.e. all operations on that struct would be checked.
>
> Deferring this for now, hoping that it will become clear when additional use cases appear.


    inline bool overlaps(const bounded_offset_interval& other) const noexcept {
        return _min <= other._max && _max >= other._min;
    }

    inline bool contains(model::offset o) const noexcept {
        return _min <= o && o <= _max;
    }

Review comment (Member), on lines +44 to +48:

> inline is unnecessary here; it's implied when the definition appears inside the class like this.

Reply (Contributor, PR author):

> TIL

    friend std::ostream&
    operator<<(std::ostream& o, const bounded_offset_interval& r) {
        fmt::print(o, "{{min: {}, max: {}}}", r._min, r._max);
        return o;
    }

    inline model::offset min() const noexcept { return _min; }
    inline model::offset max() const noexcept { return _max; }

private:
    bounded_offset_interval(model::offset min, model::offset max) noexcept
      : _min(min)
      , _max(max) {
#ifndef NDEBUG
        vassert(
          min >= model::offset(0), "Offset interval min({}) must be >= 0", min);
        vassert(
          min <= max,
          "Offset interval invariant not satisfied: min({}) <= max({})",
          min,
          max);
#endif
    }

    model::offset _min;
    model::offset _max;
};

} // namespace model

Review comment (Member), on the #ifndef NDEBUG guard:

> Is it impactful enough to performance to disable the check?

Reply (nvartolomei, PR author, Jul 29, 2024):

> My intention is to use this in hot loops and I made the assumption that it will be impactful enough. E.g. I want checks like the one in
> https://github.com/redpanda-data/redpanda/pull/18112/files#diff-adc3b4388ce88f3bd5b5de62a42ebc298737e9da94fa94ea9cde18342e246435R48
> or
>
>     if (record_t >= t && query_interval.contains(record_o)) {
>
> to be compiled to nothing more than the single if statement one would write manually.
>
> The bounded_offset_interval::checked "static constructor" always checks the invariants, so the checks in the constructor are redundant in that case. They are relevant for ::unchecked construction, however, which is meant to be used in hot loops.
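For orientation, a short usage sketch of the new interval class. The call site below is hypothetical; only `bounded_offset_interval` itself comes from this PR:

```cpp
#include "model/offset_interval.h"

// Hypothetical call site illustrating the two construction paths.
void example(model::offset lo, model::offset hi) {
    // Validating construction at an API boundary: throws on bad input.
    auto query = model::bounded_offset_interval::checked(lo, hi);

    // Non-validating construction in a hot loop, where the caller has
    // already established 0 <= min <= max (debug builds still vassert).
    auto batch = model::bounded_offset_interval::unchecked(
      model::offset(10), model::offset(20));

    if (query.overlaps(batch) && query.contains(model::offset(15))) {
        // ... read records in the intersection ...
    }
}
```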
src/v/model/tests/random_batch.cc (7 additions & 2 deletions)

@@ -119,7 +119,10 @@ make_random_batch(model::offset o, int num_records, bool allow_compression) {

 model::record_batch make_random_batch(record_batch_spec spec) {
     auto ts = spec.timestamp.value_or(model::timestamp::now());
-    auto max_ts = model::timestamp(ts() + spec.count - 1);
+    auto max_ts = ts;
+    if (!spec.all_records_have_same_timestamp) {
+        max_ts = model::timestamp(ts() + spec.count - 1);
+    }
     auto header = model::record_batch_header{
       .size_bytes = 0, // computed later
       .base_offset = spec.offset,
@@ -235,7 +238,9 @@ make_random_batches(record_batch_spec spec) {
     auto num_records = spec.records ? *spec.records : get_int(2, 30);
     auto batch_spec = spec;
     batch_spec.timestamp = ts;
-    ts = model::timestamp(ts() + num_records);
+    if (!batch_spec.all_records_have_same_timestamp) {
+        ts = model::timestamp(ts() + num_records);
+    }
     batch_spec.offset = o;
     batch_spec.count = num_records;
     if (spec.enable_idempotence) {
src/v/model/tests/random_batch.h (1 addition)

@@ -34,6 +34,7 @@ struct record_batch_spec {
     bool is_transactional{false};
     std::optional<std::vector<size_t>> record_sizes;
     std::optional<model::timestamp> timestamp;
+    bool all_records_have_same_timestamp{false};
 };

model::record make_random_record(int, iobuf);
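A sketch of how a test might use the new flag to build a batch whose records all share one timestamp, which is exactly the tie-breaking scenario the timequery fix cares about. The namespace and helper signature are assumptions based on the header above:

```cpp
#include "model/tests/random_batch.h"

// Hypothetical test helper: every record in the batch gets the same
// timestamp, so a timequery cannot rely on timestamps to order them.
model::record_batch make_same_timestamp_batch() {
    model::test::record_batch_spec spec; // assumed model::test namespace
    spec.offset = model::offset(0);
    spec.count = 10;
    spec.timestamp = model::timestamp(1000);
    spec.all_records_have_same_timestamp = true;
    return model::test::make_random_batch(spec);
}
```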
src/v/storage/disk_log_impl.cc (4 additions & 2 deletions)

@@ -2335,7 +2335,8 @@ disk_log_impl::make_reader(timequery_config config) {
     vassert(!_closed, "make_reader on closed log - {}", *this);
     return _lock_mngr.range_lock(config).then(
       [this, cfg = config](std::unique_ptr<lock_manager::lease> lease) {
-          auto start_offset = _start_offset;
+          auto start_offset = cfg.min_offset;

           if (!lease->range.empty()) {
               const ss::lw_shared_ptr<segment>& segment = *lease->range.begin();
               std::optional<segment_index::entry> index_entry = std::nullopt;
@@ -2450,7 +2451,8 @@ disk_log_impl::timequery(timequery_config cfg) {
           if (
             !batches.empty()
             && batches.front().header().max_timestamp >= cfg.time) {
-              return ret_t(batch_timequery(batches.front(), cfg.time));
+              return ret_t(batch_timequery(
+                batches.front(), cfg.min_offset, cfg.time, cfg.max_offset));
           }
           return ret_t();
       });