1+ import { ClickHouse } from "@internal/clickhouse" ;
12import { ScheduledTaskPayload , parsePacket , prettyPrintPacket } from "@trigger.dev/core/v3" ;
23import {
3- type TaskRunTemplate ,
44 type RuntimeEnvironmentType ,
55 type TaskRunStatus ,
6+ type TaskRunTemplate ,
7+ PrismaClientOrTransaction ,
68} from "@trigger.dev/database" ;
7- import { type PrismaClient , prisma , sqlDatabaseSchema } from "~/db.server" ;
9+ import parse from "parse-duration" ;
10+ import { type PrismaClient } from "~/db.server" ;
11+ import { RunsRepository } from "~/services/runsRepository/runsRepository.server" ;
812import { getTimezones } from "~/utils/timezones.server" ;
913import { findCurrentWorkerDeployment } from "~/v3/models/workerDeployment.server" ;
1014import { queueTypeFromType } from "./QueueRetrievePresenter.server" ;
11- import parse from "parse-duration" ;
1215
1316export type RunTemplate = TaskRunTemplate & {
1417 scheduledTaskPayload ?: ScheduledRun [ "payload" ] ;
@@ -20,6 +23,8 @@ type TestTaskOptions = {
2023 environment : {
2124 id : string ;
2225 type : RuntimeEnvironmentType ;
26+ projectId : string ;
27+ organizationId : string ;
2328 } ;
2429 taskIdentifier : string ;
2530} ;
@@ -111,11 +116,10 @@ export type ScheduledRun = Omit<RawRun, "payload" | "ttl"> & {
111116} ;
112117
113118export class TestTaskPresenter {
114- #prismaClient: PrismaClient ;
115-
116- constructor ( prismaClient : PrismaClient = prisma ) {
117- this . #prismaClient = prismaClient ;
118- }
119+ constructor (
120+ private readonly replica : PrismaClientOrTransaction ,
121+ private readonly clickhouse : ClickHouse
122+ ) { }
119123
120124 public async call ( {
121125 userId,
@@ -128,7 +132,7 @@ export class TestTaskPresenter {
128132 ? (
129133 await findCurrentWorkerDeployment ( { environmentId : environment . id } )
130134 ) ?. worker ?. tasks . find ( ( t ) => t . slug === taskIdentifier )
131- : await this . #prismaClient . backgroundWorkerTask . findFirst ( {
135+ : await this . replica . backgroundWorkerTask . findFirst ( {
132136 where : {
133137 slug : taskIdentifier ,
134138 runtimeEnvironmentId : environment . id ,
@@ -145,7 +149,7 @@ export class TestTaskPresenter {
145149 }
146150
147151 const taskQueue = task . queueId
148- ? await this . #prismaClient . taskQueue . findFirst ( {
152+ ? await this . replica . taskQueue . findFirst ( {
149153 where : {
150154 runtimeEnvironmentId : environment . id ,
151155 id : task . queueId ,
@@ -159,7 +163,7 @@ export class TestTaskPresenter {
159163 } )
160164 : undefined ;
161165
162- const backgroundWorkers = await this . #prismaClient . backgroundWorker . findMany ( {
166+ const backgroundWorkers = await this . replica . backgroundWorker . findMany ( {
163167 where : {
164168 runtimeEnvironmentId : environment . id ,
165169 } ,
@@ -173,7 +177,7 @@ export class TestTaskPresenter {
173177 take : 20 , // last 20 versions should suffice
174178 } ) ;
175179
176- const taskRunTemplates = await this . #prismaClient . taskRunTemplate . findMany ( {
180+ const taskRunTemplates = await this . replica . taskRunTemplate . findMany ( {
177181 where : {
178182 projectId,
179183 taskSlug : task . slug ,
@@ -190,47 +194,55 @@ export class TestTaskPresenter {
190194 const disableVersionSelection = environment . type === "DEVELOPMENT" ;
191195 const allowArbitraryQueues = backgroundWorkers [ 0 ] ?. engine === "V1" ;
192196
193- const latestRuns = await this . #prismaClient. $queryRaw < RawRun [ ] > `
194- WITH taskruns AS (
195- SELECT
196- tr.*
197- FROM
198- ${ sqlDatabaseSchema } ."TaskRun" as tr
199- JOIN
200- ${ sqlDatabaseSchema } ."BackgroundWorkerTask" as bwt
201- ON
202- tr."taskIdentifier" = bwt.slug
203- WHERE
204- bwt."friendlyId" = ${ task . friendlyId } AND
205- tr."runtimeEnvironmentId" = ${ environment . id }
206- ORDER BY
207- tr."createdAt" DESC
208- LIMIT 10
209- )
210- SELECT
211- taskr.id,
212- taskr."queue",
213- taskr."friendlyId",
214- taskr."taskIdentifier",
215- taskr."createdAt",
216- taskr.status,
217- taskr.payload,
218- taskr."payloadType",
219- taskr."seedMetadata",
220- taskr."seedMetadataType",
221- taskr."runtimeEnvironmentId",
222- taskr."concurrencyKey",
223- taskr."maxAttempts",
224- taskr."maxDurationInSeconds",
225- taskr."machinePreset",
226- taskr."ttl",
227- taskr."runTags"
228- FROM
229- taskruns AS taskr
230- WHERE
231- taskr."payloadType" = 'application/json' OR taskr."payloadType" = 'application/super+json'
232- ORDER BY
233- taskr."createdAt" DESC;` ;
197+ // Get the latest runs, for the payloads
198+ const runsRepository = new RunsRepository ( {
199+ clickhouse : this . clickhouse ,
200+ prisma : this . replica as PrismaClient ,
201+ } ) ;
202+
203+ const runIds = await runsRepository . listRunIds ( {
204+ organizationId : environment . organizationId ,
205+ environmentId : environment . id ,
206+ projectId : environment . projectId ,
207+ tasks : [ task . slug ] ,
208+ period : "30d" ,
209+ page : {
210+ size : 10 ,
211+ } ,
212+ } ) ;
213+
214+ const latestRuns = await this . replica . taskRun . findMany ( {
215+ select : {
216+ id : true ,
217+ queue : true ,
218+ friendlyId : true ,
219+ taskIdentifier : true ,
220+ createdAt : true ,
221+ status : true ,
222+ payload : true ,
223+ payloadType : true ,
224+ seedMetadata : true ,
225+ seedMetadataType : true ,
226+ runtimeEnvironmentId : true ,
227+ concurrencyKey : true ,
228+ maxAttempts : true ,
229+ maxDurationInSeconds : true ,
230+ machinePreset : true ,
231+ ttl : true ,
232+ runTags : true ,
233+ } ,
234+ where : {
235+ id : {
236+ in : runIds ,
237+ } ,
238+ payloadType : {
239+ in : [ "application/json" , "application/super+json" ] ,
240+ } ,
241+ } ,
242+ orderBy : {
243+ createdAt : "desc" ,
244+ } ,
245+ } ) ;
234246
235247 const taskWithEnvironment = {
236248 id : task . id ,
@@ -258,6 +270,12 @@ export class TestTaskPresenter {
258270 async ( r ) =>
259271 ( {
260272 ...r ,
273+ seedMetadata : r . seedMetadata ?? undefined ,
274+ seedMetadataType : r . seedMetadataType ?? undefined ,
275+ concurrencyKey : r . concurrencyKey ?? undefined ,
276+ maxAttempts : r . maxAttempts ?? undefined ,
277+ maxDurationInSeconds : r . maxDurationInSeconds ?? undefined ,
278+ machinePreset : r . machinePreset ?? undefined ,
261279 payload : await prettyPrintPacket ( r . payload , r . payloadType ) ,
262280 metadata : r . seedMetadata
263281 ? await prettyPrintPacket ( r . seedMetadata , r . seedMetadataType )
@@ -300,6 +318,12 @@ export class TestTaskPresenter {
300318 if ( payload . success ) {
301319 return {
302320 ...r ,
321+ seedMetadata : r . seedMetadata ?? undefined ,
322+ seedMetadataType : r . seedMetadataType ?? undefined ,
323+ concurrencyKey : r . concurrencyKey ?? undefined ,
324+ maxAttempts : r . maxAttempts ?? undefined ,
325+ maxDurationInSeconds : r . maxDurationInSeconds ?? undefined ,
326+ machinePreset : r . machinePreset ?? undefined ,
303327 payload : payload . data ,
304328 ttlSeconds : r . ttl ? parse ( r . ttl , "s" ) ?? undefined : undefined ,
305329 } satisfies ScheduledRun ;
0 commit comments