Skip to content

Commit

Permalink
tests: add produce/consume load to all tests in shard_placement_test.py
Browse files Browse the repository at this point in the history
  • Loading branch information
ztlpn committed Jun 26, 2024
1 parent 6397bb1 commit 928723d
Showing 1 changed file with 71 additions and 7 deletions.
78 changes: 71 additions & 7 deletions tests/rptest/tests/shard_placement_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,20 +12,68 @@
from rptest.services.cluster import cluster
from rptest.services.redpanda import ResourceSettings
from rptest.services.admin import Admin
import rptest.services.kgo_verifier_services as kgo
from rptest.clients.rpk import RpkTool
from rptest.tests.redpanda_test import RedpandaTest
from rptest.tests.prealloc_nodes import PreallocNodesTest
from rptest.services.redpanda_installer import RedpandaInstaller
from rptest.util import wait_until_result


class ShardPlacementTest(RedpandaTest):
class ShardPlacementTest(PreallocNodesTest):
def __init__(self, *args, **kwargs):
super().__init__(*args, num_brokers=5, **kwargs)
super().__init__(*args, num_brokers=5, node_prealloc_count=1, **kwargs)

def setUp(self):
    """Do nothing here: the nodes are started manually.

    The test methods of this class bring the cluster up themselves
    (for example by calling ``self.redpanda.start()``), so this hook is
    intentionally left empty.
    """

def start_client_load(self, topic_name):
    """Start a kgo-verifier producer and consumer group against a topic.

    The producer and consumer are stored on ``self.producer`` and
    ``self.consumer`` so that ``stop_client_load`` can stop and check
    them later. Both are placed on ``self.preallocated_nodes``.

    The method returns only after the producer has reported at least 10
    acks and the consumer at least 10 total reads (each waited for up to
    30 seconds).

    Args:
        topic_name: name of the topic to produce to and consume from.
    """
    payload_size = 4096
    warmup_count = 10
    warmup_wait = dict(timeout_sec=30, backoff_sec=1)

    # Pick the producer rate limit for the environment: the highest value
    # on dedicated nodes, the lowest one when running in debug mode.
    if self.redpanda.dedicated_nodes:
        produce_rate = 100 * 2**20
    elif self.debug_mode:
        produce_rate = 100 * 2**10
    else:
        produce_rate = 10 * 2**20

    # msg_count is just some large number, so that the produce load
    # lasts till the end of the test.
    self.producer = kgo.KgoVerifierProducer(
        self.test_context,
        self.redpanda,
        topic=topic_name,
        msg_size=payload_size,
        msg_count=2**30,
        rate_limit_bps=produce_rate,
        custom_node=self.preallocated_nodes,
    )
    self.producer.start(clean=False)
    self.producer.wait_for_acks(warmup_count, **warmup_wait)

    self.consumer = kgo.KgoVerifierConsumerGroupConsumer(
        self.test_context,
        self.redpanda,
        topic=topic_name,
        msg_size=payload_size,
        readers=5,
        loop=True,
        nodes=self.preallocated_nodes,
        debug_logs=True,
    )
    self.consumer.start(clean=False)
    self.consumer.wait_total_reads(warmup_count, **warmup_wait)

def stop_client_load(self):
    """Stop the client load and check that the consumer saw no bad reads.

    The producer is stopped first and its acked message count becomes the
    target for the consumer, which gets up to 60 seconds to reach that
    many total reads. The consumer is stopped afterwards in all cases,
    including when the wait fails, so that it is not left running.

    Expects ``self.producer`` and ``self.consumer`` to have been set by
    ``start_client_load``.

    Raises:
        AssertionError: if the consumer's validator reports invalid reads
            or out-of-scope invalid reads.
        Exception: whatever ``wait_total_reads`` raises when the consumer
            does not reach the target; it is propagated after the
            consumer has been stopped.
    """
    self.producer.stop()
    # Read the count once so the wait target and the log line agree.
    acked = self.producer.produce_status.acked

    try:
        self.consumer.wait_total_reads(acked, timeout_sec=60, backoff_sec=1)
    finally:
        # Stop the consumer even if it never caught up with the producer.
        self.consumer.stop()

    validator = self.consumer.consumer_status.validator
    self.logger.info(f"produced {acked} msgs, "
                     f"consumed {validator.valid_reads}")
    assert validator.invalid_reads == 0, (
        f"consumer reported {validator.invalid_reads} invalid reads")
    assert validator.out_of_scope_invalid_reads == 0, (
        f"consumer reported {validator.out_of_scope_invalid_reads} "
        f"out-of-scope invalid reads")

def get_replica_shard_map(self, nodes, admin=None):
"""Return map of topic -> partition -> [(node_id, core)]"""

Expand Down Expand Up @@ -102,7 +150,7 @@ def is_stationary():
backoff_sec=backoff_sec)
return shard_map

@cluster(num_nodes=5)
@cluster(num_nodes=6)
def test_upgrade(self):
# Disable partition balancer in this test, as we need partitions
# to remain stationary.
Expand All @@ -127,6 +175,8 @@ def test_upgrade(self):
for topic in ["foo", "bar"]:
rpk.create_topic(topic, partitions=n_partitions, replicas=3)

self.start_client_load("foo")

self.logger.info("created cluster and topics.")
initial_map = self.get_replica_shard_map(seed_nodes, admin)
self.print_shard_stats(initial_map)
Expand Down Expand Up @@ -236,7 +286,9 @@ def test_upgrade(self):
self.print_shard_stats(map_after_restart)
assert map_after_restart == map_before_restart

@cluster(num_nodes=5)
self.stop_client_load()

@cluster(num_nodes=6)
def test_manual_rebalance(self):
self.redpanda.start()

Expand All @@ -248,6 +300,8 @@ def test_manual_rebalance(self):
for topic in ["foo", "bar"]:
rpk.create_topic(topic, partitions=n_partitions, replicas=5)

self.start_client_load("foo")

# Manually move some partitions to create artificial imbalance

node = self.redpanda.nodes[0]
Expand Down Expand Up @@ -287,7 +341,9 @@ def test_manual_rebalance(self):
for topic, shard_counts in counts_by_topic.items():
assert max(shard_counts) - min(shard_counts) <= 1

@cluster(num_nodes=5)
self.stop_client_load()

@cluster(num_nodes=6)
def test_core_count_change(self):
self.redpanda.set_resource_settings(ResourceSettings(num_cpus=1))
self.redpanda.start()
Expand All @@ -301,6 +357,8 @@ def test_core_count_change(self):
# create topics with rf=5 for ease of accounting
rpk.create_topic(topic, partitions=n_partitions, replicas=5)

self.start_client_load("foo")

# increase cpu count on one node, restart it and
# check that new shards are in use.
self.logger.info("increasing cpu count and restarting...")
Expand Down Expand Up @@ -346,9 +404,11 @@ def test_core_count_change(self):
self.print_shard_stats(map_after_restart)
assert map_after_restart == shard_map

self.stop_client_load()

# TODO: core count decrease (not supported yet)

@cluster(num_nodes=5)
@cluster(num_nodes=6)
def test_node_join(self):
self.redpanda.add_extra_rp_conf({
"core_balancing_continuous": True,
Expand All @@ -366,6 +426,8 @@ def test_node_join(self):
for topic in topics:
rpk.create_topic(topic, partitions=n_partitions, replicas=3)

self.start_client_load("foo")

self.logger.info(f"created topics: {topics}")
initial_shard_map = self.wait_shard_map_stationary(seed_nodes, admin)
self.print_shard_stats(initial_shard_map)
Expand Down Expand Up @@ -407,3 +469,5 @@ def shard_rebalance_finished():
backoff_sec=2)
self.logger.info("shard rebalance finished")
self.print_shard_stats(shard_map_after_balance)

self.stop_client_load()

0 comments on commit 928723d

Please sign in to comment.