Skip to content

Commit

Permalink
Store job entry ID for fixed rate scheduler and also fixes cron sched…
Browse files Browse the repository at this point in the history
…ule for missing key (flyteorg#431)

* Store job entry ID for fixed rate scheduler

Signed-off-by: Prafulla Mahindrakar <prafulla.mahindrakar@gmail.com>

* Add kickoff time input arg only if the user set it in the workflow

Signed-off-by: Prafulla Mahindrakar <prafulla.mahindrakar@gmail.com>

* Added unit test for no kick off time arg

Signed-off-by: Prafulla Mahindrakar <prafulla.mahindrakar@gmail.com>
  • Loading branch information
pmahindrakar-oss authored May 27, 2022
1 parent 268b0b7 commit 4080d3f
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 15 deletions.
4 changes: 3 additions & 1 deletion flyteadmin/scheduler/core/gocron_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,9 @@ func (g *GoCronScheduler) AddFixedIntervalJob(ctx context.Context, job *GoCronJo
var jobFunc cron.TimedFuncJob
jobFunc = job.Run

g.cron.ScheduleTimedJob(cron.ConstantDelaySchedule{Delay: d}, jobFunc)
entryID := g.cron.ScheduleTimedJob(cron.ConstantDelaySchedule{Delay: d}, jobFunc)
// Update the enttry id in the job which is handle to be used for removal
job.entryID = entryID
logger.Infof(ctx, "successfully added the fixed rate schedule %s to the scheduler for schedule %+v",
job.nameOfSchedule, job.schedule)

Expand Down
2 changes: 1 addition & 1 deletion flyteadmin/scheduler/executor/executor_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func (w *executor) Execute(ctx context.Context, scheduledTime time.Time, s model

literalsInputMap := map[string]*core.Literal{}
// Only add kickoff time input arg for cron based schedules
if len(s.CronExpression) > 0 {
if len(s.CronExpression) > 0 && len(s.KickoffTimeInputArg) > 0 {
literalsInputMap[s.KickoffTimeInputArg] = &core.Literal{
Value: &core.Literal_Scalar{
Scalar: &core.Scalar{
Expand Down
42 changes: 29 additions & 13 deletions flyteadmin/scheduler/executor/executor_impl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,20 +28,36 @@ func setupExecutor(scope string) Executor {
func TestExecutor(t *testing.T) {
executor := setupExecutor("testExecutor1")
active := true
schedule := models.SchedulableEntity{
SchedulableEntityKey: models.SchedulableEntityKey{
Project: "project",
Domain: "domain",
Name: "cron_schedule",
Version: "v1",
},
CronExpression: "*/1 * * * *",
KickoffTimeInputArg: "kickoff_time",
Active: &active,
}
mockAdminClient.OnCreateExecutionMatch(context.Background(), mock.Anything).Return(&admin.ExecutionCreateResponse{}, nil)
err := executor.Execute(context.Background(), time.Now(), schedule)
assert.Nil(t, err)
t.Run("kickoff_time_arg", func(t *testing.T) {
schedule := models.SchedulableEntity{
SchedulableEntityKey: models.SchedulableEntityKey{
Project: "project",
Domain: "domain",
Name: "cron_schedule",
Version: "v1",
},
CronExpression: "*/1 * * * *",
KickoffTimeInputArg: "kickoff_time",
Active: &active,
}
err := executor.Execute(context.Background(), time.Now(), schedule)
assert.Nil(t, err)
})
t.Run("without kickoff_time_arg", func(t *testing.T) {
schedule := models.SchedulableEntity{
SchedulableEntityKey: models.SchedulableEntityKey{
Project: "project",
Domain: "domain",
Name: "cron_schedule",
Version: "v1",
},
CronExpression: "*/1 * * * *",
Active: &active,
}
err := executor.Execute(context.Background(), time.Now(), schedule)
assert.Nil(t, err)
})
}

func TestExecutorAlreadyExists(t *testing.T) {
Expand Down

0 comments on commit 4080d3f

Please sign in to comment.