Skip to content

Fix controller waitpoint resolution, suspendable state, and snapshot race conditions #2006

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 49 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
49 commits
Select commit Hold shift + click to select a range
b384dad
remove dead code
nicktrn Apr 29, 2025
082926a
rename managed to shared runtime manager
nicktrn Apr 29, 2025
786416c
rename to resolve waitpoint for clarity
nicktrn Apr 29, 2025
b5ae558
add resolver id helper
nicktrn Apr 29, 2025
4d46d20
store and correctly resolve waipoints that come in early
nicktrn Apr 29, 2025
e72bcc8
fix ipc message type change
nicktrn Apr 29, 2025
38273ae
branded type for resolver ids
nicktrn Apr 29, 2025
f190420
add fixme comments
nicktrn Apr 29, 2025
3592db2
remove more unused ipc schemas
nicktrn Apr 29, 2025
d6dfc66
Merge remote-tracking branch 'origin/main' into fix/resolve-waitpoints
nicktrn Apr 29, 2025
b13d8b7
fix entitlement validation when client doesn't exist
nicktrn Apr 29, 2025
3cb44a1
restore hello world reference workspace imports
nicktrn Apr 29, 2025
9e2729c
runtime manager debug logs
nicktrn Apr 29, 2025
752770f
prefix engine run logs
nicktrn Apr 30, 2025
5f52292
managed run logger accepts nested props
nicktrn Apr 30, 2025
bb9ac50
runtime suspendable state and improved logs
nicktrn Apr 30, 2025
3d978d3
require suspendable state for checkpoints, fix snapshot processing queue
nicktrn Apr 30, 2025
63b3bc0
Merge remote-tracking branch 'origin/main' into fix/resolve-waitpoints
nicktrn Apr 30, 2025
479c304
add terminal link as cli module so we can more easily patch it
nicktrn Apr 30, 2025
a053114
apply cursor patch
nicktrn Apr 30, 2025
c3333ab
add license info
nicktrn Apr 30, 2025
8bc2e33
remove terminal-link package and add deprecation notice
nicktrn Apr 30, 2025
784e151
remove old patch
nicktrn Apr 30, 2025
641ca67
remove terminal-link from sdk
nicktrn Apr 30, 2025
ccd9e5b
rename snapshot module
nicktrn Apr 30, 2025
f62047b
add cli test tsconfig
nicktrn Apr 30, 2025
c3dbb8a
add run logger base type
nicktrn Apr 30, 2025
140e2d7
add snapshot manager tests
nicktrn Apr 30, 2025
8df5122
Merge branch 'fix/terminal-links' into fix/resolve-waitpoints
nicktrn Apr 30, 2025
ddb40ae
fix cli builds
nicktrn May 1, 2025
0ed24db
Merge remote-tracking branch 'origin/main' into fix/resolve-waitpoints
nicktrn May 1, 2025
54db582
improve QUEUED_EXECUTING test
nicktrn May 1, 2025
b17a947
changeset
nicktrn May 1, 2025
9292667
Merge remote-tracking branch 'origin/main' into fix/resolve-waitpoints
nicktrn May 1, 2025
68ea4c4
make testcontainers wait until container has stopped
nicktrn May 1, 2025
c36e274
require unit tests for publishing again
nicktrn May 1, 2025
87b0ce1
avoid mutation during iteration when resolving pending waitpoints
nicktrn May 1, 2025
2622b0d
improve debug logs and make them less noisy
nicktrn May 1, 2025
ffa2a73
always update poller snapshot id for accurate logs
nicktrn May 1, 2025
7a37a26
detach task run process handlers
nicktrn May 1, 2025
55b835d
Merge remote-tracking branch 'origin/main' into fix/resolve-waitpoints
nicktrn May 1, 2025
8fd09e3
check for env overrides in a few more places and add verbose logs
nicktrn May 1, 2025
da24a79
log when poller is still executing when we stop it
nicktrn May 1, 2025
c62cf10
add supervisor to publish workflow
nicktrn May 1, 2025
467f9de
always print full deploy logs in CI
nicktrn May 2, 2025
246c1a9
Revert "avoid mutation during iteration when resolving pending waitpo…
nicktrn May 2, 2025
9403409
disable pre
nicktrn May 2, 2025
4607b05
print prerelease script errors
nicktrn May 2, 2025
ed1a44c
Revert "disable pre"
nicktrn May 2, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions .changeset/plenty-dolphins-act.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
---
"trigger.dev": patch
"@trigger.dev/core": patch
---

- Correctly resolve waitpoints that come in early
- Ensure correct state before requesting suspension
- Fix race conditions in snapshot processing
5 changes: 5 additions & 0 deletions .changeset/sweet-dolphins-invent.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"trigger.dev": patch
---

Always print full deploy logs in CI
16 changes: 8 additions & 8 deletions .github/workflows/publish-worker-re2.yml
Original file line number Diff line number Diff line change
Expand Up @@ -85,11 +85,11 @@ jobs:
REPOSITORY: ${{ steps.get_repository.outputs.repo }}
IMAGE_TAG: ${{ steps.get_tag.outputs.tag }}

- name: 🐙 Push 'v3' tag to GitHub Container Registry
if: steps.get_tag.outputs.is_semver == 'true'
run: |
docker tag infra_image "$REGISTRY/$REPOSITORY:v3"
docker push "$REGISTRY/$REPOSITORY:v3"
env:
REGISTRY: ghcr.io/triggerdotdev
REPOSITORY: ${{ steps.get_repository.outputs.repo }}
# - name: 🐙 Push 'v3' tag to GitHub Container Registry
# if: steps.get_tag.outputs.is_semver == 'true'
# run: |
# docker tag infra_image "$REGISTRY/$REPOSITORY:v3"
# docker push "$REGISTRY/$REPOSITORY:v3"
# env:
# REGISTRY: ghcr.io/triggerdotdev
# REPOSITORY: ${{ steps.get_repository.outputs.repo }}
16 changes: 8 additions & 8 deletions .github/workflows/publish-worker.yml
Original file line number Diff line number Diff line change
Expand Up @@ -77,11 +77,11 @@ jobs:
REPOSITORY: ${{ steps.get_repository.outputs.repo }}
IMAGE_TAG: ${{ steps.get_tag.outputs.tag }}

- name: 🐙 Push 'v3' tag to GitHub Container Registry
if: steps.get_tag.outputs.is_semver == 'true'
run: |
docker tag infra_image "$REGISTRY/$REPOSITORY:v3"
docker push "$REGISTRY/$REPOSITORY:v3"
env:
REGISTRY: ghcr.io/triggerdotdev
REPOSITORY: ${{ steps.get_repository.outputs.repo }}
# - name: 🐙 Push 'v3' tag to GitHub Container Registry
# if: steps.get_tag.outputs.is_semver == 'true'
# run: |
# docker tag infra_image "$REGISTRY/$REPOSITORY:v3"
# docker push "$REGISTRY/$REPOSITORY:v3"
# env:
# REGISTRY: ghcr.io/triggerdotdev
# REPOSITORY: ${{ steps.get_repository.outputs.repo }}
11 changes: 9 additions & 2 deletions .github/workflows/publish.yml
Original file line number Diff line number Diff line change
Expand Up @@ -56,15 +56,22 @@ jobs:
secrets: inherit

publish-webapp:
needs: [typecheck]
needs: [typecheck, units]
uses: ./.github/workflows/publish-webapp.yml
secrets: inherit
with:
image_tag: ${{ inputs.image_tag }}

publish-worker:
needs: [typecheck]
needs: [typecheck, units]
uses: ./.github/workflows/publish-worker.yml
secrets: inherit
with:
image_tag: ${{ inputs.image_tag }}

publish-worker-v4:
needs: [typecheck, units]
uses: ./.github/workflows/publish-worker-v4.yml
secrets: inherit
with:
image_tag: ${{ inputs.image_tag }}
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ export class DefaultTriggerTaskValidator implements TriggerTaskValidator {

const result = await getEntitlement(environment.organizationId);

if (!result || result.hasAccess === false) {
if (result && result.hasAccess === false) {
return {
ok: false,
error: new OutOfEntitlementError(),
Expand Down
6 changes: 4 additions & 2 deletions apps/webapp/app/v3/runEngineHandlers.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -401,7 +401,7 @@ export function registerRunEngineEventBusHandlers() {
engine.eventBus.on("executionSnapshotCreated", async ({ time, run, snapshot }) => {
const eventResult = await recordRunDebugLog(
run.id,
`${snapshot.executionStatus} - ${snapshot.description}`,
`[engine] ${snapshot.executionStatus} - ${snapshot.description}`,
{
attributes: {
properties: {
Expand Down Expand Up @@ -450,6 +450,7 @@ export function registerRunEngineEventBusHandlers() {
// Record notification event
const eventResult = await recordRunDebugLog(
run.id,
// don't prefix this with [engine] - "run:notify" is the correct prefix
`run:notify platform -> supervisor: ${snapshot.executionStatus}`,
{
attributes: {
Expand Down Expand Up @@ -479,6 +480,7 @@ export function registerRunEngineEventBusHandlers() {
// Record notification event
const eventResult = await recordRunDebugLog(
run.id,
// don't prefix this with [engine] - "run:notify" is the correct prefix
`run:notify ERROR platform -> supervisor: ${snapshot.executionStatus}`,
{
attributes: {
Expand All @@ -505,7 +507,7 @@ export function registerRunEngineEventBusHandlers() {
engine.eventBus.on("incomingCheckpointDiscarded", async ({ time, run, snapshot, checkpoint }) => {
const eventResult = await recordRunDebugLog(
run.id,
`Checkpoint discarded: ${checkpoint.discardReason}`,
`[engine] Checkpoint discarded: ${checkpoint.discardReason}`,
{
attributes: {
properties: {
Expand Down
12 changes: 9 additions & 3 deletions internal-packages/testcontainers/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,9 @@ const postgresContainer = async (
try {
await use(container);
} finally {
await container.stop();
// WARNING: Testcontainers by default will not wait until the container has stopped. It will simply issue the stop command and return immediately.
// If you need to wait for the container to be stopped, you can provide a timeout. The unit of timeout option here is second
await container.stop({ timeout: 10 });
}
};

Expand Down Expand Up @@ -92,7 +94,9 @@ const redisContainer = async (
try {
await use(container);
} finally {
await container.stop();
// WARNING: Testcontainers by default will not wait until the container has stopped. It will simply issue the stop command and return immediately.
// If you need to wait for the container to be stopped, you can provide a timeout. The unit of timeout option here is second
await container.stop({ timeout: 10 });
}
};

Expand Down Expand Up @@ -142,7 +146,9 @@ const electricOrigin = async (
try {
await use(origin);
} finally {
await container.stop();
// WARNING: Testcontainers by default will not wait until the container has stopped. It will simply issue the stop command and return immediately.
// If you need to wait for the container to be stopped, you can provide a timeout. The unit of timeout option here is second
await container.stop({ timeout: 10 });
}
};

Expand Down
4 changes: 4 additions & 0 deletions packages/cli-v3/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@
"esm"
],
"project": "./tsconfig.src.json",
"exclude": [
"**/*.test.ts"
],
"exports": {
"./package.json": "./package.json",
".": "./src/index.ts"
Expand Down Expand Up @@ -70,6 +73,7 @@
"typecheck": "tsc -p tsconfig.src.json --noEmit",
"build": "tshy && pnpm run update-version",
"dev": "tshy --watch",
"test": "vitest",
"test:e2e": "vitest --run -c ./e2e/vitest.config.ts",
"update-version": "tsx ../../scripts/updateVersion.ts"
},
Expand Down
63 changes: 45 additions & 18 deletions packages/cli-v3/src/commands/deploy.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
import { intro, outro } from "@clack/prompts";
import { intro, log, outro } from "@clack/prompts";
import { prepareDeploymentError } from "@trigger.dev/core/v3";
import { InitializeDeploymentResponseBody } from "@trigger.dev/core/v3/schemas";
import { Command, Option as CommandOption } from "commander";
import { resolve } from "node:path";
import { x } from "tinyexec";
import { z } from "zod";
import { isCI } from "std-env";
import { CliApiClient } from "../apiClient.js";
import { buildWorker } from "../build/buildWorker.js";
import { resolveAlwaysExternal } from "../build/externals.js";
Expand Down Expand Up @@ -321,24 +322,24 @@ async function _deployCommand(dir: string, options: DeployCommandOptions) {

const version = deployment.version;

const deploymentLink = cliLink(
"View deployment",
`${authorization.dashboardUrl}/projects/v3/${resolvedConfig.project}/deployments/${deployment.shortCode}`
);
const rawDeploymentLink = `${authorization.dashboardUrl}/projects/v3/${resolvedConfig.project}/deployments/${deployment.shortCode}`;
const rawTestLink = `${authorization.dashboardUrl}/projects/v3/${
resolvedConfig.project
}/test?environment=${options.env === "prod" ? "prod" : "stg"}`;

const testLink = cliLink(
"Test tasks",
`${authorization.dashboardUrl}/projects/v3/${resolvedConfig.project}/test?environment=${
options.env === "prod" ? "prod" : "stg"
}`
);
const deploymentLink = cliLink("View deployment", rawDeploymentLink);
const testLink = cliLink("Test tasks", rawTestLink);

const $spinner = spinner();

if (isLinksSupported) {
$spinner.start(`Building version ${version} ${deploymentLink}`);
if (isCI) {
log.step(`Building version ${version}\n`);
} else {
$spinner.start(`Building version ${version}`);
if (isLinksSupported) {
$spinner.start(`Building version ${version} ${deploymentLink}`);
} else {
$spinner.start(`Building version ${version}`);
}
}

const selfHostedRegistryHost = deployment.registryHost ?? options.registry;
Expand Down Expand Up @@ -368,6 +369,11 @@ async function _deployCommand(dir: string, options: DeployCommandOptions) {
buildEnvVars: buildManifest.build.env,
network: options.network,
onLog: (logMessage) => {
if (isCI) {
console.log(logMessage);
return;
}

if (isLinksSupported) {
$spinner.message(`Building version ${version} ${deploymentLink}: ${logMessage}`);
} else {
Expand Down Expand Up @@ -441,10 +447,14 @@ async function _deployCommand(dir: string, options: DeployCommandOptions) {
}`
: `${buildResult.image}${buildResult.digest ? `@${buildResult.digest}` : ""}`;

if (isLinksSupported) {
$spinner.message(`Deploying version ${version} ${deploymentLink}`);
if (isCI) {
log.step(`Deploying version ${version}\n`);
} else {
$spinner.message(`Deploying version ${version}`);
if (isLinksSupported) {
$spinner.message(`Deploying version ${version} ${deploymentLink}`);
} else {
$spinner.message(`Deploying version ${version}`);
}
}

const finalizeResponse = await projectClient.client.finalizeDeployment(
Expand All @@ -455,6 +465,11 @@ async function _deployCommand(dir: string, options: DeployCommandOptions) {
skipPromotion: options.skipPromotion,
},
(logMessage) => {
if (isCI) {
console.log(logMessage);
return;
}

if (isLinksSupported) {
$spinner.message(`Deploying version ${version} ${deploymentLink}: ${logMessage}`);
} else {
Expand All @@ -475,7 +490,11 @@ async function _deployCommand(dir: string, options: DeployCommandOptions) {
throw new SkipLoggingError("Failed to finalize deployment");
}

$spinner.stop(`Successfully deployed version ${version}`);
if (isCI) {
log.step(`Successfully deployed version ${version}`);
} else {
$spinner.stop(`Successfully deployed version ${version}`);
}

const taskCount = deploymentWithWorker.worker?.tasks.length ?? 0;

Expand All @@ -485,6 +504,14 @@ async function _deployCommand(dir: string, options: DeployCommandOptions) {
}`
);

if (!isLinksSupported) {
console.log("View deployment");
console.log(rawDeploymentLink);
console.log(); // new line
console.log("Test tasks");
console.log(rawTestLink);
}

setGithubActionsOutputAndEnvVars({
envVars: {
TRIGGER_DEPLOYMENT_VERSION: version,
Expand Down
19 changes: 5 additions & 14 deletions packages/cli-v3/src/entryPoints/dev-run-worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ import {
getEnvVar,
getNumberEnvVar,
logLevels,
ManagedRuntimeManager,
SharedRuntimeManager,
OtelTaskLogger,
populateEnv,
StandardLifecycleHooksManager,
Expand Down Expand Up @@ -452,20 +452,11 @@ const zodIpc = new ZodIpcConnection({
});
}
},
TASK_RUN_COMPLETED_NOTIFICATION: async () => {
await managedWorkerRuntime.completeWaitpoints([]);
},
WAIT_COMPLETED_NOTIFICATION: async () => {
await managedWorkerRuntime.completeWaitpoints([]);
},
FLUSH: async ({ timeoutInMs }, sender) => {
await flushAll(timeoutInMs);
},
WAITPOINT_CREATED: async ({ wait, waitpoint }) => {
managedWorkerRuntime.associateWaitWithWaitpoint(wait.id, waitpoint.id);
},
WAITPOINT_COMPLETED: async ({ waitpoint }) => {
managedWorkerRuntime.completeWaitpoints([waitpoint]);
RESOLVE_WAITPOINT: async ({ waitpoint }) => {
sharedWorkerRuntime.resolveWaitpoints([waitpoint]);
},
},
});
Expand Down Expand Up @@ -537,8 +528,8 @@ async function flushMetadata(timeoutInMs: number = 10_000) {
};
}

const managedWorkerRuntime = new ManagedRuntimeManager(zodIpc, showInternalLogs);
runtime.setGlobalRuntimeManager(managedWorkerRuntime);
const sharedWorkerRuntime = new SharedRuntimeManager(zodIpc, showInternalLogs);
runtime.setGlobalRuntimeManager(sharedWorkerRuntime);

process.title = "trigger-managed-worker";

Expand Down
19 changes: 5 additions & 14 deletions packages/cli-v3/src/entryPoints/managed-run-worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import {
getEnvVar,
getNumberEnvVar,
logLevels,
ManagedRuntimeManager,
SharedRuntimeManager,
OtelTaskLogger,
populateEnv,
ProdUsageManager,
Expand Down Expand Up @@ -445,20 +445,11 @@ const zodIpc = new ZodIpcConnection({
});
}
},
TASK_RUN_COMPLETED_NOTIFICATION: async () => {
await managedWorkerRuntime.completeWaitpoints([]);
},
WAIT_COMPLETED_NOTIFICATION: async () => {
await managedWorkerRuntime.completeWaitpoints([]);
},
FLUSH: async ({ timeoutInMs }, sender) => {
await flushAll(timeoutInMs);
},
WAITPOINT_CREATED: async ({ wait, waitpoint }) => {
managedWorkerRuntime.associateWaitWithWaitpoint(wait.id, waitpoint.id);
},
WAITPOINT_COMPLETED: async ({ waitpoint }) => {
managedWorkerRuntime.completeWaitpoints([waitpoint]);
RESOLVE_WAITPOINT: async ({ waitpoint }) => {
sharedWorkerRuntime.resolveWaitpoints([waitpoint]);
},
},
});
Expand Down Expand Up @@ -565,9 +556,9 @@ function initializeUsageManager({
timeout.setGlobalManager(new UsageTimeoutManager(devUsageManager));
}

const managedWorkerRuntime = new ManagedRuntimeManager(zodIpc, true);
const sharedWorkerRuntime = new SharedRuntimeManager(zodIpc, true);

runtime.setGlobalRuntimeManager(managedWorkerRuntime);
runtime.setGlobalRuntimeManager(sharedWorkerRuntime);

process.title = "trigger-managed-worker";

Expand Down
Loading