Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix bug where PipelineRun emits task results that were never produced #3472

Merged
merged 1 commit into from Jan 12, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions docs/pipelines.md
Original file line number Diff line number Diff line change
Expand Up @@ -604,6 +604,20 @@ references the `outputValue` `Result` emitted by the `calculate-sum` `Task`.

For an end-to-end example, see [`Results` in a `PipelineRun`](../examples/v1beta1/pipelineruns/pipelinerun-results.yaml).

A `Pipeline Result` is not emitted if any of the following are true:
- A `PipelineTask` referenced by the `Pipeline Result` failed. The `PipelineRun` will also
have failed.
- A `PipelineTask` referenced by the `Pipeline Result` was skipped.
- A `PipelineTask` referenced by the `Pipeline Result` didn't emit the referenced `Task Result`. This
should be considered a bug in the `Task` and [may fail a `PipelineTask` in future](https://github.com/tektoncd/pipeline/issues/3497).
- The `Pipeline Result` uses a variable that doesn't point to an actual `PipelineTask`. This will
result in an `InvalidTaskResultReference` validation error during `PipelineRun` execution.
- The `Pipeline Result` uses a variable that doesn't point to an actual result in a `PipelineTask`.
This will cause an `InvalidTaskResultReference` validation error during `PipelineRun` execution.

**Note:** Since a `Pipeline Result` can contain references to multiple `Task Results`, if any of those
`Task Result` references are invalid the entire `Pipeline Result` is not emitted.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i think as a user i might find it hard to reason about what will happen based on this description, maybe it would be clearer to flip it around a bit, and i think it might be more clear to a user to say something like "task" or "pipeline task" instead of "task run":

PipelineRun execution will fail if a pipeline result requires any of the following:

  • A result from a Task that is skipped
  • A result from a Task that it declares but when executed it doesn't actually emit it. This should be considered a bug in the Task and may fail a TaskRun in future.
  • A result from a Task that it does not declare
  • A result from a Task that does not exit in the Pipeline

I didn't include these ones b/c i'm not sure if they WILL have that same error:

  • A TaskRun referenced by the Pipeline Result failed. The PipelineRun will also have failed. <-- since execution will stop in this case, i dont think there would be an error related to the result?

Copy link
Author

@ghost ghost Nov 30, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, I've updated the section to reference "Pipeline Task" throughout instead of "TaskRun".

Inre: your comments on PipelineRun execution behaviour, I think there are at least two discrete questions we're trying to answer in this secion of the doc:

  1. What is the final "shape" of a pipeline result if it contains an invalid reference?
  2. What happens to pipelinerun execution when pipeline results contain these invalid references?

I think that (1) is answered by the existing PR. Personally I think that (2) is independent of the results and shouldn't be documented in a section titled Emitting Results from a Pipeline since it's orthogonal: The Pipeline Results don't have any bearing on the execution status of the PipelineRun. That doesn't mean we shouldn't document it but it does seem to me a little out-of-scope to try and document PipelineRun execution state in a section dedicated to result data.

PipelineRun execution will fail if a pipeline result requires any of the following:

  • A result from a Task that is skipped

PipelineRun execution doesn't fail when a Task is skipped and a result references it. This is kinda an example of why I'm a tad hesitant to include documentation about pipelinerun execution state in a section dedicated to result data. I think this will come back to bite us if we try and document it here. One day we'll change the behaviour of PipelineRuns and we'll have to remember that we documented pipelinerun execution in multiple places that aren't pipelineruns.md and then comb back through to make sure all those places reflect the changed behaviour. Personally I'd prefer that we keep the documentation in pipelines.md as tightly focused as possible on how Pipeline Results behave.


## Configuring the `Task` execution order

You can connect `Tasks` in a `Pipeline` so that they execute in a Directed Acyclic Graph (DAG).
Expand Down
47 changes: 9 additions & 38 deletions pkg/reconciler/pipelinerun/pipelinerun.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"path/filepath"
"reflect"
"strconv"
"strings"
"time"

"github.com/hashicorp/go-multierror"
Expand Down Expand Up @@ -176,7 +175,15 @@ func (c *Reconciler) ReconcileKind(ctx context.Context, pr *v1beta1.PipelineRun)
// and may not have had all of the assumed default specified.
pr.SetDefaults(contexts.WithUpgradeViaDefaulting(ctx))

c.updatePipelineResults(ctx, pr, getPipelineFunc)
if _, pipelineSpec, err := resources.GetPipelineData(ctx, pr, getPipelineFunc); err != nil {
msg := fmt.Sprintf("Failed to get Pipeline Spec to process Pipeline Results for PipelineRun %s/%s: %v", pr.Namespace, pr.Name, err)
logger.Error(msg)
logger.Warnf("An error processing Pipeline Results overwrites existing Succeeded Condition for PipelineRun %s/%s: %v", pr.Namespace, pr.Name, pr.Status.GetCondition(apis.ConditionSucceeded))
pr.Status.MarkFailed(ReasonCouldntGetPipeline, msg)
afrittoli marked this conversation as resolved.
Show resolved Hide resolved
} else {
pr.Status.PipelineResults = resources.ApplyTaskResultsToPipelineResults(pipelineSpec.Results, pr.Status.TaskRuns)
}

if err := artifacts.CleanupArtifactStorage(ctx, pr, c.KubeClientSet); err != nil {
logger.Errorf("Failed to delete PVC for PipelineRun %s: %v", pr.Name, err)
return c.finishReconcileUpdateEmitEvents(ctx, pr, before, err)
Expand Down Expand Up @@ -317,21 +324,6 @@ func (c *Reconciler) resolvePipelineState(
return pst, nil
}

func (c *Reconciler) updatePipelineResults(ctx context.Context, pr *v1beta1.PipelineRun, getPipelineFunc resources.GetPipeline) {
logger := logging.FromContext(ctx)

_, pipelineSpec, err := resources.GetPipelineData(ctx, pr, getPipelineFunc)
if err != nil {
logger.Errorf("Failed to determine Pipeline spec to use for pipelinerun %s: %v", pr.Name, err)
pr.Status.MarkFailed(ReasonCouldntGetPipeline,
"Error retrieving pipeline for pipelinerun %s/%s: %s",
pr.Namespace, pr.Name, err)
return
}
resolvedResultRefs := resources.ResolvePipelineResultRefs(pr.Status, pipelineSpec.Results)
pr.Status.PipelineResults = getPipelineRunResults(pipelineSpec, resolvedResultRefs)
}

func (c *Reconciler) reconcile(ctx context.Context, pr *v1beta1.PipelineRun, getPipelineFunc resources.GetPipeline) error {
logger := logging.FromContext(ctx)
// We may be reading a version of the object that was stored at an older version
Expand Down Expand Up @@ -622,27 +614,6 @@ func (c *Reconciler) runNextSchedulableTask(ctx context.Context, pr *v1beta1.Pip
return nil
}

func getPipelineRunResults(pipelineSpec *v1beta1.PipelineSpec, resolvedResultRefs resources.ResolvedResultRefs) []v1beta1.PipelineRunResult {
var results []v1beta1.PipelineRunResult
stringReplacements := map[string]string{}

for _, resolvedResultRef := range resolvedResultRefs {
replaceTarget := fmt.Sprintf("%s.%s.%s.%s", v1beta1.ResultTaskPart, resolvedResultRef.ResultReference.PipelineTask, v1beta1.ResultResultPart, resolvedResultRef.ResultReference.Result)
stringReplacements[replaceTarget] = resolvedResultRef.Value.StringVal
}
for _, result := range pipelineSpec.Results {
in := result.Value
for k, v := range stringReplacements {
in = strings.Replace(in, fmt.Sprintf("$(%s)", k), v, -1)
}
results = append(results, v1beta1.PipelineRunResult{
Name: result.Name,
Value: in,
})
}
return results
}

func (c *Reconciler) updateTaskRunsStatusDirectly(pr *v1beta1.PipelineRun) error {
for taskRunName := range pr.Status.TaskRuns {
// TODO(dibyom): Add conditionCheck statuses here
Expand Down
75 changes: 75 additions & 0 deletions pkg/reconciler/pipelinerun/resources/apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,11 @@ package resources

import (
"fmt"
"strings"

"github.com/tektoncd/pipeline/pkg/apis/pipeline/v1beta1"
corev1 "k8s.io/api/core/v1"
"knative.dev/pkg/apis"
)

// ApplyParameters applies the params from a PipelineRun.Params to a PipelineSpec.
Expand Down Expand Up @@ -84,6 +87,8 @@ func ApplyTaskResults(targets PipelineRunState, resolvedResultRefs ResolvedResul
}
}

// ApplyWorkspaces replaces workspace variables in the given pipeline spec with their
// concrete values.
func ApplyWorkspaces(p *v1beta1.PipelineSpec, pr *v1beta1.PipelineRun) *v1beta1.PipelineSpec {
p = p.DeepCopy()
replacements := map[string]string{}
Expand Down Expand Up @@ -124,3 +129,73 @@ func replaceParamValues(params []v1beta1.Param, stringReplacements map[string]st
}
return params
}

// ApplyTaskResultsToPipelineResults applies the results of completed TasksRuns to a Pipeline's
// list of PipelineResults, returning the computed set of PipelineRunResults. References to
// non-existent TaskResults or failed TaskRuns result in a PipelineResult being considered invalid
// and omitted from the returned slice. A nil slice is returned if no results are passed in or all
// results are invalid.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i apologize for probably having already asked this, but it seems weird to me that we just ignore the invalid values, feels like we should be returning an error?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure it's quite right to say that we ignore the invalid values. We return a nil pointer if the value is invalid - in other words, no valid value was found for the result reference.

I don't totally agree that this is an error state but I don't feel strongly enough to dispute it. What should we do with that error when it's returned? In the old code it was silently ignored.

Copy link
Member

@pritidesai pritidesai Dec 4, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

feels like we should be returning an error?

Returning an error might break pipelineRuns. There was no error reported in logs or to users at all. If we introduce error now, pipelineRuns which used to be successful will start failing with this error. And at the same time, the least we could do is log the error if needed but still exit with success.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should eventually move the execution of this logic to before the pipelinerun state is set.
That way could still fail the pipeline if results are broken or invalid in some way.
Maybe something for next release.

func ApplyTaskResultsToPipelineResults(results []v1beta1.PipelineResult, taskRunStatuses map[string]*v1beta1.PipelineRunTaskRunStatus) []v1beta1.PipelineRunResult {
taskStatuses := map[string]*v1beta1.PipelineRunTaskRunStatus{}
for _, trStatus := range taskRunStatuses {
taskStatuses[trStatus.PipelineTaskName] = trStatus
}

var runResults []v1beta1.PipelineRunResult = nil
stringReplacements := map[string]string{}
for _, pipelineResult := range results {
variablesInPipelineResult, _ := v1beta1.GetVarSubstitutionExpressionsForPipelineResult(pipelineResult)
validPipelineResult := true
for _, variable := range variablesInPipelineResult {
if _, isMemoized := stringReplacements[variable]; isMemoized {
continue
}
if resultValue := taskResultValue(variable, taskStatuses); resultValue != nil {
stringReplacements[variable] = *resultValue
} else {
validPipelineResult = false
}
}
if validPipelineResult {
finalValue := pipelineResult.Value
for variable, value := range stringReplacements {
v := fmt.Sprintf("$(%s)", variable)
finalValue = strings.Replace(finalValue, v, value, -1)
}
runResults = append(runResults, v1beta1.PipelineRunResult{
Name: pipelineResult.Name,
Value: finalValue,
})
}
}

return runResults
}

// taskResultValue returns a pointer to the result value for a given task result variable. A nil
// pointer is returned if the variable is invalid for any reason.
func taskResultValue(variable string, taskStatuses map[string]*v1beta1.PipelineRunTaskRunStatus) *string {
variableParts := strings.Split(variable, ".")
if len(variableParts) != 4 || variableParts[0] != "tasks" || variableParts[2] != "results" {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NIT: tasks and results could be constants and shared with other result related logic (not part of this PR), can be done seperately

return nil
}

taskName, resultName := variableParts[1], variableParts[3]

status, taskExists := taskStatuses[taskName]
if !taskExists || status.Status == nil {
return nil
}

cond := status.Status.GetCondition(apis.ConditionSucceeded)
if cond == nil || cond.Status != corev1.ConditionTrue {
return nil
}

for _, trResult := range status.Status.TaskRunResults {
if trResult.Name == resultName {
return &trResult.Value
}
}
return nil
}
Loading