Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 12 additions & 4 deletions apps/sim/hooks/use-collaborative-workflow.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,14 @@ export function useCollaborativeWorkflow() {
const lastPositionTimestamps = useRef<Map<string, number>>(new Map())

// Operation queue
const { queue, hasOperationError, addToQueue, confirmOperation, failOperation } =
useOperationQueue()
const {
queue,
hasOperationError,
addToQueue,
confirmOperation,
failOperation,
cancelOperationsForBlock,
} = useOperationQueue()

// Clear position timestamps when switching workflows
// Note: Workflow joining is now handled automatically by socket connect event based on URL
Expand Down Expand Up @@ -332,7 +338,7 @@ export function useCollaborativeWorkflow() {
const { operationId, error, retryable } = data
logger.warn('Operation failed', { operationId, error, retryable })

failOperation(operationId)
failOperation(operationId, retryable)
}

// Register event handlers
Expand Down Expand Up @@ -534,9 +540,11 @@ export function useCollaborativeWorkflow() {

const collaborativeRemoveBlock = useCallback(
(id: string) => {
cancelOperationsForBlock(id)

executeQueuedOperation('remove', 'block', { id }, () => workflowStore.removeBlock(id))
},
[executeQueuedOperation, workflowStore]
[executeQueuedOperation, workflowStore, cancelOperationsForBlock]
)

const collaborativeUpdateBlockPosition = useCallback(
Expand Down
11 changes: 9 additions & 2 deletions apps/sim/socket-server/handlers/subblocks.ts
Original file line number Diff line number Diff line change
Expand Up @@ -125,9 +125,16 @@ export function setupSubblocksHandlers(
serverTimestamp: Date.now(),
})
}
}

logger.debug(`Subblock update in workflow ${workflowId}: ${blockId}.${subblockId}`)
logger.debug(`Subblock update in workflow ${workflowId}: ${blockId}.${subblockId}`)
} else if (operationId) {
// Block was deleted - notify client that operation completed (but didn't update anything)
socket.emit('operation-failed', {
operationId,
error: 'Block no longer exists',
retryable: false, // No point retrying for deleted blocks
})
}
} catch (error) {
logger.error('Error handling subblock update:', error)

Expand Down
78 changes: 76 additions & 2 deletions apps/sim/stores/operation-queue/store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,10 @@ interface OperationQueueState {

addToQueue: (operation: Omit<QueuedOperation, 'timestamp' | 'retryCount' | 'status'>) => void
confirmOperation: (operationId: string) => void
failOperation: (operationId: string) => void
failOperation: (operationId: string, retryable?: boolean) => void
handleOperationTimeout: (operationId: string) => void
processNextOperation: () => void
cancelOperationsForBlock: (blockId: string) => void

triggerOfflineMode: () => void
clearError: () => void
Expand Down Expand Up @@ -211,7 +212,7 @@ export const useOperationQueueStore = create<OperationQueueState>((set, get) =>
get().processNextOperation()
},

failOperation: (operationId: string) => {
failOperation: (operationId: string, retryable = true) => {
const state = get()
const operation = state.operations.find((op) => op.id === operationId)
if (!operation) {
Expand Down Expand Up @@ -239,6 +240,18 @@ export const useOperationQueueStore = create<OperationQueueState>((set, get) =>
}
}

if (!retryable) {
logger.debug('Operation marked as non-retryable, removing from queue', { operationId })

set((state) => ({
operations: state.operations.filter((op) => op.id !== operationId),
isProcessing: false,
}))

get().processNextOperation()
return
}

if (operation.retryCount < 3) {
const newRetryCount = operation.retryCount + 1
const delay = 2 ** newRetryCount * 1000 // 2s, 4s, 8s
Expand Down Expand Up @@ -338,6 +351,66 @@ export const useOperationQueueStore = create<OperationQueueState>((set, get) =>
operationTimeouts.set(nextOperation.id, timeoutId)
},

cancelOperationsForBlock: (blockId: string) => {
logger.debug('Canceling all operations for block', { blockId })

// Cancel all debounce timeouts for this block's subblocks
const keysToDelete: string[] = []
for (const [key, timeout] of subblockDebounceTimeouts.entries()) {
if (key.startsWith(`${blockId}-`)) {
clearTimeout(timeout)
keysToDelete.push(key)
}
}
keysToDelete.forEach((key) => subblockDebounceTimeouts.delete(key))

// Find and cancel operation timeouts for operations related to this block
const state = get()
const operationsToCancel = state.operations.filter(
(op) =>
(op.operation.target === 'block' && op.operation.payload?.id === blockId) ||
(op.operation.target === 'subblock' && op.operation.payload?.blockId === blockId)
)

// Cancel timeouts for these operations
operationsToCancel.forEach((op) => {
const operationTimeout = operationTimeouts.get(op.id)
if (operationTimeout) {
clearTimeout(operationTimeout)
operationTimeouts.delete(op.id)
}

const retryTimeout = retryTimeouts.get(op.id)
if (retryTimeout) {
clearTimeout(retryTimeout)
retryTimeouts.delete(op.id)
}
})

// Remove all operations for this block (both pending and processing)
const newOperations = state.operations.filter(
(op) =>
!(
(op.operation.target === 'block' && op.operation.payload?.id === blockId) ||
(op.operation.target === 'subblock' && op.operation.payload?.blockId === blockId)
)
)

set({
operations: newOperations,
isProcessing: false, // Reset processing state in case we removed the current operation
})

logger.debug('Cancelled operations for block', {
blockId,
cancelledDebounceTimeouts: keysToDelete.length,
cancelledOperations: operationsToCancel.length,
})

// Process next operation if there are any remaining
get().processNextOperation()
},

triggerOfflineMode: () => {
logger.error('Operation failed after retries - triggering offline mode')

Expand Down Expand Up @@ -369,6 +442,7 @@ export function useOperationQueue() {
confirmOperation: store.confirmOperation,
failOperation: store.failOperation,
processNextOperation: store.processNextOperation,
cancelOperationsForBlock: store.cancelOperationsForBlock,
triggerOfflineMode: store.triggerOfflineMode,
clearError: store.clearError,
}
Expand Down