From cc8b557642e914c7e94ff6fbfe121a4472e5e5e0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cristina=20E=2E=20Gonz=C3=A1lez-Espinoza?= Date: Tue, 12 Dec 2023 09:14:35 +0100 Subject: [PATCH 1/3] Add web service --- .../DictionaryMapping/NeuronMorphology.hjson | 1 - kgforge/core/archetypes/dataset_store.py | 5 +- kgforge/core/archetypes/read_only_store.py | 104 +++-------- kgforge/core/archetypes/store.py | 7 +- kgforge/core/commons/parser.py | 17 ++ kgforge/core/commons/sparql_query_builder.py | 19 +- kgforge/specializations/stores/__init__.py | 2 + .../specializations/stores/sparql_store.py | 28 +-- .../stores/web_service/__init__.py | 0 .../stores/web_service/webservice.py | 102 ++++++++++ .../stores/web_service_store.py | 175 ++++++++++++++++++ .../stores/test_web_service.py | 71 +++++++ 12 files changed, 406 insertions(+), 125 deletions(-) create mode 100644 kgforge/specializations/stores/web_service/__init__.py create mode 100644 kgforge/specializations/stores/web_service/webservice.py create mode 100644 kgforge/specializations/stores/web_service_store.py create mode 100644 tests/specializations/stores/test_web_service.py diff --git a/examples/mappings/NeuroMorpho/mappings/DictionaryMapping/NeuronMorphology.hjson b/examples/mappings/NeuroMorpho/mappings/DictionaryMapping/NeuronMorphology.hjson index 6ff77840..2d191837 100644 --- a/examples/mappings/NeuroMorpho/mappings/DictionaryMapping/NeuronMorphology.hjson +++ b/examples/mappings/NeuroMorpho/mappings/DictionaryMapping/NeuronMorphology.hjson @@ -4,7 +4,6 @@ NeuronMorphology Entity ] - id: forge.format('identifier', 'neuronmorphologies/neuromorpho', f'{x.bbpID}') brainLocation: { type: BrainLocation brainRegion: forge.resolve(x.brain_region[0], scope='ontology', strategy='EXACT_CASE_INSENSITIVE_MATCH') diff --git a/kgforge/core/archetypes/dataset_store.py b/kgforge/core/archetypes/dataset_store.py index 4896e0f9..931cd123 100644 --- a/kgforge/core/archetypes/dataset_store.py +++ b/kgforge/core/archetypes/dataset_store.py @@ -77,13 +77,14 @@ def types(self) -> Optional[List[str]]: return list(self.model.mappings(self.model.source, False).keys()) def search( - self, filters: List[Union[Dict, Filter]], resolvers: Optional[List[Resolver]] = None, + self, resolvers: Optional[List[Resolver]], + filters: List[Union[Dict, Filter]], **params ) -> Optional[List[Resource]]: """Search within the database. :param map: bool """ - unmapped_resources = self._search(filters, resolvers, **params) + unmapped_resources = self._search(resolvers, filters, **params) if not params.pop('map', True): return unmapped_resources diff --git a/kgforge/core/archetypes/read_only_store.py b/kgforge/core/archetypes/read_only_store.py index a16c70f6..1ee272ed 100644 --- a/kgforge/core/archetypes/read_only_store.py +++ b/kgforge/core/archetypes/read_only_store.py @@ -11,12 +11,13 @@ # # You should have received a copy of the GNU Lesser General Public License # along with Blue Brain Nexus Forge. If not, see . + import time from abc import ABC, abstractmethod from pathlib import Path from typing import Any, Dict, List, Optional, Tuple, Union -from kgforge.core.resource import Resource +from kgforge.core import Resource from kgforge.core.archetypes.model import Model from kgforge.core.archetypes.resolver import Resolver from kgforge.core.commons.attributes import repr_class @@ -25,9 +26,7 @@ DownloadingError, ) from kgforge.core.commons.execution import not_supported -from kgforge.core.commons.sparql_query_builder import SPARQLQueryBuilder from kgforge.core.reshaping import collect_values -from kgforge.core.wrappings import Filter from kgforge.core.wrappings.dict import DictWrapper DEFAULT_LIMIT = 100 @@ -50,42 +49,29 @@ def __init__( model: Optional[Model] = None, ) -> None: self.model: Optional[Model] = model + self.model_context: Optional[Context] = ( + self.model.context() if hasattr(self.model, 'context') else None + ) def __repr__(self) -> str: return repr_class(self) - @staticmethod - def _context_to_dict(context: Context): - return { - k: v["@id"] if isinstance(v, Dict) and "@id" in v else v - for k, v in context.document["@context"].items() - } - - def get_context_prefix_vocab(self) -> Tuple[Optional[Dict], Optional[Dict], Optional[str]]: - return ( - ReadOnlyStore._context_to_dict(self.model_context().document), - self.model_context().prefixes, - self.model_context().vocab - ) - # C[R]UD. @abstractmethod def retrieve( - self, id_: str, version: Optional[Union[int, str]], cross_bucket: bool = False, **params - ) -> Optional[Resource]: + self, id_: str, version: Optional[Union[int, str]], cross_bucket: bool, **params + ) -> Resource: # POLICY Should notify of failures with exception RetrievalError including a message. # POLICY Resource _store_metadata should be set using wrappers.dict.wrap_dict(). # POLICY Resource _synchronized should be set to True. # TODO These two operations might be abstracted here when other stores will be implemented. - ... + pass - @abstractmethod def _retrieve_filename(self, id: str) -> Tuple[str, str]: # TODO This operation might be adapted if other file metadata are needed. - ... + not_supported() - @abstractmethod def _prepare_download_one( self, url: str, @@ -93,7 +79,7 @@ def _prepare_download_one( cross_bucket: bool ) -> Tuple[str, str]: # Prepare download url and download bucket - ... + not_supported() def download( self, @@ -164,7 +150,6 @@ def _download_many( for url, path, store_m, bucket in zip(urls, paths, store_metadata, buckets): self._download_one(url, path, store_m, cross_bucket, content_type, bucket) - @abstractmethod def _download_one( self, url: str, @@ -176,13 +161,15 @@ def _download_one( ) -> None: # path: FilePath. # POLICY Should notify of failures with exception DownloadingError including a message. - ... + not_supported() # Querying. @abstractmethod def search( - self, resolvers: Optional[List[Resolver]], filters: List[Union[Dict, Filter]], **params + self, resolvers: Optional[List[Resolver]] = None, + *filters, + **params ) -> List[Resource]: # Positional arguments in 'filters' are instances of type Filter from wrappings/paths.py @@ -205,53 +192,10 @@ def search( # TODO These two operations might be abstracted here when other stores will be implemented. ... - def sparql( - self, query: str, - debug: bool, - limit: int = DEFAULT_LIMIT, - offset: int = DEFAULT_OFFSET, - **params - ) -> List[Resource]: - rewrite = params.get("rewrite", True) - - if self.model_context() is not None and rewrite: - - context_as_dict, prefixes, vocab = self.get_context_prefix_vocab() - - qr = SPARQLQueryBuilder.rewrite_sparql( - query, - context_as_dict=context_as_dict, - prefixes=prefixes, - vocab=vocab - ) - else: - qr = query - - qr = SPARQLQueryBuilder.apply_limit_and_offset_to_query( - qr, - limit=limit, - offset=offset, - default_limit=DEFAULT_LIMIT, - default_offset=DEFAULT_OFFSET - ) - - if debug: - SPARQLQueryBuilder.debug_query(qr) - - return self._sparql(qr) - - @abstractmethod - def _sparql(self, query: str) -> Optional[Union[List[Resource], Resource]]: - # POLICY Should notify of failures with exception QueryingError including a message. - # POLICY Resource _store_metadata should not be set (default is None). - # POLICY Resource _synchronized should not be set (default is False). - ... - @abstractmethod - def elastic( - self, query: str, debug: bool, limit: int = None, - offset: int = None, **params - ) -> Optional[Union[List[Resource], Resource]]: + def sparql(self, query: str, debug: bool, limit: int = DEFAULT_LIMIT, + offset: int = DEFAULT_OFFSET, + **params) -> Optional[Union[List[Resource], Resource]]: ... # Versioning. @@ -268,14 +212,18 @@ def _initialize_service( # POLICY Should initialize the access to the store according to its configuration. ... - @abstractmethod + @staticmethod + def _debug_query(query) -> None: + if isinstance(query, Dict): + print("Submitted query:", query) + else: + print(*["Submitted query:", *query.splitlines()], sep="\n ") + print() + def rewrite_uri(self, uri: str, context: Context, **kwargs) -> str: """Rewrite a given uri using the store Context :param uri: a URI to rewrite. :param context: a Store Context object :return: str """ - ... - - def model_context(self): - return self.model.context() if self.model else None + not_supported() diff --git a/kgforge/core/archetypes/store.py b/kgforge/core/archetypes/store.py index 21bf9ed7..7484c5f3 100644 --- a/kgforge/core/archetypes/store.py +++ b/kgforge/core/archetypes/store.py @@ -14,13 +14,14 @@ import json from abc import abstractmethod from pathlib import Path -from typing import Any, Dict, List, Optional, Union, Type, Match +from typing import Any, Dict, List, Optional, Union, Type from kgforge.core.archetypes.read_only_store import ReadOnlyStore, DEFAULT_LIMIT, DEFAULT_OFFSET from kgforge.core.archetypes.model import Model from kgforge.core.commons import Context from kgforge.core.resource import Resource from kgforge.core.archetypes.mapping import Mapping +from kgforge.core.archetypes.resolver import Resolver from kgforge.core.archetypes.mapper import Mapper from kgforge.core.commons.attributes import repr_class from kgforge.core.commons.es_query_builder import ESQueryBuilder @@ -32,7 +33,7 @@ UpdatingError, UploadingError ) -from kgforge.core.commons.execution import not_supported, run +from kgforge.core.commons.execution import run class Store(ReadOnlyStore): @@ -238,6 +239,8 @@ def _deprecate_one(self, resource: Resource) -> None: # POLICY Resource _store_metadata should be set using wrappers.dict.wrap_dict(). ... + # Querying + def elastic( self, query: str, debug: bool, limit: int = DEFAULT_LIMIT, offset: int = DEFAULT_OFFSET ) -> List[Resource]: diff --git a/kgforge/core/commons/parser.py b/kgforge/core/commons/parser.py index 0ab99d41..abd69e5a 100644 --- a/kgforge/core/commons/parser.py +++ b/kgforge/core/commons/parser.py @@ -14,6 +14,7 @@ from typing import Any import datetime +import json def _parse_type(value: Any, parse_str: bool = False): @@ -42,3 +43,19 @@ def _parse_type(value: Any, parse_str: bool = False): return _type, value except Exception: return _type, value + + +def _process_types(values): + """Assign correct data type to values from a request response""" + if values['type'] == 'literal' and 'datatype' in values and values['datatype'] == \ + 'http://www.w3.org/2001/XMLSchema#boolean': + + return json.loads(str(values["value"]).lower()) + + elif values['type'] == 'literal' and 'datatype' in values and values['datatype'] == \ + 'http://www.w3.org/2001/XMLSchema#integer': + + return int(values["value"]) + + else: + return values["value"] diff --git a/kgforge/core/commons/sparql_query_builder.py b/kgforge/core/commons/sparql_query_builder.py index b02b9f17..e8ea684b 100644 --- a/kgforge/core/commons/sparql_query_builder.py +++ b/kgforge/core/commons/sparql_query_builder.py @@ -28,7 +28,7 @@ from kgforge.core.archetypes.resolver import Resolver from kgforge.core.commons.context import Context from kgforge.core.commons.files import is_valid_url -from kgforge.core.commons.parser import _parse_type +from kgforge.core.commons.parser import _parse_type, _process_types from kgforge.core.commons.query_builder import QueryBuilder from kgforge.core.wrappings.paths import Filter @@ -232,23 +232,8 @@ def triples_to_resource(iri, triples): @staticmethod def build_resource_from_select_query(results: List) -> List[Resource]: - - def process_v(v): - if v['type'] == 'literal' and 'datatype' in v and v['datatype'] == \ - 'http://www.w3.org/2001/XMLSchema#boolean': - - return json.loads(str(v["value"]).lower()) - - elif v['type'] == 'literal' and 'datatype' in v and v['datatype'] == \ - 'http://www.w3.org/2001/XMLSchema#integer': - - return int(v["value"]) - - else: - return v["value"] - return [ - Resource(**{k: process_v(v) for k, v in x.items()}) + Resource(**{k: _process_types(v) for k, v in x.items()}) for x in results ] diff --git a/kgforge/specializations/stores/__init__.py b/kgforge/specializations/stores/__init__.py index ec3d912f..0cdf3755 100644 --- a/kgforge/specializations/stores/__init__.py +++ b/kgforge/specializations/stores/__init__.py @@ -14,3 +14,5 @@ from .bluebrain_nexus import BlueBrainNexus from .demo_store import DemoStore +from .sparql_store import SPARQLStore +from .web_service_store import WebServiceStore diff --git a/kgforge/specializations/stores/sparql_store.py b/kgforge/specializations/stores/sparql_store.py index 828908fd..df9822e6 100644 --- a/kgforge/specializations/stores/sparql_store.py +++ b/kgforge/specializations/stores/sparql_store.py @@ -51,28 +51,6 @@ def __init__(self, model: Optional[Model] = None, def mapper(self) -> Optional[Type[Mapper]]: return DictionaryMapper - def _download_one( - self, - url: str, - path: str, - store_metadata: Optional[DictWrapper], - cross_bucket: bool, - content_type: str, - bucket: str - ) -> None: - # path: FilePath. - # TODO define downloading method - # POLICY Should notify of failures with exception DownloadingError including a message. - raise not_supported() - - def retrieve( - self, id_: str, version: Optional[Union[int, str]], cross_bucket: bool = False, **params - ) -> Optional[Resource]: - raise not_supported() - - def _retrieve_filename(self, id: str) -> str: - raise not_supported() - def _search( self, filters: List[Union[Dict, Filter]], resolvers: Optional[List[Resolver]] = None, **params @@ -130,7 +108,7 @@ def _search( ) query_statements, query_filters = SPARQLQueryBuilder.build( - schema=None, resolvers=resolvers, context=self.model_context(), filters=filters + schema=None, resolvers=resolvers, context=self.model_context, filters=filters ) statements = ";\n ".join(query_statements) _filters = (".\n".join(query_filters) + '\n') if len(filters) > 0 else "" @@ -155,7 +133,7 @@ def _sparql(self, query: str) -> Optional[Union[List[Resource], Resource]]: data = response.json() - return SPARQLQueryBuilder.build_resource_from_response(query, data, self.model_context()) + return SPARQLQueryBuilder.build_resource_from_response(query, data, self.model_context) # Utils. @@ -180,7 +158,7 @@ def _initialize_service( raise ValueError(f"Store configuration error: {ve}") from ve return SPARQLService( - endpoint=endpoint, model_context=self.model_context(), + endpoint=endpoint, model_context=self.model_context, store_context=store_context, max_connection=max_connection, searchendpoints=searchendpoints, content_type=content_type, diff --git a/kgforge/specializations/stores/web_service/__init__.py b/kgforge/specializations/stores/web_service/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/kgforge/specializations/stores/web_service/webservice.py b/kgforge/specializations/stores/web_service/webservice.py new file mode 100644 index 00000000..cf175413 --- /dev/null +++ b/kgforge/specializations/stores/web_service/webservice.py @@ -0,0 +1,102 @@ +# +# Blue Brain Nexus Forge is free software: you can redistribute it and/or modify +# it under the terms of the GNU Lesser General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Blue Brain Nexus Forge is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser +# General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public License +# along with Blue Brain Nexus Forge. If not, see . +import copy +import requests +from typing import Dict, Optional + +from kgforge.core.resource import Resource +from kgforge.core.commons.parser import _process_types +from kgforge.core.commons.exceptions import ConfigurationError, QueryingError + + +class WebService: + + def __init__( + self, + endpoint: str, + content_type: str, + accept: str, + response_location : Optional[str] = None, + files_download: Optional[Dict] = None, + searchendpoints : Optional[Dict] = None, + health_endpoint: Optional[str] = None, + **params, + ): + """A Web service""" + self.endpoint = endpoint + self.context_cache: Dict = dict() + self.response_location = response_location + self.files_download = files_download + self.health_endpoint = health_endpoint + self.searchendpoints = searchendpoints + if self.searchendpoints: + for endpoint in self.searchendpoints: + if not 'endpoint' in self.searchendpoints[endpoint]: + raise ConfigurationError(f"Missing endpoint searchenpoints") + self.params = copy.deepcopy(params) + + self.headers = {"Content-Type": content_type, "Accept": accept} + + self.headers_download = { + "Content-Type": self.files_download["Content-Type"] + if self.files_download and "Content-Type" in self.files_download + else "text/plain", + "Accept": self.files_download["Accept"] + if self.files_download and "Accept" in self.files_download + else "text/plain", + } + self.max_connection = params.pop('max_connection', None) + + @staticmethod + def resources_from_request(url: str, + headers: Dict, + response_location: Dict, + **request_params): + """Perform a HTTP request + + :param headers: The headers to be passed to the request + :param response_loc: The nested location of the relevat metadata in the + response. Example: NeuroMorpho uses response["_embedded"]["neuronResources"] + which should be given as: response_loc = ["_embedded", "neuronResources"] + :param request_params: Any other parameter for the request + """ + try: + response = requests.get( + url, params=request_params, headers=headers, verify=False + ) + response.raise_for_status() + except Exception as e: + raise QueryingError(e) + else: + data = response.json() + if response_location: + # Get the resources directly from a location in the response + if isinstance(response_location, str): + results = data[response_location] + elif isinstance(response_location, (list, tuple)): + for inner in response_location: + data = data[inner] + results = data + return [Resource(**result) for result in results] + else: + # Standard response format + results = data["results"]["bindings"] + return WebService.build_resources_from_results(results) + + @staticmethod + def build_resources_from_results(results): + return [ + Resource(**{k: _process_types(v) for k, v in x.items()}) + for x in results + ] \ No newline at end of file diff --git a/kgforge/specializations/stores/web_service_store.py b/kgforge/specializations/stores/web_service_store.py new file mode 100644 index 00000000..adbc1fff --- /dev/null +++ b/kgforge/specializations/stores/web_service_store.py @@ -0,0 +1,175 @@ +# +# Blue Brain Nexus Forge is free software: you can redistribute it and/or modify +# it under the terms of the GNU Lesser General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Blue Brain Nexus Forge is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser +# General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public License +# along with Blue Brain Nexus Forge. If not, see . +import copy +import asyncio +import requests +from aiohttp import ClientSession +from requests.exceptions import SSLError +from typing import Dict, List, Optional, Union, Type, Callable + +from kgforge.core.resource import Resource +from kgforge.core.archetypes.model import Model +from kgforge.core.archetypes.mapper import Mapper +from kgforge.specializations.mappers.dictionaries import DictionaryMapper +from kgforge.core.archetypes.dataset_store import DatasetStore +from kgforge.core.archetypes.resolver import Resolver +from kgforge.core.commons.exceptions import ConfigurationError, DownloadingError + + +from kgforge.core.wrappings.paths import Filter +from kgforge.specializations.stores.web_service.webservice import WebService + + +class WebServiceStore(DatasetStore): + """A high-level class to retrieve and create Datasets from a Web Service.""" + + def __init__( + self, + model: Model, + endpoint: str, + content_type: str, + accept: str, + response_location : Optional[str] = None, + files_download: Optional[Dict] = None, + searchendpoints : Optional[Dict] = None, + health_endpoint: Optional[str] = None, + **params, + ): + super().__init__(model) + self.health_endpoint = health_endpoint + self.service = self._initialize_service(endpoint, content_type, accept, + response_location, files_download, + searchendpoints, **params) + + @property + def mapper(self) -> Optional[Type[Mapper]]: + return DictionaryMapper + + def _search(self, resolvers: Optional[List[Resolver]], + filters: List[Union[Dict, Filter]], + **params + ) -> Optional[List[Resource]]: + # resolvers are not used, just passed because of previous methods shapes + if not isinstance(filters, dict): + raise NotImplementedError('Currently only the use of a dictionary as a filter is implemented') + searchendpoint = params.pop('searchendpoint', None) + if searchendpoint: + if self.service.searchendpoints is None: + raise ConfigurationError("No searchendpoints were given " + "in the initial configuration.") + try: + endpoint = self.service.searchendpoints[searchendpoint]['endpoint'] + except KeyError: + raise ConfigurationError(f"The {searchendpoint} searchpoint was not given " \ + "in the initial configuration.") + else: + endpoint = self.service.endpoint + # Combine the two dictionaries + for flr in filters: + params.update(flr) + return self.service.resources_from_request(endpoint, self.service.headers, + self.service.response_location, **params) + + + + @catch + def download(self, urls: Union[str, List[str]], + paths: Union[str, List[str]], overwrite: bool = False) -> None: + # path: DirPath. + """Download files """ + if isinstance(urls, list): + # Check consistancy between urls and paths + if not isinstance(paths, list): + raise TypeError("Given multiple urls, paths should also be a list.") + if len(paths) != len(urls): + raise ValueError(f"Missmatch between urls ({len(urls)}) and paths ({len(paths)}), \ + they should be the same amount.") + self._download_many(urls, paths) + else: + self._download_one(urls, paths) + + def _download_many(self, urls: List[str], + paths: List[str]) -> None: + async def _bulk(): + loop = asyncio.get_event_loop() + semaphore = asyncio.Semaphore(self.service.max_connection) + async with ClientSession(headers=self.service.headers_download) as session: + tasks = ( + _create_task(x, y, loop, semaphore, session) + for x, y in zip(urls, paths) + ) + return await asyncio.gather(*tasks) + + def _create_task(url, path, loop, semaphore, session): + return loop.create_task( + _download(url, path, semaphore, session) + ) + + async def _download(url, path, semaphore, session): + async with semaphore: + params_download = copy.deepcopy(self.service.params.get('download', {})) + async with session.get(url, params=params_download) as response: + try: + response.raise_for_status() + except Exception as e: + raise DownloadingError( + f"Downloading:{_error_message(e)}" + ) + else: + with open(path, "wb") as f: + data = await response.read() + f.write(data) + + return asyncio.run(_bulk()) + + def _download_one(self, url: str, path: str) -> None: + try: + params_download = copy.deepcopy(self.service.params.get('download', {})) + response = requests.get( + url=url, + headers=self.service.headers_download, + params=params_download, + verify=False + ) + response.raise_for_status() + except Exception as e: + raise DownloadingError( + f"Downloading from failed :{_error_message(e)}" + ) + else: + with open(path, "wb") as f: + for chunk in response.iter_content(chunk_size=4096): + f.write(chunk) + + def health(self) -> Callable: + if self.health_endpoint: + try: + response = requests.get(self.health_endpoint) + except SSLError: + response = requests.get(self.health_endpoint, verify=False) + return response.json() + else: + raise ConfigurationError('Health information not reachable with given configuration. \ + Define health in configuration arguments or set _health.') + + def _initialize_service(endpoint: str, + content_type: str, + accept: str, + response_location : Optional[str] = None, + files_download: Optional[Dict] = None, + searchendpoints : Optional[Dict] = None, + **params + ) -> WebService: + return WebService(endpoint, content_type, accept, response_location, + files_download, searchendpoints, **params) \ No newline at end of file diff --git a/tests/specializations/stores/test_web_service.py b/tests/specializations/stores/test_web_service.py new file mode 100644 index 00000000..b3445ad7 --- /dev/null +++ b/tests/specializations/stores/test_web_service.py @@ -0,0 +1,71 @@ +# +# Blue Brain Nexus Forge is free software: you can redistribute it and/or modify +# it under the terms of the GNU Lesser General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Blue Brain Nexus Forge is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser +# General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public License +# along with Blue Brain Nexus Forge. If not, see . + +import pytest + +from utils import full_path_relative_to_root +from kgforge.specializations.models.rdf_model import RdfModel +from kgforge.specializations.stores.web_service_store import WebServiceStore + +ENDPOINT = "http://neuromorpho.org/api/neuron" + + +@pytest.fixture +def rdf_model(): + model = RdfModel( + origin='directory', + source=full_path_relative_to_root("examples/mappings/NeuroMorpho"), + context={ + 'iri': full_path_relative_to_root("examples/mappings/NeuroMorpho/jsonld_context.json") + } + ) + return model + + +# model: Model, +# endpoint: str, +# content_type: str, +# accept: str, +# response_location : Optional[str] = None, +# files_download: Optional[Dict] = None, +# searchendpoints : Optional[Dict] = None, +# health_endpoint: Optional[str] = None, + +def test_config_searchendpoints(rdf_model: RdfModel): + with pytest.raises(ValueError): + WebServiceStore( + model=rdf_model, + endpoint=ENDPOINT, + content_type="application/json", + accept="*/*", + searchendpoints={"elastic": None} + ) + + +@pytest.fixture +def sparql_store(rdf_model: RdfModel): + return SPARQLStore( + model=rdf_model, + ) + + +def test_config(sparql_store: Any, rdf_model: RdfModel): + assert sparql_store.model == rdf_model + assert not sparql_store.endpoint + assert sparql_store.model.context() == rdf_model.context() + + +def test_search_params(sparql_store: Any): + with pytest.raises(ValueError): + sparql_store.search(resolvers=[None], filters=[None]) \ No newline at end of file From f6f853ecd0307538abe239dd0d36cfca8ced230b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cristina=20E=2E=20Gonz=C3=A1lez-Espinoza?= Date: Wed, 13 Dec 2023 14:25:21 +0100 Subject: [PATCH 2/3] More changes --- kgforge/core/commons/execution.py | 14 +++++- .../specializations/stores/bluebrain_nexus.py | 2 +- .../specializations/stores/nexus/service.py | 13 ++--- .../stores/web_service/webservice.py | 5 +- .../stores/web_service_store.py | 49 +++++++++---------- 5 files changed, 43 insertions(+), 40 deletions(-) diff --git a/kgforge/core/commons/execution.py b/kgforge/core/commons/execution.py index 74991f4f..d0eee24d 100644 --- a/kgforge/core/commons/execution.py +++ b/kgforge/core/commons/execution.py @@ -15,7 +15,7 @@ import inspect import traceback from functools import wraps -from typing import Any, Callable, List, Optional, Tuple, Union, Type +from typing import Any, Callable, List, Optional, Tuple, Union, Type, Dict import requests from kgforge.core.resource import Resource @@ -107,6 +107,18 @@ def catch_http_error( raise error_to_throw(error_message_formatter(e)) from e +def format_message(msg): + return "".join([msg[0].lower(), msg[1:-1], msg[-1] if msg[-1] != "." else ""]) + + +def error_message(error: Union[requests.HTTPError, Dict]) -> str: + try: + error_text = error.response.text() if isinstance(error, requests.HTTPError) else str(error) + return format_message(error_text) + except Exception: + return format_message(str(error)) + + def run(fun_one: Callable, fun_many: Optional[Callable], data: Union[Resource, List[Resource]], exception: Type[RunException], id_required: bool = False, required_synchronized: Optional[bool] = None, execute_actions: bool = False, diff --git a/kgforge/specializations/stores/bluebrain_nexus.py b/kgforge/specializations/stores/bluebrain_nexus.py index f68396d3..b9a9b2db 100644 --- a/kgforge/specializations/stores/bluebrain_nexus.py +++ b/kgforge/specializations/stores/bluebrain_nexus.py @@ -690,7 +690,7 @@ def search( **params ) -> List[Resource]: - if self.model_context() is None: + if self.model_context is None: raise ValueError("context model missing") debug = params.get("debug", False) diff --git a/kgforge/specializations/stores/nexus/service.py b/kgforge/specializations/stores/nexus/service.py index 76be51a0..b7e34921 100644 --- a/kgforge/specializations/stores/nexus/service.py +++ b/kgforge/specializations/stores/nexus/service.py @@ -37,6 +37,7 @@ LazyAction, ) from kgforge.core.commons.exceptions import ConfigurationError +from kgforge.core.commons.execution import error_message, format_message from kgforge.core.commons.context import Context from kgforge.core.conversions.rdf import ( _from_jsonld_one, @@ -597,10 +598,8 @@ def to_resource( def _error_message(error: Union[HTTPError, Dict]) -> str: - def format_message(msg): - return "".join([msg[0].lower(), msg[1:-1], msg[-1] if msg[-1] != "." else ""]) - try: + # Error from Nexus error_json = error.response.json() if isinstance(error, HTTPError) else error messages = [] reason = error_json.get("reason", None) @@ -612,9 +611,5 @@ def format_message(msg): messages = messages if reason or details else [str(error)] return ". ".join(messages) except Exception: - pass - try: - error_text = error.response.text() if isinstance(error, HTTPError) else str(error) - return format_message(error_text) - except Exception: - return format_message(str(error)) + # Return general HTTPError + return error_message(error) diff --git a/kgforge/specializations/stores/web_service/webservice.py b/kgforge/specializations/stores/web_service/webservice.py index cf175413..2f46a9e0 100644 --- a/kgforge/specializations/stores/web_service/webservice.py +++ b/kgforge/specializations/stores/web_service/webservice.py @@ -72,9 +72,8 @@ def resources_from_request(url: str, :param request_params: Any other parameter for the request """ try: - response = requests.get( - url, params=request_params, headers=headers, verify=False - ) + response = requests.get(url, params=request_params, + headers=headers, verify=False) response.raise_for_status() except Exception as e: raise QueryingError(e) diff --git a/kgforge/specializations/stores/web_service_store.py b/kgforge/specializations/stores/web_service_store.py index adbc1fff..5b700934 100644 --- a/kgforge/specializations/stores/web_service_store.py +++ b/kgforge/specializations/stores/web_service_store.py @@ -25,6 +25,7 @@ from kgforge.core.archetypes.dataset_store import DatasetStore from kgforge.core.archetypes.resolver import Resolver from kgforge.core.commons.exceptions import ConfigurationError, DownloadingError +from kgforge.core.commons.execution import catch_http_error, error_message from kgforge.core.wrappings.paths import Filter @@ -120,37 +121,33 @@ async def _download(url, path, semaphore, session): async with semaphore: params_download = copy.deepcopy(self.service.params.get('download', {})) async with session.get(url, params=params_download) as response: - try: - response.raise_for_status() - except Exception as e: - raise DownloadingError( - f"Downloading:{_error_message(e)}" + catch_http_error( + response, DownloadingError, + error_message_formatter=lambda e: + f"Downloading url {url} failed: {error_message(e)}" ) - else: - with open(path, "wb") as f: - data = await response.read() - f.write(data) + with open(path, "wb") as f: + data = await response.read() + f.write(data) return asyncio.run(_bulk()) def _download_one(self, url: str, path: str) -> None: - try: - params_download = copy.deepcopy(self.service.params.get('download', {})) - response = requests.get( - url=url, - headers=self.service.headers_download, - params=params_download, - verify=False - ) - response.raise_for_status() - except Exception as e: - raise DownloadingError( - f"Downloading from failed :{_error_message(e)}" - ) - else: - with open(path, "wb") as f: - for chunk in response.iter_content(chunk_size=4096): - f.write(chunk) + params_download = copy.deepcopy(self.service.params.get('download', {})) + response = requests.get( + url=url, + headers=self.service.headers_download, + params=params_download, + verify=False + ) + catch_http_error( + response, DownloadingError, + error_message_formatter=lambda e: f"Downloading failed: " + f"{error_message(e)}" + ) + with open(path, "wb") as f: + for chunk in response.iter_content(chunk_size=4096): + f.write(chunk) def health(self) -> Callable: if self.health_endpoint: From 47e8464ac0ad92e6c9e33b946ac4e1efa77c27dc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cristina=20E=2E=20Gonz=C3=A1lez-Espinoza?= Date: Wed, 13 Dec 2023 17:20:18 +0100 Subject: [PATCH 3/3] Clean code and add tests --- kgforge/core/archetypes/dataset_store.py | 6 +- kgforge/core/archetypes/read_only_store.py | 105 +++++++++--- kgforge/core/archetypes/store.py | 2 +- .../specializations/stores/bluebrain_nexus.py | 3 +- kgforge/specializations/stores/demo_store.py | 4 +- .../specializations/stores/sparql_store.py | 43 +++-- .../stores/web_service/webservice.py | 46 +++--- .../stores/web_service_store.py | 154 +++++++++++------- tests/specializations/stores/test_sparql.py | 7 + .../stores/test_web_service.py | 83 +++++++--- 10 files changed, 310 insertions(+), 143 deletions(-) diff --git a/kgforge/core/archetypes/dataset_store.py b/kgforge/core/archetypes/dataset_store.py index 931cd123..88ac44c7 100644 --- a/kgforge/core/archetypes/dataset_store.py +++ b/kgforge/core/archetypes/dataset_store.py @@ -15,6 +15,7 @@ from abc import abstractmethod, ABC from typing import Optional, Union, List, Type, Dict +from kgforge.core.archetypes.model import Model from kgforge.core.resource import Resource from kgforge.core.archetypes.read_only_store import ReadOnlyStore from kgforge.core.archetypes.resolver import Resolver @@ -28,6 +29,9 @@ class DatasetStore(ReadOnlyStore): """A class to link to external databases, query and search directly on datasets. """ + def __init__(self, model: Optional[Model] = None) -> None: + super().__init__(model) + @property @abstractmethod def mapper(self) -> Type[Mapper]: @@ -106,7 +110,7 @@ def sparql( """Use SPARQL within the database. :param map: bool """ - unmapped_resources = super(ReadOnlyStore, self).sparql( + unmapped_resources = super().sparql( query, debug, limit, offset, **params ) diff --git a/kgforge/core/archetypes/read_only_store.py b/kgforge/core/archetypes/read_only_store.py index 1ee272ed..909c37c6 100644 --- a/kgforge/core/archetypes/read_only_store.py +++ b/kgforge/core/archetypes/read_only_store.py @@ -11,13 +11,12 @@ # # You should have received a copy of the GNU Lesser General Public License # along with Blue Brain Nexus Forge. If not, see . - import time from abc import ABC, abstractmethod from pathlib import Path from typing import Any, Dict, List, Optional, Tuple, Union -from kgforge.core import Resource +from kgforge.core.resource import Resource from kgforge.core.archetypes.model import Model from kgforge.core.archetypes.resolver import Resolver from kgforge.core.commons.attributes import repr_class @@ -26,7 +25,9 @@ DownloadingError, ) from kgforge.core.commons.execution import not_supported +from kgforge.core.commons.sparql_query_builder import SPARQLQueryBuilder from kgforge.core.reshaping import collect_values +from kgforge.core.wrappings import Filter from kgforge.core.wrappings.dict import DictWrapper DEFAULT_LIMIT = 100 @@ -49,29 +50,42 @@ def __init__( model: Optional[Model] = None, ) -> None: self.model: Optional[Model] = model - self.model_context: Optional[Context] = ( - self.model.context() if hasattr(self.model, 'context') else None - ) def __repr__(self) -> str: return repr_class(self) + @staticmethod + def _context_to_dict(context: Context): + return { + k: v["@id"] if isinstance(v, Dict) and "@id" in v else v + for k, v in context.document["@context"].items() + } + + def get_context_prefix_vocab(self) -> Tuple[Optional[Dict], Optional[Dict], Optional[str]]: + return ( + ReadOnlyStore._context_to_dict(self.model_context()), + self.model_context().prefixes, + self.model_context().vocab + ) + # C[R]UD. @abstractmethod def retrieve( - self, id_: str, version: Optional[Union[int, str]], cross_bucket: bool, **params - ) -> Resource: + self, id_: str, version: Optional[Union[int, str]], cross_bucket: bool = False, **params + ) -> Optional[Resource]: # POLICY Should notify of failures with exception RetrievalError including a message. # POLICY Resource _store_metadata should be set using wrappers.dict.wrap_dict(). # POLICY Resource _synchronized should be set to True. # TODO These two operations might be abstracted here when other stores will be implemented. - pass + ... + @abstractmethod def _retrieve_filename(self, id: str) -> Tuple[str, str]: # TODO This operation might be adapted if other file metadata are needed. - not_supported() + ... + @abstractmethod def _prepare_download_one( self, url: str, @@ -79,7 +93,7 @@ def _prepare_download_one( cross_bucket: bool ) -> Tuple[str, str]: # Prepare download url and download bucket - not_supported() + ... def download( self, @@ -150,6 +164,7 @@ def _download_many( for url, path, store_m, bucket in zip(urls, paths, store_metadata, buckets): self._download_one(url, path, store_m, cross_bucket, content_type, bucket) + @abstractmethod def _download_one( self, url: str, @@ -161,15 +176,13 @@ def _download_one( ) -> None: # path: FilePath. # POLICY Should notify of failures with exception DownloadingError including a message. - not_supported() + ... # Querying. @abstractmethod def search( - self, resolvers: Optional[List[Resolver]] = None, - *filters, - **params + self, resolvers: Optional[List[Resolver]], filters: List[Union[Dict, Filter]], **params ) -> List[Resource]: # Positional arguments in 'filters' are instances of type Filter from wrappings/paths.py @@ -192,10 +205,53 @@ def search( # TODO These two operations might be abstracted here when other stores will be implemented. ... + def sparql( + self, query: str, + debug: bool, + limit: int = DEFAULT_LIMIT, + offset: int = DEFAULT_OFFSET, + **params + ) -> List[Resource]: + rewrite = params.get("rewrite", True) + + if self.model_context() is not None and rewrite: + + context_as_dict, prefixes, vocab = self.get_context_prefix_vocab() + + qr = SPARQLQueryBuilder.rewrite_sparql( + query, + context_as_dict=context_as_dict, + prefixes=prefixes, + vocab=vocab + ) + else: + qr = query + + qr = SPARQLQueryBuilder.apply_limit_and_offset_to_query( + qr, + limit=limit, + offset=offset, + default_limit=DEFAULT_LIMIT, + default_offset=DEFAULT_OFFSET + ) + + if debug: + SPARQLQueryBuilder.debug_query(qr) + + return self._sparql(qr) + + @abstractmethod + def _sparql(self, query: str) -> Optional[Union[List[Resource], Resource]]: + # POLICY Should notify of failures with exception QueryingError including a message. + # POLICY Resource _store_metadata should not be set (default is None). + # POLICY Resource _synchronized should not be set (default is False). + ... + @abstractmethod - def sparql(self, query: str, debug: bool, limit: int = DEFAULT_LIMIT, - offset: int = DEFAULT_OFFSET, - **params) -> Optional[Union[List[Resource], Resource]]: + def elastic( + self, query: str, debug: bool, limit: int = None, + offset: int = None, **params + ) -> Optional[Union[List[Resource], Resource]]: ... # Versioning. @@ -204,7 +260,6 @@ def sparql(self, query: str, debug: bool, limit: int = DEFAULT_LIMIT, def _initialize_service( self, endpoint: Optional[str], - bucket: Optional[str], token: Optional[str], searchendpoints: Optional[Dict] = None, **store_config, @@ -212,18 +267,14 @@ def _initialize_service( # POLICY Should initialize the access to the store according to its configuration. ... - @staticmethod - def _debug_query(query) -> None: - if isinstance(query, Dict): - print("Submitted query:", query) - else: - print(*["Submitted query:", *query.splitlines()], sep="\n ") - print() - + @abstractmethod def rewrite_uri(self, uri: str, context: Context, **kwargs) -> str: """Rewrite a given uri using the store Context :param uri: a URI to rewrite. :param context: a Store Context object :return: str """ - not_supported() + ... + + def model_context(self): + return self.model.context() if self.model else None diff --git a/kgforge/core/archetypes/store.py b/kgforge/core/archetypes/store.py index 7484c5f3..ba52925d 100644 --- a/kgforge/core/archetypes/store.py +++ b/kgforge/core/archetypes/store.py @@ -67,7 +67,7 @@ def __init__( if file_resource_mapping else None self.service: Any = self._initialize_service( - self.endpoint, self.bucket, self.token, searchendpoints, **store_config + self.endpoint, self.token, searchendpoints, **store_config ) def __repr__(self) -> str: diff --git a/kgforge/specializations/stores/bluebrain_nexus.py b/kgforge/specializations/stores/bluebrain_nexus.py index b9a9b2db..d1bddd05 100644 --- a/kgforge/specializations/stores/bluebrain_nexus.py +++ b/kgforge/specializations/stores/bluebrain_nexus.py @@ -690,7 +690,7 @@ def search( **params ) -> List[Resource]: - if self.model_context is None: + if self.model_context() is None: raise ValueError("context model missing") debug = params.get("debug", False) @@ -929,7 +929,6 @@ def _elastic(self, query: str) -> List[Resource]: def _initialize_service( self, endpoint: Optional[str], - bucket: Optional[str], token: Optional[str], searchendpoints: Optional[Dict] = None, **store_config, diff --git a/kgforge/specializations/stores/demo_store.py b/kgforge/specializations/stores/demo_store.py index d2a9805d..5daad16b 100644 --- a/kgforge/specializations/stores/demo_store.py +++ b/kgforge/specializations/stores/demo_store.py @@ -165,8 +165,8 @@ def _elastic(self, query: str) -> Optional[Union[List[Resource], Resource]]: # Utils. def _initialize_service( - self, endpoint: Optional[str], bucket: Optional[str], - token: Optional[str], searchendpoints: Optional[Dict] = None, **store_config, + self, endpoint: Optional[str], token: Optional[str], + searchendpoints: Optional[Dict] = None, **store_config, ): return StoreLibrary() diff --git a/kgforge/specializations/stores/sparql_store.py b/kgforge/specializations/stores/sparql_store.py index df9822e6..ae30de4e 100644 --- a/kgforge/specializations/stores/sparql_store.py +++ b/kgforge/specializations/stores/sparql_store.py @@ -51,6 +51,17 @@ def __init__(self, model: Optional[Model] = None, def mapper(self) -> Optional[Type[Mapper]]: return DictionaryMapper + def _download_one( + self, + url: str, + path: str, + store_metadata: Optional[DictWrapper], + cross_bucket: bool, + content_type: str, + bucket: str + ) -> None: + raise not_supported() + def _search( self, filters: List[Union[Dict, Filter]], resolvers: Optional[List[Resolver]] = None, **params @@ -135,6 +146,26 @@ def _sparql(self, query: str) -> Optional[Union[List[Resource], Resource]]: return SPARQLQueryBuilder.build_resource_from_response(query, data, self.model_context) + def elastic( + self, query: str, debug: bool, limit: int = None, offset: int = None, **params + ) -> Optional[Union[List[Resource], Resource]]: + raise not_supported() + + def _prepare_download_one(self, url: str, store_metadata: Optional[DictWrapper], + cross_bucket: bool) -> Tuple[str, str]: + raise not_supported() + + def retrieve( + self, id: str, version: Optional[Union[int, str]], cross_bucket: bool, **params + ) -> Resource: + not_supported() + + def _retrieve_filename(self, id: str) -> str: + not_supported() + + def rewrite_uri(self, uri: str, context: Context, **kwargs) -> str: + raise not_supported() + # Utils. def _initialize_service( @@ -164,15 +195,3 @@ def _initialize_service( content_type=content_type, accept=accept, **params ) - - def elastic( - self, query: str, debug: bool, limit: int = None, offset: int = None, **params - ) -> Optional[Union[List[Resource], Resource]]: - raise not_supported() - - def _prepare_download_one(self, url: str, store_metadata: Optional[DictWrapper], - cross_bucket: bool) -> Tuple[str, str]: - raise not_supported() - - def rewrite_uri(self, uri: str, context: Context, **kwargs) -> str: - raise not_supported() diff --git a/kgforge/specializations/stores/web_service/webservice.py b/kgforge/specializations/stores/web_service/webservice.py index 2f46a9e0..25a591c6 100644 --- a/kgforge/specializations/stores/web_service/webservice.py +++ b/kgforge/specializations/stores/web_service/webservice.py @@ -11,9 +11,9 @@ # # You should have received a copy of the GNU Lesser General Public License # along with Blue Brain Nexus Forge. If not, see . +from typing import Dict, Optional import copy import requests -from typing import Dict, Optional from kgforge.core.resource import Resource from kgforge.core.commons.parser import _process_types @@ -26,38 +26,46 @@ def __init__( self, endpoint: str, content_type: str, - accept: str, - response_location : Optional[str] = None, + accept: str = "*/*", + response_location: Optional[str] = None, files_download: Optional[Dict] = None, - searchendpoints : Optional[Dict] = None, - health_endpoint: Optional[str] = None, + searchendpoints: Optional[Dict] = None, **params, ): """A Web service""" self.endpoint = endpoint - self.context_cache: Dict = dict() + self.content_type = content_type + self.accept = accept + self.context_cache: Dict = [] self.response_location = response_location self.files_download = files_download - self.health_endpoint = health_endpoint self.searchendpoints = searchendpoints if self.searchendpoints: + if not isinstance(self.searchendpoints, dict): + raise ConfigurationError("searchendpoints must be a dict") for endpoint in self.searchendpoints: - if not 'endpoint' in self.searchendpoints[endpoint]: - raise ConfigurationError(f"Missing endpoint searchenpoints") + if not isinstance(endpoint, dict): + raise ConfigurationError("endpoint configuration must be a dict") + if 'endpoint' not in self.searchendpoints[endpoint]: + raise ConfigurationError("Missing endpoint searchenpoints") + self.max_connection = params.pop('max_connection', None) self.params = copy.deepcopy(params) self.headers = {"Content-Type": content_type, "Accept": accept} - + if files_download: + if 'Content-Type' not in files_download: + raise ConfigurationError("Files download configuration misses the `Content-Type` value") + if 'Accept' not in files_download: + raise ConfigurationError("Files download configuration misses the `Accept` value") + file_content_type = files_download['Content-Type'] + file_accept = files_download['Accept'] + else: + file_content_type = file_accept = "text/plain" self.headers_download = { - "Content-Type": self.files_download["Content-Type"] - if self.files_download and "Content-Type" in self.files_download - else "text/plain", - "Accept": self.files_download["Accept"] - if self.files_download and "Accept" in self.files_download - else "text/plain", + "Content-Type": file_content_type, + "Accept": file_accept } - self.max_connection = params.pop('max_connection', None) - + @staticmethod def resources_from_request(url: str, headers: Dict, @@ -98,4 +106,4 @@ def build_resources_from_results(results): return [ Resource(**{k: _process_types(v) for k, v in x.items()}) for x in results - ] \ No newline at end of file + ] diff --git a/kgforge/specializations/stores/web_service_store.py b/kgforge/specializations/stores/web_service_store.py index 5b700934..94a672df 100644 --- a/kgforge/specializations/stores/web_service_store.py +++ b/kgforge/specializations/stores/web_service_store.py @@ -11,21 +11,24 @@ # # You should have received a copy of the GNU Lesser General Public License # along with Blue Brain Nexus Forge. If not, see . +from typing import Dict, List, Optional, Union, Type, Callable, Tuple import copy import asyncio import requests from aiohttp import ClientSession from requests.exceptions import SSLError -from typing import Dict, List, Optional, Union, Type, Callable + from kgforge.core.resource import Resource +from kgforge.core.commons.context import Context from kgforge.core.archetypes.model import Model from kgforge.core.archetypes.mapper import Mapper -from kgforge.specializations.mappers.dictionaries import DictionaryMapper -from kgforge.core.archetypes.dataset_store import DatasetStore from kgforge.core.archetypes.resolver import Resolver +from kgforge.core.wrappings.dict import DictWrapper +from kgforge.core.archetypes.dataset_store import DatasetStore +from kgforge.specializations.mappers.dictionaries import DictionaryMapper from kgforge.core.commons.exceptions import ConfigurationError, DownloadingError -from kgforge.core.commons.execution import catch_http_error, error_message +from kgforge.core.commons.execution import not_supported, catch_http_error, error_message from kgforge.core.wrappings.paths import Filter @@ -39,54 +42,26 @@ def __init__( self, model: Model, endpoint: str, - content_type: str, - accept: str, - response_location : Optional[str] = None, - files_download: Optional[Dict] = None, - searchendpoints : Optional[Dict] = None, + request_params: dict, + token: Optional[str] = None, + searchendpoints: Optional[Dict] = None, health_endpoint: Optional[str] = None, **params, ): super().__init__(model) self.health_endpoint = health_endpoint - self.service = self._initialize_service(endpoint, content_type, accept, - response_location, files_download, - searchendpoints, **params) + params.update({"request_params": request_params}) + self.service = self._initialize_service(endpoint=endpoint, + token=token, + searchendpoints=searchendpoints, + **params) @property def mapper(self) -> Optional[Type[Mapper]]: return DictionaryMapper - def _search(self, resolvers: Optional[List[Resolver]], - filters: List[Union[Dict, Filter]], - **params - ) -> Optional[List[Resource]]: - # resolvers are not used, just passed because of previous methods shapes - if not isinstance(filters, dict): - raise NotImplementedError('Currently only the use of a dictionary as a filter is implemented') - searchendpoint = params.pop('searchendpoint', None) - if searchendpoint: - if self.service.searchendpoints is None: - raise ConfigurationError("No searchendpoints were given " - "in the initial configuration.") - try: - endpoint = self.service.searchendpoints[searchendpoint]['endpoint'] - except KeyError: - raise ConfigurationError(f"The {searchendpoint} searchpoint was not given " \ - "in the initial configuration.") - else: - endpoint = self.service.endpoint - # Combine the two dictionaries - for flr in filters: - params.update(flr) - return self.service.resources_from_request(endpoint, self.service.headers, - self.service.response_location, **params) - - - - @catch - def download(self, urls: Union[str, List[str]], - paths: Union[str, List[str]], overwrite: bool = False) -> None: + def download(self, urls: Union[str, List[str]], + paths: Union[str, List[str]], overwrite: bool = False) -> None: # path: DirPath. """Download files """ if isinstance(urls, list): @@ -149,24 +124,87 @@ def _download_one(self, url: str, path: str) -> None: for chunk in response.iter_content(chunk_size=4096): f.write(chunk) - def health(self) -> Callable: + def _prepare_download_one(self, url: str, store_metadata: Optional[DictWrapper], + cross_bucket: bool) -> Tuple[str, str]: + raise not_supported() + + def retrieve( + self, id: str, version: Optional[Union[int, str]], cross_bucket: bool, **params + ) -> Resource: + raise not_supported() + + def _retrieve_filename(self, id: str) -> str: + raise not_supported() + + def _search(self, resolvers: Optional[List[Resolver]], + filters: List[Union[Dict, Filter]], + **params + ) -> Optional[List[Resource]]: + # resolvers are not used, just passed because of previous methods shapes + if not isinstance(filters, dict): + raise NotImplementedError('Currently only the use of a dictionary as a filter is implemented') + searchendpoint = params.pop('searchendpoint', None) + if searchendpoint: + if self.service.searchendpoints is None: + raise ConfigurationError("No searchendpoints were given " + "in the initial configuration.") + try: + endpoint = self.service.searchendpoints[searchendpoint]['endpoint'] + except KeyError: + raise ConfigurationError(f"The {searchendpoint} searchpoint was not given " + "in the initial configuration.") + else: + endpoint = self.service.endpoint + # Combine the two dictionaries + for flr in filters: + params.update(flr) + return self.service.resources_from_request(endpoint, self.service.headers, + self.service.response_location, **params) + + def _sparql(self, query: str) -> Optional[Union[List[Resource], Resource]]: + raise not_supported() + + def elastic( + self, query: str, debug: bool, limit: int = None, offset: int = None, **params + ) -> Optional[Union[List[Resource], Resource]]: + raise not_supported() + + def rewrite_uri(self, uri: str, context: Context, **kwargs) -> str: + raise not_supported() + + def health(self) -> Dict: if self.health_endpoint: try: response = requests.get(self.health_endpoint) - except SSLError: - response = requests.get(self.health_endpoint, verify=False) - return response.json() + except Exception as error: + if isinstance(error, SSLError): + response = requests.get(self.health_endpoint, verify=False) + catch_http_error( + response, requests.HTTPError, + error_message_formatter=lambda e: + f"Health check failed: {error_message(e)}" + ) + return response.json() + else: + raise ConfigurationError(f"Health check failed: {error_message(error)}") else: - raise ConfigurationError('Health information not reachable with given configuration. \ - Define health in configuration arguments or set _health.') - - def _initialize_service(endpoint: str, - content_type: str, - accept: str, - response_location : Optional[str] = None, - files_download: Optional[Dict] = None, - searchendpoints : Optional[Dict] = None, + raise ConfigurationError("Health information not found with given configuration. " + "Define health in configuration arguments or set _health.") + + def _initialize_service(self, endpoint: str, + token: Optional[str], + searchendpoints: Optional[Dict], **params - ) -> WebService: - return WebService(endpoint, content_type, accept, response_location, - files_download, searchendpoints, **params) \ No newline at end of file + ) -> WebService: + requests_params = params.pop("request_params") + # split the parameters before initializing the service + content_type = requests_params.get("content_type", None) + if not content_type: + raise ConfigurationError("Content type not specified in request_params: " + f"{requests_params}") + accept = requests_params.get("accept", None) + response_location = requests_params.get("response_location", None) + files_download = requests_params.get("files_download", None) + return WebService(endpoint, content_type, + accept, response_location, + files_download, searchendpoints, **params) diff --git a/tests/specializations/stores/test_sparql.py b/tests/specializations/stores/test_sparql.py index f782df8e..da414f33 100644 --- a/tests/specializations/stores/test_sparql.py +++ b/tests/specializations/stores/test_sparql.py @@ -12,11 +12,13 @@ # You should have received a copy of the GNU Lesser General Public License # along with Blue Brain Nexus Forge. If not, see . +from typing import Any import pytest from utils import full_path_relative_to_root from kgforge.specializations.models.rdf_model import RdfModel from kgforge.specializations.stores.sparql_store import SPARQLStore +from kgforge.core.commons.exceptions import NotSupportedError SEARCH_ENDPOINT = {"sparql": {"endpoint": "http://dbpedia.org/sparql"}} @@ -58,3 +60,8 @@ def test_config(sparql_store, rdf_model): def test_search_params(sparql_store): with pytest.raises(ValueError): sparql_store.search(resolvers=[None], filters=[None]) + + +def test_elastic_not_supported(sparql_store: Any): + with pytest.raises(NotSupportedError): + sparql_store.elastic(query=None, debug=False) diff --git a/tests/specializations/stores/test_web_service.py b/tests/specializations/stores/test_web_service.py index b3445ad7..2c6aeeba 100644 --- a/tests/specializations/stores/test_web_service.py +++ b/tests/specializations/stores/test_web_service.py @@ -12,9 +12,11 @@ # You should have received a copy of the GNU Lesser General Public License # along with Blue Brain Nexus Forge. If not, see . +from typing import Any import pytest from utils import full_path_relative_to_root +from kgforge.core.commons.exceptions import ConfigurationError, NotSupportedError from kgforge.specializations.models.rdf_model import RdfModel from kgforge.specializations.stores.web_service_store import WebServiceStore @@ -33,39 +35,78 @@ def rdf_model(): return model -# model: Model, -# endpoint: str, -# content_type: str, -# accept: str, -# response_location : Optional[str] = None, -# files_download: Optional[Dict] = None, -# searchendpoints : Optional[Dict] = None, -# health_endpoint: Optional[str] = None, - def test_config_searchendpoints(rdf_model: RdfModel): - with pytest.raises(ValueError): + with pytest.raises(ConfigurationError): WebServiceStore( model=rdf_model, endpoint=ENDPOINT, - content_type="application/json", - accept="*/*", + request_params={ + "content_type":"application/json", + "accept":"*/*"}, searchendpoints={"elastic": None} ) +def test_config_file_downloads_content_type(rdf_model: RdfModel): + with pytest.raises(ConfigurationError): + WebServiceStore( + model=rdf_model, + endpoint=ENDPOINT, + request_params={ + "content_type":"application/json", + "accept":"*/*", + "files_download": {"Accept": "application/json"}}, + ) + + +def test_config_file_downloads_accept(rdf_model: RdfModel): + with pytest.raises(ConfigurationError): + WebServiceStore( + model=rdf_model, + endpoint=ENDPOINT, + request_params={ + "content_type":"application/json", + "accept":"*/*", + "files_download": {"Content-Type": "application/json"}}, + ) + + @pytest.fixture -def sparql_store(rdf_model: RdfModel): - return SPARQLStore( +def web_service_store(rdf_model: RdfModel): + return WebServiceStore( model=rdf_model, + endpoint=ENDPOINT, + request_params={ + "content_type":"application/json", + "accept":"*/*", + "files_download": {"Content-Type": "application/json", + "Accept": "*/*"} + }, + health_endpoint="https://mynotreal.com/health" ) -def test_config(sparql_store: Any, rdf_model: RdfModel): - assert sparql_store.model == rdf_model - assert not sparql_store.endpoint - assert sparql_store.model.context() == rdf_model.context() +def test_config(web_service_store: Any, rdf_model: RdfModel): + assert web_service_store.model == rdf_model + assert web_service_store.service.endpoint + assert web_service_store.model.context() == rdf_model.context() + + +def test_health_not_valid(web_service_store): + with pytest.raises(ConfigurationError): + web_service_store.health() + + +def test_sparql_not_implemented(web_service_store: Any): + with pytest.raises(NotSupportedError): + web_service_store.sparql(query="SELECT * WHERE { ?s ?p ?o }") + + +def test_elastic_not_supported(web_service_store: Any): + with pytest.raises(NotSupportedError): + web_service_store.elastic(query=None, debug=False) -def test_search_params(sparql_store: Any): - with pytest.raises(ValueError): - sparql_store.search(resolvers=[None], filters=[None]) \ No newline at end of file +def test_retrieve_not_supported(web_service_store: Any): + with pytest.raises(NotSupportedError): + web_service_store.retrieve(id=None, version=None, cross_bucket=False)