Skip to content

Commit 8e9d6a2

Browse files
committed
Add support for multiple writers to a single stream by removing the END_SENTINEL system
1 parent 50fbba5 commit 8e9d6a2

File tree

10 files changed

+1350
-83
lines changed

10 files changed

+1350
-83
lines changed

apps/webapp/app/env.server.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -198,6 +198,10 @@ const EnvironmentSchema = z
198198
.string()
199199
.default(process.env.REDIS_TLS_DISABLED ?? "false"),
200200
REALTIME_STREAMS_REDIS_CLUSTER_MODE_ENABLED: z.string().default("0"),
201+
REALTIME_STREAMS_INACTIVITY_TIMEOUT_MS: z.coerce
202+
.number()
203+
.int()
204+
.default(60000 * 5), // 5 minutes
201205

202206
REALTIME_MAXIMUM_CREATED_AT_FILTER_AGE_IN_MS: z.coerce
203207
.number()

apps/webapp/app/routes/realtime.v1.streams.$runId.$streamId.ts

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import { ActionFunctionArgs } from "@remix-run/server-runtime";
22
import { z } from "zod";
33
import { $replica } from "~/db.server";
44
import { relayRealtimeStreams } from "~/services/realtime/relayRealtimeStreams.server";
5+
import { v1RealtimeStreams } from "~/services/realtime/v1StreamsGlobal.server";
56
import { createLoaderApiRoute } from "~/services/routeBuilders/apiBuilder.server";
67

78
const ParamsSchema = z.object({
@@ -12,6 +13,25 @@ const ParamsSchema = z.object({
1213
export async function action({ request, params }: ActionFunctionArgs) {
1314
const $params = ParamsSchema.parse(params);
1415

16+
// Extract client ID from header, default to "default" if not provided
17+
const clientId = request.headers.get("X-Client-Id") || "default";
18+
19+
// Handle HEAD request to get last chunk index for this client
20+
if (request.method === "HEAD") {
21+
const lastChunkIndex = await relayRealtimeStreams.getLastChunkIndex(
22+
$params.runId,
23+
$params.streamId,
24+
clientId
25+
);
26+
27+
return new Response(null, {
28+
status: 200,
29+
headers: {
30+
"X-Last-Chunk-Index": lastChunkIndex.toString(),
31+
},
32+
});
33+
}
34+
1535
if (!request.body) {
1636
return new Response("No body provided", { status: 400 });
1737
}
@@ -23,6 +43,7 @@ export async function action({ request, params }: ActionFunctionArgs) {
2343
request.body,
2444
$params.runId,
2545
$params.streamId,
46+
clientId,
2647
resumeFromChunkNumber
2748
);
2849
}
@@ -59,11 +80,10 @@ export const loader = createLoaderApiRoute(
5980
},
6081
},
6182
async ({ params, request, resource: run, authentication }) => {
62-
return relayRealtimeStreams.streamResponse(
83+
return v1RealtimeStreams.streamResponse(
6384
request,
6485
run.friendlyId,
6586
params.streamId,
66-
authentication.environment,
6787
request.signal
6888
);
6989
}

apps/webapp/app/routes/realtime.v1.streams.$runId.$target.$streamId.ts

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import { z } from "zod";
22
import { $replica } from "~/db.server";
3-
import { relayRealtimeStreams } from "~/services/realtime/relayRealtimeStreams.server";
3+
import { v1RealtimeStreams } from "~/services/realtime/v1StreamsGlobal.server";
44
import {
55
createActionApiRoute,
66
createLoaderApiRoute,
@@ -53,17 +53,21 @@ const { action } = createActionApiRoute(
5353
return new Response("Target not found", { status: 404 });
5454
}
5555

56+
// Extract client ID from header, default to "default" if not provided
57+
const clientId = request.headers.get("X-Client-Id") || "default";
58+
5659
if (!request.body) {
5760
return new Response("No body provided", { status: 400 });
5861
}
5962

6063
const resumeFromChunk = request.headers.get("X-Resume-From-Chunk");
6164
const resumeFromChunkNumber = resumeFromChunk ? parseInt(resumeFromChunk, 10) : undefined;
6265

63-
return relayRealtimeStreams.ingestData(
66+
return v1RealtimeStreams.ingestData(
6467
request.body,
6568
targetId,
6669
params.streamId,
70+
clientId,
6771
resumeFromChunkNumber
6872
);
6973
}
@@ -118,7 +122,14 @@ const loader = createLoaderApiRoute(
118122
return new Response("Only HEAD requests are allowed for this endpoint", { status: 405 });
119123
}
120124

121-
const lastChunkIndex = await relayRealtimeStreams.getLastChunkIndex(targetId, params.streamId);
125+
// Extract client ID from header, default to "default" if not provided
126+
const clientId = request.headers.get("X-Client-Id") || "default";
127+
128+
const lastChunkIndex = await v1RealtimeStreams.getLastChunkIndex(
129+
targetId,
130+
params.streamId,
131+
clientId
132+
);
122133

123134
return new Response(null, {
124135
status: 200,

0 commit comments

Comments
 (0)