Skip to content

Commit

Permalink
Add ingestion pipeline with doc store strategies (run-llama#418)
Browse files Browse the repository at this point in the history
  • Loading branch information
marcusschiesser authored Jan 24, 2024
1 parent ba42aa5 commit e2790da
Show file tree
Hide file tree
Showing 18 changed files with 363 additions and 3 deletions.
5 changes: 5 additions & 0 deletions .changeset/mighty-chicken-trade.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"llamaindex": patch
---

Preview: Add ingestion pipeline (incl. different strategies to handle doc store duplicates)
36 changes: 36 additions & 0 deletions examples/pipeline/ingestion.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
import fs from "node:fs/promises";

import {
Document,
IngestionPipeline,
MetadataMode,
OpenAIEmbedding,
SimpleNodeParser,
} from "llamaindex";

/**
 * Example entry point: reads the sample essay from disk, wraps it in a
 * Document and runs it through an IngestionPipeline made of a
 * SimpleNodeParser and an OpenAIEmbedding, then prints the text of every
 * resulting node.
 */
async function main() {
  // Location of the sample essay inside the installed llamaindex package
  const essayPath = "node_modules/llamaindex/examples/abramov.txt";
  const essayText = await fs.readFile(essayPath, "utf-8");

  // The file path doubles as the document id
  const essayDocument = new Document({ text: essayText, id_: essayPath });

  const transformations = [
    new SimpleNodeParser({ chunkSize: 1024, chunkOverlap: 20 }),
    // new TitleExtractor(llm),
    new OpenAIEmbedding(),
  ];
  const ingestion = new IngestionPipeline({ transformations });

  // Execute every transformation on the document
  const processedNodes = await ingestion.run({ documents: [essayDocument] });

  // Dump the plain content (no metadata) of each produced node
  processedNodes.forEach((node) => {
    console.log(node.getContent(MetadataMode.NONE));
  });
}

main().catch(console.error);
13 changes: 12 additions & 1 deletion packages/core/src/embeddings/types.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import { BaseNode, MetadataMode } from "../Node";
import { TransformComponent } from "../ingestion";
import { similarity } from "./utils";

/**
Expand All @@ -10,7 +12,7 @@ export enum SimilarityType {
EUCLIDEAN = "euclidean",
}

export abstract class BaseEmbedding {
export abstract class BaseEmbedding implements TransformComponent {
similarity(
embedding1: number[],
embedding2: number[],
Expand All @@ -21,4 +23,13 @@ export abstract class BaseEmbedding {

abstract getTextEmbedding(text: string): Promise<number[]>;
abstract getQueryEmbedding(query: string): Promise<number[]>;

/**
 * TransformComponent implementation: computes an embedding for every node.
 *
 * Nodes are processed one after another; each node's EMBED-mode content is
 * passed to getTextEmbedding and the result is stored on `node.embedding`.
 *
 * @param nodes Nodes to embed; they are mutated in place.
 * @param _options Unused.
 * @returns The same array that was passed in.
 */
async transform(nodes: BaseNode[], _options?: any): Promise<BaseNode[]> {
  for (const target of nodes) {
    const embedText = target.getContent(MetadataMode.EMBED);
    target.embedding = await this.getTextEmbedding(embedText);
  }
  return nodes;
}
}
11 changes: 10 additions & 1 deletion packages/core/src/extractors/types.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
import { BaseNode, MetadataMode, TextNode } from "../Node";
import { TransformComponent } from "../ingestion";
import { defaultNodeTextTemplate } from "./prompts";

/*
* Abstract class for all extractors.
*/
export abstract class BaseExtractor {
export abstract class BaseExtractor implements TransformComponent {
isTextNodeOnly: boolean = true;
showProgress: boolean = true;
metadataMode: MetadataMode = MetadataMode.ALL;
Expand All @@ -14,6 +15,14 @@ export abstract class BaseExtractor {

abstract extract(nodes: BaseNode[]): Promise<Record<string, any>[]>;

/**
 * TransformComponent implementation: delegates to processNodes.
 *
 * @param nodes Nodes to process.
 * @param options Optional bag; its `excludedEmbedMetadataKeys` and
 * `excludedLlmMetadataKeys` entries (if any) are forwarded to processNodes.
 * @returns Whatever processNodes resolves to.
 */
async transform(nodes: BaseNode[], options?: any): Promise<BaseNode[]> {
  const embedKeysToExclude = options?.excludedEmbedMetadataKeys;
  const llmKeysToExclude = options?.excludedLlmMetadataKeys;
  return this.processNodes(nodes, embedKeysToExclude, llmKeysToExclude);
}

/**
*
* @param nodes Nodes to extract metadata from.
Expand Down
1 change: 1 addition & 0 deletions packages/core/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ export * from "./embeddings";
export * from "./engines/chat";
export * from "./extractors";
export * from "./indices";
export * from "./ingestion";
export * from "./llm";
export * from "./nodeParsers";
export * from "./postprocessors";
Expand Down
94 changes: 94 additions & 0 deletions packages/core/src/ingestion/IngestionPipeline.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
import { BaseNode, Document } from "../Node";
import { BaseReader } from "../readers/base";
import { BaseDocumentStore, VectorStore } from "../storage";
import { DocStoreStrategy, createDocStoreStrategy } from "./strategies";
import { TransformComponent } from "./types";

/**
 * Arguments accepted by IngestionPipeline.run (and forwarded to
 * runTransformations).
 */
interface IngestionRunArgs {
/** Documents to ingest; added to the pipeline input ahead of `nodes`. */
documents?: Document[];
/** Already-built nodes to ingest alongside the documents. */
nodes?: BaseNode[];
/**
 * When false, runTransformations works on a shallow copy of the node array
 * instead of the array it was given. Treated as true when omitted.
 */
inPlace?: boolean;
}

/**
 * Runs a list of transformations over a set of nodes, feeding the output of
 * each transformation into the next one.
 *
 * @param nodesToRun Nodes to start from.
 * @param transformations Steps to apply, in order.
 * @param transformOptions Options handed unchanged to every step's
 * `transform` call. Defaults to an empty object.
 * @param args Run arguments; only `inPlace` is read here. When `inPlace` is
 * false the input array is shallow-copied first, so steps that add or remove
 * array entries do not alter the caller's array (the node objects themselves
 * are still shared). May be omitted, in which case `inPlace` is true.
 * @returns The nodes returned by the last transformation, or the input nodes
 * when there are no transformations.
 */
export async function runTransformations(
  nodesToRun: BaseNode[],
  transformations: TransformComponent[],
  transformOptions: any = {},
  // Defaulted so that callers relying on the earlier defaults do not crash
  // while destructuring `undefined`.
  { inPlace = true }: IngestionRunArgs = {},
): Promise<BaseNode[]> {
  let nodes = inPlace ? nodesToRun : [...nodesToRun];
  for (const transform of transformations) {
    nodes = await transform.transform(nodes, transformOptions);
  }
  return nodes;
}

// TODO: add caching, add concurrency
/**
 * Collects input nodes from several sources, optionally filters them through
 * a doc store strategy, runs the configured transformations and, when a
 * vector store is set, adds the embedded nodes to it.
 */
export class IngestionPipeline {
  transformations: TransformComponent[] = [];
  documents?: Document[];
  reader?: BaseReader;
  vectorStore?: VectorStore;
  docStore?: BaseDocumentStore;
  docStoreStrategy: DocStoreStrategy = DocStoreStrategy.UPSERTS;
  disableCache: boolean = true;

  // Strategy component derived from docStoreStrategy/docStore/vectorStore
  private _docStoreStrategy?: TransformComponent;

  /**
   * @param init Property values copied onto the new instance before the doc
   * store strategy component is created.
   */
  constructor(init?: Partial<IngestionPipeline>) {
    Object.assign(this, init);
    const { docStoreStrategy, docStore, vectorStore } = this;
    this._docStoreStrategy = createDocStoreStrategy(
      docStoreStrategy,
      docStore,
      vectorStore,
    );
  }

  /**
   * Builds the pipeline input in this order: the given documents, the given
   * nodes, the pipeline's own documents, then whatever the reader loads.
   */
  async prepareInput(
    documents?: Document[],
    nodes?: BaseNode[],
  ): Promise<BaseNode[]> {
    const collected: BaseNode[] = [
      ...(documents ? documents : []),
      ...(nodes ? nodes : []),
      ...(this.documents ? this.documents : []),
    ];
    if (this.reader) {
      const loaded = await this.reader.loadData();
      collected.push(...loaded);
    }
    return collected;
  }

  /**
   * Executes the pipeline.
   *
   * @param args Documents/nodes to ingest and the `inPlace` flag passed on to
   * runTransformations.
   * @param transformOptions Options forwarded to every transformation.
   * @returns The nodes produced by the last transformation.
   */
  async run(
    args: IngestionRunArgs = {},
    transformOptions?: any,
  ): Promise<BaseNode[]> {
    const gathered = await this.prepareInput(args.documents, args.nodes);

    // Let the doc store strategy (if any) decide which nodes need processing
    const pending: BaseNode[] = this._docStoreStrategy
      ? await this._docStoreStrategy.transform(gathered)
      : gathered;

    const processed = await runTransformations(
      pending,
      this.transformations,
      transformOptions,
      args,
    );

    if (this.vectorStore) {
      // Only nodes carrying an embedding are sent to the vector store
      const embedded = processed.filter((node) => node.embedding);
      await this.vectorStore.add(embedded);
    }
    return processed;
  }
}
2 changes: 2 additions & 0 deletions packages/core/src/ingestion/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
export * from "./IngestionPipeline";
export * from "./types";
32 changes: 32 additions & 0 deletions packages/core/src/ingestion/strategies/DuplicatesStrategy.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
import { BaseNode } from "../../Node";
import { BaseDocumentStore } from "../../storage";
import { TransformComponent } from "../types";

/**
 * Handle doc store duplicates by checking all hashes.
 *
 * A node is dropped when its hash is already a key of the doc store's hash
 * table, or when an earlier node of the same batch carried the same hash.
 * Every surviving node has its hash recorded and is added to the doc store.
 */
export class DuplicatesStrategy implements TransformComponent {
  private docStore: BaseDocumentStore;

  constructor(docStore: BaseDocumentStore) {
    this.docStore = docStore;
  }

  /**
   * @param nodes Candidate nodes.
   * @returns The nodes that are not duplicates, in their original order. The
   * promise settles only after the doc store writes have finished, so store
   * failures reject it instead of being lost.
   */
  async transform(nodes: BaseNode[]): Promise<BaseNode[]> {
    const storedHashes = await this.docStore.getAllDocumentHashes();
    // Own keys only: a Set lookup cannot accidentally match prototype members
    // the way the `in` operator can.
    const seenHashes = new Set<string>(Object.keys(storedHashes));
    const nodesToRun: BaseNode[] = [];

    for (const node of nodes) {
      if (seenHashes.has(node.hash)) {
        continue;
      }
      seenHashes.add(node.hash);
      await this.docStore.setDocumentHash(node.id_, node.hash);
      nodesToRun.push(node);
    }

    await this.docStore.addDocuments(nodesToRun, true);

    return nodesToRun;
  }
}
44 changes: 44 additions & 0 deletions packages/core/src/ingestion/strategies/UpsertsAndDeleteStrategy.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
import { BaseNode } from "../../Node";
import { BaseDocumentStore, VectorStore } from "../../storage";
import { classify } from "./classify";

/**
 * Handle docstore upserts by checking hashes and ids.
 * Identify missing docs and delete them from docstore and vector store
 */
export class UpsertsAndDeleteStrategy {
  constructor(
    protected docStore: BaseDocumentStore,
    protected vectorStore?: VectorStore,
  ) {}

  /**
   * Classifies the nodes against the doc store, deletes outdated and missing
   * docs, then stores the remaining nodes.
   *
   * @param nodes Incoming nodes.
   * @returns The nodes that classify reported as new or changed.
   */
  async transform(nodes: BaseNode[]): Promise<BaseNode[]> {
    const classification = await classify(this.docStore, nodes);

    // Docs whose stored hash no longer matches: drop the old version
    for (const staleRefDocId of classification.unusedDocs) {
      await this.docStore.deleteRefDoc(staleRefDocId, false);
      if (this.vectorStore) {
        await this.vectorStore.delete(staleRefDocId);
      }
    }

    // Docs known to the store but absent from the input
    for (const absentDocId of classification.missingDocs) {
      await this.docStore.deleteDocument(absentDocId, true);
      if (this.vectorStore) {
        await this.vectorStore.delete(absentDocId);
      }
    }

    const nodesToKeep = classification.dedupedNodes;
    await this.docStore.addDocuments(nodesToKeep, true);

    return nodesToKeep;
  }
}
31 changes: 31 additions & 0 deletions packages/core/src/ingestion/strategies/UpsertsStrategy.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
import { BaseNode } from "../../Node";
import { BaseDocumentStore, VectorStore } from "../../storage";
import { TransformComponent } from "../types";
import { classify } from "./classify";

/**
 * Handles doc store upserts by checking hashes and ids.
 *
 * Docs whose stored hash differs from the incoming node's hash are removed
 * from the doc store (and from the vector store when one is configured)
 * before the new or changed nodes are stored.
 */
export class UpsertsStrategy implements TransformComponent {
  protected docStore: BaseDocumentStore;
  protected vectorStore?: VectorStore;

  constructor(docStore: BaseDocumentStore, vectorStore?: VectorStore) {
    this.docStore = docStore;
    this.vectorStore = vectorStore;
  }

  /**
   * @param nodes Incoming nodes.
   * @returns The nodes that classify reported as new or changed. The promise
   * settles only after they have been written to the doc store, so a failed
   * write rejects it.
   */
  async transform(nodes: BaseNode[]): Promise<BaseNode[]> {
    const { dedupedNodes, unusedDocs } = await classify(this.docStore, nodes);
    // remove unused docs
    for (const refDocId of unusedDocs) {
      await this.docStore.deleteRefDoc(refDocId, false);
      if (this.vectorStore) {
        await this.vectorStore.delete(refDocId);
      }
    }
    // add non-duplicate docs; awaited so callers never observe a half-written
    // store and write errors are not silently dropped
    await this.docStore.addDocuments(dedupedNodes, true);
    return dedupedNodes;
  }
}
27 changes: 27 additions & 0 deletions packages/core/src/ingestion/strategies/classify.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
import { BaseNode } from "../../Node";
import { BaseDocumentStore } from "../../storage";

/**
 * Compares incoming nodes with the hashes recorded in the doc store.
 *
 * Each node is identified by its source node's id when it has one, otherwise
 * by its own id.
 *
 * @param docStore Store queried for the known hashes.
 * @param nodes Incoming nodes.
 * @returns `dedupedNodes`: nodes with no stored hash or with a different one;
 * `unusedDocs`: ids whose stored hash differs from the incoming node's hash;
 * `missingDocs`: ids listed by the store that no incoming node refers to.
 */
export async function classify(docStore: BaseDocumentStore, nodes: BaseNode[]) {
  const allHashes = await docStore.getAllDocumentHashes();
  const knownDocIds = Object.values(allHashes);
  const referencedDocIds = new Set<string>();
  const dedupedNodes: BaseNode[] = [];
  const unusedDocs: string[] = [];

  for (const candidate of nodes) {
    const docId = candidate.sourceNode?.nodeId || candidate.id_;
    referencedDocIds.add(docId);
    const storedHash = await docStore.getDocumentHash(docId);

    // Same content as already stored: nothing to do for this node
    if (storedHash && storedHash === candidate.hash) {
      continue;
    }
    // Stored under a different hash: the old version becomes obsolete
    if (storedHash) {
      unusedDocs.push(docId);
    }
    dedupedNodes.push(candidate);
  }

  const missingDocs = knownDocIds.filter(
    (knownId) => !referencedDocIds.has(knownId),
  );
  return { dedupedNodes, missingDocs, unusedDocs };
}
40 changes: 40 additions & 0 deletions packages/core/src/ingestion/strategies/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
import { BaseDocumentStore, VectorStore } from "../../storage";
import { TransformComponent } from "../types";
import { DuplicatesStrategy } from "./DuplicatesStrategy";
import { UpsertsAndDeleteStrategy } from "./UpsertsAndDeleteStrategy";
import { UpsertsStrategy } from "./UpsertsStrategy";

/**
 * Names the way an ingestion pipeline reconciles incoming nodes with the
 * document store; consumed by createDocStoreStrategy.
 */
export enum DocStoreStrategy {
// Re-ingest nodes that are new or whose hash changed, replacing the old version
UPSERTS = "upserts",
// Only skip nodes whose hash is already known
DUPLICATES_ONLY = "duplicates_only",
// Like UPSERTS, and meant to also delete stored docs absent from the input
// NOTE(review): confirm createDocStoreStrategy maps this to UpsertsAndDeleteStrategy
UPSERTS_AND_DELETE = "upserts_and_delete",
}

/**
 * Builds the transform component that implements a doc store strategy.
 *
 * @param docStoreStrategy Requested strategy.
 * @param docStore Document store; without it no strategy is created.
 * @param vectorStore Vector store; the upsert strategies need it, so without
 * it every request falls back to DuplicatesStrategy (with a warning when an
 * upsert strategy was requested).
 * @returns The strategy component, or undefined when there is no doc store.
 * @throws Error when both stores are present and the strategy is not a known
 * DocStoreStrategy value.
 */
export function createDocStoreStrategy(
  docStoreStrategy: DocStoreStrategy,
  docStore?: BaseDocumentStore,
  vectorStore?: VectorStore,
): TransformComponent | undefined {
  if (!docStore) {
    return undefined;
  }

  if (!vectorStore) {
    if (docStoreStrategy === DocStoreStrategy.UPSERTS) {
      console.warn(
        "Docstore strategy set to upserts, but no vector store. Switching to duplicates_only strategy.",
      );
    } else if (docStoreStrategy === DocStoreStrategy.UPSERTS_AND_DELETE) {
      console.warn(
        "Docstore strategy set to upserts and delete, but no vector store. Switching to duplicates_only strategy.",
      );
    }
    return new DuplicatesStrategy(docStore);
  }

  switch (docStoreStrategy) {
    case DocStoreStrategy.UPSERTS:
      return new UpsertsStrategy(docStore, vectorStore);
    case DocStoreStrategy.UPSERTS_AND_DELETE:
      // Must not share the UPSERTS branch: only this strategy also deletes
      // docs that are missing from the input.
      return new UpsertsAndDeleteStrategy(docStore, vectorStore);
    case DocStoreStrategy.DUPLICATES_ONLY:
      return new DuplicatesStrategy(docStore);
    default:
      throw new Error(`Invalid docstore strategy: ${docStoreStrategy}`);
  }
}
5 changes: 5 additions & 0 deletions packages/core/src/ingestion/types.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
import { BaseNode } from "../Node";

/**
 * One step of an ingestion pipeline.
 */
export interface TransformComponent {
/**
 * Processes a batch of nodes.
 *
 * @param nodes Nodes produced by the previous step (or the pipeline input).
 * @param options Optional step-specific options.
 * @returns The nodes to hand to the next step.
 */
transform(nodes: BaseNode[], options?: any): Promise<BaseNode[]>;
}
Loading

0 comments on commit e2790da

Please sign in to comment.