Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Chore/remove app server functions from utils file #3671

Merged
merged 2 commits into from
Dec 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 10 additions & 7 deletions packages/server/src/services/documentstore/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -914,7 +914,7 @@ const updateVectorStoreConfigOnly = async (data: ICommonObject) => {
)
}
}
const saveVectorStoreConfig = async (data: ICommonObject) => {
const saveVectorStoreConfig = async (data: ICommonObject, isStrictSave = true) => {
try {
const appServer = getRunningExpressApp()
const entity = await appServer.AppDataSource.getRepository(DocumentStore).findOneBy({
Expand All @@ -932,6 +932,7 @@ const saveVectorStoreConfig = async (data: ICommonObject) => {
} else if (entity.embeddingConfig && !data.embeddingName && !data.embeddingConfig) {
data.embeddingConfig = JSON.parse(entity.embeddingConfig)?.config
data.embeddingName = JSON.parse(entity.embeddingConfig)?.name
if (isStrictSave) entity.embeddingConfig = null
} else if (!data.embeddingName && !data.embeddingConfig) {
entity.embeddingConfig = null
}
Expand All @@ -944,6 +945,7 @@ const saveVectorStoreConfig = async (data: ICommonObject) => {
} else if (entity.vectorStoreConfig && !data.vectorStoreName && !data.vectorStoreConfig) {
data.vectorStoreConfig = JSON.parse(entity.vectorStoreConfig)?.config
data.vectorStoreName = JSON.parse(entity.vectorStoreConfig)?.name
if (isStrictSave) entity.vectorStoreConfig = null
} else if (!data.vectorStoreName && !data.vectorStoreConfig) {
entity.vectorStoreConfig = null
}
Expand All @@ -956,6 +958,7 @@ const saveVectorStoreConfig = async (data: ICommonObject) => {
} else if (entity.recordManagerConfig && !data.recordManagerName && !data.recordManagerConfig) {
data.recordManagerConfig = JSON.parse(entity.recordManagerConfig)?.config
data.recordManagerName = JSON.parse(entity.recordManagerConfig)?.name
if (isStrictSave) entity.recordManagerConfig = null
} else if (!data.recordManagerName && !data.recordManagerConfig) {
entity.recordManagerConfig = null
}
Expand All @@ -975,15 +978,15 @@ const saveVectorStoreConfig = async (data: ICommonObject) => {
}
}

const insertIntoVectorStore = async (data: ICommonObject) => {
const insertIntoVectorStore = async (data: ICommonObject, isStrictSave = true) => {
try {
const appServer = getRunningExpressApp()
const entity = await saveVectorStoreConfig(data)
const entity = await saveVectorStoreConfig(data, isStrictSave)
entity.status = DocumentStoreStatus.UPSERTING
await appServer.AppDataSource.getRepository(DocumentStore).save(entity)

// TODO: to be moved into a worker thread...
const indexResult = await _insertIntoVectorStoreWorkerThread(data)
const indexResult = await _insertIntoVectorStoreWorkerThread(data, isStrictSave)
return indexResult
} catch (error) {
throw new InternalFlowiseError(
Expand All @@ -993,10 +996,10 @@ const insertIntoVectorStore = async (data: ICommonObject) => {
}
}

const _insertIntoVectorStoreWorkerThread = async (data: ICommonObject) => {
const _insertIntoVectorStoreWorkerThread = async (data: ICommonObject, isStrictSave = true) => {
try {
const appServer = getRunningExpressApp()
const entity = await saveVectorStoreConfig(data)
const entity = await saveVectorStoreConfig(data, isStrictSave)
let upsertHistory: Record<string, any> = {}
const chatflowid = data.storeId // fake chatflowid because this is not tied to any chatflow

Expand Down Expand Up @@ -1520,7 +1523,7 @@ const upsertDocStoreMiddleware = async (
recordManagerConfig
}

const res = await insertIntoVectorStore(insertData)
const res = await insertIntoVectorStore(insertData, false)
res.docId = newDocId

return res
Expand Down
4 changes: 4 additions & 0 deletions packages/server/src/services/openai-realtime/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import { IDepthQueue, IReactFlowNode } from '../../Interface'
import { ICommonObject, INodeData } from 'flowise-components'
import { convertToOpenAIFunction } from '@langchain/core/utils/function_calling'
import { v4 as uuidv4 } from 'uuid'
import { Variable } from '../../database/entities/Variable'

const SOURCE_DOCUMENTS_PREFIX = '\n\n----FLOWISE_SOURCE_DOCUMENTS----\n\n'
const ARTIFACTS_PREFIX = '\n\n----FLOWISE_ARTIFACTS----\n\n'
Expand Down Expand Up @@ -59,6 +60,7 @@ const buildAndInitTool = async (chatflowid: string, _chatId?: string, _apiMessag
}
startingNodeIds = [...new Set(startingNodeIds)]

const availableVariables = await appServer.AppDataSource.getRepository(Variable).find()
const { nodeOverrides, variableOverrides, apiOverrideStatus } = getAPIOverrideConfig(chatflow)

const reactFlowNodes = await buildFlow({
Expand All @@ -77,6 +79,7 @@ const buildAndInitTool = async (chatflowid: string, _chatId?: string, _apiMessag
appDataSource: appServer.AppDataSource,
apiOverrideStatus,
nodeOverrides,
availableVariables,
variableOverrides
})

Expand All @@ -99,6 +102,7 @@ const buildAndInitTool = async (chatflowid: string, _chatId?: string, _apiMessag
[],
flowDataObj,
'',
availableVariables,
variableOverrides
)
let nodeToExecuteData = reactFlowNodeData
Expand Down
10 changes: 10 additions & 0 deletions packages/server/src/utils/buildAgentGraph.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import { replaceInputsWithConfig, resolveVariables } from '.'
import { InternalFlowiseError } from '../errors/internalFlowiseError'
import { getErrorMessage } from '../errors/utils'
import logger from './logger'
import { Variable } from '../database/entities/Variable'

/**
* Build Agent Graph
Expand Down Expand Up @@ -114,6 +115,7 @@ export const buildAgentGraph = async (
}

/*** Get API Config ***/
const availableVariables = await appServer.AppDataSource.getRepository(Variable).find()
const { nodeOverrides, variableOverrides, apiOverrideStatus } = getAPIOverrideConfig(chatflow)

// Initialize nodes like ChatModels, Tools, etc.
Expand All @@ -135,6 +137,7 @@ export const buildAgentGraph = async (
overrideConfig: incomingInput?.overrideConfig,
apiOverrideStatus,
nodeOverrides,
availableVariables,
variableOverrides,
cachePool: appServer.cachePool,
isUpsert: false,
Expand Down Expand Up @@ -519,6 +522,7 @@ const compileMultiAgentsGraph = async (params: MultiAgentsGraphParams) => {
const workerNodes = reactFlowNodes.filter((node) => workerNodeIds.includes(node.data.id))

/*** Get API Config ***/
const availableVariables = await appServer.AppDataSource.getRepository(Variable).find()
const { nodeOverrides, variableOverrides, apiOverrideStatus } = getAPIOverrideConfig(chatflow)

let supervisorWorkers: { [key: string]: IMultiAgentNode[] } = {}
Expand All @@ -540,6 +544,7 @@ const compileMultiAgentsGraph = async (params: MultiAgentsGraphParams) => {
chatHistory,
overrideConfig,
uploadedFilesContent,
availableVariables,
variableOverrides
)

Expand Down Expand Up @@ -581,6 +586,7 @@ const compileMultiAgentsGraph = async (params: MultiAgentsGraphParams) => {
chatHistory,
overrideConfig,
uploadedFilesContent,
availableVariables,
variableOverrides
)

Expand Down Expand Up @@ -753,6 +759,9 @@ const compileSeqAgentsGraph = async (params: SeqAgentsGraphParams) => {
let conditionalToolNodes: Record<string, { source: ISeqAgentNode; toolNodes: ISeqAgentNode[] }> = {}
let bindModel: Record<string, any> = {}
let interruptToolNodeNames = []

/*** Get API Config ***/
const availableVariables = await appServer.AppDataSource.getRepository(Variable).find()
const { nodeOverrides, variableOverrides, apiOverrideStatus } = getAPIOverrideConfig(chatflow)

const initiateNode = async (node: IReactFlowNode) => {
Expand All @@ -771,6 +780,7 @@ const compileSeqAgentsGraph = async (params: SeqAgentsGraphParams) => {
chatHistory,
overrideConfig,
uploadedFilesContent,
availableVariables,
variableOverrides
)

Expand Down
4 changes: 4 additions & 0 deletions packages/server/src/utils/buildChatflow.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ import { getErrorMessage } from '../errors/utils'
import { ChatMessage } from '../database/entities/ChatMessage'
import { IAction } from 'flowise-components'
import { FLOWISE_METRIC_COUNTERS, FLOWISE_COUNTER_STATUS } from '../Interface.Metrics'
import { Variable } from '../database/entities/Variable'

/**
* Build Chatflow
Expand Down Expand Up @@ -350,6 +351,7 @@ export const utilBuildChatflow = async (req: Request, isInternal: boolean = fals
const startingNodes = nodes.filter((nd) => startingNodeIds.includes(nd.id))

/*** Get API Config ***/
const availableVariables = await appServer.AppDataSource.getRepository(Variable).find()
const { nodeOverrides, variableOverrides, apiOverrideStatus } = getAPIOverrideConfig(chatflow)

logger.debug(`[server]: Start building chatflow ${chatflowid}`)
Expand All @@ -373,6 +375,7 @@ export const utilBuildChatflow = async (req: Request, isInternal: boolean = fals
overrideConfig: incomingInput?.overrideConfig,
apiOverrideStatus,
nodeOverrides,
availableVariables,
variableOverrides,
cachePool: appServer.cachePool,
isUpsert: false,
Expand Down Expand Up @@ -427,6 +430,7 @@ export const utilBuildChatflow = async (req: Request, isInternal: boolean = fals
chatHistory,
flowData,
uploadedFilesContent,
availableVariables,
variableOverrides
)
nodeToExecuteData = reactFlowNodeData
Expand Down
31 changes: 23 additions & 8 deletions packages/server/src/utils/index.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
/**
* Strictly no getRepository, appServer here, must be passed as parameter
*/

import path from 'path'
import fs from 'fs'
import logger from './logger'
Expand All @@ -17,6 +21,7 @@ import {
IOverrideConfig,
IReactFlowEdge,
IReactFlowNode,
IVariable,
IVariableDict,
IVariableOverride,
IncomingInput
Expand Down Expand Up @@ -439,6 +444,7 @@ type BuildFlowParams = {
overrideConfig?: ICommonObject
apiOverrideStatus?: boolean
nodeOverrides?: INodeOverrides
availableVariables?: IVariable[]
variableOverrides?: IVariableOverride[]
cachePool?: CachePool
isUpsert?: boolean
Expand Down Expand Up @@ -470,6 +476,7 @@ export const buildFlow = async ({
overrideConfig,
apiOverrideStatus = false,
nodeOverrides = {},
availableVariables = [],
variableOverrides = [],
cachePool,
isUpsert,
Expand Down Expand Up @@ -534,6 +541,7 @@ export const buildFlow = async ({
chatHistory,
flowData,
uploadedFilesContent,
availableVariables,
variableOverrides
)

Expand Down Expand Up @@ -727,9 +735,12 @@ export const clearSessionMemory = async (
}
}

const getGlobalVariable = async (appDataSource: DataSource, overrideConfig?: ICommonObject, variableOverrides?: ICommonObject[]) => {
const variables = await appDataSource.getRepository(Variable).find()

const getGlobalVariable = async (
appDataSource: DataSource,
overrideConfig?: ICommonObject,
availableVariables: IVariable[] = [],
variableOverrides?: ICommonObject[]
) => {
// override variables defined in overrideConfig
// nodeData.inputs.vars is an Object, check each property and override the variable
if (overrideConfig?.vars && variableOverrides) {
Expand All @@ -740,14 +751,14 @@ const getGlobalVariable = async (appDataSource: DataSource, overrideConfig?: ICo
continue // Skip this variable if it's not enabled for override
}

const foundVar = variables.find((v) => v.name === propertyName)
const foundVar = availableVariables.find((v) => v.name === propertyName)
if (foundVar) {
// even if the variable was defined as runtime, we override it with static value
foundVar.type = 'static'
foundVar.value = overrideConfig.vars[propertyName]
} else {
// add it the variables, if not found locally in the db
variables.push({
availableVariables.push({
name: propertyName,
type: 'static',
value: overrideConfig.vars[propertyName],
Expand All @@ -760,8 +771,8 @@ const getGlobalVariable = async (appDataSource: DataSource, overrideConfig?: ICo
}

let vars = {}
if (variables.length) {
for (const item of variables) {
if (availableVariables.length) {
for (const item of availableVariables) {
let value = item.value

// read from .env file
Expand Down Expand Up @@ -797,6 +808,7 @@ export const getVariableValue = async (
isAcceptVariable = false,
flowData?: ICommonObject,
uploadedFilesContent?: string,
availableVariables: IVariable[] = [],
variableOverrides: ICommonObject[] = []
) => {
const isObject = typeof paramValue === 'object'
Expand Down Expand Up @@ -839,7 +851,7 @@ export const getVariableValue = async (
}

if (variableFullPath.startsWith('$vars.')) {
const vars = await getGlobalVariable(appDataSource, flowData, variableOverrides)
const vars = await getGlobalVariable(appDataSource, flowData, availableVariables, variableOverrides)
const variableValue = get(vars, variableFullPath.replace('$vars.', ''))
if (variableValue) {
variableDict[`{{${variableFullPath}}}`] = variableValue
Expand Down Expand Up @@ -949,6 +961,7 @@ export const resolveVariables = async (
chatHistory: IMessage[],
flowData?: ICommonObject,
uploadedFilesContent?: string,
availableVariables: IVariable[] = [],
variableOverrides: ICommonObject[] = []
): Promise<INodeData> => {
let flowNodeData = cloneDeep(reactFlowNodeData)
Expand All @@ -969,6 +982,7 @@ export const resolveVariables = async (
undefined,
flowData,
uploadedFilesContent,
availableVariables,
variableOverrides
)
resolvedInstances.push(resolvedInstance)
Expand All @@ -985,6 +999,7 @@ export const resolveVariables = async (
isAcceptVariable,
flowData,
uploadedFilesContent,
availableVariables,
variableOverrides
)
paramsObj[key] = resolvedInstance
Expand Down
3 changes: 3 additions & 0 deletions packages/server/src/utils/upsertVector.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import { StatusCodes } from 'http-status-codes'
import { getErrorMessage } from '../errors/utils'
import { v4 as uuidv4 } from 'uuid'
import { FLOWISE_COUNTER_STATUS, FLOWISE_METRIC_COUNTERS } from '../Interface.Metrics'
import { Variable } from '../database/entities/Variable'
/**
* Upsert documents
* @param {Request} req
Expand Down Expand Up @@ -157,6 +158,7 @@ export const upsertVector = async (req: Request, isInternal: boolean = false) =>
const { startingNodeIds, depthQueue } = getStartingNodes(filteredGraph, stopNodeId)

/*** Get API Config ***/
const availableVariables = await appServer.AppDataSource.getRepository(Variable).find()
const { nodeOverrides, variableOverrides, apiOverrideStatus } = getAPIOverrideConfig(chatflow)

// For "files" input, add a new node override with the actual input name such as pdfFile, txtFile, etc.
Expand Down Expand Up @@ -189,6 +191,7 @@ export const upsertVector = async (req: Request, isInternal: boolean = false) =>
overrideConfig: incomingInput?.overrideConfig,
apiOverrideStatus,
nodeOverrides,
availableVariables,
variableOverrides,
cachePool: appServer.cachePool,
isUpsert,
Expand Down
2 changes: 1 addition & 1 deletion packages/ui/src/ui-component/extended/OverrideConfig.jsx
Original file line number Diff line number Diff line change
Expand Up @@ -400,7 +400,7 @@ const OverrideConfig = ({ dialogProps }) => {
</Stack>
</Card>
)}
{variableOverrides && (
{variableOverrides && variableOverrides.length > 0 && (
<Card sx={{ borderColor: theme.palette.primary[200] + 75, p: 2 }} variant='outlined'>
<Stack sx={{ mt: 1, mb: 2, ml: 1, alignItems: 'center' }} direction='row' spacing={2}>
<IconVariable />
Expand Down
Loading