-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathstate_manager.py
87 lines (74 loc) · 2.57 KB
/
state_manager.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
import asyncio
import json
import logging
from datetime import datetime
from typing import List
from livekit import agents, rtc
from livekit.agents.llm import ChatMessage, ChatRole
class StateManager:
    """Helper class to update the UI for the Agent Playground.

    Tracks whether the agent is speaking or thinking, mirrors that state to
    the room as local-participant metadata, publishes committed user
    transcriptions and agent responses as data messages, and accumulates the
    chat history (seeded with the system prompt).

    Every publish is scheduled with ``asyncio.create_task``, so the state
    setters and the ``commit_*`` methods must be called while an event loop
    is running.
    """

    def __init__(self, room: rtc.Room, prompt: str):
        """Create a manager bound to *room*, seeding history with *prompt*.

        Args:
            room: Room whose local participant is used for publishing.
            prompt: Text stored as the initial SYSTEM chat message.
        """
        self._room = room
        self._agent_speaking = False
        self._agent_thinking = False
        self._current_transcription = ""
        self._current_response = ""
        self._chat_history: List[agents.llm.ChatMessage] = [
            ChatMessage(role=ChatRole.SYSTEM, text=prompt)
        ]
        # The event loop only keeps weak references to tasks; hold strong
        # ones here so fire-and-forget publishes are not garbage-collected
        # before they finish.
        self._background_tasks: set = set()

    @property
    def agent_speaking(self) -> bool:
        """Whether the agent is currently flagged as speaking."""
        # Reading must be side-effect free: return the stored flag instead of
        # triggering a state publish (which also returned None to the caller).
        return self._agent_speaking

    @agent_speaking.setter
    def agent_speaking(self, value: bool):
        self._agent_speaking = value
        self._update_state()

    @property
    def agent_thinking(self) -> bool:
        """Whether the agent is currently flagged as thinking."""
        return self._agent_thinking

    @agent_thinking.setter
    def agent_thinking(self, value: bool):
        self._agent_thinking = value
        self._update_state()

    @property
    def chat_history(self):
        """The list of chat messages (the live list, not a copy)."""
        return self._chat_history

    def commit_user_transcription(self, transcription: str):
        """Publish a user transcription and append it to the chat history.

        The message is sent on the ``"transcription"`` topic as JSON with
        ``text`` and a millisecond ``timestamp``.
        """
        logging.info("Committing user transcription: %s", transcription)
        self._publish_json(
            {"text": transcription, "timestamp": self._timestamp_ms()},
            topic="transcription",
        )
        self._chat_history.append(
            ChatMessage(role=ChatRole.USER, text=transcription)
        )

    def commit_agent_response(self, response: str):
        """Publish an agent response and append it to the chat history.

        The message is sent on the ``"lk-chat-topic"`` topic as JSON with
        ``message`` and a millisecond ``timestamp``.
        """
        logging.info("Committing agent response: %s", response)
        self._publish_json(
            {"message": response, "timestamp": self._timestamp_ms()},
            topic="lk-chat-topic",
        )
        self._chat_history.append(
            ChatMessage(role=ChatRole.ASSISTANT, text=response)
        )

    def _update_state(self):
        """Publish the current agent state as participant metadata.

        The state is ``"speaking"`` if the speaking flag is set, otherwise
        ``"thinking"`` if the thinking flag is set, otherwise ``"listening"``.
        """
        state = "listening"
        if self._agent_speaking:
            state = "speaking"
        elif self._agent_thinking:
            state = "thinking"
        self._run_in_background(
            self._room.local_participant.update_metadata(
                json.dumps({"agent_state": state})
            )
        )

    @staticmethod
    def _timestamp_ms() -> int:
        """Return the current time as integer milliseconds since the epoch."""
        return int(datetime.now().timestamp() * 1000)

    def _publish_json(self, payload: dict, topic: str) -> None:
        """Serialize *payload* to JSON and publish it on *topic*."""
        self._run_in_background(
            self._room.local_participant.publish_data(
                json.dumps(payload),
                topic=topic,
            )
        )

    def _run_in_background(self, coro) -> None:
        """Schedule *coro* as a task and keep it referenced until it is done."""
        task = asyncio.create_task(coro)
        self._background_tasks.add(task)
        # Drop the reference once the task completes so the set cannot grow
        # without bound.
        task.add_done_callback(self._background_tasks.discard)