Skip to content

Commit

Permalink
refactor: move environment storage to new metadata store
Browse files Browse the repository at this point in the history
  • Loading branch information
makkus committed Mar 20, 2024
1 parent 1568e38 commit 7aabc08
Show file tree
Hide file tree
Showing 9 changed files with 163 additions and 89 deletions.
1 change: 1 addition & 0 deletions src/kiara/context/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,7 @@ def context_info(self) -> "KiaraContextInfo":

@property
def environment_registry(self) -> EnvironmentRegistry:

return self._env_mgmt

@property
Expand Down
8 changes: 4 additions & 4 deletions src/kiara/models/values/value.py
Original file line number Diff line number Diff line change
Expand Up @@ -780,10 +780,10 @@ class Value(ValueDetails):
environment_hashes: Mapping[str, Mapping[str, str]] = Field(
description="Hashes for the environments this value was created in."
)
enviroments: Union[Mapping[str, Mapping[str, Any]], None] = Field(
description="Information about the environments this value was created in.",
default=None,
)
# enviroments: Union[Mapping[str, Mapping[str, Any]], None] = Field(
# description="Information about the environments this value was created in.",
# default=None,
# )
property_links: Mapping[str, uuid.UUID] = Field(
description="Links to values that are properties of this value.",
default_factory=dict,
Expand Down
59 changes: 58 additions & 1 deletion src/kiara/registries/data/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,7 @@ def __init__(self, kiara: "Kiara"):
self._cached_data[NOT_SET_VALUE_ID] = SpecialValue.NOT_SET
self._registered_values[NOT_SET_VALUE_ID] = self._not_set_value
self._persisted_value_descs[NOT_SET_VALUE_ID] = NONE_PERSISTED_DATA
self._env_cache: Dict[str, Dict[str, Mapping[str, Any]]] = {}

self._none_value: Value = Value(
value_id=NONE_VALUE_ID,
Expand Down Expand Up @@ -500,6 +501,33 @@ def get_value(self, value: Union[uuid.UUID, ValueLink, str, Path]) -> Value:
self._registered_values[_value_id] = stored_value
return self._registered_values[_value_id]

def _persist_environment(self, env_type: str, env_hash: str):

cached = self._env_cache.get(env_type, {}).get(env_hash, None)
if cached is not None:
return

environment = self._kiara.environment_registry.get_environment_for_cid(
env_hash
)
# env_type = environment.get_environment_type_name()
# env_hash = str(environment.instance_cid)
#
# env = self._env_cache.get(env_type, {}).get(env_hash, None)
# if env is not None:
# return

env_data = environment.as_dict_with_schema()
ENVIRONMENT_MARKER_KEY = "environment"
self._kiara.metadata_registry.register_metadata_item(key = ENVIRONMENT_MARKER_KEY, item=environment)
# self._persist_environment_details(
# env_type=env_type, env_hash=env_hash, env_data=env_data
# )
self._env_cache.setdefault(env_type, {})[env_hash] = env_data




def store_value(
self,
value: Union[ValueLink, uuid.UUID, str],
Expand All @@ -510,10 +538,39 @@ def store_value(
If 'data_store' is not provided, the default data store is used. If the 'data_store' argument is of
type uuid, the archive_id is used, if string, first it will be converted to an uuid, if that works,
again, the archive_id is used, if not, the string is used as the archive alias.
"""
"""

_value = self.get_value(value)

# first, persist environment information
for env_type, env_hash in _value.pedigree.environments.items():

self._persist_environment(env_type, env_hash, _value)
# cached = self._env_cache.get(env_type, {}).get(env_hash, None)
# if cached is not None:
# continue
#
# environment = self.kiara_context.environment_registry.get_environment_for_cid(
# env_hash
# )
# env_type = environment.get_environment_type_name()
# env_hash = str(environment.instance_cid)
#
# env = self._env_cache.get(env_type, {}).get(env_hash, None)
# if env is not None:
# return
#
# env_data = environment.as_dict_with_schema()
# self._persist_environment_details(
# env_type=env_type, env_hash=env_hash, env_data=env_data
# )
# self._env_cache.setdefault(env_type, {})[env_hash] = env_data
#
# self.persist_environment(env)



store: DataStore = self.get_archive(archive_id_or_alias=data_store) # type: ignore
if not store.is_writeable():
if data_store:
Expand Down
74 changes: 37 additions & 37 deletions src/kiara/registries/data/data_store/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -373,14 +373,14 @@ def _persist_value_pedigree(self, value: Value):
to the job that produced it is preserved.
"""

@abc.abstractmethod
def _persist_environment_details(
self, env_type: str, env_hash: str, env_data: Mapping[str, Any]
):
"""Persist the environment details.
Each store type needs to store this for lookup purposes.
"""
# @abc.abstractmethod
# def _persist_environment_details(
# self, env_type: str, env_hash: str, env_data: Mapping[str, Any]
# ):
# """Persist the environment details.
#
# Each store type needs to store this for lookup purposes.
# """

@abc.abstractmethod
def _persist_destiny_backlinks(self, value: Value):
Expand All @@ -395,16 +395,16 @@ def store_value(self, value: Value) -> PersistedData:
value_hash=value.value_hash,
)

# first, persist environment information
for env_type, env_hash in value.pedigree.environments.items():
cached = self._env_cache.get(env_type, {}).get(env_hash, None)
if cached is not None:
continue

env = self.kiara_context.environment_registry.get_environment_for_cid(
env_hash
)
self.persist_environment(env)
# # first, persist environment information
# for env_type, env_hash in value.pedigree.environments.items():
# cached = self._env_cache.get(env_type, {}).get(env_hash, None)
# if cached is not None:
# continue
#
# env = self.kiara_context.environment_registry.get_environment_for_cid(
# env_hash
# )
# self.persist_environment(env)

# save the value data and metadata
persisted_value = self._persist_value(value)
Expand Down Expand Up @@ -544,25 +544,25 @@ def _persist_value(self, value: Value) -> PersistedData:

return persisted_value_info

def persist_environment(self, environment: RuntimeEnvironment):
"""
Persist the specified environment.
The environment is stored as a dictionary, including it's schema, not as the actual Python model.
This is to make sure it can still be loaded later on, in case the Python model has changed in later versions.
"""
env_type = environment.get_environment_type_name()
env_hash = str(environment.instance_cid)

env = self._env_cache.get(env_type, {}).get(env_hash, None)
if env is not None:
return

env_data = environment.as_dict_with_schema()
self._persist_environment_details(
env_type=env_type, env_hash=env_hash, env_data=env_data
)
self._env_cache.setdefault(env_type, {})[env_hash] = env_data
# def persist_environment(self, environment: RuntimeEnvironment):
# """
# Persist the specified environment.
#
# The environment is stored as a dictionary, including it's schema, not as the actual Python model.
# This is to make sure it can still be loaded later on, in case the Python model has changed in later versions.
# """
# env_type = environment.get_environment_type_name()
# env_hash = str(environment.instance_cid)
#
# env = self._env_cache.get(env_type, {}).get(env_hash, None)
# if env is not None:
# return
#
# env_data = environment.as_dict_with_schema()
# self._persist_environment_details(
# env_type=env_type, env_hash=env_hash, env_data=env_data
# )
# self._env_cache.setdefault(env_type, {})[env_hash] = env_data

def create_renderable(self, **config: Any) -> RenderableType:
"""Create a renderable for this module configuration."""
Expand Down
18 changes: 9 additions & 9 deletions src/kiara/registries/data/data_store/filesystem_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -411,15 +411,15 @@ class FilesystemDataStore(FileSystemDataArchive, BaseDataStore):

_archive_type_name = "filesystem_data_store"

def _persist_environment_details(
self, env_type: str, env_hash: str, env_data: Mapping[str, Any]
):

base_path = self.get_path(entity_type=EntityType.ENVIRONMENT)
env_details_file = base_path / f"{env_type}_{env_hash}.json"

if not env_details_file.exists():
env_details_file.write_text(orjson_dumps(env_data))
# def _persist_environment_details(
# self, env_type: str, env_hash: str, env_data: Mapping[str, Any]
# ):
#
# base_path = self.get_path(entity_type=EntityType.ENVIRONMENT)
# env_details_file = base_path / f"{env_type}_{env_hash}.json"
#
# if not env_details_file.exists():
# env_details_file.write_text(orjson_dumps(env_data))

def _persist_stored_value_info(self, value: Value, persisted_value: PersistedData):

Expand Down
40 changes: 20 additions & 20 deletions src/kiara/registries/data/data_store/sqlite_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -552,26 +552,26 @@ def _set_archive_metadata_value(self, key: str, value: Any):
conn.execute(sql, params)
conn.commit()

def _persist_environment_details(
self, env_type: str, env_hash: str, env_data: Mapping[str, Any]
):

sql = text(
"INSERT OR IGNORE INTO environments (environment_type, environment_hash, environment_data) VALUES (:environment_type, :environment_hash, :environment_data)"
)
env_data_json = orjson_dumps(env_data)
with self.sqlite_engine.connect() as conn:
params = {
"environment_type": env_type,
"environment_hash": env_hash,
"environment_data": env_data_json,
}
conn.execute(sql, params)
conn.commit()
# print(env_type)
# print(env_hash)
# print(env_data_json)
# raise NotImplementedError()
# def _persist_environment_details(
# self, env_type: str, env_hash: str, env_data: Mapping[str, Any]
# ):
#
# sql = text(
# "INSERT OR IGNORE INTO environments (environment_type, environment_hash, environment_data) VALUES (:environment_type, :environment_hash, :environment_data)"
# )
# env_data_json = orjson_dumps(env_data)
# with self.sqlite_engine.connect() as conn:
# params = {
# "environment_type": env_type,
# "environment_hash": env_hash,
# "environment_data": env_data_json,
# }
# conn.execute(sql, params)
# conn.commit()
# # print(env_type)
# # print(env_hash)
# # print(env_data_json)
# # raise NotImplementedError()

# def _persist_value_data(self, value: Value) -> PersistedData:
#
Expand Down
1 change: 1 addition & 0 deletions src/kiara/registries/environment/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ def instance(cls) -> "EnvironmentRegistry":
def __init__(
self,
) -> None:

self._environments: Union[Dict[str, RuntimeEnvironment], None] = None
self._environment_hashes: Union[Dict[str, Mapping[str, str]], None] = None

Expand Down
46 changes: 30 additions & 16 deletions src/kiara/registries/metadata/metadata_store/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ def __init__(
archive_config=archive_config,
force_read_only=force_read_only,
)
self._schema_stored_cache: Dict[str, Any] = {}
self._schema_stored_item: Dict[str, Any] = {}


def retrieve_metadata_item(
self,
Expand Down Expand Up @@ -79,7 +82,6 @@ def __init__(
archive_config=archive_config,
force_read_only=force_read_only,
)
self._schema_stored_cache: Dict[str, Any] = {}

@classmethod
def _is_writeable(cls) -> bool:
Expand All @@ -100,6 +102,11 @@ def store_metadata_item(
force: bool = False,
store: Union[str, uuid.UUID, None] = None,
) -> uuid.UUID:
"""Store a metadata item into the store.
If `reference_item_type` and `reference_item_id` are set, the stored metadata item will
be linked to the stored metadata item, to enable lokoups later on.
"""

if store:
raise NotImplementedError(
Expand All @@ -109,27 +116,34 @@ def store_metadata_item(
# TODO: check if already stored
model_type = item.model_type_id
model_schema_hash = str(item.get_schema_cid())
model_item_schema = item.model_json_schema()
model_item_schema_str = json.dumps(model_item_schema)

self._store_metadata_schema(
model_schema_hash=model_schema_hash,
model_type_id=model_type,
model_schema=model_item_schema_str,
)
if model_schema_hash not in self._schema_stored_cache.keys():

model_item_schema = item.model_json_schema()
model_item_schema_str = json.dumps(model_item_schema)

self._store_metadata_schema(
model_schema_hash=model_schema_hash,
model_type_id=model_type,
model_schema=model_item_schema_str,
)
self._schema_stored_cache[model_schema_hash] = model_item_schema

# data = item.model_dump()
data_json = item.model_dump_json()
data_hash = str(item.instance_cid)

metadata_item_id = self._store_metadata_item(
key=key,
value_json=data_json,
value_hash=data_hash,
model_type_id=model_type,
model_schema_hash=model_schema_hash,
force=force,
)
if data_hash not in self._schema_stored_item.keys():

metadata_item_id = self._store_metadata_item(
key=key,
value_json=data_json,
value_hash=data_hash,
model_type_id=model_type,
model_schema_hash=model_schema_hash,
force=force,
)
self._schema_stored_item[data_hash] = metadata_item_id

if (reference_item_id and not reference_item_type) or (
reference_item_type and not reference_item_id
Expand Down
5 changes: 3 additions & 2 deletions src/kiara/registries/metadata/metadata_store/sqlite_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,8 @@ def sqlite_engine(self) -> "Engine":
model_type_id TEXT NOT NULL,
model_schema_hash TEXT NOT NULL,
metadata_value TEXT NOT NULL,
FOREIGN KEY (model_schema_hash) REFERENCES metadata_schemas (model_schema_hash)
FOREIGN KEY (model_schema_hash) REFERENCES metadata_schemas (model_schema_hash),
UNIQUE (metadata_item_key, metadata_item_hash, model_type_id, model_schema_hash)
);
CREATE TABLE IF NOT EXISTS metadata_references (
reference_item_type TEXT NOT NULL,
Expand Down Expand Up @@ -247,7 +248,7 @@ def _store_metadata_item(
)
else:
sql = text(
"INSERT INTO metadata (metadata_item_id, metadata_item_key, metadata_item_hash, model_type_id, model_schema_hash, metadata_value) VALUES (:metadata_item_id, :metadata_item_key, :metadata_item_hash, :model_type_id, :model_schema_hash, :metadata_value)"
"INSERT OR IGNORE INTO metadata (metadata_item_id, metadata_item_key, metadata_item_hash, model_type_id, model_schema_hash, metadata_value) VALUES (:metadata_item_id, :metadata_item_key, :metadata_item_hash, :model_type_id, :model_schema_hash, :metadata_value)"
)

metadata_item_id = ID_REGISTRY.generate(comment="new metadata item id")
Expand Down

0 comments on commit 7aabc08

Please sign in to comment.