Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
50 commits
Select commit Hold shift + click to select a range
9567a9c
fix(realtime): gracefully recover from ECONNRESET errors when sending…
ericallam Oct 17, 2025
ef7389b
Add support for multiple writers to a single stream by removing the E…
ericallam Oct 17, 2025
da7d035
Make the MetadataStream client more robust to failure and add tests
ericallam Oct 17, 2025
91b09b3
Make the stream client more resilient and robust, including implement…
ericallam Oct 18, 2025
20a5323
Add some more streaming examples and markdown streaming
ericallam Oct 18, 2025
5ef2336
s2 WIP
ericallam Oct 20, 2025
3820e0c
Added realtimeStreams column to TaskRun to replace using metadata for…
ericallam Oct 21, 2025
46686dd
Write to s2 from the client instead of the server
ericallam Oct 21, 2025
1f27fc0
WIP
ericallam Oct 22, 2025
0a4b990
Add env var
ericallam Oct 22, 2025
193afd8
Loads more stuff
ericallam Oct 23, 2025
f956523
The stream.read() span now works better when specifying a startIndex
ericallam Oct 23, 2025
e2fbbf2
WIP
ericallam Oct 23, 2025
5f7a5da
Configure the waitUntil timeout via an env var
ericallam Oct 23, 2025
c5d32ef
Return stream parts from SSE class
ericallam Oct 23, 2025
df3016b
Adds new streams icon
samejr Oct 23, 2025
91db7ed
Layout improvements to streams inspector
samejr Oct 23, 2025
810272c
Improve layout of streams inspector
samejr Oct 23, 2025
4f6035c
Remove tabs if only Overview is shown
samejr Oct 23, 2025
2b09eed
Added compact view for streams and sticky copy button
ericallam Oct 23, 2025
d8e0598
Add AI SDK demo
ericallam Oct 23, 2025
ce5dac4
experiment_throttle is now just throttle
ericallam Oct 23, 2025
8f6a94f
Show textwrapping, copy and modal buttons on the Properties code blocks
samejr Oct 24, 2025
13cbac7
Use simplr library for plurals and flex wrap the heading info nicely
samejr Oct 24, 2025
65962c8
Moves toggle button functionality into the header with new icons
samejr Oct 24, 2025
744aca5
Make sure the scroll view knows if it’s at the top or bottom even if …
samejr Oct 24, 2025
44c277c
Improve loading states
samejr Oct 24, 2025
f50fd55
Add divide between heads and inspector content
samejr Oct 24, 2025
ea890a9
Remove 0 padding
samejr Oct 24, 2025
d4a5771
Adds nice behaviour if a long stream key is used
samejr Oct 24, 2025
183e68a
Disable the header buttons if the content is loading
samejr Oct 24, 2025
bc404d0
Fixed failing test
ericallam Oct 25, 2025
e48ff0d
Improved X-Resume-From-Chunk header parsing
ericallam Oct 25, 2025
064b562
Unify inactivity timeout threshold
ericallam Oct 25, 2025
9c46d5d
If v2 streams is requested, throw an error if S2 env vars are not set
ericallam Oct 25, 2025
3b559e1
Use implicit radix arg when calling parseInt
ericallam Oct 25, 2025
1a1ebad
Consistent API client creation
ericallam Oct 25, 2025
a0f88db
Normalize the stream source to an async iterable before passing to th…
ericallam Oct 25, 2025
8d2625a
Refactor the metadata streams stuff to be better
ericallam Oct 25, 2025
2417983
properly abort streams when the waitUntil timeout occurs
ericallam Oct 25, 2025
377dbc7
fix the new configurable waitUntil timeout
ericallam Oct 25, 2025
8f93d86
prevent memory leaks by cleaning up responses and requests
ericallam Oct 25, 2025
0c5da21
Fix timer leak
ericallam Oct 25, 2025
7495397
use server provided options for the s2 writer
ericallam Oct 25, 2025
2eaf483
s2 stream writer now handles abort signals
ericallam Oct 25, 2025
6b018e0
Fixed core tests
ericallam Oct 25, 2025
80569bd
No need to use keys and a Map for stream management
ericallam Oct 25, 2025
792c02f
Fixed runStream tests
ericallam Oct 25, 2025
667bf8d
Implement TRIGGER_V2_REALTIME_STREAMS env var
ericallam Oct 25, 2025
2da5fe2
Runs now have a "default" stream
ericallam Nov 1, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/quiet-falcons-approve.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@trigger.dev/sdk": patch
---

gracefully recover from ECONNRESET errors when sending stream data from tasks to the server
30 changes: 30 additions & 0 deletions apps/webapp/app/assets/icons/ListBulletIcon.tsx
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
export function ListBulletIcon({ className }: { className?: string }) {
return (
<svg className={className} viewBox="0 0 24 24" fill="none" xmlns="http://www.w3.org/2000/svg">
<path
d="M9 5H20"
stroke="currentColor"
strokeWidth="2"
strokeLinecap="round"
strokeLinejoin="round"
/>
<path
d="M9 12H20"
stroke="currentColor"
strokeWidth="2"
strokeLinecap="round"
strokeLinejoin="round"
/>
<path
d="M9 19H20"
stroke="currentColor"
strokeWidth="2"
strokeLinecap="round"
strokeLinejoin="round"
/>
<circle cx="4" cy="5" r="1" fill="currentColor" />
<circle cx="4" cy="12" r="1" fill="currentColor" />
<circle cx="4" cy="19" r="1" fill="currentColor" />
</svg>
);
}
27 changes: 27 additions & 0 deletions apps/webapp/app/assets/icons/MoveToBottomIcon.tsx
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
export function MoveToBottomIcon({ className }: { className?: string }) {
return (
<svg className={className} viewBox="0 0 24 24" fill="none" xmlns="http://www.w3.org/2000/svg">
<path
d="M12 15L12 3"
stroke="currentColor"
strokeWidth="2"
strokeLinecap="round"
strokeLinejoin="round"
/>
<path
d="M3 21L21 21"
stroke="currentColor"
strokeWidth="2"
strokeLinecap="round"
strokeLinejoin="round"
/>
<path
d="M7.5 12.5L12 17L16.5 12.5"
stroke="currentColor"
strokeWidth="2"
strokeLinecap="round"
strokeLinejoin="round"
/>
</svg>
);
}
20 changes: 20 additions & 0 deletions apps/webapp/app/assets/icons/SnakedArrowIcon.tsx
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
export function SnakedArrowIcon({ className }: { className?: string }) {
return (
<svg className={className} viewBox="0 0 24 24" fill="none" xmlns="http://www.w3.org/2000/svg">
<path
d="M5 5H16C17.6569 5 19 6.34315 19 8L19 8.5C19 10.1569 17.6569 11.5 16 11.5H8C6.34314 11.5 5 12.8431 5 14.5L5 15C4.99999 16.6569 6.34314 18 8 18H18.634"
stroke="currentColor"
strokeWidth="2"
strokeLinecap="round"
strokeLinejoin="round"
/>
<path
d="M16 21L19 18L16 15"
stroke="currentColor"
strokeWidth="2"
strokeLinecap="round"
strokeLinejoin="round"
/>
</svg>
);
}
10 changes: 10 additions & 0 deletions apps/webapp/app/assets/icons/StreamsIcon.tsx
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
export function StreamsIcon({ className }: { className?: string }) {
return (
<svg className={className} viewBox="0 0 24 24" fill="none" xmlns="http://www.w3.org/2000/svg">
<path d="M3 19C3 19 5.01155 17 8 17C10.9885 17 13 18.9973 16 18.9973C19 18.9973 21 17 21 17" stroke="currentColor" strokeWidth="2" strokeLinecap="round"/>
<path d="M3 13.0001C3 13.0001 5.01155 11 8 11C10.9885 11 13 13 16 13C19 13 21 11.0001 21 11.0001" stroke="currentColor" strokeWidth="2" strokeLinecap="round"/>
<path d="M3 7C3 7 5.01155 5 8 5C10.9885 5 13 6.9973 16 6.9973C19 6.9973 21 5 21 5" stroke="currentColor" strokeWidth="2" strokeLinecap="round"/>
</svg>
);
}

3 changes: 3 additions & 0 deletions apps/webapp/app/components/runs/v3/RunIcon.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import { TriggerIcon } from "~/assets/icons/TriggerIcon";
import { PythonLogoIcon } from "~/assets/icons/PythonLogoIcon";
import { TraceIcon } from "~/assets/icons/TraceIcon";
import { WaitpointTokenIcon } from "~/assets/icons/WaitpointTokenIcon";
import { StreamsIcon } from "~/assets/icons/StreamsIcon";

type TaskIconProps = {
name: string | undefined;
Expand Down Expand Up @@ -107,6 +108,8 @@ export function RunIcon({ name, className, spanName }: TaskIconProps) {
case "task-hook-onFailure":
case "task-hook-catchError":
return <FunctionIcon className={cn(className, "text-error")} />;
case "streams":
return <StreamsIcon className={cn(className, "text-text-dimmed")} />;
}

return <InformationCircleIcon className={cn(className, "text-text-dimmed")} />;
Expand Down
11 changes: 11 additions & 0 deletions apps/webapp/app/env.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,7 @@ const EnvironmentSchema = z
.string()
.default(process.env.REDIS_TLS_DISABLED ?? "false"),
REALTIME_STREAMS_REDIS_CLUSTER_MODE_ENABLED: z.string().default("0"),
REALTIME_STREAMS_INACTIVITY_TIMEOUT_MS: z.coerce.number().int().default(60000), // 1 minute

REALTIME_MAXIMUM_CREATED_AT_FILTER_AGE_IN_MS: z.coerce
.number()
Expand Down Expand Up @@ -1201,6 +1202,16 @@ const EnvironmentSchema = z
EVENT_LOOP_MONITOR_UTILIZATION_SAMPLE_RATE: z.coerce.number().default(0.05),

VERY_SLOW_QUERY_THRESHOLD_MS: z.coerce.number().int().optional(),

REALTIME_STREAMS_S2_BASIN: z.string().optional(),
REALTIME_STREAMS_S2_ACCESS_TOKEN: z.string().optional(),
REALTIME_STREAMS_S2_LOG_LEVEL: z
.enum(["log", "error", "warn", "info", "debug"])
.default("info"),
REALTIME_STREAMS_S2_FLUSH_INTERVAL_MS: z.coerce.number().int().default(100),
REALTIME_STREAMS_S2_MAX_RETRIES: z.coerce.number().int().default(10),
REALTIME_STREAMS_S2_WAIT_SECONDS: z.coerce.number().int().default(60),
WAIT_UNTIL_TIMEOUT_MS: z.coerce.number().int().default(600_000),
})
.and(GithubAppEnvSchema);

Expand Down
2 changes: 1 addition & 1 deletion apps/webapp/app/models/organization.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ export async function createOrganization(
role: "ADMIN",
},
},
v3Enabled: !features.isManagedCloud,
v3Enabled: true,
},
include: {
members: true,
Expand Down
36 changes: 36 additions & 0 deletions apps/webapp/app/presenters/v3/SpanPresenter.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import { WaitpointPresenter } from "./WaitpointPresenter.server";
import { engine } from "~/v3/runEngine.server";
import { resolveEventRepositoryForStore } from "~/v3/eventRepository/index.server";
import { IEventRepository, SpanDetail } from "~/v3/eventRepository/eventRepository.types";
import { safeJsonParse } from "~/utils/json";

type Result = Awaited<ReturnType<SpanPresenter["call"]>>;
export type Span = NonNullable<NonNullable<Result>["span"]>;
Expand Down Expand Up @@ -551,6 +552,41 @@ export class SpanPresenter extends BasePresenter {
},
};
}
case "realtime-stream": {
if (!span.entity.id) {
logger.error(`SpanPresenter: No realtime stream id`, {
spanId,
realtimeStreamId: span.entity.id,
});
return { ...data, entity: null };
}

const [runId, streamKey] = span.entity.id.split(":");

if (!runId || !streamKey) {
logger.error(`SpanPresenter: Invalid realtime stream id`, {
spanId,
realtimeStreamId: span.entity.id,
});
return { ...data, entity: null };
}

const metadata = span.entity.metadata
? (safeJsonParse(span.entity.metadata) as Record<string, unknown> | undefined)
: undefined;

return {
...data,
entity: {
type: "realtime-stream" as const,
object: {
runId,
streamKey,
metadata,
},
},
};
}
default:
return { ...data, entity: null };
}
Expand Down
12 changes: 4 additions & 8 deletions apps/webapp/app/routes/api.v1.tasks.$taskId.trigger.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ export const HeadersSchema = z.object({
"x-trigger-client": z.string().nullish(),
"x-trigger-engine-version": RunEngineVersionSchema.nullish(),
"x-trigger-request-idempotency-key": z.string().nullish(),
"x-trigger-realtime-streams-version": z.string().nullish(),
traceparent: z.string().optional(),
tracestate: z.string().optional(),
});
Expand Down Expand Up @@ -63,6 +64,7 @@ const { action, loader } = createActionApiRoute(
"x-trigger-client": triggerClient,
"x-trigger-engine-version": engineVersion,
"x-trigger-request-idempotency-key": requestIdempotencyKey,
"x-trigger-realtime-streams-version": realtimeStreamsVersion,
} = headers;

const cachedResponse = await handleRequestIdempotency(requestIdempotencyKey, {
Expand Down Expand Up @@ -108,14 +110,7 @@ const { action, loader } = createActionApiRoute(
options: body.options,
isFromWorker,
traceContext,
});

logger.debug("[otelContext]", {
taskId: params.taskId,
headers,
options: body.options,
isFromWorker,
traceContext,
realtimeStreamsVersion,
});

const idempotencyKeyExpiresAt = resolveIdempotencyKeyTTL(idempotencyKeyTTL);
Expand All @@ -131,6 +126,7 @@ const { action, loader } = createActionApiRoute(
traceContext,
spanParentAsLink: spanParentAsLink === 1,
oneTimeUseToken,
realtimeStreamsVersion: realtimeStreamsVersion ?? undefined,
},
engineVersion ?? undefined
);
Expand Down
43 changes: 26 additions & 17 deletions apps/webapp/app/routes/realtime.v1.streams.$runId.$streamId.ts
Original file line number Diff line number Diff line change
@@ -1,24 +1,13 @@
import { ActionFunctionArgs } from "@remix-run/server-runtime";
import { z } from "zod";
import { $replica } from "~/db.server";
import { relayRealtimeStreams } from "~/services/realtime/relayRealtimeStreams.server";
import { getRealtimeStreamInstance } from "~/services/realtime/v1StreamsGlobal.server";
import { createLoaderApiRoute } from "~/services/routeBuilders/apiBuilder.server";

const ParamsSchema = z.object({
runId: z.string(),
streamId: z.string(),
});

export async function action({ request, params }: ActionFunctionArgs) {
const $params = ParamsSchema.parse(params);

if (!request.body) {
return new Response("No body provided", { status: 400 });
}

return relayRealtimeStreams.ingestData(request.body, $params.runId, $params.streamId);
}

export const loader = createLoaderApiRoute(
{
params: ParamsSchema,
Expand Down Expand Up @@ -51,12 +40,32 @@ export const loader = createLoaderApiRoute(
},
},
async ({ params, request, resource: run, authentication }) => {
return relayRealtimeStreams.streamResponse(
request,
run.friendlyId,
params.streamId,
// Get Last-Event-ID header for resuming from a specific position
const lastEventId = request.headers.get("Last-Event-ID") || undefined;

const timeoutInSecondsRaw = request.headers.get("Timeout-Seconds") ?? undefined;
const timeoutInSeconds = timeoutInSecondsRaw ? parseInt(timeoutInSecondsRaw) : undefined;

if (timeoutInSeconds && isNaN(timeoutInSeconds)) {
return new Response("Invalid timeout seconds", { status: 400 });
}

if (timeoutInSeconds && timeoutInSeconds < 1) {
return new Response("Timeout seconds must be greater than 0", { status: 400 });
}

if (timeoutInSeconds && timeoutInSeconds > 600) {
return new Response("Timeout seconds must be less than 600", { status: 400 });
}

const realtimeStream = getRealtimeStreamInstance(
authentication.environment,
request.signal
run.realtimeStreamsVersion
);

return realtimeStream.streamResponse(request, run.friendlyId, params.streamId, request.signal, {
lastEventId,
timeoutInSeconds,
});
}
);
Loading