11import { Prisma , TaskSchedule } from "@trigger.dev/database" ;
22import cronstrue from "cronstrue" ;
33import { nanoid } from "nanoid" ;
4- import { $transaction , PrismaClientOrTransaction } from "~/db.server" ;
4+ import { $transaction } from "~/db.server" ;
55import { generateFriendlyId } from "../friendlyIdentifiers" ;
66import { UpsertSchedule } from "../schedules" ;
77import { calculateNextScheduledTimestamp } from "../utils/calculateNextSchedule.server" ;
@@ -31,124 +31,45 @@ export class UpsertTaskScheduleService extends BaseService {
3131 const checkSchedule = new CheckScheduleService ( this . _prisma ) ;
3232 await checkSchedule . call ( projectId , schedule ) ;
3333
34- const result = await $transaction ( this . _prisma , async ( tx ) => {
35- const deduplicationKey =
36- typeof schedule . deduplicationKey === "string" && schedule . deduplicationKey !== ""
37- ? schedule . deduplicationKey
38- : nanoid ( 24 ) ;
34+ const deduplicationKey =
35+ typeof schedule . deduplicationKey === "string" && schedule . deduplicationKey !== ""
36+ ? schedule . deduplicationKey
37+ : nanoid ( 24 ) ;
3938
40- const existingSchedule = schedule . friendlyId
41- ? await tx . taskSchedule . findUnique ( {
42- where : {
43- friendlyId : schedule . friendlyId ,
44- } ,
45- } )
46- : await tx . taskSchedule . findUnique ( {
47- where : {
48- projectId_deduplicationKey : {
49- projectId,
50- deduplicationKey,
51- } ,
39+ const existingSchedule = schedule . friendlyId
40+ ? await this . _prisma . taskSchedule . findUnique ( {
41+ where : {
42+ friendlyId : schedule . friendlyId ,
43+ } ,
44+ } )
45+ : await this . _prisma . taskSchedule . findUnique ( {
46+ where : {
47+ projectId_deduplicationKey : {
48+ projectId,
49+ deduplicationKey,
5250 } ,
53- } ) ;
51+ } ,
52+ } ) ;
5453
54+ const result = await ( async ( tx ) => {
5555 if ( existingSchedule ) {
5656 if ( existingSchedule . type === "DECLARATIVE" ) {
5757 throw new ServiceValidationError ( "Cannot update a declarative schedule" ) ;
5858 }
5959
60- return await this . #updateExistingSchedule( tx , existingSchedule , schedule , projectId ) ;
60+ return await this . #updateExistingSchedule( existingSchedule , schedule ) ;
6161 } else {
62- return await this . #createNewSchedule( tx , schedule , projectId , deduplicationKey ) ;
62+ return await this . #createNewSchedule( schedule , projectId , deduplicationKey ) ;
6363 }
64- } ) ;
64+ } ) ( ) ;
6565
6666 if ( ! result ) {
67- throw new Error ( "Failed to create or update the schedule" ) ;
68- }
69-
70- const { scheduleRecord, instances } = result ;
71-
72- return this . #createReturnObject( scheduleRecord , instances ) ;
73- }
74-
75- async #createNewSchedule(
76- tx : PrismaClientOrTransaction ,
77- options : UpsertTaskScheduleServiceOptions ,
78- projectId : string ,
79- deduplicationKey : string
80- ) {
81- const scheduleRecord = await tx . taskSchedule . create ( {
82- data : {
83- projectId,
84- friendlyId : generateFriendlyId ( "sched" ) ,
85- taskIdentifier : options . taskIdentifier ,
86- deduplicationKey,
87- userProvidedDeduplicationKey :
88- options . deduplicationKey !== undefined && options . deduplicationKey !== "" ,
89- generatorExpression : options . cron ,
90- generatorDescription : cronstrue . toString ( options . cron ) ,
91- timezone : options . timezone ?? "UTC" ,
92- externalId : options . externalId ? options . externalId : undefined ,
93- } ,
94- } ) ;
95-
96- const registerNextService = new RegisterNextTaskScheduleInstanceService ( tx ) ;
97-
98- //create the instances (links to environments)
99- let instances : InstanceWithEnvironment [ ] = [ ] ;
100- for ( const environmentId of options . environments ) {
101- const instance = await tx . taskScheduleInstance . create ( {
102- data : {
103- taskScheduleId : scheduleRecord . id ,
104- environmentId,
105- } ,
106- include : {
107- environment : {
108- include : {
109- orgMember : {
110- include : {
111- user : true ,
112- } ,
113- } ,
114- } ,
115- } ,
116- } ,
117- } ) ;
118-
119- await registerNextService . call ( instance . id ) ;
120-
121- instances . push ( instance ) ;
67+ throw new ServiceValidationError ( "Failed to create or update schedule" ) ;
12268 }
12369
124- return { scheduleRecord, instances } ;
125- }
70+ const { scheduleRecord } = result ;
12671
127- async #updateExistingSchedule(
128- tx : PrismaClientOrTransaction ,
129- existingSchedule : TaskSchedule ,
130- options : UpsertTaskScheduleServiceOptions ,
131- projectId : string
132- ) {
133- //update the schedule
134- const scheduleRecord = await tx . taskSchedule . update ( {
135- where : {
136- id : existingSchedule . id ,
137- } ,
138- data : {
139- generatorExpression : options . cron ,
140- generatorDescription : cronstrue . toString ( options . cron ) ,
141- timezone : options . timezone ?? "UTC" ,
142- externalId : options . externalId ? options . externalId : null ,
143- } ,
144- } ) ;
145-
146- const scheduleHasChanged =
147- scheduleRecord . generatorExpression !== existingSchedule . generatorExpression ||
148- scheduleRecord . timezone !== existingSchedule . timezone ;
149-
150- // find the existing instances
151- const existingInstances = await tx . taskScheduleInstance . findMany ( {
72+ const instances = await this . _prisma . taskScheduleInstance . findMany ( {
15273 where : {
15374 taskScheduleId : scheduleRecord . id ,
15475 } ,
@@ -165,18 +86,35 @@ export class UpsertTaskScheduleService extends BaseService {
16586 } ,
16687 } ) ;
16788
168- // create the new instances
169- const newInstances : InstanceWithEnvironment [ ] = [ ] ;
170- const updatingInstances : InstanceWithEnvironment [ ] = [ ] ;
89+ return this . #createReturnObject( scheduleRecord , instances ) ;
90+ }
91+
92+ async #createNewSchedule(
93+ options : UpsertTaskScheduleServiceOptions ,
94+ projectId : string ,
95+ deduplicationKey : string
96+ ) {
97+ return await $transaction ( this . _prisma , async ( tx ) => {
98+ const scheduleRecord = await tx . taskSchedule . create ( {
99+ data : {
100+ projectId,
101+ friendlyId : generateFriendlyId ( "sched" ) ,
102+ taskIdentifier : options . taskIdentifier ,
103+ deduplicationKey,
104+ userProvidedDeduplicationKey :
105+ options . deduplicationKey !== undefined && options . deduplicationKey !== "" ,
106+ generatorExpression : options . cron ,
107+ generatorDescription : cronstrue . toString ( options . cron ) ,
108+ timezone : options . timezone ?? "UTC" ,
109+ externalId : options . externalId ? options . externalId : undefined ,
110+ } ,
111+ } ) ;
112+
113+ const registerNextService = new RegisterNextTaskScheduleInstanceService ( tx ) ;
171114
172- for ( const environmentId of options . environments ) {
173- const existingInstance = existingInstances . find ( ( i ) => i . environmentId === environmentId ) ;
115+ //create the instances (links to environments)
174116
175- if ( existingInstance ) {
176- // Update the existing instance
177- updatingInstances . push ( existingInstance ) ;
178- } else {
179- // Create a new instance
117+ for ( const environmentId of options . environments ) {
180118 const instance = await tx . taskScheduleInstance . create ( {
181119 data : {
182120 taskScheduleId : scheduleRecord . id ,
@@ -195,39 +133,21 @@ export class UpsertTaskScheduleService extends BaseService {
195133 } ,
196134 } ) ;
197135
198- newInstances . push ( instance ) ;
136+ await registerNextService . call ( instance . id ) ;
199137 }
200- }
201-
202- // find the instances that need to be removed
203- const instancesToDeleted = existingInstances . filter (
204- ( i ) => ! options . environments . includes ( i . environmentId )
205- ) ;
206138
207- // delete the instances no longer selected
208- for ( const instance of instancesToDeleted ) {
209- await tx . taskScheduleInstance . delete ( {
210- where : {
211- id : instance . id ,
212- } ,
213- } ) ;
214- }
215-
216- const registerService = new RegisterNextTaskScheduleInstanceService ( tx ) ;
217-
218- for ( const instance of newInstances ) {
219- await registerService . call ( instance . id ) ;
220- }
221-
222- if ( scheduleHasChanged ) {
223- for ( const instance of updatingInstances ) {
224- await registerService . call ( instance . id ) ;
225- }
226- }
139+ return { scheduleRecord } ;
140+ } ) ;
141+ }
227142
228- const instances = await tx . taskScheduleInstance . findMany ( {
143+ async #updateExistingSchedule(
144+ existingSchedule : TaskSchedule ,
145+ options : UpsertTaskScheduleServiceOptions
146+ ) {
147+ // find the existing instances
148+ const existingInstances = await this . _prisma . taskScheduleInstance . findMany ( {
229149 where : {
230- taskScheduleId : scheduleRecord . id ,
150+ taskScheduleId : existingSchedule . id ,
231151 } ,
232152 include : {
233153 environment : {
@@ -242,7 +162,89 @@ export class UpsertTaskScheduleService extends BaseService {
242162 } ,
243163 } ) ;
244164
245- return { scheduleRecord, instances } ;
165+ return await $transaction (
166+ this . _prisma ,
167+ async ( tx ) => {
168+ const scheduleRecord = await tx . taskSchedule . update ( {
169+ where : {
170+ id : existingSchedule . id ,
171+ } ,
172+ data : {
173+ generatorExpression : options . cron ,
174+ generatorDescription : cronstrue . toString ( options . cron ) ,
175+ timezone : options . timezone ?? "UTC" ,
176+ externalId : options . externalId ? options . externalId : null ,
177+ } ,
178+ } ) ;
179+
180+ const scheduleHasChanged =
181+ scheduleRecord . generatorExpression !== existingSchedule . generatorExpression ||
182+ scheduleRecord . timezone !== existingSchedule . timezone ;
183+
184+ // create the new instances
185+ const newInstances : InstanceWithEnvironment [ ] = [ ] ;
186+ const updatingInstances : InstanceWithEnvironment [ ] = [ ] ;
187+
188+ for ( const environmentId of options . environments ) {
189+ const existingInstance = existingInstances . find ( ( i ) => i . environmentId === environmentId ) ;
190+
191+ if ( existingInstance ) {
192+ // Update the existing instance
193+ updatingInstances . push ( existingInstance ) ;
194+ } else {
195+ // Create a new instance
196+ const instance = await tx . taskScheduleInstance . create ( {
197+ data : {
198+ taskScheduleId : scheduleRecord . id ,
199+ environmentId,
200+ } ,
201+ include : {
202+ environment : {
203+ include : {
204+ orgMember : {
205+ include : {
206+ user : true ,
207+ } ,
208+ } ,
209+ } ,
210+ } ,
211+ } ,
212+ } ) ;
213+
214+ newInstances . push ( instance ) ;
215+ }
216+ }
217+
218+ // find the instances that need to be removed
219+ const instancesToDeleted = existingInstances . filter (
220+ ( i ) => ! options . environments . includes ( i . environmentId )
221+ ) ;
222+
223+ // delete the instances no longer selected
224+ for ( const instance of instancesToDeleted ) {
225+ await tx . taskScheduleInstance . delete ( {
226+ where : {
227+ id : instance . id ,
228+ } ,
229+ } ) ;
230+ }
231+
232+ const registerService = new RegisterNextTaskScheduleInstanceService ( tx ) ;
233+
234+ for ( const instance of newInstances ) {
235+ await registerService . call ( instance . id ) ;
236+ }
237+
238+ if ( scheduleHasChanged ) {
239+ for ( const instance of updatingInstances ) {
240+ await registerService . call ( instance . id ) ;
241+ }
242+ }
243+
244+ return { scheduleRecord } ;
245+ } ,
246+ { timeout : 10_000 }
247+ ) ;
246248 }
247249
248250 #createReturnObject( taskSchedule : TaskSchedule , instances : InstanceWithEnvironment [ ] ) {
0 commit comments