diff --git a/CHANGELOG.md b/CHANGELOG.md index 546719e2c..66756f024 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -26,6 +26,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Optimize the logic for file saving and retrieval in the artifact_store. - Add backfill on load of vector index - Add startup event to initialize db.apply jobs +- Update job_id after job submission #### New Features & Functionality diff --git a/superduper/backends/local/compute.py b/superduper/backends/local/compute.py index 571fdd1d1..9cba2ae01 100644 --- a/superduper/backends/local/compute.py +++ b/superduper/backends/local/compute.py @@ -55,7 +55,7 @@ def broadcast(self, events: t.List): def submit( self, function: t.Callable, *args, compute_kwargs: t.Dict = {}, **kwargs - ) -> t.Tuple[str, str]: + ) -> str: """ Submits a function for local execution. @@ -73,7 +73,7 @@ def submit( logging.success( f"Job submitted on {self}. function:{function} future:{future_key}" ) - return future_key, future_key + return future_key @property def tasks(self) -> t.Dict[str, t.Any]: diff --git a/superduper/jobs/job.py b/superduper/jobs/job.py index 957f32807..59bf997a1 100644 --- a/superduper/jobs/job.py +++ b/superduper/jobs/job.py @@ -65,7 +65,7 @@ def watch(self): return self.db.metadata.watch_job(identifier=self.identifier) @abstractmethod - def submit(self, compute, dependencies=(), update_job=True): + def submit(self, compute, dependencies=()): """Submit job for execution. :param compute: compute engine @@ -124,7 +124,7 @@ def dict(self): d['_path'] = f'superduper/jobs/job/FunctionJob/{path}' return d - def submit(self, dependencies=(), update_job=True): + def submit(self, dependencies=()): """Submit job for execution. :param dependencies: list of dependencies @@ -153,9 +153,11 @@ def __call__(self, db: t.Union['Datalayer', None], dependencies=()): db = build_datalayer() self.db = db - db.metadata.create_job(self.dict()) + db.metadata.create_job(self.dict()) self.submit(dependencies=dependencies) + if self.future: + db.metadata.update_job(self.job_id, 'job_id', self.future) return self @@ -245,6 +247,8 @@ def __call__(self, db: t.Union['Datalayer', None] = None, dependencies=()): db.metadata.create_job(self.dict()) self.submit(dependencies=dependencies) + if self.future: + db.metadata.update_job(self.job_id, 'job_id', self.future) return def dict(self):