
kafka: fix cloud storage support for delete-records #11579

Merged Jun 30, 2023 (22 commits)

Conversation


@andrwng andrwng commented Jun 21, 2023

The previous implementation of _start_kafka_offset handling relied on the fact that delete-records requests would fail when called above the max collectible offset. This made it impossible for an offset outside the cloud range to be persisted in the manifest. While somewhat easier to reason about, it meant that cloud storage could "forget" about a delete-records request, e.g. when receiving a request on data that has not yet been uploaded. With this PR, such a request will now land in the archival metadata STM, and the cloud storage layer is updated to handle such overrides.

Rather than trying to account for the _start_kafka_offset override throughout cloud storage, this PR updates the various start-offset-type methods to ignore the override entirely, instead relying on callers (e.g. the Kafka layer) to consider overrides. A side effect is that some cloud storage APIs are updated to take a Kafka offset lower bound into account (e.g. timequery).

This PR includes the following changes:

  • In cloud storage, _start_kafka_offset is now largely ignored: retention is still applied based on it, but it is otherwise omitted from start offset calculations.
  • Instead, the start offset is computed in the Kafka layer by bumping it up to _start_kafka_offset if the would-be start offset is too low.
  • The archival_metadata_stm now listens for prefix_truncate batches, so delete-records can be done in parallel between log_eviction_stm and archival_metadata_stm.
  • Fixes for handling of the start offset override on read-replicas.
  • Bumping of timequeries up to the start override if a result is too low.
  • Bumping of offset_for_leader_epoch requests up to the start override if a result is too low.
  • Some test utilities for getting offsets via the Kafka API.
  • Some miscellaneous smaller fixes.
  • A couple of short ducktape tests for recovery and read replicas (by no means exhaustive, more will follow).
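The clamp in the second bullet can be sketched as follows. This is a toy, assumption-laden sketch (stand-in types and names, not the real replicated_partition code): cloud storage reports its start offset without considering the override, and the Kafka layer bumps it up last.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>

// Hypothetical stand-ins for kafka::offset and a "no override set"
// sentinel; the real types live in the Redpanda tree.
using kafka_offset = int64_t;
constexpr kafka_offset no_override = -1;

// Cloud storage now computes its start offset ignoring
// _start_kafka_offset; the Kafka layer bumps the would-be start up to
// the override if one is set.
kafka_offset effective_start_offset(
  kafka_offset would_be_start, kafka_offset start_kafka_offset_override) {
    if (start_kafka_offset_override == no_override) {
        return would_be_start;
    }
    return std::max(would_be_start, start_kafka_offset_override);
}
```

For example, with a cloud start offset of 10 and a delete-records override at 25, consumers see 25; if retention later moves the cloud start past the override, the override becomes a no-op.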

A previous iteration of this PR had more tests for offset-based retention, but it was discovered that retention doesn't quite work, so I backed them out until fixed so this can be merged sooner.

Backports Required

  • none - not a bug fix
  • none - this is a backport
  • none - issue does not exist in previous branches
  • none - papercut/not impactful enough to backport
  • v23.1.x
  • v22.3.x
  • v22.2.x

Release Notes

  • none

Resolved review threads: src/v/cluster/partition.h (×2), src/v/cluster/partition.cc
@@ -174,7 +175,7 @@ log_eviction_stm::sync_effective_start(model::timeout_clock::duration timeout) {
co_return errc::timeout;
}
}
co_return effective_start_offset();
co_return model::next_offset(_delete_records_eviction_offset);
Contributor

It's not obvious why this line changes: the difference is that the call to effective_start_offset() takes account of the last snapshot offset, whereas this doesn't, right?

Maybe the last snapshot offset is applied elsewhere after these changes?

Contributor Author

Yeah, this is now going to be clamped in replicated_partition to account for the partition's full log.

Resolved review threads: src/v/cluster/partition.cc (×2)
model::timestamp ts{};

// Optional inclusive lower bound on the Kafka offset.
kafka::offset start_offset{};
Contributor

I'm wondering if this part is needed: why clamp this inside the cloud storage layer? If the cloud storage timequery returns an offset behind the partition's start kafka offset, I would have thought we could have the Kafka layer advance to the start offset.

Contributor Author

Maybe, I guess it depends on how we want to handle out-of-order timestamps. I suppose in the normal case, the segments will likely be monotonically increasing in time ranges. I took the "assume nothing" approach and forced the search to begin at the override offset.

Contributor

🤔

That's true, although we promise the user very little when it comes to out of order timestamps, so I'd recommend going with whatever makes implementation cleanest/simplest.

Resolved review threads: src/v/cloud_storage/partition_manifest.cc, src/v/cluster/log_eviction_stm.h
sync_effective_start(model::timeout_clock::duration timeout);
sync_kafka_start_offset_override(model::timeout_clock::duration timeout);

model::offset kafka_start_offset_override() const {
Contributor

you could name this get_delete_records_eviction_offset()

Contributor Author

I think this is a clearer name, honestly. One of the surprising things I found is that delete_records_eviction_offset is not the new start offset, but actually one before the new start, hence the next_offset() calls.
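The off-by-one described above can be illustrated with a toy sketch (stand-in types, not the real model::next_offset implementation): the stored eviction offset is the last deleted offset, so the visible start is one past it.

```cpp
#include <cassert>
#include <cstdint>

using model_offset = int64_t; // stand-in for model::offset

// A next_offset-style helper: the offset immediately after `o`.
model_offset next_offset(model_offset o) { return o + 1; }

// The eviction offset is the last offset removed by DeleteRecords,
// i.e. one *before* the new start, hence the bump.
model_offset start_after_delete_records(model_offset delete_records_eviction_offset) {
    return next_offset(delete_records_eviction_offset);
}
```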

Review threads: src/v/cluster/partition.h, src/v/kafka/server/replicated_partition.cc (×2), tests/rptest/tests/read_replica_e2e_test.py, src/v/cluster/archival_metadata_stm.cc
@andrwng andrwng force-pushed the delete-records-cs branch 4 times, most recently from b7c7e07 to 41bdcf6 Compare June 22, 2023 19:01

andrwng commented Jun 22, 2023

The last two force pushes were just rebases.

I need to adjust the behavior of truncation of the STM manifest.


andrwng commented Jun 23, 2023

Changes still on-going, just rebasing so others can begin testing on this branch.

@andrwng andrwng force-pushed the delete-records-cs branch from 93491d5 to 335a34c Compare June 26, 2023 15:07

andrwng commented Jun 26, 2023

Latest push has functional fixes in my branch. Still working on tests for them.

@andrwng andrwng force-pushed the delete-records-cs branch from 335a34c to a00017f Compare June 29, 2023 15:26

andrwng commented Jun 29, 2023

This is still missing tests for timequery and get_leader_epoch_last_offset, but pushing to get it reviewed. More tests can be added later.


andrwng commented Jun 29, 2023

This also includes #11772 (IIUC Vlad's taking it up). May have to wait on that to settle and merge before merging this.

@andrwng andrwng force-pushed the delete-records-cs branch 3 times, most recently from 7c30ac2 to a3ce118 Compare June 30, 2023 09:14

andrwng commented Jun 30, 2023

Latest couple of pushes have been to address C++ test failures.

andrwng added 13 commits June 30, 2023 08:08
Read replicas don't have the proper state to translate.
Adds a test for syncing read replicas. This happens by virtue of us
swapping out the manifest wholesale on read replicas.
Similar to other test utilities, this hides most of the kafka protocol
from test authors who want to use the list_offsets API.
The start offset could return a higher value than the HWM on read
replicas, since their HWM is computed with remote segments, but a start
override may persist to the manifest before uploading the corresponding
segments.

This patch clamps this at the Kafka layer.
Read replicas should only support immutable operations.
We previously accounted for the start override by incorporating it into
the start offset at the time of applying the update_start_kafka_offset
batch. Now that this has changed, we need to apply retention on the STM
region of the manifest.

This commit does so by introducing a new retention strategy to the
retention calculator class.
This adds a test for consuming from cloud storage after delete-records.
This is helpful for testing timequeries.
The cluster::partition abstraction is mostly unaware of Kafka start
overrides when it comes to fetching data. As such, a timequery may
return a result that falls below the start override.

To avoid returning an unqueryable offset, this patch updates the
behavior to check the timequery result against the override, and
performs a read at the override offset if the timequery result was too
low.
The following race is possible:
1. Transactional data is produced and aborted.
2. The fetch path reads data batches corresponding to the aborted data.
3. All data in cloud is truncated and GCed.
4. The fetch path looks for aborted transactions corresponding to read
   data, but finds none.
5. The fetch path returns to the client with aborted transactional data,
   but an empty list of aborted transactions.
6. Applications see data from aborted transactions as if it were committed.

This patch mitigates this by checking for changes in the start offset
before returning with transactional data. If the returned data includes
any data below the start offset as of step 5, it instead returns
offset_out_of_range.
This could previously return a value that is below the start override.
Since the semantics are to return the next-highest term if the input term
falls below the log start, this patch does just that by bumping to the
effective start offset.
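The bump in the last commit message matches the one-line clamp reviewed below (co_return std::max(offset_unbounded.value(), start_offset())). A hedged sketch with stand-in types:

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>

using kafka_offset = int64_t; // stand-in for kafka::offset

// If the unbounded last offset for the requested leader epoch falls
// below the earliest consumable offset (as bounded by a start
// override), return the effective start offset instead -- per the
// stated semantics, that is the offset of the next-highest term.
kafka_offset leader_epoch_last_offset(
  kafka_offset offset_unbounded, kafka_offset effective_start) {
    return std::max(offset_unbounded, effective_start);
}
```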

andrwng commented Jun 30, 2023

Latest force push was a rebase to account for the new read replica changes, plus a fix for flakiness in the new recovery test.

Adds a couple basic tests for delete-records requests:
- that they persist after topic recovery
- that they are honored on read replicas
@andrwng andrwng force-pushed the delete-records-cs branch from 8d47f1f to f1368e5 Compare June 30, 2023 15:23

andrwng commented Jun 30, 2023

Latest force push: lint

Contributor

@graphcareful graphcareful left a comment


Super good job, well thought out, just left some questions

_raft->ntp(),
truncation_offset,
kafka_truncation_offset);
if (truncation_offset != model::offset{}) {
Contributor

What if truncation_offset == model::offset{}? Shouldn't we just return OOR? FWIW I see this is later removed.

Contributor Author

It's {} if we couldn't translate it, because the kafka offset fell below the local log.

co_return err;
}
}
// TODO(awong): have archival listen on log_eviction_stm.
Contributor

I see this behavior modified in subsequent commits, maybe you can omit it altogether or mention in the commit message how this is temporary

Contributor Author

Yeah, I didn't scrub the history here too hard, given it progressed somewhat incrementally.

@@ -103,8 +103,9 @@ class partition {
// The eviction STM only keeps track of DeleteRecords truncations
// as Raft offsets. Translate if possible.
auto offset_translator_state = get_offset_translator_state();
if (offset_res.value() != model::offset{} &&
_raft->start_offset() < offset_res.value()) {
if (
Contributor

nit: Formatting made diff

@@ -275,34 +281,58 @@ ss::future<std::error_code> log_eviction_stm::replicate_command(
}

ss::future<> log_eviction_stm::apply(model::record_batch batch) {
if (batch.header().type != model::record_batch_type::prefix_truncate) {
Contributor

re-add likely()

Contributor Author

Will address in a follow-up.

// other STMs can honor it (e.g. archival).
vlog(
_logger.info,
"Replicated prefix_truncate batch for {} with no redpanda offset. "
Contributor

nit: I'd mention "no local redpanda offset"

Contributor Author

Will address in a follow-up.

kafka_truncation_offset, deadline, _as);
// The archival metadata stm also listens for prefix_truncate batches.
auto applied = co_await _archival_meta_stm->wait_no_throw(
_raft->committed_offset(), deadline, _as);
Contributor

Why wait on raft committed? I would assume log_eviction_stm->truncate to return the offset that replicate returned, and that can be used as the wait offset, is it because log_eviction_stm->truncate also performs the wait?

Contributor Author

Yeah good point. This was mostly because I didn't feel like doing the plumbing, but looking at it, it shouldn't be too bad.

May do this in a follow-up.

@@ -52,6 +52,9 @@ validate_at_topic_level(request_context& ctx, const delete_records_topic& t) {
return ctx.authorized(security::acl_operation::remove, t.name);
};
const auto is_deletable = [](const cluster::topic_configuration& cfg) {
if (cfg.is_read_replica()) {
Contributor

Nice, I didn't think of this +1

// If the requested term falls below our earliest consumable segment as
// bounded by a start override, return the offset of the next-highest term
// (the new start offset).
co_return std::max(offset_unbounded.value(), start_offset());

@piyushredpanda piyushredpanda merged commit ca7b5ef into redpanda-data:dev Jun 30, 2023
nvartolomei added a commit to nvartolomei/redpanda that referenced this pull request Apr 30, 2024
The Kafka ListOffsets request allows querying offsets by timestamp. The
result of such a request is the first Kafka offset in the log that has a
timestamp equal to or greater than the requested timestamp, or -1 if no
such record can be found.

The implementation we have today assumes that the start of the physical
log matches the start of the log as seen by external users/Kafka
clients.

However, this is not always true. In particular, when [trim-prefix][1]
(prefix truncation) is used. There are two sources of problems:

  1) trim-prefix is applied synchronously at the cluster layer, where it
     changes the visibility of the log from the client's point of view,
     but asynchronously to the consensus log/physical log/
     disk_log_impl class and cloud storage.

  2) trim-prefix is executed with an offset of a record that is in the
     middle of a batch.

As a result, in these scenarios, if a client sends a Kafka Fetch
request with the received offset, it is answered with an
OffsetOutOfRange error.

This commit changes how such queries are implemented at the lower levels
of the system by carrying the information about the visible start and
end of the log together with the timestamp. Then, at the lower levels,
we use these to limit our search to that range only.

Although this commit does not change the implementation of the tiered
storage timequery, it does fix the trim-prefix problem there too in the
general case because of the check-and-bump added in
redpanda-data#11579.

Tiered Storage timequeries have some additional problems which I plan to
address in redpanda-data#18097.

[1]: https://docs.redpanda.com/current/reference/rpk/rpk-topic/rpk-topic-trim-prefix/
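The bounded search described in the commit message can be sketched with toy types (the real query runs over segment indexes and batch headers, not a flat vector; names here are hypothetical):

```cpp
#include <cassert>
#include <cstdint>
#include <optional>
#include <vector>

struct record {
    int64_t offset;
    int64_t timestamp;
};

// First offset within [visible_start, visible_end] whose timestamp is
// >= ts; std::nullopt corresponds to ListOffsets' "-1, not found".
std::optional<int64_t> timequery(
  const std::vector<record>& log,
  int64_t ts,
  int64_t visible_start,
  int64_t visible_end) {
    for (const auto& r : log) {
        // Skip records hidden by trim-prefix or past the visible end.
        if (r.offset < visible_start || r.offset > visible_end) {
            continue;
        }
        if (r.timestamp >= ts) {
            return r.offset;
        }
    }
    return std::nullopt;
}
```

For example, with records {0: ts 100, 1: ts 200, 2: ts 300} and a trim-prefix at offset 1, a query for ts 100 yields offset 1 rather than the no-longer-visible offset 0, so a subsequent Fetch at the returned offset cannot hit OffsetOutOfRange.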
nvartolomei added four more commits to nvartolomei/redpanda referencing this pull request on Apr 30, 2024, each with the same commit message as above.
vbotbuildovich pushed a commit to vbotbuildovich/redpanda that referenced this pull request May 7, 2024, with the same commit message as above.

(cherry picked from commit 76a1ea2)
nvartolomei added a commit that referenced this pull request May 21, 2024, with the same commit message as above.

(cherry picked from commit 76a1ea2)