Limit the sparse cache size #279

Merged · 6 commits · Jul 26, 2022
8 changes: 4 additions & 4 deletions .pfnci/config.pbtxt
@@ -3,13 +3,13 @@ configs{
key: "chainerio"
value {
requirement {
cpu: 2
memory: 6
cpu: 4
memory: 16
disk: 10
}
# https://github.pfidev.jp/ci/imosci/blob/master/proto/data.proto#L933
time_limit {
seconds: 900 # 15 minutes
seconds: 1200 # 20 minutes
}
command:
"bash .pfnci/script.sh"
@@ -26,7 +26,7 @@
}
# https://github.pfidev.jp/ci/imosci/blob/master/proto/data.proto#L933
time_limit {
seconds: 600 # 15 minutes
seconds: 900 # 15 minutes
}
command:
"bash .pfnci/script-old.sh"
86 changes: 67 additions & 19 deletions pfio/cache/sparse_file.py
@@ -64,16 +64,18 @@ class _CachedWrapperBase:
'''

def __init__(self, fileobj, size, cachedir=None, close_on_close=False,
multithread_safe=False):
cache_size_limit=None, multithread_safe=False):
self.fileobj = fileobj
self.cachedir = cachedir
self.cache_size_limit = cache_size_limit

self.multithread_safe = multithread_safe
if self.multithread_safe:
self.lock = RWLock()
else:
self.lock = DummyLock()

self.pos = 0
self.size = size
assert size > 0
if cachedir is None:
@@ -337,16 +339,23 @@ class CachedWrapper(_CachedWrapperBase):
'''

def __init__(self, fileobj, size, cachedir=None, close_on_close=False,
pagesize=16*1024*1024, multithread_safe=False):
pagesize=16*1024*1024, multithread_safe=False,
cache_size_limit=None):
super().__init__(fileobj, size, cachedir, close_on_close,
cache_size_limit=cache_size_limit,
multithread_safe=multithread_safe)
assert pagesize > 0
self.pagesize = pagesize
pagecount = size // pagesize
self.size = size
self._init_ranges()

def _init_ranges(self):
self.cache_size = 0
pagecount = self.size // self.pagesize
self.ranges = [_Range(i * self.pagesize, self.pagesize, cached=False)
for i in range(pagecount)]

remain = size % self.pagesize
remain = self.size % self.pagesize
if remain > 0:
r = _Range(pagecount*self.pagesize, remain, cached=False)
self.ranges.append(r)
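
A quick illustration of the page layout that _init_ranges builds when the file size does not divide evenly into pages (standalone sketch, numbers are made up; it mirrors the arithmetic above rather than calling the class):

# Page layout for a 10,000-byte file with a 4,096-byte pagesize.
size, pagesize = 10_000, 4_096
pagecount = size // pagesize                 # 2 full pages
remain = size % pagesize                     # 1,808 bytes left over
pages = [(i * pagesize, pagesize) for i in range(pagecount)]
if remain > 0:
    pages.append((pagecount * pagesize, remain))   # short tail page
assert pages == [(0, 4096), (4096, 4096), (8192, 1808)]
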
@@ -357,6 +366,11 @@ def read(self, size=-1) -> bytes:
# this lock into per-page locks
return self._read(size)

def _is_full(self) -> bool:
if self.cache_size_limit is None:
return False
return self.cache_size >= self.cache_size_limit

def _read(self, size) -> bytes:
if self._closed:
raise RuntimeError("closed")
@@ -366,8 +380,10 @@ def _read(self, size) -> bytes:

start = self.pos // self.pagesize
end = (self.pos + size) // self.pagesize
if (self.pos + size) % self.pagesize != 0:
end += 1

for i in range(start, end + 1):
for i in range(start, end):
# print('range=', i, "total=", len(self.ranges))
r = self.ranges[i]

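
The reworked end computation only rounds up to an extra page when the read actually spills into it, so the loop above never walks past the last page the read touches. A standalone sketch with made-up numbers:

# A read that ends exactly on a page boundary: 8,190 bytes from pos 0,
# pagesize 4,095, i.e. exactly pages 0 and 1.
pos, size, pagesize = 0, 8_190, 4_095
start = pos // pagesize                      # 0
end = (pos + size) // pagesize               # 2
if (pos + size) % pagesize != 0:             # boundary hit, so no extra page
    end += 1
assert list(range(start, end)) == [0, 1]
# The previous range(start, end + 1) would also have visited page 2, one
# page past the data when the read ends exactly at the end of the file.
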
@@ -388,6 +404,12 @@ def _read(self, size) -> bytes:
self.pos %= self.size
if self.pos != self.fileobj.tell():
self.fileobj.seek(self.pos, io.SEEK_SET)

# cache_size_limit is a soft limit
if self._is_full():
os.truncate(self.cachefp.fileno(), 0)
self._init_ranges()

return buf
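
A note on the soft limit: the size check only happens after a read has completed (the cache_size counter itself is maintained in the part of _read elided from this diff), so the cache file can overshoot cache_size_limit by up to the pages touched by that read before everything is truncated and the page table is rebuilt as uncached. A toy sketch of that cycle, with illustrative values only:

# Toy model of the soft-limit flush cycle (not the real class).
cache_size_limit = 1 * 1024 * 1024           # 1 MiB limit
pagesize = 16 * 1024 * 1024                  # 16 MiB pages
cache_size = 0

def is_full():
    return cache_size_limit is not None and cache_size >= cache_size_limit

cache_size += pagesize       # one uncached page is fetched and stored
assert is_full()             # the limit is only noticed once the read is done,
cache_size = 0               # at which point the cache file is truncated and
                             # every page is marked uncached again
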


@@ -431,13 +453,15 @@ class MPCachedWrapper(CachedWrapper):
'''

def __init__(self, fileobj, size, cachedir=None, close_on_close=False,
cache_size_limit=None, pagesize=16*1024*1024,
local_cachefile=None, local_indexfile=None,
pagesize=16*1024*1024, multithread_safe=False):
multithread_safe=False):
super().__init__(fileobj, size, cachedir, close_on_close,
cache_size_limit=cache_size_limit,
multithread_safe=multithread_safe)
assert pagesize > 0
self.pagesize = pagesize
pagecount = size // pagesize
self.size = size

# Both none or both string file
assert bool(local_indexfile) == bool(local_cachefile)
@@ -450,17 +474,7 @@ def __init__(self, fileobj, size, cachedir=None, close_on_close=False,
self.local_indexfile = self.indexfp.name
self.local_cachefile = self.cachefp.name
self._open_fd(self.indexfp.name, self.cachefp.name)

for i in range(pagecount):
r = _Range(i * self.pagesize, self.pagesize, cached=False)
self._set_index(i, r)
r = self._get_index(i)

remain = size % self.pagesize
if remain > 0:
offset = pagecount * self.pagesize
r = _Range(offset, remain, cached=False)
self._set_index(pagecount, r)
self._init_indexfile()

else:
self.indexfp = open(local_indexfile, 'rb')
@@ -486,6 +500,20 @@ def _open_fd(self, indexfile, cachefile):
self.indexfd = os.open(self.local_indexfile, os.O_RDWR)
self.cachefd = os.open(self.local_cachefile, os.O_RDWR)

def _init_indexfile(self):
pagecount = self.size // self.pagesize

for i in range(pagecount):
r = _Range(i * self.pagesize, self.pagesize, cached=False)
self._set_index(i, r)
r = self._get_index(i)

remain = self.size % self.pagesize
if remain > 0:
offset = pagecount * self.pagesize
r = _Range(offset, remain, cached=False)
self._set_index(pagecount, r)

def _get_index(self, i):
assert i >= 0
width = _Range.size()
@@ -508,8 +536,10 @@ def read(self, size=-1) -> bytes:

start = self.pos // self.pagesize
end = (self.pos + size) // self.pagesize
if (self.pos + size) % self.pagesize != 0:
end += 1

for i in range(start, end + 1):
for i in range(start, end):
with _shflock(self.indexfd):
r = self._get_index(i)

@@ -535,4 +565,22 @@ def read(self, size=-1) -> bytes:
self.pos %= self.size
if self.pos != self.fileobj.tell():
self.fileobj.seek(self.pos, io.SEEK_SET)

with _shflock(self.indexfd):
is_full = self._is_full()

# If the cache file is more than the limit, just flush them all.
if is_full:
with _exflock(self.indexfd):
is_full = self._is_full()
if is_full:
os.truncate(self.cachefd, 0)
self._init_indexfile()

return buf

def _is_full(self):
if self.cache_size_limit is None:
return False
stat = os.stat(self.local_cachefile)
return stat.st_blocks * 512 >= self.cache_size_limit
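
Two details in the multiprocess variant are worth calling out: _is_full measures st_blocks * 512, i.e. the 512-byte blocks actually allocated to the (sparse) cache file rather than its apparent length, and the flush path re-checks the limit after upgrading from the shared to the exclusive flock so that only one process ends up truncating the file. A usage sketch based on the new tests below; the file path and sizes are made up:

import os

from pfio.cache import SparseFileCache

path = "/tmp/large-file.bin"                 # hypothetical input file
size = os.stat(path).st_size

with open(path, "rb") as rawfp:
    # Keep roughly 64 MiB of pages on disk; once the cache file grows past
    # this soft limit it is truncated and pages are re-fetched on demand.
    with SparseFileCache(rawfp, size, pagesize=16 * 1024 * 1024,
                         cache_size_limit=64 * 1024 * 1024) as fp:
        fp.seek(1024)
        chunk = fp.read(4096)
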
49 changes: 47 additions & 2 deletions tests/cache_tests/test_sparse_file.py
@@ -5,6 +5,8 @@
import tempfile
import zipfile

import pytest

from pfio.cache import MultiprocessSparseFileCache, SparseFileCache
from pfio.testing import ZipForTest

@@ -37,7 +39,8 @@ def test_sparse_file_cache():
fp.seek(0)


def test_sparse_file_cache2():
@pytest.mark.parametrize("pagesize", [123, 4095, 9979, 16*1024*1024])
def test_sparse_file_cache2(pagesize):

with tempfile.TemporaryDirectory() as tempdir:
filepath = os.path.join(tempdir, "test.zip")
@@ -50,7 +53,7 @@ def test_sparse_file_cache2():
size = stat.st_size
with open(filepath, 'rb') as xfp, open(filepath, 'rb') as yfp:

with SparseFileCache(xfp, size) as fp:
with SparseFileCache(xfp, size, pagesize=pagesize) as fp:

fp.seek(26)
# print('seek done:', fp.pos, xfp.tell())
@@ -168,3 +171,45 @@ def test_sparse_cache_zip():

with zfp.open("dir/f", "r") as fp:
assert b'bar' == fp.read()


@pytest.mark.parametrize("klass", [SparseFileCache,
MultiprocessSparseFileCache])
@pytest.mark.parametrize("pagesize",
[4095, 9979, 123, 16*1024*1024, 128*1024*1024])
def test_cache(pagesize, klass):
with tempfile.TemporaryDirectory() as tempdir:
filepath = os.path.join(tempdir, "test16MB")
pb = b'**pocketburger**'
data = pb * 1024 * 1024

with open(filepath, 'wb') as fp:
fp.write(data)

with open(filepath, 'rb') as ofp:
with klass(ofp, len(data), pagesize=pagesize) as fp:
for i in range(1024*16):
assert not fp._is_full()
assert pb*64 == fp.read(1024)


@pytest.mark.parametrize("klass", [SparseFileCache,
MultiprocessSparseFileCache])
@pytest.mark.parametrize("limit", [4096*1024, 1024*1024])
@pytest.mark.parametrize("pagesize", [4095, 9979])
def test_cache_limit(pagesize, limit, klass):
with tempfile.TemporaryDirectory() as tempdir:
filepath = os.path.join(tempdir, "test16MB")
pb = b'**pocketburger**'
data = pb * 1024 * 1024

with open(filepath, 'wb') as fp:
fp.write(data)

with open(filepath, 'rb', buffering=0) as ofp:

with klass(ofp, len(data), pagesize=pagesize,
cache_size_limit=limit) as fp:
for i in range(1024*16):
assert not fp._is_full()
assert pb*64 == fp.read(1024)
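
For reference, the constants in these tests line up as follows (a standalone check, not part of the test file):

pb = b'**pocketburger**'
assert len(pb) == 16                         # 16-byte pattern
data = pb * 1024 * 1024                      # 16 MiB of test data
assert len(data) == 16 * 1024 * 1024
assert len(pb * 64) == 1024                  # each fp.read(1024) should equal pb * 64
assert 1024 * 16 * 1024 == len(data)         # 16,384 reads of 1 KiB cover the whole file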