Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions asterisk_bridge.py
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ async def _handle_stasis_start(self, event):
asyncio.create_task(self._start_connector(original_channel_id))
except Exception as e:
logger.error(f"Failed to start connector for channel {original_channel_id}: {e}")
self.close_channel(original_channel_id)
await self.close_channel(original_channel_id)
# Return control of original channel to dialplan
await self._ari_request(
'POST',
Expand Down Expand Up @@ -378,12 +378,12 @@ async def _handle_channel_hangup(self, event):
return
original_channel_id = None
for id, values in self.channels.items():
if values.get('snoop_channel') == channel_id:
if values.get('snoop_channel_in') == channel_id or values.get('snoop_channel_out') == channel_id:
# Snoop channel hangup
logger.debug(f"Snoop channel {channel_id} hangup")
original_channel_id = id
break
if values.get('external_media_channel') == channel_id:
if values.get('external_media_channel_in') == channel_id or values.get('external_media_channel_out') == channel_id:
# External media channel hangup
logger.debug(f"External media channel {channel_id} hangup")
original_channel_id = id
Expand Down
47 changes: 37 additions & 10 deletions deepgram_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ def __init__(self, deepgram_api_key, rtp_stream_in, rtp_stream_out, mqtt_client,
self.dg_connection = None
self.loop = None
self.complete_call = []
self._close_started = False
self._close_lock = asyncio.Lock()

async def start(self):
deepgram: DeepgramClient = DeepgramClient(self.deepgram_api_key)
Expand Down Expand Up @@ -140,17 +142,19 @@ def on_error(self, client, error, **kwargs):
Handle error events
"""
logger.error(f"Error received: {error}")
if self._close_started:
return

# Use the stored event loop to schedule the task
try:
if hasattr(self, 'loop') and self.loop.is_running():
if self.loop and self.loop.is_running():
self.loop.call_soon_threadsafe(
lambda: asyncio.create_task(self.close())
)
else:
logger.warning("No running event loop available, using default scheduling")
asyncio.run_coroutine_threadsafe(self.close(), asyncio.get_event_loop())
logger.warning("No running event loop available; cannot schedule close")
except Exception as e:
logger.error(f"Failed to schedule close operation: {e}")
logger.error(f"Failed to schedule close operation: {e}")

async def read_audio_from_rtp(self):
"""
Expand Down Expand Up @@ -225,12 +229,35 @@ async def close(self):
"""
Close the connection to Deepgram
"""
logger.debug(f"Closing Deepgram connection for {self.uniqueid}")
self.connected = False
await self.dg_connection.finalize()
await self.dg_connection._socket.close()
self.read_audio_from_rtp_task.cancel()
self.send_audio_to_deepgram_task.cancel()
async with self._close_lock:
if self._close_started:
return
self._close_started = True

logger.debug(f"Closing Deepgram connection for {self.uniqueid}")
self.connected = False

# Cancel background tasks
if self.read_audio_from_rtp_task is not None:
self.read_audio_from_rtp_task.cancel()
if self.send_audio_to_deepgram_task is not None:
self.send_audio_to_deepgram_task.cancel()

# Close Deepgram connection/socket
if self.dg_connection is not None:
try:
await self.dg_connection.finalize()
except Exception as e:
logger.debug(f"Deepgram finalize failed for {self.uniqueid}: {e}")

# Best-effort close of the underlying socket if finalize doesn't do it
try:
if self.dg_connection is not None:
socket = getattr(self.dg_connection, "_socket", None)
if socket is not None:
await socket.close()
except Exception as e:
logger.debug(f"Deepgram socket close failed for {self.uniqueid}: {e}")
# publish full conversation to mqtt
text = ""
last_speaker = None
Expand Down
Loading