diff --git a/src/v/features/feature_table.h b/src/v/features/feature_table.h
index 371759c5859e7..0740e981d0786 100644
--- a/src/v/features/feature_table.h
+++ b/src/v/features/feature_table.h
@@ -446,7 +446,7 @@ inline constexpr std::array feature_schema{
     release_version::v24_2_1,
     "node_local_core_assignment",
     feature::node_local_core_assignment,
-    feature_spec::available_policy::explicit_only,
+    feature_spec::available_policy::always,
     feature_spec::prepare_policy::requires_migration},
   feature_spec{
     release_version::v24_2_1,
diff --git a/tests/rptest/scale_tests/shard_placement_scale_test.py b/tests/rptest/scale_tests/shard_placement_scale_test.py
index c71538d8fd2b9..b17623c6270ba 100644
--- a/tests/rptest/scale_tests/shard_placement_scale_test.py
+++ b/tests/rptest/scale_tests/shard_placement_scale_test.py
@@ -37,10 +37,6 @@ def setUp(self):
         # start the nodes manually
         pass
 
-    def enable_feature(self):
-        self.redpanda.set_feature_active("node_local_core_assignment",
-                                         active=True)
-
     def start_omb(self):
         producer_rate_mbps = 100
 
@@ -109,7 +105,6 @@ def finish_omb(self):
     @skip_debug_mode
     def test_manual_moves(self):
         self.redpanda.start()
-        self.enable_feature()
 
         self.start_omb()
 
@@ -160,7 +155,6 @@ def test_node_add(self):
         joiner_nodes = self.redpanda.nodes[4:]
 
         self.redpanda.start(nodes=seed_nodes)
-        self.enable_feature()
 
         self.start_omb()
 
diff --git a/tests/rptest/tests/partition_move_interruption_test.py b/tests/rptest/tests/partition_move_interruption_test.py
index ee649ee96abda..e20ec57afce4b 100644
--- a/tests/rptest/tests/partition_move_interruption_test.py
+++ b/tests/rptest/tests/partition_move_interruption_test.py
@@ -550,7 +550,7 @@ def test_cancelling_partition_move_node_down(self):
             id = self.redpanda.node_id(n)
             if id not in replica_ids:
                 previous = assignments.pop()
-                assignments.append({"node_id": id, "core": 0})
+                assignments.append({"node_id": id})
         # stop a node that is going to be removed from current partition assignment
         to_stop = self.get_node_by_id(previous['node_id'])
         self.redpanda.stop_node(to_stop)
@@ -640,7 +640,7 @@ def test_cancellations_interrupted_with_restarts(self, replication_factor):
             for id in available_ids:
                 if id not in replica_ids:
                     assignments.pop()
-                    assignments.append({"node_id": id, "core": 0})
+                    assignments.append({"node_id": id})
 
             self.logger.info(
                 f"[{i}] moving {topic}/{partition}: {prev_assignments} -> {assignments}"
diff --git a/tests/rptest/tests/partition_movement.py b/tests/rptest/tests/partition_movement.py
index 0e5b61179aabb..7ab66b3aeeabb 100644
--- a/tests/rptest/tests/partition_movement.py
+++ b/tests/rptest/tests/partition_movement.py
@@ -70,7 +70,7 @@ def _choose_replacement(admin, assignments, allow_no_ops,
         return selected, replacements
 
     @staticmethod
-    def _get_assignments(admin, topic, partition, with_cores=True):
+    def _get_assignments(admin, topic, partition, with_cores=False):
         def try_get_partitions():
             try:
                 res = admin.get_partitions(topic, partition)
@@ -200,7 +200,7 @@ def _do_move_and_verify(self,
                             topic,
                             partition,
                             timeout_sec,
-                            node_local_core_assignment=False):
+                            node_local_core_assignment=True):
         _, new_assignment = self._dispatch_random_partition_move(
             topic=topic,
             partition=partition,
@@ -262,7 +262,7 @@ def _set_partition_assignments(self,
                                    partition,
                                    assignments,
                                    admin=None,
-                                   node_local_core_assignment=False):
+                                   node_local_core_assignment=True):
         self.logger.info(
             f"setting assignments for {topic}/{partition}: {assignments}")
 
@@ -289,7 +289,7 @@ def _dispatch_random_partition_move(self,
                                         partition,
                                         x_core_only=False,
                                         allow_no_op=True,
-                                        node_local_core_assignment=False):
+                                        node_local_core_assignment=True):
         """
         Request partition replicas to be randomly moved
 
diff --git a/tests/rptest/tests/partition_movement_test.py b/tests/rptest/tests/partition_movement_test.py
index 8dfd14f3a7ba8..5befffe6e3149 100644
--- a/tests/rptest/tests/partition_movement_test.py
+++ b/tests/rptest/tests/partition_movement_test.py
@@ -91,13 +91,6 @@ def test_moving_not_fully_initialized_partition(self, num_to_upgrade):
         self.client().create_topic(spec)
         admin = Admin(self.redpanda)
 
-        if test_mixed_versions:
-            node_local_core_assignment = False
-        else:
-            self.redpanda.set_feature_active("node_local_core_assignment",
-                                             active=True)
-            node_local_core_assignment = True
-
         # choose a random topic-partition
         self.logger.info(f"selected topic-partition: {topic}/{partition}")
 
@@ -119,7 +112,7 @@ def test_moving_not_fully_initialized_partition(self, num_to_upgrade):
             partition,
             assignments,
             admin=admin,
-            node_local_core_assignment=node_local_core_assignment)
+            node_local_core_assignment=not test_mixed_versions)
 
         def node_assignments_converged():
             info = admin.get_partitions(topic, partition)
@@ -170,16 +163,9 @@ def test_empty(self, num_to_upgrade):
         for spec in topics:
             self.client().create_topic(spec)
 
-        if test_mixed_versions:
-            node_local_core_assignment = False
-        else:
-            self.redpanda.set_feature_active("node_local_core_assignment",
-                                             active=True)
-            node_local_core_assignment = True
-
         for _ in range(25):
             self._move_and_verify(
-                node_local_core_assignment=node_local_core_assignment)
+                node_local_core_assignment=not test_mixed_versions)
 
     @cluster(num_nodes=4,
              log_allow_list=PARTITION_MOVEMENT_LOG_ERRORS +
@@ -224,16 +210,9 @@ def test_static(self, num_to_upgrade):
         producer.free()
         self.logger.info(f"Producer stop complete.")
 
-        if test_mixed_versions:
-            node_local_core_assignment = False
-        else:
-            self.redpanda.set_feature_active("node_local_core_assignment",
-                                             active=True)
-            node_local_core_assignment = True
-
         for _ in range(25):
             self._move_and_verify(
-                node_local_core_assignment=node_local_core_assignment)
+                node_local_core_assignment=not test_mixed_versions)
 
         for spec in topics:
             self.logger.info(f"Verifying records in {spec}")
@@ -307,19 +286,12 @@ def test_dynamic(self, num_to_upgrade):
         self.client().create_topic(spec)
         self.topic = spec.name
 
-        if test_mixed_versions:
-            node_local_core_assignment = False
-        else:
-            self.redpanda.set_feature_active("node_local_core_assignment",
-                                             active=True)
-            node_local_core_assignment = True
-
         self.start_producer(1, throughput=throughput)
         self.start_consumer(1)
         self.await_startup()
         for _ in range(moves):
             self._move_and_verify(
-                node_local_core_assignment=node_local_core_assignment)
+                node_local_core_assignment=not test_mixed_versions)
         self.run_validation(enable_idempotence=False,
                             consumer_timeout_sec=45,
                             min_records=records)
@@ -353,13 +325,6 @@ def test_move_consumer_offsets_intranode(self, num_to_upgrade):
         self.start_consumer(1)
         self.await_startup()
 
-        if test_mixed_versions:
-            node_local_core_assignment = False
-        else:
-            self.redpanda.set_feature_active("node_local_core_assignment",
-                                             active=True)
-            node_local_core_assignment = True
-
         admin = Admin(self.redpanda)
         topic = "__consumer_offsets"
         partition = 0
@@ -374,7 +339,7 @@ def test_move_consumer_offsets_intranode(self, num_to_upgrade):
             partition,
             assignments,
             admin=admin,
-            node_local_core_assignment=node_local_core_assignment)
+            node_local_core_assignment=not test_mixed_versions)
         self._wait_post_move(topic, partition, assignments, 360)
 
         self.run_validation(enable_idempotence=False,
@@ -401,16 +366,9 @@ def test_bootstrapping_after_move(self, num_to_upgrade):
         self.start_consumer(1)
         self.await_startup()
 
-        if test_mixed_versions:
-            node_local_core_assignment = False
-        else:
-            self.redpanda.set_feature_active("node_local_core_assignment",
-                                             active=True)
-            node_local_core_assignment = True
-
         # execute single move
         self._move_and_verify(
-            node_local_core_assignment=node_local_core_assignment)
+            node_local_core_assignment=not test_mixed_versions)
         self.run_validation(enable_idempotence=False, consumer_timeout_sec=45)
 
         # snapshot offsets
@@ -483,12 +441,7 @@ def dispatch_and_assert_status_code(assignments, expected_sc):
             assert status_code == expected_sc, \
                 f"Expected {expected_sc} but got {status_code}"
 
-        if test_mixed_versions:
-            node_local_core_assignment = False
-        else:
-            self.redpanda.set_feature_active("node_local_core_assignment",
-                                             active=True)
-            node_local_core_assignment = True
+        node_local_core_assignment = not test_mixed_versions
 
         # A valid node but an invalid core
         assignments = [{"node_id": valid_dest, "core": invalid_shard}]
@@ -778,7 +731,7 @@ def test_availability_when_one_node_down(self):
         ]
         selected = random.choice(to_select)
         # replace one of the assignments
-        assignments[0] = {'node_id': selected['node_id'], 'core': 0}
+        assignments[0] = {'node_id': selected['node_id']}
         self.logger.info(
             f"new assignment for {self.topic}/{partition_id}: {assignments}")
 
@@ -852,7 +805,7 @@ def test_stale_node(self, frequent_controller_snapshots):
         selected = random.choice(to_select)
         # replace one of the assignments
         replaced = assignments[0]['node_id']
-        assignments[0] = {'node_id': selected['node_id'], 'core': 0}
+        assignments[0] = {'node_id': selected['node_id']}
         self.logger.info(
             f"target assignment for {self.topic}/{partition_id}: {assignments}"
         )
@@ -891,7 +844,7 @@ def status_done():
             "first movement done, scheduling second movement back")
 
         # bring replaced node back
-        assignments[0] = {'node_id': replaced, 'core': 0}
+        assignments[0] = {'node_id': replaced}
         self._set_partition_assignments(self.topic, partition_id, assignments,
                                         admin)
 
@@ -919,9 +872,6 @@ def test_movement_tracking_api(self):
         self.start_consumer(1)
         self.await_startup(min_records=throughput * 10, timeout_sec=60)
 
-        self.redpanda.set_feature_active("node_local_core_assignment",
-                                         active=True)
-
         self.redpanda.set_cluster_config({"raft_learner_recovery_rate": 1})
         brokers = admin.get_brokers()
         for partition in range(0, partition_count):
@@ -936,11 +886,8 @@ def test_movement_tracking_api(self):
             self.logger.info(
                 f"initial assignments for {self.topic}/{partition}: {prev_assignments}, new assignment: {assignments}"
             )
-            self._set_partition_assignments(self.topic,
-                                            partition,
-                                            assignments,
-                                            admin,
-                                            node_local_core_assignment=True)
+            self._set_partition_assignments(self.topic, partition, assignments,
+                                            admin)
 
         wait_until(
             lambda: len(admin.list_reconfigurations()) == partition_count, 30)
@@ -1047,13 +994,6 @@ def test_shadow_indexing(self, num_to_upgrade, cloud_storage_type):
         self.start_consumer(1)
         self.await_startup()
 
-        if test_mixed_versions:
-            node_local_core_assignment = False
-        else:
-            self.redpanda.set_feature_active("node_local_core_assignment",
-                                             active=True)
-            node_local_core_assignment = True
-
         # We will start an upgrade halfway through the test: this ensures
         # that a single-version cluster existed for long enough to actually
         # upload some data to S3, before the upgrade potentially pauses
@@ -1065,7 +1005,7 @@ def test_shadow_indexing(self, num_to_upgrade, cloud_storage_type):
                 self._partial_upgrade(num_to_upgrade)
 
             self._move_and_verify(
-                node_local_core_assignment=node_local_core_assignment)
+                node_local_core_assignment=not test_mixed_versions)
 
         self.run_validation(enable_idempotence=False,
                             consumer_timeout_sec=45,
@@ -1101,13 +1041,6 @@ def test_cross_shard(self, num_to_upgrade, cloud_storage_type):
         self.start_consumer(1)
         self.await_startup()
 
-        if test_mixed_versions:
-            node_local_core_assignment = False
-        else:
-            self.redpanda.set_feature_active("node_local_core_assignment",
-                                             active=True)
-            node_local_core_assignment = True
-
         admin = Admin(self.redpanda)
         topic = self.topic
         partition = 0
@@ -1131,7 +1064,7 @@ def test_cross_shard(self, num_to_upgrade, cloud_storage_type):
             partition,
             assignments,
             admin=admin,
-            node_local_core_assignment=node_local_core_assignment)
+            node_local_core_assignment=not test_mixed_versions)
         self._wait_post_move(topic, partition, assignments, 360)
 
         self.run_validation(enable_idempotence=False,
diff --git a/tests/rptest/tests/shard_placement_test.py b/tests/rptest/tests/shard_placement_test.py
index 58587b5c21652..213748871a700 100644
--- a/tests/rptest/tests/shard_placement_test.py
+++ b/tests/rptest/tests/shard_placement_test.py
@@ -27,10 +27,6 @@ def setUp(self):
         # start the nodes manually
         pass
 
-    def enable_feature(self):
-        self.redpanda.set_feature_active("node_local_core_assignment",
-                                         active=True)
-
     def start_client_load(self, topic_name):
         msg_size = 4096
 
@@ -269,13 +265,15 @@ def test_upgrade(self):
         initial_map = self.wait_shard_map_stationary(seed_nodes, admin)
         self.print_shard_stats(initial_map)
 
-        # Upgrade the cluster and enable the feature.
+        # Upgrade the cluster and wait for the feature activation.
         installer.install(seed_nodes, RedpandaInstaller.HEAD)
         self.redpanda.restart_nodes(seed_nodes)
 
         self.redpanda.wait_for_membership(first_start=False)
 
-        self.enable_feature()
+        self.redpanda.await_feature("node_local_core_assignment",
+                                    'active',
+                                    timeout_sec=15)
 
         self.logger.info(
             "feature enabled, checking that shard map is stable...")
@@ -378,7 +376,6 @@ def test_upgrade(self):
     @cluster(num_nodes=6)
     def test_manual_rebalance(self):
         self.redpanda.start()
-        self.enable_feature()
 
         admin = Admin(self.redpanda)
         rpk = RpkTool(self.redpanda)
@@ -443,7 +440,6 @@ def test_core_count_change(self):
         self.redpanda.set_resource_settings(
             ResourceSettings(num_cpus=initial_core_count - 1))
         self.redpanda.start()
-        self.enable_feature()
 
         admin = Admin(self.redpanda)
         rpk = RpkTool(self.redpanda)
@@ -554,7 +550,6 @@ def test_node_join(self):
         seed_nodes = self.redpanda.nodes[0:3]
         joiner_nodes = self.redpanda.nodes[3:]
         self.redpanda.start(nodes=seed_nodes)
-        self.enable_feature()
 
         admin = Admin(self.redpanda, default_node=seed_nodes[0])
         rpk = RpkTool(self.redpanda)
diff --git a/tests/rptest/tests/topic_delete_test.py b/tests/rptest/tests/topic_delete_test.py
index 5f38b3a5fc57d..7ddf0b7cf0949 100644
--- a/tests/rptest/tests/topic_delete_test.py
+++ b/tests/rptest/tests/topic_delete_test.py
@@ -269,7 +269,8 @@ def __init__(self, test_context):
 
         self.kafka_tools = KafkaCliTools(self.redpanda)
 
-    def movement_done(self, partition, assignments):
+    def movement_done(self, partition, to_nodes):
+        assignments = [dict(node_id=n) for n in to_nodes]
         results = []
         for n in self.redpanda._started:
             info = self.admin.get_partitions(self.topic, partition, node=n)
@@ -280,7 +281,7 @@ def movement_done(self, partition, assignments):
             results.append(converged and info["status"] == "done")
         return all(results)
 
-    def move_topic(self, assignments):
+    def move_topic(self, to_nodes):
         for partition in range(3):
 
             def get_nodes(partition):
@@ -288,13 +289,17 @@ def get_nodes(partition):
 
             nodes_before = set(
                 get_nodes(self.admin.get_partitions(self.topic, partition)))
-            nodes_after = {r['node_id'] for r in assignments}
+            nodes_after = set(to_nodes)
             if nodes_before == nodes_after:
                 continue
 
+            assignments = [
+                dict(node_id=n, core=PartitionMovementMixin.INVALID_CORE)
+                for n in to_nodes
+            ]
             self.admin.set_partition_replicas(self.topic, partition,
                                               assignments)
-            wait_until(lambda: self.movement_done(partition, assignments),
+            wait_until(lambda: self.movement_done(partition, to_nodes),
                        timeout_sec=60,
                        backoff_sec=2)
 
@@ -309,8 +314,7 @@ def topic_delete_orphan_files_after_move_test(self):
         self.admin = Admin(self.redpanda)
 
         # Move every partition to nodes 1,2,3
-        assignments = [dict(node_id=n, core=0) for n in [1, 2, 3]]
-        self.move_topic(assignments)
+        self.move_topic([1, 2, 3])
 
         down_node = self.redpanda.nodes[0]
         try:
@@ -319,8 +323,7 @@ def topic_delete_orphan_files_after_move_test(self):
                 f"chattr +i {self.redpanda.DATA_DIR}/kafka/{self.topic}")
 
             # Move every partition from node 1 to node 4
-            new_assignments = [dict(node_id=n, core=0) for n in [2, 3, 4]]
-            self.move_topic(new_assignments)
+            self.move_topic([2, 3, 4])
 
             def topic_exist_on_every_node(redpanda, topic_name):
                 storage = redpanda.storage()
@@ -854,7 +857,10 @@ def get_nodes(partition):
         nodes_after = nodes_before[1:] + [
             replacement_node,
         ]
-        new_assignments = list({'core': 0, 'node_id': n} for n in nodes_after)
+        new_assignments = list({
+            'node_id': n,
+            'core': PartitionMovementMixin.INVALID_CORE
+        } for n in nodes_after)
         admin.set_partition_replicas(self.topic, 0, new_assignments)
 
         def move_complete():