Skip to content
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion docs/mcp/run-python.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ The MCP Run Python server is distributed as a [JSR package](https://jsr.io/@pyda
```bash {title="terminal"}
deno run \
-N -R=node_modules -W=node_modules --node-modules-dir=auto \
jsr:@pydantic/mcp-run-python [stdio|sse|warmup]
jsr:@pydantic/mcp-run-python [stdio|streamable_http|sse|warmup]
```

where:
Expand All @@ -34,6 +34,10 @@ where:
- `stdio` runs the server with the
[Stdio MCP transport](https://spec.modelcontextprotocol.io/specification/2024-11-05/basic/transports/#stdio)
— suitable for running the process as a subprocess locally
- `streamable_http` runs the server with the
[Streamable HTTP MCP transport](https://modelcontextprotocol.io/specification/2025-03-26/basic/transports#streamable-http)
— running the server as an HTTP server to connect locally or remotely.
This supports stateful requests, but does not require the client to hold a stateful connection like SSE
- `sse` runs the server with the
[SSE MCP transport](https://spec.modelcontextprotocol.io/specification/2024-11-05/basic/transports/#http-with-sse)
— running the server as an HTTP server to connect locally or remotely
Expand Down
2 changes: 1 addition & 1 deletion mcp-run-python/deno.json
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
"build-publish": "deno task build && deno publish"
},
"imports": {
"@modelcontextprotocol/sdk": "npm:@modelcontextprotocol/sdk@^1.8.0",
"@modelcontextprotocol/sdk": "npm:@modelcontextprotocol/sdk@^1.15.1",
"@std/cli": "jsr:@std/cli@^1.0.15",
"@std/path": "jsr:@std/path@^1.0.8",
// do NOT upgrade above this version until there is a workaround for https://github.com/pyodide/pyodide/pull/5621
Expand Down
45 changes: 25 additions & 20 deletions mcp-run-python/deno.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

162 changes: 148 additions & 14 deletions mcp-run-python/src/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,32 @@

import './polyfill.ts'
import http from 'node:http'
import { randomUUID } from 'node:crypto'
import { parseArgs } from '@std/cli/parse-args'
import { SSEServerTransport } from '@modelcontextprotocol/sdk/server/sse.js'
import { StdioServerTransport } from '@modelcontextprotocol/sdk/server/stdio.js'
import { StreamableHTTPServerTransport } from '@modelcontextprotocol/sdk/server/streamableHttp.js'
import { isInitializeRequest } from '@modelcontextprotocol/sdk/types.js'
import { type LoggingLevel, SetLevelRequestSchema } from '@modelcontextprotocol/sdk/types.js'
import { McpServer } from '@modelcontextprotocol/sdk/server/mcp.js'
import { z } from 'zod'

import { asXml, runCode } from './runCode.ts'
import { Buffer } from 'node:buffer'

const VERSION = '0.0.13'

export async function main() {
const { args } = Deno
if (args.length === 1 && args[0] === 'stdio') {
await runStdio()
} else if (args.length >= 1 && args[0] === 'streamable_http') {
const flags = parseArgs(Deno.args, {
string: ['port'],
default: { port: '3001' },
})
const port = parseInt(flags.port)
runStreamableHttp(port)
} else if (args.length >= 1 && args[0] === 'sse') {
const flags = parseArgs(Deno.args, {
string: ['port'],
Expand All @@ -31,7 +42,7 @@ export async function main() {
`\
Invalid arguments.

Usage: deno run -N -R=node_modules -W=node_modules --node-modules-dir=auto jsr:@pydantic/mcp-run-python [stdio|sse|warmup]
Usage: deno run -N -R=node_modules -W=node_modules --node-modules-dir=auto jsr:@pydantic/mcp-run-python [stdio|streamable_http|sse|warmup]

options:
--port <port> Port to run the SSE server on (default: 3001)`,
Expand Down Expand Up @@ -103,6 +114,138 @@ print('python code here')
return server
}

/*
* Define some QOL functions for both the SSE and Streamable HTTP server implementation
*/
function httpGetUrl(req: http.IncomingMessage): URL {
return new URL(
req.url ?? '',
`http://${req.headers.host ?? 'unknown'}`,
)
}

function httpGetBody(req: http.IncomingMessage): Promise<JSON> {
// https://nodejs.org/en/learn/modules/anatomy-of-an-http-transaction#request-body
return new Promise((resolve) => {
// deno-lint-ignore no-explicit-any
const bodyParts: any[] = []
let body
req.on('data', (chunk) => {
bodyParts.push(chunk)
}).on('end', () => {
body = Buffer.concat(bodyParts).toString()
resolve(JSON.parse(body))
})
})
}

function httpSetTextResponse(res: http.ServerResponse, status: number, text: string) {
res.setHeader('Content-Type', 'text/plain')
res.statusCode = status
res.end(`${text}\n`)
}

function httpSetJsonResponse(res: http.ServerResponse, status: number, text: string, code: number) {
res.setHeader('Content-Type', 'application/json')
res.statusCode = status
res.write(JSON.stringify({
jsonrpc: '2.0',
error: {
code: code,
message: text,
},
id: null,
}))
res.end()
}

/*
* Run the MCP server using the Streamable HTTP transport
*/
function runStreamableHttp(port: number) {
// https://github.com/modelcontextprotocol/typescript-sdk?tab=readme-ov-file#with-session-management
const mcpServer = createServer()
const transports: { [sessionId: string]: StreamableHTTPServerTransport } = {}

const server = http.createServer(async (req, res) => {
const url = httpGetUrl(req)
let pathMatch = false
function match(method: string, path: string): boolean {
if (url.pathname === path) {
pathMatch = true
return req.method === method
}
return false
}

// Reusable handler for GET and DELETE requests
async function handleSessionRequest() {
const sessionId = req.headers['mcp-session-id'] as string | undefined
if (!sessionId || !transports[sessionId]) {
httpSetTextResponse(res, 400, 'Invalid or missing session ID')
return
}

const transport = transports[sessionId]
await transport.handleRequest(req, res)
}

// Handle different request methods and paths
if (match('POST', '/mcp')) {
// Check for existing session ID
const sessionId = req.headers['mcp-session-id'] as string | undefined
let transport: StreamableHTTPServerTransport

const body = await httpGetBody(req)

if (sessionId && transports[sessionId]) {
// Reuse existing transport
transport = transports[sessionId]
} else if (!sessionId && isInitializeRequest(body)) {
// New initialization request
transport = new StreamableHTTPServerTransport({
sessionIdGenerator: () => randomUUID(),
onsessioninitialized: (sessionId) => {
// Store the transport by session ID
transports[sessionId] = transport
},
})

// Clean up transport when closed
transport.onclose = () => {
if (transport.sessionId) {
delete transports[transport.sessionId]
}
}

await mcpServer.connect(transport)
} else {
httpSetJsonResponse(res, 400, 'Bad Request: No valid session ID provided', -32000)
return
}

// Handle the request
await transport.handleRequest(req, res, body)
} else if (match('GET', '/mcp')) {
// Handle server-to-client notifications via SSE
await handleSessionRequest()
} else if (match('DELETE', '/mcp')) {
// Handle requests for session termination
await handleSessionRequest()
} else if (pathMatch) {
httpSetTextResponse(res, 405, 'Method not allowed')
} else {
httpSetTextResponse(res, 404, 'Page not found')
}
})

server.listen(port, () => {
console.log(
`Running MCP Run Python version ${VERSION} with Streamable HTTP transport on port ${port}`,
)
})
}

/*
* Run the MCP server using the SSE transport, e.g. over HTTP.
*/
Expand All @@ -111,10 +254,7 @@ function runSse(port: number) {
const transports: { [sessionId: string]: SSEServerTransport } = {}

const server = http.createServer(async (req, res) => {
const url = new URL(
req.url ?? '',
`http://${req.headers.host ?? 'unknown'}`,
)
const url = httpGetUrl(req)
let pathMatch = false
function match(method: string, path: string): boolean {
if (url.pathname === path) {
Expand All @@ -123,12 +263,6 @@ function runSse(port: number) {
}
return false
}
function textResponse(status: number, text: string) {
res.setHeader('Content-Type', 'text/plain')
res.statusCode = status
res.end(`${text}\n`)
}
// console.log(`${req.method} ${url}`)

if (match('GET', '/sse')) {
const transport = new SSEServerTransport('/messages', res)
Expand All @@ -143,12 +277,12 @@ function runSse(port: number) {
if (transport) {
await transport.handlePostMessage(req, res)
} else {
textResponse(400, `No transport found for sessionId '${sessionId}'`)
httpSetTextResponse(res, 400, `No transport found for sessionId '${sessionId}'`)
}
} else if (pathMatch) {
textResponse(405, 'Method not allowed')
httpSetTextResponse(res, 405, 'Method not allowed')
} else {
textResponse(404, 'Page not found')
httpSetTextResponse(res, 404, 'Page not found')
}
})

Expand Down
Loading