Skip to content

Commit 6dc8b17

Browse files
icecrasher321Vikhyath Mondreti
andauthored
fix(sockets-server-disconnection): on reconnect force sync store to db (#638)
* keep warning until refresh * works * fix sockets server sync on reconnection * infinite reconn attempts * fix lint --------- Co-authored-by: Vikhyath Mondreti <vikhyathmondreti@Vikhyaths-Air.attlocal.net>
1 parent 9702155 commit 6dc8b17

File tree

5 files changed

+365
-9
lines changed

5 files changed

+365
-9
lines changed
Lines changed: 175 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,175 @@
1+
import crypto from 'crypto'
2+
import { eq } from 'drizzle-orm'
3+
import { type NextRequest, NextResponse } from 'next/server'
4+
import { getSession } from '@/lib/auth'
5+
import { createLogger } from '@/lib/logs/console-logger'
6+
import { getUserEntityPermissions } from '@/lib/permissions/utils'
7+
import { saveWorkflowToNormalizedTables } from '@/lib/workflows/db-helpers'
8+
import { workflowStateApiSchema } from '@/lib/workflows/validation'
9+
import { db } from '@/db'
10+
import { workflow } from '@/db/schema'
11+
12+
const logger = createLogger('ForceSync')
13+
14+
/**
15+
* POST /api/workflows/[id]/force-sync
16+
* Force sync local workflow state to database immediately
17+
* Used during socket reconnection to ensure local changes are persisted
18+
*/
19+
export async function POST(request: NextRequest, { params }: { params: Promise<{ id: string }> }) {
20+
const requestId = crypto.randomUUID().slice(0, 8)
21+
const { id: workflowId } = await params
22+
23+
try {
24+
logger.info(`[${requestId}] Force sync request for workflow ${workflowId}`)
25+
26+
// Get session
27+
const session = await getSession()
28+
if (!session?.user?.id) {
29+
logger.warn(`[${requestId}] Unauthorized force sync attempt`)
30+
return NextResponse.json({ error: 'Unauthorized' }, { status: 401 })
31+
}
32+
33+
const userId = session.user.id
34+
35+
// Get workflow and verify access (inline implementation)
36+
const workflowData = await db
37+
.select({
38+
userId: workflow.userId,
39+
workspaceId: workflow.workspaceId,
40+
name: workflow.name,
41+
})
42+
.from(workflow)
43+
.where(eq(workflow.id, workflowId))
44+
.limit(1)
45+
46+
if (!workflowData.length) {
47+
logger.warn(`[${requestId}] Workflow ${workflowId} not found`)
48+
return NextResponse.json({ error: 'Workflow not found' }, { status: 404 })
49+
}
50+
51+
const workflowRecord = workflowData[0]
52+
let hasAccess = false
53+
54+
// Check if user owns the workflow
55+
if (workflowRecord.userId === userId) {
56+
hasAccess = true
57+
}
58+
59+
// Check workspace membership if workflow belongs to a workspace
60+
if (!hasAccess && workflowRecord.workspaceId) {
61+
const userPermission = await getUserEntityPermissions(
62+
userId,
63+
'workspace',
64+
workflowRecord.workspaceId
65+
)
66+
hasAccess = userPermission !== null
67+
}
68+
69+
if (!hasAccess) {
70+
logger.warn(`[${requestId}] Access denied for user ${userId} to workflow ${workflowId}`)
71+
return NextResponse.json({ error: 'Access denied' }, { status: 403 })
72+
}
73+
74+
// Parse and validate the workflow state from request body
75+
const body = await request.json()
76+
const { workflowState } = body
77+
78+
if (!workflowState) {
79+
return NextResponse.json({ error: 'Missing workflowState in request body' }, { status: 400 })
80+
}
81+
82+
// Validate the workflow state structure
83+
const validationResult = workflowStateApiSchema.safeParse(workflowState)
84+
if (!validationResult.success) {
85+
logger.error(`[${requestId}] Invalid workflow state structure:`, {
86+
error: validationResult.error,
87+
receivedData: JSON.stringify(workflowState, null, 2),
88+
})
89+
return NextResponse.json(
90+
{
91+
error: 'Invalid workflow state structure',
92+
details: validationResult.error.issues,
93+
receivedKeys: Object.keys(workflowState || {}),
94+
},
95+
{ status: 400 }
96+
)
97+
}
98+
99+
const validatedState = validationResult.data
100+
101+
// Save to normalized tables
102+
logger.info(`[${requestId}] Saving workflow state to normalized tables`)
103+
104+
// Convert deployedAt to Date if it's a string
105+
let deployedAt: Date | undefined
106+
if (validatedState.deployedAt) {
107+
if (typeof validatedState.deployedAt === 'string') {
108+
deployedAt = new Date(validatedState.deployedAt)
109+
} else if (validatedState.deployedAt instanceof Date) {
110+
deployedAt = validatedState.deployedAt
111+
}
112+
}
113+
114+
const saveResult = await saveWorkflowToNormalizedTables(workflowId, {
115+
blocks: validatedState.blocks,
116+
edges: validatedState.edges,
117+
loops: validatedState.loops || {},
118+
parallels: validatedState.parallels || {},
119+
lastSaved: Date.now(),
120+
isDeployed: validatedState.isDeployed,
121+
deployedAt,
122+
deploymentStatuses: validatedState.deploymentStatuses || {},
123+
hasActiveSchedule: validatedState.hasActiveSchedule || false,
124+
hasActiveWebhook: validatedState.hasActiveWebhook || false,
125+
})
126+
127+
if (!saveResult.success) {
128+
logger.error(`[${requestId}] Failed to save workflow state:`, saveResult.error)
129+
return NextResponse.json(
130+
{ error: saveResult.error || 'Failed to save workflow state' },
131+
{ status: 500 }
132+
)
133+
}
134+
135+
// Update workflow's last_synced timestamp
136+
await db
137+
.update(workflow)
138+
.set({
139+
lastSynced: new Date(),
140+
updatedAt: new Date(),
141+
})
142+
.where(eq(workflow.id, workflowId))
143+
144+
logger.info(`[${requestId}] Successfully force synced workflow ${workflowId}`)
145+
146+
// Notify socket server about the sync for real-time updates
147+
try {
148+
const socketServerUrl = process.env.SOCKET_SERVER_URL || 'http://localhost:3002'
149+
await fetch(`${socketServerUrl}/api/workflow-synced`, {
150+
method: 'POST',
151+
headers: {
152+
'Content-Type': 'application/json',
153+
},
154+
body: JSON.stringify({
155+
workflowId,
156+
timestamp: Date.now(),
157+
userId,
158+
}),
159+
})
160+
logger.debug(`[${requestId}] Notified socket server about force sync`)
161+
} catch (socketError) {
162+
// Don't fail the request if socket notification fails
163+
logger.warn(`[${requestId}] Failed to notify socket server about sync:`, socketError)
164+
}
165+
166+
return NextResponse.json({
167+
success: true,
168+
message: 'Workflow state synced successfully',
169+
timestamp: Date.now(),
170+
})
171+
} catch (error: any) {
172+
logger.error(`[${requestId}] Force sync error:`, error)
173+
return NextResponse.json({ error: error.message || 'Internal server error' }, { status: 500 })
174+
}
175+
}

apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/control-bar/components/user-avatar-stack/components/connection-status/connection-status.tsx

Lines changed: 33 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,36 +4,64 @@ import { useEffect, useState } from 'react'
44

55
interface ConnectionStatusProps {
66
isConnected: boolean
7+
isSyncing: boolean
78
}
89

9-
export function ConnectionStatus({ isConnected }: ConnectionStatusProps) {
10+
export function ConnectionStatus({ isConnected, isSyncing }: ConnectionStatusProps) {
1011
const [showOfflineNotice, setShowOfflineNotice] = useState(false)
12+
const [syncCompleted, setSyncCompleted] = useState(false)
1113

1214
useEffect(() => {
1315
let timeoutId: NodeJS.Timeout
1416

1517
if (!isConnected) {
16-
// Show offline notice after 6 seconds of being disconnected
1718
timeoutId = setTimeout(() => {
1819
setShowOfflineNotice(true)
1920
}, 6000) // 6 seconds
20-
} else {
21-
// Hide notice immediately when reconnected
21+
} else if (isConnected && showOfflineNotice && !isSyncing && syncCompleted) {
2222
setShowOfflineNotice(false)
23+
setSyncCompleted(false)
2324
}
2425

2526
return () => {
2627
if (timeoutId) {
2728
clearTimeout(timeoutId)
2829
}
2930
}
30-
}, [isConnected])
31+
}, [isConnected, isSyncing, showOfflineNotice, syncCompleted])
32+
33+
// Track when sync completes
34+
useEffect(() => {
35+
if (!isSyncing && showOfflineNotice && isConnected) {
36+
setSyncCompleted(true)
37+
}
38+
}, [isSyncing, showOfflineNotice, isConnected])
3139

3240
// Don't render anything if connected or if we haven't been disconnected long enough
3341
if (!showOfflineNotice) {
3442
return null
3543
}
3644

45+
// Show different states based on connection and sync status
46+
if (isConnected && isSyncing) {
47+
return (
48+
<div className='flex items-center gap-1.5'>
49+
<div className='flex items-center gap-1.5 text-yellow-600'>
50+
<div className='relative flex items-center justify-center'>
51+
<div className='absolute h-3 w-3 animate-ping rounded-full bg-yellow-500/20' />
52+
<div className='relative h-2 w-2 rounded-full bg-yellow-500' />
53+
</div>
54+
<div className='flex flex-col'>
55+
<span className='font-medium text-xs leading-tight'>Syncing changes</span>
56+
<span className='text-xs leading-tight opacity-90'>
57+
Saving local changes to database...
58+
</span>
59+
</div>
60+
</div>
61+
</div>
62+
)
63+
}
64+
3765
return (
3866
<div className='flex items-center gap-1.5'>
3967
<div className='flex items-center gap-1.5 text-red-600'>

apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/control-bar/components/user-avatar-stack/user-avatar-stack.tsx

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
'use client'
22

33
import { useMemo } from 'react'
4+
import { useSocket } from '@/contexts/socket-context'
45
import { usePresence } from '../../../../hooks/use-presence'
56
import { ConnectionStatus } from './components/connection-status/connection-status'
67
import { UserAvatar } from './components/user-avatar/user-avatar'
@@ -27,6 +28,7 @@ export function UserAvatarStack({
2728
}: UserAvatarStackProps) {
2829
// Use presence data if no users are provided via props
2930
const { users: presenceUsers, isConnected } = usePresence()
31+
const { isSyncing } = useSocket()
3032
const users = propUsers || presenceUsers
3133

3234
// Memoize the processed users to avoid unnecessary re-renders
@@ -45,8 +47,10 @@ export function UserAvatarStack({
4547
}, [users, maxVisible])
4648

4749
// Show connection status component regardless of user count
48-
// This will handle the offline notice when disconnected for 15 seconds
49-
const connectionStatusElement = <ConnectionStatus isConnected={isConnected} />
50+
// This will handle the offline notice when disconnected for 6 seconds
51+
const connectionStatusElement = (
52+
<ConnectionStatus isConnected={isConnected} isSyncing={isSyncing} />
53+
)
5054

5155
// Only show presence when there are multiple users (>1)
5256
// But always show connection status

0 commit comments

Comments
 (0)