Skip to content

Commit

Permalink
refactor: rename job_metadata -> pipeline_metadata
Browse files Browse the repository at this point in the history
  • Loading branch information
makkus committed Mar 8, 2024
1 parent 06a9e02 commit 57bcf8a
Show file tree
Hide file tree
Showing 5 changed files with 53 additions and 36 deletions.
30 changes: 12 additions & 18 deletions src/kiara/interfaces/python_api/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2759,7 +2759,10 @@ def assemble_render_pipeline(
# ------------------------------------------------------------------------------------------------------------------
# job-related methods
def queue_manifest(
self, manifest: Manifest, inputs: Union[None, Mapping[str, Any]] = None
self,
manifest: Manifest,
inputs: Union[None, Mapping[str, Any]] = None,
**job_metadata: Any,
) -> uuid.UUID:
"""
Queue a job using the provided manifest to describe the module and config that should be executed.
Expand All @@ -2786,7 +2789,10 @@ def queue_manifest(
return job_id

def run_manifest(
self, manifest: Manifest, inputs: Union[None, Mapping[str, Any]] = None
self,
manifest: Manifest,
inputs: Union[None, Mapping[str, Any]] = None,
**job_metadata: Any,
) -> ValueMapReadOnly:
"""
Run a job using the provided manifest to describe the module and config that should be executed.
Expand All @@ -2796,11 +2802,12 @@ def run_manifest(
Arguments:
manifest: the manifest
inputs: the job inputs (can be either references to values, or raw inputs
job_metadata: additional metadata to store with the job
Returns:
a result value map instance
"""
job_id = self.queue_manifest(manifest=manifest, inputs=inputs)
job_id = self.queue_manifest(manifest=manifest, inputs=inputs, **job_metadata)
return self.context.job_registry.retrieve_result(job_id=job_id)

def queue_job(
Expand All @@ -2822,19 +2829,12 @@ def queue_job(
operation: a module name, operation id, or a path to a pipeline file (resolved in this order, until a match is found)..
inputs: the operation inputs
operation_config: the (optional) module config in case 'operation' is a module name
**job_metadata: additional metadata to store with the job
job_metadata: additional metadata to store with the job
Returns:
the queued job id
"""

if "comment" not in job_metadata.keys():
raise KiaraException("You need to provide a 'comment' for the job.")

comment = job_metadata.get("comment")
if not isinstance(comment, str):
raise KiaraException("The 'comment' must be a string.")

if inputs is None:
inputs = {}

Expand Down Expand Up @@ -2903,14 +2903,8 @@ def queue_job(
else:
manifest = _operation

job_id = self.queue_manifest(manifest=manifest, inputs=inputs)
job_id = self.queue_manifest(manifest=manifest, inputs=inputs, **job_metadata)

from kiara.models.metadata import CommentMetadata

comment_metadata = CommentMetadata(comment=comment)
self.context.metadata_registry.register_metadata_item(
key="comment", item=comment_metadata, force=False, store=None
)
return job_id

def run_job(
Expand Down
8 changes: 6 additions & 2 deletions src/kiara/models/module/pipeline/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,9 +134,13 @@ def process_step(self, step_id: str, wait: bool = False) -> uuid.UUID:
"""
job_config = self.pipeline.create_job_config_for_step(step_id)

job_metadata = {"is_pipeline_step": True, "step_id": step_id}
job_metadata = {
"is_pipeline_step": True,
"step_id": step_id,
"pipeline_id": self.pipeline.pipeline_id,
}
job_id = self._job_registry.execute_job(
job_config=job_config, job_metadata=job_metadata
job_config=job_config, pipeline_metadata=job_metadata
)
# job_id = self._processor.create_job(job_config=job_config)
# self._processor.queue_job(job_id=job_id)
Expand Down
12 changes: 6 additions & 6 deletions src/kiara/processing/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,16 +100,16 @@ def get_job_record(self, job_id: uuid.UUID) -> JobRecord:
raise Exception(f"No job record for job with id '{job_id}' registered.")

def create_job(
self, job_config: JobConfig, job_metadata: Union[None, Mapping[str, Any]]
self, job_config: JobConfig, pipeline_metadata: Union[None, Mapping[str, Any]]
) -> uuid.UUID:

environments = {
env_name: env.instance_id
for env_name, env in self._kiara.current_environments.items()
}

if job_metadata is None:
job_metadata = {}
if pipeline_metadata is None:
pipeline_metadata = {}

result_pedigree = ValuePedigree(
kiara_id=self._kiara.id,
Expand Down Expand Up @@ -142,7 +142,7 @@ def create_job(
"job": job,
"module": module,
"outputs": outputs,
"job_metadata": job_metadata,
"job_metadata": pipeline_metadata,
}
self._created_jobs[job_id] = job_details

Expand All @@ -159,10 +159,10 @@ def create_job(
or dev_settings.log.pre_run.internal_modules
):

is_pipeline_step = job_metadata.get("is_pipeline_step", False)
is_pipeline_step = pipeline_metadata.get("is_pipeline_step", False)
if is_pipeline_step:
if dev_settings.log.pre_run.pipeline_steps:
step_id = job_metadata.get("step_id", None)
step_id = pipeline_metadata.get("step_id", None)
assert step_id is not None
title = (
f"Pre-run information for pipeline step: [i]{step_id}[/i]"
Expand Down
34 changes: 24 additions & 10 deletions src/kiara/registries/jobs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -430,19 +430,31 @@ def execute(
manifest: Manifest,
inputs: Mapping[str, Any],
wait: bool = False,
job_metadata: Union[None, Any] = None,
) -> uuid.UUID:

job_config = self.prepare_job_config(manifest=manifest, inputs=inputs)
return self.execute_job(job_config, wait=wait, job_metadata=job_metadata)
return self.execute_job(job_config, wait=wait)

def execute_job(
self,
job_config: JobConfig,
wait: bool = False,
job_metadata: Union[None, Any] = None,
pipeline_metadata: Union[None, Any] = None,
) -> uuid.UUID:

# from kiara.models.metadata import CommentMetadata
# if "comment" not in job_metadata.keys():
# raise KiaraException("You need to provide a 'comment' for the job.")
#
# comment = job_metadata.get("comment")
# if not isinstance(comment, str):
# raise KiaraException("The 'comment' must be a string.")
#
# comment_metadata = CommentMetadata(comment=comment)
# self.context.metadata_registry.register_metadata_item(
# key="comment", item=comment_metadata, force=False, store=None
# )

if job_config.module_type != "pipeline":
log = logger.bind(
module_type=job_config.module_type,
Expand All @@ -469,8 +481,10 @@ def execute_job(
if is_develop():

module = self._kiara.module_registry.create_module(manifest=job_config)
if job_metadata and job_metadata.get("is_pipeline_step", True):
step_id = job_metadata.get("step_id", None)
if pipeline_metadata and pipeline_metadata.get(
"is_pipeline_step", True
):
step_id = pipeline_metadata.get("step_id", None)
title = f"Using cached pipeline step: {step_id}"
else:
title = f"Using cached job for: {module.module_type_name}"
Expand All @@ -497,20 +511,20 @@ def execute_job(

return stored_job

if job_metadata is None:
job_metadata = {}
if pipeline_metadata is None:
pipeline_metadata = {}

is_pipeline_step = job_metadata.get("is_pipeline_step", False)
is_pipeline_step = pipeline_metadata.get("is_pipeline_step", False)
dbg_data = {
"module_type": job_config.module_type,
"is_pipeline_step": is_pipeline_step,
}
if is_pipeline_step:
dbg_data["step_id"] = job_metadata["step_id"]
dbg_data["step_id"] = pipeline_metadata["step_id"]
log.debug("job.execute", **dbg_data)

job_id = self._processor.create_job(
job_config=job_config, job_metadata=job_metadata
job_config=job_config, pipeline_metadata=pipeline_metadata
)
self._active_jobs[job_config.job_hash] = job_id

Expand Down
5 changes: 5 additions & 0 deletions src/kiara/registries/metadata/metadata_store/sqlite_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -239,3 +239,8 @@ def _store_metadata_item(
conn.commit()

return metadata_item_id

def _store_metadata_reference(
self, reference_item_type: str, reference_item_id: str, metadata_item_id: str
) -> None:
raise NotImplementedError()

0 comments on commit 57bcf8a

Please sign in to comment.