Skip to content

Commit

Permalink
Refactor API health endpoint (#2583)
Browse files Browse the repository at this point in the history
* refactor health check

* Fix tests

* bump version
  • Loading branch information
anatbal authored Sep 13, 2022
1 parent afa323f commit 7fe7a89
Show file tree
Hide file tree
Showing 5 changed files with 119 additions and 70 deletions.
2 changes: 1 addition & 1 deletion api_app/_version.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "0.4.30"
__version__ = "0.4.31"
29 changes: 15 additions & 14 deletions api_app/api/routes/health.py
Original file line number Diff line number Diff line change
@@ -1,27 +1,28 @@
import asyncio
import logging
from fastapi import APIRouter
from core import credentials
from models.schemas.status import HealthCheck, ServiceStatus, StatusEnum
from resources import strings
# from services.health_checker import create_resource_processor_status, create_state_store_status, create_service_bus_status
# import logging
from services.health_checker import create_resource_processor_status, create_state_store_status, create_service_bus_status


router = APIRouter()


@router.get("/health", name=strings.API_GET_HEALTH_STATUS)
@router.get("/", name=strings.API_GET_HEALTH_STATUS)
async def health_check() -> HealthCheck:
# TEMP: #2048
# cosmos_status, cosmos_message = create_state_store_status()
# sb_status, sb_message = await create_service_bus_status()
# rp_status, rp_message = create_resource_processor_status()
# services = [ServiceStatus(service=strings.COSMOS_DB, status=cosmos_status, message=cosmos_message),
# ServiceStatus(service=strings.SERVICE_BUS, status=sb_status, message=sb_message),
# ServiceStatus(service=strings.RESOURCE_PROCESSOR, status=rp_status, message=rp_message)]
# health_check_result = HealthCheck(services=services)
# if cosmos_status == StatusEnum.not_ok or sb_status == StatusEnum.not_ok or rp_status == StatusEnum.not_ok:
# logging.error(f'Cosmos Status: {cosmos_status}, message: {cosmos_message}')
# logging.error(f'Service Bus Status: {sb_status}, message: {sb_message}')
# logging.error(f'Resource Processor Status: {rp_status}, message: {rp_message}')
async with credentials.get_credential_async() as credential:
cosmos, sb, rp = await asyncio.gather(
create_state_store_status(credential),
create_service_bus_status(credential),
create_resource_processor_status(credential)
)
if cosmos[0] == StatusEnum.not_ok or sb[0] == StatusEnum.not_ok or rp[0] == StatusEnum.not_ok:
logging.error(f'Cosmos Status: {cosmos[0]}, message: {cosmos[1]}')
logging.error(f'Service Bus Status: {sb[0]}, message: {sb[1]}')
logging.error(f'Resource Processor Status: {rp[0]}, message: {rp[1]}')

services = [ServiceStatus(service=strings.COSMOS_DB, status=StatusEnum.ok, message=""),
ServiceStatus(service=strings.SERVICE_BUS, status=StatusEnum.ok, message=""),
Expand Down
2 changes: 2 additions & 0 deletions api_app/resources/strings.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,11 +71,13 @@
NOT_OK = "Not OK"
COSMOS_DB = "Cosmos DB"
STATE_STORE_ENDPOINT_NOT_RESPONDING = "State Store endpoint is not responding"
STATE_STORE_ENDPOINT_NOT_ACCESSIBLE = "State Store endpoint is not accessible"
UNSPECIFIED_ERROR = "Unspecified error"

# Service bus status
SERVICE_BUS = "Service Bus"
SERVICE_BUS_NOT_RESPONDING = "Service Bus is not responding"
SERVICE_BUS_AUTHENTICATION_ERROR = "Cannot authenticate Service Bus"

# Resource processor status
RESOURCE_PROCESSOR = "Resource Processor"
Expand Down
73 changes: 45 additions & 28 deletions api_app/services/health_checker.py
Original file line number Diff line number Diff line change
@@ -1,68 +1,85 @@
from typing import Tuple
from azure.core import exceptions
from azure.cosmos import CosmosClient
from azure.cosmos.aio import CosmosClient
from azure.servicebus.aio import ServiceBusClient
from azure.servicebus.exceptions import ServiceBusConnectionError
from azure.mgmt.compute import ComputeManagementClient
from azure.mgmt.cosmosdb.aio import CosmosDBManagementClient
from azure.mgmt.compute.aio import ComputeManagementClient
from azure.cosmos.exceptions import CosmosHttpResponseError
from azure.servicebus.exceptions import ServiceBusConnectionError, ServiceBusAuthenticationError

from api.dependencies.database import get_store_key
from core import config, credentials
from core import config
from models.schemas.status import StatusEnum
from resources import strings


def create_state_store_status() -> (StatusEnum, str):
async def get_store_key(credential) -> str:
    """Return the primary master key used to connect to the state store.

    When ``config.STATE_STORE_KEY`` is set it is returned directly. Otherwise
    the key is looked up through the Cosmos DB management client for the
    configured subscription, resource group and account, using ``credential``.
    """
    # A key supplied through configuration short-circuits the management call.
    if config.STATE_STORE_KEY:
        return config.STATE_STORE_KEY

    async with CosmosDBManagementClient(credential, subscription_id=config.SUBSCRIPTION_ID) as mgmt_client:
        account_keys = await mgmt_client.database_accounts.list_keys(
            resource_group_name=config.RESOURCE_GROUP_NAME,
            account_name=config.COSMOSDB_ACCOUNT_NAME,
        )
        master_key = account_keys.primary_master_key
    return master_key


async def create_state_store_status(credential) -> Tuple[StatusEnum, str]:
status = StatusEnum.ok
message = ""
debug = True if config.DEBUG == "true" else False
try:
primary_master_key = get_store_key()

primary_master_key = await get_store_key(credential)
cosmos_client = CosmosClient(config.STATE_STORE_ENDPOINT, primary_master_key, connection_verify=debug)
list(cosmos_client.list_databases())
async with cosmos_client:
list_databases_response = cosmos_client.list_databases()
[database async for database in list_databases_response]
except exceptions.ServiceRequestError:
status = StatusEnum.not_ok
message = strings.STATE_STORE_ENDPOINT_NOT_RESPONDING
except CosmosHttpResponseError:
status = StatusEnum.not_ok
message = strings.STATE_STORE_ENDPOINT_NOT_ACCESSIBLE
except: # noqa: E722 flake8 - no bare excepts
status = StatusEnum.not_ok
message = strings.UNSPECIFIED_ERROR
return status, message


async def create_service_bus_status() -> (StatusEnum, str):
async def create_service_bus_status(credential) -> Tuple[StatusEnum, str]:
status = StatusEnum.ok
message = ""
try:
async with credentials.get_credential_async() as credential:
service_bus_client = ServiceBusClient(config.SERVICE_BUS_FULLY_QUALIFIED_NAMESPACE, credential,
retry_total=0)
async with service_bus_client:
receiver = service_bus_client.get_queue_receiver(
queue_name=config.SERVICE_BUS_DEPLOYMENT_STATUS_UPDATE_QUEUE)
async with receiver:
pass
service_bus_client = ServiceBusClient(config.SERVICE_BUS_FULLY_QUALIFIED_NAMESPACE, credential, retry_total=0)
async with service_bus_client:
receiver = service_bus_client.get_queue_receiver(queue_name=config.SERVICE_BUS_DEPLOYMENT_STATUS_UPDATE_QUEUE)
async with receiver:
pass
except ServiceBusConnectionError:
status = StatusEnum.not_ok
message = strings.SERVICE_BUS_NOT_RESPONDING
except ServiceBusAuthenticationError:
status = StatusEnum.not_ok
message = strings.SERVICE_BUS_AUTHENTICATION_ERROR
except: # noqa: E722 flake8 - no bare excepts
status = StatusEnum.not_ok
message = strings.UNSPECIFIED_ERROR
return status, message


def create_resource_processor_status() -> (StatusEnum, str):
async def create_resource_processor_status(credential) -> Tuple[StatusEnum, str]:
status = StatusEnum.ok
message = ""
try:
vmss_name = f"vmss-rp-porter-{config.TRE_ID}"
compute_client = ComputeManagementClient(credential=credentials.get_credential(), subscription_id=config.SUBSCRIPTION_ID)
vmss_list = compute_client.virtual_machine_scale_set_vms.list(config.RESOURCE_GROUP_NAME, vmss_name)
for vm in vmss_list:
instance_view = compute_client.virtual_machine_scale_set_vms.get_instance_view(config.RESOURCE_GROUP_NAME, vmss_name, vm.instance_id)
health_status = instance_view.vm_health.status.code
if health_status != strings.RESOURCE_PROCESSOR_HEALTHY_MESSAGE:
status = StatusEnum.not_ok
message = strings.RESOURCE_PROCESSOR_GENERAL_ERROR_MESSAGE
except: # noqa: E722 flake8 - no bare excepts
compute_client = ComputeManagementClient(credential=credential, subscription_id=config.SUBSCRIPTION_ID)
async with compute_client:
vmss_list = compute_client.virtual_machine_scale_set_vms.list(config.RESOURCE_GROUP_NAME, vmss_name)
async for vm in vmss_list:
instance_view = await compute_client.virtual_machine_scale_set_vms.get_instance_view(config.RESOURCE_GROUP_NAME, vmss_name, vm.instance_id)
health_status = instance_view.vm_health.status.code
if health_status != strings.RESOURCE_PROCESSOR_HEALTHY_MESSAGE:
status = StatusEnum.not_ok
message = strings.RESOURCE_PROCESSOR_GENERAL_ERROR_MESSAGE
except: # noqa: E722 flake8 - no bare excepts
status = StatusEnum.not_ok
message = strings.UNSPECIFIED_ERROR
return status, message
83 changes: 56 additions & 27 deletions api_app/tests_ma/test_services/test_health_checker.py
Original file line number Diff line number Diff line change
@@ -1,125 +1,154 @@
import asyncio
from unittest.mock import AsyncMock, MagicMock
import pytest
from azure.core.exceptions import ServiceRequestError
from azure.servicebus.exceptions import ServiceBusConnectionError
from mock import patch

from models.schemas.status import StatusEnum
from resources import strings
from services import health_checker

pytestmark = pytest.mark.asyncio


@patch("core.credentials.get_credential_async")
@patch("services.health_checker.get_store_key")
@patch("services.health_checker.CosmosClient")
def test_get_state_store_status_responding(cosmos_client_mock, get_store_key_mock) -> None:
async def test_get_state_store_status_responding(_, get_store_key_mock, get_credential_async) -> None:
get_store_key_mock.return_value = None
cosmos_client_mock().list_databases.return_value = []
status, message = health_checker.create_state_store_status()
status, message = await health_checker.create_state_store_status(get_credential_async)

assert status == StatusEnum.ok
assert message == ""


@patch("core.credentials.get_credential_async")
@patch("services.health_checker.get_store_key")
@patch("services.health_checker.CosmosClient")
def test_get_state_store_status_not_responding(cosmos_client_mock, get_store_key_mock) -> None:
async def test_get_state_store_status_not_responding(cosmos_client_mock, get_store_key_mock, get_credential_async) -> None:
get_credential_async.return_value = AsyncMock()
get_store_key_mock.return_value = None
cosmos_client_mock.return_value = None
cosmos_client_mock.side_effect = ServiceRequestError(message="some message")
status, message = health_checker.create_state_store_status()
status, message = await health_checker.create_state_store_status(get_credential_async)

assert status == StatusEnum.not_ok
assert message == strings.STATE_STORE_ENDPOINT_NOT_RESPONDING


@patch("core.credentials.get_credential_async")
@patch("services.health_checker.get_store_key")
@patch("services.health_checker.CosmosClient")
def test_get_state_store_status_other_exception(cosmos_client_mock, get_store_key_mock) -> None:
async def test_get_state_store_status_other_exception(cosmos_client_mock, get_store_key_mock, get_credential_async) -> None:
get_credential_async.return_value = AsyncMock()
get_store_key_mock.return_value = None
cosmos_client_mock.return_value = None
cosmos_client_mock.side_effect = Exception()
status, message = health_checker.create_state_store_status()
status, message = await health_checker.create_state_store_status(get_credential_async)

assert status == StatusEnum.not_ok
assert message == strings.UNSPECIFIED_ERROR


@patch("core.credentials.get_credential_async")
@patch("services.health_checker.ServiceBusClient")
@pytest.mark.asyncio
async def test_get_service_bus_status_responding(service_bus_client_mock, get_credential_async) -> None:
get_credential_async.return_value = AsyncMock()
service_bus_client_mock().get_queue_receiver.__aenter__.return_value = AsyncMock()
status, message = await health_checker.create_service_bus_status()
status, message = await health_checker.create_service_bus_status(get_credential_async)

assert status == StatusEnum.ok
assert message == ""


@patch("core.credentials.get_credential_async")
@patch("services.health_checker.ServiceBusClient")
@pytest.mark.asyncio
async def test_get_service_bus_status_not_responding(service_bus_client_mock, get_credential_async) -> None:
get_credential_async.return_value = AsyncMock()
service_bus_client_mock.return_value = None
service_bus_client_mock.side_effect = ServiceBusConnectionError(message="some message")
status, message = await health_checker.create_service_bus_status()
status, message = await health_checker.create_service_bus_status(get_credential_async)

assert status == StatusEnum.not_ok
assert message == strings.SERVICE_BUS_NOT_RESPONDING


@patch("core.credentials.get_credential_async")
@patch("services.health_checker.ServiceBusClient")
@pytest.mark.asyncio
async def test_get_service_bus_status_other_exception(service_bus_client_mock, get_credential_async) -> None:
get_credential_async.return_value = AsyncMock()
service_bus_client_mock.return_value = None
service_bus_client_mock.side_effect = Exception()
status, message = await health_checker.create_service_bus_status()
status, message = await health_checker.create_service_bus_status(get_credential_async)

assert status == StatusEnum.not_ok
assert message == strings.UNSPECIFIED_ERROR


@patch("core.credentials.get_credential_async")
@patch("services.health_checker.ComputeManagementClient")
def test_get_resource_processor_status_healthy(resource_processor_client_mock) -> None:
resource_processor_client_mock().virtual_machine_scale_set_vms = MagicMock()
async def test_get_resource_processor_status_healthy(resource_processor_client_mock, get_credential_async) -> None:
get_credential_async.return_value = AsyncMock()
resource_processor_client_mock().virtual_machine_scale_set_vms.return_value = AsyncMock()
vm_mock = MagicMock()
vm_mock.instance_id = 'mocked_id'
resource_processor_client_mock().virtual_machine_scale_set_vms.list = MagicMock(return_value=[vm_mock])
resource_processor_client_mock().virtual_machine_scale_set_vms.list.return_value = AsyncIterator([vm_mock])

instance_view_mock = MagicMock()
instance_view_mock.vm_health.status.code = strings.RESOURCE_PROCESSOR_HEALTHY_MESSAGE
resource_processor_client_mock().virtual_machine_scale_set_vms.get_instance_view.return_value = instance_view_mock
awaited_mock = asyncio.Future()
awaited_mock.set_result(instance_view_mock)
resource_processor_client_mock().virtual_machine_scale_set_vms.get_instance_view.return_value = awaited_mock

status, message = health_checker.create_resource_processor_status()
status, message = await health_checker.create_resource_processor_status(get_credential_async)

assert status == StatusEnum.ok
assert message == ""


@patch("services.health_checker.ComputeManagementClient")
def test_get_resource_processor_status_not_healthy(resource_processor_client_mock) -> None:
resource_processor_client_mock().virtual_machine_scale_set_vms = MagicMock()
@patch("core.credentials.get_credential_async")
@patch("services.health_checker.ComputeManagementClient", return_value=MagicMock())
async def test_get_resource_processor_status_not_healthy(resource_processor_client_mock, get_credential_async) -> None:
get_credential_async.return_value = AsyncMock()

resource_processor_client_mock().virtual_machine_scale_set_vms.return_value = AsyncMock()
vm_mock = MagicMock()
vm_mock.instance_id = 'mocked_id'
resource_processor_client_mock().virtual_machine_scale_set_vms.list = MagicMock(return_value=[vm_mock])
resource_processor_client_mock().virtual_machine_scale_set_vms.list.return_value = AsyncIterator([vm_mock])

instance_view_mock = MagicMock()
instance_view_mock.vm_health.status.code = "Unhealthy"
resource_processor_client_mock().virtual_machine_scale_set_vms.get_instance_view.return_value = instance_view_mock
status, message = health_checker.create_resource_processor_status()
awaited_mock = asyncio.Future()
awaited_mock.set_result(instance_view_mock)
resource_processor_client_mock().virtual_machine_scale_set_vms.get_instance_view.return_value = awaited_mock

status, message = await health_checker.create_resource_processor_status(get_credential_async)

assert status == StatusEnum.not_ok
assert message == strings.RESOURCE_PROCESSOR_GENERAL_ERROR_MESSAGE


@patch("core.credentials.get_credential_async")
@patch("services.health_checker.ComputeManagementClient")
def test_get_resource_processor_status_other_exception(resource_processor_client_mock) -> None:
async def test_get_resource_processor_status_other_exception(resource_processor_client_mock, get_credential_async) -> None:
get_credential_async.return_value = AsyncMock()
resource_processor_client_mock.return_value = None
resource_processor_client_mock.side_effect = Exception()
status, message = health_checker.create_resource_processor_status()
status, message = await health_checker.create_resource_processor_status(get_credential_async)

assert status == StatusEnum.not_ok
assert message == strings.UNSPECIFIED_ERROR


class AsyncIterator:
    """Wrap a regular iterable so it can be consumed with ``async for``.

    Used by the tests to stand in for the async paged results that the
    mocked compute client's ``list`` call returns.
    """

    def __init__(self, seq):
        """Store an iterator over ``seq``; items are yielded in its order."""
        self.iter = iter(seq)

    def __aiter__(self):
        """Return ``self``; the object is its own async iterator."""
        return self

    async def __anext__(self):
        """Return the next item, or raise ``StopAsyncIteration`` when exhausted."""
        try:
            return next(self.iter)
        except StopIteration:
            # Translate the sync end-of-iteration signal into its async
            # counterpart; ``from None`` keeps the internal StopIteration
            # out of the exception context.
            raise StopAsyncIteration from None
0 comments on commit 7fe7a89

Please sign in to comment.