forked from aio-libs/aioftp
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtest_throttle.py
73 lines (63 loc) · 2.72 KB
/
test_throttle.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
import asyncio
from functools import reduce
from pathlib import Path
import pytest
import aioftp
@pytest.mark.asyncio
async def test_patched_sleep(skip_sleep):
await asyncio.sleep(10)
assert skip_sleep.is_close(10)
SIZE = 3 * 100 * 1024 # 300KiB
@pytest.mark.parametrize("times", [10, 20, 30])
@pytest.mark.parametrize("type", ["read", "write"])
@pytest.mark.parametrize("direction", ["download", "upload"])
@pytest.mark.asyncio
async def test_client_side_throttle(pair_factory, skip_sleep, times, type,
direction):
async with pair_factory() as pair:
await pair.make_server_files("foo", size=SIZE)
await pair.make_client_files("foo", size=SIZE)
getattr(pair.client.throttle, type).limit = SIZE / times
await getattr(pair.client, direction)("foo")
if (type, direction) in {("read", "download"), ("write", "upload")}:
assert skip_sleep.is_close(times)
else:
assert skip_sleep.is_close(0)
@pytest.mark.parametrize("times", [10, 20, 30])
@pytest.mark.parametrize("users", [1, 2, 3])
@pytest.mark.parametrize("throttle_direction", ["read", "write"])
@pytest.mark.parametrize("data_direction", ["download", "upload"])
@pytest.mark.parametrize("throttle_level", ["throttle",
"throttle_per_connection"])
@pytest.mark.asyncio
async def test_server_side_throttle(pair_factory, skip_sleep, times, users,
throttle_direction, data_direction,
throttle_level):
async with pair_factory() as pair:
names = []
for i in range(users):
name = f"foo{i}"
names.append(name)
await pair.make_server_files(name, size=SIZE)
throttle = reduce(getattr, [throttle_level, throttle_direction],
pair.server)
throttle.limit = SIZE / times
clients = []
for name in names:
c = aioftp.Client(path_io_factory=aioftp.MemoryPathIO)
async with c.path_io.open(Path(name), "wb") as f:
await f.write(b"-" * SIZE)
await c.connect(pair.server.server_host, pair.server.server_port)
await c.login()
clients.append(c)
coros = [getattr(c, data_direction)(n) for c, n in zip(clients, names)]
await asyncio.gather(*coros)
await asyncio.gather(*[c.quit() for c in clients])
throttled = {("read", "upload"), ("write", "download")}
if (throttle_direction, data_direction) not in throttled:
assert skip_sleep.is_close(0)
else:
t = times
if throttle_level == "throttle": # global
t *= users
assert skip_sleep.is_close(t)