Split reconcile into prepare and reconcile
The current "reconcile" function does a lot of things, which means
it's hard to test properly, it's hard to follow the logic and
it's hard to handle all possible errors and events properly.

This splits reconcile into two parts, the first one that deals
preparing and validating the taskrun and all the resources it
depends on, the second one that actually reconciles the TaskRun
by creating the pod if required and updating the TaskRun from
the pod.

This adds events that were missing for error situations that
happen during preparation and validation.

The new events go past the burst limit of the event
broadcaster, which delays some events a lot for a pipeline
with a few tasks. Tweaking the default burst rate and QPS is
required to pass CI. We might want to add those configs to a
dedicated config map in future.
afrittoli authored and tekton-robot committed Apr 24, 2020
1 parent 7ffb7da commit c11c6af
Showing 10 changed files with 165 additions and 72 deletions.
4 changes: 4 additions & 0 deletions pkg/pod/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,10 @@ const (
// config error of container
ReasonCreateContainerConfigError = "CreateContainerConfigError"

// ReasonPodCreationFailed indicates that the reason for the current condition
// is that the creation of the pod backing the TaskRun failed
ReasonPodCreationFailed = "PodCreationFailed"

// ReasonSucceeded indicates that the reason for the finished status is that all of the steps
// completed successfully
ReasonSucceeded = "Succeeded"
18 changes: 14 additions & 4 deletions pkg/reconciler/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,25 @@ import (

const (
// EventReasonSucceded is the reason set for events about successful completion of TaskRuns / PipelineRuns
EventReasonSucceded = "Succeeded"
// EventReasonFailed is the reason set for events about unsuccessful completion of TaskRuns / PipelineRuns
EventReasonFailed = "Failed"

// EmitEvent emits success or failed event for object
// if afterCondition is different from beforeCondition
func EmitEvent(c record.EventRecorder, beforeCondition *apis.Condition, afterCondition *apis.Condition, object runtime.Object) {
if beforeCondition != afterCondition && afterCondition != nil {
// Create events when the obj result is in.
if afterCondition.Status == corev1.ConditionTrue {
c.Event(object, corev1.EventTypeNormal, "Succeeded", afterCondition.Message)
} else if afterCondition.Status == corev1.ConditionFalse {
c.Event(object, corev1.EventTypeWarning, "Failed", afterCondition.Message)
switch afterCondition.Status {
case corev1.ConditionTrue:
c.Event(object, corev1.EventTypeNormal, EventReasonSucceded, afterCondition.Message)
case corev1.ConditionUnknown:
c.Event(object, corev1.EventTypeNormal, afterCondition.Reason, afterCondition.Message)
case corev1.ConditionFalse:
c.Event(object, corev1.EventTypeWarning, EventReasonFailed, afterCondition.Message)
2 changes: 1 addition & 1 deletion pkg/reconciler/pipelinerun/pipelinerun.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ var (
// converge the two. It then updates the Status block of the Pipeline Run
// resource with the current status of the resource.
func (c *Reconciler) Reconcile(ctx context.Context, key string) error {
c.Logger.Infof("Reconciling %v", time.Now())
c.Logger.Infof("Reconciling key %s at %v", key, time.Now())

// Convert the namespace/name string into a distinct namespace and name
namespace, name, err := cache.SplitMetaNamespaceKey(key)
8 changes: 7 additions & 1 deletion pkg/reconciler/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,13 @@ func NewBase(opt Options, controllerAgentName string, images pipeline.Images) *B
if recorder == nil {
// Create event broadcaster
logger.Debug("Creating event broadcaster")
eventBroadcaster := record.NewBroadcaster()

correlatorOptions := record.CorrelatorOptions{
// The default burst size is 25
BurstSize: 50,
QPS: 1,
eventBroadcaster := record.NewBroadcasterWithCorrelatorOptions(correlatorOptions)
eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: opt.KubeClientSet.CoreV1().Events("")})

107 changes: 70 additions & 37 deletions pkg/reconciler/taskrun/taskrun.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,9 +79,6 @@ var _ controller.Reconciler = (*Reconciler)(nil)
// converge the two. It then updates the Status block of the Task Run
// resource with the current status of the resource.
func (c *Reconciler) Reconcile(ctx context.Context, key string) error {
// In case of reconcile errors, we store the error in a multierror, attempt
// to update, and return the original error combined with any update error
var merr error

// Convert the namespace/name string into a distinct namespace and name
namespace, name, err := cache.SplitMetaNamespaceKey(key)
Expand Down Expand Up @@ -182,38 +179,67 @@ func (c *Reconciler) Reconcile(ctx context.Context, key string) error {
return multierror.Append(err, c.updateStatusLabelsAndAnnotations(tr, original)).ErrorOrNil()

// prepare fetches all required resources, validates them together with the
// taskrun, runs API convertions. Errors that come out of prepare are
// permanent one, so in case of error we update, emit events and return
taskSpec, rtr, err := c.prepare(ctx, tr)
if err != nil {
c.Logger.Errorf("TaskRun prepare error: %v", err.Error())
after := tr.Status.GetCondition(apis.ConditionSucceeded)
reconciler.EmitEvent(c.Recorder, nil, after, tr)
// We only return an error if update failed, otherwise we don't want to
// reconcile an invalid TaskRun anymore
return c.updateStatusLabelsAndAnnotations(tr, original)

// Store the condition before reconcile
before := tr.Status.GetCondition(apis.ConditionSucceeded)

// Reconcile this copy of the task run and then write back any status
// updates regardless of whether the reconciliation errored out.
if err := c.reconcile(ctx, tr); err != nil {
if err = c.reconcile(ctx, tr, taskSpec, rtr); err != nil {
c.Logger.Errorf("Reconcile error: %v", err.Error())
merr = multierror.Append(merr, err)
return multierror.Append(merr, c.updateStatusLabelsAndAnnotations(tr, original)).ErrorOrNil()

// Emit events (only when ConditionSucceeded was changed)
after := tr.Status.GetCondition(apis.ConditionSucceeded)
reconciler.EmitEvent(c.Recorder, before, after, tr)

return multierror.Append(err, c.updateStatusLabelsAndAnnotations(tr, original)).ErrorOrNil()

func (c *Reconciler) reconcile(ctx context.Context, tr *v1alpha1.TaskRun) error {
// `prepare` fetches resources the taskrun depends on, runs validation and convertion
// It may report errors back to Reconcile, it updates the taskrun status in case of
// error but it does not sync updates back to etcd. It does not emit events.
// All errors returned by `prepare` are always handled by `Reconcile`, so they don't cause
// the key to be re-queued directly. Once we start using `PermanentErrors` code in
// `prepare` will be able to control which error is handled by `Reconcile` and which is not
// See for details.
// `prepare` returns spec and resources. In future we might store
// them in the TaskRun.Status so we don't need to re-run `prepare` at every
// reconcile (see
func (c *Reconciler) prepare(ctx context.Context, tr *v1alpha1.TaskRun) (*v1alpha1.TaskSpec, *resources.ResolvedTaskResources, error) {
// We may be reading a version of the object that was stored at an older version
// and may not have had all of the assumed default specified.

if err := tr.ConvertTo(ctx, &v1beta1.TaskRun{}); err != nil {
if ce, ok := err.(*v1beta1.CannotConvertError); ok {
return nil
return nil, nil, nil
return err
return nil, nil, err

getTaskFunc, kind := c.getTaskFunc(tr)
taskMeta, taskSpec, err := resources.GetTaskData(ctx, tr, getTaskFunc)
if err != nil {
if ce, ok := err.(*v1beta1.CannotConvertError); ok {
return nil
c.Logger.Errorf("Failed to determine Task spec to use for taskrun %s: %v", tr.Name, err)
tr.Status.MarkResourceFailed(podconvert.ReasonFailedResolution, err)
return nil
return nil, nil, err

// Store the fetched TaskSpec on the TaskRun for auditing
Expand Down Expand Up @@ -253,19 +279,19 @@ func (c *Reconciler) reconcile(ctx context.Context, tr *v1alpha1.TaskRun) error
if err != nil {
c.Logger.Errorf("Failed to resolve references for taskrun %s: %v", tr.Name, err)
tr.Status.MarkResourceFailed(podconvert.ReasonFailedResolution, err)
return nil
return nil, nil, err

if err := ValidateResolvedTaskResources(tr.Spec.Params, rtr); err != nil {
c.Logger.Errorf("TaskRun %q resources are invalid: %v", tr.Name, err)
tr.Status.MarkResourceFailed(podconvert.ReasonFailedValidation, err)
return nil
return nil, nil, err

if err := workspace.ValidateBindings(taskSpec.Workspaces, tr.Spec.Workspaces); err != nil {
c.Logger.Errorf("TaskRun %q workspaces are invalid: %v", tr.Name, err)
tr.Status.MarkResourceFailed(podconvert.ReasonFailedValidation, err)
return nil
return nil, nil, err

// Initialize the cloud events if at least a CloudEventResource is defined
Expand All @@ -279,8 +305,20 @@ func (c *Reconciler) reconcile(ctx context.Context, tr *v1alpha1.TaskRun) error
cloudevent.InitializeCloudEvents(tr, prs)

return taskSpec, rtr, nil

// `reconcile` creates the Pod associated to the TaskRun, and it pulls back status
// updates from the Pod to the TaskRun.
// It reports errors back to Reconcile, it updates the taskrun status in case of
// error but it does not sync updates back to etcd. It does not emit events.
// `reconcile` consumes spec and resources returned by `prepare`
func (c *Reconciler) reconcile(ctx context.Context, tr *v1alpha1.TaskRun,
taskSpec *v1alpha1.TaskSpec, rtr *resources.ResolvedTaskResources) error {
// Get the TaskRun's Pod if it should have one. Otherwise, create the Pod.
var pod *corev1.Pod
var err error

if tr.Status.PodName != "" {
pod, err = c.KubeClientSet.CoreV1().Pods(tr.Namespace).Get(tr.Status.PodName, metav1.GetOptions{})
if k8serrors.IsNotFound(err) {
Expand All @@ -307,7 +345,7 @@ func (c *Reconciler) reconcile(ctx context.Context, tr *v1alpha1.TaskRun) error

if pod == nil {
if tr.HasVolumeClaimTemplate() {
if err = c.pvcHandler.CreatePersistentVolumeClaimsForWorkspaces(tr.Spec.Workspaces, tr.GetOwnerReference(), tr.Namespace); err != nil {
if err := c.pvcHandler.CreatePersistentVolumeClaimsForWorkspaces(tr.Spec.Workspaces, tr.GetOwnerReference(), tr.Namespace); err != nil {
c.Logger.Errorf("Failed to create PVC for TaskRun %s: %v", tr.Name, err)
fmt.Errorf("Failed to create PVC for TaskRun %s workspaces correctly: %s",
Expand All @@ -316,6 +354,7 @@ func (c *Reconciler) reconcile(ctx context.Context, tr *v1alpha1.TaskRun) error

taskRunWorkspaces := applyVolumeClaimTemplates(tr.Spec.Workspaces, tr.GetOwnerReference())
// This is used by createPod below. Changes to the Spec are not updated.
tr.Spec.Workspaces = taskRunWorkspaces

Expand All @@ -341,19 +380,14 @@ func (c *Reconciler) reconcile(ctx context.Context, tr *v1alpha1.TaskRun) error

before := tr.Status.GetCondition(apis.ConditionSucceeded)

// Convert the Pod's status to the equivalent TaskRun Status.
tr.Status = podconvert.MakeTaskRunStatus(c.Logger, *tr, pod, *taskSpec)

if err := updateTaskRunResourceResult(tr, pod.Status); err != nil {
return err

after := tr.Status.GetCondition(apis.ConditionSucceeded)

reconciler.EmitEvent(c.Recorder, before, after, tr)
c.Logger.Infof("Successfully reconciled taskrun %s/%s with status: %#v", tr.Name, tr.Namespace, after)
c.Logger.Infof("Successfully reconciled taskrun %s/%s with status: %#v", tr.Name, tr.Namespace, tr.Status.GetCondition(apis.ConditionSucceeded))

return nil
Expand Down Expand Up @@ -449,33 +483,32 @@ func (c *Reconciler) getTaskFunc(tr *v1alpha1.TaskRun) (resources.GetTask, v1alp

func (c *Reconciler) handlePodCreationError(tr *v1alpha1.TaskRun, err error) {
var reason, msg string
var succeededStatus corev1.ConditionStatus
var msg string
if isExceededResourceQuotaError(err) {
succeededStatus = corev1.ConditionUnknown
reason = podconvert.ReasonExceededResourceQuota
backoff, currentlyBackingOff := c.timeoutHandler.GetBackoff(tr)
if !currentlyBackingOff {
go c.timeoutHandler.SetTaskRunTimer(tr, time.Until(backoff.NextAttempt))
msg = fmt.Sprintf("TaskRun Pod exceeded available resources, reattempted %d times", backoff.NumAttempts)
Type: apis.ConditionSucceeded,
Status: corev1.ConditionUnknown,
Reason: podconvert.ReasonExceededResourceQuota,
Message: fmt.Sprintf("%s: %v", msg, err),
} else {
succeededStatus = corev1.ConditionFalse
reason = podconvert.ReasonCouldntGetTask
// The pod creation failed, not because of quota issues. The most likely
// reason is that something is wrong with the spec of the Task, that we could
// not check with validation before - i.e. pod template fields
msg = fmt.Sprintf("failed to create task run pod %q: %v. Maybe ", tr.Name, err)
if tr.Spec.TaskRef != nil {
msg = fmt.Sprintf("Missing or invalid Task %s/%s", tr.Namespace, tr.Spec.TaskRef.Name)
msg += fmt.Sprintf("missing or invalid Task %s/%s", tr.Namespace, tr.Spec.TaskRef.Name)
} else {
msg = "Invalid TaskSpec"
msg += "invalid TaskSpec"
tr.Status.MarkResourceFailed(podconvert.ReasonCouldntGetTask, errors.New(msg))
Type: apis.ConditionSucceeded,
Status: succeededStatus,
Reason: reason,
Message: fmt.Sprintf("%s: %v", msg, err),
c.Recorder.Eventf(tr, corev1.EventTypeWarning, "BuildCreationFailed", "Failed to create build pod %q: %v", tr.Name, err)
c.Logger.Errorf("Failed to create build pod for task %q: %v", tr.Name, err)
c.Logger.Error("Failed to create task run pod for task %q: %v", tr.Name, err)

// failTaskRun stops a TaskRun with the provided Reason
Expand Down

