Skip to content

Commit

Permalink
Set liveliness status on fatal ledger error
Browse files Browse the repository at this point in the history
Signed-off-by: Ian Costanzo <ian@anon-solutions.ca>
  • Loading branch information
ianco committed Jul 31, 2020
1 parent 31f1c87 commit 54ce8be
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 10 deletions.
20 changes: 17 additions & 3 deletions aries_cloudagent/admin/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ class AdminStatusLivelinessSchema(Schema):


class AdminStatusReadinessSchema(Schema):
    """Schema for the readiness endpoint."""

    # Boolean flag reported by the readiness check.
    ready = fields.Boolean(description="Readiness status", example=True)

Expand Down Expand Up @@ -295,6 +295,11 @@ async def collect_stats(request, handler):
app=app, title=agent_label, version=version_string, swagger_path="/api/doc"
)
app.on_startup.append(self.on_startup)

# ensure we always have status values
app._state["ready"] = False
app._state["alive"] = False

return app

async def start(self) -> None:
Expand Down Expand Up @@ -329,6 +334,7 @@ async def start(self) -> None:
try:
await self.site.start()
self.app._state["ready"] = True
self.app._state["alive"] = True
except OSError:
raise AdminSetupError(
"Unable to start webserver with host "
Expand All @@ -338,6 +344,7 @@ async def start(self) -> None:
async def stop(self) -> None:
"""Stop the webserver."""
self.app._state["ready"] = False # in case call does not come through OpenAPI
self.app._state["alive"] = False
for queue in self.websocket_queues.values():
queue.stop()
if self.site:
Expand Down Expand Up @@ -429,7 +436,8 @@ async def liveliness_handler(self, request: web.BaseRequest):
The web response, indicating whether the server is currently alive
"""
return web.json_response({"alive": True})
app_live = self.app._state["alive"]
return web.json_response({"alive": app_live})

@docs(tags=["server"], summary="Readiness check")
@response_schema(AdminStatusReadinessSchema(), 200)
Expand All @@ -444,7 +452,8 @@ async def readiness_handler(self, request: web.BaseRequest):
The web response, indicating readiness for further calls
"""
return web.json_response({"ready": self.app._state["ready"]})
app_ready = self.app._state["ready"] and self.app._state["alive"]
return web.json_response({"ready": app_ready})

@docs(tags=["server"], summary="Shut down server")
async def shutdown_handler(self, request: web.BaseRequest):
Expand All @@ -464,6 +473,11 @@ async def shutdown_handler(self, request: web.BaseRequest):

return web.json_response({})

def notify_fatal_error(self):
    """Mark the server as neither ready nor alive after a fatal error.

    Both status flags in the application state are cleared so that the
    status endpoints stop reporting a healthy server (the original note
    mentions forcing a restart under OpenShift).
    """
    state = self.app._state
    for flag in ("ready", "alive"):
        state[flag] = False

async def websocket_handler(self, request):
"""Send notifications to admin client over websocket."""

Expand Down
37 changes: 37 additions & 0 deletions aries_cloudagent/admin/tests/test_admin_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -266,3 +266,40 @@ async def test_visit_shutting_down(self):
) as response:
assert response.status == 200
await server.stop()

async def test_server_health_state(self):
    """Check the live/ready status endpoints before and after a fatal error.

    After startup both endpoints must report a truthy status; once
    ``notify_fatal_error`` has been called both must report a falsy one,
    while still answering with HTTP 200.
    """
    settings = {
        "admin.admin_insecure_mode": True,
    }
    server = self.get_admin_server(settings)
    await server.start()

    async def fetch_status(endpoint):
        """GET a status endpoint, require HTTP 200, and return its JSON body."""
        async with self.client_session.get(
            f"http://127.0.0.1:{self.port}/status/{endpoint}", headers={}
        ) as response:
            assert response.status == 200
            return await response.json()

    # Stop the server even when an assertion fails, so a failing check
    # does not leave the listener running for the rest of the test run.
    try:
        assert (await fetch_status("live"))["alive"]
        assert (await fetch_status("ready"))["ready"]

        server.notify_fatal_error()

        assert not (await fetch_status("live"))["alive"]
        assert not (await fetch_status("ready"))["ready"]
    finally:
        await server.stop()
26 changes: 19 additions & 7 deletions aries_cloudagent/core/conductor.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from ..config.ledger import ledger_config
from ..config.logging import LoggingConfigurator
from ..config.wallet import wallet_config, BaseWallet
from ..ledger.error import LedgerConfigError
from ..messaging.responder import BaseResponder
from ..protocols.connections.v1_0.manager import (
ConnectionManager,
Expand Down Expand Up @@ -255,12 +256,16 @@ def inbound_message_router(
# Note: at this point we could send the message to a shared queue
# if this pod is too busy to process it

self.dispatcher.queue_message(
message,
self.outbound_message_router,
self.admin_server and self.admin_server.send_webhook,
lambda completed: self.dispatch_complete(message, completed),
)
try:
self.dispatcher.queue_message(
message,
self.outbound_message_router,
self.admin_server and self.admin_server.send_webhook,
lambda completed: self.dispatch_complete(message, completed),
)
except LedgerConfigError:
self.admin_server.notify_fatal_error()
raise

def dispatch_complete(self, message: InboundMessage, completed: CompletedTask):
"""Handle completion of message dispatch."""
Expand Down Expand Up @@ -314,7 +319,11 @@ async def outbound_message_router(

def handle_not_returned(self, context: InjectionContext, outbound: OutboundMessage):
    """Handle a message that failed delivery via an inbound session.

    The message is re-queued for outbound delivery by handing
    ``queue_outbound`` to the dispatcher exactly once.

    Args:
        context: The injection context passed on to ``queue_outbound``
        outbound: The outbound message that was not returned

    Raises:
        LedgerConfigError: Re-raised after the admin server, if one is
            configured, has been told about the fatal error

    """
    try:
        self.dispatcher.run_task(self.queue_outbound(context, outbound))
    except LedgerConfigError:
        # The admin server may be unset; without this guard an
        # AttributeError would replace the ledger error being reported.
        if self.admin_server:
            self.admin_server.notify_fatal_error()
        raise

async def queue_outbound(
self,
Expand All @@ -341,6 +350,9 @@ async def queue_outbound(
except ConnectionManagerError:
LOGGER.exception("Error preparing outbound message for transmission")
return
except LedgerConfigError:
self.admin_server.notify_fatal_error()
raise

try:
self.outbound_transport_manager.enqueue_message(context, outbound)
Expand Down

0 comments on commit 54ce8be

Please sign in to comment.