Skip to content

Commit

Permalink
fix(llms): add missing events for stream method
Browse files Browse the repository at this point in the history
Signed-off-by: Tomas Dvorak <toomas2d@gmail.com>
  • Loading branch information
Tomas2D committed Dec 12, 2024
1 parent 24546c9 commit 9a82d29
Showing 1 changed file with 33 additions and 5 deletions.
38 changes: 33 additions & 5 deletions src/llms/base.ts
Original file line number Diff line number Diff line change
Expand Up @@ -234,12 +234,40 @@ export abstract class BaseLLM<
async (run) => {
const cacheEntry = await this.createCacheAccessor(input, options);

const tokens: TOutput[] = [];
for await (const token of cacheEntry.value || this._stream(input, options ?? {}, run)) {
tokens.push(token);
emit(token);
try {
await run.emitter.emit("start", { input, options });

const tokenEmitter = run.emitter.child({ groupId: "tokens" });
const chunks: TOutput[] = [];
const controller = createAbortController(options?.signal);

for await (const chunk of cacheEntry.value ||
this._stream(input, { ...options, signal: controller.signal }, run)) {
if (controller.signal.aborted) {
continue;
}

chunks.push(chunk);
await tokenEmitter.emit("newToken", {
value: chunk,
callbacks: { abort: () => controller.abort() },
});
emit(chunk);
}
const result = this._mergeChunks(chunks);
await run.emitter.emit("success", { value: result });
cacheEntry.resolve(chunks);
} catch (error) {
await run.emitter.emit("error", { input, error, options });
await cacheEntry.reject(error);
if (error instanceof LLMError) {
throw error;
} else {
throw new LLMError(`LLM has occurred an error.`, [error]);
}
} finally {
await run.emitter.emit("finish", null);
}
cacheEntry.resolve(tokens);
},
).middleware(INSTRUMENTATION_ENABLED ? createTelemetryMiddleware() : doNothing());
});
Expand Down

0 comments on commit 9a82d29

Please sign in to comment.