From 22bf207119d97580b639c51386c6941c7beada84 Mon Sep 17 00:00:00 2001
From: Andrew Wong
Date: Wed, 21 Jun 2023 00:18:52 -0700
Subject: [PATCH] ducktape: tests for delete-records

Adds a couple of basic tests for delete-records requests:
- that they persist after topic recovery
- that they are honored on read replicas
---
 .../rptest/tests/e2e_shadow_indexing_test.py | 62 +++++++++++++-
 tests/rptest/tests/read_replica_e2e_test.py  | 80 ++++++++++++++++++-
 2 files changed, 137 insertions(+), 5 deletions(-)

diff --git a/tests/rptest/tests/e2e_shadow_indexing_test.py b/tests/rptest/tests/e2e_shadow_indexing_test.py
index 8e4ec00cffcc..6d21282e1801 100644
--- a/tests/rptest/tests/e2e_shadow_indexing_test.py
+++ b/tests/rptest/tests/e2e_shadow_indexing_test.py
@@ -18,6 +18,7 @@
 from rptest.tests.redpanda_test import RedpandaTest
 from rptest.clients.kafka_cli_tools import KafkaCliTools
+from rptest.clients.kcl import KCL
 from rptest.clients.rpk import RpkTool
 from rptest.clients.types import TopicSpec
 from rptest.services.action_injector import random_process_kills
@@ -296,9 +297,6 @@ def indices_uploaded():
     @skip_debug_mode
     @cluster(num_nodes=4)
     def test_recover(self):
-        # Recovery will leave behind results files, which aren't tracked.
-        # TODO: remove when recovery is controller-driven instead of using
-        # results objects.
         self.start_producer()
         produce_until_segments(
             redpanda=self.redpanda,
             topic=self.topic,
             partition_idx=0,
             count=10,
         )
@@ -337,6 +335,64 @@ def test_recover(self):
                    timeout_sec=30,
                    backoff_sec=1)
 
+    @skip_debug_mode
+    @cluster(num_nodes=4)
+    def test_recover_after_delete_records(self):
+        self.start_producer()
+        produce_until_segments(
+            redpanda=self.redpanda,
+            topic=self.topic,
+            partition_idx=0,
+            count=10,
+        )
+        original_snapshot = self.redpanda.storage(
+            all_nodes=True).segments_by_node("kafka", self.topic, 0)
+        self.kafka_tools.alter_topic_config(
+            self.topic,
+            {
+                TopicSpec.PROPERTY_RETENTION_LOCAL_TARGET_BYTES:
+                5 * self.segment_size,
+            },
+        )
+
+        wait_for_removal_of_n_segments(redpanda=self.redpanda,
+                                       topic=self.topic,
+                                       partition_idx=0,
+                                       n=6,
+                                       original_snapshot=original_snapshot)
+        kcl = KCL(self.redpanda)
+        new_lwm = 2
+        response = kcl.delete_records(
+            {self.topic: {
+                0: new_lwm
+            }})
+        assert len(response) == 1
+        assert response[0].topic == self.topic
+        assert response[0].partition == 0
+        assert response[0].error == 'OK', f"Err msg: {response[0].error}"
+        assert new_lwm == response[0].new_low_watermark, response[0].new_low_watermark
+        rpk = RpkTool(self.redpanda)
+        topics_info = list(rpk.describe_topic(self.topic))
+        assert len(topics_info) == 1
+        assert topics_info[0].start_offset == new_lwm, topics_info
+
+        self.redpanda.stop()
+        self.redpanda.remove_local_data(self.redpanda.nodes[0])
+        self.redpanda.restart_nodes(self.redpanda.nodes)
+        self.redpanda._admin.await_stable_leader("controller",
+                                                 partition=0,
+                                                 namespace='redpanda',
+                                                 timeout_s=60,
+                                                 backoff_s=2)
+
+        rpk.cluster_recovery_start(wait=True)
+        wait_until(lambda: len(set(rpk.list_topics())) == 1,
+                   timeout_sec=30,
+                   backoff_sec=1)
+        topics_info = list(rpk.describe_topic(self.topic))
+        assert len(topics_info) == 1
+        assert topics_info[0].start_offset == new_lwm, topics_info
+
 
 class EndToEndShadowIndexingTestCompactedTopic(EndToEndShadowIndexingBase):
     topics = (TopicSpec(
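
Note on the flow the new shadow-indexing test exercises: it issues the
delete-records request through kcl, asserts on the per-partition response,
and then confirms that rpk reports the new start offset. A minimal
standalone sketch of that truncate-and-verify step, reusing the rptest
clients the patch already imports; the helper name, its timeout default,
and the polling loop are illustrative rather than part of the patch:

from ducktape.utils.util import wait_until

from rptest.clients.kcl import KCL
from rptest.clients.rpk import RpkTool


def truncate_and_await_lwm(redpanda, topic, partition, new_lwm,
                           timeout_sec=30):
    # Issue DeleteRecords through kcl; the response carries one entry per
    # partition, with an error string and the new low watermark.
    response = KCL(redpanda).delete_records({topic: {partition: new_lwm}})
    assert response[0].error == 'OK', f"Err msg: {response[0].error}"
    assert response[0].new_low_watermark == new_lwm

    rpk = RpkTool(redpanda)

    def lwm_visible():
        # describe_topic yields one row per partition; match on the id.
        return any(p.id == partition and p.start_offset == new_lwm
                   for p in rpk.describe_topic(topic))

    # Poll rather than assert once: readers of the partition metadata may
    # observe the new start offset slightly after the request is acked.
    wait_until(lwm_visible, timeout_sec=timeout_sec, backoff_sec=1)
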
diff --git a/tests/rptest/tests/read_replica_e2e_test.py b/tests/rptest/tests/read_replica_e2e_test.py
index 497903a6b176..b1e6efed9104 100644
--- a/tests/rptest/tests/read_replica_e2e_test.py
+++ b/tests/rptest/tests/read_replica_e2e_test.py
@@ -6,10 +6,11 @@
 # As of the Change Date specified in that file, in accordance with
 # the Business Source License, use of this software will be governed
 # by the Apache License, Version 2.0
 from typing import NamedTuple, Optional
 
 from rptest.services.cluster import cluster
 from rptest.clients.default import DefaultClient
+from rptest.clients.kcl import KCL
 from rptest.services.redpanda import SISettings
 from rptest.clients.rpk import RpkTool, RpkException
 from rptest.clients.types import TopicSpec
@@ -43,6 +44,32 @@ class BucketUsage(NamedTuple):
 ]
 
 
+def get_lwm_per_partition(cluster: RedpandaService, topic_name,
+                          partition_count):
+    id_to_lwm = dict()
+    rpk = RpkTool(cluster)
+    for prt in rpk.describe_topic(topic_name):
+        id_to_lwm[prt.id] = prt.start_offset
+    if len(id_to_lwm) != partition_count:
+        return False, None
+    return True, id_to_lwm
+
+
+def lwms_are_identical(logger, src_cluster, dst_cluster, topic_name,
+                       partition_count):
+    # Collect the LWMs for each partition on the source cluster.
+    src_lwms = wait_until_result(lambda: get_lwm_per_partition(
+        src_cluster, topic_name, partition_count),
+                                 timeout_sec=30,
+                                 backoff_sec=1)
+
+    # Ensure that the LWMs on the destination read replica are the same.
+    rr_lwms = wait_until_result(lambda: get_lwm_per_partition(
+        dst_cluster, topic_name, partition_count),
+                                timeout_sec=30,
+                                backoff_sec=1)
+    logger.info(f"{src_lwms} vs {rr_lwms}")
+    return src_lwms == rr_lwms
+
+
 def get_hwm_per_partition(cluster: RedpandaService, topic_name,
                           partition_count):
     id_to_hwm = dict()
@@ -53,7 +80,6 @@ def get_hwm_per_partition(cluster: RedpandaService, topic_name,
     if len(id_to_hwm) != partition_count:
         return False, None
     return True, id_to_hwm
-
 
 def hwms_are_identical(logger, src_cluster, dst_cluster, topic_name,
                        partition_count):
     # Collect the HWMs for each partition before stopping.
@@ -70,7 +96,6 @@ def hwms_are_identical(logger, src_cluster, dst_cluster, topic_name,
                                 backoff_sec=1)
     logger.info(f"{src_hwms} vs {rr_hwms}")
     return src_hwms == rr_hwms
-
 
 def create_read_replica_topic(dst_cluster, topic_name, bucket_name) -> None:
     rpk_dst_cluster = RpkTool(dst_cluster)
     # NOTE: we set 'redpanda.remote.readreplica' to ORIGIN
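
The helpers above follow the ducktape polling idiom this file already uses
for HWMs: the polled function returns a (success, value) pair, and
wait_until_result retries until success is True, then unwraps and returns
the value. A condensed sketch of the idiom, under the assumption that
wait_until_result is the rptest.util helper these functions call
(poll_lwms is an illustrative name, not part of the patch):

from rptest.clients.rpk import RpkTool
from rptest.util import wait_until_result


def poll_lwms(cluster, topic_name, partition_count):
    def lwms_ready():
        lwms = {
            p.id: p.start_offset
            for p in RpkTool(cluster).describe_topic(topic_name)
        }
        # Report failure until every partition appears in the response;
        # wait_until_result retries while the first element is False and
        # hands back the second element once it is True.
        return len(lwms) == partition_count, lwms

    return wait_until_result(lwms_ready, timeout_sec=30, backoff_sec=1)
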
@@ -224,6 +249,57 @@ def _bucket_delta(self, bu1: BucketUsage,
         else:
             return None
 
+    @cluster(num_nodes=7, log_allow_list=READ_REPLICA_LOG_ALLOW_LIST)
+    @matrix(partition_count=[5], cloud_storage_type=[CloudStorageType.S3])
+    def test_identical_lwms_after_delete_records(self, partition_count: int,
+                                                 cloud_storage_type: CloudStorageType) -> None:
+        self._setup_read_replica(partition_count=partition_count,
+                                 num_messages=1000)
+        kcl = KCL(self.redpanda)
+
+        def set_lwm(new_lwm):
+            response = kcl.delete_records(
+                {self.topic_name: {
+                    0: new_lwm
+                }})
+            assert response[0].error == 'OK', response[0].error
+
+        rpk = RpkTool(self.redpanda)
+
+        def check_lwm(new_lwm):
+            # Find partition 0, falling back on the first partition listed.
+            topics_info = list(rpk.describe_topic(self.topic_name))
+            topic_info = topics_info[0]
+            for t in topics_info:
+                if t.id == 0:
+                    topic_info = t
+                    break
+            assert topic_info.start_offset == new_lwm, topic_info
+
+        set_lwm(5)
+        check_lwm(5)
+
+        def clusters_report_identical_lwms():
+            return lwms_are_identical(self.logger, self.redpanda,
+                                      self.second_cluster, self.topic_name,
+                                      partition_count)
+
+        wait_until(clusters_report_identical_lwms,
+                   timeout_sec=30,
+                   backoff_sec=1)
+
+        # As a sanity check, ensure the same is true after a restart of the
+        # source cluster.
+        self.redpanda.restart_nodes(self.redpanda.nodes)
+        wait_until(clusters_report_identical_lwms,
+                   timeout_sec=30,
+                   backoff_sec=1)
+
+        set_lwm(6)
+        check_lwm(6)
+
+        # Likewise after a restart of the read-replica cluster.
+        self.second_cluster.restart_nodes(self.second_cluster.nodes)
+        wait_until(clusters_report_identical_lwms,
+                   timeout_sec=30,
+                   backoff_sec=1)
+        set_lwm(7)
+        check_lwm(7)
+
     @cluster(num_nodes=8, log_allow_list=READ_REPLICA_LOG_ALLOW_LIST)
     @matrix(partition_count=[5], cloud_storage_type=[CloudStorageType.S3])
     def test_identical_hwms(self, partition_count: int,