Skip to content

Commit

Permalink
Merge pull request #78 from jeremiah-k/main
Browse files Browse the repository at this point in the history
Wrap the coroutine in a task using asyncio.create_task() before passing it to asyncio.wait()
  • Loading branch information
jeremiah-k authored Oct 2, 2024
2 parents 2d4da91 + 28efef2 commit d4b015e
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 23 deletions.
16 changes: 12 additions & 4 deletions main.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,10 @@ async def shutdown():
meshtastic_utils.shutting_down = True # Set the shutting_down flag
shutdown_event.set()

loop = asyncio.get_running_loop()

# Handle signals differently based on the platform
if sys.platform != "win32":
loop = asyncio.get_running_loop()
for sig in (signal.SIGINT, signal.SIGTERM):
loop.add_signal_handler(sig, lambda: asyncio.create_task(shutdown()))
else:
Expand All @@ -102,8 +103,9 @@ async def shutdown():
sync_task = asyncio.create_task(
matrix_client.sync_forever(timeout=30000)
)
await asyncio.wait(
[sync_task, shutdown_event.wait()],
shutdown_task = asyncio.create_task(shutdown_event.wait())
done, pending = await asyncio.wait(
[sync_task, shutdown_task],
return_when=asyncio.FIRST_COMPLETED,
)
if shutdown_event.is_set():
Expand Down Expand Up @@ -131,8 +133,14 @@ async def shutdown():
meshtastic_utils.meshtastic_client.close()
except Exception as e:
meshtastic_logger.warning(f"Error closing Meshtastic client: {e}")

# Cancel the reconnect task if it exists
if meshtastic_utils.reconnect_task:
meshtastic_utils.reconnect_task.cancel()
meshtastic_logger.info("Cancelled Meshtastic reconnect task.")

# Cancel any remaining tasks
tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()]
tasks = [t for t in asyncio.all_tasks(loop) if not t.done()]
for task in tasks:
task.cancel()
try:
Expand Down
43 changes: 24 additions & 19 deletions meshtastic_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@

reconnecting = False
shutting_down = False
reconnect_task = None # To keep track of the reconnect task

def connect_meshtastic(force_connect=False):
"""
Expand Down Expand Up @@ -123,7 +124,7 @@ def on_lost_meshtastic_connection(interface=None):
"""
Callback function invoked when the Meshtastic connection is lost.
"""
global meshtastic_client, reconnecting, shutting_down, event_loop
global meshtastic_client, reconnecting, shutting_down, event_loop, reconnect_task
with meshtastic_lock:
if shutting_down:
logger.info("Shutdown in progress. Not attempting to reconnect.")
Expand All @@ -148,31 +149,35 @@ def on_lost_meshtastic_connection(interface=None):
meshtastic_client = None

if event_loop:
asyncio.run_coroutine_threadsafe(reconnect(), event_loop)
reconnect_task = asyncio.run_coroutine_threadsafe(reconnect(), event_loop)

async def reconnect():
"""
Asynchronously attempts to reconnect to the Meshtastic device with exponential backoff.
"""
global meshtastic_client, reconnecting, shutting_down
backoff_time = 10
while not shutting_down:
try:
logger.info(f"Reconnection attempt starting in {backoff_time} seconds...")
await asyncio.sleep(backoff_time)
if shutting_down:
logger.info("Shutdown in progress. Aborting reconnection attempts.")
break
meshtastic_client = connect_meshtastic(force_connect=True)
if meshtastic_client:
logger.info("Reconnected successfully.")
break
except Exception as e:
logger.error(f"Reconnection attempt failed: {e}")
backoff_time = min(backoff_time * 2, 300) # Cap backoff at 5 minutes
else:
logger.info("Reconnection attempts aborted due to shutdown.")
reconnecting = False
try:
while not shutting_down:
try:
logger.info(f"Reconnection attempt starting in {backoff_time} seconds...")
await asyncio.sleep(backoff_time)
if shutting_down:
logger.info("Shutdown in progress. Aborting reconnection attempts.")
break
meshtastic_client = connect_meshtastic(force_connect=True)
if meshtastic_client:
logger.info("Reconnected successfully.")
break
except Exception as e:
if shutting_down:
break
logger.error(f"Reconnection attempt failed: {e}")
backoff_time = min(backoff_time * 2, 300) # Cap backoff at 5 minutes
except asyncio.CancelledError:
logger.info("Reconnection task was cancelled.")
finally:
reconnecting = False

def on_meshtastic_message(packet, interface):
"""
Expand Down

0 comments on commit d4b015e

Please sign in to comment.