Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Run finally pipeline even if task is failed at the validation #8314

Merged
merged 4 commits into from
Oct 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 15 additions & 7 deletions pkg/reconciler/pipelinerun/pipelinerun.go
Original file line number Diff line number Diff line change
Expand Up @@ -840,14 +840,22 @@ func (c *Reconciler) runNextSchedulableTask(ctx context.Context, pr *v1.Pipeline
return controller.NewPermanentError(err)
}

// Check for Missing Result References
err = resources.CheckMissingResultReferences(pipelineRunFacts.State, nextRpts)
if err != nil {
logger.Infof("Failed to resolve task result reference for %q with error %v", pr.Name, err)
pr.Status.MarkFailed(v1.PipelineRunReasonInvalidTaskResultReference.String(), err.Error())
return controller.NewPermanentError(err)
for _, rpt := range nextRpts {
// Check for Missing Result References
// if error found, present rpt will be
// added to the validationFailedTask list
err := resources.CheckMissingResultReferences(pipelineRunFacts.State, rpt)
if err != nil {
logger.Infof("Failed to resolve task result reference for %q with error %v", pr.Name, err)
// If there is an error encountered, no new task
// will be scheduled, hence nextRpts should be empty
// If finally tasks are found, then those tasks will
// be added to the nextRpts
nextRpts = nil
logger.Infof("Adding the task %q to the validation failed list", rpt.ResolvedTask)
pipelineRunFacts.ValidationFailedTask = append(pipelineRunFacts.ValidationFailedTask, rpt)
}
}

// GetFinalTasks only returns final tasks when a DAG is complete
fNextRpts := pipelineRunFacts.GetFinalTasks()
if len(fNextRpts) != 0 {
Expand Down
10 changes: 5 additions & 5 deletions pkg/reconciler/pipelinerun/pipelinerun_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1290,8 +1290,8 @@ status:
image: busybox
script: 'exit 0'
conditions:
- message: "Invalid task result reference: Could not find result with name result2 for task task1"
reason: InvalidTaskResultReference
- message: "Tasks Completed: 2 (Failed: 1, Cancelled 0), Skipped: 0"
reason: Failed
status: "False"
type: Succeeded
childReferences:
Expand All @@ -1307,15 +1307,15 @@ status:
prt := newPipelineRunTest(t, d)
defer prt.Cancel()

reconciledRun, clients := prt.reconcileRun("foo", "test-pipeline-missing-results", []string{}, true)
reconciledRun, clients := prt.reconcileRun("foo", "test-pipeline-missing-results", []string{}, false)
if reconciledRun.Status.CompletionTime == nil {
t.Errorf("Expected a CompletionTime on invalid PipelineRun but was nil")
}

// The PipelineRun should be marked as failed due to InvalidTaskResultReference.
// The PipelineRun should be marked as failed
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess the reason for failure will be available at TaskRun level, is that right?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was also concerned about not showing the exact reason for the failure. When I checked in the implementation inside the GetPipelineConditionStatus function, it gave status based on failed, and success. So it is more generalized.
The status will not be available at the TaskRun level as TaskRun will not be created.
Do you have any suggestions to deal with this in a better way so that we can have a proper reason for the failure?
Earlier we were failing right away so we were getting the exact error.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One idea that I can think of is to have ValidationFailed as a map where it will store failed tasks along with the reason and inside the GetPipelineConditionStatus function we can have a check for the ValidationFailed and add the status accordingly.
If this looks good, I can try the implementation.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One question that comes with this approach is when setting the reason in GetPipelineConditionStatus is how to decide the reason for the multiple Validation Failed Task.
May be we can set this to a generic error like https://github.com/tektoncd/pipeline/blob/main/pkg/apis/pipeline/v1/pipelinerun_types.go#L385

if d := cmp.Diff(expectedPipelineRun, reconciledRun, ignoreResourceVersion, ignoreLastTransitionTime, ignoreTypeMeta,
ignoreStartTime, ignoreCompletionTime, ignoreProvenance); d != "" {
t.Errorf("Expected to see PipelineRun run marked as failed with the reason: InvalidTaskResultReference. Diff %s", diff.PrintWantGot(d))
t.Errorf("Expected to see PipelineRun run marked as failed. Diff %s", diff.PrintWantGot(d))
}

// Check that the expected TaskRun was created
Expand Down
60 changes: 34 additions & 26 deletions pkg/reconciler/pipelinerun/resources/pipelinerunresolution.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func (t *ResolvedPipelineTask) EvaluateCEL() error {

// isDone returns true only if the task is skipped, succeeded or failed
func (t ResolvedPipelineTask) isDone(facts *PipelineRunFacts) bool {
return t.Skip(facts).IsSkipped || t.isSuccessful() || t.isFailure()
return t.Skip(facts).IsSkipped || t.isSuccessful() || t.isFailure() || t.isValidationFailed(facts.ValidationFailedTask)
}

// IsRunning returns true only if the task is neither succeeded, cancelled nor failed
Expand Down Expand Up @@ -218,6 +218,16 @@ func (t ResolvedPipelineTask) isFailure() bool {
return t.haveAnyTaskRunsFailed() && isDone
}

// isValidationFailed return true if the task is failed at the validation step
func (t ResolvedPipelineTask) isValidationFailed(ftasks []*ResolvedPipelineTask) bool {
for _, ftask := range ftasks {
if ftask.ResolvedTask == t.ResolvedTask {
return true
}
}
return false
}

// isCancelledForTimeOut returns true only if the run is cancelled due to PipelineRun-controlled timeout
// If the PipelineTask has a Matrix, isCancelled returns true if any run is cancelled due to PipelineRun-controlled timeout and all other runs are done.
func (t ResolvedPipelineTask) isCancelledForTimeOut() bool {
Expand Down Expand Up @@ -339,7 +349,7 @@ func (t *ResolvedPipelineTask) skip(facts *PipelineRunFacts) TaskSkipStatus {
var skippingReason v1.SkippingReason

switch {
case facts.isFinalTask(t.PipelineTask.Name) || t.isScheduled():
case facts.isFinalTask(t.PipelineTask.Name) || t.isScheduled() || t.isValidationFailed(facts.ValidationFailedTask):
skippingReason = v1.None
case facts.IsStopping():
skippingReason = v1.StoppingSkip
Expand Down Expand Up @@ -825,31 +835,29 @@ func isCustomRunCancelledByPipelineRunTimeout(cr *v1beta1.CustomRun) bool {
// CheckMissingResultReferences returns an error if it is missing any result references.
// Missing result references can occur if task fails to produce a result but has
// OnError: continue (ie TestMissingResultWhenStepErrorIsIgnored)
func CheckMissingResultReferences(pipelineRunState PipelineRunState, targets PipelineRunState) error {
for _, target := range targets {
for _, resultRef := range v1.PipelineTaskResultRefs(target.PipelineTask) {
referencedPipelineTask, ok := pipelineRunState.ToMap()[resultRef.PipelineTask]
if !ok {
return fmt.Errorf("Result reference error: Could not find ref \"%s\" in internal pipelineRunState", resultRef.PipelineTask)
func CheckMissingResultReferences(pipelineRunState PipelineRunState, target *ResolvedPipelineTask) error {
for _, resultRef := range v1.PipelineTaskResultRefs(target.PipelineTask) {
referencedPipelineTask, ok := pipelineRunState.ToMap()[resultRef.PipelineTask]
if !ok {
return fmt.Errorf("Result reference error: Could not find ref \"%s\" in internal pipelineRunState", resultRef.PipelineTask)
}
if referencedPipelineTask.IsCustomTask() {
if len(referencedPipelineTask.CustomRuns) == 0 {
return fmt.Errorf("Result reference error: Internal result ref \"%s\" has zero-length CustomRuns", resultRef.PipelineTask)
}
if referencedPipelineTask.IsCustomTask() {
if len(referencedPipelineTask.CustomRuns) == 0 {
return fmt.Errorf("Result reference error: Internal result ref \"%s\" has zero-length CustomRuns", resultRef.PipelineTask)
}
customRun := referencedPipelineTask.CustomRuns[0]
_, err := findRunResultForParam(customRun, resultRef)
if err != nil {
return err
}
} else {
if len(referencedPipelineTask.TaskRuns) == 0 {
return fmt.Errorf("Result reference error: Internal result ref \"%s\" has zero-length TaskRuns", resultRef.PipelineTask)
}
taskRun := referencedPipelineTask.TaskRuns[0]
_, err := findTaskResultForParam(taskRun, resultRef)
if err != nil {
return err
}
customRun := referencedPipelineTask.CustomRuns[0]
_, err := findRunResultForParam(customRun, resultRef)
if err != nil {
return err
}
} else {
if len(referencedPipelineTask.TaskRuns) == 0 {
return fmt.Errorf("Result reference error: Internal result ref \"%s\" has zero-length TaskRuns", resultRef.PipelineTask)
}
taskRun := referencedPipelineTask.TaskRuns[0]
_, err := findTaskResultForParam(taskRun, resultRef)
if err != nil {
return err
}
}
}
Expand Down
10 changes: 9 additions & 1 deletion pkg/reconciler/pipelinerun/resources/pipelinerunstate.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,12 @@ type PipelineRunFacts struct {
// The skip data is sensitive to changes in the state. The ResetSkippedCache method
// can be used to clean the cache and force re-computation when needed.
SkipCache map[string]TaskSkipStatus

// ValidationFailedTask are the tasks for which taskrun is not created as they
// never got added to the execution i.e. they failed in the validation step. One of
// the case of failing at the validation is during CheckMissingResultReferences method
// Tasks in ValidationFailedTask is added in method runNextSchedulableTask
ValidationFailedTask []*ResolvedPipelineTask
}

// PipelineRunTimeoutsState records information about start times and timeouts for the PipelineRun, so that the PipelineRunFacts
Expand Down Expand Up @@ -358,7 +364,7 @@ func (state PipelineRunState) getNextTasks(candidateTasks sets.String) []*Resolv
func (facts *PipelineRunFacts) IsStopping() bool {
for _, t := range facts.State {
if facts.isDAGTask(t.PipelineTask.Name) {
if t.isFailure() && t.PipelineTask.OnError != v1.PipelineTaskContinue {
if (t.isFailure() || t.isValidationFailed(facts.ValidationFailedTask)) && t.PipelineTask.OnError != v1.PipelineTaskContinue {
return true
}
}
Expand Down Expand Up @@ -732,6 +738,8 @@ func (facts *PipelineRunFacts) getPipelineTasksCount() pipelineRunStatusCount {
} else {
s.Failed++
}
case t.isValidationFailed(facts.ValidationFailedTask):
s.Failed++
// increment skipped and skipped due to timeout counters since the task was skipped due to the pipeline, tasks, or finally timeout being reached before the task was launched
case t.Skip(facts).SkippingReason == v1.PipelineTimedOutSkip ||
t.Skip(facts).SkippingReason == v1.TasksTimedOutSkip ||
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -815,7 +815,13 @@ func TestCheckMissingResultReferences(t *testing.T) {
wantErr: "Result reference error: Internal result ref \"lTask\" has zero-length TaskRuns",
}} {
t.Run(tt.name, func(t *testing.T) {
err := CheckMissingResultReferences(tt.pipelineRunState, tt.targets)
var err error
for _, target := range tt.targets {
tmpErr := CheckMissingResultReferences(tt.pipelineRunState, target)
if tmpErr != nil {
err = tmpErr
}
}
if (err != nil) && err.Error() != tt.wantErr {
t.Errorf("CheckMissingResultReferences() error = %v, wantErr %v", err, tt.wantErr)
return
Expand Down
122 changes: 122 additions & 0 deletions test/pipelinefinally_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -688,6 +688,98 @@ spec:
}
}

func TestPipelineLevelFinally_OneDAGNotProducingResult_SecondDAGUsingResult_Failure(t *testing.T) {
ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)
defer cancel()
c, namespace := setup(ctx, t)
knativetest.CleanupOnInterrupt(func() { tearDown(ctx, t, c, namespace) }, t.Logf)
defer tearDown(ctx, t, c, namespace)

successTask := getSuccessTask(t, namespace)
successTask.Spec.Results = append(successTask.Spec.Results, v1.TaskResult{
Name: "result",
})
if _, err := c.V1TaskClient.Create(ctx, successTask, metav1.CreateOptions{}); err != nil {
t.Fatalf("Failed to create final Task: %s", err)
}

taskClaimingResultProductionButNotProducing := getSuccessTaskClaimProducingResultButNotProducing(t, namespace)
if _, err := c.V1TaskClient.Create(ctx, taskClaimingResultProductionButNotProducing, metav1.CreateOptions{}); err != nil {
t.Fatalf("Failed to create Task claiming result production but not producing task results: %s", err)
}

taskConsumingResultInParam := getTaskConsumingResults(t, namespace, "dagtask1-result")
if _, err := c.V1TaskClient.Create(ctx, taskConsumingResultInParam, metav1.CreateOptions{}); err != nil {
t.Fatalf("Failed to create Task consuming task results in param: %s", err)
}

pipeline := parse.MustParseV1Pipeline(t, fmt.Sprintf(`
metadata:
name: %s
namespace: %s
spec:
finally:
- name: finaltask1
taskRef:
name: %s
tasks:
- name: dagtask1
taskRef:
name: %s
- name: dagtaskconsumingdagtask1
params:
- name: dagtask1-result
value: $(tasks.dagtask1.results.result)
taskRef:
name: %s
`, helpers.ObjectNameForTest(t), namespace, successTask.Name, taskClaimingResultProductionButNotProducing.Name, taskConsumingResultInParam.Name))
if _, err := c.V1PipelineClient.Create(ctx, pipeline, metav1.CreateOptions{}); err != nil {
t.Fatalf("Failed to create Pipeline: %s", err)
}

pipelineRun := getPipelineRun(t, namespace, pipeline.Name)
if _, err := c.V1PipelineRunClient.Create(ctx, pipelineRun, metav1.CreateOptions{}); err != nil {
t.Fatalf("Failed to create Pipeline Run `%s`: %s", pipelineRun.Name, err)
}

if err := WaitForPipelineRunState(ctx, c, pipelineRun.Name, timeout, PipelineRunFailed(pipelineRun.Name), "PipelineRunFailed", v1Version); err != nil {
t.Fatalf("Waiting for PipelineRun %s to fail: %v", pipelineRun.Name, err)
}

taskrunList, err := c.V1TaskRunClient.List(ctx, metav1.ListOptions{LabelSelector: "tekton.dev/pipelineRun=" + pipelineRun.Name})
if err != nil {
t.Fatalf("Error listing TaskRuns for PipelineRun %s: %s", pipelineRun.Name, err)
}

// expecting taskRuns for finaltask1 and dagtask
expectedTaskRunsCount := 2
if len(taskrunList.Items) != expectedTaskRunsCount {
var s []string
for _, n := range taskrunList.Items {
s = append(s, n.Labels["tekton.dev/pipelineTask"])
}
t.Fatalf("Error retrieving TaskRuns for PipelineRun %s. Expected %d taskRuns and found %d taskRuns for: %s",
pipelineRun.Name, expectedTaskRunsCount, len(taskrunList.Items), strings.Join(s, ", "))
}

// verify dag task failed, parallel dag task succeeded, and final task succeeded
for _, taskrunItem := range taskrunList.Items {
switch n := taskrunItem.Labels["tekton.dev/pipelineTask"]; {
case n == "dagtask1":
if err := WaitForTaskRunState(ctx, c, taskrunItem.Name, TaskRunSucceed(taskrunItem.Name), "TaskRunSuccess", v1Version); err != nil {
t.Errorf("Error waiting for TaskRun to succeed: %v", err)
}
case n == "finaltask1":
if err := WaitForTaskRunState(ctx, c, taskrunItem.Name, TaskRunSucceed(taskrunItem.Name), "TaskRunSuccess", v1Version); err != nil {
t.Errorf("Error waiting for TaskRun to succeed: %v", err)
}
default:
t.Fatalf("Found unexpected taskRun %s", n)
}
}
}

func getSuccessTask(t *testing.T, namespace string) *v1.Task {
t.Helper()
return parse.MustParseV1Task(t, fmt.Sprintf(`
Expand Down Expand Up @@ -760,6 +852,36 @@ spec:
`, helpers.ObjectNameForTest(t), namespace))
}

func getSuccessTaskClaimProducingResultButNotProducing(t *testing.T, namespace string) *v1.Task {
t.Helper()
return parse.MustParseV1Task(t, fmt.Sprintf(`
metadata:
name: %s
namespace: %s
spec:
steps:
- image: mirror.gcr.io/alpine
script: echo -n "Hello"
results:
- name: result
`, helpers.ObjectNameForTest(t), namespace))
}

func getTaskConsumingResults(t *testing.T, namespace string, paramName string) *v1.Task {
t.Helper()
return parse.MustParseV1Task(t, fmt.Sprintf(`
metadata:
name: %s
namespace: %s
spec:
steps:
- image: mirror.gcr.io/alpine
script: 'echo "Content of param: $(params.%s)" '
params:
- name: %s
`, helpers.ObjectNameForTest(t), namespace, paramName, paramName))
}

func getDelaySuccessTaskProducingResults(t *testing.T, namespace string) *v1.Task {
t.Helper()
return parse.MustParseV1Task(t, fmt.Sprintf(`
Expand Down
Loading