Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions examples/avatar_agents/akool/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# LiveKit Akool Avatar Agent

This example demonstrates how to create a animated avatar using [Akool](https://akool.com/).

create your avatar [here](https://akool.com/apps/upload/avatar?from=%2Fapps%2Fstreaming-avatar%2Fedit)

## Usage

* Update the environment:

```bash
# Akool Config
export AKOOL_CLIENT_ID="..."
export AKOOL_CLIENT_SECRET="..."

# OpenAI config (or other models, tts, stt)
export OPENAI_API_KEY="..."

# LiveKit config
export LIVEKIT_API_KEY="..."
export LIVEKIT_API_SECRET="..."
export LIVEKIT_URL="..."
```

* Start the agent worker:

```bash
python examples/avatar_agents/akool/agent_worker.py dev
```
80 changes: 80 additions & 0 deletions examples/avatar_agents/akool/agent_worker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
import asyncio
import logging
import os

from dotenv import load_dotenv
from openai.types.beta.realtime.session import TurnDetection

from livekit import rtc
from livekit.agents import (
Agent,
AgentSession,
JobContext,
WorkerOptions,
WorkerType,
cli,
)
from livekit.plugins import akool, openai

logger = logging.getLogger("akool-avatar-example")
logger.setLevel(logging.INFO)

load_dotenv()


async def entrypoint(ctx: JobContext):
session = AgentSession(
llm=openai.realtime.RealtimeModel(
voice="alloy",
turn_detection=TurnDetection(
type="server_vad",
threshold=0.7,
prefix_padding_ms=200,
silence_duration_ms=800,
create_response=True,
interrupt_response=True,
),
),
)

akool_avatar = akool.AvatarSession(
avatar_config=akool.AvatarConfig(avatar_id="dvp_Tristan_cloth2_1080P"),
client_id=os.getenv("AKOOL_CLIENT_ID"),
client_secret=os.getenv("AKOOL_CLIENT_SECRET"),
api_url=os.getenv("AKOOL_API_URL"),
)

try:
await akool_avatar.start(session, room=ctx.room)

# 监听用户断开连接事件
def on_participant_disconnected(participant: rtc.RemoteParticipant):
logger.info(f"Participant {participant.identity} disconnected")
# Only one participant left, close session
remote_participants = [a.identity for a in ctx.room.remote_participants.values()]
if (
len(remote_participants) == 1
and akool_avatar.get_avatar_participant_identity() == remote_participants[0]
):
logger.info("Closing avatar session")
asyncio.create_task(akool_avatar.aclose())

ctx.room.on("participant_disconnected", on_participant_disconnected)

# start the agent, it will join the room and wait for the avatar to join
await session.start(
agent=Agent(instructions="Talk to me!"),
room=ctx.room,
)

session.generate_reply(instructions="say hello to the user")

except Exception as e:
logger.error(f"Error in entrypoint: {e}")
# Ensure cleanup resources when error occurs
await akool_avatar.aclose()
raise


if __name__ == "__main__":
cli.run_app(WorkerOptions(entrypoint_fnc=entrypoint, worker_type=WorkerType.ROOM))
6 changes: 6 additions & 0 deletions livekit-plugins/livekit-plugins-akool/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
# Akool virtual avatar plugin for LiveKit Agents

Support for the [Akool](https://akool.com/) virtual avatar.

See [https://docs.livekit.io/agents/integrations/avatar/akool/](https://docs.livekit.io/agents/integrations/avatar/akool/) for more information.

Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
# Copyright 2023 LiveKit, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Akool virtual avatar plugin for LiveKit Agents

See https://docs.livekit.io/agents/integrations/avatar/akool/ for more information.
"""

from .api import AkoolException
from .avatar import AvatarSession
from .schema import AvatarConfig
from .version import __version__

__all__ = [
"AkoolException",
"AvatarSession",
"AvatarConfig",
"__version__",
]

from livekit.agents import Plugin

from .log import logger


class AkoolPlugin(Plugin):
def __init__(self) -> None:
super().__init__(__name__, __version__, __package__, logger)


Plugin.register_plugin(AkoolPlugin())
133 changes: 133 additions & 0 deletions livekit-plugins/livekit-plugins-akool/livekit/plugins/akool/api.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
import asyncio
import os
from typing import Any, Optional

import aiohttp

from livekit.agents import (
DEFAULT_API_CONNECT_OPTIONS,
NOT_GIVEN,
APIConnectionError,
APIConnectOptions,
APIStatusError,
NotGivenOr,
)

from .log import logger
from .schema import AvatarConfig, CreateSessionRequest, Credentials


class AkoolException(Exception):
"""Exception for Akool errors"""


DEFAULT_API_URL = "https://openapi.akool.com/api/open"


class AkoolAPI:
def __init__(
self,
avatar_config: AvatarConfig,
client_id: NotGivenOr[str] = NOT_GIVEN,
client_secret: NotGivenOr[str] = NOT_GIVEN,
api_url: NotGivenOr[str] = NOT_GIVEN,
*,
conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
session: Optional[aiohttp.ClientSession] = None,
) -> None:
self._avatar_config = avatar_config
self._client_id = client_id or os.getenv("AKOOL_CLIENT_ID")
self._client_secret = client_secret or os.getenv("AKOOL_CLIENT_SECRET")
if not self._client_id or not self._client_secret:
raise AkoolException("AKOOL_CLIENT_ID and AKOOL_CLIENT_SECRET must be set")
self._api_url = api_url or DEFAULT_API_URL
self._conn_options = conn_options
self._session = session or aiohttp.ClientSession()
self._access_token = None

async def _get_access_token(self) -> str:
"""
Get an access token from the Akool API.
https://docs.akool.com/authentication/usage
"""
url = f"{self._api_url}/v3/getToken"
payload = {"clientId": self._client_id, "clientSecret": self._client_secret}
response_data = await self._post(url, payload)
if response_data["code"] != 1000:
raise AkoolException(f"failed to get access token, error code: {response_data['code']}")
logger.info(f"get_access_token response: {response_data}")
return response_data["token"]

async def create_session(self, livekit_url: str, livekit_token: str) -> str:
"""
https://docs.akool.com/ai-tools-suite/live-avatar#create-session
"""
url = f"{self._api_url}/v4/liveAvatar/session/create"
payload = CreateSessionRequest(
stream_type="livekit",
credentials=Credentials(
livekit_url=livekit_url,
livekit_token=livekit_token,
),
**self._avatar_config.model_dump(exclude_none=True),
).model_dump(exclude_none=True)
logger.info(f"create_session payload: {payload}")
response_data = await self._post(url, payload, need_token=True)
logger.info(f"create_session response: {response_data}")
return response_data["data"] # type: ignore

async def close_session(self, session_id: str) -> None:
"""
Close avatar session
https://docs.akool.com/ai-tools-suite/live-avatar#close-session
"""
url = f"{self._api_url}/v4/liveAvatar/session/close"
payload = {"session_id": session_id}
logger.info(f"close_session payload: {payload}")
response_data = await self._post(url, payload, need_token=True)
logger.info(f"close_session response: {response_data}")

async def _post(self, url: str, payload: dict[str, Any], need_token=False) -> dict[str, Any]:
"""
Make a POST request to the Akool API with retry logic.

Args:
endpoint: API endpoint path (without leading slash)
payload: JSON payload for the request

Returns:
Response data as a dictionary

Raises:
APIConnectionError: If the request fails after all retries
"""
headers = {"Content-Type": "application/json"}
if need_token:
if not self._access_token:
self._access_token = await self._get_access_token()
headers["Authorization"] = f"Bearer {self._access_token}"

for i in range(self._conn_options.max_retry):
try:
async with self._session.post(
url,
headers=headers,
json=payload,
timeout=aiohttp.ClientTimeout(sock_connect=self._conn_options.timeout),
) as response:
if not response.ok:
text = await response.text()
raise APIStatusError(
"Server returned an error", status_code=response.status, body=text
)
return await response.json() # type: ignore
except Exception as e:
if isinstance(e, APIConnectionError):
logger.warning("failed to call akool api", extra={"error": str(e)})
else:
logger.exception("failed to call akool api")

if i < self._conn_options.max_retry - 1:
await asyncio.sleep(self._conn_options.retry_interval)

raise APIConnectionError("Failed to call Akool API after all retries")
Loading
Loading