Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

access execution status of any task in finally #3390

Merged
merged 1 commit into from
Jan 12, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 38 additions & 0 deletions docs/pipelines.md
Original file line number Diff line number Diff line change
Expand Up @@ -820,6 +820,44 @@ Overall, `PipelineRun` state transitioning is explained below for respective sce
Please refer to the [table](pipelineruns.md#monitoring-execution-status) under Monitoring Execution Status to learn about
what kind of events are triggered based on the `Pipelinerun` status.


### Using Execution `Status` of `pipelineTask`

Finally Task can utilize execution status of any of the `pipelineTasks` under `tasks` section using param:

```yaml
pritidesai marked this conversation as resolved.
Show resolved Hide resolved
finally:
- name: finaltask
params:
- name: task1Status
value: "$(tasks.task1.status)"
taskSpec:
params:
- name: task1Status
steps:
- image: ubuntu
name: print-task-status
script: |
if [ $(params.task1Status) == "Failed" ]
then
echo "Task1 has failed, continue processing the failure"
fi
```

This kind of variable can have any one of the values from the following table:

| Status | Description |
| ------- | -----------|
| Succeeded | `taskRun` for the `pipelineTask` completed successfully |
| Failed | `taskRun` for the `pipelineTask` completed with a failure or cancelled by the user |
| None | the `pipelineTask` has been skipped or no execution information available for the `pipelineTask` |

For an end-to-end example, see [`status` in a `PipelineRun`](../examples/v1beta1/pipelineruns/pipelinerun-task-execution-status.yaml).

**Note:** `$(tasks.<pipelineTask>.status)` is instantiated and available at runtime and must be used as a param value
as is without concatenating it with any other param or string, for example, this kind of usage is not validated/supported
`task status is $(tasks.<pipelineTask>.status)`.

### Known Limitations

### Specifying `Resources` in Final Tasks
Expand Down
1 change: 1 addition & 0 deletions docs/variables.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ For instructions on using variable substitutions see the relevant section of [th
| `context.pipelineRun.namespace` | The namespace of the `PipelineRun` that this `Pipeline` is running in. |
| `context.pipelineRun.uid` | The uid of the `PipelineRun` that this `Pipeline` is running in. |
| `context.pipeline.name` | The name of this `Pipeline` . |
| `tasks.<pipelineTaskName>.status` | The execution status of the specified `pipelineTask`, only available in `finally` tasks. |


## Variables available in a `Task`
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
kind: PipelineRun
apiVersion: tekton.dev/v1beta1
metadata:
generateName: pr-execution-status-
spec:
serviceAccountName: 'default'
pipelineSpec:
tasks:
- name: task1 # successful task
taskSpec:
steps:
- image: ubuntu
name: hello
script: |
echo "Hello World!"
- name: task2 # skipped task
when:
- input: "true"
operator: "notin"
values: ["true"]
taskSpec:
steps:
- image: ubuntu
name: success
script: |
exit 0
finally:
- name: task3 # this task verifies the status of dag tasks, it fails if verification fails
params:
- name: task1Status
value: "$(tasks.task1.status)"
- name: task2Status
value: "$(tasks.task2.status)"
taskSpec:
params:
- name: task1Status
- name: task2Status
steps:
- image: alpine
name: verify-dag-task-status
script: |
if [[ $(params.task1Status) != "Succeeded" || $(params.task2Status) != "None" ]]; then
exit 1;
fi
35 changes: 35 additions & 0 deletions pkg/apis/pipeline/v1beta1/pipeline_validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ func (ps *PipelineSpec) Validate(ctx context.Context) (errs *apis.FieldError) {
errs = errs.Also(validatePipelineParameterVariables(ps.Tasks, ps.Params).ViaField("tasks"))
errs = errs.Also(validatePipelineParameterVariables(ps.Finally, ps.Params).ViaField("finally"))
errs = errs.Also(validatePipelineContextVariables(ps.Tasks))
errs = errs.Also(validateExecutionStatusVariables(ps.Tasks, ps.Finally))
// Validate the pipeline's workspaces.
errs = errs.Also(validatePipelineWorkspaces(ps.Workspaces, ps.Tasks, ps.Finally))
// Validate the pipeline's results
Expand Down Expand Up @@ -290,6 +291,40 @@ func validatePipelineContextVariables(tasks []PipelineTask) *apis.FieldError {
return errs.Also(validatePipelineContextVariablesInParamValues(paramValues, "context\\.pipeline", pipelineContextNames))
}

func validateExecutionStatusVariables(tasks []PipelineTask, finallyTasks []PipelineTask) (errs *apis.FieldError) {
// creating a list of pipelineTask names to validate tasks.<name>.status
pipelineRunTasksContextNames := sets.String{}
for idx, t := range tasks {
for _, param := range t.Params {
// validate dag pipeline tasks not accessing execution status of other pipeline task
if ps, ok := GetVarSubstitutionExpressionsForParam(param); ok {
for _, p := range ps {
if strings.HasPrefix(p, "tasks.") && strings.HasSuffix(p, ".status") {
errs = errs.Also(apis.ErrInvalidValue(fmt.Sprintf("pipeline tasks can not refer to execution status of any other pipeline task"),
"value").ViaFieldKey("params", param.Name).ViaFieldIndex("tasks", idx))
}
}
}
}
pipelineRunTasksContextNames.Insert(t.Name)
}

// validate finally tasks accessing execution status of a dag task specified in the pipeline
var paramValues []string
for _, t := range finallyTasks {
for _, param := range t.Params {
pritidesai marked this conversation as resolved.
Show resolved Hide resolved
paramValues = append(paramValues, param.Value.StringVal)
paramValues = append(paramValues, param.Value.ArrayVal...)
}
}
for _, paramValue := range paramValues {
if strings.HasPrefix(stripVarSubExpression(paramValue), "tasks.") && strings.HasSuffix(stripVarSubExpression(paramValue), ".status") {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NIT: we don't need to invoke stripVarSubExpression twice here.

afrittoli marked this conversation as resolved.
Show resolved Hide resolved
errs = errs.Also(substitution.ValidateVariablePS(paramValue, "tasks", "status", pipelineRunTasksContextNames).ViaField("value"))
}
}
afrittoli marked this conversation as resolved.
Show resolved Hide resolved
return errs
}

func validatePipelineContextVariablesInParamValues(paramValues []string, prefix string, contextNames sets.String) (errs *apis.FieldError) {
for _, paramValue := range paramValues {
errs = errs.Also(substitution.ValidateVariableP(paramValue, prefix, contextNames).ViaField("value"))
Expand Down
74 changes: 74 additions & 0 deletions pkg/apis/pipeline/v1beta1/pipeline_validation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2116,6 +2116,80 @@ func TestContextInvalid(t *testing.T) {
}
}

func TestPipelineTasksExecutionStatus(t *testing.T) {
tests := []struct {
name string
tasks []PipelineTask
finalTasks []PipelineTask
expectedError apis.FieldError
}{{
name: "valid string variable in finally accessing pipelineTask status",
tasks: []PipelineTask{{
Name: "foo",
}},
finalTasks: []PipelineTask{{
Name: "bar",
TaskRef: &TaskRef{Name: "bar-task"},
Params: []Param{{
Name: "foo-status", Value: ArrayOrString{Type: ParamTypeString, StringVal: "$(tasks.foo.status)"},
}},
}},
}, {
name: "invalid string variable in dag task accessing pipelineTask status",
tasks: []PipelineTask{{
Name: "foo",
TaskRef: &TaskRef{Name: "foo-task"},
Params: []Param{{
Name: "bar-status", Value: ArrayOrString{Type: ParamTypeString, StringVal: "$(tasks.bar.status)"},
}},
}},
expectedError: apis.FieldError{
Message: `invalid value: pipeline tasks can not refer to execution status of any other pipeline task`,
Paths: []string{"tasks[0].params[bar-status].value"},
},
}, {
name: "invalid array variable in dag task accessing pipelineTask status",
tasks: []PipelineTask{{
Name: "foo",
TaskRef: &TaskRef{Name: "foo-task"},
Params: []Param{{
Name: "bar-status", Value: ArrayOrString{Type: ParamTypeArray, ArrayVal: []string{"$(tasks.bar.status)"}},
}},
}},
expectedError: apis.FieldError{
Message: `invalid value: pipeline tasks can not refer to execution status of any other pipeline task`,
Paths: []string{"tasks[0].params[bar-status].value"},
},
}, {
name: "invalid string variable in finally accessing missing pipelineTask status",
finalTasks: []PipelineTask{{
Name: "bar",
TaskRef: &TaskRef{Name: "bar-task"},
Params: []Param{{
Name: "notask-status", Value: ArrayOrString{Type: ParamTypeString, StringVal: "$(tasks.notask.status)"},
}},
}},
expectedError: *apis.ErrGeneric(`non-existent variable in "$(tasks.notask.status)"`, "value"),
}}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
err := validateExecutionStatusVariables(tt.tasks, tt.finalTasks)
if len(tt.expectedError.Error()) == 0 {
if err != nil {
t.Errorf("Pipeline.validateExecutionStatusVariables() returned error for valid pipeline variable accessing execution status: %s: %v", tt.name, err)
}
} else {
if err == nil {
t.Errorf("Pipeline.validateExecutionStatusVariables() did not return error for invalid pipeline parameters accessing execution status: %s, %s", tt.name, tt.tasks[0].Params)
}
if d := cmp.Diff(tt.expectedError.Error(), err.Error(), cmpopts.IgnoreUnexported(apis.FieldError{})); d != "" {
t.Errorf("PipelineSpec.Validate() errors diff %s", diff.PrintWantGot(d))
}
}
})
}
}

func getTaskSpec() TaskSpec {
return TaskSpec{
Steps: []Step{{
Expand Down
7 changes: 6 additions & 1 deletion pkg/reconciler/pipelinerun/pipelinerun.go
Original file line number Diff line number Diff line change
Expand Up @@ -581,7 +581,12 @@ func (c *Reconciler) runNextSchedulableTask(ctx context.Context, pr *v1beta1.Pip
pipelineRunFacts.ResetSkippedCache()

// GetFinalTasks only returns tasks when a DAG is complete
nextRprts = append(nextRprts, pipelineRunFacts.GetFinalTasks()...)
fnextRprts := pipelineRunFacts.GetFinalTasks()
if len(fnextRprts) != 0 {
// apply the runtime context just before creating taskRuns for final tasks in queue
resources.ApplyPipelineTaskContext(fnextRprts, pipelineRunFacts.GetPipelineTaskStatus(ctx))
nextRprts = append(nextRprts, fnextRprts...)
}

for _, rprt := range nextRprts {
if rprt == nil || rprt.Skip(pipelineRunFacts) {
Expand Down
97 changes: 97 additions & 0 deletions pkg/reconciler/pipelinerun/pipelinerun_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4548,6 +4548,103 @@ func TestReconcilePipeline_TaskSpecMetadata(t *testing.T) {
}
}

func TestReconciler_ReconcileKind_PipelineTaskContext(t *testing.T) {
names.TestingSeed()

pipelineName := "p-pipelinetask-status"
pipelineRunName := "pr-pipelinetask-status"

ps := []*v1beta1.Pipeline{tb.Pipeline(pipelineName, tb.PipelineNamespace("foo"), tb.PipelineSpec(
tb.PipelineTask("task1", "mytask"),
tb.FinalPipelineTask("finaltask", "finaltask",
tb.PipelineTaskParam("pipelineRun-tasks-task1", "$(tasks.task1.status)"),
),
))}

prs := []*v1beta1.PipelineRun{tb.PipelineRun(pipelineRunName, tb.PipelineRunNamespace("foo"),
tb.PipelineRunSpec(pipelineName, tb.PipelineRunServiceAccountName("test-sa")),
)}

ts := []*v1beta1.Task{
tb.Task("mytask", tb.TaskNamespace("foo")),
tb.Task("finaltask", tb.TaskNamespace("foo"),
tb.TaskSpec(
tb.TaskParam("pipelineRun-tasks-task1", v1beta1.ParamTypeString),
),
),
}

trs := []*v1beta1.TaskRun{
tb.TaskRun(pipelineRunName+"-task1-xxyyy",
tb.TaskRunNamespace("foo"),
tb.TaskRunOwnerReference("PipelineRun", pipelineRunName,
tb.OwnerReferenceAPIVersion("tekton.dev/v1beta1"),
tb.Controller, tb.BlockOwnerDeletion,
),
tb.TaskRunLabel("tekton.dev/pipeline", pipelineName),
tb.TaskRunLabel("tekton.dev/pipelineRun", pipelineRunName),
tb.TaskRunLabel("tekton.dev/pipelineTask", "task1"),
tb.TaskRunSpec(
tb.TaskRunTaskRef("mytask"),
tb.TaskRunServiceAccountName("test-sa"),
),
tb.TaskRunStatus(
tb.StatusCondition(
apis.Condition{
Type: apis.ConditionSucceeded,
Status: corev1.ConditionFalse,
Reason: v1beta1.TaskRunReasonFailed.String(),
},
),
),
),
}

d := test.Data{
PipelineRuns: prs,
Pipelines: ps,
Tasks: ts,
TaskRuns: trs,
}
prt := NewPipelineRunTest(d, t)
defer prt.Cancel()

_, clients := prt.reconcileRun("foo", pipelineRunName, []string{}, false)

expectedTaskRunName := pipelineRunName + "-finaltask-9l9zj"
expectedTaskRun := tb.TaskRun(expectedTaskRunName,
tb.TaskRunNamespace("foo"),
tb.TaskRunOwnerReference("PipelineRun", pipelineRunName,
tb.OwnerReferenceAPIVersion("tekton.dev/v1beta1"),
tb.Controller, tb.BlockOwnerDeletion,
),
tb.TaskRunLabel("tekton.dev/pipeline", pipelineName),
tb.TaskRunLabel("tekton.dev/pipelineRun", pipelineRunName),
tb.TaskRunLabel("tekton.dev/pipelineTask", "finaltask"),
tb.TaskRunSpec(
tb.TaskRunTaskRef("finaltask"),
tb.TaskRunServiceAccountName("test-sa"),
tb.TaskRunParam("pipelineRun-tasks-task1", "Failed"),
),
)
// Check that the expected TaskRun was created
actual, err := clients.Pipeline.TektonV1beta1().TaskRuns("foo").List(prt.TestAssets.Ctx, metav1.ListOptions{
LabelSelector: "tekton.dev/pipelineTask=finaltask,tekton.dev/pipelineRun=" + pipelineRunName,
Limit: 1,
})

if err != nil {
t.Fatalf("Failure to list TaskRun's %s", err)
}
if len(actual.Items) != 1 {
t.Fatalf("Expected 1 TaskRuns got %d", len(actual.Items))
}
actualTaskRun := actual.Items[0]
if d := cmp.Diff(&actualTaskRun, expectedTaskRun, ignoreResourceVersion); d != "" {
t.Errorf("expected to see TaskRun %v created. Diff %s", expectedTaskRunName, diff.PrintWantGot(d))
}
}

// NewPipelineRunTest returns PipelineRunTest with a new PipelineRun controller created with specified state through data
// This PipelineRunTest can be reused for multiple PipelineRuns by calling reconcileRun for each pipelineRun
func NewPipelineRunTest(data test.Data, t *testing.T) *PipelineRunTest {
Expand Down
11 changes: 11 additions & 0 deletions pkg/reconciler/pipelinerun/resources/apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,17 @@ func ApplyTaskResults(targets PipelineRunState, resolvedResultRefs ResolvedResul
}
}

//ApplyPipelineTaskContext replaces context variables referring to execution status with the specified status
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NIT: missing space // Apply...

func ApplyPipelineTaskContext(state PipelineRunState, replacements map[string]string) {
for _, resolvedPipelineRunTask := range state {
if resolvedPipelineRunTask.PipelineTask != nil {
pipelineTask := resolvedPipelineRunTask.PipelineTask.DeepCopy()
pipelineTask.Params = replaceParamValues(pipelineTask.Params, replacements, nil)
resolvedPipelineRunTask.PipelineTask = pipelineTask
}
}
}

// ApplyWorkspaces replaces workspace variables in the given pipeline spec with their
// concrete values.
func ApplyWorkspaces(p *v1beta1.PipelineSpec, pr *v1beta1.PipelineRun) *v1beta1.PipelineSpec {
Expand Down
Loading