[Feature Request]: Resource access audit trail for compliance and security #1223

@crivetimihai

Feature Summary

Add comprehensive audit logging for resource access operations (reads, lists, queries) to complement the existing permission check auditing. This would provide a complete audit trail for compliance, security monitoring, and usage analytics.

Motivation

Currently, MCP Gateway only logs permission checks via PermissionAuditLog, but does not track:

  • When users access/read specific resources
  • What resources are being queried/listed
  • Usage patterns and access frequency
  • Potentially suspicious access that passed permission checks (for example, an authorized user reading resources far outside their usual pattern)

This creates gaps in:

  1. Compliance auditing: Cannot prove who accessed what data and when
  2. Security monitoring: Cannot detect unusual access patterns
  3. Usage analytics: Cannot track resource popularity or user behavior
  4. Forensic investigation: Limited ability to trace actions after security incidents

Proposed Solution

1. New Database Table: resource_access_log

class ResourceAccessLog(Base):
    """Audit log for resource access operations."""
    __tablename__ = "resource_access_log"
    
    id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
    timestamp: Mapped[datetime] = mapped_column(DateTime(timezone=True), default=utc_now, index=True)
    
    # User and request info
    user_email: Mapped[str] = mapped_column(String(255), nullable=False, index=True)
    ip_address: Mapped[Optional[str]] = mapped_column(String(45), nullable=True)
    user_agent: Mapped[Optional[str]] = mapped_column(Text, nullable=True)
    
    # Resource info
    resource_type: Mapped[str] = mapped_column(String(50), nullable=False, index=True)  # tool, server, gateway, etc.
    resource_id: Mapped[str] = mapped_column(String(255), nullable=False, index=True)
    resource_name: Mapped[Optional[str]] = mapped_column(String(255), nullable=True)
    
    # Action and context
    action: Mapped[str] = mapped_column(String(50), nullable=False)  # read, list, query, invoke
    team_id: Mapped[Optional[str]] = mapped_column(String(36), nullable=True, index=True)
    
    # Result
    success: Mapped[bool] = mapped_column(Boolean, nullable=False, default=True)
    error_message: Mapped[Optional[str]] = mapped_column(Text, nullable=True)
    
    # Performance
    response_time_ms: Mapped[Optional[float]] = mapped_column(Float, nullable=True)
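
The comments on resource_type and action list the expected values only informally. One way to keep writers consistent is to validate both fields against string enums before inserting a row. This is a minimal sketch: the enum names and the validate_event helper are assumptions for illustration, not existing MCP Gateway code, and the member lists cover only the values named in the column comments (the "etc." on resource_type means more would be needed).

```python
from enum import Enum


class AuditResourceType(str, Enum):
    """Hypothetical enum; members mirror the values named in the column comment."""
    TOOL = "tool"
    SERVER = "server"
    GATEWAY = "gateway"


class AuditAction(str, Enum):
    """Hypothetical enum; members mirror the values named in the column comment."""
    READ = "read"
    LIST = "list"
    QUERY = "query"
    INVOKE = "invoke"


def validate_event(resource_type: str, action: str) -> tuple[str, str]:
    """Return normalized (resource_type, action) or raise ValueError."""
    # Enum lookup by value raises ValueError for unknown strings.
    rt = AuditResourceType(resource_type.strip().lower())
    act = AuditAction(action.strip().lower())
    return rt.value, act.value
```

Because both enums subclass str, their values fit the proposed String(50) columns without any schema change.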

2. Audit Service Implementation

class ResourceAuditService:
    """Service for logging resource access."""
    
    @staticmethod
    async def log_access(
        db: Session,
        user_email: str,
        resource_type: str,
        resource_id: str,
        action: str,
        ip_address: Optional[str] = None,
        user_agent: Optional[str] = None,
        team_id: Optional[str] = None,
        success: bool = True,
        error_message: Optional[str] = None,
        response_time_ms: Optional[float] = None
    ):
        """Log a resource access event."""
        try:
            audit_entry = ResourceAccessLog(
                user_email=user_email,
                resource_type=resource_type,
                resource_id=resource_id,
                action=action,
                ip_address=ip_address,
                user_agent=user_agent,
                team_id=team_id,
                success=success,
                error_message=error_message,
                response_time_ms=response_time_ms,
                timestamp=utc_now()
            )
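            # Insert inside a savepoint: flush() is synchronous and does not commit,
            # and a failed flush would otherwise leave the caller's session unusable.
            # The row is persisted when the caller commits.
            with db.begin_nested():
                db.add(audit_entry)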
        except Exception as e:
            logger.error(f"Failed to log resource access: {e}")
            # Don't fail the request if audit logging fails

3. Integration Points

Add audit logging to:

Read operations:

async def get_server(self, db: Session, server_id: str, user_email: str) -> ServerRead:
    server = db.get(DbServer, server_id)
    if not server:
        raise ServerNotFoundError(...)
    
    # Log the access
    await ResourceAuditService.log_access(
        db=db,
        user_email=user_email,
        resource_type="server",
        resource_id=server_id,
        action="read"
    )
    
    return self._convert_server_to_read(server)

List operations:

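async def list_servers_for_user(self, db: Session, user_email: str, ...) -> List[ServerRead]:
    # Requires `import time` at module level.
    start = time.monotonic()
    # ... existing code that builds `query` ...
    servers = db.execute(query).scalars().all()
    elapsed_ms = (time.monotonic() - start) * 1000
    
    # Log the list access. "_list" is a sentinel ID because no single resource is read.
    # The proposed table has no column for the result count, so only timing is recorded.
    await ResourceAuditService.log_access(
        db=db,
        user_email=user_email,
        resource_type="server",
        resource_id="_list",
        action="list",
        response_time_ms=elapsed_ms
    )
    
    return [self._convert_server_to_read(s) for s in servers]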

Tool invocations (already have metrics, extend with audit):

async def invoke_tool(self, tool_id: str, user_email: str, ...):
    # ... existing code ...
    
    # Log tool invocation
    await ResourceAuditService.log_access(
        db=db,
        user_email=user_email,
        resource_type="tool",
        resource_id=tool_id,
        action="invoke",
        success=not result.is_error,
        response_time_ms=response_time * 1000
    )
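
The three integration points repeat the same pattern, and the read example only logs successful lookups because the not-found error is raised before log_access is called. One way to cover both outcomes in one place is an async context manager that times the wrapped block and records success or failure. This is a sketch under stated assumptions: audited, AuditEvent and AUDIT_SINK are hypothetical names, and the in-memory list stands in for a call to ResourceAuditService.log_access.

```python
import asyncio
import time
from contextlib import asynccontextmanager
from dataclasses import dataclass
from typing import AsyncIterator, List, Optional


@dataclass
class AuditEvent:
    """In-memory stand-in for a ResourceAccessLog row (illustrative only)."""
    user_email: str
    resource_type: str
    resource_id: str
    action: str
    success: bool
    error_message: Optional[str]
    response_time_ms: float


# Hypothetical sink; a real integration would call ResourceAuditService.log_access.
AUDIT_SINK: List[AuditEvent] = []


@asynccontextmanager
async def audited(
    user_email: str, resource_type: str, resource_id: str, action: str
) -> AsyncIterator[None]:
    """Time the wrapped block and record one event whether it succeeds or raises an Exception."""
    start = time.monotonic()
    error: Optional[str] = None
    try:
        yield
    except Exception as exc:
        error = str(exc)
        raise  # the caller still sees the original exception
    finally:
        AUDIT_SINK.append(
            AuditEvent(
                user_email=user_email,
                resource_type=resource_type,
                resource_id=resource_id,
                action=action,
                success=error is None,
                error_message=error,
                response_time_ms=(time.monotonic() - start) * 1000,
            )
        )


async def demo() -> None:
    async with audited("alice@example.com", "server", "srv-1", "read"):
        await asyncio.sleep(0)  # stands in for the real lookup
    try:
        async with audited("alice@example.com", "server", "missing", "read"):
            raise LookupError("server not found")
    except LookupError:
        pass


asyncio.run(demo())
```

After demo() runs, AUDIT_SINK holds two events: one with success=True and one with success=False carrying the error text.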

4. Configuration

Add environment variables for audit control:

# Enable/disable resource access auditing
MCPGATEWAY_AUDIT_ENABLED=true

# Retention policy (days)
MCPGATEWAY_AUDIT_RETENTION_DAYS=90

# Log only specific resource types
MCPGATEWAY_AUDIT_RESOURCE_TYPES=tool,server,gateway

# Log only specific actions
MCPGATEWAY_AUDIT_ACTIONS=read,invoke,delete
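
The variables above could be read into a small settings object that decides whether a given event should be logged. This is a sketch, not the gateway's actual settings code: the AuditSettings class, the defaults used when a variable is unset (enabled, 90 days), and the rule that an empty list means "no filter" are all assumptions.

```python
import os
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import FrozenSet, Mapping, Optional


def _parse_bool(raw: Optional[str], default: bool) -> bool:
    if raw is None:
        return default
    return raw.strip().lower() in {"1", "true", "yes", "on"}


def _parse_csv(raw: Optional[str]) -> FrozenSet[str]:
    """Split a comma-separated value; an unset or empty value yields an empty set."""
    if not raw:
        return frozenset()
    return frozenset(item.strip().lower() for item in raw.split(",") if item.strip())


@dataclass(frozen=True)
class AuditSettings:
    enabled: bool
    retention_days: int
    resource_types: FrozenSet[str]  # empty set is treated as "no filter" (assumption)
    actions: FrozenSet[str]         # empty set is treated as "no filter" (assumption)

    @classmethod
    def from_env(cls, env: Mapping[str, str] = os.environ) -> "AuditSettings":
        return cls(
            enabled=_parse_bool(env.get("MCPGATEWAY_AUDIT_ENABLED"), default=True),
            retention_days=int(env.get("MCPGATEWAY_AUDIT_RETENTION_DAYS", "90")),
            resource_types=_parse_csv(env.get("MCPGATEWAY_AUDIT_RESOURCE_TYPES")),
            actions=_parse_csv(env.get("MCPGATEWAY_AUDIT_ACTIONS")),
        )

    def should_audit(self, resource_type: str, action: str) -> bool:
        if not self.enabled:
            return False
        if self.resource_types and resource_type not in self.resource_types:
            return False
        if self.actions and action not in self.actions:
            return False
        return True

    def retention_cutoff(self, now: Optional[datetime] = None) -> datetime:
        """Rows with a timestamp older than this are eligible for deletion."""
        now = now or datetime.now(timezone.utc)
        return now - timedelta(days=self.retention_days)


# The example values from the block above, passed explicitly instead of via os.environ.
settings = AuditSettings.from_env({
    "MCPGATEWAY_AUDIT_ENABLED": "true",
    "MCPGATEWAY_AUDIT_RETENTION_DAYS": "90",
    "MCPGATEWAY_AUDIT_RESOURCE_TYPES": "tool,server,gateway",
    "MCPGATEWAY_AUDIT_ACTIONS": "read,invoke,delete",
})
```

With these example values, settings.should_audit("tool", "invoke") is true, while list operations would be skipped because "list" is not in MCPGATEWAY_AUDIT_ACTIONS.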

5. Admin UI Enhancements

Add new "Audit Log" tab showing:

  • Recent resource access events
  • Filter by user, resource type, date range
  • Export audit logs for compliance
  • Access pattern visualizations
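
For the export item, a CSV rendering is probably the simplest starting point. The sketch below uses only the standard library. The export_csv function and the column selection are assumptions, and leaving ip_address and user_agent out of the export is an illustrative choice, not a requirement from this proposal.

```python
import csv
import io
from typing import Iterable, Mapping, Sequence

# Column order for the export; names follow the proposed resource_access_log table.
EXPORT_COLUMNS: Sequence[str] = (
    "timestamp", "user_email", "resource_type", "resource_id",
    "action", "team_id", "success", "error_message", "response_time_ms",
)


def export_csv(rows: Iterable[Mapping[str, object]]) -> str:
    """Render audit rows as CSV text with a header line."""
    buffer = io.StringIO()
    writer = csv.DictWriter(
        buffer,
        fieldnames=EXPORT_COLUMNS,
        extrasaction="ignore",  # keys outside EXPORT_COLUMNS are dropped
        lineterminator="\n",
    )
    writer.writeheader()
    for row in rows:
        # Missing keys are written as empty cells (DictWriter's default restval).
        writer.writerow(row)
    return buffer.getvalue()


sample = [
    {
        "timestamp": "2025-01-01T00:00:00+00:00",
        "user_email": "alice@example.com",
        "resource_type": "server",
        "resource_id": "srv-1",
        "action": "read",
        "success": True,
        "response_time_ms": 12.5,
        "ip_address": "203.0.113.7",  # not in EXPORT_COLUMNS, so it is omitted
    },
]
csv_text = export_csv(sample)
```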

6. API Endpoints

GET /admin/audit/access?user_email=&resource_type=&start_date=&end_date=
GET /admin/audit/access/export
GET /admin/audit/access/stats
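
To make the filter semantics concrete, here is a rough sketch of how the query parameters might translate into SQL. It uses the standard-library sqlite3 module and a reduced copy of the proposed table so it runs on its own; a real implementation would go through the gateway's SQLAlchemy session. The function names, the exclusive end_date, and the per-resource-type shape of the stats result are assumptions.

```python
import sqlite3
from typing import List, Optional, Tuple

conn = sqlite3.connect(":memory:")
conn.execute(
    """CREATE TABLE resource_access_log (
           id INTEGER PRIMARY KEY AUTOINCREMENT,
           timestamp TEXT NOT NULL,
           user_email TEXT NOT NULL,
           resource_type TEXT NOT NULL,
           resource_id TEXT NOT NULL,
           action TEXT NOT NULL
       )"""
)
conn.executemany(
    "INSERT INTO resource_access_log "
    "(timestamp, user_email, resource_type, resource_id, action) VALUES (?, ?, ?, ?, ?)",
    [
        ("2025-01-01T10:00:00+00:00", "alice@example.com", "server", "srv-1", "read"),
        ("2025-01-02T11:00:00+00:00", "alice@example.com", "tool", "tool-9", "invoke"),
        ("2025-01-03T12:00:00+00:00", "bob@example.com", "server", "srv-1", "read"),
    ],
)


def query_access(
    user_email: Optional[str] = None,
    resource_type: Optional[str] = None,
    start_date: Optional[str] = None,
    end_date: Optional[str] = None,
) -> List[Tuple]:
    """Apply only the filters that were supplied; values are always bound parameters."""
    clauses, params = [], []
    for column, op, value in (
        ("user_email", "=", user_email),
        ("resource_type", "=", resource_type),
        ("timestamp", ">=", start_date),
        ("timestamp", "<", end_date),  # end_date treated as exclusive (assumption)
    ):
        if value is not None:
            clauses.append(f"{column} {op} ?")
            params.append(value)
    where = f" WHERE {' AND '.join(clauses)}" if clauses else ""
    sql = (
        "SELECT user_email, resource_type, resource_id, action "
        f"FROM resource_access_log{where} ORDER BY timestamp"
    )
    return conn.execute(sql, params).fetchall()


def access_stats() -> List[Tuple[str, int]]:
    """Count events per resource type."""
    sql = (
        "SELECT resource_type, COUNT(*) FROM resource_access_log "
        "GROUP BY resource_type ORDER BY resource_type"
    )
    return conn.execute(sql).fetchall()
```

The string comparison on timestamp works here only because every value is ISO 8601 with the same UTC offset; a DateTime(timezone=True) column would compare natively.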

Benefits

  1. Compliance: Provide evidence for audits and regulatory requirements (SOC 2, HIPAA, GDPR)
  2. Security: Detect anomalous access patterns
  3. Analytics: Understand resource usage and user behavior
  4. Forensics: Investigate security incidents
  5. Accountability: Clear audit trail of all actions

Related Standards

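  • SOC 2 Type II: Auditors typically expect evidence of logging and monitoring of system access over the audit period
  • HIPAA: The Security Rule's audit controls standard (45 CFR 164.312(b)) requires mechanisms to record and examine activity in systems that hold ePHI
  • GDPR: No explicit audit-log mandate, but access logs help demonstrate accountability (Art. 5(2)) and security of processing (Art. 32)
  • ISO 27001: Logging is an Annex A control (8.15 in the 2022 edition)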

Alternative Approaches

  1. Use existing PermissionAuditLog: Could extend this table, but mixing permission checks with resource access makes queries complex
  2. External logging: Use Elasticsearch/Splunk, but adds infrastructure complexity
  3. No logging: Current state - not suitable for production/enterprise use

Implementation Priority

Priority: Medium-High
Effort: Medium (2-3 days)
Risk: Low (audit logging shouldn't break existing functionality)

Related Issue

Split from #969 (Backend Multi-Tenancy Issues) - Issue #5

Labels

enhancement (New feature or request), python (Python / backend development (FastAPI))