17 changes: 15 additions & 2 deletions apps/sim/app/api/knowledge/utils.test.ts
@@ -136,16 +136,29 @@ vi.mock('@sim/db', () => {
},
}),
}),
delete: () => ({
where: () => Promise.resolve(),
}),
insert: () => ({
values: (records: any) => {
dbOps.order.push('insert')
dbOps.insertRecords.push(records)
return Promise.resolve()
},
}),
transaction: vi.fn(async (fn: any) => {
await fn({
insert: (table: any) => ({
delete: () => ({
where: () => Promise.resolve(),
}),
insert: () => ({
values: (records: any) => {
dbOps.order.push('insert')
dbOps.insertRecords.push(records)
return Promise.resolve()
},
}),
update: (table: any) => ({
update: () => ({
set: (payload: any) => ({
where: () => {
dbOps.updatePayloads.push(payload)
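For orientation: the reworked mock nests delete/insert/update inside the transaction callback, mirroring how Drizzle exposes them on the transaction handle. A sketch of the production pattern this mock stands in for; the table names, schema import path, and ids are assumptions, not taken from this PR:

import { eq } from 'drizzle-orm'
import { db } from '@sim/db'
import { document, embedding } from '@sim/db/schema' // assumed path

const docId = 'doc_123' // illustrative
const newRecords = [{ documentId: docId, content: 'chunk text' }] // illustrative

await db.transaction(async (tx) => {
  // Replace a document's embeddings and flip its status atomically.
  await tx.delete(embedding).where(eq(embedding.documentId, docId))
  await tx.insert(embedding).values(newRecords)
  await tx
    .update(document)
    .set({ processingStatus: 'completed' })
    .where(eq(document.id, docId))
})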
44 changes: 9 additions & 35 deletions apps/sim/app/workspace/[workspaceId]/knowledge/[id]/base.tsx
@@ -453,6 +453,8 @@ export function KnowledgeBase({
error: knowledgeBaseError,
refresh: refreshKnowledgeBase,
} = useKnowledgeBase(id)
const [hasProcessingDocuments, setHasProcessingDocuments] = useState(false)

const {
documents,
pagination,
@@ -468,6 +470,7 @@ export function KnowledgeBase({
offset: (currentPage - 1) * DOCUMENTS_PER_PAGE,
sortBy,
sortOrder,
refetchInterval: hasProcessingDocuments && !isDeleting ? 3000 : false,
})

const { tagDefinitions } = useKnowledgeBaseTagDefinitions(id)
@@ -534,25 +537,15 @@ export function KnowledgeBase({
)

useEffect(() => {
const hasProcessingDocuments = documents.some(
const processing = documents.some(
(doc) => doc.processingStatus === 'pending' || doc.processingStatus === 'processing'
)
setHasProcessingDocuments(processing)

if (!hasProcessingDocuments) return

const refreshInterval = setInterval(async () => {
try {
if (!isDeleting) {
await checkForDeadProcesses()
await refreshDocuments()
}
} catch (error) {
logger.error('Error refreshing documents:', error)
}
}, 3000)

return () => clearInterval(refreshInterval)
}, [documents, refreshDocuments, isDeleting])
if (processing) {
checkForDeadProcesses()
}
}, [documents])

/**
* Checks for documents with stale processing states and marks them as failed
@@ -672,25 +665,6 @@ export function KnowledgeBase({

await refreshDocuments()

let refreshAttempts = 0
const maxRefreshAttempts = 3
const refreshInterval = setInterval(async () => {
try {
refreshAttempts++
await refreshDocuments()
if (refreshAttempts >= maxRefreshAttempts) {
clearInterval(refreshInterval)
}
} catch (error) {
logger.error('Error refreshing documents after retry:', error)
clearInterval(refreshInterval)
}
}, 1000)

setTimeout(() => {
clearInterval(refreshInterval)
}, 4000)

logger.info(`Document retry initiated successfully for: ${docId}`)
} catch (err) {
logger.error('Error retrying document:', err)
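Taken together, the changes in this file swap two hand-rolled setInterval refresh loops for React Query polling driven by a hasProcessingDocuments flag. For comparison, React Query v5 also accepts a function-form refetchInterval that derives the interval from the fetched data itself, avoiding the extra piece of state. A sketch of that alternative pattern; the fetcher and document shape are hypothetical, and this is not what the PR does:

import { useQuery } from '@tanstack/react-query'

type Doc = { processingStatus: string }
declare function fetchDocuments(): Promise<{ documents: Doc[] }> // hypothetical

function useDocumentsWithPolling() {
  return useQuery({
    queryKey: ['knowledge', 'kb_123', 'documents'],
    queryFn: fetchDocuments,
    // Poll at 3s only while any document is still pending/processing.
    refetchInterval: (query) => {
      const docs = query.state.data?.documents ?? []
      return docs.some(
        (d) => d.processingStatus === 'pending' || d.processingStatus === 'processing'
      )
        ? 3000
        : false
    },
  })
}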
1 change: 1 addition & 0 deletions apps/sim/background/knowledge-processing.ts
@@ -27,6 +27,7 @@ export type DocumentProcessingPayload = {
export const processDocument = task({
id: 'knowledge-process-document',
maxDuration: env.KB_CONFIG_MAX_DURATION || 600,
machine: 'large-1x', // 2 vCPU, 2GB RAM - needed for large PDF processing
retry: {
maxAttempts: env.KB_CONFIG_MAX_ATTEMPTS || 3,
factor: env.KB_CONFIG_RETRY_FACTOR || 2,
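For reference, 'large-1x' is one of Trigger.dev's machine presets, and callers invoke the task as before. A minimal sketch of triggering it; the payload fields are illustrative, since DocumentProcessingPayload is truncated in this view:

import { processDocument } from '@/background/knowledge-processing'

// Enqueue a run; Trigger.dev executes it on the machine preset declared above.
const handle = await processDocument.trigger({
  knowledgeBaseId: 'kb_123', // illustrative field
  documentId: 'doc_456', // illustrative field
})
console.log(handle.id)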
2 changes: 2 additions & 0 deletions apps/sim/hooks/queries/knowledge.ts
@@ -228,6 +228,7 @@ export function useKnowledgeDocumentsQuery(
params: KnowledgeDocumentsParams,
options?: {
enabled?: boolean
refetchInterval?: number | false
}
) {
const paramsKey = serializeDocumentParams(params)
@@ -237,6 +238,7 @@
enabled: (options?.enabled ?? true) && Boolean(params.knowledgeBaseId),
staleTime: 60 * 1000,
placeholderData: keepPreviousData,
refetchInterval: options?.refetchInterval ?? false,
})
}

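A minimal usage sketch of the widened hook options; knowledgeBaseId, offset, sortBy, and sortOrder mirror the params used elsewhere in this PR, while the literal values are illustrative:

const { data, isFetching } = useKnowledgeDocumentsQuery(
  { knowledgeBaseId: 'kb_123', offset: 0, sortBy: 'createdAt', sortOrder: 'desc' },
  {
    // A number enables polling at that interval; `false` (the default) disables it.
    refetchInterval: 3000,
  }
)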
2 changes: 2 additions & 0 deletions apps/sim/hooks/use-knowledge.ts
@@ -67,6 +67,7 @@ export function useKnowledgeBaseDocuments(
sortBy?: string
sortOrder?: string
enabled?: boolean
refetchInterval?: number | false
}
) {
const queryClient = useQueryClient()
@@ -92,6 +93,7 @@
},
{
enabled: (options?.enabled ?? true) && Boolean(knowledgeBaseId),
refetchInterval: options?.refetchInterval,
}
)

2 changes: 1 addition & 1 deletion apps/sim/lib/chunkers/docs-chunker.ts
@@ -16,7 +16,7 @@ interface HeaderInfo {
interface Frontmatter {
title?: string
description?: string
[key: string]: any
[key: string]: unknown
}

const logger = createLogger('DocsChunker')
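The any-to-unknown switch means extra frontmatter keys must be narrowed before use rather than dereferenced blindly. A small illustration; the tags key is hypothetical:

// Under `unknown`, TypeScript rejects `fm.tags.length` until the value is
// narrowed; under `any`, it compiled silently and could crash at runtime.
const fm: Frontmatter = { title: 'Guide', tags: ['intro', 'setup'] }
const tags = Array.isArray(fm.tags)
  ? fm.tags.filter((t): t is string => typeof t === 'string')
  : []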
23 changes: 14 additions & 9 deletions apps/sim/lib/chunkers/json-yaml-chunker.ts
@@ -6,6 +6,11 @@ import { estimateTokenCount } from '@/lib/tokenization/estimators'

const logger = createLogger('JsonYamlChunker')

type JsonPrimitive = string | number | boolean | null
type JsonValue = JsonPrimitive | JsonObject | JsonArray
type JsonObject = { [key: string]: JsonValue }
type JsonArray = JsonValue[]

function getTokenCount(text: string): number {
try {
return getAccurateTokenCount(text, 'text-embedding-3-small')
@@ -59,11 +64,11 @@ export class JsonYamlChunker {
*/
async chunk(content: string): Promise<Chunk[]> {
try {
let data: any
let data: JsonValue
try {
data = JSON.parse(content)
data = JSON.parse(content) as JsonValue
} catch {
data = yaml.load(content)
data = yaml.load(content) as JsonValue
}
const chunks = this.chunkStructuredData(data)

@@ -86,15 +91,15 @@
/**
* Chunk structured data based on its structure
*/
private chunkStructuredData(data: any, path: string[] = []): Chunk[] {
private chunkStructuredData(data: JsonValue, path: string[] = []): Chunk[] {
const chunks: Chunk[] = []

if (Array.isArray(data)) {
return this.chunkArray(data, path)
}

if (typeof data === 'object' && data !== null) {
return this.chunkObject(data, path)
return this.chunkObject(data as JsonObject, path)
}

const content = JSON.stringify(data, null, 2)
@@ -118,9 +123,9 @@
/**
* Chunk an array intelligently
*/
private chunkArray(arr: any[], path: string[]): Chunk[] {
private chunkArray(arr: JsonArray, path: string[]): Chunk[] {
const chunks: Chunk[] = []
let currentBatch: any[] = []
let currentBatch: JsonValue[] = []
let currentTokens = 0

const contextHeader = path.length > 0 ? `// ${path.join('.')}\n` : ''
@@ -194,7 +199,7 @@
/**
* Chunk an object intelligently
*/
private chunkObject(obj: Record<string, any>, path: string[]): Chunk[] {
private chunkObject(obj: JsonObject, path: string[]): Chunk[] {
const chunks: Chunk[] = []
const entries = Object.entries(obj)

@@ -213,7 +218,7 @@
return chunks
}

let currentObj: Record<string, any> = {}
let currentObj: JsonObject = {}
let currentTokens = 0
let currentKeys: string[] = []

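The JsonValue union added at the top of this file is the standard recursive JSON type, so the chunker's internals are now typed end to end. A usage sketch; the import path matches the diff header, but whether the constructor takes options is an assumption:

import { JsonYamlChunker } from '@/lib/chunkers/json-yaml-chunker'

// chunk() tries JSON.parse first and falls back to yaml.load, so the same
// call handles both formats.
const chunker = new JsonYamlChunker()
const fromJson = await chunker.chunk('{"users": [{"id": 1}, {"id": 2}]}')
const fromYaml = await chunker.chunk('users:\n  - id: 1\n  - id: 2')
console.log(fromJson.length, fromYaml.length)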
6 changes: 4 additions & 2 deletions apps/sim/lib/chunkers/text-chunker.ts
@@ -110,10 +110,12 @@ export class TextChunker {
chunks.push(currentChunk.trim())
}

// Start new chunk with current part
// If part itself is too large, split it further
if (this.estimateTokens(part) > this.chunkSize) {
chunks.push(...(await this.splitRecursively(part, separatorIndex + 1)))
const subChunks = await this.splitRecursively(part, separatorIndex + 1)
for (const subChunk of subChunks) {
chunks.push(subChunk)
}
currentChunk = ''
} else {
currentChunk = part
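The spread-to-loop change here is more than style: chunks.push(...subChunks) passes every sub-chunk as a separate call argument, which throws a RangeError once the array exceeds the engine's argument limit. Assuming that is the motivation (the PR does not state it), a standalone illustration:

// Spreading a very large array into push() can throw
// "RangeError: Maximum call stack size exceeded" on V8.
const big: string[] = new Array(200_000).fill('x')
const out: string[] = []
// out.push(...big) // may throw: each element becomes a call argument
for (const item of big) out.push(item) // safe at any length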
1 change: 1 addition & 0 deletions apps/sim/lib/core/config/env.ts
@@ -178,6 +178,7 @@ export const env = createEnv({
KB_CONFIG_BATCH_SIZE: z.number().optional().default(2000), // Chunks to process per embedding batch
KB_CONFIG_DELAY_BETWEEN_BATCHES: z.number().optional().default(0), // Delay between batches in ms (0 for max speed)
KB_CONFIG_DELAY_BETWEEN_DOCUMENTS: z.number().optional().default(50), // Delay between documents in ms
KB_CONFIG_CHUNK_CONCURRENCY: z.number().optional().default(10), // Concurrent PDF chunk OCR processing

// Real-time Communication
SOCKET_SERVER_URL: z.string().url().optional(), // WebSocket server URL for real-time features
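A hedged sketch of how a chunk-concurrency knob like KB_CONFIG_CHUNK_CONCURRENCY is typically consumed; the consuming code is not part of this diff, and ocr here is a hypothetical worker:

import { env } from '@/lib/core/config/env'

// Process PDF chunks in waves of KB_CONFIG_CHUNK_CONCURRENCY (default 10).
async function ocrAllChunks(
  chunks: Buffer[],
  ocr: (chunk: Buffer) => Promise<string>
): Promise<string[]> {
  const results: string[] = []
  const width = env.KB_CONFIG_CHUNK_CONCURRENCY
  for (let i = 0; i < chunks.length; i += width) {
    const batch = await Promise.all(chunks.slice(i, i + width).map(ocr))
    for (const text of batch) results.push(text)
  }
  return results
}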
89 changes: 57 additions & 32 deletions apps/sim/lib/file-parsers/doc-parser.ts
@@ -17,8 +17,6 @@ export class DocParser implements FileParser {
throw new Error(`File not found: ${filePath}`)
}

logger.info(`Parsing DOC file: ${filePath}`)

const buffer = await readFile(filePath)
return this.parseBuffer(buffer)
} catch (error) {
@@ -29,53 +27,80 @@

async parseBuffer(buffer: Buffer): Promise<FileParseResult> {
try {
logger.info('Parsing DOC buffer, size:', buffer.length)

if (!buffer || buffer.length === 0) {
throw new Error('Empty buffer provided')
}

let parseOfficeAsync
try {
const officeParser = await import('officeparser')
parseOfficeAsync = officeParser.parseOfficeAsync
} catch (importError) {
logger.warn('officeparser not available, using fallback extraction')
return this.fallbackExtraction(buffer)
const result = await officeParser.parseOfficeAsync(buffer)

if (result) {
const resultString = typeof result === 'string' ? result : String(result)
const content = sanitizeTextForUTF8(resultString.trim())

if (content.length > 0) {
return {
content,
metadata: {
characterCount: content.length,
extractionMethod: 'officeparser',
},
}
}
}
} catch (officeError) {
logger.warn('officeparser failed, trying mammoth:', officeError)
}

try {
const result = await parseOfficeAsync(buffer)

if (!result) {
throw new Error('officeparser returned no result')
const mammoth = await import('mammoth')
const result = await mammoth.extractRawText({ buffer })

if (result.value && result.value.trim().length > 0) {
const content = sanitizeTextForUTF8(result.value.trim())
return {
content,
metadata: {
characterCount: content.length,
extractionMethod: 'mammoth',
messages: result.messages,
},
}
}

const resultString = typeof result === 'string' ? result : String(result)

const content = sanitizeTextForUTF8(resultString.trim())

logger.info('DOC parsing completed successfully with officeparser')

return {
content: content,
metadata: {
characterCount: content.length,
extractionMethod: 'officeparser',
},
}
} catch (extractError) {
logger.warn('officeparser failed, using fallback:', extractError)
return this.fallbackExtraction(buffer)
} catch (mammothError) {
logger.warn('mammoth failed:', mammothError)
}

return this.fallbackExtraction(buffer)
} catch (error) {
logger.error('DOC buffer parsing error:', error)
logger.error('DOC parsing error:', error)
throw new Error(`Failed to parse DOC buffer: ${(error as Error).message}`)
}
}

private fallbackExtraction(buffer: Buffer): FileParseResult {
logger.info('Using fallback text extraction for DOC file')
const isBinaryDoc = buffer.length >= 2 && buffer[0] === 0xd0 && buffer[1] === 0xcf

if (!isBinaryDoc) {
const textContent = buffer.toString('utf8').trim()

if (textContent.length > 0) {
const printableChars = textContent.match(/[\x20-\x7E\n\r\t]/g)?.length || 0
const isProbablyText = printableChars / textContent.length > 0.9

if (isProbablyText) {
return {
content: sanitizeTextForUTF8(textContent),
metadata: {
extractionMethod: 'plaintext-fallback',
characterCount: textContent.length,
warning: 'File is not a valid DOC format, extracted as plain text',
},
}
}
}
}

const text = buffer.toString('utf8', 0, Math.min(buffer.length, 100000))

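After this change the parser tries officeparser, then mammoth, then a plain-text heuristic ahead of the binary fallback. A usage sketch; the file path is illustrative:

import { readFile } from 'fs/promises'
import { DocParser } from '@/lib/file-parsers/doc-parser'

const parser = new DocParser()
const { content, metadata } = await parser.parseBuffer(await readFile('/tmp/report.doc'))
// extractionMethod records which strategy succeeded: 'officeparser',
// 'mammoth', or a fallback such as 'plaintext-fallback'.
console.log(metadata?.extractionMethod, content.length)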