Skip to content

re2: Add attempt metrics in dev (prod WIP). Added max concurrent runs setting to dev using p-limit #1766

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Mar 6, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions apps/webapp/app/env.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -571,6 +571,9 @@ const EnvironmentSchema = z.object({
/** The max number of runs per API call that we'll dequeue in DEV */
DEV_DEQUEUE_MAX_RUNS_PER_PULL: z.coerce.number().int().default(10),

/** The maximum number of local run processes that can execute concurrently in dev */
DEV_MAX_CONCURRENT_RUNS: z.coerce.number().int().default(25),

LEGACY_RUN_ENGINE_WORKER_ENABLED: z.string().default(process.env.WORKER_ENABLED ?? "true"),
LEGACY_RUN_ENGINE_WORKER_CONCURRENCY_WORKERS: z.coerce.number().int().default(2),
LEGACY_RUN_ENGINE_WORKER_CONCURRENCY_TASKS_PER_WORKER: z.coerce.number().int().default(1),
Expand Down
1 change: 1 addition & 0 deletions apps/webapp/app/routes/engine.v1.dev.config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ export const loader = createLoaderApiRoute(
environmentId: authentication.environment.id,
dequeueIntervalWithRun: env.DEV_DEQUEUE_INTERVAL_WITH_RUN,
dequeueIntervalWithoutRun: env.DEV_DEQUEUE_INTERVAL_WITHOUT_RUN,
maxConcurrentRuns: env.DEV_MAX_CONCURRENT_RUNS,
});
} catch (error) {
logger.error("Failed to get dev settings", {
Expand Down
1 change: 1 addition & 0 deletions internal-packages/run-engine/src/engine/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -914,6 +914,7 @@ export class RunEngine {

return {
version: "1" as const,
dequeuedAt: new Date(),
snapshot: {
id: newSnapshot.id,
friendlyId: newSnapshot.friendlyId,
Expand Down
1 change: 1 addition & 0 deletions packages/cli-v3/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@
"nypm": "^0.3.9",
"object-hash": "^3.0.0",
"open": "^10.0.3",
"p-limit": "^6.2.0",
"p-retry": "^6.1.0",
"partysocket": "^1.0.2",
"pkg-types": "^1.1.3",
Expand Down
5 changes: 5 additions & 0 deletions packages/cli-v3/src/commands/dev.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ const DevCommandOptions = CommonCommandOptions.extend({
skipUpdateCheck: z.boolean().default(false),
envFile: z.string().optional(),
keepTmpFiles: z.boolean().default(false),
maxConcurrentRuns: z.coerce.number().optional(),
});

export type DevCommandOptions = z.infer<typeof DevCommandOptions>;
Expand All @@ -37,6 +38,10 @@ export function configureDevCommand(program: Command) {
"--env-file <env file>",
"Path to the .env file to use for the dev session. Defaults to .env in the project directory."
)
.option(
"--max-concurrent-runs <max concurrent runs>",
"The maximum number of concurrent runs to allow in the dev session"
)
.option("--debug-otel", "Enable OpenTelemetry debugging")
.option("--skip-update-check", "Skip checking for @trigger.dev package updates")
.option(
Expand Down
34 changes: 30 additions & 4 deletions packages/cli-v3/src/dev/devSupervisor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import {
WorkerClientToServerEvents,
WorkerServerToClientEvents,
} from "@trigger.dev/core/v3/workers";
import pLimit from "p-limit";

export type WorkerRuntimeOptions = {
name: string | undefined;
Expand Down Expand Up @@ -65,6 +66,8 @@ class DevSupervisor implements WorkerRuntime {

private socketConnections = new Set<string>();

private runLimiter?: ReturnType<typeof pLimit>;

constructor(public readonly options: WorkerRuntimeOptions) {}

async init(): Promise<void> {
Expand All @@ -81,6 +84,15 @@ class DevSupervisor implements WorkerRuntime {
logger.debug("[DevSupervisor] Got dev settings", { settings: settings.data });
this.config = settings.data;

const maxConcurrentRuns = Math.min(
this.config.maxConcurrentRuns,
this.options.args.maxConcurrentRuns ?? this.config.maxConcurrentRuns
);

logger.debug("[DevSupervisor] Using maxConcurrentRuns", { maxConcurrentRuns });

this.runLimiter = pLimit(maxConcurrentRuns);

this.#createSocket();

//start an SSE connection for presence
Expand Down Expand Up @@ -178,6 +190,14 @@ class DevSupervisor implements WorkerRuntime {
return;
}

// Back-pressure guard: when the limiter already holds more work (active + pending)
// than its concurrency allows, skip this dequeue pass and poll again later.
if (
  this.runLimiter &&
  this.runLimiter.activeCount + this.runLimiter.pendingCount > this.runLimiter.concurrency
) {
  logger.debug(`[DevSupervisor] dequeueRuns. Run limit reached, trying again later`);
  setTimeout(() => this.#dequeueRuns(), this.config.dequeueIntervalWithoutRun);
  // Stop here: falling through would dequeue more runs despite being over the limit
  // and reach the method's regular reschedule, leaving two timers running for one call.
  return;
}

//get relevant versions
//ignore deprecated and the latest worker
const oldWorkerIds = this.#getActiveOldWorkers();
Expand Down Expand Up @@ -287,10 +307,16 @@ class DevSupervisor implements WorkerRuntime {

this.runControllers.set(message.run.friendlyId, runController);

//don't await for run completion, we want to dequeue more runs
runController.start(message).then(() => {
logger.debug("[DevSupervisor] Run started", { runId: message.run.friendlyId });
});
// Kick off the run without awaiting it, so the dequeue loop can keep pulling runs.
// When a limiter is configured the start call is handed to it; otherwise it is
// invoked directly. Both paths attach a rejection handler so a failed start is
// logged instead of surfacing as an unhandled promise rejection.
if (this.runLimiter) {
  this.runLimiter(() => runController.start(message))
    .then(() => {
      logger.debug("[DevSupervisor] Run started", { runId: message.run.friendlyId });
    })
    .catch((error: unknown) => {
      logger.debug("[DevSupervisor] Run failed to start", {
        runId: message.run.friendlyId,
        error,
      });
    });
} else {
  //don't await for run completion, we want to dequeue more runs
  runController
    .start(message)
    .then(() => {
      logger.debug("[DevSupervisor] Run started", { runId: message.run.friendlyId });
    })
    .catch((error: unknown) => {
      logger.debug("[DevSupervisor] Run failed to start", {
        runId: message.run.friendlyId,
        error,
      });
    });
}
Comment on lines +310 to +319
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Consider adding error handling for run start.

Currently, if runController.start(message) rejects, no error path is handled. Logging or capturing that error will help diagnose startup failures.

-this.runLimiter(() => runController.start(message)).then(() => {
+this.runLimiter(() => runController.start(message))
+  .then(() => {
     logger.debug("[DevSupervisor] Run started", { runId: message.run.friendlyId });
   })
+  .catch((error) => {
+    logger.debug("[DevSupervisor] Run failed to start", {
+      runId: message.run.friendlyId,
+      error
+    });
+  });
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
if (this.runLimiter) {
this.runLimiter(() => runController.start(message)).then(() => {
logger.debug("[DevSupervisor] Run started", { runId: message.run.friendlyId });
});
} else {
//don't await for run completion, we want to dequeue more runs
runController.start(message).then(() => {
logger.debug("[DevSupervisor] Run started", { runId: message.run.friendlyId });
});
}
if (this.runLimiter) {
this.runLimiter(() => runController.start(message))
.then(() => {
logger.debug("[DevSupervisor] Run started", { runId: message.run.friendlyId });
})
.catch((error) => {
logger.debug("[DevSupervisor] Run failed to start", {
runId: message.run.friendlyId,
error
});
});
} else {
//don't await for run completion, we want to dequeue more runs
runController.start(message).then(() => {
logger.debug("[DevSupervisor] Run started", { runId: message.run.friendlyId });
});
}

}

setTimeout(() => this.#dequeueRuns(), this.config.dequeueIntervalWithRun);
Expand Down
36 changes: 34 additions & 2 deletions packages/cli-v3/src/entryPoints/dev-run-controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import {
LogLevel,
RunExecutionData,
TaskRunExecution,
TaskRunExecutionMetrics,
TaskRunExecutionResult,
TaskRunFailedExecutionResult,
} from "@trigger.dev/core/v3";
Expand Down Expand Up @@ -475,17 +476,21 @@ export class DevRunController {
private async startAndExecuteRunAttempt({
runFriendlyId,
snapshotFriendlyId,
dequeuedAt,
isWarmStart = false,
}: {
runFriendlyId: string;
snapshotFriendlyId: string;
dequeuedAt?: Date;
isWarmStart?: boolean;
}) {
this.subscribeToRunNotifications({
run: { friendlyId: runFriendlyId },
snapshot: { friendlyId: snapshotFriendlyId },
});

const attemptStartedAt = Date.now();

const start = await this.httpClient.dev.startRunAttempt(runFriendlyId, snapshotFriendlyId);

if (!start.success) {
Expand All @@ -495,6 +500,8 @@ export class DevRunController {
return;
}

const attemptDuration = Date.now() - attemptStartedAt;

const { run, snapshot, execution, envVars } = start.data;

eventBus.emit("runStarted", this.opts.worker, execution);
Expand All @@ -508,8 +515,28 @@ export class DevRunController {
// This is the only case where incrementing the attempt number is allowed
this.enterRunPhase(run, snapshot);

const metrics = [
{
name: "start",
event: "create_attempt",
timestamp: attemptStartedAt,
duration: attemptDuration,
},
].concat(
dequeuedAt
? [
{
name: "start",
event: "dequeue",
timestamp: dequeuedAt.getTime(),
duration: 0,
},
]
: []
);

try {
return await this.executeRun({ run, snapshot, execution, envVars });
return await this.executeRun({ run, snapshot, execution, envVars, metrics });
} catch (error) {
// TODO: Handle the case where we're in the warm start phase or executing a new run
// This can happen if we kill the run while it's still executing, e.g. after receiving an attempt number mismatch
Expand Down Expand Up @@ -566,7 +593,10 @@ export class DevRunController {
snapshot,
execution,
envVars,
}: WorkloadRunAttemptStartResponseBody) {
metrics,
}: WorkloadRunAttemptStartResponseBody & {
metrics?: TaskRunExecutionMetrics;
}) {
if (!this.opts.worker.serverWorker) {
throw new Error(`No server worker for Dev ${run.friendlyId}`);
}
Expand Down Expand Up @@ -594,6 +624,7 @@ export class DevRunController {
payload: {
execution,
traceContext: execution.run.traceContext ?? {},
metrics,
},
messageId: run.friendlyId,
});
Expand Down Expand Up @@ -753,6 +784,7 @@ export class DevRunController {
await this.startAndExecuteRunAttempt({
runFriendlyId: dequeueMessage.run.friendlyId,
snapshotFriendlyId: dequeueMessage.snapshot.friendlyId,
dequeuedAt: dequeueMessage.dequeuedAt,
}).finally(async () => {});
}

Expand Down
31 changes: 25 additions & 6 deletions packages/cli-v3/src/entryPoints/dev-run-worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import {
waitUntil,
WorkerManifest,
WorkerToExecutorMessageCatalog,
runTimelineMetrics,
} from "@trigger.dev/core/v3";
import { TriggerTracer } from "@trigger.dev/core/v3/tracer";
import {
Expand All @@ -36,6 +37,7 @@ import {
TracingSDK,
usage,
UsageTimeoutManager,
StandardRunTimelineMetricsManager,
} from "@trigger.dev/core/v3/workers";
import { ZodIpcConnection } from "@trigger.dev/core/v3/zodIpc";
import { readFile } from "node:fs/promises";
Expand Down Expand Up @@ -87,6 +89,10 @@ process.on("uncaughtException", function (error, origin) {

const heartbeatIntervalMs = getEnvVar("HEARTBEAT_INTERVAL_MS");

const standardRunTimelineMetricsManager = new StandardRunTimelineMetricsManager();
runTimelineMetrics.setGlobalManager(standardRunTimelineMetricsManager);
standardRunTimelineMetricsManager.seedMetricsFromEnvironment();

const devUsageManager = new DevUsageManager();
usage.setGlobalUsageManager(devUsageManager);
timeout.setGlobalManager(new UsageTimeoutManager(devUsageManager));
Expand Down Expand Up @@ -189,9 +195,11 @@ const zodIpc = new ZodIpcConnection({
emitSchema: ExecutorToWorkerMessageCatalog,
process,
handlers: {
EXECUTE_TASK_RUN: async ({ execution, traceContext, metadata }, sender) => {
EXECUTE_TASK_RUN: async ({ execution, traceContext, metadata, metrics }, sender) => {
log(`[${new Date().toISOString()}] Received EXECUTE_TASK_RUN`, execution);

standardRunTimelineMetricsManager.registerMetricsFromExecution(metrics);

if (_isRunning) {
logError("Worker is already running a task");

Expand Down Expand Up @@ -246,11 +254,22 @@ const zodIpc = new ZodIpcConnection({
}

try {
const beforeImport = performance.now();
await import(normalizeImportPath(taskManifest.entryPoint));
const durationMs = performance.now() - beforeImport;

log(`Imported task ${execution.task.id} [${taskManifest.entryPoint}] in ${durationMs}ms`);
await runTimelineMetrics.measureMetric(
"trigger.dev/start",
"import",
{
entryPoint: taskManifest.entryPoint,
},
async () => {
const beforeImport = performance.now();
await import(normalizeImportPath(taskManifest.entryPoint));
const durationMs = performance.now() - beforeImport;

log(
`Imported task ${execution.task.id} [${taskManifest.entryPoint}] in ${durationMs}ms`
);
}
);
} catch (err) {
logError(`Failed to import task ${execution.task.id}`, err);

Expand Down
31 changes: 24 additions & 7 deletions packages/cli-v3/src/entryPoints/managed-run-worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import {
runMetadata,
waitUntil,
apiClientManager,
runTimelineMetrics,
} from "@trigger.dev/core/v3";
import { TriggerTracer } from "@trigger.dev/core/v3/tracer";
import {
Expand All @@ -37,6 +38,7 @@ import {
StandardMetadataManager,
StandardWaitUntilManager,
ManagedRuntimeManager,
StandardRunTimelineMetricsManager,
} from "@trigger.dev/core/v3/workers";
import { ZodIpcConnection } from "@trigger.dev/core/v3/zodIpc";
import { readFile } from "node:fs/promises";
Expand Down Expand Up @@ -91,6 +93,10 @@ const usageEventUrl = getEnvVar("USAGE_EVENT_URL");
const triggerJWT = getEnvVar("TRIGGER_JWT");
const heartbeatIntervalMs = getEnvVar("HEARTBEAT_INTERVAL_MS");

const standardRunTimelineMetricsManager = new StandardRunTimelineMetricsManager();
runTimelineMetrics.setGlobalManager(standardRunTimelineMetricsManager);
standardRunTimelineMetricsManager.seedMetricsFromEnvironment();

const devUsageManager = new DevUsageManager();
const prodUsageManager = new ProdUsageManager(devUsageManager, {
heartbeatIntervalMs: usageIntervalMs ? parseInt(usageIntervalMs, 10) : undefined,
Expand Down Expand Up @@ -199,7 +205,9 @@ const zodIpc = new ZodIpcConnection({
emitSchema: ExecutorToWorkerMessageCatalog,
process,
handlers: {
EXECUTE_TASK_RUN: async ({ execution, traceContext, metadata }, sender) => {
EXECUTE_TASK_RUN: async ({ execution, traceContext, metadata, metrics }, sender) => {
standardRunTimelineMetricsManager.registerMetricsFromExecution(metrics);

console.log(`[${new Date().toISOString()}] Received EXECUTE_TASK_RUN`, execution);

if (_isRunning) {
Expand Down Expand Up @@ -256,12 +264,21 @@ const zodIpc = new ZodIpcConnection({
}

try {
const beforeImport = performance.now();
await import(normalizeImportPath(taskManifest.entryPoint));
const durationMs = performance.now() - beforeImport;

console.log(
`Imported task ${execution.task.id} [${taskManifest.entryPoint}] in ${durationMs}ms`
await runTimelineMetrics.measureMetric(
"trigger.dev/start",
"import",
{
entryPoint: taskManifest.entryPoint,
},
async () => {
const beforeImport = performance.now();
await import(normalizeImportPath(taskManifest.entryPoint));
const durationMs = performance.now() - beforeImport;

console.log(
`Imported task ${execution.task.id} [${taskManifest.entryPoint}] in ${durationMs}ms`
);
}
);
} catch (err) {
console.error(`Failed to import task ${execution.task.id}`, err);
Expand Down
1 change: 1 addition & 0 deletions packages/core/src/v3/schemas/api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -427,6 +427,7 @@ export const DevConfigResponseBody = z.object({
environmentId: z.string(),
dequeueIntervalWithRun: z.number(),
dequeueIntervalWithoutRun: z.number(),
maxConcurrentRuns: z.number(),
});
export type DevConfigResponseBody = z.infer<typeof DevConfigResponseBody>;

Expand Down
1 change: 1 addition & 0 deletions packages/core/src/v3/schemas/runEngine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ export type ExecutionResult = z.infer<typeof ExecutionResult>;
export const DequeuedMessage = z.object({
version: z.literal("1"),
snapshot: ExecutionSnapshot,
dequeuedAt: z.coerce.date(),
image: z.string().optional(),
checkpoint: z
.object({
Expand Down
Loading
Loading