diff --git a/.changeset/session-locking.md b/.changeset/session-locking.md new file mode 100644 index 00000000..15f366f8 --- /dev/null +++ b/.changeset/session-locking.md @@ -0,0 +1,5 @@ +--- +'@cloudflare/sandbox': patch +--- + +Add per-session mutex locking to prevent concurrent command execution race conditions diff --git a/packages/sandbox-container/src/core/types.ts b/packages/sandbox-container/src/core/types.ts index 69396192..61c82b37 100644 --- a/packages/sandbox-container/src/core/types.ts +++ b/packages/sandbox-container/src/core/types.ts @@ -56,6 +56,18 @@ export interface ServiceError { details?: Record; } +/** + * Helper functions to construct ServiceResult with proper typing. + * Use these instead of manual object construction to avoid type casts. + */ +export function serviceSuccess(data: T): ServiceResult { + return { success: true, data } as ServiceResult; +} + +export function serviceError(error: ServiceError): ServiceResult { + return { success: false, error } as ServiceResult; +} + // Handler error response structure - matches BaseHandler.createErrorResponse() export interface HandlerErrorResponse { success: false; diff --git a/packages/sandbox-container/src/managers/git-manager.ts b/packages/sandbox-container/src/managers/git-manager.ts index 847c22a8..825552de 100644 --- a/packages/sandbox-container/src/managers/git-manager.ts +++ b/packages/sandbox-container/src/managers/git-manager.ts @@ -15,6 +15,7 @@ * NO I/O operations - all infrastructure delegated to SessionManager via GitService */ +import { ErrorCode } from '@repo/shared/errors'; import type { CloneOptions } from '../core/types'; /** @@ -139,25 +140,26 @@ export class GitManager { } /** - * Determine appropriate error code based on operation and error + * Determine appropriate error code based on operation and error. + * Returns valid ErrorCode enum values for use with withSession error handling. */ determineErrorCode( operation: string, error: Error | string, exitCode?: number - ): string { + ): ErrorCode { const errorMessage = typeof error === 'string' ? error : error.message; const lowerMessage = errorMessage.toLowerCase(); // Check exit code patterns first if (exitCode === 128) { if (lowerMessage.includes('not a git repository')) { - return 'NOT_A_GIT_REPO'; + return ErrorCode.GIT_OPERATION_FAILED; } if (lowerMessage.includes('repository not found')) { - return 'REPO_NOT_FOUND'; + return ErrorCode.GIT_REPOSITORY_NOT_FOUND; } - return 'GIT_COMMAND_ERROR'; + return ErrorCode.GIT_OPERATION_FAILED; } // Common error patterns @@ -165,46 +167,45 @@ export class GitManager { lowerMessage.includes('permission denied') || lowerMessage.includes('access denied') ) { - return 'GIT_PERMISSION_DENIED'; + return ErrorCode.GIT_AUTH_FAILED; } if ( lowerMessage.includes('not found') || lowerMessage.includes('does not exist') ) { - return 'GIT_NOT_FOUND'; + return ErrorCode.GIT_REPOSITORY_NOT_FOUND; } if (lowerMessage.includes('already exists')) { - return 'GIT_ALREADY_EXISTS'; + return ErrorCode.GIT_CLONE_FAILED; } if ( lowerMessage.includes('did not match') || lowerMessage.includes('pathspec') ) { - return 'GIT_INVALID_REF'; + return ErrorCode.GIT_BRANCH_NOT_FOUND; } if ( lowerMessage.includes('authentication') || lowerMessage.includes('credentials') ) { - return 'GIT_AUTH_FAILED'; + return ErrorCode.GIT_AUTH_FAILED; } // Operation-specific defaults switch (operation) { case 'clone': - return 'GIT_CLONE_FAILED'; + return ErrorCode.GIT_CLONE_FAILED; case 'checkout': - return 'GIT_CHECKOUT_FAILED'; + return ErrorCode.GIT_CHECKOUT_FAILED; case 'getCurrentBranch': - return 'GIT_BRANCH_ERROR'; case 'listBranches': - return 'GIT_BRANCH_LIST_ERROR'; + return ErrorCode.GIT_OPERATION_FAILED; default: - return 'GIT_OPERATION_ERROR'; + return ErrorCode.GIT_OPERATION_FAILED; } } diff --git a/packages/sandbox-container/src/services/file-service.ts b/packages/sandbox-container/src/services/file-service.ts index 8aaf5088..1f1d1add 100644 --- a/packages/sandbox-container/src/services/file-service.ts +++ b/packages/sandbox-container/src/services/file-service.ts @@ -97,229 +97,152 @@ export class FileService implements FileSystemOperations { }; } - // 2. Check if file exists using session-aware check - const existsResult = await this.exists(path, sessionId); - if (!existsResult.success) { - return { - success: false, - error: existsResult.error - }; - } - - if (!existsResult.data) { - return { - success: false, - error: { - message: `File not found: ${path}`, - code: ErrorCode.FILE_NOT_FOUND, - details: { - path, - operation: Operation.FILE_READ - } satisfies FileNotFoundContext - } - }; - } - - // 3. Get file size using stat + // 2. Execute exists→stat→mime→cat sequence atomically within session const escapedPath = shellEscape(path); - const statCommand = `stat -c '%s' ${escapedPath} 2>/dev/null`; - const statResult = await this.sessionManager.executeInSession( - sessionId, - statCommand - ); - - if (!statResult.success) { - return { - success: false, - error: { - message: `Failed to get file size for '${path}'`, - code: ErrorCode.FILESYSTEM_ERROR, - details: { - path, - operation: Operation.FILE_READ, - stderr: 'Command execution failed' - } satisfies FileSystemContext - } - }; - } - - if (statResult.data.exitCode !== 0) { - return { - success: false, - error: { - message: `Failed to get file size for '${path}'`, - code: ErrorCode.FILESYSTEM_ERROR, - details: { - path, - operation: Operation.FILE_READ, - stderr: statResult.data.stderr - } satisfies FileSystemContext - } - }; - } - const fileSize = parseInt(statResult.data.stdout.trim(), 10); - - if (Number.isNaN(fileSize)) { - return { - success: false, - error: { - message: `Failed to parse file size for '${path}': invalid stat output`, - code: ErrorCode.FILESYSTEM_ERROR, - details: { - path, - operation: Operation.FILE_READ, - stderr: `Unexpected stat output: ${statResult.data.stdout}` - } satisfies FileSystemContext - } - }; - } - - // 4. Detect MIME type using file command - const mimeCommand = `file --mime-type -b ${escapedPath}`; - const mimeResult = await this.sessionManager.executeInSession( - sessionId, - mimeCommand - ); - - if (!mimeResult.success) { - return { - success: false, - error: { - message: `Failed to detect MIME type for '${path}'`, - code: ErrorCode.FILESYSTEM_ERROR, - details: { - path, - operation: Operation.FILE_READ, - stderr: 'Command execution failed' - } satisfies FileSystemContext - } - }; - } - - if (mimeResult.data.exitCode !== 0) { - return { - success: false, - error: { - message: `Failed to detect MIME type for '${path}'`, - code: ErrorCode.FILESYSTEM_ERROR, - details: { - path, - operation: Operation.FILE_READ, - stderr: mimeResult.data.stderr - } satisfies FileSystemContext + return this.sessionManager + .withSession(sessionId, async (exec) => { + // Check if file exists + const existsResult = await exec(`test -e ${escapedPath}`); + if (existsResult.exitCode !== 0) { + throw { + code: ErrorCode.FILE_NOT_FOUND, + message: `File not found: ${path}`, + details: { + path, + operation: Operation.FILE_READ + } satisfies FileNotFoundContext + }; } - }; - } - const mimeType = mimeResult.data.stdout.trim(); - - // 5. Determine if file is binary based on MIME type - const isBinary = this.isBinaryMimeType(mimeType); + // Get file size using stat + const statCommand = `stat -c '%s' ${escapedPath} 2>/dev/null`; + const statResult = await exec(statCommand); - // 6. Read file with appropriate encoding - // Respect user's encoding preference if provided, otherwise use MIME-based detection - let actualEncoding: 'utf-8' | 'base64'; - if (options.encoding === 'base64') { - actualEncoding = 'base64'; - } else if (options.encoding === 'utf-8' || options.encoding === 'utf8') { - actualEncoding = 'utf-8'; - } else { - // No explicit encoding requested - use MIME-based detection (original behavior) - actualEncoding = isBinary ? 'base64' : 'utf-8'; - } - - let content: string; - if (actualEncoding === 'base64') { - // Binary files: read as base64, return as-is (DO NOT decode) - const base64Command = `base64 -w 0 < ${escapedPath}`; - const base64Result = await this.sessionManager.executeInSession( - sessionId, - base64Command - ); - - if (!base64Result.success) { - return { - success: false, - error: { - message: `Failed to read binary file '${path}': Command execution failed`, + if (statResult.exitCode !== 0) { + throw { code: ErrorCode.FILESYSTEM_ERROR, + message: `Failed to get file size for '${path}'`, details: { path, - operation: Operation.FILE_READ + operation: Operation.FILE_READ, + stderr: statResult.stderr } satisfies FileSystemContext - } - }; - } + }; + } - if (base64Result.data.exitCode !== 0) { - return { - success: false, - error: { - message: `Failed to read binary file '${path}': ${base64Result.data.stderr}`, + const fileSize = parseInt(statResult.stdout.trim(), 10); + + if (Number.isNaN(fileSize)) { + throw { code: ErrorCode.FILESYSTEM_ERROR, + message: `Failed to parse file size for '${path}': invalid stat output`, details: { path, operation: Operation.FILE_READ, - exitCode: base64Result.data.exitCode, - stderr: base64Result.data.stderr + stderr: `Unexpected stat output: ${statResult.stdout}` } satisfies FileSystemContext - } - }; - } + }; + } - content = base64Result.data.stdout.trim(); - } else { - // Text files: read normally - const catCommand = `cat ${escapedPath}`; - const catResult = await this.sessionManager.executeInSession( - sessionId, - catCommand - ); - - if (!catResult.success) { - return { - success: false, - error: { - message: `Failed to read text file '${path}': Command execution failed`, + // Detect MIME type using file command + const mimeCommand = `file --mime-type -b ${escapedPath}`; + const mimeResult = await exec(mimeCommand); + + if (mimeResult.exitCode !== 0) { + throw { code: ErrorCode.FILESYSTEM_ERROR, + message: `Failed to detect MIME type for '${path}'`, details: { path, - operation: Operation.FILE_READ + operation: Operation.FILE_READ, + stderr: mimeResult.stderr } satisfies FileSystemContext + }; + } + + const mimeType = mimeResult.stdout.trim(); + + // Determine if file is binary based on MIME type + const isBinary = this.isBinaryMimeType(mimeType); + + // Read file with appropriate encoding + // Respect user's encoding preference if provided, otherwise use MIME-based detection + let actualEncoding: 'utf-8' | 'base64'; + if (options.encoding === 'base64') { + actualEncoding = 'base64'; + } else if ( + options.encoding === 'utf-8' || + options.encoding === 'utf8' + ) { + actualEncoding = 'utf-8'; + } else { + // No explicit encoding requested - use MIME-based detection (original behavior) + actualEncoding = isBinary ? 'base64' : 'utf-8'; + } + + let content: string; + if (actualEncoding === 'base64') { + // Binary files: read as base64, return as-is (DO NOT decode) + const base64Command = `base64 -w 0 < ${escapedPath}`; + const base64Result = await exec(base64Command); + + if (base64Result.exitCode !== 0) { + throw { + code: ErrorCode.FILESYSTEM_ERROR, + message: `Failed to read binary file '${path}': ${base64Result.stderr}`, + details: { + path, + operation: Operation.FILE_READ, + exitCode: base64Result.exitCode, + stderr: base64Result.stderr + } satisfies FileSystemContext + }; } - }; - } - if (catResult.data.exitCode !== 0) { + content = base64Result.stdout.trim(); + } else { + // Text files: read normally + const catCommand = `cat ${escapedPath}`; + const catResult = await exec(catCommand); + + if (catResult.exitCode !== 0) { + throw { + code: ErrorCode.FILESYSTEM_ERROR, + message: `Failed to read text file '${path}': ${catResult.stderr}`, + details: { + path, + operation: Operation.FILE_READ, + exitCode: catResult.exitCode, + stderr: catResult.stderr + } satisfies FileSystemContext + }; + } + + content = catResult.stdout; + } + return { - success: false, - error: { - message: `Failed to read text file '${path}': ${catResult.data.stderr}`, - code: ErrorCode.FILESYSTEM_ERROR, - details: { - path, - operation: Operation.FILE_READ, - exitCode: catResult.data.exitCode, - stderr: catResult.data.stderr - } satisfies FileSystemContext + content, + metadata: { + encoding: actualEncoding, + isBinary: actualEncoding === 'base64', + mimeType, + size: fileSize } }; - } - - content = catResult.data.stdout; - } + }) + .then((result) => { + if (!result.success) { + return result as ServiceResult; + } - return { - success: true, - data: content, - metadata: { - encoding: actualEncoding, - isBinary: actualEncoding === 'base64', - mimeType, - size: fileSize - } - }; + return { + success: true, + data: result.data.content, + metadata: result.data.metadata + }; + }); } catch (error) { const errorMessage = error instanceof Error ? error.message : 'Unknown error'; @@ -490,78 +413,55 @@ export class FileService implements FileSystemOperations { }; } - // 2. Check if file exists using session-aware check - const existsResult = await this.exists(path, sessionId); - if (!existsResult.success) { - return existsResult as ServiceResult; - } + // 2. Execute exists→isdir→rm sequence atomically within session + const escapedPath = shellEscape(path); - if (!existsResult.data) { - return { - success: false, - error: { - message: `File not found: ${path}`, + return this.sessionManager.withSession(sessionId, async (exec) => { + // Check if file exists + const existsResult = await exec(`test -e ${escapedPath}`); + if (existsResult.exitCode !== 0) { + throw { code: ErrorCode.FILE_NOT_FOUND, + message: `File not found: ${path}`, details: { path, operation: Operation.FILE_DELETE } satisfies FileNotFoundContext - } - }; - } + }; + } - // 3. Check if path is a directory (deleteFile only works on files) - const statResult = await this.stat(path, sessionId); - if (statResult.success && statResult.data.isDirectory) { - return { - success: false, - error: { - message: `Cannot delete directory with deleteFile() at '${path}'. Use exec('rm -rf ') instead.`, + // Check if path is a directory (deleteFile only works on files) + const isDirResult = await exec(`test -d ${escapedPath}`); + if (isDirResult.exitCode === 0) { + throw { code: ErrorCode.IS_DIRECTORY, + message: `Cannot delete directory with deleteFile() at '${path}'. Use exec('rm -rf ') instead.`, details: { path, operation: Operation.FILE_DELETE } satisfies FileSystemContext - } - }; - } - - // 4. Delete file using SessionManager with rm command - const escapedPath = shellEscape(path); - const command = `rm ${escapedPath}`; - - const execResult = await this.sessionManager.executeInSession( - sessionId, - command - ); - - if (!execResult.success) { - return execResult as ServiceResult; - } + }; + } - const result = execResult.data; + // Delete file using rm command + const command = `rm ${escapedPath}`; + const result = await exec(command); - if (result.exitCode !== 0) { - return { - success: false, - error: { + if (result.exitCode !== 0) { + throw { + code: ErrorCode.FILESYSTEM_ERROR, message: `Failed to delete file '${path}': ${ result.stderr || `exit code ${result.exitCode}` }`, - code: ErrorCode.FILESYSTEM_ERROR, details: { path, operation: Operation.FILE_DELETE, exitCode: result.exitCode, stderr: result.stderr } satisfies FileSystemContext - } - }; - } - - return { - success: true - }; + }; + } + }); } catch (error) { const errorMessage = error instanceof Error ? error.message : 'Unknown error'; diff --git a/packages/sandbox-container/src/services/git-service.ts b/packages/sandbox-container/src/services/git-service.ts index 2360555f..25fbd499 100644 --- a/packages/sandbox-container/src/services/git-service.ts +++ b/packages/sandbox-container/src/services/git-service.ts @@ -104,84 +104,70 @@ export class GitService { ); const command = this.buildCommand(args); - // Execute git clone (via SessionManager) + // Execute clone→branch sequence atomically within session const sessionId = options.sessionId || 'default'; - const execResult = await this.sessionManager.executeInSession( - sessionId, - command - ); - - if (!execResult.success) { - return execResult as ServiceResult<{ path: string; branch: string }>; - } - - const result = execResult.data; - - if (result.exitCode !== 0) { - this.logger.error('Git clone failed', undefined, { - repoUrl, - targetDirectory, - exitCode: result.exitCode, - stderr: result.stderr - }); - - const errorCode = this.manager.determineErrorCode( - 'clone', - result.stderr || 'Unknown error', - result.exitCode - ); - return this.returnError({ - message: `Failed to clone repository '${repoUrl}': ${ - result.stderr || `exit code ${result.exitCode}` - }`, - code: errorCode, - details: { - repository: repoUrl, - targetDir: targetDirectory, - exitCode: result.exitCode, - stderr: result.stderr - } satisfies GitErrorContext - }); - } - // Determine the actual branch that was checked out by querying Git - // This ensures we always return the true current branch, whether it was - // explicitly specified or defaulted to the repository's HEAD - const branchArgs = this.manager.buildGetCurrentBranchArgs(); - const branchCommand = this.buildCommand(branchArgs); - const branchExecResult = await this.sessionManager.executeInSession( - sessionId, - branchCommand, - targetDirectory - ); + return this.sessionManager + .withSession(sessionId, async (exec) => { + // Execute git clone + const result = await exec(command); + + if (result.exitCode !== 0) { + this.logger.error('Git clone failed', undefined, { + repoUrl, + targetDirectory, + exitCode: result.exitCode, + stderr: result.stderr + }); + + const errorCode = this.manager.determineErrorCode( + 'clone', + result.stderr || 'Unknown error', + result.exitCode + ); + throw { + message: `Failed to clone repository '${repoUrl}': ${ + result.stderr || `exit code ${result.exitCode}` + }`, + code: errorCode, + details: { + repository: repoUrl, + targetDir: targetDirectory, + exitCode: result.exitCode, + stderr: result.stderr + } satisfies GitErrorContext + }; + } - if (!branchExecResult.success) { - // If we can't get the branch, use fallback but don't fail the entire operation - const actualBranch = options.branch || 'unknown'; + // Determine the actual branch that was checked out by querying Git + // This ensures we always return the true current branch, whether it was + // explicitly specified or defaulted to the repository's HEAD + const branchArgs = this.manager.buildGetCurrentBranchArgs(); + const branchCommand = this.buildCommand(branchArgs); + const branchResult = await exec(branchCommand, { + cwd: targetDirectory + }); + + let actualBranch: string; + if (branchResult.exitCode === 0 && branchResult.stdout.trim()) { + actualBranch = branchResult.stdout.trim(); + } else { + // Fallback: use the requested branch or 'unknown' + actualBranch = options.branch || 'unknown'; + } - return { - success: true, - data: { + return { path: targetDirectory, branch: actualBranch + }; + }) + .then((result) => { + if (!result.success) { + return result as ServiceResult<{ path: string; branch: string }>; } - }; - } - - const branchResult = branchExecResult.data; - let actualBranch: string; - if (branchResult.exitCode === 0 && branchResult.stdout.trim()) { - actualBranch = branchResult.stdout.trim(); - } else { - // Fallback: use the requested branch or 'unknown' - actualBranch = options.branch || 'unknown'; - } - - return this.returnSuccess({ - path: targetDirectory, - branch: actualBranch - }); + return this.returnSuccess(result.data); + }); } catch (error) { const errorMessage = error instanceof Error ? error.message : 'Unknown error'; diff --git a/packages/sandbox-container/src/services/process-service.ts b/packages/sandbox-container/src/services/process-service.ts index b4123d5b..6aa4cac0 100644 --- a/packages/sandbox-container/src/services/process-service.ts +++ b/packages/sandbox-container/src/services/process-service.ts @@ -209,7 +209,8 @@ export class ProcessService { cwd: options.cwd, env: options.env }, - processRecordData.id // Pass process ID as commandId for tracking and killing + processRecordData.id, // Pass process ID as commandId for tracking and killing + { background: true } // Release lock after startup ); if (!streamResult.success) { diff --git a/packages/sandbox-container/src/services/session-manager.ts b/packages/sandbox-container/src/services/session-manager.ts index a03eb740..19c8a3a3 100644 --- a/packages/sandbox-container/src/services/session-manager.ts +++ b/packages/sandbox-container/src/services/session-manager.ts @@ -7,7 +7,12 @@ import type { InternalErrorContext } from '@repo/shared/errors'; import { ErrorCode } from '@repo/shared/errors'; -import type { ServiceResult } from '../core/types'; +import { Mutex } from 'async-mutex'; +import { + type ServiceResult, + serviceError, + serviceSuccess +} from '../core/types'; import { type RawExecResult, Session, type SessionOptions } from '../session'; /** @@ -16,9 +21,113 @@ import { type RawExecResult, Session, type SessionOptions } from '../session'; */ export class SessionManager { private sessions = new Map(); + /** Per-session mutexes to prevent concurrent command execution */ + private sessionLocks = new Map(); + /** Tracks in-progress session creation to prevent duplicate creation races */ + private creatingLocks = new Map>(); constructor(private logger: Logger) {} + /** + * Get or create a mutex for a specific session + */ + private getSessionLock(sessionId: string): Mutex { + let lock = this.sessionLocks.get(sessionId); + if (!lock) { + lock = new Mutex(); + this.sessionLocks.set(sessionId, lock); + } + return lock; + } + + /** + * Get or create a session with coordination to prevent race conditions. + * If multiple requests try to create the same session simultaneously, + * only one will create it and others will wait for that result. + * + * Uses a two-phase approach: + * 1. Check if session exists (fast path) + * 2. Use creatingLocks map to coordinate creation across callers + * + * IMPORTANT: All callers (executeInSession, withSession, etc.) acquire the + * session lock before calling this method. The lock ensures only one caller + * executes this method at a time for a given sessionId, making the + * creatingLocks check-and-set atomic. + */ + private async getOrCreateSession( + sessionId: string, + options: { cwd?: string; commandTimeoutMs?: number } = {} + ): Promise> { + // Fast path: session already exists + const existing = this.sessions.get(sessionId); + if (existing) { + return { success: true, data: existing }; + } + + // Check if another request is already creating this session + // Since we're called under the session lock, only one caller can reach here + // at a time for the same sessionId + const pendingCreate = this.creatingLocks.get(sessionId); + if (pendingCreate) { + try { + const session = await pendingCreate; + return { success: true, data: session }; + } catch (error) { + // Creation failed, will retry below + } + } + + // We need to create the session - set up coordination + // Since we hold the lock, we can safely set creatingLocks without race + const createPromise = (async (): Promise => { + const session = new Session({ + id: sessionId, + cwd: options.cwd || '/workspace', + commandTimeoutMs: options.commandTimeoutMs, + logger: this.logger + }); + await session.initialize(); + this.sessions.set(sessionId, session); + return session; + })(); + + this.creatingLocks.set(sessionId, createPromise); + + try { + const session = await createPromise; + return { success: true, data: session }; + } catch (error) { + const errorMessage = + error instanceof Error ? error.message : 'Unknown error'; + this.logger.error( + 'Failed to create session', + error instanceof Error ? error : undefined, + { + sessionId, + originalError: errorMessage + } + ); + + return { + success: false, + error: { + message: `Failed to create session '${sessionId}': ${errorMessage}`, + code: ErrorCode.INTERNAL_ERROR, + details: { + sessionId, + originalError: errorMessage + } satisfies InternalErrorContext + } + }; + } finally { + this.creatingLocks.delete(sessionId); + // Clean up orphaned lock if session creation failed + if (!this.sessions.has(sessionId)) { + this.sessionLocks.delete(sessionId); + } + } + } + /** * Create a new persistent session */ @@ -109,7 +218,8 @@ export class SessionManager { } /** - * Execute a command in a session + * Execute a command in a session with per-session locking. + * Commands to the same session are serialized; different sessions run in parallel. */ async executeInSession( sessionId: string, @@ -118,72 +228,163 @@ export class SessionManager { timeoutMs?: number, env?: Record ): Promise> { - try { - // Get or create session on demand - let sessionResult = await this.getSession(sessionId); - - // If session doesn't exist, create it automatically - if ( - !sessionResult.success && - (sessionResult.error!.details as InternalErrorContext) - ?.originalError === 'Session not found' - ) { - sessionResult = await this.createSession({ - id: sessionId, + const lock = this.getSessionLock(sessionId); + + return lock.runExclusive(async () => { + try { + // Get or create session (coordinated) + const sessionResult = await this.getOrCreateSession(sessionId, { cwd: cwd || '/workspace', - commandTimeoutMs: timeoutMs // Pass timeout to session + commandTimeoutMs: timeoutMs }); - } - if (!sessionResult.success) { - return sessionResult as ServiceResult; + if (!sessionResult.success) { + return sessionResult as ServiceResult; + } + + const session = sessionResult.data; + + const result = await session.exec( + command, + cwd || env ? { cwd, env } : undefined + ); + + return { + success: true, + data: result + }; + } catch (error) { + const errorMessage = + error instanceof Error ? error.message : 'Unknown error'; + this.logger.error( + 'Failed to execute command', + error instanceof Error ? error : undefined, + { + sessionId, + command + } + ); + + return { + success: false, + error: { + message: `Failed to execute command '${command}' in session '${sessionId}': ${errorMessage}`, + code: ErrorCode.COMMAND_EXECUTION_ERROR, + details: { + command, + stderr: errorMessage + } satisfies CommandErrorContext + } + }; } + }); + } - const session = sessionResult.data; + /** + * Execute multiple commands atomically within a session. + * The lock is held for the entire callback duration, preventing + * other operations from interleaving. + * + * WARNING: Do not call withSession or executeInSession recursively on the same + * session - it will deadlock. Cross-session calls are safe. + * + * @param sessionId - The session identifier + * @param fn - Callback that receives an exec function for running commands + * @param cwd - Optional working directory for session creation + * @returns The result of the callback wrapped in ServiceResult + */ + async withSession( + sessionId: string, + fn: ( + exec: ( + command: string, + options?: { cwd?: string; env?: Record } + ) => Promise + ) => Promise, + cwd?: string + ): Promise> { + const lock = this.getSessionLock(sessionId); + + return lock.runExclusive(async (): Promise> => { + try { + // Get or create session (coordinated) + const sessionResult = await this.getOrCreateSession(sessionId, { + cwd: cwd || '/workspace' + }); - const result = await session.exec( - command, - cwd || env ? { cwd, env } : undefined - ); + if (!sessionResult.success) { + return serviceError(sessionResult.error); + } - return { - success: true, - data: result - }; - } catch (error) { - const errorMessage = - error instanceof Error ? error.message : 'Unknown error'; - this.logger.error( - 'Failed to execute command', - error instanceof Error ? error : undefined, - { - sessionId, - command + const session = sessionResult.data; + + // Provide exec function that uses the session directly (already under lock) + const exec = async ( + command: string, + options?: { cwd?: string; env?: Record } + ): Promise => { + return session.exec(command, options); + }; + + const result = await fn(exec); + + return serviceSuccess(result); + } catch (error) { + // Check if error is a ServiceError-like object (from service callbacks) + // Validates that code is a known ErrorCode to avoid catching unrelated objects + if ( + error && + typeof error === 'object' && + 'code' in error && + 'message' in error && + typeof (error as { code: unknown }).code === 'string' && + Object.values(ErrorCode).includes( + (error as { code: string }).code as ErrorCode + ) + ) { + const customError = error as { + message: string; + code: string; + details?: Record; + }; + return serviceError({ + message: customError.message, + code: customError.code, + details: customError.details + }); } - ); - return { - success: false, - error: { - message: `Failed to execute command '${command}' in session '${sessionId}': ${errorMessage}`, - code: ErrorCode.COMMAND_EXECUTION_ERROR, + const errorMessage = + error instanceof Error ? error.message : 'Unknown error'; + this.logger.error( + 'withSession callback failed', + error instanceof Error ? error : undefined, + { sessionId } + ); + + return serviceError({ + message: `withSession callback failed for session '${sessionId}': ${errorMessage}`, + code: ErrorCode.INTERNAL_ERROR, details: { - command, - stderr: errorMessage - } satisfies CommandErrorContext - } - }; - } + sessionId, + originalError: errorMessage + } satisfies InternalErrorContext + }); + } + }); } /** - * Execute a command with streaming output + * Execute a command with streaming output. * * @param sessionId - The session identifier * @param command - The command to execute * @param onEvent - Callback for streaming events - * @param cwd - Optional working directory override + * @param options - Optional cwd and env overrides * @param commandId - Required command identifier for tracking and killing + * @param lockOptions - Lock behavior options + * @param lockOptions.background - If true, release lock after 'start' event (for startProcess). + * If false (default), hold lock until streaming completes (for exec --stream). * @returns A promise that resolves when first event is processed, with continueStreaming promise for background execution */ async executeStreamInSession( @@ -191,119 +392,249 @@ export class SessionManager { command: string, onEvent: (event: ExecEvent) => Promise, options: { cwd?: string; env?: Record } = {}, - commandId: string + commandId: string, + lockOptions: { background?: boolean } = {} ): Promise }>> { - try { - const { cwd, env } = options; - - // Get or create session on demand - let sessionResult = await this.getSession(sessionId); - - // If session doesn't exist, create it automatically - if ( - !sessionResult.success && - (sessionResult.error!.details as InternalErrorContext) - ?.originalError === 'Session not found' - ) { - sessionResult = await this.createSession({ - id: sessionId, + const { background = false } = lockOptions; + const lock = this.getSessionLock(sessionId); + + // For background mode: acquire lock, process start event, release lock, continue streaming + // For foreground mode: acquire lock, process all events, release lock + if (background) { + return this.executeStreamBackground( + sessionId, + command, + onEvent, + options, + commandId, + lock + ); + } else { + return this.executeStreamForeground( + sessionId, + command, + onEvent, + options, + commandId, + lock + ); + } + } + + /** + * Foreground streaming: hold lock until all events are processed + */ + private async executeStreamForeground( + sessionId: string, + command: string, + onEvent: (event: ExecEvent) => Promise, + options: { cwd?: string; env?: Record }, + commandId: string, + lock: Mutex + ): Promise }>> { + return lock.runExclusive(async () => { + try { + const { cwd, env } = options; + + const sessionResult = await this.getOrCreateSession(sessionId, { cwd: cwd || '/workspace' }); - } - if (!sessionResult.success) { - return sessionResult as ServiceResult<{ - continueStreaming: Promise; - }>; - } - - const session = sessionResult.data; + if (!sessionResult.success) { + return sessionResult as ServiceResult<{ + continueStreaming: Promise; + }>; + } - const generator = session.execStream(command, { commandId, cwd, env }); + const session = sessionResult.data; + const generator = session.execStream(command, { commandId, cwd, env }); - // Process 'start' event synchronously to capture PID before returning - // All other events stream in background via continueStreaming promise - // getLogs() awaits continueStreaming for completed processes to ensure - // all output is captured (deterministic, no timing heuristics) - const firstResult = await generator.next(); + // Process ALL events under lock + for await (const event of generator) { + await onEvent(event); + } - if (firstResult.done) { return { success: true, data: { continueStreaming: Promise.resolve() } }; - } - - await onEvent(firstResult.value); + } catch (error) { + const errorMessage = + error instanceof Error ? error.message : 'Unknown error'; + this.logger.error( + 'Failed to execute streaming command', + error instanceof Error ? error : undefined, + { + sessionId, + command + } + ); - // If already complete/error, drain remaining events synchronously - if ( - firstResult.value.type === 'complete' || - firstResult.value.type === 'error' - ) { - for await (const event of generator) { - await onEvent(event); - } return { - success: true, - data: { continueStreaming: Promise.resolve() } + success: false, + error: { + message: `Failed to execute streaming command '${command}' in session '${sessionId}': ${errorMessage}`, + code: ErrorCode.STREAM_START_ERROR, + details: { + command, + stderr: errorMessage + } satisfies CommandErrorContext + } }; } + }); + } - // Continue streaming remaining events in background - const continueStreaming = (async () => { - try { + /** + * Background streaming: hold lock only until 'start' event, then release. + * + * This mode is used for long-running background processes (like servers) + * where we want to: + * 1. Ensure the process starts successfully (verified by 'start' event) + * 2. Allow other commands to run while the background process continues + * + * IMPORTANT SAFETY NOTE: After lock release, session state (cwd, env vars) + * may change while the background process is running. This is intentional - + * background processes capture their environment at start time and are not + * affected by subsequent session state changes. The process runs in its own + * shell context independent of the session's interactive state. + * + * Use cases: + * - Starting web servers (python -m http.server, node server.js) + * - Starting background services + * - Any long-running process that should not block other operations + */ + private async executeStreamBackground( + sessionId: string, + command: string, + onEvent: (event: ExecEvent) => Promise, + options: { cwd?: string; env?: Record }, + commandId: string, + lock: Mutex + ): Promise }>> { + // Acquire lock for startup phase only + const startupResult = await lock.runExclusive(async () => { + try { + const { cwd, env } = options; + + const sessionResult = await this.getOrCreateSession(sessionId, { + cwd: cwd || '/workspace' + }); + + if (!sessionResult.success) { + return { success: false as const, error: sessionResult.error }; + } + + const session = sessionResult.data; + const generator = session.execStream(command, { commandId, cwd, env }); + + // Process 'start' event under lock + const firstResult = await generator.next(); + + if (firstResult.done) { + return { + success: true as const, + generator: null, + firstEvent: null + }; + } + + await onEvent(firstResult.value); + + // If already complete/error, drain remaining events under lock + if ( + firstResult.value.type === 'complete' || + firstResult.value.type === 'error' + ) { for await (const event of generator) { await onEvent(event); } - } catch (error) { - const errorMessage = - error instanceof Error ? error.message : 'Unknown error'; - this.logger.error( - 'Error during streaming', - error instanceof Error ? error : undefined, - { - sessionId, - commandId, - originalError: errorMessage - } - ); - throw error; + return { + success: true as const, + generator: null, + firstEvent: null + }; } - })(); + // Return generator for background processing (lock will be released) + return { + success: true as const, + generator, + firstEvent: firstResult.value + }; + } catch (error) { + const errorMessage = + error instanceof Error ? error.message : 'Unknown error'; + this.logger.error( + 'Failed to start streaming command', + error instanceof Error ? error : undefined, + { + sessionId, + command + } + ); + + return { + success: false as const, + error: { + message: `Failed to execute streaming command '${command}' in session '${sessionId}': ${errorMessage}`, + code: ErrorCode.STREAM_START_ERROR, + details: { + command, + stderr: errorMessage + } satisfies CommandErrorContext + } + }; + } + }); + + if (!startupResult.success) { return { - success: true, - data: { continueStreaming } + success: false, + error: startupResult.error! }; - } catch (error) { - const errorMessage = - error instanceof Error ? error.message : 'Unknown error'; - this.logger.error( - 'Failed to execute streaming command', - error instanceof Error ? error : undefined, - { - sessionId, - command - } - ); + } + // If generator is null, everything completed during startup + if (!startupResult.generator) { return { - success: false, - error: { - message: `Failed to execute streaming command '${command}' in session '${sessionId}': ${errorMessage}`, - code: ErrorCode.STREAM_START_ERROR, - details: { - command, - stderr: errorMessage - } satisfies CommandErrorContext - } + success: true, + data: { continueStreaming: Promise.resolve() } }; } + + // Continue streaming remaining events WITHOUT lock + const continueStreaming = (async () => { + try { + for await (const event of startupResult.generator!) { + await onEvent(event); + } + } catch (error) { + const errorMessage = + error instanceof Error ? error.message : 'Unknown error'; + this.logger.error( + 'Error during background streaming', + error instanceof Error ? error : undefined, + { + sessionId, + commandId, + originalError: errorMessage + } + ); + throw error; + } + })(); + + return { + success: true, + data: { continueStreaming } + }; } /** - * Kill a running command in a session + * Kill a running command in a session. + * Does not acquire session lock - kill signals must work immediately, + * even while another command is queued or running. */ async killCommand( sessionId: string, @@ -363,82 +694,34 @@ export class SessionManager { } /** - * Set environment variables on a session + * Set environment variables on a session atomically. + * All exports are executed under a single lock acquisition. */ async setEnvVars( sessionId: string, envVars: Record ): Promise> { - try { - // Get or create session on demand - let sessionResult = await this.getSession(sessionId); - - // If session doesn't exist, create it automatically - if ( - !sessionResult.success && - (sessionResult.error!.details as InternalErrorContext) - ?.originalError === 'Session not found' - ) { - sessionResult = await this.createSession({ - id: sessionId, - cwd: '/workspace' - }); - } - - if (!sessionResult.success) { - return sessionResult as ServiceResult; - } - - const session = sessionResult.data; - - // Export each environment variable in the running bash session + return this.withSession(sessionId, async (exec) => { for (const [key, value] of Object.entries(envVars)) { // Escape the value for safe bash usage const escapedValue = value.replace(/'/g, "'\\''"); const exportCommand = `export ${key}='${escapedValue}'`; - const result = await session.exec(exportCommand); + const result = await exec(exportCommand); if (result.exitCode !== 0) { - return { - success: false, - error: { - message: `Failed to set environment variable '${key}' in session '${sessionId}': ${result.stderr}`, - code: ErrorCode.COMMAND_EXECUTION_ERROR, - details: { - command: `export ${key}='...'`, - exitCode: result.exitCode, - stderr: result.stderr - } satisfies CommandErrorContext - } + throw { + code: ErrorCode.COMMAND_EXECUTION_ERROR, + message: `Failed to set environment variable '${key}': ${result.stderr}`, + details: { + command: exportCommand, + exitCode: result.exitCode, + stderr: result.stderr + } satisfies CommandErrorContext }; } } - - return { - success: true - }; - } catch (error) { - const errorMessage = - error instanceof Error ? error.message : 'Unknown error'; - this.logger.error( - 'Failed to set environment variables', - error instanceof Error ? error : undefined, - { sessionId } - ); - - return { - success: false, - error: { - message: `Failed to set environment variables in session '${sessionId}': ${errorMessage}`, - code: ErrorCode.COMMAND_EXECUTION_ERROR, - details: { - command: 'export', - stderr: errorMessage - } satisfies CommandErrorContext - } - }; - } + }); } /** @@ -464,6 +747,8 @@ export class SessionManager { await session.destroy(); this.sessions.delete(sessionId); + this.sessionLocks.delete(sessionId); + this.creatingLocks.delete(sessionId); return { success: true @@ -544,5 +829,7 @@ export class SessionManager { } this.sessions.clear(); + this.sessionLocks.clear(); + this.creatingLocks.clear(); } } diff --git a/packages/sandbox-container/tests/managers/git-manager.test.ts b/packages/sandbox-container/tests/managers/git-manager.test.ts index b0c3c807..238069e9 100644 --- a/packages/sandbox-container/tests/managers/git-manager.test.ts +++ b/packages/sandbox-container/tests/managers/git-manager.test.ts @@ -1,4 +1,5 @@ import { beforeEach, describe, expect, it } from 'bun:test'; +import { ErrorCode } from '@repo/shared/errors'; import { GitManager } from '@sandbox-container/managers/git-manager'; describe('GitManager', () => { @@ -170,77 +171,77 @@ describe('GitManager', () => { }); describe('determineErrorCode', () => { - it('should return NOT_A_GIT_REPO for exit code 128 with not a git repository message', () => { + it('should return GIT_OPERATION_FAILED for exit code 128 with not a git repository message', () => { const error = new Error('fatal: not a git repository'); expect(manager.determineErrorCode('getCurrentBranch', error, 128)).toBe( - 'NOT_A_GIT_REPO' + ErrorCode.GIT_OPERATION_FAILED ); }); - it('should return REPO_NOT_FOUND for exit code 128 with repository not found message', () => { + it('should return GIT_REPOSITORY_NOT_FOUND for exit code 128 with repository not found message', () => { const error = new Error('fatal: repository not found'); expect(manager.determineErrorCode('clone', error, 128)).toBe( - 'REPO_NOT_FOUND' + ErrorCode.GIT_REPOSITORY_NOT_FOUND ); }); - it('should return GIT_PERMISSION_DENIED for permission errors', () => { + it('should return GIT_AUTH_FAILED for permission errors', () => { expect( manager.determineErrorCode('clone', new Error('Permission denied')) - ).toBe('GIT_PERMISSION_DENIED'); + ).toBe(ErrorCode.GIT_AUTH_FAILED); }); - it('should return GIT_NOT_FOUND for not found errors', () => { + it('should return GIT_REPOSITORY_NOT_FOUND for not found errors', () => { expect( manager.determineErrorCode('checkout', new Error('Branch not found')) - ).toBe('GIT_NOT_FOUND'); + ).toBe(ErrorCode.GIT_REPOSITORY_NOT_FOUND); }); - it('should return GIT_INVALID_REF for pathspec errors', () => { + it('should return GIT_BRANCH_NOT_FOUND for pathspec errors', () => { expect( manager.determineErrorCode( 'checkout', new Error("pathspec 'branch' did not match") ) - ).toBe('GIT_INVALID_REF'); + ).toBe(ErrorCode.GIT_BRANCH_NOT_FOUND); }); it('should return GIT_AUTH_FAILED for authentication errors', () => { expect( manager.determineErrorCode('clone', new Error('Authentication failed')) - ).toBe('GIT_AUTH_FAILED'); + ).toBe(ErrorCode.GIT_AUTH_FAILED); }); it('should return operation-specific error codes as fallback', () => { expect( manager.determineErrorCode('clone', new Error('Unknown error')) - ).toBe('GIT_CLONE_FAILED'); + ).toBe(ErrorCode.GIT_CLONE_FAILED); expect( manager.determineErrorCode('checkout', new Error('Unknown error')) - ).toBe('GIT_CHECKOUT_FAILED'); + ).toBe(ErrorCode.GIT_CHECKOUT_FAILED); expect( manager.determineErrorCode( 'getCurrentBranch', new Error('Unknown error') ) - ).toBe('GIT_BRANCH_ERROR'); + ).toBe(ErrorCode.GIT_OPERATION_FAILED); expect( manager.determineErrorCode('listBranches', new Error('Unknown error')) - ).toBe('GIT_BRANCH_LIST_ERROR'); + ).toBe(ErrorCode.GIT_OPERATION_FAILED); }); it('should handle string errors', () => { expect(manager.determineErrorCode('clone', 'repository not found')).toBe( - 'GIT_NOT_FOUND' + ErrorCode.GIT_REPOSITORY_NOT_FOUND ); }); it('should handle case-insensitive error matching', () => { expect( manager.determineErrorCode('clone', new Error('PERMISSION DENIED')) - ).toBe('GIT_PERMISSION_DENIED'); + ).toBe(ErrorCode.GIT_AUTH_FAILED); }); }); diff --git a/packages/sandbox-container/tests/services/file-service.test.ts b/packages/sandbox-container/tests/services/file-service.test.ts index d281d5c8..80f81af5 100644 --- a/packages/sandbox-container/tests/services/file-service.test.ts +++ b/packages/sandbox-container/tests/services/file-service.test.ts @@ -34,7 +34,8 @@ const mockSessionManager = { createSession: vi.fn(), deleteSession: vi.fn(), listSessions: vi.fn(), - destroy: vi.fn() + destroy: vi.fn(), + withSession: vi.fn() } as unknown as SessionManager; describe('FileService', () => { @@ -50,6 +51,41 @@ describe('FileService', () => { errors: [] }); + // Mock withSession to execute the callback immediately with a mock exec function + mocked(mockSessionManager.withSession).mockImplementation( + async (_sessionId, callback) => { + try { + const mockExec = async (cmd: string) => { + // Delegate to executeInSession mock for compatibility with existing tests + const result = await mockSessionManager.executeInSession( + _sessionId, + cmd + ); + if (result.success) { + return result.data; + } + throw new Error('Command execution failed'); + }; + const data = await callback(mockExec); + return { success: true, data } as any; + } catch (error: any) { + // If error has code/message/details, return it as-is + if (error && typeof error === 'object' && 'code' in error) { + return { success: false, error } as any; + } + // Otherwise wrap as generic error + return { + success: false, + error: { + code: 'INTERNAL_ERROR', + message: error instanceof Error ? error.message : 'Unknown error', + details: {} + } + } as any; + } + } + ); + // Create service with mocked SessionManager fileService = new FileService( mockSecurityService, @@ -644,28 +680,18 @@ describe('FileService', () => { // 1. exists() - 1 call // 2. stat() which internally calls exists() again + stat command - 2 calls // 3. rm command - 1 call - // Total: 4 calls - - // Mock first exists check (from delete) - mocked(mockSessionManager.executeInSession).mockResolvedValueOnce({ - success: true, - data: { exitCode: 0, stdout: '', stderr: '' } - } as ServiceResult); + // Total: 3 calls (exists, isdir, delete) - // Mock second exists check (from stat) + // Mock exists check (test -e returns 0 = file exists) mocked(mockSessionManager.executeInSession).mockResolvedValueOnce({ success: true, data: { exitCode: 0, stdout: '', stderr: '' } } as ServiceResult); - // Mock stat command (to verify it's not a directory) + // Mock isdir check (test -d returns non-zero = not a directory) mocked(mockSessionManager.executeInSession).mockResolvedValueOnce({ success: true, - data: { - exitCode: 0, - stdout: 'regular file:100:1234567890:1234567890\n', - stderr: '' - } + data: { exitCode: 1, stdout: '', stderr: '' } } as ServiceResult); // Mock delete command @@ -678,18 +704,17 @@ describe('FileService', () => { expect(result.success).toBe(true); - // Verify rm command was called (cwd is undefined, so only 2 params) - // Should be the 4th call + // Verify rm command was called expect(mockSessionManager.executeInSession).toHaveBeenNthCalledWith( - 4, + 3, 'session-123', "rm '/tmp/test.txt'" ); }); it('should return error when file does not exist', async () => { - // Mock exists check returning false - mocked(mockSessionManager.executeInSession).mockResolvedValue({ + // Mock exists check returning false (exitCode 1 = file doesn't exist) + mocked(mockSessionManager.executeInSession).mockResolvedValueOnce({ success: true, data: { exitCode: 1, stdout: '', stderr: '' } } as ServiceResult); @@ -703,20 +728,16 @@ describe('FileService', () => { }); it('should handle delete command failures', async () => { - // Mock exists check + // Mock exists check (file exists) mocked(mockSessionManager.executeInSession).mockResolvedValueOnce({ success: true, data: { exitCode: 0, stdout: '', stderr: '' } } as ServiceResult); - // Mock stat check + // Mock isdir check (not a directory) mocked(mockSessionManager.executeInSession).mockResolvedValueOnce({ success: true, - data: { - exitCode: 0, - stdout: 'regular file:100:1234567890:1234567890\n', - stderr: '' - } + data: { exitCode: 1, stdout: '', stderr: '' } } as ServiceResult); // Mock delete command failure diff --git a/packages/sandbox-container/tests/services/git-service.test.ts b/packages/sandbox-container/tests/services/git-service.test.ts index 103d98c3..5b073dfa 100644 --- a/packages/sandbox-container/tests/services/git-service.test.ts +++ b/packages/sandbox-container/tests/services/git-service.test.ts @@ -1,6 +1,6 @@ import { beforeEach, describe, expect, it, vi } from 'bun:test'; import type { Logger } from '@repo/shared'; -import type { ValidationFailedContext } from '@repo/shared/errors'; +import { ErrorCode, type ValidationFailedContext } from '@repo/shared/errors'; import type { CloneOptions, ServiceResult @@ -38,7 +38,8 @@ const mockSessionManager = { createSession: vi.fn(), deleteSession: vi.fn(), listSessions: vi.fn(), - destroy: vi.fn() + destroy: vi.fn(), + withSession: vi.fn() } as unknown as SessionManager; describe('GitService', () => { @@ -58,6 +59,50 @@ describe('GitService', () => { errors: [] }); + // Mock withSession to execute the callback immediately with a mock exec function + mocked(mockSessionManager.withSession).mockImplementation( + async (_sessionId, callback) => { + try { + const mockExec = async ( + cmd: string, + options?: { cwd?: string; env?: Record } + ) => { + // Delegate to executeInSession mock for compatibility with existing tests + // Only pass cwd if it's defined to match test expectations + const result = + options?.cwd !== undefined + ? await mockSessionManager.executeInSession( + _sessionId, + cmd, + options.cwd + ) + : await mockSessionManager.executeInSession(_sessionId, cmd); + if (result.success) { + return result.data; + } + // If executeInSession returned an error, throw it to propagate to withSession + throw result.error; + }; + const data = await callback(mockExec); + return { success: true, data } as any; + } catch (error: any) { + // If error has code/message/details, return it as-is + if (error && typeof error === 'object' && 'code' in error) { + return { success: false, error } as any; + } + // Otherwise wrap as generic error + return { + success: false, + error: { + code: 'INTERNAL_ERROR', + message: error instanceof Error ? error.message : 'Unknown error', + details: {} + } + } as any; + } + } + ); + gitService = new GitService( mockSecurityService, mockLogger, @@ -230,7 +275,7 @@ describe('GitService', () => { expect(result.success).toBe(false); if (!result.success) { - expect(result.error.code).toBe('REPO_NOT_FOUND'); + expect(result.error.code).toBe(ErrorCode.GIT_REPOSITORY_NOT_FOUND); expect(result.error.details?.exitCode).toBe(128); expect(result.error.details?.stderr).toContain('repository not found'); } @@ -312,7 +357,7 @@ describe('GitService', () => { expect(result.success).toBe(false); if (!result.success) { - expect(result.error.code).toBe('GIT_INVALID_REF'); + expect(result.error.code).toBe(ErrorCode.GIT_BRANCH_NOT_FOUND); expect(result.error.details?.stderr).toContain('did not match'); } }); @@ -403,7 +448,7 @@ describe('GitService', () => { expect(result.success).toBe(false); if (!result.success) { - expect(result.error.code).toBe('NOT_A_GIT_REPO'); + expect(result.error.code).toBe(ErrorCode.GIT_OPERATION_FAILED); expect(result.error.details?.exitCode).toBe(128); } }); diff --git a/packages/sandbox-container/tests/services/process-service.test.ts b/packages/sandbox-container/tests/services/process-service.test.ts index 17a2881f..7a822c13 100644 --- a/packages/sandbox-container/tests/services/process-service.test.ts +++ b/packages/sandbox-container/tests/services/process-service.test.ts @@ -182,7 +182,8 @@ describe('ProcessService', () => { 'sleep 10', expect.any(Function), // event handler callback expect.objectContaining({ cwd: '/tmp' }), - expect.any(String) // commandId (generated dynamically) + expect.any(String), // commandId (generated dynamically) + { background: true } // Release lock after startup ); // Verify process was stored diff --git a/packages/sandbox-container/tests/services/session-manager-locking.test.ts b/packages/sandbox-container/tests/services/session-manager-locking.test.ts new file mode 100644 index 00000000..0c9c8b14 --- /dev/null +++ b/packages/sandbox-container/tests/services/session-manager-locking.test.ts @@ -0,0 +1,196 @@ +/** + * Session Manager Locking Tests + * Tests for per-session mutex to prevent concurrent command execution + */ + +import { afterEach, beforeEach, describe, expect, it } from 'bun:test'; +import { mkdir, rm } from 'node:fs/promises'; +import { tmpdir } from 'node:os'; +import { join } from 'node:path'; +import { createNoOpLogger } from '@repo/shared'; +import { SessionManager } from '../../src/services/session-manager'; + +describe('SessionManager Locking', () => { + let sessionManager: SessionManager; + let testDir: string; + + beforeEach(async () => { + testDir = join(tmpdir(), `session-lock-test-${Date.now()}`); + await mkdir(testDir, { recursive: true }); + sessionManager = new SessionManager(createNoOpLogger()); + }); + + afterEach(async () => { + await sessionManager.destroy(); + await rm(testDir, { recursive: true, force: true }).catch(() => {}); + }); + + describe('concurrent command serialization', () => { + it('should serialize concurrent commands to the same session', async () => { + const sessionId = 'test-session'; + + // Two commands that would interleave without locking + const cmd1 = sessionManager.executeInSession( + sessionId, + 'echo "START-1"; sleep 0.05; echo "END-1"', + testDir + ); + + const cmd2 = sessionManager.executeInSession( + sessionId, + 'echo "START-2"; sleep 0.05; echo "END-2"', + testDir + ); + + const [result1, result2] = await Promise.all([cmd1, cmd2]); + + expect(result1.success).toBe(true); + expect(result2.success).toBe(true); + + // With locking, each command's output should be complete (not interleaved) + if (result1.success && result2.success) { + expect(result1.data.stdout).toContain('START-1'); + expect(result1.data.stdout).toContain('END-1'); + expect(result2.data.stdout).toContain('START-2'); + expect(result2.data.stdout).toContain('END-2'); + } + }); + }); + + describe('session creation coordination', () => { + it('should not create duplicate sessions under concurrent requests', async () => { + const sessionId = 'concurrent-create-session'; + + // Fire multiple concurrent requests that all try to create the same session + const requests = Array(5) + .fill(null) + .map(() => + sessionManager.executeInSession(sessionId, 'echo "created"', testDir) + ); + + const results = await Promise.all(requests); + + // All should succeed + for (const result of results) { + expect(result.success).toBe(true); + } + + // Only one session should exist + const listResult = await sessionManager.listSessions(); + expect(listResult.success).toBe(true); + if (listResult.success) { + const matchingSessions = listResult.data.filter( + (id) => id === sessionId + ); + expect(matchingSessions.length).toBe(1); + } + }); + }); + + describe('withSession atomic operations', () => { + it('should execute multiple commands atomically', async () => { + const sessionId = 'atomic-session'; + const executionLog: string[] = []; + + // Operation 1: Atomic multi-command sequence + const op1 = sessionManager.withSession( + sessionId, + async (exec) => { + executionLog.push('op1-start'); + await exec('echo "op1-cmd1"'); + await new Promise((r) => setTimeout(r, 50)); + await exec('echo "op1-cmd2"'); + executionLog.push('op1-end'); + return 'op1-result'; + }, + testDir + ); + + // Operation 2: Tries to interleave + const op2 = sessionManager.withSession( + sessionId, + async (exec) => { + executionLog.push('op2-start'); + await exec('echo "op2-cmd1"'); + executionLog.push('op2-end'); + return 'op2-result'; + }, + testDir + ); + + const [result1, result2] = await Promise.all([op1, op2]); + + expect(result1.success).toBe(true); + expect(result2.success).toBe(true); + + // With atomic locking, one operation must fully complete before the other starts + const op1StartIdx = executionLog.indexOf('op1-start'); + const op1EndIdx = executionLog.indexOf('op1-end'); + const op2StartIdx = executionLog.indexOf('op2-start'); + const op2EndIdx = executionLog.indexOf('op2-end'); + + const op1BeforeOp2 = op1EndIdx < op2StartIdx; + const op2BeforeOp1 = op2EndIdx < op1StartIdx; + expect(op1BeforeOp2 || op2BeforeOp1).toBe(true); + }); + }); + + describe('streaming execution locking', () => { + it('should hold lock during foreground streaming until complete', async () => { + const sessionId = 'stream-fg-session'; + + // Start foreground streaming command (holds lock) + const streamPromise = sessionManager.executeStreamInSession( + sessionId, + 'echo "stream-start"; sleep 0.1; echo "stream-end"', + async () => {}, + { cwd: testDir }, + 'cmd-1', + { background: false } + ); + + // Give streaming a moment to start + await new Promise((r) => setTimeout(r, 20)); + + // Try to run another command - should wait for stream to complete + const execPromise = sessionManager.executeInSession( + sessionId, + 'echo "exec-done"', + testDir + ); + + const [streamResult, execResult] = await Promise.all([ + streamPromise, + execPromise + ]); + + expect(streamResult.success).toBe(true); + expect(execResult.success).toBe(true); + }); + + it('should release lock early for background streaming', async () => { + const sessionId = 'stream-bg-session'; + + // Start background streaming command (releases lock after start event) + const streamResult = await sessionManager.executeStreamInSession( + sessionId, + 'sleep 0.5; echo "bg-done"', + async () => {}, + { cwd: testDir }, + 'cmd-bg', + { background: true } + ); + + expect(streamResult.success).toBe(true); + + // Should be able to run another command immediately (not blocked by 500ms sleep) + const execResult = await sessionManager.executeInSession( + sessionId, + 'echo "exec-fast"', + testDir + ); + + expect(execResult.success).toBe(true); + }); + }); +}); diff --git a/tests/e2e/session-state-isolation-workflow.test.ts b/tests/e2e/session-state-isolation-workflow.test.ts index 42272868..b4927f1f 100644 --- a/tests/e2e/session-state-isolation-workflow.test.ts +++ b/tests/e2e/session-state-isolation-workflow.test.ts @@ -609,6 +609,40 @@ describe('Session State Isolation Workflow', () => { expect(exec2Data.stdout.trim()).toBe('Completed in session2'); }, 90000); + test('should serialize concurrent requests to the same session', async () => { + // Fire multiple concurrent requests to the SAME session + // Without proper locking, outputs would interleave + const requests = Array(3) + .fill(null) + .map((_, i) => + fetch(`${workerUrl}/api/execute`, { + method: 'POST', + headers: baseHeaders, + body: JSON.stringify({ + command: `echo "START-${i}"; sleep 0.1; echo "END-${i}"` + }) + }).then((res) => res.json() as Promise) + ); + + const results = await Promise.all(requests); + + // All should succeed + for (const result of results) { + expect(result.exitCode).toBe(0); + } + + // Each result should have its own complete START/END pair (not interleaved) + for (const result of results) { + const stdout = result.stdout; + const startMatch = stdout.match(/START-(\d)/); + expect(startMatch).toBeTruthy(); + if (startMatch) { + const cmdNum = startMatch[1]; + expect(stdout).toContain(`END-${cmdNum}`); + } + } + }, 90000); + test('should properly cleanup session resources with deleteSession', async () => { // Create a session with custom environment variable const sessionResponse = await fetch(`${workerUrl}/api/session/create`, {