Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

LunaryAI automatic Thread and User tracking #2667

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions packages/components/credentials/LunaryApi.credential.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,17 @@ class LunaryApi implements INodeCredential {
inputs: INodeParams[]

constructor() {
this.label = 'Lunary API'
this.label = 'Lunary AI'
this.name = 'lunaryApi'
this.version = 1.0
this.description = 'Refer to <a target="_blank" href="https://lunary.ai/docs">official guide</a> to get APP ID'
this.description =
'Refer to the <a target="_blank" href="https://lunary.ai/docs?utm_source=flowise">official guide</a> to get a public key.'
this.inputs = [
{
label: 'APP ID',
label: 'Public Key / Project ID',
name: 'lunaryAppId',
type: 'password',
placeholder: '<Lunary_APP_ID>'
type: 'string',
placeholder: '<Lunary_PROJECT_ID>'
},
{
label: 'Endpoint',
Expand Down
2 changes: 1 addition & 1 deletion packages/components/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@
"linkifyjs": "^4.1.1",
"llamaindex": "^0.3.13",
"lodash": "^4.17.21",
"lunary": "^0.6.16",
"lunary": "^0.7.9",
"mammoth": "^1.5.1",
"moment": "^2.29.3",
"mongodb": "6.3.0",
Expand Down
10 changes: 4 additions & 6 deletions packages/components/src/handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -351,12 +351,13 @@ export class AnalyticHandler {
})
this.handlers['langFuse'] = { client: langfuse }
} else if (provider === 'lunary') {
const lunaryAppId = getCredentialParam('lunaryAppId', credentialData, this.nodeData)
const lunaryPublicKey = getCredentialParam('lunaryAppId', credentialData, this.nodeData)
const lunaryEndpoint = getCredentialParam('lunaryEndpoint', credentialData, this.nodeData)

lunary.init({
appId: lunaryAppId,
apiUrl: lunaryEndpoint
publicKey: lunaryPublicKey,
apiUrl: lunaryEndpoint,
runtime: 'flowise'
})

this.handlers['lunary'] = { client: lunary }
Expand Down Expand Up @@ -451,7 +452,6 @@ export class AnalyticHandler {
await monitor.trackEvent('chain', 'start', {
runId,
name,
userId: this.options.chatId,
input,
...this.nodeData?.inputs?.analytics?.lunary
})
Expand Down Expand Up @@ -604,7 +604,6 @@ export class AnalyticHandler {
runId,
parentRunId: chainEventId,
name,
userId: this.options.chatId,
input
})
this.handlers['lunary'].llmEvent = { [runId]: runId }
Expand Down Expand Up @@ -730,7 +729,6 @@ export class AnalyticHandler {
runId,
parentRunId: chainEventId,
name,
userId: this.options.chatId,
input
})
this.handlers['lunary'].toolEvent = { [runId]: runId }
Expand Down
116 changes: 87 additions & 29 deletions packages/server/src/utils/buildChatflow.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,13 @@
import { Request } from 'express'
import { IFileUpload, convertSpeechToText, ICommonObject, addSingleFileToStorage, addArrayFilesToStorage } from 'flowise-components'
import {
IFileUpload,
convertSpeechToText,
ICommonObject,
addSingleFileToStorage,
addArrayFilesToStorage,
getCredentialParam,
getCredentialData
} from 'flowise-components'
import { StatusCodes } from 'http-status-codes'
import {
IncomingInput,
Expand Down Expand Up @@ -32,7 +40,8 @@ import {
getMemorySessionId,
isSameOverrideConfig,
getEndingNodes,
constructGraphs
constructGraphs,
parseResultText
} from '../utils'
import { utilValidateKey } from './validateKey'
import { databaseEntities } from '.'
Expand All @@ -43,6 +52,7 @@ import logger from './logger'
import { utilAddChatMessage } from './addChatMesage'
import { buildAgentGraph } from './buildAgentGraph'
import { getErrorMessage } from '../errors/utils'
import lunary from 'lunary'

/**
* Build Chatflow
Expand Down Expand Up @@ -333,29 +343,80 @@ export const utilBuildChatflow = async (req: Request, socketIO?: Server, isInter
const nodeModule = await import(nodeInstanceFilePath)
const nodeInstance = new nodeModule.nodeClass({ sessionId })

let result = isStreamValid
? await nodeInstance.run(nodeToExecuteData, incomingInput.question, {
chatId,
chatflowid,
logger,
appDataSource: appServer.AppDataSource,
databaseEntities,
analytic: chatflow.analytic,
uploads: incomingInput.uploads,
socketIO,
socketIOClientId: incomingInput.socketIOClientId,
prependMessages
})
: await nodeInstance.run(nodeToExecuteData, incomingInput.question, {
chatId,
chatflowid,
logger,
appDataSource: appServer.AppDataSource,
databaseEntities,
analytic: chatflow.analytic,
uploads: incomingInput.uploads,
prependMessages
})
const runParams = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i think the correct placement is here

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @HenryHengZJ are there events for "on user message" and "on assistant response"? Couldn't find it in the callback handlers space, which seems to be more related with Langchain events (as this PR is for tracking the high level user conversation)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't need on user message as we can already have input from the callbacks in AnalyticHandler

For instance, open a thread when onChainStart, and close when onChainEnd. I'd like to keep 3rd party integration on components level and try to reuse the callbacks that are already exposed

chatId,
chatflowid,
logger,
appDataSource: appServer.AppDataSource,
databaseEntities,
analytic: chatflow.analytic,
uploads: incomingInput.uploads,
prependMessages,
socketIOClientId: incomingInput.socketIOClientId,
socketIO
}

if (!isStreamValid) {
delete runParams.socketIO
delete runParams.socketIOClientId
}

let lunaryConfig = undefined
let result

try {
lunaryConfig = runParams.analytic ? JSON.parse(runParams.analytic).lunary : undefined
} catch (e) {
logger.error(`[server]: Error parsing chatflow analytic: ${e}`)
}

if (lunaryConfig?.status) {
const credentialData = await getCredentialData(lunaryConfig.credentialId ?? '', runParams)
const lunaryPublicKey = getCredentialParam('lunaryAppId', credentialData, nodeInstance)
const lunaryEndpoint = getCredentialParam('lunaryEndpoint', credentialData, nodeInstance)

lunary.init({
publicKey: lunaryPublicKey,
apiUrl: lunaryEndpoint,
runtime: 'flowise'
})

// This enables viewing high-level conversations with Lunary.ai (open-source), and also tracking the current user
const thread = lunary.openThread({
id: chatId,
userId: incomingInput.leadEmail ?? sessionId ?? undefined,
userProps: {
email: incomingInput.leadEmail
}
})

const messageId = thread.trackMessage({
role: 'user',
content: incomingInput.question
})

// This enable Tracing of the chatflow with Lunary
// We wrap the node executation so that we can reconciliate what happens after with this user message
const traceWrapper = lunary.wrapAgent(
async (question: string) => {
let result = await nodeInstance.run(nodeToExecuteData, question, runParams)
return result
},
{
name: 'FlowiseChatflow'
}
)

result = await traceWrapper(incomingInput.question).setParent(messageId)

thread.trackMessage({
role: 'assistant',
content: parseResultText(result).replace(/^"|"$/g, '') // remove trailing and leading quotes
})
} else {
result = await nodeInstance.run(nodeToExecuteData, incomingInput.question, runParams)
}

result = typeof result === 'string' ? { text: result } : result

// Retrieve threadId from assistant if exists
Expand All @@ -377,10 +438,7 @@ export const utilBuildChatflow = async (req: Request, socketIO?: Server, isInter
}
await utilAddChatMessage(userMessage)

let resultText = ''
if (result.text) resultText = result.text
else if (result.json) resultText = '```json\n' + JSON.stringify(result.json, null, 2)
else resultText = JSON.stringify(result, null, 2)
const resultText = parseResultText(result)

const apiMessage: Omit<IChatMessage, 'id' | 'createdDate'> = {
role: 'apiMessage',
Expand Down
8 changes: 8 additions & 0 deletions packages/server/src/utils/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1464,3 +1464,11 @@ export const convertToValidFilename = (word: string) => {
.replace(' ', '')
.toLowerCase()
}

export const parseResultText = (result: any) => {
let resultText = ''
if (result.text) resultText = result.text
else if (result.json) resultText = '```json\n' + JSON.stringify(result.json, null, 2)
else resultText = JSON.stringify(result, null, 2)
return resultText
}
Loading
Loading