
Commit

chore: ongoing work stores
makkus committed Jan 29, 2024
1 parent 422224b commit 091a144
Showing 13 changed files with 439 additions and 79 deletions.
2 changes: 2 additions & 0 deletions pyproject.toml
@@ -114,6 +114,8 @@ sqlite_alias_archive = "kiara.registries.aliases.sqlite_store:SqliteAliasArchive
sqlite_alias_store = "kiara.registries.aliases.sqlite_store:SqliteAliasStore"
sqlite_job_archive = "kiara.registries.jobs.job_store.sqlite_store:SqliteJobArchive"
sqlite_job_store = "kiara.registries.jobs.job_store.sqlite_store:SqliteJobStore"
sqlite_workflow_archive = "kiara.registries.workflows.sqlite_store:SqliteWorkflowArchive"
sqlite_workflow_store = "kiara.registries.workflows.sqlite_store:SqliteWorkflowStore"

[project.entry-points."kiara.cli_subcommands"]

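The two new sqlite_workflow_* entries register the SQLite-backed workflow archive and store classes as plugin entry points, mirroring the existing sqlite alias/job entries. As a rough illustration of how such entries can be discovered at runtime, here is a minimal sketch using importlib.metadata; the group name "kiara.archive_type" and the helper name are assumptions, since the actual [project.entry-points...] header for this block sits above the visible hunk.

    # Sketch only (Python 3.10+): discover archive implementations registered via entry points.
    # The group name "kiara.archive_type" is an assumption, not taken from this diff.
    from importlib.metadata import entry_points

    def discover_archive_types(group: str = "kiara.archive_type") -> dict:
        """Return a mapping of entry-point name -> loaded archive class."""
        found = {}
        for ep in entry_points(group=group):
            # e.g. "sqlite_workflow_store" -> kiara.registries.workflows.sqlite_store:SqliteWorkflowStore
            found[ep.name] = ep.load()
        return found

    archive_types = discover_archive_types()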
45 changes: 23 additions & 22 deletions src/kiara/context/__init__.py
@@ -9,7 +9,7 @@
# from alembic import command # type: ignore
from pydantic import Field

from kiara.context.config import KiaraArchiveReference, KiaraConfig, KiaraContextConfig
from kiara.context.config import KiaraConfig, KiaraContextConfig
from kiara.context.runtime_config import KiaraRuntimeConfig
from kiara.data_types import DataType
from kiara.exceptions import KiaraContextException
@@ -47,6 +47,7 @@
from kiara.utils import log_exception, log_message
from kiara.utils.class_loading import find_all_archive_types
from kiara.utils.operations import filter_operations
from kiara.utils.stores import check_external_archive

# Copyright (c) 2021, University of Luxembourg / DHARPA project
# Copyright (c) 2021, Markus Binsteiner
@@ -322,35 +323,35 @@ def register_external_archive(
self,
archive: Union[str, KiaraArchive, List[KiaraArchive], List[str]],
allow_write_access: bool = False,
):

if isinstance(archive, (KiaraArchive, str)):
_archives = [archive]
else:
_archives = archive

archive_instances = set()
for _archive in _archives:

if isinstance(_archive, KiaraArchive):
archive_instances.add(_archive)
# TODO: handle write access
continue
) -> Dict[str, str]:
"""Register one or several external archives with the context.
loaded = KiaraArchiveReference.load_existing_archive(
archive_uri=_archive, allow_write_access=allow_write_access
)
In case you provide KiaraArchive instances, they will be modified in case the provided 'allow_write_access' is different from the 'is_force_read_only' attribute of the archive.
"""

archive_instances.update(loaded)
archive_instances = check_external_archive(
archive=archive, allow_write_access=allow_write_access
)

for _archve_inst in archive_instances:
result = {}
for _archive_inst in archive_instances:
log_message(
"register.external.archive",
archive=_archve_inst.archive_alias,
archive=_archive_inst.archive_alias,
allow_write_access=allow_write_access,
)

return list(archive_instances)
_archive_inst.set_force_read_only(not allow_write_access)

supported_item_types = _archive_inst.supported_item_types()
if "data" in supported_item_types:
result["data"] = self.data_registry.register_data_archive(_archive_inst) # type: ignore
if "alias" in supported_item_types:
result["alias"] = self.alias_registry.register_archive(_archive_inst) # type: ignore
if "job_record" in supported_item_types:
result["job_record"] = self.job_registry.register_job_archive(_archive_inst) # type: ignore

return result

def create_manifest(
self, module_or_operation: str, config: Union[Mapping[str, Any], None] = None
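For reference, a hypothetical call against the reworked register_external_archive might look like the sketch below; the archive uri and the kiara_context variable (standing in for an existing Kiara instance) are placeholders, not taken from this commit.

    # Hypothetical usage; 'kiara_context' stands for an existing Kiara instance
    # and the archive uri is a placeholder.
    registered = kiara_context.register_external_archive(
        "/tmp/external_archive.kiarchive",
        allow_write_access=False,   # the archive gets force_read_only set accordingly
    )
    # Per the new Dict[str, str] return type, the keys correspond to the supported
    # item types that were registered, e.g. "data", "alias", "job_record".
    print(registered)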
111 changes: 76 additions & 35 deletions src/kiara/context/config.py
@@ -100,10 +100,14 @@ def load_existing_archive(
allow_write_access=allow_write_access,
**kwargs,
)
archive_config = archive_cls._config_cls(**data)
archive: KiaraArchive = archive_cls(
archive_alias=archive_alias, archive_config=data
archive_alias=archive_alias, archive_config=archive_config
)
archive_configs.append(data)
wrapped_archive_config = KiaraArchiveConfig(
archive_type=store_type, config=data
)
archive_configs.append(wrapped_archive_config)
archives.append(archive)
else:
for st in store_type:
@@ -117,25 +121,33 @@ def load_existing_archive(
allow_write_access=allow_write_access,
**kwargs,
)
archive_config = archive_cls._config_cls(**data)
archive: KiaraArchive = archive_cls(
archive_alias=archive_alias, archive_config=data
archive_alias=archive_alias, archive_config=archive_config
)
wrapped_archive_config = KiaraArchiveConfig(
archive_type=st, config=data
)
archive_configs.append(data)
archive_configs.append(wrapped_archive_config)
archives.append(archive)
else:
for archive_type, archive_cls in archive_types.items():

data = archive_cls.load_store_config(
store_uri=archive_uri,
allow_write_access=allow_write_access,
**kwargs,
)

if data is None:
continue
archive_config = archive_cls._config_cls(**data)
archive: KiaraArchive = archive_cls(
archive_alias=archive_alias, archive_config=data
archive_alias=archive_alias, archive_config=archive_config
)
wrapped_archive_config = KiaraArchiveConfig(
archive_type=archive_type, config=data
)
archive_configs.append(data)
archive_configs.append(wrapped_archive_config)
archives.append(archive)

if archives is None:
@@ -183,10 +195,14 @@ def archives(self) -> List["KiaraArchive"]:
)

archive_cls = archive_types[config.archive_type]
archive = archive_cls.load_store_config(
archive_config_data = archive_cls.load_store_config(
archive_uri=self.archive_uri,
allow_write_access=self.allow_write_access,
)
archive_config = archive_cls._config_cls(**archive_config_data)
archive = archive_cls(
archive_alias=self.archive_alias, archive_config=archive_config
)
result.append(archive)

self._archives = result
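The hunks above all converge on the same construction pattern: load_store_config yields a plain dict, which is turned into the archive's typed config class and separately wrapped in a KiaraArchiveConfig for serialization. A condensed, hypothetical sketch of that pattern (archive_cls, uri, alias and type_name are placeholders):

    # Condensed sketch of the pattern used in the hunks above; names are placeholders.
    data = archive_cls.load_store_config(store_uri=uri, allow_write_access=False)  # plain dict or None
    if data is not None:
        archive_config = archive_cls._config_cls(**data)                    # typed config model
        archive = archive_cls(archive_alias=alias, archive_config=archive_config)
        wrapped = KiaraArchiveConfig(archive_type=type_name, config=data)   # serializable reference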
@@ -479,46 +495,82 @@ def _validate_context(self, context_config: KiaraContextConfig) -> bool:
sqlite_base_path = os.path.join(self.stores_base_path, "sqlite_stores")
filesystem_base_path = os.path.join(self.stores_base_path, "filesystem_stores")

def create_default_sqlite_archive_config() -> Dict[str, Any]:

store_id = str(uuid.uuid4())
file_name = f"{store_id}.sqlite"
archive_path = Path(
os.path.abspath(os.path.join(sqlite_base_path, file_name))
)

if archive_path.exists():
raise Exception(
f"Archive path '{archive_path.as_posix()}' already exists."
)

archive_path.parent.mkdir(exist_ok=True, parents=True)

# Connect to the SQLite database (or create it if it doesn't exist)
import sqlite3

conn = sqlite3.connect(archive_path)

# Create a cursor object
c = conn.cursor()
# Create table
c.execute(
"""CREATE TABLE archive_metadata
(key text PRIMARY KEY , value text NOT NULL)"""
)
c.execute(
f"INSERT INTO archive_metadata VALUES ('archive_id','{store_id}')"

[CI annotation: GitHub Actions / linting-linux, Ruff S608: src/kiara/context/config.py:526:17 Possible SQL injection vector through string-based query construction]
)
conn.commit()
conn.close()

return {"sqlite_db_path": archive_path.as_posix()}

default_sqlite_config: Union[Dict[str, Any], None] = None

if DEFAULT_DATA_STORE_MARKER not in context_config.archives.keys():
# data_store_type = "filesystem_data_store"
# TODO: change that back
data_store_type = "sqlite_data_store"

data_store = create_default_store_config(
store_type=data_store_type,
stores_base_path=sqlite_base_path,
default_sqlite_config = create_default_sqlite_archive_config()

data_store = KiaraArchiveConfig(
archive_type="sqlite_data_store", config=default_sqlite_config
)

context_config.archives[DEFAULT_DATA_STORE_MARKER] = data_store
changed = True

if DEFAULT_JOB_STORE_MARKER not in context_config.archives.keys():

# job_store_type = "filesystem_job_store"
job_store_type = "sqlite_job_store"
if default_sqlite_config is None:
default_sqlite_config = create_default_sqlite_archive_config()

job_store = create_default_store_config(
store_type=job_store_type,
stores_base_path=sqlite_base_path,
job_store = KiaraArchiveConfig(
archive_type="sqlite_job_store", config=default_sqlite_config
)

context_config.archives[DEFAULT_JOB_STORE_MARKER] = job_store
changed = True

if DEFAULT_ALIAS_STORE_MARKER not in context_config.archives.keys():

alias_store_type = "filesystem_alias_store"
alias_store_type = "sqlite_alias_store"
if default_sqlite_config is None:
default_sqlite_config = create_default_sqlite_archive_config()

alias_store = create_default_store_config(
store_type=alias_store_type,
stores_base_path=self.stores_base_path,
alias_store = KiaraArchiveConfig(
archive_type="sqlite_alias_store", config=default_sqlite_config
)

context_config.archives[DEFAULT_ALIAS_STORE_MARKER] = alias_store
changed = True

if DEFAULT_WORKFLOW_STORE_MARKER not in context_config.archives.keys():

workflow_store_type = "filesystem_workflow_store"
# workflow_store_type = "sqlite_workflow_store"

workflow_store = create_default_store_config(
store_type=workflow_store_type,
@@ -527,17 +579,6 @@ def _validate_context(self, context_config: KiaraContextConfig) -> bool:
context_config.archives[DEFAULT_WORKFLOW_STORE_MARKER] = workflow_store
changed = True

# if METADATA_DESTINY_STORE_MARKER not in context_config.archives.keys():
#
# destiny_store_type = "filesystem_destiny_store"
#
# destiny_store = create_default_store_config(
# store_type=destiny_store_type,
# stores_base_path=os.path.join(filesystem_base_path, "destinies"),
# )
# context_config.archives[METADATA_DESTINY_STORE_MARKER] = destiny_store
# changed = True

return changed

def create_context_config(
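The new create_default_sqlite_archive_config helper bootstraps an empty SQLite file with an archive_metadata table holding the generated archive id, and the resulting config dict is reused for the default data, job and alias stores so they share one database file. Below is a standalone, hypothetical re-creation of that bootstrap, using a parameterized INSERT (which would also avoid the Ruff S608 warning flagged by CI); the base directory is a placeholder.

    # Hypothetical re-creation of the default-archive bootstrap shown above.
    # The base directory is a placeholder; the parameterized INSERT replaces the
    # f-string version flagged by Ruff (S608).
    import sqlite3
    import uuid
    from pathlib import Path

    def create_default_sqlite_archive_config(sqlite_base_path: str = "/tmp/kiara_sqlite_stores") -> dict:
        store_id = str(uuid.uuid4())
        archive_path = Path(sqlite_base_path) / f"{store_id}.sqlite"
        if archive_path.exists():
            raise Exception(f"Archive path '{archive_path.as_posix()}' already exists.")
        archive_path.parent.mkdir(exist_ok=True, parents=True)

        # Create the database file and the metadata table, then record the archive id.
        conn = sqlite3.connect(archive_path)
        try:
            conn.execute(
                "CREATE TABLE archive_metadata (key text PRIMARY KEY, value text NOT NULL)"
            )
            conn.execute(
                "INSERT INTO archive_metadata VALUES (?, ?)", ("archive_id", store_id)
            )
            conn.commit()
        finally:
            conn.close()

        return {"sqlite_db_path": archive_path.as_posix()}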
5 changes: 4 additions & 1 deletion src/kiara/models/events/alias_registry.py
@@ -5,7 +5,7 @@
# Mozilla Public License, version 2.0 (see LICENSE or https://www.mozilla.org/en-US/MPL/2.0/)

import uuid
from typing import Iterable, Literal
from typing import Iterable, Literal, Union

from pydantic import Field

@@ -27,6 +27,9 @@ class AliasArchiveAddedEvent(RegistryEvent):
is_default_store: bool = Field(
description="Whether this store acts as default store."
)
mount_point: Union[str, None] = Field(
description="The mountpoint of this alias archive (if specified)."
)


class AliasPreStoreEvent(RegistryEvent):
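Note that the new mount_point field is declared as Union[str, None] via Field(...) without a default, so under pydantic v2 semantics (an assumption about the pydantic version in use) it must be passed explicitly when the event is created, even if only as None. A minimal, standalone illustration of that behaviour, not the kiara event class itself:

    # Standalone illustration (assuming pydantic v2): a Union[str, None] field
    # declared without a default is required, so callers must pass it explicitly.
    from typing import Union
    from pydantic import BaseModel, Field, ValidationError

    class ArchiveAdded(BaseModel):  # illustrative stand-in, not the kiara event class
        mount_point: Union[str, None] = Field(description="The mountpoint (if specified).")

    ArchiveAdded(mount_point=None)        # ok: explicit None
    ArchiveAdded(mount_point="my_mount")  # ok: explicit value
    try:
        ArchiveAdded()                    # missing field -> ValidationError under pydantic v2
    except ValidationError as exc:
        print(exc)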
10 changes: 8 additions & 2 deletions src/kiara/registries/__init__.py
@@ -89,7 +89,7 @@ class KiaraArchive(abc.ABC, typing.Generic[ARCHIVE_CONFIG_CLS]):
@classmethod
def _load_store_config(
cls, store_uri: str, allow_write_access: bool, **kwargs
) -> Union[ARCHIVE_CONFIG_CLS, None]:
) -> Union[typing.Dict[str, typing.Any], None]:
"""Tries to assemble an archive config from an uri (and optional paramters).
If the archive type supports the archive at the uri, then a valid config will be returned,
@@ -101,7 +101,7 @@ def _load_store_config(
@classmethod
def load_store_config(
cls, store_uri: str, allow_write_access: bool, **kwargs
) -> Union[ARCHIVE_CONFIG_CLS, None]:
) -> Union[typing.Dict[str, typing.Any], None]:

log_message(
"attempt_loading_existing_store",
@@ -160,6 +160,12 @@ def register_archive(self, kiara: "Kiara"):
def archive_alias(self) -> str:
return self._archive_alias

def is_force_read_only(self) -> bool:
return self._force_read_only

def set_force_read_only(self, force_read_only: bool):
self._force_read_only = force_read_only

def is_writeable(self) -> bool:
if self._force_read_only:
return False
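The new accessor pair lets register_external_archive flip an archive into (or out of) forced read-only mode after construction, and is_writeable then short-circuits on that flag. A minimal, self-contained sketch of that interplay (a simplified stand-in, not the full KiaraArchive class):

    # Simplified sketch of the force-read-only flag added above; '_is_store'
    # stands in for whatever else is_writeable would normally check.
    class ArchiveSketch:
        def __init__(self, is_store: bool = True, force_read_only: bool = False):
            self._is_store = is_store
            self._force_read_only = force_read_only

        def is_force_read_only(self) -> bool:
            return self._force_read_only

        def set_force_read_only(self, force_read_only: bool) -> None:
            self._force_read_only = force_read_only

        def is_writeable(self) -> bool:
            if self._force_read_only:
                return False
            return self._is_store

    archive = ArchiveSketch()
    archive.set_force_read_only(True)   # e.g. registered with allow_write_access=False
    assert archive.is_writeable() is False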
(Remaining changed files not shown.)
