Skip to content

Commit aec3740

Browse files
committed
Add metadata.stream limits and improve the metadata streams structure
1 parent f14fc0d commit aec3740

File tree

10 files changed

+241
-64
lines changed

10 files changed

+241
-64
lines changed

apps/webapp/app/presenters/v3/SpanPresenter.server.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -210,7 +210,7 @@ export class SpanPresenter extends BasePresenter {
210210
const span = await eventRepository.getSpan(spanId, run.traceId);
211211

212212
const metadata = run.metadata
213-
? await prettyPrintPacket(run.metadata, run.metadataType)
213+
? await prettyPrintPacket(run.metadata, run.metadataType, { filteredKeys: ["$$streams"] })
214214
: undefined;
215215

216216
const context = {

packages/core/src/v3/apiClient/runStream.ts

Lines changed: 22 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -234,28 +234,28 @@ export class RunSubscription<TRunTypes extends AnyRunTypes> {
234234
});
235235

236236
// Check for stream metadata
237-
if (run.metadata) {
238-
for (const [key] of Object.entries(run.metadata)) {
239-
if (key.startsWith("$$stream.")) {
240-
const streamKey = key.replace("$$stream.", "") as keyof TStreams;
241-
242-
if (!activeStreams.has(key)) {
243-
activeStreams.add(key);
244-
245-
const subscription = this.options.streamFactory.createSubscription(
246-
run.id,
247-
streamKey.toString(),
248-
this.options.client?.baseUrl
249-
);
250-
251-
await subscription.subscribe(async (chunk) => {
252-
controller.enqueue({
253-
type: streamKey,
254-
chunk: chunk as TStreams[typeof streamKey],
255-
run,
256-
} as StreamPartResult<RunShape<TRunTypes>, TStreams>);
257-
});
258-
}
237+
if (run.metadata && "$$streams" in run.metadata && Array.isArray(run.metadata.$$streams)) {
238+
for (const streamKey of run.metadata.$$streams) {
239+
if (typeof streamKey !== "string") {
240+
continue;
241+
}
242+
243+
if (!activeStreams.has(streamKey)) {
244+
activeStreams.add(streamKey);
245+
246+
const subscription = this.options.streamFactory.createSubscription(
247+
run.id,
248+
streamKey,
249+
this.options.client?.baseUrl
250+
);
251+
252+
await subscription.subscribe(async (chunk) => {
253+
controller.enqueue({
254+
type: streamKey,
255+
chunk: chunk as TStreams[typeof streamKey],
256+
run,
257+
} as StreamPartResult<RunShape<TRunTypes>, TStreams>);
258+
});
259259
}
260260
}
261261
}

packages/core/src/v3/runMetadata/index.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,11 @@ export class RunMetadataAPI implements RunMetadataManager {
5858
}
5959

6060
appendKey(key: string, value: DeserializedJson): void {
61-
return this.#getManager().appendKey(key, value);
61+
this.#getManager().appendKey(key, value);
62+
}
63+
64+
removeFromKey(key: string, value: DeserializedJson): void {
65+
this.#getManager().removeFromKey(key, value);
6266
}
6367

6468
public update(metadata: Record<string, DeserializedJson>): void {

packages/core/src/v3/runMetadata/manager.ts

Lines changed: 56 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,9 @@ import { RunMetadataManager } from "./types.js";
66
import { MetadataStream } from "./metadataStream.js";
77
import { ApiClient } from "../apiClient/index.js";
88

9+
const MAXIMUM_ACTIVE_STREAMS = 2;
10+
const MAXIMUM_TOTAL_STREAMS = 5;
11+
912
export class StandardMetadataManager implements RunMetadataManager {
1013
private flushTimeoutId: NodeJS.Timeout | null = null;
1114
private hasChanges: boolean = false;
@@ -122,6 +125,40 @@ export class StandardMetadataManager implements RunMetadataManager {
122125
this.store = nextStore;
123126
}
124127

128+
public removeFromKey(key: string, value: DeserializedJson) {
129+
if (!this.runId) {
130+
return;
131+
}
132+
133+
let nextStore: Record<string, DeserializedJson> | undefined = this.store
134+
? structuredClone(this.store)
135+
: {};
136+
137+
if (key.startsWith("$.")) {
138+
const path = new JSONHeroPath(key);
139+
const currentValue = path.first(nextStore);
140+
141+
if (Array.isArray(currentValue)) {
142+
// Remove the value from array using deep equality check
143+
const newArray = currentValue.filter((item) => !dequal(item, value));
144+
path.set(nextStore, newArray);
145+
}
146+
} else {
147+
const currentValue = nextStore[key];
148+
149+
if (Array.isArray(currentValue)) {
150+
// Remove the value from array using deep equality check
151+
nextStore[key] = currentValue.filter((item) => !dequal(item, value));
152+
}
153+
}
154+
155+
if (!dequal(this.store, nextStore)) {
156+
this.hasChanges = true;
157+
}
158+
159+
this.store = nextStore;
160+
}
161+
125162
public incrementKey(key: string, increment: number = 1) {
126163
if (!this.runId) {
127164
return;
@@ -173,9 +210,27 @@ export class StandardMetadataManager implements RunMetadataManager {
173210
return $value;
174211
}
175212

213+
// Check to make sure we haven't exceeded the max number of active streams
214+
if (this.activeStreams.size >= MAXIMUM_ACTIVE_STREAMS) {
215+
console.warn(
216+
`Exceeded the maximum number of active streams (${MAXIMUM_ACTIVE_STREAMS}). The "${key}" stream will be ignored.`
217+
);
218+
return $value;
219+
}
220+
221+
// Check to make sure we haven't exceeded the max number of total streams
222+
const streams = (this.store?.$$streams ?? []) as string[];
223+
224+
if (streams.length >= MAXIMUM_TOTAL_STREAMS) {
225+
console.warn(
226+
`Exceeded the maximum number of total streams (${MAXIMUM_TOTAL_STREAMS}). The "${key}" stream will be ignored.`
227+
);
228+
return $value;
229+
}
230+
176231
try {
177232
// Add the key to the special stream metadata object
178-
this.setKey(`$$stream.${key}`, key);
233+
this.appendKey(`$$streams`, key);
179234

180235
await this.flush();
181236

packages/core/src/v3/runMetadata/noopManager.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,9 @@ export class NoopRunMetadataManager implements RunMetadataManager {
66
appendKey(key: string, value: DeserializedJson): void {
77
throw new Error("Method not implemented.");
88
}
9+
removeFromKey(key: string, value: DeserializedJson): void {
10+
throw new Error("Method not implemented.");
11+
}
912
incrementKey(key: string, value: number): void {
1013
throw new Error("Method not implemented.");
1114
}

packages/core/src/v3/runMetadata/types.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ export interface RunMetadataManager {
99
setKey(key: string, value: DeserializedJson): void;
1010
deleteKey(key: string): void;
1111
appendKey(key: string, value: DeserializedJson): void;
12+
removeFromKey(key: string, value: DeserializedJson): void;
1213
incrementKey(key: string, value: number): void;
1314
decrementKey(key: string, value: number): void;
1415
update(metadata: Record<string, DeserializedJson>): void;

packages/core/src/v3/utils/ioSerialization.ts

Lines changed: 42 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -271,7 +271,7 @@ export async function createPacketAttributes(
271271

272272
try {
273273
const parsed = parse(packet.data) as any;
274-
const jsonified = JSON.parse(JSON.stringify(parsed, safeReplacer));
274+
const jsonified = JSON.parse(JSON.stringify(parsed, makeSafeReplacer()));
275275

276276
const result = {
277277
...flattenAttributes(jsonified, dataKey),
@@ -319,7 +319,7 @@ export async function createPacketAttributesAsJson(
319319
const { deserialize } = await loadSuperJSON();
320320

321321
const deserialized = deserialize(data) as any;
322-
const jsonify = safeJsonParse(JSON.stringify(deserialized, safeReplacer));
322+
const jsonify = safeJsonParse(JSON.stringify(deserialized, makeSafeReplacer()));
323323

324324
return imposeAttributeLimits(flattenAttributes(jsonify, undefined));
325325
case "application/store":
@@ -329,7 +329,11 @@ export async function createPacketAttributesAsJson(
329329
}
330330
}
331331

332-
export async function prettyPrintPacket(rawData: any, dataType?: string): Promise<string> {
332+
export async function prettyPrintPacket(
333+
rawData: any,
334+
dataType?: string,
335+
options?: ReplacerOptions
336+
): Promise<string> {
333337
if (rawData === undefined) {
334338
return "";
335339
}
@@ -347,42 +351,53 @@ export async function prettyPrintPacket(rawData: any, dataType?: string): Promis
347351
if (typeof rawData === "string") {
348352
rawData = safeJsonParse(rawData);
349353
}
350-
return JSON.stringify(rawData, safeReplacer, 2);
354+
return JSON.stringify(rawData, makeSafeReplacer(options), 2);
351355
}
352356

353357
if (typeof rawData === "string") {
354358
return rawData;
355359
}
356360

357-
return JSON.stringify(rawData, safeReplacer, 2);
361+
return JSON.stringify(rawData, makeSafeReplacer(options), 2);
358362
}
359363

360-
function safeReplacer(key: string, value: any) {
361-
// If it is a BigInt
362-
if (typeof value === "bigint") {
363-
return value.toString(); // Convert to string
364-
}
364+
interface ReplacerOptions {
365+
filteredKeys?: string[];
366+
}
365367

366-
// if it is a Regex
367-
if (value instanceof RegExp) {
368-
return value.toString(); // Convert to string
369-
}
368+
function makeSafeReplacer(options?: ReplacerOptions) {
369+
return function replacer(key: string, value: any) {
370+
// Check if the key should be filtered out
371+
if (options?.filteredKeys?.includes(key)) {
372+
return undefined;
373+
}
370374

371-
// if it is a Set
372-
if (value instanceof Set) {
373-
return Array.from(value); // Convert to array
374-
}
375+
// If it is a BigInt
376+
if (typeof value === "bigint") {
377+
return value.toString();
378+
}
375379

376-
// if it is a Map, convert it to an object
377-
if (value instanceof Map) {
378-
const obj: Record<string, any> = {};
379-
value.forEach((v, k) => {
380-
obj[k] = v;
381-
});
382-
return obj;
383-
}
380+
// if it is a Regex
381+
if (value instanceof RegExp) {
382+
return value.toString();
383+
}
384+
385+
// if it is a Set
386+
if (value instanceof Set) {
387+
return Array.from(value);
388+
}
384389

385-
return value; // Otherwise return the value as is
390+
// if it is a Map, convert it to an object
391+
if (value instanceof Map) {
392+
const obj: Record<string, any> = {};
393+
value.forEach((v, k) => {
394+
obj[k] = v;
395+
});
396+
return obj;
397+
}
398+
399+
return value;
400+
};
386401
}
387402

388403
function getPacketExtension(outputType: string): string {

packages/core/test/runStream.test.ts

Lines changed: 7 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -219,9 +219,7 @@ describe("RunSubscription", () => {
219219
status: "COMPLETED",
220220
});
221221
});
222-
});
223222

224-
describe("RunSubscription withStreams", () => {
225223
it("should handle stream data", async () => {
226224
const streamFactory = new TestStreamSubscriptionFactory();
227225

@@ -246,7 +244,7 @@ describe("RunSubscription withStreams", () => {
246244
isTest: false,
247245
runTags: [],
248246
metadata: JSON.stringify({
249-
"$$stream.openai": "openai",
247+
$$streams: ["openai"],
250248
}),
251249
metadataType: "application/json",
252250
},
@@ -312,7 +310,7 @@ describe("RunSubscription withStreams", () => {
312310
isTest: false,
313311
runTags: [],
314312
metadata: JSON.stringify({
315-
"$$stream.openai": "openai",
313+
$$streams: ["openai"],
316314
}),
317315
metadataType: "application/json",
318316
},
@@ -331,7 +329,7 @@ describe("RunSubscription withStreams", () => {
331329
isTest: false,
332330
runTags: [],
333331
metadata: JSON.stringify({
334-
"$$stream.openai": "openai",
332+
$$streams: ["openai"],
335333
}),
336334
metadataType: "application/json",
337335
},
@@ -410,8 +408,7 @@ describe("RunSubscription withStreams", () => {
410408
isTest: false,
411409
runTags: [],
412410
metadata: JSON.stringify({
413-
"$$stream.openai": "openai",
414-
"$$stream.anthropic": "anthropic",
411+
$$streams: ["openai", "anthropic"],
415412
}),
416413
metadataType: "application/json",
417414
},
@@ -494,7 +491,7 @@ describe("RunSubscription withStreams", () => {
494491
isTest: false,
495492
runTags: [],
496493
metadata: JSON.stringify({
497-
"$$stream.openai": "openai",
494+
$$streams: ["openai"],
498495
}),
499496
metadataType: "application/json",
500497
},
@@ -513,8 +510,7 @@ describe("RunSubscription withStreams", () => {
513510
isTest: false,
514511
runTags: [],
515512
metadata: JSON.stringify({
516-
"$$stream.openai": "openai",
517-
"$$stream.anthropic": "anthropic",
513+
$$streams: ["openai", "anthropic"],
518514
}),
519515
metadataType: "application/json",
520516
},
@@ -534,8 +530,7 @@ describe("RunSubscription withStreams", () => {
534530
isTest: false,
535531
runTags: [],
536532
metadata: JSON.stringify({
537-
"$$stream.openai": "openai",
538-
"$$stream.anthropic": "anthropic",
533+
$$streams: ["openai", "anthropic"],
539534
}),
540535
metadataType: "application/json",
541536
},

0 commit comments

Comments
 (0)