Skip to content

Commit a236826

Browse files
committed
Write to s2 from the client instead of the server
1 parent ff56740 commit a236826

File tree

16 files changed

+561
-266
lines changed

16 files changed

+561
-266
lines changed

apps/webapp/app/env.server.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1211,7 +1211,7 @@ const EnvironmentSchema = z
12111211
.enum(["log", "error", "warn", "info", "debug"])
12121212
.default("info"),
12131213
REALTIME_STREAMS_S2_FLUSH_INTERVAL_MS: z.coerce.number().int().default(100),
1214-
REALTIME_STREAMS_S2_RESUME_TTL_SECONDS: z.coerce.number().int().default(86400),
1214+
REALTIME_STREAMS_S2_MAX_RETRIES: z.coerce.number().int().default(10),
12151215
})
12161216
.and(GithubAppEnvSchema);
12171217

apps/webapp/app/routes/realtime.v1.streams.$runId.$target.$streamId.ts

Lines changed: 51 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
1+
import { json } from "@remix-run/server-runtime";
12
import { z } from "zod";
2-
import { $replica } from "~/db.server";
3+
import { $replica, prisma } from "~/db.server";
34
import { getRealtimeStreamInstance } from "~/services/realtime/v1StreamsGlobal.server";
45
import {
56
createActionApiRoute,
@@ -53,26 +54,58 @@ const { action } = createActionApiRoute(
5354
return new Response("Target not found", { status: 404 });
5455
}
5556

56-
// Extract client ID from header, default to "default" if not provided
57-
const clientId = request.headers.get("X-Client-Id") || "default";
58-
const streamVersion = request.headers.get("X-Stream-Version") || "v1";
59-
60-
if (!request.body) {
61-
return new Response("No body provided", { status: 400 });
62-
}
57+
if (request.method === "PUT") {
58+
// This is the "create" endpoint
59+
const updatedRun = await prisma.taskRun.update({
60+
where: {
61+
friendlyId: targetId,
62+
runtimeEnvironmentId: authentication.environment.id,
63+
},
64+
data: {
65+
realtimeStreams: {
66+
push: params.streamId,
67+
},
68+
},
69+
select: {
70+
realtimeStreamsVersion: true,
71+
},
72+
});
6373

64-
const resumeFromChunk = request.headers.get("X-Resume-From-Chunk");
65-
const resumeFromChunkNumber = resumeFromChunk ? parseInt(resumeFromChunk, 10) : undefined;
74+
const realtimeStream = getRealtimeStreamInstance(
75+
authentication.environment,
76+
updatedRun.realtimeStreamsVersion
77+
);
6678

67-
const realtimeStream = getRealtimeStreamInstance(authentication.environment, streamVersion);
79+
const { responseHeaders } = await realtimeStream.initializeStream(targetId, params.streamId);
6880

69-
return realtimeStream.ingestData(
70-
request.body,
71-
targetId,
72-
params.streamId,
73-
clientId,
74-
resumeFromChunkNumber
75-
);
81+
return json(
82+
{
83+
version: updatedRun.realtimeStreamsVersion,
84+
},
85+
{ status: 202, headers: responseHeaders }
86+
);
87+
} else {
88+
// Extract client ID from header, default to "default" if not provided
89+
const clientId = request.headers.get("X-Client-Id") || "default";
90+
const streamVersion = request.headers.get("X-Stream-Version") || "v1";
91+
92+
if (!request.body) {
93+
return new Response("No body provided", { status: 400 });
94+
}
95+
96+
const resumeFromChunk = request.headers.get("X-Resume-From-Chunk");
97+
const resumeFromChunkNumber = resumeFromChunk ? parseInt(resumeFromChunk, 10) : undefined;
98+
99+
const realtimeStream = getRealtimeStreamInstance(authentication.environment, streamVersion);
100+
101+
return realtimeStream.ingestData(
102+
request.body,
103+
targetId,
104+
params.streamId,
105+
clientId,
106+
resumeFromChunkNumber
107+
);
108+
}
76109
}
77110
);
78111

apps/webapp/app/services/realtime/redisRealtimeStreams.server.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,13 @@ export class RedisRealtimeStreams implements StreamIngestor, StreamResponder {
2929
this.inactivityTimeoutMs = options.inactivityTimeoutMs ?? 60000; // Default: 60 seconds
3030
}
3131

32+
async initializeStream(
33+
runId: string,
34+
streamId: string
35+
): Promise<{ responseHeaders?: Record<string, string> }> {
36+
return {};
37+
}
38+
3239
async streamResponse(
3340
request: Request,
3441
runId: string,

0 commit comments

Comments
 (0)