Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,6 @@ export function Code({
}
}, [generationType])

// State management
const [storeValue, setStoreValue] = useSubBlockValue(blockId, subBlockId)
const [code, setCode] = useState<string>('')
const [_lineCount, setLineCount] = useState(1)
const [showTags, setShowTags] = useState(false)
Expand All @@ -98,53 +96,69 @@ export function Code({
const toggleCollapsed = () => {
setCollapsedValue(blockId, collapsedStateKey, !isCollapsed)
}

// Create refs to hold the handlers
const handleStreamStartRef = useRef<() => void>(() => {})
const handleGeneratedContentRef = useRef<(generatedCode: string) => void>(() => {})
const handleStreamChunkRef = useRef<(chunk: string) => void>(() => {})

// AI Code Generation Hook
const {
isLoading: isAiLoading,
isStreaming: isAiStreaming,
generate: generateCode,
generateStream: generateCodeStream,
cancelGeneration,
isPromptVisible,
showPromptInline,
hidePromptInline,
promptInputValue,
updatePromptValue,
} = useCodeGeneration({
generationType: generationType,
initialContext: code,
onGeneratedContent: (content: string) => handleGeneratedContentRef.current?.(content),
onStreamChunk: (chunk: string) => handleStreamChunkRef.current?.(chunk),
onStreamStart: () => handleStreamStartRef.current?.(),
})

// State management - useSubBlockValue with explicit streaming control
const [storeValue, setStoreValue] = useSubBlockValue(blockId, subBlockId, false, {
debounceMs: 150,
isStreaming: isAiStreaming, // Use AI streaming state directly
onStreamingEnd: () => {
logger.debug('AI streaming ended, value persisted', { blockId, subBlockId })
},
})

// Use preview value when in preview mode, otherwise use store value or prop value
const value = isPreview ? previewValue : propValue !== undefined ? propValue : storeValue

// AI Code Generation Hook
const handleStreamStart = () => {
// Define the handlers now that we have access to setStoreValue
handleStreamStartRef.current = () => {
setCode('')
// Optionally clear the store value too, though handleStreamChunk will update it
// setStoreValue('')
// Streaming state is now controlled by isAiStreaming
}

const handleGeneratedContent = (generatedCode: string) => {
handleGeneratedContentRef.current = (generatedCode: string) => {
setCode(generatedCode)
if (!isPreview && !disabled) {
setStoreValue(generatedCode)
// Final value will be persisted when isAiStreaming becomes false
}
}

// Handle streaming chunks directly into the editor
const handleStreamChunk = (chunk: string) => {
handleStreamChunkRef.current = (chunk: string) => {
setCode((currentCode) => {
const newCode = currentCode + chunk
if (!isPreview && !disabled) {
// Update the value - it won't be persisted until streaming ends
setStoreValue(newCode)
}
return newCode
})
}

const {
isLoading: isAiLoading,
isStreaming: isAiStreaming,
generate: generateCode,
generateStream: generateCodeStream,
cancelGeneration,
isPromptVisible,
showPromptInline,
hidePromptInline,
promptInputValue,
updatePromptValue,
} = useCodeGeneration({
generationType: generationType,
initialContext: code,
onGeneratedContent: handleGeneratedContent,
onStreamChunk: handleStreamChunk,
onStreamStart: handleStreamStart,
})

// Effects
useEffect(() => {
const valueString = value?.toString() ?? ''
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,11 @@ export function ResponseFormat({
isPreview = false,
previewValue,
}: ResponseFormatProps) {
const [storeValue, setStoreValue] = useSubBlockValue<JSONProperty[]>(blockId, subBlockId)
// useSubBlockValue now includes debouncing by default
const [storeValue, setStoreValue] = useSubBlockValue<JSONProperty[]>(blockId, subBlockId, false, {
debounceMs: 200, // Slightly longer debounce for complex structures
})

const [showPreview, setShowPreview] = useState(false)

const value = isPreview ? previewValue : storeValue
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
import { useCallback, useEffect, useRef } from 'react'
import { isEqual } from 'lodash'
import { createLogger } from '@/lib/logs/console-logger'
import { useCollaborativeWorkflow } from '@/hooks/use-collaborative-workflow'
import { getProviderFromModel } from '@/providers/utils'
import { useGeneralStore } from '@/stores/settings/general/store'
import { useWorkflowRegistry } from '@/stores/workflows/registry/store'
import { useSubBlockStore } from '@/stores/workflows/subblock/store'
import { useWorkflowStore } from '@/stores/workflows/workflow/store'

const logger = createLogger('SubBlockValue')

// Helper function to dispatch collaborative subblock updates
const dispatchSubblockUpdate = (blockId: string, subBlockId: string, value: any) => {
const event = new CustomEvent('update-subblock-value', {
Expand Down Expand Up @@ -154,20 +158,31 @@ function storeApiKeyValue(
}
}

interface UseSubBlockValueOptions {
debounceMs?: number
isStreaming?: boolean // Explicit streaming state
onStreamingEnd?: () => void
}

/**
* Custom hook to get and set values for a sub-block in a workflow.
* Handles complex object values properly by using deep equality comparison.
* Includes automatic debouncing and explicit streaming mode for AI generation.
*
* @param blockId The ID of the block containing the sub-block
* @param subBlockId The ID of the sub-block
* @param triggerWorkflowUpdate Whether to trigger a workflow update when the value changes
* @returns A tuple containing the current value and a setter function
* @param options Configuration for debouncing and streaming behavior
* @returns A tuple containing the current value and setter function
*/
export function useSubBlockValue<T = any>(
blockId: string,
subBlockId: string,
triggerWorkflowUpdate = false
triggerWorkflowUpdate = false,
options?: UseSubBlockValueOptions
): readonly [T | null, (value: T) => void] {
const { debounceMs = 150, isStreaming = false, onStreamingEnd } = options || {}

const { collaborativeSetSubblockValue } = useCollaborativeWorkflow()

const blockType = useWorkflowStore(
Expand All @@ -187,6 +202,12 @@ export function useSubBlockValue<T = any>(
// Previous model reference for detecting model changes
const prevModelRef = useRef<string | null>(null)

// Debouncing refs
const debounceTimerRef = useRef<NodeJS.Timeout | null>(null)
const lastEmittedValueRef = useRef<T | null>(null)
const streamingValueRef = useRef<T | null>(null)
const wasStreamingRef = useRef<boolean>(false)

// Get value from subblock store - always call this hook unconditionally
const storeValue = useSubBlockStore(
useCallback((state) => state.getValue(blockId, subBlockId), [blockId, subBlockId])
Expand All @@ -211,13 +232,59 @@ export function useSubBlockValue<T = any>(
// Compute the modelValue based on block type
const modelValue = isProviderBasedBlock ? (modelSubBlockValue as string) : null

// Cleanup timer on unmount
useEffect(() => {
return () => {
if (debounceTimerRef.current) {
clearTimeout(debounceTimerRef.current)
}
}
}, [])

// Emit the value to socket/DB
const emitValue = useCallback(
(value: T) => {
collaborativeSetSubblockValue(blockId, subBlockId, value)
lastEmittedValueRef.current = value
},
[blockId, subBlockId, collaborativeSetSubblockValue]
)

// Handle streaming mode changes
useEffect(() => {
// If we just exited streaming mode, emit the final value
if (wasStreamingRef.current && !isStreaming && streamingValueRef.current !== null) {
logger.debug('Streaming ended, persisting final value', { blockId, subBlockId })
emitValue(streamingValueRef.current)
streamingValueRef.current = null
onStreamingEnd?.()
}
wasStreamingRef.current = isStreaming
}, [isStreaming, blockId, subBlockId, emitValue, onStreamingEnd])

// Hook to set a value in the subblock store
const setValue = useCallback(
(newValue: T) => {
// Use deep comparison to avoid unnecessary updates for complex objects
if (!isEqual(valueRef.current, newValue)) {
valueRef.current = newValue

// Always update local store immediately for UI responsiveness
useSubBlockStore.setState((state) => ({
workflowValues: {
...state.workflowValues,
[useWorkflowRegistry.getState().activeWorkflowId || '']: {
...state.workflowValues[useWorkflowRegistry.getState().activeWorkflowId || ''],
[blockId]: {
...state.workflowValues[useWorkflowRegistry.getState().activeWorkflowId || '']?.[
blockId
],
[subBlockId]: newValue,
},
},
},
}))

// Ensure we're passing the actual value, not a reference that might change
const valueCopy =
newValue === null
Expand All @@ -231,8 +298,27 @@ export function useSubBlockValue<T = any>(
storeApiKeyValue(blockId, blockType, modelValue, newValue, storeValue)
}

// Use collaborative function which handles both local store update and socket emission
collaborativeSetSubblockValue(blockId, subBlockId, valueCopy)
// Clear any existing debounce timer
if (debounceTimerRef.current) {
clearTimeout(debounceTimerRef.current)
debounceTimerRef.current = null
}

// If streaming, just store the value without emitting
if (isStreaming) {
streamingValueRef.current = valueCopy
} else {
// Detect large changes for extended debounce
const isLargeChange = detectLargeChange(lastEmittedValueRef.current, valueCopy)
const effectiveDebounceMs = isLargeChange ? debounceMs * 2 : debounceMs

// Debounce the socket emission
debounceTimerRef.current = setTimeout(() => {
if (valueRef.current !== null && valueRef.current !== lastEmittedValueRef.current) {
emitValue(valueCopy)
}
}, effectiveDebounceMs)
}

if (triggerWorkflowUpdate) {
useWorkflowStore.getState().triggerUpdate()
Expand All @@ -247,7 +333,9 @@ export function useSubBlockValue<T = any>(
storeValue,
triggerWorkflowUpdate,
modelValue,
collaborativeSetSubblockValue,
isStreaming,
debounceMs,
emitValue,
]
)

Expand Down Expand Up @@ -320,5 +408,29 @@ export function useSubBlockValue<T = any>(
}
}, [storeValue, initialValue])

// Return appropriate tuple based on whether options were provided
return [storeValue !== undefined ? storeValue : initialValue, setValue] as const
}

// Helper function to detect large changes
function detectLargeChange(oldValue: any, newValue: any): boolean {
// Handle null/undefined
if (oldValue == null && newValue == null) return false
if (oldValue == null || newValue == null) return true

// For strings, check if it's a large paste or deletion
if (typeof oldValue === 'string' && typeof newValue === 'string') {
const sizeDiff = Math.abs(newValue.length - oldValue.length)
// Consider it a large change if more than 50 characters changed at once
return sizeDiff > 50
}

// For arrays, check length difference
if (Array.isArray(oldValue) && Array.isArray(newValue)) {
const sizeDiff = Math.abs(newValue.length - oldValue.length)
return sizeDiff > 5
}

// For other types, always treat as small change
return false
}
2 changes: 0 additions & 2 deletions apps/sim/socket-server/handlers/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,5 @@ export function setupConnectionHandlers(
roomManager.cleanupUserFromRoom(socket.id, workflowId)
roomManager.broadcastPresenceUpdate(workflowId)
}

roomManager.clearPendingOperations(socket.id)
})
}
5 changes: 0 additions & 5 deletions apps/sim/socket-server/rooms/manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -75,11 +75,6 @@ export class RoomManager {
this.userSessions.delete(socketId)
}

// This would be used if we implement operation queuing
clearPendingOperations(socketId: string) {
logger.debug(`Cleared pending operations for socket ${socketId}`)
}

handleWorkflowDeletion(workflowId: string) {
logger.info(`Handling workflow deletion notification for ${workflowId}`)

Expand Down