Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/Multer to s3 #2408

Merged
merged 2 commits into from
May 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import { zodToJsonSchema } from 'zod-to-json-schema'
import { AnalyticHandler } from '../../../src/handler'
import { Moderation, checkInputs, streamResponse } from '../../moderation/Moderation'
import { formatResponse } from '../../outputparsers/OutputParserHelpers'
import { addFileToStorage } from '../../../src/storageUtils'
import { addSingleFileToStorage } from '../../../src/storageUtils'

const lenticularBracketRegex = /【[^】]*】/g
const imageRegex = /<img[^>]*\/>/g
Expand Down Expand Up @@ -731,7 +731,7 @@ const downloadImg = async (openai: OpenAI, fileId: string, fileName: string, ...
const image_data_buffer = Buffer.from(image_data)
const mime = 'image/png'

await addFileToStorage(mime, image_data_buffer, fileName, ...paths)
await addSingleFileToStorage(mime, image_data_buffer, fileName, ...paths)

return image_data_buffer
}
Expand All @@ -754,7 +754,7 @@ const downloadFile = async (openAIApiKey: string, fileObj: any, fileName: string
const data_buffer = Buffer.from(data)
const mime = 'application/octet-stream'

return await addFileToStorage(mime, data_buffer, fileName, ...paths)
return await addSingleFileToStorage(mime, data_buffer, fileName, ...paths)
} catch (error) {
console.error('Error downloading or writing the file:', error)
return ''
Expand Down
43 changes: 38 additions & 5 deletions packages/components/src/storageUtils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@ import { DeleteObjectsCommand, GetObjectCommand, ListObjectsV2Command, PutObject
import { Readable } from 'node:stream'
import { getUserHome } from './utils'

export const addBase64FilesToStorage = async (file: string, chatflowid: string, fileNames: string[]) => {
export const addBase64FilesToStorage = async (fileBase64: string, chatflowid: string, fileNames: string[]) => {
const storageType = getStorageType()
if (storageType === 's3') {
const { s3Client, Bucket } = getS3Config()

const splitDataURI = file.split(',')
const splitDataURI = fileBase64.split(',')
const filename = splitDataURI.pop()?.split(':')[1] ?? ''
const bf = Buffer.from(splitDataURI.pop() || '', 'base64')
const mime = splitDataURI[0].split(':')[1].split(';')[0]
Expand All @@ -32,7 +32,7 @@ export const addBase64FilesToStorage = async (file: string, chatflowid: string,
fs.mkdirSync(dir, { recursive: true })
}

const splitDataURI = file.split(',')
const splitDataURI = fileBase64.split(',')
const filename = splitDataURI.pop()?.split(':')[1] ?? ''
const bf = Buffer.from(splitDataURI.pop() || '', 'base64')

Expand All @@ -43,7 +43,40 @@ export const addBase64FilesToStorage = async (file: string, chatflowid: string,
}
}

export const addFileToStorage = async (mime: string, bf: Buffer, fileName: string, ...paths: string[]) => {
/**
 * Persist an uploaded file buffer to the configured storage backend (S3 or local disk),
 * append its name to the shared `fileNames` accumulator (mutated in place), and return
 * the storage reference string consumed downstream.
 *
 * @param mime - MIME type of the file content
 * @param bf - raw file content
 * @param fileName - name to store the file under
 * @param fileNames - accumulator of names stored so far; this call pushes `fileName` onto it
 * @param paths - path segments (e.g. chatflow id) under which the file is stored
 * @returns 'FILE-STORAGE::' followed by the JSON array of all accumulated file names
 */
export const addArrayFilesToStorage = async (mime: string, bf: Buffer, fileName: string, fileNames: string[], ...paths: string[]) => {
    const storageType = getStorageType()

    if (storageType === 's3') {
        const { s3Client, Bucket } = getS3Config()

        // Build the object key as '/seg1/seg2/fileName', then drop the leading slash
        let Key = ''
        for (const segment of [...paths, fileName]) {
            Key += '/' + segment
        }
        if (Key.startsWith('/')) Key = Key.substring(1)

        await s3Client.send(
            new PutObjectCommand({
                Bucket,
                Key,
                // NOTE(review): Body is a raw Buffer, not base64 text — confirm this header is intended
                ContentEncoding: 'base64', // required for binary data
                ContentType: mime,
                Body: bf
            })
        )
        fileNames.push(fileName)
        return 'FILE-STORAGE::' + JSON.stringify(fileNames)
    }

    // Local filesystem storage: ensure the target directory exists, then write the file
    const dir = path.join(getStoragePath(), ...paths)
    if (!fs.existsSync(dir)) {
        fs.mkdirSync(dir, { recursive: true })
    }
    fs.writeFileSync(path.join(dir, fileName), bf)
    fileNames.push(fileName)
    return 'FILE-STORAGE::' + JSON.stringify(fileNames)
}

export const addSingleFileToStorage = async (mime: string, bf: Buffer, fileName: string, ...paths: string[]) => {
const storageType = getStorageType()
if (storageType === 's3') {
const { s3Client, Bucket } = getS3Config()
Expand Down Expand Up @@ -273,7 +306,7 @@ export const streamStorageFile = async (
}
}

const getS3Config = () => {
export const getS3Config = () => {
const accessKeyId = process.env.S3_STORAGE_ACCESS_KEY_ID
const secretAccessKey = process.env.S3_STORAGE_SECRET_ACCESS_KEY
const region = process.env.S3_STORAGE_REGION
Expand Down
4 changes: 2 additions & 2 deletions packages/server/src/services/documentstore/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { getRunningExpressApp } from '../../utils/getRunningExpressApp'
import { DocumentStore } from '../../database/entities/DocumentStore'
// @ts-ignore
import {
addFileToStorage,
addSingleFileToStorage,
getFileFromStorage,
ICommonObject,
IDocument,
Expand Down Expand Up @@ -343,7 +343,7 @@ const _saveFileToStorage = async (fileBase64: string, entity: DocumentStore) =>
if (mimePrefix) {
mime = mimePrefix.split(';')[0].split(':')[1]
}
await addFileToStorage(mime, bf, filename, DOCUMENT_STORE_BASE_FOLDER, entity.id)
await addSingleFileToStorage(mime, bf, filename, DOCUMENT_STORE_BASE_FOLDER, entity.id)
return {
id: uuidv4(),
name: filename,
Expand Down
21 changes: 11 additions & 10 deletions packages/server/src/utils/buildChatflow.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { Request } from 'express'
import { IFileUpload, convertSpeechToText, ICommonObject, addFileToStorage } from 'flowise-components'
import { IFileUpload, convertSpeechToText, ICommonObject, addSingleFileToStorage, addArrayFilesToStorage } from 'flowise-components'
import { StatusCodes } from 'http-status-codes'
import { IncomingInput, IMessage, INodeData, IReactFlowObject, IReactFlowNode, IDepthQueue, chatType, IChatMessage } from '../Interface'
import { InternalFlowiseError } from '../errors/internalFlowiseError'
Expand Down Expand Up @@ -71,7 +71,7 @@ export const utilBuildChatflow = async (req: Request, socketIO?: Server, isInter
const splitDataURI = upload.data.split(',')
const bf = Buffer.from(splitDataURI.pop() || '', 'base64')
const mime = splitDataURI[0].split(':')[1].split(';')[0]
await addFileToStorage(mime, bf, filename, chatflowid, chatId)
await addSingleFileToStorage(mime, bf, filename, chatflowid, chatId)
upload.type = 'stored-file'
// Omit upload.data since we don't store the content in database
fileUploads[i] = omit(upload, ['data'])
Expand Down Expand Up @@ -111,20 +111,21 @@ export const utilBuildChatflow = async (req: Request, socketIO?: Server, isInter

let isStreamValid = false

const files = (req.files as any[]) || []
const files = (req.files as Express.Multer.File[]) || []

if (files.length) {
const overrideConfig: ICommonObject = { ...req.body }
const fileNames: string[] = []
for (const file of files) {
const fileData = fs.readFileSync(file.path, { encoding: 'base64' })
const dataBase64String = `data:${file.mimetype};base64,${fileData},filename:${file.filename}`
const fileBuffer = fs.readFileSync(file.path)

const storagePath = await addArrayFilesToStorage(file.mimetype, fileBuffer, file.originalname, fileNames, chatflowid)

const fileInputField = mapMimeTypeToInputField(file.mimetype)
if (overrideConfig[fileInputField]) {
overrideConfig[fileInputField] = JSON.stringify([...JSON.parse(overrideConfig[fileInputField]), dataBase64String])
} else {
overrideConfig[fileInputField] = JSON.stringify([dataBase64String])
}

overrideConfig[fileInputField] = storagePath

fs.unlinkSync(file.path)
}
incomingInput = {
question: req.body.question ?? 'hello',
Expand Down
47 changes: 45 additions & 2 deletions packages/server/src/utils/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -374,6 +374,44 @@ export const saveUpsertFlowData = (nodeData: INodeData, upsertHistory: Record<st
return existingUpsertFlowData
}

/**
* Check if doc loader should be bypassed, ONLY if doc loader is connected to a vector store
* Reason being we dont want to load the doc loader again whenever we are building the flow, because it was already done during upserting
* TODO: Remove this logic when we remove doc loader nodes from the canvas
* @param {IReactFlowNode} reactFlowNode
* @param {IReactFlowNode[]} reactFlowNodes
* @param {IReactFlowEdge[]} reactFlowEdges
* @returns {boolean}
*/
/**
 * Check if doc loader should be bypassed, ONLY if doc loader is connected to a vector store
 * Reason being we dont want to load the doc loader again whenever we are building the flow, because it was already done during upserting
 * TODO: Remove this logic when we remove doc loader nodes from the canvas
 * @param {IReactFlowNode} reactFlowNode
 * @param {IReactFlowNode[]} reactFlowNodes
 * @param {IReactFlowEdge[]} reactFlowEdges
 * @returns {boolean}
 */
const checkIfDocLoaderShouldBeIgnored = (
    reactFlowNode: IReactFlowNode,
    reactFlowNodes: IReactFlowNode[],
    reactFlowEdges: IReactFlowEdge[]
): boolean => {
    // Resolve the id of the output anchor this node emits from
    const anchors = reactFlowNode.data.outputAnchors
    let sourceAnchorId = ''

    if (anchors.length) {
        const outputs = reactFlowNode.data.outputs || {}
        if (Object.keys(outputs).length) {
            // A specific output option is selected — look it up among the first anchor's options
            const selected = reactFlowNode.data.outputs?.output
            const match = anchors[0].options?.find((anchor) => anchor.name === selected)
            if (match) sourceAnchorId = (match as ICommonObject).id
        } else {
            sourceAnchorId = (anchors[0] as ICommonObject).id
        }
    }

    // Follow the edge leaving that anchor (if any) and inspect the target node's category
    const edge = reactFlowEdges.find((e) => e.sourceHandle === sourceAnchorId)
    if (!edge?.target) return false

    const targetNode = reactFlowNodes.find((nd) => nd.id === edge.target)
    return (targetNode?.data.category || '') === 'Vector Stores'
}

/**
* Build langchain from start to end
* @param {string[]} startingNodeIds
Expand Down Expand Up @@ -446,7 +484,6 @@ export const buildFlow = async (

const reactFlowNodeData: INodeData = resolveVariables(flowNodeData, flowNodes, question, chatHistory)

// TODO: Avoid processing Text Splitter + Doc Loader once Upsert & Load Existing Vector Nodes are deprecated
if (isUpsert && stopNodeId && nodeId === stopNodeId) {
logger.debug(`[server]: Upserting ${reactFlowNode.data.label} (${reactFlowNode.data.id})`)
const indexResult = await newNodeInstance.vectorStoreMethods!['upsert']!.call(newNodeInstance, reactFlowNodeData, {
Expand All @@ -464,6 +501,12 @@ export const buildFlow = async (
if (indexResult) upsertHistory['result'] = indexResult
logger.debug(`[server]: Finished upserting ${reactFlowNode.data.label} (${reactFlowNode.data.id})`)
break
} else if (
!isUpsert &&
reactFlowNode.data.category === 'Document Loaders' &&
checkIfDocLoaderShouldBeIgnored(reactFlowNode, reactFlowNodes, reactFlowEdges)
) {
initializedNodes.add(nodeId)
} else {
logger.debug(`[server]: Initializing ${reactFlowNode.data.label} (${reactFlowNode.data.id})`)
let outputResult = await newNodeInstance.init(reactFlowNodeData, question, {
Expand Down Expand Up @@ -935,7 +978,7 @@ export const mapMimeTypeToInputField = (mimeType: string) => {
case 'text/yaml':
return 'yamlFile'
default:
return ''
return 'txtFile'
}
}

Expand Down
19 changes: 10 additions & 9 deletions packages/server/src/utils/upsertVector.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { Request } from 'express'
import * as fs from 'fs'
import { cloneDeep, omit } from 'lodash'
import { ICommonObject, IMessage } from 'flowise-components'
import { ICommonObject, IMessage, addArrayFilesToStorage } from 'flowise-components'
import telemetryService from '../services/telemetry'
import logger from '../utils/logger'
import {
Expand Down Expand Up @@ -48,20 +48,21 @@ export const upsertVector = async (req: Request, isInternal: boolean = false) =>
}
}

const files = (req.files as any[]) || []
const files = (req.files as Express.Multer.File[]) || []

if (files.length) {
const overrideConfig: ICommonObject = { ...req.body }
const fileNames: string[] = []
for (const file of files) {
const fileData = fs.readFileSync(file.path, { encoding: 'base64' })
const dataBase64String = `data:${file.mimetype};base64,${fileData},filename:${file.filename}`
const fileBuffer = fs.readFileSync(file.path)

const storagePath = await addArrayFilesToStorage(file.mimetype, fileBuffer, file.originalname, fileNames, chatflowid)

const fileInputField = mapMimeTypeToInputField(file.mimetype)
if (overrideConfig[fileInputField]) {
overrideConfig[fileInputField] = JSON.stringify([...JSON.parse(overrideConfig[fileInputField]), dataBase64String])
} else {
overrideConfig[fileInputField] = JSON.stringify([dataBase64String])
}

overrideConfig[fileInputField] = storagePath

fs.unlinkSync(file.path)
}
incomingInput = {
question: req.body.question ?? 'hello',
Expand Down
Loading