Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
130 changes: 77 additions & 53 deletions apps/webapp/app/presenters/v3/TestTaskPresenter.server.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
import { ClickHouse } from "@internal/clickhouse";
import { ScheduledTaskPayload, parsePacket, prettyPrintPacket } from "@trigger.dev/core/v3";
import {
type TaskRunTemplate,
type RuntimeEnvironmentType,
type TaskRunStatus,
type TaskRunTemplate,
PrismaClientOrTransaction,
} from "@trigger.dev/database";
import { type PrismaClient, prisma, sqlDatabaseSchema } from "~/db.server";
import parse from "parse-duration";
import { type PrismaClient } from "~/db.server";
import { RunsRepository } from "~/services/runsRepository/runsRepository.server";
import { getTimezones } from "~/utils/timezones.server";
import { findCurrentWorkerDeployment } from "~/v3/models/workerDeployment.server";
import { queueTypeFromType } from "./QueueRetrievePresenter.server";
import parse from "parse-duration";

export type RunTemplate = TaskRunTemplate & {
scheduledTaskPayload?: ScheduledRun["payload"];
Expand All @@ -20,6 +23,8 @@ type TestTaskOptions = {
environment: {
id: string;
type: RuntimeEnvironmentType;
projectId: string;
organizationId: string;
};
taskIdentifier: string;
};
Expand Down Expand Up @@ -111,11 +116,10 @@ export type ScheduledRun = Omit<RawRun, "payload" | "ttl"> & {
};

export class TestTaskPresenter {
#prismaClient: PrismaClient;

constructor(prismaClient: PrismaClient = prisma) {
this.#prismaClient = prismaClient;
}
constructor(
private readonly replica: PrismaClientOrTransaction,
private readonly clickhouse: ClickHouse
) {}

public async call({
userId,
Expand All @@ -128,7 +132,7 @@ export class TestTaskPresenter {
? (
await findCurrentWorkerDeployment({ environmentId: environment.id })
)?.worker?.tasks.find((t) => t.slug === taskIdentifier)
: await this.#prismaClient.backgroundWorkerTask.findFirst({
: await this.replica.backgroundWorkerTask.findFirst({
where: {
slug: taskIdentifier,
runtimeEnvironmentId: environment.id,
Expand All @@ -145,7 +149,7 @@ export class TestTaskPresenter {
}

const taskQueue = task.queueId
? await this.#prismaClient.taskQueue.findFirst({
? await this.replica.taskQueue.findFirst({
where: {
runtimeEnvironmentId: environment.id,
id: task.queueId,
Expand All @@ -159,7 +163,7 @@ export class TestTaskPresenter {
})
: undefined;

const backgroundWorkers = await this.#prismaClient.backgroundWorker.findMany({
const backgroundWorkers = await this.replica.backgroundWorker.findMany({
where: {
runtimeEnvironmentId: environment.id,
},
Expand All @@ -173,7 +177,7 @@ export class TestTaskPresenter {
take: 20, // last 20 versions should suffice
});

const taskRunTemplates = await this.#prismaClient.taskRunTemplate.findMany({
const taskRunTemplates = await this.replica.taskRunTemplate.findMany({
where: {
projectId,
taskSlug: task.slug,
Expand All @@ -190,47 +194,55 @@ export class TestTaskPresenter {
const disableVersionSelection = environment.type === "DEVELOPMENT";
const allowArbitraryQueues = backgroundWorkers[0]?.engine === "V1";

const latestRuns = await this.#prismaClient.$queryRaw<RawRun[]>`
WITH taskruns AS (
SELECT
tr.*
FROM
${sqlDatabaseSchema}."TaskRun" as tr
JOIN
${sqlDatabaseSchema}."BackgroundWorkerTask" as bwt
ON
tr."taskIdentifier" = bwt.slug
WHERE
bwt."friendlyId" = ${task.friendlyId} AND
tr."runtimeEnvironmentId" = ${environment.id}
ORDER BY
tr."createdAt" DESC
LIMIT 10
)
SELECT
taskr.id,
taskr."queue",
taskr."friendlyId",
taskr."taskIdentifier",
taskr."createdAt",
taskr.status,
taskr.payload,
taskr."payloadType",
taskr."seedMetadata",
taskr."seedMetadataType",
taskr."runtimeEnvironmentId",
taskr."concurrencyKey",
taskr."maxAttempts",
taskr."maxDurationInSeconds",
taskr."machinePreset",
taskr."ttl",
taskr."runTags"
FROM
taskruns AS taskr
WHERE
taskr."payloadType" = 'application/json' OR taskr."payloadType" = 'application/super+json'
ORDER BY
taskr."createdAt" DESC;`;
// Get the latest runs, for the payloads
const runsRepository = new RunsRepository({
clickhouse: this.clickhouse,
prisma: this.replica as PrismaClient,
});

const runIds = await runsRepository.listRunIds({
organizationId: environment.organizationId,
environmentId: environment.id,
projectId: environment.projectId,
tasks: [task.slug],
period: "30d",
page: {
size: 10,
},
});

const latestRuns = await this.replica.taskRun.findMany({
select: {
id: true,
queue: true,
friendlyId: true,
taskIdentifier: true,
createdAt: true,
status: true,
payload: true,
payloadType: true,
seedMetadata: true,
seedMetadataType: true,
runtimeEnvironmentId: true,
concurrencyKey: true,
maxAttempts: true,
maxDurationInSeconds: true,
machinePreset: true,
ttl: true,
runTags: true,
},
where: {
id: {
in: runIds,
},
payloadType: {
in: ["application/json", "application/super+json"],
},
},
orderBy: {
createdAt: "desc",
},
});

const taskWithEnvironment = {
id: task.id,
Expand Down Expand Up @@ -258,6 +270,12 @@ export class TestTaskPresenter {
async (r) =>
({
...r,
seedMetadata: r.seedMetadata ?? undefined,
seedMetadataType: r.seedMetadataType ?? undefined,
concurrencyKey: r.concurrencyKey ?? undefined,
maxAttempts: r.maxAttempts ?? undefined,
maxDurationInSeconds: r.maxDurationInSeconds ?? undefined,
machinePreset: r.machinePreset ?? undefined,
payload: await prettyPrintPacket(r.payload, r.payloadType),
metadata: r.seedMetadata
? await prettyPrintPacket(r.seedMetadata, r.seedMetadataType)
Expand Down Expand Up @@ -300,6 +318,12 @@ export class TestTaskPresenter {
if (payload.success) {
return {
...r,
seedMetadata: r.seedMetadata ?? undefined,
seedMetadataType: r.seedMetadataType ?? undefined,
concurrencyKey: r.concurrencyKey ?? undefined,
maxAttempts: r.maxAttempts ?? undefined,
maxDurationInSeconds: r.maxDurationInSeconds ?? undefined,
machinePreset: r.machinePreset ?? undefined,
payload: payload.data,
ttlSeconds: r.ttl ? parse(r.ttl, "s") ?? undefined : undefined,
} satisfies ScheduledRun;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ import { DeleteTaskRunTemplateData, RunTemplateData } from "~/v3/taskRunTemplate
import { Dialog, DialogContent, DialogHeader, DialogTrigger } from "~/components/primitives/Dialog";
import { DialogClose, DialogDescription } from "@radix-ui/react-dialog";
import { FormButtons } from "~/components/primitives/FormButtons";
import { $replica } from "~/db.server";
import { clickhouseClient } from "~/services/clickhouseInstance.server";

type FormAction = "create-template" | "delete-template" | "run-scheduled" | "run-standard";

Expand All @@ -96,7 +98,7 @@ export const loader = async ({ request, params }: LoaderFunctionArgs) => {
});
}

const presenter = new TestTaskPresenter();
const presenter = new TestTaskPresenter($replica, clickhouseClient);
try {
const result = await presenter.call({
userId,
Expand Down