Skip to content

Commit

Permalink
Refactor the way timeouts are handled
Browse files Browse the repository at this point in the history
`{Task,Pipeline}Run` now handle timeouts via `EnqueueAfter` on the workqueue.

`pkg/timeout` is now removed.

We now have consistent `GetTimeout(ctx)` methods on types.
  • Loading branch information
mattmoor authored and tekton-robot committed Nov 9, 2020
1 parent 092a598 commit 8eaaeaa
Show file tree
Hide file tree
Showing 37 changed files with 2,727 additions and 913 deletions.
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module github.com/tektoncd/pipeline
go 1.13

require (
cloud.google.com/go/storage v1.11.0
cloud.google.com/go/storage v1.11.0 // indirect
github.com/GoogleCloudPlatform/cloud-builders/gcs-fetcher v0.0.0-20191203181535-308b93ad1f39
github.com/cloudevents/sdk-go/v2 v2.1.0
github.com/ghodss/yaml v1.0.0
Expand All @@ -22,11 +22,11 @@ require (
golang.org/x/crypto v0.0.0-20200820211705-5c72a883971a // indirect
golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d
gomodules.xyz/jsonpatch/v2 v2.1.0
google.golang.org/api v0.31.0
k8s.io/api v0.18.8
k8s.io/apimachinery v0.19.0
k8s.io/client-go v11.0.1-0.20190805182717-6502b5e7b1b5+incompatible
k8s.io/code-generator v0.18.8
k8s.io/klog v1.0.0
k8s.io/kube-openapi v0.0.0-20200410145947-bcb3869e6f29
knative.dev/pkg v0.0.0-20200922164940-4bf40ad82aab
)
Expand Down
15 changes: 15 additions & 0 deletions go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 10 additions & 0 deletions pkg/apis/pipeline/v1beta1/pipelinerun_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package v1beta1

import (
"context"
"time"

"github.com/tektoncd/pipeline/pkg/apis/config"
Expand Down Expand Up @@ -98,6 +99,15 @@ func (pr *PipelineRun) IsCancelled() bool {
return pr.Spec.Status == PipelineRunSpecStatusCancelled
}

func (pr *PipelineRun) GetTimeout(ctx context.Context) time.Duration {
// Use the platform default is no timeout is set
if pr.Spec.Timeout == nil {
defaultTimeout := time.Duration(config.FromContextOrDefaults(ctx).Defaults.DefaultTimeoutMinutes)
return defaultTimeout * time.Minute
}
return pr.Spec.Timeout.Duration
}

// GetNamespacedName returns a k8s namespaced name that identifies this PipelineRun
func (pr *PipelineRun) GetNamespacedName() types.NamespacedName {
return types.NamespacedName{Namespace: pr.Namespace, Name: pr.Name}
Expand Down
11 changes: 7 additions & 4 deletions pkg/apis/pipeline/v1beta1/taskrun_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@ limitations under the License.
package v1beta1

import (
"context"
"fmt"
"time"

"github.com/tektoncd/pipeline/pkg/apis/config"
apisconfig "github.com/tektoncd/pipeline/pkg/apis/config"
"github.com/tektoncd/pipeline/pkg/apis/pipeline"
corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -404,11 +406,11 @@ func (tr *TaskRun) IsCancelled() bool {
}

// HasTimedOut returns true if the TaskRun runtime is beyond the allowed timeout
func (tr *TaskRun) HasTimedOut() bool {
func (tr *TaskRun) HasTimedOut(ctx context.Context) bool {
if tr.Status.StartTime.IsZero() {
return false
}
timeout := tr.GetTimeout()
timeout := tr.GetTimeout(ctx)
// If timeout is set to 0 or defaulted to 0, there is no timeout.
if timeout == apisconfig.NoTimeoutDuration {
return false
Expand All @@ -417,10 +419,11 @@ func (tr *TaskRun) HasTimedOut() bool {
return runtime > timeout
}

func (tr *TaskRun) GetTimeout() time.Duration {
func (tr *TaskRun) GetTimeout(ctx context.Context) time.Duration {
// Use the platform default is no timeout is set
if tr.Spec.Timeout == nil {
return apisconfig.DefaultTimeoutMinutes * time.Minute
defaultTimeout := time.Duration(config.FromContextOrDefaults(ctx).Defaults.DefaultTimeoutMinutes)
return defaultTimeout * time.Minute
}
return tr.Spec.Timeout.Duration
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/apis/pipeline/v1beta1/taskrun_types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package v1beta1_test

import (
"context"
"testing"
"time"

Expand Down Expand Up @@ -329,7 +330,7 @@ func TestHasTimedOut(t *testing.T) {

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
result := tc.taskRun.HasTimedOut()
result := tc.taskRun.HasTimedOut(context.Background())
if d := cmp.Diff(result, tc.expectedStatus); d != "" {
t.Fatalf(diff.PrintWantGot(d))
}
Expand Down
13 changes: 8 additions & 5 deletions pkg/reconciler/pipelinerun/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,12 @@ import (
resourceinformer "github.com/tektoncd/pipeline/pkg/client/resource/injection/informers/resource/v1alpha1/pipelineresource"
cloudeventclient "github.com/tektoncd/pipeline/pkg/reconciler/events/cloudevent"
"github.com/tektoncd/pipeline/pkg/reconciler/volumeclaim"
"github.com/tektoncd/pipeline/pkg/timeout"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/cache"
kubeclient "knative.dev/pkg/client/injection/kube/client"
"knative.dev/pkg/configmap"
"knative.dev/pkg/controller"
"knative.dev/pkg/kmeta"
"knative.dev/pkg/logging"
"knative.dev/pkg/tracker"
)
Expand All @@ -55,7 +56,6 @@ func NewController(namespace string, images pipeline.Images) func(context.Contex
pipelineInformer := pipelineinformer.Get(ctx)
resourceInformer := resourceinformer.Get(ctx)
conditionInformer := conditioninformer.Get(ctx)
timeoutHandler := timeout.NewHandler(ctx.Done(), logger)
metrics, err := NewRecorder()
if err != nil {
logger.Errorf("Failed to create pipelinerun metrics recorder %v", err)
Expand All @@ -72,7 +72,6 @@ func NewController(namespace string, images pipeline.Images) func(context.Contex
taskRunLister: taskRunInformer.Lister(),
resourceLister: resourceInformer.Lister(),
conditionLister: conditionInformer.Lister(),
timeoutHandler: timeoutHandler,
cloudEventClient: cloudeventclient.Get(ctx),
metrics: metrics,
pvcHandler: volumeclaim.NewPVCHandler(kubeclientset, logger),
Expand All @@ -86,8 +85,12 @@ func NewController(namespace string, images pipeline.Images) func(context.Contex
}
})

timeoutHandler.SetCallbackFunc(impl.EnqueueKey)
go timeoutHandler.CheckTimeouts(ctx, namespace, kubeclientset, pipelineclientset)
c.snooze = func(acc kmeta.Accessor, amnt time.Duration) {
impl.EnqueueKeyAfter(types.NamespacedName{
Namespace: acc.GetNamespace(),
Name: acc.GetName(),
}, amnt)
}

logger.Info("Setting up event handlers")
pipelineRunInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
Expand Down
25 changes: 12 additions & 13 deletions pkg/reconciler/pipelinerun/pipelinerun.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ import (
"github.com/tektoncd/pipeline/pkg/reconciler/taskrun"
tresources "github.com/tektoncd/pipeline/pkg/reconciler/taskrun/resources"
"github.com/tektoncd/pipeline/pkg/reconciler/volumeclaim"
"github.com/tektoncd/pipeline/pkg/timeout"
"github.com/tektoncd/pipeline/pkg/workspace"
"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"
Expand All @@ -56,6 +55,7 @@ import (
"k8s.io/client-go/kubernetes"
"knative.dev/pkg/apis"
"knative.dev/pkg/controller"
"knative.dev/pkg/kmeta"
"knative.dev/pkg/logging"
pkgreconciler "knative.dev/pkg/reconciler"
"knative.dev/pkg/tracker"
Expand Down Expand Up @@ -121,9 +121,10 @@ type Reconciler struct {
conditionLister listersv1alpha1.ConditionLister
cloudEventClient cloudevent.CEClient
tracker tracker.Interface
timeoutHandler *timeout.Handler
metrics *Recorder
pvcHandler volumeclaim.PvcHandler

snooze func(kmeta.Accessor, time.Duration)
}

var (
Expand All @@ -149,8 +150,6 @@ func (c *Reconciler) ReconcileKind(ctx context.Context, pr *v1beta1.PipelineRun)
pr.Status.StartTime = &pr.CreationTimestamp
}

// start goroutine to track pipelinerun timeout only startTime is not set
go c.timeoutHandler.Wait(pr.GetNamespacedName(), *pr.Status.StartTime, getPipelineRunTimeout(ctx, pr))
// Emit events. During the first reconcile the status of the PipelineRun may change twice
// from not Started to Started and then to Running, so we need to sent the event here
// and at the end of 'Reconcile' again.
Expand Down Expand Up @@ -185,7 +184,6 @@ func (c *Reconciler) ReconcileKind(ctx context.Context, pr *v1beta1.PipelineRun)
logger.Errorf("Failed to delete StatefulSet for PipelineRun %s: %v", pr.Name, err)
return c.finishReconcileUpdateEmitEvents(ctx, pr, before, err)
}
c.timeoutHandler.Release(pr.GetNamespacedName())
if err := c.updateTaskRunsStatusDirectly(pr); err != nil {
logger.Errorf("Failed to update TaskRun status for PipelineRun %s: %v", pr.Name, err)
return c.finishReconcileUpdateEmitEvents(ctx, pr, before, err)
Expand Down Expand Up @@ -217,6 +215,15 @@ func (c *Reconciler) ReconcileKind(ctx context.Context, pr *v1beta1.PipelineRun)
logger.Errorf("Error while syncing the pipelinerun status: %v", err.Error())
return c.finishReconcileUpdateEmitEvents(ctx, pr, before, err)
}
defer func() {
if pr.Status.StartTime == nil {
return
}
// Compute the time since the task started.
elapsed := time.Since(pr.Status.StartTime.Time)
// Snooze this resource until the timeout has elapsed.
c.snooze(pr, pr.GetTimeout(ctx)-elapsed)
}()

// Reconcile this copy of the pipelinerun and then write back any status or label
// updates regardless of whether the reconciliation errored out.
Expand Down Expand Up @@ -797,14 +804,6 @@ func combineTaskRunAndTaskSpecAnnotations(pr *v1beta1.PipelineRun, pipelineTask
return annotations
}

func getPipelineRunTimeout(ctx context.Context, pr *v1beta1.PipelineRun) metav1.Duration {
if pr.Spec.Timeout == nil {
defaultTimeout := time.Duration(config.FromContextOrDefaults(ctx).Defaults.DefaultTimeoutMinutes)
return metav1.Duration{Duration: defaultTimeout * time.Minute}
}
return *pr.Spec.Timeout
}

func getTaskRunTimeout(ctx context.Context, pr *v1beta1.PipelineRun, rprt *resources.ResolvedPipelineRunTask) *metav1.Duration {
var taskRunTimeout = &metav1.Duration{Duration: apisconfig.NoTimeoutDuration}

Expand Down
Loading

0 comments on commit 8eaaeaa

Please sign in to comment.