Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Set liveliness status on fatal ledger error #643

Merged
merged 6 commits into from
Aug 4, 2020
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 37 additions & 4 deletions aries_cloudagent/admin/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

from ..config.injection_context import InjectionContext
from ..core.plugin_registry import PluginRegistry
from ..ledger.error import LedgerConfigError, LedgerTransactionError
from ..messaging.responder import BaseResponder
from ..transport.queue.basic import BasicMessageQueue
from ..transport.outbound.message import OutboundMessage
Expand Down Expand Up @@ -51,7 +52,7 @@ class AdminStatusLivelinessSchema(Schema):


class AdminStatusReadinessSchema(Schema):
    """Schema for the readiness endpoint."""

    # Boolean flag reported under the "ready" key of the readiness response.
    ready = fields.Boolean(description="Readiness status", example=True)

Expand Down Expand Up @@ -131,7 +132,24 @@ async def ready_middleware(request: web.BaseRequest, handler: Coroutine):
"/status/live",
"/status/ready",
) or request.app._state.get("ready"):
return await handler(request)
try:
return await handler(request)
except LedgerConfigError:
# fatal, signal server shutdown
LOGGER.error(f"Shutdown with LedgerConfigError")
request.app._state["ready"] = False
request.app._state["alive"] = False
raise
except LedgerTransactionError:
# fatal, signal server shutdown
LOGGER.error(f"Shutdown with LedgerTransactionError")
request.app._state["ready"] = False
request.app._state["alive"] = False
raise
except Exception as e:
# some other error?
LOGGER.error("Handler error with exception: " + str(e))
raise e

raise web.HTTPServiceUnavailable(reason="Shutdown in progress")

Expand Down Expand Up @@ -295,6 +313,11 @@ async def collect_stats(request, handler):
app=app, title=agent_label, version=version_string, swagger_path="/api/doc"
)
app.on_startup.append(self.on_startup)

# ensure we always have status values
app._state["ready"] = False
app._state["alive"] = False

return app

async def start(self) -> None:
Expand Down Expand Up @@ -329,6 +352,7 @@ async def start(self) -> None:
try:
await self.site.start()
self.app._state["ready"] = True
self.app._state["alive"] = True
except OSError:
raise AdminSetupError(
"Unable to start webserver with host "
Expand All @@ -338,6 +362,7 @@ async def start(self) -> None:
async def stop(self) -> None:
"""Stop the webserver."""
self.app._state["ready"] = False # in case call does not come through OpenAPI
self.app._state["alive"] = False
for queue in self.websocket_queues.values():
queue.stop()
if self.site:
Expand Down Expand Up @@ -429,7 +454,8 @@ async def liveliness_handler(self, request: web.BaseRequest):
The web response, indicating whether the server is alive

"""
return web.json_response({"alive": True})
app_live = self.app._state["alive"]
return web.json_response({"alive": app_live})

@docs(tags=["server"], summary="Readiness check")
@response_schema(AdminStatusReadinessSchema(), 200)
Expand All @@ -444,7 +470,8 @@ async def readiness_handler(self, request: web.BaseRequest):
The web response, indicating readiness for further calls

"""
return web.json_response({"ready": self.app._state["ready"]})
app_ready = self.app._state["ready"] and self.app._state["alive"]
return web.json_response({"ready": app_ready})

@docs(tags=["server"], summary="Shut down server")
async def shutdown_handler(self, request: web.BaseRequest):
Expand All @@ -464,6 +491,12 @@ async def shutdown_handler(self, request: web.BaseRequest):

return web.json_response({})

def notify_fatal_error(self):
    """Mark the server as neither ready nor alive after a fatal error.

    Both status flags in the application state are cleared; per the original
    note this is meant to force a restart (openshift).
    """
    LOGGER.error("Received shutdown request notify_fatal_error()")
    state = self.app._state
    # Clear readiness first, then liveliness.
    for flag in ("ready", "alive"):
        state[flag] = False

async def websocket_handler(self, request):
"""Send notifications to admin client over websocket."""

Expand Down
37 changes: 37 additions & 0 deletions aries_cloudagent/admin/tests/test_admin_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -266,3 +266,40 @@ async def test_visit_shutting_down(self):
) as response:
assert response.status == 200
await server.stop()

async def test_server_health_state(self):
    """Check the status endpoints before and after a fatal error is flagged.

    After start-up both ``/status/live`` and ``/status/ready`` must answer
    200 with a truthy flag; after ``notify_fatal_error()`` they must still
    answer 200 but with a falsy flag.
    """
    settings = {
        "admin.admin_insecure_mode": True,
    }
    server = self.get_admin_server(settings)
    await server.start()

    # Stop the server even when an assertion fails, so a failing run does
    # not leave it running for the tests that follow.
    try:
        for expected in (True, False):
            if not expected:
                # Second pass: flip the flags, then re-check both endpoints.
                server.notify_fatal_error()
            for path, key in (("live", "alive"), ("ready", "ready")):
                async with self.client_session.get(
                    f"http://127.0.0.1:{self.port}/status/{path}", headers={}
                ) as response:
                    assert response.status == 200
                    response_json = await response.json()
                    assert bool(response_json[key]) is expected
    finally:
        await server.stop()
35 changes: 28 additions & 7 deletions aries_cloudagent/core/conductor.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from ..config.ledger import ledger_config
from ..config.logging import LoggingConfigurator
from ..config.wallet import wallet_config, BaseWallet
from ..ledger.error import LedgerConfigError, LedgerTransactionError
from ..messaging.responder import BaseResponder
from ..protocols.connections.v1_0.manager import (
ConnectionManager,
Expand Down Expand Up @@ -255,12 +256,19 @@ def inbound_message_router(
# Note: at this point we could send the message to a shared queue
# if this pod is too busy to process it

self.dispatcher.queue_message(
message,
self.outbound_message_router,
self.admin_server and self.admin_server.send_webhook,
lambda completed: self.dispatch_complete(message, completed),
)
try:
self.dispatcher.queue_message(
message,
self.outbound_message_router,
self.admin_server and self.admin_server.send_webhook,
lambda completed: self.dispatch_complete(message, completed),
)
except LedgerConfigError:
self.admin_server.notify_fatal_error()
raise
except LedgerTransactionError:
self.admin_server.notify_fatal_error()
raise

def dispatch_complete(self, message: InboundMessage, completed: CompletedTask):
"""Handle completion of message dispatch."""
Expand Down Expand Up @@ -314,7 +322,14 @@ async def outbound_message_router(

def handle_not_returned(self, context: InjectionContext, outbound: OutboundMessage):
    """Handle a message that failed delivery via an inbound session.

    Hands ``queue_outbound`` for the message to the dispatcher's
    ``run_task``. If that raises a ledger configuration or transaction
    error, the admin server (when one is present) is notified of the fatal
    error and the exception is re-raised.

    Args:
        context: The injection context passed on to ``queue_outbound``
        outbound: The outbound message that was not returned

    Raises:
        LedgerConfigError: Re-raised after notifying the admin server
        LedgerTransactionError: Re-raised after notifying the admin server

    """
    try:
        self.dispatcher.run_task(self.queue_outbound(context, outbound))
    except (LedgerConfigError, LedgerTransactionError):
        # The admin server may be unset; without this guard an
        # AttributeError would replace the ledger error being reported.
        if self.admin_server:
            self.admin_server.notify_fatal_error()
        raise

async def queue_outbound(
self,
Expand All @@ -341,6 +356,12 @@ async def queue_outbound(
except ConnectionManagerError:
LOGGER.exception("Error preparing outbound message for transmission")
return
except LedgerConfigError:
self.admin_server.notify_fatal_error()
raise
except LedgerTransactionError:
self.admin_server.notify_fatal_error()
raise

try:
self.outbound_transport_manager.enqueue_message(context, outbound)
Expand Down