rptest: actual e2e throttling test
The previous version of the test set a relatively low throughput
limit, then tried to consume data at full speed and monitored a metric
to see whether the tiered storage throttler applied any throttling.

This fails often because we throttle in the tiered storage layer and
also apply disk throttling using Seastar scheduling groups.

We can't measure whether the latter was applied, and sometimes it
provides enough throttling that we never have to apply tiered storage
throttling on the download path.

---

The new test is instead an e2e test. We consume a few times and measure
the average throughput. Then we throttle at half of that and measure a
few times again. The test succeeds if the second measurement is at most
roughly half of the first (within a 10% margin).
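
Condensed from the diff below, the new flow is roughly the following
sketch (a simplification, not the exact test code):

    unrestricted_bw = self.measure_consume_throughput()

    # Throttle at half of the measured throughput, split across shards.
    per_shard_bw_limit = int(unrestricted_bw / 2 / num_partitions)
    self.redpanda.set_cluster_config(
        {'cloud_storage_max_throughput_per_shard': per_shard_bw_limit})

    restricted_bw = self.measure_consume_throughput()

    # Allow a 10% margin over half of the unrestricted throughput.
    assert restricted_bw < per_shard_bw_limit * num_partitions * 1.1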
nvartolomei committed Jul 16, 2024
1 parent 90ac8f7 commit b9fcc7a
Showing 1 changed file with 76 additions and 15 deletions.
91 changes: 76 additions & 15 deletions tests/rptest/tests/e2e_shadow_indexing_test.py
@@ -1303,24 +1303,25 @@ def test_spillover(self, cloud_storage_type):


class EndToEndThrottlingTest(RedpandaTest):
topics = (TopicSpec(partition_count=8,
topics = (TopicSpec(partition_count=3,
cleanup_policy=TopicSpec.CLEANUP_DELETE), )

def __init__(self, test_context):
si_settings = SISettings(
test_context,
log_segment_size=1024 * 1024,
log_segment_size=10 * 1024 * 1024,
fast_uploads=True,
# Set small throughput limit to trigger throttling
cloud_storage_max_throughput_per_shard=4 * 1024 * 1024)
)

super(EndToEndThrottlingTest, self).__init__(test_context=test_context,
si_settings=si_settings)

self.rpk = RpkTool(self.redpanda)
# 1.3GiB
self.admin = Admin(self.redpanda)

# 1 GiB of data
self.msg_size = 1024 * 128
self.msg_count = 10000
self.msg_count = 8196

def get_throttling_metric(self):
return self.redpanda.metric_sum(
@@ -1349,20 +1350,60 @@ def consume(self):
consumer = KgoVerifierSeqConsumer(self.test_context,
self.redpanda,
topic_name,
msg_size=self.msg_size,
loop=False,
debug_logs=True,
trace_logs=True)
consumer.start()

start = time.time()
consumer.start()
consumer.wait(timeout_sec=400)
consume_duration = time.time() - start

assert consumer.consumer_status.validator.invalid_reads == 0
assert consumer.consumer_status.validator.valid_reads >= self.msg_count

consumer.free()

return consume_duration

def measure_consume_throughput(self):
bw_measurements = []
for iter_ix in range(2):
# Trim cache.
self.redpanda.for_nodes(
self.redpanda.nodes, lambda n: self.admin.cloud_storage_trim(
byte_limit=0, object_limit=0, node=n))

# Restart redpanda to make sure it gave up on all the file handles.
self.redpanda.restart_nodes(self.redpanda.nodes)

# Wait for all partitions to have leadership.
wait_until(lambda: all(
self.admin.get_partition_leader(
namespace='kafka', topic=self.topic, partition=p) != -1
for p in range(self.topics[0].partition_count)),
timeout_sec=60,
backoff_sec=1)

# Measure throughput.
self.logger.info(f"Start consumer iteration {iter_ix}")
duration = self.consume()
bw = self.msg_count * self.msg_size / duration
self.logger.info(
f"Consumer took {duration} seconds. Measured throughput: {bw} bytes/sec"
)
bw_measurements.append(bw)

return sum(bw_measurements) / len(bw_measurements)

# We throttle cloud storage download bandwidth and also the I/O of the
# cloud storage scheduling group. In debug mode, Seastar I/O throttling
# makes the system much slower than the target bandwidth, which makes it
# hard to assert on bandwidth or test duration, so we skip the test.
@cluster(num_nodes=4)
@matrix(cloud_storage_type=get_cloud_storage_type())
# @skip_debug_mode
@matrix(cloud_storage_type=get_cloud_storage_type(
docker_use_arbitrary=True))
def test_throttling(self, cloud_storage_type):

self.logger.info("Start producer")
@@ -1374,21 +1415,41 @@ def test_throttling(self, cloud_storage_type):
num_partitions = self.topics[0].partition_count
topic_name = self.topics[0].name
rpk.alter_topic_config(topic_name, 'retention.local.target.bytes',
0x1000)
4096)

for pix in range(0, num_partitions):
wait_for_local_storage_truncate(self.redpanda,
self.topic,
target_bytes=0x2000,
target_bytes=8192,
partition_idx=pix,
timeout_sec=30)

self.logger.info("Start consumer")
# A warm-up consume to make all future measurements fairer. This
# potentially warms up object storage and any local files which have
# not been trimmed yet.
self.consume()

times_throttled = self.get_throttling_metric()
self.logger.info(f"Consumer was throttled {times_throttled} times")
assert times_throttled > 0, f"Expected consumer to be throttled, metric value: {times_throttled}"
unrestricted_bw = self.measure_consume_throughput()

# Limit bandwidth to half of the unrestricted throughput. The expected
# throughput depends on the number of CPU cores available and the
# number of partitions. In the best case scenario, we expect each
# partition to land on a separate core and the throughput limit to
# apply to each partition independently.
per_shard_bw_limit = int(unrestricted_bw / 2 / num_partitions)
expected_bw = per_shard_bw_limit * num_partitions * 1.1
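# Worked example with illustrative numbers (not from a real run): if
# unrestricted_bw is 120 MiB/s and there are 3 partitions, then
# per_shard_bw_limit is 20 MiB/s and expected_bw is 20 * 3 * 1.1 = 66 MiB/s.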
self.logger.info(
f"Unrestricted bandwidth: {unrestricted_bw} bytes/sec. "
f"Configuring per shard limit to {per_shard_bw_limit} bytes/sec. "
f"Expecting throughput to be less than {expected_bw} bytes/sec.")

self.redpanda.set_cluster_config(
{'cloud_storage_max_throughput_per_shard': per_shard_bw_limit})

restricted_bw = self.measure_consume_throughput()
self.logger.info(f"Restricted bandwidth: {restricted_bw} bytes/sec")

assert restricted_bw < expected_bw, f"Expected {restricted_bw=} < {expected_bw=}"


class EndToEndHydrationTimeoutTest(EndToEndShadowIndexingBase):
