Skip to content

Commit

Permalink
refactor: pipeline metadata store, preparation for job details
Browse files Browse the repository at this point in the history
  • Loading branch information
makkus committed Mar 8, 2024
1 parent 57bcf8a commit 62c2847
Show file tree
Hide file tree
Showing 6 changed files with 92 additions and 50 deletions.
34 changes: 23 additions & 11 deletions src/kiara/models/module/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
from enum import Enum
from typing import TYPE_CHECKING, Any, ClassVar, Dict, List, Mapping, Union

from pydantic import field_validator
from pydantic.fields import Field, PrivateAttr
from pydantic.main import BaseModel
from rich import box
Expand Down Expand Up @@ -74,6 +73,12 @@ def add_log(self, msg: str, log_level: int = logging.DEBUG):
self.log.append(_msg)


class PipelineMetadata(BaseModel):

pipeline_id: uuid.UUID = Field(description="The id of the pipeline.")
step_id: str = Field(description="The id of the step in the pipeline.")


class JobConfig(InputsManifest):

_kiara_model_id: ClassVar = "instance.job_config"
Expand Down Expand Up @@ -113,6 +118,13 @@ def create_from_module(
def _retrieve_data_to_hash(self) -> Any:
return {"manifest": self.manifest_cid, "inputs": self.inputs_cid}

pipeline_metadata: Union[PipelineMetadata, None] = Field(
description="Metadata for the pipeline this job is part of.", default=None
)
# job_metadata: Mapping[str, Any] = Field(
# description="Optional metadata for this job.", default_factory=dict
# )


class ActiveJob(KiaraModel):

Expand Down Expand Up @@ -273,20 +285,20 @@ def from_active_job(self, kiara: "Kiara", active_job: ActiveJob):
runtime_details: Union[JobRuntimeDetails, None] = Field(
description="Runtime details for the job."
)
job_metadata: Mapping[str, Any] = Field(
description="Optional metadata for this job.", default_factory=dict
)
# job_metadata: Mapping[str, Any] = Field(
# description="Optional metadata for this job.", default_factory=dict
# )

_is_stored: bool = PrivateAttr(default=None)
_outputs_hash: Union[int, None] = PrivateAttr(default=None)

@field_validator("job_metadata", mode="before")
@classmethod
def validate_metadata(cls, value):

if value is None:
value = {}
return value
# @field_validator("job_metadata", mode="before")
# @classmethod
# def validate_metadata(cls, value):
#
# if value is None:
# value = {}
# return value

def _retrieve_data_to_hash(self) -> Any:
return {
Expand Down
21 changes: 14 additions & 7 deletions src/kiara/models/module/pipeline/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,16 +132,23 @@ def process_step(self, step_id: str, wait: bool = False) -> uuid.UUID:
---------
step_id: the id of the step that should be started
"""

from kiara.models.module.jobs import PipelineMetadata

job_config = self.pipeline.create_job_config_for_step(step_id)

job_metadata = {
"is_pipeline_step": True,
"step_id": step_id,
"pipeline_id": self.pipeline.pipeline_id,
}
job_id = self._job_registry.execute_job(
job_config=job_config, pipeline_metadata=job_metadata
# pipeline_metadata = {
# "is_pipeline_step": True,
# "step_id": step_id,
# "pipeline_id": self.pipeline.pipeline_id,
# }

pipeline_metadata = PipelineMetadata(
pipeline_id=self.pipeline.pipeline_id, step_id=step_id
)
job_config.pipeline_metadata = pipeline_metadata

job_id = self._job_registry.execute_job(job_config=job_config)
# job_id = self._processor.create_job(job_config=job_config)
# self._processor.queue_job(job_id=job_id)

Expand Down
33 changes: 18 additions & 15 deletions src/kiara/processing/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,19 @@

import abc
import uuid
from typing import TYPE_CHECKING, Any, Dict, List, Literal, Mapping, Protocol, Union
from typing import TYPE_CHECKING, Any, Dict, List, Literal, Protocol, Union

import structlog
from pydantic import BaseModel

from kiara.exceptions import KiaraProcessingException
from kiara.models.module.jobs import ActiveJob, JobConfig, JobLog, JobRecord, JobStatus
from kiara.models.module.jobs import (
ActiveJob,
JobConfig,
JobLog,
JobRecord,
JobStatus,
)
from kiara.models.values.value import (
ValueMap,
ValueMapReadOnly,
Expand Down Expand Up @@ -99,18 +105,13 @@ def get_job_record(self, job_id: uuid.UUID) -> JobRecord:
else:
raise Exception(f"No job record for job with id '{job_id}' registered.")

def create_job(
self, job_config: JobConfig, pipeline_metadata: Union[None, Mapping[str, Any]]
) -> uuid.UUID:
def create_job(self, job_config: JobConfig) -> uuid.UUID:

environments = {
env_name: env.instance_id
for env_name, env in self._kiara.current_environments.items()
}

if pipeline_metadata is None:
pipeline_metadata = {}

result_pedigree = ValuePedigree(
kiara_id=self._kiara.id,
module_type=job_config.module_type,
Expand Down Expand Up @@ -142,8 +143,9 @@ def create_job(
"job": job,
"module": module,
"outputs": outputs,
"job_metadata": pipeline_metadata,
}
job_details["pipeline_metadata"] = job_config.pipeline_metadata

self._created_jobs[job_id] = job_details

self._send_job_event(
Expand All @@ -159,10 +161,10 @@ def create_job(
or dev_settings.log.pre_run.internal_modules
):

is_pipeline_step = pipeline_metadata.get("is_pipeline_step", False)
is_pipeline_step = job_config.pipeline_metadata is not None
if is_pipeline_step:
if dev_settings.log.pre_run.pipeline_steps:
step_id = pipeline_metadata.get("step_id", None)
step_id = job_config.pipeline_metadata.step_id # type: ignore
assert step_id is not None
title = (
f"Pre-run information for pipeline step: [i]{step_id}[/i]"
Expand Down Expand Up @@ -332,15 +334,16 @@ def job_status_updated(
and not dev_config.log.post_run.internal_modules
):
skip = True
is_pipeline_step = details["job_metadata"].get(
"is_pipeline_step", False
)

pipeline_metadata = details.get("pipeline_metadata", None)
is_pipeline_step = pipeline_metadata is not None

if is_pipeline_step and not dev_config.log.post_run.pipeline_steps:
skip = True

if not skip:
if is_pipeline_step:
step_id = details["job_metadata"]["step_id"]
step_id = pipeline_metadata.step_id # type: ignore
title = f"Post-run information for pipeline step: {step_id}"
else:
title = f"Post-run information for module: {module.module_type_name}"
Expand Down
39 changes: 25 additions & 14 deletions src/kiara/registries/jobs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -416,6 +416,10 @@ def find_matching_job_record(
def prepare_job_config(
self, manifest: Manifest, inputs: Mapping[str, Any]
) -> JobConfig:
"""Prepare a JobConfig instance from the manifest and inputs.
This involves creating (and therefor validating) a module instance, as well as making sure the inputs are valid.
"""

module = self._kiara.module_registry.create_module(manifest=manifest)

Expand All @@ -431,6 +435,7 @@ def execute(
inputs: Mapping[str, Any],
wait: bool = False,
) -> uuid.UUID:
"""Prepare a job config, then execute it."""

job_config = self.prepare_job_config(manifest=manifest, inputs=inputs)
return self.execute_job(job_config, wait=wait)
Expand All @@ -439,8 +444,13 @@ def execute_job(
self,
job_config: JobConfig,
wait: bool = False,
pipeline_metadata: Union[None, Any] = None,
) -> uuid.UUID:
"""Execute the job specified by the job config.
Arguments:
job_config: the job config
wait: whether to wait for the job to finish
"""

# from kiara.models.metadata import CommentMetadata
# if "comment" not in job_metadata.keys():
Expand Down Expand Up @@ -472,6 +482,14 @@ def execute_job(
)

stored_job = self.find_matching_job_record(inputs_manifest=job_config)

is_pipeline_step = False if job_config.pipeline_metadata is None else True
if is_pipeline_step:
pipeline_step_id: Union[None, str] = job_config.pipeline_metadata.step_id # type: ignore
pipeline_id: Union[None, uuid.UUID] = job_config.pipeline_metadata.pipeline_id # type: ignore
else:
pipeline_step_id = None
pipeline_id = None
if stored_job is not None:
log.debug(
"job.use_cached",
Expand All @@ -481,11 +499,8 @@ def execute_job(
if is_develop():

module = self._kiara.module_registry.create_module(manifest=job_config)
if pipeline_metadata and pipeline_metadata.get(
"is_pipeline_step", True
):
step_id = pipeline_metadata.get("step_id", None)
title = f"Using cached pipeline step: {step_id}"
if is_pipeline_step:
title = f"Using cached pipeline step: {pipeline_step_id}"
else:
title = f"Using cached job for: {module.module_type_name}"

Expand All @@ -511,21 +526,17 @@ def execute_job(

return stored_job

if pipeline_metadata is None:
pipeline_metadata = {}

is_pipeline_step = pipeline_metadata.get("is_pipeline_step", False)
dbg_data = {
"module_type": job_config.module_type,
"is_pipeline_step": is_pipeline_step,
}
if is_pipeline_step:
dbg_data["step_id"] = pipeline_metadata["step_id"]
dbg_data["step_id"] = pipeline_step_id
dbg_data["pipeline_id"] = str(pipeline_id)

log.debug("job.execute", **dbg_data)

job_id = self._processor.create_job(
job_config=job_config, pipeline_metadata=pipeline_metadata
)
job_id = self._processor.create_job(job_config=job_config)
self._active_jobs[job_config.job_hash] = job_id

try:
Expand Down
5 changes: 2 additions & 3 deletions src/kiara/registries/jobs/job_store/filesystem_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,8 @@ def _retrieve_record_for_job_hash(self, job_hash: str) -> Union[JobRecord, None]
details_content = details_file.read_text()
details: Dict[str, Any] = orjson.loads(details_content)

dbg(details)

job_record = JobRecord(**details)
job_record._is_stored = True
return job_record
Expand Down Expand Up @@ -180,11 +182,8 @@ class FileSystemJobStore(FileSystemJobArchive, JobStore):
def store_job_record(self, job_record: JobRecord):

manifest_cid = job_record.manifest_cid
# inputs_hash = job_record.inputs_data_hash

# manifest_hash = job_record.manifest_hash
input_ids_hash = job_record.input_ids_hash
# inputs_hash = job_record.inputs_data_hash

base_path = self.job_store_path / MANIFEST_SUB_PATH
manifest_folder = base_path / str(manifest_cid)
Expand Down
10 changes: 10 additions & 0 deletions src/kiara/utils/debug.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,16 @@ def create_post_run_table(
else:
table.add_row("module_config", module.config)

if job_config.pipeline_metadata is not None:
pm_table = Table(show_header=False, box=box.SIMPLE)
pm_table.add_column("key")
pm_table.add_column("value")
pm_table.add_row("pipeline_id", str(job_config.pipeline_metadata.pipeline_id))
pm_table.add_row("step_id", job_config.pipeline_metadata.step_id)
table.add_row("pipeline_step_metadata", pm_table)
else:
table.add_row("pipeline_step_metadata", "-- not a pipeline step --")

inputs_details = dev_config.log.post_run.inputs_info
if inputs_details not in [DetailLevel.NONE.value, DetailLevel.NONE]:
if inputs_details in [DetailLevel.MINIMAL, DetailLevel.MINIMAL.value]:
Expand Down

0 comments on commit 62c2847

Please sign in to comment.