Skip to content

Commit

Permalink
Improve reliability and exception logging of network loop. (facebookr…
Browse files — browse the repository at this point in the history
  • Loading branch information
0mdc authored Apr 9, 2024
1 parent 5b442fe commit 89da15e
Showing 1 changed file with 28 additions and 9 deletions.
Original file line number | Diff line number | Diff line change
Expand Up @@ -9,6 +9,7 @@
import os
import signal
import ssl
import traceback
from datetime import datetime, timedelta
from multiprocessing import Process
from typing import Dict, List, Optional
Expand Down Expand Up @@ -377,7 +378,7 @@ async def networking_main_async(
)

# Define tasks (concurrent looping coroutines).
tasks = []
tasks: List[asyncio.Future] = []
tasks.append(asyncio.create_task(network_mgr.check_keyframe_queue()))
tasks.append(
asyncio.create_task(network_mgr.check_close_broken_connection())
Expand All @@ -395,13 +396,34 @@ async def networking_main_async(
]
for stop_signal in stop_signals:
loop.add_signal_handler(stop_signal, stop.set_result, None)
# Add the stop signal as a task.
tasks.append(stop)

# Run tasks.
tasks = asyncio.gather(*tasks)

# Wait for cancellation (from termination signals).
await stop
print("Networking process terminating.")
abort = False
while tasks:
# Execute tasks until one is done (or fails).
done_tasks, pending = await asyncio.wait(
tasks, return_when=asyncio.FIRST_COMPLETED
)
for task in done_tasks:
# Print exception for failed tasks.
try:
await task
except Exception as e:
print(f"Exception raised in network process. Aborting: {e}.")
traceback.print_exc()
abort = True
# Abort if exception was raised, or if a termination signal was caught.
if abort or stop.done():
if stop.done():
print(f"Caught termination signal: {stop.result}.")
break
# Resume pending tasks.
tasks = pending

# Terminate network process.
print("Networking process terminating...")

# Close servers.
websocket_server.close()
Expand All @@ -410,9 +432,6 @@ async def networking_main_async(
if http_runner:
await http_runner.cleanup()

# Cancel running tasks.
tasks.cancel()


def networking_main(interprocess_record: InterprocessRecord) -> None:
# Set up the event loop and run the main coroutine
Expand Down

0 comments on commit 89da15e

Please sign in to comment.