Skip to content

Commit 7f93d95

Browse files
committed
fix: refactor mcp
1 parent 780bffa commit 7f93d95

File tree

12 files changed

+1138
-1018
lines changed

12 files changed

+1138
-1018
lines changed

internal/app/app.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"github.com/co-browser/agent-browser/internal/events"
1212
"github.com/co-browser/agent-browser/internal/log"
1313
"github.com/co-browser/agent-browser/internal/mcp"
14+
"github.com/co-browser/agent-browser/internal/mcp/config"
1415
"github.com/co-browser/agent-browser/internal/web"
1516
"github.com/co-browser/agent-browser/internal/web/client"
1617
"github.com/co-browser/agent-browser/internal/web/handlers"
@@ -60,7 +61,7 @@ var DatabaseModule = fx.Module("database",
6061
// Currently loads MCP configuration.
6162
var ConfigModule = fx.Module("config",
6263
fx.Provide(
63-
mcp.NewMCPConfig,
64+
config.NewConfig,
6465
),
6566
)
6667

internal/mcp/client/client.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,10 @@ import (
55
"github.com/mark3labs/mcp-go/client"
66
)
77

8-
// Re-export the SSEMCPClient from mark3labs/mcp-go/client
8+
// SSEMCPClient provides the Server-Sent Events implementation of the MCP client
99
type SSEMCPClient = client.SSEMCPClient
1010

11-
// NewSSEMCPClient creates a new SSE MCP client with a 60 second timeout
11+
// NewSSEMCPClient creates a new SSE MCP client
1212
func NewSSEMCPClient(url string) (*SSEMCPClient, error) {
1313
return client.NewSSEMCPClient(url)
1414
}

internal/mcp/config.go renamed to internal/mcp/config/config.go

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
1-
// Package mcp implements the MCP server logic.
2-
package mcp
1+
// Package config provides configuration structures for the MCP component.
2+
package config
33

44
import (
55
"time"
@@ -13,11 +13,11 @@ type RemoteMCPServer struct {
1313
URL string `json:"url"`
1414
Name string `json:"name"`
1515
Description string `json:"description"`
16-
ID int64 `json:"-"` // Add ID field, exclude from JSON config if needed
16+
ID int64 `json:"-"` // ID field, excluded from JSON config
1717
}
1818

19-
// MCPServerConfig holds the configuration for the MCP server
20-
type MCPServerConfig struct {
19+
// ServerConfig holds the configuration for the MCP server
20+
type ServerConfig struct {
2121
Port int `json:"port" default:"8087"`
2222
HeartbeatInterval time.Duration `json:"heartbeat_interval" default:"15s"`
2323
HealthCheckInterval time.Duration `json:"health_check_interval" default:"30s"`
@@ -36,12 +36,12 @@ type ConfigParams struct {
3636
type ConfigResult struct {
3737
fx.Out
3838

39-
Config MCPServerConfig
39+
Config ServerConfig
4040
}
4141

42-
// NewMCPConfig creates a new MCP configuration
43-
func NewMCPConfig(p ConfigParams) (ConfigResult, error) {
44-
config := MCPServerConfig{
42+
// NewConfig creates a new MCP configuration
43+
func NewConfig(p ConfigParams) (ConfigResult, error) {
44+
config := ServerConfig{
4545
Port: 8087,
4646
HeartbeatInterval: 15 * time.Second,
4747
HealthCheckInterval: 30 * time.Second,
Lines changed: 174 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,174 @@
1+
// Package connection implements MCP connection handling.
2+
package connection
3+
4+
import (
5+
"context"
6+
"fmt"
7+
"sync"
8+
"time"
9+
10+
"github.com/co-browser/agent-browser/internal/log"
11+
mcpclient "github.com/co-browser/agent-browser/internal/mcp/client"
12+
"github.com/co-browser/agent-browser/internal/mcp/config"
13+
"github.com/mark3labs/mcp-go/mcp"
14+
)
15+
16+
// MCPConnection represents a connection to a remote MCP server
17+
type MCPConnection struct {
18+
Client *mcpclient.SSEMCPClient
19+
URL string
20+
Ctx context.Context
21+
Cancel context.CancelFunc
22+
Tools []mcp.Tool
23+
mu sync.RWMutex
24+
}
25+
26+
// GetTools returns the tools list in a thread-safe way
27+
func (c *MCPConnection) GetTools() []mcp.Tool {
28+
c.mu.RLock()
29+
defer c.mu.RUnlock()
30+
return c.Tools
31+
}
32+
33+
// RemoteConnections stores active connections to remote servers
34+
var RemoteConnections = make(map[string]*MCPConnection)
35+
var ConnectionsMutex sync.RWMutex
36+
37+
// ConnectToRemoteServer establishes a connection to a remote MCP server
38+
func ConnectToRemoteServer(ctx context.Context, logger log.Logger, remote config.RemoteMCPServer) (*MCPConnection, error) {
39+
// Check for existing connection
40+
ConnectionsMutex.RLock()
41+
existingConn, alreadyConnected := RemoteConnections[remote.URL]
42+
ConnectionsMutex.RUnlock()
43+
44+
if alreadyConnected && existingConn.Ctx != nil && existingConn.Ctx.Err() == nil {
45+
// Verify connection is actually healthy before reusing
46+
if existingConn.Client != nil {
47+
checkCtx, checkCancel := context.WithTimeout(existingConn.Ctx, 3*time.Second)
48+
toolsRequest := mcp.ListToolsRequest{}
49+
_, err := existingConn.Client.ListTools(checkCtx, toolsRequest)
50+
checkCancel()
51+
if err == nil {
52+
logger.Debug().Str("url", remote.URL).Msg("Reusing existing healthy connection")
53+
return existingConn, nil
54+
}
55+
logger.Warn().Err(err).Str("url", remote.URL).Msg("Existing connection found but failed health check, proceeding to reconnect.")
56+
}
57+
}
58+
59+
// Create new connection
60+
logger.Debug().Str("url", remote.URL).Msg("Creating new MCP client")
61+
mcpClient, err := mcpclient.NewSSEMCPClient(remote.URL)
62+
if err != nil {
63+
return nil, fmt.Errorf("failed to create client for %s: %w", remote.URL, err)
64+
}
65+
66+
// Create connection context
67+
connCtx, baseCancel := context.WithCancel(ctx)
68+
cancel := func() {
69+
logger.Warn().Str("url", remote.URL).Msgf("Cancelling connection context")
70+
baseCancel()
71+
}
72+
73+
// Start client
74+
logger.Debug().Str("url", remote.URL).Msg("Starting new MCP client")
75+
if err := mcpClient.Start(connCtx); err != nil {
76+
cancel()
77+
return nil, fmt.Errorf("failed to start client: %w", err)
78+
}
79+
80+
// Initialize the client
81+
logger.Debug().Str("url", remote.URL).Msg("Initializing new MCP client")
82+
initRequest := mcp.InitializeRequest{}
83+
initRequest.Params.ProtocolVersion = mcp.LATEST_PROTOCOL_VERSION
84+
initRequest.Params.ClientInfo = mcp.Implementation{
85+
Name: "cobrowser-agent",
86+
Version: "1.0.0",
87+
}
88+
if _, err := mcpClient.Initialize(connCtx, initRequest); err != nil {
89+
cancel()
90+
mcpClient.Close()
91+
return nil, fmt.Errorf("failed to initialize client: %w", err)
92+
}
93+
94+
// Get initial tools list
95+
logger.Debug().Str("url", remote.URL).Msg("Listing tools from new MCP client")
96+
toolsRequest := mcp.ListToolsRequest{}
97+
toolsResult, err := mcpClient.ListTools(connCtx, toolsRequest)
98+
if err != nil {
99+
cancel()
100+
mcpClient.Close()
101+
return nil, fmt.Errorf("failed to list tools: %w", err)
102+
}
103+
104+
// Create the new connection object
105+
newConn := &MCPConnection{
106+
Client: mcpClient,
107+
URL: remote.URL,
108+
Ctx: connCtx,
109+
Cancel: cancel,
110+
Tools: toolsResult.Tools,
111+
}
112+
113+
// Replace any existing connection
114+
ConnectionsMutex.Lock()
115+
connToReplace, existsNow := RemoteConnections[remote.URL]
116+
var oldConnToCleanup *MCPConnection
117+
if existsNow && connToReplace != nil {
118+
if connToReplace != newConn {
119+
logger.Warn().Str("url", remote.URL).Msg("Marking existing connection for cleanup.")
120+
oldConnToCleanup = connToReplace
121+
}
122+
} else if alreadyConnected && existingConn != nil {
123+
if existingConn != newConn {
124+
logger.Info().Str("url", remote.URL).Msg("Marking previous connection for cleanup.")
125+
oldConnToCleanup = existingConn
126+
}
127+
}
128+
129+
// Store the new connection
130+
RemoteConnections[remote.URL] = newConn
131+
ConnectionsMutex.Unlock()
132+
133+
// Clean up old connection asynchronously if necessary
134+
if oldConnToCleanup != nil {
135+
go func(connToClose *MCPConnection) {
136+
logger.Info().Str("url", connToClose.URL).Msg("Starting asynchronous cleanup of old connection...")
137+
time.Sleep(50 * time.Millisecond)
138+
if connToClose.Cancel != nil {
139+
logger.Info().Str("url", connToClose.URL).Msg("Asynchronously cancelling old connection context.")
140+
connToClose.Cancel()
141+
}
142+
if connToClose.Client != nil {
143+
logger.Info().Str("url", connToClose.URL).Msg("Asynchronously closing old connection client.")
144+
connToClose.Client.Close()
145+
}
146+
logger.Info().Str("url", connToClose.URL).Msg("Asynchronous cleanup of old connection finished.")
147+
}(oldConnToCleanup)
148+
}
149+
150+
logger.Info().
151+
Str("url", remote.URL).
152+
Int("tool_count", len(newConn.Tools)).
153+
Msg("Successfully established and stored new MCP connection")
154+
155+
return newConn, nil
156+
}
157+
158+
// CleanupConnections closes all active connections
159+
func CleanupConnections(logger log.Logger) {
160+
ConnectionsMutex.Lock()
161+
defer ConnectionsMutex.Unlock()
162+
163+
for url, conn := range RemoteConnections {
164+
if conn.Cancel != nil {
165+
conn.Cancel()
166+
}
167+
if conn.Client != nil {
168+
conn.Client.Close()
169+
}
170+
delete(RemoteConnections, url)
171+
}
172+
173+
logger.Info().Msg("All connections cleaned up")
174+
}

internal/mcp/handlers/handlers.go

Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
// Package handlers provides event handler functionality for MCP.
2+
package handlers
3+
4+
import (
5+
"github.com/co-browser/agent-browser/internal/backend/models"
6+
"github.com/co-browser/agent-browser/internal/events"
7+
"github.com/co-browser/agent-browser/internal/log"
8+
"github.com/co-browser/agent-browser/internal/mcp/config"
9+
)
10+
11+
// ConnectionStateManager defines the interface for connection state management
12+
type ConnectionStateManager interface {
13+
SetConnectionState(url string, state models.ConnectionState)
14+
GetConnectionState(url string) models.ConnectionState
15+
ConnectWithRetry(remote config.RemoteMCPServer)
16+
UpdateServerTools(serverURL string, fetchedTools interface{})
17+
RefreshMCPServerTools()
18+
}
19+
20+
// HandleServerAdded handles a ServerAddedEvent
21+
func HandleServerAdded(event events.Event, manager ConnectionStateManager, logger log.Logger) {
22+
// Type assert to the specific event type
23+
addedEvent, ok := event.(*events.ServerAddedEvent)
24+
if !ok {
25+
logger.Error().Str("eventType", string(event.Type())).Msg("Received event of unexpected type in handleServerAdded")
26+
return
27+
}
28+
29+
serverURL := addedEvent.Server.URL
30+
serverName := addedEvent.Server.Name
31+
serverID := addedEvent.Server.ID
32+
33+
logger.Info().
34+
Str("url", serverURL).
35+
Str("name", serverName).
36+
Int64("id", serverID).
37+
Msg("Received ServerAddedEvent, attempting connection")
38+
39+
// Start connection attempt
40+
remote := config.RemoteMCPServer{
41+
URL: serverURL,
42+
Name: serverName,
43+
ID: serverID,
44+
}
45+
46+
go manager.ConnectWithRetry(remote)
47+
}
48+
49+
// HandleServerRemoved handles a ServerRemovedEvent
50+
func HandleServerRemoved(event events.Event, manager ConnectionStateManager, logger log.Logger, eventBus events.Bus) {
51+
// Type assert to the specific event type
52+
removedEvent, ok := event.(*events.ServerRemovedEvent)
53+
if !ok {
54+
logger.Error().Str("eventType", string(event.Type())).Msg("Received event of unexpected type in handleServerRemoved")
55+
return
56+
}
57+
58+
logger.Info().
59+
Str("url", removedEvent.ServerURL).
60+
Int64("id", removedEvent.ServerID).
61+
Msg("Received ServerRemovedEvent, stopping connection")
62+
63+
// Update server tools to remove the tools from this server
64+
manager.UpdateServerTools(removedEvent.ServerURL, nil)
65+
66+
// Refresh server tools to update available tools
67+
manager.RefreshMCPServerTools()
68+
69+
// Publish event to signal local tools changed
70+
logger.Info().Msg("Publishing LocalToolsRefreshedEvent after server removal.")
71+
eventBus.Publish(events.NewLocalToolsRefreshedEvent())
72+
}
73+
74+
// HandleToolsProcessed handles a ToolsProcessedInDBEvent
75+
func HandleToolsProcessed(event events.Event, manager ConnectionStateManager, logger log.Logger, eventBus events.Bus) {
76+
processedEvent, ok := event.(*events.ToolsProcessedInDBEvent)
77+
if !ok {
78+
logger.Error().Str("eventType", string(event.Type())).Msg("Received event of unexpected type in handleToolsProcessed")
79+
return
80+
}
81+
82+
logger.Info().
83+
Int64("serverID", processedEvent.ServerID).
84+
Str("url", processedEvent.ServerURL).
85+
Msg("Received ToolsProcessedInDBEvent, refreshing MCP server tools.")
86+
87+
// Refresh the tools served by the MCP server
88+
manager.RefreshMCPServerTools()
89+
90+
// Publish event to signal that local tools might have changed
91+
logger.Info().Msg("Publishing LocalToolsRefreshedEvent after MCP server tool refresh.")
92+
eventBus.Publish(events.NewLocalToolsRefreshedEvent())
93+
}
94+
95+
// PublishServerStateChange publishes a ServerStatusChangedEvent
96+
func PublishServerStateChange(
97+
serverURL string,
98+
serverID int64,
99+
state models.ConnectionState,
100+
eventBus events.Bus,
101+
logger log.Logger,
102+
) {
103+
// Determine error string for event
104+
var errStr *string
105+
if state == models.ConnectionStateFailed {
106+
errMsg := "connection failed"
107+
errStr = &errMsg
108+
}
109+
110+
logger.Info().
111+
Str("url", serverURL).
112+
Int64("id", serverID).
113+
Str("state", string(state)).
114+
Msg("Publishing ServerStatusChangedEvent")
115+
116+
// Publish event with the server ID
117+
eventBus.Publish(events.NewServerStatusChangedEvent(
118+
serverID,
119+
serverURL,
120+
state,
121+
errStr,
122+
))
123+
}

0 commit comments

Comments
 (0)