Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/sse #3125

Merged
merged 23 commits into main from feature/sse on
Sep 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit. Hold shift + click to select a range.
6f2f182
Base changes for ServerSide Events (instead of socket.io)
vinodkiran Aug 25, 2024
183e951
lint fixes
vinodkiran Aug 25, 2024
f88694e
adding of interface and separate methods for streaming events
vinodkiran Aug 26, 2024
7ec4fa1
lint
vinodkiran Aug 26, 2024
c5991dd
Merge branch 'main' into feature/sse
vinodkiran Aug 26, 2024
d3c5438
first draft, handles both internal and external prediction end points.
vinodkiran Sep 1, 2024
f0c259f
lint fixes
vinodkiran Sep 1, 2024
ddc10b3
Merge branch 'main' into feature/sse
vinodkiran Sep 1, 2024
57d0a75
additional internal end point for streaming and associated changes
vinodkiran Sep 2, 2024
6487381
return streamresponse as true to build agent flow
HenryHengZJ Sep 3, 2024
768ec51
1) JSON formatting for internal events
vinodkiran Sep 3, 2024
1174c0d
1) convert internal event to metadata to maintain consistency with ex…
vinodkiran Sep 3, 2024
d24c9a4
fix action and metadata streaming
HenryHengZJ Sep 3, 2024
8e00eb2
fix for error when agent flow is aborted
vinodkiran Sep 4, 2024
77163b9
Merge branch 'main' into FEATURE/sse
vinodkiran Sep 4, 2024
1c936b9
prevent subflows from streaming and other code cleanup
vinodkiran Sep 4, 2024
3a66e2b
prevent streaming from enclosed tools
vinodkiran Sep 4, 2024
0a520d8
add fix for preventing chaintool streaming
HenryHengZJ Sep 4, 2024
6367d48
update lock file
HenryHengZJ Sep 5, 2024
88ec25e
add open when hidden to sse
HenryHengZJ Sep 7, 2024
f5a8939
Streaming errors
vinodkiran Sep 11, 2024
285ddaf
Streaming errors
vinodkiran Sep 11, 2024
1a00e94
add fix for showing error message
HenryHengZJ Sep 11, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 10 additions & 4 deletions packages/components/nodes/agents/AirtableAgent/AirtableAgent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import axios from 'axios'
import { BaseLanguageModel } from '@langchain/core/language_models/base'
import { AgentExecutor } from 'langchain/agents'
import { LLMChain } from 'langchain/chains'
import { ICommonObject, INode, INodeData, INodeParams, PromptTemplate } from '../../../src/Interface'
import { ICommonObject, INode, INodeData, INodeParams, IServerSideEventStreamer, PromptTemplate } from '../../../src/Interface'
import { getBaseClasses, getCredentialData, getCredentialParam } from '../../../src/utils'
import { ConsoleCallbackHandler, CustomChainHandler, additionalCallbacks } from '../../../src/handler'
import { LoadPyodide, finalSystemPrompt, systemPrompt } from './core'
Expand Down Expand Up @@ -104,11 +104,17 @@ class Airtable_Agents implements INode {
input = await checkInputs(moderations, input)
} catch (e) {
await new Promise((resolve) => setTimeout(resolve, 500))
//streamResponse(options.socketIO && options.socketIOClientId, e.message, options.socketIO, options.socketIOClientId)
// if (options.shouldStreamResponse) {
// streamResponse(options.sseStreamer, options.chatId, e.message)
// }
return formatResponse(e.message)
}
}

const shouldStreamResponse = options.shouldStreamResponse
const sseStreamer: IServerSideEventStreamer = options.sseStreamer as IServerSideEventStreamer
const chatId = options.chatId

const credentialData = await getCredentialData(nodeData.credential ?? '', options)
const accessToken = getCredentialParam('accessToken', credentialData, nodeData)

Expand All @@ -123,7 +129,6 @@ class Airtable_Agents implements INode {
let base64String = Buffer.from(JSON.stringify(airtableData)).toString('base64')

const loggerHandler = new ConsoleCallbackHandler(options.logger)
const handler = new CustomChainHandler(options.socketIO, options.socketIOClientId)
const callbacks = await additionalCallbacks(nodeData, options)

const pyodide = await LoadPyodide()
Expand Down Expand Up @@ -194,7 +199,8 @@ json.dumps(my_dict)`
answer: finalResult
}

if (options.socketIO && options.socketIOClientId) {
if (options.shouldStreamResponse) {
const handler = new CustomChainHandler(shouldStreamResponse ? sseStreamer : undefined, chatId)
const result = await chain.call(inputs, [loggerHandler, handler, ...callbacks])
return result?.text
} else {
Expand Down
4 changes: 3 additions & 1 deletion packages/components/nodes/agents/AutoGPT/AutoGPT.ts
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,9 @@ class AutoGPT_Agents implements INode {
input = await checkInputs(moderations, input)
} catch (e) {
await new Promise((resolve) => setTimeout(resolve, 500))
//streamResponse(options.socketIO && options.socketIOClientId, e.message, options.socketIO, options.socketIOClientId)
// if (options.shouldStreamResponse) {
// streamResponse(options.sseStreamer, options.chatId, e.message)
// }
return formatResponse(e.message)
}
}
Expand Down
4 changes: 3 additions & 1 deletion packages/components/nodes/agents/BabyAGI/BabyAGI.ts
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,9 @@ class BabyAGI_Agents implements INode {
input = await checkInputs(moderations, input)
} catch (e) {
await new Promise((resolve) => setTimeout(resolve, 500))
//streamResponse(options.socketIO && options.socketIOClientId, e.message, options.socketIO, options.socketIOClientId)
// if (options.shouldStreamResponse) {
// streamResponse(options.sseStreamer, options.chatId, e.message)
// }
return formatResponse(e.message)
}
}
Expand Down
14 changes: 10 additions & 4 deletions packages/components/nodes/agents/CSVAgent/CSVAgent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { BaseLanguageModel } from '@langchain/core/language_models/base'
import { AgentExecutor } from 'langchain/agents'
import { LLMChain } from 'langchain/chains'
import { ConsoleCallbackHandler, CustomChainHandler, additionalCallbacks } from '../../../src/handler'
import { ICommonObject, INode, INodeData, INodeParams, PromptTemplate } from '../../../src/Interface'
import { ICommonObject, INode, INodeData, INodeParams, IServerSideEventStreamer, PromptTemplate } from '../../../src/Interface'
import { getBaseClasses } from '../../../src/utils'
import { LoadPyodide, finalSystemPrompt, systemPrompt } from './core'
import { checkInputs, Moderation } from '../../moderation/Moderation'
Expand Down Expand Up @@ -90,13 +90,18 @@ class CSV_Agents implements INode {
input = await checkInputs(moderations, input)
} catch (e) {
await new Promise((resolve) => setTimeout(resolve, 500))
//streamResponse(options.socketIO && options.socketIOClientId, e.message, options.socketIO, options.socketIOClientId)
// if (options.shouldStreamResponse) {
// streamResponse(options.sseStreamer, options.chatId, e.message)
// }
return formatResponse(e.message)
}
}

const loggerHandler = new ConsoleCallbackHandler(options.logger)
const handler = new CustomChainHandler(options.socketIO, options.socketIOClientId)
const shouldStreamResponse = options.shouldStreamResponse
const sseStreamer: IServerSideEventStreamer = options.sseStreamer as IServerSideEventStreamer
const chatId = options.chatId

const callbacks = await additionalCallbacks(nodeData, options)

let files: string[] = []
Expand Down Expand Up @@ -203,7 +208,8 @@ json.dumps(my_dict)`
answer: finalResult
}

if (options.socketIO && options.socketIOClientId) {
if (options.shouldStreamResponse) {
const handler = new CustomChainHandler(shouldStreamResponse ? sseStreamer : undefined, chatId)
const result = await chain.call(inputs, [loggerHandler, handler, ...callbacks])
return result?.text
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,16 @@ import { RunnableSequence } from '@langchain/core/runnables'
import { ChatConversationalAgent } from 'langchain/agents'
import { getBaseClasses } from '../../../src/utils'
import { ConsoleCallbackHandler, CustomChainHandler, additionalCallbacks } from '../../../src/handler'
import { IVisionChatModal, FlowiseMemory, ICommonObject, INode, INodeData, INodeParams, IUsedTool } from '../../../src/Interface'
import {
IVisionChatModal,
FlowiseMemory,
ICommonObject,
INode,
INodeData,
INodeParams,
IUsedTool,
IServerSideEventStreamer
} from '../../../src/Interface'
import { AgentExecutor } from '../../../src/agents'
import { addImagesToMessages, llmSupportsVision } from '../../../src/multiModalUtils'
import { checkInputs, Moderation } from '../../moderation/Moderation'
Expand Down Expand Up @@ -106,12 +115,18 @@ class ConversationalAgent_Agents implements INode {
const memory = nodeData.inputs?.memory as FlowiseMemory
const moderations = nodeData.inputs?.inputModeration as Moderation[]

const shouldStreamResponse = options.shouldStreamResponse
const sseStreamer: IServerSideEventStreamer = options.sseStreamer as IServerSideEventStreamer
const chatId = options.chatId
if (moderations && moderations.length > 0) {
try {
// Use the output of the moderation chain as input for the BabyAGI agent
input = await checkInputs(moderations, input)
} catch (e) {
await new Promise((resolve) => setTimeout(resolve, 500))
// if (options.shouldStreamResponse) {
// streamResponse(options.sseStreamer, options.chatId, e.message)
// }
//streamResponse(options.socketIO && options.socketIOClientId, e.message, options.socketIO, options.socketIOClientId)
return formatResponse(e.message)
}
Expand All @@ -125,15 +140,17 @@ class ConversationalAgent_Agents implements INode {
let sourceDocuments: ICommonObject[] = []
let usedTools: IUsedTool[] = []

if (options.socketIO && options.socketIOClientId) {
const handler = new CustomChainHandler(options.socketIO, options.socketIOClientId)
if (options.shouldStreamResponse) {
const handler = new CustomChainHandler(shouldStreamResponse ? sseStreamer : undefined, chatId)
res = await executor.invoke({ input }, { callbacks: [loggerHandler, handler, ...callbacks] })
if (res.sourceDocuments) {
options.socketIO.to(options.socketIOClientId).emit('sourceDocuments', flatten(res.sourceDocuments))
if (options.sseStreamer) {
sseStreamer.streamSourceDocumentsEvent(options.chatId, flatten(res.sourceDocuments))
}
sourceDocuments = res.sourceDocuments
}
if (res.usedTools) {
options.socketIO.to(options.socketIOClientId).emit('usedTools', res.usedTools)
sseStreamer.streamUsedToolsEvent(options.chatId, res.usedTools)
usedTools = res.usedTools
}
// If the tool is set to returnDirect, stream the output to the client
Expand All @@ -142,11 +159,14 @@ class ConversationalAgent_Agents implements INode {
inputTools = flatten(inputTools)
for (const tool of res.usedTools) {
const inputTool = inputTools.find((inputTool: Tool) => inputTool.name === tool.tool)
if (inputTool && inputTool.returnDirect) {
options.socketIO.to(options.socketIOClientId).emit('token', tool.toolOutput)
if (inputTool && inputTool.returnDirect && options.sseStreamer) {
sseStreamer.streamTokenEvent(options.chatId, tool.toolOutput)
}
}
}
if (sseStreamer) {
sseStreamer.streamEndEvent(options.chatId)
}
} else {
res = await executor.invoke({ input }, { callbacks: [loggerHandler, ...callbacks] })
if (res.sourceDocuments) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,16 @@ import { ChatPromptTemplate, MessagesPlaceholder, HumanMessagePromptTemplate, Pr
import { formatToOpenAIToolMessages } from 'langchain/agents/format_scratchpad/openai_tools'
import { getBaseClasses } from '../../../src/utils'
import { type ToolsAgentStep } from 'langchain/agents/openai/output_parser'
import { FlowiseMemory, ICommonObject, INode, INodeData, INodeParams, IUsedTool, IVisionChatModal } from '../../../src/Interface'
import {
FlowiseMemory,
ICommonObject,
INode,
INodeData,
INodeParams,
IServerSideEventStreamer,
IUsedTool,
IVisionChatModal
} from '../../../src/Interface'
import { ConsoleCallbackHandler, CustomChainHandler, additionalCallbacks } from '../../../src/handler'
import { AgentExecutor, ToolCallingAgentOutputParser } from '../../../src/agents'
import { Moderation, checkInputs, streamResponse } from '../../moderation/Moderation'
Expand Down Expand Up @@ -104,16 +113,19 @@ class ConversationalRetrievalToolAgent_Agents implements INode {
const memory = nodeData.inputs?.memory as FlowiseMemory
const moderations = nodeData.inputs?.inputModeration as Moderation[]

const isStreamable = options.socketIO && options.socketIOClientId
const shouldStreamResponse = options.shouldStreamResponse
const sseStreamer: IServerSideEventStreamer = options.sseStreamer as IServerSideEventStreamer
const chatId = options.chatId

if (moderations && moderations.length > 0) {
try {
// Use the output of the moderation chain as input for the OpenAI Function Agent
input = await checkInputs(moderations, input)
} catch (e) {
await new Promise((resolve) => setTimeout(resolve, 500))
if (isStreamable)
streamResponse(options.socketIO && options.socketIOClientId, e.message, options.socketIO, options.socketIOClientId)
if (shouldStreamResponse) {
streamResponse(sseStreamer, chatId, e.message)
}
return formatResponse(e.message)
}
}
Expand All @@ -127,15 +139,15 @@ class ConversationalRetrievalToolAgent_Agents implements INode {
let sourceDocuments: ICommonObject[] = []
let usedTools: IUsedTool[] = []

if (isStreamable) {
const handler = new CustomChainHandler(options.socketIO, options.socketIOClientId)
if (shouldStreamResponse) {
const handler = new CustomChainHandler(sseStreamer, chatId)
res = await executor.invoke({ input }, { callbacks: [loggerHandler, handler, ...callbacks] })
if (res.sourceDocuments) {
options.socketIO.to(options.socketIOClientId).emit('sourceDocuments', flatten(res.sourceDocuments))
sseStreamer.streamSourceDocumentsEvent(chatId, flatten(res.sourceDocuments))
sourceDocuments = res.sourceDocuments
}
if (res.usedTools) {
options.socketIO.to(options.socketIOClientId).emit('usedTools', res.usedTools)
sseStreamer.streamUsedToolsEvent(chatId, res.usedTools)
usedTools = res.usedTools
}
} else {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,16 @@
import { flatten } from 'lodash'
import { ChatMessage, OpenAI, OpenAIAgent } from 'llamaindex'
import { getBaseClasses } from '../../../../src/utils'
import { FlowiseMemory, ICommonObject, IMessage, INode, INodeData, INodeParams, IUsedTool } from '../../../../src/Interface'
import {
FlowiseMemory,
ICommonObject,
IMessage,
INode,
INodeData,
INodeParams,
IServerSideEventStreamer,
IUsedTool
} from '../../../../src/Interface'

class OpenAIFunctionAgent_LlamaIndex_Agents implements INode {
label: string
Expand Down Expand Up @@ -67,7 +76,9 @@ class OpenAIFunctionAgent_LlamaIndex_Agents implements INode {
let tools = nodeData.inputs?.tools
tools = flatten(tools)

const isStreamingEnabled = options.socketIO && options.socketIOClientId
const shouldStreamResponse = options.shouldStreamResponse
const sseStreamer: IServerSideEventStreamer = options.sseStreamer as IServerSideEventStreamer
const chatId = options.chatId

const chatHistory = [] as ChatMessage[]

Expand Down Expand Up @@ -104,7 +115,7 @@ class OpenAIFunctionAgent_LlamaIndex_Agents implements INode {
let isStreamingStarted = false
const usedTools: IUsedTool[] = []

if (isStreamingEnabled) {
if (shouldStreamResponse) {
const stream = await agent.chat({
message: input,
chatHistory,
Expand All @@ -116,7 +127,9 @@ class OpenAIFunctionAgent_LlamaIndex_Agents implements INode {
text += chunk.response.delta
if (!isStreamingStarted) {
isStreamingStarted = true
options.socketIO.to(options.socketIOClientId).emit('start', chunk.response.delta)
if (sseStreamer) {
sseStreamer.streamStartEvent(chatId, chunk.response.delta)
}
if (chunk.sources.length) {
for (const sourceTool of chunk.sources) {
usedTools.push({
Expand All @@ -125,11 +138,14 @@ class OpenAIFunctionAgent_LlamaIndex_Agents implements INode {
toolOutput: sourceTool.output as any
})
}
options.socketIO.to(options.socketIOClientId).emit('usedTools', usedTools)
if (sseStreamer) {
sseStreamer.streamUsedToolsEvent(chatId, usedTools)
}
}
}

options.socketIO.to(options.socketIOClientId).emit('token', chunk.response.delta)
if (sseStreamer) {
sseStreamer.streamTokenEvent(chatId, chunk.response.delta)
}
}
} else {
const response = await agent.chat({ message: input, chatHistory, verbose: process.env.DEBUG === 'true' ? true : false })
Expand Down
Loading
Loading