-
-
Notifications
You must be signed in to change notification settings - Fork 2.7k
/
pathlib.py
304 lines (248 loc) · 9.07 KB
/
pathlib.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
import atexit
import errno
import fnmatch
import itertools
import operator
import os
import shutil
import sys
import uuid
from os.path import expanduser
from os.path import expandvars
from os.path import isabs
from os.path import sep
from posixpath import sep as posix_sep
if sys.version_info[:2] >= (3, 6):
from pathlib import Path, PurePath
else:
from pathlib2 import Path, PurePath
# Names exported by ``from <this module> import *``: only the two path classes.
__all__ = ["Path", "PurePath"]
# 60 * 60 * 3 == 10800; presumably a lock lifetime of three hours expressed in
# seconds -- NOTE(review): no visible code in this file reads it, confirm at
# the call sites.
LOCK_TIMEOUT = 60 * 60 * 3
# Callable that maps a directory path object to ``directory.joinpath(".lock")``.
get_lock_path = operator.methodcaller("joinpath", ".lock")
def ensure_reset_dir(path):
    """
    Make ``path`` an empty directory.

    Anything already present at ``path`` is removed first through
    ``rmtree(path, force=True)``; afterwards the directory is created anew.
    """
    already_there = path.exists()
    if already_there:
        rmtree(path, force=True)
    path.mkdir()
def rmtree(path, force=False):
    """Remove the directory tree rooted at ``path``.

    :param path: directory to remove (converted with ``str`` before use)
    :param force: when true, errors raised during removal are ignored
    """
    # NOTE: ignoring errors might leave dead folders around.
    # Python needs a rm -rf as a followup.
    shutil.rmtree(str(path), ignore_errors=bool(force))
def find_prefixed(root, prefix):
    """yields every entry of root whose name starts with the prefix, ignoring case"""
    wanted = prefix.lower()
    yield from (
        entry for entry in root.iterdir() if entry.name.lower().startswith(wanted)
    )
def extract_suffixes(iter, prefix):
    """
    Strip the leading ``len(prefix)`` characters from the name of every path.

    :param iter: iterator over path objects (only their ``name`` is read)
    :param prefix: expected prefix of the path names
    :returns: generator over the parts of the names following the prefix
    """
    prefix_length = len(prefix)
    yield from (entry.name[prefix_length:] for entry in iter)
def find_suffixes(root, prefix):
    """combines find_prefixed and extract_suffixes

    The entries of ``root`` whose names start with ``prefix`` are looked up and
    the part of each name following the prefix is produced.
    """
    matching = find_prefixed(root, prefix)
    return extract_suffixes(matching, prefix)
def parse_num(maybe_num):
    """converts a path suffix to an int, yielding -1 when ``int()`` rejects the value"""
    try:
        number = int(maybe_num)
    except ValueError:
        number = -1
    return number
def _force_symlink(root, target, link_to):
    """helper to (re)create the symlink ``root/target`` pointing at ``link_to``

    it is full of race conditions that are reasonably ok to ignore
    in the context of best effort linking to the latest testrun;
    the presumption being that in case of much parallelism
    the inaccuracy is going to be acceptable
    """
    link = root.joinpath(target)
    try:
        link.unlink()
    except OSError:
        # the old link could not be removed; try to create the new one anyway
        pass
    try:
        link.symlink_to(link_to)
    except Exception:
        # best effort only: any failure to create the link is ignored
        pass
def make_numbered_dir(root, prefix):
    """creates ``root/<prefix><N>`` where N is one above the highest existing number

    On success the ``<prefix>current`` symlink is (best effort) pointed at the
    new directory and the new path is returned.  An EnvironmentError is raised
    once ten attempts have failed.
    """
    # try up to 10 times to create the folder
    for _attempt in range(10):
        highest = max(map(parse_num, find_suffixes(root, prefix)), default=-1)
        candidate = root.joinpath("{}{}".format(prefix, highest + 1))
        try:
            candidate.mkdir()
        except Exception:
            # creation failed for whatever reason: rescan the numbers and retry
            continue
        _force_symlink(root, prefix + "current", candidate)
        return candidate
    raise EnvironmentError(
        "could not create numbered dir with prefix "
        "{prefix} in {root} after 10 tries".format(prefix=prefix, root=root)
    )
def create_cleanup_lock(p):
    """creates a lock to prevent premature folder cleanup

    The lock file (``get_lock_path(p)``) is created exclusively with
    ``O_CREAT | O_EXCL`` and the id of the current process is written into it.

    :param p: path object of the directory to lock
    :returns: the path of the freshly created lock file
    :raises EnvironmentError: if the lock file already exists, or if it is not
        a regular file anymore right after having been created; any other
        ``OSError`` raised while opening the file propagates unchanged
    """
    lock_path = get_lock_path(p)
    try:
        fd = os.open(str(lock_path), os.O_WRONLY | os.O_CREAT | os.O_EXCL, 0o644)
    except OSError as e:
        if e.errno == errno.EEXIST:
            raise EnvironmentError(
                "cannot create lockfile in {path}".format(path=p)
            ) from e
        raise
    try:
        os.write(fd, str(os.getpid()).encode())
    finally:
        # release the descriptor even when the write fails, so it cannot leak
        os.close(fd)
    if not lock_path.is_file():
        raise EnvironmentError("lock path got renamed after successful creation")
    return lock_path
def register_cleanup_lock_removal(lock_path, register=atexit.register):
    """registers a callback that removes the lock file, by default via atexit

    :param lock_path: path object of the lock file to remove
    :param register: callable receiving the cleanup callback
    :returns: whatever ``register`` returns
    """
    creator_pid = os.getpid()

    def cleanup_on_exit(lock_path=lock_path, original_pid=creator_pid):
        # only the process that registered the callback removes the lock;
        # a process with another pid (fork) leaves it alone
        if os.getpid() == original_pid:
            try:
                lock_path.unlink()
            except (OSError, IOError):
                pass

    return register(cleanup_on_exit)
def maybe_delete_a_numbered_dir(path):
    """removes a numbered directory if its lock can be obtained and it does not seem to be in use

    The directory is first renamed to a sibling ``garbage-<uuid4>`` folder and
    that folder is then removed with ``rmtree(..., force=True)``.
    """
    held_lock = None
    try:
        held_lock = create_cleanup_lock(path)
        trash = path.parent.joinpath("garbage-{}".format(uuid.uuid4()))
        path.rename(trash)
        rmtree(trash, force=True)
    except (OSError, EnvironmentError):
        # known races:
        # * other process did a cleanup at the same time
        # * deletable folder was found
        # * process cwd (Windows)
        return
    finally:
        # a lock created here is removed again, even when the numbered
        # directory itself could not be removed properly
        if held_lock is not None:
            try:
                held_lock.unlink()
            except (OSError, IOError):
                pass
def ensure_deletable(path, consider_lock_dead_if_created_before):
    """checks if a lock exists and breaks it if it is considered dead

    :param path: path object of the directory to check
    :param consider_lock_dead_if_created_before: threshold compared against the
        ``st_mtime`` of the lock file; a lock older than it is considered dead
    :returns: True if the directory has no lock or a dead lock was removed;
        False if ``path`` is a symlink, the lock cannot be inspected, the lock
        is not old enough, or the dead lock could not be removed
    """
    if path.is_symlink():
        return False
    lock = get_lock_path(path)
    if not lock.exists():
        return True
    try:
        lock_time = lock.stat().st_mtime
    except Exception:
        return False
    if lock_time >= consider_lock_dead_if_created_before:
        return False
    try:
        lock.unlink()
    except OSError:
        # Removing the dead lock may fail, e.g. because another process
        # removed it first or the permissions changed since its creation;
        # report "not deletable" rather than aborting the whole cleanup.
        return False
    return True
def try_cleanup(path, consider_lock_dead_if_created_before):
    """attempts to remove a folder, but only once ensure_deletable approves of it"""
    deletable = ensure_deletable(path, consider_lock_dead_if_created_before)
    if not deletable:
        return
    maybe_delete_a_numbered_dir(path)
def cleanup_candidates(root, prefix, keep):
    """yields the numbered directories that may be removed - follows py.path

    A directory is a candidate when its parsed number is not greater than
    the highest existing number minus ``keep``.
    """
    highest = max(map(parse_num, find_suffixes(root, prefix)), default=-1)
    threshold = highest - keep
    # walk the directory listing once, pairing each path with its parsed number
    candidates, for_numbers = itertools.tee(find_prefixed(root, prefix))
    suffix_numbers = map(parse_num, extract_suffixes(for_numbers, prefix))
    yield from (
        path
        for path, number in zip(candidates, suffix_numbers)
        if number <= threshold
    )
def cleanup_numbered_dir(root, prefix, keep, consider_lock_dead_if_created_before):
    """cleanup for lock driven numbered directories

    First the removal candidates among the numbered directories are cleaned
    up, afterwards every ``garbage-*`` entry found in ``root``.
    """
    for candidate in cleanup_candidates(root, prefix, keep):
        try_cleanup(candidate, consider_lock_dead_if_created_before)
    # globbed only after the loop above has finished
    for leftover in root.glob("garbage-*"):
        try_cleanup(leftover, consider_lock_dead_if_created_before)
def make_numbered_dir_with_cleanup(root, prefix, keep, lock_timeout):
    """creates a numbered dir with a cleanup lock and removes old ones

    Creation, locking and registration of the lock removal are attempted up to
    ten times; when every attempt fails the last exception is raised.  After a
    successful attempt old directories are cleaned up, treating locks older
    than the new directory's mtime minus ``lock_timeout`` as dead.
    """
    last_error = None
    for _attempt in range(10):
        try:
            p = make_numbered_dir(root, prefix)
            lock_path = create_cleanup_lock(p)
            register_cleanup_lock_removal(lock_path)
        except Exception as exc:
            last_error = exc
            continue
        dead_before = p.stat().st_mtime - lock_timeout
        cleanup_numbered_dir(
            root=root,
            prefix=prefix,
            keep=keep,
            consider_lock_dead_if_created_before=dead_before,
        )
        return p
    assert last_error is not None
    raise last_error
def resolve_from_str(input, root):
    """Turn the string ``input`` into a Path, relative inputs being joined onto ``root``.

    ``~`` constructs and environment variables in ``input`` are expanded
    before deciding whether it is absolute.
    """
    assert not isinstance(input, Path), "would break on py2"
    base = Path(root)
    expanded = expandvars(expanduser(input))
    return Path(expanded) if isabs(expanded) else base.joinpath(expanded)
def fnmatch_ex(pattern, path):
    """FNMatcher port from py.path.common which works with PurePath() instances.

    Unlike PurePath.match(), which matches "**" glob expressions for each part
    of the path, this algorithm matches against the whole path.

    For example:
        "tests/foo/bar/doc/test_foo.py" matches pattern "tests/**/doc/test*.py"
        with this algorithm, but not with PurePath.match().

    The algorithm was ported to keep backward-compatibility with existing
    settings which assume paths match according to this logic.

    References:
    * https://bugs.python.org/issue29249
    * https://bugs.python.org/issue34731
    """
    path = PurePath(path)
    on_windows = sys.platform.startswith("win")

    if on_windows and posix_sep in pattern and sep not in pattern:
        # On Windows a pattern written purely with Posix separators is
        # rewritten to use the Windows separator instead.
        pattern = pattern.replace(posix_sep, sep)

    if sep in pattern:
        # the pattern names directories: match against the complete path
        name = str(path)
        if path.is_absolute() and not os.path.isabs(pattern):
            pattern = "*{}{}".format(os.sep, pattern)
    else:
        # no separator in the pattern: only the final path component counts
        name = path.name
    return fnmatch.fnmatch(name, pattern)
def parts(s):
    """Return the set of all leading sub-paths of ``s``, split on ``sep``.

    An empty leading sub-path (as produced by a string starting with ``sep``,
    or by an empty string) is represented by ``sep`` itself.
    """
    pieces = s.split(sep)
    return {sep.join(pieces[:end]) or sep for end in range(1, len(pieces) + 1)}