diff --git a/.gitignore b/.gitignore index 05330721..8e4929ee 100644 --- a/.gitignore +++ b/.gitignore @@ -7,6 +7,8 @@ venv/ venv.bak/ .vscode/ .DS_Store +Pipfile +Pipfile.lock # python artifacts __pycache__ @@ -18,3 +20,4 @@ dist/ # build build/ poetry.lock + diff --git a/README.md b/README.md index 138db147..6e8cf89e 100644 --- a/README.md +++ b/README.md @@ -175,19 +175,26 @@ Before running any of these examples, then you need to take a look at the README pip install -r examples/requirements-examples.txt ``` -Text to Speech: +To run each example set the `DEEPGRAM_API_KEY` as an environment variable, then `cd` into each example folder and execute the example with: `python main.py` or `python3 main.py`. + +### Agent + +- Simple - [examples/agent/simple](https://github.com/deepgram/deepgram-python-sdk/blob/main/examples/agent/simple/main.py) +- Async Simple - [examples/agent/async_simple](https://github.com/deepgram/deepgram-python-sdk/blob/main/examples/agent/async_simple/main.py) + +### Text to Speech - Asynchronous - [examples/text-to-speech](https://github.com/deepgram/deepgram-python-sdk/blob/main/examples/text-to-speech/rest/file/async_hello_world/main.py) - Synchronous - [examples/text-to-speech](https://github.com/deepgram/deepgram-python-sdk/blob/main/examples/text-to-speech/rest/file/hello_world/main.py) -Analyze Text: +### Analyze Text - Intent Recognition - [examples/analyze/intent](https://github.com/deepgram/deepgram-python-sdk/blob/main/examples/analyze/intent/main.py) - Sentiment Analysis - [examples/sentiment/intent](https://github.com/deepgram/deepgram-python-sdk/blob/main/examples/analyze/sentiment/main.py) - Summarization - [examples/analyze/intent](https://github.com/deepgram/deepgram-python-sdk/blob/main/examples/analyze/summary/main.py) - Topic Detection - [examples/analyze/intent](https://github.com/deepgram/deepgram-python-sdk/blob/main/examples/analyze/topic/main.py) -PreRecorded Audio: +### PreRecorded Audio - Transcription From an Audio File - [examples/prerecorded/file](https://github.com/deepgram/deepgram-python-sdk/blob/main/examples/speech-to-text/rest/file/main.py) - Transcription From an URL - [examples/prerecorded/url](https://github.com/deepgram/deepgram-python-sdk/blob/main/examples/speech-to-text/rest/url/main.py) @@ -196,7 +203,7 @@ PreRecorded Audio: - Summarization - [examples/speech-to-text/rest/summary](https://github.com/deepgram/deepgram-python-sdk/blob/main/examples/speech-to-text/rest/summary/main.py) - Topic Detection - [examples/speech-to-text/rest/topic](https://github.com/deepgram/deepgram-python-sdk/blob/main/examples/speech-to-text/rest/topic/main.py) -Live Audio Transcription: +### Live Audio Transcription - From a Microphone - [examples/streaming/microphone](https://github.com/deepgram/deepgram-python-sdk/blob/main/examples/speech-to-text/rest/stream_file/main.py) - From an HTTP Endpoint - [examples/streaming/http](https://github.com/deepgram/deepgram-python-sdk/blob/main/examples/speech-to-text/rest/async_url/main.py) @@ -211,8 +218,6 @@ Management API exercise the full [CRUD](https://en.wikipedia.org/wiki/Create,_re - Scopes - [examples/manage/scopes](https://github.com/deepgram/deepgram-python-sdk/blob/main/examples/manage/scopes/main.py) - Usage - [examples/manage/usage](https://github.com/deepgram/deepgram-python-sdk/blob/main/examples/manage/usage/main.py) -To run each example set the `DEEPGRAM_API_KEY` as an environment variable, then `cd` into each example folder and execute the example: `go run main.py`. - ## Logging This SDK provides logging as a means to troubleshoot and debug issues encountered. By default, this SDK will enable `Information` level messages and higher (ie `Warning`, `Error`, etc) when you initialize the library as follows: diff --git a/deepgram/__init__.py b/deepgram/__init__.py index 1d7568fe..3873d7fa 100644 --- a/deepgram/__init__.py +++ b/deepgram/__init__.py @@ -34,7 +34,7 @@ from .errors import DeepgramApiKeyError # listen/read client -from .client import Listen, Read +from .client import ListenRouter, ReadRouter, SpeakRouter, AgentRouter # common from .client import ( @@ -302,6 +302,60 @@ AsyncSelfHostedClient, ) + +# agent +from .client import AgentWebSocketEvents + +# websocket +from .client import ( + AgentWebSocketClient, + AsyncAgentWebSocketClient, +) + +from .client import ( + #### common websocket response + # OpenResponse, + # CloseResponse, + # ErrorResponse, + # UnhandledResponse, + #### unique + WelcomeResponse, + SettingsAppliedResponse, + ConversationTextResponse, + UserStartedSpeakingResponse, + AgentThinkingResponse, + FunctionCalling, + FunctionCallRequest, + AgentStartedSpeakingResponse, + AgentAudioDoneResponse, + InjectionRefusedResponse, +) + +from .client import ( + # top level + SettingsConfigurationOptions, + UpdateInstructionsOptions, + UpdateSpeakOptions, + InjectAgentMessageOptions, + FunctionCallResponse, + AgentKeepAlive, + # sub level + Listen, + Speak, + Header, + Item, + Properties, + Parameters, + Function, + Provider, + Think, + Agent, + Input, + Output, + Audio, + Context, +) + # utilities # pylint: disable=wrong-import-position from .audio import Microphone, DeepgramMicrophoneError diff --git a/deepgram/audio/microphone/microphone.py b/deepgram/audio/microphone/microphone.py index 91ba0df0..3a3ad341 100644 --- a/deepgram/audio/microphone/microphone.py +++ b/deepgram/audio/microphone/microphone.py @@ -9,6 +9,7 @@ import logging from ...utils import verboselogs + from .constants import LOGGING, CHANNELS, RATE, CHUNK if TYPE_CHECKING: diff --git a/deepgram/audio/speaker/speaker.py b/deepgram/audio/speaker/speaker.py index 869dc490..e16d5898 100644 --- a/deepgram/audio/speaker/speaker.py +++ b/deepgram/audio/speaker/speaker.py @@ -50,7 +50,6 @@ class Speaker: # pylint: disable=too-many-instance-attributes # _asyncio_loop: asyncio.AbstractEventLoop # _asyncio_thread: threading.Thread _receiver_thread: Optional[threading.Thread] = None - _loop: Optional[asyncio.AbstractEventLoop] = None _push_callback_org: Optional[Callable] = None @@ -265,6 +264,7 @@ async def _start_asyncio_receiver(self): await self._push_callback(message) elif isinstance(message, bytes): self._logger.verbose("Received audio data...") + await self._push_callback(message) self.add_audio_to_queue(message) except websockets.exceptions.ConnectionClosedOK as e: self._logger.debug("send() exiting gracefully: %d", e.code) @@ -297,6 +297,7 @@ def _start_threaded_receiver(self): self._push_callback(message) elif isinstance(message, bytes): self._logger.verbose("Received audio data...") + self._push_callback(message) self.add_audio_to_queue(message) except Exception as e: # pylint: disable=broad-except self._logger.notice("_start_threaded_receiver exception: %s", str(e)) @@ -365,6 +366,7 @@ def _play(self, audio_out, stream, stop): "LastPlay delta is greater than threshold. Unmute!" ) self._microphone.unmute() + data = audio_out.get(True, TIMEOUT) with self._lock_wait: self._last_datagram = datetime.now() diff --git a/deepgram/client.py b/deepgram/client.py index 08ef1717..2cd0c02f 100644 --- a/deepgram/client.py +++ b/deepgram/client.py @@ -55,7 +55,7 @@ ) # listen client -from .clients import Listen, Read, Speak +from .clients import ListenRouter, ReadRouter, SpeakRouter, AgentRouter # speech-to-text from .clients import LiveClient, AsyncLiveClient # backward compat @@ -308,6 +308,61 @@ AsyncSelfHostedClient, ) + +# agent +from .clients import AgentWebSocketEvents + +# websocket +from .clients import ( + AgentWebSocketClient, + AsyncAgentWebSocketClient, +) + +from .clients import ( + #### common websocket response + # OpenResponse, + # CloseResponse, + # ErrorResponse, + # UnhandledResponse, + #### unique + WelcomeResponse, + SettingsAppliedResponse, + ConversationTextResponse, + UserStartedSpeakingResponse, + AgentThinkingResponse, + FunctionCalling, + FunctionCallRequest, + AgentStartedSpeakingResponse, + AgentAudioDoneResponse, + InjectionRefusedResponse, +) + +from .clients import ( + # top level + SettingsConfigurationOptions, + UpdateInstructionsOptions, + UpdateSpeakOptions, + InjectAgentMessageOptions, + FunctionCallResponse, + AgentKeepAlive, + # sub level + Listen, + Speak, + Header, + Item, + Properties, + Parameters, + Function, + Provider, + Think, + Agent, + Input, + Output, + Audio, + Context, +) + + # client errors and options from .options import DeepgramClientOptions, ClientOptionsFromEnv from .errors import DeepgramApiKeyError @@ -397,21 +452,21 @@ def listen(self): """ Returns a Listen dot-notation router for interacting with Deepgram's transcription services. """ - return Listen(self._config) + return ListenRouter(self._config) @property def read(self): """ Returns a Read dot-notation router for interacting with Deepgram's read services. """ - return Read(self._config) + return ReadRouter(self._config) @property def speak(self): """ Returns a Speak dot-notation router for interacting with Deepgram's speak services. """ - return Speak(self._config) + return SpeakRouter(self._config) @property @deprecation.deprecated( @@ -480,6 +535,13 @@ def asyncselfhosted(self): """ return self.Version(self._config, "asyncselfhosted") + @property + def agent(self): + """ + Returns a Agent dot-notation router for interacting with Deepgram's speak services. + """ + return AgentRouter(self._config) + # INTERNAL CLASSES class Version: """ diff --git a/deepgram/clients/__init__.py b/deepgram/clients/__init__.py index 01ec90ee..7b62062d 100644 --- a/deepgram/clients/__init__.py +++ b/deepgram/clients/__init__.py @@ -48,9 +48,10 @@ ) from .errors import DeepgramModuleError -from .listen_router import Listen -from .read_router import Read -from .speak_router import Speak +from .listen_router import ListenRouter +from .read_router import ReadRouter +from .speak_router import SpeakRouter +from .agent_router import AgentRouter # listen from .listen import LiveTranscriptionEvents @@ -318,3 +319,56 @@ SelfHostedClient, AsyncSelfHostedClient, ) + +# agent +from .agent import AgentWebSocketEvents + +# websocket +from .agent import ( + AgentWebSocketClient, + AsyncAgentWebSocketClient, +) + +from .agent import ( + #### common websocket response + # OpenResponse, + # CloseResponse, + # ErrorResponse, + # UnhandledResponse, + #### unique + WelcomeResponse, + SettingsAppliedResponse, + ConversationTextResponse, + UserStartedSpeakingResponse, + AgentThinkingResponse, + FunctionCalling, + FunctionCallRequest, + AgentStartedSpeakingResponse, + AgentAudioDoneResponse, + InjectionRefusedResponse, +) + +from .agent import ( + # top level + SettingsConfigurationOptions, + UpdateInstructionsOptions, + UpdateSpeakOptions, + InjectAgentMessageOptions, + FunctionCallResponse, + AgentKeepAlive, + # sub level + Listen, + Speak, + Header, + Item, + Properties, + Parameters, + Function, + Provider, + Think, + Agent, + Input, + Output, + Audio, + Context, +) diff --git a/deepgram/clients/agent/__init__.py b/deepgram/clients/agent/__init__.py new file mode 100644 index 00000000..33988571 --- /dev/null +++ b/deepgram/clients/agent/__init__.py @@ -0,0 +1,55 @@ +# Copyright 2023-2024 Deepgram SDK contributors. All Rights Reserved. +# Use of this source code is governed by a MIT license that can be found in the LICENSE file. +# SPDX-License-Identifier: MIT + +from .enums import AgentWebSocketEvents + +# websocket +from .client import ( + AgentWebSocketClient, + AsyncAgentWebSocketClient, +) + +from .client import ( + #### common websocket response + OpenResponse, + CloseResponse, + ErrorResponse, + UnhandledResponse, + #### unique + WelcomeResponse, + SettingsAppliedResponse, + ConversationTextResponse, + UserStartedSpeakingResponse, + AgentThinkingResponse, + FunctionCalling, + FunctionCallRequest, + AgentStartedSpeakingResponse, + AgentAudioDoneResponse, + InjectionRefusedResponse, +) + +from .client import ( + # top level + SettingsConfigurationOptions, + UpdateInstructionsOptions, + UpdateSpeakOptions, + InjectAgentMessageOptions, + FunctionCallResponse, + AgentKeepAlive, + # sub level + Listen, + Speak, + Header, + Item, + Properties, + Parameters, + Function, + Provider, + Think, + Agent, + Input, + Output, + Audio, + Context, +) diff --git a/deepgram/clients/agent/client.py b/deepgram/clients/agent/client.py new file mode 100644 index 00000000..ad3b868e --- /dev/null +++ b/deepgram/clients/agent/client.py @@ -0,0 +1,100 @@ +# Copyright 2024 Deepgram SDK contributors. All Rights Reserved. +# Use of this source code is governed by a MIT license that can be found in the LICENSE file. +# SPDX-License-Identifier: MIT + +# websocket +from .v1 import ( + AgentWebSocketClient as LatestAgentWebSocketClient, + AsyncAgentWebSocketClient as LatestAsyncAgentWebSocketClient, +) + +from .v1 import ( + #### common websocket response + BaseResponse as LatestBaseResponse, + OpenResponse as LatestOpenResponse, + CloseResponse as LatestCloseResponse, + ErrorResponse as LatestErrorResponse, + UnhandledResponse as LatestUnhandledResponse, + #### unique + WelcomeResponse as LatestWelcomeResponse, + SettingsAppliedResponse as LatestSettingsAppliedResponse, + ConversationTextResponse as LatestConversationTextResponse, + UserStartedSpeakingResponse as LatestUserStartedSpeakingResponse, + AgentThinkingResponse as LatestAgentThinkingResponse, + FunctionCalling as LatestFunctionCalling, + FunctionCallRequest as LatestFunctionCallRequest, + AgentStartedSpeakingResponse as LatestAgentStartedSpeakingResponse, + AgentAudioDoneResponse as LatestAgentAudioDoneResponse, + InjectionRefusedResponse as LatestInjectionRefusedResponse, +) + +from .v1 import ( + # top level + SettingsConfigurationOptions as LatestSettingsConfigurationOptions, + UpdateInstructionsOptions as LatestUpdateInstructionsOptions, + UpdateSpeakOptions as LatestUpdateSpeakOptions, + InjectAgentMessageOptions as LatestInjectAgentMessageOptions, + FunctionCallResponse as LatestFunctionCallResponse, + AgentKeepAlive as LatestAgentKeepAlive, + # sub level + Listen as LatestListen, + Speak as LatestSpeak, + Header as LatestHeader, + Item as LatestItem, + Properties as LatestProperties, + Parameters as LatestParameters, + Function as LatestFunction, + Provider as LatestProvider, + Think as LatestThink, + Agent as LatestAgent, + Input as LatestInput, + Output as LatestOutput, + Audio as LatestAudio, + Context as LatestContext, +) + + +# The vX/client.py points to the current supported version in the SDK. +# Older versions are supported in the SDK for backwards compatibility. + +AgentWebSocketClient = LatestAgentWebSocketClient +AsyncAgentWebSocketClient = LatestAsyncAgentWebSocketClient + +OpenResponse = LatestOpenResponse +CloseResponse = LatestCloseResponse +ErrorResponse = LatestErrorResponse +UnhandledResponse = LatestUnhandledResponse + +WelcomeResponse = LatestWelcomeResponse +SettingsAppliedResponse = LatestSettingsAppliedResponse +ConversationTextResponse = LatestConversationTextResponse +UserStartedSpeakingResponse = LatestUserStartedSpeakingResponse +AgentThinkingResponse = LatestAgentThinkingResponse +FunctionCalling = LatestFunctionCalling +FunctionCallRequest = LatestFunctionCallRequest +AgentStartedSpeakingResponse = LatestAgentStartedSpeakingResponse +AgentAudioDoneResponse = LatestAgentAudioDoneResponse +InjectionRefusedResponse = LatestInjectionRefusedResponse + + +SettingsConfigurationOptions = LatestSettingsConfigurationOptions +UpdateInstructionsOptions = LatestUpdateInstructionsOptions +UpdateSpeakOptions = LatestUpdateSpeakOptions +InjectAgentMessageOptions = LatestInjectAgentMessageOptions +FunctionCallResponse = LatestFunctionCallResponse +AgentKeepAlive = LatestAgentKeepAlive + +Listen = LatestListen +Speak = LatestSpeak +Header = LatestHeader +Item = LatestItem +Properties = LatestProperties +Parameters = LatestParameters +Function = LatestFunction +Provider = LatestProvider +Think = LatestThink +Agent = LatestAgent +Input = LatestInput +Output = LatestOutput +Audio = LatestAudio +Context = LatestContext diff --git a/deepgram/clients/agent/enums.py b/deepgram/clients/agent/enums.py new file mode 100644 index 00000000..df20debc --- /dev/null +++ b/deepgram/clients/agent/enums.py @@ -0,0 +1,37 @@ +# Copyright 2023-2024 Deepgram SDK contributors. All Rights Reserved. +# Use of this source code is governed by a MIT license that can be found in the LICENSE file. +# SPDX-License-Identifier: MIT + +from aenum import StrEnum + +# Constants mapping to events from the Deepgram API + + +class AgentWebSocketEvents(StrEnum): + """ + Enumerates the possible Agent API events that can be received from the Deepgram API + """ + + # server + Open: str = "Open" + Close: str = "Close" + AudioData: str = "AudioData" + Welcome: str = "Welcome" + SettingsApplied: str = "SettingsApplied" + ConversationText: str = "ConversationText" + UserStartedSpeaking: str = "UserStartedSpeaking" + AgentThinking: str = "AgentThinking" + FunctionCalling: str = "FunctionCalling" + FunctionCallRequest: str = "FunctionCallRequest" + AgentStartedSpeaking: str = "AgentStartedSpeaking" + AgentAudioDone: str = "AgentAudioDone" + Error: str = "Error" + Unhandled: str = "Unhandled" + + # client + SettingsConfiguration: str = "SettingsConfiguration" + UpdateInstructions: str = "UpdateInstructions" + UpdateSpeak: str = "UpdateSpeak" + InjectAgentMessage: str = "InjectAgentMessage" + InjectionRefused: str = "InjectionRefused" + AgentKeepAlive: str = "AgentKeepAlive" diff --git a/deepgram/clients/agent/v1/__init__.py b/deepgram/clients/agent/v1/__init__.py new file mode 100644 index 00000000..305cb891 --- /dev/null +++ b/deepgram/clients/agent/v1/__init__.py @@ -0,0 +1,59 @@ +# Copyright 2024 Deepgram SDK contributors. All Rights Reserved. +# Use of this source code is governed by a MIT license that can be found in the LICENSE file. +# SPDX-License-Identifier: MIT + +# common websocket +from ...common import ( + OpenResponse, + CloseResponse, + UnhandledResponse, + ErrorResponse, +) + +# websocket +from .websocket import AgentWebSocketClient, AsyncAgentWebSocketClient + +from .websocket import ( + #### common websocket response + BaseResponse, + OpenResponse, + CloseResponse, + ErrorResponse, + UnhandledResponse, + #### unique + WelcomeResponse, + SettingsAppliedResponse, + ConversationTextResponse, + UserStartedSpeakingResponse, + AgentThinkingResponse, + FunctionCalling, + FunctionCallRequest, + AgentStartedSpeakingResponse, + AgentAudioDoneResponse, + InjectionRefusedResponse, +) + +from .websocket import ( + # top level + SettingsConfigurationOptions, + UpdateInstructionsOptions, + UpdateSpeakOptions, + InjectAgentMessageOptions, + FunctionCallResponse, + AgentKeepAlive, + # sub level + Listen, + Speak, + Header, + Item, + Properties, + Parameters, + Function, + Provider, + Think, + Agent, + Input, + Output, + Audio, + Context, +) diff --git a/deepgram/clients/agent/v1/websocket/__init__.py b/deepgram/clients/agent/v1/websocket/__init__.py new file mode 100644 index 00000000..e2d5cdba --- /dev/null +++ b/deepgram/clients/agent/v1/websocket/__init__.py @@ -0,0 +1,50 @@ +# Copyright 2024 Deepgram SDK contributors. All Rights Reserved. +# Use of this source code is governed by a MIT license that can be found in the LICENSE file. +# SPDX-License-Identifier: MIT + +from .client import AgentWebSocketClient +from .async_client import AsyncAgentWebSocketClient + +from .response import ( + #### common websocket response + BaseResponse, + OpenResponse, + CloseResponse, + ErrorResponse, + UnhandledResponse, + #### unique + WelcomeResponse, + SettingsAppliedResponse, + ConversationTextResponse, + UserStartedSpeakingResponse, + AgentThinkingResponse, + FunctionCalling, + FunctionCallRequest, + AgentStartedSpeakingResponse, + AgentAudioDoneResponse, + InjectionRefusedResponse, +) +from .options import ( + # top level + SettingsConfigurationOptions, + UpdateInstructionsOptions, + UpdateSpeakOptions, + InjectAgentMessageOptions, + FunctionCallResponse, + AgentKeepAlive, + # sub level + Listen, + Speak, + Header, + Item, + Properties, + Parameters, + Function, + Provider, + Think, + Agent, + Input, + Output, + Audio, + Context, +) diff --git a/deepgram/clients/agent/v1/websocket/async_client.py b/deepgram/clients/agent/v1/websocket/async_client.py new file mode 100644 index 00000000..24fde043 --- /dev/null +++ b/deepgram/clients/agent/v1/websocket/async_client.py @@ -0,0 +1,696 @@ +# Copyright 2024 Deepgram SDK contributors. All Rights Reserved. +# Use of this source code is governed by a MIT license that can be found in the LICENSE file. +# SPDX-License-Identifier: MIT + +import asyncio +import json +import logging +from typing import Dict, Union, Optional, cast, Any, Callable +import threading + +from .....utils import verboselogs +from .....options import DeepgramClientOptions +from ...enums import AgentWebSocketEvents +from ....common import AbstractAsyncWebSocketClient +from ....common import DeepgramError + +from .response import ( + OpenResponse, + WelcomeResponse, + SettingsAppliedResponse, + ConversationTextResponse, + UserStartedSpeakingResponse, + AgentThinkingResponse, + FunctionCalling, + FunctionCallRequest, + AgentStartedSpeakingResponse, + AgentAudioDoneResponse, + InjectionRefusedResponse, + CloseResponse, + ErrorResponse, + UnhandledResponse, +) +from .options import ( + SettingsConfigurationOptions, + UpdateInstructionsOptions, + UpdateSpeakOptions, + InjectAgentMessageOptions, + FunctionCallResponse, + AgentKeepAlive, +) + +from .....audio.speaker import ( + Speaker, + RATE as SPEAKER_RATE, + CHANNELS as SPEAKER_CHANNELS, + PLAYBACK_DELTA as SPEAKER_PLAYBACK_DELTA, +) +from .....audio.microphone import ( + Microphone, + RATE as MICROPHONE_RATE, + CHANNELS as MICROPHONE_CHANNELS, +) + +ONE_SECOND = 1 +HALF_SECOND = 0.5 +DEEPGRAM_INTERVAL = 5 + + +class AsyncAgentWebSocketClient( + AbstractAsyncWebSocketClient +): # pylint: disable=too-many-instance-attributes + """ + Client for interacting with Deepgram's live transcription services over WebSockets. + + This class provides methods to establish a WebSocket connection for live transcription and handle real-time transcription events. + + Args: + config (DeepgramClientOptions): all the options for the client. + """ + + _logger: verboselogs.VerboseLogger + _config: DeepgramClientOptions + _endpoint: str + + _event_handlers: Dict[AgentWebSocketEvents, list] + + _keep_alive_thread: Union[asyncio.Task, None] + + _kwargs: Optional[Dict] = None + _addons: Optional[Dict] = None + # note the distinction here. We can't use _config because it's already used in the parent + _settings: Optional[SettingsConfigurationOptions] = None + _headers: Optional[Dict] = None + + _speaker_created: bool = False + _speaker: Optional[Speaker] = None + _microphone_created: bool = False + _microphone: Optional[Microphone] = None + + def __init__(self, config: DeepgramClientOptions): + if config is None: + raise DeepgramError("Config is required") + + self._logger = verboselogs.VerboseLogger(__name__) + self._logger.addHandler(logging.StreamHandler()) + self._logger.setLevel(config.verbose) + + self._config = config + + # needs to be "wss://agent.deepgram.com/agent" + self._endpoint = "agent" + + # override the endpoint since it needs to be "wss://agent.deepgram.com/agent" + self._config.url = "agent.deepgram.com" + self._keep_alive_thread = None + + # init handlers + self._event_handlers = { + event: [] for event in AgentWebSocketEvents.__members__.values() + } + + if self._config.options.get("microphone_record") == "true": + self._logger.info("microphone_record is enabled") + rate = self._config.options.get("microphone_record_rate", MICROPHONE_RATE) + channels = self._config.options.get( + "microphone_record_channels", MICROPHONE_CHANNELS + ) + device_index = self._config.options.get("microphone_record_device_index") + + self._logger.debug("rate: %s", rate) + self._logger.debug("channels: %s", channels) + if device_index is not None: + self._logger.debug("device_index: %s", device_index) + + self._microphone_created = True + + if device_index is not None: + self._microphone = Microphone( + rate=rate, + channels=channels, + verbose=self._config.verbose, + input_device_index=device_index, + ) + else: + self._microphone = Microphone( + rate=rate, + channels=channels, + verbose=self._config.verbose, + ) + + if self._config.options.get("speaker_playback") == "true": + self._logger.info("speaker_playback is enabled") + rate = self._config.options.get("speaker_playback_rate", SPEAKER_RATE) + channels = self._config.options.get( + "speaker_playback_channels", SPEAKER_CHANNELS + ) + playback_delta_in_ms = self._config.options.get( + "speaker_playback_delta_in_ms", SPEAKER_PLAYBACK_DELTA + ) + device_index = self._config.options.get("speaker_playback_device_index") + + self._logger.debug("rate: %s", rate) + self._logger.debug("channels: %s", channels) + + self._speaker_created = True + + if device_index is not None: + self._logger.debug("device_index: %s", device_index) + + self._speaker = Speaker( + rate=rate, + channels=channels, + last_play_delta_in_ms=playback_delta_in_ms, + verbose=self._config.verbose, + output_device_index=device_index, + microphone=self._microphone, + ) + else: + self._speaker = Speaker( + rate=rate, + channels=channels, + last_play_delta_in_ms=playback_delta_in_ms, + verbose=self._config.verbose, + microphone=self._microphone, + ) + # call the parent constructor + super().__init__(self._config, self._endpoint) + + # pylint: disable=too-many-branches,too-many-statements + async def start( + self, + options: Optional[SettingsConfigurationOptions] = None, + addons: Optional[Dict] = None, + headers: Optional[Dict] = None, + members: Optional[Dict] = None, + **kwargs, + ) -> bool: + """ + Starts the WebSocket connection for agent API. + """ + self._logger.debug("AsyncAgentWebSocketClient.start ENTER") + self._logger.info("settings: %s", options) + self._logger.info("addons: %s", addons) + self._logger.info("headers: %s", headers) + self._logger.info("members: %s", members) + self._logger.info("kwargs: %s", kwargs) + + if isinstance(options, SettingsConfigurationOptions) and not options.check(): + self._logger.error("settings.check failed") + self._logger.debug("AsyncAgentWebSocketClient.start LEAVE") + raise DeepgramError("Fatal agent settings error") + + self._addons = addons + self._headers = headers + + # add "members" as members of the class + if members is not None: + self.__dict__.update(members) + + # set kwargs as members of the class + if kwargs is not None: + self._kwargs = kwargs + else: + self._kwargs = {} + + if isinstance(options, SettingsConfigurationOptions): + self._logger.info("options is class") + self._settings = options + elif isinstance(options, dict): + self._logger.info("options is dict") + self._settings = SettingsConfigurationOptions.from_dict(options) + elif isinstance(options, str): + self._logger.info("options is json") + self._settings = SettingsConfigurationOptions.from_json(options) + else: + raise DeepgramError("Invalid options type") + + try: + # speaker substitutes the listening thread + if self._speaker is not None: + self._logger.notice("passing speaker to delegate_listening") + super().delegate_listening(self._speaker) + + # call parent start + if ( + await super().start( + {}, + self._addons, + self._headers, + **dict(cast(Dict[Any, Any], self._kwargs)), + ) + is False + ): + self._logger.error("AsyncAgentWebSocketClient.start failed") + self._logger.debug("AsyncAgentWebSocketClient.start LEAVE") + return False + + if self._speaker is not None: + self._logger.notice("speaker is delegate_listening. Starting speaker") + self._speaker.start() + + if self._speaker is not None and self._microphone is not None: + self._logger.notice( + "speaker is delegate_listening. Starting microphone" + ) + self._microphone.set_callback(self.send) + self._microphone.start() + + # debug the threads + for thread in threading.enumerate(): + self._logger.debug("after running thread: %s", thread.name) + self._logger.debug("number of active threads: %s", threading.active_count()) + + # keepalive thread + if self._config.is_keep_alive_enabled(): + self._logger.notice("keepalive is enabled") + self._keep_alive_thread = asyncio.create_task(self._keep_alive()) + else: + self._logger.notice("keepalive is disabled") + + # debug the threads + for thread in threading.enumerate(): + self._logger.debug("after running thread: %s", thread.name) + self._logger.debug("number of active threads: %s", threading.active_count()) + + # send the configurationsetting message + self._logger.notice("Sending ConfigurationSettings...") + ret_send_cs = await self.send(str(self._settings)) + if not ret_send_cs: + self._logger.error("ConfigurationSettings failed") + + err_error: ErrorResponse = ErrorResponse( + "Exception in AsyncAgentWebSocketClient.start", + "ConfigurationSettings failed to send", + "Exception", + ) + await self._emit( + AgentWebSocketEvents(AgentWebSocketEvents.Error), + error=err_error, + **dict(cast(Dict[Any, Any], self._kwargs)), + ) + + self._logger.debug("AgentWebSocketClient.start LEAVE") + return False + + self._logger.notice("start succeeded") + self._logger.debug("AsyncAgentWebSocketClient.start LEAVE") + return True + + except Exception as e: # pylint: disable=broad-except + self._logger.error( + "WebSocketException in AsyncAgentWebSocketClient.start: %s", e + ) + self._logger.debug("AsyncAgentWebSocketClient.start LEAVE") + if self._config.options.get("termination_exception_connect") is True: + raise e + return False + + # pylint: enable=too-many-branches,too-many-statements + + def on(self, event: AgentWebSocketEvents, handler: Callable) -> None: + """ + Registers event handlers for specific events. + """ + self._logger.info("event subscribed: %s", event) + if event in AgentWebSocketEvents.__members__.values() and callable(handler): + self._event_handlers[event].append(handler) + + async def _emit(self, event: AgentWebSocketEvents, *args, **kwargs) -> None: + """ + Emits events to the registered event handlers. + """ + self._logger.debug("AsyncAgentWebSocketClient._emit ENTER") + self._logger.debug("callback handlers for: %s", event) + + # debug the threads + for thread in threading.enumerate(): + self._logger.debug("after running thread: %s", thread.name) + self._logger.debug("number of active threads: %s", threading.active_count()) + + self._logger.debug("callback handlers for: %s", event) + tasks = [] + for handler in self._event_handlers[event]: + task = asyncio.create_task(handler(self, *args, **kwargs)) + tasks.append(task) + + if tasks: + self._logger.debug("waiting for tasks to finish...") + await asyncio.gather(*tasks, return_exceptions=True) + tasks.clear() + + # debug the threads + for thread in threading.enumerate(): + self._logger.debug("after running thread: %s", thread.name) + self._logger.debug("number of active threads: %s", threading.active_count()) + + self._logger.debug("AsyncAgentWebSocketClient._emit LEAVE") + + # pylint: disable=too-many-locals,too-many-statements + async def _process_text(self, message: str) -> None: + """ + Processes messages received over the WebSocket connection. + """ + self._logger.debug("AsyncAgentWebSocketClient._process_text ENTER") + + try: + self._logger.debug("Text data received") + if len(message) == 0: + self._logger.debug("message is empty") + self._logger.debug("AsyncAgentWebSocketClient._process_text LEAVE") + return + + data = json.loads(message) + response_type = data.get("type") + self._logger.debug("response_type: %s, data: %s", response_type, data) + + match response_type: + case AgentWebSocketEvents.Open: + open_result: OpenResponse = OpenResponse.from_json(message) + self._logger.verbose("OpenResponse: %s", open_result) + await self._emit( + AgentWebSocketEvents(AgentWebSocketEvents.Open), + open=open_result, + **dict(cast(Dict[Any, Any], self._kwargs)), + ) + case AgentWebSocketEvents.Welcome: + welcome_result: WelcomeResponse = WelcomeResponse.from_json(message) + self._logger.verbose("WelcomeResponse: %s", welcome_result) + await self._emit( + AgentWebSocketEvents(AgentWebSocketEvents.Welcome), + welcome=welcome_result, + **dict(cast(Dict[Any, Any], self._kwargs)), + ) + case AgentWebSocketEvents.SettingsApplied: + settings_applied_result: SettingsAppliedResponse = ( + SettingsAppliedResponse.from_json(message) + ) + self._logger.verbose( + "SettingsAppliedResponse: %s", settings_applied_result + ) + await self._emit( + AgentWebSocketEvents(AgentWebSocketEvents.SettingsApplied), + settings_applied=settings_applied_result, + **dict(cast(Dict[Any, Any], self._kwargs)), + ) + case AgentWebSocketEvents.ConversationText: + conversation_text_result: ConversationTextResponse = ( + ConversationTextResponse.from_json(message) + ) + self._logger.verbose( + "ConversationTextResponse: %s", conversation_text_result + ) + await self._emit( + AgentWebSocketEvents(AgentWebSocketEvents.ConversationText), + conversation_text=conversation_text_result, + **dict(cast(Dict[Any, Any], self._kwargs)), + ) + case AgentWebSocketEvents.UserStartedSpeaking: + user_started_speaking_result: UserStartedSpeakingResponse = ( + UserStartedSpeakingResponse.from_json(message) + ) + self._logger.verbose( + "UserStartedSpeakingResponse: %s", user_started_speaking_result + ) + await self._emit( + AgentWebSocketEvents(AgentWebSocketEvents.UserStartedSpeaking), + user_started_speaking=user_started_speaking_result, + **dict(cast(Dict[Any, Any], self._kwargs)), + ) + case AgentWebSocketEvents.AgentThinking: + agent_thinking_result: AgentThinkingResponse = ( + AgentThinkingResponse.from_json(message) + ) + self._logger.verbose( + "AgentThinkingResponse: %s", agent_thinking_result + ) + await self._emit( + AgentWebSocketEvents(AgentWebSocketEvents.AgentThinking), + agent_thinking=agent_thinking_result, + **dict(cast(Dict[Any, Any], self._kwargs)), + ) + case AgentWebSocketEvents.FunctionCalling: + function_calling_result: FunctionCalling = ( + FunctionCalling.from_json(message) + ) + self._logger.verbose("FunctionCalling: %s", function_calling_result) + await self._emit( + AgentWebSocketEvents(AgentWebSocketEvents.FunctionCalling), + function_calling=function_calling_result, + **dict(cast(Dict[Any, Any], self._kwargs)), + ) + case AgentWebSocketEvents.FunctionCallRequest: + function_call_request_result: FunctionCallRequest = ( + FunctionCallRequest.from_json(message) + ) + self._logger.verbose( + "FunctionCallRequest: %s", function_call_request_result + ) + await self._emit( + AgentWebSocketEvents(AgentWebSocketEvents.FunctionCallRequest), + function_call_request=function_call_request_result, + **dict(cast(Dict[Any, Any], self._kwargs)), + ) + case AgentWebSocketEvents.AgentStartedSpeaking: + agent_started_speaking_result: AgentStartedSpeakingResponse = ( + AgentStartedSpeakingResponse.from_json(message) + ) + self._logger.verbose( + "AgentStartedSpeakingResponse: %s", + agent_started_speaking_result, + ) + await self._emit( + AgentWebSocketEvents(AgentWebSocketEvents.AgentStartedSpeaking), + agent_started_speaking=agent_started_speaking_result, + **dict(cast(Dict[Any, Any], self._kwargs)), + ) + case AgentWebSocketEvents.AgentAudioDone: + agent_audio_done_result: AgentAudioDoneResponse = ( + AgentAudioDoneResponse.from_json(message) + ) + self._logger.verbose( + "AgentAudioDoneResponse: %s", agent_audio_done_result + ) + await self._emit( + AgentWebSocketEvents(AgentWebSocketEvents.AgentAudioDone), + agent_audio_done=agent_audio_done_result, + **dict(cast(Dict[Any, Any], self._kwargs)), + ) + case AgentWebSocketEvents.InjectionRefused: + injection_refused_result: InjectionRefusedResponse = ( + InjectionRefusedResponse.from_json(message) + ) + self._logger.verbose( + "InjectionRefused: %s", injection_refused_result + ) + await self._emit( + AgentWebSocketEvents(AgentWebSocketEvents.InjectionRefused), + injection_refused=injection_refused_result, + **dict(cast(Dict[Any, Any], self._kwargs)), + ) + case AgentWebSocketEvents.Close: + close_result: CloseResponse = CloseResponse.from_json(message) + self._logger.verbose("CloseResponse: %s", close_result) + await self._emit( + AgentWebSocketEvents(AgentWebSocketEvents.Close), + close=close_result, + **dict(cast(Dict[Any, Any], self._kwargs)), + ) + case AgentWebSocketEvents.Error: + err_error: ErrorResponse = ErrorResponse.from_json(message) + self._logger.verbose("ErrorResponse: %s", err_error) + await self._emit( + AgentWebSocketEvents(AgentWebSocketEvents.Error), + error=err_error, + **dict(cast(Dict[Any, Any], self._kwargs)), + ) + case _: + self._logger.warning( + "Unknown Message: response_type: %s, data: %s", + response_type, + data, + ) + unhandled_error: UnhandledResponse = UnhandledResponse( + type=AgentWebSocketEvents(AgentWebSocketEvents.Unhandled), + raw=message, + ) + await self._emit( + AgentWebSocketEvents(AgentWebSocketEvents.Unhandled), + unhandled=unhandled_error, + **dict(cast(Dict[Any, Any], self._kwargs)), + ) + + self._logger.notice("_process_text Succeeded") + self._logger.debug("AsyncAgentWebSocketClient._process_text LEAVE") + + except Exception as e: # pylint: disable=broad-except + self._logger.error( + "Exception in AsyncAgentWebSocketClient._process_text: %s", e + ) + e_error: ErrorResponse = ErrorResponse( + "Exception in AsyncAgentWebSocketClient._process_text", + f"{e}", + "Exception", + ) + await self._emit( + AgentWebSocketEvents(AgentWebSocketEvents.Error), + error=e_error, + **dict(cast(Dict[Any, Any], self._kwargs)), + ) + + # signal exit and close + await super()._signal_exit() + + self._logger.debug("AsyncAgentWebSocketClient._process_text LEAVE") + + if self._config.options.get("termination_exception") is True: + raise + return + + # pylint: enable=too-many-locals,too-many-statements + + async def _process_binary(self, message: bytes) -> None: + self._logger.debug("AsyncAgentWebSocketClient._process_binary ENTER") + self._logger.debug("Binary data received") + + await self._emit( + AgentWebSocketEvents(AgentWebSocketEvents.AudioData), + data=message, + **dict(cast(Dict[Any, Any], self._kwargs)), + ) + + self._logger.notice("_process_binary Succeeded") + self._logger.debug("AsyncAgentWebSocketClient._process_binary LEAVE") + + # pylint: disable=too-many-return-statements + async def _keep_alive(self) -> None: + """ + Sends keepalive messages to the WebSocket connection. + """ + self._logger.debug("AsyncAgentWebSocketClient._keep_alive ENTER") + + counter = 0 + while True: + try: + counter += 1 + await asyncio.sleep(ONE_SECOND) + + if self._exit_event.is_set(): + self._logger.notice("_keep_alive exiting gracefully") + self._logger.debug("AsyncAgentWebSocketClient._keep_alive LEAVE") + return + + # deepgram keepalive + if counter % DEEPGRAM_INTERVAL == 0: + await self.keep_alive() + + except Exception as e: # pylint: disable=broad-except + self._logger.error( + "Exception in AsyncAgentWebSocketClient._keep_alive: %s", e + ) + e_error: ErrorResponse = ErrorResponse( + "Exception in AsyncAgentWebSocketClient._keep_alive", + f"{e}", + "Exception", + ) + self._logger.error( + "Exception in AsyncAgentWebSocketClient._keep_alive: %s", str(e) + ) + await self._emit( + AgentWebSocketEvents(AgentWebSocketEvents.Error), + error=e_error, + **dict(cast(Dict[Any, Any], self._kwargs)), + ) + + # signal exit and close + await super()._signal_exit() + + self._logger.debug("AsyncAgentWebSocketClient._keep_alive LEAVE") + + if self._config.options.get("termination_exception") is True: + raise + return + + async def keep_alive(self) -> bool: + """ + Sends a KeepAlive message + """ + self._logger.spam("AsyncAgentWebSocketClient.keep_alive ENTER") + + self._logger.notice("Sending KeepAlive...") + ret = await self.send(json.dumps({"type": "KeepAlive"})) + + if not ret: + self._logger.error("keep_alive failed") + self._logger.spam("AsyncAgentWebSocketClient.keep_alive LEAVE") + return False + + self._logger.notice("keep_alive succeeded") + self._logger.spam("AsyncAgentWebSocketClient.keep_alive LEAVE") + + return True + + async def _close_message(self) -> bool: + # TODO: No known API close message # pylint: disable=fixme + # return await self.send(json.dumps({"type": "Close"})) + return True + + async def finish(self) -> bool: + """ + Closes the WebSocket connection gracefully. + """ + self._logger.debug("AsyncAgentWebSocketClient.finish ENTER") + + # stop the threads + self._logger.verbose("cancelling tasks...") + try: + # call parent finish + if await super().finish() is False: + self._logger.error("AsyncAgentWebSocketClient.finish failed") + + if self._microphone is not None and self._microphone_created: + self._microphone.finish() + self._microphone_created = False + + if self._speaker is not None and self._speaker_created: + self._speaker.finish() + self._speaker_created = False + + # Before cancelling, check if the tasks were created + # debug the threads + for thread in threading.enumerate(): + self._logger.debug("before running thread: %s", thread.name) + self._logger.debug("number of active threads: %s", threading.active_count()) + + tasks = [] + if self._keep_alive_thread is not None: + self._keep_alive_thread.cancel() + tasks.append(self._keep_alive_thread) + self._logger.notice("processing _keep_alive_thread cancel...") + + # Use asyncio.gather to wait for tasks to be cancelled + # Prevent indefinite waiting by setting a timeout + await asyncio.wait_for(asyncio.gather(*tasks), timeout=10) + self._logger.notice("threads joined") + + self._speaker = None + self._microphone = None + + # debug the threads + for thread in threading.enumerate(): + self._logger.debug("after running thread: %s", thread.name) + self._logger.debug("number of active threads: %s", threading.active_count()) + + self._logger.notice("finish succeeded") + self._logger.spam("AsyncAgentWebSocketClient.finish LEAVE") + return True + + except asyncio.CancelledError as e: + self._logger.error("tasks cancelled error: %s", e) + self._logger.debug("AsyncAgentWebSocketClient.finish LEAVE") + return False + + except asyncio.TimeoutError as e: + self._logger.error("tasks cancellation timed out: %s", e) + self._logger.debug("AsyncAgentWebSocketClient.finish LEAVE") + return False diff --git a/deepgram/clients/agent/v1/websocket/client.py b/deepgram/clients/agent/v1/websocket/client.py new file mode 100644 index 00000000..2aa285fc --- /dev/null +++ b/deepgram/clients/agent/v1/websocket/client.py @@ -0,0 +1,680 @@ +# Copyright 2024 Deepgram SDK contributors. All Rights Reserved. +# Use of this source code is governed by a MIT license that can be found in the LICENSE file. +# SPDX-License-Identifier: MIT + +import json +import logging +from typing import Dict, Union, Optional, cast, Any, Callable +import threading +import time + +from .....utils import verboselogs +from .....options import DeepgramClientOptions +from ...enums import AgentWebSocketEvents +from ....common import AbstractSyncWebSocketClient +from ....common import DeepgramError + +from .response import ( + OpenResponse, + WelcomeResponse, + SettingsAppliedResponse, + ConversationTextResponse, + UserStartedSpeakingResponse, + AgentThinkingResponse, + FunctionCalling, + FunctionCallRequest, + AgentStartedSpeakingResponse, + AgentAudioDoneResponse, + InjectionRefusedResponse, + CloseResponse, + ErrorResponse, + UnhandledResponse, +) +from .options import ( + SettingsConfigurationOptions, + UpdateInstructionsOptions, + UpdateSpeakOptions, + InjectAgentMessageOptions, + FunctionCallResponse, + AgentKeepAlive, +) + +from .....audio.speaker import ( + Speaker, + RATE as SPEAKER_RATE, + CHANNELS as SPEAKER_CHANNELS, + PLAYBACK_DELTA as SPEAKER_PLAYBACK_DELTA, +) +from .....audio.microphone import ( + Microphone, + RATE as MICROPHONE_RATE, + CHANNELS as MICROPHONE_CHANNELS, +) + +ONE_SECOND = 1 +HALF_SECOND = 0.5 +DEEPGRAM_INTERVAL = 5 + + +class AgentWebSocketClient( + AbstractSyncWebSocketClient +): # pylint: disable=too-many-instance-attributes + """ + Client for interacting with Deepgram's live transcription services over WebSockets. + + This class provides methods to establish a WebSocket connection for live transcription and handle real-time transcription events. + + Args: + config (DeepgramClientOptions): all the options for the client. + """ + + _logger: verboselogs.VerboseLogger + _config: DeepgramClientOptions + _endpoint: str + + _event_handlers: Dict[AgentWebSocketEvents, list] + + _keep_alive_thread: Union[threading.Thread, None] + + _kwargs: Optional[Dict] = None + _addons: Optional[Dict] = None + # note the distinction here. We can't use _config because it's already used in the parent + _settings: Optional[SettingsConfigurationOptions] = None + _headers: Optional[Dict] = None + + _speaker_created: bool = False + _speaker: Optional[Speaker] = None + _microphone_created: bool = False + _microphone: Optional[Microphone] = None + + def __init__(self, config: DeepgramClientOptions): + if config is None: + raise DeepgramError("Config is required") + + self._logger = verboselogs.VerboseLogger(__name__) + self._logger.addHandler(logging.StreamHandler()) + self._logger.setLevel(config.verbose) + + self._config = config + + # needs to be "wss://agent.deepgram.com/agent" + self._endpoint = "agent" + + # override the endpoint since it needs to be "wss://agent.deepgram.com/agent" + self._config.url = "agent.deepgram.com" + + self._keep_alive_thread = None + + # init handlers + self._event_handlers = { + event: [] for event in AgentWebSocketEvents.__members__.values() + } + + if self._config.options.get("microphone_record") == "true": + self._logger.info("microphone_record is enabled") + rate = self._config.options.get("microphone_record_rate", MICROPHONE_RATE) + channels = self._config.options.get( + "microphone_record_channels", MICROPHONE_CHANNELS + ) + device_index = self._config.options.get("microphone_record_device_index") + + self._logger.debug("rate: %s", rate) + self._logger.debug("channels: %s", channels) + + self._microphone_created = True + + if device_index is not None: + self._logger.debug("device_index: %s", device_index) + self._microphone = Microphone( + rate=rate, + channels=channels, + verbose=self._config.verbose, + input_device_index=device_index, + ) + else: + self._microphone = Microphone( + rate=rate, + channels=channels, + verbose=self._config.verbose, + ) + + if self._config.options.get("speaker_playback") == "true": + self._logger.info("speaker_playback is enabled") + rate = self._config.options.get("speaker_playback_rate", SPEAKER_RATE) + channels = self._config.options.get( + "speaker_playback_channels", SPEAKER_CHANNELS + ) + playback_delta_in_ms = self._config.options.get( + "speaker_playback_delta_in_ms", SPEAKER_PLAYBACK_DELTA + ) + device_index = self._config.options.get("speaker_playback_device_index") + + self._logger.debug("rate: %s", rate) + self._logger.debug("channels: %s", channels) + + self._speaker_created = True + + if device_index is not None: + self._logger.debug("device_index: %s", device_index) + + self._speaker = Speaker( + rate=rate, + channels=channels, + last_play_delta_in_ms=playback_delta_in_ms, + verbose=self._config.verbose, + output_device_index=device_index, + microphone=self._microphone, + ) + else: + self._speaker = Speaker( + rate=rate, + channels=channels, + last_play_delta_in_ms=playback_delta_in_ms, + verbose=self._config.verbose, + microphone=self._microphone, + ) + + # call the parent constructor + super().__init__(self._config, self._endpoint) + + # pylint: disable=too-many-statements,too-many-branches + def start( + self, + options: Optional[SettingsConfigurationOptions] = None, + addons: Optional[Dict] = None, + headers: Optional[Dict] = None, + members: Optional[Dict] = None, + **kwargs, + ) -> bool: + """ + Starts the WebSocket connection for agent API. + """ + self._logger.debug("AgentWebSocketClient.start ENTER") + self._logger.info("settings: %s", options) + self._logger.info("addons: %s", addons) + self._logger.info("headers: %s", headers) + self._logger.info("members: %s", members) + self._logger.info("kwargs: %s", kwargs) + + if isinstance(options, SettingsConfigurationOptions) and not options.check(): + self._logger.error("settings.check failed") + self._logger.debug("AgentWebSocketClient.start LEAVE") + raise DeepgramError("Fatal agent settings error") + + self._addons = addons + self._headers = headers + + # add "members" as members of the class + if members is not None: + self.__dict__.update(members) + + # set kwargs as members of the class + if kwargs is not None: + self._kwargs = kwargs + else: + self._kwargs = {} + + if isinstance(options, SettingsConfigurationOptions): + self._logger.info("options is class") + self._settings = options + elif isinstance(options, dict): + self._logger.info("options is dict") + self._settings = SettingsConfigurationOptions.from_dict(options) + elif isinstance(options, str): + self._logger.info("options is json") + self._settings = SettingsConfigurationOptions.from_json(options) + else: + raise DeepgramError("Invalid options type") + + try: + # speaker substitutes the listening thread + if self._speaker is not None: + self._logger.notice("passing speaker to delegate_listening") + super().delegate_listening(self._speaker) + + # call parent start + if ( + super().start( + {}, + self._addons, + self._headers, + **dict(cast(Dict[Any, Any], self._kwargs)), + ) + is False + ): + self._logger.error("AgentWebSocketClient.start failed") + self._logger.debug("AgentWebSocketClient.start LEAVE") + return False + + if self._speaker is not None: + self._logger.notice("speaker is delegate_listening. Starting speaker") + self._speaker.start() + + if self._speaker is not None and self._microphone is not None: + self._logger.notice( + "speaker is delegate_listening. Starting microphone" + ) + self._microphone.set_callback(self.send) + self._microphone.start() + + # debug the threads + for thread in threading.enumerate(): + self._logger.debug("after running thread: %s", thread.name) + self._logger.debug("number of active threads: %s", threading.active_count()) + + # keepalive thread + if self._config.is_keep_alive_enabled(): + self._logger.notice("keepalive is enabled") + self._keep_alive_thread = threading.Thread(target=self._keep_alive) + self._keep_alive_thread.start() + else: + self._logger.notice("keepalive is disabled") + + # debug the threads + for thread in threading.enumerate(): + self._logger.debug("after running thread: %s", thread.name) + self._logger.debug("number of active threads: %s", threading.active_count()) + + # send the configurationsetting message + self._logger.notice("Sending ConfigurationSettings...") + ret_send_cs = self.send(str(self._settings)) + if not ret_send_cs: + self._logger.error("ConfigurationSettings failed") + + err_error: ErrorResponse = ErrorResponse( + "Exception in AgentWebSocketClient.start", + "ConfigurationSettings failed to send", + "Exception", + ) + self._emit( + AgentWebSocketEvents(AgentWebSocketEvents.Error), + error=err_error, + **dict(cast(Dict[Any, Any], self._kwargs)), + ) + + self._logger.debug("AgentWebSocketClient.start LEAVE") + return False + + self._logger.notice("start succeeded") + self._logger.debug("AgentWebSocketClient.start LEAVE") + return True + + except Exception as e: # pylint: disable=broad-except + self._logger.error( + "WebSocketException in AgentWebSocketClient.start: %s", e + ) + self._logger.debug("AgentWebSocketClient.start LEAVE") + if self._config.options.get("termination_exception_connect") is True: + raise e + return False + + # pylint: enable=too-many-statements,too-many-branches + + def on(self, event: AgentWebSocketEvents, handler: Callable) -> None: + """ + Registers event handlers for specific events. + """ + self._logger.info("event subscribed: %s", event) + if event in AgentWebSocketEvents.__members__.values() and callable(handler): + self._event_handlers[event].append(handler) + + def _emit(self, event: AgentWebSocketEvents, *args, **kwargs) -> None: + """ + Emits events to the registered event handlers. + """ + self._logger.debug("AgentWebSocketClient._emit ENTER") + self._logger.debug("callback handlers for: %s", event) + + # debug the threads + for thread in threading.enumerate(): + self._logger.debug("after running thread: %s", thread.name) + self._logger.debug("number of active threads: %s", threading.active_count()) + + self._logger.debug("callback handlers for: %s", event) + for handler in self._event_handlers[event]: + handler(self, *args, **kwargs) + + # debug the threads + for thread in threading.enumerate(): + self._logger.debug("after running thread: %s", thread.name) + self._logger.debug("number of active threads: %s", threading.active_count()) + + self._logger.debug("AgentWebSocketClient._emit LEAVE") + + # pylint: disable=too-many-return-statements,too-many-statements,too-many-locals,too-many-branches + def _process_text(self, message: str) -> None: + """ + Processes messages received over the WebSocket connection. + """ + self._logger.debug("AgentWebSocketClient._process_text ENTER") + + try: + self._logger.debug("Text data received") + if len(message) == 0: + self._logger.debug("message is empty") + self._logger.debug("AgentWebSocketClient._process_text LEAVE") + return + + data = json.loads(message) + response_type = data.get("type") + self._logger.debug("response_type: %s, data: %s", response_type, data) + + match response_type: + case AgentWebSocketEvents.Open: + open_result: OpenResponse = OpenResponse.from_json(message) + self._logger.verbose("OpenResponse: %s", open_result) + self._emit( + AgentWebSocketEvents(AgentWebSocketEvents.Open), + open=open_result, + **dict(cast(Dict[Any, Any], self._kwargs)), + ) + case AgentWebSocketEvents.Welcome: + welcome_result: WelcomeResponse = WelcomeResponse.from_json(message) + self._logger.verbose("WelcomeResponse: %s", welcome_result) + self._emit( + AgentWebSocketEvents(AgentWebSocketEvents.Welcome), + welcome=welcome_result, + **dict(cast(Dict[Any, Any], self._kwargs)), + ) + case AgentWebSocketEvents.SettingsApplied: + settings_applied_result: SettingsAppliedResponse = ( + SettingsAppliedResponse.from_json(message) + ) + self._logger.verbose( + "SettingsAppliedResponse: %s", settings_applied_result + ) + self._emit( + AgentWebSocketEvents(AgentWebSocketEvents.SettingsApplied), + settings_applied=settings_applied_result, + **dict(cast(Dict[Any, Any], self._kwargs)), + ) + case AgentWebSocketEvents.ConversationText: + conversation_text_result: ConversationTextResponse = ( + ConversationTextResponse.from_json(message) + ) + self._logger.verbose( + "ConversationTextResponse: %s", conversation_text_result + ) + self._emit( + AgentWebSocketEvents(AgentWebSocketEvents.ConversationText), + conversation_text=conversation_text_result, + **dict(cast(Dict[Any, Any], self._kwargs)), + ) + case AgentWebSocketEvents.UserStartedSpeaking: + user_started_speaking_result: UserStartedSpeakingResponse = ( + UserStartedSpeakingResponse.from_json(message) + ) + self._logger.verbose( + "UserStartedSpeakingResponse: %s", user_started_speaking_result + ) + self._emit( + AgentWebSocketEvents(AgentWebSocketEvents.UserStartedSpeaking), + user_started_speaking=user_started_speaking_result, + **dict(cast(Dict[Any, Any], self._kwargs)), + ) + case AgentWebSocketEvents.AgentThinking: + agent_thinking_result: AgentThinkingResponse = ( + AgentThinkingResponse.from_json(message) + ) + self._logger.verbose( + "AgentThinkingResponse: %s", agent_thinking_result + ) + self._emit( + AgentWebSocketEvents(AgentWebSocketEvents.AgentThinking), + agent_thinking=agent_thinking_result, + **dict(cast(Dict[Any, Any], self._kwargs)), + ) + case AgentWebSocketEvents.FunctionCalling: + function_calling_result: FunctionCalling = ( + FunctionCalling.from_json(message) + ) + self._logger.verbose("FunctionCalling: %s", function_calling_result) + self._emit( + AgentWebSocketEvents(AgentWebSocketEvents.FunctionCalling), + function_calling=function_calling_result, + **dict(cast(Dict[Any, Any], self._kwargs)), + ) + case AgentWebSocketEvents.FunctionCallRequest: + function_call_request_result: FunctionCallRequest = ( + FunctionCallRequest.from_json(message) + ) + self._logger.verbose( + "FunctionCallRequest: %s", function_call_request_result + ) + self._emit( + AgentWebSocketEvents(AgentWebSocketEvents.FunctionCallRequest), + function_call_request=function_call_request_result, + **dict(cast(Dict[Any, Any], self._kwargs)), + ) + case AgentWebSocketEvents.AgentStartedSpeaking: + agent_started_speaking_result: AgentStartedSpeakingResponse = ( + AgentStartedSpeakingResponse.from_json(message) + ) + self._logger.verbose( + "AgentStartedSpeakingResponse: %s", + agent_started_speaking_result, + ) + self._emit( + AgentWebSocketEvents(AgentWebSocketEvents.AgentStartedSpeaking), + agent_started_speaking=agent_started_speaking_result, + **dict(cast(Dict[Any, Any], self._kwargs)), + ) + case AgentWebSocketEvents.AgentAudioDone: + agent_audio_done_result: AgentAudioDoneResponse = ( + AgentAudioDoneResponse.from_json(message) + ) + self._logger.verbose( + "AgentAudioDoneResponse: %s", agent_audio_done_result + ) + self._emit( + AgentWebSocketEvents(AgentWebSocketEvents.AgentAudioDone), + agent_audio_done=agent_audio_done_result, + **dict(cast(Dict[Any, Any], self._kwargs)), + ) + case AgentWebSocketEvents.InjectionRefused: + injection_refused_result: InjectionRefusedResponse = ( + InjectionRefusedResponse.from_json(message) + ) + self._logger.verbose( + "InjectionRefused: %s", injection_refused_result + ) + self._emit( + AgentWebSocketEvents(AgentWebSocketEvents.InjectionRefused), + injection_refused=injection_refused_result, + **dict(cast(Dict[Any, Any], self._kwargs)), + ) + case AgentWebSocketEvents.Close: + close_result: CloseResponse = CloseResponse.from_json(message) + self._logger.verbose("CloseResponse: %s", close_result) + self._emit( + AgentWebSocketEvents(AgentWebSocketEvents.Close), + close=close_result, + **dict(cast(Dict[Any, Any], self._kwargs)), + ) + case AgentWebSocketEvents.Error: + err_error: ErrorResponse = ErrorResponse.from_json(message) + self._logger.verbose("ErrorResponse: %s", err_error) + self._emit( + AgentWebSocketEvents(AgentWebSocketEvents.Error), + error=err_error, + **dict(cast(Dict[Any, Any], self._kwargs)), + ) + case _: + self._logger.warning( + "Unknown Message: response_type: %s, data: %s", + response_type, + data, + ) + unhandled_error: UnhandledResponse = UnhandledResponse( + type=AgentWebSocketEvents(AgentWebSocketEvents.Unhandled), + raw=message, + ) + self._emit( + AgentWebSocketEvents(AgentWebSocketEvents.Unhandled), + unhandled=unhandled_error, + **dict(cast(Dict[Any, Any], self._kwargs)), + ) + + self._logger.notice("_process_text Succeeded") + self._logger.debug("SpeakStreamClient._process_text LEAVE") + + except Exception as e: # pylint: disable=broad-except + self._logger.error("Exception in AgentWebSocketClient._process_text: %s", e) + e_error: ErrorResponse = ErrorResponse( + "Exception in AgentWebSocketClient._process_text", + f"{e}", + "Exception", + ) + self._logger.error( + "Exception in AgentWebSocketClient._process_text: %s", str(e) + ) + self._emit( + AgentWebSocketEvents(AgentWebSocketEvents.Error), + error=e_error, + **dict(cast(Dict[Any, Any], self._kwargs)), + ) + + # signal exit and close + super()._signal_exit() + + self._logger.debug("AgentWebSocketClient._process_text LEAVE") + + if self._config.options.get("termination_exception") is True: + raise + return + + # pylint: enable=too-many-return-statements,too-many-statements + + def _process_binary(self, message: bytes) -> None: + self._logger.debug("AgentWebSocketClient._process_binary ENTER") + self._logger.debug("Binary data received") + + self._emit( + AgentWebSocketEvents(AgentWebSocketEvents.AudioData), + data=message, + **dict(cast(Dict[Any, Any], self._kwargs)), + ) + + self._logger.notice("_process_binary Succeeded") + self._logger.debug("AgentWebSocketClient._process_binary LEAVE") + + # pylint: disable=too-many-return-statements + def _keep_alive(self) -> None: + """ + Sends keepalive messages to the WebSocket connection. + """ + self._logger.debug("AgentWebSocketClient._keep_alive ENTER") + + counter = 0 + while True: + try: + counter += 1 + self._exit_event.wait(timeout=ONE_SECOND) + + if self._exit_event.is_set(): + self._logger.notice("_keep_alive exiting gracefully") + self._logger.debug("AgentWebSocketClient._keep_alive LEAVE") + return + + # deepgram keepalive + if counter % DEEPGRAM_INTERVAL == 0: + self.keep_alive() + + except Exception as e: # pylint: disable=broad-except + self._logger.error( + "Exception in AgentWebSocketClient._keep_alive: %s", e + ) + e_error: ErrorResponse = ErrorResponse( + "Exception in AgentWebSocketClient._keep_alive", + f"{e}", + "Exception", + ) + self._logger.error( + "Exception in AgentWebSocketClient._keep_alive: %s", str(e) + ) + self._emit( + AgentWebSocketEvents(AgentWebSocketEvents.Error), + error=e_error, + **dict(cast(Dict[Any, Any], self._kwargs)), + ) + + # signal exit and close + super()._signal_exit() + + self._logger.debug("AgentWebSocketClient._keep_alive LEAVE") + + if self._config.options.get("termination_exception") is True: + raise + return + + def keep_alive(self) -> bool: + """ + Sends a KeepAlive message + """ + self._logger.spam("AgentWebSocketClient.keep_alive ENTER") + + self._logger.notice("Sending KeepAlive...") + ret = self.send(json.dumps({"type": "KeepAlive"})) + + if not ret: + self._logger.error("keep_alive failed") + self._logger.spam("AgentWebSocketClient.keep_alive LEAVE") + return False + + self._logger.notice("keep_alive succeeded") + self._logger.spam("AgentWebSocketClient.keep_alive LEAVE") + + return True + + def _close_message(self) -> bool: + # TODO: No known API close message # pylint: disable=fixme + # return self.send(json.dumps({"type": "Close"})) + return True + + # closes the WebSocket connection gracefully + def finish(self) -> bool: + """ + Closes the WebSocket connection gracefully. + """ + self._logger.spam("AgentWebSocketClient.finish ENTER") + + # call parent finish + if super().finish() is False: + self._logger.error("AgentWebSocketClient.finish failed") + + if self._microphone is not None and self._microphone_created: + self._microphone.finish() + self._microphone_created = False + + if self._speaker is not None and self._speaker_created: + self._speaker.finish() + self._speaker_created = False + + # debug the threads + for thread in threading.enumerate(): + self._logger.debug("before running thread: %s", thread.name) + self._logger.debug("number of active threads: %s", threading.active_count()) + + # stop the threads + self._logger.verbose("cancelling tasks...") + if self._keep_alive_thread is not None: + self._keep_alive_thread.join() + self._keep_alive_thread = None + self._logger.notice("processing _keep_alive_thread thread joined") + + if self._listen_thread is not None: + self._listen_thread.join() + self._listen_thread = None + self._logger.notice("listening thread joined") + + self._speaker = None + self._microphone = None + + # debug the threads + for thread in threading.enumerate(): + self._logger.debug("before running thread: %s", thread.name) + self._logger.debug("number of active threads: %s", threading.active_count()) + + self._logger.notice("finish succeeded") + self._logger.spam("AgentWebSocketClient.finish LEAVE") + return True diff --git a/deepgram/clients/agent/v1/websocket/options.py b/deepgram/clients/agent/v1/websocket/options.py new file mode 100644 index 00000000..6d843052 --- /dev/null +++ b/deepgram/clients/agent/v1/websocket/options.py @@ -0,0 +1,354 @@ +# Copyright 2024 Deepgram SDK contributors. All Rights Reserved. +# Use of this source code is governed by a MIT license that can be found in the LICENSE file. +# SPDX-License-Identifier: MIT + +from typing import List, Optional, Union, Any, Tuple +import logging + +from dataclasses import dataclass, field +from dataclasses_json import config as dataclass_config + +from deepgram.utils import verboselogs + +from ...enums import AgentWebSocketEvents +from ....common import BaseResponse + + +# ConfigurationSettings + + +@dataclass +class Listen(BaseResponse): + """ + This class defines any configuration settings for the Listen model. + """ + + model: Optional[str] = field(default="nova-2") + + +@dataclass +class Speak(BaseResponse): + """ + This class defines any configuration settings for the Speak model. + """ + + model: Optional[str] = field( + default="aura-asteria-en", + metadata=dataclass_config(exclude=lambda f: f is None), + ) + provider: Optional[str] = field( + default=None, metadata=dataclass_config(exclude=lambda f: f is None) + ) + voice_id: Optional[str] = field( + default=None, metadata=dataclass_config(exclude=lambda f: f is None) + ) + + +@dataclass +class Header(BaseResponse): + """ + This class defines a single key/value pair for a header. + """ + + key: str + value: str + + +@dataclass +class Item(BaseResponse): + """ + This class defines a single item in a list of items. + """ + + type: str + description: str + + +@dataclass +class Properties(BaseResponse): + """ + This class defines the properties which is just a list of items. + """ + + item: Item + + def __getitem__(self, key): + _dict = self.to_dict() + if "item" in _dict: + _dict["item"] = [Item.from_dict(item) for item in _dict["item"]] + return _dict[key] + + +@dataclass +class Parameters(BaseResponse): + """ + This class defines the parameters for a function. + """ + + type: str + properties: Properties + required: List[str] + + def __getitem__(self, key): + _dict = self.to_dict() + if "properties" in _dict: + _dict["properties"] = _dict["properties"].copy() + return _dict[key] + + +@dataclass +class Function(BaseResponse): + """ + This class defines a function for the Think model. + """ + + name: str + description: str + url: str + method: str + headers: Optional[List[Header]] = field( + default=None, metadata=dataclass_config(exclude=lambda f: f is None) + ) + parameters: Optional[Parameters] = field( + default=None, metadata=dataclass_config(exclude=lambda f: f is None) + ) + + def __getitem__(self, key): + _dict = self.to_dict() + if "parameters" in _dict: + _dict["parameters"] = [ + Parameters.from_dict(parameters) for parameters in _dict["parameters"] + ] + if "headers" in _dict: + _dict["headers"] = [ + Header.from_dict(headers) for headers in _dict["headers"] + ] + return _dict[key] + + +@dataclass +class Provider(BaseResponse): + """ + This class defines the provider for the Think model. + """ + + type: Optional[str] = field( + default=None, metadata=dataclass_config(exclude=lambda f: f is None) + ) + + +@dataclass +class Think(BaseResponse): + """ + This class defines any configuration settings for the Think model. + """ + + provider: Provider = field(default_factory=Provider) + model: Optional[str] = field( + default=None, metadata=dataclass_config(exclude=lambda f: f is None) + ) + instructions: Optional[str] = field( + default=None, metadata=dataclass_config(exclude=lambda f: f is None) + ) + functions: Optional[List[Function]] = field( + default=None, metadata=dataclass_config(exclude=lambda f: f is None) + ) + + def __getitem__(self, key): + _dict = self.to_dict() + if "provider" in _dict: + _dict["provider"] = [ + Provider.from_dict(provider) for provider in _dict["provider"] + ] + if "functions" in _dict: + _dict["functions"] = [ + Function.from_dict(functions) for functions in _dict["functions"] + ] + return _dict[key] + + +@dataclass +class Agent(BaseResponse): + """ + This class defines any configuration settings for the Agent model. + """ + + listen: Listen = field(default_factory=Listen) + think: Think = field(default_factory=Think) + speak: Speak = field(default_factory=Speak) + + def __getitem__(self, key): + _dict = self.to_dict() + if "listen" in _dict: + _dict["listen"] = [Listen.from_dict(listen) for listen in _dict["listen"]] + if "think" in _dict: + _dict["think"] = [Think.from_dict(think) for think in _dict["think"]] + if "speak" in _dict: + _dict["speak"] = [Speak.from_dict(speak) for speak in _dict["speak"]] + return _dict[key] + + +@dataclass +class Input(BaseResponse): + """ + This class defines any configuration settings for the input audio. + """ + + encoding: Optional[str] = field(default="linear16") + sample_rate: int = field(default=16000) + + +@dataclass +class Output(BaseResponse): + """ + This class defines any configuration settings for the output audio. + """ + + encoding: Optional[str] = field(default="linear16") + sample_rate: Optional[int] = field(default=16000) + bitrate: Optional[int] = field( + default=None, metadata=dataclass_config(exclude=lambda f: f is None) + ) + container: Optional[str] = field(default="none") + + +@dataclass +class Audio(BaseResponse): + """ + This class defines any configuration settings for the audio. + """ + + input: Optional[Input] = field(default_factory=Input) + output: Optional[Output] = field(default_factory=Output) + + def __getitem__(self, key): + _dict = self.to_dict() + if "input" in _dict: + _dict["input"] = [Input.from_dict(input) for input in _dict["input"]] + if "output" in _dict: + _dict["output"] = [Output.from_dict(output) for output in _dict["output"]] + return _dict[key] + + +@dataclass +class Context(BaseResponse): + """ + This class defines any configuration settings for the context. + """ + + messages: Optional[List[Tuple[str, str]]] = field( + default=None, metadata=dataclass_config(exclude=lambda f: f is None) + ) + replay: Optional[bool] = field(default=False) + + def __getitem__(self, key): + _dict = self.to_dict() + if "messages" in _dict: + _dict["messages"] = _dict["messages"].copy() + return _dict[key] + + +@dataclass +class SettingsConfigurationOptions(BaseResponse): + """ + The client should send a SettingsConfiguration message immediately after opening the websocket and before sending any audio. + """ + + type: str = str(AgentWebSocketEvents.SettingsConfiguration) + audio: Audio = field(default_factory=Audio) + agent: Agent = field(default_factory=Agent) + context: Optional[Context] = field( + default=None, metadata=dataclass_config(exclude=lambda f: f is None) + ) + + def __getitem__(self, key): + _dict = self.to_dict() + if "audio" in _dict: + _dict["audio"] = [Audio.from_dict(audio) for audio in _dict["audio"]] + if "agent" in _dict: + _dict["agent"] = [Agent.from_dict(agent) for agent in _dict["agent"]] + if "context" in _dict: + _dict["context"] = [ + Context.from_dict(context) for context in _dict["context"] + ] + return _dict[key] + + def check(self): + """ + Check the options for any deprecated or soon-to-be-deprecated options. + """ + logger = verboselogs.VerboseLogger(__name__) + logger.addHandler(logging.StreamHandler()) + prev = logger.level + logger.setLevel(verboselogs.ERROR) + + # do we need to check anything here? + + logger.setLevel(prev) + + return True + + +# UpdateInstructions + + +@dataclass +class UpdateInstructionsOptions(BaseResponse): + """ + The client can send an UpdateInstructions message to give additional instructions to the Think model in the middle of a conversation. + """ + + type: str = str(AgentWebSocketEvents.UpdateInstructions) + instructions: str = field(default="") + + +# UpdateSpeak + + +@dataclass +class UpdateSpeakOptions(BaseResponse): + """ + The client can send an UpdateSpeak message to change the Speak model in the middle of a conversation. + """ + + type: str = str(AgentWebSocketEvents.UpdateSpeak) + model: str = field(default="") + + +# InjectAgentMessage + + +@dataclass +class InjectAgentMessageOptions(BaseResponse): + """ + The client can send an InjectAgentMessage to immediately trigger an agent statement. If the injection request arrives while the user is speaking, or while the server is in the middle of sending audio for an agent response, then the request will be ignored and the server will reply with an InjectionRefused. + """ + + type: str = str(AgentWebSocketEvents.InjectAgentMessage) + message: str = field(default="") + + +# Function Call Response + + +@dataclass +class FunctionCallResponse(BaseResponse): + """ + TheFunctionCallResponse message is a JSON command that the client should reply with every time there is a FunctionCallRequest received. + """ + + type: str = "FunctionCallResponse" + function_call_id: str = field(default="") + output: str = field(default="") + + +# Agent Keep Alive + + +@dataclass +class AgentKeepAlive(BaseResponse): + """ + The KeepAlive message is a JSON command that you can use to ensure that the server does not close the connection. + """ + + type: str = "KeepAlive" diff --git a/deepgram/clients/agent/v1/websocket/response.py b/deepgram/clients/agent/v1/websocket/response.py new file mode 100644 index 00000000..f4cc737e --- /dev/null +++ b/deepgram/clients/agent/v1/websocket/response.py @@ -0,0 +1,117 @@ +# Copyright 2023-2024 Deepgram SDK contributors. All Rights Reserved. +# Use of this source code is governed by a MIT license that can be found in the LICENSE file. +# SPDX-License-Identifier: MIT + +from typing import List, Optional, Dict, Any + +from dataclasses import dataclass + +# common websocket response +from ....common import ( + BaseResponse, + OpenResponse, + CloseResponse, + ErrorResponse, + UnhandledResponse, +) + +# unique + + +@dataclass +class WelcomeResponse(BaseResponse): + """ + The server will send a Welcome message as soon as the websocket opens. + """ + + type: str + session_id: str + + +@dataclass +class SettingsAppliedResponse(BaseResponse): + """ + The server will send a SettingsApplied message as soon as the settings are applied. + """ + + type: str + + +@dataclass +class ConversationTextResponse(BaseResponse): + """ + The server will send a ConversationText message every time the agent hears the user say something, and every time the agent speaks something itself. + """ + + type: str + role: str + content: str + + +@dataclass +class UserStartedSpeakingResponse(BaseResponse): + """ + The server will send a UserStartedSpeaking message every time the user begins a new utterance. + """ + + type: str + + +@dataclass +class AgentThinkingResponse(BaseResponse): + """ + The server will send an AgentThinking message to inform the client of a non-verbalized agent thought. + """ + + type: str + content: str + + +@dataclass +class FunctionCalling(BaseResponse): + """ + The server will sometimes send FunctionCalling messages when making function calls to help the client developer debug function calling workflows. + """ + + type: str + + +@dataclass +class FunctionCallRequest(BaseResponse): + """ + The FunctionCallRequest message is used to call a function from the server to the client. + """ + + type: str + function_name: str + function_call_id: str + input: str + + +@dataclass +class AgentStartedSpeakingResponse(BaseResponse): + """ + The server will send an AgentStartedSpeaking message when it begins streaming an agent audio response to the client for playback. + """ + + total_latency: float + tts_latency: float + ttt_latency: float + + +@dataclass +class AgentAudioDoneResponse(BaseResponse): + """ + The server will send an AgentAudioDone message immediately after it sends the last audio message in a piece of agent speech. + """ + + type: str + + +@dataclass +class InjectionRefusedResponse(BaseResponse): + """ + The server will send an InjectionRefused message when an InjectAgentMessage request is ignored because it arrived while the user was speaking or while the server was sending audio for an agent response. + """ + + type: str diff --git a/deepgram/clients/agent_router.py b/deepgram/clients/agent_router.py new file mode 100644 index 00000000..90d82f3a --- /dev/null +++ b/deepgram/clients/agent_router.py @@ -0,0 +1,130 @@ +# Copyright 2024 Deepgram SDK contributors. All Rights Reserved. +# Use of this source code is governed by a MIT license that can be found in the LICENSE file. +# SPDX-License-Identifier: MIT + +from importlib import import_module +import logging + +from ..utils import verboselogs +from ..options import DeepgramClientOptions +from .errors import DeepgramModuleError + + +class AgentRouter: + """ + Represents a client for interacting with the Deepgram API. + + This class provides a client for making requests to the Deepgram API with various configuration options. + + Attributes: + config_options (DeepgramClientOptions): An optional configuration object specifying client options. + + Raises: + DeepgramApiKeyError: If the API key is missing or invalid. + + Methods: + read: (Preferred) Returns an Threaded AnalyzeClient instance for interacting with Deepgram's read transcription services. + asyncread: Returns an (Async) AnalyzeClient instance for interacting with Deepgram's read transcription services. + """ + + _logger: verboselogs.VerboseLogger + _config: DeepgramClientOptions + + def __init__(self, config: DeepgramClientOptions): + self._logger = verboselogs.VerboseLogger(__name__) + self._logger.addHandler(logging.StreamHandler()) + self._logger.setLevel(config.verbose) + self._config = config + + @property + def websocket(self): + """ + Returns an AgentWebSocketClient instance for interacting with Deepgram's Agent API. + """ + return self.Version(self._config, "websocket") + + @property + def asyncwebsocket(self): + """ + Returns an AsyncAgentWebSocketClient instance for interacting with Deepgram's Agent API. + """ + return self.Version(self._config, "asyncwebsocket") + + # INTERNAL CLASSES + class Version: + """ + Represents a version of the Deepgram API. + """ + + _logger: verboselogs.VerboseLogger + _config: DeepgramClientOptions + _parent: str + + def __init__(self, config, parent: str): + self._logger = verboselogs.VerboseLogger(__name__) + self._logger.addHandler(logging.StreamHandler()) + self._logger.setLevel(config.verbose) + self._config = config + self._parent = parent + + # FUTURE VERSIONING: + # When v2 or v1.1beta1 or etc. This allows easy access to the latest version of the API. + # @property + # def latest(self): + # match self._parent: + # case "analyze": + # return AnalyzeClient(self._config) + # case _: + # raise DeepgramModuleError("Invalid parent") + + def v(self, version: str = ""): + """ + Returns a specific version of the Deepgram API. + """ + self._logger.debug("Version.v ENTER") + self._logger.info("version: %s", version) + if len(version) == 0: + self._logger.error("version is empty") + self._logger.debug("Version.v LEAVE") + raise DeepgramModuleError("Invalid module version") + + parent = "" + file_name = "" + class_name = "" + match self._parent: + case "websocket": + parent = "websocket" + file_name = "client" + class_name = "AgentWebSocketClient" + case "asyncwebsocket": + parent = "websocket" + file_name = "async_client" + class_name = "AsyncAgentWebSocketClient" + case _: + self._logger.error("parent unknown: %s", self._parent) + self._logger.debug("Version.v LEAVE") + raise DeepgramModuleError("Invalid parent type") + + # create class path + path = f"deepgram.clients.agent.v{version}.{parent}.{file_name}" + self._logger.info("path: %s", path) + self._logger.info("class_name: %s", class_name) + + # import class + mod = import_module(path) + if mod is None: + self._logger.error("module path is None") + self._logger.debug("Version.v LEAVE") + raise DeepgramModuleError("Unable to find package") + + my_class = getattr(mod, class_name) + if my_class is None: + self._logger.error("my_class is None") + self._logger.debug("Version.v LEAVE") + raise DeepgramModuleError("Unable to find class") + + # instantiate class + my_class = my_class(self._config) + self._logger.notice("Version.v succeeded") + self._logger.debug("Version.v LEAVE") + return my_class diff --git a/deepgram/clients/listen_router.py b/deepgram/clients/listen_router.py index 20b35261..7bc88bfe 100644 --- a/deepgram/clients/listen_router.py +++ b/deepgram/clients/listen_router.py @@ -18,14 +18,13 @@ from .errors import DeepgramModuleError -class Listen: +class ListenRouter: """ Represents a client for interacting with the Deepgram API. This class provides a client for making requests to the Deepgram API with various configuration options. Attributes: - api_key (str): The Deepgram API key used for authentication. config_options (DeepgramClientOptions): An optional configuration object specifying client options. Raises: diff --git a/deepgram/clients/read_router.py b/deepgram/clients/read_router.py index 9ce4c81c..1ca09ee3 100644 --- a/deepgram/clients/read_router.py +++ b/deepgram/clients/read_router.py @@ -10,14 +10,13 @@ from .errors import DeepgramModuleError -class Read: +class ReadRouter: """ Represents a client for interacting with the Deepgram API. This class provides a client for making requests to the Deepgram API with various configuration options. Attributes: - api_key (str): The Deepgram API key used for authentication. config_options (DeepgramClientOptions): An optional configuration object specifying client options. Raises: @@ -40,14 +39,14 @@ def __init__(self, config: DeepgramClientOptions): @property def analyze(self): """ - Returns an AnalyzeClient instance for interacting with Deepgram's read transcription services. + Returns an AnalyzeClient instance for interacting with Deepgram's read services. """ return self.Version(self._config, "analyze") @property def asyncanalyze(self): """ - Returns an AsyncAnalyzeClient instance for interacting with Deepgram's read transcription services. + Returns an AsyncAnalyzeClient instance for interacting with Deepgram's read services. """ return self.Version(self._config, "asyncanalyze") diff --git a/deepgram/clients/speak_router.py b/deepgram/clients/speak_router.py index 5e5dec2f..7290ed48 100644 --- a/deepgram/clients/speak_router.py +++ b/deepgram/clients/speak_router.py @@ -13,7 +13,7 @@ from .errors import DeepgramModuleError -class Speak: +class SpeakRouter: """ This class provides a Speak Clients for making requests to the Deepgram API with various configuration options. diff --git a/deepgram/options.py b/deepgram/options.py index ff4ef56c..ce43480a 100644 --- a/deepgram/options.py +++ b/deepgram/options.py @@ -88,9 +88,9 @@ def _update_headers(self, headers: Optional[Dict] = None): self.headers["Authorization"] = f"Token {self.api_key}" elif "Authorization" in self.headers: del self.headers["Authorization"] - self.headers["User-Agent"] = ( - f"@deepgram/sdk/{__version__} python/{sys.version_info[1]}.{sys.version_info[2]}" - ) + self.headers[ + "User-Agent" + ] = f"@deepgram/sdk/{__version__} python/{sys.version_info[1]}.{sys.version_info[2]}" # Overwrite / add any headers that were passed in if headers: self.headers.update(headers) diff --git a/examples/agent/async_simple/main.py b/examples/agent/async_simple/main.py new file mode 100644 index 00000000..3c04b090 --- /dev/null +++ b/examples/agent/async_simple/main.py @@ -0,0 +1,152 @@ +# Copyright 2024 Deepgram SDK contributors. All Rights Reserved. +# Use of this source code is governed by a MIT license that can be found in the LICENSE file. +# SPDX-License-Identifier: MIT + +from signal import SIGINT, SIGTERM +import asyncio +import time +from deepgram.utils import verboselogs + +from deepgram import ( + DeepgramClient, + DeepgramClientOptions, + AgentWebSocketEvents, + SettingsConfigurationOptions, +) + +TTS_TEXT = "Hello, this is a text to speech example using Deepgram." + +global warning_notice +warning_notice = True + + +async def main(): + try: + loop = asyncio.get_event_loop() + + for signal in (SIGTERM, SIGINT): + loop.add_signal_handler( + signal, + lambda: asyncio.create_task(shutdown(signal, loop, dg_connection)), + ) + + # example of setting up a client config. logging values: WARNING, VERBOSE, DEBUG, SPAM + config: DeepgramClientOptions = DeepgramClientOptions( + options={ + "keepalive": "true", + "microphone_record": "true", + "speaker_playback": "true", + }, + # verbose=verboselogs.DEBUG, + ) + deepgram: DeepgramClient = DeepgramClient("", config) + + # Create a websocket connection to Deepgram + dg_connection = deepgram.agent.asyncwebsocket.v("1") + + async def on_open(self, open, **kwargs): + print(f"\n\n{open}\n\n") + + async def on_binary_data(self, data, **kwargs): + global warning_notice + if warning_notice: + print("Received binary data") + print("You can do something with the binary data here") + print("OR") + print( + "If you want to simply play the audio, set speaker_playback to true in the options for DeepgramClientOptions" + ) + warning_notice = False + + async def on_welcome(self, welcome, **kwargs): + print(f"\n\n{welcome}\n\n") + + async def on_settings_applied(self, settings_applied, **kwargs): + print(f"\n\n{settings_applied}\n\n") + + async def on_conversation_text(self, conversation_text, **kwargs): + print(f"\n\n{conversation_text}\n\n") + + async def on_user_started_speaking(self, user_started_speaking, **kwargs): + print(f"\n\n{user_started_speaking}\n\n") + + async def on_agent_thinking(self, agent_thinking, **kwargs): + print(f"\n\n{agent_thinking}\n\n") + + async def on_function_calling(self, function_calling, **kwargs): + print(f"\n\n{function_calling}\n\n") + + async def on_agent_started_speaking(self, agent_started_speaking, **kwargs): + print(f"\n\n{agent_started_speaking}\n\n") + + async def on_agent_audio_done(self, agent_audio_done, **kwargs): + print(f"\n\n{agent_audio_done}\n\n") + + async def on_close(self, close, **kwargs): + print(f"\n\n{close}\n\n") + + async def on_error(self, error, **kwargs): + print(f"\n\n{error}\n\n") + + async def on_unhandled(self, unhandled, **kwargs): + print(f"\n\n{unhandled}\n\n") + + dg_connection.on(AgentWebSocketEvents.Open, on_open) + dg_connection.on(AgentWebSocketEvents.AudioData, on_binary_data) + dg_connection.on(AgentWebSocketEvents.Welcome, on_welcome) + dg_connection.on(AgentWebSocketEvents.SettingsApplied, on_settings_applied) + dg_connection.on(AgentWebSocketEvents.ConversationText, on_conversation_text) + dg_connection.on( + AgentWebSocketEvents.UserStartedSpeaking, on_user_started_speaking + ) + dg_connection.on(AgentWebSocketEvents.AgentThinking, on_agent_thinking) + dg_connection.on(AgentWebSocketEvents.FunctionCalling, on_function_calling) + dg_connection.on( + AgentWebSocketEvents.AgentStartedSpeaking, on_agent_started_speaking + ) + dg_connection.on(AgentWebSocketEvents.AgentAudioDone, on_agent_audio_done) + dg_connection.on(AgentWebSocketEvents.Close, on_close) + dg_connection.on(AgentWebSocketEvents.Error, on_error) + dg_connection.on(AgentWebSocketEvents.Unhandled, on_unhandled) + + # connect to websocket + options = SettingsConfigurationOptions() + options.agent.think.provider.type = "open_ai" + options.agent.think.model = "gpt-4o-mini" + options.agent.think.instructions = "You are a helpful AI assistant." + + print("\n\nPress Enter to stop...\n\n") + if await dg_connection.start(options) is False: + print("Failed to start connection") + return + + # wait until cancelled + try: + while True: + await asyncio.sleep(1) + except asyncio.CancelledError: + # This block will be executed when the shutdown coroutine cancels all tasks + pass + finally: + await dg_connection.finish() + + print("Finished") + + except ValueError as e: + print(f"Invalid value encountered: {e}") + except Exception as e: + print(f"An unexpected error occurred: {e}") + + +async def shutdown(signal, loop, dg_connection): + print(f"Received exit signal {signal.name}...") + await dg_connection.finish() + tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()] + [task.cancel() for task in tasks] + print(f"Cancelling {len(tasks)} outstanding tasks") + await asyncio.gather(*tasks, return_exceptions=True) + loop.stop() + print("Shutdown complete.") + + +asyncio.run(main()) diff --git a/examples/agent/simple/main.py b/examples/agent/simple/main.py new file mode 100644 index 00000000..c47ffb89 --- /dev/null +++ b/examples/agent/simple/main.py @@ -0,0 +1,166 @@ +# Copyright 2024 Deepgram SDK contributors. All Rights Reserved. +# Use of this source code is governed by a MIT license that can be found in the LICENSE file. +# SPDX-License-Identifier: MIT + +from deepgram.utils import verboselogs + +from deepgram import ( + DeepgramClient, + DeepgramClientOptions, + AgentWebSocketEvents, + SettingsConfigurationOptions, + FunctionCalling, + FunctionCallRequest, + FunctionCallResponse, +) + +# Add debug prints for imports +print("Checking imports...") +try: + from deepgram import FunctionCallRequest + + print("Successfully imported FunctionCallRequest") +except ImportError as e: + print(f"Failed to import FunctionCallRequest: {e}") + +try: + from deepgram import FunctionCallResponse + + print("Successfully imported FunctionCallResponse") +except ImportError as e: + print(f"Failed to import FunctionCallResponse: {e}") + +global warning_notice +warning_notice = True + + +def main(): + try: + print("Starting main function...") + config: DeepgramClientOptions = DeepgramClientOptions( + options={ + "keepalive": "true", + "microphone_record": "true", + "speaker_playback": "true", + }, + # verbose=verboselogs.DEBUG, + ) + print("Created DeepgramClientOptions...") + + deepgram: DeepgramClient = DeepgramClient("", config) + print("Created DeepgramClient...") + + dg_connection = deepgram.agent.websocket.v("1") + print("Created WebSocket connection...") + + def on_open(self, open, **kwargs): + print(f"\n\n{open}\n\n") + + def on_binary_data(self, data, **kwargs): + global warning_notice + if warning_notice: + print("Received binary data") + print("You can do something with the binary data here") + print("OR") + print( + "If you want to simply play the audio, set speaker_playback to true in the options for DeepgramClientOptions" + ) + warning_notice = False + + def on_welcome(self, welcome, **kwargs): + print(f"\n\n{welcome}\n\n") + + def on_settings_applied(self, settings_applied, **kwargs): + print(f"\n\n{settings_applied}\n\n") + + def on_conversation_text(self, conversation_text, **kwargs): + print(f"\n\n{conversation_text}\n\n") + + def on_user_started_speaking(self, user_started_speaking, **kwargs): + print(f"\n\n{user_started_speaking}\n\n") + + def on_agent_thinking(self, agent_thinking, **kwargs): + print(f"\n\n{agent_thinking}\n\n") + + def on_function_calling(self, function_calling: FunctionCalling, **kwargs): + print(f"\n\nFunction Calling Debug: {function_calling}\n\n") + + def on_function_call_request( + self, function_call_request: FunctionCallRequest, **kwargs + ): + print(f"\n\nFunction Call Request: {function_call_request}\n\n") + try: + response = FunctionCallResponse( + function_call_id=function_call_request.function_call_id, + output="Function response here", + ) + dg_connection.send_function_call_response(response) + except Exception as e: + print(f"Error in function call: {e}") + + def on_agent_started_speaking(self, agent_started_speaking, **kwargs): + print(f"\n\n{agent_started_speaking}\n\n") + + def on_agent_audio_done(self, agent_audio_done, **kwargs): + print(f"\n\n{agent_audio_done}\n\n") + + def on_close(self, close, **kwargs): + print(f"\n\n{close}\n\n") + + def on_error(self, error, **kwargs): + print(f"\n\n{error}\n\n") + + def on_unhandled(self, unhandled, **kwargs): + print(f"\n\n{unhandled}\n\n") + + dg_connection.on(AgentWebSocketEvents.Open, on_open) + dg_connection.on(AgentWebSocketEvents.AudioData, on_binary_data) + dg_connection.on(AgentWebSocketEvents.Welcome, on_welcome) + dg_connection.on(AgentWebSocketEvents.SettingsApplied, on_settings_applied) + dg_connection.on(AgentWebSocketEvents.ConversationText, on_conversation_text) + dg_connection.on( + AgentWebSocketEvents.UserStartedSpeaking, on_user_started_speaking + ) + dg_connection.on(AgentWebSocketEvents.AgentThinking, on_agent_thinking) + dg_connection.on(AgentWebSocketEvents.FunctionCalling, on_function_calling) + dg_connection.on( + AgentWebSocketEvents.FunctionCallRequest, on_function_call_request + ) + dg_connection.on( + AgentWebSocketEvents.AgentStartedSpeaking, on_agent_started_speaking + ) + dg_connection.on(AgentWebSocketEvents.AgentAudioDone, on_agent_audio_done) + dg_connection.on(AgentWebSocketEvents.Close, on_close) + dg_connection.on(AgentWebSocketEvents.Error, on_error) + dg_connection.on(AgentWebSocketEvents.Unhandled, on_unhandled) + + # connect to websocket + options: SettingsConfigurationOptions = SettingsConfigurationOptions() + options.agent.think.provider.type = "open_ai" + options.agent.think.model = "gpt-4o-mini" + options.agent.think.instructions = "You are a helpful AI assistant." + + if dg_connection.start(options) is False: + print("Failed to start connection") + return + + print("\n\nPress Enter to stop...\n\n") + input() + + # Close the connection + dg_connection.finish() + + print("Finished") + + except ImportError as e: + print(f"Import Error Details: {e}") + print(f"Error occurred in module: {getattr(e, 'name', 'unknown')}") + print(f"Path that failed: {getattr(e, 'path', 'unknown')}") + except Exception as e: + print(f"Unexpected error type: {type(e)}") + print(f"Error message: {str(e)}") + print(f"Error occurred in: {__file__}") + + +if __name__ == "__main__": + main()