-
Notifications
You must be signed in to change notification settings - Fork 2
/
fileutils.py
1900 lines (1729 loc) · 59.1 KB
/
fileutils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
#!/usr/bin/python
#
# Assorted convenience functions for files and filenames/pathnames.
# - Cameron Simpson <cs@cskk.id.au>
''' My grab bag of convenience functions for files and filenames/pathnames.
'''
# pylint: disable=too-many-lines
from __future__ import with_statement, print_function, absolute_import
from contextlib import contextmanager
import errno
from functools import partial
import gzip
import os
from os import SEEK_CUR, SEEK_END, SEEK_SET, O_RDONLY, read
try:
from os import pread
except ImportError:
pread = None
from os.path import (
abspath,
basename,
dirname,
exists as existspath,
isabs as isabspath,
isdir,
join as joinpath,
splitext,
)
import shutil
import stat
import sys
from tempfile import TemporaryFile, NamedTemporaryFile, mkstemp
from threading import Lock, RLock
import time
from cs.buffer import CornuCopyBuffer
from cs.context import stackattrs
from cs.deco import cachedmethod, decorator, fmtdoc, OBSOLETE, strable
from cs.filestate import FileState
from cs.fs import shortpath
from cs.gimmicks import TimeoutError # pylint: disable=redefined-builtin
from cs.lex import as_lines, cutsuffix, common_prefix
from cs.logutils import error, warning, debug
from cs.pfx import Pfx, pfx, pfx_call
from cs.progress import Progress, progressbar
from cs.py3 import ustr, bytes, pread # pylint: disable=redefined-builtin
from cs.range import Range
from cs.resources import RunState, uses_runstate
from cs.result import CancellationError
from cs.threads import locked
from cs.units import BINARY_BYTES_SCALE
# Version string for this module.
__version__ = '20241122-post'

# Distribution metadata: keywords, classifiers and the packages this
# module requires (the `cs.*` modules imported above).
DISTINFO = {
    'keywords': ["python2", "python3"],
    'classifiers': [
        "Programming Language :: Python",
        "Programming Language :: Python :: 2",
        "Programming Language :: Python :: 3",
    ],
    'install_requires': [
        'cs.buffer',
        'cs.context',
        'cs.deco',
        'cs.filestate',
        'cs.fs>=shortpath',
        'cs.gimmicks>=TimeoutError',
        'cs.lex>=20200914',
        'cs.logutils',
        'cs.pfx>=pfx_call',
        'cs.progress',
        'cs.py3',
        'cs.range',
        'cs.resources',
        'cs.result',
        'cs.threads',
        'cs.units',
    ],
}

# Default interval in seconds between polls: the default for
# `file_based(poll_delay=)`, `make_files_property(poll_rate=)`
# and `makelockfile(poll_interval=)`.
DEFAULT_POLL_INTERVAL = 1.0

# NOTE(review): presumably the default read size in bytes (128KiB);
# not referenced in this part of the file - confirm against its users.
DEFAULT_READSIZE = 131072

# NOTE(review): presumably the pause in seconds between polls when tailing
# a file; not referenced in this part of the file - confirm against its users.
DEFAULT_TAIL_PAUSE = 0.25
def seekable(fp):
    ''' Report whether the file-like object `fp` appears to be seekable.

        The object's own `seekable()` method is consulted when it has one.
        Failing that, the file descriptor obtained from `fp.fileno()`
        is `os.fstat()`ed and the object is deemed seekable
        if the descriptor refers to a regular file.
        An object offering neither attribute is reported as not seekable.
    '''
    missing = object()
    # preferred: the IOBase style method
    method = getattr(fp, 'seekable', missing)
    if method is not missing:
        return method()
    # fallback: inspect the underlying file descriptor
    fileno = getattr(fp, 'fileno', missing)
    if fileno is missing:
        return False
    return stat.S_ISREG(os.fstat(fileno()).st_mode)
def rename_excl(oldpath, newpath):
    ''' Safely rename `oldpath` to `newpath`.
        Raise `FileExistsError` if `newpath` already exists.

        The name `newpath` is first claimed by exclusively creating
        an empty placeholder file, which `os.rename` then replaces.
        If the rename itself fails with an `OSError`
        the placeholder is removed again (best effort)
        so that no stray empty file is left at `newpath`,
        and the original exception is reraised.
    '''
    # exclusive create: fails if newpath is already taken
    with pfx_call(open, newpath, 'xb'):
        pass
    try:
        pfx_call(os.rename, oldpath, newpath)
    except OSError:
        # the rename did not happen: discard our placeholder
        try:
            os.remove(newpath)
        except OSError:
            # best effort only; report the rename failure, not this one
            pass
        raise
@OBSOLETE("rename_excl")
def saferename(oldpath, newpath):
    ''' Rename a path using `os.rename()`,
        but raise an exception if the target path already exists.
        Note: slightly racey.

        If `newpath` exists an `OSError` is raised
        whose `errno` is `errno.EEXIST` and whose `filename` is `newpath`.
        Any `os.lstat` failure other than `ENOENT` is reraised unchanged.
    '''
    try:
        os.lstat(newpath)
    except OSError as e:
        # only "no such file" means the target name is free
        if e.errno != errno.ENOENT:
            raise
    else:
        # Supply errno, message and filename: OSError(errno.EEXIST) alone
        # leaves the .errno attribute unset.
        raise OSError(errno.EEXIST, os.strerror(errno.EEXIST), newpath)
    os.rename(oldpath, newpath)
def trysaferename(oldpath, newpath):
    ''' Attempt `saferename(oldpath, newpath)`.
        Return `True` if the rename succeeded,
        `False` if it raised an `OSError`.
    '''
    try:
        saferename(oldpath, newpath)
    except OSError:
        renamed = False
    else:
        renamed = True
    return renamed
def compare(f1, f2, mode="rb"):
    ''' Compare the contents of two file-like objects `f1` and `f2` for equality.
        Return `True` if the contents are the same, `False` otherwise.

        If `f1` or `f2` is a string, open the named file using `mode`
        (default: `"rb"`).

        The data are read and compared in bounded chunks
        so that large files are not loaded into memory whole,
        and the comparison stops at the first chunk which differs;
        file objects supplied by the caller are therefore not
        necessarily read to their end when the contents differ.
        Short reads from either file are tolerated.
    '''
    if isinstance(f1, str):
        with open(f1, mode) as f1fp:
            return compare(f1fp, f2, mode)
    if isinstance(f2, str):
        with open(f2, mode) as f2fp:
            return compare(f1, f2fp, mode)

    chunk_size = 131072

    def read_chunk(f):
        ''' Read up to `chunk_size` items from `f`, looping over short reads
            so that both files are compared at aligned offsets.
            An empty result means end of file.
        '''
        parts = []
        needed = chunk_size
        while needed > 0:
            part = f.read(needed)
            if not part:
                if not parts:
                    # end of file: hand back the file's own empty value
                    return part
                break
            parts.append(part)
            needed -= len(part)
        if len(parts) == 1:
            return parts[0]
        # join with an empty value of the same type (bytes or str)
        return parts[0][:0].join(parts)

    while True:
        chunk1 = read_chunk(f1)
        chunk2 = read_chunk(f2)
        if chunk1 != chunk2:
            return False
        if not chunk1:
            # both files ended together
            return True
# pylint: disable=too-many-locals,too-many-branches,too-many-statements
@contextmanager
def NamedTemporaryCopy(f, progress=False, progress_label=None, **nt_kw):
    ''' A context manager yielding a temporary copy of `f`
        as returned by `NamedTemporaryFile(**nt_kw)`.

        Parameters:
        * `f`: the name of the file to copy, or an open binary file,
          or an `int` file descriptor, or a `CornuCopyBuffer`
        * `progress`: an optional progress indicator, default `False`;
          if a `bool`, show a progress bar for the copy phase if true;
          if an `int`, show a progress bar for the copy phase
          if the file size equals or exceeds the value
          (or if the size cannot be determined);
          otherwise it should be a `cs.progress.Progress` instance,
          which is advanced by the number of bytes written
        * `progress_label`: optional progress bar label,
          only used if a progress bar is made

        Other keyword parameters are passed to `tempfile.NamedTemporaryFile`;
        for a non-filename `f` the default `prefix` is `'NamedTemporaryCopy'`.

        A warning is logged if the source is a regular file
        and the number of bytes copied differs from its size.
    '''
    if isinstance(f, str):
        # copy named file
        filename = f
        progress_label = (
            "copy " + repr(filename) if progress_label is None else progress_label
        )
        # should we use shutil.copy() and display no progress?
        # NOTE(review): any regular file takes the fast path even when a
        # progress indicator was requested - confirm this is intended.
        if progress is False:
            fast_mode = True
        else:
            with Pfx("stat(%r)", filename):
                S = os.stat(filename)
            fast_mode = stat.S_ISREG(S.st_mode)
        if fast_mode:
            with NamedTemporaryFile(**nt_kw) as T:
                with Pfx("shutil.copy(%r,%r)", filename, T.name):
                    shutil.copy(filename, T.name)
                yield T
        else:
            with Pfx("open(%r)", filename):
                with open(filename, 'rb') as f2:
                    with NamedTemporaryCopy(f2, progress=progress,
                                            progress_label=progress_label,
                                            **nt_kw) as T:
                        yield T
        return
    prefix = nt_kw.pop('prefix', None)
    if prefix is None:
        prefix = 'NamedTemporaryCopy'
    # prepare the buffer and try to infer the length
    if isinstance(f, CornuCopyBuffer):
        length = None
        bfr = f
    else:
        if isinstance(f, int):
            fd = f
            bfr = CornuCopyBuffer.from_fd(fd)
        else:
            bfr = CornuCopyBuffer.from_file(f)
            try:
                fd = f.fileno()
            except AttributeError:
                fd = None
        if fd is None:
            length = None
        else:
            S = os.fstat(fd)
            length = S.st_size if stat.S_ISREG(S.st_mode) else None
    # determine whether we need a progress bar
    # (bool is tested first because bool is a subclass of int)
    if isinstance(progress, bool):
        need_bar = progress
        progress = None
    elif isinstance(progress, int):
        need_bar = length is None or length >= progress
        progress = None
    else:
        need_bar = False
        assert isinstance(progress, Progress)
    with NamedTemporaryFile(prefix=prefix, **nt_kw) as T:
        # wrap the buffer in a progress bar only when one is wanted
        it = (
            progressbar(
                bfr,
                label=progress_label,
                total=length,
                itemlenfunc=len,
                units_scale=BINARY_BYTES_SCALE,
            ) if need_bar else bfr
        )
        nbs = 0
        for bs in it:
            while bs:
                nwritten = T.write(bs)
                # count every write so that partial writes are all tallied
                nbs += nwritten
                if progress is not None:
                    progress += nwritten
                if nwritten != len(bs):
                    warning(
                        "NamedTemporaryCopy: %r.write(%d bytes) => %d",
                        T.name,
                        len(bs),
                        nwritten,
                    )
                    # retry with the unwritten remainder
                    bs = bs[nwritten:]
                else:
                    bs = b''
        bfr.close()
        T.flush()
        if length is not None and nbs != length:
            warning(
                "NamedTemporaryCopy: given length=%s, wrote %d bytes to %r",
                length,
                nbs,
                T.name,
            )
        yield T
# pylint: disable=too-many-arguments
def rewrite(
    filepath,
    srcf,
    mode='w',
    backup_ext=None,
    do_rename=False,
    do_diff=None,
    empty_ok=False,
    overwrite_anyway=False
):
    ''' Rewrite the file `filepath` with data from the file object `srcf`.
        Return `True` if the content was changed, `False` if unchanged.

        Parameters:
        * `filepath`: the name of the file to rewrite.
        * `srcf`: the source file containing the new content.
        * `mode`: the write-mode for the file, default `'w'` (for text);
          use `'wb'` for binary data.
        * `empty_ok`: if true (default `False`),
          do not raise `ValueError` if the new data are empty.
        * `overwrite_anyway`: if true (default `False`),
          skip the content check and overwrite unconditionally.
        * `backup_ext`: if a nonempty string,
          take a backup of the original at `filepath + backup_ext`.
        * `do_diff`: if not `None`, call `do_diff(filepath,tempfile)`.
        * `do_rename`: if true (default `False`),
          rename the temp file to `filepath`
          after copying the permission bits.
          Otherwise (default), copy the tempfile to `filepath`;
          this preserves the file's inode and permissions etc.

        The new data are first written to a temporary file
        in the same directory as `filepath`;
        that temporary file is always removed before returning
        unless it was renamed into place.

        Raises `ValueError` if the new data are empty and `empty_ok` is false.
    '''
    with Pfx("rewrite(%r)", filepath):
        # delete=False: the temp file may be renamed into place, in which case
        # an automatic unlink on close would fail, so we tidy up ourselves.
        T = NamedTemporaryFile(dir=dirname(filepath), mode=mode, delete=False)
        tmppath = T.name
        try:
            with T:
                T.write(srcf.read())
            # the temp file is now closed, so all the data are on disc
            if not empty_ok and os.stat(tmppath).st_size == 0:
                raise ValueError("no data in temp file")
            if do_diff or not overwrite_anyway:
                # need to compare data
                if compare(tmppath, filepath):
                    # data the same, do nothing
                    return False
                if do_diff:
                    # call the supplied differ
                    do_diff(filepath, tmppath)
            if do_rename:
                # rename new file into old path
                # tries to preserve perms, but does nothing for other metadata
                shutil.copymode(filepath, tmppath)
                if backup_ext:
                    os.link(filepath, filepath + backup_ext)
                os.rename(tmppath, filepath)
            else:
                # overwrite old file - preserves perms, ownership, hard links
                if backup_ext:
                    shutil.copy2(filepath, filepath + backup_ext)
                shutil.copyfile(tmppath, filepath)
        finally:
            # remove the temp file unless it was renamed into place
            try:
                os.remove(tmppath)
            except FileNotFoundError:
                pass
        return True
@contextmanager
def rewrite_cmgr(filepath, mode='w', **kw):
    ''' A context manager for rewriting the file `filepath`.

        The caller is handed a scratch temporary file to fill with the
        new content; when the managed suite completes without an exception
        the scratch file's content is passed to `rewrite()`
        to update `filepath`.

        Parameters:
        * `filepath`: the name of the file to rewrite.
        * `mode`: the mode used to open the scratch file,
          defaulting to "w" for text.
        Other keyword parameters are passed to `rewrite()`.

        Example:

            with rewrite_cmgr(pathname, do_rename=True) as f:
                ... write new content to f ...
    '''
    scratch = NamedTemporaryFile(mode=mode)
    with scratch:
        yield scratch
        # make the caller's writes visible via the scratch file's name
        scratch.flush()
        with open(scratch.name, 'rb') as content:
            # the scratch data are reread as bytes, hence mode='wb'
            rewrite(filepath, srcf=content, mode='wb', **kw)
def abspath_from_file(path, from_file):
    ''' Resolve `path` relative to the directory containing `from_file`,
        as one might do for an include file.

        An absolute `path` is returned unchanged.
        Otherwise `path` is joined to the directory part of `from_file`,
        which is itself made absolute first if it is relative.
    '''
    if isabspath(path):
        return path
    base_file = from_file if isabspath(from_file) else abspath(from_file)
    return joinpath(dirname(base_file), path)
def poll_file(path, old_state, reload_file, missing_ok=False):
    ''' Poll the file `path` for modification, reloading it if it changed.

        The file's current state is obtained from `FileState(path)`
        and compared with `old_state`.
        If `old_state` is `None` or differs from the current state,
        call `reload_file(path)` and then take the state again.

        Return `(new_state,reload_file(path))` if a reload was done
        and the file state was the same before and after the reload.
        Otherwise return `(None,None)`.

        This may raise an `OSError` if `FileState(path)` fails,
        and of course any exception raised by `reload_file`.
        If `missing_ok` is true then an `OSError` with `errno`
        equal to `ENOENT` just returns `(None,None)`.
    '''
    try:
        state_before = FileState(path)
    except OSError as e:
        if missing_ok and e.errno == errno.ENOENT:
            return None, None
        raise
    unchanged = old_state is not None and not (old_state != state_before)
    if unchanged:
        return None, None
    # first poll or the state changed: reload
    value = reload_file(path)
    try:
        state_after = FileState(path)
    except OSError as e:
        if missing_ok and e.errno == errno.ENOENT:
            return None, None
        raise
    # only trust the reload if the file was stable across it
    if state_after == state_before:
        return state_before, value
    return None, None
@decorator
def file_based(
    func,
    attr_name=None,
    filename=None,
    poll_delay=None,
    sig_func=None,
    **dkw
):
    ''' A decorator which caches a value obtained from a file.

        In addition to all the keyword arguments for `@cs.deco.cachedmethod`,
        this decorator also accepts the following arguments:
        * `attr_name`: the name for the associated attribute, used as
          the basis for the internal cache value attribute;
          default `func.__name__`
        * `filename`: the filename to monitor.
          Default from the `._{attr_name}__filename` attribute.
          This value will be passed to the method as the `filename` keyword
          parameter.
        * `poll_delay`: delay between file polls, default `DEFAULT_POLL_INTERVAL`.
        * `sig_func`: signature function used to encapsulate the relevant
          information about the file; default
          cs.filestate.FileState({filename}).
          A supplied `sig_func` is passed through to `cachedmethod` unchanged.

        If the decorated function raises OSError with errno == ENOENT,
        this returns None. Other exceptions are reraised.
    '''
    if attr_name is None:
        attr_name = func.__name__
    filename_attr = '_' + attr_name + '__filename'
    filename0 = filename
    if poll_delay is None:
        poll_delay = DEFAULT_POLL_INTERVAL
    # Note: `sig_func` is a named parameter and so never appears in `dkw`;
    # it must not be refetched from `dkw`, which would discard the caller's value.
    if sig_func is None:

        def sig_func(self):
            ''' The default signature function: `FileState(filename,missing_ok=True)`.
            '''
            filename = filename0
            if filename is None:
                filename = getattr(self, filename_attr)
            return FileState(filename, missing_ok=True)

    def wrap0(self, *a, **kw):
        ''' Inner wrapper for `func`:
            supply the `filename` keyword argument
            and turn a missing file into a `None` result.
        '''
        filename = kw.pop('filename', None)
        if filename is None:
            if filename0 is None:
                filename = getattr(self, filename_attr)
            else:
                filename = filename0
        kw['filename'] = filename
        try:
            return func(self, *a, **kw)
        except OSError as e:
            if e.errno == errno.ENOENT:
                return None
            raise

    dkw['attr_name'] = attr_name
    dkw['poll_delay'] = poll_delay
    dkw['sig_func'] = sig_func
    return cachedmethod(**dkw)(wrap0)
@decorator
def file_property(func, **dkw):
    ''' A property whose value reloads if a file changes.
        This is `property()` applied to `file_based(func, **dkw)`.
    '''
    cached_method = file_based(func, **dkw)
    return property(cached_method)
def files_property(func):
    ''' A property whose value reloads if any of a list of files changes.

        Note: this is just `make_files_property()` with its default
        parameters, applied to `func`.

        The underlying attribute name is `'_'+func.__name__`,
        the default from `make_files_property()`.
        The attribute *attr_name*`_lock` is a mutex controlling access
        to the property.
        The attributes *attr_name*`_filestates` and *attr_name*`_paths`
        track the associated files and their states.
        The attribute *attr_name*`_lastpoll` tracks the last poll time.

        The decorated function is passed the current list of files
        and returns the new list of files and the associated value.

        One example use would be a configuration file with recursive
        include operations; the inner function would parse the first
        file in the list, and the parse would accumulate this filename
        and those of any included files so that they can be monitored,
        triggering a fresh parse if one changes.

        Example:

            class C(object):
              def __init__(self):
                self._foo_lock = Lock()
                self._foo_paths = ['.foorc']
              @files_property
              def foo(self,paths):
                new_paths, result = parse(paths[0])
                return new_paths, result

        The load function is called on the first access and on every
        access thereafter where an associated file's `FileState` has
        changed and the time since the last poll is at least
        the poll rate (`DEFAULT_POLL_INTERVAL`). An attempt at avoiding
        races is made by keeping the cached value when a reload raises
        an exception and by ignoring reloads where files examined
        during the change check have changed state after the load.
    '''
    decorate = make_files_property()
    return decorate(func)
# pylint: disable=too-many-statements
@fmtdoc
def make_files_property(
    attr_name=None, unset_object=None, poll_rate=DEFAULT_POLL_INTERVAL
):
    ''' Construct a decorator that watches multiple associated files.

        Parameters:
        * `attr_name`: the underlying attribute, default: `'_'+func.__name__`
        * `unset_object`: the sentinel value for "uninitialised", default: `None`
        * `poll_rate`: how often in seconds to poll the file for changes,
          default from `DEFAULT_POLL_INTERVAL`: `{DEFAULT_POLL_INTERVAL}`

        The attribute *attr_name*`_lock` controls access to the property;
        it must already exist on the instance.
        The attributes *attr_name*`_filestates` and *attr_name*`_paths` track the
        associated files' state; *attr_name*`_paths` must already exist
        on the instance.
        The attribute *attr_name*`_lastpoll` tracks the last poll time.

        The decorated function is passed the current list of files
        and returns the new list of files and the associated value.

        One example use would be a configuration file with recursive
        include operations; the inner function would parse the first
        file in the list, and the parse would accumulate this filename
        and those of any included files so that they can be monitored,
        triggering a fresh parse if one changes.

        Example:

            class C(object):
              def __init__(self):
                self._foo_lock = Lock()
                self._foo_paths = ['.foorc']
              @files_property
              def foo(self,paths):
                new_paths, result = parse(paths[0])
                return new_paths, result

        The load function is called on the first access and on every
        access thereafter where an associated file's `FileState` has
        changed and the time since the last poll is at least
        the `poll_rate`.
        An attempt at avoiding races is made by
        ignoring reloads that raise exceptions and ignoring reloads
        where files that were `os.stat()`ed during the change check have
        changed state after the load.
    '''

    # pylint: disable=too-many-statements
    def made_files_property(func):
        # derive the names of the instance attributes holding our state
        if attr_name is None:
            attr_value = '_' + func.__name__
        else:
            attr_value = attr_name
        attr_lock = attr_value + '_lock'
        attr_filestates = attr_value + '_filestates'
        attr_paths = attr_value + '_paths'
        attr_lastpoll = attr_value + '_lastpoll'

        # pylint: disable=too-many-statements,too-many-branches
        def getprop(self):
            ''' Try to reload the property value from the file if the property value
                is stale and the file has been modified since the last reload.
            '''
            with getattr(self, attr_lock):
                now = time.time()
                then = getattr(self, attr_lastpoll, None)
                # only examine the files if we have never polled
                # or the poll interval has elapsed
                if then is None or then + poll_rate <= now:
                    setattr(self, attr_lastpoll, now)
                    old_paths = getattr(self, attr_paths)
                    old_filestates = getattr(self, attr_filestates, None)
                    # states observed before the load, keyed by path,
                    # for the stability check after the load
                    preload_filestate_map = {}
                    if old_filestates is None:
                        # never loaded
                        changed = True
                    else:
                        changed = False
                        # Instead of breaking out of the loop below on the first change
                        # found we actually stat every file path because we want to
                        # maximise the coverage of the stability check after the load.
                        for path, old_filestate in zip(old_paths, old_filestates):
                            try:
                                new_filestate = FileState(path)
                            except OSError:
                                changed = True
                            else:
                                preload_filestate_map[path] = new_filestate
                                if old_filestate != new_filestate:
                                    changed = True
                    if changed:
                        try:
                            new_paths, new_value = func(self, old_paths)
                            new_filestates = [FileState(new_path) for new_path in new_paths]
                        except NameError:
                            # always propagated: these look like programming errors
                            raise
                        except AttributeError:
                            raise
                        except Exception as e:  # pylint: disable=broad-except
                            # keep the cached value if we have one,
                            # otherwise there is nothing to fall back on
                            new_value = getattr(self, attr_value, unset_object)
                            if new_value is unset_object:
                                raise
                            debug(
                                "exception reloading .%s, keeping cached value: %s",
                                attr_value, e
                            )
                        else:
                            # examine new filestates in case they changed during load
                            # _if_ we knew about them from the earlier load
                            stable = True
                            for path, new_filestate in zip(new_paths, new_filestates):
                                if path in preload_filestate_map:
                                    if preload_filestate_map[path] != new_filestate:
                                        stable = False
                                        break
                            # an unstable load is discarded: the old value
                            # and file states are retained
                            if stable:
                                setattr(self, attr_value, new_value)
                                setattr(self, attr_paths, new_paths)
                                setattr(self, attr_filestates, new_filestates)
            return getattr(self, attr_value, unset_object)

        return property(getprop)

    return made_files_property
# pylint: disable=too-many-branches
@uses_runstate
@pfx
def makelockfile(
    path,
    *,
    ext=None,
    poll_interval=None,
    timeout=None,
    runstate: RunState,
    keepopen=False,
    max_interval=37,
):
    ''' Create a lockfile and return its path.

        The lockfile can be removed with `os.remove`.
        This is the core functionality supporting the `lockfile()`
        context manager.

        Parameters:
        * `path`: the base associated with the lock file,
          often the filesystem object whose access is being managed.
        * `ext`: the extension to the base used to construct the lockfile name.
          Default: ".lock"
        * `timeout`: maximum time to wait before failing.
          Default: `None` (wait forever).
          Note that zero is an accepted value
          and requires the lock to succeed on the first attempt.
        * `poll_interval`: the delay in seconds between attempts
          when timeout is not 0, default `DEFAULT_POLL_INTERVAL`.
        * `runstate`: optional `RunState` duck instance supporting cancellation.
          Note that if a cancelled `RunState` is provided
          no attempt will be made to make the lockfile.
        * `keepopen`: optional flag, default `False`:
          if true, do not close the lockfile and return `(lockpath,lockfd)`
          being the lock file path and the open file descriptor
        * `max_interval`: the upper bound applied to the interval in seconds
          between "waited" warnings each time that interval is doubled,
          default `37`

        Raises `ValueError` for a negative `timeout`,
        `CancellationError` if `runstate` is cancelled,
        `TimeoutError` if the lock file cannot be made within `timeout`
        and `OSError` if creating the lock file fails
        for any reason other than the file already existing.
    '''
    if poll_interval is None:
        poll_interval = DEFAULT_POLL_INTERVAL
    if ext is None:
        ext = '.lock'
    if timeout is not None and timeout < 0:
        raise ValueError("timeout should be None or >= 0, not %r" % (timeout,))
    # the time of the first failed attempt, None until then
    start = None
    lockpath = path + ext
    with Pfx("makelockfile: %r", lockpath):
        while True:
            if runstate.cancelled:
                warning(
                    "%s cancelled; pid %d waited %ds", runstate, os.getpid(),
                    0 if start is None else time.time() - start
                )
                raise CancellationError("lock acquisition cancelled")
            try:
                # O_CREAT|O_EXCL: succeed only if we create the file ourselves;
                # the file is created with a permission mode of 0
                lockfd = os.open(lockpath, os.O_CREAT | os.O_EXCL | os.O_RDWR, 0)
            except OSError as e:
                if e.errno != errno.EEXIST:
                    raise
                if timeout is not None and timeout <= 0:
                    # immediate failure
                    # pylint: disable=raise-missing-from
                    raise TimeoutError("pid %d timed out" % (os.getpid(),), timeout)
                now = time.time()
                # post: timeout is None or timeout > 0
                if start is None:
                    # first try - set up counters
                    start = now
                    complaint_last = start
                    complaint_interval = 2 * max(DEFAULT_POLL_INTERVAL, poll_interval)
                elif now - complaint_last >= complaint_interval:
                    # complain, then back off the complaints up to max_interval
                    warning("pid %d waited %ds", os.getpid(), now - start)
                    complaint_last = now
                    complaint_interval = min(complaint_interval * 2, max_interval)
                # post: start is set
                if timeout is None:
                    sleep_for = poll_interval
                else:
                    # do not sleep past the deadline
                    sleep_for = min(poll_interval, start + timeout - now)
                # test for timeout
                if sleep_for <= 0:
                    # pylint: disable=raise-missing-from
                    raise TimeoutError("pid %d timed out" % (os.getpid(),), timeout)
                time.sleep(sleep_for)
                continue
            else:
                # lock file created
                break
        if keepopen:
            return lockpath, lockfd
        os.close(lockfd)
        return lockpath
@contextmanager
def lockfile(path, **lock_kw):
    ''' A context manager which takes and holds a lock file,
        yielding the lock file's path.

        An open file descriptor is kept for the lock file as well
        to aid locating the process holding the lock file using eg `lsof`.

        This is just a context manager shim for `makelockfile`
        and all arguments are plumbed through.

        On exit the lock file is removed and the file descriptor closed.
        A lock file which has already been removed only provokes a warning.
        The file descriptor is closed even if removing the lock file fails.
    '''
    lockpath, lockfd = makelockfile(path, keepopen=True, **lock_kw)
    try:
        yield lockpath
    finally:
        try:
            try:
                pfx_call(os.remove, lockpath)
            except FileNotFoundError as e:
                warning("lock file already removed: %s", e)
        finally:
            # always release the descriptor, whatever happened to the remove
            pfx_call(os.close, lockfd)
def crop_name(name, ext=None, name_max=255):
    ''' Shorten the file basename `name` to at most `name_max` characters,
        preserving its file extension.

        Return `name` itself if it is already short enough,
        otherwise the leading part of the name before the extension,
        cropped to fit, followed by the extension.
        Raise `ValueError` if the extension alone exceeds `name_max`.

        Parameters:
        * `name`: the file basename to crop
        * `ext`: optional file extension;
          the default is to infer the extension with `os.path.splitext`,
          which is also done if `name` does not end with the supplied `ext`.
        * `name_max`: optional maximum length, default: `255`
    '''
    if ext is not None:
        base = cutsuffix(name, ext)
        if base is name:
            # name does not end in ext: fall back to inference
            ext = None
    if ext is None:
        base, ext = splitext(name)
    room = name_max - len(ext)
    if room < 0:
        raise ValueError(
            "cannot crop name before ext %r to <=%s: name=%r" %
            (ext, name_max, name)
        )
    return name if len(base) <= room else base[:room] + ext
def max_suffix(dirpath, prefix):
    ''' Compute the highest existing numeric suffix
        for names starting with `prefix`.

        This is generally used as a starting point for picking
        a new numeric suffix.

        Parameters:
        * `dirpath`: the directory whose entries are examined
        * `prefix`: the leading part of the names of interest

        Return the largest integer `n` for which an entry of `dirpath`
        consists of `prefix` followed by the decimal digits of `n`,
        or `None` if there is no such entry.
    '''
    prefix = ustr(prefix)
    pfxlen = len(prefix)
    maxn = None
    for e in os.listdir(dirpath):
        e = ustr(e)
        if len(e) <= pfxlen or not e.startswith(prefix):
            continue
        tail = e[pfxlen:]
        # isdecimal rather than isdigit: isdigit also accepts characters
        # such as superscript digits, which int() rejects with ValueError
        if not tail.isdecimal():
            continue
        n = int(tail)
        if maxn is None or maxn < n:
            maxn = n
    return maxn
# pylint: disable=too-many-branches
def mkdirn(path, sep=''):
    ''' Make a new directory named `path+sep+n`,
        where `n` exceeds any numeric suffix already present.
        Return the path of the new directory,
        or `None` if the parent is not a directory
        or the `mkdir` fails for a reason other than the name being taken.

        Parameters:
        * `path`: the basic directory path.
          An empty `path` makes the directory in the current directory
          and returns just its basename.
        * `sep`: a separator between `path` and `n`.
          Default: `''`

        Raises `ValueError` if `sep` contains `os.sep`
        or if a nonempty `sep` is used with a `path` ending in `os.sep`.
    '''
    with Pfx("mkdirn(path=%r, sep=%r)", path, sep):
        if os.sep in sep:
            raise ValueError("sep contains os.sep (%r)" % (os.sep,))
        opath = path
        if not path:
            path = '.' + os.sep
        if path.endswith(os.sep):
            # numbered directories directly inside path
            if sep:
                raise ValueError(
                    "mkdirn(path=%r, sep=%r): using non-empty sep"
                    " with a trailing %r seems nonsensical" % (path, sep, os.sep)
                )
            dirpath = path[:-len(os.sep)]
            prefix = ''
        else:
            # numbered siblings named after path's basename
            dirpath = dirname(path)
            if not dirpath:
                dirpath = '.'
            prefix = basename(path) + sep
        if not isdir(dirpath):
            error("parent not a directory: %r", dirpath)
            return None
        # start after the highest suffix already in use, if any
        highest = max_suffix(dirpath, prefix)
        newn = 0 if highest is None else highest
        while True:
            newn += 1
            newpath = path + sep + str(newn)
            try:
                os.mkdir(newpath)
            except OSError as e:
                if e.errno == errno.EEXIST:
                    # taken, try new value
                    continue
                error("mkdir(%s): %s", newpath, e)
                return None
            return newpath if opath else basename(newpath)
def tmpdir():
    ''' Return the pathname of the default temporary directory for scratch data:
        the value of the environment variable `$TMPDIR` if it is set,
        otherwise `'/tmp'`.
    '''
    try:
        return os.environ['TMPDIR']
    except KeyError:
        return '/tmp'
def tmpdirn(tmp=None):
    ''' Make a new temporary directory with a numeric suffix
        and return the result of `mkdirn()`.

        The directory is made inside `tmp` (default from `tmpdir()`)
        and is named after the basename of `sys.argv[0]`.
    '''
    parent = tmpdir() if tmp is None else tmp
    base = joinpath(parent, basename(sys.argv[0]))
    return mkdirn(base)
def find(path, select=None, sort_names=True):
    ''' Walk the directory tree at `path`, yielding the selected paths.

        Parameters:
        * `path`: the top of the tree to walk
        * `select`: optional callable accepting a path
          and returning true if the path is wanted;
          the default selects every path
        * `sort_names`: if true (the default), visit the names
          within each directory in sorted order

        Each selected directory is yielded before the selected files
        within it.
        Note: not selecting a directory prunes all its descendants.
    '''
    if select is None:

        def select(_):
            return True

    for dirpath, dirnames, filenames in os.walk(path):
        if not select(dirpath):
            # unwanted directory: do not descend
            del dirnames[:]
            continue
        yield dirpath
        if sort_names:
            dirnames.sort()
            filenames.sort()
        for filename in filenames:
            filepath = joinpath(dirpath, filename)
            if select(filepath):
                yield filepath
        # prune the unselected subdirectories in place so os.walk skips them
        wanted = [
            subdir for subdir in dirnames if select(joinpath(dirpath, subdir))
        ]
        dirnames[:] = wanted
def findup(path, test, first=False):
    ''' Yield `abspath(path)` and those of its ancestors
        which satisfy the callable `test`,
        working upward from the path itself to the filesystem root.

        If `first` is true (default `False`)
        this function always yields exactly one value,
        either the first path satisfying the test or `None`.
        This mode supports a use such as:

            matched_path = next(findup(path, test, first=True))
            # post condition: matched_path will be `None` on no match
            # otherwise the first matching path
    '''

    def ancestry(start):
        ''' Yield `start` and then each successive parent directory
            until `dirname` makes no further progress.
        '''
        current = start
        while True:
            yield current
            parent = dirname(current)
            if parent == current:
                return
            current = parent

    for candidate in ancestry(abspath(path)):
        if test(candidate):
            yield candidate
            if first:
                return
    if first:
        # no match: honour the "exactly one value" promise
        yield None
def common_path_prefix(*paths):
    ''' Return the common path prefix of the `paths`.

        Note that the common prefix of `'/a/b/c1'` and `'/a/b/c2'`
        is `'/a/b/'`, _not_ `'/a/b/c'`.

        Callers may find it useful to preadjust the supplied paths
        with `normpath`, `abspath` or `realpath` from `os.path`;
        see the `os.path` documentation for the various caveats
        which go with those functions.

        Examples:

            >>> # the obvious
            >>> common_path_prefix('', '')
            ''
            >>> common_path_prefix('/', '/')
            '/'
            >>> common_path_prefix('a', 'a')
            'a'
            >>> common_path_prefix('a', 'b')
            ''
            >>> # nonempty directory path prefixes end in os.sep
            >>> common_path_prefix('/', '/a')
            '/'
            >>> # identical paths include the final basename
            >>> common_path_prefix('p/a', 'p/a')
            'p/a'
            >>> # the comparison does not normalise paths
            >>> common_path_prefix('p//a', 'p//a')
            'p//a'
            >>> common_path_prefix('p//a', 'p//b')
            'p//'
            >>> common_path_prefix('p//a', 'p/a')
            'p/'
            >>> common_path_prefix('p/a', 'p/b')
            'p/'
            >>> # the comparison strips complete unequal path components
            >>> common_path_prefix('p/a1', 'p/a2')
            'p/'
            >>> common_path_prefix('p/a/b1', 'p/a/b2')
            'p/a/'
            >>> # contrast with cs.lex.common_prefix
            >>> common_prefix('abc/def', 'abc/def1')
            'abc/def'
            >>> common_path_prefix('abc/def', 'abc/def1')
            'abc/'
            >>> common_prefix('abc/def', 'abc/def1', 'abc/def2')
            'abc/def'
            >>> common_path_prefix('abc/def', 'abc/def1', 'abc/def2')
            'abc/'
    '''
    prefix = common_prefix(*paths)
    if prefix.endswith(os.sep):
        # already ends on a component boundary
        return prefix
    first_path = paths[0]
    if not all(path == first_path for path in paths):
        # the paths differ: strip the partial final component
        partial = basename(prefix)
        prefix = prefix[:-len(partial)]
    return prefix
class Pathname(str):
''' Subclass of str presenting convenience properties useful for
format strings related to file paths.
'''
_default_prefixes = (('$HOME/', '~/'),)
def __format__(self, fmt_spec):
''' Calling format(<Pathname>, fmt_spec) treat `fmt_spec` as a new style
formatting string with a single positional parameter of `self`.
'''
if fmt_spec == '':
return str(self)
return fmt_spec.format(self)
@property
def dirname(self):
''' The dirname of the Pathname.
'''
return Pathname(dirname(self))
@property
def basename(self):
''' The basename of this Pathname.
'''