diff --git a/astacus/common/progress.py b/astacus/common/progress.py index 762666c4..cb10cb6d 100644 --- a/astacus/common/progress.py +++ b/astacus/common/progress.py @@ -65,7 +65,6 @@ def wrap(self, i: Iterable[T]) -> Iterable[T]: for item in i: yield item self.add_success() - self.done() return None def start(self, n) -> None: diff --git a/astacus/common/snapshot.py b/astacus/common/snapshot.py index 2251f250..07778d69 100644 --- a/astacus/common/snapshot.py +++ b/astacus/common/snapshot.py @@ -4,6 +4,7 @@ """ from astacus.common.magic import DEFAULT_EMBEDDED_FILE_SIZE from typing import Sequence +from typing_extensions import Self import dataclasses @@ -15,3 +16,6 @@ class SnapshotGroup: excluded_names: Sequence[str] = () # None means "no limit": all files matching the glob will be embedded embedded_file_size_max: int | None = DEFAULT_EMBEDDED_FILE_SIZE + + def without_excluded_names(self) -> Self: + return dataclasses.replace(self, excluded_names=()) diff --git a/astacus/node/memory_snapshot.py b/astacus/node/memory_snapshot.py index 2a5dd80a..2b5f587c 100644 --- a/astacus/node/memory_snapshot.py +++ b/astacus/node/memory_snapshot.py @@ -11,7 +11,6 @@ from astacus.common.snapshot import SnapshotGroup from astacus.node.snapshot import Snapshot from astacus.node.snapshotter import hash_hexdigest_readable, Snapshotter -from glob import iglob from pathlib import Path from typing import Iterable, Iterator, Mapping, Sequence @@ -77,13 +76,11 @@ def get_all_digests(self) -> Iterable[SnapshotHash]: class MemorySnapshotter(Snapshotter[MemorySnapshot]): def _list_files(self, basepath: Path) -> list[FoundFile]: result_files = set() - for group in self._groups: - for p in iglob(group.root_glob, root_dir=basepath, recursive=True): + for group in self._groups.groups: + for p in group.glob(root_dir=basepath): path = basepath / p if not path.is_file() or path.is_symlink(): continue - if path.name in group.excluded_names: - continue relpath = path.relative_to(basepath) for parent in relpath.parents: if parent.name == magic.ASTACUS_TMPDIR: @@ -92,9 +89,7 @@ def _list_files(self, basepath: Path) -> list[FoundFile]: result_files.add( FoundFile( relative_path=relpath, - group=SnapshotGroup( - root_glob=group.root_glob, embedded_file_size_max=group.embedded_file_size_max - ), + group=group.group.without_excluded_names(), ) ) return sorted(result_files, key=lambda found_file: found_file.relative_path) @@ -151,7 +146,7 @@ def _snapshot_remove_extra_files(self, *, src_files: Sequence[FoundFile], dst_fi snapshotfile = self.snapshot.get_file(found_file.relative_path) if snapshotfile: self.snapshot.remove_file(snapshotfile) - dst_path.unlink() + dst_path.unlink(missing_ok=True) if increase_worth_reporting(i): logger.info("#%d. extra file: %r", i, found_file.relative_path) changes += 1 diff --git a/astacus/node/snapshot_groups.py b/astacus/node/snapshot_groups.py new file mode 100644 index 00000000..75e0a2fa --- /dev/null +++ b/astacus/node/snapshot_groups.py @@ -0,0 +1,59 @@ +""" + +Copyright (c) 2023 Aiven Ltd +See LICENSE for details + +Classes for working with snapshot groups. + +""" +from astacus.common.snapshot import SnapshotGroup +from pathlib import Path +from typing import Iterable, Optional, Sequence +from typing_extensions import Self +from wcmatch.glob import GLOBSTAR, iglob, translate + +import dataclasses +import os +import re + +WCMATCH_FLAGS = GLOBSTAR + + +@dataclasses.dataclass +class CompiledGroup: + group: SnapshotGroup + regex: re.Pattern + + @classmethod + def compile(cls, group: SnapshotGroup) -> Self: + return cls(group, glob_compile(group.root_glob)) + + def matches(self, relative_path: Path) -> bool: + return bool(self.regex.match(str(relative_path))) and relative_path.name not in self.group.excluded_names + + def glob(self, root_dir: Optional[Path] = None) -> Iterable[str]: + for path in iglob(self.group.root_glob, root_dir=root_dir, flags=WCMATCH_FLAGS): + if os.path.basename(path) not in self.group.excluded_names: + yield path + + +@dataclasses.dataclass +class CompiledGroups: + groups: Sequence[CompiledGroup] + + @classmethod + def compile(cls, groups: Sequence[SnapshotGroup]) -> Self: + return cls([CompiledGroup.compile(group) for group in groups]) + + def get_matching(self, relative_path: Path) -> list[SnapshotGroup]: + return [group.group for group in self.groups if group.matches(relative_path)] + + def any_match(self, relative_path: Path) -> bool: + return any(group.matches(relative_path) for group in self.groups) + + def root_globs(self) -> list[str]: + return [group.group.root_glob for group in self.groups] + + +def glob_compile(glob: str) -> re.Pattern: + return re.compile(translate(glob, flags=WCMATCH_FLAGS)[0][0]) diff --git a/astacus/node/snapshot_op.py b/astacus/node/snapshot_op.py index d8e550d9..6fb0fa0e 100644 --- a/astacus/node/snapshot_op.py +++ b/astacus/node/snapshot_op.py @@ -101,3 +101,4 @@ def release(self) -> None: with self.snapshotter.lock: self.check_op_id() self.snapshotter.release(self.req.hexdigests, progress=self.result.progress) + self.result.progress.done() diff --git a/astacus/node/snapshotter.py b/astacus/node/snapshotter.py index 2f5a81c4..94f9cfa1 100644 --- a/astacus/node/snapshotter.py +++ b/astacus/node/snapshotter.py @@ -10,6 +10,7 @@ from astacus.common.progress import Progress from astacus.common.snapshot import SnapshotGroup from astacus.node.snapshot import Snapshot +from astacus.node.snapshot_groups import CompiledGroups from multiprocessing import dummy from pathlib import Path from threading import Lock @@ -24,11 +25,10 @@ class Snapshotter(ABC, Generic[T]): def __init__(self, groups: Sequence[SnapshotGroup], src: Path, dst: Path, snapshot: T, parallel: int) -> None: - assert groups # model has empty; either plugin or configuration must supply them self.snapshot = snapshot self._src = src self._dst = dst - self._groups = groups + self._groups = CompiledGroups.compile(groups) self._parallel = parallel self._dst.mkdir(parents=True, exist_ok=True) @@ -45,9 +45,7 @@ def release(self, hexdigests: Iterable[str], *, progress: Progress) -> None: ... def get_snapshot_state(self) -> SnapshotState: - return SnapshotState( - root_globs=[group.root_glob for group in self._groups], files=list(self.snapshot.get_all_files()) - ) + return SnapshotState(root_globs=self._groups.root_globs(), files=list(self.snapshot.get_all_files())) def _file_in_src(self, relative_path: Path) -> SnapshotFile: src_path = self._src / relative_path @@ -71,10 +69,7 @@ def _cb(snapshotfile: SnapshotFile) -> SnapshotFile: yield from p.imap_unordered(_cb, files) def _embedded_file_size_max_for_file(self, file: SnapshotFile) -> int | None: - groups = [] - for group in self._groups: - if file.relative_path.match(group.root_glob): - groups.append(group) + groups = self._groups.get_matching(file.relative_path) assert groups head, *tail = groups for group in tail: diff --git a/astacus/node/sqlite_snapshot.py b/astacus/node/sqlite_snapshot.py index 6e87fdca..38380a92 100644 --- a/astacus/node/sqlite_snapshot.py +++ b/astacus/node/sqlite_snapshot.py @@ -11,14 +11,16 @@ from astacus.node.snapshot import Snapshot from astacus.node.snapshotter import Snapshotter from contextlib import closing -from fnmatch import fnmatch from pathlib import Path from typing import Iterable, Sequence from typing_extensions import override +import logging import os import sqlite3 +logger = logging.getLogger(__name__) + class SQLiteSnapshot(Snapshot): def __init__(self, dst: Path, db: Path) -> None: @@ -107,15 +109,17 @@ def __init__( def perform_snapshot(self, *, progress: Progress) -> None: files = self._list_files_and_create_directories() - new_or_existing = self._compare_current_snapshot(files) - for_upsert = self._compare_with_src(new_or_existing) - with_digests = self._compute_digests(for_upsert) - self._upsert_files(with_digests) - self._con.execute("drop table if exists new_files;") - self._con.commit() + with self._con: + self._con.execute("begin") + new_or_existing = self._compare_current_snapshot(files) + for_upsert = self._compare_with_src(new_or_existing) + with_digests = self._compute_digests(for_upsert) + self._upsert_files(with_digests) + self._con.execute("drop table if exists new_files;") def _list_files_and_create_directories(self) -> Iterable[Path]: """List all files, and create directories in src.""" + logger.info("Listing files in %s", self._src) for dir_, _, files in os.walk(self._src): dir_path = Path(dir_) if any(parent.name == magic.ASTACUS_TMPDIR for parent in dir_path.parents): @@ -124,15 +128,8 @@ def _list_files_and_create_directories(self) -> Iterable[Path]: (self._dst / rel_dir).mkdir(parents=True, exist_ok=True) for f in files: rel_path = rel_dir / f - full_path = dir_path / f - if full_path.is_symlink(): - continue - for group in self._groups: - # fnmatch works strangely with paths until 3.13 so convert to string - # https://github.com/python/cpython/issues/73435 - if fnmatch(str(rel_path), group.root_glob) and f not in group.excluded_names: - yield rel_path - break + if not (dir_path / f).is_symlink() and self._groups.any_match(rel_path): + yield rel_path def _compare_current_snapshot(self, files: Iterable[Path]) -> Iterable[tuple[Path, SnapshotFile | None]]: with closing(self._con.cursor()) as cur: @@ -164,9 +161,10 @@ def _compare_current_snapshot(self, files: Iterable[Path]) -> Iterable[tuple[Pat """ ) if not self._same_root_mode(): + logger.info("Deleting files in %s that are not in current snapshot", self._dst) for (relative_path,) in cur: - os.unlink(self._dst / relative_path) - self._con.commit() + assert isinstance(relative_path, str) + (self._dst / relative_path).unlink(missing_ok=True) cur.execute( """ insert into new_files @@ -201,13 +199,18 @@ def _compare_current_snapshot(self, files: Iterable[Path]) -> Iterable[tuple[Pat yield Path(row[0]), row_to_snapshotfile(row) def _compare_with_src(self, files: Iterable[tuple[Path, SnapshotFile | None]]) -> Iterable[SnapshotFile]: + logger.info("Checking metadata for files in %s", self._dst) for relpath, existing in files: - new = self._file_in_src(relpath) - if existing is None or not existing.underlying_file_is_the_same(new): - self._maybe_link(relpath) - yield new + try: + new = self._file_in_src(relpath) + if existing is None or not existing.underlying_file_is_the_same(new): + self._maybe_link(relpath) + yield new + except FileNotFoundError: + logger.warning("File %s disappeared while snapshotting", relpath) def _upsert_files(self, files: Iterable[SnapshotFile]) -> None: + logger.info("Upserting files in snapshot db") self._con.executemany( """ insert or replace @@ -219,28 +222,31 @@ def _upsert_files(self, files: Iterable[SnapshotFile]) -> None: ) def release(self, hexdigests: Iterable[str], *, progress: Progress) -> None: - with closing(self._con.cursor()) as cur: - cur.execute( - """ - create temporary table hexdigests ( - hexdigest text not null - ); - """ - ) - cur.executemany( - "insert into hexdigests (hexdigest) values (?);", - ((h,) for h in hexdigests if h != ""), - ) - cur.execute( - """ - select relative_path - from snapshot_files - where hexdigest in (select hexdigest from hexdigests); - """ - ) - for (relative_path,) in cur: - (self._dst / relative_path).unlink(missing_ok=True) - cur.execute("drop table hexdigests;") + with self._con: + self._con.execute("begin") + with closing(self._con.cursor()) as cur: + cur.execute( + """ + create temporary table hexdigests ( + hexdigest text not null + ); + """ + ) + cur.executemany( + "insert into hexdigests (hexdigest) values (?);", + ((h,) for h in hexdigests if h != ""), + ) + cur.execute( + """ + select relative_path + from snapshot_files + where hexdigest in (select hexdigest from hexdigests); + """ + ) + for (relative_path,) in cur: + assert isinstance(relative_path, str) + (self._dst / relative_path).unlink(missing_ok=True) + cur.execute("drop table hexdigests;") def row_to_path_and_snapshotfile(row: tuple) -> tuple[Path, SnapshotFile | None]: diff --git a/setup.cfg b/setup.cfg index 858f034b..41f57a92 100644 --- a/setup.cfg +++ b/setup.cfg @@ -18,6 +18,7 @@ install_requires = tabulate==0.9.0 typing-extensions==4.7.1 uvicorn==0.15.0 + wcmatch==8.4.1 # Pinned transitive deps pydantic==1.10.2 diff --git a/tests/unit/node/test_snapshot_groups.py b/tests/unit/node/test_snapshot_groups.py new file mode 100644 index 00000000..05b5f7c5 --- /dev/null +++ b/tests/unit/node/test_snapshot_groups.py @@ -0,0 +1,69 @@ +""" + +Copyright (c) 2023 Aiven Ltd +See LICENSE for details + +""" +from astacus.common.snapshot import SnapshotGroup +from astacus.node.snapshot_groups import CompiledGroup, CompiledGroups, glob_compile +from pathlib import Path + +import os + +POSITIVE_TEST_CASES: list[tuple[Path, str]] = [ + (Path("foo"), "foo"), + (Path("foo"), "*"), + (Path("foo/bar"), "*/bar"), + (Path("foo"), "**"), + (Path("foo/bar"), "**"), + (Path("foo/bar/baz"), "**/*"), + (Path("foo/bar"), "**/*"), + (Path("foo/bar"), "**/**"), +] + +NEGATIVE_TEST_CASES: list[tuple[Path, str]] = [ + (Path("foo/bar/baz"), "*/*"), + (Path("foo"), "foobar"), + (Path("foo"), "*/foo"), +] + + +def test_compile() -> None: + for path, glob in POSITIVE_TEST_CASES: + assert glob_compile(glob).match(str(path)) is not None + for path, glob in NEGATIVE_TEST_CASES: + assert glob_compile(glob).match(str(path)) is None + + +def test_CompiledGroup_matches() -> None: + for path, glob in POSITIVE_TEST_CASES: + group = SnapshotGroup(root_glob=glob) + assert CompiledGroup.compile(group).matches(path) + group = SnapshotGroup(root_glob=glob, excluded_names=[os.path.basename(path)]) + assert not CompiledGroup.compile(group).matches(path) + for path, glob in NEGATIVE_TEST_CASES: + group = SnapshotGroup(root_glob=glob) + assert not CompiledGroup.compile(group).matches(path) + + +def test_CompiledGroups() -> None: + for path, glob in POSITIVE_TEST_CASES: + group1 = SnapshotGroup(root_glob=glob) + group2 = SnapshotGroup(root_glob=glob, excluded_names=[os.path.basename(path)]) + group3 = SnapshotGroup(root_glob="doesntmatch") + compiled = CompiledGroups.compile([group1, group2, group3]) + assert compiled.any_match(path) + assert compiled.get_matching(path) == [group1] + + +def test_CompiledGroup_glob(tmp_path: Path) -> None: + for p, _ in POSITIVE_TEST_CASES + NEGATIVE_TEST_CASES: + p = tmp_path / p + p.mkdir(parents=True, exist_ok=True) + p.touch() + for p, glob in POSITIVE_TEST_CASES: + group = SnapshotGroup(root_glob=glob) + assert str(p) in CompiledGroup.compile(group).glob(tmp_path) + for p, glob in NEGATIVE_TEST_CASES: + group = SnapshotGroup(root_glob=glob) + assert str(p) not in CompiledGroup.compile(group).glob(tmp_path)