Skip to content

Commit

Permalink
retrieving deps outside of dag.build
Browse files Browse the repository at this point in the history
Adding an attribute function to PipelineTaskList to retrieve deps
(thanks @bobcatfish for great suggestion), passing this deps to
Build function instead of retrieving it from within that function.

This change was motivated while introducing task result consumption in
finally tasks. Finally tasks can consume results from dag tasks but
dag tasks are not part of the same graph and must not be added as part of
deps of any finally tasks. This change allows reconciler (with the knowldege of
dag tasks vs finally tasks) to manage deps while building a graph.
  • Loading branch information
pritidesai authored and tekton-robot committed Dec 3, 2020
1 parent d702ca1 commit 431ea10
Show file tree
Hide file tree
Showing 9 changed files with 38 additions and 20 deletions.
8 changes: 8 additions & 0 deletions pkg/apis/pipeline/v1alpha1/pipeline_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,14 @@ func (l PipelineTaskList) Items() []dag.Task {
return tasks
}

func (l PipelineTaskList) Deps() map[string][]string {
deps := map[string][]string{}
for _, pt := range l {
deps[pt.HashKey()] = pt.Deps()
}
return deps
}

// PipelineTaskParam is used to provide arbitrary string parameters to a Task.
type PipelineTaskParam = v1beta1.PipelineTaskParam

Expand Down
2 changes: 1 addition & 1 deletion pkg/apis/pipeline/v1alpha1/pipeline_validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ func validateFrom(tasks []PipelineTask) *apis.FieldError {
// cycle or that they rely on values from Tasks that ran previously, and that the PipelineResource
// is actually an output of the Task it should come from.
func validateGraph(tasks []PipelineTask) error {
if _, err := dag.Build(PipelineTaskList(tasks)); err != nil {
if _, err := dag.Build(PipelineTaskList(tasks), PipelineTaskList(tasks).Deps()); err != nil {
return err
}
return nil
Expand Down
10 changes: 10 additions & 0 deletions pkg/apis/pipeline/v1beta1/pipeline_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,7 @@ func (pt PipelineTask) resourceDeps() []string {
resourceDeps = append(resourceDeps, rd.From...)
}
}

// Add any dependents from conditional resources.
for _, cond := range pt.Conditions {
for _, rd := range cond.Resources {
Expand All @@ -209,6 +210,7 @@ func (pt PipelineTask) resourceDeps() []string {
}
}
}

// Add any dependents from task results
for _, param := range pt.Params {
expressions, ok := GetVarSubstitutionExpressionsForParam(param)
Expand Down Expand Up @@ -254,6 +256,14 @@ func contains(s string, arr []string) bool {

type PipelineTaskList []PipelineTask

func (l PipelineTaskList) Deps() map[string][]string {
deps := map[string][]string{}
for _, pt := range l {
deps[pt.HashKey()] = pt.Deps()
}
return deps
}

func (l PipelineTaskList) Items() []dag.Task {
tasks := []dag.Task{}
for _, t := range l {
Expand Down
2 changes: 1 addition & 1 deletion pkg/apis/pipeline/v1beta1/pipeline_validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -487,7 +487,7 @@ func validateFrom(tasks []PipelineTask) (errs *apis.FieldError) {
// cycle or that they rely on values from Tasks that ran previously, and that the PipelineResource
// is actually an output of the Task it should come from.
func validateGraph(tasks []PipelineTask) *apis.FieldError {
if _, err := dag.Build(PipelineTaskList(tasks)); err != nil {
if _, err := dag.Build(PipelineTaskList(tasks), PipelineTaskList(tasks).Deps()); err != nil {
return apis.ErrInvalidValue(err.Error(), "tasks")
}
return nil
Expand Down
5 changes: 2 additions & 3 deletions pkg/reconciler/pipeline/dag/dag.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,17 +67,16 @@ func (g *Graph) addPipelineTask(t Task) (*Node, error) {
}

// Build returns a valid pipeline Graph. Returns error if the pipeline is invalid
func Build(tasks Tasks) (*Graph, error) {
func Build(tasks Tasks, deps map[string][]string) (*Graph, error) {
d := newGraph()

deps := map[string][]string{}
// Add all Tasks mentioned in the `PipelineSpec`
for _, pt := range tasks.Items() {
if _, err := d.addPipelineTask(pt); err != nil {
return nil, fmt.Errorf("task %s is already present in Graph, can't add it again: %w", pt.HashKey(), err)
}
deps[pt.HashKey()] = pt.Deps()
}

// Process all from and runAfter constraints to add task dependency
for pt, taskDeps := range deps {
for _, previousTask := range taskDeps {
Expand Down
17 changes: 9 additions & 8 deletions pkg/reconciler/pipeline/dag/dag_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ func TestBuild_Parallel(t *testing.T) {
"c": {Task: c},
},
}
g, err := dag.Build(v1beta1.PipelineTaskList(p.Spec.Tasks))
g, err := dag.Build(v1beta1.PipelineTaskList(p.Spec.Tasks), v1beta1.PipelineTaskList(p.Spec.Tasks).Deps())
if err != nil {
t.Fatalf("didn't expect error creating valid Pipeline %v but got %v", p, err)
}
Expand Down Expand Up @@ -210,7 +210,7 @@ func TestBuild_JoinMultipleRoots(t *testing.T) {
Tasks: []v1beta1.PipelineTask{a, xDependsOnA, yDependsOnARunsAfterB, zDependsOnX, b, c},
},
}
g, err := dag.Build(v1beta1.PipelineTaskList(p.Spec.Tasks))
g, err := dag.Build(v1beta1.PipelineTaskList(p.Spec.Tasks), v1beta1.PipelineTaskList(p.Spec.Tasks).Deps())
if err != nil {
t.Fatalf("didn't expect error creating valid Pipeline %v but got %v", p, err)
}
Expand Down Expand Up @@ -279,7 +279,7 @@ func TestBuild_FanInFanOut(t *testing.T) {
Tasks: []v1beta1.PipelineTask{a, dDependsOnA, eRunsAfterA, fDependsOnDAndE, gRunsAfterF},
},
}
g, err := dag.Build(v1beta1.PipelineTaskList(p.Spec.Tasks))
g, err := dag.Build(v1beta1.PipelineTaskList(p.Spec.Tasks), v1beta1.PipelineTaskList(p.Spec.Tasks).Deps())
if err != nil {
t.Fatalf("didn't expect error creating valid Pipeline %v but got %v", p, err)
}
Expand Down Expand Up @@ -358,7 +358,7 @@ func TestBuild_ConditionResources(t *testing.T) {
},
}

g, err := dag.Build(v1beta1.PipelineTaskList(p.Spec.Tasks))
g, err := dag.Build(v1beta1.PipelineTaskList(p.Spec.Tasks), v1beta1.PipelineTaskList(p.Spec.Tasks).Deps())
if err != nil {
t.Errorf("didn't expect error creating valid Pipeline %v but got %v", p, err)
}
Expand Down Expand Up @@ -433,7 +433,8 @@ func TestBuild_TaskParamsFromTaskResults(t *testing.T) {
Tasks: []v1beta1.PipelineTask{a, b, c, d, e, xDependsOnA, yDependsOnBRunsAfterC, zDependsOnDAndE},
},
}
g, err := dag.Build(v1beta1.PipelineTaskList(p.Spec.Tasks))
tasks := v1beta1.PipelineTaskList(p.Spec.Tasks)
g, err := dag.Build(tasks, tasks.Deps())
if err != nil {
t.Fatalf("didn't expect error creating valid Pipeline %v but got %v", p, err)
}
Expand Down Expand Up @@ -473,7 +474,7 @@ func TestBuild_ConditionsParamsFromTaskResults(t *testing.T) {
Tasks: []v1beta1.PipelineTask{a, xDependsOnA},
},
}
g, err := dag.Build(v1beta1.PipelineTaskList(p.Spec.Tasks))
g, err := dag.Build(v1beta1.PipelineTaskList(p.Spec.Tasks), v1beta1.PipelineTaskList(p.Spec.Tasks).Deps())
if err != nil {
t.Fatalf("didn't expect error creating valid Pipeline %v but got %v", p, err)
}
Expand Down Expand Up @@ -656,7 +657,7 @@ func TestBuild_InvalidDAG(t *testing.T) {
ObjectMeta: metav1.ObjectMeta{Name: tc.name},
Spec: tc.spec,
}
_, err := dag.Build(v1beta1.PipelineTaskList(p.Spec.Tasks))
_, err := dag.Build(v1beta1.PipelineTaskList(p.Spec.Tasks), v1beta1.PipelineTaskList(p.Spec.Tasks).Deps())
if err == nil || !strings.Contains(err.Error(), tc.err) {
t.Errorf("expected to see an error for invalid DAG in pipeline %v but had none", tc.spec)
}
Expand Down Expand Up @@ -694,7 +695,7 @@ func testGraph(t *testing.T) *dag.Graph {
Name: "z",
RunAfter: []string{"x"},
}}
g, err := dag.Build(v1beta1.PipelineTaskList(tasks))
g, err := dag.Build(v1beta1.PipelineTaskList(tasks), v1beta1.PipelineTaskList(tasks).Deps())
if err != nil {
t.Fatal(err)
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/reconciler/pipelinerun/pipelinerun.go
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,7 @@ func (c *Reconciler) reconcile(ctx context.Context, pr *v1beta1.PipelineRun, get
pr.ObjectMeta.Annotations[key] = value
}

d, err := dag.Build(v1beta1.PipelineTaskList(pipelineSpec.Tasks))
d, err := dag.Build(v1beta1.PipelineTaskList(pipelineSpec.Tasks), v1beta1.PipelineTaskList(pipelineSpec.Tasks).Deps())
if err != nil {
// This Run has failed, so we need to mark it as failed and stop reconciling it
pr.Status.MarkFailed(ReasonInvalidGraph,
Expand All @@ -369,7 +369,7 @@ func (c *Reconciler) reconcile(ctx context.Context, pr *v1beta1.PipelineRun, get
// if a task in PipelineRunState is final task or not
// the finally section is optional and might not exist
// dfinally holds an empty Graph in the absence of finally clause
dfinally, err := dag.Build(v1beta1.PipelineTaskList(pipelineSpec.Finally))
dfinally, err := dag.Build(v1beta1.PipelineTaskList(pipelineSpec.Finally), map[string][]string{})
if err != nil {
// This Run has failed, so we need to mark it as failed and stop reconciling it
pr.Status.MarkFailed(ReasonInvalidGraph,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -532,7 +532,7 @@ func DagFromState(state PipelineRunState) (*dag.Graph, error) {
for _, rprt := range state {
pts = append(pts, *rprt.PipelineTask)
}
return dag.Build(v1beta1.PipelineTaskList(pts))
return dag.Build(v1beta1.PipelineTaskList(pts), v1beta1.PipelineTaskList(pts).Deps())
}

func TestIsSkipped(t *testing.T) {
Expand Down
8 changes: 4 additions & 4 deletions pkg/reconciler/pipelinerun/resources/pipelinerunstate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -687,11 +687,11 @@ func TestPipelineRunState_GetFinalTasks(t *testing.T) {
expectedFinalTasks: PipelineRunState{},
}}
for _, tc := range tcs {
dagGraph, err := dag.Build(v1beta1.PipelineTaskList(tc.DAGTasks))
dagGraph, err := dag.Build(v1beta1.PipelineTaskList(tc.DAGTasks), v1beta1.PipelineTaskList(tc.DAGTasks).Deps())
if err != nil {
t.Fatalf("Unexpected error while buildig DAG for pipelineTasks %v: %v", tc.DAGTasks, err)
}
finalGraph, err := dag.Build(v1beta1.PipelineTaskList(tc.finalTasks))
finalGraph, err := dag.Build(v1beta1.PipelineTaskList(tc.finalTasks), map[string][]string{})
if err != nil {
t.Fatalf("Unexpected error while buildig DAG for final pipelineTasks %v: %v", tc.finalTasks, err)
}
Expand Down Expand Up @@ -1059,11 +1059,11 @@ func TestGetPipelineConditionStatus_WithFinalTasks(t *testing.T) {
for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) {
pr := tb.PipelineRun("pipelinerun-final-tasks")
d, err := dag.Build(v1beta1.PipelineTaskList(tc.dagTasks))
d, err := dag.Build(v1beta1.PipelineTaskList(tc.dagTasks), v1beta1.PipelineTaskList(tc.dagTasks).Deps())
if err != nil {
t.Fatalf("Unexpected error while buildig graph for DAG tasks %v: %v", tc.dagTasks, err)
}
df, err := dag.Build(v1beta1.PipelineTaskList(tc.finalTasks))
df, err := dag.Build(v1beta1.PipelineTaskList(tc.finalTasks), map[string][]string{})
if err != nil {
t.Fatalf("Unexpected error while buildig graph for final tasks %v: %v", tc.finalTasks, err)
}
Expand Down

0 comments on commit 431ea10

Please sign in to comment.