Skip to content

Commit

Permalink
improve locker (#116)
Browse files · Browse the repository at this point in the history
* improve locker

* cleanup

* cleanup

* cleanup

* cleanup

* cleanup

* improve structlog test

* fix env

* use discard
  • Loading branch information
malmans2 authored Mar 27, 2024
1 parent 7484731 commit a52e46f
Show file tree
Hide file tree
Showing 7 changed files with 242 additions and 222 deletions.
64 changes: 30 additions & 34 deletions cacholote/clean.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,23 +106,25 @@ def __init__(self) -> None:
if basename:
self.file_sizes[posixpath.join(urldir, basename)] += size

self.disk_usage = sum(self.file_sizes.values())
self.log_disk_usage()

@property
def disk_usage(self) -> int:
    """Total size in bytes of all currently tracked files.

    Recomputed on every access by summing the per-file sizes held in
    ``self.file_sizes``.
    """
    total = 0
    for size in self.file_sizes.values():
        total += size
    return total
def pop_file_size(self, file: str) -> int:
    """Forget *file* and keep the cached usage total consistent.

    Removes *file* from ``self.file_sizes`` and subtracts its recorded
    size from ``self.disk_usage``.

    Returns the file's recorded size, or 0 when the file was not tracked.
    """
    removed = self.file_sizes.pop(file, 0)
    self.disk_usage = self.disk_usage - removed
    return removed

def log_disk_usage(self) -> None:
    """Emit one info-level record of the current total disk usage."""
    # Defect in SOURCE: two conflicting logger.info lines for the same
    # event (diff residue). Keep a single call using the current message
    # wording; structlog-style key/value payload carries the number.
    self.logger.info("check disk usage", disk_usage=self.disk_usage)

def stop_cleaning(self, maxsize: int) -> bool:
    """Return ``True`` once disk usage no longer exceeds *maxsize* bytes."""
    over_budget = self.disk_usage > maxsize
    return not over_budget

def get_unknown_sizes(self, lock_validity_period: float | None) -> dict[str, int]:
def get_unknown_files(self, lock_validity_period: float | None) -> set[str]:
self.logger.info("getting unknown files")

utcnow = utils.utcnow()
files_to_skip = []
locked_files = set()
for urlpath in self.file_sizes:
if urlpath.endswith(".lock"):
modified = self.fs.modified(urlpath)
Expand All @@ -132,29 +134,27 @@ def get_unknown_sizes(self, lock_validity_period: float | None) -> dict[str, int
if lock_validity_period is None or delta < datetime.timedelta(
seconds=lock_validity_period
):
files_to_skip.append(urlpath)
files_to_skip.append(urlpath.rsplit(".lock", 1)[0])
locked_files.add(urlpath)
locked_files.add(urlpath.rsplit(".lock", 1)[0])

unknown_sizes = {
k: v for k, v in self.file_sizes.items() if k not in files_to_skip
}
if unknown_sizes:
if unknown_files := (set(self.file_sizes) - locked_files):
with config.get().instantiated_sessionmaker() as session:
for cache_entry in session.scalars(sa.select(database.CacheEntry)):
for file in _get_files_from_cache_entry(cache_entry):
unknown_sizes.pop(file, 0)
return unknown_sizes
for known_file in _get_files_from_cache_entry(cache_entry):
unknown_files.discard(known_file)
if not unknown_files:
break
return unknown_files

def delete_unknown_files(
    self, lock_validity_period: float | None, recursive: bool
) -> None:
    """Delete files on disk that no cache entry (and no live lock) accounts for.

    Parameters
    ----------
    lock_validity_period:
        Passed through to ``get_unknown_files``; lock files younger than
        this many seconds protect their target from deletion. ``None``
        means locks never expire.
    recursive:
        Forwarded to ``remove_files`` (e.g. for directory-valued entries).
    """
    # Defect in SOURCE: the body interleaved two variants of this method
    # (diff residue referencing both `unknown_sizes`/`file_sizes.pop` and
    # `unknown_files`/`pop_file_size`). Reconstructed as one coherent
    # version built on the `pop_file_size` helper so the cached
    # `disk_usage` total stays in sync.
    unknown_files = self.get_unknown_files(lock_validity_period)
    for urlpath in unknown_files:
        self.pop_file_size(urlpath)
    self.remove_files(list(unknown_files), recursive=recursive)
    self.log_disk_usage()

Expand Down Expand Up @@ -207,17 +207,13 @@ def remove_files(
self,
files: list[str],
max_tries: int = 10,
msg: str = "deleting cache files",
**kwargs: Any,
) -> None:
assert max_tries >= 1
if not files:
return

if files:
self.logger.info(
msg,
number_of_files=len(files),
recursive=kwargs.get("recursive", False),
)
self.logger.info("deleting files", n_files_to_delete=len(files), **kwargs)

n_tries = 0
while files:
Expand All @@ -226,6 +222,7 @@ def remove_files(
self.fs.rm(files, **kwargs)
return
except FileNotFoundError:
# Another concurrent process might have deleted files
if n_tries >= max_tries:
raise
files = [file for file in files if self.fs.exists(file)]
Expand All @@ -246,18 +243,18 @@ def delete_cache_files(
files_to_delete = []
dirs_to_delete = []
self.logger.info("getting cache entries to delete")
number_of_cache_entries = 0
n_entries_to_delete = 0
with config.get().instantiated_sessionmaker() as session:
for cache_entry in session.scalars(
sa.select(database.CacheEntry).filter(*filters).order_by(*sorters)
):
files = _get_files_from_cache_entry(cache_entry)
if files:
number_of_cache_entries += 1
n_entries_to_delete += 1
session.delete(cache_entry)

for file, file_type in files.items():
self.file_sizes.pop(file, 0)
self.pop_file_size(file)
if file_type == "application/vnd+zarr":
dirs_to_delete.append(file)
else:
Expand All @@ -266,10 +263,9 @@ def delete_cache_files(
if self.stop_cleaning(maxsize):
break

if number_of_cache_entries:
if n_entries_to_delete:
self.logger.info(
"deleting cache entries",
number_of_cache_entries=number_of_cache_entries,
"deleting cache entries", n_entries_to_delete=n_entries_to_delete
)
database._commit_or_rollback(session)

Expand All @@ -282,7 +278,7 @@ def delete_cache_files(
(
f"Unable to clean {self.dirname!r}."
f" Final disk usage: {self.disk_usage!r}."
f" Expected disk usage: {maxsize!r}"
f" Target disk usage: {maxsize!r}"
)
)

Expand Down
Loading

0 comments on commit a52e46f

Please sign in to comment.