diff --git a/apps/webapp/app/presenters/v3/TestTaskPresenter.server.ts b/apps/webapp/app/presenters/v3/TestTaskPresenter.server.ts index 23fe6bca94..2817b7c8b8 100644 --- a/apps/webapp/app/presenters/v3/TestTaskPresenter.server.ts +++ b/apps/webapp/app/presenters/v3/TestTaskPresenter.server.ts @@ -1,14 +1,17 @@ +import { ClickHouse } from "@internal/clickhouse"; import { ScheduledTaskPayload, parsePacket, prettyPrintPacket } from "@trigger.dev/core/v3"; import { - type TaskRunTemplate, type RuntimeEnvironmentType, type TaskRunStatus, + type TaskRunTemplate, + PrismaClientOrTransaction, } from "@trigger.dev/database"; -import { type PrismaClient, prisma, sqlDatabaseSchema } from "~/db.server"; +import parse from "parse-duration"; +import { type PrismaClient } from "~/db.server"; +import { RunsRepository } from "~/services/runsRepository/runsRepository.server"; import { getTimezones } from "~/utils/timezones.server"; import { findCurrentWorkerDeployment } from "~/v3/models/workerDeployment.server"; import { queueTypeFromType } from "./QueueRetrievePresenter.server"; -import parse from "parse-duration"; export type RunTemplate = TaskRunTemplate & { scheduledTaskPayload?: ScheduledRun["payload"]; @@ -20,6 +23,8 @@ type TestTaskOptions = { environment: { id: string; type: RuntimeEnvironmentType; + projectId: string; + organizationId: string; }; taskIdentifier: string; }; @@ -111,11 +116,10 @@ export type ScheduledRun = Omit & { }; export class TestTaskPresenter { - #prismaClient: PrismaClient; - - constructor(prismaClient: PrismaClient = prisma) { - this.#prismaClient = prismaClient; - } + constructor( + private readonly replica: PrismaClientOrTransaction, + private readonly clickhouse: ClickHouse + ) {} public async call({ userId, @@ -128,7 +132,7 @@ export class TestTaskPresenter { ? ( await findCurrentWorkerDeployment({ environmentId: environment.id }) )?.worker?.tasks.find((t) => t.slug === taskIdentifier) - : await this.#prismaClient.backgroundWorkerTask.findFirst({ + : await this.replica.backgroundWorkerTask.findFirst({ where: { slug: taskIdentifier, runtimeEnvironmentId: environment.id, @@ -145,7 +149,7 @@ export class TestTaskPresenter { } const taskQueue = task.queueId - ? await this.#prismaClient.taskQueue.findFirst({ + ? await this.replica.taskQueue.findFirst({ where: { runtimeEnvironmentId: environment.id, id: task.queueId, @@ -159,7 +163,7 @@ export class TestTaskPresenter { }) : undefined; - const backgroundWorkers = await this.#prismaClient.backgroundWorker.findMany({ + const backgroundWorkers = await this.replica.backgroundWorker.findMany({ where: { runtimeEnvironmentId: environment.id, }, @@ -173,7 +177,7 @@ export class TestTaskPresenter { take: 20, // last 20 versions should suffice }); - const taskRunTemplates = await this.#prismaClient.taskRunTemplate.findMany({ + const taskRunTemplates = await this.replica.taskRunTemplate.findMany({ where: { projectId, taskSlug: task.slug, @@ -190,47 +194,55 @@ export class TestTaskPresenter { const disableVersionSelection = environment.type === "DEVELOPMENT"; const allowArbitraryQueues = backgroundWorkers[0]?.engine === "V1"; - const latestRuns = await this.#prismaClient.$queryRaw` - WITH taskruns AS ( - SELECT - tr.* - FROM - ${sqlDatabaseSchema}."TaskRun" as tr - JOIN - ${sqlDatabaseSchema}."BackgroundWorkerTask" as bwt - ON - tr."taskIdentifier" = bwt.slug - WHERE - bwt."friendlyId" = ${task.friendlyId} AND - tr."runtimeEnvironmentId" = ${environment.id} - ORDER BY - tr."createdAt" DESC - LIMIT 10 - ) - SELECT - taskr.id, - taskr."queue", - taskr."friendlyId", - taskr."taskIdentifier", - taskr."createdAt", - taskr.status, - taskr.payload, - taskr."payloadType", - taskr."seedMetadata", - taskr."seedMetadataType", - taskr."runtimeEnvironmentId", - taskr."concurrencyKey", - taskr."maxAttempts", - taskr."maxDurationInSeconds", - taskr."machinePreset", - taskr."ttl", - taskr."runTags" - FROM - taskruns AS taskr - WHERE - taskr."payloadType" = 'application/json' OR taskr."payloadType" = 'application/super+json' - ORDER BY - taskr."createdAt" DESC;`; + // Get the latest runs, for the payloads + const runsRepository = new RunsRepository({ + clickhouse: this.clickhouse, + prisma: this.replica as PrismaClient, + }); + + const runIds = await runsRepository.listRunIds({ + organizationId: environment.organizationId, + environmentId: environment.id, + projectId: environment.projectId, + tasks: [task.slug], + period: "30d", + page: { + size: 10, + }, + }); + + const latestRuns = await this.replica.taskRun.findMany({ + select: { + id: true, + queue: true, + friendlyId: true, + taskIdentifier: true, + createdAt: true, + status: true, + payload: true, + payloadType: true, + seedMetadata: true, + seedMetadataType: true, + runtimeEnvironmentId: true, + concurrencyKey: true, + maxAttempts: true, + maxDurationInSeconds: true, + machinePreset: true, + ttl: true, + runTags: true, + }, + where: { + id: { + in: runIds, + }, + payloadType: { + in: ["application/json", "application/super+json"], + }, + }, + orderBy: { + createdAt: "desc", + }, + }); const taskWithEnvironment = { id: task.id, @@ -258,6 +270,12 @@ export class TestTaskPresenter { async (r) => ({ ...r, + seedMetadata: r.seedMetadata ?? undefined, + seedMetadataType: r.seedMetadataType ?? undefined, + concurrencyKey: r.concurrencyKey ?? undefined, + maxAttempts: r.maxAttempts ?? undefined, + maxDurationInSeconds: r.maxDurationInSeconds ?? undefined, + machinePreset: r.machinePreset ?? undefined, payload: await prettyPrintPacket(r.payload, r.payloadType), metadata: r.seedMetadata ? await prettyPrintPacket(r.seedMetadata, r.seedMetadataType) @@ -300,6 +318,12 @@ export class TestTaskPresenter { if (payload.success) { return { ...r, + seedMetadata: r.seedMetadata ?? undefined, + seedMetadataType: r.seedMetadataType ?? undefined, + concurrencyKey: r.concurrencyKey ?? undefined, + maxAttempts: r.maxAttempts ?? undefined, + maxDurationInSeconds: r.maxDurationInSeconds ?? undefined, + machinePreset: r.machinePreset ?? undefined, payload: payload.data, ttlSeconds: r.ttl ? parse(r.ttl, "s") ?? undefined : undefined, } satisfies ScheduledRun; diff --git a/apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.test.tasks.$taskParam/route.tsx b/apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.test.tasks.$taskParam/route.tsx index f104a4e545..5d6fdb80ff 100644 --- a/apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.test.tasks.$taskParam/route.tsx +++ b/apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.test.tasks.$taskParam/route.tsx @@ -73,6 +73,8 @@ import { DeleteTaskRunTemplateData, RunTemplateData } from "~/v3/taskRunTemplate import { Dialog, DialogContent, DialogHeader, DialogTrigger } from "~/components/primitives/Dialog"; import { DialogClose, DialogDescription } from "@radix-ui/react-dialog"; import { FormButtons } from "~/components/primitives/FormButtons"; +import { $replica } from "~/db.server"; +import { clickhouseClient } from "~/services/clickhouseInstance.server"; type FormAction = "create-template" | "delete-template" | "run-scheduled" | "run-standard"; @@ -96,7 +98,7 @@ export const loader = async ({ request, params }: LoaderFunctionArgs) => { }); } - const presenter = new TestTaskPresenter(); + const presenter = new TestTaskPresenter($replica, clickhouseClient); try { const result = await presenter.call({ userId,