Skip to content

Create a common abstraction for storing message threads in teams #6243

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
import asyncio
import time
from typing import List, Optional

from ...messages import BaseChatMessage
from ._message_store import MessageStore


class MemoryMessageStore(MessageStore):
def __init__(self, ttl: Optional[float] = None):
super().__init__()
self._ttl = ttl
self._lock = asyncio.Lock()
self._messages: List[tuple[BaseChatMessage, float]] = []

async def add_message(self, message: BaseChatMessage) -> None:
await self._remove_expired_messages()
async with self._lock:
self._messages.append((message, time.time()))

async def get_message_thread(self) -> List[BaseChatMessage]:
await self._remove_expired_messages()

async with self._lock:
return [message for message, _ in self._messages]

async def clear(self) -> None:
async with self._lock:
self._messages.clear()

async def _remove_expired_messages(self) -> None:
if self._ttl:
async with self._lock:
time_threshold = time.time() - self._ttl
self._messages = [
(message, timestamp) for message, timestamp in self._messages if timestamp > time_threshold
]

@property
def ttl(self) -> Optional[float]:
return self._ttl
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
from abc import ABC, abstractmethod
from typing import List, Optional

from ...messages import BaseChatMessage


class MessageStore(ABC):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should have save_state and load_state methods to be used together with the agent classes.

Also, it should subclass Component to enable serializable component config.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See example in autogen_core.mode_context.ChatCompletionContext.

"""
Abstract base class for storing a message thread
"""

@abstractmethod
async def add_message(self, message: BaseChatMessage) -> None:
"""
Add a message to the store
"""
pass

@abstractmethod
async def add_messages(self, messages: List[BaseChatMessage]) -> None:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should be BaseChatMessage | BaseAgentEvent to be consistent with how message threads are implemented as list in BaseGroupChatManager.

"""
Add multiple messages to the store
"""
pass

@abstractmethod
async def get_message_thread(self) -> List[BaseChatMessage]:
"""
Retrieve the current message thread
"""
pass

@abstractmethod
async def clear(self) -> None:
"""
Clear the message thread storage
"""
pass

@property
@abstractmethod
def ttl(self) -> Optional[float]:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this should be in the concrete class not in the base class, something you can set as part of the constructor.

"""Time-To-Live for messages in seconds."""
pass
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ def lang_to_cmd(lang: str) -> str:
elif shutil.which("powershell") is not None:
return "powershell"
else:
raise ValueError(f"Powershell or pwsh is not installed. Please install one of them.")
raise ValueError("Powershell or pwsh is not installed. Please install one of them.")
else:
raise ValueError(f"Unsupported language: {lang}")

Expand Down