-
Notifications
You must be signed in to change notification settings - Fork 507
/
turn_detector.py
76 lines (59 loc) · 2.15 KB
/
turn_detector.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
import logging
from dotenv import load_dotenv
from livekit.agents import (
AutoSubscribe,
JobContext,
JobProcess,
WorkerOptions,
cli,
llm,
metrics,
)
from livekit.agents.pipeline import VoicePipelineAgent
from livekit.plugins import deepgram, openai, silero, turn_detector
load_dotenv()
logger = logging.getLogger("voice-assistant")
def prewarm(proc: JobProcess):
proc.userdata["vad"] = silero.VAD.load()
# This example uses our open-weight turn detection model to detect when the user is
# done speaking. This approach is more accurate than the default VAD model, reducing
# false positive interruptions by the agent.
async def entrypoint(ctx: JobContext):
initial_ctx = llm.ChatContext().append(
role="system",
text=(
"You are a voice assistant created by LiveKit. Your interface with users will be voice. "
"You should use short and concise responses, and avoiding usage of unpronouncable punctuation."
),
)
logger.info(f"connecting to room {ctx.room.name}")
await ctx.connect(auto_subscribe=AutoSubscribe.AUDIO_ONLY)
# wait for the first participant to connect
participant = await ctx.wait_for_participant()
logger.info(f"starting voice assistant for participant {participant.identity}")
agent = VoicePipelineAgent(
vad=ctx.proc.userdata["vad"],
stt=deepgram.STT(),
llm=openai.LLM(),
tts=openai.TTS(),
chat_ctx=initial_ctx,
turn_detector=turn_detector.EOUModel(),
)
agent.start(ctx.room, participant)
usage_collector = metrics.UsageCollector()
@agent.on("metrics_collected")
def _on_metrics_collected(mtrcs: metrics.AgentMetrics):
metrics.log_metrics(mtrcs)
usage_collector.collect(mtrcs)
async def log_usage():
summary = usage_collector.get_summary()
logger.info(f"Usage: ${summary}")
ctx.add_shutdown_callback(log_usage)
await agent.say("Hey, how can I help you today?", allow_interruptions=True)
if __name__ == "__main__":
cli.run_app(
WorkerOptions(
entrypoint_fnc=entrypoint,
prewarm_fnc=prewarm,
),
)