Skip to content

Commit d16f736

Browse files
committed
Merge remote-tracking branch 'origin/main' into feat/batch-completion-checker
2 parents bf39997 + 67592ec commit d16f736

File tree

4 files changed

+78
-35
lines changed

4 files changed

+78
-35
lines changed

.changeset/modern-nails-refuse.md

+5
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"@trigger.dev/core": patch
3+
---
4+
5+
Multiple streams can now be consumed simultaneously

packages/core/src/v3/apiClient/runStream.ts

+52-28
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import {
88
IOPacket,
99
parsePacket,
1010
} from "../utils/ioSerialization.js";
11+
import { ApiError } from "./errors.js";
1112
import { ApiClient } from "./index.js";
1213
import { AsyncIterableStream, createAsyncIterableStream, zodShapeStream } from "./stream.js";
1314
import { EventSourceParserStream } from "eventsource-parser/stream";
@@ -97,7 +98,7 @@ export function runShapeStream<TRunTypes extends AnyRunTypes>(
9798

9899
// First, define interfaces for the stream handling
99100
export interface StreamSubscription {
100-
subscribe(onChunk: (chunk: unknown) => Promise<void>): Promise<() => void>;
101+
subscribe(): Promise<ReadableStream<unknown>>;
101102
}
102103

103104
export interface StreamSubscriptionFactory {
@@ -111,33 +112,38 @@ export class SSEStreamSubscription implements StreamSubscription {
111112
private options: { headers?: Record<string, string>; signal?: AbortSignal }
112113
) {}
113114

114-
async subscribe(onChunk: (chunk: unknown) => Promise<void>): Promise<() => void> {
115-
const response = await fetch(this.url, {
115+
async subscribe(): Promise<ReadableStream<unknown>> {
116+
return fetch(this.url, {
116117
headers: {
117118
Accept: "text/event-stream",
118119
...this.options.headers,
119120
},
120121
signal: this.options.signal,
121-
});
122-
123-
if (!response.body) {
124-
throw new Error("No response body");
125-
}
126-
127-
const reader = response.body
128-
.pipeThrough(new TextDecoderStream())
129-
.pipeThrough(new EventSourceParserStream())
130-
.getReader();
131-
132-
while (true) {
133-
const { done, value } = await reader.read();
134-
135-
if (done) break;
122+
}).then((response) => {
123+
if (!response.ok) {
124+
throw ApiError.generate(
125+
response.status,
126+
{},
127+
"Could not subscribe to stream",
128+
Object.fromEntries(response.headers)
129+
);
130+
}
136131

137-
await onChunk(safeParseJSON(value.data));
138-
}
132+
if (!response.body) {
133+
throw new Error("No response body");
134+
}
139135

140-
return () => reader.cancel();
136+
return response.body
137+
.pipeThrough(new TextDecoderStream())
138+
.pipeThrough(new EventSourceParserStream())
139+
.pipeThrough(
140+
new TransformStream({
141+
transform(chunk, controller) {
142+
controller.enqueue(safeParseJSON(chunk.data));
143+
},
144+
})
145+
);
146+
});
141147
}
142148
}
143149

@@ -254,13 +260,31 @@ export class RunSubscription<TRunTypes extends AnyRunTypes> {
254260
this.options.client?.baseUrl
255261
);
256262

257-
await subscription.subscribe(async (chunk) => {
258-
controller.enqueue({
259-
type: streamKey,
260-
chunk: chunk as TStreams[typeof streamKey],
261-
run,
262-
} as StreamPartResult<RunShape<TRunTypes>, TStreams>);
263-
});
263+
const stream = await subscription.subscribe();
264+
265+
// Create the pipeline and start it
266+
stream
267+
.pipeThrough(
268+
new TransformStream({
269+
transform(chunk, controller) {
270+
controller.enqueue({
271+
type: streamKey,
272+
chunk: chunk as TStreams[typeof streamKey],
273+
run,
274+
} as StreamPartResult<RunShape<TRunTypes>, TStreams>);
275+
},
276+
})
277+
)
278+
.pipeTo(
279+
new WritableStream({
280+
write(chunk) {
281+
controller.enqueue(chunk);
282+
},
283+
})
284+
)
285+
.catch((error) => {
286+
console.error(`Error in stream ${streamKey}:`, error);
287+
});
264288
}
265289
}
266290
}

packages/core/test/runStream.test.ts

+11-5
Original file line numberDiff line numberDiff line change
@@ -9,17 +9,23 @@ import {
99
import type { SubscribeRunRawShape } from "../src/v3/schemas/api.js";
1010

1111
// Test implementations
12+
// Update TestStreamSubscription to return a ReadableStream
1213
class TestStreamSubscription implements StreamSubscription {
1314
constructor(private chunks: unknown[]) {}
1415

15-
async subscribe(onChunk: (chunk: unknown) => Promise<void>): Promise<() => void> {
16-
for (const chunk of this.chunks) {
17-
await onChunk(chunk);
18-
}
19-
return () => {};
16+
async subscribe(): Promise<ReadableStream<unknown>> {
17+
return new ReadableStream({
18+
start: async (controller) => {
19+
for (const chunk of this.chunks) {
20+
controller.enqueue(chunk);
21+
}
22+
controller.close();
23+
},
24+
});
2025
}
2126
}
2227

28+
// TestStreamSubscriptionFactory can remain the same
2329
class TestStreamSubscriptionFactory implements StreamSubscriptionFactory {
2430
private streams = new Map<string, unknown[]>();
2531

scripts/publish-prerelease.sh

+10-2
Original file line numberDiff line numberDiff line change
@@ -36,9 +36,17 @@ else
3636
fi
3737

3838
# Run your commands
39-
39+
# Run changeset version command and capture its output
4040
echo "Running: pnpm exec changeset version --snapshot $version"
41-
pnpm exec changeset version --snapshot $version
41+
if output=$(pnpm exec changeset version --snapshot $version 2>&1); then
42+
if echo "$output" | grep -q "No unreleased changesets found"; then
43+
echo "No unreleased changesets found. Exiting."
44+
exit 0
45+
fi
46+
else
47+
echo "Error running changeset version command"
48+
exit 1
49+
fi
4250

4351
echo "Running: pnpm run build --filter \"@trigger.dev/*\" --filter \"trigger.dev\""
4452
pnpm run build --filter "@trigger.dev/*" --filter "trigger.dev"

0 commit comments

Comments
 (0)