Skip to content

Commit

Permalink
chore: fixes for metadata export
Browse files Browse the repository at this point in the history
  • Loading branch information
makkus committed Mar 30, 2024
1 parent d1a7fb9 commit 759df43
Show file tree
Hide file tree
Showing 7 changed files with 91 additions and 55 deletions.
19 changes: 8 additions & 11 deletions src/kiara/interfaces/python_api/base_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -1828,17 +1828,18 @@ def store_value(
)

if store_related_metadata:
print("STORING_METADATA")

from kiara.registries.metadata import MetadataMatcher
matcher = MetadataMatcher.create_matcher(reference_item_ids=[value_obj.job_id, value_obj.value_id])
matcher = MetadataMatcher.create_matcher(
reference_item_ids=[value_obj.job_id, value_obj.value_id]
)

target_store = self.context.metadata_registry.get_archive(store)
matching_metadata = self.context.metadata_registry.find_metadata_items(matcher=matcher)
matching_metadata = self.context.metadata_registry.find_metadata_items(
matcher=matcher
)
target_store.store_metadata_and_ref_items(matching_metadata)



if set_as_store_default:
store_instance = self.context.data_registry.get_archive(store)
store_instance.set_archive_metadata_value(
Expand Down Expand Up @@ -1872,7 +1873,6 @@ def store_values(
uuid.UUID,
Mapping[str, Union[str, uuid.UUID, Value]],
Iterable[Union[str, uuid.UUID, Value]],

],
alias_map: Union[Mapping[str, Iterable[str]], bool, str] = False,
allow_alias_overwrite: bool = True,
Expand Down Expand Up @@ -1933,7 +1933,6 @@ def store_values(
"""


if isinstance(values, (str, uuid.UUID, Value)):
values = [values]

Expand Down Expand Up @@ -1995,7 +1994,7 @@ def store_values(
alias=aliases,
allow_overwrite=allow_alias_overwrite,
store=store,
store_related_metadata=store_related_metadata
store_related_metadata=store_related_metadata,
)
result[str(value_obj.value_id)] = store_result
else:
Expand All @@ -2021,8 +2020,7 @@ def store_values(
alias=aliases_map,
allow_overwrite=allow_alias_overwrite,
store=store,
store_related_metadata=store_related_metadata

store_related_metadata=store_related_metadata,
)
result[field_name] = store_result

Expand Down Expand Up @@ -2218,7 +2216,6 @@ def export_values(
store_related_metadata=export_related_metadata,
)


if additional_archive_metadata:
for k, v in additional_archive_metadata.items():
self.set_archive_metadata_value(target_archive_ref, k, v)
Expand Down
4 changes: 1 addition & 3 deletions src/kiara/registries/data/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,6 @@
from kiara.context import Kiara
from kiara.models.module.destiny import Destiny
from kiara.models.module.manifest import Manifest
from kiara.models.runtime_environment import RuntimeEnvironment
from kiara.registries.metadata import MetadataStore


logger = structlog.getLogger()
Expand Down Expand Up @@ -576,7 +574,7 @@ def store_value(
self._event_callback(store_event)

if _value.job_id:
self._kiara.job_registry.store_job_record(job_id=_value.job_id)
self._kiara.job_registry.store_job_record(job_id=_value.job_id, store=data_store)

return persisted_value

Expand Down
21 changes: 8 additions & 13 deletions src/kiara/registries/jobs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@
from bidict import bidict
from rich.console import Group

from kiara.defaults import ENVIRONMENT_MARKER_KEY
from kiara.defaults import ENVIRONMENT_MARKER_KEY, DEFAULT_DATA_STORE_MARKER, DEFAULT_JOB_STORE_MARKER, \
DEFAULT_STORE_MARKER
from kiara.exceptions import FailedJobException
from kiara.models.events import KiaraEvent
from kiara.models.events.job_registry import (
Expand Down Expand Up @@ -256,7 +257,7 @@ def default_job_store(self) -> str:

def get_archive(self, store_id: Union[str, None, uuid.UUID] = None) -> JobArchive:

if store_id is None:
if store_id in [None, DEFAULT_DATA_STORE_MARKER, DEFAULT_JOB_STORE_MARKER, DEFAULT_STORE_MARKER]:
store_id = self.default_job_store
if store_id is None:
raise Exception("Can't retrieve deafult job archive, none set (yet).")
Expand Down Expand Up @@ -300,7 +301,7 @@ def _persist_environment(self, env_type: str, env_hash: str):
)
self._env_cache.setdefault(env_type, {})[env_hash] = environment

def store_job_record(self, job_id: uuid.UUID):
def store_job_record(self, job_id: uuid.UUID, store: Union[str, None]=None):

# TODO: allow to store job record to external store

Expand All @@ -315,18 +316,10 @@ def store_job_record(self, job_id: uuid.UUID):
)
return

store: JobStore = self.get_archive() # type: ignore
store: JobStore = self.get_archive(store) # type: ignore
if not isinstance(store, JobStore):
raise Exception("Can't store job record to archive: not writable.")

# if job_record.job_id in self._finished_jobs.values():
# logger.debug(
# "ignore.store.job_record",
# reason="already stored in store",
# job_id=str(job_id),
# )
# return

logger.debug(
"store.job_record",
job_hash=job_record.job_hash,
Expand Down Expand Up @@ -424,7 +417,9 @@ def retrieve_all_job_records(self) -> Mapping[uuid.UUID, JobRecord]:
for archive in self.job_archives.values():
all_record_ids = archive.retrieve_all_job_ids().keys()
for r in all_record_ids:
assert r not in all_records.keys()
if r in all_records.keys():
continue

job_record = archive.retrieve_record_for_job_id(r)
assert job_record is not None
all_records[r] = job_record
Expand Down
14 changes: 13 additions & 1 deletion src/kiara/registries/jobs/job_store/sqlite_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from sqlalchemy.engine import Engine

from kiara.models.module.jobs import JobMatcher, JobRecord
from kiara.registries import SqliteArchiveConfig
from kiara.registries import SqliteArchiveConfig, ArchiveDetails
from kiara.registries.jobs import JobArchive, JobStore
from kiara.utils.db import create_archive_engine, delete_archive_db

Expand Down Expand Up @@ -274,7 +274,19 @@ def _delete_archive(self):

delete_archive_db(db_path=self.sqlite_path)

def get_archive_details(self) -> ArchiveDetails:
    """Return summary details for this job archive.

    Counts the rows of the ``job_records`` table and reports the
    ``dynamic_archive`` flag as ``False`` (as this sqlite-backed archive does).
    """

    count_sql = text("SELECT COUNT(*) FROM job_records")

    with self.sqlite_engine.connect() as connection:
        # COUNT(*) always yields exactly one row with one column
        no_job_records = connection.execute(count_sql).fetchone()[0]

    return ArchiveDetails(
        no_job_records=no_job_records,
        dynamic_archive=False,
    )
class SqliteJobStore(SqliteJobArchive, JobStore):

_archive_type_name = "sqlite_job_store"
Expand Down
36 changes: 25 additions & 11 deletions src/kiara/registries/metadata/__init__.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,21 @@
# -*- coding: utf-8 -*-
import uuid
from typing import TYPE_CHECKING, Any, Callable, Dict, List, Literal, Mapping, Union, Generator, Tuple
from typing import (
TYPE_CHECKING,
Any,
Callable,
Dict,
Generator,
List,
Literal,
Mapping,
Tuple,
Union,
)

from pydantic import Field, field_validator

from kiara.defaults import DEFAULT_METADATA_STORE_MARKER, DEFAULT_STORE_MARKER
from kiara.defaults import DEFAULT_METADATA_STORE_MARKER, DEFAULT_STORE_MARKER, DEFAULT_DATA_STORE_MARKER
from kiara.models import KiaraModel
from kiara.models.events import RegistryEvent
from kiara.models.metadata import CommentMetadata, KiaraMetadata
Expand Down Expand Up @@ -82,7 +93,7 @@ def __init__(self, kiara: "Kiara"):
self._event_callback: Callable = self._kiara.event_registry.add_producer(self)

self._metadata_archives: Dict[str, MetadataArchive] = {}
self._default_data_store: Union[str, None] = None
self._default_metadata_store: Union[str, None] = None

# self._env_registry: EnvironmentRegistry = self._kiara.environment_registry

Expand Down Expand Up @@ -114,14 +125,14 @@ def register_metadata_archive(
if isinstance(archive, MetadataStore):
is_store = True

if set_as_default_store and self._default_data_store is not None:
if set_as_default_store and self._default_metadata_store is not None:
raise Exception(
f"Can't set data store '{alias}' as default store: default store already set."
)

if self._default_data_store is None or set_as_default_store:
if self._default_metadata_store is None or set_as_default_store:
is_default_store = True
self._default_data_store = alias
self._default_metadata_store = alias

event = MetadataArchiveAddedEvent(
kiara_id=self._kiara.id,
Expand All @@ -135,10 +146,10 @@ def register_metadata_archive(
return alias

@property
def default_data_store(self) -> str:
if self._default_data_store is None:
def default_metadata_store(self) -> str:
if self._default_metadata_store is None:
raise Exception("No default metadata store set.")
return self._default_data_store
return self._default_metadata_store

@property
def metadata_archives(self) -> Mapping[str, MetadataArchive]:
Expand All @@ -152,8 +163,9 @@ def get_archive(
None,
DEFAULT_STORE_MARKER,
DEFAULT_METADATA_STORE_MARKER,
DEFAULT_DATA_STORE_MARKER
):
archive_id_or_alias = self.default_data_store
archive_id_or_alias = self.default_metadata_store
if archive_id_or_alias is None:
raise Exception(
"Can't retrieve default metadata archive, none set (yet)."
Expand Down Expand Up @@ -186,7 +198,9 @@ def get_archive(
f"Can't retrieve archive with id '{archive_id_or_alias}': no archive with that id registered."
)

def find_metadata_items(self, matcher: MetadataMatcher) -> Generator[Tuple[Any, ...], None, None]:
def find_metadata_items(
self, matcher: MetadataMatcher
) -> Generator[Tuple[Any, ...], None, None]:

mounted_store: MetadataArchive = self.get_archive()

Expand Down
12 changes: 7 additions & 5 deletions src/kiara/registries/metadata/metadata_store/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,17 +59,19 @@ def find_matching_metadata_items(
reference_item_result_fields=reference_item_result_fields,
)

def store_metadata_and_ref_items(self, items: Generator[Tuple[Any, ...], None, None]):

return self._store_metadata_and_ref_items(items)
def store_metadata_and_ref_items(
self, items: Generator[Tuple[Any, ...], None, None]
):

return self._store_metadata_and_ref_items(items)

@abc.abstractmethod
def _store_metadata_and_ref_items(self, items: Generator[Tuple[Any, ...], None, None]):
def _store_metadata_and_ref_items(
self, items: Generator[Tuple[Any, ...], None, None]
):

pass


@abc.abstractmethod
def _find_matching_metadata_and_ref_items(
self,
Expand Down
40 changes: 29 additions & 11 deletions src/kiara/registries/metadata/metadata_store/sqlite_store.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
# -*- coding: utf-8 -*-
import uuid
from pathlib import Path
from typing import Any, Dict, Generator, Iterable, Mapping, Tuple, Union, List
from typing import Any, Dict, Generator, Iterable, List, Mapping, Tuple, Union

import orjson
from sqlalchemy import text
from sqlalchemy.engine import Engine

from kiara.exceptions import KiaraException
from kiara.registries import SqliteArchiveConfig
from kiara.registries import SqliteArchiveConfig, ArchiveDetails
from kiara.registries.metadata import MetadataArchive, MetadataMatcher, MetadataStore
from kiara.utils.dates import get_current_time_incl_timezone
from kiara.utils.db import create_archive_engine, delete_archive_db
Expand Down Expand Up @@ -140,7 +140,8 @@ def sqlite_engine(self) -> "Engine":
reference_item_id TEXT NOT NULL,
reference_created TEXT NOT NULL,
metadata_item_id TEXT NOT NULL,
FOREIGN KEY (metadata_item_id) REFERENCES metadata (metadata_item_id)
FOREIGN KEY (metadata_item_id) REFERENCES metadata (metadata_item_id),
UNIQUE (reference_item_type, reference_item_key, reference_item_id, metadata_item_id, reference_created)
);
"""

Expand All @@ -153,10 +154,13 @@ def sqlite_engine(self) -> "Engine":
# event.listen(self._cached_engine, "connect", _pragma_on_connect)
return self._cached_engine

def _store_metadata_and_ref_items(self, items: Generator[Tuple[Any, ...], None, None]):
def _store_metadata_and_ref_items(
self, items: Generator[Tuple[Any, ...], None, None]
):

insert_metadata_sql = text(
"INSERT OR IGNORE INTO metadata (metadata_item_id, metadata_item_created, metadata_item_key, metadata_item_hash, model_type_id, model_schema_hash, metadata_value) VALUES (:metadata_item_id, :metadata_item_created, :metadata_item_key, :metadata_item_hash, :model_type_id, :model_schema_hash, :metadata_value)")
"INSERT OR IGNORE INTO metadata (metadata_item_id, metadata_item_created, metadata_item_key, metadata_item_hash, model_type_id, model_schema_hash, metadata_value) VALUES (:metadata_item_id, :metadata_item_created, :metadata_item_key, :metadata_item_hash, :model_type_id, :model_schema_hash, :metadata_value)"
)

insert_ref_sql = text(
"INSERT OR IGNORE INTO metadata_references (reference_item_type, reference_item_key, reference_item_id, reference_created, metadata_item_id) VALUES (:reference_item_type, :reference_item_key, :reference_item_id, :reference_created, :metadata_item_id)"
Expand All @@ -167,32 +171,28 @@ def _store_metadata_and_ref_items(self, items: Generator[Tuple[Any, ...], None,
with self.sqlite_engine.connect() as conn:

from sqlalchemy import Row

metadata_items: List[Row] = []
ref_items = []

for item in items:
if item.result_type == "metadata_item":
metadata_items.append(item._asdict())
elif item.result_type == "metadata_ref_item":
print(f"ADDING: {item}")
ref_items.append(item._asdict())
else:
raise KiaraException(f"Unknown result type '{item.result_type}'")

if len(metadata_items) >= batch_size:
print(f"STOREING: {len(metadata_items)} metadata items")
conn.execute(insert_metadata_sql, metadata_items)
metadata_items.clear()
if len(ref_items) >= batch_size:
print(f"STOREING: {len(ref_items)} reference items")
conn.execute(insert_ref_sql, ref_items)
ref_items.clear()

if metadata_items:
print(f"STOREING: {len(metadata_items)} metadata items")
conn.execute(insert_metadata_sql, metadata_items)
if ref_items:
print(f"STOREING: {len(ref_items)} reference items")
conn.execute(insert_ref_sql, ref_items)

conn.commit()
Expand Down Expand Up @@ -322,7 +322,6 @@ def _find_matching_metadata_and_ref_items(

ref_sql = text(ref_sql_string)


with self.sqlite_engine.connect() as connection:
result = connection.execute(sql, params)
for row in result:
Expand Down Expand Up @@ -375,6 +374,25 @@ def _delete_archive(self):

delete_archive_db(db_path=self.sqlite_path)

def get_archive_details(self) -> ArchiveDetails:
    """Return summary details for this metadata archive.

    Counts the rows of the ``metadata`` and ``metadata_references`` tables
    and reports the ``dynamic_archive`` flag as ``False`` (as this
    sqlite-backed archive does).
    """

    metadata_count_sql = text("SELECT COUNT(*) FROM metadata")
    reference_count_sql = text("SELECT COUNT(*) FROM metadata_references")

    with self.sqlite_engine.connect() as connection:
        # COUNT(*) always yields exactly one row with one column
        no_metadata_items = connection.execute(metadata_count_sql).fetchone()[0]
        no_references = connection.execute(reference_count_sql).fetchone()[0]

    return ArchiveDetails(
        no_metadata_items=no_metadata_items,
        no_references=no_references,
        dynamic_archive=False,
    )


class SqliteMetadataStore(SqliteMetadataArchive, MetadataStore):

Expand Down

0 comments on commit 759df43

Please sign in to comment.