Skip to content

Commit

Permalink
kafka: bump result of leader_epoch_last_offset to start offset
Browse files Browse the repository at this point in the history
This could previously return a value that is below the start override.
Since the semantics are to return the next-highest term if the input term
falls below the log start, this patch does just that by bumping to the
effective start offset.
  • Loading branch information
andrwng committed Jun 26, 2023
1 parent b5e94f2 commit 335a34c
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 0 deletions.
17 changes: 17 additions & 0 deletions src/v/kafka/server/replicated_partition.cc
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,23 @@ raft::replicate_stages replicated_partition::replicate(

ss::future<std::optional<model::offset>>
replicated_partition::get_leader_epoch_last_offset(
kafka::leader_epoch epoch) const {
auto offset_unbounded = co_await get_leader_epoch_last_offset_unbounded(
epoch);
if (!offset_unbounded.has_value()) {
co_return std::nullopt;
}
if (!_partition->kafka_start_offset_override().has_value()) {
co_return offset_unbounded;
}
// If the requested term falls below our earliest consumable segment as
// bounded by a start override, return the offset of the next-highest term
// (the new start offset).
co_return std::max(offset_unbounded.value(), start_offset());
}

ss::future<std::optional<model::offset>>
replicated_partition::get_leader_epoch_last_offset_unbounded(
kafka::leader_epoch epoch) const {
const model::term_id term(epoch);
const auto first_local_offset = _partition->raft_start_offset();
Expand Down
5 changes: 5 additions & 0 deletions src/v/kafka/server/replicated_partition.h
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,11 @@ class replicated_partition final : public kafka::partition_proxy::impl {
return local_kafka_start_offset;
}

// Returns the highest offset in the given term, without considering
// overrides of the starting offset.
ss::future<std::optional<model::offset>>
get_leader_epoch_last_offset_unbounded(kafka::leader_epoch) const;

ss::future<std::vector<cluster::rm_stm::tx_range>>
aborted_transactions_local(
cloud_storage::offset_range,
Expand Down

0 comments on commit 335a34c

Please sign in to comment.