Merge pull request #22621 from vbotbuildovich/backport-pr-22533-v24.2.x-186

[v24.2.x] features: make node_local_core_assignment explicit-only
piyushredpanda authored Jul 29, 2024
2 parents a219c8a + 70c7ecf · commit c49772d
Showing 8 changed files with 140 additions and 57 deletions.
2 changes: 1 addition & 1 deletion src/v/features/feature_table.h
@@ -384,7 +384,7 @@ constexpr static std::array feature_schema{
     cluster::cluster_version{13},
     "node_local_core_assignment",
     feature::node_local_core_assignment,
-    feature_spec::available_policy::new_clusters_only,
+    feature_spec::available_policy::explicit_only,
     feature_spec::prepare_policy::requires_migration},
   feature_spec{
     cluster::cluster_version{13},
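Note on the change above: under new_clusters_only the feature switched on automatically for freshly created clusters, whereas explicit_only keeps it dormant until it is activated through the admin API. A minimal sketch of that activation, assuming a reachable admin endpoint on localhost:9644 (the default port) and the same PUT /v1/features/{name} call that the test helper below wraps:

    # Hedged sketch: direct admin API call; set_feature_active() in
    # tests/rptest/services/redpanda.py sends the same request and then
    # polls until the reported state matches.
    import requests

    resp = requests.put(
        "http://localhost:9644/v1/features/node_local_core_assignment",
        json={"state": "active"})
    resp.raise_for_status()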
6 changes: 6 additions & 0 deletions tests/rptest/scale_tests/shard_placement_scale_test.py
@@ -37,6 +37,10 @@ def setUp(self):
         # start the nodes manually
         pass
 
+    def enable_feature(self):
+        self.redpanda.set_feature_active("node_local_core_assignment",
+                                         active=True)
+
     def start_omb(self):
         workload = {
             "name": "CommonWorkload",
@@ -102,6 +106,7 @@ def finish_omb(self):
     @skip_debug_mode
     def test_manual_moves(self):
         self.redpanda.start()
+        self.enable_feature()
 
         self.start_omb()
 
@@ -152,6 +157,7 @@ def test_node_add(self):
         joiner_nodes = self.redpanda.nodes[4:]
 
         self.redpanda.start(nodes=seed_nodes)
+        self.enable_feature()
 
         self.start_omb()
 
39 changes: 27 additions & 12 deletions tests/rptest/services/redpanda.py
@@ -3323,11 +3323,32 @@ def is_ready():
             raise AssertionError(
                 "Nodes report restart required but expect_restart is False")
 
-    def set_feature_active(self, feature_name: str, active: bool, *,
-                           timeout_sec: int):
-        state = 'active' if active else 'disabled'
-        self._admin.put_feature(feature_name, {"state": state})
-        self.await_feature(feature_name, state, timeout_sec=timeout_sec)
+    def set_feature_active(self,
+                           feature_name: str,
+                           active: bool,
+                           *,
+                           timeout_sec: int = 15):
+        target_state = 'active' if active else 'disabled'
+        cur_state = self.get_feature_state(feature_name)
+        if active and cur_state == 'unavailable':
+            # If we have just restarted after an upgrade, wait for cluster version
+            # to progress and for the feature to become available.
+            self.await_feature(feature_name,
+                               'available',
+                               timeout_sec=timeout_sec)
+        self._admin.put_feature(feature_name, {"state": target_state})
+        self.await_feature(feature_name, target_state, timeout_sec=timeout_sec)
+
+    def get_feature_state(self,
+                          feature_name: str,
+                          node: ClusterNode | None = None):
+        f = self._admin.get_features(node=node)
+        by_name = dict((f['name'], f) for f in f['features'])
+        try:
+            state = by_name[feature_name]['state']
+        except KeyError:
+            state = None
+        return state
 
     def await_feature(self,
                       feature_name,
@@ -3344,13 +3365,7 @@ def await_feature(self,
 
         def is_awaited_state():
             for n in nodes:
-                f = self._admin.get_features(node=n)
-                by_name = dict((f['name'], f) for f in f['features'])
-                try:
-                    state = by_name[feature_name]['state']
-                except KeyError:
-                    state = None
-
+                state = self.get_feature_state(feature_name)
                 if state != await_state:
                     self.logger.info(
                         f"Feature {feature_name} not yet {await_state} on {n.name} (state {state})"
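A hedged usage sketch of the two helpers above, as a test might call them (the explicit timeout is illustrative; with the new default of timeout_sec=15 the keyword can be omitted):

    # Sketch: `self.redpanda` is a started RedpandaService in a ducktape test.
    state = self.redpanda.get_feature_state("node_local_core_assignment")
    if state != 'active':
        # Waits for 'available' first if the cluster version is still catching
        # up after an upgrade, then activates and polls until 'active'.
        self.redpanda.set_feature_active("node_local_core_assignment",
                                         active=True,
                                         timeout_sec=30)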
4 changes: 2 additions & 2 deletions tests/rptest/tests/partition_move_interruption_test.py
@@ -550,7 +550,7 @@ def test_cancelling_partition_move_node_down(self):
             id = self.redpanda.node_id(n)
             if id not in replica_ids:
                 previous = assignments.pop()
-                assignments.append({"node_id": id})
+                assignments.append({"node_id": id, "core": 0})
         # stop a node that is going to be removed from current partition assignment
         to_stop = self.get_node_by_id(previous['node_id'])
         self.redpanda.stop_node(to_stop)
@@ -640,7 +640,7 @@ def test_cancellations_interrupted_with_restarts(self, replication_factor):
             for id in available_ids:
                 if id not in replica_ids:
                     assignments.pop()
-                    assignments.append({"node_id": id})
+                    assignments.append({"node_id": id, "core": 0})
 
             self.logger.info(
                 f"[{i}] moving {topic}/{partition}: {prev_assignments} -> {assignments}"
            )
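Why the added "core" key: with _get_assignments() now fetching cores by default (see partition_movement.py below), the entries these tests pop from a fetched assignment list carry a core field, so hand-built replacements pin core 0 to keep the list's shape consistent. A small sketch with an illustrative node id:

    # Pre-PR shape: the shard was left for the controller to choose.
    old_style = {"node_id": 3}
    # Post-PR shape used above: the shard is stated explicitly.
    new_style = {"node_id": 3, "core": 0}
    assignments = [new_style]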
8 changes: 4 additions & 4 deletions tests/rptest/tests/partition_movement.py
@@ -70,7 +70,7 @@ def _choose_replacement(admin, assignments, allow_no_ops,
         return selected, replacements
 
     @staticmethod
-    def _get_assignments(admin, topic, partition, with_cores=False):
+    def _get_assignments(admin, topic, partition, with_cores=True):
         def try_get_partitions():
             try:
                 res = admin.get_partitions(topic, partition)
@@ -200,7 +200,7 @@ def _do_move_and_verify(self,
                             topic,
                             partition,
                             timeout_sec,
-                            node_local_core_assignment=True):
+                            node_local_core_assignment=False):
         _, new_assignment = self._dispatch_random_partition_move(
             topic=topic,
             partition=partition,
@@ -262,7 +262,7 @@ def _set_partition_assignments(self,
                                    partition,
                                    assignments,
                                    admin=None,
-                                   node_local_core_assignment=True):
+                                   node_local_core_assignment=False):
         self.logger.info(
             f"setting assignments for {topic}/{partition}: {assignments}")
 
@@ -289,7 +289,7 @@ def _dispatch_random_partition_move(self,
                                         partition,
                                         x_core_only=False,
                                         allow_no_op=True,
-                                        node_local_core_assignment=True):
+                                        node_local_core_assignment=False):
         """
         Request partition replicas to be randomly moved
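The flipped defaults above make the legacy behavior (cores assigned centrally by the controller) the baseline again; callers that want node-local core placement must now opt in per call. A hedged sketch of the opt-in, mirroring test_movement_tracking_api below with illustrative arguments:

    # `self` is a test using PartitionMovementMixin; the partition number and
    # assignments list are placeholders.
    self._set_partition_assignments(self.topic,
                                    0,
                                    assignments,
                                    admin=admin,
                                    node_local_core_assignment=True)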
97 changes: 83 additions & 14 deletions tests/rptest/tests/partition_movement_test.py
@@ -91,6 +91,13 @@ def test_moving_not_fully_initialized_partition(self, num_to_upgrade):
         self.client().create_topic(spec)
         admin = Admin(self.redpanda)
 
+        if test_mixed_versions:
+            node_local_core_assignment = False
+        else:
+            self.redpanda.set_feature_active("node_local_core_assignment",
+                                             active=True)
+            node_local_core_assignment = True
+
         # choose a random topic-partition
         self.logger.info(f"selected topic-partition: {topic}/{partition}")
 
@@ -112,7 +119,7 @@ def test_moving_not_fully_initialized_partition(self, num_to_upgrade):
             partition,
             assignments,
             admin=admin,
-            node_local_core_assignment=not test_mixed_versions)
+            node_local_core_assignment=node_local_core_assignment)
 
         def node_assignments_converged():
             info = admin.get_partitions(topic, partition)
@@ -163,9 +170,16 @@ def test_empty(self, num_to_upgrade):
         for spec in topics:
             self.client().create_topic(spec)
 
+        if test_mixed_versions:
+            node_local_core_assignment = False
+        else:
+            self.redpanda.set_feature_active("node_local_core_assignment",
+                                             active=True)
+            node_local_core_assignment = True
+
         for _ in range(25):
             self._move_and_verify(
-                node_local_core_assignment=not test_mixed_versions)
+                node_local_core_assignment=node_local_core_assignment)
 
     @cluster(num_nodes=4,
              log_allow_list=PARTITION_MOVEMENT_LOG_ERRORS +
@@ -210,9 +224,16 @@ def test_static(self, num_to_upgrade):
         producer.free()
         self.logger.info(f"Producer stop complete.")
 
+        if test_mixed_versions:
+            node_local_core_assignment = False
+        else:
+            self.redpanda.set_feature_active("node_local_core_assignment",
+                                             active=True)
+            node_local_core_assignment = True
+
         for _ in range(25):
             self._move_and_verify(
-                node_local_core_assignment=not test_mixed_versions)
+                node_local_core_assignment=node_local_core_assignment)
 
         for spec in topics:
             self.logger.info(f"Verifying records in {spec}")
@@ -286,12 +307,19 @@ def test_dynamic(self, num_to_upgrade):
             self.client().create_topic(spec)
         self.topic = spec.name
 
+        if test_mixed_versions:
+            node_local_core_assignment = False
+        else:
+            self.redpanda.set_feature_active("node_local_core_assignment",
+                                             active=True)
+            node_local_core_assignment = True
+
         self.start_producer(1, throughput=throughput)
         self.start_consumer(1)
         self.await_startup()
         for _ in range(moves):
             self._move_and_verify(
-                node_local_core_assignment=not test_mixed_versions)
+                node_local_core_assignment=node_local_core_assignment)
         self.run_validation(enable_idempotence=False,
                             consumer_timeout_sec=45,
                             min_records=records)
@@ -325,6 +353,13 @@ def test_move_consumer_offsets_intranode(self, num_to_upgrade):
         self.start_consumer(1)
         self.await_startup()
 
+        if test_mixed_versions:
+            node_local_core_assignment = False
+        else:
+            self.redpanda.set_feature_active("node_local_core_assignment",
+                                             active=True)
+            node_local_core_assignment = True
+
         admin = Admin(self.redpanda)
         topic = "__consumer_offsets"
         partition = 0
@@ -339,7 +374,7 @@ def test_move_consumer_offsets_intranode(self, num_to_upgrade):
             partition,
             assignments,
             admin=admin,
-            node_local_core_assignment=not test_mixed_versions)
+            node_local_core_assignment=node_local_core_assignment)
         self._wait_post_move(topic, partition, assignments, 360)
 
         self.run_validation(enable_idempotence=False,
@@ -365,9 +400,17 @@ def test_bootstrapping_after_move(self, num_to_upgrade):
         self.start_producer(1)
         self.start_consumer(1)
         self.await_startup()
+
+        if test_mixed_versions:
+            node_local_core_assignment = False
+        else:
+            self.redpanda.set_feature_active("node_local_core_assignment",
+                                             active=True)
+            node_local_core_assignment = True
+
         # execute single move
         self._move_and_verify(
-            node_local_core_assignment=not test_mixed_versions)
+            node_local_core_assignment=node_local_core_assignment)
         self.run_validation(enable_idempotence=False, consumer_timeout_sec=45)
 
         # snapshot offsets
@@ -440,7 +483,12 @@ def dispatch_and_assert_status_code(assignments, expected_sc):
             assert status_code == expected_sc, \
                 f"Expected {expected_sc} but got {status_code}"
 
-        node_local_core_assignment = not test_mixed_versions
+        if test_mixed_versions:
+            node_local_core_assignment = False
+        else:
+            self.redpanda.set_feature_active("node_local_core_assignment",
+                                             active=True)
+            node_local_core_assignment = True
 
         # A valid node but an invalid core
         assignments = [{"node_id": valid_dest, "core": invalid_shard}]
@@ -730,7 +778,7 @@ def test_availability_when_one_node_down(self):
         ]
         selected = random.choice(to_select)
         # replace one of the assignments
-        assignments[0] = {'node_id': selected['node_id']}
+        assignments[0] = {'node_id': selected['node_id'], 'core': 0}
         self.logger.info(
             f"new assignment for {self.topic}/{partition_id}: {assignments}")
 
@@ -804,7 +852,7 @@ def test_stale_node(self, frequent_controller_snapshots):
         selected = random.choice(to_select)
         # replace one of the assignments
         replaced = assignments[0]['node_id']
-        assignments[0] = {'node_id': selected['node_id']}
+        assignments[0] = {'node_id': selected['node_id'], 'core': 0}
         self.logger.info(
             f"target assignment for {self.topic}/{partition_id}: {assignments}"
         )
@@ -843,7 +891,7 @@ def status_done():
             "first movement done, scheduling second movement back")
 
         # bring replaced node back
-        assignments[0] = {'node_id': replaced}
+        assignments[0] = {'node_id': replaced, 'core': 0}
         self._set_partition_assignments(self.topic, partition_id, assignments,
                                         admin)
 
@@ -870,6 +918,10 @@ def test_movement_tracking_api(self):
         self.start_producer(1, throughput=throughput)
         self.start_consumer(1)
         self.await_startup(min_records=throughput * 10, timeout_sec=60)
+
+        self.redpanda.set_feature_active("node_local_core_assignment",
+                                         active=True)
+
         self.redpanda.set_cluster_config({"raft_learner_recovery_rate": 1})
         brokers = admin.get_brokers()
         for partition in range(0, partition_count):
@@ -884,8 +936,11 @@ def test_movement_tracking_api(self):
             self.logger.info(
                 f"initial assignments for {self.topic}/{partition}: {prev_assignments}, new assignment: {assignments}"
             )
-            self._set_partition_assignments(self.topic, partition, assignments,
-                                            admin)
+            self._set_partition_assignments(self.topic,
+                                            partition,
+                                            assignments,
+                                            admin,
+                                            node_local_core_assignment=True)
 
         wait_until(
             lambda: len(admin.list_reconfigurations()) == partition_count, 30)
@@ -990,6 +1045,13 @@ def test_shadow_indexing(self, num_to_upgrade, cloud_storage_type):
         self.start_consumer(1)
         self.await_startup()
 
+        if test_mixed_versions:
+            node_local_core_assignment = False
+        else:
+            self.redpanda.set_feature_active("node_local_core_assignment",
+                                             active=True)
+            node_local_core_assignment = True
+
         # We will start an upgrade halfway through the test: this ensures
         # that a single-version cluster existed for long enough to actually
         # upload some data to S3, before the upgrade potentially pauses
@@ -1001,7 +1063,7 @@ def test_shadow_indexing(self, num_to_upgrade, cloud_storage_type):
             self._partial_upgrade(num_to_upgrade)
 
         self._move_and_verify(
-            node_local_core_assignment=not test_mixed_versions)
+            node_local_core_assignment=node_local_core_assignment)
 
         self.run_validation(enable_idempotence=False,
                             consumer_timeout_sec=45,
@@ -1035,6 +1097,13 @@ def test_cross_shard(self, num_to_upgrade, cloud_storage_type):
         self.start_consumer(1)
         self.await_startup()
 
+        if test_mixed_versions:
+            node_local_core_assignment = False
+        else:
+            self.redpanda.set_feature_active("node_local_core_assignment",
+                                             active=True)
+            node_local_core_assignment = True
+
         admin = Admin(self.redpanda)
         topic = self.topic
         partition = 0
@@ -1058,7 +1127,7 @@ def test_cross_shard(self, num_to_upgrade, cloud_storage_type):
             partition,
             assignments,
             admin=admin,
-            node_local_core_assignment=not test_mixed_versions)
+            node_local_core_assignment=node_local_core_assignment)
         self._wait_post_move(topic, partition, assignments, 360)
 
         self.run_validation(enable_idempotence=False,
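The tests touched above all repeat the same enable-or-skip guard: mixed-version clusters leave the feature off, fully-upgraded clusters activate it explicitly. A hypothetical helper (not part of this PR) that would factor out the pattern, shown only to summarize it:

    def _maybe_enable_node_local_core_assignment(self, test_mixed_versions):
        # Hypothetical refactor: mixed-version clusters cannot use the
        # feature, while fully-upgraded clusters must now enable it
        # explicitly because of the explicit_only availability policy.
        if test_mixed_versions:
            return False
        self.redpanda.set_feature_active("node_local_core_assignment",
                                         active=True)
        return True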