wip second
nvartolomei committed Mar 21, 2024
1 parent 0d335c7 commit 761b4a3
Showing 1 changed file with 104 additions and 3 deletions.
107 changes: 104 additions & 3 deletions tests/rptest/tests/write_caching_test.py
@@ -17,18 +17,19 @@
import random
from enum import Enum

from ducktape.mark import ignore
from ducktape.utils.util import wait_until

from rptest.clients.rpk import RpkException, RpkTool
from rptest.clients.types import TopicSpec
from rptest.services.admin import Admin
from rptest.services.cluster import cluster
from rptest.services.failure_injector import FailureSpec, make_failure_injector
from rptest.services.kgo_verifier_services import (
KgoVerifierProducer,
KgoVerifierSeqConsumer,
)
from rptest.tests.redpanda_test import RedpandaTest


# no StrEnum support in test python version
@@ -193,6 +194,103 @@ def __init__(self, test_context):
self.topic_name = "test"
self.topics = [TopicSpec(name=self.topic_name)]

    # This test shouldn't observe data loss because writes are replicated to
    # a majority of replicas and the crashed leader returns after the election.
@cluster(num_nodes=5)
def test_crash_leader(self):
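        # The sequential consumer runs for the entire test. With
        # tolerate_data_loss=False its validator records any lost offsets,
        # which the assertion at the end checks is None.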
consumer = KgoVerifierSeqConsumer(
self.test_context,
self.redpanda,
self.topic_name,
loop=False,
continuous=True,
tolerate_data_loss=False,
)
consumer.start()

num_restarts = 5
num_acked_per_iter = 1000
num_consumed_per_iter = 1000

msg_size = 512
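        # A batch_max_bytes of ~1 message and max_buffered_records=1 make the
        # producer effectively synchronous, so the acked count advances in
        # fine-grained steps between crashes.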
producer = KgoVerifierProducer(
self.test_context,
self.redpanda,
self.topic_name,
msg_size,
1_000_000_000,
batch_max_bytes=2 * msg_size,
max_buffered_records=1,
tolerate_data_loss=False,
)
producer.start()

acks_count = 0
consume_count = 0

        # Crash the leader repeatedly and verify that the topic keeps
        # functioning correctly:
        # a) data is still being produced,
        # b) data is still being consumed.
for i in range(0, num_restarts):
acks_count = producer._status.acked

# Validate producer health.
self.logger.debug(
f"Waiting producer acked to reach {acks_count + num_acked_per_iter} "
f"messages in iteration {i}/{num_restarts}")
producer.wait_for_acks(acks_count + num_acked_per_iter,
timeout_sec=60,
backoff_sec=1)
acks_count = producer._status.acked

# Validate consumer health.
self.logger.debug(
f"Waiting for consumer to reach {consume_count + num_consumed_per_iter} messages"
)
wait_until(lambda: consumer._status.validator.total_reads >=
consume_count + num_consumed_per_iter,
timeout_sec=60,
backoff_sec=1)
consume_count = consumer._status.validator.total_reads

# Introduce failure.
admin = Admin(self.redpanda)
leader_id = admin.get_partition_leader(namespace="kafka",
topic=self.topic_name,
partition=0)
node = self.redpanda.get_node(leader_id)

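            # signal_redpanda crashes the process (SIGKILL by default),
            # with no graceful shutdown.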
self.logger.debug(f"Restarting leader node: {node.name}")
self.redpanda.signal_redpanda(node)

def wait_new_leader():
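                # get_partition_leader returns -1 while no leader is known;
                # also require leadership to have moved off the crashed node.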
new_leader_id = admin.get_partition_leader(
namespace="kafka", topic=self.topic_name, partition=0)
return new_leader_id != -1 and new_leader_id != leader_id

self.logger.debug("Waiting for new leader")
wait_until(wait_new_leader,
timeout_sec=30,
backoff_sec=1,
err_msg="New leader not elected")

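            # Bring the crashed node back; it rejoins as a follower and
            # catches up from the new leader before the next iteration.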
self.logger.debug(f"Starting back old leader node: {node.name}")
self.redpanda.start_node(node)

producer.stop()

# Stop consumer.
consumer.wait()
consumer.stop()

self.logger.info(
f"Lost offsets: {consumer._status.validator.lost_offsets}")

        # This test doesn't tolerate data loss: lost_offsets is None when the
        # validator observed no gaps.
assert consumer._status.validator.lost_offsets is None

@ignore
@cluster(num_nodes=5)
def test_crash_all_or_leader(self):
consumer = KgoVerifierSeqConsumer(
@@ -356,7 +454,7 @@ def test_unavoidable_data_loss(self):
f"Waiting producer acked to reach {acks_count + num_acked_per_iter} "
f"messages in iteration {i}/{num_restarts}")
producer.wait_for_acks(acks_count + num_acked_per_iter,
                                   timeout_sec=120,
backoff_sec=1)
acks_count = producer._status.acked

@@ -366,7 +464,7 @@
)
wait_until(lambda: consumer._status.validator.total_reads >=
consume_count + num_consumed_per_iter,
                       timeout_sec=120,
backoff_sec=1)
consume_count = consumer._status.validator.total_reads

@@ -411,3 +509,6 @@ def wait_new_leader():
# redpanda to.
assert consumer._status.validator.lost_offsets['0'] > 0, \
"No lost messages observed. The test is not valid."

def teardown(self):
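        # Heal any network failures injected during the tests so that node
        # teardown and subsequent tests are not affected by leftover failure
        # injection.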
make_failure_injector(self.redpanda)._heal_all()
