Skip to content
Merged

Dev #14

Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
37d15d0
feat: refine memory storage
Zhaoyang-Chu Sep 30, 2025
86e982f
feat: enhance CustomChatOpenAI with logging and retry mechanism
dcloud347 Oct 2, 2025
60758f9
Adjust service names
Zhaoyang-Chu Oct 4, 2025
544f4e1
feat: add MemoryContext and Query models, and implement SemanticMemor…
dcloud347 Oct 4, 2025
6ae8cde
feat: update SemanticMemoryService to support async memory storage an…
dcloud347 Oct 4, 2025
d17e929
Adjust service name
Zhaoyang-Chu Oct 4, 2025
257b3bd
refactor: streamline database initialization by removing retry logic …
dcloud347 Oct 4, 2025
f21ac7e
refactor: rename memory.py to episodic_memory.py for clarity
dcloud347 Oct 4, 2025
5f588ad
feat: add SemanticMemoryUnitDB model for persistent storage of semant…
dcloud347 Oct 5, 2025
9d75220
refactor: clean up imports and improve code readability in memory ser…
dcloud347 Oct 5, 2025
5c0442e
feat: implement vector indexing for episodic and semantic memory serv…
dcloud347 Oct 5, 2025
7e13e79
feat: add embedding index settings for semantic and episodic memory s…
dcloud347 Oct 5, 2025
3552114
feat: add vector extension creation during database service initializ…
dcloud347 Oct 5, 2025
79e4db8
feat: enhance database service with logging and refactor semantic mem…
dcloud347 Oct 5, 2025
1cc0f19
feat: add semantic memory settings and refactor embedding service for…
dcloud347 Oct 5, 2025
b825e70
feat: implement episodic and semantic memory services with API endpoints
dcloud347 Oct 5, 2025
98ce71c
feat: implement episodic and semantic memory services with API endpoints
dcloud347 Oct 5, 2025
01cce63
fix semantic_memory_service
dcloud347 Oct 5, 2025
ac22ee4
feat: debug and improve episodic memory services
Zhaoyang-Chu Oct 6, 2025
8fcf9e3
add minimum similarity for semantic_memory_service
dcloud347 Oct 6, 2025
e5b02c8
add minimum similarity for semantic_memory_service
dcloud347 Oct 6, 2025
93a8b5c
Merge remote-tracking branch 'origin/dev' into dev
dcloud347 Oct 6, 2025
eb9d380
add delete semantic_memory API service
dcloud347 Oct 6, 2025
33e08ef
style formatting
dcloud347 Oct 6, 2025
7abe6b4
fix storing format
dcloud347 Oct 7, 2025
bb51e05
delete context overview field
dcloud347 Oct 7, 2025
a8a3c71
fix
dcloud347 Oct 7, 2025
6682cdb
update semantic memory settings and adjust weight distribution for qu…
dcloud347 Oct 8, 2025
89adec3
add semantic memory settings to docker-compose
dcloud347 Oct 8, 2025
7d3a1e0
update postgres container name and adjust semantic memory similarity …
dcloud347 Oct 8, 2025
6c2ad4a
refactor: streamline SemanticMemoryUnit instantiation
dcloud347 Oct 8, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 8 additions & 31 deletions athena/app/api/main.py
Original file line number Diff line number Diff line change
@@ -1,34 +1,11 @@
from fastapi import APIRouter, HTTPException, Query, Request
from fastapi import APIRouter

from athena.configuration.config import settings
from athena.app.api.routes import episodic_memory, semantic_memory

api_router = APIRouter()


@api_router.get("/memory/search", tags=["memory"])
async def search_memory(
request: Request,
q: str,
field: str = "task_state",
limit: int = Query(default=settings.MEMORY_SEARCH_LIMIT, ge=1, le=100),
):
services = getattr(request.app.state, "service", {})
memory_service = services.get("memory_service")
if memory_service is None:
raise HTTPException(status_code=503, detail="Memory service not initialized")
if field not in {"task_state", "task", "state"}:
raise HTTPException(status_code=400, detail=f"Invalid field: {field}")
results = await memory_service.search_memory(q, limit=limit, field=field)
return [m.model_dump() for m in results]


@api_router.get("/memory/{memory_id}", tags=["memory"])
async def get_memory(request: Request, memory_id: str):
services = getattr(request.app.state, "service", {})
memory_service = services.get("memory_service")
if memory_service is None:
raise HTTPException(status_code=503, detail="Memory service not initialized")
result = await memory_service.get_memory_by_key(memory_id)
if result is None:
raise HTTPException(status_code=404, detail="Memory not found")
return result.model_dump()
api_router.include_router(
episodic_memory.router, prefix="/episodic-memory", tags=["episodic-memory"]
)
api_router.include_router(
semantic_memory.router, prefix="/semantic-memory", tags=["semantic-memory"]
)
Empty file.
36 changes: 36 additions & 0 deletions athena/app/api/routes/episodic_memory.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
from fastapi import APIRouter, HTTPException, Query, Request

from athena.configuration.config import settings

router = APIRouter()


@router.get("/search/", summary="Search Episodic Memory from database")
async def search_episodic_memory(
    request: Request,
    q: str,
    field: str = "task_state",
    limit: int = Query(default=settings.MEMORY_SEARCH_LIMIT, ge=1, le=100),
):
    """Search stored episodic memories matching *q* over the chosen field.

    Args:
        request: FastAPI request; the episodic memory service is looked up
            on ``app.state.service`` (registered at application startup).
        q: Free-text search query.
        field: Which field to search against — one of ``task_state``,
            ``task``, or ``state``.
        limit: Maximum number of results to return (1–100).

    Returns:
        A list of matching memory units serialized via ``model_dump()``.

    Raises:
        HTTPException: 400 when *field* is not a supported value.
    """
    # Resolve the service registered on the application state.
    service = request.app.state.service["episodic_memory_service"]

    # Reject unsupported search fields up front.
    if field not in {"task_state", "task", "state"}:
        raise HTTPException(status_code=400, detail=f"Invalid field: {field}")

    matches = await service.search_memory(q, limit=limit, field=field)
    return [unit.model_dump() for unit in matches]


@router.get("/{memory_id}/", summary="Get Episodic Memory from database")
async def get_episodic_memory(request: Request, memory_id: str):
    """Fetch a single episodic memory unit by its key.

    Args:
        request: FastAPI request; the episodic memory service is looked up
            on ``app.state.service``.
        memory_id: Key identifying the memory unit.

    Returns:
        The memory unit serialized via ``model_dump()``.

    Raises:
        HTTPException: 404 when no memory exists under *memory_id*.
    """
    service = request.app.state.service["episodic_memory_service"]

    memory = await service.get_memory_by_key(memory_id)
    if memory is None:
        raise HTTPException(status_code=404, detail="Episodic memory not found")
    return memory.model_dump()
129 changes: 129 additions & 0 deletions athena/app/api/routes/semantic_memory.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
from typing import List

from fastapi import APIRouter, Request

from athena.app.services.semantic_memory_service import SemanticMemoryService
from athena.models.query import Query
from athena.models.requests.semantic_memory import (
StoreSemanticMemoryRequest,
)
from athena.models.responses.response import Response
from athena.models.responses.semantic_memory import SemanticMemoryResponse
from athena.models.semantic_memory import SemanticMemoryUnit

router = APIRouter()


@router.post("/store/", summary="Store semantic memory", response_model=Response)
async def store_semantic_memory(
    request: Request,
    body: StoreSemanticMemoryRequest,
):
    """Persist a semantic memory unit (query plus contexts) with embeddings.

    Args:
        request: FastAPI request; the semantic memory service is looked up
            on ``app.state.service``.
        body: Request payload carrying ``repository_id``, ``query``, and
            ``contexts``.

    Returns:
        An empty success ``Response``.
    """
    # Resolve the service registered on the application state.
    service: SemanticMemoryService = request.app.state.service[
        "semantic_memory_service"
    ]

    # Build the memory unit from the request payload and persist it.
    unit = SemanticMemoryUnit(query=body.query, contexts=body.contexts)
    await service.store_memory(
        repository_id=body.repository_id,
        memory=unit,
    )

    return Response()


@router.get(
    "/retrieve/{repository_id}/",
    response_model=Response[List[SemanticMemoryResponse]],
    summary="Retrieve semantic memories",
)
async def retrieve_semantic_memory(
    request: Request,
    repository_id: int,
    essential_query: str,
    extra_requirements: str = "",
    purpose: str = "",
):
    """Retrieve semantic memories similar to the given query components.

    Delegates to ``SemanticMemoryService.retrieve_memory``, which performs a
    weighted multi-vector similarity search over the essential query, extra
    requirements, and purpose (exact weights are configured in the service
    settings — confirm there).

    Args:
        request: FastAPI request; the semantic memory service is looked up
            on ``app.state.service``.
        repository_id: Repository identifier.
        essential_query: The main query string.
        extra_requirements: Additional constraints or filters for the query.
        purpose: The intent or context behind the query.

    Returns:
        A ``Response`` wrapping memory units ordered by similarity.
    """
    service = request.app.state.service["semantic_memory_service"]

    # Bundle the three query components into a single Query model.
    query = Query(
        essential_query=essential_query, extra_requirements=extra_requirements, purpose=purpose
    )
    memories = await service.retrieve_memory(repository_id=repository_id, query=query)

    # Flatten each memory unit into the API response shape.
    payload = [
        SemanticMemoryResponse(
            query_essential_query=unit.query.essential_query,
            query_extra_requirements=unit.query.extra_requirements,
            query_purpose=unit.query.purpose,
            memory_context_contexts=unit.contexts,
        )
        for unit in memories
    ]
    return Response(data=payload)


@router.delete(
    "/{repository_id}/", summary="Delete semantic memories by repository", response_model=Response
)
async def delete_semantic_memories_by_repository(
    request: Request,
    repository_id: int,
):
    """Remove every semantic memory stored for the given repository.

    Args:
        request: FastAPI request; the semantic memory service is looked up
            on ``app.state.service``.
        repository_id: Repository identifier whose memories are deleted.

    Returns:
        An empty success ``Response``.
    """
    service: SemanticMemoryService = request.app.state.service[
        "semantic_memory_service"
    ]

    await service.delete_memories_by_repository(repository_id=repository_id)

    return Response()
38 changes: 21 additions & 17 deletions athena/app/dependencies.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,10 @@
from athena.app.services.base_service import BaseService
from athena.app.services.database_service import DatabaseService
from athena.app.services.embedding_service import EmbeddingService
from athena.app.services.episodic_memory_service import EpisodicMemoryService
from athena.app.services.episodic_memory_storage_service import EpisodicMemoryStorageService
from athena.app.services.llm_service import LLMService
from athena.app.services.memory_service import MemoryService
from athena.app.services.memory_storage_service import MemoryStorageService
from athena.app.services.semantic_memory_service import SemanticMemoryService
from athena.configuration.config import settings


Expand All @@ -33,33 +34,36 @@ def initialize_services() -> Dict[str, BaseService]:
llm_service = LLMService(
settings.MODEL_NAME,
settings.MODEL_TEMPERATURE,
settings.MODEL_MAX_INPUT_TOKENS,
settings.MODEL_MAX_OUTPUT_TOKENS,
settings.OPENAI_FORMAT_API_KEY,
settings.OPENAI_FORMAT_BASE_URL,
settings.ANTHROPIC_API_KEY,
settings.GEMINI_API_KEY,
settings.GOOGLE_APPLICATION_CREDENTIALS,
)

embedding_service = None
api_key = settings.EMBEDDING_API_KEY or settings.MISTRAL_API_KEY
if settings.EMBEDDING_MODEL and api_key and settings.EMBEDDING_BASE_URL:
embedding_service = EmbeddingService(
model=settings.EMBEDDING_MODEL,
api_key=api_key,
base_url=settings.EMBEDDING_BASE_URL,
embed_dim=settings.EMBEDDING_DIM or 1024,
)
embedding_service = EmbeddingService(
model=settings.EMBEDDING_MODEL,
api_key=settings.EMBEDDING_API_KEY,
base_url=settings.EMBEDDING_BASE_URL,
embed_dim=settings.EMBEDDING_DIM,
)

memory_store = MemoryStorageService(database_service.get_sessionmaker(), embedding_service)
episodic_memory_store = EpisodicMemoryStorageService(
database_service.get_sessionmaker(), embedding_service
)

memory_service = MemoryService(
storage_backend=settings.MEMORY_STORAGE_BACKEND, store=memory_store
episodic_memory_service = EpisodicMemoryService(
database_service=database_service,
storage_backend=settings.MEMORY_STORAGE_BACKEND,
store=episodic_memory_store,
)
semantic_memory_service = SemanticMemoryService(
database_service=database_service, embedding_service=embedding_service
)

return {
"llm_service": llm_service,
"database_service": database_service,
"memory_service": memory_service,
"episodic_memory_service": episodic_memory_service,
"semantic_memory_service": semantic_memory_service,
}
12 changes: 2 additions & 10 deletions athena/app/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,13 +75,5 @@ def custom_generate_unique_id(route: APIRoute) -> str:


@app.get("/health", tags=["health"])
async def health_check():
services = getattr(app.state, "service", {})
db = services.get("database_service")
db_ok = await db.health_check() if db is not None else False
status = "healthy" if db_ok else "degraded"
return {
"status": status,
"database": db_ok,
"timestamp": datetime.now(timezone.utc).isoformat(),
}
def health_check():
return {"status": "healthy", "timestamp": datetime.now(timezone.utc).isoformat()}
69 changes: 13 additions & 56 deletions athena/app/services/database_service.py
Original file line number Diff line number Diff line change
@@ -1,74 +1,41 @@
import asyncio

from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
from sqlmodel import SQLModel

from athena.app.services.base_service import BaseService
from athena.configuration.config import settings
from athena.utils.logger_manager import get_logger


class DatabaseService(BaseService):
def __init__(self, DATABASE_URL: str, max_retries: int = 5, initial_backoff: float = 1.0):
def __init__(self, DATABASE_URL: str):
self.engine = create_async_engine(DATABASE_URL, echo=True)
self.sessionmaker = async_sessionmaker(
self.engine, expire_on_commit=False, class_=AsyncSession
)
self._logger = get_logger(__name__)
self._max_retries = max_retries
self._initial_backoff = initial_backoff

async def create_vector_extension(
self,
):
async with self.engine.begin() as conn:
# Ensure pgvector extension exists
await conn.execute(text("CREATE EXTENSION IF NOT EXISTS vector"))
self._logger.info("pgvector extension ensured.")

# Create the database and tables
async def create_db_and_tables(self):
async with self.engine.begin() as conn:
# Ensure pgvector extension exists (safe to ignore if unavailable)
try:
await conn.execute(text("CREATE EXTENSION IF NOT EXISTS vector"))
except Exception:
pass
# Create all tables
await conn.run_sync(SQLModel.metadata.create_all)
# Create ivfflat indexes for vector columns (if extension present)
try:
lists = settings.EMBEDDING_IVFFLAT_LISTS or 100
await conn.exec_driver_sql(
f"CREATE INDEX IF NOT EXISTS idx_memory_units_task_embedding ON memory_units USING ivfflat (task_embedding vector_cosine_ops) WITH (lists = {lists})"
)
await conn.exec_driver_sql(
f"CREATE INDEX IF NOT EXISTS idx_memory_units_state_embedding ON memory_units USING ivfflat (state_embedding vector_cosine_ops) WITH (lists = {lists})"
)
await conn.exec_driver_sql(
f"CREATE INDEX IF NOT EXISTS idx_memory_units_task_state_embedding ON memory_units USING ivfflat (task_state_embedding vector_cosine_ops) WITH (lists = {lists})"
)
except Exception:
# Index creation failed (likely no pgvector). Continue without indexes.
pass
self._logger.info("Database and tables created.")

async def start(self):
"""
Start the database service by creating the database and tables.
This method is called when the service is initialized.
"""
attempt = 0
backoff = self._initial_backoff
while True:
try:
await self.create_db_and_tables()
self._logger.info("Database and tables created successfully.")
break
except Exception as exc:
attempt += 1
if attempt > self._max_retries:
self._logger.error(
f"Database start failed after {self._max_retries} retries: {exc}"
)
raise
self._logger.warning(
f"Database start failed (attempt {attempt}/{self._max_retries}): {exc}. "
f"Retrying in {backoff:.1f}s..."
)
await asyncio.sleep(backoff)
backoff *= 2
await self.create_vector_extension()
await self.create_db_and_tables()

async def close(self):
"""
Expand All @@ -80,13 +47,3 @@ async def close(self):
def get_sessionmaker(self) -> async_sessionmaker[AsyncSession]:
"""Return the async sessionmaker for dependency injection."""
return self.sessionmaker

async def health_check(self) -> bool:
"""Perform a lightweight connectivity check (SELECT 1)."""
try:
async with self.engine.connect() as conn:
await conn.exec_driver_sql("SELECT 1")
return True
except Exception as exc:
self._logger.warning(f"Database health_check failed: {exc}")
return False
Loading