Skip to content

Commit

Permalink
features: activate node_local_core_assignment automatically
Browse files Browse the repository at this point in the history
  • Loading branch information
ztlpn committed Oct 28, 2024
1 parent 40e0dca commit 51faa6d
Show file tree
Hide file tree
Showing 7 changed files with 40 additions and 112 deletions.
2 changes: 1 addition & 1 deletion src/v/features/feature_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -446,7 +446,7 @@ inline constexpr std::array feature_schema{
release_version::v24_2_1,
"node_local_core_assignment",
feature::node_local_core_assignment,
feature_spec::available_policy::explicit_only,
feature_spec::available_policy::always,
feature_spec::prepare_policy::requires_migration},
feature_spec{
release_version::v24_2_1,
Expand Down
6 changes: 0 additions & 6 deletions tests/rptest/scale_tests/shard_placement_scale_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,6 @@ def setUp(self):
# start the nodes manually
pass

def enable_feature(self):
self.redpanda.set_feature_active("node_local_core_assignment",
active=True)

def start_omb(self):
producer_rate_mbps = 100

Expand Down Expand Up @@ -109,7 +105,6 @@ def finish_omb(self):
@skip_debug_mode
def test_manual_moves(self):
self.redpanda.start()
self.enable_feature()

self.start_omb()

Expand Down Expand Up @@ -160,7 +155,6 @@ def test_node_add(self):
joiner_nodes = self.redpanda.nodes[4:]

self.redpanda.start(nodes=seed_nodes)
self.enable_feature()

self.start_omb()

Expand Down
4 changes: 2 additions & 2 deletions tests/rptest/tests/partition_move_interruption_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -550,7 +550,7 @@ def test_cancelling_partition_move_node_down(self):
id = self.redpanda.node_id(n)
if id not in replica_ids:
previous = assignments.pop()
assignments.append({"node_id": id, "core": 0})
assignments.append({"node_id": id})
# stop a node that is going to be removed from current partition assignment
to_stop = self.get_node_by_id(previous['node_id'])
self.redpanda.stop_node(to_stop)
Expand Down Expand Up @@ -640,7 +640,7 @@ def test_cancellations_interrupted_with_restarts(self, replication_factor):
for id in available_ids:
if id not in replica_ids:
assignments.pop()
assignments.append({"node_id": id, "core": 0})
assignments.append({"node_id": id})

self.logger.info(
f"[{i}] moving {topic}/{partition}: {prev_assignments} -> {assignments}"
Expand Down
8 changes: 4 additions & 4 deletions tests/rptest/tests/partition_movement.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ def _choose_replacement(admin, assignments, allow_no_ops,
return selected, replacements

@staticmethod
def _get_assignments(admin, topic, partition, with_cores=True):
def _get_assignments(admin, topic, partition, with_cores=False):
def try_get_partitions():
try:
res = admin.get_partitions(topic, partition)
Expand Down Expand Up @@ -200,7 +200,7 @@ def _do_move_and_verify(self,
topic,
partition,
timeout_sec,
node_local_core_assignment=False):
node_local_core_assignment=True):
_, new_assignment = self._dispatch_random_partition_move(
topic=topic,
partition=partition,
Expand Down Expand Up @@ -262,7 +262,7 @@ def _set_partition_assignments(self,
partition,
assignments,
admin=None,
node_local_core_assignment=False):
node_local_core_assignment=True):
self.logger.info(
f"setting assignments for {topic}/{partition}: {assignments}")

Expand All @@ -289,7 +289,7 @@ def _dispatch_random_partition_move(self,
partition,
x_core_only=False,
allow_no_op=True,
node_local_core_assignment=False):
node_local_core_assignment=True):
"""
Request partition replicas to be randomly moved
Expand Down
95 changes: 14 additions & 81 deletions tests/rptest/tests/partition_movement_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,13 +91,6 @@ def test_moving_not_fully_initialized_partition(self, num_to_upgrade):
self.client().create_topic(spec)
admin = Admin(self.redpanda)

if test_mixed_versions:
node_local_core_assignment = False
else:
self.redpanda.set_feature_active("node_local_core_assignment",
active=True)
node_local_core_assignment = True

# choose a random topic-partition
self.logger.info(f"selected topic-partition: {topic}/{partition}")

Expand All @@ -119,7 +112,7 @@ def test_moving_not_fully_initialized_partition(self, num_to_upgrade):
partition,
assignments,
admin=admin,
node_local_core_assignment=node_local_core_assignment)
node_local_core_assignment=not test_mixed_versions)

def node_assignments_converged():
info = admin.get_partitions(topic, partition)
Expand Down Expand Up @@ -170,16 +163,9 @@ def test_empty(self, num_to_upgrade):
for spec in topics:
self.client().create_topic(spec)

if test_mixed_versions:
node_local_core_assignment = False
else:
self.redpanda.set_feature_active("node_local_core_assignment",
active=True)
node_local_core_assignment = True

for _ in range(25):
self._move_and_verify(
node_local_core_assignment=node_local_core_assignment)
node_local_core_assignment=not test_mixed_versions)

@cluster(num_nodes=4,
log_allow_list=PARTITION_MOVEMENT_LOG_ERRORS +
Expand Down Expand Up @@ -224,16 +210,9 @@ def test_static(self, num_to_upgrade):
producer.free()
self.logger.info(f"Producer stop complete.")

if test_mixed_versions:
node_local_core_assignment = False
else:
self.redpanda.set_feature_active("node_local_core_assignment",
active=True)
node_local_core_assignment = True

for _ in range(25):
self._move_and_verify(
node_local_core_assignment=node_local_core_assignment)
node_local_core_assignment=not test_mixed_versions)

for spec in topics:
self.logger.info(f"Verifying records in {spec}")
Expand Down Expand Up @@ -307,19 +286,12 @@ def test_dynamic(self, num_to_upgrade):
self.client().create_topic(spec)
self.topic = spec.name

if test_mixed_versions:
node_local_core_assignment = False
else:
self.redpanda.set_feature_active("node_local_core_assignment",
active=True)
node_local_core_assignment = True

self.start_producer(1, throughput=throughput)
self.start_consumer(1)
self.await_startup()
for _ in range(moves):
self._move_and_verify(
node_local_core_assignment=node_local_core_assignment)
node_local_core_assignment=not test_mixed_versions)
self.run_validation(enable_idempotence=False,
consumer_timeout_sec=45,
min_records=records)
Expand Down Expand Up @@ -353,13 +325,6 @@ def test_move_consumer_offsets_intranode(self, num_to_upgrade):
self.start_consumer(1)
self.await_startup()

if test_mixed_versions:
node_local_core_assignment = False
else:
self.redpanda.set_feature_active("node_local_core_assignment",
active=True)
node_local_core_assignment = True

admin = Admin(self.redpanda)
topic = "__consumer_offsets"
partition = 0
Expand All @@ -374,7 +339,7 @@ def test_move_consumer_offsets_intranode(self, num_to_upgrade):
partition,
assignments,
admin=admin,
node_local_core_assignment=node_local_core_assignment)
node_local_core_assignment=not test_mixed_versions)
self._wait_post_move(topic, partition, assignments, 360)

self.run_validation(enable_idempotence=False,
Expand All @@ -401,16 +366,9 @@ def test_bootstrapping_after_move(self, num_to_upgrade):
self.start_consumer(1)
self.await_startup()

if test_mixed_versions:
node_local_core_assignment = False
else:
self.redpanda.set_feature_active("node_local_core_assignment",
active=True)
node_local_core_assignment = True

# execute single move
self._move_and_verify(
node_local_core_assignment=node_local_core_assignment)
node_local_core_assignment=not test_mixed_versions)
self.run_validation(enable_idempotence=False, consumer_timeout_sec=45)

# snapshot offsets
Expand Down Expand Up @@ -483,12 +441,7 @@ def dispatch_and_assert_status_code(assignments, expected_sc):
assert status_code == expected_sc, \
f"Expected {expected_sc} but got {status_code}"

if test_mixed_versions:
node_local_core_assignment = False
else:
self.redpanda.set_feature_active("node_local_core_assignment",
active=True)
node_local_core_assignment = True
node_local_core_assignment = not test_mixed_versions

# A valid node but an invalid core
assignments = [{"node_id": valid_dest, "core": invalid_shard}]
Expand Down Expand Up @@ -778,7 +731,7 @@ def test_availability_when_one_node_down(self):
]
selected = random.choice(to_select)
# replace one of the assignments
assignments[0] = {'node_id': selected['node_id'], 'core': 0}
assignments[0] = {'node_id': selected['node_id']}
self.logger.info(
f"new assignment for {self.topic}/{partition_id}: {assignments}")

Expand Down Expand Up @@ -852,7 +805,7 @@ def test_stale_node(self, frequent_controller_snapshots):
selected = random.choice(to_select)
# replace one of the assignments
replaced = assignments[0]['node_id']
assignments[0] = {'node_id': selected['node_id'], 'core': 0}
assignments[0] = {'node_id': selected['node_id']}
self.logger.info(
f"target assignment for {self.topic}/{partition_id}: {assignments}"
)
Expand Down Expand Up @@ -891,7 +844,7 @@ def status_done():
"first movement done, scheduling second movement back")

# bring replaced node back
assignments[0] = {'node_id': replaced, 'core': 0}
assignments[0] = {'node_id': replaced}
self._set_partition_assignments(self.topic, partition_id, assignments,
admin)

Expand Down Expand Up @@ -919,9 +872,6 @@ def test_movement_tracking_api(self):
self.start_consumer(1)
self.await_startup(min_records=throughput * 10, timeout_sec=60)

self.redpanda.set_feature_active("node_local_core_assignment",
active=True)

self.redpanda.set_cluster_config({"raft_learner_recovery_rate": 1})
brokers = admin.get_brokers()
for partition in range(0, partition_count):
Expand All @@ -936,11 +886,8 @@ def test_movement_tracking_api(self):
self.logger.info(
f"initial assignments for {self.topic}/{partition}: {prev_assignments}, new assignment: {assignments}"
)
self._set_partition_assignments(self.topic,
partition,
assignments,
admin,
node_local_core_assignment=True)
self._set_partition_assignments(self.topic, partition, assignments,
admin)

wait_until(
lambda: len(admin.list_reconfigurations()) == partition_count, 30)
Expand Down Expand Up @@ -1047,13 +994,6 @@ def test_shadow_indexing(self, num_to_upgrade, cloud_storage_type):
self.start_consumer(1)
self.await_startup()

if test_mixed_versions:
node_local_core_assignment = False
else:
self.redpanda.set_feature_active("node_local_core_assignment",
active=True)
node_local_core_assignment = True

# We will start an upgrade halfway through the test: this ensures
# that a single-version cluster existed for long enough to actually
# upload some data to S3, before the upgrade potentially pauses
Expand All @@ -1065,7 +1005,7 @@ def test_shadow_indexing(self, num_to_upgrade, cloud_storage_type):
self._partial_upgrade(num_to_upgrade)

self._move_and_verify(
node_local_core_assignment=node_local_core_assignment)
node_local_core_assignment=not test_mixed_versions)

self.run_validation(enable_idempotence=False,
consumer_timeout_sec=45,
Expand Down Expand Up @@ -1101,13 +1041,6 @@ def test_cross_shard(self, num_to_upgrade, cloud_storage_type):
self.start_consumer(1)
self.await_startup()

if test_mixed_versions:
node_local_core_assignment = False
else:
self.redpanda.set_feature_active("node_local_core_assignment",
active=True)
node_local_core_assignment = True

admin = Admin(self.redpanda)
topic = self.topic
partition = 0
Expand All @@ -1131,7 +1064,7 @@ def test_cross_shard(self, num_to_upgrade, cloud_storage_type):
partition,
assignments,
admin=admin,
node_local_core_assignment=node_local_core_assignment)
node_local_core_assignment=not test_mixed_versions)
self._wait_post_move(topic, partition, assignments, 360)

self.run_validation(enable_idempotence=False,
Expand Down
13 changes: 4 additions & 9 deletions tests/rptest/tests/shard_placement_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,6 @@ def setUp(self):
# start the nodes manually
pass

def enable_feature(self):
self.redpanda.set_feature_active("node_local_core_assignment",
active=True)

def start_client_load(self, topic_name):
msg_size = 4096

Expand Down Expand Up @@ -269,13 +265,15 @@ def test_upgrade(self):
initial_map = self.wait_shard_map_stationary(seed_nodes, admin)
self.print_shard_stats(initial_map)

# Upgrade the cluster and enable the feature.
# Upgrade the cluster and wait for the feature activation.

installer.install(seed_nodes, RedpandaInstaller.HEAD)
self.redpanda.restart_nodes(seed_nodes)
self.redpanda.wait_for_membership(first_start=False)

self.enable_feature()
self.redpanda.await_feature("node_local_core_assignment",
'active',
timeout_sec=15)

self.logger.info(
"feature enabled, checking that shard map is stable...")
Expand Down Expand Up @@ -378,7 +376,6 @@ def test_upgrade(self):
@cluster(num_nodes=6)
def test_manual_rebalance(self):
self.redpanda.start()
self.enable_feature()

admin = Admin(self.redpanda)
rpk = RpkTool(self.redpanda)
Expand Down Expand Up @@ -443,7 +440,6 @@ def test_core_count_change(self):
self.redpanda.set_resource_settings(
ResourceSettings(num_cpus=initial_core_count - 1))
self.redpanda.start()
self.enable_feature()

admin = Admin(self.redpanda)
rpk = RpkTool(self.redpanda)
Expand Down Expand Up @@ -554,7 +550,6 @@ def test_node_join(self):
seed_nodes = self.redpanda.nodes[0:3]
joiner_nodes = self.redpanda.nodes[3:]
self.redpanda.start(nodes=seed_nodes)
self.enable_feature()

admin = Admin(self.redpanda, default_node=seed_nodes[0])
rpk = RpkTool(self.redpanda)
Expand Down
Loading

0 comments on commit 51faa6d

Please sign in to comment.