From 143df43b1abc91b56d44bbbffeb17792e8927a2e Mon Sep 17 00:00:00 2001 From: Christie Wilson Date: Wed, 10 Oct 2018 10:48:01 -0700 Subject: [PATCH 1/7] Combine logic to grab Tasks and their Runs for a PipelineRun It turns out that we need to look at TaskRuns for a few reasons, including 1) figuring out what to run next and 2) determining the status of the PipelineRun, so I've refactored the logic that grabs these to collect a bunch of related state that can be reused. When the graph becomes more sophisticated, we will need to make this structure more than just a list. --- pkg/apis/pipeline/v1alpha1/pipeline_test.go | 69 -------- pkg/apis/pipeline/v1alpha1/pipeline_types.go | 22 --- .../v1alpha1/pipelinerun/pipelinerun.go | 30 ++-- .../pipelinerun/resources/pipelinestate.go | 99 +++++++++++ .../resources/pipelinestate_test.go | 167 ++++++++++++++++++ .../pipelinerun/resources/taskruns.go | 55 ------ .../pipelinerun/resources/taskruns_test.go | 98 ---------- test/pipelinerun_test.go | 2 - 8 files changed, 280 insertions(+), 262 deletions(-) delete mode 100644 pkg/apis/pipeline/v1alpha1/pipeline_test.go create mode 100644 pkg/reconciler/v1alpha1/pipelinerun/resources/pipelinestate.go create mode 100644 pkg/reconciler/v1alpha1/pipelinerun/resources/pipelinestate_test.go delete mode 100644 pkg/reconciler/v1alpha1/pipelinerun/resources/taskruns.go delete mode 100644 pkg/reconciler/v1alpha1/pipelinerun/resources/taskruns_test.go diff --git a/pkg/apis/pipeline/v1alpha1/pipeline_test.go b/pkg/apis/pipeline/v1alpha1/pipeline_test.go deleted file mode 100644 index bcf61eb8175..00000000000 --- a/pkg/apis/pipeline/v1alpha1/pipeline_test.go +++ /dev/null @@ -1,69 +0,0 @@ -/* -Copyright 2018 The Knative Authors -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. 
-You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ -package v1alpha1 - -import ( - "fmt" - "testing" - - "github.com/google/go-cmp/cmp" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" -) - -var p = &Pipeline{ - ObjectMeta: metav1.ObjectMeta{ - Namespace: "namespace", - Name: "pipeline", - }, - Spec: PipelineSpec{ - Tasks: []PipelineTask{{ - Name: "mytask1", - TaskRef: TaskRef{Name: "task"}, - }, { - Name: "mytask2", - TaskRef: TaskRef{Name: "task"}, - }}, - }, -} - -func TestGetTasks(t *testing.T) { - task := &Task{ - ObjectMeta: metav1.ObjectMeta{ - Namespace: "namespace", - Name: "task", - }, - Spec: TaskSpec{}, - } - tasks, err := p.GetTasks(func(namespace, name string) (*Task, error) { - return task, nil - }) - if err != nil { - t.Fatalf("Error getting tasks for fake pipeline %s: %s", p.ObjectMeta.Name, err) - } - expectedTasks := map[string]*Task{ - "mytask1": task, - "mytask2": task, - } - if d := cmp.Diff(tasks, expectedTasks); d != "" { - t.Fatalf("Expected to get map of tasks %v, but actual differed: %s", expectedTasks, d) - } -} - -func TestGetTasksDoesntExist(t *testing.T) { - _, err := p.GetTasks(func(namespace, name string) (*Task, error) { - return nil, fmt.Errorf("failed to get tasks for pipeline %s", p.Name) - }) - if err == nil { - t.Fatalf("Expected error getting non-existent Tasks for Pipeline %s but got none", p.ObjectMeta.Name) - } -} diff --git a/pkg/apis/pipeline/v1alpha1/pipeline_types.go b/pkg/apis/pipeline/v1alpha1/pipeline_types.go index 01ffea851da..d8409557bf0 100644 --- a/pkg/apis/pipeline/v1alpha1/pipeline_types.go +++ b/pkg/apis/pipeline/v1alpha1/pipeline_types.go @@ -17,7 +17,6 @@ 
limitations under the License. package v1alpha1 import ( - "fmt" "github.com/knative/pkg/apis" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) @@ -133,24 +132,3 @@ type PipelineList struct { metav1.ListMeta `json:"metadata,omitempty"` Items []Pipeline `json:"items"` } - -// GetTask is a function that will retrieve the Task name from namespace. -type GetTask func(namespace, name string) (*Task, error) - -// GetTasks retrieves all Tasks instances which the pipeline p references, getting -// instances from function g. If it is unable to retrieve an instance of a referenced -// Task, it will return an error, otherwise it returns a map from the name of the -// Task in the Pipeline to the name of the Task object itself. -func (p *Pipeline)GetTasks(g GetTask) (map[string]*Task, error) { - tasks := map[string]*Task{} - for _, pt := range p.Spec.Tasks { - t, err := g(p.Namespace, pt.TaskRef.Name) - if err != nil { - return nil, fmt.Errorf("failed to get tasks for Pipeline %q: Error getting task %q : %s", - fmt.Sprintf("%s/%s", p.Namespace, p.Name), - fmt.Sprintf("%s/%s", p.Namespace, pt.TaskRef.Name), err) - } - tasks[pt.Name] = t - } - return tasks, nil -} diff --git a/pkg/reconciler/v1alpha1/pipelinerun/pipelinerun.go b/pkg/reconciler/v1alpha1/pipelinerun/pipelinerun.go index 029b742e433..56d47a4a041 100644 --- a/pkg/reconciler/v1alpha1/pipelinerun/pipelinerun.go +++ b/pkg/reconciler/v1alpha1/pipelinerun/pipelinerun.go @@ -134,7 +134,6 @@ func (c *Reconciler) Reconcile(ctx context.Context, key string) error { } func (c *Reconciler) reconcile(ctx context.Context, pr *v1alpha1.PipelineRun) error { - // fetch the equivelant pipeline for this pipelinerun Run p, err := c.pipelineLister.Pipelines(pr.Namespace).Get(pr.Spec.PipelineRef.Name) if err != nil { c.Logger.Errorf("%q failed to Get Pipeline: %q", @@ -142,30 +141,29 @@ func (c *Reconciler) reconcile(ctx context.Context, pr *v1alpha1.PipelineRun) er fmt.Sprintf("%s/%s", pr.Namespace, pr.Spec.PipelineRef.Name)) return nil } 
- pipelineTasks, err := p.GetTasks(func(namespace, name string) (*v1alpha1.Task, error) { - return c.taskLister.Tasks(namespace).Get(name) - }) - if err != nil { - return fmt.Errorf("error getting Tasks for Pipeline %s, Pipeline is invalid!: %s", p.Name, err) - } - pipelineTaskName, trName, err := resources.GetNextPipelineRunTaskRun( + pipelineTaskRuns, err := resources.GetPipelineState( + func(namespace, name string) (*v1alpha1.Task, error) { + return c.taskLister.Tasks(namespace).Get(name) + }, func(namespace, name string) (*v1alpha1.TaskRun, error) { return c.taskRunLister.TaskRuns(namespace).Get(name) }, - p, pr.Name) + p, pr.Name, + ) if err != nil { - return fmt.Errorf("error getting next TaskRun to create for PipelineRun %s: %s", pr.Name, err) + return fmt.Errorf("error getting Tasks for Pipeline %s, Pipeline may be invalid!: %s", p.Name, err) } - if pipelineTaskName != "" { - _, err = c.createTaskRun(pipelineTasks[pipelineTaskName], trName, pr) - if err != nil { - return fmt.Errorf("error creating TaskRun called %s for PipelineTask %s from PipelineRun %s: %s", trName, pipelineTaskName, pr.Name, err) - } + prtr := resources.GetNextTask(pipelineTaskRuns) + prtr.TaskRun, err = c.createTaskRun(prtr.Task, prtr.TaskRunName, pr) + if err != nil { + return fmt.Errorf("error creating TaskRun called %s for PipelineTask %s from PipelineRun %s: %s", prtr.TaskRunName, prtr.PipelineTask.Name, pr.Name, err) } // TODO fetch the taskruns status for this pipeline run. 
+ // get all taskruns for all tasks - // TODO check status of tasks and update status of PipelineRuns + // if any either dont exist yet or are themselves unknown, status is unknown + // if the status of any is failed, then this pipeline run is failed and we should stop trying to run more return nil } diff --git a/pkg/reconciler/v1alpha1/pipelinerun/resources/pipelinestate.go b/pkg/reconciler/v1alpha1/pipelinerun/resources/pipelinestate.go new file mode 100644 index 00000000000..c32bc623008 --- /dev/null +++ b/pkg/reconciler/v1alpha1/pipelinerun/resources/pipelinestate.go @@ -0,0 +1,99 @@ +/* +Copyright 2018 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package resources + +import ( + "fmt" + + "k8s.io/apimachinery/pkg/api/errors" + + "github.com/knative/build-pipeline/pkg/apis/pipeline/v1alpha1" +) + +// GetNextTask returns the first Task in pipelineTaskRuns that does +// not have a corresponding TaskRun and can run. +func GetNextTask(pipelineTaskRuns []*PipelineRunTaskRun) *PipelineRunTaskRun { + for _, prtr := range pipelineTaskRuns { + if prtr.TaskRun == nil && canTaskRun(prtr.PipelineTask) { + return prtr + } + } + return nil +} + +func canTaskRun(pt *v1alpha1.PipelineTask) bool { + // Check if Task can run now. Go through all the input constraints and see if + // the upstream tasks have completed successfully and inputs are available. 
+ + // TODO: only should try to run this Task if the previous one has completed + + return true +} + +// PipelineRunTaskRun contains a Task and its associated TaskRun, if it +// exists. TaskRun can be nil to represent there being no TaskRun. +type PipelineRunTaskRun struct { + Task *v1alpha1.Task + PipelineTask *v1alpha1.PipelineTask + TaskRunName string + TaskRun *v1alpha1.TaskRun +} + +// GetTask is a function that will retrieve the Task name from namespace. +type GetTask func(namespace, name string) (*v1alpha1.Task, error) + +// GetTaskRun is a function that will retrieve the TaskRun name from namespace. +type GetTaskRun func(namespace, name string) (*v1alpha1.TaskRun, error) + +// GetPipelineState retrieves all Tasks instances which the pipeline p references, getting +// instances from getTask. It will also check if there is a corresponding TaskRun for the +// Task using getTaskRun (the name is built from pipelineRunName). If it is unable to +// retrieve an instance of a referenced Task, it will return an error, otherwise it +// returns a list of all of the Tasks retrieved, and their TaskRuns if applicable. 
+func GetPipelineState(getTask GetTask, getTaskRun GetTaskRun, p *v1alpha1.Pipeline, pipelineRunName string) ([]*PipelineRunTaskRun, error) { + state := []*PipelineRunTaskRun{} + for i := range p.Spec.Tasks { + pt := p.Spec.Tasks[i] + t, err := getTask(p.Namespace, pt.TaskRef.Name) + if err != nil { + return nil, fmt.Errorf("failed to get tasks for Pipeline %q: Error getting task %q : %s", + fmt.Sprintf("%s/%s", p.Namespace, p.Name), + fmt.Sprintf("%s/%s", p.Namespace, pt.TaskRef.Name), err) + } + prtr := PipelineRunTaskRun{ + Task: t, + PipelineTask: &pt, + } + prtr.TaskRunName = getTaskRunName(pipelineRunName, &pt) + taskRun, err := getTaskRun(p.Namespace, prtr.TaskRunName) + if err != nil { + // If the TaskRun isn't found, it just means it hasn't been run yet + if !errors.IsNotFound(err) { + return nil, fmt.Errorf("error retrieving TaskRun %s for Task %s: %s", prtr.TaskRunName, t.Name, err) + } + } else { + prtr.TaskRun = taskRun + } + state = append(state, &prtr) + } + return state, nil +} + +// getTaskRunName should return a unique name for a `TaskRun`. +func getTaskRunName(prName string, pt *v1alpha1.PipelineTask) string { + return fmt.Sprintf("%s-%s", prName, pt.Name) +} diff --git a/pkg/reconciler/v1alpha1/pipelinerun/resources/pipelinestate_test.go b/pkg/reconciler/v1alpha1/pipelinerun/resources/pipelinestate_test.go new file mode 100644 index 00000000000..7449a15f3c9 --- /dev/null +++ b/pkg/reconciler/v1alpha1/pipelinerun/resources/pipelinestate_test.go @@ -0,0 +1,167 @@ +/* +Copyright 2018 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+See the License for the specific language governing permissions and +limitations under the License. +*/ + +package resources + +import ( + "fmt" + "testing" + + "github.com/google/go-cmp/cmp" + "k8s.io/apimachinery/pkg/api/errors" + + "github.com/knative/build-pipeline/pkg/apis/pipeline/v1alpha1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +const ( + namespace = "foo" +) + +var pts = []v1alpha1.PipelineTask{{ + Name: "mytask1", + TaskRef: v1alpha1.TaskRef{Name: "task"}, +}, { + Name: "mytask2", + TaskRef: v1alpha1.TaskRef{Name: "task"}, +}} + +var p = &v1alpha1.Pipeline{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "namespace", + Name: "pipeline", + }, + Spec: v1alpha1.PipelineSpec{ + Tasks: pts, + }, +} + +var task = &v1alpha1.Task{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "namespace", + Name: "task", + }, + Spec: v1alpha1.TaskSpec{}, +} + +var trs = []v1alpha1.TaskRun{{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "namespace", + Name: "pipelinerun-mytask1", + }, + Spec: v1alpha1.TaskRunSpec{}, +}, { + ObjectMeta: metav1.ObjectMeta{ + Namespace: "namespace", + Name: "pipelinerun-mytask2", + }, + Spec: v1alpha1.TaskRunSpec{}, +}} + +func TestGetNextTask_NoneStarted(t *testing.T) { + noneStartedState := []*PipelineRunTaskRun{{ + Task: task, + PipelineTask: &pts[0], + TaskRunName: "pipelinerun-mytask1", + TaskRun: nil, + }, { + Task: task, + PipelineTask: &pts[1], + TaskRunName: "pipelinerun-mytask2", + TaskRun: nil, + }} + // TODO: one started + firstFinishedState := []*PipelineRunTaskRun{{ + Task: task, + PipelineTask: &pts[0], + TaskRunName: "pipelinerun-mytask1", + TaskRun: &trs[0], + }, { + Task: task, + PipelineTask: &pts[1], + TaskRunName: "pipelinerun-mytask2", + TaskRun: nil, + }} + // TODO: all finished + tcs := []struct { + name string + state []*PipelineRunTaskRun + expectedTask *PipelineRunTaskRun + }{ + { + name: "no-tasks-started", + state: noneStartedState, + expectedTask: noneStartedState[0], + }, + { + name: "first-task-finished", + 
state: firstFinishedState, + expectedTask: firstFinishedState[1], + }, + } + for _, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { + nextTask := GetNextTask(tc.state) + if d := cmp.Diff(nextTask, tc.expectedTask); d != "" { + t.Fatalf("Expected to indicate first task should be run, but different state returned: %s", d) + } + }) + } +} + +func TestGetPipelineState(t *testing.T) { + getTask := func(namespace, name string) (*v1alpha1.Task, error) { + return task, nil + } + getTaskRun := func(namespace, name string) (*v1alpha1.TaskRun, error) { + // We'll make it so that only the first Task has started running + if name == "pipelinerun-mytask1" { + return &trs[0], nil + } + return nil, errors.NewNotFound(v1alpha1.Resource("taskrun"), name) + } + pipelineState, err := GetPipelineState(getTask, getTaskRun, p, "pipelinerun") + if err != nil { + t.Fatalf("Error getting tasks for fake pipeline %s: %s", p.ObjectMeta.Name, err) + } + expectedState := []*PipelineRunTaskRun{{ + Task: task, + PipelineTask: &pts[0], + TaskRunName: "pipelinerun-mytask1", + TaskRun: &trs[0], + }, { + Task: task, + PipelineTask: &pts[1], + TaskRunName: "pipelinerun-mytask2", + TaskRun: nil, + }} + if d := cmp.Diff(pipelineState, expectedState); d != "" { + t.Fatalf("Expected to get current pipeline state %v, but actual differed: %s", expectedState, d) + } +} + +func TestGetPipelineState_TaskDoesntExist(t *testing.T) { + getTask := func(namespace, name string) (*v1alpha1.Task, error) { + return nil, fmt.Errorf("Task %s doesn't exist", name) + } + getTaskRun := func(namespace, name string) (*v1alpha1.TaskRun, error) { + return nil, nil + } + _, err := GetPipelineState(getTask, getTaskRun, p, "pipelinerun") + if err == nil { + t.Fatalf("Expected error getting non-existent Tasks for Pipeline %s but got none", p.Name) + } +} diff --git a/pkg/reconciler/v1alpha1/pipelinerun/resources/taskruns.go b/pkg/reconciler/v1alpha1/pipelinerun/resources/taskruns.go deleted file mode 100644 index 
be197b1f122..00000000000 --- a/pkg/reconciler/v1alpha1/pipelinerun/resources/taskruns.go +++ /dev/null @@ -1,55 +0,0 @@ -/* -Copyright 2018 The Knative Authors - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package resources - -import ( - "fmt" - - "k8s.io/apimachinery/pkg/api/errors" - - "github.com/knative/build-pipeline/pkg/apis/pipeline/v1alpha1" -) - -// GetTaskRun is a function that will retrieve the TaskRun name from namespace. -type GetTaskRun func(namespace, name string) (*v1alpha1.TaskRun, error) - - -// GetNextPipelineRunTaskRun returns the first TaskRun it find in pr's Pipeline (p) -// for which it does not find a corresponding TaskRun via g which can be run. It -// returns the name of the Task referenced in the Pipeline and the expected name of -// the TaskRun that should be created. -func GetNextPipelineRunTaskRun(g GetTaskRun, p *v1alpha1.Pipeline, prName string) (string, string, error) { - for _, pt := range p.Spec.Tasks { - trName := getTaskRunName(prName, &pt) - _, err := g(p.Namespace, trName) - if errors.IsNotFound(err) && canTaskRun(&pt) { - return pt.Name, trName, nil - } - } - return "", "", nil -} - -func canTaskRun(pt *v1alpha1.PipelineTask) bool { - // Check if Task can run now. Go through all the input constraints and see if - // the upstream tasks have completed successfully and inputs are available. - return true -} - -// getTaskRunName should return a uniquie name for a `TaskRun`. 
-func getTaskRunName(prName string, pt *v1alpha1.PipelineTask) string { - return fmt.Sprintf("%s-%s", prName, pt.Name) -} diff --git a/pkg/reconciler/v1alpha1/pipelinerun/resources/taskruns_test.go b/pkg/reconciler/v1alpha1/pipelinerun/resources/taskruns_test.go deleted file mode 100644 index 6c9f43dc3df..00000000000 --- a/pkg/reconciler/v1alpha1/pipelinerun/resources/taskruns_test.go +++ /dev/null @@ -1,98 +0,0 @@ -/* -Copyright 2018 The Knative Authors - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package resources - -import ( - "testing" - - "k8s.io/apimachinery/pkg/api/errors" - - "github.com/knative/build-pipeline/pkg/apis/pipeline/v1alpha1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" -) - -const ( - namespace = "foo" -) - -func TestGetNextPipelineRunTaskRun(t *testing.T) { - ps := []*v1alpha1.Pipeline{{ - ObjectMeta: metav1.ObjectMeta{ - Name: "test-pipeline", - Namespace: namespace, - }, - Spec: v1alpha1.PipelineSpec{ - Tasks: []v1alpha1.PipelineTask{{ - Name: "unit-test-1", - TaskRef: v1alpha1.TaskRef{Name: "unit-test-task"}, - }, { - Name: "unit-test-2", - TaskRef: v1alpha1.TaskRef{Name: "unit-test-task"}, - }}, - }}} - pr := v1alpha1.PipelineRun{ObjectMeta: metav1.ObjectMeta{ - Name: "mypipelinerun", - Namespace: namespace, - }} - tcs := []struct { - name string - expectedPipelineTask string - expectedTaskRunName string - getTaskRun GetTaskRun - }{ - { - name: "shd-kick-first-task", - expectedPipelineTask: ps[0].Spec.Tasks[0].Name, - expectedTaskRunName: 
"mypipelinerun-unit-test-1", - getTaskRun: func(ns, name string) (*v1alpha1.TaskRun, error) { - return nil, errors.NewNotFound(v1alpha1.Resource("taskrun"), name) - }, - }, - { - name: "shd-kick-second-task", - expectedPipelineTask: ps[0].Spec.Tasks[1].Name, - expectedTaskRunName: "mypipelinerun-unit-test-2", - getTaskRun: func(ns, name string) (*v1alpha1.TaskRun, error) { - // Return the first TaskRun as if it has already been created - if name == "mypipelinerun-unit-test-1" { - return &v1alpha1.TaskRun{ - ObjectMeta: metav1.ObjectMeta{ - Name: "mypipelinerun-unit-test-1", - Namespace: namespace, - }, - Spec: v1alpha1.TaskRunSpec{}, - }, nil - } - return nil, errors.NewNotFound(v1alpha1.Resource("taskrun"), name) - }, - }, - } - for _, tc := range tcs { - t.Run(tc.name, func(t *testing.T) { - pipelineTaskName, trName, err := GetNextPipelineRunTaskRun(tc.getTaskRun, ps[0], pr.Name) - if err != nil { - t.Fatalf("Got error getting name of next Task to Run: %s", err) - } - if pipelineTaskName != tc.expectedPipelineTask { - t.Errorf("Expected to try to create %s but was %s instead", tc.expectedPipelineTask, pipelineTaskName) - } - if trName != tc.expectedTaskRunName { - t.Errorf("Expected to return TaskRun name %s but was %s instead", tc.expectedTaskRunName, trName) - } - }) - } -} diff --git a/test/pipelinerun_test.go b/test/pipelinerun_test.go index 6763fdd135a..f337deb3ead 100644 --- a/test/pipelinerun_test.go +++ b/test/pipelinerun_test.go @@ -28,8 +28,6 @@ import ( ) func TestPipelineRun(t *testing.T) { - t.Skip("Will fail until #61 is completed :D") - logger := logging.GetContextLogger(t.Name()) c, namespace := setup(t, logger) From e7ad309787bbc6e907348dc82df8742e61ef7487 Mon Sep 17 00:00:00 2001 From: Christie Wilson Date: Wed, 10 Oct 2018 16:54:34 -0700 Subject: [PATCH 2/7] Check status of TaskRuns when finding TaskRun to start Added logic to check statuses of other TaskRuns when deciding if a new one should be started for #61 --- 
pkg/apis/pipeline/v1alpha1/taskrun_types.go | 7 ++ .../pipelinerun/resources/pipelinestate.go | 20 +++- .../resources/pipelinestate_test.go | 103 +++++++++++++++++- 3 files changed, 121 insertions(+), 9 deletions(-) diff --git a/pkg/apis/pipeline/v1alpha1/taskrun_types.go b/pkg/apis/pipeline/v1alpha1/taskrun_types.go index fe760ce42f6..f1e8490190a 100644 --- a/pkg/apis/pipeline/v1alpha1/taskrun_types.go +++ b/pkg/apis/pipeline/v1alpha1/taskrun_types.go @@ -74,6 +74,13 @@ type TaskRunStatus struct { Conditions duckv1alpha1.Conditions `json:"conditions,omitempty"` } +var taskRunCondSet = duckv1alpha1.NewBatchConditionSet() + +// GetCondition returns the Condition matching the given type. +func (tr *TaskRunStatus) GetCondition(t duckv1alpha1.ConditionType) *duckv1alpha1.Condition { + return taskRunCondSet.Manage(tr).GetCondition(t) +} + // StepRun reports the results of running a step in the Task. Each // task has the potential to succeed or fail (based on the exit code) // and produces logs. diff --git a/pkg/reconciler/v1alpha1/pipelinerun/resources/pipelinestate.go b/pkg/reconciler/v1alpha1/pipelinerun/resources/pipelinestate.go index c32bc623008..ad5ea85c2af 100644 --- a/pkg/reconciler/v1alpha1/pipelinerun/resources/pipelinestate.go +++ b/pkg/reconciler/v1alpha1/pipelinerun/resources/pipelinestate.go @@ -19,6 +19,8 @@ package resources import ( "fmt" + duckv1alpha1 "github.com/knative/pkg/apis/duck/v1alpha1" + corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" "github.com/knative/build-pipeline/pkg/apis/pipeline/v1alpha1" @@ -28,7 +30,17 @@ import ( // not have a corresponding TaskRun and can run. 
func GetNextTask(pipelineTaskRuns []*PipelineRunTaskRun) *PipelineRunTaskRun { for _, prtr := range pipelineTaskRuns { - if prtr.TaskRun == nil && canTaskRun(prtr.PipelineTask) { + if prtr.TaskRun != nil { + switch s := prtr.TaskRun.Status.GetCondition(duckv1alpha1.ConditionSucceeded); s.Status { + // if any of the TaskRuns failed, there is no new TaskRun to start + case corev1.ConditionFalse: + return nil + // if the current TaskRun is currently running, don't start another one + case corev1.ConditionUnknown: + return nil + } + // otherwise the TaskRun has finished successfully, so we should move on + } else if canTaskRun(prtr.PipelineTask) { return prtr } } @@ -36,11 +48,7 @@ func GetNextTask(pipelineTaskRuns []*PipelineRunTaskRun) *PipelineRunTaskRun { } func canTaskRun(pt *v1alpha1.PipelineTask) bool { - // Check if Task can run now. Go through all the input constraints and see if - // the upstream tasks have completed successfully and inputs are available. - - // TODO: only should try to run this Task if the previous one has completed - + // Check if Task can run now. 
Go through all the input constraints return true } diff --git a/pkg/reconciler/v1alpha1/pipelinerun/resources/pipelinestate_test.go b/pkg/reconciler/v1alpha1/pipelinerun/resources/pipelinestate_test.go index 7449a15f3c9..9ae8be7984b 100644 --- a/pkg/reconciler/v1alpha1/pipelinerun/resources/pipelinestate_test.go +++ b/pkg/reconciler/v1alpha1/pipelinerun/resources/pipelinestate_test.go @@ -21,6 +21,8 @@ import ( "testing" "github.com/google/go-cmp/cmp" + duckv1alpha1 "github.com/knative/pkg/apis/duck/v1alpha1" + corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" "github.com/knative/build-pipeline/pkg/apis/pipeline/v1alpha1" @@ -71,6 +73,39 @@ var trs = []v1alpha1.TaskRun{{ Spec: v1alpha1.TaskRunSpec{}, }} +func makeStarted(tr v1alpha1.TaskRun) *v1alpha1.TaskRun { + newTr := newTaskRun(tr) + newTr.Status.Conditions[0].Status = corev1.ConditionUnknown + return newTr +} + +func makeSucceeded(tr v1alpha1.TaskRun) *v1alpha1.TaskRun { + newTr := newTaskRun(tr) + newTr.Status.Conditions[0].Status = corev1.ConditionTrue + return newTr +} + +func makeFailed(tr v1alpha1.TaskRun) *v1alpha1.TaskRun { + newTr := newTaskRun(tr) + newTr.Status.Conditions[0].Status = corev1.ConditionFalse + return newTr +} + +func newTaskRun(tr v1alpha1.TaskRun) *v1alpha1.TaskRun { + return &v1alpha1.TaskRun{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: tr.Namespace, + Name: tr.Name, + }, + Spec: tr.Spec, + Status: v1alpha1.TaskRunStatus{ + Conditions: []duckv1alpha1.Condition{{ + Type: duckv1alpha1.ConditionSucceeded, + }}, + }, + } +} + func TestGetNextTask_NoneStarted(t *testing.T) { noneStartedState := []*PipelineRunTaskRun{{ Task: task, @@ -83,19 +118,61 @@ func TestGetNextTask_NoneStarted(t *testing.T) { TaskRunName: "pipelinerun-mytask2", TaskRun: nil, }} - // TODO: one started + oneStartedState := []*PipelineRunTaskRun{{ + Task: task, + PipelineTask: &pts[0], + TaskRunName: "pipelinerun-mytask1", + TaskRun: makeStarted(trs[0]), + }, { + Task: task, + PipelineTask: &pts[1], + 
TaskRunName: "pipelinerun-mytask2", + TaskRun: nil, + }} + oneFinishedState := []*PipelineRunTaskRun{{ + Task: task, + PipelineTask: &pts[0], + TaskRunName: "pipelinerun-mytask1", + TaskRun: makeSucceeded(trs[0]), + }, { + Task: task, + PipelineTask: &pts[1], + TaskRunName: "pipelinerun-mytask2", + TaskRun: nil, + }} + oneFailedState := []*PipelineRunTaskRun{{ + Task: task, + PipelineTask: &pts[0], + TaskRunName: "pipelinerun-mytask1", + TaskRun: makeFailed(trs[0]), + }, { + Task: task, + PipelineTask: &pts[1], + TaskRunName: "pipelinerun-mytask2", + TaskRun: nil, + }} firstFinishedState := []*PipelineRunTaskRun{{ Task: task, PipelineTask: &pts[0], TaskRunName: "pipelinerun-mytask1", - TaskRun: &trs[0], + TaskRun: makeSucceeded(trs[0]), }, { Task: task, PipelineTask: &pts[1], TaskRunName: "pipelinerun-mytask2", TaskRun: nil, }} - // TODO: all finished + allFinishedState := []*PipelineRunTaskRun{{ + Task: task, + PipelineTask: &pts[0], + TaskRunName: "pipelinerun-mytask1", + TaskRun: makeSucceeded(trs[0]), + }, { + Task: task, + PipelineTask: &pts[1], + TaskRunName: "pipelinerun-mytask2", + TaskRun: makeSucceeded(trs[0]), + }} tcs := []struct { name string state []*PipelineRunTaskRun @@ -106,11 +183,31 @@ func TestGetNextTask_NoneStarted(t *testing.T) { state: noneStartedState, expectedTask: noneStartedState[0], }, + { + name: "one-task-started", + state: oneStartedState, + expectedTask: nil, + }, + { + name: "one-task-finished", + state: oneFinishedState, + expectedTask: oneFinishedState[1], + }, + { + name: "one-task-failed", + state: oneFailedState, + expectedTask: nil, + }, { name: "first-task-finished", state: firstFinishedState, expectedTask: firstFinishedState[1], }, + { + name: "all-finished", + state: allFinishedState, + expectedTask: nil, + }, } for _, tc := range tcs { t.Run(tc.name, func(t *testing.T) { From 7d60a382b1d969697b5f421ec2c5da3cc764a7a0 Mon Sep 17 00:00:00 2001 From: Christie Wilson Date: Thu, 11 Oct 2018 09:30:16 -0700 Subject: [PATCH 3/7] 
Add condition status to PipelineRun PipelineRun status will be based on the condition of the TaskRuns which it has created, for #61. If any TaskRuns have failed, the PipelineRun has failed. If all are successful, it is successful. If any are in progress, it is in progress. This is assuming a linear Pipeline, we will have to tweak this a bit when we implement the graph (for #65) --- .../pipeline/v1alpha1/pipelinerun_types.go | 49 ++--- pkg/apis/pipeline/v1alpha1/taskrun_types.go | 8 + .../v1alpha1/zz_generated.deepcopy.go | 19 +- .../v1alpha1/pipelinerun/pipelinerun.go | 23 ++- .../v1alpha1/pipelinerun/pipelinerun_test.go | 17 +- .../pipelinerun/resources/pipelinestate.go | 72 ++++++- .../resources/pipelinestate_test.go | 184 +++++++++++------- test/pipelinerun_test.go | 35 ++-- 8 files changed, 257 insertions(+), 150 deletions(-) diff --git a/pkg/apis/pipeline/v1alpha1/pipelinerun_types.go b/pkg/apis/pipeline/v1alpha1/pipelinerun_types.go index 96fd4a325d8..3ef0347c3a7 100644 --- a/pkg/apis/pipeline/v1alpha1/pipelinerun_types.go +++ b/pkg/apis/pipeline/v1alpha1/pipelinerun_types.go @@ -17,7 +17,7 @@ limitations under the License. package v1alpha1 import ( - corev1 "k8s.io/api/core/v1" + duckv1alpha1 "github.com/knative/pkg/apis/duck/v1alpha1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) @@ -71,7 +71,22 @@ type PipelineRunStatus struct { // If there is no version, that means use latest // +optional ResourceVersion []PipelineResourceVersion `json:"resourceVersion,omitempty"` - Conditions []PipelineRunCondition `json:"conditions"` + Conditions duckv1alpha1.Conditions `json:"conditions"` +} + +var pipelineRunCondSet = duckv1alpha1.NewBatchConditionSet() + +// GetCondition returns the Condition matching the given type. 
+func (pr *PipelineRunStatus) GetCondition(t duckv1alpha1.ConditionType) *duckv1alpha1.Condition { + return pipelineRunCondSet.Manage(pr).GetCondition(t) +} + +// SetCondition sets the condition, unsetting previous conditions with the same +// type as necessary. +func (pr *PipelineRunStatus) SetCondition(newCond *duckv1alpha1.Condition) { + if newCond != nil { + pipelineRunCondSet.Manage(pr).SetCondition(*newCond) + } } // +genclient @@ -106,33 +121,3 @@ type PipelineRunList struct { type PipelineTaskRun struct { Name string `json:"name"` } - -// PipelineRunConditionType indicates the status of the execution of the PipelineRun. -type PipelineRunConditionType string - -const ( - // PipelineRunConditionTypeStarted indicates whether or not the PipelineRun - // has started actually executing. - PipelineRunConditionTypeStarted PipelineRunConditionType = "Started" - - //PipelineRunConditionTypeCompleted indicates whether or not the PipelineRun - // has finished executing. - PipelineRunConditionTypeCompleted PipelineRunConditionType = "Completed" - - // PipelineRunConditionTypeSucceeded indicates whether or not the PipelineRun - // was successful. - PipelineRunConditionTypeSucceeded PipelineRunConditionType = "Successful" -) - -// PipelineRunCondition holds a Condition that the PipelineRun has entered into while being executed. 
-type PipelineRunCondition struct { - Type PipelineRunConditionType `json:"type"` - - Status corev1.ConditionStatus `json:"status"` - - LastTransitionTime metav1.Time `json:"lastTransitionTime"` - // +optional - Reason string `json:"reason,omitempty"` - // +optional - Message string `json:"message,omitempty"` -} diff --git a/pkg/apis/pipeline/v1alpha1/taskrun_types.go b/pkg/apis/pipeline/v1alpha1/taskrun_types.go index f1e8490190a..e009166e0c7 100644 --- a/pkg/apis/pipeline/v1alpha1/taskrun_types.go +++ b/pkg/apis/pipeline/v1alpha1/taskrun_types.go @@ -81,6 +81,14 @@ func (tr *TaskRunStatus) GetCondition(t duckv1alpha1.ConditionType) *duckv1alpha return taskRunCondSet.Manage(tr).GetCondition(t) } +// SetCondition sets the condition, unsetting previous conditions with the same +// type as necessary. +func (bs *TaskRunStatus) SetCondition(newCond *duckv1alpha1.Condition) { + if newCond != nil { + taskRunCondSet.Manage(bs).SetCondition(*newCond) + } +} + // StepRun reports the results of running a step in the Task. Each // task has the potential to succeed or fail (based on the exit code) // and produces logs. diff --git a/pkg/apis/pipeline/v1alpha1/zz_generated.deepcopy.go b/pkg/apis/pipeline/v1alpha1/zz_generated.deepcopy.go index 2cf4767e656..283012d57df 100644 --- a/pkg/apis/pipeline/v1alpha1/zz_generated.deepcopy.go +++ b/pkg/apis/pipeline/v1alpha1/zz_generated.deepcopy.go @@ -514,23 +514,6 @@ func (in *PipelineRun) DeepCopyObject() runtime.Object { return nil } -// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. -func (in *PipelineRunCondition) DeepCopyInto(out *PipelineRunCondition) { - *out = *in - in.LastTransitionTime.DeepCopyInto(&out.LastTransitionTime) - return -} - -// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new PipelineRunCondition. 
-func (in *PipelineRunCondition) DeepCopy() *PipelineRunCondition { - if in == nil { - return nil - } - out := new(PipelineRunCondition) - in.DeepCopyInto(out) - return out -} - // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *PipelineRunList) DeepCopyInto(out *PipelineRunList) { *out = *in @@ -598,7 +581,7 @@ func (in *PipelineRunStatus) DeepCopyInto(out *PipelineRunStatus) { } if in.Conditions != nil { in, out := &in.Conditions, &out.Conditions - *out = make([]PipelineRunCondition, len(*in)) + *out = make(duck_v1alpha1.Conditions, len(*in)) for i := range *in { (*in)[i].DeepCopyInto(&(*out)[i]) } diff --git a/pkg/reconciler/v1alpha1/pipelinerun/pipelinerun.go b/pkg/reconciler/v1alpha1/pipelinerun/pipelinerun.go index 56d47a4a041..5fbbd89d6a8 100644 --- a/pkg/reconciler/v1alpha1/pipelinerun/pipelinerun.go +++ b/pkg/reconciler/v1alpha1/pipelinerun/pipelinerun.go @@ -141,7 +141,7 @@ func (c *Reconciler) reconcile(ctx context.Context, pr *v1alpha1.PipelineRun) er fmt.Sprintf("%s/%s", pr.Namespace, pr.Spec.PipelineRef.Name)) return nil } - pipelineTaskRuns, err := resources.GetPipelineState( + state, err := resources.GetPipelineState( func(namespace, name string) (*v1alpha1.Task, error) { return c.taskLister.Tasks(namespace).Get(name) }, @@ -153,18 +153,17 @@ func (c *Reconciler) reconcile(ctx context.Context, pr *v1alpha1.PipelineRun) er if err != nil { return fmt.Errorf("error getting Tasks for Pipeline %s, Pipeline may be invalid!: %s", p.Name, err) } - prtr := resources.GetNextTask(pipelineTaskRuns) - prtr.TaskRun, err = c.createTaskRun(prtr.Task, prtr.TaskRunName, pr) - if err != nil { - return fmt.Errorf("error creating TaskRun called %s for PipelineTask %s from PipelineRun %s: %s", prtr.TaskRunName, prtr.PipelineTask.Name, pr.Name, err) + prtr := resources.GetNextTask(pr.Name, state, c.Logger) + if prtr != nil { + c.Logger.Infof("Creating a new TaskRun object %s", prtr.TaskRunName) + 
prtr.TaskRun, err = c.createTaskRun(prtr.Task, prtr.TaskRunName, pr) + if err != nil { + return fmt.Errorf("error creating TaskRun called %s for PipelineTask %s from PipelineRun %s: %s", prtr.TaskRunName, prtr.PipelineTask.Name, pr.Name, err) + } } - // TODO fetch the taskruns status for this pipeline run. - // get all taskruns for all tasks - - // if any either dont exist yet or are themselves unknown, status is unknown - // if the status of any is failed, then this pipeline run is failed and we should stop trying to run more - + pr.Status.SetCondition(resources.GetPipelineConditionStatus(pr.Name, state, c.Logger)) + c.Logger.Infof("PipelineRun %s status is being set to %s", pr.Name, pr.Status) return nil } @@ -185,7 +184,7 @@ func (c *Reconciler) createTaskRun(t *v1alpha1.Task, trName string, pr *v1alpha1 func (c *Reconciler) updateStatus(pr *v1alpha1.PipelineRun) (*v1alpha1.PipelineRun, error) { newPr, err := c.pipelineRunLister.PipelineRuns(pr.Namespace).Get(pr.Name) if err != nil { - return nil, err + return nil, fmt.Errorf("Error getting PipelineRun %s when updating status: %s", pr.Name, err) } if !reflect.DeepEqual(newPr.Status, pr.Status) { newPr.Status = pr.Status diff --git a/pkg/reconciler/v1alpha1/pipelinerun/pipelinerun_test.go b/pkg/reconciler/v1alpha1/pipelinerun/pipelinerun_test.go index 0420a74a633..f7d1ab83b89 100644 --- a/pkg/reconciler/v1alpha1/pipelinerun/pipelinerun_test.go +++ b/pkg/reconciler/v1alpha1/pipelinerun/pipelinerun_test.go @@ -157,7 +157,21 @@ type testData struct { } func seedTestData(d testData) (*fakepipelineclientset.Clientset, informersv1alpha1.PipelineRunInformer, informersv1alpha1.PipelineInformer, informersv1alpha1.TaskRunInformer, informersv1alpha1.TaskInformer) { - pipelineClient := fakepipelineclientset.NewSimpleClientset() + objs := []runtime.Object{} + for _, pr := range d.prs { + objs = append(objs, pr) + } + for _, p := range d.ps { + objs = append(objs, p) + } + for _, tr := range d.trs { + objs = append(objs, tr) + 
} + for _, t := range d.ts { + objs = append(objs, t) + } + pipelineClient := fakepipelineclientset.NewSimpleClientset(objs...) + sharedInfomer := informers.NewSharedInformerFactory(pipelineClient, 0) pipelineRunsInformer := sharedInfomer.Pipeline().V1alpha1().PipelineRuns() pipelineInformer := sharedInfomer.Pipeline().V1alpha1().Pipelines() @@ -170,7 +184,6 @@ func seedTestData(d testData) (*fakepipelineclientset.Clientset, informersv1alph for _, p := range d.ps { pipelineInformer.Informer().GetIndexer().Add(p) } - for _, tr := range d.trs { taskRunInformer.Informer().GetIndexer().Add(tr) } diff --git a/pkg/reconciler/v1alpha1/pipelinerun/resources/pipelinestate.go b/pkg/reconciler/v1alpha1/pipelinerun/resources/pipelinestate.go index ad5ea85c2af..d881c89568a 100644 --- a/pkg/reconciler/v1alpha1/pipelinerun/resources/pipelinestate.go +++ b/pkg/reconciler/v1alpha1/pipelinerun/resources/pipelinestate.go @@ -20,30 +20,37 @@ import ( "fmt" duckv1alpha1 "github.com/knative/pkg/apis/duck/v1alpha1" + "go.uber.org/zap" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" "github.com/knative/build-pipeline/pkg/apis/pipeline/v1alpha1" ) -// GetNextTask returns the first Task in pipelineTaskRuns that does -// not have a corresponding TaskRun and can run. -func GetNextTask(pipelineTaskRuns []*PipelineRunTaskRun) *PipelineRunTaskRun { - for _, prtr := range pipelineTaskRuns { +// GetNextTask returns the next Task for which a TaskRun should be created, +// or nil if no TaskRun should be created. 
+func GetNextTask(prName string, state []*PipelineRunTaskRun, logger *zap.SugaredLogger) *PipelineRunTaskRun { + for _, prtr := range state { if prtr.TaskRun != nil { - switch s := prtr.TaskRun.Status.GetCondition(duckv1alpha1.ConditionSucceeded); s.Status { - // if any of the TaskRuns failed, there is no new TaskRun to start + c := prtr.TaskRun.Status.GetCondition(duckv1alpha1.ConditionSucceeded) + if c == nil { + logger.Infof("TaskRun %s doesn't have a condition so it is just starting and we shouldn't start more for PipelineRun %s", prtr.TaskRunName, prName) + return nil + } + switch c.Status { case corev1.ConditionFalse: + logger.Infof("TaskRun %s has failed; we don't need to run PipelineRun %s", prtr.TaskRunName, prName) return nil - // if the current TaskRun is currently running, don't start another one case corev1.ConditionUnknown: + logger.Infof("TaskRun %s is still running so we shouldn't start more for PipelineRun %s", prtr.TaskRunName, prName) return nil } - // otherwise the TaskRun has finished successfully, so we should move on } else if canTaskRun(prtr.PipelineTask) { + logger.Infof("TaskRun %s should be started for PipelineRun %s", prtr.TaskRunName, prName) return prtr } } + logger.Infof("No TaskRuns to start for PipelineRun %s", prName) return nil } @@ -105,3 +112,52 @@ func GetPipelineState(getTask GetTask, getTaskRun GetTaskRun, p *v1alpha1.Pipeli func getTaskRunName(prName string, pt *v1alpha1.PipelineTask) string { return fmt.Sprintf("%s-%s", prName, pt.Name) } + +// GetPipelineConditionStatus will return the Condition that the PipelineRun prName should be +// updated with, based on the status of the TaskRuns in state. 
+func GetPipelineConditionStatus(prName string, state []*PipelineRunTaskRun, logger *zap.SugaredLogger) *duckv1alpha1.Condition { + allFinished := true + for _, prtr := range state { + if prtr.TaskRun == nil { + logger.Infof("TaskRun %s doesn't have a Status, so PipelineRun %s isn't finished", prtr.TaskRunName, prName) + allFinished = false + break + } + c := prtr.TaskRun.Status.GetCondition(duckv1alpha1.ConditionSucceeded) + if c == nil { + logger.Infof("TaskRun %s doens't have a condition, so PipelineRun %s isn't finished", prtr.TaskRunName, prName) + allFinished = false + break + } + // If any TaskRuns have failed, we should halt execution and consider the run failed + if c.Status == corev1.ConditionFalse { + logger.Infof("TaskRun %s has failed, so PipelineRun %s has failed", prtr.TaskRunName, prName) + return &duckv1alpha1.Condition{ + Type: duckv1alpha1.ConditionSucceeded, + Status: corev1.ConditionFalse, + Reason: "Failed", + Message: fmt.Sprintf("TaskRun %s for Task %s has failed", prtr.TaskRun.Name, prtr.Task.Name), + } + } + if c.Status != corev1.ConditionTrue { + logger.Infof("TaskRun %s is still running so PipelineRun %s is still running", prtr.TaskRunName, prName) + allFinished = false + } + } + if !allFinished { + logger.Infof("PipelineRun %s still has running TaskRuns so it isn't yet done", prName) + return &duckv1alpha1.Condition{ + Type: duckv1alpha1.ConditionSucceeded, + Status: corev1.ConditionUnknown, + Reason: "Running", + Message: "Not all Tasks in the Pipeline have finished executing", + } + } + logger.Infof("All TaskRuns have finished for PipelineRun %s so it has finished", prName) + return &duckv1alpha1.Condition{ + Type: duckv1alpha1.ConditionSucceeded, + Status: corev1.ConditionTrue, + Reason: "Finished", + Message: "All Tasks have completed executing", + } +} diff --git a/pkg/reconciler/v1alpha1/pipelinerun/resources/pipelinestate_test.go b/pkg/reconciler/v1alpha1/pipelinerun/resources/pipelinestate_test.go index 9ae8be7984b..0ae1b1c9b93 
100644 --- a/pkg/reconciler/v1alpha1/pipelinerun/resources/pipelinestate_test.go +++ b/pkg/reconciler/v1alpha1/pipelinerun/resources/pipelinestate_test.go @@ -20,6 +20,8 @@ import ( "fmt" "testing" + "go.uber.org/zap" + "github.com/google/go-cmp/cmp" duckv1alpha1 "github.com/knative/pkg/apis/duck/v1alpha1" corev1 "k8s.io/api/core/v1" @@ -106,73 +108,74 @@ func newTaskRun(tr v1alpha1.TaskRun) *v1alpha1.TaskRun { } } +var noneStartedState = []*PipelineRunTaskRun{{ + Task: task, + PipelineTask: &pts[0], + TaskRunName: "pipelinerun-mytask1", + TaskRun: nil, +}, { + Task: task, + PipelineTask: &pts[1], + TaskRunName: "pipelinerun-mytask2", + TaskRun: nil, +}} +var oneStartedState = []*PipelineRunTaskRun{{ + Task: task, + PipelineTask: &pts[0], + TaskRunName: "pipelinerun-mytask1", + TaskRun: makeStarted(trs[0]), +}, { + Task: task, + PipelineTask: &pts[1], + TaskRunName: "pipelinerun-mytask2", + TaskRun: nil, +}} +var oneFinishedState = []*PipelineRunTaskRun{{ + Task: task, + PipelineTask: &pts[0], + TaskRunName: "pipelinerun-mytask1", + TaskRun: makeSucceeded(trs[0]), +}, { + Task: task, + PipelineTask: &pts[1], + TaskRunName: "pipelinerun-mytask2", + TaskRun: nil, +}} +var oneFailedState = []*PipelineRunTaskRun{{ + Task: task, + PipelineTask: &pts[0], + TaskRunName: "pipelinerun-mytask1", + TaskRun: makeFailed(trs[0]), +}, { + Task: task, + PipelineTask: &pts[1], + TaskRunName: "pipelinerun-mytask2", + TaskRun: nil, +}} +var firstFinishedState = []*PipelineRunTaskRun{{ + Task: task, + PipelineTask: &pts[0], + TaskRunName: "pipelinerun-mytask1", + TaskRun: makeSucceeded(trs[0]), +}, { + Task: task, + PipelineTask: &pts[1], + TaskRunName: "pipelinerun-mytask2", + TaskRun: nil, +}} +var allFinishedState = []*PipelineRunTaskRun{{ + Task: task, + PipelineTask: &pts[0], + TaskRunName: "pipelinerun-mytask1", + TaskRun: makeSucceeded(trs[0]), +}, { + Task: task, + PipelineTask: &pts[1], + TaskRunName: "pipelinerun-mytask2", + TaskRun: makeSucceeded(trs[0]), +}} + func 
TestGetNextTask_NoneStarted(t *testing.T) { - noneStartedState := []*PipelineRunTaskRun{{ - Task: task, - PipelineTask: &pts[0], - TaskRunName: "pipelinerun-mytask1", - TaskRun: nil, - }, { - Task: task, - PipelineTask: &pts[1], - TaskRunName: "pipelinerun-mytask2", - TaskRun: nil, - }} - oneStartedState := []*PipelineRunTaskRun{{ - Task: task, - PipelineTask: &pts[0], - TaskRunName: "pipelinerun-mytask1", - TaskRun: makeStarted(trs[0]), - }, { - Task: task, - PipelineTask: &pts[1], - TaskRunName: "pipelinerun-mytask2", - TaskRun: nil, - }} - oneFinishedState := []*PipelineRunTaskRun{{ - Task: task, - PipelineTask: &pts[0], - TaskRunName: "pipelinerun-mytask1", - TaskRun: makeSucceeded(trs[0]), - }, { - Task: task, - PipelineTask: &pts[1], - TaskRunName: "pipelinerun-mytask2", - TaskRun: nil, - }} - oneFailedState := []*PipelineRunTaskRun{{ - Task: task, - PipelineTask: &pts[0], - TaskRunName: "pipelinerun-mytask1", - TaskRun: makeFailed(trs[0]), - }, { - Task: task, - PipelineTask: &pts[1], - TaskRunName: "pipelinerun-mytask2", - TaskRun: nil, - }} - firstFinishedState := []*PipelineRunTaskRun{{ - Task: task, - PipelineTask: &pts[0], - TaskRunName: "pipelinerun-mytask1", - TaskRun: makeSucceeded(trs[0]), - }, { - Task: task, - PipelineTask: &pts[1], - TaskRunName: "pipelinerun-mytask2", - TaskRun: nil, - }} - allFinishedState := []*PipelineRunTaskRun{{ - Task: task, - PipelineTask: &pts[0], - TaskRunName: "pipelinerun-mytask1", - TaskRun: makeSucceeded(trs[0]), - }, { - Task: task, - PipelineTask: &pts[1], - TaskRunName: "pipelinerun-mytask2", - TaskRun: makeSucceeded(trs[0]), - }} tcs := []struct { name string state []*PipelineRunTaskRun @@ -211,7 +214,7 @@ func TestGetNextTask_NoneStarted(t *testing.T) { } for _, tc := range tcs { t.Run(tc.name, func(t *testing.T) { - nextTask := GetNextTask(tc.state) + nextTask := GetNextTask("somepipelinerun", tc.state, zap.NewNop().Sugar()) if d := cmp.Diff(nextTask, tc.expectedTask); d != "" { t.Fatalf("Expected to indicate 
first task should be run, but different state returned: %s", d) } @@ -262,3 +265,50 @@ func TestGetPipelineState_TaskDoesntExist(t *testing.T) { t.Fatalf("Expected error getting non-existent Tasks for Pipeline %s but got none", p.Name) } } + +func TestGetPipelineConditionStatus(t *testing.T) { + tcs := []struct { + name string + state []*PipelineRunTaskRun + expectedStatus corev1.ConditionStatus + }{ + { + name: "no-tasks-started", + state: noneStartedState, + expectedStatus: corev1.ConditionUnknown, + }, + { + name: "one-task-started", + state: oneStartedState, + expectedStatus: corev1.ConditionUnknown, + }, + { + name: "one-task-finished", + state: oneFinishedState, + expectedStatus: corev1.ConditionUnknown, + }, + { + name: "one-task-failed", + state: oneFailedState, + expectedStatus: corev1.ConditionFalse, + }, + { + name: "first-task-finished", + state: firstFinishedState, + expectedStatus: corev1.ConditionUnknown, + }, + { + name: "all-finished", + state: allFinishedState, + expectedStatus: corev1.ConditionTrue, + }, + } + for _, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { + c := GetPipelineConditionStatus("somepipelinerun", tc.state, zap.NewNop().Sugar()) + if c.Status != tc.expectedStatus { + t.Fatalf("Expected to get status %s but got %s for state %v", tc.expectedStatus, c.Status, tc.state) + } + }) + } +} diff --git a/test/pipelinerun_test.go b/test/pipelinerun_test.go index f337deb3ead..6775b000009 100644 --- a/test/pipelinerun_test.go +++ b/test/pipelinerun_test.go @@ -21,8 +21,11 @@ package test import ( "testing" + duckv1alpha1 "github.com/knative/pkg/apis/duck/v1alpha1" knativetest "github.com/knative/pkg/test" "github.com/knative/pkg/test/logging" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "github.com/knative/build-pipeline/pkg/apis/pipeline/v1alpha1" ) @@ -48,20 +51,30 @@ func TestPipelineRun(t *testing.T) { t.Fatalf("Failed to create PipelineRun `%s`: %s", hwPipelineRunName, err) } - // TODO wait for 
the Run to be successful - logger.Infof("Waiting for PipelineRun %s in namespace %s to be updated by controller", hwPipelineRunName, namespace) + logger.Infof("Waiting for PipelineRun %s in namespace %s to complete", hwPipelineRunName, namespace) if err := WaitForPipelineRunState(c, hwPipelineRunName, func(tr *v1alpha1.PipelineRun) (bool, error) { - if len(tr.Status.Conditions) > 0 { - // TODO: use actual conditions + c := tr.Status.GetCondition(duckv1alpha1.ConditionSucceeded) + if c != nil && c.Status == corev1.ConditionTrue { return true, nil } return false, nil - }, "TaskRunCompleted"); err != nil { - t.Errorf("Error waiting for TaskRun %s to finish: %s", hwTaskRunName, err) + }, "PipelineRunSuccess"); err != nil { + t.Errorf("Error waiting for PipelineRun %s to finish: %s", hwTaskRunName, err) } - - // TODO check that TaskRuns created - - // Verify that the init containers Build ran had 'taskOutput' written - // VerifyBuildOutput(t, c, namespace, taskOutput) + logger.Infof("Making sure the expected TaskRuns were created") + expectedTaskRuns := []string{ + hwPipelineName + hwPipelineTaskName1, + hwPipelineName + hwPipelineTaskName2, + } + for _, runName := range expectedTaskRuns { + r, err := c.TaskRunClient.Get(runName, metav1.GetOptions{}) + if err != nil { + t.Errorf("Couldn't get expected TaskRun %s: %s", runName, err) + } + c := r.Status.GetCondition(duckv1alpha1.ConditionSucceeded) + if c.Status != corev1.ConditionTrue { + t.Errorf("Expected TaskRun %s to have succeeded but Status is %s", runName, c.Status) + } + } + VerifyBuildOutput(t, c, namespace, taskOutput) } From f9e08676e55d0a5ed6dbf4bebd163f24eeb680d7 Mon Sep 17 00:00:00 2001 From: Christie Wilson Date: Thu, 11 Oct 2018 09:32:00 -0700 Subject: [PATCH 4/7] Create TaskRun from PipelineRun that runs a Task Added the Task reference to the TaskRun so that when a PipelineRun creates a TaskRun, it actually executes! 
(For #61) While running the integration test, noticed that the PipelineRuns weren't getting reconciled quickly enough, but adding a tracker which will invoke reconcile when the created TaskRuns are updated fixed this - however it did still take > 1 minute to create 3 helloworld TaskRuns and wait for them to complete, so since 3 was arbitrary, reduced to 2. Also cleaned up the TaskRun controller a bit: using the Logger object on the controller/reconciler itself, made the log messages a bit more descriptive. --- cmd/controller/main.go | 2 +- .../v1alpha1/pipelinerun/pipelinerun.go | 25 +++++++++++++++++++ .../v1alpha1/pipelinerun/pipelinerun_test.go | 21 +++++++++++++++- pkg/reconciler/v1alpha1/taskrun/taskrun.go | 19 ++++++-------- test/crd.go | 12 +++------ test/crd_checks.go | 6 +---- test/pipelinerun_test.go | 14 ++++++----- test/taskrun_test.go | 8 +++--- 8 files changed, 71 insertions(+), 36 deletions(-) diff --git a/cmd/controller/main.go b/cmd/controller/main.go index a6581a1762a..b9d73608457 100644 --- a/cmd/controller/main.go +++ b/cmd/controller/main.go @@ -65,7 +65,7 @@ func main() { logger, atomicLevel := logging.NewLoggerFromConfig(loggingConfig, logging.ControllerLogKey) defer logger.Sync() - logger.Info("Starting the Build Controller") + logger.Info("Starting the Pipeline Controller") // set up signals so we handle the first shutdown signal gracefully stopCh := signals.SetupSignalHandler() diff --git a/pkg/reconciler/v1alpha1/pipelinerun/pipelinerun.go b/pkg/reconciler/v1alpha1/pipelinerun/pipelinerun.go index 5fbbd89d6a8..2e5d91a2fe0 100644 --- a/pkg/reconciler/v1alpha1/pipelinerun/pipelinerun.go +++ b/pkg/reconciler/v1alpha1/pipelinerun/pipelinerun.go @@ -20,13 +20,16 @@ import ( "context" "fmt" "reflect" + "time" "github.com/knative/build-pipeline/pkg/apis/pipeline/v1alpha1" "github.com/knative/build-pipeline/pkg/reconciler" "github.com/knative/build-pipeline/pkg/reconciler/v1alpha1/pipelinerun/resources" "github.com/knative/pkg/controller" + 
"github.com/knative/pkg/tracker" "go.uber.org/zap" + corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/equality" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -61,6 +64,7 @@ type Reconciler struct { pipelineLister listers.PipelineLister taskRunLister listers.TaskRunLister taskLister listers.TaskLister + tracker tracker.Interface } // Check that our Reconciler implements controller.Reconciler @@ -91,6 +95,11 @@ func NewController( UpdateFunc: controller.PassNew(impl.Enqueue), DeleteFunc: impl.Enqueue, }) + + r.tracker = tracker.New(impl.EnqueueKey, 30*time.Minute) + taskRunInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + UpdateFunc: controller.PassNew(r.tracker.OnChanged), + }) return impl } @@ -118,6 +127,17 @@ func (c *Reconciler) Reconcile(ctx context.Context, key string) error { // Don't modify the informer's copy. pr := original.DeepCopy() + taskRunRef := corev1.ObjectReference{ + APIVersion: "build-pipeline.knative.dev/v1alpha1", + Kind: "TaskRun", + Namespace: pr.Namespace, + Name: pr.Name, + } + if err := c.tracker.Track(taskRunRef, pr); err != nil { + c.Logger.Errorf("Failed to create tracker for TaskRuns for PipelineRun %s: %v", pr.Name, err) + return err + } + // Reconcile this copy of the task run and then write back any status // updates regardless of whether the reconciliation errored out. 
err = c.reconcile(ctx, pr) @@ -177,6 +197,11 @@ func (c *Reconciler) createTaskRun(t *v1alpha1.Task, trName string, pr *v1alpha1 *metav1.NewControllerRef(pr, groupVersionKind), }, }, + Spec: v1alpha1.TaskRunSpec{ + TaskRef: v1alpha1.TaskRef{ + Name: t.Name, + }, + }, } return c.PipelineClientSet.PipelineV1alpha1().TaskRuns(t.Namespace).Create(tr) } diff --git a/pkg/reconciler/v1alpha1/pipelinerun/pipelinerun_test.go b/pkg/reconciler/v1alpha1/pipelinerun/pipelinerun_test.go index f7d1ab83b89..323c7adf09b 100644 --- a/pkg/reconciler/v1alpha1/pipelinerun/pipelinerun_test.go +++ b/pkg/reconciler/v1alpha1/pipelinerun/pipelinerun_test.go @@ -23,10 +23,13 @@ import ( informers "github.com/knative/build-pipeline/pkg/client/informers/externalversions" informersv1alpha1 "github.com/knative/build-pipeline/pkg/client/informers/externalversions/pipeline/v1alpha1" "github.com/knative/build-pipeline/pkg/reconciler" + duckv1alpha1 "github.com/knative/pkg/apis/duck/v1alpha1" "github.com/knative/pkg/controller" "go.uber.org/zap" "go.uber.org/zap/zaptest/observer" + corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" fakekubeclientset "k8s.io/client-go/kubernetes/fake" ktesting "k8s.io/client-go/testing" ) @@ -78,6 +81,18 @@ func TestReconcile(t *testing.T) { if len(client.Actions()) == 0 { t.Fatalf("Expected client to have been used to create a TaskRun but it wasn't") } + + // Check that the PipelineRun was reconciled correctly + reconciledRun, err := client.Pipeline().PipelineRuns("foo").Get("test-pipeline-run-success", metav1.GetOptions{}) + if err != nil { + t.Fatalf("Somehow had error getting reconciled run out of fake client: %s", err) + } + condition := reconciledRun.Status.GetCondition(duckv1alpha1.ConditionSucceeded) + if condition == nil || condition.Status != corev1.ConditionUnknown { + t.Errorf("Expected PipelineRun status to be in progress, but was %s", condition) + } + + // Check that the expected TaskRun was 
created actual := client.Actions()[0].(ktesting.CreateAction).GetObject() trueB := true expectedTaskRun := &v1alpha1.TaskRun{ @@ -92,7 +107,11 @@ func TestReconcile(t *testing.T) { BlockOwnerDeletion: &trueB, }}, }, - Spec: v1alpha1.TaskRunSpec{}, + Spec: v1alpha1.TaskRunSpec{ + TaskRef: v1alpha1.TaskRef{ + Name: "unit-test-task", + }, + }, } if d := cmp.Diff(actual, expectedTaskRun); d != "" { t.Errorf("expected to see TaskRun %v created. Diff %s", expectedTaskRun, d) diff --git a/pkg/reconciler/v1alpha1/taskrun/taskrun.go b/pkg/reconciler/v1alpha1/taskrun/taskrun.go index a8cd6ca1680..c9a170585ba 100644 --- a/pkg/reconciler/v1alpha1/taskrun/taskrun.go +++ b/pkg/reconciler/v1alpha1/taskrun/taskrun.go @@ -13,6 +13,7 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ + package taskrun import ( @@ -27,7 +28,6 @@ import ( buildinformers "github.com/knative/build/pkg/client/informers/externalversions/build/v1alpha1" buildlisters "github.com/knative/build/pkg/client/listers/build/v1alpha1" "github.com/knative/pkg/controller" - "github.com/knative/pkg/logging" "github.com/knative/pkg/tracker" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -122,13 +122,12 @@ func (c *Reconciler) Reconcile(ctx context.Context, key string) error { c.Logger.Errorf("invalid resource key: %s", key) return nil } - logger := logging.FromContext(ctx) // Get the Task Run resource with this namespace/name original, err := c.taskRunLister.TaskRuns(namespace).Get(name) if errors.IsNotFound(err) { // The resource no longer exists, in which case we stop processing. 
- logger.Errorf("task run %q in work queue no longer exists", key) + c.Logger.Errorf("task run %q in work queue no longer exists", key) return nil } else if err != nil { return err @@ -146,19 +145,17 @@ func (c *Reconciler) Reconcile(ctx context.Context, key string) error { // cache may be stale and we don't want to overwrite a prior update // to status with this stale state. } else if _, err := c.updateStatus(tr); err != nil { - logger.Warn("Failed to update taskRun status", zap.Error(err)) + c.Logger.Warn("Failed to update taskRun status", zap.Error(err)) return err } return err } func (c *Reconciler) reconcile(ctx context.Context, tr *v1alpha1.TaskRun) error { - logger := logging.FromContext(ctx) - // get build the same as the taskrun, this is the value we use for 1:1 mapping and retrieval b, err := c.getBuild(tr.Namespace, tr.Name) if errors.IsNotFound(err) { - if b, err = c.makeBuild(tr, logger); err != nil { + if b, err = c.makeBuild(tr, c.Logger); err != nil { return fmt.Errorf("Failed to create a build for taskrun: %v", err) } } else if err != nil { @@ -174,15 +171,15 @@ func (c *Reconciler) reconcile(ctx context.Context, tr *v1alpha1.TaskRun) error // taskrun has finished (as child build has finished and status is synced) if len(tr.Status.Conditions) > 0 && tr.Status.Conditions[0].Status != corev1.ConditionUnknown { - logger.Infof("Finished %s", tr.Name) + c.Logger.Infof("Finished %s", tr.Name) return nil } // sync build status with taskrun status if len(b.Status.Conditions) > 0 { - logger.Infof("Syncing taskrun conditions with build conditions %s", b.Status.Conditions[0]) + c.Logger.Infof("Syncing taskrun conditions with build conditions %s", b.Status.Conditions[0]) } else { - logger.Infof("Syncing taskrun conditions with build conditions []") + c.Logger.Infof("Syncing taskrun conditions with build conditions []") } tr.Status.Conditions = b.Status.Conditions return nil @@ -195,9 +192,7 @@ func (c *Reconciler) updateStatus(taskrun *v1alpha1.TaskRun) 
(*v1alpha1.TaskRun, } if !reflect.DeepEqual(taskrun.Status, newtaskrun.Status) { newtaskrun.Status = taskrun.Status - // TODO: for CRD there's no updatestatus, so use normal update return c.PipelineClientSet.PipelineV1alpha1().TaskRuns(taskrun.Namespace).Update(newtaskrun) - // return configClient.UpdateStatus(newtaskrun) } return newtaskrun, nil } diff --git a/test/crd.go b/test/crd.go index ac26f06fcd4..275b0dcf153 100644 --- a/test/crd.go +++ b/test/crd.go @@ -42,6 +42,8 @@ const ( hwPipelineName = "helloworld-pipeline" hwPipelineRunName = "helloworld-pipelinerun" hwPipelineParamsName = "helloworld-pipelineparams" + hwPipelineTaskName1 = "helloworld-task-1" + hwPipelineTaskName2 = "helloworld-task-2" logPath = "/workspace" logFile = "out.txt" @@ -170,19 +172,13 @@ func getHelloWorldPipeline(namespace string) *v1alpha1.Pipeline { Spec: v1alpha1.PipelineSpec{ Tasks: []v1alpha1.PipelineTask{ v1alpha1.PipelineTask{ - Name: "helloworld-task-1", + Name: hwPipelineTaskName1, TaskRef: v1alpha1.TaskRef{ Name: hwTaskName, }, }, v1alpha1.PipelineTask{ - Name: "helloworld-task-2", - TaskRef: v1alpha1.TaskRef{ - Name: hwTaskName, - }, - }, - v1alpha1.PipelineTask{ - Name: "helloworld-task-3", + Name: hwPipelineTaskName2, TaskRef: v1alpha1.TaskRef{ Name: hwTaskName, }, diff --git a/test/crd_checks.go b/test/crd_checks.go index 4b2512f9912..6e8f271d728 100644 --- a/test/crd_checks.go +++ b/test/crd_checks.go @@ -30,11 +30,7 @@ import ( const ( interval = 1 * time.Second - // Currently using a super short timeout b/c tests are expected to fail so this way - // we can get to that failure faster - knative/serving is currently using `6 * time.Minute` - // which we could use, or we could use timeouts more specific to what each `Task` is - // actually expected to do. 
- timeout = 120 * time.Second + timeout = 2 * time.Minute ) // WaitForTaskRunState polls the status of the TaskRun called name from client every diff --git a/test/pipelinerun_test.go b/test/pipelinerun_test.go index 6775b000009..36081f312eb 100644 --- a/test/pipelinerun_test.go +++ b/test/pipelinerun_test.go @@ -19,6 +19,7 @@ limitations under the License. package test import ( + "strings" "testing" duckv1alpha1 "github.com/knative/pkg/apis/duck/v1alpha1" @@ -63,17 +64,18 @@ func TestPipelineRun(t *testing.T) { } logger.Infof("Making sure the expected TaskRuns were created") expectedTaskRuns := []string{ - hwPipelineName + hwPipelineTaskName1, - hwPipelineName + hwPipelineTaskName2, + strings.Join([]string{hwPipelineRunName, hwPipelineTaskName1}, "-"), + strings.Join([]string{hwPipelineRunName, hwPipelineTaskName2}, "-"), } for _, runName := range expectedTaskRuns { r, err := c.TaskRunClient.Get(runName, metav1.GetOptions{}) if err != nil { t.Errorf("Couldn't get expected TaskRun %s: %s", runName, err) - } - c := r.Status.GetCondition(duckv1alpha1.ConditionSucceeded) - if c.Status != corev1.ConditionTrue { - t.Errorf("Expected TaskRun %s to have succeeded but Status is %s", runName, c.Status) + } else { + c := r.Status.GetCondition(duckv1alpha1.ConditionSucceeded) + if c.Status != corev1.ConditionTrue { + t.Errorf("Expected TaskRun %s to have succeeded but Status is %s", runName, c.Status) + } } } VerifyBuildOutput(t, c, namespace, taskOutput) diff --git a/test/taskrun_test.go b/test/taskrun_test.go index 0389d0e39f6..0b8b4a94d70 100644 --- a/test/taskrun_test.go +++ b/test/taskrun_test.go @@ -23,6 +23,7 @@ import ( "strings" "testing" + duckv1alpha1 "github.com/knative/pkg/apis/duck/v1alpha1" knativetest "github.com/knative/pkg/test" "github.com/knative/pkg/test/logging" corev1 "k8s.io/api/core/v1" @@ -54,13 +55,14 @@ func TestTaskRun(t *testing.T) { t.Fatalf("Failed to create TaskRun `%s`: %s", hwTaskRunName, err) } - // Verify status of TaskRun (wait for it) + 
logger.Infof("Waiting for TaskRun %s in namespace %s to complete", hwTaskRunName, namespace) if err := WaitForTaskRunState(c, hwTaskRunName, func(tr *v1alpha1.TaskRun) (bool, error) { - if len(tr.Status.Conditions) > 0 && tr.Status.Conditions[0].Status == corev1.ConditionTrue { + c := tr.Status.GetCondition(duckv1alpha1.ConditionSucceeded) + if c != nil && c.Status == corev1.ConditionTrue { return true, nil } return false, nil - }, "TaskRunCompleted"); err != nil { + }, "TaskRunSuccess"); err != nil { t.Errorf("Error waiting for TaskRun %s to finish: %s", hwTaskRunName, err) } From 16c56d1f8a5510f4906cf410c916dce1458080f7 Mon Sep 17 00:00:00 2001 From: Christie Wilson Date: Thu, 11 Oct 2018 16:48:40 -0700 Subject: [PATCH 5/7] Stop reconciling invalid PipelineRun If a PipelineRun references a Pipeline that uses Tasks which don't exist, we should immediately stop trying to Reconcile it. To fix this, the user/trigger should create a new PipelineRun after creating the Tasks needed. --- .../v1alpha1/pipelinerun/pipelinerun.go | 9 ++++- .../v1alpha1/pipelinerun/pipelinerun_test.go | 37 ++++++++++++++++++- .../pipelinerun/resources/pipelinestate.go | 6 +-- .../resources/pipelinestate_test.go | 6 ++- 4 files changed, 50 insertions(+), 8 deletions(-) diff --git a/pkg/reconciler/v1alpha1/pipelinerun/pipelinerun.go b/pkg/reconciler/v1alpha1/pipelinerun/pipelinerun.go index 2e5d91a2fe0..81437af1745 100644 --- a/pkg/reconciler/v1alpha1/pipelinerun/pipelinerun.go +++ b/pkg/reconciler/v1alpha1/pipelinerun/pipelinerun.go @@ -171,7 +171,14 @@ func (c *Reconciler) reconcile(ctx context.Context, pr *v1alpha1.PipelineRun) er p, pr.Name, ) if err != nil { - return fmt.Errorf("error getting Tasks for Pipeline %s, Pipeline may be invalid!: %s", p.Name, err) + if errors.IsNotFound(err) { + c.Logger.Infof("PipelineRun %s's Pipeline %s can't be Run; it contains Tasks that don't exist: %s", + fmt.Sprintf("%s/%s", p.Namespace, p.Name), + fmt.Sprintf("%s/%s", p.Namespace, pr.Name), err) + // 
The PipelineRun is Invalid so we want to stop trying to Reconcile it + return nil + } + return fmt.Errorf("error getting Tasks and/or TaskRuns for Pipeline %s, Pipeline may be invalid!: %s", p.Name, err) } prtr := resources.GetNextTask(pr.Name, state, c.Logger) if prtr != nil { diff --git a/pkg/reconciler/v1alpha1/pipelinerun/pipelinerun_test.go b/pkg/reconciler/v1alpha1/pipelinerun/pipelinerun_test.go index 323c7adf09b..19b285d2f6b 100644 --- a/pkg/reconciler/v1alpha1/pipelinerun/pipelinerun_test.go +++ b/pkg/reconciler/v1alpha1/pipelinerun/pipelinerun_test.go @@ -46,7 +46,6 @@ func TestReconcile(t *testing.T) { }, }, }} - ps := []*v1alpha1.Pipeline{{ ObjectMeta: metav1.ObjectMeta{ Name: "test-pipeline", @@ -118,7 +117,7 @@ func TestReconcile(t *testing.T) { } } -func TestReconcileInvalid(t *testing.T) { +func TestReconcile_InvalidPipeline(t *testing.T) { prs := []*v1alpha1.PipelineRun{{ ObjectMeta: metav1.ObjectMeta{ Name: "invalid-pipeline", @@ -160,6 +159,40 @@ func TestReconcileInvalid(t *testing.T) { } } +func TestReconcile_MissingTasks(t *testing.T) { + ps := []*v1alpha1.Pipeline{{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pipeline-missing-tasks", + Namespace: "foo", + }, + Spec: v1alpha1.PipelineSpec{Tasks: []v1alpha1.PipelineTask{{ + Name: "myspecialtask", + TaskRef: v1alpha1.TaskRef{Name: "sometask"}, + }}, + }}, + } + prs := []*v1alpha1.PipelineRun{{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pipelinerun-missing-tasks", + Namespace: "foo", + }, + Spec: v1alpha1.PipelineRunSpec{ + PipelineRef: v1alpha1.PipelineRef{ + Name: "pipeline-missing-tasks", + }, + }}, + } + d := testData{ + prs: prs, + ps: ps, + } + c, _, _ := getController(d) + err := c.Reconciler.Reconcile(context.Background(), "foo/pipelinerun-missing-tasks") + if err != nil { + t.Errorf("When Pipeline's Tasks can't be found, expected no error to be returned (i.e. 
controller should stop trying to reconcile) but got: %s", err) + } +} + func getLogMessages(logs *observer.ObservedLogs) []string { messages := []string{} for _, l := range logs.All() { diff --git a/pkg/reconciler/v1alpha1/pipelinerun/resources/pipelinestate.go b/pkg/reconciler/v1alpha1/pipelinerun/resources/pipelinestate.go index d881c89568a..107a8330c92 100644 --- a/pkg/reconciler/v1alpha1/pipelinerun/resources/pipelinestate.go +++ b/pkg/reconciler/v1alpha1/pipelinerun/resources/pipelinestate.go @@ -85,9 +85,9 @@ func GetPipelineState(getTask GetTask, getTaskRun GetTaskRun, p *v1alpha1.Pipeli pt := p.Spec.Tasks[i] t, err := getTask(p.Namespace, pt.TaskRef.Name) if err != nil { - return nil, fmt.Errorf("failed to get tasks for Pipeline %q: Error getting task %q : %s", - fmt.Sprintf("%s/%s", p.Namespace, p.Name), - fmt.Sprintf("%s/%s", p.Namespace, pt.TaskRef.Name), err) + // If the Task can't be found, it means the PipelineRun is invalid. Return the same error + // type so it can be used by the caller. + return nil, err } prtr := PipelineRunTaskRun{ Task: t, diff --git a/pkg/reconciler/v1alpha1/pipelinerun/resources/pipelinestate_test.go b/pkg/reconciler/v1alpha1/pipelinerun/resources/pipelinestate_test.go index 0ae1b1c9b93..3694b7e3279 100644 --- a/pkg/reconciler/v1alpha1/pipelinerun/resources/pipelinestate_test.go +++ b/pkg/reconciler/v1alpha1/pipelinerun/resources/pipelinestate_test.go @@ -17,7 +17,6 @@ limitations under the License. 
package resources import ( - "fmt" "testing" "go.uber.org/zap" @@ -255,7 +254,7 @@ func TestGetPipelineState(t *testing.T) { func TestGetPipelineState_TaskDoesntExist(t *testing.T) { getTask := func(namespace, name string) (*v1alpha1.Task, error) { - return nil, fmt.Errorf("Task %s doesn't exist", name) + return nil, errors.NewNotFound(v1alpha1.Resource("task"), name) } getTaskRun := func(namespace, name string) (*v1alpha1.TaskRun, error) { return nil, nil @@ -264,6 +263,9 @@ func TestGetPipelineState_TaskDoesntExist(t *testing.T) { if err == nil { t.Fatalf("Expected error getting non-existent Tasks for Pipeline %s but got none", p.Name) } + if !errors.IsNotFound(err) { + t.Fatalf("Expected same error type returned by func for non-existent Task for Pipeline %s but got %s", p.Name, err) + } } func TestGetPipelineConditionStatus(t *testing.T) { From 39975c237d5ef950bbb50dfb416a0222710596c8 Mon Sep 17 00:00:00 2001 From: Christie Wilson Date: Thu, 11 Oct 2018 18:14:57 -0700 Subject: [PATCH 6/7] Add/cleanup end to end debug info Something has gone wrong with one of the integration tests on my PR and I don't know what so I'm trying to add more info. Added Builds to the dumped CRDs, and also moved the step that deploys the examples is now after the integration tests b/c it produces a lot of errors in the logs (hahaha...) and makes it harder to debug integration tests failures. 
--- test/e2e-tests.sh | 11 +++++++---- test/kaniko_task_test.go | 10 ++++++++-- test/presubmit-tests.sh | 2 +- 3 files changed, 16 insertions(+), 7 deletions(-) diff --git a/test/e2e-tests.sh b/test/e2e-tests.sh index 994f9b60e95..292ed44b050 100755 --- a/test/e2e-tests.sh +++ b/test/e2e-tests.sh @@ -45,7 +45,7 @@ function teardown() { # Called by `fail_test` (provided by `e2e-tests.sh`) to dump info on test failure function dump_extra_cluster_state() { - for crd in pipelines pipelineruns tasks taskruns resources pipelineparams + for crd in pipelines pipelineruns tasks taskruns resources pipelineparams builds do echo ">>> $crd:" kubectl get $crd -o yaml --all-namespaces @@ -75,10 +75,13 @@ set +o xtrace # Wait for pods to be running in the namespaces we are deploying to wait_until_pods_running knative-build-pipeline || fail_test "Pipeline CRD did not come up" -# Run the smoke tests for the examples dir to make sure they are valid -./examples/smoke-test.sh || fail_test - # Run the integration tests go_test_e2e -timeout=20m ./test || fail_test +# Run the smoke tests for the examples dir to make sure they are valid +# Run these _after_ the integration tests b/c they don't quite work all the way +# and they cause a lot of noise in the logs, making it harder to debug integration +# test failures. 
+./examples/smoke-test.sh || fail_test + success diff --git a/test/kaniko_task_test.go b/test/kaniko_task_test.go index fa51eaed724..791cca266ab 100644 --- a/test/kaniko_task_test.go +++ b/test/kaniko_task_test.go @@ -26,6 +26,7 @@ import ( "time" buildv1alpha1 "github.com/knative/build/pkg/apis/build/v1alpha1" + duckv1alpha1 "github.com/knative/pkg/apis/duck/v1alpha1" knativetest "github.com/knative/pkg/test" "github.com/knative/pkg/test/logging" corev1 "k8s.io/api/core/v1" @@ -150,8 +151,13 @@ func TestKanikoTaskRun(t *testing.T) { // Verify status of TaskRun (wait for it) if err := WaitForTaskRunState(c, kanikoTaskRunName, func(tr *v1alpha1.TaskRun) (bool, error) { - if len(tr.Status.Conditions) > 0 && tr.Status.Conditions[0].Status == corev1.ConditionTrue { - return true, nil + c := tr.Status.GetCondition(duckv1alpha1.ConditionSucceeded) + if c != nil { + if c.Status == corev1.ConditionTrue { + return true, nil + } else if c.Status == corev1.ConditionFalse { + return true, fmt.Errorf("pipeline run %s failed!", hwPipelineRunName) + } } return false, nil }, "TaskRunCompleted"); err != nil { diff --git a/test/presubmit-tests.sh b/test/presubmit-tests.sh index c538bb5764b..fbfb4c33759 100755 --- a/test/presubmit-tests.sh +++ b/test/presubmit-tests.sh @@ -50,7 +50,7 @@ function integration_tests() { header "Running integration tests" local options="" (( EMIT_METRICS )) && options="--emit-metrics" - ./test/e2e-tests.sh ${options} + ./test/e2e-tests.sh ${options} } main $@ From 360d28276336d3a9ab37fb5866311c2ac6328cd2 Mon Sep 17 00:00:00 2001 From: Christie Wilson Date: Fri, 12 Oct 2018 09:28:18 -0700 Subject: [PATCH 7/7] Only check build output in one integration test I think it's reasonable for only one of our eventually many integration tests to verify the build output, especially when it involves adding a volume mount to the pile of things that could go wrong in the test. 
Refactored the test a bit, so we don't assert inside the test, and we output some logs before polling. Removed dumping of CRDs in test script b/c each test runs in its own namespace and cleans up after itself, so there is never anything to dump (see #145). Updated condition checking so that if the Run fails, we bail immediately instead of continuing to hope it will succeed. --- test/crd.go | 58 +++++++++++++++++++++------------------- test/e2e-tests.sh | 5 ---- test/pipelinerun_test.go | 10 ++++--- test/taskrun_test.go | 48 ++++++++++----------------------- 4 files changed, 51 insertions(+), 70 deletions(-) diff --git a/test/crd.go b/test/crd.go index 275b0dcf153..b2e33d964d0 100644 --- a/test/crd.go +++ b/test/crd.go @@ -23,10 +23,9 @@ import ( "bytes" "fmt" "io" - "strings" - "testing" buildv1alpha1 "github.com/knative/build/pkg/apis/build/v1alpha1" + "github.com/knative/pkg/test/logging" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -53,7 +52,7 @@ const ( buildOutput = "Build successful" ) -func getHelloWorldValidationPod(namespace string) *corev1.Pod { +func getHelloWorldValidationPod(namespace, volumeClaimName string) *corev1.Pod { return &corev1.Pod{ ObjectMeta: metav1.ObjectMeta{ Namespace: namespace, @@ -80,7 +79,7 @@ func getHelloWorldValidationPod(namespace string) *corev1.Pod { Name: "scratch", VolumeSource: corev1.VolumeSource{ PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{ - ClaimName: "scratch", + ClaimName: volumeClaimName, }, }, }, @@ -121,27 +120,32 @@ func getHelloWorldTask(namespace string, args []string) *v1alpha1.Task { Name: hwContainerName, Image: "busybox", Args: args, - VolumeMounts: []corev1.VolumeMount{ - corev1.VolumeMount{ - Name: "scratch", - MountPath: logPath, - }, - }, }, }, - Volumes: []corev1.Volume{ - corev1.Volume{ - Name: "scratch", - VolumeSource: corev1.VolumeSource{ - PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{ - 
ClaimName: "scratch", - }, - }, - }, + }, + }, + } +} + +func getHelloWorldTaskWithVolume(namespace string, args []string) *v1alpha1.Task { + t := getHelloWorldTask(namespace, args) + t.Spec.BuildSpec.Steps[0].VolumeMounts = []corev1.VolumeMount{ + corev1.VolumeMount{ + Name: "scratch", + MountPath: logPath, + }, + } + t.Spec.BuildSpec.Volumes = []corev1.Volume{ + corev1.Volume{ + Name: "scratch", + VolumeSource: corev1.VolumeSource{ + PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{ + ClaimName: hwVolumeName, }, }, }, } + return t } func getHelloWorldTaskRun(namespace string) *v1alpha1.TaskRun { @@ -217,15 +221,15 @@ func getHelloWorldPipelineRun(namespace string) *v1alpha1.PipelineRun { } } -func VerifyBuildOutput(t *testing.T, c *clients, namespace string, testStr string) { +func getBuildOutputFromVolume(logger *logging.BaseLogger, c *clients, namespace, testStr string) (string, error) { // Create Validation Pod pods := c.KubeClient.Kube.CoreV1().Pods(namespace) - if _, err := pods.Create(getHelloWorldValidationPod(namespace)); err != nil { - t.Fatalf("Failed to create TaskRun `%s`: %s", hwTaskRunName, err) + if _, err := pods.Create(getHelloWorldValidationPod(namespace, hwVolumeName)); err != nil { + return "", fmt.Errorf("failed to create Volume `%s`: %s", hwVolumeName, err) } - // Verify status of Pod (wait for it) + logger.Infof("Waiting for pod with test volume %s to come up so we can read logs from it", hwVolumeName) if err := WaitForPodState(c, hwValidationPodName, namespace, func(p *corev1.Pod) (bool, error) { // the "Running" status is used as "Succeeded" caused issues as the pod succeeds and restarts quickly // there might be a race condition here and possibly a better way of handling this, perhaps using a Job or different state validation @@ -234,20 +238,18 @@ func VerifyBuildOutput(t *testing.T, c *clients, namespace string, testStr strin } return false, nil }, "ValidationPodCompleted"); err != nil { - t.Errorf("Error waiting for Pod 
%s to finish: %s", hwValidationPodName, err) + return "", fmt.Errorf("error waiting for Pod %s to finish: %s", hwValidationPodName, err) } // Get validation pod logs and verify that the build executed a container w/ desired output req := pods.GetLogs(hwValidationPodName, &corev1.PodLogOptions{}) readCloser, err := req.Stream() if err != nil { - t.Fatalf("Failed to open stream to read: %v", err) + return "", fmt.Errorf("failed to open stream to read: %v", err) } defer readCloser.Close() var buf bytes.Buffer out := bufio.NewWriter(&buf) _, err = io.Copy(out, readCloser) - if !strings.Contains(buf.String(), testStr) { - t.Fatalf("Expected output %s from pod %s but got %s", buildOutput, hwValidationPodName, buf.String()) - } + return buf.String(), nil } diff --git a/test/e2e-tests.sh b/test/e2e-tests.sh index 292ed44b050..7d4eaef1920 100755 --- a/test/e2e-tests.sh +++ b/test/e2e-tests.sh @@ -45,11 +45,6 @@ function teardown() { # Called by `fail_test` (provided by `e2e-tests.sh`) to dump info on test failure function dump_extra_cluster_state() { - for crd in pipelines pipelineruns tasks taskruns resources pipelineparams builds - do - echo ">>> $crd:" - kubectl get $crd -o yaml --all-namespaces - done echo ">>> Pipeline controller log:" kubectl -n knative-build-pipeline logs $(get_app_pod build-pipeline-controller knative-build-pipeline) echo ">>> Pipeline webhook log:" diff --git a/test/pipelinerun_test.go b/test/pipelinerun_test.go index 36081f312eb..2e197780350 100644 --- a/test/pipelinerun_test.go +++ b/test/pipelinerun_test.go @@ -19,6 +19,7 @@ limitations under the License. 
package test import ( + "fmt" "strings" "testing" @@ -55,8 +56,12 @@ func TestPipelineRun(t *testing.T) { logger.Infof("Waiting for PipelineRun %s in namespace %s to complete", hwPipelineRunName, namespace) if err := WaitForPipelineRunState(c, hwPipelineRunName, func(tr *v1alpha1.PipelineRun) (bool, error) { c := tr.Status.GetCondition(duckv1alpha1.ConditionSucceeded) - if c != nil && c.Status == corev1.ConditionTrue { - return true, nil + if c != nil { + if c.Status == corev1.ConditionTrue { + return true, nil + } else if c.Status == corev1.ConditionFalse { + return true, fmt.Errorf("pipeline run %s failed!", hwPipelineRunName) + } } return false, nil }, "PipelineRunSuccess"); err != nil { @@ -78,5 +83,4 @@ func TestPipelineRun(t *testing.T) { } } } - VerifyBuildOutput(t, c, namespace, taskOutput) } diff --git a/test/taskrun_test.go b/test/taskrun_test.go index 0b8b4a94d70..2e738183151 100644 --- a/test/taskrun_test.go +++ b/test/taskrun_test.go @@ -16,10 +16,7 @@ limitations under the License. 
package test import ( - "bufio" - "bytes" "fmt" - "io" "strings" "testing" @@ -27,7 +24,6 @@ import ( knativetest "github.com/knative/pkg/test" "github.com/knative/pkg/test/logging" corev1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "github.com/knative/build-pipeline/pkg/apis/pipeline/v1alpha1" ) @@ -41,16 +37,15 @@ func TestTaskRun(t *testing.T) { knativetest.CleanupOnInterrupt(func() { tearDown(logger, c.KubeClient, namespace) }, logger) defer tearDown(logger, c.KubeClient, namespace) - // Create Volume + logger.Infof("Creating volume %s to collect log output", hwVolumeName) if _, err := c.KubeClient.Kube.CoreV1().PersistentVolumeClaims(namespace).Create(getHelloWorldVolumeClaim(namespace)); err != nil { t.Fatalf("Failed to create Volume `%s`: %s", hwTaskName, err) } - // Create Task - if _, err := c.TaskClient.Create(getHelloWorldTask(namespace, []string{"/bin/sh", "-c", fmt.Sprintf("echo %s > %s/%s", taskOutput, logPath, logFile)})); err != nil { + logger.Infof("Creating Task and TaskRun in namespace %s", namespace) + if _, err := c.TaskClient.Create(getHelloWorldTaskWithVolume(namespace, []string{"/bin/sh", "-c", fmt.Sprintf("echo %s > %s/%s", taskOutput, logPath, logFile)})); err != nil { t.Fatalf("Failed to create Task `%s`: %s", hwTaskName, err) } - if _, err := c.TaskRunClient.Create(getHelloWorldTaskRun(namespace)); err != nil { t.Fatalf("Failed to create TaskRun `%s`: %s", hwTaskRunName, err) } @@ -58,39 +53,24 @@ func TestTaskRun(t *testing.T) { logger.Infof("Waiting for TaskRun %s in namespace %s to complete", hwTaskRunName, namespace) if err := WaitForTaskRunState(c, hwTaskRunName, func(tr *v1alpha1.TaskRun) (bool, error) { c := tr.Status.GetCondition(duckv1alpha1.ConditionSucceeded) - if c != nil && c.Status == corev1.ConditionTrue { - return true, nil + if c != nil { + if c.Status == corev1.ConditionTrue { + return true, nil + } else if c.Status == corev1.ConditionFalse { + return true, fmt.Errorf("pipeline run %s failed!", 
hwPipelineRunName) + } } return false, nil }, "TaskRunSuccess"); err != nil { t.Errorf("Error waiting for TaskRun %s to finish: %s", hwTaskRunName, err) } - // The Build created by the TaskRun will have the same name - b, err := c.BuildClient.Get(hwTaskRunName, metav1.GetOptions{}) - if err != nil { - t.Errorf("Expected there to be a Build with the same name as TaskRun %s but got error: %s", hwTaskRunName, err) - } - cluster := b.Status.Cluster - if cluster == nil || cluster.PodName == "" { - t.Fatalf("Expected build status to have a podname but it didn't!") - } - podName := cluster.PodName - pods := c.KubeClient.Kube.CoreV1().Pods(namespace) - - req := pods.GetLogs(podName, &corev1.PodLogOptions{}) - readCloser, err := req.Stream() + logger.Infof("Verifying TaskRun %s output in volume %s", hwTaskRunName, hwVolumeName) + output, err := getBuildOutputFromVolume(logger, c, namespace, taskOutput) if err != nil { - t.Fatalf("Failed to open stream to read: %v", err) + t.Fatalf("Unable to get build output from volume %s: %s", hwVolumeName, err) } - defer readCloser.Close() - var buf bytes.Buffer - out := bufio.NewWriter(&buf) - _, err = io.Copy(out, readCloser) - if !strings.Contains(buf.String(), buildOutput) { - t.Fatalf("Expected output %s from pod %s but got %s", buildOutput, podName, buf.String()) + if !strings.Contains(output, taskOutput) { + t.Fatalf("Expected output %s from pod %s but got %s", buildOutput, hwValidationPodName, output) } - - // Verify that the init containers Build ran had 'taskOutput' written - VerifyBuildOutput(t, c, namespace, taskOutput) }