-
Notifications
You must be signed in to change notification settings - Fork 3.2k
fix(revert-deployed): correctly revert to deployed state as unit op using separate endpoint #633
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,121 @@ | ||
| import crypto from 'crypto' | ||
| import { eq } from 'drizzle-orm' | ||
| import type { NextRequest } from 'next/server' | ||
| import { createLogger } from '@/lib/logs/console-logger' | ||
| import { saveWorkflowToNormalizedTables } from '@/lib/workflows/db-helpers' | ||
| import { db } from '@/db' | ||
| import { workflow } from '@/db/schema' | ||
| import type { WorkflowState } from '@/stores/workflows/workflow/types' | ||
| import { validateWorkflowAccess } from '../../middleware' | ||
| import { createErrorResponse, createSuccessResponse } from '../../utils' | ||
|
|
||
| const logger = createLogger('RevertToDeployedAPI') | ||
|
|
||
| export const dynamic = 'force-dynamic' | ||
| export const runtime = 'nodejs' | ||
|
|
||
| /** | ||
| * POST /api/workflows/[id]/revert-to-deployed | ||
| * Revert workflow to its deployed state by saving deployed state to normalized tables | ||
| */ | ||
| export async function POST(request: NextRequest, { params }: { params: Promise<{ id: string }> }) { | ||
| const requestId = crypto.randomUUID().slice(0, 8) | ||
| const { id } = await params | ||
|
|
||
| try { | ||
| logger.debug(`[${requestId}] Reverting workflow to deployed state: ${id}`) | ||
| const validation = await validateWorkflowAccess(request, id, false) | ||
|
|
||
| if (validation.error) { | ||
| logger.warn(`[${requestId}] Workflow revert failed: ${validation.error.message}`) | ||
| return createErrorResponse(validation.error.message, validation.error.status) | ||
| } | ||
|
|
||
| const workflowData = validation.workflow | ||
|
|
||
| // Check if workflow is deployed and has deployed state | ||
| if (!workflowData.isDeployed || !workflowData.deployedState) { | ||
| logger.warn(`[${requestId}] Cannot revert: workflow is not deployed or has no deployed state`) | ||
| return createErrorResponse('Workflow is not deployed or has no deployed state', 400) | ||
| } | ||
|
|
||
| // Validate deployed state structure | ||
| const deployedState = workflowData.deployedState as WorkflowState | ||
| if (!deployedState.blocks || !deployedState.edges) { | ||
| logger.error(`[${requestId}] Invalid deployed state structure`, { deployedState }) | ||
| return createErrorResponse('Invalid deployed state structure', 500) | ||
| } | ||
|
|
||
| logger.debug(`[${requestId}] Saving deployed state to normalized tables`, { | ||
| blocksCount: Object.keys(deployedState.blocks).length, | ||
| edgesCount: deployedState.edges.length, | ||
| loopsCount: Object.keys(deployedState.loops || {}).length, | ||
| parallelsCount: Object.keys(deployedState.parallels || {}).length, | ||
| }) | ||
|
|
||
| // Save deployed state to normalized tables | ||
| const saveResult = await saveWorkflowToNormalizedTables(id, { | ||
| blocks: deployedState.blocks, | ||
| edges: deployedState.edges, | ||
| loops: deployedState.loops || {}, | ||
| parallels: deployedState.parallels || {}, | ||
| lastSaved: Date.now(), | ||
| isDeployed: workflowData.isDeployed, | ||
| deployedAt: workflowData.deployedAt, | ||
| deploymentStatuses: deployedState.deploymentStatuses || {}, | ||
| hasActiveSchedule: deployedState.hasActiveSchedule || false, | ||
| hasActiveWebhook: deployedState.hasActiveWebhook || false, | ||
| }) | ||
|
Comment on lines
+57
to
+68
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. style: Spread operator could simplify this object construction: {...deployedState, lastSaved: Date.now()} |
||
|
|
||
| if (!saveResult.success) { | ||
| logger.error(`[${requestId}] Failed to save deployed state to normalized tables`, { | ||
| error: saveResult.error, | ||
| }) | ||
| return createErrorResponse( | ||
| saveResult.error || 'Failed to save deployed state to normalized tables', | ||
| 500 | ||
| ) | ||
| } | ||
|
|
||
| // Update workflow's last_synced timestamp to indicate changes | ||
| await db | ||
| .update(workflow) | ||
| .set({ | ||
| lastSynced: new Date(), | ||
| updatedAt: new Date(), | ||
| }) | ||
| .where(eq(workflow.id, id)) | ||
|
|
||
| // Notify socket server about the revert operation for real-time sync | ||
| try { | ||
| const socketServerUrl = process.env.SOCKET_SERVER_URL || 'http://localhost:3002' | ||
| await fetch(`${socketServerUrl}/api/workflow-reverted`, { | ||
| method: 'POST', | ||
| headers: { | ||
| 'Content-Type': 'application/json', | ||
| }, | ||
| body: JSON.stringify({ | ||
| workflowId: id, | ||
| timestamp: Date.now(), | ||
| }), | ||
| }) | ||
| logger.debug(`[${requestId}] Notified socket server about workflow revert: ${id}`) | ||
| } catch (socketError) { | ||
| // Don't fail the request if socket notification fails | ||
| logger.warn(`[${requestId}] Failed to notify socket server about revert:`, socketError) | ||
| } | ||
|
Comment on lines
+89
to
+106
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. style: Socket notification logic is complex enough to warrant extraction into separate function |
||
|
|
||
| logger.info(`[${requestId}] Successfully reverted workflow to deployed state: ${id}`) | ||
|
|
||
| return createSuccessResponse({ | ||
| message: 'Workflow successfully reverted to deployed state', | ||
| lastSaved: Date.now(), | ||
| }) | ||
| } catch (error: any) { | ||
| logger.error(`[${requestId}] Error reverting workflow to deployed state: ${id}`, { | ||
| error: error.message, | ||
| stack: error.stack, | ||
| }) | ||
| return createErrorResponse(error.message || 'Failed to revert workflow to deployed state', 500) | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change | ||||||||
|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -50,6 +50,7 @@ interface SocketContextType { | |||||||||
| onUserJoined: (handler: (data: any) => void) => void | ||||||||||
| onUserLeft: (handler: (data: any) => void) => void | ||||||||||
| onWorkflowDeleted: (handler: (data: any) => void) => void | ||||||||||
| onWorkflowReverted: (handler: (data: any) => void) => void | ||||||||||
| } | ||||||||||
|
|
||||||||||
| const SocketContext = createContext<SocketContextType>({ | ||||||||||
|
|
@@ -71,6 +72,7 @@ const SocketContext = createContext<SocketContextType>({ | |||||||||
| onUserJoined: () => {}, | ||||||||||
| onUserLeft: () => {}, | ||||||||||
| onWorkflowDeleted: () => {}, | ||||||||||
| onWorkflowReverted: () => {}, | ||||||||||
| }) | ||||||||||
|
|
||||||||||
| export const useSocket = () => useContext(SocketContext) | ||||||||||
|
|
@@ -100,6 +102,7 @@ export function SocketProvider({ children, user }: SocketProviderProps) { | |||||||||
| userJoined?: (data: any) => void | ||||||||||
| userLeft?: (data: any) => void | ||||||||||
| workflowDeleted?: (data: any) => void | ||||||||||
| workflowReverted?: (data: any) => void | ||||||||||
|
Comment on lines
104
to
+105
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. style: Consider adding TypeScript interface for the workflowReverted event data to maintain type safety across the application.
Suggested change
|
||||||||||
| }>({}) | ||||||||||
|
|
||||||||||
| // Helper function to generate a fresh socket token | ||||||||||
|
|
@@ -281,6 +284,12 @@ export function SocketProvider({ children, user }: SocketProviderProps) { | |||||||||
| eventHandlers.current.workflowDeleted?.(data) | ||||||||||
| }) | ||||||||||
|
|
||||||||||
| // Workflow revert events | ||||||||||
| socketInstance.on('workflow-reverted', (data) => { | ||||||||||
| logger.info(`Workflow ${data.workflowId} has been reverted to deployed state`) | ||||||||||
| eventHandlers.current.workflowReverted?.(data) | ||||||||||
| }) | ||||||||||
|
|
||||||||||
| // Cursor update events | ||||||||||
| socketInstance.on('cursor-update', (data) => { | ||||||||||
| setPresenceUsers((prev) => | ||||||||||
|
|
@@ -557,6 +566,10 @@ export function SocketProvider({ children, user }: SocketProviderProps) { | |||||||||
| eventHandlers.current.workflowDeleted = handler | ||||||||||
| }, []) | ||||||||||
|
|
||||||||||
| const onWorkflowReverted = useCallback((handler: (data: any) => void) => { | ||||||||||
| eventHandlers.current.workflowReverted = handler | ||||||||||
| }, []) | ||||||||||
|
|
||||||||||
| return ( | ||||||||||
| <SocketContext.Provider | ||||||||||
| value={{ | ||||||||||
|
|
@@ -578,6 +591,7 @@ export function SocketProvider({ children, user }: SocketProviderProps) { | |||||||||
| onUserJoined, | ||||||||||
| onUserLeft, | ||||||||||
| onWorkflowDeleted, | ||||||||||
| onWorkflowReverted, | ||||||||||
| }} | ||||||||||
| > | ||||||||||
| {children} | ||||||||||
|
|
||||||||||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -25,6 +25,7 @@ export function useCollaborativeWorkflow() { | |
| onUserJoined, | ||
| onUserLeft, | ||
| onWorkflowDeleted, | ||
| onWorkflowReverted, | ||
| } = useSocket() | ||
|
|
||
| const { activeWorkflowId } = useWorkflowRegistry() | ||
|
|
@@ -262,12 +263,80 @@ export function useCollaborativeWorkflow() { | |
| } | ||
| } | ||
|
|
||
| const handleWorkflowReverted = async (data: any) => { | ||
| const { workflowId } = data | ||
| logger.info(`Workflow ${workflowId} has been reverted to deployed state`) | ||
|
|
||
| // If the reverted workflow is the currently active one, reload the workflow state | ||
| if (activeWorkflowId === workflowId) { | ||
| logger.info(`Currently active workflow ${workflowId} was reverted, reloading state`) | ||
|
|
||
| try { | ||
| // Fetch the updated workflow state from the server (which loads from normalized tables) | ||
| const response = await fetch(`/api/workflows/${workflowId}`) | ||
| if (response.ok) { | ||
| const responseData = await response.json() | ||
| const workflowData = responseData.data | ||
|
|
||
| if (workflowData?.state) { | ||
| // Update the workflow store with the reverted state | ||
| isApplyingRemoteChange.current = true | ||
| try { | ||
| // Update the main workflow state using the API response | ||
| useWorkflowStore.setState({ | ||
| blocks: workflowData.state.blocks || {}, | ||
| edges: workflowData.state.edges || [], | ||
| loops: workflowData.state.loops || {}, | ||
| parallels: workflowData.state.parallels || {}, | ||
| isDeployed: workflowData.state.isDeployed || false, | ||
| deployedAt: workflowData.state.deployedAt, | ||
| lastSaved: workflowData.state.lastSaved || Date.now(), | ||
| hasActiveSchedule: workflowData.state.hasActiveSchedule || false, | ||
| hasActiveWebhook: workflowData.state.hasActiveWebhook || false, | ||
| deploymentStatuses: workflowData.state.deploymentStatuses || {}, | ||
| }) | ||
|
|
||
| // Update subblock store with reverted values | ||
| const subblockValues: Record<string, Record<string, any>> = {} | ||
| Object.entries(workflowData.state.blocks || {}).forEach(([blockId, block]) => { | ||
| const blockState = block as any | ||
| subblockValues[blockId] = {} | ||
| Object.entries(blockState.subBlocks || {}).forEach(([subblockId, subblock]) => { | ||
| subblockValues[blockId][subblockId] = (subblock as any).value | ||
| }) | ||
| }) | ||
|
Comment on lines
+302
to
+307
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. style: Type assertion to 'any' should be avoided. Define proper types for block state. |
||
|
|
||
| // Update subblock store for this workflow | ||
| useSubBlockStore.setState((state: any) => ({ | ||
| workflowValues: { | ||
| ...state.workflowValues, | ||
| [workflowId]: subblockValues, | ||
| }, | ||
| })) | ||
|
|
||
| logger.info(`Successfully loaded reverted workflow state for ${workflowId}`) | ||
| } finally { | ||
| isApplyingRemoteChange.current = false | ||
| } | ||
| } else { | ||
| logger.error('No state found in workflow data after revert', { workflowData }) | ||
| } | ||
| } else { | ||
| logger.error(`Failed to fetch workflow data after revert: ${response.statusText}`) | ||
| } | ||
| } catch (error) { | ||
| logger.error('Error reloading workflow state after revert:', error) | ||
| } | ||
| } | ||
| } | ||
|
|
||
| // Register event handlers | ||
| onWorkflowOperation(handleWorkflowOperation) | ||
| onSubblockUpdate(handleSubblockUpdate) | ||
| onUserJoined(handleUserJoined) | ||
| onUserLeft(handleUserLeft) | ||
| onWorkflowDeleted(handleWorkflowDeleted) | ||
| onWorkflowReverted(handleWorkflowReverted) | ||
|
|
||
| return () => { | ||
| // Cleanup handled by socket context | ||
|
|
@@ -278,6 +347,7 @@ export function useCollaborativeWorkflow() { | |
| onUserJoined, | ||
| onUserLeft, | ||
| onWorkflowDeleted, | ||
| onWorkflowReverted, | ||
| workflowStore, | ||
| subBlockStore, | ||
| activeWorkflowId, | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -115,6 +115,26 @@ export class RoomManager { | |
| ) | ||
| } | ||
|
|
||
| handleWorkflowRevert(workflowId: string, timestamp: number) { | ||
| logger.info(`Handling workflow revert notification for ${workflowId}`) | ||
|
|
||
| const room = this.workflowRooms.get(workflowId) | ||
| if (!room) { | ||
| logger.debug(`No active room found for reverted workflow ${workflowId}`) | ||
| return | ||
| } | ||
|
|
||
| this.io.to(workflowId).emit('workflow-reverted', { | ||
| workflowId, | ||
| message: 'Workflow has been reverted to deployed state', | ||
| timestamp, | ||
| }) | ||
|
|
||
| room.lastModified = timestamp | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. logic: Potential race condition: timestamp should be checked against room.lastModified before updating |
||
|
|
||
| logger.info(`Notified ${room.users.size} users about workflow revert: ${workflowId}`) | ||
| } | ||
|
|
||
| async validateWorkflowConsistency( | ||
| workflowId: string | ||
| ): Promise<{ valid: boolean; issues: string[] }> { | ||
|
|
||
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -50,6 +50,27 @@ export function createHttpHandler(roomManager: RoomManager, logger: Logger) { | |||||||||||||||||||||||||||||||||
| return | ||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| // Handle workflow revert notifications from the main API | ||||||||||||||||||||||||||||||||||
| if (req.method === 'POST' && req.url === '/api/workflow-reverted') { | ||||||||||||||||||||||||||||||||||
| let body = '' | ||||||||||||||||||||||||||||||||||
| req.on('data', (chunk) => { | ||||||||||||||||||||||||||||||||||
| body += chunk.toString() | ||||||||||||||||||||||||||||||||||
| }) | ||||||||||||||||||||||||||||||||||
|
Comment on lines
+55
to
+58
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. style: Consider using a stream buffer size limit to prevent memory issues with large payloads
Suggested change
|
||||||||||||||||||||||||||||||||||
| req.on('end', () => { | ||||||||||||||||||||||||||||||||||
| try { | ||||||||||||||||||||||||||||||||||
| const { workflowId, timestamp } = JSON.parse(body) | ||||||||||||||||||||||||||||||||||
| roomManager.handleWorkflowRevert(workflowId, timestamp) | ||||||||||||||||||||||||||||||||||
| res.writeHead(200, { 'Content-Type': 'application/json' }) | ||||||||||||||||||||||||||||||||||
| res.end(JSON.stringify({ success: true })) | ||||||||||||||||||||||||||||||||||
| } catch (error) { | ||||||||||||||||||||||||||||||||||
| logger.error('Error handling workflow revert notification:', error) | ||||||||||||||||||||||||||||||||||
| res.writeHead(500, { 'Content-Type': 'application/json' }) | ||||||||||||||||||||||||||||||||||
| res.end(JSON.stringify({ error: 'Failed to process revert notification' })) | ||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||
| }) | ||||||||||||||||||||||||||||||||||
| return | ||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| res.writeHead(404, { 'Content-Type': 'application/json' }) | ||||||||||||||||||||||||||||||||||
| res.end(JSON.stringify({ error: 'Not found' })) | ||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
logic: Type assertion could be unsafe. Consider runtime type checking or zod schema validation