From ef3567a49a1c7af7aa96d882af99a9b92b082d03 Mon Sep 17 00:00:00 2001 From: David Lougheed Date: Fri, 1 Sep 2023 11:29:58 -0400 Subject: [PATCH 1/3] feat: aggregate workflows fix: parallel service requests with service manager dependency --- bento_service_registry/routes.py | 14 ++- bento_service_registry/services.py | 136 ++++++++++++++++++---------- bento_service_registry/workflows.py | 89 ++++++++++++++++++ tests/test_api.py | 8 ++ 4 files changed, 193 insertions(+), 54 deletions(-) create mode 100644 bento_service_registry/workflows.py diff --git a/bento_service_registry/routes.py b/bento_service_registry/routes.py index 0585cd4..e3a8c23 100644 --- a/bento_service_registry/routes.py +++ b/bento_service_registry/routes.py @@ -5,10 +5,10 @@ from .bento_services_json import BentoServicesByKindDependency, BentoServicesByComposeIDDependency from .data_types import DataTypesDependency, DataTypesTuple from .http_session import HTTPSessionDependency -from .logger import LoggerDependency from .models import DataTypeWithServiceURL from .service_info import ServiceInfoDependency -from .services import get_service, ServicesDependency +from .services import ServiceManagerDependency, ServicesDependency +from .workflows import WorkflowsByPurpose, WorkflowsDependency __all__ = [ "service_registry", @@ -42,8 +42,8 @@ async def get_service_by_id( authz_header: OptionalAuthzHeaderDependency, bento_services_by_kind: BentoServicesByKindDependency, http_session: HTTPSessionDependency, - logger: LoggerDependency, service_info: ServiceInfoDependency, + service_manager: ServiceManagerDependency, services_tuple: ServicesDependency, service_id: str, ): @@ -55,9 +55,8 @@ async def get_service_by_id( svc = services_by_service_id[service_id] # Get service by bento.serviceKind, using type.artifact as a backup for legacy reasons - service_data = await get_service( + service_data = await service_manager.get_service( authz_header, - logger, http_session, service_info, bento_services_by_kind[svc.get("bento", {}).get("serviceKind", svc["type"]["artifact"])], @@ -83,6 +82,11 @@ async def get_data_type(data_types: DataTypesDependency, data_type_id: str) -> D raise HTTPException(status.HTTP_404_NOT_FOUND, f"Data type with ID {data_type_id} was not found") +@service_registry.get("/workflows", dependencies=[authz_middleware.dep_public_endpoint()]) +async def list_workflows_by_purpose(workflows: WorkflowsDependency) -> WorkflowsByPurpose: + return workflows + + @service_registry.get("/service-info", dependencies=[authz_middleware.dep_public_endpoint()]) async def get_service_info(service_info: ServiceInfoDependency): # Spec: https://github.com/ga4gh-discovery/ga4gh-service-info diff --git a/bento_service_registry/services.py b/bento_service_registry/services.py index fddbf13..fdf9ad1 100644 --- a/bento_service_registry/services.py +++ b/bento_service_registry/services.py @@ -2,15 +2,17 @@ import asyncio import logging +from aiohttp import ClientSession from bento_lib.types import GA4GHServiceInfo from datetime import datetime from fastapi import Depends, status +from functools import lru_cache from json import JSONDecodeError -from typing import Annotated +from typing import Annotated, Awaitable from urllib.parse import urljoin from .authz_header import OptionalAuthzHeader, OptionalAuthzHeaderDependency -from .bento_services_json import BentoServicesByKindDependency +from .bento_services_json import BentoServicesByKind, BentoServicesByKindDependency from .constants import BENTO_SERVICE_KIND from .http_session import HTTPSessionDependency from .logger import LoggerDependency @@ -19,79 +21,115 @@ __all__ = [ - "get_service", + "get_service_manager", + "ServiceManagerDependency", "get_services", "ServicesDependency", ] -async def get_service( - authz_header: OptionalAuthzHeader, - logger: logging.Logger, - session: aiohttp.ClientSession, - service_info: GA4GHServiceInfo, - service_metadata: BentoService, -) -> dict[str, dict] | None: - kind = service_metadata["service_kind"] +class ServiceManager: + def __init__(self, logger: logging.Logger): + self._co: Awaitable[list[dict | None]] | None = None + self._logger = logger - # special case: requesting info about the current service. Skip networking / self-connect; - # instead, return pre-calculated /service-info contents. - if kind == BENTO_SERVICE_KIND: - return service_info + async def get_service( + self, + authz_header: OptionalAuthzHeaderDependency, + http_session: HTTPSessionDependency, + service_info: ServiceInfoDependency, + service_metadata: BentoService, + ) -> dict | None: + kind = service_metadata["service_kind"] - s_url: str = service_metadata["url"] - service_info_url: str = urljoin(f"{s_url}/", "service-info") + # special case: requesting info about the current service. Skip networking / self-connect; + # instead, return pre-calculated /service-info contents. + if kind == BENTO_SERVICE_KIND: + return service_info - dt = datetime.now() - logger.info(f"Contacting {service_info_url}{' with bearer token' if authz_header else ''}") + s_url: str = service_metadata["url"] + service_info_url: str = urljoin(f"{s_url}/", "service-info") - service_resp: dict[str, dict] = {} + dt = datetime.now() + self._logger.info(f"Contacting {service_info_url}{' with bearer token' if authz_header else ''}") - try: - async with session.get(service_info_url, headers=authz_header) as r: - if r.status != status.HTTP_200_OK: - r_text = await r.text() - logger.error(f"Non-200 status code on {kind}: {r.status} Content: {r_text}") + service_resp: dict[str, dict] = {} - # If we have the special case where we got a JWT error from the proxy script, we can safely print out - # headers for debugging, since the JWT leaked isn't valid anyway. - if "invalid jwt" in r_text: - logger.error(f"Encountered auth error on {kind}; tried to use header: {authz_header}") + try: + async with http_session.get(service_info_url, headers=authz_header) as r: + if r.status != status.HTTP_200_OK: + r_text = await r.text() + self._logger.error(f"Non-200 status code on {kind}: {r.status} Content: {r_text}") - return None + # If we have the special case where we got a JWT error from the proxy script, we can safely print + # out headers for debugging, since the JWT leaked isn't valid anyway. + if "invalid jwt" in r_text: + self._logger.error(f"Encountered auth error on {kind}; tried to use header: {authz_header}") - try: - service_resp[kind] = {**(await r.json()), "url": s_url} - except (JSONDecodeError, aiohttp.ContentTypeError, TypeError) as e: - # JSONDecodeError can happen if the JSON is invalid - # ContentTypeError can happen if the Content-Type is not application/json - # TypeError can happen if None is received - logger.error(f"Encountered invalid response ({str(e)}) from {service_info_url}: {await r.text()}") + return None - logger.info(f"{service_info_url}: Took {(datetime.now() - dt).total_seconds():.1f}s") + try: + service_resp[kind] = {**(await r.json()), "url": s_url} + except (JSONDecodeError, aiohttp.ContentTypeError, TypeError) as e: + # JSONDecodeError can happen if the JSON is invalid + # ContentTypeError can happen if the Content-Type is not application/json + # TypeError can happen if None is received + self._logger.error( + f"Encountered invalid response ({str(e)}) from {service_info_url}: {await r.text()}") - except asyncio.TimeoutError: - logger.error(f"Encountered timeout with {service_info_url}") + self._logger.info(f"{service_info_url}: Took {(datetime.now() - dt).total_seconds():.1f}s") - except aiohttp.ClientConnectionError as e: - logger.error(f"Encountered connection error with {service_info_url}: {str(e)}") + except asyncio.TimeoutError: + self._logger.error(f"Encountered timeout with {service_info_url}") - return service_resp.get(kind) + except aiohttp.ClientConnectionError as e: + self._logger.error(f"Encountered connection error with {service_info_url}: {str(e)}") + + return service_resp.get(kind) + + async def get_services( + self, + authz_header: OptionalAuthzHeader, + bento_services_by_kind: BentoServicesByKind, + http_session: ClientSession, + service_info: GA4GHServiceInfo, + ) -> tuple[dict, ...]: + if not self._co: + self._co = asyncio.gather(*[ + self.get_service(authz_header, http_session, service_info, s) + for s in bento_services_by_kind.values() + ]) + + service_list: list[dict | None] = await self._co + self._co = None + + return tuple(s for s in service_list if s is not None) + + +@lru_cache +def get_service_manager( + logger: LoggerDependency, +): + return ServiceManager(logger) + + +ServiceManagerDependency = Annotated[ServiceManager, Depends(get_service_manager)] async def get_services( authz_header: OptionalAuthzHeaderDependency, bento_services_by_kind: BentoServicesByKindDependency, http_session: HTTPSessionDependency, - logger: LoggerDependency, service_info: ServiceInfoDependency, + service_manager: ServiceManagerDependency, ) -> tuple[dict, ...]: # noinspection PyTypeChecker - service_list: list[dict | None] = await asyncio.gather(*[ - get_service(authz_header, logger, http_session, service_info, s) - for s in bento_services_by_kind.values() - ]) - return tuple(s for s in service_list if s is not None) + return await service_manager.get_services( + authz_header, + bento_services_by_kind, + http_session, + service_info, + ) ServicesDependency = Annotated[tuple[dict, ...], Depends(get_services)] diff --git a/bento_service_registry/workflows.py b/bento_service_registry/workflows.py new file mode 100644 index 0000000..2f48a0a --- /dev/null +++ b/bento_service_registry/workflows.py @@ -0,0 +1,89 @@ +import aiohttp +import asyncio +import logging + +from fastapi import Depends, status +from typing import Annotated +from urllib.parse import urljoin + +from .authz_header import OptionalAuthzHeader, OptionalAuthzHeaderDependency +from .http_session import HTTPSessionDependency +from .logger import LoggerDependency +from .services import ServicesDependency + +__all__ = [ + "WorkflowsByPurpose", + "get_workflows", + "WorkflowsDependency", +] + + +WorkflowsByPurpose = dict[str, dict[str, dict]] + + +async def get_workflows_from_service( + authz_header: OptionalAuthzHeader, + http_session: aiohttp.ClientSession, + logger: logging.Logger, + service: dict, +) -> WorkflowsByPurpose: + service_url: str | None = service.get("url") + + if service_url is None: + logger.error(f"Encountered a service missing a URL: {service}") + return {} + + service_url_norm: str = service_url.rstrip("/") + "/" + + async with http_session.get(urljoin(service_url_norm, "workflows"), headers=authz_header) as res: + if res.status != status.HTTP_200_OK: + logger.error( + f"Got non-200 response from data type service ({service_url=}): {res.status=}; body={await res.json()}") + return {} + + wfs: dict[str, dict[str, dict]] = {} + + for purpose, purpose_wfs in (await res.json()).items(): + if purpose not in wfs: + wfs[purpose] = {} + # TODO: pydantic model + validation + wfs[purpose].update({k: {**wf, "service_base_url": service_url_norm} for k, wf in purpose_wfs.items()}) + + return wfs + + +async def get_workflows( + authz_header: OptionalAuthzHeaderDependency, + http_session: HTTPSessionDependency, + logger: LoggerDependency, + services_tuple: ServicesDependency, +) -> WorkflowsByPurpose: + logger.debug("Collecting workflows from workflow-providing services") + + workflow_services = [ + s for s in services_tuple + if (b := s.get("bento", {})).get("dataService", False) or b.get("workflowProvider", False) + ] + + logger.debug(f"Found {len(workflow_services)} workflow-providing services") + + service_wfs = await asyncio.gather( + *(get_workflows_from_service(authz_header, http_session, logger, s) for s in workflow_services) + ) + + workflows_from_services: WorkflowsByPurpose = {} + workflows_found: int = 0 + + for s_wfs in service_wfs: + for purpose, purpose_wfs in s_wfs.items(): + if purpose not in workflows_from_services: + workflows_from_services[purpose] = {} + workflows_from_services[purpose].update(purpose_wfs) + workflows_found += len(purpose_wfs) + + logger.debug(f"Obtained {workflows_found} workflows") + + return workflows_from_services + + +WorkflowsDependency = Annotated[WorkflowsByPurpose, Depends(get_workflows)] diff --git a/tests/test_api.py b/tests/test_api.py index d4ed131..786f099 100644 --- a/tests/test_api.py +++ b/tests/test_api.py @@ -84,3 +84,11 @@ def test_data_types_list(client): def test_data_types_detail_404(client): r = client.get("/data-types/dne") assert r.status_code == 404 + + +def test_workflows_by_purpose(client): + r = client.get("/workflows") + d = r.json() + + assert r.status_code == 200 + assert len(d) == 0 # no workflow-providing services From a94a9b6007ba092fbf77f923a56c0709666750c1 Mon Sep 17 00:00:00 2001 From: David Lougheed Date: Fri, 1 Sep 2023 11:34:53 -0400 Subject: [PATCH 2/3] chore: bump version to 1.2.0 --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 21e1c27..80b340d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "poetry.core.masonry.api" [tool.poetry] name = "bento_service_registry" -version = "1.1.1" +version = "1.2.0" description = "An implementation of GA4GH Service Registry API for the Bento platform." authors = ["David Lougheed "] readme = "README.md" From da2eb4ccdb3e2b39c632c7214a74fd909d784fc7 Mon Sep 17 00:00:00 2001 From: David Lougheed Date: Fri, 1 Sep 2023 11:36:05 -0400 Subject: [PATCH 3/3] chore: update lockfile --- poetry.lock | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/poetry.lock b/poetry.lock index e508c81..ad3395d 100644 --- a/poetry.lock +++ b/poetry.lock @@ -160,24 +160,24 @@ files = [ [[package]] name = "anyio" -version = "3.7.1" +version = "4.0.0" description = "High level compatibility layer for multiple asynchronous event loop implementations" optional = false -python-versions = ">=3.7" +python-versions = ">=3.8" files = [ - {file = "anyio-3.7.1-py3-none-any.whl", hash = "sha256:91dee416e570e92c64041bd18b900d1d6fa78dff7048769ce5ac5ddad004fbb5"}, - {file = "anyio-3.7.1.tar.gz", hash = "sha256:44a3c9aba0f5defa43261a8b3efb97891f2bd7d804e0e1f56419befa1adfc780"}, + {file = "anyio-4.0.0-py3-none-any.whl", hash = "sha256:cfdb2b588b9fc25ede96d8db56ed50848b0b649dca3dd1df0b11f683bb9e0b5f"}, + {file = "anyio-4.0.0.tar.gz", hash = "sha256:f7ed51751b2c2add651e5747c891b47e26d2a21be5d32d9311dfe9692f3e5d7a"}, ] [package.dependencies] -exceptiongroup = {version = "*", markers = "python_version < \"3.11\""} +exceptiongroup = {version = ">=1.0.2", markers = "python_version < \"3.11\""} idna = ">=2.8" sniffio = ">=1.1" [package.extras] -doc = ["Sphinx", "packaging", "sphinx-autodoc-typehints (>=1.2.0)", "sphinx-rtd-theme (>=1.2.2)", "sphinxcontrib-jquery"] -test = ["anyio[trio]", "coverage[toml] (>=4.5)", "hypothesis (>=4.0)", "mock (>=4)", "psutil (>=5.9)", "pytest (>=7.0)", "pytest-mock (>=3.6.1)", "trustme", "uvloop (>=0.17)"] -trio = ["trio (<0.22)"] +doc = ["Sphinx (>=7)", "packaging", "sphinx-autodoc-typehints (>=1.2.0)"] +test = ["anyio[trio]", "coverage[toml] (>=7)", "hypothesis (>=4.0)", "psutil (>=5.9)", "pytest (>=7.0)", "pytest-mock (>=3.6.1)", "trustme", "uvloop (>=0.17)"] +trio = ["trio (>=0.22)"] [[package]] name = "async-timeout"