From 94b804ab3a6086c9b2188632286f8884774432db Mon Sep 17 00:00:00 2001
From: Mikael Dallaire Cote <110583667+0mdc@users.noreply.github.com>
Date: Sat, 6 Apr 2024 18:57:43 -0400
Subject: [PATCH] Improve reliability and exception logging of network loop.

---
Review note: the termination message calls `stop.result()`. Interpolating
the bare attribute `stop.result` would print the bound method's repr
instead of the value stored in the future by the signal handler.

 .../networking/networking_process.py          | 37 ++++++++++++++-----
 1 file changed, 28 insertions(+), 9 deletions(-)

diff --git a/habitat-hitl/habitat_hitl/_internal/networking/networking_process.py b/habitat-hitl/habitat_hitl/_internal/networking/networking_process.py
index fa5f16614a..5e0ae9eb2f 100644
--- a/habitat-hitl/habitat_hitl/_internal/networking/networking_process.py
+++ b/habitat-hitl/habitat_hitl/_internal/networking/networking_process.py
@@ -9,6 +9,7 @@
 import os
 import signal
 import ssl
+import traceback
 from datetime import datetime, timedelta
 from multiprocessing import Process
 from typing import Dict, List, Optional
@@ -377,7 +378,7 @@ async def networking_main_async(
     )
 
     # Define tasks (concurrent looping coroutines).
-    tasks = []
+    tasks: List[asyncio.Future] = []
     tasks.append(asyncio.create_task(network_mgr.check_keyframe_queue()))
     tasks.append(
         asyncio.create_task(network_mgr.check_close_broken_connection())
@@ -395,13 +396,34 @@ async def networking_main_async(
     ]
     for stop_signal in stop_signals:
         loop.add_signal_handler(stop_signal, stop.set_result, None)
+    # Add the stop signal as a task.
+    tasks.append(stop)
 
     # Run tasks.
-    tasks = asyncio.gather(*tasks)
-
-    # Wait for cancellation (from termination signals).
-    await stop
-    print("Networking process terminating.")
+    abort = False
+    while tasks:
+        # Execute tasks until one is done (or fails).
+        done_tasks, pending = await asyncio.wait(
+            tasks, return_when=asyncio.FIRST_COMPLETED
+        )
+        for task in done_tasks:
+            # Print exception for failed tasks.
+            try:
+                await task
+            except Exception as e:
+                print(f"Exception raised in network process. Aborting: {e}.")
+                traceback.print_exc()
+                abort = True
+        # Abort if exception was raised, or if a termination signal was caught.
+        if abort or stop.done():
+            if stop.done():
+                print(f"Caught termination signal: {stop.result()}.")
+            break
+        # Resume pending tasks.
+        tasks = pending
+
+    # Terminate network process.
+    print("Networking process terminating...")
 
     # Close servers.
     websocket_server.close()
@@ -410,9 +432,6 @@ async def networking_main_async(
     if http_runner:
         await http_runner.cleanup()
 
-    # Cancel running tasks.
-    tasks.cancel()
-
 
 def networking_main(interprocess_record: InterprocessRecord) -> None:
     # Set up the event loop and run the main coroutine