From 29951f3bb0e7d7ad818818ae21e91cb84a357b06 Mon Sep 17 00:00:00 2001 From: Thomas Raffray Date: Thu, 17 Apr 2025 10:02:39 +0200 Subject: [PATCH 1/9] feat(sse): add sse server --- package.json | 4 + src/app.ts | 11 ++- src/commands/start-sse-server.ts | 136 +++++++++++++++++++++++++++++ src/server/init-server.ts | 141 +++++++++++++++++++++++++++++++ src/server/types.ts | 4 + 5 files changed, 295 insertions(+), 1 deletion(-) create mode 100644 src/commands/start-sse-server.ts create mode 100644 src/server/init-server.ts create mode 100644 src/server/types.ts diff --git a/package.json b/package.json index 82e94e1..46af39f 100644 --- a/package.json +++ b/package.json @@ -19,12 +19,16 @@ "@modelcontextprotocol/sdk": "^1.11.0", "algoliasearch": "^5.23.4", "commander": "^13.1.0", + "cors": "^2.8.5", + "express": "^5.1.0", "open": "^10.1.0", "zod": "^3.24.2" }, "devDependencies": { "@eslint/js": "^9.24.0", "@modelcontextprotocol/inspector": "^0.9.0", + "@types/cors": "^2.8.17", + "@types/express": "^5.0.1", "@types/node": "^22.14.0", "@vitest/coverage-v8": "^3.1.1", "bun": "^1.2.9", diff --git a/src/app.ts b/src/app.ts index dc2992f..303fc9d 100644 --- a/src/app.ts +++ b/src/app.ts @@ -1,5 +1,5 @@ import { Command } from "commander"; -import { type StartServerOptions } from "./commands/start-server.ts"; +import type { StartServerOptions } from "./server/types.ts"; import { type ListToolsOptions } from "./commands/list-tools.ts"; const program = new Command("algolia-mcp"); @@ -67,6 +67,15 @@ program await startServer(opts); }); +program + .command("start-sse-server") + .description("Starts the remote-ready Algolia MCP server") + .option(...ALLOW_TOOLS_OPTIONS_TUPLE) + .action(async (opts: StartServerOptions) => { + const { startSseServer } = await import("./commands/start-sse-server.ts"); + await startSseServer(opts); + }); + program .command("authenticate") .description("Authenticate with Algolia") diff --git a/src/commands/start-sse-server.ts b/src/commands/start-sse-server.ts new file mode 100644 index 0000000..8244463 --- /dev/null +++ b/src/commands/start-sse-server.ts @@ -0,0 +1,136 @@ +import express, { Request, Response } from "express"; +import cors from "cors"; +import { SSEServerTransport } from "@modelcontextprotocol/sdk/server/sse.js"; +import type { StartServerOptions } from "../server/types.ts"; +import { initMCPServer } from "../server/init-server.ts"; + +export async function startSseServer(opts: StartServerOptions) { + try { + const app = express(); + app.use( + cors({ + origin: process.env.ALLOWED_ORIGINS?.split(",") || "*", + methods: ["GET", "POST"], + allowedHeaders: ["Content-Type", "Authorization"], + }), + ); + + // Store active connections + const connections = new Map(); + + // Health check endpoint + app.get("/health", (req, res) => { + res.status(200).json({ + status: "ok", + version: "1.0.0", + uptime: process.uptime(), + timestamp: new Date().toISOString(), + connections: connections.size, + }); + }); + + // SSE connection establishment endpoint + app.get("/sse", async (req, res) => { + // Instantiate SSE transport object + const transport = new SSEServerTransport("/messages", res); + // Get sessionId + const sessionId = transport.sessionId; + console.log(`[${new Date().toISOString()}] New SSE connection established: ${sessionId}`); + + // Register connection + connections.set(sessionId, transport); + + // Connection interruption handling + req.on("close", () => { + console.log(`[${new Date().toISOString()}] SSE connection closed: ${sessionId}`); + connections.delete(sessionId); + }); + + // Connect the transport object to the MCP server + const mcpServer = await initMCPServer(opts); + await mcpServer.connect(transport); + console.log(`[${new Date().toISOString()}] MCP server connection successful: ${sessionId}`); + }); + + // Endpoint for receiving client messages + app.post("/messages", async (req: Request, res: Response) => { + try { + console.log(`[${new Date().toISOString()}] Received client message:`, req.query); + const sessionId = req.query.sessionId as string; + + // Find the corresponding SSE connection and process the message + if (connections.size > 0) { + const transport: SSEServerTransport = connections.get(sessionId) as SSEServerTransport; + // Use transport to process messages + if (transport) { + await transport.handlePostMessage(req, res); + } else { + throw new Error("No active SSE connection"); + } + } else { + throw new Error("No active SSE connection"); + } + // eslint-disable-next-line @typescript-eslint/no-explicit-any + } catch (error: any) { + console.error(`[${new Date().toISOString()}] Failed to process client message:`, error); + res.status(500).json({ error: "Failed to process message", message: error.message }); + } + }); + + // Graceful shutdown of all connections + async function closeAllConnections() { + console.log( + `[${new Date().toISOString()}] Closing all connections (${connections.size} active)`, + ); + for (const [id, transport] of connections.entries()) { + try { + // Send shutdown event + transport.res.write( + 'event: server_shutdown\ndata: {"reason": "Server is shutting down"}\n\n', + ); + transport.res.end(); + console.log(`[${new Date().toISOString()}] Connection closed: ${id}`); + } catch (error) { + console.error(`[${new Date().toISOString()}] Failed to close connection: ${id}`, error); + } + } + connections.clear(); + } + + // Error handling + app.use((err: Error, _: Request, res: Response) => { + console.error(`[${new Date().toISOString()}] Unhandled exception:`, err); + res.status(500).json({ error: "Server internal error" }); + }); + + // Graceful shutdown + process.on("SIGTERM", async () => { + console.log(`[${new Date().toISOString()}] Received SIGTERM signal, preparing to close`); + await closeAllConnections(); + server.close(() => { + console.log(`[${new Date().toISOString()}] Server closed`); + process.exit(0); + }); + }); + + process.on("SIGINT", async () => { + console.log(`[${new Date().toISOString()}] Received SIGINT signal, preparing to close`); + await closeAllConnections(); + process.exit(0); + }); + + // Start server + const port = 4243; + const server = app.listen(port, () => { + console.log( + `[${new Date().toISOString()}] Algolia MCP SSE server started, address: http://localhost:${port}`, + ); + console.log(`- SSE connection endpoint: http://localhost:${port}/sse`); + console.log(`- Message processing endpoint: http://localhost:${port}/messages`); + console.log(`- Health check endpoint: http://localhost:${port}/health`); + }); + } catch (err) { + console.error("Error starting server:", err); + process.exit(1); + } +} diff --git a/src/server/init-server.ts b/src/server/init-server.ts new file mode 100644 index 0000000..0758efd --- /dev/null +++ b/src/server/init-server.ts @@ -0,0 +1,141 @@ +import type { StartServerOptions } from "./types.ts"; +import { AppStateManager } from "../appState.ts"; +import { authenticate } from "../authentication.ts"; +import { DashboardApi } from "../DashboardApi.ts"; +import { CONFIG } from "../config.ts"; +import { getToolFilter, isToolAllowed } from "../toolFilters.ts"; +import { operationId as GetUserInfoOperationId, registerGetUserInfo } from "../tools/registerGetUserInfo.ts"; +import { + operationId as GetApplicationsOperationId, + registerGetApplications +} from "../tools/registerGetApplications.ts"; +import { registerOpenApiTools } from "../tools/registerOpenApi.ts"; +import { + ABTestingSpec, + AnalyticsSpec, + IngestionSpec, + MonitoringSpec, + RecommendSpec, + SearchSpec, + UsageSpec +} from "../openApi.ts"; +import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js"; + +export async function initMCPServer(opts: StartServerOptions): Promise { + try { + const appState = await AppStateManager.load(); + + if (!appState.get("accessToken")) { + const token = await authenticate(); + + await appState.update({ + accessToken: token.access_token, + refreshToken: token.refresh_token, + }); + } + + const dashboardApi = new DashboardApi({ + baseUrl: CONFIG.dashboardApiBaseUrl, + appState, + }); + + const server = new McpServer({ + name: "algolia", + version: "1.0.0", + capabilities: { + resources: {}, + tools: {}, + }, + }); + + const toolFilter = getToolFilter(opts); + + // Dashboard API Tools + if (isToolAllowed(GetUserInfoOperationId, toolFilter)) { + registerGetUserInfo(server, dashboardApi); + } + + if (isToolAllowed(GetApplicationsOperationId, toolFilter)) { + registerGetApplications(server, dashboardApi); + } + + // Search API Tools + registerOpenApiTools({ + server, + dashboardApi, + openApiSpec: SearchSpec, + toolFilter, + }); + + // Analytics API Tools + registerOpenApiTools({ + server, + dashboardApi, + openApiSpec: AnalyticsSpec, + toolFilter, + }); + + // Recommend API Tools + registerOpenApiTools({ + server, + dashboardApi, + openApiSpec: RecommendSpec, + toolFilter, + }); + + // AB Testing + registerOpenApiTools({ + server, + dashboardApi, + openApiSpec: ABTestingSpec, + toolFilter, + }); + + // Monitoring API Tools + registerOpenApiTools({ + server, + dashboardApi, + openApiSpec: MonitoringSpec, + toolFilter, + }); + + // Usage + registerOpenApiTools({ + server, + dashboardApi, + openApiSpec: UsageSpec, + toolFilter, + }); + + // Ingestion API Tools + registerOpenApiTools({ + server, + dashboardApi, + openApiSpec: IngestionSpec, + toolFilter, + requestMiddlewares: [ + // Dirty fix for Claud hallucinating regions + async ({ request, params }) => { + const application = await dashboardApi.getApplication(params.applicationId); + const region = application.data.attributes.log_region === "de" ? "eu" : "us"; + + const url = new URL(request.url); + const regionFromUrl = url.hostname.match(/data\.(.+)\.algolia.com/)?.[0]; + + if (regionFromUrl !== region) { + console.error("Had to adjust region from", regionFromUrl, "to", region); + url.hostname = `data.${region}.algolia.com`; + return new Request(url, request.clone()); + } + + return request; + }, + ], + }); + + return server; + } catch (err) { + console.error("Error starting server:", err); + process.exit(1); + } +} \ No newline at end of file diff --git a/src/server/types.ts b/src/server/types.ts new file mode 100644 index 0000000..7d83a9a --- /dev/null +++ b/src/server/types.ts @@ -0,0 +1,4 @@ +import type { CliFilteringOptions } from "../toolFilters.ts"; + + +export type StartServerOptions = CliFilteringOptions; \ No newline at end of file From 307055d27105f462570b64352275a02898b61030 Mon Sep 17 00:00:00 2001 From: Thomas Raffray Date: Thu, 17 Apr 2025 10:41:09 +0200 Subject: [PATCH 2/9] chore: put back the bin in dist to avoid it being pushed to git --- package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/package.json b/package.json index 46af39f..54565a0 100644 --- a/package.json +++ b/package.json @@ -7,7 +7,7 @@ "type-check": "tsc --noEmit", "lint": "eslint --ext .ts src", "test": "vitest", - "build": "bun build ./src/app.ts --compile", + "build": "bun build ./src/app.ts --compile --outfile dist/app", "debug": "mcp-inspector npm start" }, "type": "module", From cbbe2a1aa3f85cf61c27ca8a9d741214d231203c Mon Sep 17 00:00:00 2001 From: Thomas Raffray Date: Fri, 18 Apr 2025 09:33:38 +0200 Subject: [PATCH 3/9] chore: package lock --- package-lock.json | 111 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 111 insertions(+) diff --git a/package-lock.json b/package-lock.json index 7416441..5590077 100644 --- a/package-lock.json +++ b/package-lock.json @@ -12,12 +12,16 @@ "@modelcontextprotocol/sdk": "^1.11.0", "algoliasearch": "^5.23.4", "commander": "^13.1.0", + "cors": "^2.8.5", + "express": "^5.1.0", "open": "^10.1.0", "zod": "^3.24.2" }, "devDependencies": { "@eslint/js": "^9.24.0", "@modelcontextprotocol/inspector": "^0.9.0", + "@types/cors": "^2.8.17", + "@types/express": "^5.0.1", "@types/node": "^22.14.0", "@vitest/coverage-v8": "^3.1.1", "bun": "^1.2.9", @@ -3089,6 +3093,27 @@ "dev": true, "license": "MIT" }, + "node_modules/@types/body-parser": { + "version": "1.19.5", + "resolved": "https://registry.npmjs.org/@types/body-parser/-/body-parser-1.19.5.tgz", + "integrity": "sha512-fB3Zu92ucau0iQ0JMCFQE7b/dv8Ot07NI3KaZIkIUNXq82k4eBAqUaneXfleGY9JWskeS9y+u0nXMyspcuQrCg==", + "dev": true, + "license": "MIT", + "dependencies": { + "@types/connect": "*", + "@types/node": "*" + } + }, + "node_modules/@types/connect": { + "version": "3.4.38", + "resolved": "https://registry.npmjs.org/@types/connect/-/connect-3.4.38.tgz", + "integrity": "sha512-K6uROf1LD88uDQqJCktA4yzL1YYAK6NgfsI0v/mTgyPKWsX1CnJ0XPSDhViejru1GcRkLWb8RlzFYJRqGUbaug==", + "dev": true, + "license": "MIT", + "dependencies": { + "@types/node": "*" + } + }, "node_modules/@types/cookie": { "version": "0.6.0", "resolved": "https://registry.npmjs.org/@types/cookie/-/cookie-0.6.0.tgz", @@ -3096,6 +3121,16 @@ "dev": true, "license": "MIT" }, + "node_modules/@types/cors": { + "version": "2.8.17", + "resolved": "https://registry.npmjs.org/@types/cors/-/cors-2.8.17.tgz", + "integrity": "sha512-8CGDvrBj1zgo2qE+oS3pOCyYNqCPryMWY2bGfwA0dcfopWGgxs+78df0Rs3rc9THP4JkOhLsAa+15VdpAqkcUA==", + "dev": true, + "license": "MIT", + "dependencies": { + "@types/node": "*" + } + }, "node_modules/@types/estree": { "version": "1.0.7", "resolved": "https://registry.npmjs.org/@types/estree/-/estree-1.0.7.tgz", @@ -3103,6 +3138,38 @@ "dev": true, "license": "MIT" }, + "node_modules/@types/express": { + "version": "5.0.1", + "resolved": "https://registry.npmjs.org/@types/express/-/express-5.0.1.tgz", + "integrity": "sha512-UZUw8vjpWFXuDnjFTh7/5c2TWDlQqeXHi6hcN7F2XSVT5P+WmUnnbFS3KA6Jnc6IsEqI2qCVu2bK0R0J4A8ZQQ==", + "dev": true, + "license": "MIT", + "dependencies": { + "@types/body-parser": "*", + "@types/express-serve-static-core": "^5.0.0", + "@types/serve-static": "*" + } + }, + "node_modules/@types/express-serve-static-core": { + "version": "5.0.6", + "resolved": "https://registry.npmjs.org/@types/express-serve-static-core/-/express-serve-static-core-5.0.6.tgz", + "integrity": "sha512-3xhRnjJPkULekpSzgtoNYYcTWgEZkp4myc+Saevii5JPnHNvHMRlBSHDbs7Bh1iPPoVTERHEZXyhyLbMEsExsA==", + "dev": true, + "license": "MIT", + "dependencies": { + "@types/node": "*", + "@types/qs": "*", + "@types/range-parser": "*", + "@types/send": "*" + } + }, + "node_modules/@types/http-errors": { + "version": "2.0.4", + "resolved": "https://registry.npmjs.org/@types/http-errors/-/http-errors-2.0.4.tgz", + "integrity": "sha512-D0CFMMtydbJAegzOyHjtiKPLlvnm3iTZyZRSZoLq2mRhDdmLfIWOCYPfQJ4cu2erKghU++QvjcUjp/5h7hESpA==", + "dev": true, + "license": "MIT" + }, "node_modules/@types/json-schema": { "version": "7.0.15", "resolved": "https://registry.npmjs.org/@types/json-schema/-/json-schema-7.0.15.tgz", @@ -3110,6 +3177,13 @@ "dev": true, "license": "MIT" }, + "node_modules/@types/mime": { + "version": "1.3.5", + "resolved": "https://registry.npmjs.org/@types/mime/-/mime-1.3.5.tgz", + "integrity": "sha512-/pyBZWSLD2n0dcHE3hq8s8ZvcETHtEuF+3E7XVt0Ig2nvsVQXdghHVcEkIWjy9A0wKfTn97a/PSDYohKIlnP/w==", + "dev": true, + "license": "MIT" + }, "node_modules/@types/node": { "version": "22.14.0", "resolved": "https://registry.npmjs.org/@types/node/-/node-22.14.0.tgz", @@ -3120,6 +3194,43 @@ "undici-types": "~6.21.0" } }, + "node_modules/@types/qs": { + "version": "6.9.18", + "resolved": "https://registry.npmjs.org/@types/qs/-/qs-6.9.18.tgz", + "integrity": "sha512-kK7dgTYDyGqS+e2Q4aK9X3D7q234CIZ1Bv0q/7Z5IwRDoADNU81xXJK/YVyLbLTZCoIwUoDoffFeF+p/eIklAA==", + "dev": true, + "license": "MIT" + }, + "node_modules/@types/range-parser": { + "version": "1.2.7", + "resolved": "https://registry.npmjs.org/@types/range-parser/-/range-parser-1.2.7.tgz", + "integrity": "sha512-hKormJbkJqzQGhziax5PItDUTMAM9uE2XXQmM37dyd4hVM+5aVl7oVxMVUiVQn2oCQFN/LKCZdvSM0pFRqbSmQ==", + "dev": true, + "license": "MIT" + }, + "node_modules/@types/send": { + "version": "0.17.4", + "resolved": "https://registry.npmjs.org/@types/send/-/send-0.17.4.tgz", + "integrity": "sha512-x2EM6TJOybec7c52BX0ZspPodMsQUd5L6PRwOunVyVUhXiBSKf3AezDL8Dgvgt5o0UfKNfuA0eMLr2wLT4AiBA==", + "dev": true, + "license": "MIT", + "dependencies": { + "@types/mime": "^1", + "@types/node": "*" + } + }, + "node_modules/@types/serve-static": { + "version": "1.15.7", + "resolved": "https://registry.npmjs.org/@types/serve-static/-/serve-static-1.15.7.tgz", + "integrity": "sha512-W8Ym+h8nhuRwaKPaDw34QUkwsGi6Rc4yYqvKFo5rm2FUEhCFbzVWrxXUxuKK8TASjWsysJY0nsmNCGhCOIsrOw==", + "dev": true, + "license": "MIT", + "dependencies": { + "@types/http-errors": "*", + "@types/node": "*", + "@types/send": "*" + } + }, "node_modules/@types/statuses": { "version": "2.0.5", "resolved": "https://registry.npmjs.org/@types/statuses/-/statuses-2.0.5.tgz", From 0cbf868f355f88247c5af50bad72bd223fd9f74c Mon Sep 17 00:00:00 2001 From: Thomas Raffray Date: Fri, 18 Apr 2025 09:33:47 +0200 Subject: [PATCH 4/9] chore: add collections in common file --- src/server/init-server.ts | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/src/server/init-server.ts b/src/server/init-server.ts index 0758efd..e21add4 100644 --- a/src/server/init-server.ts +++ b/src/server/init-server.ts @@ -12,7 +12,7 @@ import { import { registerOpenApiTools } from "../tools/registerOpenApi.ts"; import { ABTestingSpec, - AnalyticsSpec, + AnalyticsSpec, CollectionsSpec, IngestionSpec, MonitoringSpec, RecommendSpec, @@ -133,6 +133,14 @@ export async function initMCPServer(opts: StartServerOptions): Promise Date: Fri, 18 Apr 2025 09:35:33 +0200 Subject: [PATCH 5/9] chore: lint --- src/commands/start-sse-server.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/commands/start-sse-server.ts b/src/commands/start-sse-server.ts index 8244463..625d54c 100644 --- a/src/commands/start-sse-server.ts +++ b/src/commands/start-sse-server.ts @@ -1,4 +1,4 @@ -import express, { Request, Response } from "express"; +import express, { type Request, type Response } from "express"; import cors from "cors"; import { SSEServerTransport } from "@modelcontextprotocol/sdk/server/sse.js"; import type { StartServerOptions } from "../server/types.ts"; From f0e9118f8c07aded08575ab72ad943f54f1909a2 Mon Sep 17 00:00:00 2001 From: Thomas Raffray Date: Fri, 18 Apr 2025 12:03:35 +0200 Subject: [PATCH 6/9] chore: match new readme --- README.md | 30 ++++++++++++++++++++++++------ 1 file changed, 24 insertions(+), 6 deletions(-) diff --git a/README.md b/README.md index 9b5d061..be7cdc9 100644 --- a/README.md +++ b/README.md @@ -123,6 +123,23 @@ Here are some example prompts to get you started: > [!TIP] > You can refer to the [official documentation](https://modelcontextprotocol.io/quickstart/user) for Claude Desktop. +#### SSE Server specific configuration +To run the SSE version, as Claude Desktop doesn't natively support it yet, you'll have to use a gateway: +```json +{ + "mcpServers": { + "algolia-mcp": { + "command": "/npx", + "args": [ + "-y", + "mcp-remote", + "http://localhost:4243/sse" + ] + } + } +} +``` + ### CLI Options #### Available Commands @@ -131,14 +148,15 @@ Here are some example prompts to get you started: Usage: algolia-mcp [options] [command] Options: - -h, --help display help for command + -h, --help Display help for command Commands: - start-server [options] Starts the Algolia MCP server - authenticate Authenticate with Algolia - logout Remove all stored credentials - list-tools List all available tools - help [command] display help for command + start-server [options] Starts the Algolia MCP server + start-sse-server [options] Starts the Algolia MCP SSE server + authenticate Authenticate with Algolia + logout Remove all stored credentials + list-tools List all available tools + help [command] Display help for command ``` #### Server Options From f497fb99f743c2f44c8a6cb5d579582c791312b7 Mon Sep 17 00:00:00 2001 From: Thomas Raffray Date: Tue, 22 Apr 2025 17:35:50 +0200 Subject: [PATCH 7/9] chore: merge --- src/commands/start-server.ts | 193 +------------------------------ src/commands/start-sse-server.ts | 5 +- src/server/init-server.ts | 64 +++++++++- 3 files changed, 64 insertions(+), 198 deletions(-) diff --git a/src/commands/start-server.ts b/src/commands/start-server.ts index 65e0efb..3346ba0 100644 --- a/src/commands/start-server.ts +++ b/src/commands/start-server.ts @@ -1,201 +1,14 @@ #!/usr/bin/env -S node --experimental-strip-types -import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js"; -import { authenticate } from "../authentication.ts"; -import { AppStateManager } from "../appState.ts"; -import { DashboardApi } from "../DashboardApi.ts"; -import { - operationId as GetUserInfoOperationId, - registerGetUserInfo, -} from "../tools/registerGetUserInfo.ts"; -import { - operationId as GetApplicationsOperationId, - registerGetApplications, -} from "../tools/registerGetApplications.ts"; import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js"; -import { registerOpenApiTools } from "../tools/registerOpenApi.ts"; -import { CONFIG } from "../config.ts"; -import { - ABTestingSpec, - AnalyticsSpec, - CollectionsSpec, - IngestionSpec, - MonitoringSpec, - QuerySuggestionsSpec, - RecommendSpec, - SearchSpec, - UsageSpec, -} from "../openApi.ts"; -import { type CliFilteringOptions, getToolFilter, isToolAllowed } from "../toolFilters.ts"; -import { - operationId as SetAttributesForFacetingOperationId, - registerSetAttributesForFaceting, -} from "../tools/registerSetAttributesForFaceting.ts"; -import { - registerSetCustomRanking, - operationId as SetCustomRankingOperationId, -} from "../tools/registerSetCustomRanking.ts"; +import { type CliFilteringOptions } from "../toolFilters.ts"; +import { initMCPServer } from "../server/init-server.ts"; export type StartServerOptions = CliFilteringOptions; export async function startServer(opts: StartServerOptions) { try { - const appState = await AppStateManager.load(); - - if (!appState.get("accessToken")) { - const token = await authenticate(); - - await appState.update({ - accessToken: token.access_token, - refreshToken: token.refresh_token, - }); - } - - const dashboardApi = new DashboardApi({ - baseUrl: CONFIG.dashboardApiBaseUrl, - appState, - }); - - const server = new McpServer({ - name: "algolia", - version: CONFIG.version, - capabilities: { - resources: {}, - tools: {}, - }, - }); - - const toolFilter = getToolFilter(opts); - - // Dashboard API Tools - if (isToolAllowed(GetUserInfoOperationId, toolFilter)) { - registerGetUserInfo(server, dashboardApi); - } - - if (isToolAllowed(GetApplicationsOperationId, toolFilter)) { - registerGetApplications(server, dashboardApi); - } - - // Search API Tools - registerOpenApiTools({ - server, - dashboardApi, - openApiSpec: SearchSpec, - toolFilter, - }); - - // Analytics API Tools - registerOpenApiTools({ - server, - dashboardApi, - openApiSpec: AnalyticsSpec, - toolFilter, - }); - - // Recommend API Tools - registerOpenApiTools({ - server, - dashboardApi, - openApiSpec: RecommendSpec, - toolFilter, - }); - - // AB Testing - registerOpenApiTools({ - server, - dashboardApi, - openApiSpec: ABTestingSpec, - toolFilter, - }); - - // Monitoring API Tools - registerOpenApiTools({ - server, - dashboardApi, - openApiSpec: MonitoringSpec, - toolFilter, - }); - - // Usage - registerOpenApiTools({ - server, - dashboardApi, - openApiSpec: UsageSpec, - toolFilter, - requestMiddlewares: [ - // The Usage API expects `name` parameter as multiple values - // rather than comma-separated. - async ({ request }) => { - const url = new URL(request.url); - const nameParams = url.searchParams.get("name"); - - if (!nameParams) { - return new Request(url, request.clone()); - } - - const nameValues = nameParams.split(","); - - url.searchParams.delete("name"); - - nameValues.forEach((value) => { - url.searchParams.append("name", value); - }); - - return new Request(url, request.clone()); - }, - ], - }); - - // Ingestion API Tools - registerOpenApiTools({ - server, - dashboardApi, - openApiSpec: IngestionSpec, - toolFilter, - requestMiddlewares: [ - // Dirty fix for Claud hallucinating regions - async ({ request, params }) => { - const application = await dashboardApi.getApplication(params.applicationId); - const region = application.data.attributes.log_region === "de" ? "eu" : "us"; - - const url = new URL(request.url); - const regionFromUrl = url.hostname.match(/data\.(.+)\.algolia.com/)?.[0]; - - if (regionFromUrl !== region) { - console.error("Had to adjust region from", regionFromUrl, "to", region); - url.hostname = `data.${region}.algolia.com`; - return new Request(url, request.clone()); - } - - return request; - }, - ], - }); - - // Collections API Tools - registerOpenApiTools({ - server, - dashboardApi, - openApiSpec: CollectionsSpec, - toolFilter, - }); - - // Query Suggestions API Tools - registerOpenApiTools({ - server, - dashboardApi, - openApiSpec: QuerySuggestionsSpec, - toolFilter, - }); - - // Custom settings Tools - if (isToolAllowed(SetAttributesForFacetingOperationId, toolFilter)) { - registerSetAttributesForFaceting(server, dashboardApi); - } - - if (isToolAllowed(SetCustomRankingOperationId, toolFilter)) { - registerSetCustomRanking(server, dashboardApi); - } + const server = await initMCPServer(opts); const transport = new StdioServerTransport(); await server.connect(transport); diff --git a/src/commands/start-sse-server.ts b/src/commands/start-sse-server.ts index 625d54c..ef3df98 100644 --- a/src/commands/start-sse-server.ts +++ b/src/commands/start-sse-server.ts @@ -3,6 +3,7 @@ import cors from "cors"; import { SSEServerTransport } from "@modelcontextprotocol/sdk/server/sse.js"; import type { StartServerOptions } from "../server/types.ts"; import { initMCPServer } from "../server/init-server.ts"; +import { CONFIG } from "../config.ts"; export async function startSseServer(opts: StartServerOptions) { try { @@ -19,10 +20,10 @@ export async function startSseServer(opts: StartServerOptions) { const connections = new Map(); // Health check endpoint - app.get("/health", (req, res) => { + app.get("/health", (_, res) => { res.status(200).json({ status: "ok", - version: "1.0.0", + version: CONFIG.version, uptime: process.uptime(), timestamp: new Date().toISOString(), connections: connections.size, diff --git a/src/server/init-server.ts b/src/server/init-server.ts index e21add4..cf4e1a8 100644 --- a/src/server/init-server.ts +++ b/src/server/init-server.ts @@ -4,22 +4,35 @@ import { authenticate } from "../authentication.ts"; import { DashboardApi } from "../DashboardApi.ts"; import { CONFIG } from "../config.ts"; import { getToolFilter, isToolAllowed } from "../toolFilters.ts"; -import { operationId as GetUserInfoOperationId, registerGetUserInfo } from "../tools/registerGetUserInfo.ts"; +import { + operationId as GetUserInfoOperationId, + registerGetUserInfo, +} from "../tools/registerGetUserInfo.ts"; import { operationId as GetApplicationsOperationId, - registerGetApplications + registerGetApplications, } from "../tools/registerGetApplications.ts"; import { registerOpenApiTools } from "../tools/registerOpenApi.ts"; import { ABTestingSpec, - AnalyticsSpec, CollectionsSpec, + AnalyticsSpec, + CollectionsSpec, IngestionSpec, MonitoringSpec, + QuerySuggestionsSpec, RecommendSpec, SearchSpec, - UsageSpec + UsageSpec, } from "../openApi.ts"; import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js"; +import { + operationId as SetAttributesForFacetingOperationId, + registerSetAttributesForFaceting, +} from "../tools/registerSetAttributesForFaceting.ts"; +import { + operationId as SetCustomRankingOperationId, + registerSetCustomRanking, +} from "../tools/registerSetCustomRanking.ts"; export async function initMCPServer(opts: StartServerOptions): Promise { try { @@ -41,7 +54,7 @@ export async function initMCPServer(opts: StartServerOptions): Promise { + const url = new URL(request.url); + const nameParams = url.searchParams.get("name"); + + if (!nameParams) { + return new Request(url, request.clone()); + } + + const nameValues = nameParams.split(","); + + url.searchParams.delete("name"); + + nameValues.forEach((value) => { + url.searchParams.append("name", value); + }); + + return new Request(url, request.clone()); + }, + ], }); // Ingestion API Tools @@ -141,9 +176,26 @@ export async function initMCPServer(opts: StartServerOptions): Promise Date: Mon, 5 May 2025 12:22:54 +0200 Subject: [PATCH 8/9] fix: use a flag for transport choice --- src/app.ts | 29 ++++++++++++++++++----------- src/toolFilters.ts | 1 + 2 files changed, 19 insertions(+), 11 deletions(-) diff --git a/src/app.ts b/src/app.ts index 303fc9d..7df9f15 100644 --- a/src/app.ts +++ b/src/app.ts @@ -62,18 +62,25 @@ program .command("start-server", { isDefault: true }) .description("Starts the Algolia MCP server") .option(...ALLOW_TOOLS_OPTIONS_TUPLE) + .option("--transport [stdio|sse]", "Transport type, either `stdio` (default) or `sse`", "stdio") .action(async (opts: StartServerOptions) => { - const { startServer } = await import("./commands/start-server.ts"); - await startServer(opts); - }); - -program - .command("start-sse-server") - .description("Starts the remote-ready Algolia MCP server") - .option(...ALLOW_TOOLS_OPTIONS_TUPLE) - .action(async (opts: StartServerOptions) => { - const { startSseServer } = await import("./commands/start-sse-server.ts"); - await startSseServer(opts); + switch (opts.transport) { + case "stdio": { + console.info('Starting server with stdio transport'); + const { startServer } = await import("./commands/start-server.ts"); + await startServer(opts); + break; + } + case "sse": { + console.info('Starting server with SSE transport'); + const { startSseServer } = await import("./commands/start-sse-server.ts"); + await startSseServer(opts); + break; + } + default: + console.error(`Unknown transport type: ${opts.transport}\nAllowed values: stdio, sse`); + process.exit(1); + } }); program diff --git a/src/toolFilters.ts b/src/toolFilters.ts index 8581bd0..5804453 100644 --- a/src/toolFilters.ts +++ b/src/toolFilters.ts @@ -1,6 +1,7 @@ export type CliFilteringOptions = { allowTools?: string[]; denyTools?: string[]; + transport?: 'stdio' | 'sse'; }; export type ToolFilter = { From 1885b6d84b17814ceba57ffec86e99948de4c465 Mon Sep 17 00:00:00 2001 From: Thomas Raffray Date: Mon, 5 May 2025 17:28:00 +0200 Subject: [PATCH 9/9] feat: add streamable transport --- README.md | 12 +- src/app.ts | 12 +- src/commands/start-http-server.ts | 256 ++++++++++++++++++++++++++++++ src/commands/start-sse-server.ts | 137 ---------------- src/toolFilters.ts | 2 +- 5 files changed, 272 insertions(+), 147 deletions(-) create mode 100644 src/commands/start-http-server.ts delete mode 100644 src/commands/start-sse-server.ts diff --git a/README.md b/README.md index be7cdc9..4cca651 100644 --- a/README.md +++ b/README.md @@ -107,6 +107,8 @@ Here are some example prompts to get you started: ### Claude Desktop Setup +#### Local + 1. Open Claude Desktop settings 2. Add the following to your configuration: ```json @@ -123,8 +125,8 @@ Here are some example prompts to get you started: > [!TIP] > You can refer to the [official documentation](https://modelcontextprotocol.io/quickstart/user) for Claude Desktop. -#### SSE Server specific configuration -To run the SSE version, as Claude Desktop doesn't natively support it yet, you'll have to use a gateway: +#### Remote +To run an HTTP server, as Claude Desktop doesn't natively support it yet, you'll have to use a gateway: ```json { "mcpServers": { @@ -133,12 +135,16 @@ To run the SSE version, as Claude Desktop doesn't natively support it yet, you'l "args": [ "-y", "mcp-remote", - "http://localhost:4243/sse" + "http://localhost:4243/mcp" ] } } } ``` +> [!INFO] +> Our HTTP server leverages the [Streamable HTTP transport](https://modelcontextprotocol.io/specification/2025-03-26/basic/transports#streamable-http). +> It is also backward compatible with the [SSE transport](https://modelcontextprotocol.io/docs/concepts/transports#server-sent-events-sse). + ### CLI Options diff --git a/src/app.ts b/src/app.ts index 7df9f15..73d252e 100644 --- a/src/app.ts +++ b/src/app.ts @@ -62,7 +62,7 @@ program .command("start-server", { isDefault: true }) .description("Starts the Algolia MCP server") .option(...ALLOW_TOOLS_OPTIONS_TUPLE) - .option("--transport [stdio|sse]", "Transport type, either `stdio` (default) or `sse`", "stdio") + .option("--transport [stdio|http]", "Transport type, either `stdio` (default) or `http`", "stdio") .action(async (opts: StartServerOptions) => { switch (opts.transport) { case "stdio": { @@ -71,14 +71,14 @@ program await startServer(opts); break; } - case "sse": { - console.info('Starting server with SSE transport'); - const { startSseServer } = await import("./commands/start-sse-server.ts"); - await startSseServer(opts); + case "http": { + console.info('Starting server with HTTP transport support'); + const { startHttpServer } = await import("./commands/start-http-server.ts"); + await startHttpServer(opts); break; } default: - console.error(`Unknown transport type: ${opts.transport}\nAllowed values: stdio, sse`); + console.error(`Unknown transport type: ${opts.transport}\nAllowed values: stdio, http`); process.exit(1); } }); diff --git a/src/commands/start-http-server.ts b/src/commands/start-http-server.ts new file mode 100644 index 0000000..e756e7c --- /dev/null +++ b/src/commands/start-http-server.ts @@ -0,0 +1,256 @@ +import express, { type NextFunction, type Request, type Response } from "express"; +import cors from "cors"; +import { randomUUID } from "node:crypto"; +import { SSEServerTransport } from "@modelcontextprotocol/sdk/server/sse.js"; +import { StreamableHTTPServerTransport } from "@modelcontextprotocol/sdk/server/streamableHttp.js"; +import { isInitializeRequest } from "@modelcontextprotocol/sdk/types.js"; +import { InMemoryEventStore } from "@modelcontextprotocol/sdk/examples/shared/inMemoryEventStore.js"; +import type { StartServerOptions } from "../server/types.ts"; +import { initMCPServer } from "../server/init-server.ts"; +import { CONFIG } from "../config.ts"; + +export async function startHttpServer(opts: StartServerOptions) { + try { + // Create Express application + const app = express(); + app.use(express.json()); + app.use( + cors({ + origin: process.env.ALLOWED_ORIGINS?.split(",") || "*", + methods: ["GET", "POST", "DELETE"], + allowedHeaders: ["Content-Type", "Authorization"], + }), + ); + + // Store transports by session ID + const transports: Map = new Map(); + + // Health check endpoint + app.get("/health", (_, res) => { + res.status(200).json({ + status: "ok", + version: CONFIG.version, + uptime: process.uptime(), + timestamp: new Date().toISOString(), + connections: transports.size, + }); + }); + + //============================================================================= + // STREAMABLE HTTP TRANSPORT (PROTOCOL VERSION 2025-03-26) + //============================================================================= + + // Handle all MCP Streamable HTTP requests (GET, POST, DELETE) on a single endpoint + app.all("/mcp", async (req: Request, res: Response) => { + console.log(`Received ${req.method} request to /mcp`); + + try { + // Check for existing session ID + const sessionId = req.headers["mcp-session-id"] as string | undefined; + let transport: StreamableHTTPServerTransport; + + if (sessionId != null && transports.has(sessionId)) { + // Check if the transport is of the correct type + const existingTransport = transports.get(sessionId); + if (existingTransport instanceof StreamableHTTPServerTransport) { + // Reuse existing transport + transport = existingTransport; + } else { + // Transport exists but is not a StreamableHTTPServerTransport (could be SSEServerTransport) + res.status(400).json({ + jsonrpc: "2.0", + error: { + code: -32000, + message: "Bad Request: Session exists but uses a different transport protocol", + }, + id: null, + }); + return; + } + } else if (sessionId == null && req.method === "POST" && isInitializeRequest(req.body)) { + const eventStore = new InMemoryEventStore(); + transport = new StreamableHTTPServerTransport({ + sessionIdGenerator: () => randomUUID(), + eventStore, // Enable resumability + onsessioninitialized: (sessionId) => { + // Store the transport by session ID when session is initialized + console.log(`StreamableHTTP session initialized with ID: ${sessionId}`); + transports.set(sessionId, transport); + }, + }); + + // Set up onclose handler to clean up transport when closed + transport.onclose = () => { + if (transport.sessionId != null && transports.has(transport.sessionId)) { + console.log(`Transport closed for HTTP session ${transport.sessionId}, removing from transports map`); + transports.delete(transport.sessionId); + } + }; + + // Connect the transport to the MCP server + const server = await initMCPServer(opts); + await server.connect(transport); + } else { + // Invalid request - no session ID or not initialization request + res.status(400).json({ + jsonrpc: "2.0", + error: { + code: -32000, + message: "Bad Request: No valid session ID provided", + }, + id: null, + }); + return; + } + + // Handle the request with the transport + await transport.handleRequest(req, res, req.body); + } catch (error) { + console.error("Error handling MCP request:", error); + if (!res.headersSent) { + res.status(500).json({ + jsonrpc: "2.0", + error: { + code: -32603, + message: "Internal server error", + }, + id: null, + }); + } + } + }); + + //============================================================================= + // DEPRECATED HTTP+SSE TRANSPORT (PROTOCOL VERSION 2024-11-05) + //============================================================================= + + app.get("/sse", async (_: Request, res: Response) => { + console.log("Received GET request to /sse (deprecated SSE transport)"); + const transport = new SSEServerTransport("/messages", res); + transports.set(transport.sessionId, transport); + res.on("close", () => { + if (transport.sessionId != null && transports.has(transport.sessionId)) { + console.log(`Transport closed for SSE session ${transport.sessionId}, removing from transports map`); + transports.delete(transport.sessionId); + } + }); + const server = await initMCPServer(opts); + await server.connect(transport); + }); + + app.post("/messages", async (req: Request, res: Response) => { + const sessionId = req.query.sessionId as string; + let transport: SSEServerTransport; + const existingTransport = transports.get(sessionId); + if (existingTransport instanceof SSEServerTransport) { + // Reuse existing transport + transport = existingTransport; + } else { + // Transport exists but is not a SSEServerTransport (could be StreamableHTTPServerTransport) + res.status(400).json({ + jsonrpc: "2.0", + error: { + code: -32000, + message: "Bad Request: Session exists but uses a different transport protocol", + }, + id: null, + }); + return; + } + if (transport != null) { + await transport.handlePostMessage(req, res, req.body); + } else { + res.status(400).send({ + jsonrpc: "2.0", + error: { + code: -32000, + message: "Bad Request: No transport found for sessionId", + }, + id: null, + }); + } + }); + + //============================================================================= + // END OF SERVER SETUP + //============================================================================= + + // Error handling + app.use((err: Error, _: Request, res: Response, __: NextFunction) => { + console.error("Unhandled exception: ", err.stack); + res.status(500).json({ + jsonrpc: "2.0", + error: { + code: -32603, + message: "Internal Server Error", + }, + id: null, + }); + }); + + // Graceful shutdown of all connections + async function closeAllConnections() { + for (const [sessionId, transport] of transports.entries()) { + try { + console.log(`Closing transport for session ${sessionId}`); + await transport.send({ + jsonrpc: "2.0", + method: "notifications/shutdown", + }); + await transport.close(); + } catch (error) { + console.error(`Error closing transport for session ${sessionId}: `, error); + } + } + transports.clear(); + console.log("All connections closed"); + } + + // Graceful shutdown + process.on("SIGTERM", async () => { + console.log(`Graceful shutdown initiated at ${new Date().toISOString()}`); + await closeAllConnections(); + server.close(() => { + console.log(`Graceful shutdown complete at ${new Date().toISOString()}`); + process.exit(0); + }); + }); + + // Handle server shutdown + process.on("SIGINT", async () => { + console.log("Shutting down server..."); + await closeAllConnections(); + + process.exit(0); + }); + + // Start the server + const PORT = 4243; + const server = app.listen(PORT, () => { + console.log(`HTTP MCP server listening on port ${PORT}`); + console.log(` + ============================================== + SUPPORTED TRANSPORT OPTIONS: + + 1. Streamable Http(Protocol version: 2025-03-26) + Endpoint: /mcp + Methods: GET, POST, DELETE + Usage: + - Initialize with POST to /mcp + - Establish SSE stream with GET to /mcp + - Send requests with POST to /mcp + - Terminate session with DELETE to /mcp + + 2. Http + SSE (Protocol version: 2024-11-05) + Endpoints: /sse (GET) and /messages (POST) + Usage: + - Establish SSE stream with GET to /sse + - Send requests with POST to /messages?sessionId= + ============================================== + `); + }); + } catch (err) { + console.error("Error starting server:", err); + process.exit(1); + } +} diff --git a/src/commands/start-sse-server.ts b/src/commands/start-sse-server.ts deleted file mode 100644 index ef3df98..0000000 --- a/src/commands/start-sse-server.ts +++ /dev/null @@ -1,137 +0,0 @@ -import express, { type Request, type Response } from "express"; -import cors from "cors"; -import { SSEServerTransport } from "@modelcontextprotocol/sdk/server/sse.js"; -import type { StartServerOptions } from "../server/types.ts"; -import { initMCPServer } from "../server/init-server.ts"; -import { CONFIG } from "../config.ts"; - -export async function startSseServer(opts: StartServerOptions) { - try { - const app = express(); - app.use( - cors({ - origin: process.env.ALLOWED_ORIGINS?.split(",") || "*", - methods: ["GET", "POST"], - allowedHeaders: ["Content-Type", "Authorization"], - }), - ); - - // Store active connections - const connections = new Map(); - - // Health check endpoint - app.get("/health", (_, res) => { - res.status(200).json({ - status: "ok", - version: CONFIG.version, - uptime: process.uptime(), - timestamp: new Date().toISOString(), - connections: connections.size, - }); - }); - - // SSE connection establishment endpoint - app.get("/sse", async (req, res) => { - // Instantiate SSE transport object - const transport = new SSEServerTransport("/messages", res); - // Get sessionId - const sessionId = transport.sessionId; - console.log(`[${new Date().toISOString()}] New SSE connection established: ${sessionId}`); - - // Register connection - connections.set(sessionId, transport); - - // Connection interruption handling - req.on("close", () => { - console.log(`[${new Date().toISOString()}] SSE connection closed: ${sessionId}`); - connections.delete(sessionId); - }); - - // Connect the transport object to the MCP server - const mcpServer = await initMCPServer(opts); - await mcpServer.connect(transport); - console.log(`[${new Date().toISOString()}] MCP server connection successful: ${sessionId}`); - }); - - // Endpoint for receiving client messages - app.post("/messages", async (req: Request, res: Response) => { - try { - console.log(`[${new Date().toISOString()}] Received client message:`, req.query); - const sessionId = req.query.sessionId as string; - - // Find the corresponding SSE connection and process the message - if (connections.size > 0) { - const transport: SSEServerTransport = connections.get(sessionId) as SSEServerTransport; - // Use transport to process messages - if (transport) { - await transport.handlePostMessage(req, res); - } else { - throw new Error("No active SSE connection"); - } - } else { - throw new Error("No active SSE connection"); - } - // eslint-disable-next-line @typescript-eslint/no-explicit-any - } catch (error: any) { - console.error(`[${new Date().toISOString()}] Failed to process client message:`, error); - res.status(500).json({ error: "Failed to process message", message: error.message }); - } - }); - - // Graceful shutdown of all connections - async function closeAllConnections() { - console.log( - `[${new Date().toISOString()}] Closing all connections (${connections.size} active)`, - ); - for (const [id, transport] of connections.entries()) { - try { - // Send shutdown event - transport.res.write( - 'event: server_shutdown\ndata: {"reason": "Server is shutting down"}\n\n', - ); - transport.res.end(); - console.log(`[${new Date().toISOString()}] Connection closed: ${id}`); - } catch (error) { - console.error(`[${new Date().toISOString()}] Failed to close connection: ${id}`, error); - } - } - connections.clear(); - } - - // Error handling - app.use((err: Error, _: Request, res: Response) => { - console.error(`[${new Date().toISOString()}] Unhandled exception:`, err); - res.status(500).json({ error: "Server internal error" }); - }); - - // Graceful shutdown - process.on("SIGTERM", async () => { - console.log(`[${new Date().toISOString()}] Received SIGTERM signal, preparing to close`); - await closeAllConnections(); - server.close(() => { - console.log(`[${new Date().toISOString()}] Server closed`); - process.exit(0); - }); - }); - - process.on("SIGINT", async () => { - console.log(`[${new Date().toISOString()}] Received SIGINT signal, preparing to close`); - await closeAllConnections(); - process.exit(0); - }); - - // Start server - const port = 4243; - const server = app.listen(port, () => { - console.log( - `[${new Date().toISOString()}] Algolia MCP SSE server started, address: http://localhost:${port}`, - ); - console.log(`- SSE connection endpoint: http://localhost:${port}/sse`); - console.log(`- Message processing endpoint: http://localhost:${port}/messages`); - console.log(`- Health check endpoint: http://localhost:${port}/health`); - }); - } catch (err) { - console.error("Error starting server:", err); - process.exit(1); - } -} diff --git a/src/toolFilters.ts b/src/toolFilters.ts index 5804453..01e3635 100644 --- a/src/toolFilters.ts +++ b/src/toolFilters.ts @@ -1,7 +1,7 @@ export type CliFilteringOptions = { allowTools?: string[]; denyTools?: string[]; - transport?: 'stdio' | 'sse'; + transport?: 'stdio' | 'http'; }; export type ToolFilter = {