Skip to content

Commit 4f95c9d

Browse files
authored
v3: Cancel awaited subtasks and reliable rate-limit recovery (#1200)
* v3: cancel subtasks when parent task runs are cancelled * v3: recover from server rate limiting errors in a more reliable way - Changing from sliding window to token bucket in the API rate limiter, to help smooth out traffic - Adding spans to the API Client core & SDK functions - Added waiting spans when retrying in the API Client - Retrying in the API Client now respects the x-ratelimit-reset - Retrying ApiError’s in tasks now respects the x-ratelimit-reset - Added AbortTaskRunError that when thrown will stop retries - Added idempotency keys SDK functions and automatically injecting the run ID when inside a task - Added the ability to configure ApiRequestOptions (retries only for now) globally and on specific calls - Implement the maxAttempts TaskRunOption (it wasn’t doing anything before) * Adding some docs about the request options * Fix type error * Remove context propagation through graphile jobs * Remove logger * only select a subset of task run columns * limit columns selected in batchTrigger as well * added idempotency doc * allow scoped idempotency keys, and fixed an issue with the unique index on BatchTaskRun and TaskRun * Removed old cancel task run children code
1 parent b53a575 commit 4f95c9d

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

54 files changed

+2600
-631
lines changed

.changeset/lemon-sloths-hide.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
---
2+
"@trigger.dev/sdk": patch
3+
"@trigger.dev/core": patch
4+
---
5+
6+
v3: recover from server rate limiting errors in a more reliable way

apps/webapp/app/components/runs/v3/CancelRunDialog.tsx

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,7 @@ export function CancelRunDialog({ runFriendlyId, redirectPath }: CancelRunDialog
2323
<DialogContent key="cancel">
2424
<DialogHeader>Cancel this run?</DialogHeader>
2525
<DialogDescription>
26-
Canceling a run will stop execution. If you want to run this later you will have to replay
27-
the entire run with the original payload.
26+
Canceling a run will stop execution, along with any executing subtasks.
2827
</DialogDescription>
2928
<DialogFooter>
3029
<Form action={`/resources/taskruns/${runFriendlyId}/cancel`} method="post">

apps/webapp/app/entry.server.tsx

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@ import {
1616
} from "./components/primitives/OperatingSystemProvider";
1717
import { getSharedSqsEventConsumer } from "./services/events/sqsEventConsumer";
1818
import { singleton } from "./utils/singleton";
19-
import { logger } from "./services/logger.server";
2019

2120
const ABORT_DELAY = 30000;
2221

@@ -186,6 +185,7 @@ export { apiRateLimiter } from "./services/apiRateLimit.server";
186185
export { socketIo } from "./v3/handleSocketIo.server";
187186
export { wss } from "./v3/handleWebsockets.server";
188187
export { registryProxy } from "./v3/registryProxy.server";
188+
export { runWithHttpContext } from "./services/httpAsyncStorage.server";
189189
import { eventLoopMonitor } from "./eventLoopMonitor.server";
190190
import { env } from "./env.server";
191191

apps/webapp/app/env.server.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -97,8 +97,9 @@ const EnvironmentSchema = z.object({
9797
* @example "1000ms"
9898
* @example "1000s"
9999
*/
100-
API_RATE_LIMIT_WINDOW: z.string().default("60s"),
101-
API_RATE_LIMIT_MAX: z.coerce.number().int().default(600),
100+
API_RATE_LIMIT_REFILL_INTERVAL: z.string().default("10s"), // refill 250 tokens every 10 seconds
101+
API_RATE_LIMIT_MAX: z.coerce.number().int().default(750), // allow bursts of 750 requests
102+
API_RATE_LIMIT_REFILL_RATE: z.coerce.number().int().default(250), // refix 250 tokens every 10 seconds
102103
API_RATE_LIMIT_REQUEST_LOGS_ENABLED: z.string().default("0"),
103104
API_RATE_LIMIT_REJECTION_LOGS_ENABLED: z.string().default("1"),
104105

apps/webapp/app/platform/zodWorker.server.ts

Lines changed: 50 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import { SpanKind, SpanStatusCode, trace } from "@opentelemetry/api";
12
import type {
23
CronItem,
34
CronItemOptions,
@@ -11,20 +12,19 @@ import type {
1112
WorkerUtils,
1213
} from "graphile-worker";
1314
import {
15+
Logger as GraphileLogger,
1416
run as graphileRun,
1517
makeWorkerUtils,
1618
parseCronItems,
17-
Logger as GraphileLogger,
1819
} from "graphile-worker";
19-
import { SpanKind, trace } from "@opentelemetry/api";
2020

21+
import { flattenAttributes } from "@trigger.dev/core/v3";
2122
import omit from "lodash.omit";
2223
import { z } from "zod";
2324
import { $replica, PrismaClient, PrismaClientOrTransaction } from "~/db.server";
25+
import { env } from "~/env.server";
2426
import { PgListenService } from "~/services/db/pgListen.server";
2527
import { workerLogger as logger } from "~/services/logger.server";
26-
import { flattenAttributes } from "@trigger.dev/core/v3";
27-
import { env } from "~/env.server";
2828

2929
const tracer = trace.getTracer("zodWorker", "3.0.0.dp.1");
3030

@@ -338,11 +338,45 @@ export class ZodWorker<TMessageCatalog extends MessageCatalogSchema> {
338338
}
339339
}
340340

341-
const { job, durationInMs } = await this.#addJob(
342-
identifier as string,
343-
payload,
344-
spec,
345-
options?.tx ?? this.#prisma
341+
const { job, durationInMs } = await tracer.startActiveSpan(
342+
`Enqueue ${identifier as string}`,
343+
{
344+
kind: SpanKind.PRODUCER,
345+
attributes: {
346+
"job.task_identifier": identifier as string,
347+
"job.payload": payload,
348+
"job.priority": spec.priority,
349+
"job.run_at": spec.runAt?.toISOString(),
350+
"job.jobKey": spec.jobKey,
351+
"job.flags": spec.flags,
352+
"job.max_attempts": spec.maxAttempts,
353+
"worker.name": this.#name,
354+
},
355+
},
356+
async (span) => {
357+
try {
358+
const results = await this.#addJob(
359+
identifier as string,
360+
payload,
361+
spec,
362+
options?.tx ?? this.#prisma
363+
);
364+
365+
return results;
366+
} catch (error) {
367+
if (error instanceof Error) {
368+
span.recordException(error);
369+
} else {
370+
span.recordException(new Error(String(error)));
371+
}
372+
373+
span.setStatus({ code: SpanStatusCode.ERROR });
374+
375+
throw error;
376+
} finally {
377+
span.end();
378+
}
379+
}
346380
);
347381

348382
logger.debug("Enqueued worker task", {
@@ -401,6 +435,12 @@ export class ZodWorker<TMessageCatalog extends MessageCatalogSchema> {
401435
const rows = AddJobResultsSchema.safeParse(results);
402436

403437
if (!rows.success) {
438+
logger.debug("results returned from add_job could not be parsed", {
439+
identifier,
440+
payload,
441+
spec,
442+
});
443+
404444
throw new Error(
405445
`Failed to add job to queue, zod parsing error: ${JSON.stringify(rows.error)}`
406446
);
@@ -422,9 +462,7 @@ export class ZodWorker<TMessageCatalog extends MessageCatalogSchema> {
422462
const job = AddJobResultsSchema.safeParse(result);
423463

424464
if (!job.success) {
425-
logger.debug("results returned from remove_job could not be parsed", {
426-
error: job.error.flatten(),
427-
result,
465+
logger.debug("could not remove job, job_key did not exist", {
428466
jobKey,
429467
});
430468

apps/webapp/app/routes/api.v1.tasks.$taskId.trigger.ts

Lines changed: 10 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import { env } from "~/env.server";
66
import { authenticateApiRequest } from "~/services/apiAuth.server";
77
import { logger } from "~/services/logger.server";
88
import { parseRequestJsonAsync } from "~/utils/parseRequestJson.server";
9+
import { ServiceValidationError } from "~/v3/services/baseService.server";
910
import { TriggerTaskService } from "~/v3/services/triggerTask.server";
1011
import { startActiveSpan } from "~/v3/tracer.server";
1112

@@ -92,18 +93,12 @@ export async function action({ request, params }: ActionFunctionArgs) {
9293
traceContext,
9394
});
9495

95-
const run = await service.call(
96-
taskId,
97-
authenticationResult.environment,
98-
{ ...body.data },
99-
// { ...body.data, payload: (anyBody as any).payload },
100-
{
101-
idempotencyKey: idempotencyKey ?? undefined,
102-
triggerVersion: triggerVersion ?? undefined,
103-
traceContext,
104-
spanParentAsLink: spanParentAsLink === 1,
105-
}
106-
);
96+
const run = await service.call(taskId, authenticationResult.environment, body.data, {
97+
idempotencyKey: idempotencyKey ?? undefined,
98+
triggerVersion: triggerVersion ?? undefined,
99+
traceContext,
100+
spanParentAsLink: spanParentAsLink === 1,
101+
});
107102

108103
if (!run) {
109104
return json({ error: "Task not found" }, { status: 404 });
@@ -113,7 +108,9 @@ export async function action({ request, params }: ActionFunctionArgs) {
113108
id: run.friendlyId,
114109
});
115110
} catch (error) {
116-
if (error instanceof Error) {
111+
if (error instanceof ServiceValidationError) {
112+
return json({ error: error.message }, { status: 422 });
113+
} else if (error instanceof Error) {
117114
return json({ error: error.message }, { status: 400 });
118115
}
119116

apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.v3.$projectParam.runs.$runParam.spans.$spanParam/route.tsx

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -261,7 +261,7 @@ function RunActionButtons({ span }: { span: Span }) {
261261

262262
if (span.isPartial) {
263263
return (
264-
<Dialog>
264+
<Dialog key="in-progress">
265265
<LinkButton
266266
to={v3RunDownloadLogsPath({ friendlyId: runParam })}
267267
LeadingIcon={CloudArrowDownIcon}
@@ -290,7 +290,7 @@ function RunActionButtons({ span }: { span: Span }) {
290290
}
291291

292292
return (
293-
<Dialog>
293+
<Dialog key="complete">
294294
<LinkButton
295295
to={v3RunDownloadLogsPath({ friendlyId: runParam })}
296296
LeadingIcon={CloudArrowDownIcon}

apps/webapp/app/services/apiRateLimit.server.ts

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -106,8 +106,10 @@ export function authorizationRateLimitMiddleware({
106106
hashedAuthorizationValue
107107
);
108108

109+
const $remaining = Math.max(0, remaining); // remaining can be negative if the user has exceeded the limit, so clamp it to 0
110+
109111
res.set("x-ratelimit-limit", limit.toString());
110-
res.set("x-ratelimit-remaining", remaining.toString());
112+
res.set("x-ratelimit-remaining", $remaining.toString());
111113
res.set("x-ratelimit-reset", reset.toString());
112114

113115
if (success) {
@@ -122,12 +124,12 @@ export function authorizationRateLimitMiddleware({
122124
title: "Rate Limit Exceeded",
123125
status: 429,
124126
type: "https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/429",
125-
detail: `Rate limit exceeded ${remaining}/${limit} requests remaining. Retry in ${secondsUntilReset} seconds.`,
127+
detail: `Rate limit exceeded ${$remaining}/${limit} requests remaining. Retry in ${secondsUntilReset} seconds.`,
126128
reset,
127129
limit,
128130
remaining,
129131
secondsUntilReset,
130-
error: `Rate limit exceeded ${remaining}/${limit} requests remaining. Retry in ${secondsUntilReset} seconds.`,
132+
error: `Rate limit exceeded ${$remaining}/${limit} requests remaining. Retry in ${secondsUntilReset} seconds.`,
131133
},
132134
null,
133135
2
@@ -138,7 +140,11 @@ export function authorizationRateLimitMiddleware({
138140

139141
export const apiRateLimiter = authorizationRateLimitMiddleware({
140142
keyPrefix: "api",
141-
limiter: Ratelimit.slidingWindow(env.API_RATE_LIMIT_MAX, env.API_RATE_LIMIT_WINDOW as Duration),
143+
limiter: Ratelimit.tokenBucket(
144+
env.API_RATE_LIMIT_REFILL_RATE,
145+
env.API_RATE_LIMIT_REFILL_INTERVAL as Duration,
146+
env.API_RATE_LIMIT_MAX
147+
),
142148
pathMatchers: [/^\/api/],
143149
// Allow /api/v1/tasks/:id/callback/:secret
144150
pathWhiteList: [
@@ -152,6 +158,8 @@ export const apiRateLimiter = authorizationRateLimitMiddleware({
152158
/^\/api\/v1\/sources\/http\/[^\/]+$/, // /api/v1/sources/http/$id
153159
/^\/api\/v1\/endpoints\/[^\/]+\/[^\/]+\/index\/[^\/]+$/, // /api/v1/endpoints/$environmentId/$endpointSlug/index/$indexHookIdentifier
154160
"/api/v1/timezones",
161+
"/api/v1/usage/ingest",
162+
/^\/api\/v1\/runs\/[^\/]+\/attempts$/, // /api/v1/runs/$runFriendlyId/attempts
155163
],
156164
log: {
157165
rejections: env.API_RATE_LIMIT_REJECTION_LOGS_ENABLED === "1",
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
import { AsyncLocalStorage } from "node:async_hooks";
2+
3+
export type HttpLocalStorage = {
4+
requestId: string;
5+
path: string;
6+
host: string;
7+
};
8+
9+
const httpLocalStorage = new AsyncLocalStorage<HttpLocalStorage>();
10+
11+
export type RunWithHttpContextFunction = <T>(context: HttpLocalStorage, fn: () => T) => T;
12+
13+
export function runWithHttpContext<T>(context: HttpLocalStorage, fn: () => T): T {
14+
return httpLocalStorage.run(context, fn);
15+
}
16+
17+
export function getHttpContext(): HttpLocalStorage | undefined {
18+
return httpLocalStorage.getStore();
19+
}

apps/webapp/app/services/logger.server.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import type { LogLevel } from "@trigger.dev/core-backend";
22
import { Logger } from "@trigger.dev/core-backend";
33
import { sensitiveDataReplacer } from "./sensitiveDataReplacer";
44
import { AsyncLocalStorage } from "async_hooks";
5+
import { getHttpContext } from "./httpAsyncStorage.server";
56

67
const currentFieldsStore = new AsyncLocalStorage<Record<string, unknown>>();
78

@@ -16,7 +17,8 @@ export const logger = new Logger(
1617
sensitiveDataReplacer,
1718
() => {
1819
const fields = currentFieldsStore.getStore();
19-
return fields ? { ...fields } : {};
20+
const httpContext = getHttpContext();
21+
return { ...fields, http: httpContext };
2022
}
2123
);
2224

0 commit comments

Comments
 (0)