Skip to content

Commit

Permalink
WIP shard balance ducktape test
Browse files Browse the repository at this point in the history
  • Loading branch information
ztlpn committed May 23, 2024
1 parent 2c7b622 commit b9b3207
Showing 1 changed file with 107 additions and 0 deletions.
107 changes: 107 additions & 0 deletions tests/rptest/tests/shard_balancer_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
# Copyright 2024 Redpanda Data, Inc.
#
# Use of this software is governed by the Business Source License
# included in the file licenses/BSL.md
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0

import concurrent.futures

from ducktape.mark import matrix
from ducktape.utils.util import wait_until

from rptest.services.cluster import cluster
from rptest.services.admin import Admin
from rptest.clients.rpk import RpkTool
from rptest.tests.redpanda_test import RedpandaTest


class ShardBalanceTest(RedpandaTest):
    """Log how partition replicas are spread over shards around a node join.

    The cluster is declared with 5 brokers, but none are started by
    ``setUp``; the test starts 3 seed nodes first and the remaining
    nodes later, logging per-shard replica counts before and after.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, num_brokers=5, **kwargs)

    def setUp(self):
        # start the nodes manually
        pass

    def get_topic_shard_counts(self, node):
        """Return ``{topic: [replica count per core]}`` as reported by `node`.

        Each list has one slot per core, sized by
        ``self.redpanda.get_node_cpu_count()``; every partition returned by
        the admin API increments the slot indexed by its ``"core"`` field.
        """
        core_count = self.redpanda.get_node_cpu_count()
        partitions = Admin(self.redpanda).get_partitions(node=node)
        topic2shard2count = {}
        for p in partitions:
            per_shard = topic2shard2count.setdefault(p["topic"],
                                                     [0] * core_count)
            per_shard[p["core"]] += 1
        return topic2shard2count

    def _log_shard_counts(self, nodes, topics):
        """Log per-topic and total per-shard replica counts for each node.

        The "controller" topic is always reported in addition to `topics`.
        """
        all_topics = ["controller"] + list(topics)
        core_count = self.redpanda.get_node_cpu_count()
        # Pad topic names to a common width so the count lists line up.
        max_len = max(len(t) for t in all_topics)
        for n in nodes:
            self.logger.warning(
                f"shard replica counts on node {self.redpanda.node_id(n)}:")
            topic2shard2count = self.get_topic_shard_counts(n)
            total_counts = [0] * core_count
            for t in all_topics:
                counts = topic2shard2count.get(t, [0] * core_count)
                for i, c in enumerate(counts):
                    total_counts[i] += c
                padding = ' ' * (max_len - len(t) + 2)
                self.logger.warning(f"{t + padding}: {counts}")

            self.logger.warning(f"total: {total_counts}")

    @cluster(num_nodes=5)
    def test_node_join(self):
        """Start 3 seed nodes, create topics, then join the remaining nodes.

        Waits until no reconfigurations are in progress and every joiner
        hosts more than 5 partitions; shard counts are logged before and
        after the join. Raises whatever ``wait_until`` raises on timeout.
        """
        import time

        partitions_per_topic = 20
        min_partitions_on_joiner = 5

        seed_nodes = self.redpanda.nodes[0:3]
        joiner_nodes = self.redpanda.nodes[3:]

        self.redpanda.add_extra_rp_conf({"flex_core_assignment": True})
        self.redpanda.start(nodes=seed_nodes)

        admin = Admin(self.redpanda, default_node=seed_nodes[0])
        rpk = RpkTool(self.redpanda)

        topics = ["foo", "bar", "quux"]

        for topic in topics:
            self.logger.warning(f"creating topic {topic}...")
            rpk.create_topic(topic,
                             partitions=partitions_per_topic,
                             replicas=3)

        # Request core 0 for replica 3 of every "foo" partition, spreading
        # the admin requests round-robin over the seed nodes.
        for p in range(partitions_per_topic):
            admin.set_partition_replica_core(topic="foo",
                                             partition=p,
                                             replica=3,
                                             core=0,
                                             node=seed_nodes[p % 3])
        # NOTE(review): presumably lets the core assignments take effect
        # before the counts are logged -- confirm a fixed 1s is sufficient.
        time.sleep(1)

        self.logger.warning("BEFORE node join:")
        self._log_shard_counts(seed_nodes, topics)

        self.logger.warning("adding nodes 4, 5 and waiting for rebalance...")
        self.redpanda.start(nodes=joiner_nodes)

        def rebalance_finished():
            in_progress = admin.list_reconfigurations(node=seed_nodes[0])
            self.logger.debug(
                f"current number of reconfigurations: {len(in_progress)}")
            if in_progress:
                return False

            partitions_on_joiners = [
                len(admin.get_partitions(node=n)) for n in joiner_nodes
            ]
            self.logger.debug(
                f"partitions on joiners: {partitions_on_joiners}")
            return min(partitions_on_joiners) > min_partitions_on_joiner

        wait_until(rebalance_finished,
                   timeout_sec=60,
                   backoff_sec=5,
                   err_msg="rebalance did not finish after nodes joined")

        self.logger.warning("AFTER nodes joined:")
        self._log_shard_counts(self.redpanda.nodes, topics)

0 comments on commit b9b3207

Please sign in to comment.