Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support db apply blocking #2594

Merged
merged 2 commits into from
Nov 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ Include templates data in accessible directory (failed)
- Add ci workflow to test templates
- Add deploy flag in model.
- Modify the apply endpoint in the REST API to an asynchronous interface.
- Add db apply wait on create events.

#### Bug Fixes

Expand Down
4 changes: 3 additions & 1 deletion superduper/backends/local/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,13 @@ def _put(self, component: Component):
self._cache[component.uuid] = component
if (component.type_id, component.identifier) in self._component_to_uuid:
current = self._component_to_uuid[component.type_id, component.identifier]
current_version = self._cache[current].version
current_component = self._cache[current]
current_version = current_component.version
if current_version < component.version:
self._component_to_uuid[
component.type_id, component.identifier
] = component.uuid
self.expire(current_component.uuid)
else:
self._component_to_uuid[
component.type_id, component.identifier
Expand Down
30 changes: 30 additions & 0 deletions superduper/base/datalayer.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import random
import time
import typing as t
import warnings
from collections import namedtuple
Expand Down Expand Up @@ -44,6 +45,8 @@
PredictResult = t.Union[Document, t.Sequence[Document]]
ExecuteResult = t.Union[SelectResult, DeleteResult, UpdateResult, InsertResult]

_WAIT_TIMEOUT = 60


class Datalayer:
"""
Expand Down Expand Up @@ -447,6 +450,7 @@ def apply(
self,
object: t.Union[Component, t.Sequence[t.Any], t.Any],
force: bool | None = None,
wait: bool = True,
):
"""
Add functionality in the form of components.
Expand All @@ -457,6 +461,7 @@ def apply(
:param object: Object to be stored.
:param dependencies: List of jobs which should execute before component
initialization begins.
:param wait: Wait for apply events.
:return: Tuple containing the added object(s) and the original object(s).
"""
if force is None:
Expand Down Expand Up @@ -537,8 +542,33 @@ def uniquify(x):
):
return object
self.cluster.queue.publish(events=events)
if wait:
self._wait_on_events(unique_create_events)
return object

def _wait_on_events(self, events):
remaining = len(events)
time_left = _WAIT_TIMEOUT
while True:
for event in events:
identifier = event.component['identifier']
type_id = event.component['type_id']
version = event.component['version']
try:
self.show(type_id=type_id, identifier=identifier, version=version)
except FileNotFoundError:
pass
else:
remaining -= 1

if remaining <= 0:
return
elif time_left == 0:
raise TimeoutError("Timeout error while waiting for create events.")
else:
time.sleep(1)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe too much?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could u suggest a timeout sleep?

time_left -= 1

def remove(
self,
type_id: str,
Expand Down
Loading