Fix issue with cache exceeding set size
There were two main issues with the current implementation:
  - The size of a file was taken to be its number of bytes, which
    undercounts the on-disk footprint when there are many small files, as
    happens when the cache is dealing with artifacts (lots of small ones).
    The fix is to reason in terms of filesystem blocks instead (see the
    sketch after this list).
  - A file could be marked as deletable, then be re-created, only to be
    deleted again afterwards. This fix addresses that situation as well.
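
A minimal sketch of the block-based accounting, assuming a POSIX filesystem
(illustrative names only; the actual change is the filesize() helper in
cache_store.py below):

    import math
    import os

    def block_aligned_size(path):
        # Round the byte size up to whole filesystem blocks so that many
        # small files are not under-counted against the cache size limit.
        try:
            block_size = os.statvfs(path).f_bsize
            byte_size = os.stat(path).st_size
            if byte_size == 0:
                # Even an empty file still occupies space on disk.
                return block_size
            return block_size * math.ceil(byte_size / block_size)
        except OSError:
            # The file may have disappeared between listing and stat'ing it.
            return None

The second issue is handled in _insert() in cache_store.py: when a key is
(re)inserted into the disposables or objects queue it is also dropped from the
GC queue, so a re-created file is no longer deleted by a stale deletable mark.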
romain-intel committed Aug 15, 2022
1 parent 04089a1 commit d4961f9
Showing 2 changed files with 61 additions and 34 deletions.
3 changes: 3 additions & 0 deletions services/ui_backend_service/data/cache/client/cache_server.py
@@ -294,6 +294,9 @@ def _error_callback(self, worker, res):
def cli(root=None,
max_actions=None,
max_size=None):
# NOTE: The store will only be accessed by this process. The processes
# in the pool never touch the store itself. This is done in the __init__ and
# terminate methods in Worker which all happen in this process.
store = CacheStore(root, max_size, echo)
Scheduler(store, max_actions).loop()

92 changes: 58 additions & 34 deletions services/ui_backend_service/data/cache/client/cache_store.py
@@ -1,3 +1,4 @@
import math
import os
import time
import shutil
@@ -28,7 +29,7 @@ def makedirs(path):


def key_filename(key):
return hashlib.sha1(key.encode('utf-8')).hexdigest()
return hashlib.sha1(key.encode("utf-8")).hexdigest()


def object_path(root, key):
@@ -38,7 +39,7 @@ def object_path(root, key):


def stream_path(root, key):
return object_path(root, key) + '.stream'
return object_path(root, key) + ".stream"


def is_safely_readable(path):
@@ -50,11 +51,17 @@ def is_safely_readable(path):


def filesize(path):
return os.stat(path).st_size
try:
blk_sz = os.statvfs(path).f_bsize
sz = os.stat(path).st_size
if sz == 0:
return blk_sz
return blk_sz * math.ceil(sz / blk_sz)
except Exception:
return None


class CacheStore(object):

def __init__(self, root, max_size, echo, fill_factor=0.8):
self.root = os.path.abspath(root)
self.tmproot = self._init_temp(self.root)
@@ -79,7 +86,7 @@ def warn(self, ex, msg):
self.echo("IO ERROR: (%s) %s" % (ex, msg))

def _init_temp(self, root):
tmproot = os.path.join(root, 'tmp')
tmproot = os.path.join(root, "tmp")
if os.path.exists(tmproot):
self.safe_fileop(shutil.rmtree, tmproot)
makedirs(tmproot)
@@ -94,18 +101,23 @@ def _init_gc(self, root):
self.safe_fileop(os.unlink, path)
else:
stat_res = self.safe_fileop(os.stat, path)
if stat_res:
sz = filesize(path)
if stat_res and sz is not None:
_, info = stat_res
if info.st_mtime == TIMESTAMP_FOR_DELETABLE:
self.safe_fileop(os.unlink, path)
# If we can't unlink it, we add it to the gc_queue
# to try at least another time
if not self.safe_fileop(os.unlink, path):
self.gc_queue[path] = (time.time(), sz)
elif info.st_mtime == TIMESTAMP_FOR_DISPOSABLE:
self.disposables_queue[path] = info.st_size
self.disposables_queue[path] = sz
else:
objects.append((info.st_mtime, (path, info.st_size)))
objects.append((info.st_mtime, (path, sz)))

self.objects_queue.update(x for _, x in sorted(objects))
self.total_size = sum(self.disposables_queue.values()) +\
sum(self.objects_queue.values())
self.total_size = sum(self.disposables_queue.values()) + sum(
self.objects_queue.values()
)

# It is possible that the datastore contains more than gc_watermark
# bytes. To ensure that we start below the gc_watermark, we run the GC:
@@ -115,17 +127,18 @@ def _init_gc(self, root):
# yet, so we can safely delete the marked objects without a quarantine:
self._gc_objects(quarantine=-1)

self.echo("Cache initialized with %d permanents objects, "
"%d disposable objects, totaling %d bytes."
% (len(self.objects_queue),
len(self.disposables_queue),
self.total_size))
self.echo(
"Cache initialized with %d permanents objects, "
"%d disposable objects, totaling %d bytes."
% (len(self.objects_queue), len(self.disposables_queue), self.total_size)
)

def _gc_objects(self, quarantine=GC_MARKER_QUARANTINE):
def mark_for_deletion(path, size):
self.safe_fileop(os.utime, path, (TIMESTAMP_FOR_DELETABLE,
TIMESTAMP_FOR_DELETABLE))
self.gc_queue[path] = (time.time(), size)
if self.safe_fileop(
os.utime, path, (TIMESTAMP_FOR_DELETABLE, TIMESTAMP_FOR_DELETABLE)
):
self.gc_queue[path] = (time.time(), size)

# 1) delete marked objects that are past their quarantine period
limit = time.time() - quarantine
@@ -135,6 +148,13 @@ def mark_for_deletion(path, size):
if self.safe_fileop(os.unlink, path):
del self.gc_queue[path]
self.total_size -= size
else:
self.warn(
"Could not remove file at '%s' -- removing from GC" % path
)
# We still remove to prevent the garbage collector from
# being stuck a few lines below.
del self.gc_queue[path]
else:
break

@@ -169,14 +189,15 @@ def ensure_path(self, path):

def open_tempdir(self, token, action_name, stream_key):
self._gc_objects()

if self.total_size > self.max_size:
self.warn(None, "Cache soft limit reached! Used %d bytes, max %s bytes"
% (self.total_size, self.max_size))
self.warn(
None,
"Cache soft limit reached! Used %d bytes, max %s bytes"
% (self.total_size, self.max_size),
)

try:
tmp = tempfile.mkdtemp(prefix='cache_action_%s.' % token,
dir=self.tmproot)
tmp = tempfile.mkdtemp(prefix="cache_action_%s." % token, dir=self.tmproot)
except Exception as ex:
msg = "Could not create a temp directory for request %s" % token
self.warn(ex, msg)
@@ -187,7 +208,7 @@ def open_tempdir(self, token, action_name, stream_key):
# make sure that the new symlink points at a valid (empty!)
# file by creating a dummy destination file
self.ensure_path(src)
open_res = self.safe_fileop(open, dst, 'w')
open_res = self.safe_fileop(open, dst, "w")
if open_res:
_, f = open_res
f.close()
@@ -198,8 +219,7 @@ def _insert(queue, key, value):
# simultaneously. We don't consider an existing
# symlink (errno 17) to be an error.
if ex.errno != 17:
err = "Could not create a symlink %s->%s"\
% (src, dst)
err = "Could not create a symlink %s->%s" % (src, dst)
self.warn(ex, err)
except Exception as ex:
self.warn(ex, "Unknown error")
@@ -227,6 +247,12 @@ def _insert(queue, key, value):
# previous entry first
queue.pop(key, None)
queue[key] = value
# If we are inserting something in disposables_queue or objects_queue,
# we make sure it is no longer in the gc_queue. This can happen if, for
# example, an object is marked as deletable, is therefore not "readable"
# and is therefore re-created.
if key in self.gc_queue:
del self.gc_queue[key]

disposables = frozenset(disposable_keys)
missing = []
@@ -243,21 +269,19 @@ def _insert(queue, key, value):
if os.path.exists(src):
dst = object_path(self.root, key)
self.ensure_path(dst)
stat_res = self.safe_fileop(os.stat, src)
if stat_res and self.safe_fileop(os.rename, src, dst):
_, info = stat_res
size = info.st_size
sz = filesize(src)
if sz is not None and self.safe_fileop(os.rename, src, dst):
if key in disposables:
# we proceed even if we fail to mark the object as
# disposable. It just means that during a possible
# restart the object is treated as a non-disposable
# object
tstamp = TIMESTAMP_FOR_DISPOSABLE
self.safe_fileop(os.utime, dst, (tstamp, tstamp))
_insert(self.disposables_queue, dst, size)
_insert(self.disposables_queue, dst, sz)
else:
_insert(self.objects_queue, dst, size)
self.total_size += size
_insert(self.objects_queue, dst, sz)
self.total_size += sz
else:
missing.append(key)

