fix(llamaindex): support streaming #142

Merged on Mar 15, 2024 · 2 commits
@@ -4,6 +4,7 @@ import type * as llamaindex from "llamaindex";
 import {
   Tracer,
   Span,
+  Context,
   SpanKind,
   SpanStatusCode,
   trace,
@@ -14,7 +15,7 @@ import { safeExecuteInTheMiddle } from "@opentelemetry/instrumentation";
 import { SpanAttributes } from "@traceloop/ai-semantic-conventions";
 
 import { LlamaIndexInstrumentationConfig } from "./types";
-import { shouldSendPrompts, generatorWrapper } from "./utils";
+import { shouldSendPrompts, llmGeneratorWrapper } from "./utils";
 
 type LLM = llamaindex.LLM;
 
@@ -85,6 +86,7 @@ export class CustomLLMInstrumentation {
         result = plugin.handleStreamingResponse(
           result,
           span,
+          execContext,
           this.metadata,
         );
       } else {
@@ -139,6 +141,7 @@
   handleStreamingResponse<T extends AsyncResponseType>(
     result: T,
     span: Span,
+    execContext: Context,
     metadata: llamaindex.LLMMetadata,
   ): T {
     span.setAttribute(SpanAttributes.LLM_RESPONSE_MODEL, metadata.model);
@@ -148,7 +151,7 @@
       return result;
     }
 
-    return generatorWrapper(result, (message) => {
+    return llmGeneratorWrapper(result, execContext, (message) => {
       span.setAttribute(`${SpanAttributes.LLM_COMPLETIONS}.0.content`, message);
       span.setStatus({ code: SpanStatusCode.OK });
       span.end();
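Note that `execContext` is not created inside `handleStreamingResponse`; it has to be captured at the call site while the span is still active, and that capture is outside the hunks shown above. A minimal sketch of the presumed pattern (the function name `startChatSpan` and the span name are assumptions, not part of this PR):

import { Context, Span, SpanKind, Tracer, context, trace } from "@opentelemetry/api";

// Hypothetical sketch: capture the Context carrying the chat span so it can
// later be handed to handleStreamingResponse as `execContext`.
function startChatSpan(tracer: Tracer): { span: Span; execContext: Context } {
  const span = tracer.startSpan("llamaindex.open_ai.chat", {
    kind: SpanKind.CLIENT,
  });
  // trace.setSpan returns a new Context with `span` set as the active span.
  const execContext = trace.setSpan(context.active(), span);
  return { span, execContext };
}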
packages/instrumentation-llamaindex/src/utils.ts (61 additions, 17 deletions)
@@ -3,6 +3,7 @@ import * as llamaindex from "llamaindex";
 import { trace, context, Tracer, SpanStatusCode } from "@opentelemetry/api";
 import { LlamaIndexInstrumentationConfig } from "./types";
 import { safeExecuteInTheMiddle } from "@opentelemetry/instrumentation";
+import { Context } from "@opentelemetry/api";
 import {
   TraceloopSpanKindValues,
   SpanAttributes,
@@ -21,14 +22,46 @@ export const shouldSendPrompts = (config: LlamaIndexInstrumentationConfig) => {
   return config.traceContent !== undefined ? config.traceContent : true;
 };
 
+// Adopted from https://github.com/open-telemetry/opentelemetry-js/issues/2951#issuecomment-1214587378
+export function bindAsyncGenerator<T = unknown, TReturn = any, TNext = unknown>(
+  ctx: Context,
+  generator: AsyncGenerator<T, TReturn, TNext>,
+): AsyncGenerator<T, TReturn, TNext> {
+  return {
+    next: context.bind(ctx, generator.next.bind(generator)),
+    return: context.bind(ctx, generator.return.bind(generator)),
+    throw: context.bind(ctx, generator.throw.bind(generator)),
+
+    [Symbol.asyncIterator]() {
+      return bindAsyncGenerator(ctx, generator[Symbol.asyncIterator]());
+    },
+  };
+}
+
+export async function* generatorWrapper(
+  streamingResult: AsyncGenerator,
+  ctx: Context,
+  fn: () => void,
+) {
+  for await (const chunk of bindAsyncGenerator(ctx, streamingResult)) {
+    yield chunk;
+  }
+  fn();
+}
+
-export async function* generatorWrapper(
+export async function* llmGeneratorWrapper(
   streamingResult:
     | AsyncIterable<llamaindex.ChatResponseChunk>
     | AsyncIterable<llamaindex.CompletionResponse>,
+  ctx: Context,
   fn: (message: string) => void,
 ) {
   let message = "";
-  for await (const messageChunk of streamingResult) {
+  for await (const messageChunk of bindAsyncGenerator(
+    ctx,
+    streamingResult as AsyncGenerator,
+  )) {
     if ((messageChunk as llamaindex.ChatResponseChunk).delta) {
       message += (messageChunk as llamaindex.ChatResponseChunk).delta;
     }
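The reason `bindAsyncGenerator` exists: an async generator's `next()` runs in whatever OpenTelemetry context is active at the consumer's call site, not the context that was active when the stream was created, so work done while draining the stream would otherwise lose its parent span. A minimal usage sketch, assuming an AsyncLocalStorage-based context manager is registered (as the Node SDK does by default); `fakeStream` and the span name are made up for illustration:

import { context, trace } from "@opentelemetry/api";
import { bindAsyncGenerator } from "./utils"; // as defined above

// Illustration only: stands in for a streaming LLM response.
async function* fakeStream() {
  // With bindAsyncGenerator, context.active() inside this body resolves to
  // the bound Context, so nested instrumentation parents correctly.
  yield { delta: "Hello" };
  yield { delta: " world" };
}

async function main() {
  const tracer = trace.getTracer("example");
  const span = tracer.startSpan("llm.chat");
  const execContext = trace.setSpan(context.active(), span);

  for await (const chunk of bindAsyncGenerator(execContext, fakeStream())) {
    process.stdout.write(chunk.delta);
  }
  span.end(); // in the real code, llmGeneratorWrapper's callback does this
}

main();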
@@ -50,6 +83,9 @@ export function genericWrapper(
   // eslint-disable-next-line @typescript-eslint/ban-types
   return (original: Function) => {
     return function method(this: any, ...args: unknown[]) {
+      const params = args[0];
+      const streaming = params && (params as any).stream;
+
       const name = `${lodash.snakeCase(className)}.${lodash.snakeCase(methodName)}`;
       const span = tracer().startSpan(`${name}`, {}, context.active());
       span.setAttribute(SpanAttributes.TRACELOOP_SPAN_KIND, kind);
@@ -98,25 +134,33 @@
       const wrappedPromise = execPromise
         .then((result: any) => {
           return new Promise((resolve) => {
-            span.setStatus({ code: SpanStatusCode.OK });
+            if (streaming) {
+              result = generatorWrapper(result, execContext, () => {
+                span.setStatus({ code: SpanStatusCode.OK });
+                span.end();
+              });
+              resolve(result);
+            } else {
+              span.setStatus({ code: SpanStatusCode.OK });
 
-            try {
-              if (shouldSendPrompts) {
-                if (result instanceof Map) {
-                  span.setAttribute(
-                    SpanAttributes.TRACELOOP_ENTITY_OUTPUT,
-                    JSON.stringify(Array.from(result.entries())),
-                  );
-                } else {
-                  span.setAttribute(
-                    SpanAttributes.TRACELOOP_ENTITY_OUTPUT,
-                    JSON.stringify(result),
-                  );
-                }
-              }
-            } finally {
-              span.end();
-              resolve(result);
+              try {
+                if (shouldSendPrompts) {
+                  if (result instanceof Map) {
+                    span.setAttribute(
+                      SpanAttributes.TRACELOOP_ENTITY_OUTPUT,
+                      JSON.stringify(Array.from(result.entries())),
+                    );
+                  } else {
+                    span.setAttribute(
+                      SpanAttributes.TRACELOOP_ENTITY_OUTPUT,
+                      JSON.stringify(result),
+                    );
+                  }
+                }
+              } finally {
+                span.end();
+                resolve(result);
+              }
             }
           });
         })
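Taken together, this change flips the span lifecycle for streaming calls: instead of ending the span when the promise resolves, the wrapper resolves immediately with a wrapped generator and defers `span.end()` until the caller drains the stream. A simplified sketch of that pattern (the function and span names here are assumed, not from the PR):

import { SpanStatusCode, context, trace } from "@opentelemetry/api";
import { generatorWrapper } from "./utils"; // as defined above

// Simplified sketch of the streaming branch: the span must outlive the
// promise, because chunks keep arriving after it resolves.
async function wrapPossiblyStreamingCall(
  call: () => Promise<unknown>,
  streaming: boolean,
) {
  const tracer = trace.getTracer("example");
  const span = tracer.startSpan("entity.method");
  const execContext = trace.setSpan(context.active(), span);

  const result = await context.with(execContext, call);
  if (streaming) {
    // Hand back a wrapped generator; the completion callback runs (and ends
    // the span) only after the last chunk has been yielded.
    return generatorWrapper(result as AsyncGenerator, execContext, () => {
      span.setStatus({ code: SpanStatusCode.OK });
      span.end();
    });
  }
  span.setStatus({ code: SpanStatusCode.OK });
  span.end();
  return result;
}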
packages/instrumentation-llamaindex/test/instrumentation.test.ts (54 additions, 0 deletions)
@@ -200,4 +200,58 @@ describe("Test LlamaIndex instrumentation", async function () {
       result.response,
     );
   }).timeout(60000);
+
+  it("should build proper trace on streaming query engine", async () => {
+    const directoryReader = new llamaindex.SimpleDirectoryReader();
+    const documents = await directoryReader.loadData({ directoryPath: "test" });
+    const embedModel = new llamaindex.OpenAIEmbedding();
+    const vectorStore = new llamaindex.SimpleVectorStore();
+
+    const serviceContext = llamaindex.serviceContextFromDefaults({
+      embedModel,
+    });
+    const storageContext = await llamaindex.storageContextFromDefaults({
+      vectorStore,
+    });
+
+    const index = await llamaindex.VectorStoreIndex.fromDocuments(documents, {
+      storageContext,
+      serviceContext,
+    });
+
+    const queryEngine = index.asQueryEngine();
+
+    const result = await queryEngine.query({
+      query: "Where was albert einstein born?",
+      stream: true,
+    });
+
+    for await (const res of result) {
+      assert.ok(res);
+    }
+
+    const spans = memoryExporter.getFinishedSpans();
+
+    // TODO: Need to figure out why this doesn't get logged
+    // assert.ok(spanNames.includes("get_query_embedding.task"));
+
+    const retrieverQueryEngineSpan = spans.find(
+      (span) => span.name === "retriever_query_engine.query",
+    );
+    const synthesizeSpan = spans.find(
+      (span) => span.name === "response_synthesizer.synthesize",
+    );
+    const openAIChatSpan = spans.find(
+      (span) => span.name === "llamaindex.open_ai.chat",
+    );
+
+    assert.strictEqual(
+      synthesizeSpan?.parentSpanId,
+      retrieverQueryEngineSpan?.spanContext().spanId,
+    );
+    assert.strictEqual(
+      openAIChatSpan?.parentSpanId,
+      synthesizeSpan?.spanContext().spanId,
+    );
+  }).timeout(60000);
 });
packages/sample-app/src/sample_llamaindex.ts (6 additions, 2 deletions)
@@ -36,6 +36,7 @@ class SampleLlamaIndex {
 
     const res = await queryEngine.query({
       query: "What did the author do growing up?",
+      stream: true,
    });
     return res;
   }
@@ -45,7 +46,10 @@ traceloop.withAssociationProperties(
   { user_id: "12345", chat_id: "789" },
   async () => {
     const sampleLlamaIndex = new SampleLlamaIndex();
-    const result = await sampleLlamaIndex.query();
-    console.log(result.response);
+    const res = await sampleLlamaIndex.query();
+    for await (const result of res) {
+      process.stdout.write(result.response);
+    }
+    //console.log(result.response);
   },
 );
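The sample assumes the Traceloop SDK is initialized earlier in the file, outside this diff. For anyone adapting it, a rough sketch of that setup (option values here are assumptions; adjust to your environment):

import * as traceloop from "@traceloop/node-server-sdk";

// Presumed setup, not shown in the diff: initialize tracing before any
// LlamaIndex calls so the instrumentation can hook them.
traceloop.initialize({
  appName: "sample_llamaindex",
  disableBatch: true, // flush spans immediately; convenient for samples
});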