Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backend] swf catchup option integration tests #3139

Merged
merged 15 commits into from
Feb 21, 2020
1 change: 1 addition & 0 deletions backend/test/integration/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -28,6 +28,7 @@ go_test(
"//backend/test:go_default_library",
"@com_github_argoproj_argo//pkg/apis/workflow/v1alpha1:go_default_library",
"@com_github_ghodss_yaml//:go_default_library",
"@com_github_go_openapi_strfmt//:go_default_library",
"@com_github_golang_glog//:go_default_library",
"@com_github_stretchr_testify//assert:go_default_library",
"@com_github_stretchr_testify//suite:go_default_library",
166 changes: 166 additions & 0 deletions backend/test/integration/job_api_test.go
Original file line number Diff line number Diff line change
@@ -7,6 +7,7 @@ import (

"github.com/kubeflow/pipelines/backend/test"

"github.com/go-openapi/strfmt"
"github.com/golang/glog"
experimentparams "github.com/kubeflow/pipelines/backend/api/go_http_client/experiment_client/experiment_service"
"github.com/kubeflow/pipelines/backend/api/go_http_client/experiment_model"
@@ -23,6 +24,12 @@ import (
"k8s.io/apimachinery/pkg/util/yaml"
)

const (
second = 1
minute = 60 * second
hour = 60 * minute
)

type JobApiTestSuite struct {
suite.Suite
namespace string
@@ -214,6 +221,109 @@ func (s *JobApiTestSuite) TestJobApis() {
s.checkArgParamsRun(t, argParamsRun, argParamsExperiment.ID, argParamsExperiment.Name, argParamsJob.ID, argParamsJob.Name)
}

func (s *JobApiTestSuite) TestJobApis_noCatchupOption() {
t := s.T()

/* ---------- Upload pipelines YAML ---------- */
pipeline, err := s.pipelineUploadClient.UploadFile("../resources/hello-world.yaml", uploadParams.NewUploadPipelineParams())
assert.Nil(t, err)

/* ---------- Create a periodic job with start and end date in the past and catchup = true ---------- */
experiment := &experiment_model.APIExperiment{Name: "periodic catchup true"}
periodicCatchupTrueExperiment, err := s.experimentClient.Create(&experimentparams.CreateExperimentParams{Body: experiment})
assert.Nil(t, err)

job := jobInThePastForTwoMinutes(jobOptions{
pipelineId: pipeline.ID,
experimentId: periodicCatchupTrueExperiment.ID,
periodic: true,
})
job.Name = "periodic-catchup-true-"
job.Description = "A job with NoCatchup=false will backfill each past interval when behind schedule."
job.NoCatchup = false // This is the key difference.
createJobRequest := &jobparams.CreateJobParams{Body: job}
_, err = s.jobClient.Create(createJobRequest)
assert.Nil(t, err)

/* -------- Create another periodic job with start and end date in the past but catchup = false ------ */
experiment = &experiment_model.APIExperiment{Name: "periodic catchup false"}
periodicCatchupFalseExperiment, err := s.experimentClient.Create(&experimentparams.CreateExperimentParams{Body: experiment})
assert.Nil(t, err)

job = jobInThePastForTwoMinutes(jobOptions{
pipelineId: pipeline.ID,
experimentId: periodicCatchupFalseExperiment.ID,
periodic: true,
})
job.Name = "periodic-catchup-false-"
job.Description = "A job with NoCatchup=true only schedules the last interval when behind schedule."
job.NoCatchup = true // This is the key difference.
createJobRequest = &jobparams.CreateJobParams{Body: job}
_, err = s.jobClient.Create(createJobRequest)
assert.Nil(t, err)

/* ---------- Create a cron job with start and end date in the past and catchup = true ---------- */
experiment = &experiment_model.APIExperiment{Name: "cron catchup true"}
cronCatchupTrueExperiment, err := s.experimentClient.Create(&experimentparams.CreateExperimentParams{Body: experiment})
assert.Nil(t, err)

job = jobInThePastForTwoMinutes(jobOptions{
pipelineId: pipeline.ID,
experimentId: cronCatchupTrueExperiment.ID,
periodic: false,
})
job.Name = "cron-catchup-true-"
job.Description = "A job with NoCatchup=false will backfill each past interval when behind schedule."
job.NoCatchup = false // This is the key difference.
createJobRequest = &jobparams.CreateJobParams{Body: job}
_, err = s.jobClient.Create(createJobRequest)
assert.Nil(t, err)

/* -------- Create another cron job with start and end date in the past but catchup = false ------ */
experiment = &experiment_model.APIExperiment{Name: "cron catchup false"}
cronCatchupFalseExperiment, err := s.experimentClient.Create(&experimentparams.CreateExperimentParams{Body: experiment})
assert.Nil(t, err)

job = jobInThePastForTwoMinutes(jobOptions{
pipelineId: pipeline.ID,
experimentId: cronCatchupFalseExperiment.ID,
periodic: false,
})
job.Name = "cron-catchup-false-"
job.Description = "A job with NoCatchup=true only schedules the last interval when behind schedule."
job.NoCatchup = true // This is the key difference.
createJobRequest = &jobparams.CreateJobParams{Body: job}
_, err = s.jobClient.Create(createJobRequest)
assert.Nil(t, err)

// The scheduledWorkflow CRD would create the run and it synced to the DB by persistent agent.
// This could take a few seconds to finish.
// TODO: Retry list run every 5 seconds instead of sleeping for 40 seconds.
time.Sleep(40 * time.Second)

/* ---------- Assert number of runs when catchup = true ---------- */
_, runsWhenCatchupTrue, _, err := s.runClient.List(&runParams.ListRunsParams{
ResourceReferenceKeyType: util.StringPointer(string(run_model.APIResourceTypeEXPERIMENT)),
ResourceReferenceKeyID: util.StringPointer(periodicCatchupTrueExperiment.ID)})
assert.Nil(t, err)
assert.Equal(t, 2, runsWhenCatchupTrue)
_, runsWhenCatchupTrue, _, err = s.runClient.List(&runParams.ListRunsParams{
ResourceReferenceKeyType: util.StringPointer(string(run_model.APIResourceTypeEXPERIMENT)),
ResourceReferenceKeyID: util.StringPointer(cronCatchupTrueExperiment.ID)})

/* ---------- Assert number of runs when catchup = false ---------- */
_, runsWhenCatchupFalse, _, err := s.runClient.List(&runParams.ListRunsParams{
ResourceReferenceKeyType: util.StringPointer(string(run_model.APIResourceTypeEXPERIMENT)),
ResourceReferenceKeyID: util.StringPointer(periodicCatchupFalseExperiment.ID)})
assert.Nil(t, err)
assert.Equal(t, 1, runsWhenCatchupFalse)
_, runsWhenCatchupFalse, _, err = s.runClient.List(&runParams.ListRunsParams{
ResourceReferenceKeyType: util.StringPointer(string(run_model.APIResourceTypeEXPERIMENT)),
ResourceReferenceKeyID: util.StringPointer(cronCatchupFalseExperiment.ID)})
assert.Nil(t, err)
assert.Equal(t, 1, runsWhenCatchupFalse)
}

func (s *JobApiTestSuite) checkHelloWorldJob(t *testing.T, job *job_model.APIJob, experimentID string, experimentName string, pipelineID string) {
// Check workflow manifest is not empty
assert.Contains(t, job.PipelineSpec.WorkflowManifest, "whalesay")
@@ -320,9 +430,65 @@ func (s *JobApiTestSuite) TearDownSuite() {
}
}

/** ======== the following are util functions ========= **/

func (s *JobApiTestSuite) cleanUp() {
test.DeleteAllExperiments(s.experimentClient, s.T())
test.DeleteAllPipelines(s.pipelineClient, s.T())
test.DeleteAllJobs(s.jobClient, s.T())
test.DeleteAllRuns(s.runClient, s.T())
}

func defaultApiJob(pipelineId, experimentId string) *job_model.APIJob {
return &job_model.APIJob{
Name: "default-pipeline-name",
Description: "This is a default pipeline",
PipelineSpec: &job_model.APIPipelineSpec{
PipelineID: pipelineId,
},
ResourceReferences: []*job_model.APIResourceReference{
{Key: &job_model.APIResourceKey{Type: job_model.APIResourceTypeEXPERIMENT, ID: experimentId},
Relationship: job_model.APIRelationshipOWNER},
},
MaxConcurrency: 10,
NoCatchup: false,
Trigger: &job_model.APITrigger{
PeriodicSchedule: &job_model.APIPeriodicSchedule{
StartTime: strfmt.NewDateTime(),
EndTime: strfmt.NewDateTime(),
IntervalSecond: 60,
},
},
Enabled: true,
}
}

type jobOptions struct {
pipelineId, experimentId string
periodic bool
}

func jobInThePastForTwoMinutes(options jobOptions) *job_model.APIJob {
startTime := strfmt.DateTime(time.Unix(10*hour, 0))
endTime := strfmt.DateTime(time.Unix(10*hour+2*minute, 0))

job := defaultApiJob(options.pipelineId, options.experimentId)
if options.periodic {
job.Trigger = &job_model.APITrigger{
PeriodicSchedule: &job_model.APIPeriodicSchedule{
StartTime: startTime,
EndTime: endTime,
IntervalSecond: 60, // Runs every 1 minute.
},
}
} else {
job.Trigger = &job_model.APITrigger{
CronSchedule: &job_model.APICronSchedule{
StartTime: startTime,
EndTime: endTime,
Cron: "0 * * * * ?", // Runs every 1 minute.
},
}
}
return job
}