diff --git a/.pfnci/config.pbtxt b/.pfnci/config.pbtxt
index 8367be57..d7e3399e 100644
--- a/.pfnci/config.pbtxt
+++ b/.pfnci/config.pbtxt
@@ -3,13 +3,13 @@ configs{
   key: "chainerio"
   value {
     requirement {
-      cpu: 2
-      memory: 6
+      cpu: 4
+      memory: 16
       disk: 10
     }
 
     # https://github.pfidev.jp/ci/imosci/blob/master/proto/data.proto#L933
     time_limit {
-      seconds: 900 # 15 minutes
+      seconds: 1200 # 20 minutes
     }
     command: "bash .pfnci/script.sh"
@@ -26,7 +26,7 @@ configs{
     }
 
     # https://github.pfidev.jp/ci/imosci/blob/master/proto/data.proto#L933
     time_limit {
-      seconds: 600 # 15 minutes
+      seconds: 900 # 15 minutes
     }
     command: "bash .pfnci/script-old.sh"
diff --git a/pfio/cache/sparse_file.py b/pfio/cache/sparse_file.py
index 5d23f970..f7152b96 100644
--- a/pfio/cache/sparse_file.py
+++ b/pfio/cache/sparse_file.py
@@ -64,9 +64,10 @@ class _CachedWrapperBase:
     '''
     def __init__(self, fileobj, size, cachedir=None, close_on_close=False,
-                 multithread_safe=False):
+                 cache_size_limit=None, multithread_safe=False):
         self.fileobj = fileobj
         self.cachedir = cachedir
+        self.cache_size_limit = cache_size_limit
         self.multithread_safe = multithread_safe
 
         if self.multithread_safe:
@@ -74,6 +75,7 @@ def __init__(self, fileobj, size, cachedir=None, close_on_close=False,
         else:
             self.lock = DummyLock()
 
+        self.pos = 0
         self.size = size
         assert size > 0
         if cachedir is None:
@@ -337,16 +339,23 @@ class CachedWrapper(_CachedWrapperBase):
     '''
     def __init__(self, fileobj, size, cachedir=None, close_on_close=False,
-                 pagesize=16*1024*1024, multithread_safe=False):
+                 pagesize=16*1024*1024, multithread_safe=False,
+                 cache_size_limit=None):
         super().__init__(fileobj, size, cachedir, close_on_close,
+                         cache_size_limit=cache_size_limit,
                          multithread_safe=multithread_safe)
         assert pagesize > 0
         self.pagesize = pagesize
 
-        pagecount = size // pagesize
+        self.size = size
+        self._init_ranges()
+
+    def _init_ranges(self):
+        self.cache_size = 0
+        pagecount = self.size // self.pagesize
         self.ranges = [_Range(i * self.pagesize, self.pagesize, cached=False)
                        for i in range(pagecount)]
-        remain = size % self.pagesize
+        remain = self.size % self.pagesize
         if remain > 0:
             r = _Range(pagecount*self.pagesize, remain, cached=False)
             self.ranges.append(r)
 
@@ -357,6 +366,11 @@ def read(self, size=-1) -> bytes:
             # this lock into per-page locks
             return self._read(size)
 
+    def _is_full(self) -> bool:
+        if self.cache_size_limit is None:
+            return False
+        return self.cache_size >= self.cache_size_limit
+
     def _read(self, size) -> bytes:
         if self._closed:
             raise RuntimeError("closed")
@@ -366,8 +380,10 @@ def _read(self, size) -> bytes:
 
         start = self.pos // self.pagesize
         end = (self.pos + size) // self.pagesize
+        if (self.pos + size) % self.pagesize != 0:
+            end += 1
 
-        for i in range(start, end + 1):
+        for i in range(start, end):
             # print('range=', i, "total=", len(self.ranges))
             r = self.ranges[i]
@@ -388,6 +404,12 @@ def _read(self, size) -> bytes:
         self.pos %= self.size
         if self.pos != self.fileobj.tell():
             self.fileobj.seek(self.pos, io.SEEK_SET)
+
+        # cache_size_limit is a soft limit
+        if self._is_full():
+            os.truncate(self.cachefp.fileno(), 0)
+            self._init_ranges()
+
         return buf
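Note on the `_read()` change above: the previous `for i in range(start, end + 1)` always visited one page past the floor-divided `end`, which fetches an extra page whenever `pos + size` lands exactly on a page boundary and can index past `self.ranges` when that boundary is the end of the file. The new code rounds `end` up only when there is a partial trailing page. A standalone sketch of the arithmetic (not part of the diff):

```python
# Which cache pages does a read of `size` bytes at `pos` touch?
# Ceiling division, as in the fixed _read() above.
def pages_for(pos: int, size: int, pagesize: int) -> range:
    start = pos // pagesize
    end = (pos + size) // pagesize
    if (pos + size) % pagesize != 0:
        end += 1  # partial trailing page
    return range(start, end)

P = 16 * 1024 * 1024
assert list(pages_for(0, P, P)) == [0]         # exact page; old code gave [0, 1]
assert list(pages_for(0, P + 1, P)) == [0, 1]  # one byte more: two pages
assert list(pages_for(P - 1, 2, P)) == [0, 1]  # read straddling a boundary
```

The truncate-and-reinitialize block makes `cache_size_limit` a soft limit: a read may push the cache past the limit, after which the whole cache file is dropped and pages are re-fetched lazily.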
@@ -431,13 +453,15 @@ class MPCachedWrapper(CachedWrapper):
     '''
     def __init__(self, fileobj, size, cachedir=None, close_on_close=False,
+                 cache_size_limit=None,
+                 pagesize=16*1024*1024,
                  local_cachefile=None, local_indexfile=None,
-                 pagesize=16*1024*1024, multithread_safe=False):
+                 multithread_safe=False):
         super().__init__(fileobj, size, cachedir, close_on_close,
+                         cache_size_limit=cache_size_limit,
                          multithread_safe=multithread_safe)
         assert pagesize > 0
         self.pagesize = pagesize
-
-        pagecount = size // pagesize
+        self.size = size
 
         # Both none or both string file
         assert bool(local_indexfile) == bool(local_cachefile)
@@ -450,17 +474,7 @@ def __init__(self, fileobj, size, cachedir=None, close_on_close=False,
             self.local_indexfile = self.indexfp.name
             self.local_cachefile = self.cachefp.name
             self._open_fd(self.indexfp.name, self.cachefp.name)
-
-            for i in range(pagecount):
-                r = _Range(i * self.pagesize, self.pagesize, cached=False)
-                self._set_index(i, r)
-                r = self._get_index(i)
-
-            remain = size % self.pagesize
-            if remain > 0:
-                offset = pagecount * self.pagesize
-                r = _Range(offset, remain, cached=False)
-                self._set_index(pagecount, r)
+            self._init_indexfile()
 
         else:
             self.indexfp = open(local_indexfile, 'rb')
@@ -486,6 +500,20 @@ def _open_fd(self, indexfile, cachefile):
         self.indexfd = os.open(self.local_indexfile, os.O_RDWR)
         self.cachefd = os.open(self.local_cachefile, os.O_RDWR)
 
+    def _init_indexfile(self):
+        pagecount = self.size // self.pagesize
+
+        for i in range(pagecount):
+            r = _Range(i * self.pagesize, self.pagesize, cached=False)
+            self._set_index(i, r)
+            r = self._get_index(i)
+
+        remain = self.size % self.pagesize
+        if remain > 0:
+            offset = pagecount * self.pagesize
+            r = _Range(offset, remain, cached=False)
+            self._set_index(pagecount, r)
+
     def _get_index(self, i):
         assert i >= 0
         width = _Range.size()
@@ -508,8 +536,10 @@ def read(self, size=-1) -> bytes:
 
         start = self.pos // self.pagesize
         end = (self.pos + size) // self.pagesize
+        if (self.pos + size) % self.pagesize != 0:
+            end += 1
 
-        for i in range(start, end + 1):
+        for i in range(start, end):
             with _shflock(self.indexfd):
                 r = self._get_index(i)
@@ -535,4 +565,22 @@ def read(self, size=-1) -> bytes:
         self.pos %= self.size
         if self.pos != self.fileobj.tell():
             self.fileobj.seek(self.pos, io.SEEK_SET)
+
+        with _shflock(self.indexfd):
+            is_full = self._is_full()
+
+        # If the cache file is more than the limit, just flush them all.
+        if is_full:
+            with _exflock(self.indexfd):
+                is_full = self._is_full()
+                if is_full:
+                    os.truncate(self.cachefd, 0)
+                    self._init_indexfile()
+
         return buf
+
+    def _is_full(self):
+        if self.cache_size_limit is None:
+            return False
+        stat = os.stat(self.local_cachefile)
+        return stat.st_blocks * 512 >= self.cache_size_limit
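In the multiprocess variant, `_is_full()` measures the space actually allocated to the sparse cache file: POSIX `st_blocks` counts 512-byte blocks, so holes and a freshly truncated file report as small even when the apparent file size is large. The limit check is double-checked across processes: every `read()` does a cheap check under a shared lock, and only a process that still sees the cache over the limit after taking the exclusive lock truncates it, so concurrent readers do not all flush the cache at once. A sketch of what the `_shflock`/`_exflock` helpers plausibly look like (they are pfio internals, assumed here to wrap `fcntl.flock`):

```python
import fcntl
from contextlib import contextmanager

# Assumed shape of pfio's internal lock helpers: context managers
# around fcntl.flock() on an already-open file descriptor.
@contextmanager
def _shflock(fd):
    fcntl.flock(fd, fcntl.LOCK_SH)  # shared: many readers may hold it
    try:
        yield
    finally:
        fcntl.flock(fd, fcntl.LOCK_UN)

@contextmanager
def _exflock(fd):
    fcntl.flock(fd, fcntl.LOCK_EX)  # exclusive: one process truncates/rebuilds
    try:
        yield
    finally:
        fcntl.flock(fd, fcntl.LOCK_UN)
```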
diff --git a/tests/cache_tests/test_sparse_file.py b/tests/cache_tests/test_sparse_file.py
index 48ab03bf..0712c992 100644
--- a/tests/cache_tests/test_sparse_file.py
+++ b/tests/cache_tests/test_sparse_file.py
@@ -5,6 +5,8 @@
 import tempfile
 import zipfile
 
+import pytest
+
 from pfio.cache import MultiprocessSparseFileCache, SparseFileCache
 from pfio.testing import ZipForTest
 
@@ -37,7 +39,8 @@ def test_sparse_file_cache():
             fp.seek(0)
 
 
-def test_sparse_file_cache2():
+@pytest.mark.parametrize("pagesize", [123, 4095, 9979, 16*1024*1024])
+def test_sparse_file_cache2(pagesize):
     with tempfile.TemporaryDirectory() as tempdir:
         filepath = os.path.join(tempdir, "test.zip")
 
@@ -50,7 +53,7 @@ def test_sparse_file_cache2():
         size = stat.st_size
 
         with open(filepath, 'rb') as xfp, open(filepath, 'rb') as yfp:
-            with SparseFileCache(xfp, size) as fp:
+            with SparseFileCache(xfp, size, pagesize=pagesize) as fp:
                 fp.seek(26)
                 # print('seek done:', fp.pos, xfp.tell())
 
@@ -168,3 +171,45 @@ def test_sparse_cache_zip():
 
         with zfp.open("dir/f", "r") as fp:
             assert b'bar' == fp.read()
+
+
+@pytest.mark.parametrize("klass", [SparseFileCache,
+                                   MultiprocessSparseFileCache])
+@pytest.mark.parametrize("pagesize",
+                         [4095, 9979, 123, 16*1024*1024, 128*1024*1024])
+def test_cache(pagesize, klass):
+    with tempfile.TemporaryDirectory() as tempdir:
+        filepath = os.path.join(tempdir, "test16MB")
+        pb = b'**pocketburger**'
+        data = pb * 1024 * 1024
+
+        with open(filepath, 'wb') as fp:
+            fp.write(data)
+
+        with open(filepath, 'rb') as ofp:
+            with klass(ofp, len(data), pagesize=pagesize) as fp:
+                for i in range(1024*16):
+                    assert not fp._is_full()
+                    assert pb*64 == fp.read(1024)
+
+
+@pytest.mark.parametrize("klass", [SparseFileCache,
+                                   MultiprocessSparseFileCache])
+@pytest.mark.parametrize("limit", [4096*1024, 1024*1024])
+@pytest.mark.parametrize("pagesize", [4095, 9979])
+def test_cache_limit(pagesize, limit, klass):
+    with tempfile.TemporaryDirectory() as tempdir:
+        filepath = os.path.join(tempdir, "test16MB")
+        pb = b'**pocketburger**'
+        data = pb * 1024 * 1024
+
+        with open(filepath, 'wb') as fp:
+            fp.write(data)
+
+        with open(filepath, 'rb', buffering=0) as ofp:
+
+            with klass(ofp, len(data), pagesize=pagesize,
+                       cache_size_limit=limit) as fp:
+                for i in range(1024*16):
+                    assert not fp._is_full()
+                    assert pb*64 == fp.read(1024)
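For reference, the new knob as a user would exercise it, mirroring `test_cache_limit` above (file name and sizes are illustrative):

```python
import os

from pfio.cache import SparseFileCache

path = "big.bin"  # illustrative path
size = os.stat(path).st_size

with open(path, "rb") as raw:
    # cache_size_limit is a soft limit in bytes: the cache may briefly
    # overshoot it, then the whole cache file is truncated and pages
    # are re-fetched lazily on subsequent reads.
    with SparseFileCache(raw, size, pagesize=16 * 1024 * 1024,
                         cache_size_limit=1024 ** 3) as fp:
        header = fp.read(4096)
```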