Skip to content

Commit bd0cc54

Browse files
authored
Create new partitioned TaskEvent table, and switch to it gradually as new runs are created (#1696)
* Create new partitioned TaskEvent table, and switch to it gradually as new runs are created * Add env var for partition window in seconds * Make startCreatedAt required in task event store
1 parent f0726af commit bd0cc54

File tree

21 files changed

+664
-414
lines changed

21 files changed

+664
-414
lines changed

apps/webapp/app/components/runs/v3/RunInspector.tsx

-39
Original file line numberDiff line numberDiff line change
@@ -32,14 +32,11 @@ import { cn } from "~/utils/cn";
3232
import { formatCurrencyAccurate } from "~/utils/numberFormatter";
3333
import {
3434
v3RunDownloadLogsPath,
35-
v3RunPath,
3635
v3RunSpanPath,
3736
v3RunsPath,
3837
v3SchedulePath,
39-
v3TraceSpanPath,
4038
} from "~/utils/pathBuilder";
4139
import { TraceSpan } from "~/utils/taskEvent";
42-
import { SpanLink } from "~/v3/eventRepository.server";
4340
import { isFailedRunStatus, isFinalRunStatus } from "~/v3/taskStatus";
4441
import { RunTimelineEvent, RunTimelineLine } from "./InspectorTimeline";
4542
import { RunTag } from "./RunTag";
@@ -317,18 +314,6 @@ export function RunInspector({
317314
)}
318315
</Property.Value>
319316
</Property.Item>
320-
{span?.links && span.links.length > 0 && (
321-
<Property.Item>
322-
<Property.Label>Links</Property.Label>
323-
<Property.Value>
324-
<div className="space-y-1">
325-
{span.links.map((link, index) => (
326-
<SpanLinkElement key={index} link={link} />
327-
))}
328-
</div>
329-
</Property.Value>
330-
</Property.Item>
331-
)}
332317
<Property.Item>
333318
<Property.Label>Max duration</Property.Label>
334319
<Property.Value>
@@ -647,27 +632,3 @@ function PacketDisplay({
647632
}
648633
}
649634
}
650-
651-
function SpanLinkElement({ link }: { link: SpanLink }) {
652-
const organization = useOrganization();
653-
const project = useProject();
654-
655-
switch (link.type) {
656-
case "run": {
657-
return (
658-
<TextLink to={v3RunPath(organization, project, { friendlyId: link.runId })}>
659-
{link.title}
660-
</TextLink>
661-
);
662-
}
663-
case "span": {
664-
return (
665-
<TextLink to={v3TraceSpanPath(organization, project, link.traceId, link.spanId)}>
666-
{link.title}
667-
</TextLink>
668-
);
669-
}
670-
}
671-
672-
return null;
673-
}

apps/webapp/app/components/runs/v3/SpanInspector.tsx

+3-52
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
1+
import { formatDuration, nanosecondsToMilliseconds } from "@trigger.dev/core/v3";
12
import { ExitIcon } from "~/assets/icons/ExitIcon";
23
import { CodeBlock } from "~/components/code/CodeBlock";
34
import { Button } from "~/components/primitives/Buttons";
45
import { DateTimeAccurate } from "~/components/primitives/DateTime";
56
import { Header2 } from "~/components/primitives/Headers";
67
import * as Property from "~/components/primitives/PropertyTable";
8+
import { Spinner } from "~/components/primitives/Spinner";
79
import { TabButton, TabContainer } from "~/components/primitives/Tabs";
810
import { TextLink } from "~/components/primitives/TextLink";
911
import { InfoIconTooltip, SimpleTooltip } from "~/components/primitives/Tooltip";
@@ -15,13 +17,10 @@ import { useOrganization } from "~/hooks/useOrganizations";
1517
import { useProject } from "~/hooks/useProject";
1618
import { useSearchParams } from "~/hooks/useSearchParam";
1719
import { cn } from "~/utils/cn";
18-
import { v3RunPath, v3RunsPath, v3TraceSpanPath } from "~/utils/pathBuilder";
20+
import { v3RunsPath } from "~/utils/pathBuilder";
1921
import { TraceSpan } from "~/utils/taskEvent";
20-
import { SpanLink } from "~/v3/eventRepository.server";
2122
import { RunTimelineEvent, RunTimelineLine } from "./InspectorTimeline";
22-
import { Spinner } from "~/components/primitives/Spinner";
2323
import { LiveTimer } from "./LiveTimer";
24-
import { formatDuration, nanosecondsToMilliseconds } from "@trigger.dev/core/v3";
2524

2625
export function SpanInspector({
2726
span,
@@ -150,18 +149,6 @@ export function SpanInspector({
150149
)}
151150
</Property.Value>
152151
</Property.Item>
153-
{span.links && span.links.length > 0 && (
154-
<Property.Item>
155-
<Property.Label>Links</Property.Label>
156-
<Property.Value>
157-
<div className="space-y-1">
158-
{span.links.map((link, index) => (
159-
<SpanLinkElement key={index} link={link} />
160-
))}
161-
</div>
162-
</Property.Value>
163-
</Property.Item>
164-
)}
165152
</Property.Table>
166153
</div>
167154
) : (
@@ -203,18 +190,6 @@ export function SpanInspector({
203190
<Property.Label>Message</Property.Label>
204191
<Property.Value>{span.message}</Property.Value>
205192
</Property.Item>
206-
{span.links && span.links.length > 0 && (
207-
<Property.Item>
208-
<Property.Label>Links</Property.Label>
209-
<Property.Value>
210-
<div className="space-y-1">
211-
{span.links.map((link, index) => (
212-
<SpanLinkElement key={index} link={link} />
213-
))}
214-
</div>
215-
</Property.Value>
216-
</Property.Item>
217-
)}
218193
</Property.Table>
219194

220195
{span.events !== undefined && <SpanEvents spanEvents={span.events} />}
@@ -287,27 +262,3 @@ export function SpanTimeline({ startTime, duration, inProgress, isError }: Timel
287262
</>
288263
);
289264
}
290-
291-
function SpanLinkElement({ link }: { link: SpanLink }) {
292-
const organization = useOrganization();
293-
const project = useProject();
294-
295-
switch (link.type) {
296-
case "run": {
297-
return (
298-
<TextLink to={v3RunPath(organization, project, { friendlyId: link.runId })}>
299-
{link.title}
300-
</TextLink>
301-
);
302-
}
303-
case "span": {
304-
return (
305-
<TextLink to={v3TraceSpanPath(organization, project, link.traceId, link.spanId)}>
306-
{link.title}
307-
</TextLink>
308-
);
309-
}
310-
}
311-
312-
return null;
313-
}

apps/webapp/app/env.server.ts

+3
Original file line numberDiff line numberDiff line change
@@ -481,6 +481,9 @@ const EnvironmentSchema = z.object({
481481
.transform((v) => v ?? process.env.REDIS_PASSWORD),
482482
COMMON_WORKER_REDIS_TLS_DISABLED: z.string().default(process.env.REDIS_TLS_DISABLED ?? "false"),
483483
COMMON_WORKER_REDIS_CLUSTER_MODE_ENABLED: z.string().default("0"),
484+
485+
TASK_EVENT_PARTITIONING_ENABLED: z.string().default("0"),
486+
TASK_EVENT_PARTITIONED_WINDOW_IN_SECONDS: z.coerce.number().int().default(60), // 1 minute
484487
});
485488

486489
export type Environment = z.infer<typeof EnvironmentSchema>;

apps/webapp/app/presenters/v3/RunPresenter.server.ts

+9-1
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ import { createTreeFromFlatItems, flattenTree } from "~/components/primitives/Tr
33
import { PrismaClient, prisma } from "~/db.server";
44
import { getUsername } from "~/utils/username";
55
import { eventRepository } from "~/v3/eventRepository.server";
6+
import { getTaskEventStoreTableForRun } from "~/v3/taskEventStore.server";
67
import { isFinalRunStatus } from "~/v3/taskStatus";
78

89
type Result = Awaited<ReturnType<RunPresenter["call"]>>;
@@ -32,6 +33,8 @@ export class RunPresenter {
3233
const run = await this.#prismaClient.taskRun.findFirstOrThrow({
3334
select: {
3435
id: true,
36+
createdAt: true,
37+
taskEventStore: true,
3538
number: true,
3639
traceId: true,
3740
spanId: true,
@@ -105,7 +108,12 @@ export class RunPresenter {
105108
}
106109

107110
// get the events
108-
const traceSummary = await eventRepository.getTraceSummary(run.traceId);
111+
const traceSummary = await eventRepository.getTraceSummary(
112+
getTaskEventStoreTableForRun(run),
113+
run.traceId,
114+
run.createdAt,
115+
run.completedAt ?? undefined
116+
);
109117
if (!traceSummary) {
110118
return {
111119
run: runData,

apps/webapp/app/presenters/v3/SpanPresenter.server.ts

+19-2
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import { machinePresetFromName } from "~/v3/machinePresets.server";
1010
import { FINAL_ATTEMPT_STATUSES, isFailedRunStatus, isFinalRunStatus } from "~/v3/taskStatus";
1111
import { BasePresenter } from "./basePresenter.server";
1212
import { getMaxDuration } from "~/v3/utils/maxDuration";
13+
import { getTaskEventStoreTableForRun } from "~/v3/taskEventStore.server";
1314

1415
type Result = Awaited<ReturnType<SpanPresenter["call"]>>;
1516
export type Span = NonNullable<NonNullable<Result>["span"]>;
@@ -71,6 +72,7 @@ export class SpanPresenter extends BasePresenter {
7172
friendlyId: true,
7273
isTest: true,
7374
maxDurationInSeconds: true,
75+
taskEventStore: true,
7476
tags: {
7577
select: {
7678
name: true,
@@ -205,7 +207,13 @@ export class SpanPresenter extends BasePresenter {
205207
}
206208
}
207209

208-
const span = await eventRepository.getSpan(spanId, run.traceId);
210+
const span = await eventRepository.getSpan(
211+
getTaskEventStoreTableForRun(run),
212+
spanId,
213+
run.traceId,
214+
run.createdAt,
215+
run.completedAt ?? undefined
216+
);
209217

210218
const metadata = run.metadata
211219
? await prettyPrintPacket(run.metadata, run.metadataType, {
@@ -342,6 +350,9 @@ export class SpanPresenter extends BasePresenter {
342350
const run = await this._prisma.taskRun.findFirst({
343351
select: {
344352
traceId: true,
353+
createdAt: true,
354+
completedAt: true,
355+
taskEventStore: true,
345356
},
346357
where: {
347358
friendlyId: runFriendlyId,
@@ -352,7 +363,13 @@ export class SpanPresenter extends BasePresenter {
352363
return;
353364
}
354365

355-
const span = await eventRepository.getSpan(spanId, run.traceId);
366+
const span = await eventRepository.getSpan(
367+
getTaskEventStoreTableForRun(run),
368+
spanId,
369+
run.traceId,
370+
run.createdAt,
371+
run.completedAt ?? undefined
372+
);
356373

357374
if (!span) {
358375
return;

apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.v3.$projectParam.traces.$traceId.spans.$spanId/route.tsx

-35
This file was deleted.

apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.v3.$projectParam.runs.$runParam.spans.$spanParam/route.tsx

-28
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,6 @@ import { RunTag } from "~/components/runs/v3/RunTag";
4444
import { SpanEvents } from "~/components/runs/v3/SpanEvents";
4545
import { SpanTitle } from "~/components/runs/v3/SpanTitle";
4646
import { TaskRunAttemptStatusCombo } from "~/components/runs/v3/TaskRunAttemptStatus";
47-
import { TaskRunsTable } from "~/components/runs/v3/TaskRunsTable";
4847
import { TaskRunStatusCombo } from "~/components/runs/v3/TaskRunStatus";
4948
import { useOrganization } from "~/hooks/useOrganizations";
5049
import { useProject } from "~/hooks/useProject";
@@ -57,16 +56,13 @@ import { cn } from "~/utils/cn";
5756
import { formatCurrencyAccurate } from "~/utils/numberFormatter";
5857
import {
5958
v3BatchPath,
60-
v3BatchRunsPath,
6159
v3RunDownloadLogsPath,
6260
v3RunPath,
6361
v3RunSpanPath,
6462
v3RunsPath,
6563
v3SchedulePath,
6664
v3SpanParamsSchema,
67-
v3TraceSpanPath,
6865
} from "~/utils/pathBuilder";
69-
import { SpanLink } from "~/v3/eventRepository.server";
7066

7167
export const loader = async ({ request, params }: LoaderFunctionArgs) => {
7268
const userId = await requireUserId(request);
@@ -1152,27 +1148,3 @@ function classNameForState(state: TimelineState) {
11521148
}
11531149
}
11541150
}
1155-
1156-
function SpanLinkElement({ link }: { link: SpanLink }) {
1157-
const organization = useOrganization();
1158-
const project = useProject();
1159-
1160-
switch (link.type) {
1161-
case "run": {
1162-
return (
1163-
<TextLink to={v3RunPath(organization, project, { friendlyId: link.runId })}>
1164-
{link.title}
1165-
</TextLink>
1166-
);
1167-
}
1168-
case "span": {
1169-
return (
1170-
<TextLink to={v3TraceSpanPath(organization, project, link.traceId, link.spanId)}>
1171-
{link.title}
1172-
</TextLink>
1173-
);
1174-
}
1175-
}
1176-
1177-
return null;
1178-
}

apps/webapp/app/routes/resources.runs.$runParam.logs.download.ts

+7-1
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import { createGzip } from "zlib";
77
import { Readable } from "stream";
88
import { formatDurationMilliseconds } from "@trigger.dev/core/v3/utils/durations";
99
import { getDateFromNanoseconds } from "~/utils/taskEvent";
10+
import { getTaskEventStoreTableForRun } from "~/v3/taskEventStore.server";
1011

1112
export async function loader({ params, request }: LoaderFunctionArgs) {
1213
const userId = await requireUserId(request);
@@ -31,7 +32,12 @@ export async function loader({ params, request }: LoaderFunctionArgs) {
3132
return new Response("Not found", { status: 404 });
3233
}
3334

34-
const runEvents = await eventRepository.getRunEvents(run.friendlyId);
35+
const runEvents = await eventRepository.getRunEvents(
36+
getTaskEventStoreTableForRun(run),
37+
run.friendlyId,
38+
run.createdAt,
39+
run.completedAt ?? undefined
40+
);
3541

3642
// Create a Readable stream from the runEvents array
3743
const readable = new Readable({

apps/webapp/app/utils/pathBuilder.ts

-9
Original file line numberDiff line numberDiff line change
@@ -396,15 +396,6 @@ export function v3RunSpanPath(
396396
return `${v3RunPath(organization, project, run)}?span=${span.spanId}`;
397397
}
398398

399-
export function v3TraceSpanPath(
400-
organization: OrgForPath,
401-
project: ProjectForPath,
402-
traceId: string,
403-
spanId: string
404-
) {
405-
return `${v3ProjectPath(organization, project)}/traces/${traceId}/spans/${spanId}`;
406-
}
407-
408399
export function v3RunStreamingPath(
409400
organization: OrgForPath,
410401
project: ProjectForPath,

apps/webapp/app/utils/taskEvent.ts

-2
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,6 @@ export function prepareTrace(events: TaskEvent[]): TraceSummary | undefined {
6262
);
6363

6464
const span = {
65-
recordId: event.id,
6665
id: event.spanId,
6766
parentId: event.parentId ?? undefined,
6867
runId: event.runId,
@@ -247,7 +246,6 @@ export function createSpanFromEvent(events: TaskEvent[], event: PreparedEvent) {
247246
}
248247

249248
const span = {
250-
recordId: event.id,
251249
id: event.spanId,
252250
parentId: event.parentId ?? undefined,
253251
runId: event.runId,

0 commit comments

Comments
 (0)