Skip to content

Commit

Permalink
tests: adapt partition move tests to node-local core assignment
Browse files Browse the repository at this point in the history
In tests we need to support both the old way to dispatch x-core
movements (via admin.set_partition_replicas) and the new way (via
admin.set_partition_replica_core) depending on whether the cluster is
new or in the process of upgrade. To allow that, change functions in
PartitionMovementMixin:
1) allow omitting "core" field in replica assignment dicts (absent field
   means that the core doesn't matter)
2) add node_local_core_assignment flag to switch between the old and the
   new way.

Also, modify available_policy for the feature flag to ensure that for
most tests node-local core assignment will be enabled.
  • Loading branch information
ztlpn committed Jun 7, 2024
1 parent 6b11db0 commit b305bfe
Show file tree
Hide file tree
Showing 6 changed files with 229 additions and 126 deletions.
2 changes: 1 addition & 1 deletion src/v/features/feature_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,7 @@ constexpr static std::array feature_schema{
cluster::cluster_version{13},
"node_local_core_assignment",
feature::node_local_core_assignment,
feature_spec::available_policy::explicit_only,
feature_spec::available_policy::new_clusters_only,
feature_spec::prepare_policy::requires_migration},
};

Expand Down
4 changes: 2 additions & 2 deletions tests/rptest/tests/controller_log_limiting_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -246,13 +246,13 @@ def __init__(self, ctx, *args, **kwargs):
def perform_move(self, topic, partition):
old_assignments, new_assignment = self._dispatch_random_partition_move(
topic=topic.name, partition=partition)
return old_assignments == new_assignment
return self._equal_assignments(old_assignments, new_assignment)

@cluster(num_nodes=3)
def test_move_partition_limit(self):
self.start_redpanda(num_nodes=3)

topic = TopicSpec(partition_count=3)
topic = TopicSpec(partition_count=3, replication_factor=1)
self.client().create_topic(topic)
try:
while (self.perform_move(topic, 0)):
Expand Down
12 changes: 7 additions & 5 deletions tests/rptest/tests/partition_move_interruption_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,8 @@ def new_controller():
wait_until(new_controller, 10, 1)

# update replica set
admin.set_partition_replicas(self.topic, partition, new_assignment)
self._set_partition_assignments(self.topic, partition, new_assignment,
admin)

self._wait_for_move_in_progress(self.topic, partition)

Expand Down Expand Up @@ -549,7 +550,7 @@ def test_cancelling_partition_move_node_down(self):
id = self.redpanda.node_id(n)
if id not in replica_ids:
previous = assignments.pop()
assignments.append({"node_id": id, "core": 0})
assignments.append({"node_id": id})
# stop a node that is going to be removed from current partition assignment
to_stop = self.get_node_by_id(previous['node_id'])
self.redpanda.stop_node(to_stop)
Expand All @@ -567,7 +568,7 @@ def new_controller():
self.logger.info(
f"moving {topic}/{partition}: {prev_assignments} -> {assignments}")

admin.set_partition_replicas(topic, partition, assignments)
self._set_partition_assignments(topic, partition, assignments, admin)

self._wait_for_move_in_progress(topic, partition)

Expand Down Expand Up @@ -639,13 +640,14 @@ def test_cancellations_interrupted_with_restarts(self, replication_factor):
for id in available_ids:
if id not in replica_ids:
assignments.pop()
assignments.append({"node_id": id, "core": 0})
assignments.append({"node_id": id})

self.logger.info(
f"[{i}] moving {topic}/{partition}: {prev_assignments} -> {assignments}"
)

admin.set_partition_replicas(topic, partition, assignments)
self._set_partition_assignments(topic, partition, assignments,
admin)

self._wait_for_move_in_progress(topic, partition)

Expand Down
162 changes: 121 additions & 41 deletions tests/rptest/tests/partition_movement.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,23 +17,30 @@


class PartitionMovementMixin():

# Use in assignments sent to redpanda when node-local core assignment
# is enabled, to test that the actual value is ignored.
INVALID_CORE = 12121212

@staticmethod
def _random_partition(metadata):
topic = random.choice(metadata)
partition = random.choice(topic.partitions)
return topic.name, partition.id

@staticmethod
def _choose_replacement(admin, assignments, allow_no_ops=True):
def _choose_replacement(admin, assignments, allow_no_ops,
node_local_core_assignment):
"""
Does not produce assignments that contain duplicate nodes. This is a
limitation in redpanda raft implementation.
"""
replication_factor = len(assignments)
node_ids = lambda x: set([a["node_id"] for a in x])
orig_node_ids = node_ids(assignments)

assert replication_factor >= 1
assert len(node_ids(assignments)) == replication_factor
assert len(orig_node_ids) == replication_factor

# remove random assignment(s). we allow no changes to be made to
# exercise the code paths responsible for dealing with no-ops.
Expand All @@ -51,8 +58,10 @@ def _choose_replacement(admin, assignments, allow_no_ops=True):
node_id = broker["node_id"]
if node_id in node_ids(assignments):
continue
core = random.randint(0, broker["num_cores"] - 1)
replacement = dict(node_id=node_id, core=core)
replacement = dict(node_id=node_id)
if (node_id in orig_node_ids) or (not node_local_core_assignment):
replacement["core"] = \
random.randint(0, broker["num_cores"] - 1)
if not allow_no_ops and replacement in selected:
continue
assignments.append(replacement)
Expand All @@ -61,7 +70,7 @@ def _choose_replacement(admin, assignments, allow_no_ops=True):
return selected, replacements

@staticmethod
def _get_assignments(admin, topic, partition):
def _get_assignments(admin, topic, partition, with_cores=False):
def try_get_partitions():
try:
res = admin.get_partitions(topic, partition)
Expand All @@ -76,20 +85,34 @@ def try_get_partitions():
backoff_sec=1)

def normalize(a):
return dict(node_id=a["node_id"], core=a["core"])
ret = dict(node_id=a["node_id"])
if with_cores:
ret["core"] = a["core"]
return ret

return [normalize(a) for a in res["replicas"]]

@staticmethod
def _equal_assignments(r0, r1):
def to_tuple(a):
return a["node_id"], a["core"]
# Core in an assignment can be unknown (None). In this case it is
# considered equal to any core value in the other assignment.
def node2core(assignments):
return dict((a["node_id"], a.get("core")) for a in assignments)

left_n2c = node2core(r0)
right_n2c = node2core(r1)

r0 = [to_tuple(a) for a in r0]
r1 = [to_tuple(a) for a in r1]
return set(r0) == set(r1)
if set(left_n2c.keys()) != set(right_n2c.keys()):
return False

def _get_current_partitions(self, admin, topic, partition_id):
for n, c in left_n2c.items():
rc = right_n2c[n]
if c != rc and c is not None and rc is not None:
return False

return True

def _get_current_node_cores(self, admin, topic, partition_id):
def keep(p):
return p["ns"] == "kafka" and p["topic"] == topic and p[
"partition_id"] == partition_id
Expand All @@ -109,28 +132,34 @@ def _wait_post_move(self, topic, partition, assignments, timeout_sec):
retry_codes=[404, 503, 504],
retries_amount=10)

def status_done():
def node_assignments_converged():
results = []
for n in self.redpanda._started:
info = admin.get_partitions(topic, partition, node=n)
node_assignments = [{
"node_id": r["node_id"]
} for r in info["replicas"]]
self.logger.info(
f"current assignments for {topic}-{partition}: {info}")
converged = self._equal_assignments(info["replicas"],
f"node assignments for {topic}/{partition}: {node_assignments}, "
f"partition status: {info['status']}")
converged = self._equal_assignments(node_assignments,
assignments)
results.append(converged and info["status"] == "done")

return all(results)

# wait until redpanda reports complete
wait_until(status_done, timeout_sec=timeout_sec, backoff_sec=2)
wait_until(node_assignments_converged,
timeout_sec=timeout_sec,
backoff_sec=2)

def derived_done():
info = self._get_current_partitions(admin, topic, partition)
def cores_converged():
info = self._get_current_node_cores(admin, topic, partition)
self.logger.info(
f"derived assignments for {topic}-{partition}: {info}")
f"current core placement for {topic}/{partition}: {info}")
return self._equal_assignments(info, assignments)

wait_until(derived_done, timeout_sec=timeout_sec, backoff_sec=2)
wait_until(cores_converged, timeout_sec=timeout_sec, backoff_sec=2)

def _wait_post_cancel(self, topic, partition, prev_assignments,
new_assignment, timeout_sec):
Expand All @@ -148,29 +177,40 @@ def cancel_finished():

result_configuration = admin.wait_stable_configuration(
topic=topic, partition=partition, timeout_s=timeout_sec)
# don't check core placement as x-core moves can't be cancelled if
# node-local core assignment is enabled (only assigned anew).
cur_replicas = [{
"node_id": r.node_id
} for r in result_configuration.replicas]

movement_cancelled = self._equal_assignments(
result_configuration.replicas, prev_assignments)
self.logger.info(
f"current replicas for {topic}/{partition}: {cur_replicas}")
movement_cancelled = self._equal_assignments(cur_replicas,
prev_assignments)

# Can happen if movement was already in un revertable state
movement_finished = False
if new_assignment is not None:
movement_finished = self._equal_assignments(
result_configuration.replicas, new_assignment)
cur_replicas, new_assignment)

assert movement_cancelled or movement_finished

def _do_move_and_verify(self, topic, partition, timeout_sec):
admin = Admin(self.redpanda)

def _do_move_and_verify(self,
topic,
partition,
timeout_sec,
node_local_core_assignment=True):
_, new_assignment = self._dispatch_random_partition_move(
topic=topic, partition=partition)
topic=topic,
partition=partition,
node_local_core_assignment=node_local_core_assignment)

self._wait_post_move(topic, partition, new_assignment, timeout_sec)

return (topic, partition, new_assignment)

def _move_and_verify(self):
def _move_and_verify(self, node_local_core_assignment):
# choose a random topic-partition
metadata = self.client().describe_topics()
topic, partition = self._random_partition(metadata)
Expand All @@ -179,14 +219,16 @@ def _move_and_verify(self):
# and redpanda debug builds it may take a very long time
timeout_sec = 360 if topic == "__consumer_offsets" else 90

self.logger.info(f"selected topic-partition: {topic}-{partition}")
self.logger.info(f"selected topic-partition: {topic}/{partition}")

self._do_move_and_verify(topic, partition, timeout_sec)
self._do_move_and_verify(
topic,
partition,
timeout_sec,
node_local_core_assignment=node_local_core_assignment)

def _replace_replica_set(self,
assignments,
allow_no_ops=True,
x_core_only=False):
def _replace_replica_set(self, assignments, allow_no_ops, x_core_only,
node_local_core_assignment):
"""
replaces random number of replicas in `assignments` list of replicas
Expand All @@ -208,39 +250,77 @@ def _replace_replica_set(self,
return selected, assignments

selected, replacements = self._choose_replacement(
admin, assignments, allow_no_ops=allow_no_ops)
admin,
assignments,
allow_no_ops=allow_no_ops,
node_local_core_assignment=node_local_core_assignment)

return selected, replacements

def _set_partition_assignments(self,
topic,
partition,
assignments,
admin=None,
node_local_core_assignment=True):
self.logger.info(
f"setting assignments for {topic}/{partition}: {assignments}")

if admin is None:
admin = Admin(self.redpanda)

if not node_local_core_assignment:
admin.set_partition_replicas(topic, partition, assignments)
else:
admin.set_partition_replicas(topic, partition,
[{
"node_id": a["node_id"],
"core": self.INVALID_CORE,
} for a in assignments])

for assignment in assignments:
if "core" in assignment:
admin.set_partition_replica_core(topic, partition,
assignment["node_id"],
assignment["core"])

def _dispatch_random_partition_move(self,
topic,
partition,
x_core_only=False,
allow_no_op=True):
allow_no_op=True,
node_local_core_assignment=True):
"""
Request partition replicas to be randomly moved
:param partition: partition id to be moved
:param x_core_only: when true assignment nodes will not be changed, only cores
"""
admin = Admin(self.redpanda)
assignments = self._get_assignments(admin, topic, partition)
assignments = self._get_assignments(
admin, topic, partition, with_cores=not node_local_core_assignment)
prev_assignments = assignments.copy()

self.logger.info(
f"initial assignments for {topic}/{partition}: {prev_assignments}")

# build new replica set by replacing a random assignment, do not allow no ops as we want to have operation to cancel
selected, replacements = self._replace_replica_set(
assignments, x_core_only=x_core_only, allow_no_ops=allow_no_op)
assignments,
x_core_only=x_core_only,
allow_no_ops=allow_no_op,
node_local_core_assignment=node_local_core_assignment)

self.logger.info(
f"chose {len(selected)} replacements for {topic}/{partition}: {selected} -> {replacements}"
)
self.logger.info(
f"new assignments for {topic}/{partition}: {assignments}")

admin.set_partition_replicas(topic, partition, assignments)
self._set_partition_assignments(
topic,
partition,
assignments,
admin=admin,
node_local_core_assignment=node_local_core_assignment)

return prev_assignments, assignments

Expand Down
Loading

0 comments on commit b305bfe

Please sign in to comment.