Skip to content

Commit

Permalink
Bugfix/Prevent streaming of chatflow tool and chain tool (#3257)
Browse files Browse the repository at this point in the history
prevent streaming of chatflow tool and chain tool
  • Loading branch information
HenryHengZJ authored and 0xi4o committed Sep 30, 2024
1 parent d2577e5 commit a80ba4b
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 40 deletions.
34 changes: 5 additions & 29 deletions packages/components/nodes/tools/ChainTool/core.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,41 +14,17 @@ export class ChainTool extends DynamicTool {
super({
...rest,
func: async (input, runManager) => {
// prevent sending SSE events of the sub-chain
const sseStreamer = runManager?.handlers.find((handler) => handler instanceof CustomChainHandler)?.sseStreamer
if (runManager) {
const callbacks = runManager.handlers
for (let i = 0; i < callbacks.length; i += 1) {
if (callbacks[i] instanceof CustomChainHandler) {
;(callbacks[i] as any).sseStreamer = undefined
}
}
}
const childManagers = runManager?.getChild()
const handlers = childManagers?.handlers?.filter((handler) => !(handler instanceof CustomChainHandler)) || []
if (childManagers) childManagers.handlers = handlers

if ((chain as any).prompt && (chain as any).prompt.promptValues) {
const promptValues = handleEscapeCharacters((chain as any).prompt.promptValues, true)

const values = await chain.call(promptValues, runManager?.getChild())
if (runManager && sseStreamer) {
const callbacks = runManager.handlers
for (let i = 0; i < callbacks.length; i += 1) {
if (callbacks[i] instanceof CustomChainHandler) {
;(callbacks[i] as any).sseStreamer = sseStreamer
}
}
}
const values = await chain.call(promptValues, childManagers)
return values?.text
}

const values = chain.run(input, runManager?.getChild())
if (runManager && sseStreamer) {
const callbacks = runManager.handlers
for (let i = 0; i < callbacks.length; i += 1) {
if (callbacks[i] instanceof CustomChainHandler) {
;(callbacks[i] as any).sseStreamer = sseStreamer
}
}
}
const values = chain.run(input, childManagers)
return values
}
})
Expand Down
26 changes: 15 additions & 11 deletions packages/components/nodes/tools/ChatflowTool/ChatflowTool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import { StructuredTool } from '@langchain/core/tools'
import { ICommonObject, IDatabaseEntity, INode, INodeData, INodeOptionsValue, INodeParams } from '../../../src/Interface'
import { availableDependencies, defaultAllowBuiltInDep, getCredentialData, getCredentialParam } from '../../../src/utils'
import { v4 as uuidv4 } from 'uuid'
import { CustomChainHandler } from '../../../src'

class ChatflowTool_Tools implements INode {
label: string
Expand All @@ -24,7 +23,7 @@ class ChatflowTool_Tools implements INode {
constructor() {
this.label = 'Chatflow Tool'
this.name = 'ChatflowTool'
this.version = 4.0
this.version = 5.0
this.type = 'ChatflowTool'
this.icon = 'chatflowTool.svg'
this.category = 'Tools'
Expand Down Expand Up @@ -58,6 +57,12 @@ class ChatflowTool_Tools implements INode {
placeholder:
'State of the Union QA - useful for when you need to ask questions about the most recent state of the union address.'
},
{
label: 'Return Direct',
name: 'returnDirect',
type: 'boolean',
optional: true
},
{
label: 'Override Config',
name: 'overrideConfig',
Expand Down Expand Up @@ -135,6 +140,7 @@ class ChatflowTool_Tools implements INode {
const _name = nodeData.inputs?.name as string
const description = nodeData.inputs?.description as string
const useQuestionFromChat = nodeData.inputs?.useQuestionFromChat as boolean
const returnDirect = nodeData.inputs?.returnDirect as boolean
const customInput = nodeData.inputs?.customInput as string
const overrideConfig =
typeof nodeData.inputs?.overrideConfig === 'string' &&
Expand Down Expand Up @@ -168,6 +174,7 @@ class ChatflowTool_Tools implements INode {
name,
baseURL,
description,
returnDirect,
chatflowid: selectedChatflowId,
startNewSession,
headers,
Expand Down Expand Up @@ -206,6 +213,7 @@ class ChatflowTool extends StructuredTool {
constructor({
name,
description,
returnDirect,
input,
chatflowid,
startNewSession,
Expand All @@ -215,6 +223,7 @@ class ChatflowTool extends StructuredTool {
}: {
name: string
description: string
returnDirect: boolean
input: string
chatflowid: string
startNewSession: boolean
Expand All @@ -231,6 +240,7 @@ class ChatflowTool extends StructuredTool {
this.headers = headers
this.chatflowid = chatflowid
this.overrideConfig = overrideConfig
this.returnDirect = returnDirect
}

async call(
Expand All @@ -249,15 +259,6 @@ class ChatflowTool extends StructuredTool {
} catch (e) {
throw new Error(`Received tool input did not match expected schema: ${JSON.stringify(arg)}`)
}
// iterate over the callbacks and the sse streamer
if (config.callbacks instanceof CallbackManager) {
const callbacks = config.callbacks.handlers
for (let i = 0; i < callbacks.length; i += 1) {
if (callbacks[i] instanceof CustomChainHandler) {
;(callbacks[i] as any).sseStreamer = undefined
}
}
}
const callbackManager_ = await CallbackManager.configure(
config.callbacks,
this.callbacks,
Expand All @@ -283,6 +284,9 @@ class ChatflowTool extends StructuredTool {
await runManager?.handleToolError(e)
throw e
}
if (result && typeof result !== 'string') {
result = JSON.stringify(result)
}
await runManager?.handleToolEnd(result)
return result
}
Expand Down
2 changes: 2 additions & 0 deletions packages/server/src/utils/buildChatflow.ts
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,8 @@ export const utilBuildChatflow = async (req: Request, isInternal: boolean = fals
const nodeModule = await import(nodeInstanceFilePath)
const nodeInstance = new nodeModule.nodeClass({ sessionId })

isStreamValid = (req.body.streaming === 'true' || req.body.streaming === true) && isStreamValid

let result = isStreamValid
? await nodeInstance.run(nodeToExecuteData, incomingInput.question, {
chatId,
Expand Down

0 comments on commit a80ba4b

Please sign in to comment.