diff --git a/asterisk_bridge.py b/asterisk_bridge.py index 9a20df0..4e31dcb 100644 --- a/asterisk_bridge.py +++ b/asterisk_bridge.py @@ -278,7 +278,7 @@ async def _handle_stasis_start(self, event): asyncio.create_task(self._start_connector(original_channel_id)) except Exception as e: logger.error(f"Failed to start connector for channel {original_channel_id}: {e}") - self.close_channel(original_channel_id) + await self.close_channel(original_channel_id) # Return control of original channel to dialplan await self._ari_request( 'POST', @@ -378,12 +378,12 @@ async def _handle_channel_hangup(self, event): return original_channel_id = None for id, values in self.channels.items(): - if values.get('snoop_channel') == channel_id: + if values.get('snoop_channel_in') == channel_id or values.get('snoop_channel_out') == channel_id: # Snoop channel hangup logger.debug(f"Snoop channel {channel_id} hangup") original_channel_id = id break - if values.get('external_media_channel') == channel_id: + if values.get('external_media_channel_in') == channel_id or values.get('external_media_channel_out') == channel_id: # External media channel hangup logger.debug(f"External media channel {channel_id} hangup") original_channel_id = id diff --git a/deepgram_connector.py b/deepgram_connector.py index 5d84b1a..62bb0d3 100644 --- a/deepgram_connector.py +++ b/deepgram_connector.py @@ -50,6 +50,8 @@ def __init__(self, deepgram_api_key, rtp_stream_in, rtp_stream_out, mqtt_client, self.dg_connection = None self.loop = None self.complete_call = [] + self._close_started = False + self._close_lock = asyncio.Lock() async def start(self): deepgram: DeepgramClient = DeepgramClient(self.deepgram_api_key) @@ -140,17 +142,19 @@ def on_error(self, client, error, **kwargs): Handle error events """ logger.error(f"Error received: {error}") + if self._close_started: + return + # Use the stored event loop to schedule the task try: - if hasattr(self, 'loop') and self.loop.is_running(): + if self.loop and self.loop.is_running(): self.loop.call_soon_threadsafe( lambda: asyncio.create_task(self.close()) ) else: - logger.warning("No running event loop available, using default scheduling") - asyncio.run_coroutine_threadsafe(self.close(), asyncio.get_event_loop()) + logger.warning("No running event loop available; cannot schedule close") except Exception as e: - logger.error(f"Failed to schedule close operation: {e}") + logger.error(f"Failed to schedule close operation: {e}") async def read_audio_from_rtp(self): """ @@ -225,12 +229,35 @@ async def close(self): """ Close the connection to Deepgram """ - logger.debug(f"Closing Deepgram connection for {self.uniqueid}") - self.connected = False - await self.dg_connection.finalize() - await self.dg_connection._socket.close() - self.read_audio_from_rtp_task.cancel() - self.send_audio_to_deepgram_task.cancel() + async with self._close_lock: + if self._close_started: + return + self._close_started = True + + logger.debug(f"Closing Deepgram connection for {self.uniqueid}") + self.connected = False + + # Cancel background tasks + if self.read_audio_from_rtp_task is not None: + self.read_audio_from_rtp_task.cancel() + if self.send_audio_to_deepgram_task is not None: + self.send_audio_to_deepgram_task.cancel() + + # Close Deepgram connection/socket + if self.dg_connection is not None: + try: + await self.dg_connection.finalize() + except Exception as e: + logger.debug(f"Deepgram finalize failed for {self.uniqueid}: {e}") + + # Best-effort close of the underlying socket if finalize doesn't do it + try: + if self.dg_connection is not None: + socket = getattr(self.dg_connection, "_socket", None) + if socket is not None: + await socket.close() + except Exception as e: + logger.debug(f"Deepgram socket close failed for {self.uniqueid}: {e}") # publish full conversation to mqtt text = "" last_speaker = None