ducktape: tests for delete-records
Adds a couple of basic tests for delete-records requests:
- that they persist after topic recovery
- that they are honored on read replicas
andrwng committed Jun 21, 2023
1 parent d29250f commit 22bf207
Showing 2 changed files with 138 additions and 5 deletions.
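
For context, both tests drive the same core interaction, sketched below using the rptest wrappers that appear in this diff (KCL.delete_records and RpkTool.describe_topic). This is an illustrative sketch, not code from the commit; the helper name and the redpanda/topic parameters are placeholders:

from rptest.clients.kcl import KCL
from rptest.clients.rpk import RpkTool


def truncate_prefix(redpanda, topic, new_lwm):
    # Ask the cluster to discard all records on partition 0 below new_lwm.
    kcl = KCL(redpanda)
    response = kcl.delete_records({topic: {0: new_lwm}})
    assert response[0].error == 'OK', response[0].error

    # The partition should now report new_lwm as its start offset.
    rpk = RpkTool(redpanda)
    partition = list(rpk.describe_topic(topic))[0]
    assert partition.start_offset == new_lwm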
62 changes: 59 additions & 3 deletions tests/rptest/tests/e2e_shadow_indexing_test.py
@@ -18,6 +18,7 @@

from rptest.tests.redpanda_test import RedpandaTest
from rptest.clients.kafka_cli_tools import KafkaCliTools
from rptest.clients.kcl import KCL
from rptest.clients.rpk import RpkTool
from rptest.clients.types import TopicSpec
from rptest.services.action_injector import random_process_kills
@@ -296,9 +297,6 @@ def indices_uploaded():
    @skip_debug_mode
    @cluster(num_nodes=4)
    def test_recover(self):
-        # Recovery will leave behind results files, which aren't tracked.
-        # TODO: remove when recovery is controller-driven instead of using
-        # results objects.
        self.start_producer()
        produce_until_segments(
            redpanda=self.redpanda,
@@ -337,6 +335,64 @@ def test_recover(self):
                   timeout_sec=30,
                   backoff_sec=1)

    @skip_debug_mode
    @cluster(num_nodes=4)
    def test_recover_after_delete_records(self):
        self.start_producer()
        produce_until_segments(
            redpanda=self.redpanda,
            topic=self.topic,
            partition_idx=0,
            count=10,
        )
        original_snapshot = self.redpanda.storage(
            all_nodes=True).segments_by_node("kafka", self.topic, 0)
        self.kafka_tools.alter_topic_config(
            self.topic,
            {
                TopicSpec.PROPERTY_RETENTION_LOCAL_TARGET_BYTES:
                5 * self.segment_size,
            },
        )

        wait_for_removal_of_n_segments(redpanda=self.redpanda,
                                       topic=self.topic,
                                       partition_idx=0,
                                       n=6,
                                       original_snapshot=original_snapshot)

        # Raise the partition's low watermark with a delete-records request.
        kcl = KCL(self.redpanda)
        new_lwm = 2
        response = kcl.delete_records(
            {self.topic: {
                0: new_lwm
            }})
        assert len(response) == 1
        assert response[0].topic == self.topic
        assert response[0].partition == 0
        assert response[0].error == 'OK', f"Err msg: {response[0].error}"
        assert new_lwm == response[0].new_low_watermark, response[0].new_low_watermark
        rpk = RpkTool(self.redpanda)
        topics_info = list(rpk.describe_topic(self.topic))
        assert len(topics_info) == 1
        assert topics_info[0].start_offset == new_lwm, topics_info

        # Wipe a node and restore the cluster's topics from object storage.
        self.redpanda.stop()
        self.redpanda.remove_local_data(self.redpanda.nodes[0])
        self.redpanda.restart_nodes(self.redpanda.nodes)
        self.redpanda._admin.await_stable_leader("controller",
                                                 partition=0,
                                                 namespace='redpanda',
                                                 timeout_s=60,
                                                 backoff_s=2)

        rpk.cluster_recovery_start(wait=True)
        wait_until(lambda: len(set(rpk.list_topics())) == 1,
                   timeout_sec=30,
                   backoff_sec=1)

        # The delete-records low watermark should survive recovery.
        topics_info = list(rpk.describe_topic(self.topic))
        assert len(topics_info) == 1
        assert topics_info[0].start_offset == new_lwm, topics_info
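
The assertions above imply that kcl.delete_records returns one parsed result per partition, with roughly the following shape. This is a hypothetical sketch inferred from the fields the test reads, not the actual definition in rptest.clients.kcl:

from typing import NamedTuple


class DeleteRecordsPartitionResult(NamedTuple):
    topic: str
    partition: int
    new_low_watermark: int  # start offset after truncation
    error: str  # 'OK' on success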


class EndToEndShadowIndexingTestCompactedTopic(EndToEndShadowIndexingBase):
    topics = (TopicSpec(
81 changes: 79 additions & 2 deletions tests/rptest/tests/read_replica_e2e_test.py
@@ -6,10 +6,12 @@
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0
from re import T
from typing import NamedTuple, Optional
from rptest.services.cluster import cluster

from rptest.clients.default import DefaultClient
from rptest.clients.kcl import KCL
from rptest.services.redpanda import SISettings
from rptest.clients.rpk import RpkTool, RpkException
from rptest.clients.types import TopicSpec
@@ -43,6 +45,32 @@ class BucketUsage(NamedTuple):
]


def get_lwm_per_partition(cluster: RedpandaService, topic_name,
                          partition_count):
    id_to_lwm = dict()
    rpk = RpkTool(cluster)
    for prt in rpk.describe_topic(topic_name):
        id_to_lwm[prt.id] = prt.start_offset
    if len(id_to_lwm) != partition_count:
        return False, None
    return True, id_to_lwm

def lwms_are_identical(logger, src_cluster, dst_cluster, topic_name,
                       partition_count):
    # Collect the LWMs for each partition on the source cluster.
    src_lwms = wait_until_result(lambda: get_lwm_per_partition(
        src_cluster, topic_name, partition_count),
                                 timeout_sec=30,
                                 backoff_sec=1)

    # Ensure that the LWMs on the destination are the same.
    rr_lwms = wait_until_result(lambda: get_lwm_per_partition(
        dst_cluster, topic_name, partition_count),
                                timeout_sec=30,
                                backoff_sec=1)
    logger.info(f"{src_lwms} vs {rr_lwms}")
    return src_lwms == rr_lwms
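
get_lwm_per_partition returns a (done, value) pair because wait_until_result (from rptest.util) retries its condition until the first element of the tuple is truthy and then returns the remaining value. A simplified sketch of that retry contract, under the assumption that the real helper behaves like wait_until with a result:

import time


def wait_until_result_sketch(condition, timeout_sec, backoff_sec):
    # Retry `condition` until it reports success, then return its payload.
    deadline = time.time() + timeout_sec
    while time.time() < deadline:
        done, value = condition()
        if done:
            return value
        time.sleep(backoff_sec)
    raise TimeoutError("condition was not satisfied in time")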

def get_hwm_per_partition(cluster: RedpandaService, topic_name,
                          partition_count):
    id_to_hwm = dict()
@@ -53,7 +81,6 @@ def get_hwm_per_partition(cluster: RedpandaService, topic_name,
        return False, None
    return True, id_to_hwm


def hwms_are_identical(logger, src_cluster, dst_cluster, topic_name,
                       partition_count):
    # Collect the HWMs for each partition before stopping.
@@ -70,7 +97,6 @@ def hwms_are_identical(logger, src_cluster, dst_cluster, topic_name,
    logger.info(f"{src_hwms} vs {rr_hwms}")
    return src_hwms == rr_hwms


def create_read_replica_topic(dst_cluster, topic_name, bucket_name) -> None:
    rpk_dst_cluster = RpkTool(dst_cluster)
    # NOTE: we set 'redpanda.remote.readreplica' to ORIGIN
@@ -224,6 +250,57 @@ def _bucket_delta(self, bu1: BucketUsage,
        else:
            return None

    @cluster(num_nodes=7, log_allow_list=READ_REPLICA_LOG_ALLOW_LIST)
    @matrix(partition_count=[5], cloud_storage_type=[CloudStorageType.S3])
    def test_identical_lwms_after_delete_records(self, partition_count: int,
                                                 cloud_storage_type: CloudStorageType) -> None:
        self._setup_read_replica(partition_count=partition_count,
                                 num_messages=1000)
        kcl = KCL(self.redpanda)

        def set_lwm(new_lwm):
            # Truncate partition 0 on the source cluster to the given offset.
            response = kcl.delete_records(
                {self.topic_name: {
                    0: new_lwm
                }})
            assert response[0].error == 'OK', response[0].error

        rpk = RpkTool(self.redpanda)

        def check_lwm(new_lwm):
            # The source cluster should report the new start offset for
            # partition 0.
            topics_info = list(rpk.describe_topic(self.topic_name))
            topic_info = topics_info[0]
            for t in topics_info:
                if t.id == 0:
                    topic_info = t
                    break
            assert topic_info.start_offset == new_lwm, topic_info

        set_lwm(5)
        check_lwm(5)

        def clusters_report_identical_lwms():
            return lwms_are_identical(self.logger, self.redpanda,
                                      self.second_cluster, self.topic_name,
                                      partition_count)

        wait_until(clusters_report_identical_lwms,
                   timeout_sec=30,
                   backoff_sec=1)

        # As a sanity check, ensure the same is true after a restart.
        self.redpanda.restart_nodes(self.redpanda.nodes)
        wait_until(clusters_report_identical_lwms,
                   timeout_sec=30,
                   backoff_sec=1)

        set_lwm(6)
        check_lwm(6)

        self.second_cluster.restart_nodes(self.second_cluster.nodes)
        wait_until(clusters_report_identical_lwms,
                   timeout_sec=30,
                   backoff_sec=1)
        set_lwm(7)
        check_lwm(7)

    @cluster(num_nodes=8, log_allow_list=READ_REPLICA_LOG_ALLOW_LIST)
    @matrix(partition_count=[5], cloud_storage_type=[CloudStorageType.S3])
    def test_identical_hwms(self, partition_count: int,
