From e5b8f909400cdf6b0f469938287ccbbe08732be4 Mon Sep 17 00:00:00 2001 From: UENISHI Kota Date: Mon, 25 Jul 2022 22:48:04 +0900 Subject: [PATCH 1/6] Limit the cache size --- pfio/cache/sparse_file.py | 75 ++++++++++++++++++++++++++++++--------- 1 file changed, 58 insertions(+), 17 deletions(-) diff --git a/pfio/cache/sparse_file.py b/pfio/cache/sparse_file.py index 5d23f970..a3b5d388 100644 --- a/pfio/cache/sparse_file.py +++ b/pfio/cache/sparse_file.py @@ -64,9 +64,10 @@ class _CachedWrapperBase: ''' def __init__(self, fileobj, size, cachedir=None, close_on_close=False, - multithread_safe=False): + cache_size_limit=None, multithread_safe=False): self.fileobj = fileobj self.cachedir = cachedir + self.cache_size_limit = cache_size_limit self.multithread_safe = multithread_safe if self.multithread_safe: @@ -337,16 +338,23 @@ class CachedWrapper(_CachedWrapperBase): ''' def __init__(self, fileobj, size, cachedir=None, close_on_close=False, - pagesize=16*1024*1024, multithread_safe=False): + pagesize=16*1024*1024, multithread_safe=False, + cache_size_limit=None): super().__init__(fileobj, size, cachedir, close_on_close, + cache_size_limit=cache_size_limit, multithread_safe=multithread_safe) assert pagesize > 0 self.pagesize = pagesize - pagecount = size // pagesize + self.size = size + self._init_ranges() + + def _init_ranges(self): + self.cache_size = 0 + pagecount = self.size // self.pagesize self.ranges = [_Range(i * self.pagesize, self.pagesize, cached=False) for i in range(pagecount)] - remain = size % self.pagesize + remain = self.size % self.pagesize if remain > 0: r = _Range(pagecount*self.pagesize, remain, cached=False) self.ranges.append(r) @@ -357,6 +365,11 @@ def read(self, size=-1) -> bytes: # this lock into per-page locks return self._read(size) + def _is_full(self) -> bool: + if self.cache_size_limit is None: + return False + return self.cache_size >= self.cache_size_limit + def _read(self, size) -> bytes: if self._closed: raise RuntimeError("closed") @@ 
-388,6 +401,12 @@ def _read(self, size) -> bytes: self.pos %= self.size if self.pos != self.fileobj.tell(): self.fileobj.seek(self.pos, io.SEEK_SET) + + # cache_size_limit is a soft limit + if self._is_full(): + os.truncate(self.cachefp.fileno(), 0) + self._init_ranges() + return buf @@ -431,13 +450,15 @@ class MPCachedWrapper(CachedWrapper): ''' def __init__(self, fileobj, size, cachedir=None, close_on_close=False, + cache_size_limit=None, pagesize=16*1024*1024, local_cachefile=None, local_indexfile=None, - pagesize=16*1024*1024, multithread_safe=False): + multithread_safe=False): super().__init__(fileobj, size, cachedir, close_on_close, + cache_size_limit=cache_size_limit, multithread_safe=multithread_safe) assert pagesize > 0 self.pagesize = pagesize - pagecount = size // pagesize + self.size = size # Both none or both string file assert bool(local_indexfile) == bool(local_cachefile) @@ -450,17 +471,7 @@ def __init__(self, fileobj, size, cachedir=None, close_on_close=False, self.local_indexfile = self.indexfp.name self.local_cachefile = self.cachefp.name self._open_fd(self.indexfp.name, self.cachefp.name) - - for i in range(pagecount): - r = _Range(i * self.pagesize, self.pagesize, cached=False) - self._set_index(i, r) - r = self._get_index(i) - - remain = size % self.pagesize - if remain > 0: - offset = pagecount * self.pagesize - r = _Range(offset, remain, cached=False) - self._set_index(pagecount, r) + self._init_indexfile() else: self.indexfp = open(local_indexfile, 'rb') @@ -486,6 +497,20 @@ def _open_fd(self, indexfile, cachefile): self.indexfd = os.open(self.local_indexfile, os.O_RDWR) self.cachefd = os.open(self.local_cachefile, os.O_RDWR) + def _init_indexfile(self): + pagecount = self.size // self.pagesize + + for i in range(pagecount): + r = _Range(i * self.pagesize, self.pagesize, cached=False) + self._set_index(i, r) + r = self._get_index(i) + + remain = self.size % self.pagesize + if remain > 0: + offset = pagecount * self.pagesize + r = 
_Range(offset, remain, cached=False) + self._set_index(pagecount, r) + def _get_index(self, i): assert i >= 0 width = _Range.size() @@ -535,4 +560,20 @@ def read(self, size=-1) -> bytes: self.pos %= self.size if self.pos != self.fileobj.tell(): self.fileobj.seek(self.pos, io.SEEK_SET) + + with _shflock(self.indexfd): + is_full = self._is_full() + + # If the cache file has reached or exceeded the limit, flush the entire cache. + if is_full: + with _exflock(self.indexfd): + os.truncate(self.cachefd, 0) + self._init_indexfile() + return buf + + def _is_full(self): + if self.cache_size_limit is None: + return False + stat = os.stat(self.local_cachefile) + return stat.st_blocks * 512 >= self.cache_size_limit From 7af66af95e65db6e6a8b819db6f987768ac4fccc Mon Sep 17 00:00:00 2001 From: UENISHI Kota Date: Mon, 25 Jul 2022 22:51:47 +0900 Subject: [PATCH 2/6] Fix race condition --- pfio/cache/sparse_file.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/pfio/cache/sparse_file.py b/pfio/cache/sparse_file.py index a3b5d388..793bb37f 100644 --- a/pfio/cache/sparse_file.py +++ b/pfio/cache/sparse_file.py @@ -567,8 +567,10 @@ def read(self, size=-1) -> bytes: # If the cache file has reached or exceeded the limit, flush the entire cache.
if is_full: with _exflock(self.indexfd): - os.truncate(self.cachefd, 0) - self._init_indexfile() + is_full = self._is_full() + if is_full: + os.truncate(self.cachefd, 0) + self._init_indexfile() return buf From 25bcec66369939b11ecf9b000c4c02a465a73570 Mon Sep 17 00:00:00 2001 From: UENISHI Kota Date: Tue, 26 Jul 2022 10:32:52 +0900 Subject: [PATCH 3/6] Fix offset bug and add a few tests --- pfio/cache/sparse_file.py | 9 +++-- tests/cache_tests/test_sparse_file.py | 49 +++++++++++++++++++++++++-- 2 files changed, 54 insertions(+), 4 deletions(-) diff --git a/pfio/cache/sparse_file.py b/pfio/cache/sparse_file.py index 793bb37f..f7152b96 100644 --- a/pfio/cache/sparse_file.py +++ b/pfio/cache/sparse_file.py @@ -75,6 +75,7 @@ def __init__(self, fileobj, size, cachedir=None, close_on_close=False, else: self.lock = DummyLock() + self.pos = 0 self.size = size assert size > 0 if cachedir is None: @@ -379,8 +380,10 @@ def _read(self, size) -> bytes: start = self.pos // self.pagesize end = (self.pos + size) // self.pagesize + if (self.pos + size) % self.pagesize != 0: + end += 1 - for i in range(start, end + 1): + for i in range(start, end): # print('range=', i, "total=", len(self.ranges)) r = self.ranges[i] @@ -533,8 +536,10 @@ def read(self, size=-1) -> bytes: start = self.pos // self.pagesize end = (self.pos + size) // self.pagesize + if (self.pos + size) % self.pagesize != 0: + end += 1 - for i in range(start, end + 1): + for i in range(start, end): with _shflock(self.indexfd): r = self._get_index(i) diff --git a/tests/cache_tests/test_sparse_file.py b/tests/cache_tests/test_sparse_file.py index 48ab03bf..0b79c484 100644 --- a/tests/cache_tests/test_sparse_file.py +++ b/tests/cache_tests/test_sparse_file.py @@ -5,6 +5,8 @@ import tempfile import zipfile +import pytest + from pfio.cache import MultiprocessSparseFileCache, SparseFileCache from pfio.testing import ZipForTest @@ -37,7 +39,8 @@ def test_sparse_file_cache(): fp.seek(0) -def test_sparse_file_cache2(): 
+@pytest.mark.parametrize("pagesize", [123, 4095, 9979, 16*1024*1024]) +def test_sparse_file_cache2(pagesize): with tempfile.TemporaryDirectory() as tempdir: filepath = os.path.join(tempdir, "test.zip") @@ -50,7 +53,7 @@ def test_sparse_file_cache2(): size = stat.st_size with open(filepath, 'rb') as xfp, open(filepath, 'rb') as yfp: - with SparseFileCache(xfp, size) as fp: + with SparseFileCache(xfp, size, pagesize=pagesize) as fp: fp.seek(26) # print('seek done:', fp.pos, xfp.tell()) @@ -168,3 +171,45 @@ def test_sparse_cache_zip(): with zfp.open("dir/f", "r") as fp: assert b'bar' == fp.read() + + +@pytest.mark.parametrize("klass", [SparseFileCache, + MultiprocessSparseFileCache]) +@pytest.mark.parametrize("pagesize", + [4095, 9979, 123, 16*1024*1024, 128*1024*1024]) +def test_cache(pagesize, klass): + with tempfile.TemporaryDirectory() as tempdir: + filepath = os.path.join(tempdir, "test16MB") + pb = b'**pocketburger**' + data = pb * 1024 * 1024 + + with open(filepath, 'wb') as fp: + fp.write(data) + + with open(filepath, 'rb') as ofp: + with klass(ofp, len(data), pagesize=pagesize) as fp: + for i in range(1024*1024): + assert not fp._is_full() + assert pb == fp.read(16) + + +@pytest.mark.parametrize("klass", [SparseFileCache, + MultiprocessSparseFileCache]) +@pytest.mark.parametrize("limit", [4096*1024, 1024*1024]) +@pytest.mark.parametrize("pagesize", [4095, 9979]) +def test_cache_limit(pagesize, limit, klass): + with tempfile.TemporaryDirectory() as tempdir: + filepath = os.path.join(tempdir, "test16MB") + pb = b'**pocketburger**' + data = pb * 1024 * 1024 + + with open(filepath, 'wb') as fp: + fp.write(data) + + with open(filepath, 'rb', buffering=0) as ofp: + + with klass(ofp, len(data), pagesize=pagesize, + cache_size_limit=limit) as fp: + for i in range(1024*1024): + assert not fp._is_full() + assert pb == fp.read(16) From 788b1700da05523d1c5adf9acc7e2748445cc709 Mon Sep 17 00:00:00 2001 From: UENISHI Kota Date: Tue, 26 Jul 2022 10:58:22 +0900 Subject: 
[PATCH 4/6] Extend CI timeout --- .pfnci/config.pbtxt | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.pfnci/config.pbtxt b/.pfnci/config.pbtxt index 8367be57..70276d45 100644 --- a/.pfnci/config.pbtxt +++ b/.pfnci/config.pbtxt @@ -9,7 +9,7 @@ configs{ } # https://github.pfidev.jp/ci/imosci/blob/master/proto/data.proto#L933 time_limit { - seconds: 900 # 15 minutes + seconds: 1200 # 20 minutes } command: "bash .pfnci/script.sh" @@ -26,7 +26,7 @@ configs{ } # https://github.pfidev.jp/ci/imosci/blob/master/proto/data.proto#L933 time_limit { - seconds: 600 # 15 minutes + seconds: 900 # 15 minutes } command: "bash .pfnci/script-old.sh" From 9b57d11bb973655ec3a5be25af7bf4cc183872bb Mon Sep 17 00:00:00 2001 From: UENISHI Kota Date: Tue, 26 Jul 2022 11:23:52 +0900 Subject: [PATCH 5/6] Increase CI resource --- .pfnci/config.pbtxt | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.pfnci/config.pbtxt b/.pfnci/config.pbtxt index 70276d45..d7e3399e 100644 --- a/.pfnci/config.pbtxt +++ b/.pfnci/config.pbtxt @@ -3,8 +3,8 @@ configs{ key: "chainerio" value { requirement { - cpu: 2 - memory: 6 + cpu: 4 + memory: 16 disk: 10 } # https://github.pfidev.jp/ci/imosci/blob/master/proto/data.proto#L933 From d9c0f1712305d962d6e4a4577e031e4cdcf2486c Mon Sep 17 00:00:00 2001 From: UENISHI Kota Date: Tue, 26 Jul 2022 11:57:32 +0900 Subject: [PATCH 6/6] Faster testing --- tests/cache_tests/test_sparse_file.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tests/cache_tests/test_sparse_file.py b/tests/cache_tests/test_sparse_file.py index 0b79c484..0712c992 100644 --- a/tests/cache_tests/test_sparse_file.py +++ b/tests/cache_tests/test_sparse_file.py @@ -188,9 +188,9 @@ def test_cache(pagesize, klass): with open(filepath, 'rb') as ofp: with klass(ofp, len(data), pagesize=pagesize) as fp: - for i in range(1024*1024): + for i in range(1024*16): assert not fp._is_full() - assert pb == fp.read(16) + assert pb*64 == fp.read(1024) 
@pytest.mark.parametrize("klass", [SparseFileCache, @@ -210,6 +210,6 @@ def test_cache_limit(pagesize, limit, klass): with klass(ofp, len(data), pagesize=pagesize, cache_size_limit=limit) as fp: - for i in range(1024*1024): + for i in range(1024*16): assert not fp._is_full() - assert pb == fp.read(16) + assert pb*64 == fp.read(1024)