Fix/cache maxsize + tag override for artifacts/tasks + proxy issue (#327)

* Only take into account first forwarded host/proto

* Fix issue with tag override for artifacts/tasks

* Fix issue with cache exceeding set size

There were two main issues with the previous implementation:
  - file sizes were counted as their byte length, which understates real
    disk usage when the cache holds a large number of small files, as
    happens when the cache is dealing with artifacts (lots of small ones).
    The fix is to account for space in whole filesystem blocks (sketched
    just below).
  - a file could be marked as deletable, then be re-created, only to be
    deleted again shortly afterwards. This fix addresses that situation.
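For illustration, a minimal sketch of the block-size accounting idea; it mirrors the filesize() helper changed in cache_store.py below (the filesize_on_disk name and the numbers in the closing comment are only illustrative):

    import math
    import os

    def filesize_on_disk(path):
        # Approximate the space the file occupies on disk rather than its
        # byte length: round the byte size up to whole filesystem blocks.
        try:
            block_size = os.statvfs(path).f_bsize
            byte_size = os.stat(path).st_size
            if byte_size == 0:
                return block_size  # count at least one block per file
            return block_size * math.ceil(byte_size / block_size)
        except OSError:
            return None

    # With 4096-byte blocks, a 10-byte file is accounted as 4096 bytes, so a
    # thousand such files count as roughly 4 MB of cache usage, not 10 kB.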

* Added an expiration to the pg pools to recycle connections

* there is no exception to report, so have to use echo (based on the code convention here)

* actually, the convention seems to be "warn(None, ...)"

Co-authored-by: Jackie Tung <jackie@outerbounds.co>
romain-intel and jackie-ob committed Aug 30, 2022
1 parent 94855d3 commit 9c1d587
Showing 5 changed files with 68 additions and 36 deletions.
1 change: 1 addition & 0 deletions services/data/postgres_async_db.py
@@ -80,6 +80,7 @@ async def _init(self, db_conf: DBConfiguration, create_triggers=DB_TRIGGER_CREAT
minsize=db_conf.pool_min,
maxsize=db_conf.pool_max,
timeout=db_conf.timeout,
pool_recycle=10 * db_conf.timeout,
echo=AIOPG_ECHO)

for table in self.tables:
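For context on the pool_recycle addition above, here is a rough, self-contained sketch of building such a pool with aiopg; the make_pool helper and its parameter names are illustrative, not the service's actual code:

    import aiopg

    async def make_pool(dsn, pool_min, pool_max, timeout):
        # pool_recycle bounds the age of pooled connections (in seconds);
        # connections older than that are closed and replaced instead of
        # being reused, so stale connections get refreshed over time.
        return await aiopg.create_pool(
            dsn,
            minsize=pool_min,
            maxsize=pool_max,
            timeout=timeout,
            pool_recycle=10 * timeout,
        )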
4 changes: 2 additions & 2 deletions services/ui_backend_service/api/ws.py
@@ -213,10 +213,10 @@ async def get_table_postprocessor(self, table_name):
refiner_postprocess = None
table = None
if table_name == self.db.task_table_postgres.table_name:
table = self.db.task_table_postgres
table = self.db.run_table_postgres
refiner_postprocess = self.task_refiner.postprocess
elif table_name == self.db.artifact_table_postgres.table_name:
table = self.db.artifact_table_postgres
table = self.db.run_table_postgres
refiner_postprocess = self.artifact_refiner.postprocess

if table:
3 changes: 3 additions & 0 deletions services/ui_backend_service/data/cache/client/cache_server.py
@@ -294,6 +294,9 @@ def _error_callback(self, worker, res):
def cli(root=None,
max_actions=None,
max_size=None):
# NOTE: The store will only be accessed by this process. The processes
# in the pool never touch the store itself. This is done in the __init__ and
# terminate methods in Worker which all happen in this process.
store = CacheStore(root, max_size, echo)
Scheduler(store, max_actions).loop()

93 changes: 59 additions & 34 deletions services/ui_backend_service/data/cache/client/cache_store.py
@@ -1,3 +1,4 @@
import math
import os
import time
import shutil
@@ -28,7 +29,7 @@ def makedirs(path):


def key_filename(key):
return hashlib.sha1(key.encode('utf-8')).hexdigest()
return hashlib.sha1(key.encode("utf-8")).hexdigest()


def object_path(root, key):
@@ -38,7 +39,7 @@ def object_path(root, key):


def stream_path(root, key):
return object_path(root, key) + '.stream'
return object_path(root, key) + ".stream"


def is_safely_readable(path):
@@ -50,11 +51,17 @@ def is_safely_readable(path):


def filesize(path):
return os.stat(path).st_size
try:
blk_sz = os.statvfs(path).f_bsize
sz = os.stat(path).st_size
if sz == 0:
return blk_sz
return blk_sz * math.ceil(sz / blk_sz)
except Exception:
return None


class CacheStore(object):

def __init__(self, root, max_size, echo, fill_factor=0.8):
self.root = os.path.abspath(root)
self.tmproot = self._init_temp(self.root)
@@ -79,7 +86,7 @@ def warn(self, ex, msg):
self.echo("IO ERROR: (%s) %s" % (ex, msg))

def _init_temp(self, root):
tmproot = os.path.join(root, 'tmp')
tmproot = os.path.join(root, "tmp")
if os.path.exists(tmproot):
self.safe_fileop(shutil.rmtree, tmproot)
makedirs(tmproot)
@@ -94,18 +101,23 @@ def _init_gc(self, root):
self.safe_fileop(os.unlink, path)
else:
stat_res = self.safe_fileop(os.stat, path)
if stat_res:
sz = filesize(path)
if stat_res and sz is not None:
_, info = stat_res
if info.st_mtime == TIMESTAMP_FOR_DELETABLE:
self.safe_fileop(os.unlink, path)
# If we can't unlink it, we add it to the gc_queue
# to try at least another time
if not self.safe_fileop(os.unlink, path):
self.gc_queue[path] = (time.time(), sz)
elif info.st_mtime == TIMESTAMP_FOR_DISPOSABLE:
self.disposables_queue[path] = info.st_size
self.disposables_queue[path] = sz
else:
objects.append((info.st_mtime, (path, info.st_size)))
objects.append((info.st_mtime, (path, sz)))

self.objects_queue.update(x for _, x in sorted(objects))
self.total_size = sum(self.disposables_queue.values()) +\
sum(self.objects_queue.values())
self.total_size = sum(self.disposables_queue.values()) + sum(
self.objects_queue.values()
)

# It is possible that the datastore contains more than gc_watermark
# bytes. To ensure that we start below the gc_watermark, we run the GC:
@@ -115,17 +127,19 @@ def _init_gc(self, root):
# yet, so we can safely delete the marked objects without a quarantine:
self._gc_objects(quarantine=-1)

self.echo("Cache initialized with %d permanents objects, "
"%d disposable objects, totaling %d bytes."
% (len(self.objects_queue),
len(self.disposables_queue),
self.total_size))
self.warn(
None,
"Cache initialized with %d permanents objects, "
"%d disposable objects, totaling %d bytes."
% (len(self.objects_queue), len(self.disposables_queue), self.total_size)
)

def _gc_objects(self, quarantine=GC_MARKER_QUARANTINE):
def mark_for_deletion(path, size):
self.safe_fileop(os.utime, path, (TIMESTAMP_FOR_DELETABLE,
TIMESTAMP_FOR_DELETABLE))
self.gc_queue[path] = (time.time(), size)
if self.safe_fileop(
os.utime, path, (TIMESTAMP_FOR_DELETABLE, TIMESTAMP_FOR_DELETABLE)
):
self.gc_queue[path] = (time.time(), size)

# 1) delete marked objects that are past their quarantine period
limit = time.time() - quarantine
@@ -135,6 +149,13 @@ def mark_for_deletion(path, size):
if self.safe_fileop(os.unlink, path):
del self.gc_queue[path]
self.total_size -= size
else:
self.echo(
"Could not remove file at '%s' -- removing from GC" % path
)
# We still remove to prevent the garbage collector from
# being stuck a few lines below.
del self.gc_queue[path]
else:
break

@@ -169,14 +190,15 @@ def ensure_path(self, path):

def open_tempdir(self, token, action_name, stream_key):
self._gc_objects()

if self.total_size > self.max_size:
self.warn(None, "Cache soft limit reached! Used %d bytes, max %s bytes"
% (self.total_size, self.max_size))
self.warn(
None,
"Cache soft limit reached! Used %d bytes, max %s bytes"
% (self.total_size, self.max_size),
)

try:
tmp = tempfile.mkdtemp(prefix='cache_action_%s.' % token,
dir=self.tmproot)
tmp = tempfile.mkdtemp(prefix="cache_action_%s." % token, dir=self.tmproot)
except Exception as ex:
msg = "Could not create a temp directory for request %s" % token
self.warn(ex, msg)
@@ -187,7 +209,7 @@ def open_tempdir(self, token, action_name, stream_key):
# make sure that the new symlink points at a valid (empty!)
# file by creating a dummy destination file
self.ensure_path(src)
open_res = self.safe_fileop(open, dst, 'w')
open_res = self.safe_fileop(open, dst, "w")
if open_res:
_, f = open_res
f.close()
@@ -198,8 +220,7 @@ def open_tempdir(self, token, action_name, stream_key):
# simultaneously. We don't consider an existing
# symlink (errno 17) to be an error.
if ex.errno != 17:
err = "Could not create a symlink %s->%s"\
% (src, dst)
err = "Could not create a symlink %s->%s" % (src, dst)
self.warn(ex, err)
except Exception as ex:
self.warn(ex, "Unknown error")
@@ -227,6 +248,12 @@ def _insert(queue, key, value):
# previous entry first
queue.pop(key, None)
queue[key] = value
# If we are inserting something in disposables_queue or objects_queue,
# we make sure it is no longer in the gc_queue. This can happen if, for
# example, an object is marked as deletable, is therefore not "readable"
# and is therefore re-created.
if key in self.gc_queue:
del self.gc_queue[key]

disposables = frozenset(disposable_keys)
missing = []
@@ -243,21 +270,19 @@ def _insert(queue, key, value):
if os.path.exists(src):
dst = object_path(self.root, key)
self.ensure_path(dst)
stat_res = self.safe_fileop(os.stat, src)
if stat_res and self.safe_fileop(os.rename, src, dst):
_, info = stat_res
size = info.st_size
sz = filesize(src)
if sz is not None and self.safe_fileop(os.rename, src, dst):
if key in disposables:
# we proceed even if we fail to mark the object as
# disposable. It just means that during a possible
# restart the object is treated as a non-disposable
# object
tstamp = TIMESTAMP_FOR_DISPOSABLE
self.safe_fileop(os.utime, dst, (tstamp, tstamp))
_insert(self.disposables_queue, dst, size)
_insert(self.disposables_queue, dst, sz)
else:
_insert(self.objects_queue, dst, size)
self.total_size += size
_insert(self.objects_queue, dst, sz)
self.total_size += sz
else:
missing.append(key)

3 changes: 3 additions & 0 deletions services/utils/__init__.py
@@ -134,6 +134,9 @@ def format_qs(query: Dict[str, str], overwrite=None):
def format_baseurl(request: web.BaseRequest):
scheme = request.headers.get("X-Forwarded-Proto") or request.scheme
host = request.headers.get("X-Forwarded-Host") or request.host
# Only get the first Forwarded-Host/Proto in case there are more than one
scheme = scheme.split(",")[0].strip()
host = host.split(",")[0].strip()
baseurl = os.environ.get(
"MF_BASEURL", "{scheme}://{host}".format(scheme=scheme, host=host))
return "{baseurl}{path}".format(baseurl=baseurl, path=request.path)
