Implement database persistence for chat history #596

Open
5 tasks
AtlantisPleb opened this issue Jan 24, 2025 · 6 comments

@AtlantisPleb
Contributor

Database Persistence for Chat History

Overview

Implement proper database-based persistence for chat history and conversations, replacing the proposed in-memory approach from #589.

Required Changes

1. Database Schema

-- Conversations table
CREATE TABLE conversations (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    user_id TEXT NOT NULL,
    title TEXT,
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

-- Messages table
CREATE TABLE messages (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    conversation_id UUID NOT NULL REFERENCES conversations(id),
    role TEXT NOT NULL, -- 'user', 'assistant', 'system'
    content TEXT NOT NULL,
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    metadata JSONB -- For tool usage, execution results etc
);
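
Since `role` is plain TEXT, the three values in the schema comment are only a convention unless something enforces them. A minimal sketch of application-side validation, assuming roles are checked before insert; the `Role` type and its methods are hypothetical names, not part of the codebase:

```rust
use std::fmt;
use std::str::FromStr;

/// Allowed values for `messages.role` (hypothetical type, for illustration).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Role {
    User,
    Assistant,
    System,
}

impl Role {
    /// The exact string stored in the `role` column.
    pub fn as_str(self) -> &'static str {
        match self {
            Role::User => "user",
            Role::Assistant => "assistant",
            Role::System => "system",
        }
    }
}

impl fmt::Display for Role {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(self.as_str())
    }
}

impl FromStr for Role {
    type Err = String;

    /// Rejects anything other than the three values named in the schema comment.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s {
            "user" => Ok(Role::User),
            "assistant" => Ok(Role::Assistant),
            "system" => Ok(Role::System),
            other => Err(format!("unknown role: {}", other)),
        }
    }
}
```

A `CHECK (role IN ('user', 'assistant', 'system'))` constraint on the column would be the database-side equivalent, if stricter enforcement is wanted.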

2. Database Service

// Add database service for chat persistence
pub struct ChatDatabase {
    pool: PgPool
}

impl ChatDatabase {
    // Create new conversation
    pub async fn create_conversation(&self, user_id: &str) -> Result<Uuid>;
    
    // Add message to conversation
    pub async fn add_message(&self, conv_id: Uuid, msg: ChatMessage) -> Result<Uuid>;
    
    // Get conversation history
    pub async fn get_conversation(&self, conv_id: Uuid) -> Result<Vec<ChatMessage>>;
    
    // List user's conversations
    pub async fn list_conversations(&self, user_id: &str) -> Result<Vec<Conversation>>;
}
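
For testing handlers without Postgres, the same four operations could be backed by an in-memory stand-in. This is only a sketch: it is synchronous, uses `u64` counters instead of UUIDs, and `InMemoryChatStore` plus the simplified `ChatMessage` and `Conversation` shapes are assumptions rather than the real types:

```rust
use std::collections::HashMap;

#[derive(Debug, Clone, PartialEq)]
pub struct ChatMessage {
    pub role: String,
    pub content: String,
}

#[derive(Debug, Clone, PartialEq)]
pub struct Conversation {
    pub id: u64,
    pub user_id: String,
}

/// In-memory stand-in exposing the same four operations as `ChatDatabase`.
#[derive(Default)]
pub struct InMemoryChatStore {
    next_id: u64,
    conversations: Vec<Conversation>,
    messages: HashMap<u64, Vec<ChatMessage>>,
}

impl InMemoryChatStore {
    fn fresh_id(&mut self) -> u64 {
        self.next_id += 1;
        self.next_id
    }

    pub fn create_conversation(&mut self, user_id: &str) -> u64 {
        let id = self.fresh_id();
        self.conversations.push(Conversation { id, user_id: user_id.to_string() });
        self.messages.insert(id, Vec::new());
        id
    }

    /// Fails when the conversation does not exist, mirroring the foreign key.
    pub fn add_message(&mut self, conv_id: u64, msg: ChatMessage) -> Result<u64, String> {
        if !self.messages.contains_key(&conv_id) {
            return Err(format!("conversation {} not found", conv_id));
        }
        let id = self.fresh_id();
        self.messages.get_mut(&conv_id).expect("checked above").push(msg);
        Ok(id)
    }

    /// Returns messages in insertion order.
    pub fn get_conversation(&self, conv_id: u64) -> Result<Vec<ChatMessage>, String> {
        self.messages
            .get(&conv_id)
            .cloned()
            .ok_or_else(|| format!("conversation {} not found", conv_id))
    }

    pub fn list_conversations(&self, user_id: &str) -> Vec<Conversation> {
        self.conversations
            .iter()
            .filter(|c| c.user_id == user_id)
            .cloned()
            .collect()
    }
}
```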

3. Chat Handler Integration

pub struct ChatHandler {
    ws_state: Arc<WebSocketState>,
    deepseek_service: Arc<DeepSeekService>,
    github_service: Arc<GitHubService>,
    db: Arc<ChatDatabase>, // Add database service
}

impl ChatHandler {
    // Update message handling to use database
    async fn process_message(&self, msg: ChatMessage, conv_id: Uuid) -> Result<()> {
        // Store user message
        self.db.add_message(conv_id, msg.clone()).await?;
        
        // Get conversation history
        let history = self.db.get_conversation(conv_id).await?;
        
        // Process with DeepSeek including history
        let response = self.deepseek_service
            .chat_with_tools(msg.content, &history)
            .await?;
            
        // Store assistant response
        self.db.add_message(conv_id, response.clone()).await?;
        
        Ok(())
    }
}
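
One detail to watch in `process_message`: the user message is stored before the history is loaded, so the history presumably already ends with that message. The thread does not say whether `chat_with_tools` expects the new content separately from the history or as part of it. If it expects them separately, the latest entry could be split off first. A sketch with a hypothetical `split_latest` helper and a simplified message type:

```rust
#[derive(Debug, Clone, PartialEq)]
pub struct ChatMessage {
    pub role: String,
    pub content: String,
}

/// Splits stored history into (earlier messages, latest message).
/// Returns `None` when the history is empty.
pub fn split_latest(history: &[ChatMessage]) -> Option<(&[ChatMessage], &ChatMessage)> {
    history.split_last().map(|(last, earlier)| (earlier, last))
}
```

With this, the call would pass `latest.content` as the new input and `earlier` as the context, avoiding a duplicated final turn.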

4. API Endpoints

// Add routes for:
- GET /api/conversations - List user's conversations
- GET /api/conversations/{id} - Get specific conversation
- POST /api/conversations - Create new conversation
- POST /api/conversations/{id}/messages - Add message to conversation
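
To make the route table concrete, here is a framework-free sketch of how the four method and path pairs map to distinct operations. The `Route` enum and `match_route` function are hypothetical; a real implementation would register these paths with the web framework's router instead:

```rust
#[derive(Debug, PartialEq)]
pub enum Route<'a> {
    ListConversations,
    GetConversation(&'a str),
    CreateConversation,
    AddMessage(&'a str),
}

/// Maps an HTTP method and path to one of the four proposed routes.
pub fn match_route<'a>(method: &str, path: &'a str) -> Option<Route<'a>> {
    // Split into non-empty segments so a trailing slash is tolerated.
    let segments: Vec<&'a str> = path.split('/').filter(|s| !s.is_empty()).collect();
    match (method, segments.as_slice()) {
        ("GET", ["api", "conversations"]) => Some(Route::ListConversations),
        ("POST", ["api", "conversations"]) => Some(Route::CreateConversation),
        ("GET", ["api", "conversations", id]) => Some(Route::GetConversation(*id)),
        ("POST", ["api", "conversations", id, "messages"]) => Some(Route::AddMessage(*id)),
        _ => None,
    }
}
```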

5. UI Updates

  • Add conversation sidebar
  • Show conversation history
  • Allow switching between conversations
  • Display message timestamps

Implementation Steps

  1. Database Setup
    • Add migrations for schema
    • Implement database service
    • Add proper indexes
  2. Chat Handler Updates
    • Integrate database service
    • Update message processing
    • Add conversation management
  3. API Implementation
    • Add conversation endpoints
    • Update WebSocket handlers
    • Add proper error handling
  4. UI Changes
    • Add conversation management UI
    • Update chat display
    • Add loading states

Success Criteria

  • Messages persist between page reloads
  • Users can manage multiple conversations
  • Full conversation history available
  • Proper error handling
  • Clean UI integration

Related Issues

@AtlantisPleb
Contributor Author

Additional Context: Tool Integration and Persistence

The CLI tool integration from #595 needs to be considered in the database schema and persistence layer. We should extend the schema to handle tool execution:

-- Add tool_executions table
CREATE TABLE tool_executions (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    message_id UUID NOT NULL REFERENCES messages(id),
    tool_name TEXT NOT NULL,
    input JSONB NOT NULL,
    output JSONB,
    status TEXT NOT NULL, -- 'pending', 'success', 'error'
    error_message TEXT,
    started_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    completed_at TIMESTAMPTZ,
    execution_time_ms INTEGER -- For performance tracking
);

-- Add indexes
CREATE INDEX tool_executions_message_id_idx ON tool_executions(message_id);
CREATE INDEX tool_executions_tool_name_idx ON tool_executions(tool_name);
CREATE INDEX tool_executions_status_idx ON tool_executions(status);

This allows us to:

  1. Track tool usage and performance
  2. Resume interrupted operations
  3. Analyze tool effectiveness
  4. Debug tool execution issues
  5. Provide execution history to the model
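
A sketch of how the `status`, `error_message` and `execution_time_ms` columns might be filled in over the life of one execution. The `ToolExecution` struct is a simplified, hypothetical mirror of the table (no ids, no JSONB input or output), and rejecting a second completion is an assumption about the desired behavior:

```rust
use std::time::Duration;

#[derive(Debug, Clone, PartialEq)]
pub enum ToolStatus {
    Pending,
    Success,
    Error,
}

#[derive(Debug, Clone, PartialEq)]
pub struct ToolExecution {
    pub tool_name: String,
    pub status: ToolStatus,
    pub error_message: Option<String>,
    pub execution_time_ms: Option<i32>,
}

impl ToolExecution {
    /// A new row starts out as 'pending' with no timing information.
    pub fn start(tool_name: &str) -> Self {
        ToolExecution {
            tool_name: tool_name.to_string(),
            status: ToolStatus::Pending,
            error_message: None,
            execution_time_ms: None,
        }
    }

    /// Records the outcome once; a second call is rejected.
    pub fn finish(&mut self, outcome: Result<(), String>, elapsed: Duration) -> Result<(), String> {
        if self.status != ToolStatus::Pending {
            return Err("execution already completed".to_string());
        }
        // Postgres INTEGER is 32-bit, so clamp rather than overflow.
        let ms = elapsed.as_millis().min(i32::MAX as u128) as i32;
        self.execution_time_ms = Some(ms);
        match outcome {
            Ok(()) => self.status = ToolStatus::Success,
            Err(msg) => {
                self.status = ToolStatus::Error;
                self.error_message = Some(msg);
            }
        }
        Ok(())
    }
}
```

Rows still in the pending state after a restart are the ones a resume step would need to look at.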

The CLI tool functionality (repo cloning, testing, analysis) will be tracked here, enabling:

  • Persistent workspace references
  • Test execution history
  • Analysis result caching
  • Progress tracking across sessions

We should also consider adding a workspace_id column to track isolated environments from #592.

(Comment from OpenAgents)

@AtlantisPleb
Contributor Author

Initial implementation progress:

  1. Created database migrations:
    • conversations table with user_id, title, timestamps
    • messages table with role, content, metadata, tool_calls
    • Added appropriate indexes
    • ON DELETE CASCADE for messages
  2. Created models (src/server/models/chat.rs):
    • Conversation struct with FromRow derive
    • Message struct with FromRow derive
    • Request structs for creation
    • Helper methods for instantiation
  3. Implemented ChatDatabase service (src/server/services/chat_database.rs):
    • create_conversation
    • add_message (with conversation timestamp update)
    • get_conversation
    • get_conversation_messages
    • list_user_conversations
    • delete_conversation
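
Two behaviors in this list are easy to get subtly wrong: `add_message` also bumping the conversation's `updated_at`, and `delete_conversation` taking the messages with it (which `ON DELETE CASCADE` handles in the database). Below is a std-only model of those semantics. Integer ids and a logical clock stand in for UUIDs and timestamps, and listing most-recently-updated first is an assumption, since the ordering is not specified above:

```rust
use std::collections::BTreeMap;

#[derive(Debug, Clone, PartialEq)]
pub struct Conversation {
    pub id: u64,
    pub user_id: String,
    pub updated_at: u64, // logical clock tick, stands in for TIMESTAMPTZ
}

#[derive(Default)]
pub struct Store {
    clock: u64,
    next_id: u64,
    conversations: BTreeMap<u64, Conversation>,
    messages: Vec<(u64, String)>, // (conversation_id, content)
}

impl Store {
    fn tick(&mut self) -> u64 {
        self.clock += 1;
        self.clock
    }

    pub fn create_conversation(&mut self, user_id: &str) -> u64 {
        self.next_id += 1;
        let id = self.next_id;
        let now = self.tick();
        let conv = Conversation { id, user_id: user_id.to_string(), updated_at: now };
        self.conversations.insert(id, conv);
        id
    }

    /// Stores the message and bumps the parent conversation's `updated_at`.
    pub fn add_message(&mut self, conv_id: u64, content: &str) -> Result<(), String> {
        let now = self.tick();
        let conv = self
            .conversations
            .get_mut(&conv_id)
            .ok_or_else(|| format!("conversation {} not found", conv_id))?;
        conv.updated_at = now;
        self.messages.push((conv_id, content.to_string()));
        Ok(())
    }

    /// Conversation ids for one user, most recently updated first.
    pub fn list_user_conversations(&self, user_id: &str) -> Vec<u64> {
        let mut convs: Vec<&Conversation> =
            self.conversations.values().filter(|c| c.user_id == user_id).collect();
        convs.sort_by(|a, b| b.updated_at.cmp(&a.updated_at));
        convs.into_iter().map(|c| c.id).collect()
    }

    /// Removes the conversation and, like ON DELETE CASCADE, its messages.
    pub fn delete_conversation(&mut self, conv_id: u64) -> bool {
        let existed = self.conversations.remove(&conv_id).is_some();
        self.messages.retain(|(cid, _)| *cid != conv_id);
        existed
    }

    pub fn message_count(&self, conv_id: u64) -> usize {
        self.messages.iter().filter(|(cid, _)| *cid == conv_id).count()
    }
}
```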

Next steps:

  1. Write integration tests
  2. Add chat handlers
  3. Update WebSocket handlers to use persistence
  4. Add conversation management UI components

The current implementation provides a solid foundation for both chat history persistence and tool execution tracking (via the tool_calls JSONB field in messages).

(Comment from OpenAgents)

@AtlantisPleb
Contributor Author

Added comprehensive integration tests in tests/chat_database.rs:

  1. Conversation lifecycle:
    • Creation with user_id and title
    • Retrieval by ID
    • Listing by user_id
    • Deletion with cascade
  2. Message handling:
    • Adding messages with different roles
    • Storing metadata and tool_calls as JSONB
    • Retrieving conversation messages
    • Verifying message ordering
    • Testing cascade deletion
  3. Error cases:
    • Non-existent conversation handling
    • Post-deletion verification

The tests follow the pattern established in tests/user.rs with thorough assertions and cleanup.

Next steps:

  1. Add chat handlers for HTTP endpoints
  2. Update WebSocket handlers to use persistence
  3. Add UI components for conversation management

Would you like me to proceed with implementing the chat handlers next?

(Comment from OpenAgents)

@AtlantisPleb
Contributor Author

WebSocket Integration Plan

After reviewing the WebSocket implementation, here's how we'll integrate chat persistence without adding HTTP endpoints:

1. Extend WebSocket Message Types (src/server/ws/types.rs)

pub enum ChatMessage {
    // Existing
    UserMessage { content: String },
    AgentResponse { content: String },
    Error { message: String },
    
    // New conversation management messages
    CreateConversation { user_id: String, title: Option<String> },
    ListConversations { user_id: String },
    SelectConversation { conversation_id: Uuid },
    DeleteConversation { conversation_id: Uuid },
}

2. Enhance ChatHandler State (src/server/ws/handlers/chat.rs)

pub struct ChatHandler {
    ws_state: Arc<WebSocketState>,
    github_service: Arc<GitHubService>,
    chat_db: Arc<ChatDatabase>,              // Add database service
    active_conversations: DashMap<String, Uuid>, // Track active conversation per connection
}

3. Message Flow

Connection Setup

  1. When the WebSocket connects, the connection has no active conversation
  2. Client must either:
    • Create new conversation
    • List conversations and select one

Conversation Creation

  1. Client sends CreateConversation
  2. Server creates the conversation in the DB
  3. Server sets it as the active conversation for the connection
  4. Server returns the conversation details

Message Processing

  1. Check for active conversation
  2. Store message in DB
  3. Process with model
  4. Store response in DB
  5. Stream response to client

Conversation Management

  • List: Return user's conversations
  • Select: Switch active conversation
  • Delete: Remove conversation and messages
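
The flow above is essentially a small per-connection state machine: nothing is active until the client creates or selects a conversation, and a chat message without an active conversation is an error. A std-only sketch of just that dispatch logic follows. Database calls, listing and streaming are left out, `u64` ids replace UUIDs, and `ClientMessage`, `Outcome` and `ConnectionRouter` are hypothetical names:

```rust
use std::collections::HashMap;

#[derive(Debug, Clone, PartialEq)]
pub enum ClientMessage {
    UserMessage { content: String },
    CreateConversation { title: Option<String> },
    SelectConversation { conversation_id: u64 },
    DeleteConversation { conversation_id: u64 },
}

#[derive(Debug, PartialEq)]
pub enum Outcome {
    ConversationActivated(u64),
    MessageAccepted { conversation_id: u64 },
    ConversationDeleted(u64),
    Error(String),
}

#[derive(Default)]
pub struct ConnectionRouter {
    next_id: u64,
    active: HashMap<String, u64>, // conn_id -> active conversation
}

impl ConnectionRouter {
    pub fn handle(&mut self, conn_id: &str, msg: ClientMessage) -> Outcome {
        match msg {
            ClientMessage::CreateConversation { .. } => {
                // A newly created conversation becomes the active one.
                self.next_id += 1;
                self.active.insert(conn_id.to_string(), self.next_id);
                Outcome::ConversationActivated(self.next_id)
            }
            ClientMessage::SelectConversation { conversation_id } => {
                self.active.insert(conn_id.to_string(), conversation_id);
                Outcome::ConversationActivated(conversation_id)
            }
            ClientMessage::UserMessage { .. } => match self.active.get(conn_id) {
                Some(&conversation_id) => Outcome::MessageAccepted { conversation_id },
                None => Outcome::Error("no active conversation".to_string()),
            },
            ClientMessage::DeleteConversation { conversation_id } => {
                // Clear the active slot if it pointed at the deleted conversation.
                if self.active.get(conn_id) == Some(&conversation_id) {
                    self.active.remove(conn_id);
                }
                Outcome::ConversationDeleted(conversation_id)
            }
        }
    }
}
```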

4. Client Message Examples

// Create conversation
{
    "type": "chat",
    "type": "create_conversation",
    "user_id": "user123",
    "title": "New Chat"
}

// List conversations
{
    "type": "chat",
    "type": "list_conversations",
    "user_id": "user123"
}

// Select conversation
{
    "type": "chat",
    "type": "select_conversation",
    "conversation_id": "uuid-here"
}

// Regular chat message (now stored in DB)
{
    "type": "chat",
    "type": "user_message",
    "content": "Hello!"
}

5. Server Response Examples

// Conversation created
{
    "type": "chat",
    "status": "conversation_created",
    "conversation": {
        "id": "uuid",
        "title": "New Chat",
        "created_at": "timestamp"
    }
}

// Conversation list
{
    "type": "chat",
    "status": "conversations_list",
    "conversations": [
        {
            "id": "uuid",
            "title": "Chat 1",
            "updated_at": "timestamp"
        }
    ]
}

// Chat response (now includes conversation context)
{
    "type": "chat",
    "status": "streaming",
    "conversation_id": "uuid",
    "content": "Hello! How can I help?",
    "sender": "ai"
}

Implementation Order

  1. Extend message types
  2. Add conversation tracking to ChatHandler
  3. Implement conversation management handlers
  4. Integrate DB storage into message processing
  5. Add error handling for missing/invalid conversations
  6. Update client to handle new message types

This approach:

  • Maintains existing WebSocket architecture
  • Adds persistence transparently
  • Enables conversation management
  • Preserves streaming responses
  • Keeps all communication over WS

Next step: Implement the message type changes and ChatHandler updates?

(Comment from OpenAgents)

@AtlantisPleb
Contributor Author

Revised WebSocket Auth & Persistence Plan

WebSocket Authentication

  1. When client initiates WebSocket connection:
    • Must provide session token in cookie (from Scramble OIDC auth)
    • Server validates session in WebSocket upgrade handler
    • On success, stores scramble_id in WebSocketState for the connection
// In src/server/ws/mod.rs
pub async fn ws_handler(
    ws: WebSocketUpgrade,
    cookies: CookieJar,
    State(state): State<Arc<AppState>>,
) -> Result<impl IntoResponse, StatusCode> {
    // Get session cookie; returning Result is what lets `?` reject with 401
    let cookie = cookies
        .get(SESSION_COOKIE_NAME)
        .ok_or(StatusCode::UNAUTHORIZED)?;

    // Validate outside a closure, since `.await` is not allowed in a
    // non-async closure (assumes Session::validate returns an Option)
    let session = Session::validate(cookie.value(), &state.db_pool)
        .await
        .ok_or(StatusCode::UNAUTHORIZED)?;

    // Get user from session
    let user = sqlx::query_as!(
        User,
        "SELECT * FROM users WHERE id = $1",
        session.user_id
    )
    .fetch_one(&state.db_pool)
    .await
    .map_err(|_| StatusCode::UNAUTHORIZED)?;

    // Store scramble_id in connection state
    let conn_id = Uuid::new_v4();
    state.ws_state.connections.insert(
        conn_id.to_string(),
        ConnectionState {
            scramble_id: user.scramble_id.clone(),
            // ... other fields
        }
    );

    // Upgrade connection; `move` hands conn_id and state to the callback
    Ok(ws.on_upgrade(move |socket| handle_socket(socket, conn_id, state)))
}
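
The handler relies on the `CookieJar` extractor to find the session cookie. For reference, the lookup it performs amounts to scanning the raw `Cookie` request header for a name. A std-only sketch, where the helper name `cookie_value` is hypothetical and the cookie names in the comment are just examples:

```rust
/// Returns the value of cookie `name` from a raw `Cookie` header,
/// for example "theme=dark; session=abc123".
pub fn cookie_value<'a>(header: &'a str, name: &str) -> Option<&'a str> {
    header.split(';').find_map(|pair| {
        // Only the first '=' separates name from value; later ones belong to the value.
        let (key, value) = pair.trim().split_once('=')?;
        if key == name {
            Some(value)
        } else {
            None
        }
    })
}
```

A missing cookie yields `None`, which corresponds to the 401 branch in the handler.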

ChatHandler Revision

pub struct ChatHandler {
    ws_state: Arc<WebSocketState>,
    github_service: Arc<GitHubService>,
    chat_db: Arc<ChatDatabase>,
    active_conversations: DashMap<String, Uuid>, // conn_id -> conversation_id
}

impl ChatHandler {
    async fn get_user_from_conn(&self, conn_id: &str) -> Result<String> {
        // Get scramble_id from connection state
        self.ws_state
            .connections
            .get(conn_id)
            .map(|conn| conn.scramble_id.clone())
            .ok_or_else(|| anyhow!("No authenticated user for connection"))
    }

    async fn process_message(&self, content: String, conn_id: &str) -> Result<()> {
        // Get user's scramble_id from connection state
        let scramble_id = self.get_user_from_conn(conn_id).await?;
        
        // Get or create active conversation
        let conversation_id = match self.active_conversations.get(conn_id) {
            Some(id) => *id,
            None => {
                // Create new conversation
                let conv = self.chat_db
                    .create_conversation(CreateConversationRequest {
                        scramble_id,
                        title: None,
                    })
                    .await?;
                self.active_conversations.insert(conn_id.to_string(), conv.id);
                conv.id
            }
        };

        // Store message
        self.chat_db
            .add_message(CreateMessageRequest {
                conversation_id,
                role: "user".to_string(),
                content: content.clone(),
                metadata: None,
                tool_calls: None,
            })
            .await?;

        // Process with model (existing code)
        // ...

        Ok(())
    }
}
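
The get-or-create step in `process_message` can be isolated and tested on its own. In this sketch a plain `HashMap` replaces the `DashMap`, `u64` ids replace UUIDs, and the database call is passed in as a closure; `ActiveConversations` and its methods are hypothetical names. The `disconnect` method is an addition not discussed above: without some cleanup when a socket closes, the map would keep an entry for every connection ever opened.

```rust
use std::collections::HashMap;

/// Tracks the active conversation per connection (conn_id -> conversation_id).
#[derive(Default)]
pub struct ActiveConversations {
    by_conn: HashMap<String, u64>,
}

impl ActiveConversations {
    /// Returns the active conversation for `conn_id`, calling `create`
    /// only when the connection has none yet.
    pub fn get_or_create<F>(&mut self, conn_id: &str, create: F) -> Result<u64, String>
    where
        F: FnOnce() -> Result<u64, String>,
    {
        if let Some(&id) = self.by_conn.get(conn_id) {
            return Ok(id);
        }
        // Insert only after creation succeeded, so a failed create
        // leaves the connection without an active conversation.
        let id = create()?;
        self.by_conn.insert(conn_id.to_string(), id);
        Ok(id)
    }

    /// Drops the entry for a closed connection.
    pub fn disconnect(&mut self, conn_id: &str) {
        self.by_conn.remove(conn_id);
    }
}
```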

Key Changes

  1. Authentication happens at WebSocket connection time via session cookie
  2. scramble_id stored in connection state, not passed in messages
  3. Conversations automatically created/managed per connection
  4. No need for explicit user_id in messages
  5. Security maintained through session validation

Message Flow

  1. User authenticates via Scramble OIDC (existing flow)
  2. Client connects WebSocket with session cookie
  3. Server validates session, stores scramble_id
  4. All subsequent messages automatically associated with authenticated user
  5. No user_id needed in messages - server gets from connection state

This approach:

  • Maintains security through session validation
  • Leverages existing Scramble OIDC auth
  • Keeps user identity in server-side state
  • Prevents user_id spoofing
  • Simplifies client-side code

Next steps:

  1. Update WebSocket connection handler with auth
  2. Add scramble_id to connection state
  3. Modify ChatHandler to use connection state
  4. Update tests with auth context

Thoughts on this revised approach?

(Comment from OpenAgents)

@AtlantisPleb
Contributor Author

WebSocket Auth & Persistence Plan

WebSocket Authentication & State

  1. When client initiates WebSocket connection:
    • Must provide session token in cookie
    • Server validates session in WebSocket upgrade handler
    • On success, stores user.id in WebSocketState for the connection
// In ConnectionState
pub struct ConnectionState {
    user_id: i32,  // From users.id
    // ... other fields
}

// In WebSocket upgrade handler
pub async fn ws_handler(
    ws: WebSocketUpgrade,
    cookies: CookieJar,
    State(state): State<Arc<AppState>>,
) -> Result<impl IntoResponse, StatusCode> {
    // Get session cookie; returning Result is what lets `?` reject with 401
    let cookie = cookies
        .get(SESSION_COOKIE_NAME)
        .ok_or(StatusCode::UNAUTHORIZED)?;

    // Validate outside a closure, since `.await` is not allowed in a
    // non-async closure (assumes Session::validate returns an Option)
    let session = Session::validate(cookie.value(), &state.db_pool)
        .await
        .ok_or(StatusCode::UNAUTHORIZED)?;

    // Get user from session
    let user = sqlx::query_as!(
        User,
        "SELECT * FROM users WHERE id = $1",
        session.user_id
    )
    .fetch_one(&state.db_pool)
    .await
    .map_err(|_| StatusCode::UNAUTHORIZED)?;

    // Store user.id in connection state
    let conn_id = Uuid::new_v4();
    state.ws_state.connections.insert(
        conn_id.to_string(),
        ConnectionState {
            user_id: user.id,
            // ... other fields
        }
    );

    // Upgrade connection; `move` hands conn_id and state to the callback
    Ok(ws.on_upgrade(move |socket| handle_socket(socket, conn_id, state)))
}

ChatHandler Revision

pub struct ChatHandler {
    ws_state: Arc<WebSocketState>,
    github_service: Arc<GitHubService>,
    chat_db: Arc<ChatDatabase>,
    active_conversations: DashMap<String, Uuid>, // conn_id -> conversation_id
}

impl ChatHandler {
    async fn get_user_id_from_conn(&self, conn_id: &str) -> Result<i32> {
        // Get user.id from connection state
        self.ws_state
            .connections
            .get(conn_id)
            .map(|conn| conn.user_id)
            .ok_or_else(|| anyhow!("No authenticated user for connection"))
    }

    async fn process_message(&self, content: String, conn_id: &str) -> Result<()> {
        // Get user.id from connection state
        let user_id = self.get_user_id_from_conn(conn_id).await?;
        
        // Get or create active conversation
        let conversation_id = match self.active_conversations.get(conn_id) {
            Some(id) => *id,
            None => {
                // Create new conversation
                let conv = self.chat_db
                    .create_conversation(CreateConversationRequest {
                        user_id,
                        title: None,
                    })
                    .await?;
                self.active_conversations.insert(conn_id.to_string(), conv.id);
                conv.id
            }
        };

        // Store message
        self.chat_db
            .add_message(CreateMessageRequest {
                conversation_id,
                role: "user".to_string(),
                content: content.clone(),
                metadata: None,
                tool_calls: None,
            })
            .await?;

        // Process with model (existing code)
        // ...

        Ok(())
    }
}

Key Points

  1. Store numeric user.id in connection state, not scramble_id
  2. Session validation happens once at connection time
  3. All subsequent operations use the stored user.id
  4. No user identification needed in messages
  5. Conversations automatically created/managed per connection

Message Flow

  1. User authenticates (via any supported method)
  2. Client connects WebSocket with session cookie
  3. Server validates session, stores user.id
  4. All subsequent messages automatically associated with authenticated user
  5. No user identification needed in messages - server gets from connection state
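
With the authenticated `user_id` held per connection, any conversation id that still arrives from the client (for example when selecting or deleting a conversation) can be checked against it. This check is not spelled out in the plan, so the following is only one possible shape. `Access`, `AccessError` and `authorize` are hypothetical names, and two maps stand in for the connection state and the conversations table:

```rust
use std::collections::HashMap;

#[derive(Debug, PartialEq)]
pub enum AccessError {
    NotAuthenticated,
    NotFound,
    Forbidden,
}

pub struct Access {
    /// conn_id -> user id stored at WebSocket upgrade time
    pub connections: HashMap<String, i32>,
    /// conversation_id -> owning user id (stands in for the conversations table)
    pub owners: HashMap<u64, i32>,
}

impl Access {
    /// Returns the connection's user id if that user owns the conversation.
    pub fn authorize(&self, conn_id: &str, conversation_id: u64) -> Result<i32, AccessError> {
        let user_id = *self
            .connections
            .get(conn_id)
            .ok_or(AccessError::NotAuthenticated)?;
        let owner = *self
            .owners
            .get(&conversation_id)
            .ok_or(AccessError::NotFound)?;
        if owner == user_id {
            Ok(user_id)
        } else {
            Err(AccessError::Forbidden)
        }
    }
}
```

Returning the same error for missing and foreign conversations would avoid revealing which ids exist; whether that matters here is a design choice.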

Next steps:

  1. Update WebSocket connection handler with auth
  2. Add user_id to connection state
  3. Modify ChatHandler to use connection state
  4. Update tests with auth context

Would you like me to start with implementing the WebSocket connection handler changes?

(Comment from OpenAgents)
