Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,62 @@ function createMockStreamEvents(model = 'claude-3-haiku-20240307') {
return generator();
}

// Mimics Anthropic SDK's MessageStream class (the real class is not exported
// by the SDK, so the integration test needs this stand-in).
class MockMessageStream {
  /**
   * @param {string} model - Model name forwarded to createMockStreamEvents.
   */
  constructor(model) {
    this._model = model;
    this._eventHandlers = {};
    // Guards against kicking off event processing more than once.
    this._processing = false;
  }

  /**
   * Registers an event handler; mirrors the chainable `.on()` of the real
   * MessageStream. The first 'streamEvent' subscription starts async event
   * delivery, but `.on()` itself always returns synchronously.
   * @returns {MockMessageStream} this, for chaining.
   */
  on(event, handler) {
    if (!this._eventHandlers[event]) {
      this._eventHandlers[event] = [];
    }
    this._eventHandlers[event].push(handler);

    // Start processing events asynchronously (don't await) — matches the
    // real SDK, which returns the stream before any events are delivered.
    if (event === 'streamEvent' && !this._processing) {
      this._processing = true;
      this._processEvents();
    }

    return this;
  }

  // Drives the mock event generator: fans each event out to 'streamEvent'
  // handlers, emits 'message' once the generator completes, or 'error' if
  // the generator throws.
  async _processEvents() {
    try {
      const generator = createMockStreamEvents(this._model);
      for await (const event of generator) {
        for (const handler of this._eventHandlers['streamEvent'] ?? []) {
          handler(event);
        }
      }

      // Emit 'message' event when done
      for (const handler of this._eventHandlers['message'] ?? []) {
        handler();
      }
    } catch (error) {
      for (const handler of this._eventHandlers['error'] ?? []) {
        handler(error);
      }
    }
  }

  // Supports `for await (const ev of stream)` — same surface as the SDK.
  async *[Symbol.asyncIterator]() {
    yield* createMockStreamEvents(this._model);
  }
}

class MockAnthropic {
constructor(config) {
this.apiKey = config.apiKey;
Expand Down Expand Up @@ -68,9 +124,9 @@ class MockAnthropic {
};
}

async _messagesStream(params) {
await new Promise(resolve => setTimeout(resolve, 5));
return createMockStreamEvents(params?.model);
// This should return synchronously (like the real Anthropic SDK)
_messagesStream(params) {
return new MockMessageStream(params?.model);
}
}

Expand All @@ -90,13 +146,27 @@ async function run() {
}

// 2) Streaming via messages.stream API
const stream2 = await client.messages.stream({
const stream2 = client.messages.stream({
model: 'claude-3-haiku-20240307',
messages: [{ role: 'user', content: 'Stream this too' }],
});
for await (const _ of stream2) {
void _;
}

// 3) Streaming via messages.stream API with redundant stream: true param
const stream3 = client.messages.stream({
model: 'claude-3-haiku-20240307',
messages: [{ role: 'user', content: 'Stream with param' }],
stream: true, // This param is redundant but should not break synchronous behavior
});
// Verify it has .on() method immediately (not a Promise)
if (typeof stream3.on !== 'function') {
throw new Error('BUG: messages.stream() with stream: true did not return MessageStream synchronously!');
}
for await (const _ of stream3) {
void _;
}
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,23 @@ describe('Anthropic integration', () => {
'gen_ai.usage.total_tokens': 25,
}),
}),
// messages.stream with redundant stream: true param
expect.objectContaining({
description: 'messages claude-3-haiku-20240307 stream-response',
op: 'gen_ai.messages',
data: expect.objectContaining({
'gen_ai.system': 'anthropic',
'gen_ai.operation.name': 'messages',
'gen_ai.request.model': 'claude-3-haiku-20240307',
'gen_ai.request.stream': true,
'gen_ai.response.streaming': true,
'gen_ai.response.model': 'claude-3-haiku-20240307',
'gen_ai.response.id': 'msg_stream_1',
'gen_ai.usage.input_tokens': 10,
'gen_ai.usage.output_tokens': 15,
'gen_ai.usage.total_tokens': 25,
}),
}),
]),
};

Expand All @@ -331,6 +348,14 @@ describe('Anthropic integration', () => {
'gen_ai.response.text': 'Hello from stream!',
}),
}),
expect.objectContaining({
description: 'messages claude-3-haiku-20240307 stream-response',
op: 'gen_ai.messages',
data: expect.objectContaining({
'gen_ai.response.streaming': true,
'gen_ai.response.text': 'Hello from stream!',
}),
}),
]),
};

Expand Down
21 changes: 12 additions & 9 deletions packages/core/src/utils/anthropic-ai/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -205,8 +205,8 @@ function handleStreamingError(error: unknown, span: Span, methodPath: string): n
* Handle streaming cases with common logic
*/
function handleStreamingRequest<T extends unknown[], R>(
originalMethod: (...args: T) => Promise<R>,
target: (...args: T) => Promise<R>,
originalMethod: (...args: T) => R | Promise<R>,
target: (...args: T) => R | Promise<R>,
context: unknown,
args: T,
requestAttributes: Record<string, unknown>,
Expand All @@ -215,15 +215,17 @@ function handleStreamingRequest<T extends unknown[], R>(
params: Record<string, unknown> | undefined,
options: AnthropicAiOptions,
isStreamRequested: boolean,
): Promise<R> {
isStreamingMethod: boolean,
): R | Promise<R> {
const model = requestAttributes[GEN_AI_REQUEST_MODEL_ATTRIBUTE] ?? 'unknown';
const spanConfig = {
name: `${operationName} ${model} stream-response`,
op: getSpanOperation(methodPath),
attributes: requestAttributes as Record<string, SpanAttributeValue>,
};

if (isStreamRequested) {
// messages.stream() always returns a sync MessageStream, even with stream: true param
if (isStreamRequested && !isStreamingMethod) {
return startSpanManual(spanConfig, async span => {
try {
if (options.recordInputs && params) {
Expand Down Expand Up @@ -260,13 +262,13 @@ function handleStreamingRequest<T extends unknown[], R>(
* @see https://docs.sentry.io/platforms/javascript/guides/node/tracing/instrumentation/ai-agents-module/#manual-instrumentation
*/
function instrumentMethod<T extends unknown[], R>(
originalMethod: (...args: T) => Promise<R>,
originalMethod: (...args: T) => R | Promise<R>,
methodPath: AnthropicAiInstrumentedMethod,
context: unknown,
options: AnthropicAiOptions,
): (...args: T) => Promise<R> {
): (...args: T) => R | Promise<R> {
return new Proxy(originalMethod, {
apply(target, thisArg, args: T): Promise<R> {
apply(target, thisArg, args: T): R | Promise<R> {
const requestAttributes = extractRequestAttributes(args, methodPath);
const model = requestAttributes[GEN_AI_REQUEST_MODEL_ATTRIBUTE] ?? 'unknown';
const operationName = getFinalOperationName(methodPath);
Expand All @@ -287,6 +289,7 @@ function instrumentMethod<T extends unknown[], R>(
params,
options,
isStreamRequested,
isStreamingMethod,
);
}

Expand Down Expand Up @@ -320,7 +323,7 @@ function instrumentMethod<T extends unknown[], R>(
},
);
},
}) as (...args: T) => Promise<R>;
}) as (...args: T) => R | Promise<R>;
}

/**
Expand All @@ -333,7 +336,7 @@ function createDeepProxy<T extends object>(target: T, currentPath = '', options:
const methodPath = buildMethodPath(currentPath, String(prop));

if (typeof value === 'function' && shouldInstrument(methodPath)) {
return instrumentMethod(value as (...args: unknown[]) => Promise<unknown>, methodPath, obj, options);
return instrumentMethod(value as (...args: unknown[]) => unknown | Promise<unknown>, methodPath, obj, options);
}

if (typeof value === 'function') {
Expand Down
Loading