diff --git a/cacholote/clean.py b/cacholote/clean.py
index 362de93..a04a6d0 100644
--- a/cacholote/clean.py
+++ b/cacholote/clean.py
@@ -106,23 +106,25 @@ def __init__(self) -> None:
             if basename:
                 self.file_sizes[posixpath.join(urldir, basename)] += size
+        self.disk_usage = sum(self.file_sizes.values())
         self.log_disk_usage()

-    @property
-    def disk_usage(self) -> int:
-        return sum(self.file_sizes.values())
+    def pop_file_size(self, file: str) -> int:
+        size = self.file_sizes.pop(file, 0)
+        self.disk_usage -= size
+        return size

     def log_disk_usage(self) -> None:
-        self.logger.info("disk usage check", disk_usage=self.disk_usage)
+        self.logger.info("check disk usage", disk_usage=self.disk_usage)

     def stop_cleaning(self, maxsize: int) -> bool:
         return self.disk_usage <= maxsize

-    def get_unknown_sizes(self, lock_validity_period: float | None) -> dict[str, int]:
+    def get_unknown_files(self, lock_validity_period: float | None) -> set[str]:
         self.logger.info("getting unknown files")
         utcnow = utils.utcnow()
-        files_to_skip = []
+        locked_files = set()
         for urlpath in self.file_sizes:
             if urlpath.endswith(".lock"):
                 modified = self.fs.modified(urlpath)
@@ -132,29 +134,27 @@ def get_unknown_sizes(self, lock_validity_period: float | None) -> dict[str, int
             if lock_validity_period is None or delta < datetime.timedelta(
                 seconds=lock_validity_period
             ):
-                files_to_skip.append(urlpath)
-                files_to_skip.append(urlpath.rsplit(".lock", 1)[0])
+                locked_files.add(urlpath)
+                locked_files.add(urlpath.rsplit(".lock", 1)[0])

-        unknown_sizes = {
-            k: v for k, v in self.file_sizes.items() if k not in files_to_skip
-        }
-        if unknown_sizes:
+        if unknown_files := (set(self.file_sizes) - locked_files):
             with config.get().instantiated_sessionmaker() as session:
                 for cache_entry in session.scalars(sa.select(database.CacheEntry)):
-                    for file in _get_files_from_cache_entry(cache_entry):
-                        unknown_sizes.pop(file, 0)
-        return unknown_sizes
+                    for known_file in _get_files_from_cache_entry(cache_entry):
+                        unknown_files.discard(known_file)
+                    if not unknown_files:
+                        break
+        return unknown_files

     def delete_unknown_files(
         self, lock_validity_period: float | None, recursive: bool
     ) -> None:
-        unknown_sizes = self.get_unknown_sizes(lock_validity_period)
-        for urlpath in unknown_sizes:
-            self.file_sizes.pop(urlpath, 0)
+        unknown_files = self.get_unknown_files(lock_validity_period)
+        for urlpath in unknown_files:
+            self.pop_file_size(urlpath)
         self.remove_files(
-            list(unknown_sizes),
+            list(unknown_files),
             recursive=recursive,
-            msg="deleting unknown files",
         )
         self.log_disk_usage()

@@ -207,17 +207,13 @@ def remove_files(
         self,
         files: list[str],
         max_tries: int = 10,
-        msg: str = "deleting cache files",
         **kwargs: Any,
     ) -> None:
         assert max_tries >= 1
+        if not files:
+            return

-        if files:
-            self.logger.info(
-                msg,
-                number_of_files=len(files),
-                recursive=kwargs.get("recursive", False),
-            )
+        self.logger.info("deleting files", n_files_to_delete=len(files), **kwargs)

         n_tries = 0
         while files:
@@ -226,6 +222,7 @@ def remove_files(
                 self.fs.rm(files, **kwargs)
                 return
             except FileNotFoundError:
+                # Another concurrent process might have deleted files
                 if n_tries >= max_tries:
                     raise
                 files = [file for file in files if self.fs.exists(file)]
@@ -246,18 +243,18 @@ def delete_cache_files(
         files_to_delete = []
         dirs_to_delete = []
         self.logger.info("getting cache entries to delete")
-        number_of_cache_entries = 0
+        n_entries_to_delete = 0
         with config.get().instantiated_sessionmaker() as session:
             for cache_entry in session.scalars(
                 sa.select(database.CacheEntry).filter(*filters).order_by(*sorters)
             ):
                 files = _get_files_from_cache_entry(cache_entry)
                 if files:
-                    number_of_cache_entries += 1
+                    n_entries_to_delete += 1
                     session.delete(cache_entry)
                     for file, file_type in files.items():
-                        self.file_sizes.pop(file, 0)
+                        self.pop_file_size(file)
                         if file_type == "application/vnd+zarr":
                             dirs_to_delete.append(file)
                         else:
                             files_to_delete.append(file)

                 if self.stop_cleaning(maxsize):
                     break

-            if number_of_cache_entries:
+            if n_entries_to_delete:
                 self.logger.info(
-                    "deleting cache entries",
-                    number_of_cache_entries=number_of_cache_entries,
+                    "deleting cache entries", n_entries_to_delete=n_entries_to_delete
                 )
                 database._commit_or_rollback(session)

@@ -282,7 +278,7 @@ def delete_cache_files(
             (
                 f"Unable to clean {self.dirname!r}."
                 f" Final disk usage: {self.disk_usage!r}."
-                f" Expected disk usage: {maxsize!r}"
+                f" Target disk usage: {maxsize!r}"
             )
         )

diff --git a/cacholote/extra_encoders.py b/cacholote/extra_encoders.py
index b188d33..8a4a47d 100644
--- a/cacholote/extra_encoders.py
+++ b/cacholote/extra_encoders.py
@@ -85,9 +85,8 @@ def _guess_type(
     local_path: str,
     default: str = "application/octet-stream",
 ) -> str:
-    content_type: str = fs.info(local_path).get("ContentType", "")
-    if content_type:
-        return content_type
+    if content_type := fs.info(local_path).get("ContentType", ""):
+        return str(content_type)

     filetype, *_ = mimetypes.guess_type(local_path, strict=False)
     if filetype is None and _HAS_MAGIC:
@@ -132,8 +131,9 @@ class FileInfoModel(pydantic.BaseModel):


 def _dictify_file(fs: fsspec.AbstractFileSystem, local_path: str) -> dict[str, Any]:
+    settings = config.get()
     href = posixpath.join(
-        config.get().cache_files_urlpath_readonly or config.get().cache_files_urlpath,
+        settings.cache_files_urlpath_readonly or settings.cache_files_urlpath,
         posixpath.basename(local_path),
     )
     file_dict = {
@@ -143,7 +143,6 @@ def _dictify_file(fs: fsspec.AbstractFileSystem, local_path: str) -> dict[str, A
         "file:size": fs.size(local_path),
         "file:local_path": local_path,
     }
-
     return FileInfoModel(**file_dict).model_dump(by_alias=True)


@@ -152,9 +151,10 @@ def _get_fs_and_urlpath(
     storage_options: dict[str, Any] | None = None,
     validate: bool = False,
 ) -> tuple[fsspec.AbstractFileSystem, str]:
+    settings = config.get()
     urlpath = file_json["file:local_path"]
     if storage_options is None:
-        storage_options = config.get().cache_files_storage_options
+        storage_options = settings.cache_files_storage_options

     if not validate:
         fs, *_ = fsspec.get_fs_token_paths(urlpath, storage_options=storage_options)
@@ -175,7 +175,7 @@ def _get_fs_and_urlpath(
             )
             if expected != actual:
                 raise ValueError(f"checksum mismatch: {urlpath=} {expected=} {actual=}")
-            config.get().logger.info(
+            settings.logger.info(
                 "retrieve cache file", urlpath=fs.unstrip_protocol(urlpath)
             )
             return (fs, urlpath)
@@ -262,76 +262,78 @@ def decode_io_object(


 @_requires_xarray_and_dask
-def _maybe_store_xr_object(
+def _store_xr_object(
     obj: xr.Dataset | xr.DataArray,
     fs: fsspec.AbstractFileSystem,
     urlpath: str,
     filetype: str,
 ) -> None:
     if filetype == "application/vnd+zarr":
-        with utils.FileLock(
-            fs, urlpath, timeout=config.get().lock_timeout
-        ) as file_exists:
-            if not file_exists:
-                # Write directly on any filesystem
-                mapper = fs.get_mapper(urlpath)
-                with _logging_timer("upload", urlpath=fs.unstrip_protocol(urlpath)):
-                    obj.to_zarr(mapper, consolidated=True)
-    elif not fs.exists(urlpath):
-        # Need a tmp local copy to write on a different filesystem
-        with tempfile.TemporaryDirectory() as tmpdirname:
-            tmpfilename = str(pathlib.Path(tmpdirname) / pathlib.Path(urlpath).name)
-
-            with _logging_timer("write tmp file", urlpath=tmpfilename):
-                if filetype == "application/netcdf":
-                    obj.to_netcdf(tmpfilename)
-                elif filetype == "application/x-grib":
-                    import cfgrib.xarray_to_grib
-
-                    cfgrib.xarray_to_grib.to_grib(obj, tmpfilename)
-                else:
-                    # Should never get here! xarray_cache_type is checked in config.py
-                    raise ValueError(f"type {filetype!r} is NOT supported.")
-
-            _maybe_store_file_object(
-                fs if _filesystem_is_local(fs) else fsspec.filesystem("file"),
-                tmpfilename,
-                fs,
-                urlpath,
-                io_delete_original=True,
-            )
+        # Write directly on any filesystem
+        mapper = fs.get_mapper(urlpath)
+        with _logging_timer("upload", urlpath=fs.unstrip_protocol(urlpath)):
+            obj.to_zarr(mapper, consolidated=True)
+        return
+
+    # Need a tmp local copy to write on a different filesystem
+    with tempfile.TemporaryDirectory() as tmpdirname:
+        tmpfilename = str(pathlib.Path(tmpdirname) / pathlib.Path(urlpath).name)
+
+        with _logging_timer("write tmp file", urlpath=tmpfilename):
+            if filetype == "application/netcdf":
+                obj.to_netcdf(tmpfilename)
+            elif filetype == "application/x-grib":
+                import cfgrib.xarray_to_grib
+
+                cfgrib.xarray_to_grib.to_grib(obj, tmpfilename)
+            else:
+                # Should never get here! xarray_cache_type is checked in config.py
+                raise ValueError(f"type {filetype!r} is NOT supported.")
+
+        _store_file_object(
+            fs if _filesystem_is_local(fs) else fsspec.filesystem("file"),
+            tmpfilename,
+            fs,
+            urlpath,
+            io_delete_original=True,
+        )


 @_requires_xarray_and_dask
 def dictify_xr_object(obj: xr.Dataset | xr.DataArray) -> dict[str, Any]:
     """Encode a ``xr.Dataset`` to JSON deserialized data (``dict``)."""
+    settings = config.get()
     with dask.config.set({"tokenize.ensure-deterministic": True}):
         root = dask.base.tokenize(obj)

-    filetype = config.get().xarray_cache_type
-    ext = mimetypes.guess_extension(filetype, strict=False)
-    urlpath_out = posixpath.join(config.get().cache_files_urlpath, f"{root}{ext}")
+    ext = mimetypes.guess_extension(settings.xarray_cache_type, strict=False)
+    urlpath_out = posixpath.join(settings.cache_files_urlpath, f"{root}{ext}")
     fs_out, *_ = fsspec.get_fs_token_paths(
         urlpath_out,
-        storage_options=config.get().cache_files_storage_options,
+        storage_options=settings.cache_files_storage_options,
     )
-    _maybe_store_xr_object(obj, fs_out, urlpath_out, filetype)
+    with utils.FileLock(
+        fs_out, urlpath_out, timeout=settings.lock_timeout
+    ) as file_exists:
+        if not file_exists:
+            _store_xr_object(obj, fs_out, urlpath_out, settings.xarray_cache_type)

-    file_json = _dictify_file(fs_out, urlpath_out)
-    kwargs: dict[str, Any] = {"chunks": {}}
-    if filetype == "application/vnd+zarr":
-        kwargs.update({"engine": "zarr", "consolidated": True})
+        file_json = _dictify_file(fs_out, urlpath_out)

-    return encode.dictify_python_call(
-        decode_xr_dataset if isinstance(obj, xr.Dataset) else decode_xr_dataarray,
-        file_json,
-        storage_options=config.get().cache_files_storage_options,
-        **kwargs,
-    )
+        kwargs: dict[str, Any] = {"chunks": {}}
+        if settings.xarray_cache_type == "application/vnd+zarr":
+            kwargs.update({"engine": "zarr", "consolidated": True})
+
+        return encode.dictify_python_call(
+            decode_xr_dataset if isinstance(obj, xr.Dataset) else decode_xr_dataarray,
+            file_json,
+            storage_options=settings.cache_files_storage_options,
+            **kwargs,
+        )


-def _maybe_store_file_object(
+def _store_file_object(
     fs_in: fsspec.AbstractFileSystem,
     urlpath_in: str,
     fs_out: fsspec.AbstractFileSystem,
@@ -340,30 +342,25 @@
 ) -> None:
     if io_delete_original is None:
         io_delete_original = config.get().io_delete_original
-    with utils.FileLock(
-        fs_out, urlpath_out, timeout=config.get().lock_timeout
-    ) as file_exists:
-        if not file_exists:
-            kwargs = {}
-            content_type = _guess_type(fs_in, urlpath_in)
-            if content_type:
-                kwargs["ContentType"] = content_type
-            with _logging_timer(
-                "upload",
-                urlpath=fs_out.unstrip_protocol(urlpath_out),
-                size=fs_in.size(urlpath_in),
-            ):
-                if fs_in == fs_out:
-                    if io_delete_original:
-                        fs_in.mv(urlpath_in, urlpath_out, **kwargs)
-                    else:
-                        fs_in.cp(urlpath_in, urlpath_out, **kwargs)
-                elif _filesystem_is_local(fs_in):
-                    fs_out.put(urlpath_in, urlpath_out, **kwargs)
-                else:
-                    with fs_in.open(urlpath_in, "rb") as f_in:
-                        with fs_out.open(urlpath_out, "wb") as f_out:
-                            utils.copy_buffered_file(f_in, f_out)
+
+    kwargs = {}
+    if content_type := _guess_type(fs_in, urlpath_in):
+        kwargs["ContentType"] = content_type
+    with _logging_timer(
+        "upload",
+        urlpath=fs_out.unstrip_protocol(urlpath_out),
+        size=fs_in.size(urlpath_in),
+    ):
+        if fs_in == fs_out:
+            func = fs_in.mv if io_delete_original else fs_in.cp
+            func(urlpath_in, urlpath_out, **kwargs)
+        elif _filesystem_is_local(fs_in):
+            fs_out.put(urlpath_in, urlpath_out, **kwargs)
+        else:
+            with fs_in.open(urlpath_in, "rb") as f_in:
+                with fs_out.open(urlpath_out, "wb") as f_out:
+                    utils.copy_buffered_file(f_in, f_out)
+
     if io_delete_original and fs_in.exists(urlpath_in):
         with _logging_timer(
             "remove",
@@ -373,50 +370,55 @@
             fs_in.rm(urlpath_in)


-def _maybe_store_io_object(
+def _store_io_object(
     f_in: _UNION_IO_TYPES,
     fs_out: fsspec.AbstractFileSystem,
     urlpath_out: str,
 ) -> None:
-    with utils.FileLock(
-        fs_out, urlpath_out, timeout=config.get().lock_timeout
-    ) as file_exists:
-        if not file_exists:
-            f_out = fs_out.open(urlpath_out, "wb")
-            with _logging_timer("upload", urlpath=fs_out.unstrip_protocol(urlpath_out)):
-                utils.copy_buffered_file(f_in, f_out)
+    f_out = fs_out.open(urlpath_out, "wb")
+    with _logging_timer("upload", urlpath=fs_out.unstrip_protocol(urlpath_out)):
+        utils.copy_buffered_file(f_in, f_out)


 def dictify_io_object(obj: _UNION_IO_TYPES) -> dict[str, Any]:
     """Encode a file object to JSON deserialized data (``dict``)."""
-    cache_files_urlpath = config.get().cache_files_urlpath
+    settings = config.get()
+
+    cache_files_urlpath = settings.cache_files_urlpath
     fs_out, *_ = fsspec.get_fs_token_paths(
         cache_files_urlpath,
-        storage_options=config.get().cache_files_storage_options,
+        storage_options=settings.cache_files_storage_options,
     )
-    if hasattr(obj, "path") or hasattr(obj, "name"):
-        urlpath_in = obj.path if hasattr(obj, "path") else obj.name  # type: ignore[union-attr]
+    if urlpath_in := getattr(obj, "path", getattr(obj, "name", "")):
         fs_in = getattr(obj, "fs", fsspec.filesystem("file"))
         root = f"{fs_in.checksum(urlpath_in):x}"
         ext = pathlib.Path(urlpath_in).suffix
         urlpath_out = posixpath.join(cache_files_urlpath, f"{root}{ext}")
-        _maybe_store_file_object(fs_in, urlpath_in, fs_out, urlpath_out)
     else:
         root = hashlib.md5(f"{hash(obj)}".encode()).hexdigest()  # fsspec uses md5
         urlpath_out = posixpath.join(cache_files_urlpath, root)
-        _maybe_store_io_object(obj, fs_out, urlpath_out)
-
-    file_json = _dictify_file(fs_out, urlpath_out)
-    params = inspect.signature(open).parameters
-    kwargs = {k: getattr(obj, k) for k in params.keys() if hasattr(obj, k)}
-    return encode.dictify_python_call(
-        decode_io_object,
-        file_json,
-        storage_options=config.get().cache_files_storage_options,
-        **kwargs,
-    )
+    with utils.FileLock(
+        fs_out, urlpath_out, timeout=settings.lock_timeout
+    ) as file_exists:
+        if not file_exists:
+            if urlpath_in:
+                _store_file_object(fs_in, urlpath_in, fs_out, urlpath_out)
+            else:
+                _store_io_object(obj, fs_out, urlpath_out)
+
+    file_json = _dictify_file(fs_out, urlpath_out)
+
+    params = inspect.signature(open).parameters
+    kwargs = {k: getattr(obj, k) for k in params.keys() if hasattr(obj, k)}
+
+    return encode.dictify_python_call(
+        decode_io_object,
+        file_json,
+        storage_options=settings.cache_files_storage_options,
+        **kwargs,
+    )


 def register_all() -> None:
diff --git a/ci/environment-ci.yml b/ci/environment-ci.yml
index 3f58f47..1f02127 100644
--- a/ci/environment-ci.yml
+++ b/ci/environment-ci.yml
@@ -31,3 +31,5 @@ dependencies:
 - types-requests
 - xarray>=2022.6.0
 - zarr
+- pip:
+  - pytest-structlog
diff --git a/ci/environment-integration.yml b/ci/environment-integration.yml
index 4f89df5..c77c667 100644
--- a/ci/environment-integration.yml
+++ b/ci/environment-integration.yml
@@ -9,6 +9,7 @@ dependencies:
 # DO NOT EDIT ABOVE THIS LINE, ADD DEPENDENCIES BELOW
 - aiohttp
 - moto
+- pip
 - postgresql
 - psycopg
 - psycopg2
@@ -16,3 +17,5 @@ dependencies:
 - pytest-postgresql
 - requests
 - s3fs
+- pip:
+  - pytest-structlog
diff --git a/tests/test_40_xarray_encoder.py b/tests/test_40_xarray_encoder.py
index d763e6a..6cda718 100644
--- a/tests/test_40_xarray_encoder.py
+++ b/tests/test_40_xarray_encoder.py
@@ -5,6 +5,7 @@

 import fsspec
 import pytest
+import pytest_structlog
 import structlog

 from cacholote import cache, config, decode, encode, extra_encoders, utils
@@ -163,37 +164,48 @@ def test_xr_corrupted_files(
     assert fs.exists(cached_path)


-def test_xr_logging(capsys: pytest.CaptureFixture[str]) -> None:
+def test_xr_logging(log: pytest_structlog.StructuredLogCapture) -> None:
     config.set(logger=structlog.get_logger(), raise_all_encoding_errors=True)

     # Cache dataset
     cfunc = cache.cacheable(get_grib_ds)
     cached_ds = cfunc()
-    captured = iter(capsys.readouterr().out.splitlines())
-
-    line = next(captured)
-    assert "start write tmp file" in line
-    assert "urlpath=" in line
-
-    line = next(captured)
-    assert "end write tmp file" in line
-    assert "urlpath=" in line
-    assert "write_tmp_file_time=" in line
-
-    line = next(captured)
-    assert "start upload" in line
-    assert f"urlpath=file://{cached_ds.encoding['source']}" in line
-    assert "size=22597" in line
-
-    line = next(captured)
-    assert "end upload" in line
-    assert f"urlpath=file://{cached_ds.encoding['source']}" in line
-    assert "upload_time=" in line
-    assert "size=22597" in line
-
-    line = next(captured)
-    assert "retrieve cache file" in line
-    assert f"urlpath=file://{cached_ds.encoding['source']}" in line
+    urlpath = f"file://{cached_ds.encoding['source']}"
+    tmpfile = log.events[0]["urlpath"]
+    assert urlpath.rsplit("/", 1)[1] == tmpfile.rsplit("/", 1)[1]
+
+    expected = [
+        {
+            "urlpath": tmpfile,
+            "event": "start write tmp file",
+            "level": "info",
+        },
+        {
+            "urlpath": tmpfile,
+            "write_tmp_file_time": log.events[1]["write_tmp_file_time"],
+            "event": "end write tmp file",
+            "level": "info",
+        },
+        {
+            "urlpath": urlpath,
+            "size": 22597,
+            "event": "start upload",
+            "level": "info",
+        },
+        {
+            "urlpath": urlpath,
+            "size": 22597,
+            "upload_time": log.events[3]["upload_time"],
+            "event": "end upload",
+            "level": "info",
+        },
+        {
+            "urlpath": urlpath,
+            "event": "retrieve cache file",
+            "level": "info",
+        },
+    ]
+    assert log.events == expected


 @pytest.mark.parametrize(
diff --git a/tests/test_50_io_encoder.py b/tests/test_50_io_encoder.py
index 8db6712..3d14981 100644
--- a/tests/test_50_io_encoder.py
+++ b/tests/test_50_io_encoder.py
@@ -11,6 +11,7 @@
 import fsspec
 import pytest
 import pytest_httpserver
+import pytest_structlog
 import structlog

 from cacholote import cache, config, decode, encode, extra_encoders, utils
@@ -187,37 +188,49 @@ def test_content_type(tmp_path: pathlib.Path, set_cache: str) -> None:


 @pytest.mark.parametrize("set_cache", ["cads"], indirect=True)
-def test_io_logging(capsys: pytest.CaptureFixture[str], tmp_path: pathlib.Path) -> None:
+def test_io_logging(
+    log: pytest_structlog.StructuredLogCapture, tmp_path: pathlib.Path
+) -> None:
     config.set(logger=structlog.get_logger(), io_delete_original=True)

     # Cache file
     tmpfile = tmp_path / "test.txt"
     fsspec.filesystem("file").touch(tmpfile)
     cached_file = cached_open(tmpfile)
-    captured = iter(capsys.readouterr().out.splitlines())
-
-    line = next(captured)
-    assert "start upload" in line
-    assert f"urlpath=s3://{cached_file.path}" in line
-    assert "size=0" in line
-
-    line = next(captured)
-    assert "end upload" in line
-    assert f"urlpath=s3://{cached_file.path}" in line
-    assert "upload_time=" in line
-    assert "size=0" in line
-
-    line = next(captured)
-    assert "start remove" in line
-    assert f"urlpath=file://{tmpfile}" in line
-    assert "size=0" in line
-
-    line = next(captured)
-    assert "end remove" in line
-    assert f"urlpath=file://{tmpfile}" in line
-    assert "remove_time=" in line
-    assert "size=0" in line
-
-    line = next(captured)
-    assert "retrieve cache file" in line
-    assert f"urlpath=s3://{cached_file.path}" in line
+
+    tmp_urlpath = f"file://{tmpfile!s}"
+    cached_urlpath = f"s3://{cached_file.path}"
+    expected = [
+        {
+            "urlpath": cached_urlpath,
+            "size": 0,
+            "event": "start upload",
+            "level": "info",
+        },
+        {
+            "urlpath": cached_urlpath,
+            "size": 0,
+            "upload_time": log.events[1]["upload_time"],
+            "event": "end upload",
+            "level": "info",
+        },
+        {
+            "urlpath": tmp_urlpath,
+            "size": 0,
+            "event": "start remove",
+            "level": "info",
+        },
+        {
+            "urlpath": tmp_urlpath,
+            "size": 0,
+            "remove_time": log.events[3]["remove_time"],
+            "event": "end remove",
+            "level": "info",
+        },
+        {
+            "urlpath": cached_urlpath,
+            "event": "retrieve cache file",
+            "level": "info",
+        },
+    ]
+    assert log.events == expected
diff --git a/tests/test_60_clean.py b/tests/test_60_clean.py
index 2bdfa65..d8f2bf7 100644
--- a/tests/test_60_clean.py
+++ b/tests/test_60_clean.py
@@ -10,6 +10,7 @@
 import fsspec
 import pydantic
 import pytest
+import pytest_structlog
 import structlog

 from cacholote import cache, clean, config, utils
@@ -253,7 +254,7 @@ def test_clean_invalid_cache_entries(


 def test_cleaner_logging(
-    capsys: pytest.CaptureFixture[str], tmp_path: pathlib.Path
+    log: pytest_structlog.StructuredLogCapture, tmp_path: pathlib.Path
 ) -> None:
     # Cache file and create unknown
     tmpfile = tmp_path / "test.txt"
@@ -265,37 +266,28 @@ def test_cleaner_logging(
     # Clean
     config.set(logger=structlog.get_logger())
     clean.clean_cache_files(0, delete_unknown_files=True)
-    captured = iter(capsys.readouterr().out.splitlines())
-    sep = " " * 15
-    line = next(captured)
-    assert "getting disk usage" in line
-
-    line = next(captured)
-    assert line.endswith(f"disk usage check{sep}disk_usage=2")
-
-    line = next(captured)
-    assert "getting unknown files" in line
-
-    line = next(captured)
-    line.endswith(f"deleting unknown files{sep}number_of_files=1{sep}recursive=False")
-
-    line = next(captured)
-    line.endswith(f"disk usage check{sep}disk_usage=1")
-
-    line = next(captured)
-    assert "getting cache entries to delete" in line
-
-    line = next(captured)
-    line.endswith(f"deleting cache entries{sep}number_of_cache_entries=1")
-
-    line = next(captured)
-    line.endswith(f"deleting cache files{sep}number_of_files=1{sep}recursive=False")
-
-    line = next(captured)
-    line.endswith(f"disk usage check{sep}disk_usage=0")
-
-    assert next(captured, None) is None
+    assert log.events == [
+        {"event": "getting disk usage", "level": "info"},
+        {"disk_usage": 2, "event": "check disk usage", "level": "info"},
+        {"event": "getting unknown files", "level": "info"},
+        {
+            "n_files_to_delete": 1,
+            "recursive": False,
+            "event": "deleting files",
+            "level": "info",
+        },
+        {"disk_usage": 1, "event": "check disk usage", "level": "info"},
+        {"event": "getting cache entries to delete", "level": "info"},
+        {"n_entries_to_delete": 1, "event": "deleting cache entries", "level": "info"},
+        {
+            "n_files_to_delete": 1,
+            "recursive": False,
+            "event": "deleting files",
+            "level": "info",
+        },
+        {"disk_usage": 0, "event": "check disk usage", "level": "info"},
+    ]


 def test_clean_multiple_files(tmp_path: pathlib.Path) -> None: