diff --git a/src/kiara/interfaces/python_api/base_api.py b/src/kiara/interfaces/python_api/base_api.py
index 2336cae57..aea442f82 100644
--- a/src/kiara/interfaces/python_api/base_api.py
+++ b/src/kiara/interfaces/python_api/base_api.py
@@ -1828,17 +1828,18 @@ def store_value(
         )

         if store_related_metadata:
-            print("STORING_METADATA")
             from kiara.registries.metadata import MetadataMatcher

-            matcher = MetadataMatcher.create_matcher(reference_item_ids=[value_obj.job_id, value_obj.value_id])
+            matcher = MetadataMatcher.create_matcher(
+                reference_item_ids=[value_obj.job_id, value_obj.value_id]
+            )

             target_store = self.context.metadata_registry.get_archive(store)
-            matching_metadata = self.context.metadata_registry.find_metadata_items(matcher=matcher)
+            matching_metadata = self.context.metadata_registry.find_metadata_items(
+                matcher=matcher
+            )
             target_store.store_metadata_and_ref_items(matching_metadata)

-
-
         if set_as_store_default:
             store_instance = self.context.data_registry.get_archive(store)
             store_instance.set_archive_metadata_value(
@@ -1872,7 +1873,6 @@ def store_values(
             uuid.UUID,
             Mapping[str, Union[str, uuid.UUID, Value]],
             Iterable[Union[str, uuid.UUID, Value]],
-
         ],
         alias_map: Union[Mapping[str, Iterable[str]], bool, str] = False,
         allow_alias_overwrite: bool = True,
@@ -1933,7 +1933,6 @@ def store_values(

         """

-
         if isinstance(values, (str, uuid.UUID, Value)):
             values = [values]

@@ -1995,7 +1994,7 @@ def store_values(
                     alias=aliases,
                     allow_overwrite=allow_alias_overwrite,
                     store=store,
-                    store_related_metadata=store_related_metadata
+                    store_related_metadata=store_related_metadata,
                 )
                 result[str(value_obj.value_id)] = store_result
             else:
@@ -2021,8 +2020,7 @@ def store_values(
                     alias=aliases_map,
                     allow_overwrite=allow_alias_overwrite,
                     store=store,
-                    store_related_metadata=store_related_metadata
-
+                    store_related_metadata=store_related_metadata,
                 )
                 result[field_name] = store_result

@@ -2218,7 +2216,6 @@ def export_values(
             store_related_metadata=export_related_metadata,
         )

-
         if additional_archive_metadata:
             for k, v in additional_archive_metadata.items():
                 self.set_archive_metadata_value(target_archive_ref, k, v)
diff --git a/src/kiara/registries/data/__init__.py b/src/kiara/registries/data/__init__.py
index 1f5517001..2b3db68e9 100644
--- a/src/kiara/registries/data/__init__.py
+++ b/src/kiara/registries/data/__init__.py
@@ -86,8 +86,6 @@
     from kiara.context import Kiara
     from kiara.models.module.destiny import Destiny
     from kiara.models.module.manifest import Manifest
-    from kiara.models.runtime_environment import RuntimeEnvironment
-    from kiara.registries.metadata import MetadataStore

 logger = structlog.getLogger()

@@ -576,7 +574,7 @@ def store_value(
         self._event_callback(store_event)

         if _value.job_id:
-            self._kiara.job_registry.store_job_record(job_id=_value.job_id)
+            self._kiara.job_registry.store_job_record(job_id=_value.job_id, store=data_store)

         return persisted_value

diff --git a/src/kiara/registries/jobs/__init__.py b/src/kiara/registries/jobs/__init__.py
index 9ee5ae4e5..5fbf4d43d 100644
--- a/src/kiara/registries/jobs/__init__.py
+++ b/src/kiara/registries/jobs/__init__.py
@@ -14,7 +14,8 @@
 from bidict import bidict
 from rich.console import Group

-from kiara.defaults import ENVIRONMENT_MARKER_KEY
+from kiara.defaults import ENVIRONMENT_MARKER_KEY, DEFAULT_DATA_STORE_MARKER, DEFAULT_JOB_STORE_MARKER, \
+    DEFAULT_STORE_MARKER
 from kiara.exceptions import FailedJobException
 from kiara.models.events import KiaraEvent
 from kiara.models.events.job_registry import (
@@ -256,7 +257,7 @@ def default_job_store(self) -> str:

     def get_archive(self, store_id: Union[str, None, uuid.UUID] = None) -> JobArchive:

-        if store_id is None:
+        if store_id in [None, DEFAULT_DATA_STORE_MARKER, DEFAULT_JOB_STORE_MARKER, DEFAULT_STORE_MARKER]:
             store_id = self.default_job_store
             if store_id is None:
                 raise Exception("Can't retrieve deafult job archive, none set (yet).")
@@ -300,7 +301,7 @@ def _persist_environment(self, env_type: str, env_hash: str):
             )
             self._env_cache.setdefault(env_type, {})[env_hash] = environment

-    def store_job_record(self, job_id: uuid.UUID):
+    def store_job_record(self, job_id: uuid.UUID, store: Union[str, None]=None):

         # TODO: allow to store job record to external store

@@ -315,18 +316,10 @@ def store_job_record(self, job_id: uuid.UUID):
             )
             return

-        store: JobStore = self.get_archive()  # type: ignore
+        store: JobStore = self.get_archive(store)  # type: ignore
         if not isinstance(store, JobStore):
             raise Exception("Can't store job record to archive: not writable.")

-        # if job_record.job_id in self._finished_jobs.values():
-        #     logger.debug(
-        #         "ignore.store.job_record",
-        #         reason="already stored in store",
-        #         job_id=str(job_id),
-        #     )
-        #     return
-
         logger.debug(
             "store.job_record",
             job_hash=job_record.job_hash,
@@ -424,7 +417,9 @@ def retrieve_all_job_records(self) -> Mapping[uuid.UUID, JobRecord]:
         for archive in self.job_archives.values():
             all_record_ids = archive.retrieve_all_job_ids().keys()
             for r in all_record_ids:
-                assert r not in all_records.keys()
+                if r in all_records.keys():
+                    continue
+
                 job_record = archive.retrieve_record_for_job_id(r)
                 assert job_record is not None
                 all_records[r] = job_record
diff --git a/src/kiara/registries/jobs/job_store/sqlite_store.py b/src/kiara/registries/jobs/job_store/sqlite_store.py
index 80dae4b66..1d9ee2eb3 100644
--- a/src/kiara/registries/jobs/job_store/sqlite_store.py
+++ b/src/kiara/registries/jobs/job_store/sqlite_store.py
@@ -9,7 +9,7 @@
 from sqlalchemy.engine import Engine

 from kiara.models.module.jobs import JobMatcher, JobRecord
-from kiara.registries import SqliteArchiveConfig
+from kiara.registries import SqliteArchiveConfig, ArchiveDetails
 from kiara.registries.jobs import JobArchive, JobStore
 from kiara.utils.db import create_archive_engine, delete_archive_db

@@ -274,7 +274,19 @@ def _delete_archive(self):
         delete_archive_db(db_path=self.sqlite_path)

+    def get_archive_details(self) -> ArchiveDetails:
+        all_job_records_sql = text("SELECT COUNT(*) FROM job_records")
+
+        with self.sqlite_engine.connect() as connection:
+            result = connection.execute(all_job_records_sql)
+            job_count = result.fetchone()[0]
+
+        details = {
+            "no_job_records": job_count,
+            "dynamic_archive": False
+        }
+        return ArchiveDetails(**details)


 class SqliteJobStore(SqliteJobArchive, JobStore):

     _archive_type_name = "sqlite_job_store"
diff --git a/src/kiara/registries/metadata/__init__.py b/src/kiara/registries/metadata/__init__.py
index 1757cdfc8..f207a479b 100644
--- a/src/kiara/registries/metadata/__init__.py
+++ b/src/kiara/registries/metadata/__init__.py
@@ -1,10 +1,21 @@
 # -*- coding: utf-8 -*-
 import uuid
-from typing import TYPE_CHECKING, Any, Callable, Dict, List, Literal, Mapping, Union, Generator, Tuple
+from typing import (
+    TYPE_CHECKING,
+    Any,
+    Callable,
+    Dict,
+    Generator,
+    List,
+    Literal,
+    Mapping,
+    Tuple,
+    Union,
+)

 from pydantic import Field, field_validator

-from kiara.defaults import DEFAULT_METADATA_STORE_MARKER, DEFAULT_STORE_MARKER
+from kiara.defaults import DEFAULT_METADATA_STORE_MARKER, DEFAULT_STORE_MARKER, DEFAULT_DATA_STORE_MARKER
 from kiara.models import KiaraModel
 from kiara.models.events import RegistryEvent
 from kiara.models.metadata import CommentMetadata, KiaraMetadata
@@ -82,7 +93,7 @@ def __init__(self, kiara: "Kiara"):
         self._event_callback: Callable = self._kiara.event_registry.add_producer(self)

         self._metadata_archives: Dict[str, MetadataArchive] = {}
-        self._default_data_store: Union[str, None] = None
+        self._default_metadata_store: Union[str, None] = None

         # self._env_registry: EnvironmentRegistry = self._kiara.environment_registry

@@ -114,14 +125,14 @@ def register_metadata_archive(
         if isinstance(archive, MetadataStore):
             is_store = True

-            if set_as_default_store and self._default_data_store is not None:
+            if set_as_default_store and self._default_metadata_store is not None:
                 raise Exception(
                     f"Can't set data store '{alias}' as default store: default store already set."
                 )

-            if self._default_data_store is None or set_as_default_store:
+            if self._default_metadata_store is None or set_as_default_store:
                 is_default_store = True
-                self._default_data_store = alias
+                self._default_metadata_store = alias

         event = MetadataArchiveAddedEvent(
             kiara_id=self._kiara.id,
@@ -135,10 +146,10 @@ def register_metadata_archive(
         return alias

     @property
-    def default_data_store(self) -> str:
-        if self._default_data_store is None:
+    def default_metadata_store(self) -> str:
+        if self._default_metadata_store is None:
             raise Exception("No default metadata store set.")
-        return self._default_data_store
+        return self._default_metadata_store

     @property
     def metadata_archives(self) -> Mapping[str, MetadataArchive]:
@@ -152,8 +163,9 @@ def get_archive(
             None,
             DEFAULT_STORE_MARKER,
             DEFAULT_METADATA_STORE_MARKER,
+            DEFAULT_DATA_STORE_MARKER
         ):
-            archive_id_or_alias = self.default_data_store
+            archive_id_or_alias = self.default_metadata_store
             if archive_id_or_alias is None:
                 raise Exception(
                     "Can't retrieve default metadata archive, none set (yet)."
                 )
@@ -186,7 +198,9 @@ def get_archive(
                 f"Can't retrieve archive with id '{archive_id_or_alias}': no archive with that id registered."
             )

-    def find_metadata_items(self, matcher: MetadataMatcher) -> Generator[Tuple[Any, ...], None, None]:
+    def find_metadata_items(
+        self, matcher: MetadataMatcher
+    ) -> Generator[Tuple[Any, ...], None, None]:

         mounted_store: MetadataArchive = self.get_archive()

diff --git a/src/kiara/registries/metadata/metadata_store/__init__.py b/src/kiara/registries/metadata/metadata_store/__init__.py
index 303340944..2b9ec0d65 100644
--- a/src/kiara/registries/metadata/metadata_store/__init__.py
+++ b/src/kiara/registries/metadata/metadata_store/__init__.py
@@ -59,17 +59,19 @@ def find_matching_metadata_items(
             reference_item_result_fields=reference_item_result_fields,
         )

-    def store_metadata_and_ref_items(self, items: Generator[Tuple[Any, ...], None, None]):
-
-        return self._store_metadata_and_ref_items(items)
+    def store_metadata_and_ref_items(
+        self, items: Generator[Tuple[Any, ...], None, None]
+    ):
+        return self._store_metadata_and_ref_items(items)

     @abc.abstractmethod
-    def _store_metadata_and_ref_items(self, items: Generator[Tuple[Any, ...], None, None]):
+    def _store_metadata_and_ref_items(
+        self, items: Generator[Tuple[Any, ...], None, None]
+    ):
         pass

-
     @abc.abstractmethod
     def _find_matching_metadata_and_ref_items(
         self,
diff --git a/src/kiara/registries/metadata/metadata_store/sqlite_store.py b/src/kiara/registries/metadata/metadata_store/sqlite_store.py
index c4493efb6..044327d2d 100644
--- a/src/kiara/registries/metadata/metadata_store/sqlite_store.py
+++ b/src/kiara/registries/metadata/metadata_store/sqlite_store.py
@@ -1,14 +1,14 @@
 # -*- coding: utf-8 -*-
 import uuid
 from pathlib import Path
-from typing import Any, Dict, Generator, Iterable, Mapping, Tuple, Union, List
+from typing import Any, Dict, Generator, Iterable, List, Mapping, Tuple, Union

 import orjson
 from sqlalchemy import text
 from sqlalchemy.engine import Engine

 from kiara.exceptions import KiaraException
-from kiara.registries import SqliteArchiveConfig
+from kiara.registries import SqliteArchiveConfig, ArchiveDetails
 from kiara.registries.metadata import MetadataArchive, MetadataMatcher, MetadataStore
 from kiara.utils.dates import get_current_time_incl_timezone
 from kiara.utils.db import create_archive_engine, delete_archive_db
@@ -140,7 +140,8 @@ def sqlite_engine(self) -> "Engine":
                 reference_item_id TEXT NOT NULL,
                 reference_created TEXT NOT NULL,
                 metadata_item_id TEXT NOT NULL,
-                FOREIGN KEY (metadata_item_id) REFERENCES metadata (metadata_item_id)
+                FOREIGN KEY (metadata_item_id) REFERENCES metadata (metadata_item_id),
+                UNIQUE (reference_item_type, reference_item_key, reference_item_id, metadata_item_id, reference_created)
             );
         """

@@ -153,10 +154,13 @@ def sqlite_engine(self) -> "Engine":
         # event.listen(self._cached_engine, "connect", _pragma_on_connect)
         return self._cached_engine

-    def _store_metadata_and_ref_items(self, items: Generator[Tuple[Any, ...], None, None]):
+    def _store_metadata_and_ref_items(
+        self, items: Generator[Tuple[Any, ...], None, None]
+    ):

         insert_metadata_sql = text(
-            "INSERT OR IGNORE INTO metadata (metadata_item_id, metadata_item_created, metadata_item_key, metadata_item_hash, model_type_id, model_schema_hash, metadata_value) VALUES (:metadata_item_id, :metadata_item_created, :metadata_item_key, :metadata_item_hash, :model_type_id, :model_schema_hash, :metadata_value)")
+            "INSERT OR IGNORE INTO metadata (metadata_item_id, metadata_item_created, metadata_item_key, metadata_item_hash, model_type_id, model_schema_hash, metadata_value) VALUES (:metadata_item_id, :metadata_item_created, :metadata_item_key, :metadata_item_hash, :model_type_id, :model_schema_hash, :metadata_value)"
+        )

         insert_ref_sql = text(
             "INSERT OR IGNORE INTO metadata_references (reference_item_type, reference_item_key, reference_item_id, reference_created, metadata_item_id) VALUES (:reference_item_type, :reference_item_key, :reference_item_id, :reference_created, :metadata_item_id)"
@@ -167,6 +171,7 @@ def _store_metadata_and_ref_items(self, items: Generator[Tuple[Any, ...], None,

         with self.sqlite_engine.connect() as conn:
             from sqlalchemy import Row
+
             metadata_items: List[Row] = []
             ref_items = []

@@ -174,25 +179,20 @@ def _store_metadata_and_ref_items(self, items: Generator[Tuple[Any, ...], None,
                 if item.result_type == "metadata_item":
                     metadata_items.append(item._asdict())
                 elif item.result_type == "metadata_ref_item":
-                    print(f"ADDING: {item}")
                     ref_items.append(item._asdict())
                 else:
                     raise KiaraException(f"Unknown result type '{item.result_type}'")

                 if len(metadata_items) >= batch_size:
-                    print(f"STOREING: {len(metadata_items)} metadata items")
                     conn.execute(insert_metadata_sql, metadata_items)
                     metadata_items.clear()
                 if len(ref_items) >= batch_size:
-                    print(f"STOREING: {len(ref_items)} reference items")
                     conn.execute(insert_ref_sql, ref_items)
                     ref_items.clear()

             if metadata_items:
-                print(f"STOREING: {len(metadata_items)} metadata items")
                 conn.execute(insert_metadata_sql, metadata_items)
             if ref_items:
-                print(f"STOREING: {len(ref_items)} reference items")
                 conn.execute(insert_ref_sql, ref_items)

             conn.commit()
@@ -322,7 +322,6 @@ def _find_matching_metadata_and_ref_items(

         ref_sql = text(ref_sql_string)

-
         with self.sqlite_engine.connect() as connection:
             result = connection.execute(sql, params)
             for row in result:
@@ -375,6 +374,25 @@ def _delete_archive(self):

         delete_archive_db(db_path=self.sqlite_path)

+    def get_archive_details(self) -> ArchiveDetails:
+
+        all_metadata_items_sql = text("SELECT COUNT(*) FROM metadata")
+        all_references_sql = text("SELECT COUNT(*) FROM metadata_references")
+
+        with self.sqlite_engine.connect() as connection:
+            result = connection.execute(all_metadata_items_sql)
+            metadata_count = result.fetchone()[0]
+
+            result = connection.execute(all_references_sql)
+            reference_count = result.fetchone()[0]
+
+        details = {
+            "no_metadata_items": metadata_count,
+            "no_references": reference_count,
+            "dynamic_archive": False
+        }
+        return ArchiveDetails(**details)
+

 class SqliteMetadataStore(SqliteMetadataArchive, MetadataStore):
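
Reviewer note: below is a minimal, hedged usage sketch for the `store_related_metadata` flag that this patch threads through `store_value()`, `store_values()` and `export_values()`. It is illustrative only and not part of the diff; the `KiaraAPI.instance()` entry point and the `"my_table"` / `"exports.my_table"` aliases are assumptions made for the example, while the `alias` and `store_related_metadata` parameters are the ones touched above.

    from kiara.api import KiaraAPI

    # assumed entry point for an existing kiara context
    api = KiaraAPI.instance()

    # Store a value and, with the new flag, also copy every metadata item that
    # references the value's value_id or job_id into the target metadata store.
    result = api.store_value(
        "my_table",                  # assumed: reference to an already registered value
        alias="exports.my_table",    # alias to register for the stored value
        store_related_metadata=True, # new flag added in this patch
    )
    print(result)

The same flag is forwarded per field by `store_values()`, and `export_values()` exposes it as `export_related_metadata`; the job record of the stored value is now also routed to the same archive via `store_job_record(job_id=..., store=...)`.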