diff --git a/src/v/kafka/server/replicated_partition.cc b/src/v/kafka/server/replicated_partition.cc index 0840eeb6cf6b..721bb34fcc2d 100644 --- a/src/v/kafka/server/replicated_partition.cc +++ b/src/v/kafka/server/replicated_partition.cc @@ -486,9 +486,11 @@ replicated_partition::get_leader_epoch_last_offset_unbounded( ss::future replicated_partition::prefix_truncate( model::offset kafka_truncation_offset, ss::lowres_clock::time_point deadline) { - if ( - kafka_truncation_offset <= start_offset() - || kafka_truncation_offset > high_watermark()) { + if (kafka_truncation_offset <= start_offset()) { + // No-op, return early. + co_return kafka::error_code::none; + } + if (kafka_truncation_offset > high_watermark()) { co_return error_code::offset_out_of_range; } model::offset rp_truncate_offset{}; diff --git a/tests/rptest/clients/kafka_cli_tools.py b/tests/rptest/clients/kafka_cli_tools.py index fc30db04706b..8ec3a6254719 100644 --- a/tests/rptest/clients/kafka_cli_tools.py +++ b/tests/rptest/clients/kafka_cli_tools.py @@ -13,7 +13,7 @@ from rptest.clients.types import TopicSpec import json from ducktape.utils.util import wait_until -from typing import Any, Optional, Sequence, cast +from typing import Any, Optional, Sequence, cast, NamedTuple import os from rptest.services.keycloak import OAuthConfig @@ -52,6 +52,13 @@ def __str__(self): return self.msg +class KafkaDeleteOffsetsResponse(NamedTuple): + topic: str + partition: int + new_start_offset: int + error_msg: str + + class KafkaCliTools: """ Wrapper around the Kafka admin command line tools. @@ -330,6 +337,64 @@ def produce(self, cmd += ["--producer.config", self._command_config.name] return self._execute(cmd, "produce") + def delete_records(self, offsets: dict[str, Any]): + """ + `offsets` is of expected form e.g.: + {"partitions": + [{"topic": "topic_name", + "partition": 0, + "offset": 10}, + {"topic": "topic_name", + "partition": 1, + "offset": 15} + ], + "version": 1 + } + """ + + self._redpanda.logger.debug(f"Delete records with json {offsets}") + assert "version" in offsets + assert offsets["version"] == 1 + assert "partitions" in offsets + + json_file = tempfile.NamedTemporaryFile(mode="w", delete=False) + output = None + + try: + json.dump(offsets, json_file) + json_file.close() + args: list[str] = ["--offset-json-file", json_file.name] + output = self._run("kafka-delete-records.sh", + args, + desc="delete_records") + finally: + os.remove(json_file.name) + + result_list: list[KafkaDeleteOffsetsResponse] = [] + + split_str = output.split('\n') + assert split_str[-1] == "" + partition_watermark_lines = split_str[2:-1] + for partition_watermark_line in partition_watermark_lines: + topic_partition_str, result_str = partition_watermark_line.strip( + ).split('\t') + topic_partition_str_split = topic_partition_str.split( + "partition: ")[1] + topic_partition_split_index = topic_partition_str_split.rfind('-') + topic_str = topic_partition_str_split[:topic_partition_split_index] + partition_str = topic_partition_str_split[ + topic_partition_split_index:] + new_start_offset = -1 + error_msg = "" + if "error" in result_str: + error_msg = result_str.split("error: ")[1] + elif "low_watermark" in result_str: + new_start_offset = int(result_str.split("low_watermark: ")[1]) + result_list.append( + KafkaDeleteOffsetsResponse(topic_str, int(partition_str), + new_start_offset, error_msg)) + return result_list + def list_acls(self): args = ["--list"] return self._run("kafka-acls.sh", args, desc="list_acls") diff --git a/tests/rptest/tests/delete_records_test.py b/tests/rptest/tests/delete_records_test.py index 3da8ea6e8b52..fa0c44350b0f 100644 --- a/tests/rptest/tests/delete_records_test.py +++ b/tests/rptest/tests/delete_records_test.py @@ -14,20 +14,29 @@ import threading import confluent_kafka as ck import ducktape +from typing import Callable, NamedTuple from ducktape.mark import parametrize, matrix from rptest.services.admin import Admin from rptest.tests.partition_movement import PartitionMovementMixin +from rptest.clients.default import DefaultClient from rptest.services.cluster import cluster from rptest.services.kgo_verifier_services import KgoVerifierConsumerGroupConsumer, KgoVerifierProducer from rptest.tests.redpanda_test import RedpandaTest -from rptest.clients.kafka_cli_tools import KafkaCliTools +from rptest.services.kafka import KafkaServiceAdapter +from rptest.clients.kafka_cat import KafkaCat +from kafkatest.services.kafka import KafkaService +from kafkatest.services.zookeeper import ZookeeperService +from kafkatest.version import V_3_0_0 +from rptest.clients.kafka_cli_tools import KafkaCliTools, KafkaDeleteOffsetsResponse from rptest.clients.rpk import RpkTool, RpkException from ducktape.utils.util import wait_until from rptest.clients.types import TopicSpec from rptest.util import produce_until_segments, wait_until_result, expect_exception from rptest.services.redpanda import SISettings from rptest.utils.si_utils import BucketView, NTP +from ducktape.mark.resource import cluster as ducktape_cluster +from ducktape.tests.test import Test TEST_TOPIC_NAME = "test-topic-1" TEST_COMPACTED_TOPIC_NAME = "test-topic-2-compact" @@ -298,7 +307,7 @@ def test_delete_records_topic_start_delta(self, cloud_storage_enabled): @matrix(cloud_storage_enabled=[True, False], truncate_point=[ "at_segment_boundary", "random_offset", - "one_below_high_watermark", "at_high_watermark" + "one_below_high_watermark", "at_high_watermark", "start_offset" ]) def test_delete_records_segment_deletion(self, cloud_storage_enabled: bool, truncate_point: str): @@ -345,6 +354,8 @@ def obtain_test_parameters(local_snapshot): truncate_offset = None high_watermark = int( self.get_topic_info(TEST_TOPIC_NAME).high_watermark) + start_offset = int( + self.get_topic_info(TEST_TOPIC_NAME).start_offset) if truncate_point == "one_below_high_watermark": truncate_offset = high_watermark - 1 # Leave 1 record elif truncate_point == "at_high_watermark": @@ -354,6 +365,8 @@ def obtain_test_parameters(local_snapshot): elif truncate_point == "at_segment_boundary": random_segment = random.randint(1, len(local_snapshot) - 1) truncate_offset = segment_boundaries[random_segment] + elif truncate_point == "start_offset": + truncate_offset = start_offset else: assert False, "unknown test parameter encountered" @@ -484,10 +497,6 @@ def bad_truncation(truncate_offset): truncate_offset) assert low_watermark == truncate_offset - # Try to truncate before and at the low_watermark - bad_truncation(0) - bad_truncation(low_watermark) - # Try to truncate at a specific edge case where the start and end # are 1 offset away from eachother truncate_offset = num_records - 2 @@ -495,9 +504,6 @@ def bad_truncation(truncate_offset): low_watermark = self.delete_records(TEST_TOPIC_NAME, 0, t_ofs) assert low_watermark == t_ofs - # Assert that nothing is readable - bad_truncation(truncate_offset) - bad_truncation(num_records) bad_truncation(num_records + 1) @cluster(num_nodes=3) @@ -522,11 +528,6 @@ def test_delete_records_empty_or_missing_topic_or_partition( missing_idx = 15 self.rpk.trim_prefix(TEST_TOPIC_NAME, 0, [missing_idx]) - # Assert out of range occurs on an empty topic - with expect_exception(RpkException, - lambda e: out_of_range_prefix in str(e)): - self.rpk.trim_prefix(TEST_TOPIC_NAME, 0, [0]) - # Assert correct behavior on a topic with 1 record self.rpk.produce(TEST_TOPIC_NAME, "k", "v", partition=0) self.wait_until_records(TEST_TOPIC_NAME, @@ -535,8 +536,7 @@ def test_delete_records_empty_or_missing_topic_or_partition( backoff_sec=1) with expect_exception(RpkException, lambda e: out_of_range_prefix in str(e)): - self.rpk.trim_prefix(TEST_TOPIC_NAME, 0, [0]) - + self.rpk.trim_prefix(TEST_TOPIC_NAME, 100, [0]) # ... truncating at high watermark 1 should delete all data low_watermark = self.delete_records(TEST_TOPIC_NAME, 0, 1) assert low_watermark == 1 @@ -657,10 +657,13 @@ class ThreadReporter: raise ThreadReporter.exc @cluster(num_nodes=5) - @parametrize(cloud_storage_enabled=True) - @parametrize(cloud_storage_enabled=False) - def test_delete_records_concurrent_truncations(self, - cloud_storage_enabled): + @matrix(cloud_storage_enabled=[True, False], + truncate_point=[ + "random_offset", "one_below_high_watermark", + "at_high_watermark", "start_offset" + ]) + def test_delete_records_concurrent_truncations(self, cloud_storage_enabled, + truncate_point): """ This tests verifies that issuing DeleteRecords requests with concurrent producers and consumers has no effect on correctness @@ -683,23 +686,29 @@ def test_delete_records_concurrent_truncations(self, def issue_delete_records(): topic_info = self.get_topic_info(TEST_TOPIC_NAME) - start_offset = topic_info.start_offset + 1 - high_watermark = topic_info.high_watermark - 1 - if high_watermark - start_offset <= 0: - self.redpanda.logger.info("Waiting on log to populate") - return - truncate_point = random.randint(start_offset, high_watermark) + start_offset = topic_info.start_offset + high_watermark = topic_info.high_watermark + if truncate_point == "one_below_high_watermark": + truncate_offset = high_watermark - 1 # Leave 1 record + elif truncate_point == "at_high_watermark": + truncate_offset = high_watermark # Delete all data + elif truncate_point == "random_offset": + truncate_offset = random.randint(start_offset, high_watermark) + elif truncate_point == "start_offset": + truncate_offset = start_offset + else: + assert False, "unknown test parameter encountered" self.redpanda.logger.info( - f"Issuing delete_records request at offset: {truncate_point}") - response = self.rpk.trim_prefix(TEST_TOPIC_NAME, truncate_point, + f"Issuing delete_records request at offset: {truncate_offset}") + response = self.rpk.trim_prefix(TEST_TOPIC_NAME, truncate_offset, [0]) assert len(response) == 1 - assert response[0].new_start_offset == truncate_point + assert response[0].new_start_offset == truncate_offset assert response[0].error_msg == "" # Cannot assert end boundaries as there is a concurrent producer # moving the hwm forward self.assert_start_partition_boundaries(TEST_TOPIC_NAME, - truncate_point) + truncate_offset) def issue_partition_move(): self._dispatch_random_partition_move(TEST_TOPIC_NAME, 0) @@ -775,3 +784,175 @@ class PartitionMoveExceptionReporter: status = consumer.consumer_status assert status.validator.invalid_reads == 0 assert status.validator.out_of_scope_invalid_reads == 0 + + +class TopicPartitionOffset(NamedTuple): + topic: str + partition: int + offset: int + + +class BaseDeleteRecordsTest: + client: Callable[[], DefaultClient] + test_context: dict + + def _make_delete_records_for_cli( + self, topic_partition_offsets: list[TopicPartitionOffset]): + delete_records_json = {"version": 1, "partitions": []} + for tpo in topic_partition_offsets: + delete_records_json["partitions"].append({ + "topic": tpo.topic, + "partition": tpo.partition, + "offset": tpo.offset + }) + return delete_records_json + + def _execute_kafka_delete_records( + self, cluster, delete_records_tpos: list[TopicPartitionOffset]): + delete_records_json = self._make_delete_records_for_cli( + delete_records_tpos) + kafka_tools = KafkaCliTools(cluster) + res = kafka_tools.delete_records(delete_records_json) + for r in res: + if r.error_msg != "": + raise Exception(r.error_msg) + return res + + def _test_delete_records_empty_topic(self, cluster): + self.rpk = RpkTool(cluster) + empty_topic = TopicSpec(name=TEST_TOPIC_NAME, + partition_count=1, + replication_factor=3) + self.client().create_topic(empty_topic) + delete_records_tpos = [TopicPartitionOffset(TEST_TOPIC_NAME, 0, 0)] + + delete_records_result = self._execute_kafka_delete_records( + cluster, delete_records_tpos)[0] + kafka_low_watermark = delete_records_result.new_start_offset + assert kafka_low_watermark == 0 + + trim_prefix_result = self.rpk.trim_prefix(TEST_TOPIC_NAME, 0, [0])[0] + rpk_low_watermark = trim_prefix_result.new_start_offset + assert rpk_low_watermark == 0 + + def _test_delete_records_non_empty_topic(self, cluster, truncate_point): + self.rpk = RpkTool(cluster) + topic = TopicSpec(name=TEST_TOPIC_NAME, + partition_count=1, + replication_factor=3) + self.client().create_topic(topic) + + # Produce some data to the partition + producer = KgoVerifierProducer(self.test_context, + cluster, + TEST_TOPIC_NAME, + msg_size=1, + msg_count=10) + + producer.start() + producer.wait() + + # Make an initial DeleteRecords request to bump up the start offset + delete_records_tpos = [TopicPartitionOffset(TEST_TOPIC_NAME, 0, 5)] + delete_records_result = self._execute_kafka_delete_records( + cluster, delete_records_tpos)[0] + + # Should see the new_start_offset == 5 + new_start_offset = delete_records_result.new_start_offset + assert new_start_offset == 5 + + # Set the truncation offset and result expectations based on the test injection parameter + # Kafka/redpanda should both handle truncation_offset <= start_offset. + truncate_offset = None + expect_error = False + expected_new_start_offset = None + if truncate_point == "start_offset": + truncate_offset = new_start_offset + expected_new_start_offset = truncate_offset + elif truncate_point == "below_start_offset": + truncate_offset = new_start_offset - 1 + expected_new_start_offset = new_start_offset + else: + assert False, "unknown truncate point encountered" + + kafka_offset_out_of_range = "org.apache.kafka.common.errors.OffsetOutOfRangeException" + rp_offset_out_of_range = "OFFSET_OUT_OF_RANGE" + + # Perform Kafka DeleteRecords + delete_records_tpos = [ + TopicPartitionOffset(TEST_TOPIC_NAME, 0, truncate_offset) + ] + try: + delete_records_result = self._execute_kafka_delete_records( + cluster, delete_records_tpos)[0] + print(truncate_offset, new_start_offset, expected_new_start_offset, + delete_records_result) + assert delete_records_result.new_start_offset == expected_new_start_offset + except Exception as e: + assert expect_error + assert kafka_offset_out_of_range in str(e) + + # Perform rpk trim-prefix + try: + trim_prefix_result = self.rpk.trim_prefix(TEST_TOPIC_NAME, + truncate_offset, [0])[0] + assert trim_prefix_result.new_start_offset == expected_new_start_offset + except RpkException as e: + assert expect_error + assert rp_offset_out_of_range in str(e) + + +class DeleteRecordsRedpandaTest(RedpandaTest, BaseDeleteRecordsTest): + def setUp(self): + self.redpanda.start() + + @ducktape_cluster(num_nodes=3) + def test_delete_records_empty_topic(self): + self._test_delete_records_empty_topic(self.redpanda) + + @ducktape_cluster(num_nodes=4) + @matrix(truncate_point=["start_offset", "below_start_offset"]) + def test_delete_records_non_empty_topic(self, truncate_point): + self._test_delete_records_non_empty_topic(self.redpanda, + truncate_point) + + +class DeleteRecordsKafkaTest(Test, BaseDeleteRecordsTest): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + + self.zk = ZookeeperService(self.test_context, + num_nodes=1, + version=V_3_0_0) + + self.kafka = KafkaServiceAdapter( + self.test_context, + KafkaService(self.test_context, + num_nodes=3, + zk=self.zk, + version=V_3_0_0)) + + self._client = DefaultClient(self.kafka) + + def client(self): + return self._client + + def setUp(self): + self.zk.start() + self.kafka.start() + + def tearDown(self): + self.logger.info("Stopping Kafka...") + self.kafka.stop() + + self.logger.info("Stopping Zookeeper...") + self.zk.stop() + + @ducktape_cluster(num_nodes=4) + def test_delete_records_empty_topic(self): + self._test_delete_records_empty_topic(self.kafka) + + @ducktape_cluster(num_nodes=5) + @matrix(truncate_point=["start_offset", "below_start_offset"]) + def test_delete_records_non_empty_topic(self, truncate_point): + self._test_delete_records_non_empty_topic(self.kafka, truncate_point)