diff --git a/services/data/postgres_async_db.py b/services/data/postgres_async_db.py
index faa88ef2..a88146c6 100644
--- a/services/data/postgres_async_db.py
+++ b/services/data/postgres_async_db.py
@@ -80,6 +80,7 @@ async def _init(self, db_conf: DBConfiguration, create_triggers=DB_TRIGGER_CREATE):
             minsize=db_conf.pool_min,
             maxsize=db_conf.pool_max,
             timeout=db_conf.timeout,
+            pool_recycle=10 * db_conf.timeout,
             echo=AIOPG_ECHO)
 
         for table in self.tables:
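The new `pool_recycle` argument bounds how long an idle aiopg connection may live before the pool closes it and opens a fresh one on the next acquire, which protects against connections silently dropped by the server or an intermediate proxy. A minimal sketch of the same knob in isolation (the DSN and pool sizing here are illustrative assumptions, not this service's configuration):

```python
import asyncio

import aiopg


async def main():
    # Hypothetical DSN for a local Postgres; adjust to taste.
    dsn = "dbname=metaflow user=postgres host=127.0.0.1 port=5432"
    pool = await aiopg.create_pool(
        dsn,
        minsize=1,
        maxsize=10,
        timeout=60,
        # Idle connections older than this many seconds are closed and
        # re-opened on the next acquire; the default (-1) never recycles.
        pool_recycle=10 * 60,
    )
    async with pool.acquire() as conn:
        async with conn.cursor() as cur:
            await cur.execute("SELECT 1")
            print(await cur.fetchone())
    pool.close()
    await pool.wait_closed()


asyncio.run(main())
```

With `pool_recycle=10 * db_conf.timeout`, a connection that has idled for ten timeout periods is refreshed instead of being handed out stale.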
diff --git a/services/ui_backend_service/api/ws.py b/services/ui_backend_service/api/ws.py
index e0173fce..7813f7f3 100644
--- a/services/ui_backend_service/api/ws.py
+++ b/services/ui_backend_service/api/ws.py
@@ -213,10 +213,10 @@ async def get_table_postprocessor(self, table_name):
         refiner_postprocess = None
         table = None
         if table_name == self.db.task_table_postgres.table_name:
-            table = self.db.run_table_postgres
+            table = self.db.task_table_postgres
             refiner_postprocess = self.task_refiner.postprocess
         elif table_name == self.db.artifact_table_postgres.table_name:
-            table = self.db.run_table_postgres
+            table = self.db.artifact_table_postgres
             refiner_postprocess = self.artifact_refiner.postprocess
 
         if table:
diff --git a/services/ui_backend_service/data/cache/client/cache_server.py b/services/ui_backend_service/data/cache/client/cache_server.py
index c881ab5a..a06f897b 100644
--- a/services/ui_backend_service/data/cache/client/cache_server.py
+++ b/services/ui_backend_service/data/cache/client/cache_server.py
@@ -294,6 +294,9 @@ def _error_callback(self, worker, res):
 
 def cli(root=None,
         max_actions=None,
         max_size=None):
+    # NOTE: The store is only accessed by this process; the workers in
+    # the pool never touch it directly. Store operations happen in the
+    # __init__ and terminate methods of Worker, which run in this process.
     store = CacheStore(root, max_size, echo)
     Scheduler(store, max_actions).loop()
diff --git a/services/ui_backend_service/data/cache/client/cache_store.py b/services/ui_backend_service/data/cache/client/cache_store.py
index 7eb85066..dd73fbf1 100644
--- a/services/ui_backend_service/data/cache/client/cache_store.py
+++ b/services/ui_backend_service/data/cache/client/cache_store.py
@@ -1,3 +1,4 @@
+import math
 import os
 import time
 import shutil
@@ -28,7 +29,7 @@ def makedirs(path):
 
 
 def key_filename(key):
-    return hashlib.sha1(key.encode('utf-8')).hexdigest()
+    return hashlib.sha1(key.encode("utf-8")).hexdigest()
 
 
 def object_path(root, key):
@@ -38,7 +39,7 @@ def object_path(root, key):
 
 
 def stream_path(root, key):
-    return object_path(root, key) + '.stream'
+    return object_path(root, key) + ".stream"
 
 
 def is_safely_readable(path):
@@ -50,11 +51,17 @@ def is_safely_readable(path):
 
 
 def filesize(path):
-    return os.stat(path).st_size
+    try:
+        blk_sz = os.statvfs(path).f_bsize
+        sz = os.stat(path).st_size
+        if sz == 0:
+            return blk_sz
+        return blk_sz * math.ceil(sz / blk_sz)
+    except Exception:
+        return None
 
 
 class CacheStore(object):
-
     def __init__(self, root, max_size, echo, fill_factor=0.8):
         self.root = os.path.abspath(root)
         self.tmproot = self._init_temp(self.root)
@@ -79,7 +86,7 @@ def warn(self, ex, msg):
         self.echo("IO ERROR: (%s) %s" % (ex, msg))
 
     def _init_temp(self, root):
-        tmproot = os.path.join(root, 'tmp')
+        tmproot = os.path.join(root, "tmp")
         if os.path.exists(tmproot):
             self.safe_fileop(shutil.rmtree, tmproot)
         makedirs(tmproot)
@@ -94,18 +101,23 @@ def _init_gc(self, root):
                 self.safe_fileop(os.unlink, path)
             else:
                 stat_res = self.safe_fileop(os.stat, path)
-                if stat_res:
+                sz = filesize(path)
+                if stat_res and sz is not None:
                     _, info = stat_res
                     if info.st_mtime == TIMESTAMP_FOR_DELETABLE:
-                        self.safe_fileop(os.unlink, path)
+                        # If we can't unlink it, we add it to the gc_queue
+                        # to try at least one more time
+                        if not self.safe_fileop(os.unlink, path):
+                            self.gc_queue[path] = (time.time(), sz)
                     elif info.st_mtime == TIMESTAMP_FOR_DISPOSABLE:
-                        self.disposables_queue[path] = info.st_size
+                        self.disposables_queue[path] = sz
                     else:
-                        objects.append((info.st_mtime, (path, info.st_size)))
+                        objects.append((info.st_mtime, (path, sz)))
         self.objects_queue.update(x for _, x in sorted(objects))
-        self.total_size = sum(self.disposables_queue.values()) +\
-            sum(self.objects_queue.values())
+        self.total_size = sum(self.disposables_queue.values()) + sum(
+            self.objects_queue.values()
+        )
 
         # It is possible that the datastore contains more than gc_watermark
         # bytes. To ensure that we start below the gc_watermark, we run the GC:
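The reworked `filesize` is the heart of the hunks above: the cache now accounts for each object by the disk space it actually occupies (whole filesystem blocks, via `os.statvfs`) rather than its byte length, and returns `None` instead of raising if the file disappears mid-scan. A standalone restatement of the helper with a quick check; `os.statvfs` is POSIX-only, so this sketch assumes a Unix-like host:

```python
import math
import os
import tempfile


def filesize(path):
    # Same logic as the patched helper: size rounded up to whole
    # filesystem blocks, or None on any I/O error.
    try:
        blk_sz = os.statvfs(path).f_bsize  # preferred block size
        sz = os.stat(path).st_size
        if sz == 0:
            return blk_sz  # an empty file still occupies one block
        return blk_sz * math.ceil(sz / blk_sz)
    except Exception:
        return None


with tempfile.NamedTemporaryFile(delete=False) as f:
    f.write(b"x")
# With 4096-byte blocks, a 1-byte file reports 4096 and a 5000-byte
# file would report 8192 -- what the cache really consumes on disk.
print(filesize(f.name))
os.unlink(f.name)
```

Rounding up also keeps `total_size` consistent with the block-sized values now stored in the queues during `_init_gc`.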
@@ -115,17 +127,18 @@ def _init_gc(self, root):
         # yet, so we can safely delete the marked objects without a quarantine:
         self._gc_objects(quarantine=-1)
 
-        self.echo("Cache initialized with %d permanents objects, "
-                  "%d disposable objects, totaling %d bytes."
-                  % (len(self.objects_queue),
-                     len(self.disposables_queue),
-                     self.total_size))
+        self.echo(
+            "Cache initialized with %d permanent objects, "
+            "%d disposable objects, totaling %d bytes."
+            % (len(self.objects_queue), len(self.disposables_queue), self.total_size)
+        )
 
     def _gc_objects(self, quarantine=GC_MARKER_QUARANTINE):
         def mark_for_deletion(path, size):
-            self.safe_fileop(os.utime, path, (TIMESTAMP_FOR_DELETABLE,
-                                              TIMESTAMP_FOR_DELETABLE))
-            self.gc_queue[path] = (time.time(), size)
+            if self.safe_fileop(
+                os.utime, path, (TIMESTAMP_FOR_DELETABLE, TIMESTAMP_FOR_DELETABLE)
+            ):
+                self.gc_queue[path] = (time.time(), size)
 
         # 1) delete marked objects that are past their quarantine period
         limit = time.time() - quarantine
@@ -135,6 +148,13 @@ def mark_for_deletion(path, size):
                 if self.safe_fileop(os.unlink, path):
                     del self.gc_queue[path]
                     self.total_size -= size
+                else:
+                    self.echo(
+                        "Could not remove file at '%s' -- removing from GC" % path
+                    )
+                    # We still remove the entry so that the garbage collector
+                    # does not get stuck on it a few lines below.
+                    del self.gc_queue[path]
             else:
                 break
 
@@ -169,14 +189,16 @@ def ensure_path(self, path):
 
     def open_tempdir(self, token, action_name, stream_key):
         self._gc_objects()
 
         if self.total_size > self.max_size:
-            self.warn(None, "Cache soft limit reached! Used %d bytes, max %s bytes"
-                      % (self.total_size, self.max_size))
+            self.warn(
+                None,
+                "Cache soft limit reached! Used %d bytes, max %s bytes"
+                % (self.total_size, self.max_size),
+            )
 
         try:
-            tmp = tempfile.mkdtemp(prefix='cache_action_%s.' % token,
-                                   dir=self.tmproot)
+            tmp = tempfile.mkdtemp(prefix="cache_action_%s." % token, dir=self.tmproot)
         except Exception as ex:
             msg = "Could not create a temp directory for request %s" % token
             self.warn(ex, msg)
@@ -187,7 +209,7 @@ def open_tempdir(self, token, action_name, stream_key):
             # make sure that the new symlink points at a valid (empty!)
             # file by creating a dummy destination file
             self.ensure_path(src)
-            open_res = self.safe_fileop(open, dst, 'w')
+            open_res = self.safe_fileop(open, dst, "w")
             if open_res:
                 _, f = open_res
                 f.close()
@@ -198,8 +220,7 @@ def open_tempdir(self, token, action_name, stream_key):
                     # simultaneously. We don't consider an existing
                     # symlink (errno 17) to be an error.
                     if ex.errno != 17:
-                        err = "Could not create a symlink %s->%s"\
-                            % (src, dst)
+                        err = "Could not create a symlink %s->%s" % (src, dst)
                         self.warn(ex, err)
                 except Exception as ex:
                     self.warn(ex, "Unknown error")
@@ -227,6 +248,12 @@ def _insert(queue, key, value):
             # previous entry first
             queue.pop(key, None)
             queue[key] = value
+            # If we are inserting something into disposables_queue or
+            # objects_queue, make sure it is no longer in the gc_queue. This can
+            # happen if, for example, an object is marked as deletable (and is
+            # therefore no longer "readable") and is then re-created.
+            if key in self.gc_queue:
+                del self.gc_queue[key]
 
         disposables = frozenset(disposable_keys)
         missing = []
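The `_insert` change above closes a bookkeeping hole: an object that was marked deletable (and is therefore no longer "readable") can be re-created under the same path, and the fresh copy must not linger in `gc_queue`, or a later `_gc_objects` pass would unlink a live file. A toy model of the queues, with hypothetical paths, just to illustrate the invariant:

```python
import time
from collections import OrderedDict

# Simplified stand-ins for the three CacheStore queues.
disposables_queue = OrderedDict()
objects_queue = OrderedDict()
gc_queue = {}


def _insert(queue, key, value):
    # Re-insertion moves the key to the back of the LRU order...
    queue.pop(key, None)
    queue[key] = value
    # ...and a key that was marked for deletion but then re-created
    # must leave gc_queue so the GC cannot delete the live object.
    gc_queue.pop(key, None)


gc_queue["/cache/ab/cdef"] = (time.time(), 4096)  # marked deletable
_insert(objects_queue, "/cache/ab/cdef", 4096)    # object re-created
assert "/cache/ab/cdef" not in gc_queue
```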
@@ -243,10 +270,8 @@ def _insert(queue, key, value):
             if os.path.exists(src):
                 dst = object_path(self.root, key)
                 self.ensure_path(dst)
-                stat_res = self.safe_fileop(os.stat, src)
-                if stat_res and self.safe_fileop(os.rename, src, dst):
-                    _, info = stat_res
-                    size = info.st_size
+                sz = filesize(src)
+                if sz is not None and self.safe_fileop(os.rename, src, dst):
                     if key in disposables:
                         # we proceed even if we fail to mark the object as
                         # disposable. It just means that during a possible
@@ -254,10 +279,10 @@ def _insert(queue, key, value):
                         # object
                         tstamp = TIMESTAMP_FOR_DISPOSABLE
                         self.safe_fileop(os.utime, dst, (tstamp, tstamp))
-                        _insert(self.disposables_queue, dst, size)
+                        _insert(self.disposables_queue, dst, sz)
                     else:
-                        _insert(self.objects_queue, dst, size)
-                    self.total_size += size
+                        _insert(self.objects_queue, dst, sz)
+                    self.total_size += sz
                 else:
                     missing.append(key)
diff --git a/services/utils/__init__.py b/services/utils/__init__.py
index 4189b7ec..e92f38d7 100644
--- a/services/utils/__init__.py
+++ b/services/utils/__init__.py
@@ -134,6 +134,9 @@ def format_qs(query: Dict[str, str], overwrite=None):
 def format_baseurl(request: web.BaseRequest):
     scheme = request.headers.get("X-Forwarded-Proto") or request.scheme
     host = request.headers.get("X-Forwarded-Host") or request.host
+    # Only keep the first Forwarded-Proto/Host value in case there is more than one
+    scheme = scheme.split(",")[0].strip()
+    host = host.split(",")[0].strip()
     baseurl = os.environ.get(
         "MF_BASEURL", "{scheme}://{host}".format(scheme=scheme, host=host))
     return "{baseurl}{path}".format(baseurl=baseurl, path=request.path)
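On the `format_baseurl` change: a request that traverses several proxies can arrive with comma-separated header values such as `X-Forwarded-Proto: https, http`, and only the first (client-most) entry should be used to rebuild the base URL. A small sketch of the same parsing rule; the helper name and header values are made up for illustration:

```python
def first_forwarded_value(raw, fallback):
    # A header that crossed several proxies may carry a comma-separated
    # list ("https, http"); keep only the first, client-most entry.
    value = raw or fallback
    return value.split(",")[0].strip()


scheme = first_forwarded_value("https, http", "http")
host = first_forwarded_value("ui.example.com, 10.0.0.5:8083", "localhost")
assert "{}://{}".format(scheme, host) == "https://ui.example.com"
```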