Skip to content

Commit

Permalink
feat: make store type configurable
Browse files Browse the repository at this point in the history
  • Loading branch information
makkus committed Jan 30, 2024
1 parent 091a144 commit 7745b7b
Show file tree
Hide file tree
Showing 11 changed files with 331 additions and 76 deletions.
80 changes: 64 additions & 16 deletions src/kiara/context/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,17 @@
import os
import uuid
from pathlib import Path
from typing import TYPE_CHECKING, Any, Dict, Iterable, List, Mapping, Type, Union
from typing import (
TYPE_CHECKING,
Any,
Dict,
Iterable,
List,
Literal,
Mapping,
Type,
Union,
)

import structlog
from pydantic import BaseModel, ConfigDict, field_validator, model_validator
Expand Down Expand Up @@ -362,6 +372,10 @@ def load_from_file(cls, path: Union[Path, str, None] = None) -> "KiaraConfig":
description="The name of the default context to use if none is provided.",
default=DEFAULT_CONTEXT_NAME,
)
# Store backend used when default data/job/alias stores are auto-created;
# restricted to the two values handled by the default store creation code.
default_store_type: Literal["sqlite", "filesystem"] = Field(
    description="The default store type to use if not specified.",
    default="filesystem",
)
auto_generate_contexts: bool = Field(
description="Whether to auto-generate requested contexts if they don't exist yet.",
default=True,
Expand Down Expand Up @@ -534,35 +548,69 @@ def create_default_sqlite_archive_config() -> Dict[str, Any]:

if DEFAULT_DATA_STORE_MARKER not in context_config.archives.keys():

default_sqlite_config = create_default_sqlite_archive_config()

data_store = KiaraArchiveConfig(
archive_type="sqlite_data_store", config=default_sqlite_config
)
if self.default_store_type == "sqlite":
default_sqlite_config = create_default_sqlite_archive_config()
data_store = KiaraArchiveConfig(
archive_type="sqlite_data_store", config=default_sqlite_config
)
elif self.default_store_type == "filesystem":
data_store_type = "filesystem_data_store"
data_store = create_default_store_config(
store_type=data_store_type,
stores_base_path=os.path.join(filesystem_base_path, "data"),
)
else:
raise Exception(
f"Can't create default data store: invalid default store type '{self.default_store_type}'."
)

context_config.archives[DEFAULT_DATA_STORE_MARKER] = data_store
changed = True

if DEFAULT_JOB_STORE_MARKER not in context_config.archives.keys():

if default_sqlite_config is None:
default_sqlite_config = create_default_sqlite_archive_config()
if self.default_store_type == "sqlite":

job_store = KiaraArchiveConfig(
archive_type="sqlite_job_store", config=default_sqlite_config
)
if default_sqlite_config is None:
default_sqlite_config = create_default_sqlite_archive_config()

job_store = KiaraArchiveConfig(
archive_type="sqlite_job_store", config=default_sqlite_config
)
elif self.default_store_type == "filesystem":
job_store_type = "filesystem_job_store"
job_store = create_default_store_config(
store_type=job_store_type,
stores_base_path=os.path.join(filesystem_base_path, "jobs"),
)
else:
raise Exception(
f"Can't create default job store: invalid default store type '{self.default_store_type}'."
)

context_config.archives[DEFAULT_JOB_STORE_MARKER] = job_store
changed = True

if DEFAULT_ALIAS_STORE_MARKER not in context_config.archives.keys():

if default_sqlite_config is None:
default_sqlite_config = create_default_sqlite_archive_config()
if self.default_store_type == "sqlite":

alias_store = KiaraArchiveConfig(
archive_type="sqlite_alias_store", config=default_sqlite_config
)
if default_sqlite_config is None:
default_sqlite_config = create_default_sqlite_archive_config()

alias_store = KiaraArchiveConfig(
archive_type="sqlite_alias_store", config=default_sqlite_config
)
elif self.default_store_type == "filesystem":
alias_store_type = "filesystem_alias_store"
alias_store = create_default_store_config(
store_type=alias_store_type,
stores_base_path=os.path.join(filesystem_base_path, "aliases"),
)
else:
raise Exception(
f"Can't create default alias store: invalid default store type '{self.default_store_type}'."
)

context_config.archives[DEFAULT_ALIAS_STORE_MARKER] = alias_store
changed = True
Expand Down
11 changes: 8 additions & 3 deletions src/kiara/interfaces/python_api/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -691,7 +691,9 @@ def create_operation(
if not module_config:
raise Exception("Pipeline configuration can't be empty.")
assert module_config is None or isinstance(module_config, Mapping)
operation = create_operation("pipeline", operation_config=module_config)
operation = create_operation(
"pipeline", operation_config=module_config, kiara=self.context
)
return operation
else:
mc = Manifest(module_type=module_type, module_config=module_config)
Expand Down Expand Up @@ -1680,7 +1682,7 @@ def store_value(
def store_values(
self,
values: Mapping[str, Union[str, uuid.UUID, Value]],
alias_map: Mapping[str, Iterable[str]],
alias_map: Union[Mapping[str, Iterable[str]], None] = None,
) -> StoreValuesResult:
"""
Store multiple values into the (default) kiara value store.
Expand All @@ -1697,7 +1699,10 @@ def store_values(
"""
result = {}
for field_name, value in values.items():
aliases = alias_map.get(field_name)
if alias_map:
aliases = alias_map.get(field_name)
else:
aliases = None
value_obj = self.get_value(value)
store_result = self.store_value(value=value_obj, alias=aliases)
result[field_name] = store_result
Expand Down
57 changes: 44 additions & 13 deletions src/kiara/registries/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,19 @@

import abc
import os
import typing
import uuid
from pathlib import Path
from typing import TYPE_CHECKING, Generic, Iterable, Type, TypeVar, Union
from typing import (
TYPE_CHECKING,
Any,
Dict,
Generic,
Iterable,
Mapping,
Type,
TypeVar,
Union,
)

import structlog
from pydantic import BaseModel, ConfigDict, Field
Expand Down Expand Up @@ -56,10 +65,17 @@ class ArchiveDetails(BaseModel):
)


class ArchiveMetadta(BaseModel):
    """Metadata describing a stored archive.

    NOTE: the class name is spelled 'Metadta' (sic). It is kept as-is because
    other code in this module refers to the class by this exact name.
    """

    # Unique id of the archive; 'None' when no id is known.
    archive_id: Union[uuid.UUID, None] = Field(
        default=None,
        description="The id of the stored archive.",
    )


NON_ARCHIVE_DETAILS = ArchiveDetails()


class KiaraArchive(abc.ABC, typing.Generic[ARCHIVE_CONFIG_CLS]):
class KiaraArchive(abc.ABC, Generic[ARCHIVE_CONFIG_CLS]):

_config_cls: Type[ARCHIVE_CONFIG_CLS] = None # type: ignore

Expand Down Expand Up @@ -89,7 +105,7 @@ class KiaraArchive(abc.ABC, typing.Generic[ARCHIVE_CONFIG_CLS]):
@classmethod
def _load_store_config(
cls, store_uri: str, allow_write_access: bool, **kwargs
) -> Union[typing.Dict[str, typing.Any], None]:
) -> Union[Dict[str, Any], None]:
"""Tries to assemble an archive config from a uri (and optional parameters).
If the archive type supports the archive at the uri, then a valid config will be returned,
Expand All @@ -101,7 +117,7 @@ def _load_store_config(
@classmethod
def load_store_config(
cls, store_uri: str, allow_write_access: bool, **kwargs
) -> Union[typing.Dict[str, typing.Any], None]:
) -> Union[Dict[str, Any], None]:

log_message(
"attempt_loading_existing_store",
Expand Down Expand Up @@ -140,7 +156,17 @@ def __init__(
self._archive_alias: str = archive_alias
self._config: ARCHIVE_CONFIG_CLS = archive_config
self._force_read_only: bool = force_read_only
self._archive_id: Union[uuid.UUID, None] = None

self._archive_metadata: Union[Mapping[str, Any], None] = None

@property
def archive_metadata(self) -> ArchiveMetadta:
    """Return the metadata model for this archive, loading it on first access.

    On the first call the raw metadata is fetched via
    '_retrieve_archive_metadata()', wrapped into an 'ArchiveMetadta' instance
    and stored on the instance; subsequent calls return the stored object.
    """
    cached = self._archive_metadata
    if cached is not None:
        return cached

    raw_metadata = self._retrieve_archive_metadata()
    self._archive_metadata = ArchiveMetadta(**raw_metadata)
    return self._archive_metadata

@classmethod
@abc.abstractmethod
Expand All @@ -156,6 +182,16 @@ def _is_writeable(cls) -> bool:
def register_archive(self, kiara: "Kiara"):
pass

@abc.abstractmethod
def _retrieve_archive_metadata(self) -> Mapping[str, Any]:
"""Retrieve metadata for the archive.
Must contain at least one key 'archive_id', with a uuid-able value that
uniquely identifies the archive.
"""

raise NotImplementedError()

@property
def archive_alias(self) -> str:
    """Return the alias stored in the '_archive_alias' attribute."""
    alias: str = self._archive_alias
    return alias
Expand All @@ -175,15 +211,10 @@ def is_writeable(self) -> bool:
# def register_archive(self, kiara: "Kiara"):
# pass

@abc.abstractmethod
def _retrieve_archive_id(self) -> uuid.UUID:
raise NotImplementedError()

@property
def archive_id(self) -> uuid.UUID:
if self._archive_id is None:
self._archive_id = self._retrieve_archive_id()
return self._archive_id

return self.archive_metadata.archive_id

@property
def config(self) -> ARCHIVE_CONFIG_CLS:
Expand Down
47 changes: 43 additions & 4 deletions src/kiara/registries/aliases/archives.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@
import shutil
import uuid
from pathlib import Path
from typing import Mapping, Set, Union
from typing import Any, Mapping, Set, Union

from orjson import orjson

from kiara.registries import ARCHIVE_CONFIG_CLS, FileSystemArchiveConfig
from kiara.registries.aliases import AliasArchive, AliasStore
Expand All @@ -21,11 +23,44 @@ class FileSystemAliasArchive(AliasArchive):
_archive_type_name = "filesystem_alias_archive"
_config_cls = FileSystemArchiveConfig # type: ignore

def __init__(self, archive_id: uuid.UUID, config: ARCHIVE_CONFIG_CLS):

super().__init__(archive_id=archive_id, config=config)
def __init__(
    self,
    archive_alias: str,
    archive_config: ARCHIVE_CONFIG_CLS,
    force_read_only: bool = False,
):
    """Create the filesystem alias archive.

    All three arguments are forwarded unchanged to the parent constructor.

    Args:
        archive_alias: the alias of this archive
        archive_config: the configuration object for this archive
        force_read_only: passed through to the parent class
    """
    parent_kwargs = {
        "archive_alias": archive_alias,
        "archive_config": archive_config,
        "force_read_only": force_read_only,
    }
    super().__init__(**parent_kwargs)

    # Both attributes start out empty and are filled in later by other
    # members of this class.
    self._base_path: Union[Path, None] = None
    self._archive_metadata: Union[Mapping[str, Any], None] = None

def _retrieve_archive_metadata(self) -> Mapping[str, Any]:

if self._archive_metadata is not None:
return self._archive_metadata

if not self.archive_metadata_path.is_file():
_archive_metadata = {}
else:
_archive_metadata = orjson.loads(self.archive_metadata_path.read_bytes())

archive_id = _archive_metadata.get("archive_id", None)
if not archive_id:
try:
_archive_id = uuid.UUID(self.alias_store_path.name)
_archive_metadata["archive_id"] = _archive_id
except Exception:
raise Exception(
f"Could not retrieve archive id for alias archive '{self.archive_alias}'."
)

self._archive_metadata = _archive_metadata
return self._archive_metadata

@property
def alias_store_path(self) -> Path:
Expand All @@ -38,6 +73,10 @@ def alias_store_path(self) -> Path:
self._base_path.mkdir(parents=True, exist_ok=True)
return self._base_path

@property
def archive_metadata_path(self) -> Path:
    """Return the path of the 'store_metadata.json' file in the alias store folder."""
    store_root = self.alias_store_path
    return store_root.joinpath("store_metadata.json")

@property
def aliases_path(self) -> Path:
return self.alias_store_path / "aliases"
Expand Down
10 changes: 4 additions & 6 deletions src/kiara/registries/aliases/sqlite_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,15 +54,13 @@ def __init__(self, archive_alias: str, archive_config: SqliteArchiveConfig):
self._cached_engine: Union[Engine, None] = None
# self._lock: bool = True

def _retrieve_archive_id(self) -> uuid.UUID:
sql = text("SELECT value FROM archive_metadata WHERE key='archive_id'")
def _retrieve_archive_metadata(self) -> Mapping[str, Any]:

sql = text("SELECT key, value FROM archive_metadata")

with self.sqlite_engine.connect() as connection:
result = connection.execute(sql)
row = result.fetchone()
if row is None:
raise Exception("No archive ID found in metadata")
return uuid.UUID(row[0])
return {row[0]: row[1] for row in result}

@property
def sqlite_path(self):
Expand Down
Loading

0 comments on commit 7745b7b

Please sign in to comment.