diff --git a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/workflow-block/components/sub-block/components/code.tsx b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/workflow-block/components/sub-block/components/code.tsx index a228efb171..8306e1e8f8 100644 --- a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/workflow-block/components/sub-block/components/code.tsx +++ b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/workflow-block/components/sub-block/components/code.tsx @@ -73,8 +73,6 @@ export function Code({ } }, [generationType]) - // State management - const [storeValue, setStoreValue] = useSubBlockValue(blockId, subBlockId) const [code, setCode] = useState('') const [_lineCount, setLineCount] = useState(1) const [showTags, setShowTags] = useState(false) @@ -98,53 +96,69 @@ export function Code({ const toggleCollapsed = () => { setCollapsedValue(blockId, collapsedStateKey, !isCollapsed) } + + // Create refs to hold the handlers + const handleStreamStartRef = useRef<() => void>(() => {}) + const handleGeneratedContentRef = useRef<(generatedCode: string) => void>(() => {}) + const handleStreamChunkRef = useRef<(chunk: string) => void>(() => {}) + + // AI Code Generation Hook + const { + isLoading: isAiLoading, + isStreaming: isAiStreaming, + generate: generateCode, + generateStream: generateCodeStream, + cancelGeneration, + isPromptVisible, + showPromptInline, + hidePromptInline, + promptInputValue, + updatePromptValue, + } = useCodeGeneration({ + generationType: generationType, + initialContext: code, + onGeneratedContent: (content: string) => handleGeneratedContentRef.current?.(content), + onStreamChunk: (chunk: string) => handleStreamChunkRef.current?.(chunk), + onStreamStart: () => handleStreamStartRef.current?.(), + }) + + // State management - useSubBlockValue with explicit streaming control + const [storeValue, setStoreValue] = useSubBlockValue(blockId, subBlockId, false, { + debounceMs: 150, + isStreaming: isAiStreaming, // Use AI streaming state directly + onStreamingEnd: () => { + logger.debug('AI streaming ended, value persisted', { blockId, subBlockId }) + }, + }) + // Use preview value when in preview mode, otherwise use store value or prop value const value = isPreview ? previewValue : propValue !== undefined ? propValue : storeValue - // AI Code Generation Hook - const handleStreamStart = () => { + // Define the handlers now that we have access to setStoreValue + handleStreamStartRef.current = () => { setCode('') - // Optionally clear the store value too, though handleStreamChunk will update it - // setStoreValue('') + // Streaming state is now controlled by isAiStreaming } - const handleGeneratedContent = (generatedCode: string) => { + handleGeneratedContentRef.current = (generatedCode: string) => { setCode(generatedCode) if (!isPreview && !disabled) { setStoreValue(generatedCode) + // Final value will be persisted when isAiStreaming becomes false } } - // Handle streaming chunks directly into the editor - const handleStreamChunk = (chunk: string) => { + handleStreamChunkRef.current = (chunk: string) => { setCode((currentCode) => { const newCode = currentCode + chunk if (!isPreview && !disabled) { + // Update the value - it won't be persisted until streaming ends setStoreValue(newCode) } return newCode }) } - const { - isLoading: isAiLoading, - isStreaming: isAiStreaming, - generate: generateCode, - generateStream: generateCodeStream, - cancelGeneration, - isPromptVisible, - showPromptInline, - hidePromptInline, - promptInputValue, - updatePromptValue, - } = useCodeGeneration({ - generationType: generationType, - initialContext: code, - onGeneratedContent: handleGeneratedContent, - onStreamChunk: handleStreamChunk, - onStreamStart: handleStreamStart, - }) - // Effects useEffect(() => { const valueString = value?.toString() ?? '' diff --git a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/workflow-block/components/sub-block/components/response/response-format.tsx b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/workflow-block/components/sub-block/components/response/response-format.tsx index 9f6a62c2e2..d7ce80d911 100644 --- a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/workflow-block/components/sub-block/components/response/response-format.tsx +++ b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/workflow-block/components/sub-block/components/response/response-format.tsx @@ -50,7 +50,11 @@ export function ResponseFormat({ isPreview = false, previewValue, }: ResponseFormatProps) { - const [storeValue, setStoreValue] = useSubBlockValue(blockId, subBlockId) + // useSubBlockValue now includes debouncing by default + const [storeValue, setStoreValue] = useSubBlockValue(blockId, subBlockId, false, { + debounceMs: 200, // Slightly longer debounce for complex structures + }) + const [showPreview, setShowPreview] = useState(false) const value = isPreview ? previewValue : storeValue diff --git a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/workflow-block/components/sub-block/hooks/use-sub-block-value.ts b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/workflow-block/components/sub-block/hooks/use-sub-block-value.ts index cee2f17a2d..09e7f0b770 100644 --- a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/workflow-block/components/sub-block/hooks/use-sub-block-value.ts +++ b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/workflow-block/components/sub-block/hooks/use-sub-block-value.ts @@ -1,11 +1,15 @@ import { useCallback, useEffect, useRef } from 'react' import { isEqual } from 'lodash' +import { createLogger } from '@/lib/logs/console-logger' import { useCollaborativeWorkflow } from '@/hooks/use-collaborative-workflow' import { getProviderFromModel } from '@/providers/utils' import { useGeneralStore } from '@/stores/settings/general/store' +import { useWorkflowRegistry } from '@/stores/workflows/registry/store' import { useSubBlockStore } from '@/stores/workflows/subblock/store' import { useWorkflowStore } from '@/stores/workflows/workflow/store' +const logger = createLogger('SubBlockValue') + // Helper function to dispatch collaborative subblock updates const dispatchSubblockUpdate = (blockId: string, subBlockId: string, value: any) => { const event = new CustomEvent('update-subblock-value', { @@ -154,20 +158,31 @@ function storeApiKeyValue( } } +interface UseSubBlockValueOptions { + debounceMs?: number + isStreaming?: boolean // Explicit streaming state + onStreamingEnd?: () => void +} + /** * Custom hook to get and set values for a sub-block in a workflow. * Handles complex object values properly by using deep equality comparison. + * Includes automatic debouncing and explicit streaming mode for AI generation. * * @param blockId The ID of the block containing the sub-block * @param subBlockId The ID of the sub-block * @param triggerWorkflowUpdate Whether to trigger a workflow update when the value changes - * @returns A tuple containing the current value and a setter function + * @param options Configuration for debouncing and streaming behavior + * @returns A tuple containing the current value and setter function */ export function useSubBlockValue( blockId: string, subBlockId: string, - triggerWorkflowUpdate = false + triggerWorkflowUpdate = false, + options?: UseSubBlockValueOptions ): readonly [T | null, (value: T) => void] { + const { debounceMs = 150, isStreaming = false, onStreamingEnd } = options || {} + const { collaborativeSetSubblockValue } = useCollaborativeWorkflow() const blockType = useWorkflowStore( @@ -187,6 +202,12 @@ export function useSubBlockValue( // Previous model reference for detecting model changes const prevModelRef = useRef(null) + // Debouncing refs + const debounceTimerRef = useRef(null) + const lastEmittedValueRef = useRef(null) + const streamingValueRef = useRef(null) + const wasStreamingRef = useRef(false) + // Get value from subblock store - always call this hook unconditionally const storeValue = useSubBlockStore( useCallback((state) => state.getValue(blockId, subBlockId), [blockId, subBlockId]) @@ -211,6 +232,36 @@ export function useSubBlockValue( // Compute the modelValue based on block type const modelValue = isProviderBasedBlock ? (modelSubBlockValue as string) : null + // Cleanup timer on unmount + useEffect(() => { + return () => { + if (debounceTimerRef.current) { + clearTimeout(debounceTimerRef.current) + } + } + }, []) + + // Emit the value to socket/DB + const emitValue = useCallback( + (value: T) => { + collaborativeSetSubblockValue(blockId, subBlockId, value) + lastEmittedValueRef.current = value + }, + [blockId, subBlockId, collaborativeSetSubblockValue] + ) + + // Handle streaming mode changes + useEffect(() => { + // If we just exited streaming mode, emit the final value + if (wasStreamingRef.current && !isStreaming && streamingValueRef.current !== null) { + logger.debug('Streaming ended, persisting final value', { blockId, subBlockId }) + emitValue(streamingValueRef.current) + streamingValueRef.current = null + onStreamingEnd?.() + } + wasStreamingRef.current = isStreaming + }, [isStreaming, blockId, subBlockId, emitValue, onStreamingEnd]) + // Hook to set a value in the subblock store const setValue = useCallback( (newValue: T) => { @@ -218,6 +269,22 @@ export function useSubBlockValue( if (!isEqual(valueRef.current, newValue)) { valueRef.current = newValue + // Always update local store immediately for UI responsiveness + useSubBlockStore.setState((state) => ({ + workflowValues: { + ...state.workflowValues, + [useWorkflowRegistry.getState().activeWorkflowId || '']: { + ...state.workflowValues[useWorkflowRegistry.getState().activeWorkflowId || ''], + [blockId]: { + ...state.workflowValues[useWorkflowRegistry.getState().activeWorkflowId || '']?.[ + blockId + ], + [subBlockId]: newValue, + }, + }, + }, + })) + // Ensure we're passing the actual value, not a reference that might change const valueCopy = newValue === null @@ -231,8 +298,27 @@ export function useSubBlockValue( storeApiKeyValue(blockId, blockType, modelValue, newValue, storeValue) } - // Use collaborative function which handles both local store update and socket emission - collaborativeSetSubblockValue(blockId, subBlockId, valueCopy) + // Clear any existing debounce timer + if (debounceTimerRef.current) { + clearTimeout(debounceTimerRef.current) + debounceTimerRef.current = null + } + + // If streaming, just store the value without emitting + if (isStreaming) { + streamingValueRef.current = valueCopy + } else { + // Detect large changes for extended debounce + const isLargeChange = detectLargeChange(lastEmittedValueRef.current, valueCopy) + const effectiveDebounceMs = isLargeChange ? debounceMs * 2 : debounceMs + + // Debounce the socket emission + debounceTimerRef.current = setTimeout(() => { + if (valueRef.current !== null && valueRef.current !== lastEmittedValueRef.current) { + emitValue(valueCopy) + } + }, effectiveDebounceMs) + } if (triggerWorkflowUpdate) { useWorkflowStore.getState().triggerUpdate() @@ -247,7 +333,9 @@ export function useSubBlockValue( storeValue, triggerWorkflowUpdate, modelValue, - collaborativeSetSubblockValue, + isStreaming, + debounceMs, + emitValue, ] ) @@ -320,5 +408,29 @@ export function useSubBlockValue( } }, [storeValue, initialValue]) + // Return appropriate tuple based on whether options were provided return [storeValue !== undefined ? storeValue : initialValue, setValue] as const } + +// Helper function to detect large changes +function detectLargeChange(oldValue: any, newValue: any): boolean { + // Handle null/undefined + if (oldValue == null && newValue == null) return false + if (oldValue == null || newValue == null) return true + + // For strings, check if it's a large paste or deletion + if (typeof oldValue === 'string' && typeof newValue === 'string') { + const sizeDiff = Math.abs(newValue.length - oldValue.length) + // Consider it a large change if more than 50 characters changed at once + return sizeDiff > 50 + } + + // For arrays, check length difference + if (Array.isArray(oldValue) && Array.isArray(newValue)) { + const sizeDiff = Math.abs(newValue.length - oldValue.length) + return sizeDiff > 5 + } + + // For other types, always treat as small change + return false +} diff --git a/apps/sim/socket-server/handlers/connection.ts b/apps/sim/socket-server/handlers/connection.ts index 7c1657cb32..f140cd538e 100644 --- a/apps/sim/socket-server/handlers/connection.ts +++ b/apps/sim/socket-server/handlers/connection.ts @@ -28,7 +28,5 @@ export function setupConnectionHandlers( roomManager.cleanupUserFromRoom(socket.id, workflowId) roomManager.broadcastPresenceUpdate(workflowId) } - - roomManager.clearPendingOperations(socket.id) }) } diff --git a/apps/sim/socket-server/rooms/manager.ts b/apps/sim/socket-server/rooms/manager.ts index 4791b9fdf9..3a4e73efd8 100644 --- a/apps/sim/socket-server/rooms/manager.ts +++ b/apps/sim/socket-server/rooms/manager.ts @@ -75,11 +75,6 @@ export class RoomManager { this.userSessions.delete(socketId) } - // This would be used if we implement operation queuing - clearPendingOperations(socketId: string) { - logger.debug(`Cleared pending operations for socket ${socketId}`) - } - handleWorkflowDeletion(workflowId: string) { logger.info(`Handling workflow deletion notification for ${workflowId}`)