diff --git a/apps/sim/hooks/use-collaborative-workflow.ts b/apps/sim/hooks/use-collaborative-workflow.ts index ea18bfc5a7..b303109ec8 100644 --- a/apps/sim/hooks/use-collaborative-workflow.ts +++ b/apps/sim/hooks/use-collaborative-workflow.ts @@ -44,8 +44,14 @@ export function useCollaborativeWorkflow() { const lastPositionTimestamps = useRef>(new Map()) // Operation queue - const { queue, hasOperationError, addToQueue, confirmOperation, failOperation } = - useOperationQueue() + const { + queue, + hasOperationError, + addToQueue, + confirmOperation, + failOperation, + cancelOperationsForBlock, + } = useOperationQueue() // Clear position timestamps when switching workflows // Note: Workflow joining is now handled automatically by socket connect event based on URL @@ -332,7 +338,7 @@ export function useCollaborativeWorkflow() { const { operationId, error, retryable } = data logger.warn('Operation failed', { operationId, error, retryable }) - failOperation(operationId) + failOperation(operationId, retryable) } // Register event handlers @@ -534,9 +540,11 @@ export function useCollaborativeWorkflow() { const collaborativeRemoveBlock = useCallback( (id: string) => { + cancelOperationsForBlock(id) + executeQueuedOperation('remove', 'block', { id }, () => workflowStore.removeBlock(id)) }, - [executeQueuedOperation, workflowStore] + [executeQueuedOperation, workflowStore, cancelOperationsForBlock] ) const collaborativeUpdateBlockPosition = useCallback( diff --git a/apps/sim/socket-server/handlers/subblocks.ts b/apps/sim/socket-server/handlers/subblocks.ts index a0ea38beac..3d5f6220ff 100644 --- a/apps/sim/socket-server/handlers/subblocks.ts +++ b/apps/sim/socket-server/handlers/subblocks.ts @@ -125,9 +125,16 @@ export function setupSubblocksHandlers( serverTimestamp: Date.now(), }) } - } - logger.debug(`Subblock update in workflow ${workflowId}: ${blockId}.${subblockId}`) + logger.debug(`Subblock update in workflow ${workflowId}: ${blockId}.${subblockId}`) + } else if (operationId) { + // Block was deleted - notify client that operation completed (but didn't update anything) + socket.emit('operation-failed', { + operationId, + error: 'Block no longer exists', + retryable: false, // No point retrying for deleted blocks + }) + } } catch (error) { logger.error('Error handling subblock update:', error) diff --git a/apps/sim/stores/operation-queue/store.ts b/apps/sim/stores/operation-queue/store.ts index 849fbc5ebd..cfa1a08efa 100644 --- a/apps/sim/stores/operation-queue/store.ts +++ b/apps/sim/stores/operation-queue/store.ts @@ -24,9 +24,10 @@ interface OperationQueueState { addToQueue: (operation: Omit) => void confirmOperation: (operationId: string) => void - failOperation: (operationId: string) => void + failOperation: (operationId: string, retryable?: boolean) => void handleOperationTimeout: (operationId: string) => void processNextOperation: () => void + cancelOperationsForBlock: (blockId: string) => void triggerOfflineMode: () => void clearError: () => void @@ -211,7 +212,7 @@ export const useOperationQueueStore = create((set, get) => get().processNextOperation() }, - failOperation: (operationId: string) => { + failOperation: (operationId: string, retryable = true) => { const state = get() const operation = state.operations.find((op) => op.id === operationId) if (!operation) { @@ -239,6 +240,18 @@ export const useOperationQueueStore = create((set, get) => } } + if (!retryable) { + logger.debug('Operation marked as non-retryable, removing from queue', { operationId }) + + set((state) => ({ + operations: state.operations.filter((op) => op.id !== operationId), + isProcessing: false, + })) + + get().processNextOperation() + return + } + if (operation.retryCount < 3) { const newRetryCount = operation.retryCount + 1 const delay = 2 ** newRetryCount * 1000 // 2s, 4s, 8s @@ -338,6 +351,66 @@ export const useOperationQueueStore = create((set, get) => operationTimeouts.set(nextOperation.id, timeoutId) }, + cancelOperationsForBlock: (blockId: string) => { + logger.debug('Canceling all operations for block', { blockId }) + + // Cancel all debounce timeouts for this block's subblocks + const keysToDelete: string[] = [] + for (const [key, timeout] of subblockDebounceTimeouts.entries()) { + if (key.startsWith(`${blockId}-`)) { + clearTimeout(timeout) + keysToDelete.push(key) + } + } + keysToDelete.forEach((key) => subblockDebounceTimeouts.delete(key)) + + // Find and cancel operation timeouts for operations related to this block + const state = get() + const operationsToCancel = state.operations.filter( + (op) => + (op.operation.target === 'block' && op.operation.payload?.id === blockId) || + (op.operation.target === 'subblock' && op.operation.payload?.blockId === blockId) + ) + + // Cancel timeouts for these operations + operationsToCancel.forEach((op) => { + const operationTimeout = operationTimeouts.get(op.id) + if (operationTimeout) { + clearTimeout(operationTimeout) + operationTimeouts.delete(op.id) + } + + const retryTimeout = retryTimeouts.get(op.id) + if (retryTimeout) { + clearTimeout(retryTimeout) + retryTimeouts.delete(op.id) + } + }) + + // Remove all operations for this block (both pending and processing) + const newOperations = state.operations.filter( + (op) => + !( + (op.operation.target === 'block' && op.operation.payload?.id === blockId) || + (op.operation.target === 'subblock' && op.operation.payload?.blockId === blockId) + ) + ) + + set({ + operations: newOperations, + isProcessing: false, // Reset processing state in case we removed the current operation + }) + + logger.debug('Cancelled operations for block', { + blockId, + cancelledDebounceTimeouts: keysToDelete.length, + cancelledOperations: operationsToCancel.length, + }) + + // Process next operation if there are any remaining + get().processNextOperation() + }, + triggerOfflineMode: () => { logger.error('Operation failed after retries - triggering offline mode') @@ -369,6 +442,7 @@ export function useOperationQueue() { confirmOperation: store.confirmOperation, failOperation: store.failOperation, processNextOperation: store.processNextOperation, + cancelOperationsForBlock: store.cancelOperationsForBlock, triggerOfflineMode: store.triggerOfflineMode, clearError: store.clearError, }