Skip to content

Commit

Permalink
Merge pull request #11 from speechmatics/bugfix/linux-playback-issue
Browse files Browse the repository at this point in the history
Fix latency issue on linux
  • Loading branch information
dumitrugutu authored Jan 24, 2025
2 parents 22c68af + 3629088 commit 197fe3f
Show file tree
Hide file tree
Showing 8 changed files with 77 additions and 74 deletions.
11 changes: 11 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,17 @@ All notable changes to this project will be documented in this file.

The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).

## [0.0.9] - 2025-01-24

### Fixed

- Issue with Flow latency on Linux

### Changed

- Examples load env vars from .env file
- Examples now print user/agent transcripts

## [0.0.8] - 2024-11-29

### Added
Expand Down
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,15 @@ key can be found here: https://docs.speechmatics.com/flow/getting-started#set-up
*Note: Requires access to microphone

```bash
speechmatics-flow --auth-token $TOKEN --ssl-mode insecure
speechmatics-flow --auth-token $TOKEN
```

### Change Assistant (Amelia → Humphrey)

To set the assistant to *Humphrey* instead of *Amelia* run this command:

```bash
speechmatics-flow --auth-token $TOKEN --ssl-mode insecure --assistant humphrey
speechmatics-flow --auth-token $TOKEN --assistant humphrey
```

### Load conversation_config from a config file
Expand All @@ -63,7 +63,7 @@ using the `--config-file` option
```

```bash
speechmatics-flow --auth-token $TOKEN --ssl-mode insecure --config-file conversation_config.json
speechmatics-flow --auth-token $TOKEN --config-file conversation_config.json
```

> **Hint**: Why limit Humphrey? Try changing the template_variables to see what happens if he’s not a butler but
Expand Down
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0.0.8
0.0.9
45 changes: 14 additions & 31 deletions examples/client_side_function_calling.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,21 @@
import sys
from typing import Any, Dict

import pyaudio
from dotenv import load_dotenv

from speechmatics_flow.cli import Transcripts, add_printing_handlers
from speechmatics_flow.client import WebsocketClient
from speechmatics_flow.models import (
ConnectionSettings,
Interaction,
AudioSettings,
ClientMessageType,
ConnectionSettings,
ConversationConfig,
Interaction,
ServerMessageType,
ClientMessageType,
)
from speechmatics_flow.playback import audio_playback

load_dotenv()

# Create a websocket client
client = WebsocketClient(
Expand Down Expand Up @@ -185,34 +189,13 @@ async def send_response(response: Dict[str, Any]) -> None:
print(f"Failed to send response: {e}")


client.add_event_handler(ServerMessageType.AddAudio, binary_msg_callback)
client.add_event_handler(ServerMessageType.ToolInvoke, order_callback)


async def audio_playback():
"""Continuously read from the audio queue and play audio back to the user."""
p = pyaudio.PyAudio()
player_stream = p.open(
format=pyaudio.paInt16,
channels=1,
rate=16000,
frames_per_buffer=128,
output=True,
)
try:
while True:
audio = await audio_queue.get()
player_stream.write(audio)
# read from buffer at a constant rate
await asyncio.sleep(0.005)
finally:
player_stream.stop_stream()
player_stream.close()
p.terminate()


async def main():
"""Main function to run both the WebSocket client and audio playback."""
transcripts = Transcripts()
client.add_event_handler(ServerMessageType.AddAudio, binary_msg_callback)
client.add_event_handler(ServerMessageType.ToolInvoke, order_callback)
add_printing_handlers(client, transcripts, False)

tasks = [
# Start the WebSocket client and conversation
asyncio.create_task(
Expand Down Expand Up @@ -294,7 +277,7 @@ async def main():
)
),
# Start the audio playback handler
asyncio.create_task(audio_playback()),
asyncio.create_task(audio_playback(audio_queue)),
]

(done, pending) = await asyncio.wait(tasks, return_when=asyncio.FIRST_EXCEPTION)
Expand Down
43 changes: 13 additions & 30 deletions examples/stream_from_microphone.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,20 @@
import os
import sys

import pyaudio
from dotenv import load_dotenv

from speechmatics_flow.cli import Transcripts, add_printing_handlers
from speechmatics_flow.client import WebsocketClient
from speechmatics_flow.models import (
ConnectionSettings,
Interaction,
AudioSettings,
ConnectionSettings,
ConversationConfig,
Interaction,
ServerMessageType,
)
from speechmatics_flow.playback import audio_playback

load_dotenv()

# Create a websocket client
client = WebsocketClient(
Expand All @@ -31,34 +35,13 @@ async def binary_msg_callback(msg: bytes):
await audio_queue.put(msg)


# Register the callback to be called when the client receives an audio message
client.add_event_handler(ServerMessageType.AddAudio, binary_msg_callback)


async def audio_playback():
"""Continuously read from the audio queue and play audio back to the user."""
p = pyaudio.PyAudio()
player_stream = p.open(
format=pyaudio.paInt16,
channels=1,
rate=16000,
frames_per_buffer=128,
output=True,
)
try:
while True:
audio = await audio_queue.get()
player_stream.write(audio)
# read from buffer at a constant rate
await asyncio.sleep(0.005)
finally:
player_stream.stop_stream()
player_stream.close()
p.terminate()


async def main():
"""Main function to run both the WebSocket client and audio playback."""
transcripts = Transcripts()
# Register callbacks
client.add_event_handler(ServerMessageType.AddAudio, binary_msg_callback)
add_printing_handlers(client, transcripts, False)

tasks = [
# Start the WebSocket client and conversation
asyncio.create_task(
Expand All @@ -69,7 +52,7 @@ async def main():
)
),
# Start the audio playback handler
asyncio.create_task(audio_playback()),
asyncio.create_task(audio_playback(audio_queue)),
]

(done, pending) = await asyncio.wait(tasks, return_when=asyncio.FIRST_EXCEPTION)
Expand Down
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,4 @@ httpx==0.27.1
pyaudio==0.2.14
setuptools
websockets>=10,<=13.1
python-dotenv~=1.0
24 changes: 15 additions & 9 deletions speechmatics_flow/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ def __init__(
# Semaphore used to ensure that we don't send too much audio data to
# the server too quickly and burst any buffers downstream.
self._buffer_semaphore = asyncio.BoundedSemaphore
self._loop = None

async def _init_synchronization_primitives(self):
"""
Expand All @@ -100,6 +101,7 @@ async def _init_synchronization_primitives(self):
self._buffer_semaphore = asyncio.BoundedSemaphore(
self.connection_settings.message_buffer_size
)
self._loop = asyncio.get_running_loop()

def _flag_conversation_started(self):
"""
Expand Down Expand Up @@ -233,8 +235,7 @@ async def _consumer(self, message, from_cli=False):
if inspect.iscoroutinefunction(handler):
await handler(copy.deepcopy(message))
else:
loop = asyncio.get_event_loop()
await loop.run_in_executor(
await self._loop.run_in_executor(
self._executor, handler, copy.deepcopy(message)
)
except ForceEndSession:
Expand Down Expand Up @@ -277,6 +278,13 @@ async def _read_from_microphone(self):
rate=self.audio_settings.sample_rate,
input=True,
)

async def async_stream_read():
# audio_chunk size is 128 * 2 = 256 bytes which is about 8ms
return await self._loop.run_in_executor(
self._executor, stream.read, 128, False
)

try:
while True:
if self._session_needs_closing or self._conversation_ended.is_set():
Expand All @@ -287,14 +295,11 @@ async def _read_from_microphone(self):
timeout=self.connection_settings.semaphore_timeout_seconds,
)

# audio_chunk size is 128 * 2 = 256 bytes which is about 8ms
audio_chunk = stream.read(num_frames=128, exception_on_overflow=False)
audio_chunk = await async_stream_read()

self.client_seq_no += 1
self._call_middleware(ClientMessageType.AddAudio, audio_chunk, True)
await self.websocket.send(audio_chunk)
# send audio at a constant rate
await asyncio.sleep(0.01)
except KeyboardInterrupt:
await self.websocket.send(self._end_of_audio())
finally:
Expand Down Expand Up @@ -377,11 +382,13 @@ async def _playback_handler(self):
format=pyaudio.paInt16,
channels=1,
rate=self.playback_settings.sample_rate,
frames_per_buffer=self.playback_settings.chunk_size,
output=True,
)
chunk_size = self.playback_settings.chunk_size

async def async_stream_write(chunk):
return await self._loop.run_in_executor(self._executor, stream.write, chunk)

try:
while not self._session_needs_closing or self._conversation_ended.is_set():
# Wait for the server to start sending audio
Expand All @@ -397,8 +404,7 @@ async def _playback_handler(self):
async with self._audio_buffer_lock:
audio_chunk = bytes(self._audio_buffer[:chunk_size])
self._audio_buffer = self._audio_buffer[chunk_size:]
stream.write(audio_chunk)
await asyncio.sleep(0.005)
await async_stream_write(audio_chunk)
except Exception as e:
LOGGER.error(f"Error during audio playback: {e}", exc_info=True)
raise e
Expand Down
19 changes: 19 additions & 0 deletions speechmatics_flow/playback.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
import asyncio

import pyaudio


async def audio_playback(audio_queue: asyncio.Queue):
"""Continuously read from the audio queue and play audio back to the user."""

p = pyaudio.PyAudio()
player_stream = p.open(format=pyaudio.paInt16, channels=1, rate=16000, output=True)
try:
while True:
audio = await audio_queue.get()
player_stream.write(audio)
await asyncio.sleep(0.005)
finally:
player_stream.stop_stream()
player_stream.close()
p.terminate()

0 comments on commit 197fe3f

Please sign in to comment.