Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 32 additions & 34 deletions apps/sim/app/api/jobs/[jobId]/route.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import { runs } from '@trigger.dev/sdk'
import { type NextRequest, NextResponse } from 'next/server'
import { authenticateApiKeyFromHeader, updateApiKeyLastUsed } from '@/lib/api-key/service'
import { getSession } from '@/lib/auth'
import { checkHybridAuth } from '@/lib/auth/hybrid'
import { generateRequestId } from '@/lib/core/utils/request'
import { createLogger } from '@/lib/logs/console/logger'
import { createErrorResponse } from '@/app/api/workflows/utils'
Expand All @@ -18,38 +17,44 @@ export async function GET(
try {
logger.debug(`[${requestId}] Getting status for task: ${taskId}`)

// Try session auth first (for web UI)
const session = await getSession()
let authenticatedUserId: string | null = session?.user?.id || null

if (!authenticatedUserId) {
const apiKeyHeader = request.headers.get('x-api-key')
if (apiKeyHeader) {
const authResult = await authenticateApiKeyFromHeader(apiKeyHeader)
if (authResult.success && authResult.userId) {
authenticatedUserId = authResult.userId
if (authResult.keyId) {
await updateApiKeyLastUsed(authResult.keyId).catch((error) => {
logger.warn(`[${requestId}] Failed to update API key last used timestamp:`, {
keyId: authResult.keyId,
error,
})
})
}
}
}
const authResult = await checkHybridAuth(request, { requireWorkflowId: false })
if (!authResult.success || !authResult.userId) {
logger.warn(`[${requestId}] Unauthorized task status request`)
return createErrorResponse(authResult.error || 'Authentication required', 401)
}

if (!authenticatedUserId) {
return createErrorResponse('Authentication required', 401)
}
const authenticatedUserId = authResult.userId

// Fetch task status from Trigger.dev
const run = await runs.retrieve(taskId)

logger.debug(`[${requestId}] Task ${taskId} status: ${run.status}`)

// Map Trigger.dev status to our format
const payload = run.payload as any
if (payload?.workflowId) {
const { verifyWorkflowAccess } = await import('@/socket-server/middleware/permissions')
const accessCheck = await verifyWorkflowAccess(authenticatedUserId, payload.workflowId)
if (!accessCheck.hasAccess) {
logger.warn(`[${requestId}] User ${authenticatedUserId} denied access to task ${taskId}`, {
workflowId: payload.workflowId,
})
return createErrorResponse('Access denied', 403)
}
logger.debug(`[${requestId}] User ${authenticatedUserId} has access to task ${taskId}`)
} else {
if (payload?.userId && payload.userId !== authenticatedUserId) {
logger.warn(
`[${requestId}] User ${authenticatedUserId} attempted to access task ${taskId} owned by ${payload.userId}`
)
return createErrorResponse('Access denied', 403)
}
if (!payload?.userId) {
logger.warn(
`[${requestId}] Task ${taskId} has no ownership information in payload. Denying access for security.`
)
return createErrorResponse('Access denied', 403)
}
}

const statusMap = {
QUEUED: 'queued',
WAITING_FOR_DEPLOY: 'queued',
Expand All @@ -67,7 +72,6 @@ export async function GET(

const mappedStatus = statusMap[run.status as keyof typeof statusMap] || 'unknown'

// Build response based on status
const response: any = {
success: true,
taskId,
Expand All @@ -77,21 +81,18 @@ export async function GET(
},
}

// Add completion details if finished
if (mappedStatus === 'completed') {
response.output = run.output // This contains the workflow execution results
response.metadata.completedAt = run.finishedAt
response.metadata.duration = run.durationMs
}

// Add error details if failed
if (mappedStatus === 'failed') {
response.error = run.error
response.metadata.completedAt = run.finishedAt
response.metadata.duration = run.durationMs
}

// Add progress info if still processing
if (mappedStatus === 'processing' || mappedStatus === 'queued') {
response.estimatedDuration = 180000 // 3 minutes max from our config
}
Expand All @@ -107,6 +108,3 @@ export async function GET(
return createErrorResponse('Failed to fetch task status', 500)
}
}

// TODO: Implement task cancellation via Trigger.dev API if needed
// export async function DELETE() { ... }
61 changes: 51 additions & 10 deletions apps/sim/app/api/logs/execution/[executionId]/route.ts
Original file line number Diff line number Diff line change
@@ -1,39 +1,80 @@
import { db } from '@sim/db'
import { workflowExecutionLogs, workflowExecutionSnapshots } from '@sim/db/schema'
import { eq } from 'drizzle-orm'
import {
permissions,
workflow,
workflowExecutionLogs,
workflowExecutionSnapshots,
} from '@sim/db/schema'
import { and, eq } from 'drizzle-orm'
import { type NextRequest, NextResponse } from 'next/server'
import { checkHybridAuth } from '@/lib/auth/hybrid'
import { generateRequestId } from '@/lib/core/utils/request'
import { createLogger } from '@/lib/logs/console/logger'

const logger = createLogger('LogsByExecutionIdAPI')

export async function GET(
_request: NextRequest,
request: NextRequest,
{ params }: { params: Promise<{ executionId: string }> }
) {
const requestId = generateRequestId()

try {
const { executionId } = await params

logger.debug(`Fetching execution data for: ${executionId}`)
const authResult = await checkHybridAuth(request, { requireWorkflowId: false })
if (!authResult.success || !authResult.userId) {
logger.warn(`[${requestId}] Unauthorized execution data access attempt for: ${executionId}`)
return NextResponse.json(
{ error: authResult.error || 'Authentication required' },
{ status: 401 }
)
}

const authenticatedUserId = authResult.userId

logger.debug(
`[${requestId}] Fetching execution data for: ${executionId} (auth: ${authResult.authType})`
)

// Get the workflow execution log to find the snapshot
const [workflowLog] = await db
.select()
.select({
id: workflowExecutionLogs.id,
workflowId: workflowExecutionLogs.workflowId,
executionId: workflowExecutionLogs.executionId,
stateSnapshotId: workflowExecutionLogs.stateSnapshotId,
trigger: workflowExecutionLogs.trigger,
startedAt: workflowExecutionLogs.startedAt,
endedAt: workflowExecutionLogs.endedAt,
totalDurationMs: workflowExecutionLogs.totalDurationMs,
cost: workflowExecutionLogs.cost,
})
.from(workflowExecutionLogs)
.innerJoin(workflow, eq(workflowExecutionLogs.workflowId, workflow.id))
.innerJoin(
permissions,
and(
eq(permissions.entityType, 'workspace'),
eq(permissions.entityId, workflow.workspaceId),
eq(permissions.userId, authenticatedUserId)
)
)
.where(eq(workflowExecutionLogs.executionId, executionId))
.limit(1)

if (!workflowLog) {
logger.warn(`[${requestId}] Execution not found or access denied: ${executionId}`)
return NextResponse.json({ error: 'Workflow execution not found' }, { status: 404 })
}

// Get the workflow state snapshot
const [snapshot] = await db
.select()
.from(workflowExecutionSnapshots)
.where(eq(workflowExecutionSnapshots.id, workflowLog.stateSnapshotId))
.limit(1)

if (!snapshot) {
logger.warn(`[${requestId}] Workflow state snapshot not found for execution: ${executionId}`)
return NextResponse.json({ error: 'Workflow state snapshot not found' }, { status: 404 })
}

Expand All @@ -50,14 +91,14 @@ export async function GET(
},
}

logger.debug(`Successfully fetched execution data for: ${executionId}`)
logger.debug(`[${requestId}] Successfully fetched execution data for: ${executionId}`)
logger.debug(
`Workflow state contains ${Object.keys((snapshot.stateData as any)?.blocks || {}).length} blocks`
`[${requestId}] Workflow state contains ${Object.keys((snapshot.stateData as any)?.blocks || {}).length} blocks`
)

return NextResponse.json(response)
} catch (error) {
logger.error('Error fetching execution data:', error)
logger.error(`[${requestId}] Error fetching execution data:`, error)
return NextResponse.json({ error: 'Failed to fetch execution data' }, { status: 500 })
}
}
76 changes: 76 additions & 0 deletions apps/sim/app/api/memory/[id]/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@ import { memory, workflowBlocks } from '@sim/db/schema'
import { and, eq } from 'drizzle-orm'
import { type NextRequest, NextResponse } from 'next/server'
import { z } from 'zod'
import { checkHybridAuth } from '@/lib/auth/hybrid'
import { generateRequestId } from '@/lib/core/utils/request'
import { createLogger } from '@/lib/logs/console/logger'
import { getWorkflowAccessContext } from '@/lib/workflows/utils'

const logger = createLogger('MemoryByIdAPI')

Expand Down Expand Up @@ -65,6 +67,65 @@ const memoryPutBodySchema = z.object({
workflowId: z.string().uuid('Invalid workflow ID format'),
})

/**
* Validates authentication and workflow access for memory operations
* @param request - The incoming request
* @param workflowId - The workflow ID to check access for
* @param requestId - Request ID for logging
* @param action - 'read' for GET, 'write' for PUT/DELETE
* @returns Object with userId if successful, or error response if failed
*/
async function validateMemoryAccess(
request: NextRequest,
workflowId: string,
requestId: string,
action: 'read' | 'write'
): Promise<{ userId: string } | { error: NextResponse }> {
const authResult = await checkHybridAuth(request, {
requireWorkflowId: false,
})
if (!authResult.success || !authResult.userId) {
logger.warn(`[${requestId}] Unauthorized memory ${action} attempt`)
return {
error: NextResponse.json(
{ success: false, error: { message: 'Authentication required' } },
{ status: 401 }
),
}
}

const accessContext = await getWorkflowAccessContext(workflowId, authResult.userId)
if (!accessContext) {
logger.warn(`[${requestId}] Workflow ${workflowId} not found`)
return {
error: NextResponse.json(
{ success: false, error: { message: 'Workflow not found' } },
{ status: 404 }
),
}
}

const { isOwner, workspacePermission } = accessContext
const hasAccess =
action === 'read'
? isOwner || workspacePermission !== null
: isOwner || workspacePermission === 'write' || workspacePermission === 'admin'

if (!hasAccess) {
logger.warn(
`[${requestId}] User ${authResult.userId} denied ${action} access to workflow ${workflowId}`
)
return {
error: NextResponse.json(
{ success: false, error: { message: 'Access denied' } },
{ status: 403 }
),
}
}

return { userId: authResult.userId }
}

export const dynamic = 'force-dynamic'
export const runtime = 'nodejs'

Expand Down Expand Up @@ -101,6 +162,11 @@ export async function GET(request: NextRequest, { params }: { params: Promise<{

const { workflowId: validatedWorkflowId } = validation.data

const accessCheck = await validateMemoryAccess(request, validatedWorkflowId, requestId, 'read')
if ('error' in accessCheck) {
return accessCheck.error
}

const memories = await db
.select()
.from(memory)
Expand Down Expand Up @@ -203,6 +269,11 @@ export async function DELETE(

const { workflowId: validatedWorkflowId } = validation.data

const accessCheck = await validateMemoryAccess(request, validatedWorkflowId, requestId, 'write')
if ('error' in accessCheck) {
return accessCheck.error
}

const existingMemory = await db
.select({ id: memory.id })
.from(memory)
Expand Down Expand Up @@ -296,6 +367,11 @@ export async function PUT(request: NextRequest, { params }: { params: Promise<{
)
}

const accessCheck = await validateMemoryAccess(request, validatedWorkflowId, requestId, 'write')
if ('error' in accessCheck) {
return accessCheck.error
}

const existingMemories = await db
.select()
.from(memory)
Expand Down
Loading
Loading