Commit

chore: start working on data export
makkus committed Jan 9, 2024
1 parent 1312a5f commit 2462c2b
Showing 9 changed files with 323 additions and 28 deletions.
26 changes: 26 additions & 0 deletions docs/development/stores.md
@@ -0,0 +1,26 @@
# *kiara* stores

This page contains some information about how *kiara* stores work.

Practically, there are two types of stores in *kiara*:

- *archives*: stores that can only be read from, but not written to
- *stores*: actual 'stores', which can be both read from and written to

*kiara* has different store types, depending on what exactly is stored:

- *data stores*: stores that hold the actual data; these are the most important ones
- *alias stores*: stores that keep human-readable references (aliases) and link them to actual data (using their value_id)
- *job stores*: stores that keep details and records about past jobs that were run in a *kiara* instance

## Base store

All archives & stores inherit from the base class 'kiara.registries.BaseArchive', which manages basic attributes like the store's id and its configuration, and holds a reference to the current *kiara* context.

As a developer, you probably won't use this class directly; instead, you will inherit from one of the higher-level abstract base classes. In the case of data stores, those are:

- `kiara.registries.data.DataArchive`
- `kiara.registries.data.DataStore`

Depending on what you want to store, it's a good idea to check the source code of those base classes and look up which methods you need to implement.
Also, you can check out the default implementation of such a store/archive ('filesystem'-based in all cases) to get an idea of what needs to happen in each of those methods; a rough skeleton is sketched below.
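
To make the subclassing flow more concrete, here is a minimal, illustrative skeleton of a read-only data archive. It is not part of this commit: the class name, the archive type name and the import paths for `Value`/`PersistedData` are assumptions, and the exact set of abstract methods (for example `retrieve_chunk`, whose full signature is not shown in this diff) should be taken from the `DataArchive` source. Such an implementation would also need to be registered as a plugin, similar to the `sqlite_data_archive` entry point added in `pyproject.toml` below.

```python
import uuid
from typing import Any, Iterable, Mapping, Set, Union

# The import paths for Value/PersistedData are assumptions, for illustration only.
from kiara.models.values.value import PersistedData, Value
from kiara.registries import FileSystemArchiveConfig
from kiara.registries.data import DataArchive


class MyCustomDataArchive(DataArchive):
    """Illustrative read-only data archive backed by some external source."""

    _archive_type_name = "my_custom_data_archive"
    _config_cls = FileSystemArchiveConfig  # re-use the generic 'archive_path' config

    def _retrieve_serialized_value(self, value: Value) -> PersistedData:
        # Load the serialized form of 'value' from the backend, or raise if it
        # is not persisted in this archive.
        raise NotImplementedError()

    def _retrieve_value_details(self, value_id: uuid.UUID) -> Mapping[str, Any]:
        # Return the stored metadata for the value with this id.
        raise NotImplementedError()

    def _retrieve_all_value_ids(
        self, data_type_name: Union[str, None] = None
    ) -> Union[None, Iterable[uuid.UUID]]:
        # Return the ids of all values contained in this archive.
        raise NotImplementedError()

    def _retrieve_environment_details(
        self, env_type: str, env_hash: str
    ) -> Mapping[str, Any]:
        # Return the runtime environment details stored under this type/hash.
        raise NotImplementedError()

    def _find_values_with_hash(
        self,
        value_hash: str,
        value_size: Union[int, None] = None,
        data_type_name: Union[str, None] = None,
    ) -> Union[Set[uuid.UUID], None]:
        # Return the ids of all values whose data matches 'value_hash'.
        raise NotImplementedError()

    def _find_destinies_for_value(
        self, value_id: uuid.UUID, alias_filter: Union[str, None] = None
    ) -> Union[Mapping[str, uuid.UUID], None]:
        # Destinies are optional; returning None is acceptable for now.
        return None

    # Note: 'retrieve_chunk' (and any other abstract methods) also need to be
    # implemented; check the base class for the exact signatures.
```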
2 changes: 2 additions & 0 deletions pyproject.toml
@@ -110,6 +110,8 @@ filesystem_destiny_archive = "kiara.registries.destinies.filesystem_store:FileSy
filesystem_destiny_store = "kiara.registries.destinies.filesystem_store:FileSystemDestinyStore"
filesystem_workflow_archive = "kiara.registries.workflows.archives:FileSystemWorkflowArchive"
filesystem_workflow_store = "kiara.registries.workflows.archives:FileSystemWorkflowStore"
sqlite_data_archive = "kiara.registries.data.data_store.sqlite_store:SqliteDataArchive"
sqlite_data_store = "kiara.registries.data.data_store.sqlite_store:SqliteDataStore"

[project.entry-points."kiara.cli_subcommands"]

4 changes: 3 additions & 1 deletion src/kiara/context/config.py
@@ -317,7 +317,9 @@ def _validate_context(self, context_config: KiaraContextConfig) -> bool:

changed = False
if DEFAULT_DATA_STORE_MARKER not in context_config.archives.keys():
data_store_type = "filesystem_data_store"
# data_store_type = "filesystem_data_store"
# TODO: change that back
data_store_type = "sqlite_data_store"
assert data_store_type in available_archives.item_infos.keys()

data_store_id = ID_REGISTRY.generate(comment="default data store id")
4 changes: 3 additions & 1 deletion src/kiara/interfaces/python_api/__init__.py
@@ -1665,7 +1665,9 @@ def store_value(
result = StoreValueResult(
value=value_obj,
aliases=sorted(alias) if alias else [],
error=str(e),
error=str(e)
if str(e)
else f"Unknown error (type '{type(e).__name__}').",
persisted_data=persisted_data,
)

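For context on the change above: exceptions raised without arguments stringify to an empty string, which would otherwise leave the error field blank. A small, self-contained illustration of the fallback pattern (generic sketch, not this API's actual code):

```python
def error_message(e: Exception) -> str:
    # str(e) is empty for exceptions raised without arguments, e.g. str(RuntimeError()) == "".
    return str(e) if str(e) else f"Unknown error (type '{type(e).__name__}')."


try:
    raise RuntimeError()  # no message attached
except Exception as exc:
    print(error_message(exc))  # -> Unknown error (type 'RuntimeError').
```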
7 changes: 7 additions & 0 deletions src/kiara/registries/__init__.py
@@ -147,3 +147,10 @@ class FileSystemArchiveConfig(ArchiveConfig):
archive_path: str = Field(
description="The path where the data for this archive is stored."
)


class SqliteArchiveConfig(ArchiveConfig):

archive_path: str = Field(
description="The path where the data for this archive is stored."
)
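For orientation, the new config class behaves like its filesystem counterpart: a pydantic model whose 'archive_path' points at the backing storage. A tiny illustrative instantiation (the path is made up; in practice kiara derives it from the context configuration):

```python
from kiara.registries import SqliteArchiveConfig

# Hypothetical path, for illustration only.
config = SqliteArchiveConfig(archive_path="/tmp/kiara/default_data_store.sqlite")
print(config.archive_path)
```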
90 changes: 75 additions & 15 deletions src/kiara/registries/data/data_store/__init__.py
@@ -25,8 +25,16 @@


class DataArchive(BaseArchive):
"""Base class for data archiv implementationss."""

@classmethod
def is_writeable(cls) -> bool:
"""Archives are never writable."""
return False

@classmethod
def supported_item_types(cls) -> Iterable[str]:
"""This archive type only supports storing data."""

return ["data"]

@@ -42,6 +50,7 @@ def __init__(self, archive_id: uuid.UUID, config: ARCHIVE_CONFIG_CLS):
def retrieve_serialized_value(
self, value: Union[uuid.UUID, Value]
) -> PersistedData:
"""Retrieve a 'PersistedData' instance from a value id or value instance."""

if isinstance(value, Value):
value_id: uuid.UUID = value.value_id
@@ -64,9 +73,21 @@ def retrieve_serialized_value(

@abc.abstractmethod
def _retrieve_serialized_value(self, value: Value) -> PersistedData:
"""Retrieve a 'PersistedData' instance from a value instance.
This method basically implements the store-specific logic to serialize/deserialize the value data to/from disk.
Raise an exception if the value is not persisted in this archive.
"""
pass

Check failure (GitHub Actions / linting-linux), Ruff: src/kiara/registries/data/data_store/__init__.py:82:9: PIE790 Unnecessary `pass` statement
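
The Ruff PIE790 findings reported throughout this file all follow the same pattern: a `pass` statement that is redundant because the method body already consists of a docstring. A minimal generic illustration of the flagged pattern and the usual fix (not this file's code):

```python
import abc


class Example(abc.ABC):

    @abc.abstractmethod
    def flagged(self) -> None:
        """The docstring already forms a valid body; the trailing 'pass' triggers PIE790."""
        pass

    @abc.abstractmethod
    def fixed(self) -> None:
        """Dropping the redundant 'pass' satisfies the linter."""
```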

def retrieve_value(self, value_id: uuid.UUID) -> Value:
"""Retrieve the value for the specified value_id.
Looks up the value in the cache first, and if not found, calls the '_retrieve_value_details' method to retrieve it from the archive.
Raises an exception if the value is not persisted in this archive.
"""

cached = self._value_cache.get(value_id, None)
if cached is not None:
@@ -100,12 +121,18 @@ def retrieve_value(self, value_id: uuid.UUID) -> Value:

@abc.abstractmethod
def _retrieve_value_details(self, value_id: uuid.UUID) -> Mapping[str, Any]:
"""Retrieve the value details for the specified value_id from disk.
This method basically implements the store-specific logic to retrieve the value details from disk.
"""
pass

Check failure (GitHub Actions / linting-linux), Ruff: src/kiara/registries/data/data_store/__init__.py:129:9: PIE790 Unnecessary `pass` statement

@property
def value_ids(self) -> Union[None, Iterable[uuid.UUID]]:
return self._retrieve_all_value_ids()

@abc.abstractmethod
def _retrieve_all_value_ids(
self, data_type_name: Union[str, None] = None
) -> Union[None, Iterable[uuid.UUID]]:
@@ -153,6 +180,11 @@ def retrieve_environment_details(
def _retrieve_environment_details(
self, env_type: str, env_hash: str
) -> Mapping[str, Any]:
"""Retrieve the environment details with the specified type and hash.
Each store needs to implement this so environment details related to a value can be retrieved later on. Since in most cases the environment details will not change, a lookup is more efficient than having to store the full information with each value.
"""

pass

Check failure (GitHub Actions / linting-linux), Ruff: src/kiara/registries/data/data_store/__init__.py:188:9: PIE790 Unnecessary `pass` statement

def find_values(self, matcher: ValueMatcher) -> Iterable[Value]:
@@ -164,12 +196,16 @@ def find_values_with_hash(
value_size: Union[int, None] = None,
data_type_name: Union[str, None] = None,
) -> Set[uuid.UUID]:
"""Find all values that have data that match the specifid hash.
if data_type_name is not None:
raise NotImplementedError()
If the data type name is specified, only values of that type are considered, which should speed up the search. Same with 'value_size'. But both filters are not implemented yet.
"""

if value_size is not None:
raise NotImplementedError()
# if data_type_name is not None:
# raise NotImplementedError()
#
# if value_size is not None:
# raise NotImplementedError()

if value_hash in self._value_hash_index.keys():
value_ids: Union[Set[uuid.UUID], None] = self._value_hash_index[value_hash]
Expand All @@ -182,6 +218,9 @@ def find_values_with_hash(
self._value_hash_index[value_hash] = value_ids

assert value_ids is not None

# TODO: if data_type_name or value_size are specified, validate the results?

return value_ids

@abc.abstractmethod
Expand All @@ -191,11 +230,23 @@ def _find_values_with_hash(
value_size: Union[int, None] = None,
data_type_name: Union[str, None] = None,
) -> Union[Set[uuid.UUID], None]:
"""Find all values that have data that match the specifid hash.
If the data type name is specified, only values of that type are considered, which should speed up the search. Same with 'value_size'.
This needs to be implemented in the implementing store though, and might or might not be used.
"""

pass

Check failure (GitHub Actions / linting-linux), Ruff: src/kiara/registries/data/data_store/__init__.py:239:9: PIE790 Unnecessary `pass` statement

def find_destinies_for_value(
self, value_id: uuid.UUID, alias_filter: Union[str, None] = None
) -> Union[Mapping[str, uuid.UUID], None]:
"""Find all destinies for the specified value id.
TODO: explain destinies, and when they would be used.
For now, you can just return 'None' in your implementation.
"""

return self._find_destinies_for_value(
value_id=value_id, alias_filter=alias_filter
@@ -205,6 +256,13 @@ def find_destinies_for_value(
def _find_destinies_for_value(
self, value_id: uuid.UUID, alias_filter: Union[str, None] = None
) -> Union[Mapping[str, uuid.UUID], None]:
"""Find all destinies for the specified value id.
TODO: explain destinies, and when they would be used.
For now, you can just return 'None' in your implementation.
"""

pass

Check failure (GitHub Actions / linting-linux), Ruff: src/kiara/registries/data/data_store/__init__.py:266:9: PIE790 Unnecessary `pass` statement

@abc.abstractmethod
@@ -214,18 +272,11 @@ def retrieve_chunk(
as_file: Union[bool, str, None] = None,
symlink_ok: bool = True,
) -> Union[bytes, str]:
pass
"""Retrieve the chunk with the specified id.
# def retrieve_job_record(self, inputs_manifest: InputsManifest) -> Optional[JobRecord]:
# return self._retrieve_job_record(
# manifest_hash=inputs_manifest.manifest_hash, jobs_hash=inputs_manifest.jobs_hash
# )
#
# @abc.abstractmethod
# def _retrieve_job_record(
# self, manifest_hash: int, jobs_hash: int
# ) -> Optional[JobRecord]:
# pass
If 'as_file' is specified, the chunk is written to a file, and the file path is returned. Otherwise, the chunk is returned as 'bytes'.
"""
pass

Check failure (GitHub Actions / linting-linux), Ruff: src/kiara/registries/data/data_store/__init__.py:279:9: PIE790 Unnecessary `pass` statement


class DataStore(DataArchive):
@@ -255,6 +306,10 @@ class BaseDataStore(DataStore):

@abc.abstractmethod
def _persist_stored_value_info(self, value: Value, persisted_value: PersistedData):
"""Store the details about the persisted data.
This is used so an archive of this type can load the value data again later on. Value metadata is stored separately, later, using the '_persist_value_details' method.
"""
pass

Check failure (GitHub Actions / linting-linux), Ruff: src/kiara/registries/data/data_store/__init__.py:313:9: PIE790 Unnecessary `pass` statement

@abc.abstractmethod
@@ -263,6 +318,7 @@ def _persist_value_details(self, value: Value):

@abc.abstractmethod
def _persist_value_data(self, value: Value) -> PersistedData:
"""Persist the actual value data."""
pass

Check failure (GitHub Actions / linting-linux), Ruff: src/kiara/registries/data/data_store/__init__.py:322:9: PIE790 Unnecessary `pass` statement

@abc.abstractmethod
@@ -278,6 +334,10 @@ def _persist_value_pedigree(self, value: Value):
def _persist_environment_details(
self, env_type: str, env_hash: str, env_data: Mapping[str, Any]
):
"""Persist the environment details.
Each store type needs to store this for lookup purposes.
"""
pass

Check failure (GitHub Actions / linting-linux), Ruff: src/kiara/registries/data/data_store/__init__.py:341:9: PIE790 Unnecessary `pass` statement

@abc.abstractmethod
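The 'find_values_with_hash' logic in the diff above lazily fills an in-memory index from value hash to value ids, so repeated lookups do not have to hit the backend again. A stripped-down, standalone sketch of that caching pattern (illustrative only, not kiara's actual implementation):

```python
import uuid
from typing import Dict, Set, Union


class HashIndexCache:
    """Caches hash -> value_id lookups so the backend is only queried once per hash."""

    def __init__(self) -> None:
        self._value_hash_index: Dict[str, Set[uuid.UUID]] = {}

    def find_values_with_hash(self, value_hash: str) -> Set[uuid.UUID]:
        if value_hash in self._value_hash_index:
            return self._value_hash_index[value_hash]

        value_ids = self._query_backend(value_hash)
        if value_ids is None:
            value_ids = set()
        self._value_hash_index[value_hash] = value_ids
        return value_ids

    def _query_backend(self, value_hash: str) -> Union[Set[uuid.UUID], None]:
        # Placeholder for the store-specific lookup (e.g. a sqlite query).
        return None
```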
12 changes: 1 addition & 11 deletions src/kiara/registries/data/data_store/filesystem_store.py
@@ -25,7 +25,6 @@
)
from kiara.registries import ArchiveDetails, FileSystemArchiveConfig
from kiara.registries.data.data_store import BaseDataStore, DataArchive
from kiara.registries.jobs import JobArchive
from kiara.utils import log_message
from kiara.utils.hashfs import HashAddress, HashFS
from kiara.utils.json import orjson_dumps
@@ -53,22 +52,13 @@ class EntityType(Enum):
DEFAULT_HASH_FS_ALGORITHM = "sha256"


class FileSystemDataArchive(DataArchive, JobArchive):
class FileSystemDataArchive(DataArchive):

"""Data store that loads data from the local filesystem."""

_archive_type_name = "filesystem_data_archive"
_config_cls = FileSystemArchiveConfig # type: ignore

# @classmethod
# def supported_item_types(cls) -> Iterable[str]:
#
# return ["data", "job_record"]

@classmethod
def is_writeable(cls) -> bool:
return False

def __init__(self, archive_id: uuid.UUID, config: FileSystemArchiveConfig):

DataArchive.__init__(self, archive_id=archive_id, config=config)
