diff --git a/tests/rptest/tests/partition_movement_test.py b/tests/rptest/tests/partition_movement_test.py index 260a8c4d93e8..acbb8f7c73cf 100644 --- a/tests/rptest/tests/partition_movement_test.py +++ b/tests/rptest/tests/partition_movement_test.py @@ -18,6 +18,7 @@ from rptest.clients.types import TopicSpec from rptest.tests.redpanda_test import RedpandaTest from rptest.services.admin import Admin +from rptest.services.honey_badger import HoneyBadger from kafka import KafkaProducer from kafka import KafkaConsumer @@ -147,6 +148,66 @@ def derived_done(): wait_until(derived_done, timeout_sec=30, backoff_sec=1) + @cluster(num_nodes=3) + def test_moving_not_fully_initialized_partition(self): + """ + Move partition before first leader is elected + """ + topics = [] + hb = HoneyBadger() + # if failure injector is not enabled simply skip this test + if not hb.is_enabled(self.redpanda.nodes[0]): + return + + for n in self.redpanda.nodes: + hb.set_exception(n, 'raftgen_service::failure_probes', 'vote') + topic = "topic-1" + partition = 0 + spec = TopicSpec(name=topic, partition_count=1, replication_factor=3) + self.redpanda.create_topic(spec) + admin = Admin(self.redpanda) + + # choose a random topic-partition + self.logger.info(f"selected topic-partition: {topic}-{partition}") + + # get the partition's replica set, including core assignments. the kafka + # api doesn't expose core information, so we use the redpanda admin api. + assignments = self._get_assignments(admin, topic, partition) + self.logger.info(f"assignments for {topic}-{partition}: {assignments}") + + brokers = admin.get_brokers() + # replace all node cores in assignment + for assignment in assignments: + for broker in brokers: + if broker['node_id'] == assignment['node_id']: + assignment['core'] = random.randint( + 0, broker["num_cores"] - 1) + self.logger.info( + f"new assignments for {topic}-{partition}: {assignments}") + + admin.set_partition_replicas(topic, partition, assignments) + + def status_done(): + info = admin.get_partitions(topic, partition) + self.logger.info( + f"current assignments for {topic}-{partition}: {info}") + converged = self._equal_assignments(info["replicas"], assignments) + return converged and info["status"] == "done" + + # unset failures + for n in self.redpanda.nodes: + hb.unset_failures(n, 'raftgen_service::failure_probes', 'vote') + # wait until redpanda reports complete + wait_until(status_done, timeout_sec=30, backoff_sec=1) + + def derived_done(): + info = self._get_current_partitions(admin, topic, partition) + self.logger.info( + f"derived assignments for {topic}-{partition}: {info}") + return self._equal_assignments(info, assignments) + + wait_until(derived_done, timeout_sec=30, backoff_sec=1) + @cluster(num_nodes=3) def test_empty(self): """