Skip to content

Commit

Permalink
cache: added clear_expired_cache (#1055 - #1061) (#1092)
Browse files Browse the repository at this point in the history
  • Loading branch information
pl-marasco authored Nov 4, 2022
1 parent b5ce591 commit 39efd7f
Show file tree
Hide file tree
Showing 2 changed files with 142 additions and 0 deletions.
39 changes: 39 additions & 0 deletions fsspec/implementations/cached.py
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,44 @@ def clear_cache(self):
rmtree(self.storage[-1])
self.load_cache()

def clear_expired_cache(self, expiry_time=None):
"""Remove all expired files and metadata from the cache
In the case of multiple cache locations, this clears only the last one,
which is assumed to be the read/write one.
Parameters
----------
expiry_time: int
The time in seconds after which a local copy is considered useless.
If not defined the default is equivalent to the attribute from the
file caching instantiation.
"""

if not expiry_time:
expiry_time = self.expiry

self._check_cache()

for path, detail in self.cached_files[-1].copy().items():
if time.time() - detail["time"] > expiry_time:
if self.same_names:
basename = os.path.basename(detail["original"])
fn = os.path.join(self.storage[-1], basename)
else:
fn = os.path.join(self.storage[-1], detail["fn"])
if os.path.exists(fn):
os.remove(fn)
self.cached_files[-1].pop(path)

if self.cached_files[-1]:
cache_path = os.path.join(self.storage[-1], "cache")
with open(cache_path, "wb") as fc:
pickle.dump(self.cached_files[-1], fc)
else:
rmtree(self.storage[-1])
self.load_cache()

def pop_from_cache(self, path):
"""Remove cached version of given file
Expand Down Expand Up @@ -389,6 +427,7 @@ def __getattribute__(self, item):
"_check_cache",
"_mkcache",
"clear_cache",
"clear_expired_cache",
"pop_from_cache",
"_mkcache",
"local_file",
Expand Down
103 changes: 103 additions & 0 deletions fsspec/implementations/tests/test_cached.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,109 @@ def test_clear():
assert len(os.listdir(cache1)) < 2


def test_clear_expired(tmp_path):
def __ager(cache_fn, fn):
"""
Modify the cache file to virtually add time lag to selected files.
Parameters
---------
cache_fn: str
cache path
fn: str
file name to be modified
"""
import pathlib
import time

if os.path.exists(cache_fn):
with open(cache_fn, "rb") as f:
cached_files = pickle.load(f)
fn_posix = pathlib.Path(fn).as_posix()
cached_files[fn_posix]["time"] = cached_files[fn_posix]["time"] - 691200
assert os.access(cache_fn, os.W_OK), "Cache is not writable"
with open(cache_fn, "wb") as f:
pickle.dump(cached_files, f)
time.sleep(1)

origin = tmp_path.joinpath("origin")
cache1 = tmp_path.joinpath("cache1")
cache2 = tmp_path.joinpath("cache2")
cache3 = tmp_path.joinpath("cache3")

origin.mkdir()
cache1.mkdir()
cache2.mkdir()
cache3.mkdir()

data = b"test data"
f1 = origin.joinpath("afile")
f2 = origin.joinpath("bfile")
f3 = origin.joinpath("cfile")
f4 = origin.joinpath("dfile")

with open(f1, "wb") as f:
f.write(data)
with open(f2, "wb") as f:
f.write(data)
with open(f3, "wb") as f:
f.write(data)
with open(f4, "wb") as f:
f.write(data)

# populates first cache
fs = fsspec.filesystem(
"filecache", target_protocol="file", cache_storage=str(cache1), cache_check=1
)
assert fs.cat(str(f1)) == data

# populates "last" cache if file not found in first one
fs = fsspec.filesystem(
"filecache",
target_protocol="file",
cache_storage=[str(cache1), str(cache2)],
cache_check=1,
)
assert fs.cat(str(f2)) == data
assert fs.cat(str(f3)) == data
assert len(os.listdir(cache2)) == 3

# force the expiration
cache_fn = os.path.join(fs.storage[-1], "cache")
__ager(cache_fn, f2)

# remove from cache2 the expired files
fs.clear_expired_cache()
assert len(os.listdir(cache2)) == 2

# check complete cleanup
__ager(cache_fn, f3)

fs.clear_expired_cache()
assert not fs._check_file(f2)
assert not fs._check_file(f3)
assert len(os.listdir(cache2)) < 2

# check cache1 to be untouched after cleaning
assert len(os.listdir(cache1)) == 2

# check cleaning with 'same_name' option enabled
fs = fsspec.filesystem(
"filecache",
target_protocol="file",
cache_storage=[str(cache1), str(cache2), str(cache3)],
same_names=True,
cache_check=1,
)
assert fs.cat(str(f4)) == data

cache_fn = os.path.join(fs.storage[-1], "cache")
__ager(cache_fn, f4)

fs.clear_expired_cache()
assert not fs._check_file(str(f4))


def test_pop():
import tempfile

Expand Down

0 comments on commit 39efd7f

Please sign in to comment.