diff --git a/src/kiara/context/config.py b/src/kiara/context/config.py index f5f8382ec..97e2618cc 100644 --- a/src/kiara/context/config.py +++ b/src/kiara/context/config.py @@ -393,6 +393,7 @@ def create_default_store_config( class KiaraConfig(BaseSettings): + model_config = SettingsConfigDict( env_prefix="kiara_", extra="forbid", use_enum_values=True ) @@ -890,18 +891,23 @@ def save(self, path: Union[Path, None] = None): path.parent.mkdir(parents=True, exist_ok=True) + data = self.model_dump( + exclude={ + "context", + "auto_generate_contexts", + "stores_base_path", + "context_search_paths", + "default_context", + "runtime_config", + } + ) + + if data["default_store_type"] == DEFAULT_STORE_TYPE: + data.pop("default_store_type") + with path.open("wt") as f: yaml.dump( - self.model_dump( - exclude={ - "context", - "auto_generate_contexts", - "stores_base_path", - "context_search_paths", - "default_context", - "runtime_config", - } - ), + data, f, ) diff --git a/src/kiara/context/runtime_config.py b/src/kiara/context/runtime_config.py index 806c19db7..0efb59c95 100644 --- a/src/kiara/context/runtime_config.py +++ b/src/kiara/context/runtime_config.py @@ -1,5 +1,6 @@ # -*- coding: utf-8 -*- from enum import Enum +from typing import Literal from pydantic import Field from pydantic_settings import BaseSettings, SettingsConfigDict @@ -32,6 +33,11 @@ class KiaraRuntimeConfig(BaseSettings): lock_context: bool = Field( description="Whether to lock context(s) on creation.", default=False ) + runtime_profile: Literal["default", "dharpa"] = Field( + description="The runtime profile to use, this determines for example whether comments need to be provided when running a job.", + default="dharpa", + ) + # ignore_errors: bool = Field( # description="If set, kiara will try to ignore most errors (that can be ignored).", # default=False, diff --git a/src/kiara/interfaces/__init__.py b/src/kiara/interfaces/__init__.py index 51898d6a7..079ad4080 100644 --- 
a/src/kiara/interfaces/__init__.py +++ b/src/kiara/interfaces/__init__.py @@ -334,6 +334,8 @@ def kiara_config(self) -> "KiaraConfig": else: kiara_config = KiaraConfig.load_from_file(config_file_path) + kiara_config.runtime_config.runtime_profile = "default" + self._kiara_config = kiara_config return self._kiara_config diff --git a/src/kiara/interfaces/cli/__init__.py b/src/kiara/interfaces/cli/__init__.py index 6d227f16e..2a60d60d1 100644 --- a/src/kiara/interfaces/cli/__init__.py +++ b/src/kiara/interfaces/cli/__init__.py @@ -14,9 +14,6 @@ import rich_click as click import structlog -from kiara.defaults import ( - SYMLINK_ISSUE_MSG, -) from kiara.interfaces import KiaraAPIWrap from kiara.utils import is_debug, log_message from kiara.utils.class_loading import find_all_cli_subcommands @@ -95,15 +92,14 @@ def cli( For more information, visit the [i][b]kiara[/b] homepage[/i]: https://dharpa.org/kiara.documentation . """ # check if windows symlink work - from kiara.utils.windows import check_symlink_works - - if not check_symlink_works(): - - terminal_print() - from rich.markdown import Markdown - terminal_print(Markdown(SYMLINK_ISSUE_MSG)) - sys.exit(1) + # if not check_symlink_works(): + # + # terminal_print() + # from rich.markdown import Markdown + # + # terminal_print(Markdown(SYMLINK_ISSUE_MSG)) + # sys.exit(1) context_subcommand = ctx.invoked_subcommand == "context" if context_subcommand and use_background_service: diff --git a/src/kiara/interfaces/cli/run.py b/src/kiara/interfaces/cli/run.py index f5760426b..a06313b87 100644 --- a/src/kiara/interfaces/cli/run.py +++ b/src/kiara/interfaces/cli/run.py @@ -44,6 +44,9 @@ @click.option( "--output", "-o", help="The output format and configuration.", multiple=True ) +@click.option( + "--comment", "-c", help="Add comment metadata to the job you run.", required=False +) @click.option( "--save", "-s", @@ -67,6 +70,7 @@ def run( module_config: Iterable[str], inputs: Iterable[str], output: Iterable[str], + comment: 
Union[str, None], explain: bool, save: Iterable[str], print_properties: bool, @@ -249,6 +253,7 @@ def run( api=api, operation=kiara_op, inputs=inputs_value_map, + comment=comment, silent=silent, save_results=bool(final_aliases), aliases=final_aliases, diff --git a/src/kiara/interfaces/python_api/__init__.py b/src/kiara/interfaces/python_api/__init__.py index 40ef04fdb..89574b2cf 100644 --- a/src/kiara/interfaces/python_api/__init__.py +++ b/src/kiara/interfaces/python_api/__init__.py @@ -59,7 +59,6 @@ from kiara.interfaces.python_api.models.job import JobDesc from kiara.interfaces.python_api.value import StoreValueResult, StoreValuesResult from kiara.models.context import ContextInfo, ContextInfos -from kiara.models.module.jobs import ActiveJob from kiara.models.module.manifest import Manifest from kiara.models.module.operation import Operation from kiara.models.rendering import RenderValueResult @@ -101,6 +100,7 @@ ) from kiara.interfaces.python_api.workflow import Workflow from kiara.models.archives import KiArchiveInfo + from kiara.models.module.jobs import ActiveJob, JobRecord from kiara.models.module.pipeline import PipelineConfig, PipelineStructure from kiara.models.module.pipeline.pipeline import PipelineGroupInfo, PipelineInfo from kiara.registries import KiaraArchive @@ -1322,7 +1322,7 @@ def list_value_ids(self, **matcher_params) -> List[uuid.UUID]: """ List all available value ids for this kiara context. - This method exists mainly so frontend can retrieve a list of all value_ids that exists on the backend without + This method exists mainly so frontends can retrieve a list of all value_ids that exists on the backend without having to look up the details of each value (like [list_values][kiara.interfaces.python_api.KiaraAPI.list_values] does). 
This method can also be used with a matcher, but in this case the [list_values][kiara.interfaces.python_api.KiaraAPI.list_values] would be preferable in most cases, because it is called under the hood, and the performance advantage of not @@ -1385,6 +1385,16 @@ def get_value(self, value: Union[str, Value, uuid.UUID, Path]) -> Value: return self.context.data_registry.get_value(value=value) def get_values(self, **values: Union[str, Value, uuid.UUID]) -> ValueMapReadOnly: + """Retrieve Value instances for the specified value ids or aliases. + + This is a convenience method to get fully 'hydrated' `Value` objects from references to them. + + Arguments: + values: a dictionary with value ids or aliases as keys, and value instances as values + + Returns: + a mapping with value_id as key, and [kiara.models.values.value.Value] as value + """ return self.context.data_registry.load_values(values=values) @@ -2226,6 +2236,11 @@ def set_archive_metadata_value( value: Any, archive_type: Literal["data", "alias"] = "data", ) -> None: + """Add metadata to an archive. + + Note that this is different to adding metadata to a context, since it is attached directly + to a special section of the archive itself. + """ if archive_type == "data": _archive: Union[ @@ -2759,7 +2774,10 @@ def assemble_render_pipeline( # ------------------------------------------------------------------------------------------------------------------ # job-related methods def queue_manifest( - self, manifest: Manifest, inputs: Union[None, Mapping[str, Any]] = None + self, + manifest: Manifest, + inputs: Union[None, Mapping[str, Any]] = None, + **job_metadata: Any, ) -> uuid.UUID: """ Queue a job using the provided manifest to describe the module and config that should be executed. @@ -2773,6 +2791,20 @@ def queue_manifest( Returns: a result value map instance """ + + if self.context.runtime_config.runtime_profile == "dharpa": + if not job_metadata: + raise Exception( + "No job metadata provided. 
You need to provide a 'comment' argument when running your job." + ) + + if "comment" not in job_metadata.keys(): + raise KiaraException(msg="You need to provide a 'comment' for the job.") + + save_values = True + else: + save_values = False + if inputs is None: inputs = {} @@ -2781,12 +2813,21 @@ def queue_manifest( ) job_id = self.context.job_registry.execute_job( - job_config=job_config, wait=False + job_config=job_config, wait=False, auto_save_result=save_values ) + + if job_metadata: + self.context.metadata_registry.register_job_metadata_items( + job_id=job_id, items=job_metadata + ) + return job_id def run_manifest( - self, manifest: Manifest, inputs: Union[None, Mapping[str, Any]] = None + self, + manifest: Manifest, + inputs: Union[None, Mapping[str, Any]] = None, + **job_metadata: Any, ) -> ValueMapReadOnly: """ Run a job using the provided manifest to describe the module and config that should be executed. @@ -2796,11 +2837,12 @@ def run_manifest( Arguments: manifest: the manifest inputs: the job inputs (can be either references to values, or raw inputs + job_metadata: additional metadata to store with the job Returns: a result value map instance """ - job_id = self.queue_manifest(manifest=manifest, inputs=inputs) + job_id = self.queue_manifest(manifest=manifest, inputs=inputs, **job_metadata) return self.context.job_registry.retrieve_result(job_id=job_id) def queue_job( @@ -2808,6 +2850,7 @@ def queue_job( operation: Union[str, Path, Manifest, OperationInfo, JobDesc], inputs: Union[Mapping[str, Any], None], operation_config: Union[None, Mapping[str, Any]] = None, + **job_metadata: Any, ) -> uuid.UUID: """ Queue a job from a operation id, module_name (and config), or pipeline file, wait for the job to finish and retrieve the result. @@ -2821,6 +2864,7 @@ def queue_job( operation: a module name, operation id, or a path to a pipeline file (resolved in this order, until a match is found).. 
inputs: the operation inputs operation_config: the (optional) module config in case 'operation' is a module name + job_metadata: additional metadata to store with the job Returns: the queued job id @@ -2894,7 +2938,8 @@ def queue_job( else: manifest = _operation - job_id = self.queue_manifest(manifest=manifest, inputs=inputs) + job_id = self.queue_manifest(manifest=manifest, inputs=inputs, **job_metadata) + return job_id def run_job( @@ -2902,6 +2947,7 @@ def run_job( operation: Union[str, Path, Manifest, OperationInfo, JobDesc], inputs: Union[None, Mapping[str, Any]] = None, operation_config: Union[None, Mapping[str, Any]] = None, + **job_metadata, ) -> ValueMapReadOnly: """ Run a job from a operation id, module_name (and config), or pipeline file, wait for the job to finish and retrieve the result. @@ -2918,6 +2964,7 @@ def run_job( operation: a module name, operation id, or a path to a pipeline file (resolved in this order, until a match is found).. inputs: the operation inputs operation_config: the (optional) module config in case 'operation' is a module name + **job_metadata: additional metadata to store with the job Returns: the job result value map @@ -2927,11 +2974,14 @@ def run_job( inputs = {} job_id = self.queue_job( - operation=operation, inputs=inputs, operation_config=operation_config + operation=operation, + inputs=inputs, + operation_config=operation_config, + **job_metadata, ) return self.context.job_registry.retrieve_result(job_id=job_id) - def get_job(self, job_id: Union[str, uuid.UUID]) -> ActiveJob: + def get_job(self, job_id: Union[str, uuid.UUID]) -> "ActiveJob": """Retrieve the status of the job with the provided id.""" if isinstance(job_id, str): job_id = uuid.UUID(job_id) @@ -2947,6 +2997,97 @@ def get_job_result(self, job_id: Union[str, uuid.UUID]) -> ValueMapReadOnly: result = self.context.job_registry.retrieve_result(job_id=job_id) return result + def list_job_record_ids(self, **matcher_params) -> List[uuid.UUID]: + """List all 
available job ids in this kiara context, ordered from newest to oldest. + + This method exists mainly so frontends can retrieve a list of all job ids in order, without having + to retrieve all job details as well (in the case where no matcher_params exist. Otherwise, you could + also just use `list_jobs` and take the keys from the result. + + You can look up the supported matcher parameter arguments via the [JobMatcher][kiara.models.module.jobs.JobMatcher] class. + + Arguments: + matcher_params: additional parameters to pass to the job matcher + + Returns: + a list of job ids, ordered from latest to earliest + """ + + if matcher_params: + job_ids = list(self.list_job_records(**matcher_params).keys()) + else: + job_ids = self.context.job_registry.retrieve_all_job_record_ids() + + return job_ids + + def list_job_records(self, **matcher_params) -> Mapping[uuid.UUID, "JobRecord"]: + """List all available job ids in this kiara context, ordered from newest to oldest. + + This method exists mainly so frontends can retrieve a list of all job ids in order, without having + to retrieve all job details as well (in the case where no matcher_params exist. Otherwise, you could + also just use `list_jobs` and take the keys from the result. + + You can look up the supported matcher parameter arguments via the [JobMatcher][kiara.models.module.jobs.JobMatcher] class. 
+ + Arguments: + matcher_params: additional parameters to pass to the job matcher + + Returns: + a list of job details, ordered from latest to earliest + + """ + + if matcher_params: + raise NotImplementedError("Job matching is not implemented yet") + from kiara.models.module.jobs import JobMatcher + + matcher = JobMatcher(**matcher_params) + job_records = self.context.job_registry.find_job_records(matcher=matcher) + else: + job_records = self.context.job_registry.retrieve_all_job_records() + + return job_records + + def get_job_record(self, job_id: Union[str, uuid.UUID]) -> Union["JobRecord", None]: + + if isinstance(job_id, str): + job_id = uuid.UUID(job_id) + + job_record = self.context.job_registry.get_job_record(job_id=job_id) + return job_record + + def get_job_comment(self, job_id: Union[str, uuid.UUID]) -> Union[str, None]: + """Retrieve the comment for the specified job. + + Returns 'None' if the job_id does not exist, or the job does not have a comment attached to it. + + Arguments: + job_id: the job id + + Returns: + the comment as string, or None + """ + + from kiara.models.metadata import CommentMetadata + + if isinstance(job_id, str): + job_id = uuid.UUID(job_id) + + metadata: Union[ + None, CommentMetadata + ] = self.context.metadata_registry.retrieve_job_metadata_item( # type: ignore + job_id=job_id, key="comment" + ) + + if not metadata: + return None + + if not isinstance(metadata, CommentMetadata): + raise KiaraException( + msg=f"Metadata item 'comment' for job '{job_id}' is not a comment." 
+ ) + return metadata.comment + def render_value( self, value: Union[str, uuid.UUID, Value], diff --git a/src/kiara/interfaces/python_api/workflow.py b/src/kiara/interfaces/python_api/workflow.py index 69355c0ee..971d6a7a2 100644 --- a/src/kiara/interfaces/python_api/workflow.py +++ b/src/kiara/interfaces/python_api/workflow.py @@ -659,7 +659,7 @@ def _apply_inputs(self) -> Mapping[str, Mapping[str, Mapping[str, ChangedValue]] is_resolved=step_details.step.module.manifest.is_resolved, inputs=step_details.inputs, ) - match = self._kiara.job_registry.find_matching_job_record( + match = self._kiara.job_registry.find_job_record_for_manifest( inputs_manifest=job_config ) if match: diff --git a/src/kiara/models/module/jobs.py b/src/kiara/models/module/jobs.py index 3f6b4e332..8d20623f1 100644 --- a/src/kiara/models/module/jobs.py +++ b/src/kiara/models/module/jobs.py @@ -12,7 +12,6 @@ from enum import Enum from typing import TYPE_CHECKING, Any, ClassVar, Dict, List, Mapping, Union -from pydantic import field_validator from pydantic.fields import Field, PrivateAttr from pydantic.main import BaseModel from rich import box @@ -22,6 +21,7 @@ from kiara.exceptions import InvalidValuesException, KiaraException from kiara.models import KiaraModel from kiara.models.module.manifest import InputsManifest +from kiara.utils.dates import get_current_time_incl_timezone if TYPE_CHECKING: from kiara.context import DataRegistry, Kiara @@ -74,6 +74,12 @@ def add_log(self, msg: str, log_level: int = logging.DEBUG): self.log.append(_msg) +class PipelineMetadata(BaseModel): + + pipeline_id: uuid.UUID = Field(description="The id of the pipeline.") + step_id: str = Field(description="The id of the step in the pipeline.") + + class JobConfig(InputsManifest): _kiara_model_id: ClassVar = "instance.job_config" @@ -113,6 +119,13 @@ def create_from_module( def _retrieve_data_to_hash(self) -> Any: return {"manifest": self.manifest_cid, "inputs": self.inputs_cid} + pipeline_metadata: 
Union[PipelineMetadata, None] = Field( + description="Metadata for the pipeline this job is part of.", default=None + ) + # job_metadata: Mapping[str, Any] = Field( + # description="Optional metadata for this job.", default_factory=dict + # ) + class ActiveJob(KiaraModel): @@ -126,7 +139,8 @@ class ActiveJob(KiaraModel): ) job_log: JobLog = Field(description="The lob jog.") submitted: datetime = Field( - description="When the job was submitted.", default_factory=datetime.now + description="When the job was submitted.", + default_factory=get_current_time_incl_timezone, ) started: Union[datetime, None] = Field( description="When the job was started.", default=None @@ -238,6 +252,7 @@ def from_active_job(self, kiara: "Kiara", active_job: ActiveJob): job_record = JobRecord( job_id=active_job.job_id, + job_submitted=active_job.submitted, module_type=active_job.job_config.module_type, module_config=active_job.job_config.module_config, is_resolved=active_job.job_config.is_resolved, @@ -255,6 +270,7 @@ def from_active_job(self, kiara: "Kiara", active_job: ActiveJob): return job_record job_id: uuid.UUID = Field(description="The globally unique id for this job.") + job_submitted: datetime = Field(description="When the job was submitted.") environment_hashes: Mapping[str, Mapping[str, str]] = Field( description="Hashes for the environments this value was created in." ) @@ -273,20 +289,20 @@ def from_active_job(self, kiara: "Kiara", active_job: ActiveJob): runtime_details: Union[JobRuntimeDetails, None] = Field( description="Runtime details for the job." 
) - job_metadata: Mapping[str, Any] = Field( - description="Optional metadata for this job.", default_factory=dict - ) + # job_metadata: Mapping[str, Any] = Field( + # description="Optional metadata for this job.", default_factory=dict + # ) _is_stored: bool = PrivateAttr(default=None) _outputs_hash: Union[int, None] = PrivateAttr(default=None) - @field_validator("job_metadata", mode="before") - @classmethod - def validate_metadata(cls, value): - - if value is None: - value = {} - return value + # @field_validator("job_metadata", mode="before") + # @classmethod + # def validate_metadata(cls, value): + # + # if value is None: + # value = {} + # return value def _retrieve_data_to_hash(self) -> Any: return { @@ -324,3 +340,30 @@ def create_renderable(self, **config: Any) -> RenderableType: # h = DeepHash(obj, hasher=KIARA_HASH_FUNCTION) # self._outputs_hash = h[obj] # return self._outputs_hash + + +class JobMatcher(KiaraModel): + @classmethod + def create_matcher(self, **match_options: Any): + + m = JobMatcher(**match_options) + return m + + job_ids: List[uuid.UUID] = Field( + description="A list of job ids, if specified, only jobs with one of these ids will be included.", + default_factory=list, + ) + earliest: Union[None, datetime] = Field( + description="The earliest time when the job was created.", default=None + ) + latest: Union[None, datetime] = Field( + description="The latest time when the job was created.", default=None + ) + operation_inputs: List[uuid.UUID] = Field( + description="A list of value ids, if specified, only jobs that use one of them will be included.", + default_factory=list, + ) + produced_outputs: List[uuid.UUID] = Field( + description="A list of value ids, if specified, only jobs that produced one of them will be included.", + default_factory=list, + ) diff --git a/src/kiara/models/module/pipeline/controller.py b/src/kiara/models/module/pipeline/controller.py index 0f329a7de..b3a49f46f 100644 --- 
a/src/kiara/models/module/pipeline/controller.py +++ b/src/kiara/models/module/pipeline/controller.py @@ -132,12 +132,23 @@ def process_step(self, step_id: str, wait: bool = False) -> uuid.UUID: --------- step_id: the id of the step that should be started """ + + from kiara.models.module.jobs import PipelineMetadata + job_config = self.pipeline.create_job_config_for_step(step_id) - job_metadata = {"is_pipeline_step": True, "step_id": step_id} - job_id = self._job_registry.execute_job( - job_config=job_config, job_metadata=job_metadata + # pipeline_metadata = { + # "is_pipeline_step": True, + # "step_id": step_id, + # "pipeline_id": self.pipeline.pipeline_id, + # } + + pipeline_metadata = PipelineMetadata( + pipeline_id=self.pipeline.pipeline_id, step_id=step_id ) + job_config.pipeline_metadata = pipeline_metadata + + job_id = self._job_registry.execute_job(job_config=job_config) # job_id = self._processor.create_job(job_config=job_config) # self._processor.queue_job(job_id=job_id) diff --git a/src/kiara/models/values/value.py b/src/kiara/models/values/value.py index 53bac471b..c349a501d 100644 --- a/src/kiara/models/values/value.py +++ b/src/kiara/models/values/value.py @@ -792,6 +792,10 @@ class Value(ValueDetails): description="Backlinks to values that this value acts as destiny/or property for.", default_factory=dict, ) + job_id: Union[uuid.UUID, None] = Field( + description="The id of the job that created this value (if applicable).", + default=None, + ) def add_property( self, diff --git a/src/kiara/processing/__init__.py b/src/kiara/processing/__init__.py index 5e605070a..10d6da1ba 100644 --- a/src/kiara/processing/__init__.py +++ b/src/kiara/processing/__init__.py @@ -7,13 +7,19 @@ import abc import uuid -from typing import TYPE_CHECKING, Any, Dict, List, Literal, Mapping, Protocol, Union +from typing import TYPE_CHECKING, Any, Dict, List, Literal, Protocol, Set, Union import structlog from pydantic import BaseModel -from kiara.exceptions import 
KiaraProcessingException -from kiara.models.module.jobs import ActiveJob, JobConfig, JobLog, JobRecord, JobStatus +from kiara.exceptions import KiaraException, KiaraProcessingException +from kiara.models.module.jobs import ( + ActiveJob, + JobConfig, + JobLog, + JobRecord, + JobStatus, +) from kiara.models.values.value import ( ValueMap, ValueMapReadOnly, @@ -57,6 +63,7 @@ def __init__(self, kiara: "Kiara"): self._finished_jobs: Dict[uuid.UUID, ActiveJob] = {} self._output_refs: Dict[uuid.UUID, ValueMapWritable] = {} self._job_records: Dict[uuid.UUID, JobRecord] = {} + self._auto_save_jobs: Set[uuid.UUID] = set() self._listeners: List[JobStatusListener] = [] @@ -100,7 +107,7 @@ def get_job_record(self, job_id: uuid.UUID) -> JobRecord: raise Exception(f"No job record for job with id '{job_id}' registered.") def create_job( - self, job_config: JobConfig, job_metadata: Union[None, Mapping[str, Any]] + self, job_config: JobConfig, auto_save_result: bool = False ) -> uuid.UUID: environments = { @@ -108,9 +115,6 @@ def create_job( for env_name, env in self._kiara.current_environments.items() } - if job_metadata is None: - job_metadata = {} - result_pedigree = ValuePedigree( kiara_id=self._kiara.id, module_type=job_config.module_type, @@ -137,13 +141,14 @@ def create_job( ID_REGISTRY.update_metadata(job_id, obj=job) job.job_log.add_log("job created") - job_details = { + job_details: Dict[str, Any] = { "job_config": job_config, "job": job, "module": module, "outputs": outputs, - "job_metadata": job_metadata, } + job_details["pipeline_metadata"] = job_config.pipeline_metadata + self._created_jobs[job_id] = job_details self._send_job_event( @@ -159,10 +164,10 @@ def create_job( or dev_settings.log.pre_run.internal_modules ): - is_pipeline_step = job_metadata.get("is_pipeline_step", False) + is_pipeline_step = job_config.pipeline_metadata is not None if is_pipeline_step: if dev_settings.log.pre_run.pipeline_steps: - step_id = job_metadata.get("step_id", None) + step_id = 
job_config.pipeline_metadata.step_id # type: ignore assert step_id is not None title = ( f"Pre-run information for pipeline step: [i]{step_id}[/i]" @@ -184,6 +189,9 @@ def create_job( ) log_dev_message(table, title=title) + if auto_save_result: + self._auto_save_jobs.add(job_id) + return job_id def queue_job(self, job_id: uuid.UUID) -> ActiveJob: @@ -252,15 +260,20 @@ def job_status_updated( old_status = job.status + result_values = None + if status == JobStatus.SUCCESS: self._active_jobs.pop(job_id) job.job_log.add_log("job finished successfully") job.status = JobStatus.SUCCESS job.finished = get_current_time_incl_timezone() - values = self._output_refs[job_id] + result_values = self._output_refs[job_id] try: - values.sync_values() - value_ids = values.get_all_value_ids() + result_values.sync_values() + for field, val in result_values.items(): + val.job_id = job_id + + value_ids = result_values.get_all_value_ids() job.results = value_ids job.job_log.percent_finished = 100 job_record = JobRecord.from_active_job( @@ -332,15 +345,16 @@ def job_status_updated( and not dev_config.log.post_run.internal_modules ): skip = True - is_pipeline_step = details["job_metadata"].get( - "is_pipeline_step", False - ) + + pipeline_metadata = details.get("pipeline_metadata", None) + is_pipeline_step = pipeline_metadata is not None + if is_pipeline_step and not dev_config.log.post_run.pipeline_steps: skip = True if not skip: if is_pipeline_step: - step_id = details["job_metadata"]["step_id"] + step_id = pipeline_metadata.step_id # type: ignore title = f"Post-run information for pipeline step: {step_id}" else: title = f"Post-run information for module: {module.module_type_name}" @@ -362,6 +376,19 @@ def job_status_updated( job_id=job_id, old_status=old_status, new_status=job.status ) + if status is JobStatus.SUCCESS: + if job_id in self._auto_save_jobs: + assert result_values is not None + try: + for val in result_values.values(): + self._kiara.data_registry.store_value(val) + except 
Exception as e: + log_exception(e) + raise KiaraException( + msg=f"Failed to auto-save job results for job: {job_id}", + parent=e, + ) + def wait_for(self, *job_ids: uuid.UUID): """Wait for the jobs with the specified ids, also optionally sync their outputs with the pipeline value state.""" self._wait_for(*job_ids) diff --git a/src/kiara/registries/data/data_store/__init__.py b/src/kiara/registries/data/data_store/__init__.py index b19d09cf4..52fe8280c 100644 --- a/src/kiara/registries/data/data_store/__init__.py +++ b/src/kiara/registries/data/data_store/__init__.py @@ -129,9 +129,18 @@ def retrieve_value(self, value_id: uuid.UUID) -> Value: value_created = get_earliest_time_incl_timezone() pedigree = ValuePedigree(**value_data["pedigree"]) + + job_id_str = value_data.get("job_id", None) + # TODO: check for this to be not-Null at some stage, once we can be sure it's always set (after release) + if job_id_str is not None: + job_id: Union[None, uuid.UUID] = uuid.UUID(job_id_str) + else: + job_id = None + value = Value( value_id=value_data["value_id"], kiara_id=self.kiara_context.id, + job_id=job_id, value_schema=value_schema, value_created=value_created, value_status=value_data["value_status"], diff --git a/src/kiara/registries/jobs/__init__.py b/src/kiara/registries/jobs/__init__.py index 026ac419b..db3bdb57b 100644 --- a/src/kiara/registries/jobs/__init__.py +++ b/src/kiara/registries/jobs/__init__.py @@ -7,7 +7,8 @@ import abc import uuid -from typing import TYPE_CHECKING, Any, Dict, Iterable, Mapping, Type, Union +from datetime import datetime +from typing import TYPE_CHECKING, Any, Dict, Iterable, List, Mapping, Type, Union import structlog from bidict import bidict @@ -350,22 +351,55 @@ def get_job_record(self, job_id: uuid.UUID) -> Union[JobRecord, None]: raise NotImplementedError() - def retrieve_all_job_records(self) -> Mapping[str, JobRecord]: + def find_job_records(self, matcher: JobMatcher) -> Mapping[uuid.UUID, JobRecord]: - all_records: Dict[str, 
JobRecord] = {} + raise NotImplementedError("Job matching is Not implemented yet.") + + def retrieve_all_job_record_ids(self) -> List[uuid.UUID]: + """Retrieve a list of all available job record ids, sorted from latest to earliest.""" + + all_records: Dict[uuid.UUID, datetime] = {} + for archive in self.job_archives.values(): + all_record_ids = archive.retrieve_all_job_ids() + # TODO: check for duplicates and mismatching datetimes + all_records.update(all_record_ids) + + all_ids_sorted = [ + uuid + for uuid, _ in sorted( + all_records.items(), key=lambda item: item[1], reverse=True + ) + ] + + return all_ids_sorted + + def retrieve_all_job_records(self) -> Mapping[uuid.UUID, JobRecord]: + """Retrieves all job records from all job archives. + + Returns: + a map of job-id/job-record pairs, sorted by job submission time, from latest to earliest + """ + + all_records: Dict[uuid.UUID, JobRecord] = {} for archive in self.job_archives.values(): - all_record_ids = archive.retrieve_all_job_hashes() - if all_record_ids is None: - return {} + all_record_ids = archive.retrieve_all_job_ids().keys() for r in all_record_ids: assert r not in all_records.keys() - job_record = archive.retrieve_record_for_job_hash(r) + job_record = archive.retrieve_record_for_job_id(r) assert job_record is not None all_records[r] = job_record - return all_records + all_records_sorted = dict( + sorted( + all_records.items(), + key=lambda item: item[1].job_submitted, + reverse=True, + ) + ) + + return all_records_sorted - def find_matching_job_record( + def find_job_record_for_manifest( self, inputs_manifest: InputsManifest ) -> Union[uuid.UUID, None]: """ @@ -416,6 +450,10 @@ def find_matching_job_record( def prepare_job_config( self, manifest: Manifest, inputs: Mapping[str, Any] ) -> JobConfig: + """Prepare a JobConfig instance from the manifest and inputs. + + This involves creating (and therefor validating) a module instance, as well as making sure the inputs are valid. 
+ """ module = self._kiara.module_registry.create_module(manifest=manifest) @@ -430,18 +468,35 @@ def execute( manifest: Manifest, inputs: Mapping[str, Any], wait: bool = False, - job_metadata: Union[None, Any] = None, ) -> uuid.UUID: + """Prepare a job config, then execute it.""" job_config = self.prepare_job_config(manifest=manifest, inputs=inputs) - return self.execute_job(job_config, wait=wait, job_metadata=job_metadata) + return self.execute_job(job_config, wait=wait) def execute_job( - self, - job_config: JobConfig, - wait: bool = False, - job_metadata: Union[None, Any] = None, + self, job_config: JobConfig, wait: bool = False, auto_save_result=False ) -> uuid.UUID: + """Execute the job specified by the job config. + + Arguments: + job_config: the job config + wait: whether to wait for the job to finish + auto_save_result: whether to automatically save the job's outputs to the data registry once the job finished successfully + """ + + # from kiara.models.metadata import CommentMetadata + # if "comment" not in job_metadata.keys(): + # raise KiaraException("You need to provide a 'comment' for the job.") + # + # comment = job_metadata.get("comment") + # if not isinstance(comment, str): + # raise KiaraException("The 'comment' must be a string.") + # + # comment_metadata = CommentMetadata(comment=comment) + # self.context.metadata_registry.register_metadata_item( + # key="comment", item=comment_metadata, force=False, store=None + # ) if job_config.module_type != "pipeline": log = logger.bind( @@ -459,7 +514,16 @@ def execute_job( job_hash=job_config.job_hash, ) - stored_job = self.find_matching_job_record(inputs_manifest=job_config) + stored_job = self.find_job_record_for_manifest(inputs_manifest=job_config) + + is_pipeline_step = False if job_config.pipeline_metadata is None else True + if is_pipeline_step: + pipeline_step_id: Union[None, str] = job_config.pipeline_metadata.step_id # type: ignore + pipeline_id: Union[None, uuid.UUID] = 
job_config.pipeline_metadata.pipeline_id # type: ignore + else: + pipeline_step_id = None + pipeline_id = None + if stored_job is not None: log.debug( "job.use_cached", @@ -469,9 +533,8 @@ def execute_job( if is_develop(): module = self._kiara.module_registry.create_module(manifest=job_config) - if job_metadata and job_metadata.get("is_pipeline_step", True): - step_id = job_metadata.get("step_id", None) - title = f"Using cached pipeline step: {step_id}" + if is_pipeline_step: + title = f"Using cached pipeline step: {pipeline_step_id}" else: title = f"Using cached job for: {module.module_type_name}" @@ -495,22 +558,22 @@ def execute_job( panel = Group(table, table_job_record) log_dev_message(panel, title=title) - return stored_job + # TODO: in this case, and if 'auto_save_result' is True, we should also verify the outputs are saved? - if job_metadata is None: - job_metadata = {} + return stored_job - is_pipeline_step = job_metadata.get("is_pipeline_step", False) dbg_data = { "module_type": job_config.module_type, "is_pipeline_step": is_pipeline_step, } if is_pipeline_step: - dbg_data["step_id"] = job_metadata["step_id"] + dbg_data["step_id"] = pipeline_step_id + dbg_data["pipeline_id"] = str(pipeline_id) + log.debug("job.execute", **dbg_data) job_id = self._processor.create_job( - job_config=job_config, job_metadata=job_metadata + job_config=job_config, auto_save_result=auto_save_result ) self._active_jobs[job_config.job_hash] = job_id diff --git a/src/kiara/registries/jobs/job_store/__init__.py b/src/kiara/registries/jobs/job_store/__init__.py index 449590dbd..0527208a5 100644 --- a/src/kiara/registries/jobs/job_store/__init__.py +++ b/src/kiara/registries/jobs/job_store/__init__.py @@ -1,6 +1,8 @@ # -*- coding: utf-8 -*- import abc -from typing import Iterable, Union +import uuid +from datetime import datetime +from typing import Iterable, Mapping, Union from kiara.models.module.jobs import JobRecord from kiara.registries import BaseArchive @@ -32,6 +34,24 @@ def 
retrieve_all_job_hashes( If the job archive retrieves its jobs in a dynamic way, this will return 'None'. """ + @abc.abstractmethod + def _retrieve_all_job_ids(self) -> Mapping[uuid.UUID, datetime]: + """ + Retrieve a list of all job record ids in the archive, along with when they were submitted. + """ + + def retrieve_all_job_ids(self) -> Mapping[uuid.UUID, datetime]: + """Retrieve a list of all job ids in the archive, along with when they were submitted.""" + return self._retrieve_all_job_ids() + + @abc.abstractmethod + def _retrieve_record_for_job_id(self, job_id: uuid.UUID) -> Union[JobRecord, None]: + pass + + def retrieve_record_for_job_id(self, job_id: uuid.UUID) -> Union[JobRecord, None]: + job_record = self._retrieve_record_for_job_id(job_id=job_id) + return job_record + @abc.abstractmethod def _retrieve_record_for_job_hash(self, job_hash: str) -> Union[JobRecord, None]: pass diff --git a/src/kiara/registries/jobs/job_store/filesystem_store.py b/src/kiara/registries/jobs/job_store/filesystem_store.py index 048d77f4b..790b619dc 100644 --- a/src/kiara/registries/jobs/job_store/filesystem_store.py +++ b/src/kiara/registries/jobs/job_store/filesystem_store.py @@ -180,11 +180,8 @@ class FileSystemJobStore(FileSystemJobArchive, JobStore): def store_job_record(self, job_record: JobRecord): manifest_cid = job_record.manifest_cid - # inputs_hash = job_record.inputs_data_hash - # manifest_hash = job_record.manifest_hash input_ids_hash = job_record.input_ids_hash - # inputs_hash = job_record.inputs_data_hash base_path = self.job_store_path / MANIFEST_SUB_PATH manifest_folder = base_path / str(manifest_cid) diff --git a/src/kiara/registries/jobs/job_store/sqlite_store.py b/src/kiara/registries/jobs/job_store/sqlite_store.py index 2b8b39697..82e60307c 100644 --- a/src/kiara/registries/jobs/job_store/sqlite_store.py +++ b/src/kiara/registries/jobs/job_store/sqlite_store.py @@ -1,4 +1,6 @@ # -*- coding: utf-8 -*- +import uuid +from datetime import datetime from pathlib
import Path from typing import Any, Dict, Iterable, Mapping, Union @@ -114,6 +116,7 @@ def sqlite_engine(self) -> "Engine": CREATE TABLE IF NOT EXISTS job_records ( job_id TEXT PRIMARY KEY, job_hash TEXT TEXT NOT NULL, + job_submitted TEXT NOT NULL, manifest_hash TEXT NOT NULL, input_ids_hash TEXT NOT NULL, inputs_data_hash TEXT NOT NULL, @@ -146,6 +149,36 @@ def _retrieve_record_for_job_hash(self, job_hash: str) -> Union[JobRecord, None] job_record = JobRecord(**job_record_data) return job_record + def _retrieve_all_job_ids(self) -> Mapping[uuid.UUID, datetime]: + """ + Retrieve a list of all job record ids in the archive. + """ + + sql = text( + "SELECT job_id, job_submitted FROM job_records ORDER BY job_submitted DESC;" + ) + + with self.sqlite_engine.connect() as connection: + result = connection.execute(sql) + return {uuid.UUID(row[0]): datetime.fromisoformat(row[1]) for row in result} + + def _retrieve_record_for_job_id(self, job_id: uuid.UUID) -> Union[JobRecord, None]: + + sql = text("SELECT job_metadata FROM job_records WHERE job_id = :job_id") + + params = {"job_id": str(job_id)} + + with self.sqlite_engine.connect() as connection: + result = connection.execute(sql, params) + row = result.fetchone() + if not row: + return None + + job_record_json = row[0] + job_record_data = orjson.loads(job_record_json) + job_record = JobRecord(**job_record_data) + return job_record + def retrieve_all_job_hashes( self, manifest_hash: Union[str, None] = None, @@ -222,11 +255,14 @@ def store_job_record(self, job_record: JobRecord): job_record_json = job_record.model_dump_json() + job_submitted = job_record.job_submitted.isoformat() + sql = text( - "INSERT OR IGNORE INTO job_records(job_id, job_hash, manifest_hash, input_ids_hash, inputs_data_hash, job_metadata) VALUES (:job_id, :job_hash, :manifest_hash, :input_ids_hash, :inputs_data_hash, :job_metadata)" + "INSERT OR IGNORE INTO job_records(job_id, job_submitted, job_hash, manifest_hash, input_ids_hash, inputs_data_hash, 
job_metadata) VALUES (:job_id, :job_submitted, :job_hash, :manifest_hash, :input_ids_hash, :inputs_data_hash, :job_metadata)" ) params = { "job_id": str(job_record.job_id), + "job_submitted": job_submitted, "job_hash": job_hash, "manifest_hash": manifest_hash, "input_ids_hash": input_ids_hash, diff --git a/src/kiara/registries/metadata/__init__.py b/src/kiara/registries/metadata/__init__.py index 988c8bc90..dc802f1aa 100644 --- a/src/kiara/registries/metadata/__init__.py +++ b/src/kiara/registries/metadata/__init__.py @@ -1,12 +1,12 @@ # -*- coding: utf-8 -*- import uuid -from typing import TYPE_CHECKING, Callable, Dict, Literal, Mapping, Union +from typing import TYPE_CHECKING, Any, Callable, Dict, Literal, Mapping, Union from pydantic import Field from kiara.defaults import DEFAULT_METADATA_STORE_MARKER, DEFAULT_STORE_MARKER from kiara.models.events import RegistryEvent -from kiara.models.metadata import KiaraMetadata +from kiara.models.metadata import CommentMetadata, KiaraMetadata from kiara.registries.metadata.metadata_store import MetadataArchive, MetadataStore if TYPE_CHECKING: @@ -142,14 +142,83 @@ def get_archive( f"Can't retrieve archive with id '{archive_id_or_alias}': no archive with that id registered." 
) + def retrieve_metadata_item( + self, + key: str, + reference_item_type: Union[str, None] = None, + reference_item_id: Union[str, None] = None, + store: Union[str, uuid.UUID, None] = None, + ) -> Union[KiaraMetadata, None]: + """Retrieves a metadata item.""" + + mounted_store: MetadataStore = self.get_archive(archive_id_or_alias=store) # type: ignore + + result = mounted_store.retrieve_metadata_item( + key=key, reference_type=reference_item_type, reference_id=reference_item_id + ) + + if result is None: + return None + + model_type_id, data = result + model_cls = self._kiara.kiara_model_registry.get_model_cls( + kiara_model_id=model_type_id, required_subclass=KiaraMetadata + ) + + model_instance = model_cls(**data) + return model_instance # type: ignore + def register_metadata_item( self, key: str, item: KiaraMetadata, + reference_item_type: Union[str, None] = None, + reference_item_id: Union[str, None] = None, force: bool = False, store: Union[str, uuid.UUID, None] = None, ) -> uuid.UUID: mounted_store: MetadataStore = self.get_archive(archive_id_or_alias=store) # type: ignore - return mounted_store.store_metadata_item(key=key, item=item, force=force) + return mounted_store.store_metadata_item( + key=key, + item=item, + reference_item_type=reference_item_type, + reference_item_id=reference_item_id, + force=force, + ) + + def register_job_metadata_items( + self, + job_id: uuid.UUID, + items: Mapping[str, Any], + store: Union[str, uuid.UUID, None] = None, + ) -> None: + + for key, value in items.items(): + if isinstance(value, str): + value = CommentMetadata(comment=value) + elif not isinstance(value, KiaraMetadata): + raise Exception(f"Invalid metadata value for key '{key}': {value}") + self.register_metadata_item( + key=key, + item=value, + reference_item_type="job", + reference_item_id=str(job_id), + store=store, + ) + + def retrieve_job_metadata_items(self, job_id: uuid.UUID): + + pass + + def retrieve_job_metadata_item( + self, job_id: uuid.UUID, key: str, 
store: Union[str, uuid.UUID, None] = None + ) -> Union[KiaraMetadata, None]: + + return self.retrieve_metadata_item( + key=key, + reference_item_type="job", + reference_item_id=str(job_id), + store=store, + ) diff --git a/src/kiara/registries/metadata/metadata_store/__init__.py b/src/kiara/registries/metadata/metadata_store/__init__.py index f7b543c10..6f32bf233 100644 --- a/src/kiara/registries/metadata/metadata_store/__init__.py +++ b/src/kiara/registries/metadata/metadata_store/__init__.py @@ -2,8 +2,9 @@ import abc import json import uuid -from typing import Any, Dict, Generic, Iterable, Union +from typing import Any, Dict, Generic, Iterable, Mapping, Tuple, Union +from kiara.exceptions import KiaraException from kiara.models.metadata import KiaraMetadata from kiara.registries import ARCHIVE_CONFIG_CLS, BaseArchive @@ -30,14 +31,39 @@ def __init__( force_read_only=force_read_only, ) - def retrieve_metadata_value( + def retrieve_metadata_item( self, key: str, - metadata_model: Union[str, None] = None, + reference_type: Union[str, None] = None, reference_id: Union[str, None] = None, - ) -> Any: + ) -> Union[Tuple[str, Mapping[str, Any]], None]: - pass + if reference_id and not reference_type: + raise ValueError( + "If reference_id is set, reference_type must be set as well." + ) + if reference_type: + if reference_id is None: + raise KiaraException( + msg="reference_id must also be set if reference_type is set." + ) + result = self._retrieve_referenced_metadata_item_data( + key=key, reference_type=reference_type, reference_id=reference_id + ) + if result is None: + return None + else: + return result + else: + raise NotImplementedError( + "Retrieving metadata item without reference not implemented yet."
+ ) + + @abc.abstractmethod + def _retrieve_referenced_metadata_item_data( + self, key: str, reference_type: str, reference_id: str + ) -> Union[Tuple[str, Mapping[str, Any]], None]: + """Return the model type id and model data for the specified referenced metadata item.""" class MetadataStore(MetadataArchive): @@ -69,22 +95,12 @@ def store_metadata_item( self, key: str, item: KiaraMetadata, - reference_item: Any = None, + reference_item_type: Union[str, None] = None, + reference_item_id: Union[str, None] = None, force: bool = False, store: Union[str, uuid.UUID, None] = None, ) -> uuid.UUID: - if reference_item: - raise NotImplementedError( - "Cannot store metadata item with reference item, not implemented yet." - ) - - GLOBAL_REFERENCE_TYPE = "global" - DEFAULT_GLOBAL_REFERENCE_ID = "default" - - reference_item_type = GLOBAL_REFERENCE_TYPE - reference_item_id = DEFAULT_GLOBAL_REFERENCE_ID - if store: raise NotImplementedError( "Cannot store metadata item with store, not implemented yet." @@ -112,13 +128,30 @@ def store_metadata_item( value_hash=data_hash, model_type_id=model_type, model_schema_hash=model_schema_hash, - reference_item_type=reference_item_type, - reference_item_id=reference_item_id, force=force, ) + if (reference_item_id and not reference_item_type) or ( + reference_item_type and not reference_item_id + ): + raise ValueError( + "If reference_item_id is set, reference_item_type must be set as well." 
+ ) + + if reference_item_type: + assert reference_item_id is not None + self._store_metadata_reference( + reference_item_type, reference_item_id, str(metadata_item_id) + ) + return metadata_item_id + @abc.abstractmethod + def _store_metadata_reference( + self, reference_item_type: str, reference_item_id: str, metadata_item_id: str + ) -> None: + pass + @abc.abstractmethod def _store_metadata_item( self, @@ -127,8 +160,6 @@ def _store_metadata_item( value_hash: str, model_type_id: str, model_schema_hash: str, - reference_item_type: str, - reference_item_id: str, force: bool = False, ) -> uuid.UUID: pass diff --git a/src/kiara/registries/metadata/metadata_store/sqlite_store.py b/src/kiara/registries/metadata/metadata_store/sqlite_store.py index a077f03fe..18ffa389d 100644 --- a/src/kiara/registries/metadata/metadata_store/sqlite_store.py +++ b/src/kiara/registries/metadata/metadata_store/sqlite_store.py @@ -1,8 +1,9 @@ # -*- coding: utf-8 -*- import uuid from pathlib import Path -from typing import Any, Dict, Mapping, Union +from typing import Any, Dict, Mapping, Tuple, Union +import orjson from sqlalchemy import text from sqlalchemy.engine import Engine, create_engine @@ -121,11 +122,14 @@ def sqlite_engine(self) -> "Engine": metadata_item_hash TEXT NOT NULL, model_type_id TEXT NOT NULL, model_schema_hash TEXT NOT NULL, + metadata_value TEXT NOT NULL, + FOREIGN KEY (model_schema_hash) REFERENCES metadata_schemas (model_schema_hash) +); +CREATE TABLE IF NOT EXISTS metadata_references ( reference_item_type TEXT NOT NULL, reference_item_id TEXT NOT NULL, - metadata_value TEXT NOT NULL, - FOREIGN KEY (model_schema_hash) REFERENCES metadata_schemas (model_schema_hash), - UNIQUE (metadata_item_key, reference_item_type, reference_item_id) + metadata_item_id TEXT NOT NULL, + FOREIGN KEY (metadata_item_id) REFERENCES metadata (metadata_item_id) ); """ @@ -138,6 +142,35 @@ def sqlite_engine(self) -> "Engine": # event.listen(self._cached_engine, "connect", _pragma_on_connect) 
return self._cached_engine + def _retrieve_referenced_metadata_item_data( + self, key: str, reference_type: str, reference_id: str + ) -> Union[Tuple[str, Mapping[str, Any]], None]: + + sql = text( + """ + SELECT m.model_type_id, m.metadata_value + FROM metadata m + JOIN metadata_references r ON m.metadata_item_id = r.metadata_item_id + WHERE r.reference_item_type = :reference_type AND r.reference_item_id = :reference_id and m.metadata_item_key = :key + """ + ) + + with self.sqlite_engine.connect() as connection: + params = { + "reference_type": reference_type, + "reference_id": reference_id, + "key": key, + } + result = connection.execute(sql, params) + row = result.fetchone() + if row is None: + return None + + data_str = row[1] + data = orjson.loads(data_str) + + return (row[0], data) + class SqliteMetadataStore(SqliteMetadataArchive, MetadataStore): @@ -203,8 +236,6 @@ def _store_metadata_item( value_hash: str, model_type_id: str, model_schema_hash: str, - reference_item_type: str, - reference_item_id: str, force: bool = False, ) -> uuid.UUID: @@ -212,11 +243,11 @@ def _store_metadata_item( if force: sql = text( - "INSERT OR REPLACE INTO metadata (metadata_item_id, metadata_item_key, metadata_item_hash, model_type_id, model_schema_hash, reference_item_type, reference_item_id, metadata_value) VALUES (:metadata_item_id, :metadata_item_key, :metadata_item_hash, :model_type_id, :model_schema_hash, :reference_item_type, :reference_item_id, :metadata_value)" + "INSERT OR REPLACE INTO metadata (metadata_item_id, metadata_item_key, metadata_item_hash, model_type_id, model_schema_hash, metadata_value) VALUES (:metadata_item_id, :metadata_item_key, :metadata_item_hash, :model_type_id, :model_schema_hash, :metadata_value)" ) else: sql = text( - "INSERT INTO metadata (metadata_item_id, metadata_item_key, metadata_item_hash, model_type_id, model_schema_hash, reference_item_type, reference_item_id, metadata_value) VALUES (:metadata_item_id, :metadata_item_key,
:metadata_item_hash, :model_type_id, :model_schema_hash, :reference_item_type, :reference_item_id, :metadata_value)" + "INSERT INTO metadata (metadata_item_id, metadata_item_key, metadata_item_hash, model_type_id, model_schema_hash, metadata_value) VALUES (:metadata_item_id, :metadata_item_key, :metadata_item_hash, :model_type_id, :model_schema_hash, :metadata_value)" ) metadata_item_id = ID_REGISTRY.generate(comment="new metadata item id") @@ -227,12 +258,27 @@ def _store_metadata_item( "metadata_item_hash": value_hash, "model_type_id": model_type_id, "model_schema_hash": model_schema_hash, - "reference_item_type": reference_item_type, - "reference_item_id": reference_item_id, "metadata_value": value_json, } + with self.sqlite_engine.connect() as conn: conn.execute(sql, params) conn.commit() return metadata_item_id + + def _store_metadata_reference( + self, reference_item_type: str, reference_item_id: str, metadata_item_id: str + ) -> None: + + sql = text( + "INSERT INTO metadata_references (reference_item_type, reference_item_id, metadata_item_id) VALUES (:reference_item_type, :reference_item_id, :metadata_item_id)" + ) + params = { + "reference_item_type": reference_item_type, + "reference_item_id": reference_item_id, + "metadata_item_id": metadata_item_id, + } + with self.sqlite_engine.connect() as conn: + conn.execute(sql, params) + conn.commit() diff --git a/src/kiara/resources/templates/render/pipeline/python_script.py.j2 b/src/kiara/resources/templates/render/pipeline/python_script.py.j2 index 2aeea6951..177f29ebe 100644 --- a/src/kiara/resources/templates/render/pipeline/python_script.py.j2 +++ b/src/kiara/resources/templates/render/pipeline/python_script.py.j2 @@ -32,10 +32,10 @@ inputs_{{ step_id }} = { {% endfor -%} } {% if not step.manifest_src.module_config -%} -results_{{ step_id }} = kiara.run_job('{{ step.manifest_src.module_type }}', inputs=inputs_{{ step_id }}) +results_{{ step_id }} = kiara.run_job('{{ step.manifest_src.module_type }}', 
inputs=inputs_{{ step_id }}, comment="") {% else -%} step_config_{{ step_id }} = {{ step.manifest_src.module_config }} -results_{{ step_id }} = kiara.run_job('{{ step.manifest_src.module_type }}', operation_config=step_config_{{ step_id }}, inputs=inputs_{{ step_id }}) +results_{{ step_id }} = kiara.run_job('{{ step.manifest_src.module_type }}', operation_config=step_config_{{ step_id }}, inputs=inputs_{{ step_id }}, comment="") {% endif -%} {% endfor -%} {% endfor %} diff --git a/src/kiara/utils/cli/run.py b/src/kiara/utils/cli/run.py index 354b7c214..099561e58 100644 --- a/src/kiara/utils/cli/run.py +++ b/src/kiara/utils/cli/run.py @@ -333,9 +333,17 @@ def execute_job( save_results: bool, aliases: Union[None, Mapping[str, List[str]]], properties: bool = False, + comment: Union[str, None] = None, ) -> uuid.UUID: """Execute the job.""" - job_id = api.queue_job(operation=operation, inputs=inputs) + + job_metadata = {} + if comment is not None: + job_metadata["comment"] = comment + + job_id = api.queue_job( + operation=operation, inputs=inputs, operation_config=None, **job_metadata + ) try: outputs = api.get_job_result(job_id=job_id) diff --git a/src/kiara/utils/debug.py b/src/kiara/utils/debug.py index a620584c9..1f26e4347 100644 --- a/src/kiara/utils/debug.py +++ b/src/kiara/utils/debug.py @@ -151,6 +151,16 @@ def create_post_run_table( else: table.add_row("module_config", module.config) + if job_config.pipeline_metadata is not None: + pm_table = Table(show_header=False, box=box.SIMPLE) + pm_table.add_column("key") + pm_table.add_column("value") + pm_table.add_row("pipeline_id", str(job_config.pipeline_metadata.pipeline_id)) + pm_table.add_row("step_id", job_config.pipeline_metadata.step_id) + table.add_row("pipeline_step_metadata", pm_table) + else: + table.add_row("pipeline_step_metadata", "-- not a pipeline step --") + inputs_details = dev_config.log.post_run.inputs_info if inputs_details not in [DetailLevel.NONE.value, DetailLevel.NONE]: if inputs_details in 
[DetailLevel.MINIMAL, DetailLevel.MINIMAL.value]: diff --git a/tests/conftest.py b/tests/conftest.py index fc20e89a8..2371f5051 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -90,6 +90,8 @@ def kiara() -> Kiara: instance_path = create_temp_dir() kc = KiaraConfig.create_in_folder(instance_path) + kc.runtime_config.runtime_profile = "default" + kiara = kc.create_context() return kiara @@ -99,6 +101,7 @@ def api() -> KiaraAPI: instance_path = create_temp_dir() kc = KiaraConfig.create_in_folder(instance_path) + kc.runtime_config.runtime_profile = "default" api = KiaraAPI(kc) return api @@ -111,6 +114,7 @@ def presseeded_data_store_minimal() -> Kiara: pipeline_file = os.path.join(PIPELINES_FOLDER, "table_import.json") kc = KiaraConfig.create_in_folder(instance_path) + kc.runtime_config.runtime_profile = "default" kiara = kc.create_context() @@ -133,6 +137,8 @@ def preseeded_data_store() -> Kiara: instance_path = create_temp_dir() kc = KiaraConfig.create_in_folder(instance_path) + kc.runtime_config.runtime_profile = "default" + kiara = kc.create_context() pipeline = os.path.join(PIPELINES_FOLDER, "test_preseed_1.yaml")