rptest: actual e2e throttling test #21379

Merged · 2 commits · Jul 17, 2024
3 changes: 1 addition & 2 deletions src/v/cloud_storage/materialized_resources.cc
@@ -144,9 +144,8 @@ get_throughput_limit(std::optional<size_t> device_throughput) {
     }

     auto tp = std::min(hard_limit, device_throughput.value());
-    constexpr auto tput_overshoot_frac = 16;
     return {
-      .disk_node_throughput_limit = tp + (tp / tput_overshoot_frac),
+      .disk_node_throughput_limit = tp,
Member:
Revert "cloud_storage: Increse disk tput limit a bit"
This reverts commit 0ae2ad2.

Needs explanation

Contributor:

That was a previous attempt to fix this test: increase the disk limit a bit to make sure that the network limit is the one used to throttle downloads.
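For context, a minimal sketch of the interplay that comment describes, assuming the overshoot fraction of 16 from the reverted hunk above. The function below is illustrative, not the actual Redpanda code:

```python
# Illustrative only; the real logic lives in get_throughput_limit() in
# materialized_resources.cc.
def disk_node_throughput_limit(hard_limit: int,
                               device_throughput: int,
                               with_overshoot: bool) -> int:
    tp = min(hard_limit, device_throughput)
    if with_overshoot:
        # The reverted commit padded the disk limit by 1/16 so that the
        # network limit, not the disk I/O limit, would be the one that
        # actually throttles downloads.
        return tp + tp // 16
    return tp


# Example: with a 100 MB/s device, the padded limit is 106.25 MB/s.
assert disk_node_throughput_limit(2**30, 100_000_000, True) == 106_250_000
```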

Member:

@nvartolomei please make sure that you put context into the commit messages. Not having to visit GitHub to see that context is a powerful thing.

Contributor Author (@nvartolomei), Jul 19, 2024:

🤦

I did fix the commits as you asked, but then I did a force push from a different directory to rebase on dev 🤦🤦🤦

I have copy-pasted the commit messages into the PR cover letter. Not sure I can do more than that now...

       .download_shard_throughput_limit = tp / ss::smp::count,
     };
 }
95 changes: 76 additions & 19 deletions tests/rptest/tests/e2e_shadow_indexing_test.py
@@ -1303,28 +1303,25 @@ def test_spillover(self, cloud_storage_type):


 class EndToEndThrottlingTest(RedpandaTest):
-    topics = (TopicSpec(partition_count=8,
+    topics = (TopicSpec(partition_count=3,
                         cleanup_policy=TopicSpec.CLEANUP_DELETE), )

     def __init__(self, test_context):
         si_settings = SISettings(
             test_context,
-            log_segment_size=1024 * 1024,
+            log_segment_size=10 * 1024 * 1024,
             fast_uploads=True,
-            # Set small throughput limit to trigger throttling
-            cloud_storage_max_throughput_per_shard=4 * 1024 * 1024)
+        )

         super(EndToEndThrottlingTest, self).__init__(test_context=test_context,
                                                      si_settings=si_settings)

         self.rpk = RpkTool(self.redpanda)
-        # 1.3GiB
-        self.msg_size = 1024 * 128
-        self.msg_count = 10000
+        self.admin = Admin(self.redpanda)

-    def get_throttling_metric(self):
-        return self.redpanda.metric_sum(
-            "vectorized_cloud_storage_read_path_downloads_throttled_sum_total")
+        # 1 GiB of data
+        self.msg_size = 1024 * 128
+        self.msg_count = 8196
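(Editorial aside: a quick back-of-the-envelope check of the "1 GiB of data" comment; the snippet below is just arithmetic, not part of the diff.)

```python
# 8196 messages of 128 KiB each is just over 1 GiB.
msg_size = 1024 * 128                 # 128 KiB
msg_count = 8196
total_bytes = msg_size * msg_count    # 1,074,266,112 bytes
print(total_bytes / 2**30)            # ~1.0005 GiB
```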

     def produce(self):
         topic_name = self.topics[0].name
@@ -1349,20 +1346,60 @@ def consume(self):
         consumer = KgoVerifierSeqConsumer(self.test_context,
                                           self.redpanda,
                                           topic_name,
+                                          msg_size=self.msg_size,
+                                          loop=False,
                                           debug_logs=True,
                                           trace_logs=True)
-        consumer.start()

+        start = time.time()
+        consumer.start()
         consumer.wait(timeout_sec=400)
+        consume_duration = time.time() - start

         assert consumer.consumer_status.validator.invalid_reads == 0
+        assert consumer.consumer_status.validator.valid_reads >= self.msg_count

         consumer.free()

+        return consume_duration

+    def measure_consume_throughput(self):
+        bw_measurements = []
+        for iter_ix in range(2):
+            # Trim cache.
+            self.redpanda.for_nodes(
+                self.redpanda.nodes, lambda n: self.admin.cloud_storage_trim(
+                    byte_limit=0, object_limit=0, node=n))
+
+            # Restart redpanda to make sure it gave up all the file handles.
+            self.redpanda.restart_nodes(self.redpanda.nodes)
+
+            # Wait for all topics to have leadership.
+            wait_until(lambda: all(
+                self.admin.get_partition_leader(
+                    namespace='kafka', topic=self.topic, partition=p) != -1
+                for p in range(self.topics[0].partition_count)),
+                       timeout_sec=60,
+                       backoff_sec=1)
+
+            # Measure throughput.
+            self.logger.info(f"Start consumer iteration {iter_ix}")
+            duration = self.consume()
+            bw = self.msg_count * self.msg_size / duration
+            self.logger.info(
+                f"Consumer took {duration} seconds. Measured throughput: "
+                f"{bw} bytes/sec")
+            bw_measurements.append(bw)
+
+        return sum(bw_measurements) / len(bw_measurements)

+    # We throttle cloud storage download bandwidth and also the I/O of the
+    # cloud storage scheduling group. In debug mode, Seastar I/O throttling
+    # makes the system much slower than the target bandwidth, which makes it
+    # hard to assert on the bandwidth or test duration, so we skip the test.
+    @skip_debug_mode
     @cluster(num_nodes=4)
-    @matrix(cloud_storage_type=get_cloud_storage_type())
+    @matrix(cloud_storage_type=get_cloud_storage_type(
+        docker_use_arbitrary=True))
     def test_throttling(self, cloud_storage_type):

self.logger.info("Start producer")
Expand All @@ -1374,21 +1411,41 @@ def test_throttling(self, cloud_storage_type):
         num_partitions = self.topics[0].partition_count
         topic_name = self.topics[0].name
         rpk.alter_topic_config(topic_name, 'retention.local.target.bytes',
-                               0x1000)
+                               4096)

         for pix in range(0, num_partitions):
             wait_for_local_storage_truncate(self.redpanda,
                                             self.topic,
-                                            target_bytes=0x2000,
+                                            target_bytes=8192,
                                             partition_idx=pix,
                                             timeout_sec=30)

self.logger.info("Start consumer")
# A warm-up consume to make all future measurements fairer.
# This potentially warms up object storage. Potentially some local
# files which are not trimmed yet.
self.consume()

times_throttled = self.get_throttling_metric()
self.logger.info(f"Consumer was throttled {times_throttled} times")
assert times_throttled > 0, f"Expected consumer to be throttled, metric value: {times_throttled}"
unrestricted_bw = self.measure_consume_throughput()

+        # Limit bandwidth to half of the unrestricted throughput. The expected
+        # throughput depends on the number of CPU cores available and the
+        # number of partitions. In the best-case scenario, we expect each
+        # partition to land on a separate core and the throughput limit to
+        # apply to each partition independently.
+        per_shard_bw_limit = int(unrestricted_bw / 2 / num_partitions)
+        expected_bw = per_shard_bw_limit * num_partitions * 1.1
+        self.logger.info(
+            f"Unrestricted bandwidth: {unrestricted_bw} bytes/sec. "
+            f"Configuring per-shard limit to {per_shard_bw_limit} bytes/sec. "
+            f"Expecting throughput to be less than {expected_bw} bytes/sec.")
+
+        self.redpanda.set_cluster_config(
+            {'cloud_storage_max_throughput_per_shard': per_shard_bw_limit})
+
+        restricted_bw = self.measure_consume_throughput()
+        self.logger.info(f"Restricted bandwidth: {restricted_bw} bytes/sec")
+
+        assert restricted_bw < expected_bw, f"Expected {restricted_bw=} < {expected_bw=}"
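To make the expected-bandwidth arithmetic concrete, a worked example with made-up numbers (assume 3 partitions and a measured unrestricted throughput of 300 MB/s; none of these values come from an actual run):

```python
# Hypothetical values, purely to illustrate the assertion's arithmetic.
num_partitions = 3
unrestricted_bw = 300e6  # bytes/sec, measured in the unrestricted run (made up)

# Half the unrestricted throughput, split across partitions.
per_shard_bw_limit = int(unrestricted_bw / 2 / num_partitions)  # 50,000,000

# Best case: each partition lands on its own core, so the per-shard limit
# applies to each partition independently; 10% slack on top.
expected_bw = per_shard_bw_limit * num_partitions * 1.1  # 165,000,000.0

restricted_bw = 140e6  # bytes/sec, the throttled run (made up)
assert restricted_bw < expected_bw
```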


class EndToEndHydrationTimeoutTest(EndToEndShadowIndexingBase):