Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
175 changes: 175 additions & 0 deletions apps/sim/app/api/workflows/[id]/force-sync/route.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
import crypto from 'crypto'
import { eq } from 'drizzle-orm'
import { type NextRequest, NextResponse } from 'next/server'
import { getSession } from '@/lib/auth'
import { createLogger } from '@/lib/logs/console-logger'
import { getUserEntityPermissions } from '@/lib/permissions/utils'
import { saveWorkflowToNormalizedTables } from '@/lib/workflows/db-helpers'
import { workflowStateApiSchema } from '@/lib/workflows/validation'
import { db } from '@/db'
import { workflow } from '@/db/schema'

const logger = createLogger('ForceSync')

/**
* POST /api/workflows/[id]/force-sync
* Force sync local workflow state to database immediately
* Used during socket reconnection to ensure local changes are persisted
*/
export async function POST(request: NextRequest, { params }: { params: Promise<{ id: string }> }) {
const requestId = crypto.randomUUID().slice(0, 8)
const { id: workflowId } = await params

try {
logger.info(`[${requestId}] Force sync request for workflow ${workflowId}`)

// Get session
const session = await getSession()
if (!session?.user?.id) {
logger.warn(`[${requestId}] Unauthorized force sync attempt`)
return NextResponse.json({ error: 'Unauthorized' }, { status: 401 })
}

const userId = session.user.id

// Get workflow and verify access (inline implementation)
const workflowData = await db
.select({
userId: workflow.userId,
workspaceId: workflow.workspaceId,
name: workflow.name,
})
.from(workflow)
.where(eq(workflow.id, workflowId))
.limit(1)

if (!workflowData.length) {
logger.warn(`[${requestId}] Workflow ${workflowId} not found`)
return NextResponse.json({ error: 'Workflow not found' }, { status: 404 })
}

const workflowRecord = workflowData[0]
let hasAccess = false

// Check if user owns the workflow
if (workflowRecord.userId === userId) {
hasAccess = true
}

// Check workspace membership if workflow belongs to a workspace
if (!hasAccess && workflowRecord.workspaceId) {
const userPermission = await getUserEntityPermissions(
userId,
'workspace',
workflowRecord.workspaceId
)
hasAccess = userPermission !== null
}

if (!hasAccess) {
logger.warn(`[${requestId}] Access denied for user ${userId} to workflow ${workflowId}`)
return NextResponse.json({ error: 'Access denied' }, { status: 403 })
}

// Parse and validate the workflow state from request body
const body = await request.json()
const { workflowState } = body

if (!workflowState) {
return NextResponse.json({ error: 'Missing workflowState in request body' }, { status: 400 })
}

// Validate the workflow state structure
const validationResult = workflowStateApiSchema.safeParse(workflowState)
if (!validationResult.success) {
logger.error(`[${requestId}] Invalid workflow state structure:`, {
error: validationResult.error,
receivedData: JSON.stringify(workflowState, null, 2),
})
return NextResponse.json(
{
error: 'Invalid workflow state structure',
details: validationResult.error.issues,
receivedKeys: Object.keys(workflowState || {}),
},
{ status: 400 }
)
}

const validatedState = validationResult.data

// Save to normalized tables
logger.info(`[${requestId}] Saving workflow state to normalized tables`)

// Convert deployedAt to Date if it's a string
let deployedAt: Date | undefined
if (validatedState.deployedAt) {
if (typeof validatedState.deployedAt === 'string') {
deployedAt = new Date(validatedState.deployedAt)
} else if (validatedState.deployedAt instanceof Date) {
deployedAt = validatedState.deployedAt
}
}

const saveResult = await saveWorkflowToNormalizedTables(workflowId, {
blocks: validatedState.blocks,
edges: validatedState.edges,
loops: validatedState.loops || {},
parallels: validatedState.parallels || {},
lastSaved: Date.now(),
isDeployed: validatedState.isDeployed,
deployedAt,
deploymentStatuses: validatedState.deploymentStatuses || {},
hasActiveSchedule: validatedState.hasActiveSchedule || false,
hasActiveWebhook: validatedState.hasActiveWebhook || false,
})

if (!saveResult.success) {
logger.error(`[${requestId}] Failed to save workflow state:`, saveResult.error)
return NextResponse.json(
{ error: saveResult.error || 'Failed to save workflow state' },
{ status: 500 }
)
}

// Update workflow's last_synced timestamp
await db
.update(workflow)
.set({
lastSynced: new Date(),
updatedAt: new Date(),
})
.where(eq(workflow.id, workflowId))

logger.info(`[${requestId}] Successfully force synced workflow ${workflowId}`)

// Notify socket server about the sync for real-time updates
try {
const socketServerUrl = process.env.SOCKET_SERVER_URL || 'http://localhost:3002'
await fetch(`${socketServerUrl}/api/workflow-synced`, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
},
body: JSON.stringify({
workflowId,
timestamp: Date.now(),
userId,
}),
})
logger.debug(`[${requestId}] Notified socket server about force sync`)
} catch (socketError) {
// Don't fail the request if socket notification fails
logger.warn(`[${requestId}] Failed to notify socket server about sync:`, socketError)
}

return NextResponse.json({
success: true,
message: 'Workflow state synced successfully',
timestamp: Date.now(),
})
} catch (error: any) {
logger.error(`[${requestId}] Force sync error:`, error)
return NextResponse.json({ error: error.message || 'Internal server error' }, { status: 500 })
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,36 +4,64 @@ import { useEffect, useState } from 'react'

interface ConnectionStatusProps {
isConnected: boolean
isSyncing: boolean
}

export function ConnectionStatus({ isConnected }: ConnectionStatusProps) {
export function ConnectionStatus({ isConnected, isSyncing }: ConnectionStatusProps) {
const [showOfflineNotice, setShowOfflineNotice] = useState(false)
const [syncCompleted, setSyncCompleted] = useState(false)

useEffect(() => {
let timeoutId: NodeJS.Timeout

if (!isConnected) {
// Show offline notice after 6 seconds of being disconnected
timeoutId = setTimeout(() => {
setShowOfflineNotice(true)
}, 6000) // 6 seconds
} else {
// Hide notice immediately when reconnected
} else if (isConnected && showOfflineNotice && !isSyncing && syncCompleted) {
setShowOfflineNotice(false)
setSyncCompleted(false)
}

return () => {
if (timeoutId) {
clearTimeout(timeoutId)
}
}
}, [isConnected])
}, [isConnected, isSyncing, showOfflineNotice, syncCompleted])

// Track when sync completes
useEffect(() => {
if (!isSyncing && showOfflineNotice && isConnected) {
setSyncCompleted(true)
}
}, [isSyncing, showOfflineNotice, isConnected])

// Don't render anything if connected or if we haven't been disconnected long enough
if (!showOfflineNotice) {
return null
}

// Show different states based on connection and sync status
if (isConnected && isSyncing) {
return (
<div className='flex items-center gap-1.5'>
<div className='flex items-center gap-1.5 text-yellow-600'>
<div className='relative flex items-center justify-center'>
<div className='absolute h-3 w-3 animate-ping rounded-full bg-yellow-500/20' />
<div className='relative h-2 w-2 rounded-full bg-yellow-500' />
</div>
<div className='flex flex-col'>
<span className='font-medium text-xs leading-tight'>Syncing changes</span>
<span className='text-xs leading-tight opacity-90'>
Saving local changes to database...
</span>
</div>
</div>
</div>
)
}

return (
<div className='flex items-center gap-1.5'>
<div className='flex items-center gap-1.5 text-red-600'>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
'use client'

import { useMemo } from 'react'
import { useSocket } from '@/contexts/socket-context'
import { usePresence } from '../../../../hooks/use-presence'
import { ConnectionStatus } from './components/connection-status/connection-status'
import { UserAvatar } from './components/user-avatar/user-avatar'
Expand All @@ -27,6 +28,7 @@ export function UserAvatarStack({
}: UserAvatarStackProps) {
// Use presence data if no users are provided via props
const { users: presenceUsers, isConnected } = usePresence()
const { isSyncing } = useSocket()
const users = propUsers || presenceUsers

// Memoize the processed users to avoid unnecessary re-renders
Expand All @@ -45,8 +47,10 @@ export function UserAvatarStack({
}, [users, maxVisible])

// Show connection status component regardless of user count
// This will handle the offline notice when disconnected for 15 seconds
const connectionStatusElement = <ConnectionStatus isConnected={isConnected} />
// This will handle the offline notice when disconnected for 6 seconds
const connectionStatusElement = (
<ConnectionStatus isConnected={isConnected} isSyncing={isSyncing} />
)

// Only show presence when there are multiple users (>1)
// But always show connection status
Expand Down
Loading