From 255b73eb586d6cc6f211632d1023aa55b745e89a Mon Sep 17 00:00:00 2001 From: TheDude Date: Tue, 5 Nov 2024 23:51:23 +0530 Subject: [PATCH 1/2] Support db apply blocking --- superduper/backends/local/cache.py | 4 +++- superduper/base/datalayer.py | 30 ++++++++++++++++++++++++++++++ 2 files changed, 33 insertions(+), 1 deletion(-) diff --git a/superduper/backends/local/cache.py b/superduper/backends/local/cache.py index 2db3a394f..0b0d1abf7 100644 --- a/superduper/backends/local/cache.py +++ b/superduper/backends/local/cache.py @@ -37,11 +37,13 @@ def _put(self, component: Component): self._cache[component.uuid] = component if (component.type_id, component.identifier) in self._component_to_uuid: current = self._component_to_uuid[component.type_id, component.identifier] - current_version = self._cache[current].version + current_component = self._cache[current] + current_version = current_component.version if current_version < component.version: self._component_to_uuid[ component.type_id, component.identifier ] = component.uuid + self.expire(current_component.uuid) else: self._component_to_uuid[ component.type_id, component.identifier diff --git a/superduper/base/datalayer.py b/superduper/base/datalayer.py index f4d02bce4..0a5158774 100644 --- a/superduper/base/datalayer.py +++ b/superduper/base/datalayer.py @@ -1,4 +1,5 @@ import random +import time import typing as t import warnings from collections import namedtuple @@ -44,6 +45,8 @@ PredictResult = t.Union[Document, t.Sequence[Document]] ExecuteResult = t.Union[SelectResult, DeleteResult, UpdateResult, InsertResult] +_WAIT_TIMEOUT = 60 + class Datalayer: """ @@ -447,6 +450,7 @@ def apply( self, object: t.Union[Component, t.Sequence[t.Any], t.Any], force: bool | None = None, + wait: bool = True, ): """ Add functionality in the form of components. @@ -457,6 +461,7 @@ def apply( :param object: Object to be stored. :param dependencies: List of jobs which should execute before component initialization begins. + :param wait: Wait for apply events. :return: Tuple containing the added object(s) and the original object(s). """ if force is None: @@ -537,8 +542,33 @@ def uniquify(x): ): return object self.cluster.queue.publish(events=events) + if wait: + self._wait_on_events(unique_create_events) return object + def _wait_on_events(self, events): + remaining = len(events) + time_left = _WAIT_TIMEOUT + while True: + for event in events: + identifier = event.component['identifier'] + type_id = event.component['type_id'] + version = event.component['version'] + try: + self.show(type_id=type_id, identifier=identifier, version=version) + except FileNotFoundError: + pass + else: + remaining -= 1 + + if remaining <= 0: + return + elif time_left == 0: + raise TimeoutError("Timeout error while waiting for create events.") + else: + time.sleep(1) + time_left -= 1 + def remove( self, type_id: str, From 1ff2a7bda19ec7059c942c8040052b9d6daa48d9 Mon Sep 17 00:00:00 2001 From: TheDude Date: Wed, 6 Nov 2024 16:48:15 +0530 Subject: [PATCH 2/2] Update changelog --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 0314944e7..a1f401f95 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -110,6 +110,7 @@ Include templates data in accessible directory (failed) - Add ci workflow to test templates - Add deploy flag in model. - Modify the apply endpoint in the REST API to an asynchronous interface. +- Add db apply wait on create events. #### Bug Fixes