Skip to content

Commit

Permalink
Merge pull request #21 from bento-platform/feat/collect-workflows
Browse files Browse the repository at this point in the history
feat: aggregate workflows
  • Loading branch information
davidlougheed authored Sep 1, 2023
2 parents 7e36d3b + da2eb4c commit 779dc73
Show file tree
Hide file tree
Showing 6 changed files with 202 additions and 63 deletions.
14 changes: 9 additions & 5 deletions bento_service_registry/routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@
from .bento_services_json import BentoServicesByKindDependency, BentoServicesByComposeIDDependency
from .data_types import DataTypesDependency, DataTypesTuple
from .http_session import HTTPSessionDependency
from .logger import LoggerDependency
from .models import DataTypeWithServiceURL
from .service_info import ServiceInfoDependency
from .services import get_service, ServicesDependency
from .services import ServiceManagerDependency, ServicesDependency
from .workflows import WorkflowsByPurpose, WorkflowsDependency

__all__ = [
"service_registry",
Expand Down Expand Up @@ -42,8 +42,8 @@ async def get_service_by_id(
authz_header: OptionalAuthzHeaderDependency,
bento_services_by_kind: BentoServicesByKindDependency,
http_session: HTTPSessionDependency,
logger: LoggerDependency,
service_info: ServiceInfoDependency,
service_manager: ServiceManagerDependency,
services_tuple: ServicesDependency,
service_id: str,
):
Expand All @@ -55,9 +55,8 @@ async def get_service_by_id(
svc = services_by_service_id[service_id]

# Get service by bento.serviceKind, using type.artifact as a backup for legacy reasons
service_data = await get_service(
service_data = await service_manager.get_service(
authz_header,
logger,
http_session,
service_info,
bento_services_by_kind[svc.get("bento", {}).get("serviceKind", svc["type"]["artifact"])],
Expand All @@ -83,6 +82,11 @@ async def get_data_type(data_types: DataTypesDependency, data_type_id: str) -> D
raise HTTPException(status.HTTP_404_NOT_FOUND, f"Data type with ID {data_type_id} was not found")


# Public endpoint: returns the workflow mapping resolved by the WorkflowsDependency
# (purpose -> workflow ID -> workflow definition). The handler adds no logic of its own.
# Documented with comments rather than a docstring so the generated API description is unchanged.
@service_registry.get("/workflows", dependencies=[authz_middleware.dep_public_endpoint()])
async def list_workflows_by_purpose(workflows: WorkflowsDependency) -> WorkflowsByPurpose:
    # The dependency has already done the aggregation; hand it back as-is.
    return workflows


@service_registry.get("/service-info", dependencies=[authz_middleware.dep_public_endpoint()])
async def get_service_info(service_info: ServiceInfoDependency):
# Spec: https://github.com/ga4gh-discovery/ga4gh-service-info
Expand Down
136 changes: 87 additions & 49 deletions bento_service_registry/services.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,17 @@
import asyncio
import logging

from aiohttp import ClientSession
from bento_lib.types import GA4GHServiceInfo
from datetime import datetime
from fastapi import Depends, status
from functools import lru_cache
from json import JSONDecodeError
from typing import Annotated
from typing import Annotated, Awaitable
from urllib.parse import urljoin

from .authz_header import OptionalAuthzHeader, OptionalAuthzHeaderDependency
from .bento_services_json import BentoServicesByKindDependency
from .bento_services_json import BentoServicesByKind, BentoServicesByKindDependency
from .constants import BENTO_SERVICE_KIND
from .http_session import HTTPSessionDependency
from .logger import LoggerDependency
Expand All @@ -19,79 +21,115 @@


__all__ = [
"get_service",
"get_service_manager",
"ServiceManagerDependency",
"get_services",
"ServicesDependency",
]


async def get_service(
authz_header: OptionalAuthzHeader,
logger: logging.Logger,
session: aiohttp.ClientSession,
service_info: GA4GHServiceInfo,
service_metadata: BentoService,
) -> dict[str, dict] | None:
kind = service_metadata["service_kind"]
class ServiceManager:
def __init__(self, logger: logging.Logger):
self._co: Awaitable[list[dict | None]] | None = None
self._logger = logger

# special case: requesting info about the current service. Skip networking / self-connect;
# instead, return pre-calculated /service-info contents.
if kind == BENTO_SERVICE_KIND:
return service_info
async def get_service(
self,
authz_header: OptionalAuthzHeaderDependency,
http_session: HTTPSessionDependency,
service_info: ServiceInfoDependency,
service_metadata: BentoService,
) -> dict | None:
kind = service_metadata["service_kind"]

s_url: str = service_metadata["url"]
service_info_url: str = urljoin(f"{s_url}/", "service-info")
# special case: requesting info about the current service. Skip networking / self-connect;
# instead, return pre-calculated /service-info contents.
if kind == BENTO_SERVICE_KIND:
return service_info

dt = datetime.now()
logger.info(f"Contacting {service_info_url}{' with bearer token' if authz_header else ''}")
s_url: str = service_metadata["url"]
service_info_url: str = urljoin(f"{s_url}/", "service-info")

service_resp: dict[str, dict] = {}
dt = datetime.now()
self._logger.info(f"Contacting {service_info_url}{' with bearer token' if authz_header else ''}")

try:
async with session.get(service_info_url, headers=authz_header) as r:
if r.status != status.HTTP_200_OK:
r_text = await r.text()
logger.error(f"Non-200 status code on {kind}: {r.status} Content: {r_text}")
service_resp: dict[str, dict] = {}

# If we have the special case where we got a JWT error from the proxy script, we can safely print out
# headers for debugging, since the JWT leaked isn't valid anyway.
if "invalid jwt" in r_text:
logger.error(f"Encountered auth error on {kind}; tried to use header: {authz_header}")
try:
async with http_session.get(service_info_url, headers=authz_header) as r:
if r.status != status.HTTP_200_OK:
r_text = await r.text()
self._logger.error(f"Non-200 status code on {kind}: {r.status} Content: {r_text}")

return None
# If we have the special case where we got a JWT error from the proxy script, we can safely print
# out headers for debugging, since the JWT leaked isn't valid anyway.
if "invalid jwt" in r_text:
self._logger.error(f"Encountered auth error on {kind}; tried to use header: {authz_header}")

try:
service_resp[kind] = {**(await r.json()), "url": s_url}
except (JSONDecodeError, aiohttp.ContentTypeError, TypeError) as e:
# JSONDecodeError can happen if the JSON is invalid
# ContentTypeError can happen if the Content-Type is not application/json
# TypeError can happen if None is received
logger.error(f"Encountered invalid response ({str(e)}) from {service_info_url}: {await r.text()}")
return None

logger.info(f"{service_info_url}: Took {(datetime.now() - dt).total_seconds():.1f}s")
try:
service_resp[kind] = {**(await r.json()), "url": s_url}
except (JSONDecodeError, aiohttp.ContentTypeError, TypeError) as e:
# JSONDecodeError can happen if the JSON is invalid
# ContentTypeError can happen if the Content-Type is not application/json
# TypeError can happen if None is received
self._logger.error(
f"Encountered invalid response ({str(e)}) from {service_info_url}: {await r.text()}")

except asyncio.TimeoutError:
logger.error(f"Encountered timeout with {service_info_url}")
self._logger.info(f"{service_info_url}: Took {(datetime.now() - dt).total_seconds():.1f}s")

except aiohttp.ClientConnectionError as e:
logger.error(f"Encountered connection error with {service_info_url}: {str(e)}")
except asyncio.TimeoutError:
self._logger.error(f"Encountered timeout with {service_info_url}")

return service_resp.get(kind)
except aiohttp.ClientConnectionError as e:
self._logger.error(f"Encountered connection error with {service_info_url}: {str(e)}")

return service_resp.get(kind)

async def get_services(
    self,
    authz_header: OptionalAuthzHeader,
    bento_services_by_kind: BentoServicesByKind,
    http_session: ClientSession,
    service_info: GA4GHServiceInfo,
) -> tuple[dict, ...]:
    """
    Fetch service-info for every configured service, sharing one in-flight batch between callers.

    If no batch is pending, a new ``asyncio.gather`` over ``self.get_service`` is started for each
    entry of ``bento_services_by_kind``; otherwise the already-pending batch is awaited instead.

    :param authz_header: Optional authorization header forwarded to each service request.
    :param bento_services_by_kind: Mapping whose values are the service metadata records to query.
    :param http_session: HTTP client session passed through to ``get_service``.
    :param service_info: This service's own service-info, passed through to ``get_service``.
    :return: Tuple of the service responses which were not ``None``.
    :raises Exception: Whatever the shared batch raises (including cancellation) is propagated.
    """

    # NOTE(review): callers which join an in-flight batch receive results fetched with the
    # *first* caller's authz_header - confirm that sharing results across callers is acceptable.
    if self._co is None:
        self._co = asyncio.gather(*[
            self.get_service(authz_header, http_session, service_info, s)
            for s in bento_services_by_kind.values()
        ])

    batch = self._co

    try:
        service_list: list[dict | None] = await batch
    finally:
        # Always drop the finished batch, even if it raised or was cancelled; otherwise the
        # failed future would stay cached and every later call would re-raise the same error.
        # The identity check avoids discarding a newer batch started by another caller.
        if self._co is batch:
            self._co = None

    return tuple(s for s in service_list if s is not None)


@lru_cache
def get_service_manager(
    logger: LoggerDependency,
):
    """
    Dependency provider for the ServiceManager.

    Wrapped in ``lru_cache`` so repeated calls with the same logger get the same manager instance back.
    """
    manager = ServiceManager(logger)
    return manager


ServiceManagerDependency = Annotated[ServiceManager, Depends(get_service_manager)]


async def get_services(
authz_header: OptionalAuthzHeaderDependency,
bento_services_by_kind: BentoServicesByKindDependency,
http_session: HTTPSessionDependency,
logger: LoggerDependency,
service_info: ServiceInfoDependency,
service_manager: ServiceManagerDependency,
) -> tuple[dict, ...]:
# noinspection PyTypeChecker
service_list: list[dict | None] = await asyncio.gather(*[
get_service(authz_header, logger, http_session, service_info, s)
for s in bento_services_by_kind.values()
])
return tuple(s for s in service_list if s is not None)
return await service_manager.get_services(
authz_header,
bento_services_by_kind,
http_session,
service_info,
)


ServicesDependency = Annotated[tuple[dict, ...], Depends(get_services)]
89 changes: 89 additions & 0 deletions bento_service_registry/workflows.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
import aiohttp
import asyncio
import logging

from fastapi import Depends, status
from typing import Annotated
from urllib.parse import urljoin

from .authz_header import OptionalAuthzHeader, OptionalAuthzHeaderDependency
from .http_session import HTTPSessionDependency
from .logger import LoggerDependency
from .services import ServicesDependency

__all__ = [
"WorkflowsByPurpose",
"get_workflows",
"WorkflowsDependency",
]


WorkflowsByPurpose = dict[str, dict[str, dict]]


async def get_workflows_from_service(
    authz_header: OptionalAuthzHeader,
    http_session: aiohttp.ClientSession,
    logger: logging.Logger,
    service: dict,
) -> WorkflowsByPurpose:
    """
    Fetch the workflows exposed by a single service at ``<service url>/workflows``.

    Each returned workflow is tagged with a ``service_base_url`` key holding the service's
    normalized (trailing-slash) URL. This is best-effort: any failure to obtain a usable
    response is logged and produces an empty mapping, so one misbehaving service cannot
    break the aggregate.

    :param authz_header: Optional authorization header forwarded with the request.
    :param http_session: HTTP client session used to make the request.
    :param logger: Logger for error reporting.
    :param service: Service record; its ``url`` key is used as the base URL.
    :return: Mapping of purpose -> workflow ID -> workflow definition; empty on any failure.
    """

    service_url: str | None = service.get("url")

    if service_url is None:
        logger.error(f"Encountered a service missing a URL: {service}")
        return {}

    # Force exactly one trailing slash so urljoin appends rather than replaces the last path segment.
    service_url_norm: str = service_url.rstrip("/") + "/"
    workflows_url: str = urljoin(service_url_norm, "workflows")

    try:
        async with http_session.get(workflows_url, headers=authz_header) as res:
            if res.status != status.HTTP_200_OK:
                # Read the body as text: error responses (e.g. from a proxy) are often not JSON,
                # and decoding them as JSON would raise while we are only trying to log.
                logger.error(
                    f"Got non-200 response from workflow-providing service ({service_url=}): "
                    f"{res.status=}; body={await res.text()}")
                return {}

            try:
                payload = await res.json()
            except (aiohttp.ContentTypeError, ValueError) as e:
                # ContentTypeError: Content-Type is not JSON; ValueError covers invalid JSON bodies.
                logger.error(f"Encountered invalid response ({str(e)}) from {workflows_url}")
                return {}

    except asyncio.TimeoutError:
        logger.error(f"Encountered timeout with {workflows_url}")
        return {}

    except aiohttp.ClientError as e:
        logger.error(f"Encountered client error with {workflows_url}: {str(e)}")
        return {}

    if not isinstance(payload, dict):
        logger.error(f"Encountered non-object workflows response from {workflows_url}")
        return {}

    # TODO: pydantic model + validation
    return {
        purpose: {k: {**wf, "service_base_url": service_url_norm} for k, wf in purpose_wfs.items()}
        for purpose, purpose_wfs in payload.items()
    }


async def get_workflows(
    authz_header: OptionalAuthzHeaderDependency,
    http_session: HTTPSessionDependency,
    logger: LoggerDependency,
    services_tuple: ServicesDependency,
) -> WorkflowsByPurpose:
    """
    Aggregate workflows from every workflow-providing service into one mapping.

    A service is queried when its ``bento`` metadata has a truthy ``dataService`` or
    ``workflowProvider`` flag. All such services are queried concurrently and the results are
    merged per purpose; when two services report the same workflow ID under a purpose, the
    later service's entry replaces the earlier one.

    :return: Mapping of purpose -> workflow ID -> workflow definition.
    """

    logger.debug("Collecting workflows from workflow-providing services")

    def _provides_workflows(svc: dict) -> bool:
        bento_meta = svc.get("bento", {})
        return bool(bento_meta.get("dataService", False) or bento_meta.get("workflowProvider", False))

    providers = [svc for svc in services_tuple if _provides_workflows(svc)]

    logger.debug(f"Found {len(providers)} workflow-providing services")

    per_service_results = await asyncio.gather(
        *(get_workflows_from_service(authz_header, http_session, logger, svc) for svc in providers)
    )

    merged: WorkflowsByPurpose = {}
    total: int = 0

    for service_result in per_service_results:
        for purpose, purpose_wfs in service_result.items():
            merged.setdefault(purpose, {}).update(purpose_wfs)
            # Counts every workflow reported, including ones later overwritten by another service.
            total += len(purpose_wfs)

    logger.debug(f"Obtained {total} workflows")

    return merged


WorkflowsDependency = Annotated[WorkflowsByPurpose, Depends(get_workflows)]
16 changes: 8 additions & 8 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "poetry.core.masonry.api"

[tool.poetry]
name = "bento_service_registry"
version = "1.1.1"
version = "1.2.0"
description = "An implementation of GA4GH Service Registry API for the Bento platform."
authors = ["David Lougheed <david.lougheed@mail.mcgill.ca>"]
readme = "README.md"
Expand Down
8 changes: 8 additions & 0 deletions tests/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,3 +84,11 @@ def test_data_types_list(client):
def test_data_types_detail_404(client):
    """Requesting the data type ID "dne" must yield a 404 response."""
    response = client.get("/data-types/dne")
    assert response.status_code == 404


def test_workflows_by_purpose(client):
    """The /workflows endpoint responds with 200 and an empty collection."""
    response = client.get("/workflows")
    payload = response.json()

    assert response.status_code == 200
    assert len(payload) == 0  # no workflow-providing services

0 comments on commit 779dc73

Please sign in to comment.