From 7aabc08757e628687535dc3ee53f1f7b315b20fd Mon Sep 17 00:00:00 2001 From: Markus Binsteiner Date: Wed, 20 Mar 2024 15:34:37 +0100 Subject: [PATCH] refactor: move environment storage to new metadata store --- src/kiara/context/__init__.py | 1 + src/kiara/models/values/value.py | 8 +- src/kiara/registries/data/__init__.py | 59 ++++++++++++++- .../registries/data/data_store/__init__.py | 74 +++++++++---------- .../data/data_store/filesystem_store.py | 18 ++--- .../data/data_store/sqlite_store.py | 40 +++++----- src/kiara/registries/environment/__init__.py | 1 + .../metadata/metadata_store/__init__.py | 46 ++++++++---- .../metadata/metadata_store/sqlite_store.py | 5 +- 9 files changed, 163 insertions(+), 89 deletions(-) diff --git a/src/kiara/context/__init__.py b/src/kiara/context/__init__.py index 53eaa3038..dfe2a5ce1 100644 --- a/src/kiara/context/__init__.py +++ b/src/kiara/context/__init__.py @@ -264,6 +264,7 @@ def context_info(self) -> "KiaraContextInfo": @property def environment_registry(self) -> EnvironmentRegistry: + return self._env_mgmt @property diff --git a/src/kiara/models/values/value.py b/src/kiara/models/values/value.py index c349a501d..01fa5fd5e 100644 --- a/src/kiara/models/values/value.py +++ b/src/kiara/models/values/value.py @@ -780,10 +780,10 @@ class Value(ValueDetails): environment_hashes: Mapping[str, Mapping[str, str]] = Field( description="Hashes for the environments this value was created in." ) - enviroments: Union[Mapping[str, Mapping[str, Any]], None] = Field( - description="Information about the environments this value was created in.", - default=None, - ) + # enviroments: Union[Mapping[str, Mapping[str, Any]], None] = Field( + # description="Information about the environments this value was created in.", + # default=None, + # ) property_links: Mapping[str, uuid.UUID] = Field( description="Links to values that are properties of this value.", default_factory=dict, diff --git a/src/kiara/registries/data/__init__.py b/src/kiara/registries/data/__init__.py index 62456592f..27c32cfb5 100644 --- a/src/kiara/registries/data/__init__.py +++ b/src/kiara/registries/data/__init__.py @@ -259,6 +259,7 @@ def __init__(self, kiara: "Kiara"): self._cached_data[NOT_SET_VALUE_ID] = SpecialValue.NOT_SET self._registered_values[NOT_SET_VALUE_ID] = self._not_set_value self._persisted_value_descs[NOT_SET_VALUE_ID] = NONE_PERSISTED_DATA + self._env_cache: Dict[str, Dict[str, Mapping[str, Any]]] = {} self._none_value: Value = Value( value_id=NONE_VALUE_ID, @@ -500,6 +501,33 @@ def get_value(self, value: Union[uuid.UUID, ValueLink, str, Path]) -> Value: self._registered_values[_value_id] = stored_value return self._registered_values[_value_id] + def _persist_environment(self, env_type: str, env_hash: str): + + cached = self._env_cache.get(env_type, {}).get(env_hash, None) + if cached is not None: + return + + environment = self._kiara.environment_registry.get_environment_for_cid( + env_hash + ) + # env_type = environment.get_environment_type_name() + # env_hash = str(environment.instance_cid) + # + # env = self._env_cache.get(env_type, {}).get(env_hash, None) + # if env is not None: + # return + + env_data = environment.as_dict_with_schema() + ENVIRONMENT_MARKER_KEY = "environment" + self._kiara.metadata_registry.register_metadata_item(key = ENVIRONMENT_MARKER_KEY, item=environment) + # self._persist_environment_details( + # env_type=env_type, env_hash=env_hash, env_data=env_data + # ) + self._env_cache.setdefault(env_type, {})[env_hash] = env_data + + + + def store_value( self, value: Union[ValueLink, uuid.UUID, str], @@ -510,10 +538,39 @@ def store_value( If 'data_store' is not provided, the default data store is used. If the 'data_store' argument is of type uuid, the archive_id is used, if string, first it will be converted to an uuid, if that works, again, the archive_id is used, if not, the string is used as the archive alias. - """ + """ + _value = self.get_value(value) + # first, persist environment information + for env_type, env_hash in _value.pedigree.environments.items(): + + self._persist_environment(env_type, env_hash, _value) + # cached = self._env_cache.get(env_type, {}).get(env_hash, None) + # if cached is not None: + # continue + # + # environment = self.kiara_context.environment_registry.get_environment_for_cid( + # env_hash + # ) + # env_type = environment.get_environment_type_name() + # env_hash = str(environment.instance_cid) + # + # env = self._env_cache.get(env_type, {}).get(env_hash, None) + # if env is not None: + # return + # + # env_data = environment.as_dict_with_schema() + # self._persist_environment_details( + # env_type=env_type, env_hash=env_hash, env_data=env_data + # ) + # self._env_cache.setdefault(env_type, {})[env_hash] = env_data + # + # self.persist_environment(env) + + + store: DataStore = self.get_archive(archive_id_or_alias=data_store) # type: ignore if not store.is_writeable(): if data_store: diff --git a/src/kiara/registries/data/data_store/__init__.py b/src/kiara/registries/data/data_store/__init__.py index 52fe8280c..4d2851a59 100644 --- a/src/kiara/registries/data/data_store/__init__.py +++ b/src/kiara/registries/data/data_store/__init__.py @@ -373,14 +373,14 @@ def _persist_value_pedigree(self, value: Value): to the job that produced it is preserved. """ - @abc.abstractmethod - def _persist_environment_details( - self, env_type: str, env_hash: str, env_data: Mapping[str, Any] - ): - """Persist the environment details. - - Each store type needs to store this for lookup purposes. - """ + # @abc.abstractmethod + # def _persist_environment_details( + # self, env_type: str, env_hash: str, env_data: Mapping[str, Any] + # ): + # """Persist the environment details. + # + # Each store type needs to store this for lookup purposes. + # """ @abc.abstractmethod def _persist_destiny_backlinks(self, value: Value): @@ -395,16 +395,16 @@ def store_value(self, value: Value) -> PersistedData: value_hash=value.value_hash, ) - # first, persist environment information - for env_type, env_hash in value.pedigree.environments.items(): - cached = self._env_cache.get(env_type, {}).get(env_hash, None) - if cached is not None: - continue - - env = self.kiara_context.environment_registry.get_environment_for_cid( - env_hash - ) - self.persist_environment(env) + # # first, persist environment information + # for env_type, env_hash in value.pedigree.environments.items(): + # cached = self._env_cache.get(env_type, {}).get(env_hash, None) + # if cached is not None: + # continue + # + # env = self.kiara_context.environment_registry.get_environment_for_cid( + # env_hash + # ) + # self.persist_environment(env) # save the value data and metadata persisted_value = self._persist_value(value) @@ -544,25 +544,25 @@ def _persist_value(self, value: Value) -> PersistedData: return persisted_value_info - def persist_environment(self, environment: RuntimeEnvironment): - """ - Persist the specified environment. - - The environment is stored as a dictionary, including it's schema, not as the actual Python model. - This is to make sure it can still be loaded later on, in case the Python model has changed in later versions. - """ - env_type = environment.get_environment_type_name() - env_hash = str(environment.instance_cid) - - env = self._env_cache.get(env_type, {}).get(env_hash, None) - if env is not None: - return - - env_data = environment.as_dict_with_schema() - self._persist_environment_details( - env_type=env_type, env_hash=env_hash, env_data=env_data - ) - self._env_cache.setdefault(env_type, {})[env_hash] = env_data + # def persist_environment(self, environment: RuntimeEnvironment): + # """ + # Persist the specified environment. + # + # The environment is stored as a dictionary, including it's schema, not as the actual Python model. + # This is to make sure it can still be loaded later on, in case the Python model has changed in later versions. + # """ + # env_type = environment.get_environment_type_name() + # env_hash = str(environment.instance_cid) + # + # env = self._env_cache.get(env_type, {}).get(env_hash, None) + # if env is not None: + # return + # + # env_data = environment.as_dict_with_schema() + # self._persist_environment_details( + # env_type=env_type, env_hash=env_hash, env_data=env_data + # ) + # self._env_cache.setdefault(env_type, {})[env_hash] = env_data def create_renderable(self, **config: Any) -> RenderableType: """Create a renderable for this module configuration.""" diff --git a/src/kiara/registries/data/data_store/filesystem_store.py b/src/kiara/registries/data/data_store/filesystem_store.py index f7c0fd28a..9acba8a6a 100644 --- a/src/kiara/registries/data/data_store/filesystem_store.py +++ b/src/kiara/registries/data/data_store/filesystem_store.py @@ -411,15 +411,15 @@ class FilesystemDataStore(FileSystemDataArchive, BaseDataStore): _archive_type_name = "filesystem_data_store" - def _persist_environment_details( - self, env_type: str, env_hash: str, env_data: Mapping[str, Any] - ): - - base_path = self.get_path(entity_type=EntityType.ENVIRONMENT) - env_details_file = base_path / f"{env_type}_{env_hash}.json" - - if not env_details_file.exists(): - env_details_file.write_text(orjson_dumps(env_data)) + # def _persist_environment_details( + # self, env_type: str, env_hash: str, env_data: Mapping[str, Any] + # ): + # + # base_path = self.get_path(entity_type=EntityType.ENVIRONMENT) + # env_details_file = base_path / f"{env_type}_{env_hash}.json" + # + # if not env_details_file.exists(): + # env_details_file.write_text(orjson_dumps(env_data)) def _persist_stored_value_info(self, value: Value, persisted_value: PersistedData): diff --git a/src/kiara/registries/data/data_store/sqlite_store.py b/src/kiara/registries/data/data_store/sqlite_store.py index edc4c37bd..2012ffe5f 100644 --- a/src/kiara/registries/data/data_store/sqlite_store.py +++ b/src/kiara/registries/data/data_store/sqlite_store.py @@ -552,26 +552,26 @@ def _set_archive_metadata_value(self, key: str, value: Any): conn.execute(sql, params) conn.commit() - def _persist_environment_details( - self, env_type: str, env_hash: str, env_data: Mapping[str, Any] - ): - - sql = text( - "INSERT OR IGNORE INTO environments (environment_type, environment_hash, environment_data) VALUES (:environment_type, :environment_hash, :environment_data)" - ) - env_data_json = orjson_dumps(env_data) - with self.sqlite_engine.connect() as conn: - params = { - "environment_type": env_type, - "environment_hash": env_hash, - "environment_data": env_data_json, - } - conn.execute(sql, params) - conn.commit() - # print(env_type) - # print(env_hash) - # print(env_data_json) - # raise NotImplementedError() + # def _persist_environment_details( + # self, env_type: str, env_hash: str, env_data: Mapping[str, Any] + # ): + # + # sql = text( + # "INSERT OR IGNORE INTO environments (environment_type, environment_hash, environment_data) VALUES (:environment_type, :environment_hash, :environment_data)" + # ) + # env_data_json = orjson_dumps(env_data) + # with self.sqlite_engine.connect() as conn: + # params = { + # "environment_type": env_type, + # "environment_hash": env_hash, + # "environment_data": env_data_json, + # } + # conn.execute(sql, params) + # conn.commit() + # # print(env_type) + # # print(env_hash) + # # print(env_data_json) + # # raise NotImplementedError() # def _persist_value_data(self, value: Value) -> PersistedData: # diff --git a/src/kiara/registries/environment/__init__.py b/src/kiara/registries/environment/__init__.py index d3ce62f86..d709435b3 100644 --- a/src/kiara/registries/environment/__init__.py +++ b/src/kiara/registries/environment/__init__.py @@ -31,6 +31,7 @@ def instance(cls) -> "EnvironmentRegistry": def __init__( self, ) -> None: + self._environments: Union[Dict[str, RuntimeEnvironment], None] = None self._environment_hashes: Union[Dict[str, Mapping[str, str]], None] = None diff --git a/src/kiara/registries/metadata/metadata_store/__init__.py b/src/kiara/registries/metadata/metadata_store/__init__.py index 6f32bf233..e949413d6 100644 --- a/src/kiara/registries/metadata/metadata_store/__init__.py +++ b/src/kiara/registries/metadata/metadata_store/__init__.py @@ -30,6 +30,9 @@ def __init__( archive_config=archive_config, force_read_only=force_read_only, ) + self._schema_stored_cache: Dict[str, Any] = {} + self._schema_stored_item: Dict[str, Any] = {} + def retrieve_metadata_item( self, @@ -79,7 +82,6 @@ def __init__( archive_config=archive_config, force_read_only=force_read_only, ) - self._schema_stored_cache: Dict[str, Any] = {} @classmethod def _is_writeable(cls) -> bool: @@ -100,6 +102,11 @@ def store_metadata_item( force: bool = False, store: Union[str, uuid.UUID, None] = None, ) -> uuid.UUID: + """Store a metadata item into the store. + + If `reference_item_type` and `reference_item_id` are set, the stored metadata item will + be linked to the stored metadata item, to enable lokoups later on. + """ if store: raise NotImplementedError( @@ -109,27 +116,34 @@ def store_metadata_item( # TODO: check if already stored model_type = item.model_type_id model_schema_hash = str(item.get_schema_cid()) - model_item_schema = item.model_json_schema() - model_item_schema_str = json.dumps(model_item_schema) - self._store_metadata_schema( - model_schema_hash=model_schema_hash, - model_type_id=model_type, - model_schema=model_item_schema_str, - ) + if model_schema_hash not in self._schema_stored_cache.keys(): + + model_item_schema = item.model_json_schema() + model_item_schema_str = json.dumps(model_item_schema) + + self._store_metadata_schema( + model_schema_hash=model_schema_hash, + model_type_id=model_type, + model_schema=model_item_schema_str, + ) + self._schema_stored_cache[model_schema_hash] = model_item_schema # data = item.model_dump() data_json = item.model_dump_json() data_hash = str(item.instance_cid) - metadata_item_id = self._store_metadata_item( - key=key, - value_json=data_json, - value_hash=data_hash, - model_type_id=model_type, - model_schema_hash=model_schema_hash, - force=force, - ) + if data_hash not in self._schema_stored_item.keys(): + + metadata_item_id = self._store_metadata_item( + key=key, + value_json=data_json, + value_hash=data_hash, + model_type_id=model_type, + model_schema_hash=model_schema_hash, + force=force, + ) + self._schema_stored_item[data_hash] = metadata_item_id if (reference_item_id and not reference_item_type) or ( reference_item_type and not reference_item_id diff --git a/src/kiara/registries/metadata/metadata_store/sqlite_store.py b/src/kiara/registries/metadata/metadata_store/sqlite_store.py index 18ffa389d..f504bcfcf 100644 --- a/src/kiara/registries/metadata/metadata_store/sqlite_store.py +++ b/src/kiara/registries/metadata/metadata_store/sqlite_store.py @@ -123,7 +123,8 @@ def sqlite_engine(self) -> "Engine": model_type_id TEXT NOT NULL, model_schema_hash TEXT NOT NULL, metadata_value TEXT NOT NULL, - FOREIGN KEY (model_schema_hash) REFERENCES metadata_schemas (model_schema_hash) + FOREIGN KEY (model_schema_hash) REFERENCES metadata_schemas (model_schema_hash), + UNIQUE (metadata_item_key, metadata_item_hash, model_type_id, model_schema_hash) ); CREATE TABLE IF NOT EXISTS metadata_references ( reference_item_type TEXT NOT NULL, @@ -247,7 +248,7 @@ def _store_metadata_item( ) else: sql = text( - "INSERT INTO metadata (metadata_item_id, metadata_item_key, metadata_item_hash, model_type_id, model_schema_hash, metadata_value) VALUES (:metadata_item_id, :metadata_item_key, :metadata_item_hash, :model_type_id, :model_schema_hash, :metadata_value)" + "INSERT OR IGNORE INTO metadata (metadata_item_id, metadata_item_key, metadata_item_hash, model_type_id, model_schema_hash, metadata_value) VALUES (:metadata_item_id, :metadata_item_key, :metadata_item_hash, :model_type_id, :model_schema_hash, :metadata_value)" ) metadata_item_id = ID_REGISTRY.generate(comment="new metadata item id")