Skip to content

fix: runs.retrieve now uses the output stored on the TaskRun table instead of deprecated attempts table #1853

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Mar 31, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 31 additions & 32 deletions apps/webapp/app/presenters/v3/ApiRetrieveRunPresenter.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ const commonRunSelect = {
friendlyId: true,
},
},
runTags: true,
} satisfies Prisma.TaskRunSelect;

type CommonRelatedRun = Prisma.Result<
Expand All @@ -69,26 +70,29 @@ export class ApiRetrieveRunPresenter extends BasePresenter {
friendlyId,
runtimeEnvironmentId: env.id,
},
include: {
attempts: true,
lockedToVersion: true,
tags: true,
batch: {
select: {
...commonRunSelect,
payload: true,
payloadType: true,
output: true,
outputType: true,
error: true,
attempts: {
select: {
id: true,
friendlyId: true,
},
},
attemptNumber: true,
engine: true,
taskEventStore: true,
parentTaskRun: {
select: commonRunSelect,
},
rootTaskRun: {
select: commonRunSelect,
},
childRuns: {
select: {
...commonRunSelect,
},
select: commonRunSelect,
},
},
});
Expand Down Expand Up @@ -124,29 +128,23 @@ export class ApiRetrieveRunPresenter extends BasePresenter {
}

if (taskRun.status === "COMPLETED_SUCCESSFULLY") {
const completedAttempt = taskRun.attempts.find(
(a) => a.status === "COMPLETED" && typeof a.output !== null
);

if (completedAttempt && completedAttempt.output) {
const outputPacket = await conditionallyImportPacket({
data: completedAttempt.output,
dataType: completedAttempt.outputType,
});
const outputPacket = await conditionallyImportPacket({
data: taskRun.output ?? undefined,
dataType: taskRun.outputType,
});

if (
outputPacket.dataType === "application/store" &&
typeof outputPacket.data === "string"
) {
$outputPresignedUrl = await generatePresignedUrl(
env.project.externalRef,
env.slug,
outputPacket.data,
"GET"
);
} else {
$output = await parsePacket(outputPacket);
}
if (
outputPacket.dataType === "application/store" &&
typeof outputPacket.data === "string"
) {
$outputPresignedUrl = await generatePresignedUrl(
env.project.externalRef,
env.slug,
outputPacket.data,
"GET"
);
} else {
$output = await parsePacket(outputPacket);
}
}

Expand All @@ -159,7 +157,8 @@ export class ApiRetrieveRunPresenter extends BasePresenter {
error: ApiRetrieveRunPresenter.apiErrorFromError(taskRun.error),
schedule: await resolveSchedule(taskRun),
// We're removing attempts from the API
attemptCount: taskRun.attempts.length,
attemptCount:
taskRun.engine === "V1" ? taskRun.attempts.length : taskRun.attemptNumber ?? 0,
attempts: [],
relatedRuns: {
root: taskRun.rootTaskRun
Expand Down
85 changes: 84 additions & 1 deletion references/hello-world/src/trigger/tags.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import { logger, task, wait } from "@trigger.dev/sdk";
import { logger, runs, task, wait } from "@trigger.dev/sdk";
import assert from "node:assert";
import { setTimeout } from "node:timers/promises";

export const tagsTester = task({
id: "tags-tester",
Expand All @@ -20,3 +22,84 @@ export const tagsChildTask = task({
logger.log("Hello, world from the child", { payload });
},
});

// Task that will be triggered with tags
export const taggedTask = task({
id: "tagged-task",
run: async (payload: { waitSeconds: number }, { ctx }) => {
logger.info("Running tagged task", { tags: ctx.run.tags });

// Verify initial tags from trigger
const expectedInitialTags = ["test-tag-1", "test-tag-2"];
for (const tag of expectedInitialTags) {
if (!ctx.run.tags.includes(tag)) {
throw new Error(`Expected tag ${tag} to be present initially`);
}
}

// Wait a bit to ensure we can query the running task
await setTimeout(payload.waitSeconds * 1000);

return {
initialTags: ctx.run.tags,
};
},
});

// Test task that verifies tag behavior
export const tagTestTask = task({
id: "tag-test",
run: async (payload, { ctx }) => {
logger.info("Starting tag verification test");

// Trigger a task with initial tags
const handle = await taggedTask.trigger(
{ waitSeconds: 3 },
{
tags: ["test-tag-1", "test-tag-2"],
}
);

// Wait a moment to ensure the task is running
await setTimeout(3_000);

// Query for running tasks with our tags
const runningTasks = await runs.list({
status: "EXECUTING",
tag: ["test-tag-1", "test-tag-2"],
});

let foundRun = false;
for await (const run of runningTasks) {
if (run.id === handle.id) {
foundRun = true;
break;
}
}

if (!foundRun) {
throw new Error("Could not find running task with tags test-tag-1 and test-tag-2");
}

logger.info("Found running task with tags test-tag-1 and test-tag-2");

await wait.for({ seconds: 10 });

const finalRun = await runs.retrieve<typeof taggedTask>(handle.id);

logger.info("Final run", { finalRun });

assert.ok(finalRun.status === "COMPLETED", "Run should be completed");
assert.ok(finalRun.output, "Output should be defined");

// Verify the tags were preserved in the task context
const outputTags = finalRun.output.initialTags;
if (!outputTags.includes("test-tag-1") || !outputTags.includes("test-tag-2")) {
throw new Error(
`Expected tags test-tag-1 and test-tag-2 in output, got ${outputTags.join(", ")}`
);
}

logger.info("✅ Tag verification test passed");
},
});