forked from aio-libs/aioftp
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathconftest.py
211 lines (171 loc) · 6.23 KB
/
conftest.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
import ssl
import collections
import contextlib
import tempfile
import asyncio
import math
import time
import functools
import socket
from pathlib import Path
import pytest
import pytest_asyncio
import trustme
from async_timeout import timeout
import aioftp
from siosocks.io.asyncio import socks_server_handler
# No ssl tests since https://bugs.python.org/issue36098
ca = trustme.CA()
server_cert = ca.issue_server_cert("127.0.0.1", "::1")
ssl_server = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
server_cert.configure_cert(ssl_server)
ssl_client = ssl.create_default_context(ssl.Purpose.SERVER_AUTH)
ca.configure_trust(ssl_client)
class Container:
def __init__(self, *args, **kwargs):
self.args = args
self.kwargs = kwargs
@pytest.fixture
def Client():
return Container
@pytest.fixture
def Server():
return Container
def _wrap_with_defaults(kwargs):
test_defaults = dict(
path_io_factory=aioftp.MemoryPathIO,
)
return collections.ChainMap(kwargs, test_defaults)
@pytest.fixture(params=["127.0.0.1", "::1"])
def pair_factory(request):
class Factory:
def __init__(self, client=None, server=None, *,
connected=True, logged=True, do_quit=True,
host=request.param,
server_factory=aioftp.Server,
client_factory=aioftp.Client):
if client is None:
client = Container()
self.client = client_factory(*client.args,
**_wrap_with_defaults(client.kwargs))
if server is None:
server = Container()
self.server = server_factory(*server.args,
**_wrap_with_defaults(server.kwargs))
self.connected = connected
self.logged = logged
self.do_quit = do_quit
self.host = host
self.timeout = timeout(1)
async def make_server_files(self, *paths, size=None, atom=b"-"):
if size is None:
size = aioftp.DEFAULT_BLOCK_SIZE * 3
data = atom * size
for p in paths:
await self.client.make_directory(Path(p).parent)
async with self.client.upload_stream(p) as stream:
await stream.write(data)
async def make_client_files(self, *paths, size=None, atom=b"-"):
if size is None:
size = aioftp.DEFAULT_BLOCK_SIZE * 3
data = atom * size
for p in map(Path, paths):
await self.client.path_io.mkdir(p.parent, parents=True,
exist_ok=True)
async with self.client.path_io.open(p, mode="wb") as f:
await f.write(data)
async def server_paths_exists(self, *paths):
values = []
for p in paths:
values.append(await self.client.exists(p))
if all(values):
return True
if any(values):
raise ValueError("Mixed exists/not exists list")
return False
async def client_paths_exists(self, *paths):
values = []
for p in paths:
values.append(await self.client.path_io.exists(Path(p)))
if all(values):
return True
if any(values):
raise ValueError("Mixed exists/not exists list")
return False
async def __aenter__(self):
await self.timeout.__aenter__()
await self.server.start(host=self.host)
if self.connected:
await self.client.connect(self.server.server_host,
self.server.server_port)
if self.logged:
await self.client.login()
return self
async def __aexit__(self, *exc_info):
if self.connected and self.do_quit:
await self.client.quit()
self.client.close()
await self.server.close()
await self.timeout.__aexit__(*exc_info)
return Factory
@pytest.fixture
def expect_codes_in_exception():
@contextlib.contextmanager
def context(*codes):
try:
yield
except aioftp.StatusCodeError as e:
assert set(e.received_codes) == set(codes)
else:
raise RuntimeError("There was no exception")
return context
@pytest.fixture(params=[aioftp.MemoryPathIO, aioftp.PathIO,
aioftp.AsyncPathIO])
def path_io(request):
return request.param()
@pytest.fixture
def temp_dir(path_io):
if isinstance(path_io, aioftp.MemoryPathIO):
yield Path("/")
else:
with tempfile.TemporaryDirectory() as name:
yield Path(name)
class Sleep:
def __init__(self):
self.delay = 0
self.first_sleep = None
async def sleep(self, delay, result=None, **kwargs):
if self.first_sleep is None:
self.first_sleep = time.monotonic()
delay = (time.monotonic() - self.first_sleep) + delay
self.delay = max(self.delay, delay)
return result
def is_close(self, delay, *, rel_tol=0.05, abs_tol=0.5):
ok = math.isclose(self.delay, delay, rel_tol=rel_tol, abs_tol=abs_tol)
if not ok:
print(f"latest sleep: {self.delay}; expected delay: "
f"{delay}; rel: {rel_tol}")
return ok
@pytest.fixture
def skip_sleep(monkeypatch):
with monkeypatch.context() as m:
sleeper = Sleep()
m.setattr(asyncio, "sleep", sleeper.sleep)
yield sleeper
@pytest_asyncio.fixture(params=[("127.0.0.1", socket.AF_INET),
("::1", socket.AF_INET6)])
async def socks(request, unused_tcp_port):
handler = functools.partial(
socks_server_handler,
allowed_versions={5},
username="foo",
password="bar",
)
Socks = collections.namedtuple("Socks", "host port server")
host, family = request.param
port = unused_tcp_port
server = await asyncio.start_server(handler, host=host, port=port,
family=family)
yield Socks(host, port, server)
server.close()
await server.wait_closed()