From 7fe7a89efa321f23d375ba8ff7a060f19b6014cc Mon Sep 17 00:00:00 2001 From: Anat Balzam <13421451+anatbal@users.noreply.github.com> Date: Tue, 13 Sep 2022 10:35:39 +0300 Subject: [PATCH] Refactor API health endpoint (#2583) * refactor health check * Fix tests * bump version --- api_app/_version.py | 2 +- api_app/api/routes/health.py | 29 +++---- api_app/resources/strings.py | 2 + api_app/services/health_checker.py | 73 +++++++++------- .../test_services/test_health_checker.py | 83 +++++++++++++------ 5 files changed, 119 insertions(+), 70 deletions(-) diff --git a/api_app/_version.py b/api_app/_version.py index b6f65f35da..e2b01a98c0 100644 --- a/api_app/_version.py +++ b/api_app/_version.py @@ -1 +1 @@ -__version__ = "0.4.30" +__version__ = "0.4.31" diff --git a/api_app/api/routes/health.py b/api_app/api/routes/health.py index e668c9e037..0386aeab30 100644 --- a/api_app/api/routes/health.py +++ b/api_app/api/routes/health.py @@ -1,8 +1,11 @@ +import asyncio +import logging from fastapi import APIRouter +from core import credentials from models.schemas.status import HealthCheck, ServiceStatus, StatusEnum from resources import strings -# from services.health_checker import create_resource_processor_status, create_state_store_status, create_service_bus_status -# import logging +from services.health_checker import create_resource_processor_status, create_state_store_status, create_service_bus_status + router = APIRouter() @@ -10,18 +13,16 @@ @router.get("/health", name=strings.API_GET_HEALTH_STATUS) @router.get("/", name=strings.API_GET_HEALTH_STATUS) async def health_check() -> HealthCheck: - # TEMP: #2048 - # cosmos_status, cosmos_message = create_state_store_status() - # sb_status, sb_message = await create_service_bus_status() - # rp_status, rp_message = create_resource_processor_status() - # services = [ServiceStatus(service=strings.COSMOS_DB, status=cosmos_status, message=cosmos_message), - # ServiceStatus(service=strings.SERVICE_BUS, status=sb_status, message=sb_message), - # ServiceStatus(service=strings.RESOURCE_PROCESSOR, status=rp_status, message=rp_message)] - # health_check_result = HealthCheck(services=services) - # if cosmos_status == StatusEnum.not_ok or sb_status == StatusEnum.not_ok or rp_status == StatusEnum.not_ok: - # logging.error(f'Cosmos Status: {cosmos_status}, message: {cosmos_message}') - # logging.error(f'Service Bus Status: {sb_status}, message: {sb_message}') - # logging.error(f'Resource Processor Status: {rp_status}, message: {rp_message}') + async with credentials.get_credential_async() as credential: + cosmos, sb, rp = await asyncio.gather( + create_state_store_status(credential), + create_service_bus_status(credential), + create_resource_processor_status(credential) + ) + if cosmos[0] == StatusEnum.not_ok or sb[0] == StatusEnum.not_ok or rp[0] == StatusEnum.not_ok: + logging.error(f'Cosmos Status: {cosmos[0]}, message: {cosmos[1]}') + logging.error(f'Service Bus Status: {sb[0]}, message: {sb[1]}') + logging.error(f'Resource Processor Status: {rp[0]}, message: {rp[1]}') services = [ServiceStatus(service=strings.COSMOS_DB, status=StatusEnum.ok, message=""), ServiceStatus(service=strings.SERVICE_BUS, status=StatusEnum.ok, message=""), diff --git a/api_app/resources/strings.py b/api_app/resources/strings.py index 9063eddc8a..0f51f37eae 100644 --- a/api_app/resources/strings.py +++ b/api_app/resources/strings.py @@ -71,11 +71,13 @@ NOT_OK = "Not OK" COSMOS_DB = "Cosmos DB" STATE_STORE_ENDPOINT_NOT_RESPONDING = "State Store endpoint is not responding" +STATE_STORE_ENDPOINT_NOT_ACCESSIBLE = "State Store endpoint is not accessible" UNSPECIFIED_ERROR = "Unspecified error" # Service bus status SERVICE_BUS = "Service Bus" SERVICE_BUS_NOT_RESPONDING = "Service Bus is not responding" +SERVICE_BUS_AUTHENTICATION_ERROR = "Cannot authenticate Service Bus" # Resource processor status RESOURCE_PROCESSOR = "Resource Processor" diff --git a/api_app/services/health_checker.py b/api_app/services/health_checker.py index 7d945e2f54..69a6861b84 100644 --- a/api_app/services/health_checker.py +++ b/api_app/services/health_checker.py @@ -1,68 +1,85 @@ +from typing import Tuple from azure.core import exceptions -from azure.cosmos import CosmosClient +from azure.cosmos.aio import CosmosClient from azure.servicebus.aio import ServiceBusClient -from azure.servicebus.exceptions import ServiceBusConnectionError -from azure.mgmt.compute import ComputeManagementClient +from azure.mgmt.cosmosdb.aio import CosmosDBManagementClient +from azure.mgmt.compute.aio import ComputeManagementClient +from azure.cosmos.exceptions import CosmosHttpResponseError +from azure.servicebus.exceptions import ServiceBusConnectionError, ServiceBusAuthenticationError -from api.dependencies.database import get_store_key -from core import config, credentials +from core import config from models.schemas.status import StatusEnum from resources import strings -def create_state_store_status() -> (StatusEnum, str): +async def get_store_key(credential) -> str: + if config.STATE_STORE_KEY: + primary_master_key = config.STATE_STORE_KEY + else: + async with CosmosDBManagementClient(credential, subscription_id=config.SUBSCRIPTION_ID) as cosmosdb_mng_client: + database_keys = await cosmosdb_mng_client.database_accounts.list_keys(resource_group_name=config.RESOURCE_GROUP_NAME, account_name=config.COSMOSDB_ACCOUNT_NAME) + primary_master_key = database_keys.primary_master_key + return primary_master_key + + +async def create_state_store_status(credential) -> Tuple[StatusEnum, str]: status = StatusEnum.ok message = "" debug = True if config.DEBUG == "true" else False try: - primary_master_key = get_store_key() - + primary_master_key = await get_store_key(credential) cosmos_client = CosmosClient(config.STATE_STORE_ENDPOINT, primary_master_key, connection_verify=debug) - list(cosmos_client.list_databases()) + async with cosmos_client: + list_databases_response = cosmos_client.list_databases() + [database async for database in list_databases_response] except exceptions.ServiceRequestError: status = StatusEnum.not_ok message = strings.STATE_STORE_ENDPOINT_NOT_RESPONDING + except CosmosHttpResponseError: + status = StatusEnum.not_ok + message = strings.STATE_STORE_ENDPOINT_NOT_ACCESSIBLE except: # noqa: E722 flake8 - no bare excepts status = StatusEnum.not_ok message = strings.UNSPECIFIED_ERROR return status, message -async def create_service_bus_status() -> (StatusEnum, str): +async def create_service_bus_status(credential) -> Tuple[StatusEnum, str]: status = StatusEnum.ok message = "" try: - async with credentials.get_credential_async() as credential: - service_bus_client = ServiceBusClient(config.SERVICE_BUS_FULLY_QUALIFIED_NAMESPACE, credential, - retry_total=0) - async with service_bus_client: - receiver = service_bus_client.get_queue_receiver( - queue_name=config.SERVICE_BUS_DEPLOYMENT_STATUS_UPDATE_QUEUE) - async with receiver: - pass + service_bus_client = ServiceBusClient(config.SERVICE_BUS_FULLY_QUALIFIED_NAMESPACE, credential, retry_total=0) + async with service_bus_client: + receiver = service_bus_client.get_queue_receiver(queue_name=config.SERVICE_BUS_DEPLOYMENT_STATUS_UPDATE_QUEUE) + async with receiver: + pass except ServiceBusConnectionError: status = StatusEnum.not_ok message = strings.SERVICE_BUS_NOT_RESPONDING + except ServiceBusAuthenticationError: + status = StatusEnum.not_ok + message = strings.SERVICE_BUS_AUTHENTICATION_ERROR except: # noqa: E722 flake8 - no bare excepts status = StatusEnum.not_ok message = strings.UNSPECIFIED_ERROR return status, message -def create_resource_processor_status() -> (StatusEnum, str): +async def create_resource_processor_status(credential) -> Tuple[StatusEnum, str]: status = StatusEnum.ok message = "" try: vmss_name = f"vmss-rp-porter-{config.TRE_ID}" - compute_client = ComputeManagementClient(credential=credentials.get_credential(), subscription_id=config.SUBSCRIPTION_ID) - vmss_list = compute_client.virtual_machine_scale_set_vms.list(config.RESOURCE_GROUP_NAME, vmss_name) - for vm in vmss_list: - instance_view = compute_client.virtual_machine_scale_set_vms.get_instance_view(config.RESOURCE_GROUP_NAME, vmss_name, vm.instance_id) - health_status = instance_view.vm_health.status.code - if health_status != strings.RESOURCE_PROCESSOR_HEALTHY_MESSAGE: - status = StatusEnum.not_ok - message = strings.RESOURCE_PROCESSOR_GENERAL_ERROR_MESSAGE - except: # noqa: E722 flake8 - no bare excepts + compute_client = ComputeManagementClient(credential=credential, subscription_id=config.SUBSCRIPTION_ID) + async with compute_client: + vmss_list = compute_client.virtual_machine_scale_set_vms.list(config.RESOURCE_GROUP_NAME, vmss_name) + async for vm in vmss_list: + instance_view = await compute_client.virtual_machine_scale_set_vms.get_instance_view(config.RESOURCE_GROUP_NAME, vmss_name, vm.instance_id) + health_status = instance_view.vm_health.status.code + if health_status != strings.RESOURCE_PROCESSOR_HEALTHY_MESSAGE: + status = StatusEnum.not_ok + message = strings.RESOURCE_PROCESSOR_GENERAL_ERROR_MESSAGE + except: # noqa: E722 flake8 - no bare excepts status = StatusEnum.not_ok message = strings.UNSPECIFIED_ERROR return status, message diff --git a/api_app/tests_ma/test_services/test_health_checker.py b/api_app/tests_ma/test_services/test_health_checker.py index 8c7afb95cf..84ff3ae989 100644 --- a/api_app/tests_ma/test_services/test_health_checker.py +++ b/api_app/tests_ma/test_services/test_health_checker.py @@ -1,44 +1,50 @@ +import asyncio from unittest.mock import AsyncMock, MagicMock import pytest from azure.core.exceptions import ServiceRequestError from azure.servicebus.exceptions import ServiceBusConnectionError from mock import patch - from models.schemas.status import StatusEnum from resources import strings from services import health_checker +pytestmark = pytest.mark.asyncio + +@patch("core.credentials.get_credential_async") @patch("services.health_checker.get_store_key") @patch("services.health_checker.CosmosClient") -def test_get_state_store_status_responding(cosmos_client_mock, get_store_key_mock) -> None: +async def test_get_state_store_status_responding(_, get_store_key_mock, get_credential_async) -> None: get_store_key_mock.return_value = None - cosmos_client_mock().list_databases.return_value = [] - status, message = health_checker.create_state_store_status() + status, message = await health_checker.create_state_store_status(get_credential_async) assert status == StatusEnum.ok assert message == "" +@patch("core.credentials.get_credential_async") @patch("services.health_checker.get_store_key") @patch("services.health_checker.CosmosClient") -def test_get_state_store_status_not_responding(cosmos_client_mock, get_store_key_mock) -> None: +async def test_get_state_store_status_not_responding(cosmos_client_mock, get_store_key_mock, get_credential_async) -> None: + get_credential_async.return_value = AsyncMock() get_store_key_mock.return_value = None cosmos_client_mock.return_value = None cosmos_client_mock.side_effect = ServiceRequestError(message="some message") - status, message = health_checker.create_state_store_status() + status, message = await health_checker.create_state_store_status(get_credential_async) assert status == StatusEnum.not_ok assert message == strings.STATE_STORE_ENDPOINT_NOT_RESPONDING +@patch("core.credentials.get_credential_async") @patch("services.health_checker.get_store_key") @patch("services.health_checker.CosmosClient") -def test_get_state_store_status_other_exception(cosmos_client_mock, get_store_key_mock) -> None: +async def test_get_state_store_status_other_exception(cosmos_client_mock, get_store_key_mock, get_credential_async) -> None: + get_credential_async.return_value = AsyncMock() get_store_key_mock.return_value = None cosmos_client_mock.return_value = None cosmos_client_mock.side_effect = Exception() - status, message = health_checker.create_state_store_status() + status, message = await health_checker.create_state_store_status(get_credential_async) assert status == StatusEnum.not_ok assert message == strings.UNSPECIFIED_ERROR @@ -46,11 +52,10 @@ def test_get_state_store_status_other_exception(cosmos_client_mock, get_store_ke @patch("core.credentials.get_credential_async") @patch("services.health_checker.ServiceBusClient") -@pytest.mark.asyncio async def test_get_service_bus_status_responding(service_bus_client_mock, get_credential_async) -> None: get_credential_async.return_value = AsyncMock() service_bus_client_mock().get_queue_receiver.__aenter__.return_value = AsyncMock() - status, message = await health_checker.create_service_bus_status() + status, message = await health_checker.create_service_bus_status(get_credential_async) assert status == StatusEnum.ok assert message == "" @@ -58,12 +63,11 @@ async def test_get_service_bus_status_responding(service_bus_client_mock, get_cr @patch("core.credentials.get_credential_async") @patch("services.health_checker.ServiceBusClient") -@pytest.mark.asyncio async def test_get_service_bus_status_not_responding(service_bus_client_mock, get_credential_async) -> None: get_credential_async.return_value = AsyncMock() service_bus_client_mock.return_value = None service_bus_client_mock.side_effect = ServiceBusConnectionError(message="some message") - status, message = await health_checker.create_service_bus_status() + status, message = await health_checker.create_service_bus_status(get_credential_async) assert status == StatusEnum.not_ok assert message == strings.SERVICE_BUS_NOT_RESPONDING @@ -71,55 +75,80 @@ async def test_get_service_bus_status_not_responding(service_bus_client_mock, ge @patch("core.credentials.get_credential_async") @patch("services.health_checker.ServiceBusClient") -@pytest.mark.asyncio async def test_get_service_bus_status_other_exception(service_bus_client_mock, get_credential_async) -> None: get_credential_async.return_value = AsyncMock() service_bus_client_mock.return_value = None service_bus_client_mock.side_effect = Exception() - status, message = await health_checker.create_service_bus_status() + status, message = await health_checker.create_service_bus_status(get_credential_async) assert status == StatusEnum.not_ok assert message == strings.UNSPECIFIED_ERROR +@patch("core.credentials.get_credential_async") @patch("services.health_checker.ComputeManagementClient") -def test_get_resource_processor_status_healthy(resource_processor_client_mock) -> None: - resource_processor_client_mock().virtual_machine_scale_set_vms = MagicMock() +async def test_get_resource_processor_status_healthy(resource_processor_client_mock, get_credential_async) -> None: + get_credential_async.return_value = AsyncMock() + resource_processor_client_mock().virtual_machine_scale_set_vms.return_value = AsyncMock() vm_mock = MagicMock() vm_mock.instance_id = 'mocked_id' - resource_processor_client_mock().virtual_machine_scale_set_vms.list = MagicMock(return_value=[vm_mock]) + resource_processor_client_mock().virtual_machine_scale_set_vms.list.return_value = AsyncIterator([vm_mock]) instance_view_mock = MagicMock() instance_view_mock.vm_health.status.code = strings.RESOURCE_PROCESSOR_HEALTHY_MESSAGE - resource_processor_client_mock().virtual_machine_scale_set_vms.get_instance_view.return_value = instance_view_mock + awaited_mock = asyncio.Future() + awaited_mock.set_result(instance_view_mock) + resource_processor_client_mock().virtual_machine_scale_set_vms.get_instance_view.return_value = awaited_mock - status, message = health_checker.create_resource_processor_status() + status, message = await health_checker.create_resource_processor_status(get_credential_async) assert status == StatusEnum.ok assert message == "" -@patch("services.health_checker.ComputeManagementClient") -def test_get_resource_processor_status_not_healthy(resource_processor_client_mock) -> None: - resource_processor_client_mock().virtual_machine_scale_set_vms = MagicMock() +@patch("core.credentials.get_credential_async") +@patch("services.health_checker.ComputeManagementClient", return_value=MagicMock()) +async def test_get_resource_processor_status_not_healthy(resource_processor_client_mock, get_credential_async) -> None: + get_credential_async.return_value = AsyncMock() + + resource_processor_client_mock().virtual_machine_scale_set_vms.return_value = AsyncMock() vm_mock = MagicMock() vm_mock.instance_id = 'mocked_id' - resource_processor_client_mock().virtual_machine_scale_set_vms.list = MagicMock(return_value=[vm_mock]) + resource_processor_client_mock().virtual_machine_scale_set_vms.list.return_value = AsyncIterator([vm_mock]) instance_view_mock = MagicMock() instance_view_mock.vm_health.status.code = "Unhealthy" - resource_processor_client_mock().virtual_machine_scale_set_vms.get_instance_view.return_value = instance_view_mock - status, message = health_checker.create_resource_processor_status() + awaited_mock = asyncio.Future() + awaited_mock.set_result(instance_view_mock) + resource_processor_client_mock().virtual_machine_scale_set_vms.get_instance_view.return_value = awaited_mock + + status, message = await health_checker.create_resource_processor_status(get_credential_async) assert status == StatusEnum.not_ok assert message == strings.RESOURCE_PROCESSOR_GENERAL_ERROR_MESSAGE +@patch("core.credentials.get_credential_async") @patch("services.health_checker.ComputeManagementClient") -def test_get_resource_processor_status_other_exception(resource_processor_client_mock) -> None: +async def test_get_resource_processor_status_other_exception(resource_processor_client_mock, get_credential_async) -> None: + get_credential_async.return_value = AsyncMock() resource_processor_client_mock.return_value = None resource_processor_client_mock.side_effect = Exception() - status, message = health_checker.create_resource_processor_status() + status, message = await health_checker.create_resource_processor_status(get_credential_async) assert status == StatusEnum.not_ok assert message == strings.UNSPECIFIED_ERROR + + +class AsyncIterator: + def __init__(self, seq): + self.iter = iter(seq) + + def __aiter__(self): + return self + + async def __anext__(self): + try: + return next(self.iter) + except StopIteration: + raise StopAsyncIteration