1616from . import events
1717from vision_agents .core .processors import Processor
1818from vision_agents .core .edge .types import Participant
19+ from ...core .llm .events import RealtimeAudioOutputEvent
1920
2021logger = logging .getLogger (__name__ )
2122
2627
2728"""
2829TODO:
29- - Figure out auth issues AWS SDK bedrock
30+
31+ - Fix audio input event
3032- Cleanup process event loop
3133- Cleanup function calling
3234- Cleanup chat integration
@@ -50,6 +52,7 @@ class Realtime(realtime.Realtime):
5052 full: https://github.com/aws-samples/amazon-nova-samples/blob/main/speech-to-speech/sample-codes/console-python/nova_sonic.py
5153 tool use: https://github.com/aws-samples/amazon-nova-samples/blob/main/speech-to-speech/sample-codes/console-python/nova_sonic_tool_use.py
5254
55+ Input event docs: https://docs.aws.amazon.com/nova/latest/userguide/input-events.html
5356 Available voices are documented here:
5457 https://docs.aws.amazon.com/nova/latest/userguide/available-voices.html
5558
@@ -151,6 +154,8 @@ async def connect(self):
151154
152155 # next send system instructions
153156 system_instructions = self ._build_enhanced_instructions ()
157+ if not system_instructions :
158+ raise Exception ("bedrock requires system instructions before sending regular user input" )
154159 await self .content_input (system_instructions , "SYSTEM" )
155160
156161 async def simple_audio_response (self , pcm : PcmData ):
@@ -159,12 +164,13 @@ async def simple_audio_response(self, pcm: PcmData):
159164 self .logger .warning ("realtime is not active. can't call simple_audio_response" )
160165
161166 content_name = str (uuid .uuid4 ())
162- audio_bytes = pcm . samples . tobytes ()
167+ audio_bytes = pcm
163168
164169 await self .audio_content_start (content_name )
165170 self ._emit_audio_input_event (audio_bytes , sample_rate = pcm .sample_rate )
166-
167- await self .audio_input (content_name , audio_bytes )
171+ # Convert PcmData to base64 encoded bytes
172+ audio_base64 = base64 .b64encode (audio_bytes .samples ).decode ('utf-8' )
173+ await self .audio_input (content_name , audio_base64 )
168174
169175 await self .content_end (content_name )
170176
@@ -191,13 +197,13 @@ async def content_input(self, content: str, role: str):
191197 await self .text_input (content_name , content )
192198 await self .content_end (content_name )
193199
194- async def audio_input (self , content_name : str , audio_bytes : bytes ):
200+ async def audio_input (self , content_name : str , audio_bytes : str ):
195201 audio_event = {
196202 "event" : {
197203 "audioInput" : {
198204 "promptName" : self .session_id ,
199205 "contentName" : content_name ,
200- "content" : audio_bytes . decode ( 'utf-8' )
206+ "content" : audio_bytes
201207 }
202208 }
203209 }
@@ -348,9 +354,11 @@ async def _handle_events(self):
348354 result = await output [1 ].receive ()
349355 if result .value and result .value .bytes_ :
350356 try :
357+ logger .info ("attempt" )
351358 response_data = result .value .bytes_ .decode ('utf-8' )
359+ logger .info (f"Response from Bedrock: { response_data } " )
360+
352361 json_data = json .loads (response_data )
353- logger .info (f"Response from Bedrock: { json_data } " )
354362
355363 # Handle different response types
356364 if 'event' in json_data :
@@ -387,7 +395,17 @@ async def _handle_events(self):
387395 elif 'audioOutput' in json_data ['event' ]:
388396 audio_content = json_data ['event' ]['audioOutput' ]['content' ]
389397 audio_bytes = base64 .b64decode (audio_content )
390- await self .audio_output_queue .put (audio_bytes )
398+ #await self.audio_output_queue.put(audio_bytes)
399+
400+ audio_event = RealtimeAudioOutputEvent (
401+ plugin_name = "gemini" ,
402+ audio_data = audio_content ,
403+ sample_rate = 24000
404+ )
405+ self .events .send (audio_event )
406+
407+ await self .output_track .write (audio_content )
408+
391409 elif 'toolUse' in json_data ['event' ]:
392410 self .toolUseContent = json_data ['event' ]['toolUse' ]
393411 self .toolName = json_data ['event' ]['toolUse' ]['toolName' ]
@@ -419,8 +437,10 @@ async def _handle_events(self):
419437 #await self.output_queue.put({"raw_data": response_data})
420438 except StopAsyncIteration :
421439 # Stream has ended
440+ logger .error ("Stop async iteration exception" )
422441 break
423442 except Exception as e :
443+ logger .error ("Error, %s" , e )
424444 # Handle ValidationException properly
425445 if "ValidationException" in str (e ):
426446 error_message = str (e )
@@ -430,6 +450,7 @@ async def _handle_events(self):
430450 break
431451
432452 except Exception as e :
453+ logger .error ("Error, %s" , e )
433454 print (f"Response processing error: { e } " )
434455 finally :
435456 self .connected = False
0 commit comments