Skip to content
This repository was archived by the owner on Feb 12, 2025. It is now read-only.

Commit f788129

Browse files
Merge pull request #154 from Aiven-Open/joelynch/glob-pain
snapshotter: use wcmatch.glob.globmatch function #154
2 parents b46086f + 63b5063 commit f788129

9 files changed

+192
-63
lines changed

astacus/common/progress.py

-1
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,6 @@ def wrap(self, i: Iterable[T]) -> Iterable[T]:
6565
for item in i:
6666
yield item
6767
self.add_success()
68-
self.done()
6968
return None
7069

7170
def start(self, n) -> None:

astacus/common/snapshot.py

+4
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
"""
55
from astacus.common.magic import DEFAULT_EMBEDDED_FILE_SIZE
66
from typing import Sequence
7+
from typing_extensions import Self
78

89
import dataclasses
910

@@ -15,3 +16,6 @@ class SnapshotGroup:
1516
excluded_names: Sequence[str] = ()
1617
# None means "no limit": all files matching the glob will be embedded
1718
embedded_file_size_max: int | None = DEFAULT_EMBEDDED_FILE_SIZE
19+
20+
def without_excluded_names(self) -> Self:
21+
return dataclasses.replace(self, excluded_names=())

astacus/node/memory_snapshot.py

+4-9
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@
1111
from astacus.common.snapshot import SnapshotGroup
1212
from astacus.node.snapshot import Snapshot
1313
from astacus.node.snapshotter import hash_hexdigest_readable, Snapshotter
14-
from glob import iglob
1514
from pathlib import Path
1615
from typing import Iterable, Iterator, Mapping, Sequence
1716

@@ -77,13 +76,11 @@ def get_all_digests(self) -> Iterable[SnapshotHash]:
7776
class MemorySnapshotter(Snapshotter[MemorySnapshot]):
7877
def _list_files(self, basepath: Path) -> list[FoundFile]:
7978
result_files = set()
80-
for group in self._groups:
81-
for p in iglob(group.root_glob, root_dir=basepath, recursive=True):
79+
for group in self._groups.groups:
80+
for p in group.glob(root_dir=basepath):
8281
path = basepath / p
8382
if not path.is_file() or path.is_symlink():
8483
continue
85-
if path.name in group.excluded_names:
86-
continue
8784
relpath = path.relative_to(basepath)
8885
for parent in relpath.parents:
8986
if parent.name == magic.ASTACUS_TMPDIR:
@@ -92,9 +89,7 @@ def _list_files(self, basepath: Path) -> list[FoundFile]:
9289
result_files.add(
9390
FoundFile(
9491
relative_path=relpath,
95-
group=SnapshotGroup(
96-
root_glob=group.root_glob, embedded_file_size_max=group.embedded_file_size_max
97-
),
92+
group=group.group.without_excluded_names(),
9893
)
9994
)
10095
return sorted(result_files, key=lambda found_file: found_file.relative_path)
@@ -151,7 +146,7 @@ def _snapshot_remove_extra_files(self, *, src_files: Sequence[FoundFile], dst_fi
151146
snapshotfile = self.snapshot.get_file(found_file.relative_path)
152147
if snapshotfile:
153148
self.snapshot.remove_file(snapshotfile)
154-
dst_path.unlink()
149+
dst_path.unlink(missing_ok=True)
155150
if increase_worth_reporting(i):
156151
logger.info("#%d. extra file: %r", i, found_file.relative_path)
157152
changes += 1

astacus/node/snapshot_groups.py

+59
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
"""
2+
3+
Copyright (c) 2023 Aiven Ltd
4+
See LICENSE for details
5+
6+
Classes for working with snapshot groups.
7+
8+
"""
9+
from astacus.common.snapshot import SnapshotGroup
10+
from pathlib import Path
11+
from typing import Iterable, Optional, Sequence
12+
from typing_extensions import Self
13+
from wcmatch.glob import GLOBSTAR, iglob, translate
14+
15+
import dataclasses
16+
import os
17+
import re
18+
19+
WCMATCH_FLAGS = GLOBSTAR
20+
21+
22+
@dataclasses.dataclass
23+
class CompiledGroup:
24+
group: SnapshotGroup
25+
regex: re.Pattern
26+
27+
@classmethod
28+
def compile(cls, group: SnapshotGroup) -> Self:
29+
return cls(group, glob_compile(group.root_glob))
30+
31+
def matches(self, relative_path: Path) -> bool:
32+
return bool(self.regex.match(str(relative_path))) and relative_path.name not in self.group.excluded_names
33+
34+
def glob(self, root_dir: Optional[Path] = None) -> Iterable[str]:
35+
for path in iglob(self.group.root_glob, root_dir=root_dir, flags=WCMATCH_FLAGS):
36+
if os.path.basename(path) not in self.group.excluded_names:
37+
yield path
38+
39+
40+
@dataclasses.dataclass
41+
class CompiledGroups:
42+
groups: Sequence[CompiledGroup]
43+
44+
@classmethod
45+
def compile(cls, groups: Sequence[SnapshotGroup]) -> Self:
46+
return cls([CompiledGroup.compile(group) for group in groups])
47+
48+
def get_matching(self, relative_path: Path) -> list[SnapshotGroup]:
49+
return [group.group for group in self.groups if group.matches(relative_path)]
50+
51+
def any_match(self, relative_path: Path) -> bool:
52+
return any(group.matches(relative_path) for group in self.groups)
53+
54+
def root_globs(self) -> list[str]:
55+
return [group.group.root_glob for group in self.groups]
56+
57+
58+
def glob_compile(glob: str) -> re.Pattern:
59+
return re.compile(translate(glob, flags=WCMATCH_FLAGS)[0][0])

astacus/node/snapshot_op.py

+1
Original file line numberDiff line numberDiff line change
@@ -101,3 +101,4 @@ def release(self) -> None:
101101
with self.snapshotter.lock:
102102
self.check_op_id()
103103
self.snapshotter.release(self.req.hexdigests, progress=self.result.progress)
104+
self.result.progress.done()

astacus/node/snapshotter.py

+4-9
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
from astacus.common.progress import Progress
1111
from astacus.common.snapshot import SnapshotGroup
1212
from astacus.node.snapshot import Snapshot
13+
from astacus.node.snapshot_groups import CompiledGroups
1314
from multiprocessing import dummy
1415
from pathlib import Path
1516
from threading import Lock
@@ -24,11 +25,10 @@
2425

2526
class Snapshotter(ABC, Generic[T]):
2627
def __init__(self, groups: Sequence[SnapshotGroup], src: Path, dst: Path, snapshot: T, parallel: int) -> None:
27-
assert groups # model has empty; either plugin or configuration must supply them
2828
self.snapshot = snapshot
2929
self._src = src
3030
self._dst = dst
31-
self._groups = groups
31+
self._groups = CompiledGroups.compile(groups)
3232
self._parallel = parallel
3333
self._dst.mkdir(parents=True, exist_ok=True)
3434

@@ -45,9 +45,7 @@ def release(self, hexdigests: Iterable[str], *, progress: Progress) -> None:
4545
...
4646

4747
def get_snapshot_state(self) -> SnapshotState:
48-
return SnapshotState(
49-
root_globs=[group.root_glob for group in self._groups], files=list(self.snapshot.get_all_files())
50-
)
48+
return SnapshotState(root_globs=self._groups.root_globs(), files=list(self.snapshot.get_all_files()))
5149

5250
def _file_in_src(self, relative_path: Path) -> SnapshotFile:
5351
src_path = self._src / relative_path
@@ -71,10 +69,7 @@ def _cb(snapshotfile: SnapshotFile) -> SnapshotFile:
7169
yield from p.imap_unordered(_cb, files)
7270

7371
def _embedded_file_size_max_for_file(self, file: SnapshotFile) -> int | None:
74-
groups = []
75-
for group in self._groups:
76-
if file.relative_path.match(group.root_glob):
77-
groups.append(group)
72+
groups = self._groups.get_matching(file.relative_path)
7873
assert groups
7974
head, *tail = groups
8075
for group in tail:

astacus/node/sqlite_snapshot.py

+50-44
Original file line numberDiff line numberDiff line change
@@ -11,14 +11,16 @@
1111
from astacus.node.snapshot import Snapshot
1212
from astacus.node.snapshotter import Snapshotter
1313
from contextlib import closing
14-
from fnmatch import fnmatch
1514
from pathlib import Path
1615
from typing import Iterable, Sequence
1716
from typing_extensions import override
1817

18+
import logging
1919
import os
2020
import sqlite3
2121

22+
logger = logging.getLogger(__name__)
23+
2224

2325
class SQLiteSnapshot(Snapshot):
2426
def __init__(self, dst: Path, db: Path) -> None:
@@ -107,15 +109,17 @@ def __init__(
107109

108110
def perform_snapshot(self, *, progress: Progress) -> None:
109111
files = self._list_files_and_create_directories()
110-
new_or_existing = self._compare_current_snapshot(files)
111-
for_upsert = self._compare_with_src(new_or_existing)
112-
with_digests = self._compute_digests(for_upsert)
113-
self._upsert_files(with_digests)
114-
self._con.execute("drop table if exists new_files;")
115-
self._con.commit()
112+
with self._con:
113+
self._con.execute("begin")
114+
new_or_existing = self._compare_current_snapshot(files)
115+
for_upsert = self._compare_with_src(new_or_existing)
116+
with_digests = self._compute_digests(for_upsert)
117+
self._upsert_files(with_digests)
118+
self._con.execute("drop table if exists new_files;")
116119

117120
def _list_files_and_create_directories(self) -> Iterable[Path]:
118121
"""List all files, and create directories in src."""
122+
logger.info("Listing files in %s", self._src)
119123
for dir_, _, files in os.walk(self._src):
120124
dir_path = Path(dir_)
121125
if any(parent.name == magic.ASTACUS_TMPDIR for parent in dir_path.parents):
@@ -124,15 +128,8 @@ def _list_files_and_create_directories(self) -> Iterable[Path]:
124128
(self._dst / rel_dir).mkdir(parents=True, exist_ok=True)
125129
for f in files:
126130
rel_path = rel_dir / f
127-
full_path = dir_path / f
128-
if full_path.is_symlink():
129-
continue
130-
for group in self._groups:
131-
# fnmatch works strangely with paths until 3.13 so convert to string
132-
# https://github.com/python/cpython/issues/73435
133-
if fnmatch(str(rel_path), group.root_glob) and f not in group.excluded_names:
134-
yield rel_path
135-
break
131+
if not (dir_path / f).is_symlink() and self._groups.any_match(rel_path):
132+
yield rel_path
136133

137134
def _compare_current_snapshot(self, files: Iterable[Path]) -> Iterable[tuple[Path, SnapshotFile | None]]:
138135
with closing(self._con.cursor()) as cur:
@@ -164,9 +161,10 @@ def _compare_current_snapshot(self, files: Iterable[Path]) -> Iterable[tuple[Pat
164161
"""
165162
)
166163
if not self._same_root_mode():
164+
logger.info("Deleting files in %s that are not in current snapshot", self._dst)
167165
for (relative_path,) in cur:
168-
os.unlink(self._dst / relative_path)
169-
self._con.commit()
166+
assert isinstance(relative_path, str)
167+
(self._dst / relative_path).unlink(missing_ok=True)
170168
cur.execute(
171169
"""
172170
insert into new_files
@@ -201,13 +199,18 @@ def _compare_current_snapshot(self, files: Iterable[Path]) -> Iterable[tuple[Pat
201199
yield Path(row[0]), row_to_snapshotfile(row)
202200

203201
def _compare_with_src(self, files: Iterable[tuple[Path, SnapshotFile | None]]) -> Iterable[SnapshotFile]:
202+
logger.info("Checking metadata for files in %s", self._dst)
204203
for relpath, existing in files:
205-
new = self._file_in_src(relpath)
206-
if existing is None or not existing.underlying_file_is_the_same(new):
207-
self._maybe_link(relpath)
208-
yield new
204+
try:
205+
new = self._file_in_src(relpath)
206+
if existing is None or not existing.underlying_file_is_the_same(new):
207+
self._maybe_link(relpath)
208+
yield new
209+
except FileNotFoundError:
210+
logger.warning("File %s disappeared while snapshotting", relpath)
209211

210212
def _upsert_files(self, files: Iterable[SnapshotFile]) -> None:
213+
logger.info("Upserting files in snapshot db")
211214
self._con.executemany(
212215
"""
213216
insert or replace
@@ -219,28 +222,31 @@ def _upsert_files(self, files: Iterable[SnapshotFile]) -> None:
219222
)
220223

221224
def release(self, hexdigests: Iterable[str], *, progress: Progress) -> None:
222-
with closing(self._con.cursor()) as cur:
223-
cur.execute(
224-
"""
225-
create temporary table hexdigests (
226-
hexdigest text not null
227-
);
228-
"""
229-
)
230-
cur.executemany(
231-
"insert into hexdigests (hexdigest) values (?);",
232-
((h,) for h in hexdigests if h != ""),
233-
)
234-
cur.execute(
235-
"""
236-
select relative_path
237-
from snapshot_files
238-
where hexdigest in (select hexdigest from hexdigests);
239-
"""
240-
)
241-
for (relative_path,) in cur:
242-
(self._dst / relative_path).unlink(missing_ok=True)
243-
cur.execute("drop table hexdigests;")
225+
with self._con:
226+
self._con.execute("begin")
227+
with closing(self._con.cursor()) as cur:
228+
cur.execute(
229+
"""
230+
create temporary table hexdigests (
231+
hexdigest text not null
232+
);
233+
"""
234+
)
235+
cur.executemany(
236+
"insert into hexdigests (hexdigest) values (?);",
237+
((h,) for h in hexdigests if h != ""),
238+
)
239+
cur.execute(
240+
"""
241+
select relative_path
242+
from snapshot_files
243+
where hexdigest in (select hexdigest from hexdigests);
244+
"""
245+
)
246+
for (relative_path,) in cur:
247+
assert isinstance(relative_path, str)
248+
(self._dst / relative_path).unlink(missing_ok=True)
249+
cur.execute("drop table hexdigests;")
244250

245251

246252
def row_to_path_and_snapshotfile(row: tuple) -> tuple[Path, SnapshotFile | None]:

setup.cfg

+1
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ install_requires =
1818
tabulate==0.9.0
1919
typing-extensions==4.7.1
2020
uvicorn==0.15.0
21+
wcmatch==8.4.1
2122
# Pinned transitive deps
2223
pydantic==1.10.2
2324

0 commit comments

Comments
 (0)