From 93f48ad3fa7e7a3c278062dcbdf6aa6f7f8267cf Mon Sep 17 00:00:00 2001 From: TheDude Date: Thu, 29 Aug 2024 18:42:52 +0530 Subject: [PATCH 1/2] Update superduper job_id after submission --- superduper/jobs/job.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/superduper/jobs/job.py b/superduper/jobs/job.py index 957f32807..59bf997a1 100644 --- a/superduper/jobs/job.py +++ b/superduper/jobs/job.py @@ -65,7 +65,7 @@ def watch(self): return self.db.metadata.watch_job(identifier=self.identifier) @abstractmethod - def submit(self, compute, dependencies=(), update_job=True): + def submit(self, compute, dependencies=()): """Submit job for execution. :param compute: compute engine @@ -124,7 +124,7 @@ def dict(self): d['_path'] = f'superduper/jobs/job/FunctionJob/{path}' return d - def submit(self, dependencies=(), update_job=True): + def submit(self, dependencies=()): """Submit job for execution. :param dependencies: list of dependencies @@ -153,9 +153,11 @@ def __call__(self, db: t.Union['Datalayer', None], dependencies=()): db = build_datalayer() self.db = db - db.metadata.create_job(self.dict()) + db.metadata.create_job(self.dict()) self.submit(dependencies=dependencies) + if self.future: + db.metadata.update_job(self.job_id, 'job_id', self.future) return self @@ -245,6 +247,8 @@ def __call__(self, db: t.Union['Datalayer', None] = None, dependencies=()): db.metadata.create_job(self.dict()) self.submit(dependencies=dependencies) + if self.future: + db.metadata.update_job(self.job_id, 'job_id', self.future) return def dict(self): From 137be7237ebc603602d450d6692f6eea7f20fd1f Mon Sep 17 00:00:00 2001 From: TheDude Date: Fri, 30 Aug 2024 11:53:01 +0530 Subject: [PATCH 2/2] Update changelog --- CHANGELOG.md | 1 + superduper/backends/local/compute.py | 4 ++-- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 546719e2c..66756f024 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -26,6 +26,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Optimize the logic for file saving and retrieval in the artifact_store. - Add backfill on load of vector index - Add startup event to initialize db.apply jobs +- Update job_id after job submission #### New Features & Functionality diff --git a/superduper/backends/local/compute.py b/superduper/backends/local/compute.py index 571fdd1d1..9cba2ae01 100644 --- a/superduper/backends/local/compute.py +++ b/superduper/backends/local/compute.py @@ -55,7 +55,7 @@ def broadcast(self, events: t.List): def submit( self, function: t.Callable, *args, compute_kwargs: t.Dict = {}, **kwargs - ) -> t.Tuple[str, str]: + ) -> str: """ Submits a function for local execution. @@ -73,7 +73,7 @@ def submit( logging.success( f"Job submitted on {self}. function:{function} future:{future_key}" ) - return future_key, future_key + return future_key @property def tasks(self) -> t.Dict[str, t.Any]: