Skip to content
This repository was archived by the owner on Feb 12, 2025. It is now read-only.

snapshotter: use wcmatch.glob.globmatch function #154

Merged
merged 2 commits into from
Nov 14, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion astacus/common/progress.py
Original file line number Diff line number Diff line change
@@ -65,7 +65,6 @@ def wrap(self, i: Iterable[T]) -> Iterable[T]:
for item in i:
yield item
self.add_success()
self.done()
return None

def start(self, n) -> None:
4 changes: 4 additions & 0 deletions astacus/common/snapshot.py
Original file line number Diff line number Diff line change
@@ -4,6 +4,7 @@
"""
from astacus.common.magic import DEFAULT_EMBEDDED_FILE_SIZE
from typing import Sequence
from typing_extensions import Self

import dataclasses

@@ -15,3 +16,6 @@ class SnapshotGroup:
excluded_names: Sequence[str] = ()
# None means "no limit": all files matching the glob will be embedded
embedded_file_size_max: int | None = DEFAULT_EMBEDDED_FILE_SIZE

def without_excluded_names(self) -> Self:
return dataclasses.replace(self, excluded_names=())
13 changes: 4 additions & 9 deletions astacus/node/memory_snapshot.py
Original file line number Diff line number Diff line change
@@ -11,7 +11,6 @@
from astacus.common.snapshot import SnapshotGroup
from astacus.node.snapshot import Snapshot
from astacus.node.snapshotter import hash_hexdigest_readable, Snapshotter
from glob import iglob
from pathlib import Path
from typing import Iterable, Iterator, Mapping, Sequence

@@ -77,13 +76,11 @@ def get_all_digests(self) -> Iterable[SnapshotHash]:
class MemorySnapshotter(Snapshotter[MemorySnapshot]):
def _list_files(self, basepath: Path) -> list[FoundFile]:
result_files = set()
for group in self._groups:
for p in iglob(group.root_glob, root_dir=basepath, recursive=True):
for group in self._groups.groups:
for p in group.glob(root_dir=basepath):
path = basepath / p
if not path.is_file() or path.is_symlink():
continue
if path.name in group.excluded_names:
continue
relpath = path.relative_to(basepath)
for parent in relpath.parents:
if parent.name == magic.ASTACUS_TMPDIR:
@@ -92,9 +89,7 @@ def _list_files(self, basepath: Path) -> list[FoundFile]:
result_files.add(
FoundFile(
relative_path=relpath,
group=SnapshotGroup(
root_glob=group.root_glob, embedded_file_size_max=group.embedded_file_size_max
),
group=group.group.without_excluded_names(),
)
)
return sorted(result_files, key=lambda found_file: found_file.relative_path)
@@ -151,7 +146,7 @@ def _snapshot_remove_extra_files(self, *, src_files: Sequence[FoundFile], dst_fi
snapshotfile = self.snapshot.get_file(found_file.relative_path)
if snapshotfile:
self.snapshot.remove_file(snapshotfile)
dst_path.unlink()
dst_path.unlink(missing_ok=True)
if increase_worth_reporting(i):
logger.info("#%d. extra file: %r", i, found_file.relative_path)
changes += 1
59 changes: 59 additions & 0 deletions astacus/node/snapshot_groups.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
"""
Copyright (c) 2023 Aiven Ltd
See LICENSE for details
Classes for working with snapshot groups.
"""
from astacus.common.snapshot import SnapshotGroup
from pathlib import Path
from typing import Iterable, Optional, Sequence
from typing_extensions import Self
from wcmatch.glob import GLOBSTAR, iglob, translate

import dataclasses
import os
import re

WCMATCH_FLAGS = GLOBSTAR


@dataclasses.dataclass
class CompiledGroup:
group: SnapshotGroup
regex: re.Pattern

@classmethod
def compile(cls, group: SnapshotGroup) -> Self:
return cls(group, glob_compile(group.root_glob))

def matches(self, relative_path: Path) -> bool:
return bool(self.regex.match(str(relative_path))) and relative_path.name not in self.group.excluded_names

def glob(self, root_dir: Optional[Path] = None) -> Iterable[str]:
for path in iglob(self.group.root_glob, root_dir=root_dir, flags=WCMATCH_FLAGS):
if os.path.basename(path) not in self.group.excluded_names:
yield path


@dataclasses.dataclass
class CompiledGroups:
groups: Sequence[CompiledGroup]

@classmethod
def compile(cls, groups: Sequence[SnapshotGroup]) -> Self:
return cls([CompiledGroup.compile(group) for group in groups])

def get_matching(self, relative_path: Path) -> list[SnapshotGroup]:
return [group.group for group in self.groups if group.matches(relative_path)]

def any_match(self, relative_path: Path) -> bool:
return any(group.matches(relative_path) for group in self.groups)

def root_globs(self) -> list[str]:
return [group.group.root_glob for group in self.groups]


def glob_compile(glob: str) -> re.Pattern:
return re.compile(translate(glob, flags=WCMATCH_FLAGS)[0][0])
1 change: 1 addition & 0 deletions astacus/node/snapshot_op.py
Original file line number Diff line number Diff line change
@@ -101,3 +101,4 @@ def release(self) -> None:
with self.snapshotter.lock:
self.check_op_id()
self.snapshotter.release(self.req.hexdigests, progress=self.result.progress)
self.result.progress.done()
13 changes: 4 additions & 9 deletions astacus/node/snapshotter.py
Original file line number Diff line number Diff line change
@@ -10,6 +10,7 @@
from astacus.common.progress import Progress
from astacus.common.snapshot import SnapshotGroup
from astacus.node.snapshot import Snapshot
from astacus.node.snapshot_groups import CompiledGroups
from multiprocessing import dummy
from pathlib import Path
from threading import Lock
@@ -24,11 +25,10 @@

class Snapshotter(ABC, Generic[T]):
def __init__(self, groups: Sequence[SnapshotGroup], src: Path, dst: Path, snapshot: T, parallel: int) -> None:
assert groups # model has empty; either plugin or configuration must supply them
self.snapshot = snapshot
self._src = src
self._dst = dst
self._groups = groups
self._groups = CompiledGroups.compile(groups)
self._parallel = parallel
self._dst.mkdir(parents=True, exist_ok=True)

@@ -45,9 +45,7 @@ def release(self, hexdigests: Iterable[str], *, progress: Progress) -> None:
...

def get_snapshot_state(self) -> SnapshotState:
return SnapshotState(
root_globs=[group.root_glob for group in self._groups], files=list(self.snapshot.get_all_files())
)
return SnapshotState(root_globs=self._groups.root_globs(), files=list(self.snapshot.get_all_files()))

def _file_in_src(self, relative_path: Path) -> SnapshotFile:
src_path = self._src / relative_path
@@ -71,10 +69,7 @@ def _cb(snapshotfile: SnapshotFile) -> SnapshotFile:
yield from p.imap_unordered(_cb, files)

def _embedded_file_size_max_for_file(self, file: SnapshotFile) -> int | None:
groups = []
for group in self._groups:
if file.relative_path.match(group.root_glob):
groups.append(group)
groups = self._groups.get_matching(file.relative_path)
assert groups
head, *tail = groups
for group in tail:
94 changes: 50 additions & 44 deletions astacus/node/sqlite_snapshot.py
Original file line number Diff line number Diff line change
@@ -11,14 +11,16 @@
from astacus.node.snapshot import Snapshot
from astacus.node.snapshotter import Snapshotter
from contextlib import closing
from fnmatch import fnmatch
from pathlib import Path
from typing import Iterable, Sequence
from typing_extensions import override

import logging
import os
import sqlite3

logger = logging.getLogger(__name__)


class SQLiteSnapshot(Snapshot):
def __init__(self, dst: Path, db: Path) -> None:
@@ -107,15 +109,17 @@ def __init__(

def perform_snapshot(self, *, progress: Progress) -> None:
files = self._list_files_and_create_directories()
new_or_existing = self._compare_current_snapshot(files)
for_upsert = self._compare_with_src(new_or_existing)
with_digests = self._compute_digests(for_upsert)
self._upsert_files(with_digests)
self._con.execute("drop table if exists new_files;")
self._con.commit()
with self._con:
self._con.execute("begin")
new_or_existing = self._compare_current_snapshot(files)
for_upsert = self._compare_with_src(new_or_existing)
with_digests = self._compute_digests(for_upsert)
self._upsert_files(with_digests)
self._con.execute("drop table if exists new_files;")

def _list_files_and_create_directories(self) -> Iterable[Path]:
"""List all files, and create directories in src."""
logger.info("Listing files in %s", self._src)
for dir_, _, files in os.walk(self._src):
dir_path = Path(dir_)
if any(parent.name == magic.ASTACUS_TMPDIR for parent in dir_path.parents):
@@ -124,15 +128,8 @@ def _list_files_and_create_directories(self) -> Iterable[Path]:
(self._dst / rel_dir).mkdir(parents=True, exist_ok=True)
for f in files:
rel_path = rel_dir / f
full_path = dir_path / f
if full_path.is_symlink():
continue
for group in self._groups:
# fnmatch works strangely with paths until 3.13 so convert to string
# https://github.com/python/cpython/issues/73435
if fnmatch(str(rel_path), group.root_glob) and f not in group.excluded_names:
yield rel_path
break
if not (dir_path / f).is_symlink() and self._groups.any_match(rel_path):
yield rel_path

def _compare_current_snapshot(self, files: Iterable[Path]) -> Iterable[tuple[Path, SnapshotFile | None]]:
with closing(self._con.cursor()) as cur:
@@ -164,9 +161,10 @@ def _compare_current_snapshot(self, files: Iterable[Path]) -> Iterable[tuple[Pat
"""
)
if not self._same_root_mode():
logger.info("Deleting files in %s that are not in current snapshot", self._dst)
for (relative_path,) in cur:
os.unlink(self._dst / relative_path)
self._con.commit()
assert isinstance(relative_path, str)
(self._dst / relative_path).unlink(missing_ok=True)
cur.execute(
"""
insert into new_files
@@ -201,13 +199,18 @@ def _compare_current_snapshot(self, files: Iterable[Path]) -> Iterable[tuple[Pat
yield Path(row[0]), row_to_snapshotfile(row)

def _compare_with_src(self, files: Iterable[tuple[Path, SnapshotFile | None]]) -> Iterable[SnapshotFile]:
logger.info("Checking metadata for files in %s", self._dst)
for relpath, existing in files:
new = self._file_in_src(relpath)
if existing is None or not existing.underlying_file_is_the_same(new):
self._maybe_link(relpath)
yield new
try:
new = self._file_in_src(relpath)
if existing is None or not existing.underlying_file_is_the_same(new):
self._maybe_link(relpath)
yield new
except FileNotFoundError:
logger.warning("File %s disappeared while snapshotting", relpath)

def _upsert_files(self, files: Iterable[SnapshotFile]) -> None:
logger.info("Upserting files in snapshot db")
self._con.executemany(
"""
insert or replace
@@ -219,28 +222,31 @@ def _upsert_files(self, files: Iterable[SnapshotFile]) -> None:
)

def release(self, hexdigests: Iterable[str], *, progress: Progress) -> None:
with closing(self._con.cursor()) as cur:
cur.execute(
"""
create temporary table hexdigests (
hexdigest text not null
);
"""
)
cur.executemany(
"insert into hexdigests (hexdigest) values (?);",
((h,) for h in hexdigests if h != ""),
)
cur.execute(
"""
select relative_path
from snapshot_files
where hexdigest in (select hexdigest from hexdigests);
"""
)
for (relative_path,) in cur:
(self._dst / relative_path).unlink(missing_ok=True)
cur.execute("drop table hexdigests;")
with self._con:
self._con.execute("begin")
with closing(self._con.cursor()) as cur:
cur.execute(
"""
create temporary table hexdigests (
hexdigest text not null
);
"""
)
cur.executemany(
"insert into hexdigests (hexdigest) values (?);",
((h,) for h in hexdigests if h != ""),
)
cur.execute(
"""
select relative_path
from snapshot_files
where hexdigest in (select hexdigest from hexdigests);
"""
)
for (relative_path,) in cur:
assert isinstance(relative_path, str)
(self._dst / relative_path).unlink(missing_ok=True)
cur.execute("drop table hexdigests;")


def row_to_path_and_snapshotfile(row: tuple) -> tuple[Path, SnapshotFile | None]:
1 change: 1 addition & 0 deletions setup.cfg
Original file line number Diff line number Diff line change
@@ -18,6 +18,7 @@ install_requires =
tabulate==0.9.0
typing-extensions==4.7.1
uvicorn==0.15.0
wcmatch==8.4.1
# Pinned transitive deps
pydantic==1.10.2

69 changes: 69 additions & 0 deletions tests/unit/node/test_snapshot_groups.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
"""
Copyright (c) 2023 Aiven Ltd
See LICENSE for details
"""
from astacus.common.snapshot import SnapshotGroup
from astacus.node.snapshot_groups import CompiledGroup, CompiledGroups, glob_compile
from pathlib import Path

import os

POSITIVE_TEST_CASES: list[tuple[Path, str]] = [
(Path("foo"), "foo"),
(Path("foo"), "*"),
(Path("foo/bar"), "*/bar"),
(Path("foo"), "**"),
(Path("foo/bar"), "**"),
(Path("foo/bar/baz"), "**/*"),
(Path("foo/bar"), "**/*"),
(Path("foo/bar"), "**/**"),
]

NEGATIVE_TEST_CASES: list[tuple[Path, str]] = [
(Path("foo/bar/baz"), "*/*"),
(Path("foo"), "foobar"),
(Path("foo"), "*/foo"),
]


def test_compile() -> None:
for path, glob in POSITIVE_TEST_CASES:
assert glob_compile(glob).match(str(path)) is not None
for path, glob in NEGATIVE_TEST_CASES:
assert glob_compile(glob).match(str(path)) is None


def test_CompiledGroup_matches() -> None:
for path, glob in POSITIVE_TEST_CASES:
group = SnapshotGroup(root_glob=glob)
assert CompiledGroup.compile(group).matches(path)
group = SnapshotGroup(root_glob=glob, excluded_names=[os.path.basename(path)])
assert not CompiledGroup.compile(group).matches(path)
for path, glob in NEGATIVE_TEST_CASES:
group = SnapshotGroup(root_glob=glob)
assert not CompiledGroup.compile(group).matches(path)


def test_CompiledGroups() -> None:
for path, glob in POSITIVE_TEST_CASES:
group1 = SnapshotGroup(root_glob=glob)
group2 = SnapshotGroup(root_glob=glob, excluded_names=[os.path.basename(path)])
group3 = SnapshotGroup(root_glob="doesntmatch")
compiled = CompiledGroups.compile([group1, group2, group3])
assert compiled.any_match(path)
assert compiled.get_matching(path) == [group1]


def test_CompiledGroup_glob(tmp_path: Path) -> None:
for p, _ in POSITIVE_TEST_CASES + NEGATIVE_TEST_CASES:
p = tmp_path / p
p.mkdir(parents=True, exist_ok=True)
p.touch()
for p, glob in POSITIVE_TEST_CASES:
group = SnapshotGroup(root_glob=glob)
assert str(p) in CompiledGroup.compile(group).glob(tmp_path)
for p, glob in NEGATIVE_TEST_CASES:
group = SnapshotGroup(root_glob=glob)
assert str(p) not in CompiledGroup.compile(group).glob(tmp_path)