"""Fileson class to manipulate Fileson databases."""
import json, os, time, re
from datetime import datetime
from typing import Any, Generator, Optional, Tuple

from logdict import LogDict
from hash import sha_file

# Speed up scanning with scandir in Python 3.5 (or PIP package)
try: from os import scandir
except ImportError: from scandir import scandir
def scantree(path, skip=lambda x: False):
    """Recursively yield DirEntry objects for given directory.

    Args:
        path: Directory to walk.
        skip: Predicate on the entry's full path; entries for which it
            returns True are omitted, and skipped directories are not
            descended into.
    """
    for entry in scandir(path):
        if skip(entry.path):
            continue
        yield entry  # the entry itself
        if entry.is_dir(follow_symlinks=False):
            yield from scantree(entry.path, skip)
def gmt_str(mtime: Optional[float] = None) -> str:
    """Convert st_mtime to GMT 'YYYY-MM-DD HH:MM:SS' string.

    Args:
        mtime: Seconds since the epoch (``st_mtime`` is a float).
            None (the default) formats the current time, which is
            ``time.gmtime``'s behavior for a None argument.

    Returns:
        str: Timestamp formatted in GMT.
    """
    # Annotation fixed: callers pass e.stat().st_mtime (a float), and the
    # implicit-Optional `int=None` spelling is disallowed by PEP 484.
    return time.strftime('%Y-%m-%d %H:%M:%S', time.gmtime(mtime))
def gmt_epoch(mtime: str) -> int:
    """Convert YYYY-MM-DD HH:MM:SS in GMT to epoch seconds."""
    parsed = datetime.strptime(mtime, '%Y-%m-%d %H:%M:%S')
    epoch_start = datetime(1970, 1, 1)
    delta = parsed - epoch_start
    return int(delta.total_seconds())
class Fileson(LogDict):
    """File database with previous versions support based on LogDict.

    The file format is fully compatible so you can use :meth:`LogDict.create`
    to instantiate one. Special keys like :scan:, :checksum: used for metadata
    and additional :meth:`files` and :meth:`dirs` methods expose certain types
    of contents. Also, :meth:`set` used to implement "set if changed"
    functionality.
    """

    # Checksum strategies: name -> callable(path, entry_dict) -> stored value.
    # 'sha1fast' hashes only the start of the file (quick mode) and appends
    # the size, trading collision resistance for speed on large files.
    summer = {
        'none': lambda p,f: None,
        'sha1': lambda p,f: sha_file(p),
        'sha1fast': lambda p,f: sha_file(p, quick=True)+str(f['size']),
    }

    @classmethod
    def load_or_scan(cls, db_or_dir: str, **kwargs) -> 'Fileson':
        """Load Fileson database or create one by scanning a directory.

        This basically calls :meth:`load` or creates a new
        instance and uses :meth:`scan` after it (passing kwargs).

        Args:
            db_or_dir (str): Database or directory name

        Returns:
            Fileson: New class instance
        """
        if os.path.isdir(db_or_dir):
            fs = cls()
            fs.scan(db_or_dir, **kwargs)
            return fs
        return cls.load(db_or_dir)

    @classmethod
    def load(cls, dbfile: str) -> 'Fileson':
        """Overloaded class method to support f.fson~1 history syntax.

        A trailing ``~N`` loads the database state as it was N scans back.
        Uses :func:`re.fullmatch` (not ``re.match``) so a filename that
        merely *contains* ``~digits`` in the middle (e.g. ``backup~2.fson``)
        is not silently truncated and misread as history syntax.
        """
        m = re.fullmatch(r'(.*)~(\d+)', dbfile)
        if m: dbfile = m.group(1)
        fs = super(Fileson, cls).load(dbfile)
        # Slice the log to end right after the requested run's ':scan:' entry
        if m: end = (':scan:', fs[':scan:'] - int(m.group(2)) + 1)
        return fs.slice(None, end) if m else fs

    def dirs(self) -> list:
        """Return paths of entries with no 'size' field (dirs and symlinks)."""
        return [p for p in self if p[0] != ':' and 'size' not in self[p]]

    def files(self) -> list:
        """Return paths of entries with a 'size' field (regular files)."""
        return [p for p in self if p[0] != ':' and 'size' in self[p]]

    def set(self, key: Any, val: Any) -> bool:
        """Set key to val if there's a change, in which case return True."""
        if key in self and self[key] == val: return False
        self[key] = val  # change will be recorded by LogDict
        return True

    def scan(self, directory: str, **kwargs) -> None:
        """Scan a directory for objects or changes.

        Every invocation creates a new 'run', a version to Fileson
        database. Only changes need to be stored. You can then use
        for example :meth:`genItems` and pick only objects that
        were changed on a given run.

        Args:
            directory (str): Directory to scan
            **kwargs: 'checksum' (a :attr:`summer` name or None),
                'verbose' (int level), 'skip' (list of path substrings
                to exclude) and 'strict' (bool) control behaviour
        """
        checksum = kwargs.get('checksum', None)
        verbose = kwargs.get('verbose', 0)
        skiplist = kwargs.get('skip', [])
        strict = kwargs.get('strict', False)

        # On strict mode, use full path as key, otherwise just the filename.
        # Additionally, store the modified time and size to detect changes.
        # Other metadata is not used for comparison.
        make_key = lambda p,f: (p if strict else p.split(os.sep)[-1],
                f['modified_gmt'], f['size'])

        # Set metadata for run
        self[':scan:'] = self.get(':scan:', 0) + 1  # first in a scan!
        self[':directory:'] = directory
        self[':checksum:'] = checksum
        self[':date_gmt:'] = gmt_str()

        # Create checksum cache, make_key is used to store and retrieve
        ccache = {}
        if checksum:
            for p in self.files():
                f = self[p]
                if isinstance(f, dict) and checksum in f:
                    ccache[make_key(p,f)] = f[checksum]

        # Everything not rediscovered below gets deleted at the end
        missing = set(self.files()) | set(self.dirs())
        skip = lambda p: any(pat in p for pat in skiplist)

        startTime, fileCount, byteCount, seenG = time.time(), 0, 0, 0
        if verbose: print('Scanning', directory, 'skipping', skiplist)

        for e in scantree(directory, skip):
            p = os.path.relpath(e.path, directory)
            missing.discard(p)
            # Store symlink details
            if e.is_symlink():
                # Get relative path to target
                relative = os.path.relpath(os.readlink(e.path), directory)
                self.set(p, { 'link': relative,
                    'modified_gmt': gmt_str(e.stat().st_mtime),
                    'permissions': e.stat().st_mode })
                if verbose > 1: print('Symlink', p, '->', self[p]['link'])
            # Process directories
            elif e.is_dir(follow_symlinks=False):
                self.set(p, { 'modified_gmt': gmt_str(e.stat().st_mtime),
                    'permissions': e.stat().st_mode})
            # Should be a file
            else:
                f = { 'size': e.stat().st_size,
                        'modified_gmt': gmt_str(e.stat().st_mtime),
                        'permissions': e.stat().st_mode }
                if checksum:
                    if verbose > 1 and make_key(p,f) not in ccache:
                        print(checksum, p)
                    # Reuse cached checksum when name/mtime/size match,
                    # otherwise compute with the selected summer
                    f[checksum] = ccache.get(make_key(p,f), None) or \
                            Fileson.summer[checksum](e.path, f)
                self.set(p, f)
                if verbose >= 1:
                    fileCount += 1
                    byteCount += f['size']
                    # Print progress once per GiB scanned
                    if byteCount // 2**30 > seenG:
                        seenG = byteCount // 2**30
                        secs = time.time() - startTime
                        print(f'{fileCount} files, {seenG:.2f} GiB in {secs:.1f}s')

        for p in missing:
            if verbose > 1: print('Removed missing entry', p)
            del self[p]  # remove elements not seen this time