Fix timequery returning wrong offset after trim-prefix which could lead to stuck consumers #18112
Conversation
/dt
Great find!
Probably worth updating the PR title to reflect that this is a bug fix
Also should this be backported to 23.2 as well? IIRC DeleteRecords support landed then
Let's see what CI says.
ducktape was retried in https://buildkite.com/redpanda/redpanda/builds/48489#018f2fe3-c4cc-4176-acb7-f136b4e36f1f
The last 2 force-pushes fix some typos in the text.
Encapsulates common operations on offset intervals. For now, although it is named bounded, the maximum offset can still be set to `model::offset::max()`. I will likely change this in the future, as it requires updating quite a few call sites, most likely only tests.

This little data structure tries to be very lightweight and impose minimal overhead on basic interval operations like intersection or inclusion. It is also quite hard to use incorrectly, thanks to the checked construction variant and debug assertions.

Later, we might want to refactor things like log_reader to use this instead of the separate min and max offsets they use today. Once that is done, the checked variant needs to be called only once, at the kafka layer; for everyone else it becomes a ~0 cost abstraction.
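A minimal usage sketch of how I read this, based on the names that appear in the diff snippets below (`checked`, `unchecked`, `contains`, `overlaps`, and the `model` namespace implied by the header path); the exact API in the PR may differ:

```cpp
#include "model/offset_interval.h" // header path as it appears in this PR

void example() {
    // Validate once, e.g. at the kafka layer, via the checked variant...
    auto visible = model::bounded_offset_interval::checked(
      model::offset(0), model::offset(100));

    // ...then rely on cheap operations in hot paths.
    if (visible.contains(model::offset(42))) {
        // offset is within the client-visible log
    }

    // The unchecked variant skips validation for already-trusted inputs.
    auto segment = model::bounded_offset_interval::unchecked(
      model::offset(90), model::offset(150));
    if (visible.overlaps(segment)) {
        // the segment may contain visible records
    }
}
```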
The Kafka ListOffsets request allows querying offsets by timestamp. The result of such a request is the first kafka offset in the log with a timestamp equal to or greater than the requested one, or -1 if no such record can be found.

The implementation we have today assumes that the start of the physical log matches the start of the log as seen by external users/kafka clients. However, this is not always true, in particular when [trim-prefix][1] (prefix truncation) is used. There are 2 sources of problems:

1. trim-prefix is applied synchronously at the cluster layer, where it changes the visibility of the log from the client's point of view, but it is applied asynchronously to the consensus log/physical log/`disk_log_impl` class and to cloud storage.
2. trim-prefix is executed with an offset of a record that is in the middle of a batch.

As a result, in these scenarios, if a client sends a kafka Fetch request with the received offset, it is answered with an OffsetOutOfRange error.

This commit changes how such queries are implemented at the lower levels of the system, by carrying the information about the visible start and end of the log together with the timestamp. Then, at the lower levels, we use these to limit our search to only that range.

Although this commit does not change the implementation of the tiered storage timequery, it does fix the trim-prefix problem there too in the general case, because of the check and "bump" added in redpanda-data#11579. Tiered Storage timequeries have some additional problems which I plan to address in redpanda-data#18097.

[1]: https://docs.redpanda.com/current/reference/rpk/rpk-topic/rpk-topic-trim-prefix/
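A rough, self-contained sketch of the idea described above (all types and names here are made up for illustration; the real code threads the bounds through `storage::timequery_config`):

```cpp
#include <cstdint>
#include <optional>
#include <vector>

// Hypothetical simplifications, not types from the PR.
struct record {
    int64_t offset;
    int64_t timestamp;
};

struct query {
    int64_t min_offset; // first client-visible offset
    int64_t timestamp;  // requested timestamp
    int64_t max_offset; // last client-visible offset
};

// Return the first visible offset whose timestamp is >= the requested
// one, searching only within [min_offset, max_offset]. Assumes the log
// is ordered by offset with non-decreasing timestamps.
std::optional<int64_t>
timequery(const std::vector<record>& log, const query& q) {
    for (const auto& r : log) {
        // Skip records outside the visible interval so that a
        // trim-prefix landing in the middle of a batch cannot leak
        // offsets the client is not allowed to fetch.
        if (r.offset < q.min_offset || r.offset > q.max_offset) {
            continue;
        }
        if (r.timestamp >= q.timestamp) {
            return r.offset;
        }
    }
    return std::nullopt; // maps to -1 in the ListOffsets response
}
```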
Previous code contained a bug which is masked by the retry logic in replicated partition:

```cpp
const auto kafka_start_override = _partition->kafka_start_offset_override();
if (
  !kafka_start_override.has_value()
  || kafka_start_override.value() <= res.value().offset) {
    // The start override doesn't affect the result of the timequery.
    co_return res;
}
vlog(
  klog.debug,
  "{} timequery result {} clamped by start override, fetching result at "
  "start {}",
  ntp(),
  res->offset,
  kafka_start_override.value());
...
```
Not easy to test that this is right, so not going to for now.
Some unit testing would be nice for some of the lower level functions, but otherwise this pretty much LGTM
```cpp
static bounded_offset_interval
unchecked(model::offset min, model::offset max) noexcept {
    return {min, max};
}

static bounded_offset_interval
checked(model::offset min, model::offset max) {
    if (min < model::offset(0) || max < model::offset(0) || min > max) {
        throw std::invalid_argument(fmt::format(
          "Invalid arguments for constructing a non-empty bounded offset "
          "interval: min({}) <= max({})",
          min,
          max));
    }

    return {min, max};
}
```
nit: maybe consider `create()` and `create_checked()`? Or `create_unchecked()` and `create()` if the idea is to make checked be the default and only expose unchecked for the noexcept? "checked" doesn't quite convey that this is a constructor IMO
I'll continue iterating on this if we are to adopt it in more places. I intentionally named them unchecked/checked so that the name communicates to the caller that there is a "cost" involved. A plain `create()`/constructor would make it too easy to choose the "expensive" variant.

I'm still not sure whether checked/unchecked are enough as constructor variants, or whether they should be 2 separate structs that "propagate" the checked/unchecked property as a data structure "attribute", i.e. so that all operations on that struct are checked.

Deferring this for now, hoping that it will become clear when additional use cases appear.
```cpp
&& cfg.min_offset < kafka::offset_cast(
     _cloud_storage_partition->next_kafka_offset());
```
I'm wondering if there's a good place to unit test this bug, given it was masked by higher level code
I will think about it. I'm proposing to remove the masking code in the next PR.
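A rough shape such a unit test could take, reduced to the guard condition itself (entirely hypothetical; this is not a test from the PR):

```cpp
#include <cassert>
#include <cstdint>

// The guard from the snippet above, reduced to its essence: cloud
// storage should only be consulted when the queried range starts
// before the next offset to be uploaded.
bool should_query_remote(int64_t cfg_min_offset, int64_t next_kafka_offset) {
    return cfg_min_offset < next_kafka_offset;
}

int main() {
    // Range begins exactly at next_kafka_offset: nothing in the cloud
    // can satisfy the query, so the remote path must be skipped.
    assert(!should_query_remote(100, 100));
    // Range begins before it: remote data may hold the answer.
    assert(should_query_remote(99, 100));
}
```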
```python
# @parametrize(cloud_storage=True, spillover=False)
# @parametrize(cloud_storage=True, spillover=True)
```
nit: remove?
I will re-enable this in the next PR #18097
```diff
@@ -132,7 +132,7 @@ static ss::future<list_offset_partition_response> list_offsets_partition(
   auto res = co_await kafka_partition->timequery(storage::timequery_config{
     kafka_partition->start_offset(),
     timestamp,
-    offset,
+    model::prev_offset(offset),
```
Yikes, nice catch!
I'm wondering what the implications of this are. Presumably if the offset doesn't exist, the timequery will just return early, and if it does exist, it won't affect the result of a timequery. Is that right?

Maybe a good idea to add any implications (or lack thereof) to the commit message
> and if it does exist, it won't affect the result of a timequery. Is that right?

You mean prior to this commit? It would have returned a potentially non-committed offset, even with acks=all. Normally, redpanda doesn't expose such offsets unless write caching is in use. Fetch would have failed with out-of-range too, causing the consumer to reset.
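For context, my own illustration of the off-by-one with plain integers (not code from the PR): the `offset` passed in is an exclusive upper bound such as the high watermark, so the last queryable offset is one before it:

```cpp
#include <cassert>
#include <cstdint>

// Stand-in for model::prev_offset.
int64_t prev_offset(int64_t o) { return o - 1; }

int main() {
    // A high watermark of 10 means committed, fetchable offsets are 0..9.
    int64_t high_watermark = 10;

    // Before the fix: 10 was used as the inclusive upper bound, so a
    // timequery could return offset 10, which no Fetch can ever serve.
    // After the fix: the last queryable offset is 9.
    assert(prev_offset(high_watermark) == 9);
}
```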
Merging this to unblock #18097. Will address comments as follow-ups.
/backport v24.1.x

/backport v23.3.x
Failed to create a backport PR to v23.3.x branch. I tried:
```cpp
 * //-------+-------------+------------+-------------+-------//
 * \\...9   |   10...14   |   15..15   |  16.....22  |  23...\\
 * //-------+-------------+------------+-------------+-------//
```
this is awesome
```cpp
inline bool overlaps(const bounded_offset_interval& other) const noexcept {
    return _min <= other._max && _max >= other._min;
}

inline bool contains(model::offset o) const noexcept {
```
`inline` is unnecessary here; it's implied when the definition appears inside the class like this
TIL
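For reference, a tiny example of the point above: member functions defined inside the class body are implicitly inline, so the keyword adds nothing there.

```cpp
struct example {
    bool f() const { return true; }        // implicitly inline
    inline bool g() const { return true; } // 'inline' is redundant here
};
```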
```cpp
bounded_offset_interval(model::offset min, model::offset max) noexcept
  : _min(min)
  , _max(max) {
#ifndef NDEBUG
```
is the check impactful enough on performance to justify disabling it?
My intention is to use this in hot loops, and I made the assumption that it will be impactful enough.

E.g. I want checks like the one in redpanda/src/v/storage/log_reader.cc (line 558 in 76a1ea2):

```cpp
if (record_t >= t && query_interval.contains(record_o)) {
```

to be compiled to nothing more than the single if statement that one would write manually.
The `bounded_offset_interval::checked` "static constructor" always checks the invariants. The ones in the constructor are redundant in this case. They are relevant for the `::unchecked` construction, however, which is meant to be used in hot loops:
redpanda/src/v/model/offset_interval.h, lines 32 to 42 in f13bfa6:

```cpp
checked(model::offset min, model::offset max) {
    if (min < model::offset(0) || max < model::offset(0) || min > max) {
        throw std::invalid_argument(fmt::format(
          "Invalid arguments for constructing a non-empty bounded offset "
          "interval: min({}) <= max({})",
          min,
          max));
    }

    return {min, max};
}
```
```diff
@@ -132,7 +132,7 @@ static ss::future<list_offset_partition_response> list_offsets_partition(
   auto res = co_await kafka_partition->timequery(storage::timequery_config{
     kafka_partition->start_offset(),
     timestamp,
-    offset,
+    model::prev_offset(offset),
```
> kafka: fix off-by-one in timequery
>
> Not easy to test that this is right, so not going to for now.
@nvartolomei please include more detail in the commit message. The commit by inspection implies an off-by-one error, but there are critically useful pieces of information a future reader may want: for example, how it was discovered, what the symptoms of it being wrong are, etc...
The fix works only if compression is not in use. We need follow-up work which would decompress the batches to find the exact offset to return, or (!) we could prevent trim-prefix at an offset inside a batch in that case.
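My reading of the compression caveat, as a hypothetical sketch (types and names are made up): without decompressing a compressed batch, only its header is visible, so a trim-prefix offset that falls inside the batch cannot be resolved to an exact record.

```cpp
#include <cstdint>

// Hypothetical illustration, not types from the codebase.
struct batch_header {
    int64_t base_offset;  // offset of the first record in the batch
    int32_t record_count; // number of records in the batch
    bool compressed;      // payload must be decompressed to see records
};

int64_t last_offset(const batch_header& h) {
    return h.base_offset + h.record_count - 1;
}

int main() {
    batch_header b{10, 11, true}; // batch covers offsets [10, 20]
    // A trim-prefix at offset 15 lands inside this batch. The header
    // alone only brackets the answer; finding the first visible
    // record's timestamp requires decompressing the payload.
    (void)last_offset(b);
}
```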
Backports Required
Release Notes
Bug Fixes