diff --git a/tests/rptest/tests/shard_placement_test.py b/tests/rptest/tests/shard_placement_test.py
index e025b711d6acf..687d465ea979b 100644
--- a/tests/rptest/tests/shard_placement_test.py
+++ b/tests/rptest/tests/shard_placement_test.py
@@ -12,20 +12,68 @@
 from rptest.services.cluster import cluster
 from rptest.services.redpanda import ResourceSettings
 from rptest.services.admin import Admin
+import rptest.services.kgo_verifier_services as kgo
 from rptest.clients.rpk import RpkTool
-from rptest.tests.redpanda_test import RedpandaTest
+from rptest.tests.prealloc_nodes import PreallocNodesTest
 from rptest.services.redpanda_installer import RedpandaInstaller
 from rptest.util import wait_until_result
 
 
-class ShardPlacementTest(RedpandaTest):
+class ShardPlacementTest(PreallocNodesTest):
     def __init__(self, *args, **kwargs):
-        super().__init__(*args, num_brokers=5, **kwargs)
+        super().__init__(*args, num_brokers=5, node_prealloc_count=1, **kwargs)
 
     def setUp(self):
         # start the nodes manually
         pass
 
+    def start_client_load(self, topic_name):
+        msg_size = 4096
+
+        if self.redpanda.dedicated_nodes:
+            rate_limit_bps = 100 * 2**20
+        elif not self.debug_mode:
+            rate_limit_bps = 10 * 2**20
+        else:
+            rate_limit_bps = 100 * 2**10
+
+        self.producer = kgo.KgoVerifierProducer(
+            self.test_context,
+            self.redpanda,
+            topic=topic_name,
+            msg_size=msg_size,
+            # some large number to get produce load till the end of test
+            msg_count=2**30,
+            rate_limit_bps=rate_limit_bps,
+            custom_node=self.preallocated_nodes)
+        self.producer.start(clean=False)
+        self.producer.wait_for_acks(10, timeout_sec=30, backoff_sec=1)
+
+        self.consumer = kgo.KgoVerifierConsumerGroupConsumer(
+            self.test_context,
+            self.redpanda,
+            topic=topic_name,
+            msg_size=msg_size,
+            readers=5,
+            loop=True,
+            nodes=self.preallocated_nodes,
+            debug_logs=True)
+        self.consumer.start(clean=False)
+        self.consumer.wait_total_reads(10, timeout_sec=30, backoff_sec=1)
+
+    def stop_client_load(self):
+        self.producer.stop()
+        self.consumer.wait_total_reads(self.producer.produce_status.acked,
+                                       timeout_sec=60,
+                                       backoff_sec=1)
+        self.consumer.stop()
+
+        self.logger.info(
+            f"produced {self.producer.produce_status.acked} msgs, "
+            f"consumed {self.consumer.consumer_status.validator.valid_reads}")
+        assert self.consumer.consumer_status.validator.invalid_reads == 0
+        assert self.consumer.consumer_status.validator.out_of_scope_invalid_reads == 0
+
     def get_replica_shard_map(self, nodes, admin=None):
         """Return map of topic -> partition -> [(node_id, core)]"""
 
@@ -102,7 +150,7 @@ def is_stationary():
                    backoff_sec=backoff_sec)
         return shard_map
 
-    @cluster(num_nodes=5)
+    @cluster(num_nodes=6)
     def test_upgrade(self):
         # Disable partition balancer in this test, as we need partitions
         # to remain stationary.
@@ -127,6 +175,8 @@ def test_upgrade(self):
         for topic in ["foo", "bar"]:
             rpk.create_topic(topic, partitions=n_partitions, replicas=3)
 
+        self.start_client_load("foo")
+
         self.logger.info("created cluster and topics.")
         initial_map = self.get_replica_shard_map(seed_nodes, admin)
         self.print_shard_stats(initial_map)
@@ -236,7 +286,9 @@ def test_upgrade(self):
         self.print_shard_stats(map_after_restart)
         assert map_after_restart == map_before_restart
 
-    @cluster(num_nodes=5)
+        self.stop_client_load()
+
+    @cluster(num_nodes=6)
     def test_manual_rebalance(self):
         self.redpanda.start()
 
@@ -248,6 +300,8 @@ def test_manual_rebalance(self):
         for topic in ["foo", "bar"]:
             rpk.create_topic(topic, partitions=n_partitions, replicas=5)
 
+        self.start_client_load("foo")
+
         # Manually move some partitions to create artificial imbalance
         node = self.redpanda.nodes[0]
 
@@ -287,7 +341,9 @@ def test_manual_rebalance(self):
         for topic, shard_counts in counts_by_topic.items():
             assert max(shard_counts) - min(shard_counts) <= 1
 
-    @cluster(num_nodes=5)
+        self.stop_client_load()
+
+    @cluster(num_nodes=6)
     def test_core_count_change(self):
         self.redpanda.set_resource_settings(ResourceSettings(num_cpus=1))
         self.redpanda.start()
@@ -301,6 +357,8 @@ def test_core_count_change(self):
             # create topics with rf=5 for ease of accounting
             rpk.create_topic(topic, partitions=n_partitions, replicas=5)
 
+        self.start_client_load("foo")
+
         # increase cpu count on one node, restart it and
         # check that new shards are in use.
         self.logger.info("increasing cpu count and restarting...")
@@ -346,9 +404,11 @@ def test_core_count_change(self):
         self.print_shard_stats(map_after_restart)
         assert map_after_restart == shard_map
 
+        self.stop_client_load()
+
     # TODO: core count decrease (not supported yet)
 
-    @cluster(num_nodes=5)
+    @cluster(num_nodes=6)
     def test_node_join(self):
         self.redpanda.add_extra_rp_conf({
             "core_balancing_continuous": True,
@@ -366,6 +426,8 @@ def test_node_join(self):
         for topic in topics:
             rpk.create_topic(topic, partitions=n_partitions, replicas=3)
 
+        self.start_client_load("foo")
+
         self.logger.info(f"created topics: {topics}")
         initial_shard_map = self.wait_shard_map_stationary(seed_nodes, admin)
         self.print_shard_stats(initial_shard_map)
@@ -407,3 +469,5 @@ def shard_rebalance_finished():
                    backoff_sec=2)
         self.logger.info("shard rebalance finished")
         self.print_shard_stats(shard_map_after_balance)
+
+        self.stop_client_load()
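As a reading aid only (not part of the patch), here is a minimal standalone sketch of the environment-tiered rate-limit selection that the new `start_client_load` helper performs above; the `pick_rate_limit_bps` name is hypothetical.

```python
# Hypothetical restatement of the rate-limit tiering in start_client_load();
# the function name is illustrative and does not exist in the patch.
def pick_rate_limit_bps(dedicated_nodes: bool, debug_mode: bool) -> int:
    """Pick a produce rate limit (bytes/sec) matching the test environment."""
    if dedicated_nodes:
        return 100 * 2**20  # 100 MiB/s on dedicated hardware
    if not debug_mode:
        return 10 * 2**20   # 10 MiB/s on shared/virtualized nodes
    return 100 * 2**10      # 100 KiB/s for slow debug builds


if __name__ == "__main__":
    assert pick_rate_limit_bps(dedicated_nodes=True, debug_mode=False) == 100 * 2**20
    assert pick_rate_limit_bps(dedicated_nodes=False, debug_mode=False) == 10 * 2**20
    assert pick_rate_limit_bps(dedicated_nodes=False, debug_mode=True) == 100 * 2**10
```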