diff --git a/.changeset/websocket-transport.md b/.changeset/websocket-transport.md new file mode 100644 index 00000000..95d92957 --- /dev/null +++ b/.changeset/websocket-transport.md @@ -0,0 +1,5 @@ +--- +'@cloudflare/sandbox': minor +--- + +Add WebSocket transport to avoid sub-request limits in Workers and Durable Objects. Set `SANDBOX_TRANSPORT=websocket` environment variable to multiplex all SDK calls over a single persistent connection. diff --git a/.github/workflows/cleanup-stale.yml b/.github/workflows/cleanup-stale.yml index 9ae4f9c2..ae13feed 100644 --- a/.github/workflows/cleanup-stale.yml +++ b/.github/workflows/cleanup-stale.yml @@ -3,6 +3,7 @@ name: Cleanup Stale PR Resources permissions: contents: read pull-requests: read + actions: write # Required for gh cache delete on: schedule: @@ -37,13 +38,13 @@ jobs: # List all workers and filter for PR test workers WORKERS=$(npx wrangler workers list --json 2>/dev/null || echo "[]") - echo "$WORKERS" | jq -r '.[] | select(.name | test("^sandbox-e2e-test-worker-pr-[0-9]+$")) | .name' | while read -r WORKER_NAME; do + echo "$WORKERS" | jq -r '.[] | select(.name | test("^sandbox-e2e-test-worker-pr-[0-9]+(-http|-websocket)?$")) | .name' | while read -r WORKER_NAME; do if [ -z "$WORKER_NAME" ]; then continue fi - # Extract PR number from worker name - PR_NUMBER=$(echo "$WORKER_NAME" | sed 's/sandbox-e2e-test-worker-pr-//') + # Extract PR number from worker name (strip prefix and any transport suffix) + PR_NUMBER=$(echo "$WORKER_NAME" | sed 's/sandbox-e2e-test-worker-pr-//' | sed 's/-http$//' | sed 's/-websocket$//') echo "" echo "Checking worker: $WORKER_NAME (PR #$PR_NUMBER)" @@ -86,9 +87,83 @@ jobs: done echo "" - echo "=== Stale resource cleanup complete ===" + echo "=== Stale worker cleanup complete ===" working-directory: tests/e2e/test-worker env: CLOUDFLARE_API_TOKEN: ${{ secrets.CLOUDFLARE_API_TOKEN }} CLOUDFLARE_ACCOUNT_ID: ${{ secrets.CLOUDFLARE_ACCOUNT_ID }} GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} + + 
- name: Cleanup stale registry images + run: | + echo "=== Checking for stale registry images ===" + + # List all images and find PR-tagged ones + IMAGES=$(npx wrangler containers images list 2>/dev/null || echo "") + + # Extract unique PR numbers from image tags + PR_NUMBERS=$(echo "$IMAGES" | grep -oE 'pr-[0-9]+' | sed 's/pr-//' | sort -u) + + for PR_NUMBER in $PR_NUMBERS; do + if [ -z "$PR_NUMBER" ]; then + continue + fi + + echo "" + echo "Checking images for PR #$PR_NUMBER..." + + # Check if PR is closed/merged + PR_STATE=$(gh pr view "$PR_NUMBER" --json state -q '.state' 2>/dev/null || echo "NOT_FOUND") + + if [ "$PR_STATE" = "CLOSED" ] || [ "$PR_STATE" = "MERGED" ] || [ "$PR_STATE" = "NOT_FOUND" ]; then + echo " PR is $PR_STATE - cleaning up images..." + + for image in sandbox sandbox-python sandbox-opencode sandbox-standalone; do + echo " Deleting $image:pr-$PR_NUMBER..." + npx wrangler containers images delete "$image:pr-$PR_NUMBER" 2>/dev/null || true + done + else + echo " PR is $PR_STATE - skipping" + fi + done + + echo "" + echo "=== Stale image cleanup complete ===" + env: + CLOUDFLARE_API_TOKEN: ${{ secrets.CLOUDFLARE_API_TOKEN }} + CLOUDFLARE_ACCOUNT_ID: ${{ secrets.CLOUDFLARE_ACCOUNT_ID }} + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} + + - name: Cleanup stale GHA caches + run: | + echo "=== Checking for stale GHA caches ===" + + # List all caches and find PR branch ones + CACHES=$(gh cache list --json ref -q '.[].ref' 2>/dev/null || echo "") + + # Filter for PR merge refs + echo "$CACHES" | grep -E 'refs/pull/[0-9]+/merge' | while read -r REF; do + if [ -z "$REF" ]; then + continue + fi + + PR_NUMBER=$(echo "$REF" | grep -oE '[0-9]+') + + echo "" + echo "Checking cache for PR #$PR_NUMBER..." 
+ + # Check if PR is closed/merged + PR_STATE=$(gh pr view "$PR_NUMBER" --json state -q '.state' 2>/dev/null || echo "NOT_FOUND") + + if [ "$PR_STATE" = "CLOSED" ] || [ "$PR_STATE" = "MERGED" ] || [ "$PR_STATE" = "NOT_FOUND" ]; then + echo " PR is $PR_STATE - cleaning up cache..." + gh cache delete --all --ref "$REF" || true + else + echo " PR is $PR_STATE - skipping" + fi + done + + echo "" + echo "=== Stale cache cleanup complete ===" + env: + GH_TOKEN: ${{ secrets.GITHUB_TOKEN }} diff --git a/.github/workflows/cleanup.yml b/.github/workflows/cleanup.yml index 345101b2..f287e908 100644 --- a/.github/workflows/cleanup.yml +++ b/.github/workflows/cleanup.yml @@ -2,6 +2,7 @@ name: Cleanup PR Resources permissions: contents: read + actions: write # Required for gh cache delete on: pull_request: @@ -23,16 +24,43 @@ jobs: - name: Install dependencies run: npm ci - - name: Set worker name + - name: Set worker name prefix id: worker-name run: | - echo "worker_name=sandbox-e2e-test-worker-pr-${{ github.event.pull_request.number }}" >> $GITHUB_OUTPUT + echo "worker_prefix=sandbox-e2e-test-worker-pr-${{ github.event.pull_request.number }}" >> $GITHUB_OUTPUT - - name: Cleanup test deployment + - name: Cleanup test deployments continue-on-error: true run: | cd tests/e2e/test-worker - ../../../scripts/cleanup-test-deployment.sh ${{ steps.worker-name.outputs.worker_name }} + # Clean up both transport variants + for suffix in "" "-http" "-websocket"; do + WORKER="${{ steps.worker-name.outputs.worker_prefix }}${suffix}" + echo "Cleaning up: $WORKER" + ../../../scripts/cleanup-test-deployment.sh "$WORKER" || true + done env: CLOUDFLARE_API_TOKEN: ${{ secrets.CLOUDFLARE_API_TOKEN }} CLOUDFLARE_ACCOUNT_ID: ${{ secrets.CLOUDFLARE_ACCOUNT_ID }} + + - name: Cleanup Cloudflare registry images + continue-on-error: true + run: | + PR=${{ github.event.pull_request.number }} + echo "Deleting PR images from Cloudflare registry..." 
+ + for image in sandbox sandbox-python sandbox-opencode sandbox-standalone; do + echo "Deleting $image:pr-$PR..." + npx wrangler containers images delete "$image:pr-$PR" || true + done + env: + CLOUDFLARE_API_TOKEN: ${{ secrets.CLOUDFLARE_API_TOKEN }} + CLOUDFLARE_ACCOUNT_ID: ${{ secrets.CLOUDFLARE_ACCOUNT_ID }} + + - name: Cleanup GHA cache for PR branch + continue-on-error: true + run: | + echo "Deleting GHA caches for PR #${{ github.event.pull_request.number }}..." + gh cache delete --all --ref "refs/pull/${{ github.event.pull_request.number }}/merge" || true + env: + GH_TOKEN: ${{ secrets.GITHUB_TOKEN }} diff --git a/.github/workflows/pullrequest.yml b/.github/workflows/pullrequest.yml index e035d268..8e5eab11 100644 --- a/.github/workflows/pullrequest.yml +++ b/.github/workflows/pullrequest.yml @@ -74,12 +74,121 @@ jobs: - name: Run container unit tests run: npm run test -w @repo/sandbox-container - # E2E tests against deployed worker - e2e-tests: + # Build Docker images and push to Cloudflare registry + build-docker: needs: unit-tests if: ${{ !contains(github.event.pull_request.title, 'Version Packages') }} - timeout-minutes: 30 + timeout-minutes: 15 runs-on: ubuntu-latest + outputs: + image-tag: ${{ steps.tags.outputs.tag }} + steps: + - uses: actions/checkout@v4 + + - uses: docker/setup-buildx-action@v3 + + - uses: actions/setup-node@v4 + with: + node-version: 24 + + - name: Install wrangler + run: npm install -g wrangler + + - name: Set image tags + id: tags + run: | + echo "tag=pr-${{ github.event.pull_request.number }}" >> $GITHUB_OUTPUT + + # Build base, python, opencode in parallel (independent targets) + - name: Build base, python, opencode images in parallel + run: | + set -e + VERSION="${{ needs.unit-tests.outputs.version }}" + + echo "Starting parallel builds..." 
+ + docker buildx build \ + --cache-from type=gha,scope=sandbox-base \ + --cache-to type=gha,mode=max,scope=sandbox-base \ + --load -t sandbox:local \ + --build-arg SANDBOX_VERSION=$VERSION \ + -f packages/sandbox/Dockerfile --target default . & + PID_BASE=$! + + docker buildx build \ + --cache-from type=gha,scope=sandbox-python \ + --cache-to type=gha,mode=max,scope=sandbox-python \ + --load -t sandbox-python:local \ + --build-arg SANDBOX_VERSION=$VERSION \ + -f packages/sandbox/Dockerfile --target python . & + PID_PYTHON=$! + + docker buildx build \ + --cache-from type=gha,scope=sandbox-opencode \ + --cache-to type=gha,mode=max,scope=sandbox-opencode \ + --load -t sandbox-opencode:local \ + --build-arg SANDBOX_VERSION=$VERSION \ + -f packages/sandbox/Dockerfile --target opencode . & + PID_OPENCODE=$! + + # Wait for all builds to complete + echo "Waiting for builds to complete..." + wait $PID_BASE || { echo "Base build failed"; exit 1; } + wait $PID_PYTHON || { echo "Python build failed"; exit 1; } + wait $PID_OPENCODE || { echo "Opencode build failed"; exit 1; } + echo "All builds completed successfully" + + # Push base first (standalone needs it in registry) + - name: Push base image to Cloudflare + run: | + docker tag sandbox:local sandbox:${{ steps.tags.outputs.tag }} + wrangler containers push sandbox:${{ steps.tags.outputs.tag }} + env: + CLOUDFLARE_API_TOKEN: ${{ secrets.CLOUDFLARE_API_TOKEN }} + CLOUDFLARE_ACCOUNT_ID: ${{ secrets.CLOUDFLARE_ACCOUNT_ID }} + + # Build standalone (references base from registry) + - name: Build standalone image + run: | + docker buildx build \ + --cache-from type=gha,scope=sandbox-standalone \ + --cache-to type=gha,mode=max,scope=sandbox-standalone \ + --load -t sandbox-standalone:local \ + --build-arg BASE_IMAGE=registry.cloudflare.com/${{ secrets.CLOUDFLARE_ACCOUNT_ID }}/sandbox:${{ steps.tags.outputs.tag }} \ + -f tests/e2e/test-worker/Dockerfile.standalone tests/e2e/test-worker + + # Push remaining images in parallel + - 
name: Push python, opencode, standalone images to Cloudflare + run: | + set -e + TAG="${{ steps.tags.outputs.tag }}" + + docker tag sandbox-python:local sandbox-python:$TAG + wrangler containers push sandbox-python:$TAG & + PID1=$! + + docker tag sandbox-opencode:local sandbox-opencode:$TAG + wrangler containers push sandbox-opencode:$TAG & + PID2=$! + + docker tag sandbox-standalone:local sandbox-standalone:$TAG + wrangler containers push sandbox-standalone:$TAG & + PID3=$! + + wait $PID1 $PID2 $PID3 + env: + CLOUDFLARE_API_TOKEN: ${{ secrets.CLOUDFLARE_API_TOKEN }} + CLOUDFLARE_ACCOUNT_ID: ${{ secrets.CLOUDFLARE_ACCOUNT_ID }} + + # E2E tests against deployed worker + e2e-tests: + needs: [unit-tests, build-docker] + timeout-minutes: 20 + runs-on: ubuntu-latest + strategy: + fail-fast: false + matrix: + transport: [http, websocket] steps: - uses: actions/checkout@v4 @@ -98,52 +207,31 @@ jobs: - name: Build packages run: npm run build - # Set environment name (pr-X for PRs, branch name for pushes) + # Set environment name (pr-X for PRs, branch name for pushes) + transport mode - name: Set environment name id: env-name run: | if [ "${{ github.event_name }}" = "pull_request" ]; then - echo "env_name=pr-${{ github.event.pull_request.number }}" >> $GITHUB_OUTPUT - echo "worker_name=sandbox-e2e-test-worker-pr-${{ github.event.pull_request.number }}" >> $GITHUB_OUTPUT + echo "env_name=pr-${{ github.event.pull_request.number }}-${{ matrix.transport }}" >> $GITHUB_OUTPUT + echo "worker_name=sandbox-e2e-test-worker-pr-${{ github.event.pull_request.number }}-${{ matrix.transport }}" >> $GITHUB_OUTPUT else - echo "env_name=${{ github.ref_name }}" >> $GITHUB_OUTPUT - echo "worker_name=sandbox-e2e-test-worker-${{ github.ref_name }}" >> $GITHUB_OUTPUT + echo "env_name=${{ github.ref_name }}-${{ matrix.transport }}" >> $GITHUB_OUTPUT + echo "worker_name=sandbox-e2e-test-worker-${{ github.ref_name }}-${{ matrix.transport }}" >> $GITHUB_OUTPUT fi - # Generate unique wrangler config 
for this PR/branch + # Generate wrangler config with pre-pushed image references - name: Generate wrangler config run: | cd tests/e2e/test-worker - ./generate-config.sh ${{ steps.env-name.outputs.worker_name }} - - # Build Docker image for test worker with caching - - name: Set up Docker Buildx - uses: docker/setup-buildx-action@v3 + ./generate-config.sh \ + "${{ steps.env-name.outputs.worker_name }}" \ + "${{ steps.env-name.outputs.worker_name }}" \ + "${{ matrix.transport }}" \ + "registry:${{ needs.build-docker.outputs.image-tag }}" + env: + CLOUDFLARE_ACCOUNT_ID: ${{ secrets.CLOUDFLARE_ACCOUNT_ID }} - - name: Build test worker Docker images (base + python + opencode + standalone) - run: | - VERSION=${{ needs.unit-tests.outputs.version || '0.0.0' }} - # Build base image (no Python) - used by Sandbox binding - docker build -f packages/sandbox/Dockerfile --target default --platform linux/amd64 \ - --build-arg SANDBOX_VERSION=$VERSION -t cloudflare/sandbox-test:$VERSION . - # Build python image - used by SandboxPython binding - docker build -f packages/sandbox/Dockerfile --target python --platform linux/amd64 \ - --build-arg SANDBOX_VERSION=$VERSION -t cloudflare/sandbox-test:$VERSION-python . - # Build opencode image - used by SandboxOpencode binding - docker build -f packages/sandbox/Dockerfile --target opencode --platform linux/amd64 \ - --build-arg SANDBOX_VERSION=$VERSION -t cloudflare/sandbox-test:$VERSION-opencode . - # Build standalone image (arbitrary base with binary) - used by SandboxStandalone binding - # Use regex to replace any version number, avoiding hardcoded version mismatch - # Build from test-worker directory so COPY startup-test.sh works - cd tests/e2e/test-worker - sed -E "s|cloudflare/sandbox-test:[0-9]+\.[0-9]+\.[0-9]+|cloudflare/sandbox-test:$VERSION|g" \ - Dockerfile.standalone > Dockerfile.standalone.tmp - docker build -f Dockerfile.standalone.tmp --platform linux/amd64 \ - -t cloudflare/sandbox-test:$VERSION-standalone . 
- rm Dockerfile.standalone.tmp - cd ../../.. - - # Deploy test worker using official Cloudflare action + # Deploy test worker (images are pre-pushed to Cloudflare registry) - name: Deploy test worker id: deploy uses: cloudflare/wrangler-action@v3 diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 0ae8ffa7..45a09c97 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -81,15 +81,18 @@ jobs: run: npm run test -w @repo/sandbox-container # E2E tests - runs on every push to main + # Uses buildx + GHA cache (shared with publish-release) + CF registry for fast deploys e2e-tests: needs: [unit-tests] if: ${{ github.repository_owner == 'cloudflare' }} runs-on: ubuntu-latest - timeout-minutes: 30 + timeout-minutes: 20 steps: - uses: actions/checkout@v4 + - uses: docker/setup-buildx-action@v3 + - uses: actions/setup-node@v4 with: node-version: 24 @@ -102,40 +105,101 @@ jobs: - name: Install dependencies run: npm ci + - name: Install wrangler + run: npm install -g wrangler + - name: Build packages run: npm run build - - name: Generate wrangler config + # Build base, python, opencode in parallel (shared cache with publish-release) + - name: Build base, python, opencode images in parallel run: | - cd tests/e2e/test-worker - ./generate-config.sh ${{ env.E2E_WORKER_NAME }} + set -e + VERSION="${{ needs.unit-tests.outputs.version }}" + + echo "Starting parallel builds..." + + docker buildx build \ + --cache-from type=gha,scope=release-default \ + --cache-to type=gha,mode=max,scope=release-default \ + --load -t sandbox:local \ + --build-arg SANDBOX_VERSION=$VERSION \ + -f packages/sandbox/Dockerfile --target default . & + PID_BASE=$! + + docker buildx build \ + --cache-from type=gha,scope=release-python \ + --cache-to type=gha,mode=max,scope=release-python \ + --load -t sandbox-python:local \ + --build-arg SANDBOX_VERSION=$VERSION \ + -f packages/sandbox/Dockerfile --target python . & + PID_PYTHON=$! 
+ + docker buildx build \ + --cache-from type=gha,scope=release-opencode \ + --cache-to type=gha,mode=max,scope=release-opencode \ + --load -t sandbox-opencode:local \ + --build-arg SANDBOX_VERSION=$VERSION \ + -f packages/sandbox/Dockerfile --target opencode . & + PID_OPENCODE=$! + + # Wait for all builds to complete + echo "Waiting for builds to complete..." + wait $PID_BASE || { echo "Base build failed"; exit 1; } + wait $PID_PYTHON || { echo "Python build failed"; exit 1; } + wait $PID_OPENCODE || { echo "Opencode build failed"; exit 1; } + echo "All builds completed successfully" + + # Push base first (standalone needs it in registry) + - name: Push base image to Cloudflare + run: | + docker tag sandbox:local sandbox:main + wrangler containers push sandbox:main + env: + CLOUDFLARE_API_TOKEN: ${{ secrets.CLOUDFLARE_API_TOKEN }} + CLOUDFLARE_ACCOUNT_ID: ${{ secrets.CLOUDFLARE_ACCOUNT_ID }} - - name: Set up Docker Buildx - uses: docker/setup-buildx-action@v3 + # Build standalone (references base from registry) + - name: Build standalone image + run: | + docker buildx build \ + --cache-from type=gha,scope=release-standalone \ + --cache-to type=gha,mode=max,scope=release-standalone \ + --load -t sandbox-standalone:local \ + --build-arg BASE_IMAGE=registry.cloudflare.com/${{ secrets.CLOUDFLARE_ACCOUNT_ID }}/sandbox:main \ + -f tests/e2e/test-worker/Dockerfile.standalone tests/e2e/test-worker + + # Push remaining images in parallel + - name: Push python, opencode, standalone images to Cloudflare + run: | + set -e + + docker tag sandbox-python:local sandbox-python:main + wrangler containers push sandbox-python:main & + PID1=$! + + docker tag sandbox-opencode:local sandbox-opencode:main + wrangler containers push sandbox-opencode:main & + PID2=$! + + docker tag sandbox-standalone:local sandbox-standalone:main + wrangler containers push sandbox-standalone:main & + PID3=$! 
+ + wait $PID1 $PID2 $PID3 + env: + CLOUDFLARE_API_TOKEN: ${{ secrets.CLOUDFLARE_API_TOKEN }} + CLOUDFLARE_ACCOUNT_ID: ${{ secrets.CLOUDFLARE_ACCOUNT_ID }} - - name: Build test worker Docker images (base + python + opencode + standalone) + # Generate wrangler config with pre-pushed image references + - name: Generate wrangler config run: | - VERSION=${{ needs.unit-tests.outputs.version }} - # Build base image (no Python) - used by Sandbox binding - docker build -f packages/sandbox/Dockerfile --target default --platform linux/amd64 \ - --build-arg SANDBOX_VERSION=$VERSION -t cloudflare/sandbox-test:$VERSION . - # Build python image - used by SandboxPython binding - docker build -f packages/sandbox/Dockerfile --target python --platform linux/amd64 \ - --build-arg SANDBOX_VERSION=$VERSION -t cloudflare/sandbox-test:$VERSION-python . - # Build opencode image - used by SandboxOpencode binding - docker build -f packages/sandbox/Dockerfile --target opencode --platform linux/amd64 \ - --build-arg SANDBOX_VERSION=$VERSION -t cloudflare/sandbox-test:$VERSION-opencode . - # Build standalone image (arbitrary base with binary) - used by SandboxStandalone binding - # Use regex to replace any version number, avoiding hardcoded version mismatch - # Build from test-worker directory so COPY startup-test.sh works cd tests/e2e/test-worker - sed -E "s|cloudflare/sandbox-test:[0-9]+\.[0-9]+\.[0-9]+|cloudflare/sandbox-test:$VERSION|g" \ - Dockerfile.standalone > Dockerfile.standalone.tmp - docker build -f Dockerfile.standalone.tmp --platform linux/amd64 \ - -t cloudflare/sandbox-test:$VERSION-standalone . - rm Dockerfile.standalone.tmp - cd ../../.. 
+ ./generate-config.sh ${{ env.E2E_WORKER_NAME }} ${{ env.E2E_WORKER_NAME }} http "registry:main" + env: + CLOUDFLARE_ACCOUNT_ID: ${{ secrets.CLOUDFLARE_ACCOUNT_ID }} + # Deploy test worker (images are pre-pushed to Cloudflare registry) - name: Deploy test worker uses: cloudflare/wrangler-action@v3 with: diff --git a/packages/sandbox-container/src/handlers/ws-adapter.ts b/packages/sandbox-container/src/handlers/ws-adapter.ts new file mode 100644 index 00000000..23d74bfb --- /dev/null +++ b/packages/sandbox-container/src/handlers/ws-adapter.ts @@ -0,0 +1,372 @@ +/** + * WebSocket Protocol Adapter for Container + * + * Adapts WebSocket messages to HTTP requests for routing through existing handlers. + * This enables multiplexing multiple requests over a single WebSocket connection, + * reducing sub-request count when the SDK runs inside Workers/Durable Objects. + */ + +import type { Logger } from '@repo/shared'; +import { + isWSRequest, + type WSError, + type WSRequest, + type WSResponse, + type WSServerMessage, + type WSStreamChunk +} from '@repo/shared'; +import type { ServerWebSocket } from 'bun'; +import type { Router } from '../core/router'; + +/** Container server port - must match SERVER_PORT in server.ts */ +const SERVER_PORT = 3000; + +/** + * WebSocket data attached to each connection + */ +export interface WSData { + /** Connection ID for logging */ + connectionId: string; +} + +/** + * WebSocket protocol adapter that bridges WebSocket messages to HTTP handlers + * + * Converts incoming WebSocket requests to HTTP Request objects and routes them + * through the standard router. Supports both regular responses and SSE streaming. 
+ */ +export class WebSocketAdapter { + private router: Router; + private logger: Logger; + + constructor(router: Router, logger: Logger) { + this.router = router; + this.logger = logger.child({ component: 'container' }); + } + + /** + * Handle WebSocket connection open + */ + onOpen(ws: ServerWebSocket): void { + this.logger.debug('WebSocket connection opened', { + connectionId: ws.data.connectionId + }); + } + + /** + * Handle WebSocket connection close + */ + onClose(ws: ServerWebSocket, code: number, reason: string): void { + this.logger.debug('WebSocket connection closed', { + connectionId: ws.data.connectionId, + code, + reason + }); + } + + /** + * Handle incoming WebSocket message + */ + async onMessage( + ws: ServerWebSocket, + message: string | Buffer + ): Promise { + const messageStr = + typeof message === 'string' ? message : message.toString('utf-8'); + + let parsed: unknown; + try { + parsed = JSON.parse(messageStr); + } catch (error) { + this.sendError(ws, undefined, 'PARSE_ERROR', 'Invalid JSON message', 400); + return; + } + + if (!isWSRequest(parsed)) { + this.sendError( + ws, + undefined, + 'INVALID_REQUEST', + 'Message must be a valid WSRequest', + 400 + ); + return; + } + + const request = parsed as WSRequest; + + this.logger.debug('WebSocket request received', { + connectionId: ws.data.connectionId, + id: request.id, + method: request.method, + path: request.path + }); + + try { + await this.handleRequest(ws, request); + } catch (error) { + this.logger.error( + 'Error handling WebSocket request', + error instanceof Error ? error : new Error(String(error)), + { requestId: request.id } + ); + this.sendError( + ws, + request.id, + 'INTERNAL_ERROR', + error instanceof Error ? 
error.message : 'Unknown error', + 500 + ); + } + } + + /** + * Handle a WebSocket request by routing it to HTTP handlers + */ + private async handleRequest( + ws: ServerWebSocket, + request: WSRequest + ): Promise { + // Build URL for the request + const url = `http://localhost:${SERVER_PORT}${request.path}`; + + // Build headers + const headers: Record = { + 'Content-Type': 'application/json', + ...request.headers + }; + + // Build request options + const requestInit: RequestInit = { + method: request.method, + headers + }; + + // Add body for POST/PUT + if ( + request.body !== undefined && + (request.method === 'POST' || request.method === 'PUT') + ) { + requestInit.body = JSON.stringify(request.body); + } + + // Create a fetch Request object + const httpRequest = new Request(url, requestInit); + + // Route through the existing router + const httpResponse = await this.router.route(httpRequest); + + // Check if this is a streaming response + const contentType = httpResponse.headers.get('Content-Type') || ''; + const isStreaming = contentType.includes('text/event-stream'); + + if (isStreaming && httpResponse.body) { + // Handle SSE streaming response + await this.handleStreamingResponse(ws, request.id, httpResponse); + } else { + // Handle regular response + await this.handleRegularResponse(ws, request.id, httpResponse); + } + } + + /** + * Handle a regular (non-streaming) HTTP response + */ + private async handleRegularResponse( + ws: ServerWebSocket, + requestId: string, + response: Response + ): Promise { + let body: unknown; + + try { + const text = await response.text(); + body = text ? 
JSON.parse(text) : undefined; + } catch { + body = undefined; + } + + const wsResponse: WSResponse = { + type: 'response', + id: requestId, + status: response.status, + body, + done: true + }; + + this.send(ws, wsResponse); + } + + /** + * Handle a streaming (SSE) HTTP response + */ + private async handleStreamingResponse( + ws: ServerWebSocket, + requestId: string, + response: Response + ): Promise { + if (!response.body) { + this.sendError(ws, requestId, 'STREAM_ERROR', 'No response body', 500); + return; + } + + const reader = response.body.getReader(); + const decoder = new TextDecoder(); + let buffer = ''; + + try { + while (true) { + const { done, value } = await reader.read(); + + if (done) { + break; + } + + // Decode chunk and add to buffer + buffer += decoder.decode(value, { stream: true }); + + // Parse SSE events from buffer + const events = this.parseSSEEvents(buffer); + buffer = events.remaining; + + // Send each parsed event as a stream chunk + for (const event of events.events) { + const chunk: WSStreamChunk = { + type: 'stream', + id: requestId, + event: event.event, + data: event.data + }; + if (!this.send(ws, chunk)) { + return; // Connection dead, stop processing + } + } + } + + // Send final response to close the stream + const wsResponse: WSResponse = { + type: 'response', + id: requestId, + status: response.status, + done: true + }; + this.send(ws, wsResponse); + } catch (error) { + this.logger.error( + 'Error reading stream', + error instanceof Error ? error : new Error(String(error)), + { requestId } + ); + this.sendError( + ws, + requestId, + 'STREAM_ERROR', + error instanceof Error ? error.message : 'Stream read failed', + 500 + ); + } finally { + reader.releaseLock(); + } + } + + /** + * Parse SSE events from a buffer + * + * Returns parsed events and any remaining unparsed content (incomplete lines + * waiting for more data from the next chunk). 
+ * + * Note: This is a minimal SSE parser that only handles `event:` and `data:` + * fields - sufficient for our streaming handlers which only emit these. + * Per the SSE spec, we intentionally ignore: + * - `id:` field (event IDs for reconnection) + * - `retry:` field (reconnection timing hints) + * - Comment lines (starting with `:`) + */ + private parseSSEEvents(buffer: string): { + events: Array<{ event?: string; data: string }>; + remaining: string; + } { + const events: Array<{ event?: string; data: string }> = []; + let currentEvent: { event?: string; data: string[] } = { data: [] }; + let i = 0; + + while (i < buffer.length) { + const newlineIndex = buffer.indexOf('\n', i); + if (newlineIndex === -1) break; // Incomplete line, keep in buffer + + const line = buffer.substring(i, newlineIndex); + i = newlineIndex + 1; + + // Check if we have a complete event (empty line after data) + if (line === '' && currentEvent.data.length > 0) { + events.push({ + event: currentEvent.event, + data: currentEvent.data.join('\n') + }); + currentEvent = { data: [] }; + continue; + } + + if (line.startsWith('event:')) { + currentEvent.event = line.substring(6).trim(); + } else if (line.startsWith('data:')) { + currentEvent.data.push(line.substring(5).trim()); + } + // Other lines (including empty lines without pending data) are ignored + } + + return { + events, + remaining: buffer.substring(i) + }; + } + + /** + * Send a message over WebSocket + * @returns true if send succeeded, false if it failed (connection will be closed) + */ + private send(ws: ServerWebSocket, message: WSServerMessage): boolean { + try { + ws.send(JSON.stringify(message)); + return true; + } catch (error) { + this.logger.error( + 'Failed to send WebSocket message, closing connection', + error instanceof Error ? 
error : new Error(String(error)) + ); + try { + ws.close(1011, 'Send failed'); // 1011 = unexpected condition + } catch { + // Connection already closed + } + return false; + } + } + + /** + * Send an error message over WebSocket + */ + private sendError( + ws: ServerWebSocket, + requestId: string | undefined, + code: string, + message: string, + status: number + ): void { + const error: WSError = { + type: 'error', + id: requestId, + code, + message, + status + }; + this.send(ws, error); + } +} + +/** + * Generate a unique connection ID + */ +export function generateConnectionId(): string { + return `conn_${Date.now()}_${Math.random().toString(36).substring(2, 8)}`; +} diff --git a/packages/sandbox-container/src/server.ts b/packages/sandbox-container/src/server.ts index 77292782..3dbababb 100644 --- a/packages/sandbox-container/src/server.ts +++ b/packages/sandbox-container/src/server.ts @@ -2,6 +2,11 @@ import { createLogger } from '@repo/shared'; import { serve } from 'bun'; import { Container } from './core/container'; import { Router } from './core/router'; +import { + generateConnectionId, + WebSocketAdapter, + type WSData +} from './handlers/ws-adapter'; import { setupRoutes } from './routes/setup'; const logger = createLogger({ component: 'container' }); @@ -13,8 +18,12 @@ export interface ServerInstance { } async function createApplication(): Promise<{ - fetch: (req: Request) => Promise; + fetch: ( + req: Request, + server: ReturnType> + ) => Promise; container: Container; + wsAdapter: WebSocketAdapter; }> { const container = new Container(); await container.initialize(); @@ -23,9 +32,37 @@ async function createApplication(): Promise<{ router.use(container.get('corsMiddleware')); setupRoutes(router, container); + // Create WebSocket adapter with the router for control plane multiplexing + const wsAdapter = new WebSocketAdapter(router, logger); + return { - fetch: (req: Request) => router.route(req), - container + fetch: async ( + req: Request, + server: 
ReturnType> + ): Promise => { + // Check for WebSocket upgrade request + const upgradeHeader = req.headers.get('Upgrade'); + if (upgradeHeader?.toLowerCase() === 'websocket') { + // Handle WebSocket upgrade for control plane + const url = new URL(req.url); + if (url.pathname === '/ws' || url.pathname === '/api/ws') { + const upgraded = server.upgrade(req, { + data: { + connectionId: generateConnectionId() + } + }); + if (upgraded) { + return undefined as unknown as Response; // Bun handles the upgrade + } + return new Response('WebSocket upgrade failed', { status: 500 }); + } + } + + // Regular HTTP request + return router.route(req); + }, + container, + wsAdapter }; } @@ -36,14 +73,47 @@ async function createApplication(): Promise<{ export async function startServer(): Promise { const app = await createApplication(); - serve({ + serve({ idleTimeout: 255, - fetch: app.fetch, + fetch: (req, server) => app.fetch(req, server), hostname: '0.0.0.0', port: SERVER_PORT, + // WebSocket adapter for control plane multiplexing websocket: { - async message() { - // WebSocket placeholder for future streaming features + open(ws) { + try { + app.wsAdapter.onOpen(ws); + } catch (error) { + logger.error( + 'Error in WebSocket open handler', + error instanceof Error ? error : new Error(String(error)) + ); + } + }, + close(ws, code, reason) { + try { + app.wsAdapter.onClose(ws, code, reason); + } catch (error) { + logger.error( + 'Error in WebSocket close handler', + error instanceof Error ? error : new Error(String(error)) + ); + } + }, + async message(ws, message) { + try { + await app.wsAdapter.onMessage(ws, message); + } catch (error) { + logger.error( + 'Error in WebSocket message handler', + error instanceof Error ? 
error : new Error(String(error)) + ); + try { + ws.close(1011, 'Internal error'); + } catch { + // Connection already closed + } + } } } }); @@ -55,6 +125,9 @@ export async function startServer(): Promise { return { port: SERVER_PORT, + // Cleanup handles application-level resources (processes, ports). + // WebSocket connections are closed automatically when the process exits - + // Bun's serve() handles transport cleanup on shutdown. cleanup: async () => { if (!app.container.isInitialized()) return; diff --git a/packages/sandbox-container/tests/handlers/ws-adapter.test.ts b/packages/sandbox-container/tests/handlers/ws-adapter.test.ts new file mode 100644 index 00000000..93d91ab0 --- /dev/null +++ b/packages/sandbox-container/tests/handlers/ws-adapter.test.ts @@ -0,0 +1,366 @@ +import type { Logger, WSError, WSRequest, WSResponse } from '@repo/shared'; +import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; +import type { Router } from '../../src/core/router'; +import { + generateConnectionId, + WebSocketAdapter, + type WSData +} from '../../src/handlers/ws-adapter'; + +// Mock ServerWebSocket +class MockServerWebSocket { + data: WSData; + sentMessages: string[] = []; + + constructor(data: WSData) { + this.data = data; + } + + send(message: string) { + this.sentMessages.push(message); + } + + getSentMessages(): T[] { + return this.sentMessages.map((m) => JSON.parse(m)); + } + + getLastMessage(): T { + return JSON.parse(this.sentMessages[this.sentMessages.length - 1]); + } +} + +// Mock Router +function createMockRouter(): Router { + return { + route: vi.fn() + } as unknown as Router; +} + +// Mock Logger +function createMockLogger(): Logger { + return { + debug: vi.fn(), + info: vi.fn(), + warn: vi.fn(), + error: vi.fn(), + child: vi.fn(() => createMockLogger()) + } as unknown as Logger; +} + +describe('WebSocketAdapter', () => { + let adapter: WebSocketAdapter; + let mockRouter: Router; + let mockLogger: Logger; + let mockWs: 
MockServerWebSocket; + + beforeEach(() => { + vi.clearAllMocks(); + mockRouter = createMockRouter(); + mockLogger = createMockLogger(); + adapter = new WebSocketAdapter(mockRouter, mockLogger); + mockWs = new MockServerWebSocket({ connectionId: 'test-conn-123' }); + }); + + afterEach(() => { + vi.restoreAllMocks(); + }); + + describe('onMessage', () => { + it('should handle valid request and return response', async () => { + const request: WSRequest = { + type: 'request', + id: 'req-123', + method: 'GET', + path: '/api/health' + }; + + // Mock router to return a successful response + (mockRouter.route as any).mockResolvedValue( + new Response(JSON.stringify({ status: 'ok' }), { + status: 200, + headers: { 'Content-Type': 'application/json' } + }) + ); + + await adapter.onMessage(mockWs as any, JSON.stringify(request)); + + expect(mockRouter.route).toHaveBeenCalled(); + + const response = mockWs.getLastMessage(); + expect(response.type).toBe('response'); + expect(response.id).toBe('req-123'); + expect(response.status).toBe(200); + expect(response.body).toEqual({ status: 'ok' }); + expect(response.done).toBe(true); + }); + + it('should handle POST request with body', async () => { + const request: WSRequest = { + type: 'request', + id: 'req-456', + method: 'POST', + path: '/api/execute', + body: { command: 'echo hello', sessionId: 'sess-1' } + }; + + (mockRouter.route as any).mockResolvedValue( + new Response( + JSON.stringify({ + success: true, + stdout: 'hello\n', + exitCode: 0 + }), + { status: 200 } + ) + ); + + await adapter.onMessage(mockWs as any, JSON.stringify(request)); + + // Verify router was called with correct Request + const routerCall = (mockRouter.route as any).mock.calls[0][0] as Request; + expect(routerCall.method).toBe('POST'); + expect(routerCall.url).toContain('/api/execute'); + + const body = (await routerCall.clone().json()) as { command: string }; + expect(body.command).toBe('echo hello'); + }); + + it('should return error for invalid JSON', 
async () => { + await adapter.onMessage(mockWs as any, 'not valid json'); + + const response = mockWs.getLastMessage(); + expect(response.type).toBe('error'); + expect(response.code).toBe('PARSE_ERROR'); + expect(response.status).toBe(400); + }); + + it('should return error for invalid request format', async () => { + await adapter.onMessage( + mockWs as any, + JSON.stringify({ notARequest: true }) + ); + + const response = mockWs.getLastMessage(); + expect(response.type).toBe('error'); + expect(response.code).toBe('INVALID_REQUEST'); + expect(response.status).toBe(400); + }); + + it('should handle router errors gracefully', async () => { + const request: WSRequest = { + type: 'request', + id: 'req-err', + method: 'GET', + path: '/api/fail' + }; + + (mockRouter.route as any).mockRejectedValue(new Error('Router failed')); + + await adapter.onMessage(mockWs as any, JSON.stringify(request)); + + const response = mockWs.getLastMessage(); + expect(response.type).toBe('error'); + expect(response.id).toBe('req-err'); + expect(response.code).toBe('INTERNAL_ERROR'); + expect(response.message).toContain('Router failed'); + expect(response.status).toBe(500); + }); + + it('should handle 404 responses', async () => { + const request: WSRequest = { + type: 'request', + id: 'req-404', + method: 'GET', + path: '/api/notfound' + }; + + (mockRouter.route as any).mockResolvedValue( + new Response( + JSON.stringify({ + code: 'NOT_FOUND', + message: 'Resource not found' + }), + { status: 404 } + ) + ); + + await adapter.onMessage(mockWs as any, JSON.stringify(request)); + + const response = mockWs.getLastMessage(); + expect(response.type).toBe('response'); + expect(response.id).toBe('req-404'); + expect(response.status).toBe(404); + }); + + it('should handle streaming responses', async () => { + const request: WSRequest = { + type: 'request', + id: 'req-stream', + method: 'POST', + path: '/api/execute/stream', + body: { command: 'echo test' } + }; + + // Create a mock SSE stream + 
const encoder = new TextEncoder(); + const stream = new ReadableStream({ + start(controller) { + controller.enqueue( + encoder.encode('event: start\ndata: {"type":"start"}\n\n') + ); + controller.enqueue( + encoder.encode('data: {"type":"stdout","text":"test\\n"}\n\n') + ); + controller.enqueue( + encoder.encode( + 'event: complete\ndata: {"type":"complete","exitCode":0}\n\n' + ) + ); + controller.close(); + } + }); + + (mockRouter.route as any).mockResolvedValue( + new Response(stream, { + status: 200, + headers: { 'Content-Type': 'text/event-stream' } + }) + ); + + await adapter.onMessage(mockWs as any, JSON.stringify(request)); + + // Should have received stream chunks and final response + const messages = mockWs.getSentMessages(); + + // Find stream chunks + const streamChunks = messages.filter((m) => m.type === 'stream'); + expect(streamChunks.length).toBeGreaterThan(0); + + // Find final response + const finalResponse = messages.find((m) => m.type === 'response'); + expect(finalResponse).toBeDefined(); + expect(finalResponse.done).toBe(true); + }); + + it('should handle Buffer messages', async () => { + const request: WSRequest = { + type: 'request', + id: 'req-buffer', + method: 'GET', + path: '/api/test' + }; + + (mockRouter.route as any).mockResolvedValue( + new Response(JSON.stringify({ ok: true }), { status: 200 }) + ); + + // Send as Buffer + const buffer = Buffer.from(JSON.stringify(request)); + await adapter.onMessage(mockWs as any, buffer); + + expect(mockRouter.route).toHaveBeenCalled(); + }); + }); + + describe('generateConnectionId', () => { + it('should generate unique connection IDs', () => { + const id1 = generateConnectionId(); + const id2 = generateConnectionId(); + + expect(id1).toMatch(/^conn_\d+_[a-z0-9]+$/); + expect(id2).toMatch(/^conn_\d+_[a-z0-9]+$/); + expect(id1).not.toBe(id2); + }); + }); +}); + +describe('WebSocket Integration', () => { + let adapter: WebSocketAdapter; + let mockRouter: Router; + let mockLogger: Logger; + + 
beforeEach(() => { + mockRouter = createMockRouter(); + mockLogger = createMockLogger(); + adapter = new WebSocketAdapter(mockRouter, mockLogger); + }); + + it('should handle multiple concurrent requests', async () => { + const mockWs = new MockServerWebSocket({ connectionId: 'concurrent-test' }); + + const requests: WSRequest[] = [ + { type: 'request', id: 'req-1', method: 'GET', path: '/api/one' }, + { type: 'request', id: 'req-2', method: 'GET', path: '/api/two' }, + { type: 'request', id: 'req-3', method: 'GET', path: '/api/three' } + ]; + + // Router returns different responses based on path + (mockRouter.route as any).mockImplementation((req: Request) => { + const path = new URL(req.url).pathname; + return new Response(JSON.stringify({ path }), { status: 200 }); + }); + + // Process all requests concurrently + await Promise.all( + requests.map((req) => + adapter.onMessage(mockWs as any, JSON.stringify(req)) + ) + ); + + const responses = mockWs.getSentMessages(); + expect(responses).toHaveLength(3); + + // Verify each request got its correct response + const responseIds = responses.map((r) => r.id).sort(); + expect(responseIds).toEqual(['req-1', 'req-2', 'req-3']); + + // Verify response bodies match request paths + responses.forEach((r) => { + expect(r.body).toBeDefined(); + }); + }); + + it('should maintain request isolation', async () => { + const mockWs = new MockServerWebSocket({ connectionId: 'isolation-test' }); + + // First request fails + const failRequest: WSRequest = { + type: 'request', + id: 'fail-req', + method: 'GET', + path: '/api/fail' + }; + + // Second request succeeds + const successRequest: WSRequest = { + type: 'request', + id: 'success-req', + method: 'GET', + path: '/api/success' + }; + + (mockRouter.route as any).mockImplementation((req: Request) => { + const path = new URL(req.url).pathname; + if (path === '/api/fail') { + throw new Error('Intentional failure'); + } + return new Response(JSON.stringify({ ok: true }), { status: 200 
}); + }); + + // Process both requests + await adapter.onMessage(mockWs as any, JSON.stringify(failRequest)); + await adapter.onMessage(mockWs as any, JSON.stringify(successRequest)); + + const messages = mockWs.getSentMessages(); + expect(messages).toHaveLength(2); + + // First should be error + const errorMsg = messages.find((m) => m.id === 'fail-req'); + expect(errorMsg.type).toBe('error'); + + // Second should succeed + const successMsg = messages.find((m) => m.id === 'success-req'); + expect(successMsg.type).toBe('response'); + expect(successMsg.status).toBe(200); + }); +}); diff --git a/packages/sandbox/src/clients/base-client.ts b/packages/sandbox/src/clients/base-client.ts index 7d1b0221..dce838df 100644 --- a/packages/sandbox/src/clients/base-client.ts +++ b/packages/sandbox/src/clients/base-client.ts @@ -1,80 +1,62 @@ import type { Logger } from '@repo/shared'; import { createNoOpLogger } from '@repo/shared'; -import { getHttpStatus } from '@repo/shared/errors'; import type { ErrorResponse as NewErrorResponse } from '../errors'; import { createErrorFromResponse, ErrorCode } from '../errors'; import type { SandboxError } from '../errors/classes'; +import { createTransport, type ITransport } from './transport'; import type { HttpClientOptions, ResponseHandler } from './types'; -// Container startup retry configuration -const TIMEOUT_MS = 120_000; // 2 minutes total retry budget -const MIN_TIME_FOR_RETRY_MS = 15_000; // Need at least 15s remaining to retry (allows for longer container startups) - /** - * Abstract base class providing common HTTP functionality for all domain clients + * Abstract base class providing common HTTP/WebSocket functionality for all domain clients + * + * All requests go through the Transport abstraction layer, which handles: + * - HTTP and WebSocket modes transparently + * - Automatic retry for 503 errors (container starting) + * - Streaming responses + * + * WebSocket mode is useful when running inside Workers/Durable Objects + * 
where sub-request limits apply. */ export abstract class BaseHttpClient { - protected baseUrl: string; protected options: HttpClientOptions; protected logger: Logger; + protected transport: ITransport; constructor(options: HttpClientOptions = {}) { this.options = options; this.logger = options.logger ?? createNoOpLogger(); - this.baseUrl = this.options.baseUrl!; + + // Always create a Transport - it handles both HTTP and WebSocket modes + if (options.transport) { + this.transport = options.transport; + } else { + const mode = options.transportMode ?? 'http'; + this.transport = createTransport({ + mode, + baseUrl: options.baseUrl ?? 'http://localhost:3000', + wsUrl: options.wsUrl, + logger: this.logger, + stub: options.stub, + port: options.port + }); + } + } + + /** + * Check if using WebSocket transport + */ + protected isWebSocketMode(): boolean { + return this.transport.getMode() === 'websocket'; } /** - * Core HTTP request method with automatic retry for container startup delays - * Retries on 503 (Service Unavailable) which indicates container is starting + * Core fetch method - delegates to Transport which handles retry logic */ protected async doFetch( path: string, options?: RequestInit ): Promise { - const startTime = Date.now(); - let attempt = 0; - - while (true) { - const response = await this.executeFetch(path, options); - - // Check if this is a retryable container error (503 = transient) - const shouldRetry = this.isRetryableContainerError(response); - - if (shouldRetry) { - const elapsed = Date.now() - startTime; - const remaining = TIMEOUT_MS - elapsed; - - // Check if we have enough time for another attempt - if (remaining > MIN_TIME_FOR_RETRY_MS) { - // Exponential backoff with longer delays for container ops: 3s, 6s, 12s, 24s, 30s - const delay = Math.min(3000 * 2 ** attempt, 30000); - - this.logger.info('Container not ready, retrying', { - status: response.status, - attempt: attempt + 1, - delayMs: delay, - remainingSec: Math.floor(remaining / 
1000) - }); - - await new Promise((resolve) => setTimeout(resolve, delay)); - attempt++; - continue; - } - - // Timeout exhausted - this.logger.error( - 'Container failed to become ready', - new Error( - `Failed after ${attempt + 1} attempts over ${Math.floor(elapsed / 1000)}s` - ) - ); - return response; - } - - // Not a retryable error or request succeeded - return response; - } + return this.transport.fetch(path, options); } /** @@ -201,6 +183,41 @@ export abstract class BaseHttpClient { return response.body; } + /** + * Stream request handler + * + * For HTTP mode, uses doFetch + handleStreamResponse to get proper error typing. + * For WebSocket mode, uses Transport's streaming support. + * + * @param path - The API path to call + * @param body - Optional request body (for POST requests) + * @param method - HTTP method (default: POST, use GET for process logs) + */ + protected async doStreamFetch( + path: string, + body?: unknown, + method: 'GET' | 'POST' = 'POST' + ): Promise> { + // WebSocket mode uses Transport's streaming directly + if (this.transport.getMode() === 'websocket') { + try { + return await this.transport.fetchStream(path, body, method); + } catch (error) { + this.logError(`stream ${method} ${path}`, error); + throw error; + } + } + + // HTTP mode: use doFetch + handleStreamResponse for proper error typing + const response = await this.doFetch(path, { + method, + headers: { 'Content-Type': 'application/json' }, + body: body && method === 'POST' ? 
JSON.stringify(body) : undefined + }); + + return this.handleStreamResponse(response); + } + /** * Utility method to log successful operations */ @@ -237,52 +254,4 @@ export abstract class BaseHttpClient { ); } } - - /** - * Check if response indicates a retryable container error - * - * The Sandbox DO returns proper HTTP status codes: - * - 503 Service Unavailable: Transient errors (container starting, port not ready) - * - 500 Internal Server Error: Permanent errors (bad config, missing image) - * - * We only retry on 503, which indicates the container is starting up. - * The Retry-After header suggests how long to wait. - * - * @param response - HTTP response to check - * @returns true if error is retryable (503), false otherwise - */ - private isRetryableContainerError(response: Response): boolean { - // 503 = transient, retry - // 500 = permanent, don't retry - // Everything else = not a container error - return response.status === 503; - } - - private async executeFetch( - path: string, - options?: RequestInit - ): Promise { - const url = this.options.stub - ? `http://localhost:${this.options.port}${path}` - : `${this.baseUrl}${path}`; - - try { - if (this.options.stub) { - return await this.options.stub.containerFetch( - url, - options || {}, - this.options.port - ); - } else { - return await fetch(url, options); - } - } catch (error) { - this.logger.error( - 'HTTP request error', - error instanceof Error ? 
error : new Error(String(error)), - { method: options?.method || 'GET', url } - ); - throw error; - } - } } diff --git a/packages/sandbox/src/clients/command-client.ts b/packages/sandbox/src/clients/command-client.ts index f97c0114..c4e88b99 100644 --- a/packages/sandbox/src/clients/command-client.ts +++ b/packages/sandbox/src/clients/command-client.ts @@ -105,15 +105,8 @@ export class CommandClient extends BaseHttpClient { ...(options?.cwd !== undefined && { cwd: options.cwd }) }; - const response = await this.doFetch('/api/execute/stream', { - method: 'POST', - headers: { - 'Content-Type': 'application/json' - }, - body: JSON.stringify(data) - }); - - const stream = await this.handleStreamResponse(response); + // Use doStreamFetch which handles both WebSocket and HTTP streaming + const stream = await this.doStreamFetch('/api/execute/stream', data); this.logSuccess('Command stream started', command); diff --git a/packages/sandbox/src/clients/file-client.ts b/packages/sandbox/src/clients/file-client.ts index ab99cfcc..af81c797 100644 --- a/packages/sandbox/src/clients/file-client.ts +++ b/packages/sandbox/src/clients/file-client.ts @@ -158,15 +158,8 @@ export class FileClient extends BaseHttpClient { sessionId }; - const response = await this.doFetch('/api/read/stream', { - method: 'POST', - headers: { - 'Content-Type': 'application/json' - }, - body: JSON.stringify(data) - }); - - const stream = await this.handleStreamResponse(response); + // Use doStreamFetch which handles both WebSocket and HTTP streaming + const stream = await this.doStreamFetch('/api/read/stream', data); this.logSuccess('File stream started', path); return stream; } catch (error) { diff --git a/packages/sandbox/src/clients/index.ts b/packages/sandbox/src/clients/index.ts index 840ef1ce..83b636f4 100644 --- a/packages/sandbox/src/clients/index.ts +++ b/packages/sandbox/src/clients/index.ts @@ -1,10 +1,45 @@ +// ============================================================================= // 
Main client exports +// ============================================================================= -// Command client types -export type { ExecuteRequest, ExecuteResponse } from './command-client'; +// Main aggregated client +export { SandboxClient } from './sandbox-client'; +// ============================================================================= // Domain-specific clients +// ============================================================================= + export { CommandClient } from './command-client'; +export { FileClient } from './file-client'; +export { GitClient } from './git-client'; +export { InterpreterClient } from './interpreter-client'; +export { PortClient } from './port-client'; +export { ProcessClient } from './process-client'; +export { UtilityClient } from './utility-client'; + +// ============================================================================= +// Transport layer +// ============================================================================= + +export type { + ITransport, + TransportConfig, + TransportMode, + TransportOptions +} from './transport'; +export { + BaseTransport, + createTransport, + HttpTransport, + WebSocketTransport +} from './transport'; + +// ============================================================================= +// Client types and interfaces +// ============================================================================= + +// Command client types +export type { ExecuteRequest, ExecuteResponse } from './command-client'; // File client types export type { FileOperationRequest, @@ -12,14 +47,10 @@ export type { ReadFileRequest, WriteFileRequest } from './file-client'; -export { FileClient } from './file-client'; // Git client types export type { GitCheckoutRequest, GitCheckoutResult } from './git-client'; -export { GitClient } from './git-client'; -export { - type ExecutionCallbacks, - InterpreterClient -} from './interpreter-client'; +// Interpreter client types +export type { 
ExecutionCallbacks } from './interpreter-client'; // Port client types export type { ExposePortRequest, @@ -28,7 +59,6 @@ export type { PortListResult, UnexposePortRequest } from './port-client'; -export { PortClient } from './port-client'; // Process client types export type { ProcessCleanupResult, @@ -39,9 +69,7 @@ export type { ProcessStartResult, StartProcessRequest } from './process-client'; -export { ProcessClient } from './process-client'; -export { SandboxClient } from './sandbox-client'; -// Types and interfaces +// Core types export type { BaseApiResponse, ContainerStub, @@ -51,6 +79,7 @@ export type { ResponseHandler, SessionRequest } from './types'; + // Utility client types export type { CommandsResponse, @@ -61,4 +90,3 @@ export type { PingResponse, VersionResponse } from './utility-client'; -export { UtilityClient } from './utility-client'; diff --git a/packages/sandbox/src/clients/interpreter-client.ts b/packages/sandbox/src/clients/interpreter-client.ts index 9ce9d8b6..4e9a31b0 100644 --- a/packages/sandbox/src/clients/interpreter-client.ts +++ b/packages/sandbox/src/clients/interpreter-client.ts @@ -104,31 +104,16 @@ export class InterpreterClient extends BaseHttpClient { timeoutMs?: number ): Promise { return this.executeWithRetry(async () => { - const response = await this.doFetch('/api/execute/code', { - method: 'POST', - headers: { - 'Content-Type': 'application/json', - Accept: 'text/event-stream' - }, - body: JSON.stringify({ - context_id: contextId, - code, - language, - ...(timeoutMs !== undefined && { timeout_ms: timeoutMs }) - }) + // Use doStreamFetch which handles both WebSocket and HTTP streaming + const stream = await this.doStreamFetch('/api/execute/code', { + context_id: contextId, + code, + language, + ...(timeoutMs !== undefined && { timeout_ms: timeoutMs }) }); - if (!response.ok) { - const error = await this.parseErrorResponse(response); - throw error; - } - - if (!response.body) { - throw new Error('No response body for 
streaming execution'); - } - // Process streaming response - for await (const chunk of this.readLines(response.body)) { + for await (const chunk of this.readLines(stream)) { await this.parseExecutionResult(chunk, callbacks); } }); @@ -175,6 +160,22 @@ export class InterpreterClient extends BaseHttpClient { }); } + /** + * Get a raw stream for code execution. + * Used by CodeInterpreter.runCodeStreaming() for direct stream access. + */ + async streamCode( + contextId: string, + code: string, + language?: string + ): Promise> { + return this.doStreamFetch('/api/execute/code', { + context_id: contextId, + code, + language + }); + } + /** * Execute an operation with automatic retry for transient errors */ diff --git a/packages/sandbox/src/clients/process-client.ts b/packages/sandbox/src/clients/process-client.ts index 4dbb32de..1c308551 100644 --- a/packages/sandbox/src/clients/process-client.ts +++ b/packages/sandbox/src/clients/process-client.ts @@ -181,11 +181,8 @@ export class ProcessClient extends BaseHttpClient { ): Promise> { try { const url = `/api/process/${processId}/stream`; - const response = await this.doFetch(url, { - method: 'GET' - }); - - const stream = await this.handleStreamResponse(response); + // Use doStreamFetch with GET method (process log streaming is GET) + const stream = await this.doStreamFetch(url, undefined, 'GET'); this.logSuccess('Process log stream started', `ID: ${processId}`); diff --git a/packages/sandbox/src/clients/sandbox-client.ts b/packages/sandbox/src/clients/sandbox-client.ts index fa1599eb..67aca6f1 100644 --- a/packages/sandbox/src/clients/sandbox-client.ts +++ b/packages/sandbox/src/clients/sandbox-client.ts @@ -4,12 +4,23 @@ import { GitClient } from './git-client'; import { InterpreterClient } from './interpreter-client'; import { PortClient } from './port-client'; import { ProcessClient } from './process-client'; +import { + createTransport, + type ITransport, + type TransportMode +} from './transport'; import type { 
HttpClientOptions } from './types'; import { UtilityClient } from './utility-client'; /** * Main sandbox client that composes all domain-specific clients * Provides organized access to all sandbox functionality + * + * Supports two transport modes: + * - HTTP (default): Each request is a separate HTTP call + * - WebSocket: All requests multiplexed over a single connection + * + * WebSocket mode reduces sub-request count when running inside Workers/Durable Objects. */ export class SandboxClient { public readonly commands: CommandClient; @@ -20,11 +31,27 @@ export class SandboxClient { public readonly interpreter: InterpreterClient; public readonly utils: UtilityClient; + private transport: ITransport | null = null; + constructor(options: HttpClientOptions) { + // Create shared transport if WebSocket mode is enabled + if (options.transportMode === 'websocket' && options.wsUrl) { + this.transport = createTransport({ + mode: 'websocket', + wsUrl: options.wsUrl, + baseUrl: options.baseUrl, + logger: options.logger, + stub: options.stub, + port: options.port + }); + } + // Ensure baseUrl is provided for all clients const clientOptions: HttpClientOptions = { baseUrl: 'http://localhost:3000', - ...options + ...options, + // Share transport across all clients + transport: this.transport ?? options.transport }; // Initialize all domain clients with shared options @@ -36,4 +63,39 @@ export class SandboxClient { this.interpreter = new InterpreterClient(clientOptions); this.utils = new UtilityClient(clientOptions); } + + /** + * Get the current transport mode + */ + getTransportMode(): TransportMode { + return this.transport?.getMode() ?? 'http'; + } + + /** + * Check if WebSocket is connected (only relevant in WebSocket mode) + */ + isWebSocketConnected(): boolean { + return this.transport?.isConnected() ?? 
false; + } + + /** + * Connect WebSocket transport (no-op in HTTP mode) + * Called automatically on first request, but can be called explicitly + * to establish connection upfront. + */ + async connect(): Promise { + if (this.transport) { + await this.transport.connect(); + } + } + + /** + * Disconnect WebSocket transport (no-op in HTTP mode) + * Should be called when the sandbox is destroyed. + */ + disconnect(): void { + if (this.transport) { + this.transport.disconnect(); + } + } } diff --git a/packages/sandbox/src/clients/transport/base-transport.ts b/packages/sandbox/src/clients/transport/base-transport.ts new file mode 100644 index 00000000..68cde407 --- /dev/null +++ b/packages/sandbox/src/clients/transport/base-transport.ts @@ -0,0 +1,102 @@ +import type { Logger } from '@repo/shared'; +import { createNoOpLogger } from '@repo/shared'; +import type { ITransport, TransportConfig, TransportMode } from './types'; + +/** + * Container startup retry configuration + */ +const TIMEOUT_MS = 120_000; // 2 minutes total retry budget +const MIN_TIME_FOR_RETRY_MS = 15_000; // Need at least 15s remaining to retry + +/** + * Abstract base transport with shared retry logic + * + * Handles 503 retry for container startup - shared by all transports. + * Subclasses implement the transport-specific fetch and stream logic. + */ +export abstract class BaseTransport implements ITransport { + protected config: TransportConfig; + protected logger: Logger; + + constructor(config: TransportConfig) { + this.config = config; + this.logger = config.logger ?? createNoOpLogger(); + } + + abstract getMode(): TransportMode; + abstract connect(): Promise; + abstract disconnect(): void; + abstract isConnected(): boolean; + + /** + * Fetch with automatic retry for 503 (container starting) + * + * This is the primary entry point for making requests. It wraps the + * transport-specific doFetch() with retry logic for container startup. 
+ */ + async fetch(path: string, options?: RequestInit): Promise { + const startTime = Date.now(); + let attempt = 0; + + while (true) { + const response = await this.doFetch(path, options); + + // Check for retryable 503 (container starting) + if (response.status === 503) { + const elapsed = Date.now() - startTime; + const remaining = TIMEOUT_MS - elapsed; + + if (remaining > MIN_TIME_FOR_RETRY_MS) { + const delay = Math.min(3000 * 2 ** attempt, 30000); + + this.logger.info('Container not ready, retrying', { + status: response.status, + attempt: attempt + 1, + delayMs: delay, + remainingSec: Math.floor(remaining / 1000), + mode: this.getMode() + }); + + await this.sleep(delay); + attempt++; + continue; + } + + this.logger.error( + 'Container failed to become ready', + new Error( + `Failed after ${attempt + 1} attempts over ${Math.floor(elapsed / 1000)}s` + ) + ); + } + + return response; + } + } + + /** + * Transport-specific fetch implementation (no retry) + * Subclasses implement the actual HTTP or WebSocket fetch. + */ + protected abstract doFetch( + path: string, + options?: RequestInit + ): Promise; + + /** + * Transport-specific stream implementation + * Subclasses implement HTTP SSE or WebSocket streaming. 
+ */ + abstract fetchStream( + path: string, + body?: unknown, + method?: 'GET' | 'POST' + ): Promise<ReadableStream<Uint8Array>>; + + /** + * Sleep utility for retry delays + */ + protected sleep(ms: number): Promise<void> { + return new Promise((resolve) => setTimeout(resolve, ms)); + } +} diff --git a/packages/sandbox/src/clients/transport/factory.ts b/packages/sandbox/src/clients/transport/factory.ts new file mode 100644 index 00000000..019cde9b --- /dev/null +++ b/packages/sandbox/src/clients/transport/factory.ts @@ -0,0 +1,42 @@ +import { HttpTransport } from './http-transport'; +import type { ITransport, TransportConfig, TransportMode } from './types'; +import { WebSocketTransport } from './ws-transport'; + +/** + * Transport options with mode selection + */ +export interface TransportOptions extends TransportConfig { + /** Transport mode */ + mode: TransportMode; +} + +/** + * Create a transport instance based on mode + * + * This is the primary API for creating transports. It handles + * the selection of HTTP or WebSocket transport based on the mode; unrecognized modes silently fall back to HTTP. 
+ * + * @example + * ```typescript + * // HTTP transport (default) + * const http = createTransport({ + * mode: 'http', + * baseUrl: 'http://localhost:3000' + * }); + * + * // WebSocket transport + * const ws = createTransport({ + * mode: 'websocket', + * wsUrl: 'ws://localhost:3000/ws' + * }); + * ``` + */ +export function createTransport(options: TransportOptions): ITransport { + switch (options.mode) { + case 'websocket': + return new WebSocketTransport(options); + + default: + return new HttpTransport(options); + } +} diff --git a/packages/sandbox/src/clients/transport/http-transport.ts b/packages/sandbox/src/clients/transport/http-transport.ts new file mode 100644 index 00000000..6bcdf1c1 --- /dev/null +++ b/packages/sandbox/src/clients/transport/http-transport.ts @@ -0,0 +1,101 @@ +import { BaseTransport } from './base-transport'; +import type { TransportConfig, TransportMode } from './types'; + +/** + * HTTP transport implementation + * + * Uses standard fetch API for communication with the container. + * HTTP is stateless, so connect/disconnect are no-ops. + */ +export class HttpTransport extends BaseTransport { + private baseUrl: string; + + constructor(config: TransportConfig) { + super(config); + this.baseUrl = config.baseUrl ?? 
'http://localhost:3000'; + } + + getMode(): TransportMode { + return 'http'; + } + + async connect(): Promise { + // No-op for HTTP - stateless protocol + } + + disconnect(): void { + // No-op for HTTP - stateless protocol + } + + isConnected(): boolean { + return true; // HTTP is always "connected" + } + + protected async doFetch( + path: string, + options?: RequestInit + ): Promise { + const url = this.buildUrl(path); + + if (this.config.stub) { + return this.config.stub.containerFetch( + url, + options || {}, + this.config.port + ); + } + return globalThis.fetch(url, options); + } + + async fetchStream( + path: string, + body?: unknown, + method: 'GET' | 'POST' = 'POST' + ): Promise> { + const url = this.buildUrl(path); + const options = this.buildStreamOptions(body, method); + + let response: Response; + if (this.config.stub) { + response = await this.config.stub.containerFetch( + url, + options, + this.config.port + ); + } else { + response = await globalThis.fetch(url, options); + } + + if (!response.ok) { + const errorBody = await response.text(); + throw new Error(`HTTP error! status: ${response.status} - ${errorBody}`); + } + + if (!response.body) { + throw new Error('No response body for streaming'); + } + + return response.body; + } + + private buildUrl(path: string): string { + if (this.config.stub) { + return `http://localhost:${this.config.port}${path}`; + } + return `${this.baseUrl}${path}`; + } + + private buildStreamOptions( + body: unknown, + method: 'GET' | 'POST' + ): RequestInit { + return { + method, + headers: + body && method === 'POST' + ? { 'Content-Type': 'application/json' } + : undefined, + body: body && method === 'POST' ? 
JSON.stringify(body) : undefined + }; + } +} diff --git a/packages/sandbox/src/clients/transport/index.ts b/packages/sandbox/src/clients/transport/index.ts new file mode 100644 index 00000000..fa6ebbc2 --- /dev/null +++ b/packages/sandbox/src/clients/transport/index.ts @@ -0,0 +1,20 @@ +// ============================================================================= +// Types +// ============================================================================= + +export type { TransportOptions } from './factory'; +export type { ITransport, TransportConfig, TransportMode } from './types'; + +// ============================================================================= +// Implementations (for advanced use cases) +// ============================================================================= + +export { BaseTransport } from './base-transport'; +export { HttpTransport } from './http-transport'; +export { WebSocketTransport } from './ws-transport'; + +// ============================================================================= +// Factory (primary API) +// ============================================================================= + +export { createTransport } from './factory'; diff --git a/packages/sandbox/src/clients/transport/types.ts b/packages/sandbox/src/clients/transport/types.ts new file mode 100644 index 00000000..7eb57eb7 --- /dev/null +++ b/packages/sandbox/src/clients/transport/types.ts @@ -0,0 +1,77 @@ +import type { Logger } from '@repo/shared'; +import type { ContainerStub } from '../types'; + +/** + * Transport mode for SDK communication + */ +export type TransportMode = 'http' | 'websocket'; + +/** + * Configuration options for creating a transport + */ +export interface TransportConfig { + /** Base URL for HTTP requests */ + baseUrl?: string; + + /** WebSocket URL (required for WebSocket mode) */ + wsUrl?: string; + + /** Logger instance */ + logger?: Logger; + + /** Container stub for DO-internal requests */ + stub?: ContainerStub; + + /** 
Port number */ + port?: number; + + /** Request timeout in milliseconds */ + requestTimeoutMs?: number; + + /** Connection timeout in milliseconds (WebSocket only) */ + connectTimeoutMs?: number; +} + +/** + * Transport interface - all transports must implement this + * + * Provides a unified abstraction over HTTP and WebSocket communication. + * Both transports support fetch-compatible requests and streaming. + */ +export interface ITransport { + /** + * Make a fetch-compatible request + * @returns Standard Response object + */ + fetch(path: string, options?: RequestInit): Promise; + + /** + * Make a streaming request + * @returns ReadableStream for consuming SSE/streaming data + */ + fetchStream( + path: string, + body?: unknown, + method?: 'GET' | 'POST' + ): Promise>; + + /** + * Get the transport mode + */ + getMode(): TransportMode; + + /** + * Connect the transport (no-op for HTTP) + */ + connect(): Promise; + + /** + * Disconnect the transport (no-op for HTTP) + */ + disconnect(): void; + + /** + * Check if connected (always true for HTTP) + */ + isConnected(): boolean; +} diff --git a/packages/sandbox/src/clients/transport/ws-transport.ts b/packages/sandbox/src/clients/transport/ws-transport.ts new file mode 100644 index 00000000..ec5b0620 --- /dev/null +++ b/packages/sandbox/src/clients/transport/ws-transport.ts @@ -0,0 +1,599 @@ +import { + generateRequestId, + isWSError, + isWSResponse, + isWSStreamChunk, + type WSMethod, + type WSRequest, + type WSResponse, + type WSServerMessage, + type WSStreamChunk +} from '@repo/shared'; +import { BaseTransport } from './base-transport'; +import type { TransportConfig, TransportMode } from './types'; + +/** + * Pending request tracker for response matching + */ +interface PendingRequest { + resolve: (response: WSResponse) => void; + reject: (error: Error) => void; + streamController?: ReadableStreamDefaultController; + isStreaming: boolean; + timeoutId?: ReturnType; +} + +/** + * WebSocket transport state + */ 
+type WSTransportState = 'disconnected' | 'connecting' | 'connected' | 'error'; + +/** + * WebSocket transport implementation + * + * Multiplexes HTTP-like requests over a single WebSocket connection. + * Useful when running inside Workers/DO where sub-request limits apply. + */ +export class WebSocketTransport extends BaseTransport { + private ws: WebSocket | null = null; + private state: WSTransportState = 'disconnected'; + private pendingRequests: Map = new Map(); + private connectPromise: Promise | null = null; + + // Bound event handlers for proper add/remove + private boundHandleMessage: (event: MessageEvent) => void; + private boundHandleClose: (event: CloseEvent) => void; + + constructor(config: TransportConfig) { + super(config); + + if (!config.wsUrl) { + throw new Error('wsUrl is required for WebSocket transport'); + } + + // Bind handlers once in constructor + this.boundHandleMessage = this.handleMessage.bind(this); + this.boundHandleClose = this.handleClose.bind(this); + } + + getMode(): TransportMode { + return 'websocket'; + } + + /** + * Check if WebSocket is connected + */ + isConnected(): boolean { + return this.state === 'connected' && this.ws?.readyState === WebSocket.OPEN; + } + + /** + * Connect to the WebSocket server + * + * The connection promise is assigned synchronously so concurrent + * callers share the same connection attempt. 
+ */ + async connect(): Promise { + // Already connected + if (this.isConnected()) { + return; + } + + // Connection in progress - wait for it + if (this.connectPromise) { + return this.connectPromise; + } + + // Assign synchronously so concurrent callers await the same promise + this.connectPromise = this.doConnect(); + + try { + await this.connectPromise; + } catch (error) { + // Clear promise AFTER await so concurrent callers see the same rejection + this.connectPromise = null; + throw error; + } + } + + /** + * Disconnect from the WebSocket server + */ + disconnect(): void { + this.cleanup(); + } + + /** + * Transport-specific fetch implementation + * Converts WebSocket response to standard Response object. + */ + protected async doFetch( + path: string, + options?: RequestInit + ): Promise { + await this.connect(); + + const method = (options?.method || 'GET') as WSMethod; + const body = this.parseBody(options?.body); + + const result = await this.request(method, path, body); + + return new Response(JSON.stringify(result.body), { + status: result.status, + headers: { 'Content-Type': 'application/json' } + }); + } + + /** + * Streaming fetch implementation + */ + async fetchStream( + path: string, + body?: unknown, + method: 'GET' | 'POST' = 'POST' + ): Promise> { + return this.requestStream(method, path, body); + } + + /** + * Parse request body from RequestInit + */ + private parseBody(body: RequestInit['body']): unknown { + if (!body) { + return undefined; + } + + if (typeof body === 'string') { + try { + return JSON.parse(body); + } catch (error) { + throw new Error( + `Request body must be valid JSON: ${error instanceof Error ? error.message : String(error)}` + ); + } + } + + throw new Error( + `WebSocket transport only supports string bodies. 
Got: ${typeof body}` + ); + } + + /** + * Internal connection logic + */ + private async doConnect(): Promise { + this.state = 'connecting'; + // Use fetch-based WebSocket for DO context (Workers style) + if (this.config.stub) { + await this.connectViaFetch(); + } else { + // Use standard WebSocket for browser/Node + await this.connectViaWebSocket(); + } + } + + /** + * Connect using fetch-based WebSocket (Cloudflare Workers style) + * This is required when running inside a Durable Object. + * + * Uses stub.fetch() which routes WebSocket upgrade requests through the + * parent Container class that supports the WebSocket protocol. + */ + private async connectViaFetch(): Promise { + const timeoutMs = this.config.connectTimeoutMs ?? 30000; + + // Create abort controller for timeout + const controller = new AbortController(); + const timeout = setTimeout(() => controller.abort(), timeoutMs); + + try { + // Build the WebSocket URL for the container + const wsPath = new URL(this.config.wsUrl!).pathname; + const httpUrl = `http://localhost:${this.config.port || 3000}${wsPath}`; + + // Create a Request with WebSocket upgrade headers + const request = new Request(httpUrl, { + headers: { + Upgrade: 'websocket', + Connection: 'Upgrade' + }, + signal: controller.signal + }); + + const response = await this.config.stub!.fetch(request); + + clearTimeout(timeout); + + // Check if upgrade was successful + if (response.status !== 101) { + throw new Error( + `WebSocket upgrade failed: ${response.status} ${response.statusText}` + ); + } + + // Get the WebSocket from the response (Workers-specific API) + const ws = (response as unknown as { webSocket?: WebSocket }).webSocket; + if (!ws) { + throw new Error('No WebSocket in upgrade response'); + } + + // Accept the WebSocket connection (Workers-specific) + (ws as unknown as { accept: () => void }).accept(); + + this.ws = ws; + this.state = 'connected'; + + // Set up event handlers + this.ws.addEventListener('close', 
this.boundHandleClose); + this.ws.addEventListener('message', this.boundHandleMessage); + + this.logger.debug('WebSocket connected via fetch', { + url: this.config.wsUrl + }); + } catch (error) { + clearTimeout(timeout); + this.state = 'error'; + this.logger.error( + 'WebSocket fetch connection failed', + error instanceof Error ? error : new Error(String(error)) + ); + throw error; + } + } + + /** + * Connect using standard WebSocket API (browser/Node style) + */ + private connectViaWebSocket(): Promise { + return new Promise((resolve, reject) => { + const timeoutMs = this.config.connectTimeoutMs ?? 30000; + const timeout = setTimeout(() => { + this.cleanup(); + reject(new Error(`WebSocket connection timeout after ${timeoutMs}ms`)); + }, timeoutMs); + + try { + this.ws = new WebSocket(this.config.wsUrl!); + + // One-time open handler for connection + const onOpen = () => { + clearTimeout(timeout); + this.ws?.removeEventListener('open', onOpen); + this.ws?.removeEventListener('error', onConnectError); + this.state = 'connected'; + this.logger.debug('WebSocket connected', { url: this.config.wsUrl }); + resolve(); + }; + + // One-time error handler for connection + const onConnectError = () => { + clearTimeout(timeout); + this.ws?.removeEventListener('open', onOpen); + this.ws?.removeEventListener('error', onConnectError); + this.state = 'error'; + this.logger.error( + 'WebSocket error', + new Error('WebSocket connection failed') + ); + reject(new Error('WebSocket connection failed')); + }; + + this.ws.addEventListener('open', onOpen); + this.ws.addEventListener('error', onConnectError); + this.ws.addEventListener('close', this.boundHandleClose); + this.ws.addEventListener('message', this.boundHandleMessage); + } catch (error) { + clearTimeout(timeout); + this.state = 'error'; + reject(error); + } + }); + } + + /** + * Send a request and wait for response + */ + private async request( + method: WSMethod, + path: string, + body?: unknown + ): Promise<{ status: number; 
body: T }> { + await this.connect(); + + const id = generateRequestId(); + const request: WSRequest = { + type: 'request', + id, + method, + path, + body + }; + + return new Promise((resolve, reject) => { + const timeoutMs = this.config.requestTimeoutMs ?? 120000; + const timeoutId = setTimeout(() => { + this.pendingRequests.delete(id); + reject( + new Error(`Request timeout after ${timeoutMs}ms: ${method} ${path}`) + ); + }, timeoutMs); + + this.pendingRequests.set(id, { + resolve: (response: WSResponse) => { + clearTimeout(timeoutId); + this.pendingRequests.delete(id); + resolve({ status: response.status, body: response.body as T }); + }, + reject: (error: Error) => { + clearTimeout(timeoutId); + this.pendingRequests.delete(id); + reject(error); + }, + isStreaming: false, + timeoutId + }); + + try { + this.send(request); + } catch (error) { + clearTimeout(timeoutId); + this.pendingRequests.delete(id); + reject(error instanceof Error ? error : new Error(String(error))); + } + }); + } + + /** + * Send a streaming request and return a ReadableStream + * + * The stream will receive data chunks as they arrive over the WebSocket. + * Format matches SSE for compatibility with existing streaming code. + */ + private async requestStream( + method: WSMethod, + path: string, + body?: unknown + ): Promise> { + await this.connect(); + + const id = generateRequestId(); + const request: WSRequest = { + type: 'request', + id, + method, + path, + body + }; + + return new ReadableStream({ + start: (controller) => { + const timeoutMs = this.config.requestTimeoutMs ?? 
120000; + const timeoutId = setTimeout(() => { + this.pendingRequests.delete(id); + controller.error( + new Error(`Stream timeout after ${timeoutMs}ms: ${method} ${path}`) + ); + }, timeoutMs); + + this.pendingRequests.set(id, { + resolve: (response: WSResponse) => { + clearTimeout(timeoutId); + this.pendingRequests.delete(id); + // Final response - close the stream + if (response.status >= 400) { + controller.error( + new Error( + `Stream error: ${response.status} - ${JSON.stringify(response.body)}` + ) + ); + } else { + controller.close(); + } + }, + reject: (error: Error) => { + clearTimeout(timeoutId); + this.pendingRequests.delete(id); + controller.error(error); + }, + streamController: controller, + isStreaming: true, + timeoutId + }); + + try { + this.send(request); + } catch (error) { + clearTimeout(timeoutId); + this.pendingRequests.delete(id); + controller.error( + error instanceof Error ? error : new Error(String(error)) + ); + } + }, + cancel: () => { + const pending = this.pendingRequests.get(id); + if (pending?.timeoutId) { + clearTimeout(pending.timeoutId); + } + this.pendingRequests.delete(id); + // Could send a cancel message to server if needed + } + }); + } + + /** + * Send a message over the WebSocket + */ + private send(message: WSRequest): void { + if (!this.ws || this.ws.readyState !== WebSocket.OPEN) { + throw new Error('WebSocket not connected'); + } + + this.ws.send(JSON.stringify(message)); + this.logger.debug('WebSocket sent', { + id: message.id, + method: message.method, + path: message.path + }); + } + + /** + * Handle incoming WebSocket messages + */ + private handleMessage(event: MessageEvent): void { + try { + const message = JSON.parse(event.data) as WSServerMessage; + + if (isWSResponse(message)) { + this.handleResponse(message); + } else if (isWSStreamChunk(message)) { + this.handleStreamChunk(message); + } else if (isWSError(message)) { + this.handleError(message); + } else { + this.logger.warn('Unknown WebSocket message type', 
{ message }); + } + } catch (error) { + this.logger.error( + 'Failed to parse WebSocket message', + error instanceof Error ? error : new Error(String(error)) + ); + } + } + + /** + * Handle a response message + */ + private handleResponse(response: WSResponse): void { + const pending = this.pendingRequests.get(response.id); + if (!pending) { + this.logger.warn('Received response for unknown request', { + id: response.id + }); + return; + } + + this.logger.debug('WebSocket response', { + id: response.id, + status: response.status, + done: response.done + }); + + // Only resolve when done is true + if (response.done) { + pending.resolve(response); + } + } + + /** + * Handle a stream chunk message + */ + private handleStreamChunk(chunk: WSStreamChunk): void { + const pending = this.pendingRequests.get(chunk.id); + if (!pending || !pending.streamController) { + this.logger.warn('Received stream chunk for unknown request', { + id: chunk.id + }); + return; + } + + // Convert to SSE format for compatibility with existing parsers + const encoder = new TextEncoder(); + let sseData: string; + if (chunk.event) { + sseData = `event: ${chunk.event}\ndata: ${chunk.data}\n\n`; + } else { + sseData = `data: ${chunk.data}\n\n`; + } + + try { + pending.streamController.enqueue(encoder.encode(sseData)); + } catch (error) { + // Stream was cancelled or errored - clean up the pending request + this.logger.debug('Failed to enqueue stream chunk, cleaning up', { + id: chunk.id, + error: error instanceof Error ? 
error.message : String(error) + }); + // Clear timeout and remove from pending requests + if (pending.timeoutId) { + clearTimeout(pending.timeoutId); + } + this.pendingRequests.delete(chunk.id); + } + } + + /** + * Handle an error message + */ + private handleError(error: { + id?: string; + code: string; + message: string; + status: number; + }): void { + if (error.id) { + const pending = this.pendingRequests.get(error.id); + if (pending) { + pending.reject(new Error(`${error.code}: ${error.message}`)); + return; + } + } + + // Global error - log it + this.logger.error('WebSocket error message', new Error(error.message), { + code: error.code, + status: error.status + }); + } + + /** + * Handle WebSocket close + */ + private handleClose(event: CloseEvent): void { + this.state = 'disconnected'; + this.ws = null; + + const closeError = new Error( + `WebSocket closed: ${event.code} ${event.reason || 'No reason'}` + ); + + // Reject all pending requests, clear their timeouts, and error their stream controllers + for (const [, pending] of this.pendingRequests) { + // Clear timeout first to prevent memory leak + if (pending.timeoutId) { + clearTimeout(pending.timeoutId); + } + // Error stream controller if it exists + if (pending.streamController) { + try { + pending.streamController.error(closeError); + } catch { + // Stream may already be closed/errored + } + } + pending.reject(closeError); + } + this.pendingRequests.clear(); + } + + /** + * Cleanup resources + */ + private cleanup(): void { + if (this.ws) { + this.ws.removeEventListener('close', this.boundHandleClose); + this.ws.removeEventListener('message', this.boundHandleMessage); + this.ws.close(); + this.ws = null; + } + this.state = 'disconnected'; + this.connectPromise = null; + // Clear all pending request timeouts before clearing the map + for (const pending of this.pendingRequests.values()) { + if (pending.timeoutId) { + clearTimeout(pending.timeoutId); + } + } + this.pendingRequests.clear(); + } +} diff 
--git a/packages/sandbox/src/clients/types.ts b/packages/sandbox/src/clients/types.ts index b59d8af3..d3d098b3 100644 --- a/packages/sandbox/src/clients/types.ts +++ b/packages/sandbox/src/clients/types.ts @@ -1,4 +1,5 @@ import type { Logger } from '@repo/shared'; +import type { ITransport, TransportMode } from './transport'; /** * Minimal interface for container fetch functionality @@ -9,6 +10,12 @@ export interface ContainerStub { options: RequestInit, port?: number ): Promise; + + /** + * Fetch that can handle WebSocket upgrades (routes through parent Container class). + * Required for WebSocket transport to establish control plane connections. + */ + fetch(request: Request): Promise; } /** @@ -27,6 +34,25 @@ export interface HttpClientOptions { command: string ) => void; onError?: (error: string, command?: string) => void; + + /** + * Transport mode: 'http' (default) or 'websocket' + * WebSocket mode multiplexes all requests over a single connection, + * reducing sub-request count in Workers/Durable Objects. + */ + transportMode?: TransportMode; + + /** + * WebSocket URL for WebSocket transport mode. + * Required when transportMode is 'websocket'. + */ + wsUrl?: string; + + /** + * Shared transport instance (for internal use). + * When provided, clients will use this transport instead of creating their own. 
+ */ + transport?: ITransport; } /** diff --git a/packages/sandbox/src/interpreter.ts b/packages/sandbox/src/interpreter.ts index ec42cc6f..e65d8b12 100644 --- a/packages/sandbox/src/interpreter.ts +++ b/packages/sandbox/src/interpreter.ts @@ -96,38 +96,12 @@ export class CodeInterpreter { context = await this.getOrCreateDefaultContext(language); } - // Create streaming response - // Note: doFetch is protected but we need direct access for raw stream response - const response = await (this.interpreterClient as any).doFetch( - '/api/execute/code', - { - method: 'POST', - headers: { - 'Content-Type': 'application/json', - Accept: 'text/event-stream' - }, - body: JSON.stringify({ - context_id: context.id, - code, - language: options.language - }) - } + // Use streamCode which handles both HTTP and WebSocket streaming + return this.interpreterClient.streamCode( + context.id, + code, + options.language ); - - if (!response.ok) { - const errorData = (await response - .json() - .catch(() => ({ error: 'Unknown error' }))) as { error?: string }; - throw new Error( - errorData.error || `Failed to execute code: ${response.status}` - ); - } - - if (!response.body) { - throw new Error('No response body for streaming execution'); - } - - return response.body; } /** diff --git a/packages/sandbox/src/sandbox.ts b/packages/sandbox/src/sandbox.ts index 42d79850..2149223a 100644 --- a/packages/sandbox/src/sandbox.ts +++ b/packages/sandbox/src/sandbox.ts @@ -130,6 +130,7 @@ export class Sandbox extends Container implements ISandbox { private logger: ReturnType; private keepAliveEnabled: boolean = false; private activeMounts: Map = new Map(); + private transport: 'http' | 'websocket' = 'http'; /** * Default container startup timeouts (conservative for production) @@ -155,6 +156,21 @@ export class Sandbox extends Container implements ISandbox { */ private containerTimeouts = { ...this.DEFAULT_CONTAINER_TIMEOUTS }; + /** + * Create a SandboxClient with current transport settings + */ + 
private createSandboxClient(): SandboxClient { + return new SandboxClient({ + logger: this.logger, + port: 3000, + stub: this, + ...(this.transport === 'websocket' && { + transportMode: 'websocket' as const, + wsUrl: 'ws://localhost:3000/ws' + }) + }); + } + constructor(ctx: DurableObjectState<{}>, env: Env) { super(ctx, env); @@ -175,11 +191,18 @@ export class Sandbox extends Container implements ISandbox { sandboxId: this.ctx.id.toString() }); - this.client = new SandboxClient({ - logger: this.logger, - port: 3000, // Control plane port - stub: this - }); + // Read transport setting from env var + const transportEnv = envObj?.SANDBOX_TRANSPORT; + if (transportEnv === 'websocket') { + this.transport = 'websocket'; + } else if (transportEnv != null && transportEnv !== 'http') { + this.logger.warn( + `Invalid SANDBOX_TRANSPORT value: "${transportEnv}". Must be "http" or "websocket". Defaulting to "http".` + ); + } + + // Create client with transport based on env var (may be updated from storage) + this.client = this.createSandboxClient(); // Initialize code interpreter - pass 'this' after client is ready // The CodeInterpreter extracts client.interpreter from the sandbox @@ -673,6 +696,9 @@ export class Sandbox extends Container implements ISandbox { override async destroy(): Promise { this.logger.info('Destroying sandbox container'); + // Disconnect WebSocket transport if active + this.client.disconnect(); + // Unmount all mounted buckets and cleanup password files for (const [mountPath, mountInfo] of this.activeMounts.entries()) { if (mountInfo.mounted) { diff --git a/packages/sandbox/tests/base-client.test.ts b/packages/sandbox/tests/base-client.test.ts index 90fb9bfe..eaf58a93 100644 --- a/packages/sandbox/tests/base-client.test.ts +++ b/packages/sandbox/tests/base-client.test.ts @@ -292,7 +292,7 @@ describe('BaseHttpClient', () => { }) ); - const stub = { containerFetch: stubFetch }; + const stub = { containerFetch: stubFetch, fetch: vi.fn() }; const stubClient 
= new TestHttpClient({ baseUrl: 'http://test.com', port: 3000, @@ -316,7 +316,7 @@ describe('BaseHttpClient', () => { const stubFetch = vi .fn() .mockRejectedValue(new Error('Stub connection failed')); - const stub = { containerFetch: stubFetch }; + const stub = { containerFetch: stubFetch, fetch: vi.fn() }; const stubClient = new TestHttpClient({ baseUrl: 'http://test.com', port: 3000, diff --git a/packages/sandbox/tests/transport.test.ts b/packages/sandbox/tests/transport.test.ts new file mode 100644 index 00000000..b21c6e04 --- /dev/null +++ b/packages/sandbox/tests/transport.test.ts @@ -0,0 +1,198 @@ +import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; +import { + createTransport, + HttpTransport, + WebSocketTransport +} from '../src/clients/transport'; + +describe('Transport', () => { + let mockFetch: ReturnType; + + beforeEach(() => { + vi.clearAllMocks(); + mockFetch = vi.fn(); + global.fetch = mockFetch as unknown as typeof fetch; + }); + + afterEach(() => { + vi.restoreAllMocks(); + }); + + describe('HTTP mode', () => { + it('should create transport in HTTP mode by default', () => { + const transport = createTransport({ + mode: 'http', + baseUrl: 'http://localhost:3000' + }); + + expect(transport.getMode()).toBe('http'); + }); + + it('should make HTTP GET request', async () => { + const transport = createTransport({ + mode: 'http', + baseUrl: 'http://localhost:3000' + }); + + mockFetch.mockResolvedValue( + new Response(JSON.stringify({ data: 'test' }), { status: 200 }) + ); + + const response = await transport.fetch('/api/test', { method: 'GET' }); + + expect(response.status).toBe(200); + const body = await response.json(); + expect(body).toEqual({ data: 'test' }); + expect(mockFetch).toHaveBeenCalledWith( + 'http://localhost:3000/api/test', + expect.objectContaining({ method: 'GET' }) + ); + }); + + it('should make HTTP POST request with body', async () => { + const transport = createTransport({ + mode: 'http', + baseUrl: 
'http://localhost:3000' + }); + + mockFetch.mockResolvedValue( + new Response(JSON.stringify({ success: true }), { status: 200 }) + ); + + const response = await transport.fetch('/api/execute', { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ command: 'echo hello' }) + }); + + expect(response.status).toBe(200); + expect(mockFetch).toHaveBeenCalledWith( + 'http://localhost:3000/api/execute', + expect.objectContaining({ + method: 'POST', + body: JSON.stringify({ command: 'echo hello' }) + }) + ); + }); + + it('should handle HTTP errors', async () => { + const transport = createTransport({ + mode: 'http', + baseUrl: 'http://localhost:3000' + }); + + mockFetch.mockResolvedValue( + new Response(JSON.stringify({ error: 'Not found' }), { status: 404 }) + ); + + const response = await transport.fetch('/api/missing', { method: 'GET' }); + + expect(response.status).toBe(404); + }); + + it('should stream HTTP responses', async () => { + const transport = createTransport({ + mode: 'http', + baseUrl: 'http://localhost:3000' + }); + + const mockStream = new ReadableStream({ + start(controller) { + controller.enqueue(new TextEncoder().encode('data: test\n\n')); + controller.close(); + } + }); + + mockFetch.mockResolvedValue( + new Response(mockStream, { + status: 200, + headers: { 'Content-Type': 'text/event-stream' } + }) + ); + + const stream = await transport.fetchStream('/api/stream', {}); + + expect(stream).toBeInstanceOf(ReadableStream); + }); + + it('should use stub.containerFetch when stub is provided', async () => { + const mockContainerFetch = vi + .fn() + .mockResolvedValue( + new Response(JSON.stringify({ ok: true }), { status: 200 }) + ); + + const transport = createTransport({ + mode: 'http', + baseUrl: 'http://localhost:3000', + stub: { containerFetch: mockContainerFetch, fetch: vi.fn() }, + port: 3000 + }); + + await transport.fetch('/api/test', { method: 'GET' }); + + expect(mockContainerFetch).toHaveBeenCalledWith( 
+ 'http://localhost:3000/api/test', + expect.any(Object), + 3000 + ); + expect(mockFetch).not.toHaveBeenCalled(); + }); + }); + + describe('WebSocket mode', () => { + // Note: Full WebSocket tests are in ws-transport.test.ts + // These tests verify the Transport wrapper behavior + + it('should create transport in WebSocket mode', () => { + const transport = createTransport({ + mode: 'websocket', + wsUrl: 'ws://localhost:3000/ws' + }); + + expect(transport.getMode()).toBe('websocket'); + }); + + it('should report WebSocket connection state', () => { + const transport = createTransport({ + mode: 'websocket', + wsUrl: 'ws://localhost:3000/ws' + }); + + // Initially not connected + expect(transport.isConnected()).toBe(false); + }); + + it('should throw error when wsUrl is missing', () => { + // When wsUrl is missing, WebSocket transport throws an error + expect(() => { + createTransport({ + mode: 'websocket' + // wsUrl missing - should throw + }); + }).toThrow('wsUrl is required for WebSocket transport'); + }); + }); + + describe('createTransport factory', () => { + it('should create HTTP transport with minimal options', () => { + const transport = createTransport({ + mode: 'http', + baseUrl: 'http://localhost:3000' + }); + + expect(transport).toBeInstanceOf(HttpTransport); + expect(transport.getMode()).toBe('http'); + }); + + it('should create WebSocket transport with URL', () => { + const transport = createTransport({ + mode: 'websocket', + wsUrl: 'ws://localhost:3000/ws' + }); + + expect(transport).toBeInstanceOf(WebSocketTransport); + expect(transport.getMode()).toBe('websocket'); + }); + }); +}); diff --git a/packages/sandbox/tests/ws-transport.test.ts b/packages/sandbox/tests/ws-transport.test.ts new file mode 100644 index 00000000..7d66049f --- /dev/null +++ b/packages/sandbox/tests/ws-transport.test.ts @@ -0,0 +1,258 @@ +import type { + WSError, + WSRequest, + WSResponse, + WSStreamChunk +} from '@repo/shared'; +import { + generateRequestId, + isWSError, + 
isWSRequest, + isWSResponse, + isWSStreamChunk +} from '@repo/shared'; +import { describe, expect, it } from 'vitest'; +import { WebSocketTransport } from '../src/clients/transport'; + +/** + * Tests for WebSocket protocol types and the WebSocketTransport class. + * + * Testing Strategy: + * - Protocol tests (type guards, serialization): Full unit test coverage here + * - WebSocketTransport class tests: Limited unit tests for non-connection behavior, + * plus comprehensive E2E tests in tests/e2e/websocket-transport.test.ts + * + * Why limited WebSocketTransport unit tests: + * - Tests run in Workers runtime (vitest-pool-workers) where mocking WebSocket + * is complex and error-prone + * - The WebSocketTransport class is tightly coupled to WebSocket - most methods + * require an active connection + * - E2E tests verify the complete request/response cycle, error handling, + * streaming, and cleanup against a real container + */ +describe('WebSocket Protocol Types', () => { + describe('generateRequestId', () => { + it('should generate unique request IDs', () => { + const id1 = generateRequestId(); + const id2 = generateRequestId(); + const id3 = generateRequestId(); + + expect(id1).toMatch(/^ws_\d+_[a-z0-9]+$/); + expect(id2).toMatch(/^ws_\d+_[a-z0-9]+$/); + expect(id3).toMatch(/^ws_\d+_[a-z0-9]+$/); + + // All should be unique + expect(new Set([id1, id2, id3]).size).toBe(3); + }); + + it('should include timestamp in ID', () => { + const before = Date.now(); + const id = generateRequestId(); + const after = Date.now(); + + // Extract timestamp from ID (format: ws__) + const parts = id.split('_'); + const timestamp = parseInt(parts[1], 10); + + expect(timestamp).toBeGreaterThanOrEqual(before); + expect(timestamp).toBeLessThanOrEqual(after); + }); + }); + + describe('isWSRequest', () => { + it('should return true for valid WSRequest', () => { + const request: WSRequest = { + type: 'request', + id: 'req-123', + method: 'POST', + path: '/api/execute', + body: { command: 
'echo hello' } + }; + + expect(isWSRequest(request)).toBe(true); + }); + + it('should return true for minimal WSRequest', () => { + const request = { + type: 'request', + id: 'req-456', + method: 'GET', + path: '/api/health' + }; + + expect(isWSRequest(request)).toBe(true); + }); + + it('should return false for non-request types', () => { + expect(isWSRequest(null)).toBe(false); + expect(isWSRequest(undefined)).toBe(false); + expect(isWSRequest('string')).toBe(false); + expect(isWSRequest({ type: 'response' })).toBe(false); + expect(isWSRequest({ type: 'error' })).toBe(false); + }); + }); + + describe('isWSResponse', () => { + it('should return true for valid WSResponse', () => { + const response: WSResponse = { + type: 'response', + id: 'req-123', + status: 200, + body: { data: 'test' }, + done: true + }; + + expect(isWSResponse(response)).toBe(true); + }); + + it('should return true for minimal WSResponse', () => { + const response = { + type: 'response', + id: 'req-456', + status: 404, + done: false + }; + + expect(isWSResponse(response)).toBe(true); + }); + + it('should return false for non-response types', () => { + expect(isWSResponse(null)).toBe(false); + expect(isWSResponse(undefined)).toBe(false); + expect(isWSResponse('string')).toBe(false); + expect(isWSResponse({ type: 'error' })).toBe(false); + expect(isWSResponse({ type: 'stream' })).toBe(false); + expect(isWSResponse({ type: 'request' })).toBe(false); + }); + }); + + describe('isWSError', () => { + it('should return true for valid WSError', () => { + const error: WSError = { + type: 'error', + id: 'req-123', + code: 'NOT_FOUND', + message: 'Resource not found', + status: 404 + }; + + expect(isWSError(error)).toBe(true); + }); + + it('should return true for WSError without id', () => { + const error = { + type: 'error', + code: 'PARSE_ERROR', + message: 'Invalid JSON', + status: 400 + }; + + expect(isWSError(error)).toBe(true); + }); + + it('should return false for non-error types', () => { + 
expect(isWSError(null)).toBe(false); + expect(isWSError(undefined)).toBe(false); + expect(isWSError({ type: 'response' })).toBe(false); + expect(isWSError({ type: 'stream' })).toBe(false); + }); + }); + + describe('isWSStreamChunk', () => { + it('should return true for valid WSStreamChunk', () => { + const chunk: WSStreamChunk = { + type: 'stream', + id: 'req-123', + data: 'chunk data' + }; + + expect(isWSStreamChunk(chunk)).toBe(true); + }); + + it('should return true for WSStreamChunk with event', () => { + const chunk = { + type: 'stream', + id: 'req-456', + event: 'output', + data: 'line of output' + }; + + expect(isWSStreamChunk(chunk)).toBe(true); + }); + + it('should return false for non-stream types', () => { + expect(isWSStreamChunk(null)).toBe(false); + expect(isWSStreamChunk({ type: 'response' })).toBe(false); + expect(isWSStreamChunk({ type: 'error' })).toBe(false); + }); + }); +}); + +describe('WebSocketTransport', () => { + describe('initial state', () => { + it('should not be connected after construction', () => { + const transport = new WebSocketTransport({ + wsUrl: 'ws://localhost:3000/ws' + }); + expect(transport.isConnected()).toBe(false); + }); + + it('should accept custom options', () => { + const transport = new WebSocketTransport({ + wsUrl: 'ws://localhost:3000/ws', + connectTimeoutMs: 5000, + requestTimeoutMs: 60000 + }); + expect(transport.isConnected()).toBe(false); + }); + + it('should throw if wsUrl is missing', () => { + expect(() => { + new WebSocketTransport({}); + }).toThrow('wsUrl is required for WebSocket transport'); + }); + }); + + describe('disconnect', () => { + it('should be safe to call disconnect when not connected', () => { + const transport = new WebSocketTransport({ + wsUrl: 'ws://localhost:3000/ws' + }); + // Should not throw + transport.disconnect(); + expect(transport.isConnected()).toBe(false); + }); + + it('should be safe to call disconnect multiple times', () => { + const transport = new WebSocketTransport({ + 
wsUrl: 'ws://localhost:3000/ws' + }); + transport.disconnect(); + transport.disconnect(); + transport.disconnect(); + expect(transport.isConnected()).toBe(false); + }); + }); + + describe('fetch without connection', () => { + it('should attempt to connect when making a fetch request', async () => { + const transport = new WebSocketTransport({ + wsUrl: 'ws://invalid-url:9999/ws', + connectTimeoutMs: 100 + }); + + // Fetch should fail because connection fails + await expect(transport.fetch('/test')).rejects.toThrow(); + }); + + it('should attempt to connect when making a stream request', async () => { + const transport = new WebSocketTransport({ + wsUrl: 'ws://invalid-url:9999/ws', + connectTimeoutMs: 100 + }); + + // Stream request should fail because connection fails + await expect(transport.fetchStream('/test')).rejects.toThrow(); + }); + }); +}); diff --git a/packages/shared/src/index.ts b/packages/shared/src/index.ts index a72d04c7..6301139c 100644 --- a/packages/shared/src/index.ts +++ b/packages/shared/src/index.ts @@ -117,3 +117,20 @@ export { isProcessStatus, isTerminalStatus } from './types.js'; +// Export WebSocket protocol types +export type { + WSClientMessage, + WSError, + WSMethod, + WSRequest, + WSResponse, + WSServerMessage, + WSStreamChunk +} from './ws-types.js'; +export { + generateRequestId, + isWSError, + isWSRequest, + isWSResponse, + isWSStreamChunk +} from './ws-types.js'; diff --git a/packages/shared/src/ws-types.ts b/packages/shared/src/ws-types.ts new file mode 100644 index 00000000..9c8e8506 --- /dev/null +++ b/packages/shared/src/ws-types.ts @@ -0,0 +1,176 @@ +/** + * WebSocket transport protocol types + * + * Enables multiplexing HTTP-like requests over a single WebSocket connection. + * This reduces sub-request count when running inside Workers/Durable Objects. 
+ * + * Protocol: + * - Client sends WSRequest messages + * - Server responds with WSResponse messages (matched by id) + * - For streaming endpoints, server sends multiple WSStreamChunk messages + *   followed by a final WSResponse + */ + +/** + * HTTP methods supported over WebSocket + */ +export type WSMethod = 'GET' | 'POST' | 'PUT' | 'DELETE'; + +/** + * WebSocket request message sent from client to server + */ +export interface WSRequest { +  /** Message type discriminator */ +  type: 'request'; + +  /** Unique request ID for response matching */ +  id: string; + +  /** HTTP method */ +  method: WSMethod; + +  /** Request path (e.g., '/api/execute', '/api/read') */ +  path: string; + +  /** Request body (for POST/PUT requests) */ +  body?: unknown; + +  /** Request headers (optional, for special cases) */ +  headers?: Record<string, string>; +} + +/** + * WebSocket response message sent from server to client + */ +export interface WSResponse { +  /** Message type discriminator */ +  type: 'response'; + +  /** Request ID this response corresponds to */ +  id: string; + +  /** HTTP status code */ +  status: number; + +  /** Response body (JSON parsed) */ +  body?: unknown; + +  /** Whether this is the final response (for streaming, false until complete) */ +  done: boolean; +} + +/** + * WebSocket stream chunk for streaming responses (SSE replacement) + * Sent for streaming endpoints like /api/execute/stream, /api/read/stream + */ +export interface WSStreamChunk { +  /** Message type discriminator */ +  type: 'stream'; + +  /** Request ID this chunk belongs to */ +  id: string; + +  /** Stream event type (matches SSE event types) */ +  event?: string; + +  /** Chunk data */ +  data: string; +} + +/** + * WebSocket error response + */ +export interface WSError { +  /** Message type discriminator */ +  type: 'error'; + +  /** Request ID this error corresponds to (if available) */ +  id?: string; + +  /** Error code */ +  code: string; + +  /** Error message */ +  message: string; + +  /** HTTP status code equivalent */ 
+  status: number; + +  /** Additional error context */ +  context?: Record<string, unknown>; +} + +/** + * Union type for all WebSocket messages from server to client + */ +export type WSServerMessage = WSResponse | WSStreamChunk | WSError; + +/** + * Union type for all WebSocket messages from client to server + */ +export type WSClientMessage = WSRequest; + +/** + * Type guard for WSRequest + * + * Note: Only validates the discriminator field (type === 'request'). + * Does not validate other required fields (id, method, path). + * Use for routing messages; trust TypeScript for field validation. + */ +export function isWSRequest(msg: unknown): msg is WSRequest { +  return ( +    typeof msg === 'object' && +    msg !== null && +    'type' in msg && +    (msg as WSRequest).type === 'request' +  ); +} + +/** + * Type guard for WSResponse + * + * Note: Only validates the discriminator field (type === 'response'). + */ +export function isWSResponse(msg: unknown): msg is WSResponse { +  return ( +    typeof msg === 'object' && +    msg !== null && +    'type' in msg && +    (msg as WSResponse).type === 'response' +  ); +} + +/** + * Type guard for WSStreamChunk + * + * Note: Only validates the discriminator field (type === 'stream'). + */ +export function isWSStreamChunk(msg: unknown): msg is WSStreamChunk { +  return ( +    typeof msg === 'object' && +    msg !== null && +    'type' in msg && +    (msg as WSStreamChunk).type === 'stream' +  ); +} + +/** + * Type guard for WSError + * + * Note: Only validates the discriminator field (type === 'error'). 
+ */ +export function isWSError(msg: unknown): msg is WSError { +  return ( +    typeof msg === 'object' && +    msg !== null && +    'type' in msg && +    (msg as WSError).type === 'error' +  ); +} + +/** + * Generate a unique request ID + */ +export function generateRequestId(): string { +  return `ws_${Date.now()}_${Math.random().toString(36).substring(2, 10)}`; +} diff --git a/tests/e2e/helpers/wrangler-runner.ts b/tests/e2e/helpers/wrangler-runner.ts index d7042c25..97565898 100644 --- a/tests/e2e/helpers/wrangler-runner.ts +++ b/tests/e2e/helpers/wrangler-runner.ts @@ -88,7 +88,7 @@ export class WranglerDevRunner { // Check for ready pattern: "Ready on http://..." const match = output.match(/Ready on (?<url>https?:\/\/[^\s]+)/); -    if (match && match.groups?.url && !this.url) { +    if (match?.groups?.url && !this.url) { this.url = match.groups.url; clearTimeout(timeoutId); resolve(this.url); diff --git a/tests/e2e/test-worker/Dockerfile.standalone b/tests/e2e/test-worker/Dockerfile.standalone index 53a5ef72..a8a5a4af 100644 --- a/tests/e2e/test-worker/Dockerfile.standalone +++ b/tests/e2e/test-worker/Dockerfile.standalone @@ -1,5 +1,9 @@ # Test the standalone binary pattern with an arbitrary base image # This validates that users can add sandbox capabilities to any Docker image + +ARG BASE_IMAGE=cloudflare/sandbox-test:0.6.6 +FROM ${BASE_IMAGE} AS sandbox-source + FROM node:20-slim # Install dependencies required by the SDK @@ -11,7 +15,7 @@ RUN apt-get update && apt-get install -y --no-install-recommends \ && rm -rf /var/lib/apt/lists/* # Copy standalone binary from sandbox image -COPY --from=cloudflare/sandbox-test:0.6.6 /container-server/sandbox /sandbox +COPY --from=sandbox-source /container-server/sandbox /sandbox # Copy startup script for CMD passthrough testing COPY startup-test.sh /startup-test.sh diff --git a/tests/e2e/test-worker/generate-config.sh b/tests/e2e/test-worker/generate-config.sh index 0bbd094b..aaca85bc 100755 --- a/tests/e2e/test-worker/generate-config.sh +++ 
b/tests/e2e/test-worker/generate-config.sh @@ -2,25 +2,71 @@ set -e # Generate wrangler.jsonc from template -# Usage: ./generate-config.sh <worker-name> [container-name] +# Usage: ./generate-config.sh <worker-name> [container-name] [transport] [image-mode] # -# If container-name is not provided, it defaults to worker-name +# Arguments: +#   worker-name - Name of the worker (required) +#   container-name - Name prefix for containers (defaults to worker-name) +#   transport - Transport mode: http or websocket (defaults to http) +#   image-mode - Image source mode (defaults to local): +#     "local" - Use local Dockerfiles (for local dev) +#     "registry:<tag>" - Use Cloudflare registry images (for CI) +#     Requires CLOUDFLARE_ACCOUNT_ID env var WORKER_NAME="${1:-sandbox-e2e-test-worker-local}" CONTAINER_NAME="${2:-$WORKER_NAME}" +TRANSPORT="${3:-http}" +IMAGE_MODE="${4:-local}" if [ -z "$WORKER_NAME" ]; then echo "Error: WORKER_NAME is required" -    echo "Usage: ./generate-config.sh <worker-name> [container-name]" +    echo "Usage: ./generate-config.sh <worker-name> [container-name] [transport] [image-mode]" exit 1 fi echo "Generating wrangler.jsonc..." 
echo "  Worker name: $WORKER_NAME" echo "  Container name: $CONTAINER_NAME" +echo "  Transport: $TRANSPORT" +echo "  Image mode: $IMAGE_MODE" + +# Determine image references based on mode +if [[ "$IMAGE_MODE" == "local" ]]; then +  IMAGE_SANDBOX="./Dockerfile" +  IMAGE_PYTHON="./Dockerfile.python" +  IMAGE_OPENCODE="./Dockerfile.opencode" +  IMAGE_STANDALONE="./Dockerfile.standalone" +elif [[ "$IMAGE_MODE" == registry:* ]]; then +  TAG="${IMAGE_MODE#registry:}" +  if [ -z "$CLOUDFLARE_ACCOUNT_ID" ]; then +    echo "Error: CLOUDFLARE_ACCOUNT_ID env var required for registry mode" +    exit 1 +  fi +  IMAGE_SANDBOX="registry.cloudflare.com/$CLOUDFLARE_ACCOUNT_ID/sandbox:$TAG" +  IMAGE_PYTHON="registry.cloudflare.com/$CLOUDFLARE_ACCOUNT_ID/sandbox-python:$TAG" +  IMAGE_OPENCODE="registry.cloudflare.com/$CLOUDFLARE_ACCOUNT_ID/sandbox-opencode:$TAG" +  IMAGE_STANDALONE="registry.cloudflare.com/$CLOUDFLARE_ACCOUNT_ID/sandbox-standalone:$TAG" +else +  echo "Error: Unknown image mode: $IMAGE_MODE" +  echo "Use 'local' or 'registry:<tag>'" +  exit 1 +fi + +echo "  Images:" +echo "    Sandbox: $IMAGE_SANDBOX" +echo "    Python: $IMAGE_PYTHON" +echo "    Opencode: $IMAGE_OPENCODE" +echo "    Standalone: $IMAGE_STANDALONE" # Read template and replace placeholders -sed "s/{{WORKER_NAME}}/$WORKER_NAME/g; s/{{CONTAINER_NAME}}/$CONTAINER_NAME/g" \ +# Using | as delimiter since image URLs contain / +sed -e "s|{{WORKER_NAME}}|$WORKER_NAME|g" \ +    -e "s|{{CONTAINER_NAME}}|$CONTAINER_NAME|g" \ +    -e "s|{{TRANSPORT}}|$TRANSPORT|g" \ +    -e "s|{{IMAGE_SANDBOX}}|$IMAGE_SANDBOX|g" \ +    -e "s|{{IMAGE_PYTHON}}|$IMAGE_PYTHON|g" \ +    -e "s|{{IMAGE_OPENCODE}}|$IMAGE_OPENCODE|g" \ +    -e "s|{{IMAGE_STANDALONE}}|$IMAGE_STANDALONE|g" \ wrangler.template.jsonc > wrangler.jsonc echo "✅ Generated wrangler.jsonc" diff --git a/tests/e2e/test-worker/index.ts b/tests/e2e/test-worker/index.ts index 34eb4fc0..306b4343 100644 --- a/tests/e2e/test-worker/index.ts +++ b/tests/e2e/test-worker/index.ts @@ -84,6 +84,7 @@ export default { } else { 
sandboxNamespace = env.Sandbox; } + const sandbox = getSandbox(sandboxNamespace, sandboxId, { keepAlive }); diff --git a/tests/e2e/test-worker/wrangler.template.jsonc b/tests/e2e/test-worker/wrangler.template.jsonc index e873cf50..ef1dde4d 100644 --- a/tests/e2e/test-worker/wrangler.template.jsonc +++ b/tests/e2e/test-worker/wrangler.template.jsonc @@ -7,7 +7,8 @@ "vars": { "SANDBOX_LOG_LEVEL": "debug", - "SANDBOX_LOG_FORMAT": "pretty" + "SANDBOX_LOG_FORMAT": "pretty", + "SANDBOX_TRANSPORT": "{{TRANSPORT}}" }, "observability": { "enabled": true @@ -16,22 +17,22 @@ "containers": [ { "class_name": "Sandbox", - "image": "./Dockerfile", + "image": "{{IMAGE_SANDBOX}}", "name": "{{CONTAINER_NAME}}" }, { "class_name": "SandboxPython", - "image": "./Dockerfile.python", + "image": "{{IMAGE_PYTHON}}", "name": "{{CONTAINER_NAME}}-python" }, { "class_name": "SandboxOpencode", - "image": "./Dockerfile.opencode", + "image": "{{IMAGE_OPENCODE}}", "name": "{{CONTAINER_NAME}}-opencode" }, { "class_name": "SandboxStandalone", - "image": "./Dockerfile.standalone", + "image": "{{IMAGE_STANDALONE}}", "name": "{{CONTAINER_NAME}}-standalone" } ], diff --git a/tests/e2e/websocket-connect.test.ts b/tests/e2e/websocket-connect.test.ts index c473c1d6..4d342977 100644 --- a/tests/e2e/websocket-connect.test.ts +++ b/tests/e2e/websocket-connect.test.ts @@ -27,7 +27,7 @@ describe('WebSocket Connections', () => { }, 120000); test('should establish WebSocket connection and echo messages', async () => { - const wsUrl = workerUrl.replace(/^http/, 'ws') + '/ws/echo'; + const wsUrl = `${workerUrl.replace(/^http/, 'ws')}/ws/echo`; const ws = new WebSocket(wsUrl, { headers: { 'X-Sandbox-Id': sandboxId } }); @@ -52,7 +52,7 @@ describe('WebSocket Connections', () => { }, 20000); test('should handle multiple concurrent connections', async () => { - const wsUrl = workerUrl.replace(/^http/, 'ws') + '/ws/echo'; + const wsUrl = `${workerUrl.replace(/^http/, 'ws')}/ws/echo`; // Open 3 connections const 
connections = [1, 2, 3].map( @@ -84,6 +84,8 @@ describe('WebSocket Connections', () => { expect(results).toEqual(['Message 1', 'Message 2', 'Message 3']); - connections.forEach((ws) => ws.close()); + for (const ws of connections) { + ws.close(); + } }, 20000); });