From f7abd322e6bd04f33ed9c82c280878d7e2f06656 Mon Sep 17 00:00:00 2001 From: Markus Binsteiner Date: Tue, 5 Mar 2024 14:04:27 +0100 Subject: [PATCH] feat: add metadata stores and registry classes --- pyproject.toml | 2 + src/kiara/context/__init__.py | 25 +- src/kiara/context/config.py | 83 ++++-- src/kiara/defaults.py | 3 + src/kiara/interfaces/python_api/__init__.py | 42 +++- .../interfaces/python_api/models/archive.py | 46 ++++ src/kiara/models/__init__.py | 23 +- src/kiara/models/archives.py | 2 + src/kiara/models/metadata/__init__.py | 18 ++ src/kiara/registries/jobs/__init__.py | 48 +--- .../registries/jobs/job_store/__init__.py | 52 ++++ src/kiara/registries/metadata/__init__.py | 155 ++++++++++++ .../metadata/metadata_store/__init__.py | 134 ++++++++++ .../metadata/metadata_store/sqlite_store.py | 238 ++++++++++++++++++ 14 files changed, 793 insertions(+), 78 deletions(-) create mode 100644 src/kiara/models/metadata/__init__.py create mode 100644 src/kiara/registries/metadata/__init__.py create mode 100644 src/kiara/registries/metadata/metadata_store/__init__.py create mode 100644 src/kiara/registries/metadata/metadata_store/sqlite_store.py diff --git a/pyproject.toml b/pyproject.toml index 8a356db48..a6d883311 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -113,6 +113,8 @@ filesystem_workflow_archive = "kiara.registries.workflows.archives:FileSystemWor filesystem_workflow_store = "kiara.registries.workflows.archives:FileSystemWorkflowStore" sqlite_data_archive = "kiara.registries.data.data_store.sqlite_store:SqliteDataArchive" sqlite_data_store = "kiara.registries.data.data_store.sqlite_store:SqliteDataStore" +sqlite_metadata_archive = "kiara.registries.metadata.metadata_store.sqlite_store:SqliteMetadataArchive" +sqlite_metadata_store = "kiara.registries.metadata.metadata_store.sqlite_store:SqliteMetadataStore" sqlite_alias_archive = "kiara.registries.aliases.sqlite_store:SqliteAliasArchive" sqlite_alias_store = "kiara.registries.aliases.sqlite_store:SqliteAliasStore" sqlite_job_archive = "kiara.registries.jobs.job_store.sqlite_store:SqliteJobArchive" diff --git a/src/kiara/context/__init__.py b/src/kiara/context/__init__.py index f5e1af620..f23a2a837 100644 --- a/src/kiara/context/__init__.py +++ b/src/kiara/context/__init__.py @@ -38,6 +38,7 @@ from kiara.registries.events.registry import EventRegistry from kiara.registries.ids import ID_REGISTRY from kiara.registries.jobs import JobRegistry +from kiara.registries.metadata import MetadataRegistry from kiara.registries.models import ModelRegistry from kiara.registries.modules import ModuleRegistry from kiara.registries.operations import OperationRegistry @@ -129,9 +130,12 @@ def __init__( self._config: KiaraContextConfig = config self._runtime_config: KiaraRuntimeConfig = runtime_config + self._env_mgmt: EnvironmentRegistry = EnvironmentRegistry.instance() + self._event_registry: EventRegistry = EventRegistry(kiara=self) self._type_registry: TypeRegistry = TypeRegistry(self) self._data_registry: DataRegistry = DataRegistry(kiara=self) + self._metadata_registry: MetadataRegistry = MetadataRegistry(kiara=self) self._job_registry: JobRegistry = JobRegistry(kiara=self) self._module_registry: ModuleRegistry = ModuleRegistry(kiara=self) self._operation_registry: OperationRegistry = OperationRegistry(kiara=self) @@ -145,8 +149,6 @@ def __init__( self._render_registry = RenderRegistry(kiara=self) - self._env_mgmt: Union[EnvironmentRegistry, None] = None - metadata_augmenter = CreateMetadataDestinies(kiara=self) 
self._event_registry.add_listener( metadata_augmenter, *metadata_augmenter.supported_event_types() @@ -175,6 +177,8 @@ def __init__( archive_config = config_cls(**archive.config) archive_obj = archive_cls(archive_name=archive_alias, archive_config=archive_config) # type: ignore for supported_type in archive_obj.supported_item_types(): + if supported_type == "metadata": + self.metadata_registry.register_metadata_archive(archive_obj) # type: ignore if supported_type == "data": self.data_registry.register_data_archive( archive_obj, # type: ignore @@ -241,10 +245,6 @@ def context_info(self) -> "KiaraContextInfo": @property def environment_registry(self) -> EnvironmentRegistry: - if self._env_mgmt is not None: - return self._env_mgmt - - self._env_mgmt = EnvironmentRegistry.instance() return self._env_mgmt @property @@ -280,6 +280,10 @@ def operation_registry(self) -> OperationRegistry: def data_registry(self) -> DataRegistry: return self._data_registry + @property + def metadata_registry(self) -> MetadataRegistry: + return self._metadata_registry + @property def workflow_registry(self) -> WorkflowRegistry: return self._workflow_registry @@ -349,6 +353,13 @@ def register_external_archive( archive=_archive_inst.archive_name, archive_type="data", ) + elif archive_type == "metadata": + result["metadata"] = self.metadata_registry.register_metadata_archive(_archive_inst) # type: ignore + log_message( + "archive.registered", + archive=_archive_inst.archive_name, + archive_type="metadata", + ) elif archive_type == "alias": result["alias"] = self.alias_registry.register_archive(_archive_inst) # type: ignore log_message( @@ -484,6 +495,8 @@ def get_all_archives(self) -> Dict[KiaraArchive, Set[str]]: result: Dict[KiaraArchive, Set[str]] = {} archive: KiaraArchive + for alias, archive in self.metadata_registry.metadata_archives.items(): + result.setdefault(archive, set()).add(alias) for alias, archive in self.data_registry.data_archives.items(): result.setdefault(archive, set()).add(alias) for alias, archive in self.alias_registry.alias_archives.items(): diff --git a/src/kiara/context/config.py b/src/kiara/context/config.py index 25df37fda..f5f8382ec 100644 --- a/src/kiara/context/config.py +++ b/src/kiara/context/config.py @@ -32,6 +32,7 @@ DEFAULT_CONTEXT_NAME, DEFAULT_DATA_STORE_MARKER, DEFAULT_JOB_STORE_MARKER, + DEFAULT_METADATA_STORE_MARKER, DEFAULT_WORKFLOW_STORE_MARKER, KIARA_CONFIG_FILE_NAME, KIARA_MAIN_CONFIG_FILE, @@ -388,11 +389,7 @@ def create_default_store_config( return data_store -# if windows, we want sqlite as default, because although it's slower, it does not -# need the user to enable developer mode -DEFAULT_STORE_TYPE: Literal["sqlite", "filesystem"] = ( - "sqlite" if os.name == "nt" else "filesystem" -) +DEFAULT_STORE_TYPE: Literal["auto"] = "auto" class KiaraConfig(BaseSettings): @@ -460,8 +457,8 @@ def load_from_file(cls, path: Union[Path, str, None] = None) -> "KiaraConfig": description="The name of the default context to use if none is provided.", default=DEFAULT_CONTEXT_NAME, ) - default_store_type: Literal["sqlite", "filesystem"] = Field( - description="The default store type to ues if not specified.", + default_store_type: Literal["auto", "sqlite", "filesystem"] = Field( + description="The default store type to use when creating new stores.", default=DEFAULT_STORE_TYPE, ) auto_generate_contexts: bool = Field( @@ -634,14 +631,64 @@ def create_default_sqlite_archive_config() -> Dict[str, Any]: default_sqlite_config: Union[Dict[str, Any], None] = None - if 
DEFAULT_DATA_STORE_MARKER not in context_config.archives.keys():
+ if self.default_store_type == "auto":
+
+ # if windows, we want sqlite as default, because although it's slower, it does not
+ # need the user to enable developer mode
+ if os.name == "nt":
+ data_store_type = "sqlite"
+ else:
+ data_store_type = "filesystem"
+
+ metadata_store_type = "sqlite"
+ alias_store_type = "sqlite"
+ job_store_type = "sqlite"
+ workflow_store_type = "sqlite"
+ elif self.default_store_type == "filesystem":
+ metadata_store_type = "filesystem"
+ data_store_type = "filesystem"
+ alias_store_type = "filesystem"
+ job_store_type = "filesystem"
+ workflow_store_type = "filesystem"
+ elif self.default_store_type == "sqlite":
+ metadata_store_type = "sqlite"
+ data_store_type = "sqlite"
+ alias_store_type = "sqlite"
+ job_store_type = "sqlite"
+ workflow_store_type = "sqlite"
+ else:
+ raise Exception(f"Unknown store type: {self.default_store_type}")
+
+ if DEFAULT_METADATA_STORE_MARKER not in context_config.archives.keys():
- if self.default_store_type == "sqlite":
+ if metadata_store_type == "sqlite":
 default_sqlite_config = create_default_sqlite_archive_config()
+ metadata_store = KiaraArchiveConfig(
+ archive_type="sqlite_metadata_store", config=default_sqlite_config
+ )
+ elif metadata_store_type == "filesystem":
+ # there is no filesystem-based metadata store implementation (yet), so sqlite is used here as well
+ default_sqlite_config = create_default_sqlite_archive_config()
+ metadata_store = KiaraArchiveConfig(
+ archive_type="sqlite_metadata_store", config=default_sqlite_config
+ )
+ else:
+ raise Exception(
+ f"Can't create default metadata store: invalid default store type '{metadata_store_type}'."
+ )
+
+ context_config.archives[DEFAULT_METADATA_STORE_MARKER] = metadata_store
+ changed = True
+
+ if DEFAULT_DATA_STORE_MARKER not in context_config.archives.keys():
+
+ if data_store_type == "sqlite":
+ if default_sqlite_config is None:
+ default_sqlite_config = create_default_sqlite_archive_config()
+
 data_store = KiaraArchiveConfig(
 archive_type="sqlite_data_store", config=default_sqlite_config
 )
- elif self.default_store_type == "filesystem":
+ elif data_store_type == "filesystem":
 data_store_type = "filesystem_data_store"
 data_store = create_default_store_config(
 store_type=data_store_type,
@@ -649,7 +696,7 @@ def create_default_sqlite_archive_config() -> Dict[str, Any]:
 )
 else:
 raise Exception(
- f"Can't create default data store: invalid default store type '{self.default_store_type}'."
+ f"Can't create default data store: invalid default store type '{data_store_type}'."
 )
 context_config.archives[DEFAULT_DATA_STORE_MARKER] = data_store
@@ -657,7 +704,7 @@ def create_default_sqlite_archive_config() -> Dict[str, Any]:
 if DEFAULT_JOB_STORE_MARKER not in context_config.archives.keys():
- if self.default_store_type == "sqlite":
+ if job_store_type == "sqlite":
 if default_sqlite_config is None:
 default_sqlite_config = create_default_sqlite_archive_config()
 job_store = KiaraArchiveConfig(
 archive_type="sqlite_job_store", config=default_sqlite_config
 )
- elif self.default_store_type == "filesystem":
+ elif job_store_type == "filesystem":
 job_store_type = "filesystem_job_store"
 job_store = create_default_store_config(
 store_type=job_store_type,
@@ -673,7 +720,7 @@ def create_default_sqlite_archive_config() -> Dict[str, Any]:
 )
 else:
 raise Exception(
- f"Can't create default job store: invalid default store type '{self.default_store_type}'."
+ f"Can't create default job store: invalid default store type '{job_store_type}'." ) context_config.archives[DEFAULT_JOB_STORE_MARKER] = job_store @@ -681,7 +728,7 @@ def create_default_sqlite_archive_config() -> Dict[str, Any]: if DEFAULT_ALIAS_STORE_MARKER not in context_config.archives.keys(): - if self.default_store_type == "sqlite": + if alias_store_type == "sqlite": if default_sqlite_config is None: default_sqlite_config = create_default_sqlite_archive_config() @@ -689,7 +736,7 @@ def create_default_sqlite_archive_config() -> Dict[str, Any]: alias_store = KiaraArchiveConfig( archive_type="sqlite_alias_store", config=default_sqlite_config ) - elif self.default_store_type == "filesystem": + elif alias_store_type == "filesystem": alias_store_type = "filesystem_alias_store" alias_store = create_default_store_config( store_type=alias_store_type, @@ -697,7 +744,7 @@ def create_default_sqlite_archive_config() -> Dict[str, Any]: ) else: raise Exception( - f"Can't create default alias store: invalid default store type '{self.default_store_type}'." + f"Can't create default alias store: invalid default store type '{alias_store_type}'." ) context_config.archives[DEFAULT_ALIAS_STORE_MARKER] = alias_store @@ -705,6 +752,8 @@ def create_default_sqlite_archive_config() -> Dict[str, Any]: if DEFAULT_WORKFLOW_STORE_MARKER not in context_config.archives.keys(): + # TODO: impolement sqlite type, or remove workflows entirely + workflow_store_type = "filesystem_workflow_store" # workflow_store_type = "sqlite_workflow_store" diff --git a/src/kiara/defaults.py b/src/kiara/defaults.py index 837c85ac3..5c8d223b8 100644 --- a/src/kiara/defaults.py +++ b/src/kiara/defaults.py @@ -94,6 +94,9 @@ DEFAULT_DATA_STORE_MARKER = "default_data_store" """Name for the default context data store.""" +DEFAULT_METADATA_STORE_MARKER = "default_metadata_store" +"""Name for the default context metadata store.""" + DEFAULT_JOB_STORE_MARKER = "default_job_store" """Name for the default context job store.""" diff --git a/src/kiara/interfaces/python_api/__init__.py b/src/kiara/interfaces/python_api/__init__.py index f2baa4249..40ef04fdb 100644 --- a/src/kiara/interfaces/python_api/__init__.py +++ b/src/kiara/interfaces/python_api/__init__.py @@ -187,7 +187,7 @@ def list_available_plugin_names( Get a list of all available plugins. Arguments: - regex: an optional regex to indicate the plugin naming scheme (default: /$kiara[_-]plugin\..*/) + regex: an optional regex to indicate the plugin naming scheme (default: /$kiara[_-]plugin\\..*/) Returns: a list of plugin names @@ -2542,6 +2542,46 @@ def assemble_filter_pipeline_config( return pipeline_config + # ------------------------------------------------------------------------------------------------------------------ + # metadata-related methods + + def register_metadata( + self, key: str, value: str, force: bool = False, store: Union[str, None] = None + ) -> uuid.UUID: + """Register a comment into the specified metadata store. + + Currently, this allows you to store comments within the default kiara context. You can use any string, + as key, for example a stringified `job_id`, or `value_id`, or any other string that makes sense in + the context you are using this in. + + If you use the store argument, the store needs to be mounted into the current *kiara* context. For now, + you can ignore this and not provide any value here, since this area is still in flux. If you need + to store a metadata item into an external context, and you can't figure out how to do it, + let me know. 
+
+ Note: this is preliminary and subject to change based on your input, so please provide your thoughts.
+
+ Arguments:
+ key: the key under which to store the metadata (can be anything you can think of)
+ value: the comment you want to store
+ force: overwrite the existing value if its key already exists in the store
+ store: the store to use; by default, the context's default metadata store is used
+
+ Returns:
+ a globally unique identifier for the metadata item
+ """
+
+ if not value:
+ raise KiaraException("Cannot store empty metadata item.")
+
+ from kiara.models.metadata import CommentMetadata
+
+ item = CommentMetadata(comment=value)
+
+ return self.context.metadata_registry.register_metadata_item(
+ key=key, item=item, force=force, store=store
+ )
+
 # ------------------------------------------------------------------------------------------------------------------
 # render-related methods
diff --git a/src/kiara/interfaces/python_api/models/archive.py b/src/kiara/interfaces/python_api/models/archive.py
index 00443ecc0..5f964a1de 100644
--- a/src/kiara/interfaces/python_api/models/archive.py
+++ b/src/kiara/interfaces/python_api/models/archive.py
@@ -12,6 +12,7 @@
 from kiara.context import Kiara
 from kiara.registries.aliases import AliasArchive, AliasStore
 from kiara.registries.data import DataArchive, DataStore
+ from kiara.registries.metadata import MetadataArchive, MetadataStore
 class KiArchive(KiaraModel):
@@ -40,6 +41,13 @@
 archive_name=archive_name,
 )
+ if "metadata" in archives.keys():
+ metadata_archive: Union[MetadataArchive, None] = archives["metadata"] # type: ignore
+ metadata_archive_config: Union[Mapping[str, Any], None] = metadata_archive.config.model_dump() # type: ignore
+ else:
+ metadata_archive_config = None
+ metadata_archive = None
+
 if "data" in archives.keys():
 data_archive: Union[DataArchive, None] = archives["data"] # type: ignore
 data_archive_config: Union[Mapping[str, Any], None] = data_archive.config.model_dump() # type: ignore
@@ -81,6 +89,7 @@
 kiarchive = KiArchive(
 archive_id=archive_id,
 archive_name=archive_alias,
+ metadata_archive_config=metadata_archive_config,
 data_archive_config=data_archive_config,
 alias_archive_config=alias_archive_config,
 archive_base_path=archive_path.parent.as_posix(),
@@ -88,6 +97,7 @@
 allow_write_access=allow_write_access,
 )
+ kiarchive._metadata_archive = metadata_archive
 kiarchive._data_archive = data_archive
 kiarchive._alias_archive = alias_archive
 kiarchive._kiara = kiara
@@ -137,6 +147,16 @@
 allow_write_access=allow_write_access,
 )
 data_store_config = data_store.config
+
+ metadata_store: MetadataStore = create_new_archive( # type: ignore
+ archive_name=archive_name,
+ store_base_path=archive_base_path,
+ store_type="sqlite_metadata_store",
+ file_name=archive_file_name,
+ allow_write_access=True,
+ )
+ metadata_store_config = metadata_store.config
+
 alias_store: AliasStore = create_new_archive( # type: ignore
 archive_name=archive_name,
 store_base_path=archive_base_path,
@@ -148,16 +168,19 @@
 kiarchive_id = data_store.archive_id
 assert alias_store.archive_id == kiarchive_id
+ assert metadata_store.archive_id == kiarchive_id
 kiarchive = KiArchive(
 archive_id=kiarchive_id,
 archive_name=archive_name,
 archive_base_path=archive_base_path,
 archive_file_name=archive_file_name,
+ metadata_archive_config=metadata_store_config.model_dump(),
 data_archive_config=data_store_config.model_dump(),
alias_archive_config=alias_store_config.model_dump(), allow_write_access=allow_write_access, ) + kiarchive._metadata_archive = metadata_store kiarchive._data_archive = data_store kiarchive._alias_archive = alias_store kiarchive._kiara = kiara @@ -173,6 +196,9 @@ def create_kiarchive( allow_write_access: bool = Field( description="Whether the store allows write access.", default=False ) + metadata_archive_config: Union[Mapping[str, Any], None] = Field( + description="The archive to store metadata in.", default=None + ) data_archive_config: Union[Mapping[str, Any], None] = Field( description="The archive to store the data in.", default=None ) @@ -180,10 +206,30 @@ def create_kiarchive( description="The archive to store aliases in.", default=None ) + _metadata_archive: Union["MetadataArchive", None] = PrivateAttr(default=None) _data_archive: Union["DataArchive", None] = PrivateAttr(default=None) _alias_archive: Union["AliasArchive", None] = PrivateAttr(default=None) _kiara: Union["Kiara", None] = PrivateAttr(default=None) + @property + def metadata_archive(self) -> "MetadataArchive": + + if self._metadata_archive: + return self._metadata_archive + + from kiara.utils.stores import create_new_archive + + metadata_archive: MetadataArchive = create_new_archive( # type: ignore + archive_name=self.archive_name, + store_base_path=self.archive_base_path, + store_type="sqlite_metadata_store", + file_name=self.archive_file_name, + allow_write_access=True, + **self.metadata_archive_config, + ) + self._metadata_archive = metadata_archive + return self._metadata_archive + @property def data_archive(self) -> "DataArchive": diff --git a/src/kiara/models/__init__.py b/src/kiara/models/__init__.py index 80555835d..79750e968 100644 --- a/src/kiara/models/__init__.py +++ b/src/kiara/models/__init__.py @@ -10,7 +10,6 @@ import networkx as nx from dag_cbor import IPLDKind -from deepdiff import DeepHash from multiformats import CID from pydantic import ConfigDict from pydantic.fields import PrivateAttr @@ -30,7 +29,7 @@ from kiara.registries.templates import TemplateRegistry from kiara.utils.class_loading import _default_id_func from kiara.utils.develop import log_dev_message -from kiara.utils.hashing import KIARA_HASH_FUNCTION, compute_cid +from kiara.utils.hashing import compute_cid from kiara.utils.json import orjson_dumps from kiara.utils.models import ( assemble_subcomponent_graph, @@ -58,13 +57,23 @@ class KiaraModel(ABC, BaseModel, JupyterMixin): # return to_camel_case(cls._kiara_model_name) @classmethod - def get_schema_hash(cls) -> int: + def get_schema_cid(cls) -> CID: if cls._schema_hash_cache is not None: return cls._schema_hash_cache - obj = cls.model_json_schema() - h = DeepHash(obj, hasher=KIARA_HASH_FUNCTION) - cls._schema_hash_cache = h[obj] + model_schema = cls.model_json_schema() + try: + _, cid = compute_cid(data=model_schema) + except Exception as e: + from kiara.utils.output import extract_renderable + + msg = "Failed to compute cid for model schema instance." 
+ item = extract_renderable(model_schema) + renderable = Group(msg, item, extract_renderable(e)) + log_dev_message(renderable, title="cid computation error") + raise e + + cls._schema_hash_cache = cid return cls._schema_hash_cache _graph_cache: Union[nx.DiGraph, None] = PrivateAttr(default=None) @@ -72,7 +81,7 @@ def get_schema_hash(cls) -> int: _dynamic_subcomponents: Dict[str, "KiaraModel"] = PrivateAttr(default_factory=dict) _id_cache: Union[str, None] = PrivateAttr(default=None) _category_id_cache: Union[str, None] = PrivateAttr(default=None) - _schema_hash_cache: ClassVar[Union[None, int]] = None + _schema_hash_cache: ClassVar[Union[None, CID]] = None _cid_cache: Union[CID, None] = PrivateAttr(default=None) _dag_cache: Union[bytes, None] = PrivateAttr(default=None) _size_cache: Union[int, None] = PrivateAttr(default=None) diff --git a/src/kiara/models/archives.py b/src/kiara/models/archives.py index 41183f29a..51d47ab47 100644 --- a/src/kiara/models/archives.py +++ b/src/kiara/models/archives.py @@ -290,6 +290,7 @@ def create_renderable(self, **config: Any) -> RenderableType: table.add_column("archive id") table.add_column("alias", style="i") table.add_column("item type(s)", style="i") + table.add_column("archive type", style="i") if show_config: table.add_column("config") if show_details: @@ -301,6 +302,7 @@ def create_renderable(self, **config: Any) -> RenderableType: row.append(str(archive.archive_id)) row.append(archive.archive_alias) row.append("\n".join(archive.archive_type_info.supported_item_types)) + row.append(archive.archive_type_info.type_name) if show_config: config_json = Syntax( diff --git a/src/kiara/models/metadata/__init__.py b/src/kiara/models/metadata/__init__.py new file mode 100644 index 000000000..e5b9ac9f2 --- /dev/null +++ b/src/kiara/models/metadata/__init__.py @@ -0,0 +1,18 @@ +# -*- coding: utf-8 -*- +from typing import Any, ClassVar + +from pydantic import Field + +from kiara.models import KiaraModel + + +class KiaraMetadata(KiaraModel): + def _retrieve_data_to_hash(self) -> Any: + return {"metadata": self.model_dump(), "schema": self.schema_json()} + + +class CommentMetadata(KiaraMetadata): + + _kiara_model_id: ClassVar = "instance.kiara_metadata.comment" + + comment: str = Field(description="A note/comment.") diff --git a/src/kiara/registries/jobs/__init__.py b/src/kiara/registries/jobs/__init__.py index 32722b518..026ac419b 100644 --- a/src/kiara/registries/jobs/__init__.py +++ b/src/kiara/registries/jobs/__init__.py @@ -25,7 +25,7 @@ from kiara.models.values.value import ValueMap, ValueMapReadOnly from kiara.processing import ModuleProcessor from kiara.processing.synchronous import SynchronousProcessor -from kiara.registries import BaseArchive +from kiara.registries.jobs.job_store import JobArchive, JobStore from kiara.utils import get_dev_config, is_develop if TYPE_CHECKING: @@ -37,52 +37,6 @@ MANIFEST_SUB_PATH = "manifests" -class JobArchive(BaseArchive): - # @abc.abstractmethod - # def find_matching_job_record( - # self, inputs_manifest: InputsManifest - # ) -> Optional[JobRecord]: - # pass - - @classmethod - def supported_item_types(cls) -> Iterable[str]: - return ["job_record"] - - @abc.abstractmethod - def retrieve_all_job_hashes( - self, - manifest_hash: Union[str, None] = None, - inputs_id_hash: Union[str, None] = None, - inputs_data_hash: Union[str, None] = None, - ) -> Iterable[str]: - """ - Retrieve a list of all job record hashes (cids) that match the given filter arguments. 
- - A job record hash includes information about the module type used in the job, the module configuration, as well as input field names and value ids for the values used in those inputs. - - If the job archive retrieves its jobs in a dynamic way, this will return 'None'. - """ - - @abc.abstractmethod - def _retrieve_record_for_job_hash(self, job_hash: str) -> Union[JobRecord, None]: - pass - - def retrieve_record_for_job_hash(self, job_hash: str) -> Union[JobRecord, None]: - - job_record = self._retrieve_record_for_job_hash(job_hash=job_hash) - return job_record - - -class JobStore(JobArchive): - @classmethod - def _is_writeable(cls) -> bool: - return True - - @abc.abstractmethod - def store_job_record(self, job_record: JobRecord): - pass - - class JobMatcher(abc.ABC): def __init__(self, kiara: "Kiara"): diff --git a/src/kiara/registries/jobs/job_store/__init__.py b/src/kiara/registries/jobs/job_store/__init__.py index e69de29bb..449590dbd 100644 --- a/src/kiara/registries/jobs/job_store/__init__.py +++ b/src/kiara/registries/jobs/job_store/__init__.py @@ -0,0 +1,52 @@ +# -*- coding: utf-8 -*- +import abc +from typing import Iterable, Union + +from kiara.models.module.jobs import JobRecord +from kiara.registries import BaseArchive + + +class JobArchive(BaseArchive): + # @abc.abstractmethod + # def find_matching_job_record( + # self, inputs_manifest: InputsManifest + # ) -> Optional[JobRecord]: + # pass + + @classmethod + def supported_item_types(cls) -> Iterable[str]: + return ["job_record"] + + @abc.abstractmethod + def retrieve_all_job_hashes( + self, + manifest_hash: Union[str, None] = None, + inputs_id_hash: Union[str, None] = None, + inputs_data_hash: Union[str, None] = None, + ) -> Iterable[str]: + """ + Retrieve a list of all job record hashes (cids) that match the given filter arguments. + + A job record hash includes information about the module type used in the job, the module configuration, as well as input field names and value ids for the values used in those inputs. + + If the job archive retrieves its jobs in a dynamic way, this will return 'None'. 
+ """ + + @abc.abstractmethod + def _retrieve_record_for_job_hash(self, job_hash: str) -> Union[JobRecord, None]: + pass + + def retrieve_record_for_job_hash(self, job_hash: str) -> Union[JobRecord, None]: + + job_record = self._retrieve_record_for_job_hash(job_hash=job_hash) + return job_record + + +class JobStore(JobArchive): + @classmethod + def _is_writeable(cls) -> bool: + return True + + @abc.abstractmethod + def store_job_record(self, job_record: JobRecord): + pass diff --git a/src/kiara/registries/metadata/__init__.py b/src/kiara/registries/metadata/__init__.py new file mode 100644 index 000000000..988c8bc90 --- /dev/null +++ b/src/kiara/registries/metadata/__init__.py @@ -0,0 +1,155 @@ +# -*- coding: utf-8 -*- +import uuid +from typing import TYPE_CHECKING, Callable, Dict, Literal, Mapping, Union + +from pydantic import Field + +from kiara.defaults import DEFAULT_METADATA_STORE_MARKER, DEFAULT_STORE_MARKER +from kiara.models.events import RegistryEvent +from kiara.models.metadata import KiaraMetadata +from kiara.registries.metadata.metadata_store import MetadataArchive, MetadataStore + +if TYPE_CHECKING: + from kiara.context import Kiara + from kiara.registries.environment import EnvironmentRegistry + + +class MetadataArchiveAddedEvent(RegistryEvent): + + event_type: Literal["metadata_archive_added"] = "metadata_archive_added" + metadata_archive_id: uuid.UUID = Field( + description="The unique id of this metadata archive." + ) + metadata_archive_alias: str = Field( + description="The alias this metadata archive was added as." + ) + is_store: bool = Field( + description="Whether this archive supports write operations (aka implements the 'MetadataStore' interface)." + ) + is_default_store: bool = Field( + description="Whether this store acts as default store." + ) + + +class MetadataRegistry(object): + def __init__(self, kiara: "Kiara"): + + self._kiara: Kiara = kiara + self._event_callback: Callable = self._kiara.event_registry.add_producer(self) + + self._metadata_archives: Dict[str, MetadataArchive] = {} + self._default_data_store: Union[str, None] = None + + self._env_registry: EnvironmentRegistry = self._kiara.environment_registry + + @property + def kiara_id(self) -> uuid.UUID: + return self._kiara.id + + def register_metadata_archive( + self, + archive: MetadataArchive, + set_as_default_store: Union[bool, None] = None, + ) -> str: + + alias = archive.archive_name + + if not alias: + raise Exception("Invalid data archive alias: can't be empty.") + + if alias in self._metadata_archives.keys(): + raise Exception( + f"Can't add data archive, alias '{alias}' already registered." + ) + + archive.register_archive(kiara=self._kiara) + + self._metadata_archives[alias] = archive + is_store = False + is_default_store = False + if isinstance(archive, MetadataStore): + is_store = True + + if set_as_default_store and self._default_data_store is not None: + raise Exception( + f"Can't set data store '{alias}' as default store: default store already set." 
+ ) + + if self._default_data_store is None or set_as_default_store: + is_default_store = True + self._default_data_store = alias + + event = MetadataArchiveAddedEvent( + kiara_id=self._kiara.id, + metadata_archive_id=archive.archive_id, + metadata_archive_alias=alias, + is_store=is_store, + is_default_store=is_default_store, + ) + self._event_callback(event) + + return alias + + @property + def default_data_store(self) -> str: + if self._default_data_store is None: + raise Exception("No default metadata store set.") + return self._default_data_store + + @property + def metadata_archives(self) -> Mapping[str, MetadataArchive]: + return self._metadata_archives + + def get_archive( + self, archive_id_or_alias: Union[None, uuid.UUID, str] = None + ) -> MetadataArchive: + + if archive_id_or_alias in ( + None, + DEFAULT_STORE_MARKER, + DEFAULT_METADATA_STORE_MARKER, + ): + archive_id_or_alias = self.default_data_store + if archive_id_or_alias is None: + raise Exception( + "Can't retrieve default metadata archive, none set (yet)." + ) + + if isinstance(archive_id_or_alias, uuid.UUID): + for archive in self._metadata_archives.values(): + if archive.archive_id == archive_id_or_alias: + return archive + + raise Exception( + f"Can't retrieve metadata archive with id '{archive_id_or_alias}': no archive with that id registered." + ) + + if archive_id_or_alias in self._metadata_archives.keys(): + return self._metadata_archives[archive_id_or_alias] + else: + try: + _archive_id = uuid.UUID(archive_id_or_alias) + for archive in self._metadata_archives.values(): + if archive.archive_id == _archive_id: + return archive + raise Exception( + f"Can't retrieve archive with id '{archive_id_or_alias}': no archive with that id registered." + ) + except Exception: + pass + + raise Exception( + f"Can't retrieve archive with id '{archive_id_or_alias}': no archive with that id registered." 
+ ) + + def register_metadata_item( + self, + key: str, + item: KiaraMetadata, + force: bool = False, + store: Union[str, uuid.UUID, None] = None, + ) -> uuid.UUID: + + mounted_store: MetadataStore = self.get_archive(archive_id_or_alias=store) # type: ignore + + return mounted_store.store_metadata_item(key=key, item=item, force=force) diff --git a/src/kiara/registries/metadata/metadata_store/__init__.py b/src/kiara/registries/metadata/metadata_store/__init__.py new file mode 100644 index 000000000..f7b543c10 --- /dev/null +++ b/src/kiara/registries/metadata/metadata_store/__init__.py @@ -0,0 +1,134 @@ +# -*- coding: utf-8 -*- +import abc +import json +import uuid +from typing import Any, Dict, Generic, Iterable, Union + +from kiara.models.metadata import KiaraMetadata +from kiara.registries import ARCHIVE_CONFIG_CLS, BaseArchive + + +class MetadataArchive(BaseArchive[ARCHIVE_CONFIG_CLS], Generic[ARCHIVE_CONFIG_CLS]): + """Base class for data archiv implementationss.""" + + @classmethod + def supported_item_types(cls) -> Iterable[str]: + """This archive type only supports storing data.""" + + return ["metadata"] + + def __init__( + self, + archive_name: str, + archive_config: ARCHIVE_CONFIG_CLS, + force_read_only: bool = False, + ): + + super().__init__( + archive_name=archive_name, + archive_config=archive_config, + force_read_only=force_read_only, + ) + + def retrieve_metadata_value( + self, + key: str, + metadata_model: Union[str, None] = None, + reference_id: Union[str, None] = None, + ) -> Any: + + pass + + +class MetadataStore(MetadataArchive): + def __init__( + self, + archive_name: str, + archive_config: ARCHIVE_CONFIG_CLS, + force_read_only: bool = False, + ): + + super().__init__( + archive_name=archive_name, + archive_config=archive_config, + force_read_only=force_read_only, + ) + self._schema_stored_cache: Dict[str, Any] = {} + + @classmethod + def _is_writeable(cls) -> bool: + return True + + @abc.abstractmethod + def _store_metadata_schema( + self, model_schema_hash: str, model_type_id: str, model_schema: str + ): + """Store the metadata schema for the specified model.""" + + def store_metadata_item( + self, + key: str, + item: KiaraMetadata, + reference_item: Any = None, + force: bool = False, + store: Union[str, uuid.UUID, None] = None, + ) -> uuid.UUID: + + if reference_item: + raise NotImplementedError( + "Cannot store metadata item with reference item, not implemented yet." + ) + + GLOBAL_REFERENCE_TYPE = "global" + DEFAULT_GLOBAL_REFERENCE_ID = "default" + + reference_item_type = GLOBAL_REFERENCE_TYPE + reference_item_id = DEFAULT_GLOBAL_REFERENCE_ID + + if store: + raise NotImplementedError( + "Cannot store metadata item with store, not implemented yet." 
+ ) + + # TODO: check if already stored + model_type = item.model_type_id + model_schema_hash = str(item.get_schema_cid()) + model_item_schema = item.model_json_schema() + model_item_schema_str = json.dumps(model_item_schema) + + self._store_metadata_schema( + model_schema_hash=model_schema_hash, + model_type_id=model_type, + model_schema=model_item_schema_str, + ) + + # data = item.model_dump() + data_json = item.model_dump_json() + data_hash = str(item.instance_cid) + + metadata_item_id = self._store_metadata_item( + key=key, + value_json=data_json, + value_hash=data_hash, + model_type_id=model_type, + model_schema_hash=model_schema_hash, + reference_item_type=reference_item_type, + reference_item_id=reference_item_id, + force=force, + ) + + return metadata_item_id + + @abc.abstractmethod + def _store_metadata_item( + self, + key: str, + value_json: str, + value_hash: str, + model_type_id: str, + model_schema_hash: str, + reference_item_type: str, + reference_item_id: str, + force: bool = False, + ) -> uuid.UUID: + pass diff --git a/src/kiara/registries/metadata/metadata_store/sqlite_store.py b/src/kiara/registries/metadata/metadata_store/sqlite_store.py new file mode 100644 index 000000000..a077f03fe --- /dev/null +++ b/src/kiara/registries/metadata/metadata_store/sqlite_store.py @@ -0,0 +1,238 @@ +# -*- coding: utf-8 -*- +import uuid +from pathlib import Path +from typing import Any, Dict, Mapping, Union + +from sqlalchemy import text +from sqlalchemy.engine import Engine, create_engine + +from kiara.registries import SqliteArchiveConfig +from kiara.registries.metadata import MetadataArchive, MetadataStore + +REQUIRED_METADATA_TABLES = { + "metadata", +} + + +class SqliteMetadataArchive(MetadataArchive): + + _archive_type_name = "sqlite_metadata_archive" + _config_cls = SqliteArchiveConfig + + @classmethod + def _load_archive_config( + cls, archive_uri: str, allow_write_access: bool, **kwargs + ) -> Union[Dict[str, Any], None]: + + if allow_write_access: + return None + + if not Path(archive_uri).is_file(): + return None + + import sqlite3 + + con = sqlite3.connect(archive_uri) + + cursor = con.cursor() + cursor.execute("SELECT name FROM sqlite_master WHERE type='table';") + tables = {x[0] for x in cursor.fetchall()} + con.close() + + if not REQUIRED_METADATA_TABLES.issubset(tables): + return None + + # config = SqliteArchiveConfig(sqlite_db_path=store_uri) + return {"sqlite_db_path": archive_uri} + + def __init__( + self, + archive_name: str, + archive_config: SqliteArchiveConfig, + force_read_only: bool = False, + ): + + super().__init__( + archive_name=archive_name, + archive_config=archive_config, + force_read_only=force_read_only, + ) + self._db_path: Union[Path, None] = None + self._cached_engine: Union[Engine, None] = None + # self._lock: bool = True + + # def _retrieve_archive_id(self) -> uuid.UUID: + # sql = text("SELECT value FROM archive_metadata WHERE key='archive_id'") + # + # with self.sqlite_engine.connect() as connection: + # result = connection.execute(sql) + # row = result.fetchone() + # if row is None: + # raise Exception("No archive ID found in metadata") + # return uuid.UUID(row[0]) + + def _retrieve_archive_metadata(self) -> Mapping[str, Any]: + + sql = text("SELECT key, value FROM archive_metadata") + + with self.sqlite_engine.connect() as connection: + result = connection.execute(sql) + return {row[0]: row[1] for row in result} + + @property + def sqlite_path(self): + + if self._db_path is not None: + return self._db_path + + db_path = 
Path(self.config.sqlite_db_path).resolve() + # self._db_path = fix_windows_longpath(db_path) + self._db_path = db_path + + if self._db_path.exists(): + return self._db_path + + self._db_path.parent.mkdir(parents=True, exist_ok=True) + return self._db_path + + @property + def db_url(self) -> str: + return f"sqlite:///{self.sqlite_path}" + + @property + def sqlite_engine(self) -> "Engine": + + if self._cached_engine is not None: + return self._cached_engine + + # def _pragma_on_connect(dbapi_con, con_record): + # dbapi_con.execute("PRAGMA query_only = ON") + + self._cached_engine = create_engine(self.db_url, future=True) + create_table_sql = """ +CREATE TABLE IF NOT EXISTS metadata_schemas ( + model_schema_hash TEXT PRIMARY KEY, + model_type_id TEXT NOT NULL, + model_schema TEXT NOT NULL +); +CREATE TABLE IF NOT EXISTS metadata ( + metadata_item_id TEXT PRIMARY KEY, + metadata_item_key TEXT NOT NULL, + metadata_item_hash TEXT NOT NULL, + model_type_id TEXT NOT NULL, + model_schema_hash TEXT NOT NULL, + reference_item_type TEXT NOT NULL, + reference_item_id TEXT NOT NULL, + metadata_value TEXT NOT NULL, + FOREIGN KEY (model_schema_hash) REFERENCES metadata_schemas (model_schema_hash), + UNIQUE (metadata_item_key, reference_item_type, reference_item_id) +); +""" + + with self._cached_engine.begin() as connection: + for statement in create_table_sql.split(";"): + if statement.strip(): + connection.execute(text(statement)) + + # if self._lock: + # event.listen(self._cached_engine, "connect", _pragma_on_connect) + return self._cached_engine + + +class SqliteMetadataStore(SqliteMetadataArchive, MetadataStore): + + _archive_type_name = "sqlite_metadata_store" + + @classmethod + def _load_archive_config( + cls, archive_uri: str, allow_write_access: bool, **kwargs + ) -> Union[Dict[str, Any], None]: + + if not allow_write_access: + return None + + if not Path(archive_uri).is_file(): + return None + + import sqlite3 + + con = sqlite3.connect(archive_uri) + + cursor = con.cursor() + cursor.execute("SELECT name FROM sqlite_master WHERE type='table';") + tables = {x[0] for x in cursor.fetchall()} + con.close() + + if not REQUIRED_METADATA_TABLES.issubset(tables): + return None + + # config = SqliteArchiveConfig(sqlite_db_path=store_uri) + return {"sqlite_db_path": archive_uri} + + def _set_archive_metadata_value(self, key: str, value: Any): + """Set custom metadata for the archive.""" + + sql = text( + "INSERT OR REPLACE INTO archive_metadata (key, value) VALUES (:key, :value)" + ) + with self.sqlite_engine.connect() as conn: + params = {"key": key, "value": value} + conn.execute(sql, params) + conn.commit() + + def _store_metadata_schema( + self, model_schema_hash: str, model_type_id: str, model_schema: str + ): + + sql = text( + "INSERT OR IGNORE INTO metadata_schemas (model_schema_hash, model_type_id, model_schema) VALUES (:model_schema_hash, :model_type_id, :model_schema)" + ) + params = { + "model_schema_hash": model_schema_hash, + "model_type_id": model_type_id, + "model_schema": model_schema, + } + with self.sqlite_engine.connect() as conn: + conn.execute(sql, params) + conn.commit() + + def _store_metadata_item( + self, + key: str, + value_json: str, + value_hash: str, + model_type_id: str, + model_schema_hash: str, + reference_item_type: str, + reference_item_id: str, + force: bool = False, + ) -> uuid.UUID: + + from kiara.registries.ids import ID_REGISTRY + + if force: + sql = text( + "INSERT OR REPLACE INTO metadata (metadata_item_id, metadata_item_key, metadata_item_hash, model_type_id, 
model_schema_hash, reference_item_type, reference_item_id, metadata_value) VALUES (:metadata_item_id, :metadata_item_key, :metadata_item_hash, :model_type_id, :model_schema_hash, :reference_item_type, :reference_item_id, :metadata_value)" + ) + else: + sql = text( + "INSERT INTO metadata (metadata_item_id, metadata_item_key, metadata_item_hash, model_type_id, model_schema_hash, reference_item_type, reference_item_id, metadata_value) VALUES (:metadata_item_id, :metadata_item_key, :metadata_item_hash, :model_type_id, :model_schema_hash, :reference_item_type, :reference_item_id, :metadata_value)" + ) + + metadata_item_id = ID_REGISTRY.generate(comment="new metadata item id") + + params = { + "metadata_item_id": str(metadata_item_id), + "metadata_item_key": key, + "metadata_item_hash": value_hash, + "model_type_id": model_type_id, + "model_schema_hash": model_schema_hash, + "reference_item_type": reference_item_type, + "reference_item_id": reference_item_id, + "metadata_value": value_json, + } + with self.sqlite_engine.connect() as conn: + conn.execute(sql, params) + conn.commit() + + return metadata_item_id
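
Usage sketch for review context (illustrative only, not part of the patch above): it assumes a default kiara context whose default sqlite metadata store was auto-created by the config changes in this commit, and uses `KiaraAPI.instance()` as the python_api entry point. Any name that does not appear in the diff above should be treated as an assumption.

# -*- coding: utf-8 -*-
from kiara.interfaces.python_api import KiaraAPI
from kiara.models.metadata import CommentMetadata

api = KiaraAPI.instance()

# store a comment under a free-form key (e.g. a stringified job or value id);
# returns a globally unique uuid for the stored metadata item
item_id = api.register_metadata(
    key="my_first_comment", value="Testing the new metadata store."
)
print(item_id)

# re-using a key runs into the UNIQUE (metadata_item_key, reference_item_type,
# reference_item_id) constraint unless force=True, which maps to INSERT OR REPLACE
api.register_metadata(
    key="my_first_comment", value="Updated comment.", force=True
)

# the registry-level equivalent, using the metadata model class directly
item = CommentMetadata(comment="A comment stored via the registry.")
api.context.metadata_registry.register_metadata_item(
    key="another_comment", item=item
)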