Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[v24.2.x] [CORE-6807] kafka: change offset_out_of_range condition in replicated_partition::prefix_truncate() #22945

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions src/v/kafka/server/replicated_partition.cc
Original file line number Diff line number Diff line change
Expand Up @@ -486,9 +486,11 @@ replicated_partition::get_leader_epoch_last_offset_unbounded(
ss::future<error_code> replicated_partition::prefix_truncate(
model::offset kafka_truncation_offset,
ss::lowres_clock::time_point deadline) {
if (
kafka_truncation_offset <= start_offset()
|| kafka_truncation_offset > high_watermark()) {
if (kafka_truncation_offset <= start_offset()) {
// No-op, return early.
co_return kafka::error_code::none;
}
if (kafka_truncation_offset > high_watermark()) {
co_return error_code::offset_out_of_range;
}
model::offset rp_truncate_offset{};
Expand Down
67 changes: 66 additions & 1 deletion tests/rptest/clients/kafka_cli_tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from rptest.clients.types import TopicSpec
import json
from ducktape.utils.util import wait_until
from typing import Any, Optional, Sequence, cast
from typing import Any, Optional, Sequence, cast, NamedTuple
import os

from rptest.services.keycloak import OAuthConfig
Expand Down Expand Up @@ -52,6 +52,13 @@ def __str__(self):
return self.msg


class KafkaDeleteOffsetsResponse(NamedTuple):
topic: str
partition: int
new_start_offset: int
error_msg: str


class KafkaCliTools:
"""
Wrapper around the Kafka admin command line tools.
Expand Down Expand Up @@ -330,6 +337,64 @@ def produce(self,
cmd += ["--producer.config", self._command_config.name]
return self._execute(cmd, "produce")

def delete_records(self, offsets: dict[str, Any]):
"""
`offsets` is of expected form e.g.:
{"partitions":
[{"topic": "topic_name",
"partition": 0,
"offset": 10},
{"topic": "topic_name",
"partition": 1,
"offset": 15}
],
"version": 1
}
"""

self._redpanda.logger.debug(f"Delete records with json {offsets}")
assert "version" in offsets
assert offsets["version"] == 1
assert "partitions" in offsets

json_file = tempfile.NamedTemporaryFile(mode="w", delete=False)
output = None

try:
json.dump(offsets, json_file)
json_file.close()
args: list[str] = ["--offset-json-file", json_file.name]
output = self._run("kafka-delete-records.sh",
args,
desc="delete_records")
finally:
os.remove(json_file.name)

result_list: list[KafkaDeleteOffsetsResponse] = []

split_str = output.split('\n')
assert split_str[-1] == ""
partition_watermark_lines = split_str[2:-1]
for partition_watermark_line in partition_watermark_lines:
topic_partition_str, result_str = partition_watermark_line.strip(
).split('\t')
topic_partition_str_split = topic_partition_str.split(
"partition: ")[1]
topic_partition_split_index = topic_partition_str_split.rfind('-')
topic_str = topic_partition_str_split[:topic_partition_split_index]
partition_str = topic_partition_str_split[
topic_partition_split_index:]
new_start_offset = -1
error_msg = ""
if "error" in result_str:
error_msg = result_str.split("error: ")[1]
elif "low_watermark" in result_str:
new_start_offset = int(result_str.split("low_watermark: ")[1])
result_list.append(
KafkaDeleteOffsetsResponse(topic_str, int(partition_str),
new_start_offset, error_msg))
return result_list

def list_acls(self):
args = ["--list"]
return self._run("kafka-acls.sh", args, desc="list_acls")
Expand Down
Loading
Loading