tests: modified remote recovery test to test recovery from cloud storage
Added replicating some data and waiting for it to be uploaded to the
cloud before executing node-wise recovery. This way the test is able to
verify that cloud storage data is used when force-reconfiguring
partitions that lost their majority.

Signed-off-by: Michał Maślanka <michal@redpanda.com>
mmaslankaprv committed Aug 20, 2024
1 parent d62fbd4 commit 010e107
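In short, the verification described above boils down to comparing per-partition high watermarks captured before and after recovery. The sketch below is not part of the commit; it is a minimal illustration assuming a `describe_topic` callable that yields objects with `.id` and `.high_watermark`, like RpkTool.describe_topic used in the diff.

# Minimal sketch of the high-watermark check, assuming `describe_topic`
# behaves like RpkTool.describe_topic (hypothetical stand-in).
def capture_high_watermarks(describe_topic, topics):
    """Return {topic: {partition_id: high_watermark}} for the given topics."""
    return {
        t: {p.id: p.high_watermark for p in describe_topic(t)}
        for t in topics
    }


def assert_recovery_preserved_data(before, after, min_ratio=0.8):
    """Recovered partitions should retain at least min_ratio of their pre-recovery offsets."""
    for topic, partitions in before.items():
        for pid, initial_hw in partitions.items():
            final_hw = after[topic][pid]
            assert final_hw >= min_ratio * initial_hw, (
                f"{topic}/{pid}: high watermark dropped "
                f"from {initial_hw} to {final_hw}")

In the test itself, the "before" snapshot is taken once the produced data has been uploaded to cloud storage, the "after" snapshot once force reconfiguration completes, and the 0.8 ratio is only enforced for partitions that can actually recover (tiered storage enabled or replication factor 3).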
Showing 1 changed file with 107 additions and 16 deletions.
123 changes: 107 additions & 16 deletions tests/rptest/tests/partition_force_reconfiguration_test.py
@@ -10,17 +10,19 @@
import requests
from rptest.services.cluster import cluster
from rptest.clients.types import TopicSpec
from rptest.services.kgo_verifier_services import KgoVerifierProducer
from rptest.tests.end_to_end import EndToEndTest
from rptest.clients.rpk import RpkTool
from ducktape.mark import ignore, matrix
from ducktape.utils.util import wait_until
from random import shuffle
import time
from rptest.tests.partition_movement import PartitionMovementMixin
from rptest.services.admin import Replica
from rptest.services.admin import Admin, Replica
from rptest.clients.kcl import KCL
from threading import Thread, Condition
from rptest.services.redpanda import RedpandaService
from rptest.services.redpanda import RedpandaService, SISettings
from rptest.tests.redpanda_test import RedpandaTest
from rptest.util import wait_until_result


@@ -368,25 +370,93 @@ def get_lso():
self.start_consumer()
self.run_validation()

@cluster(num_nodes=5)

class NodeWiseRecoveryTest(RedpandaTest):
def __init__(self, test_context, *args, **kwargs):
super(NodeWiseRecoveryTest,
self).__init__(test_context,
si_settings=SISettings(
log_segment_size=1024 * 1024,
test_context=test_context,
fast_uploads=True,
retention_local_strict=True,
),
extra_rp_conf={
"partition_autobalancing_mode": "continuous",
"enable_leader_balancer": False,
},
num_brokers=5,
*args,
**kwargs)
self.default_timeout_sec = 60
self.rpk = RpkTool(self.redpanda)
self.admin = Admin(self.redpanda)

def _alive_nodes(self):
return [n.account.hostname for n in self.redpanda.started_nodes()]

def collect_topic_partition_states(self, topic):
states = {}
for p in self.rpk.describe_topic(topic):
states[p.id] = self.admin.get_partition_state(
namespace="kafka",
topic=topic,
partition=p.id,
node=self.redpanda.get_node_by_id(p.leader))
return states

def get_topic_partition_high_watermarks(self, topic):
high_watermarks = {}
for p in self.rpk.describe_topic(topic):
high_watermarks[p.id] = p.high_watermark
return high_watermarks

def produce_until_segments_removed(self, topic):
msg_size = 512

self.producer = KgoVerifierProducer(self.test_context, self.redpanda,
topic, msg_size, 10000000)

self.producer.start(clean=False)

def all_cloud_offsets_advanced():
states = self.collect_topic_partition_states(topic)

return all(r['next_cloud_offset'] >= 1000 for s in states.values()
for r in s['replicas'])

wait_until(
all_cloud_offsets_advanced,
timeout_sec=self.default_timeout_sec,
backoff_sec=1,
err_msg="Error waiting for retention to prefix truncate partitions"
)

self.producer.stop()
self.producer.clean()
self.producer.free()

@cluster(num_nodes=6)
@matrix(dead_node_count=[1, 2])
def test_node_wise_recovery(self, dead_node_count):
self.start_redpanda(num_nodes=5,
extra_rp_conf={
"partition_autobalancing_mode": "continuous",
"enable_leader_balancer": False,
})

num_topics = 20
# Create a mix of rf=1 and 3 topics.
topics = []
for i in range(0, num_topics):
rf = 3 if i % 2 == 0 else 1
parts = random.randint(1, 3)
with_ts = random.choice([True, False])
spec = TopicSpec(name=f"topic-{i}",
replication_factor=rf,
partition_count=parts)
partition_count=parts,
redpanda_remote_read=with_ts,
redpanda_remote_write=with_ts)
topics.append(spec)
self.client().create_topic(spec)
self.client().alter_topic_config(
spec.name, TopicSpec.PROPERTY_RETENTION_LOCAL_TARGET_BYTES,
2 * 1024 * 1024)

admin = self.redpanda._admin

@@ -395,9 +465,17 @@ def test_node_wise_recovery(self, dead_node_count):
to_kill_node_ids = [
int(self.redpanda.node_id(n)) for n in to_kill_nodes
]
for t in topics:
self.produce_until_segments_removed(t.name)
self.redpanda.wait_for_manifest_uploads()

partitions_lost_majority = admin.get_majority_lost_partitions_from_nodes(
dead_brokers=to_kill_node_ids)
# collect topic partition high watermarks before recovery
initial_topic_hws = {}
for t in topics:
initial_topic_hws[
t.name] = self.get_topic_partition_high_watermarks(t.name)

self.logger.debug(f"Stopping nodes: {to_kill_node_ids}")
self.redpanda.for_nodes(to_kill_nodes, self.redpanda.stop_node)
Expand All @@ -408,7 +486,7 @@ def controller_available():
controller) not in to_kill_node_ids

wait_until(controller_available,
timeout_sec=self.WAIT_TIMEOUT_S,
timeout_sec=self.default_timeout_sec,
backoff_sec=3,
err_msg="Controller not available")

@@ -440,7 +518,7 @@ def no_majority_lost_partitions():
try:
transfers.pause()
wait_until(controller_available,
timeout_sec=self.WAIT_TIMEOUT_S,
timeout_sec=self.default_timeout_sec,
backoff_sec=3,
err_msg="Controller not available")
lost_majority = admin.get_majority_lost_partitions_from_nodes(
@@ -458,7 +536,7 @@ def no_majority_lost_partitions():

# Wait until there are no partition assignments with majority loss due to dead nodes.
wait_until(no_majority_lost_partitions,
timeout_sec=self.WAIT_TIMEOUT_S,
timeout_sec=self.default_timeout_sec,
backoff_sec=3,
err_msg="Node wise recovery failed")

@@ -474,13 +552,13 @@ def no_pending_force_reconfigurations():
try:
transfers.pause()
wait_until(controller_available,
timeout_sec=self.WAIT_TIMEOUT_S,
timeout_sec=self.default_timeout_sec,
backoff_sec=3,
err_msg="Controller not available")
# Wait for balancer tick to run so the data is populated.
wait_until(lambda: get_partition_balancer_status(
lambda s: s["status"] != "starting"),
timeout_sec=self.WAIT_TIMEOUT_S,
timeout_sec=self.default_timeout_sec,
backoff_sec=3,
err_msg="Balancer tick did not run in time")
return get_partition_balancer_status(lambda s: s[
@@ -492,7 +570,7 @@ def no_pending_force_reconfigurations():
transfers.resume()

wait_until(no_pending_force_reconfigurations,
timeout_sec=self.WAIT_TIMEOUT_S,
timeout_sec=self.default_timeout_sec,
backoff_sec=3,
err_msg="reported force recovery count is non zero")

@@ -502,6 +580,19 @@ def no_pending_force_reconfigurations():
self.redpanda._admin.await_stable_leader(
topic=topic.name,
partition=part,
timeout_s=self.WAIT_TIMEOUT_S,
timeout_s=self.default_timeout_sec,
backoff_s=2,
hosts=self._alive_nodes())
topic_hws_after_recovery = {}
for t in topics:
topic_hws_after_recovery[
t.name] = self.get_topic_partition_high_watermarks(t.name)

for t in topics:
for partition_id, initial_hw in initial_topic_hws[t.name].items():
final_hw = topic_hws_after_recovery[t.name][partition_id]
self.logger.info(
f"partition {t}/{partition_id} replicas initial high watermark: {initial_hw} final high watermark: {final_hw}"
)
if t.redpanda_remote_write or t.replication_factor == 3:
assert final_hw >= 0.8 * initial_hw
