

process custom flags on worker
benxu3 committed Sep 23, 2024
1 parent 197417a commit 12efc95
Showing 3 changed files with 183 additions and 58 deletions.
108 changes: 55 additions & 53 deletions software/poetry.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion software/pyproject.toml
@@ -19,7 +19,7 @@ livekit-plugins-openai = "^0.8.1"
livekit-plugins-silero = "^0.6.4"
livekit-plugins-elevenlabs = "^0.7.3"
segno = "^1.6.1"
-open-interpreter = {extras = ["os", "server"], version = "^0.3.12"} # You should add a "browser" extra, so selenium isn't in the main package
+open-interpreter = {git = "https://github.com/openinterpreter/open-interpreter.git", rev = "development", extras = ["os", "server"]}
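# NOTE: Poetry installs a git dependency by cloning the repo at `rev`; since
# "development" is a branch, the resolved commit can drift between installs.
# A reproducible pin would use a commit SHA (placeholder, not from this diff):
# open-interpreter = {git = "https://github.com/openinterpreter/open-interpreter.git", rev = "<commit-sha>", extras = ["os", "server"]}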
ngrok = "^1.4.0"
realtimetts = {extras = ["all"], version = "^0.4.5"}
realtimestt = "^0.2.41"
131 changes: 127 additions & 4 deletions software/source/server/livekit/worker.py
@@ -1,21 +1,130 @@
import asyncio
import copy
import os
import re
from livekit.agents import AutoSubscribe, JobContext, WorkerOptions, cli
-from livekit.agents.llm import ChatContext, ChatMessage
+from livekit.agents.llm import ChatContext, ChatMessage, LLMStream, ChatChunk, ChoiceDelta, Choice
from livekit.agents.transcription import STTSegmentsForwarder
from livekit import rtc
from livekit.agents.voice_assistant import VoiceAssistant
from livekit.plugins import deepgram, openai, silero, elevenlabs
from dotenv import load_dotenv
import sys
import numpy as np

from typing import AsyncIterator
load_dotenv()

start_message = """Hi! You can hold the white circle below to speak to me.
Try asking what I can do."""

class ProcessedLLMStream(LLMStream):
    def __init__(
        self,
        original_stream: LLMStream,
        regex_pattern: str = r'<unvoiced code="([^"]+)"></unvoiced>',
    ) -> None:
        super().__init__(chat_ctx=original_stream.chat_ctx, fnc_ctx=original_stream.fnc_ctx)
        self.original_stream = original_stream
        self.regex_pattern = regex_pattern
        self.init_match = r'<.*?'  # detects the opening '<' of a potential tag in a delta
        self.accumulating = False
        self._aiter = self._process_stream()
        self._buffer = ""

    async def _process_stream(self) -> AsyncIterator[ChatChunk]:
        async for chunk in self.original_stream:
            new_choices = []
            for choice in chunk.choices:
                content = choice.delta.content

                if content:
                    init_match = re.search(self.init_match, content)
                    if init_match:
                        print("Initial match found; accumulating buffer.")
                        self.accumulating = True
                    if self.accumulating:
                        self._buffer += content
                        print("Accumulating buffer...")
                        match = re.search(self.regex_pattern, self._buffer)
                        if match:
                            code = match.group(1)
                            print(f"Extracted Code: {code}")

                            # Create a confirmation message
                            confirmation_msg = ChatMessage(
                                role="assistant",
                                content=f"Code extracted: {code}",
                            )

                            # Wrap the confirmation message in ChoiceDelta and Choice
                            choice_delta = ChoiceDelta(
                                role=confirmation_msg.role,
                                content=str(confirmation_msg.content),  # confirmation_msg.content is a string
                            )
                            new_choice = Choice(
                                delta=choice_delta,
                                index=choice.index,
                            )

                            # Create a new ChatChunk with the confirmation Choice
                            confirmation_chunk = ChatChunk(choices=[new_choice])

                            # Yield the confirmation chunk
                            yield confirmation_chunk
                            self.accumulating = False
                            self._buffer = ""
                        continue  # skip yielding the original content while accumulating
                new_choices.append(choice)
            if new_choices:
                yield ChatChunk(choices=new_choices)

    async def __anext__(self) -> ChatChunk:
        try:
            return await self._aiter.__anext__()
        except StopAsyncIteration:
            await self.aclose()
            raise

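# Why the buffer matters: a streamed tag can arrive split across deltas, e.g.
# '<unvoiced co' in one chunk and 'de="ls -la"></unvoiced>' in the next
# (hypothetical payload). Only the concatenated buffer matches the pattern:
#
#   buf = '<unvoiced co' + 'de="ls -la"></unvoiced>'
#   re.search(r'<unvoiced code="([^"]+)"></unvoiced>', buf).group(1)  # -> 'ls -la'
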
def _01_synthesize_assistant_reply(
    assistant: VoiceAssistant, chat_ctx: ChatContext
) -> LLMStream:
    """
    Custom function to process the OpenAI-compatible endpoint's output.
    Extracts code from responses matching the <unvoiced code=...></unvoiced> pattern.

    Args:
        assistant (VoiceAssistant): The VoiceAssistant instance.
        chat_ctx (ChatContext): The current chat context.

    Returns:
        LLMStream: The processed LLMStream.
    """
    llm_stream = assistant.llm.chat(chat_ctx=chat_ctx, fnc_ctx=assistant.fnc_ctx)
    print("Wrapping LLM stream in ProcessedLLMStream.")
    return ProcessedLLMStream(original_stream=llm_stream)
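
# Hypothetical wiring (mirrors the option left commented out in entrypoint
# below): passing this hook as `will_synthesize_assistant_reply` routes every
# reply through ProcessedLLMStream before TTS. The vad/stt argument names here
# are assumptions, not shown in this diff:
#
#   assistant = VoiceAssistant(
#       vad=vad, stt=stt, llm=open_interpreter, tts=tts, chat_ctx=initial_ctx,
#       will_synthesize_assistant_reply=_01_synthesize_assistant_reply,
#   )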

# This function is the entrypoint for the agent.
async def entrypoint(ctx: JobContext):
# Create an initial chat context with a system prompt
@@ -96,6 +205,7 @@ async def publish_static_image():
        llm=open_interpreter,  # Language Model
        tts=tts,  # Text-to-Speech
        chat_ctx=initial_ctx,  # Chat history context
        # will_synthesize_assistant_reply=_01_synthesize_assistant_reply,
    )

    chat = rtc.ChatManager(ctx.room)
@@ -118,13 +228,26 @@ def on_chat_received(msg: rtc.ChatMessage):

    await asyncio.sleep(1)

print("HELLO FROM INSIDE THE WORKER")
print("HELLO FROM INSIDE THE WORKER")
print("HELLO FROM INSIDE THE WORKER")
print("HELLO FROM INSIDE THE WORKER")
print("HELLO FROM INSIDE THE WORKER")

    # Greets the user with an initial message
    await assistant.say(start_message, allow_interruptions=True)

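    # Forward STT transcript segments to the room so clients can render live captions.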
    stt_forwarder = STTSegmentsForwarder(room=ctx.room, participant=ctx.room.local_participant)
    await stt_forwarder._run()


def main(livekit_url):
    print("Starting worker! 🦅")

    # Workers have to be run as CLIs right now,
    # so we need to simulate running "[this file] dev".

@@ -134,5 +257,5 @@ def main(livekit_url):

    # Initialize the worker with the entrypoint
    cli.run_app(
-        WorkerOptions(entrypoint_fnc=entrypoint, api_key="devkey", api_secret="secret", ws_url=livekit_url)
+        WorkerOptions(entrypoint_fnc=entrypoint, api_key="devkey", api_secret="secret", ws_url=livekit_url, port=8082)
    )
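
# Assumed invocation (not shown in this diff): main() receives the LiveKit
# server URL, e.g. main("ws://localhost:7880") for a local dev server that
# matches the "devkey"/"secret" credentials above.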
