Skip to content

Commit

Permalink
Replace snooze with NewRequeueKey
Browse files Browse the repository at this point in the history
  • Loading branch information
mattmoor committed Aug 6, 2021
1 parent 9d74f3f commit 011ebc8
Show file tree
Hide file tree
Showing 6 changed files with 75 additions and 73 deletions.
10 changes: 0 additions & 10 deletions pkg/reconciler/pipelinerun/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package pipelinerun

import (
"context"
"time"

"github.com/tektoncd/pipeline/pkg/apis/config"
"github.com/tektoncd/pipeline/pkg/apis/pipeline"
Expand All @@ -35,12 +34,10 @@ import (
resourceinformer "github.com/tektoncd/pipeline/pkg/client/resource/injection/informers/resource/v1alpha1/pipelineresource"
cloudeventclient "github.com/tektoncd/pipeline/pkg/reconciler/events/cloudevent"
"github.com/tektoncd/pipeline/pkg/reconciler/volumeclaim"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/cache"
kubeclient "knative.dev/pkg/client/injection/kube/client"
"knative.dev/pkg/configmap"
"knative.dev/pkg/controller"
"knative.dev/pkg/kmeta"
"knative.dev/pkg/logging"
)

Expand Down Expand Up @@ -88,13 +85,6 @@ func NewController(namespace string, images pipeline.Images) func(context.Contex
}
})

c.snooze = func(acc kmeta.Accessor, amnt time.Duration) {
impl.EnqueueKeyAfter(types.NamespacedName{
Namespace: acc.GetNamespace(),
Name: acc.GetName(),
}, amnt)
}

logger.Info("Setting up event handlers")
pipelineRunInformer.Informer().AddEventHandler(controller.HandleAll(impl.Enqueue))

Expand Down
23 changes: 11 additions & 12 deletions pkg/reconciler/pipelinerun/pipelinerun.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,8 +131,6 @@ type Reconciler struct {
cloudEventClient cloudevent.CEClient
metrics *Recorder
pvcHandler volumeclaim.PvcHandler

snooze func(kmeta.Accessor, time.Duration)
}

var (
Expand Down Expand Up @@ -221,23 +219,24 @@ func (c *Reconciler) ReconcileKind(ctx context.Context, pr *v1beta1.PipelineRun)
logger.Errorf("Error while syncing the pipelinerun status: %v", err.Error())
return c.finishReconcileUpdateEmitEvents(ctx, pr, before, err)
}
defer func() {
if pr.Status.StartTime == nil {
return
}
// Compute the time since the task started.
elapsed := time.Since(pr.Status.StartTime.Time)
// Snooze this resource until the timeout has elapsed.
c.snooze(pr, pr.GetTimeout(ctx)-elapsed)
}()

// Reconcile this copy of the pipelinerun and then write back any status or label
// updates regardless of whether the reconciliation errored out.
if err = c.reconcile(ctx, pr, getPipelineFunc); err != nil {
logger.Errorf("Reconcile error: %v", err.Error())
}

return c.finishReconcileUpdateEmitEvents(ctx, pr, before, err)
if err = c.finishReconcileUpdateEmitEvents(ctx, pr, before, err); err != nil {
return err
}

if pr.Status.StartTime != nil {
// Compute the time since the task started.
elapsed := time.Since(pr.Status.StartTime.Time)
// Snooze this resource until the timeout has elapsed.
return controller.NewRequeueAfter(pr.GetTimeout(ctx) - elapsed)
}
return nil
}

func (c *Reconciler) finishReconcileUpdateEmitEvents(ctx context.Context, pr *v1beta1.PipelineRun, beforeCondition *apis.Condition, previousError error) error {
Expand Down
6 changes: 5 additions & 1 deletion pkg/reconciler/pipelinerun/pipelinerun_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2186,7 +2186,9 @@ func TestReconcileCancelledRunFinallyFailsTaskRunCancellation(t *testing.T) {
failingReactorActivated = false

err = c.Reconciler.Reconcile(testAssets.Ctx, "foo/test-pipeline-fails-to-cancel")
if err != nil {
if err == nil {
// No error is ok
} else if ok, _ := controller.IsRequeueKey(err); !ok { // Requeue is also fine.
t.Errorf("Expected to cancel TaskRun successfully!")
}

Expand Down Expand Up @@ -6070,6 +6072,8 @@ func (prt PipelineRunTest) reconcileRun(namespace, pipelineRunName string, wantE
if controller.IsPermanentError(reconcileError) != permanentError {
prt.Test.Fatalf("Expected the error to be permanent: %v but got: %v", permanentError, controller.IsPermanentError(reconcileError))
}
} else if ok, _ := controller.IsRequeueKey(reconcileError); ok {
// This is normal, it happens for timeouts when we otherwise successfully reconcile.
} else if reconcileError != nil {
prt.Test.Fatalf("Error reconciling: %s", reconcileError)
}
Expand Down
10 changes: 0 additions & 10 deletions pkg/reconciler/taskrun/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package taskrun

import (
"context"
"time"

"github.com/tektoncd/pipeline/pkg/apis/config"
"github.com/tektoncd/pipeline/pkg/apis/pipeline"
Expand All @@ -32,13 +31,11 @@ import (
"github.com/tektoncd/pipeline/pkg/pod"
cloudeventclient "github.com/tektoncd/pipeline/pkg/reconciler/events/cloudevent"
"github.com/tektoncd/pipeline/pkg/reconciler/volumeclaim"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/cache"
kubeclient "knative.dev/pkg/client/injection/kube/client"
filteredpodinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/pod/filtered"
"knative.dev/pkg/configmap"
"knative.dev/pkg/controller"
"knative.dev/pkg/kmeta"
"knative.dev/pkg/logging"
)

Expand Down Expand Up @@ -86,13 +83,6 @@ func NewController(namespace string, images pipeline.Images) func(context.Contex
}
})

c.snooze = func(acc kmeta.Accessor, amnt time.Duration) {
impl.EnqueueKeyAfter(types.NamespacedName{
Namespace: acc.GetNamespace(),
Name: acc.GetName(),
}, amnt)
}

logger.Info("Setting up event handlers")
taskRunInformer.Informer().AddEventHandler(controller.HandleAll(impl.Enqueue))

Expand Down
23 changes: 11 additions & 12 deletions pkg/reconciler/taskrun/taskrun.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,6 @@ type Reconciler struct {
entrypointCache podconvert.EntrypointCache
metrics *Recorder
pvcHandler volumeclaim.PvcHandler

snooze func(kmeta.Accessor, time.Duration)
}

// Check that our Reconciler implements taskrunreconciler.Interface
Expand Down Expand Up @@ -163,15 +161,6 @@ func (c *Reconciler) ReconcileKind(ctx context.Context, tr *v1beta1.TaskRun) pkg
err := c.failTaskRun(ctx, tr, v1beta1.TaskRunReasonTimedOut, message)
return c.finishReconcileUpdateEmitEvents(ctx, tr, before, err)
}
defer func() {
if tr.Status.StartTime == nil {
return
}
// Compute the time since the task started.
elapsed := time.Since(tr.Status.StartTime.Time)
// Snooze this resource until the timeout has elapsed.
c.snooze(tr, tr.GetTimeout(ctx)-elapsed)
}()

// prepare fetches all required resources, validates them together with the
// taskrun, runs API convertions. Errors that come out of prepare are
Expand All @@ -194,7 +183,17 @@ func (c *Reconciler) ReconcileKind(ctx context.Context, tr *v1beta1.TaskRun) pkg
}

// Emit events (only when ConditionSucceeded was changed)
return c.finishReconcileUpdateEmitEvents(ctx, tr, before, err)
if err = c.finishReconcileUpdateEmitEvents(ctx, tr, before, err); err != nil {
return err
}

if tr.Status.StartTime != nil {
// Compute the time since the task started.
elapsed := time.Since(tr.Status.StartTime.Time)
// Snooze this resource until the timeout has elapsed.
return controller.NewRequeueAfter(tr.GetTimeout(ctx) - elapsed)
}
return nil
}
func (c *Reconciler) stopSidecars(ctx context.Context, tr *v1beta1.TaskRun) (*corev1.Pod, error) {
logger := logging.FromContext(ctx)
Expand Down
76 changes: 48 additions & 28 deletions pkg/reconciler/taskrun/taskrun_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -543,7 +543,9 @@ func TestReconcile_ExplicitDefaultSA(t *testing.T) {
c := testAssets.Controller
clients := testAssets.Clients

if err := c.Reconciler.Reconcile(testAssets.Ctx, getRunName(tc.taskRun)); err != nil {
if err := c.Reconciler.Reconcile(testAssets.Ctx, getRunName(tc.taskRun)); err == nil {
t.Error("Wanted a wrapped requeue error, but got nil.")
} else if ok, _ := controller.IsRequeueKey(err); !ok {
t.Errorf("expected no error. Got error %v", err)
}
if len(clients.Kube.Actions()) == 0 {
Expand Down Expand Up @@ -726,7 +728,9 @@ func TestReconcile_FeatureFlags(t *testing.T) {
}, metav1.CreateOptions{}); err != nil {
t.Fatal(err)
}
if err := c.Reconciler.Reconcile(testAssets.Ctx, getRunName(tc.taskRun)); err != nil {
if err := c.Reconciler.Reconcile(testAssets.Ctx, getRunName(tc.taskRun)); err == nil {
t.Error("Wanted a wrapped requeue error, but got nil.")
} else if ok, _ := controller.IsRequeueKey(err); !ok {
t.Errorf("expected no error. Got error %v", err)
}
if len(clients.Kube.Actions()) == 0 {
Expand Down Expand Up @@ -811,7 +815,9 @@ func TestReconcile_CloudEvents(t *testing.T) {
t.Fatal(err)
}

if err := c.Reconciler.Reconcile(testAssets.Ctx, getRunName(taskRun)); err != nil {
if err := c.Reconciler.Reconcile(testAssets.Ctx, getRunName(taskRun)); err == nil {
t.Error("Wanted a wrapped requeue error, but got nil.")
} else if ok, _ := controller.IsRequeueKey(err); !ok {
t.Errorf("expected no error. Got error %v", err)
}
if len(clients.Kube.Actions()) == 0 {
Expand Down Expand Up @@ -1550,7 +1556,9 @@ func TestReconcile(t *testing.T) {
t.Fatal(err)
}

if err := c.Reconciler.Reconcile(testAssets.Ctx, getRunName(tc.taskRun)); err != nil {
if err := c.Reconciler.Reconcile(testAssets.Ctx, getRunName(tc.taskRun)); err == nil {
t.Error("Wanted a wrapped requeue error, but got nil.")
} else if ok, _ := controller.IsRequeueKey(err); !ok {
t.Errorf("expected no error. Got error %v", err)
}
if len(clients.Kube.Actions()) == 0 {
Expand Down Expand Up @@ -1620,7 +1628,9 @@ func TestReconcile_SetsStartTime(t *testing.T) {
t.Fatal(err)
}

if err := testAssets.Controller.Reconciler.Reconcile(context.Background(), getRunName(taskRun)); err != nil {
if err := testAssets.Controller.Reconciler.Reconcile(context.Background(), getRunName(taskRun)); err == nil {
t.Error("Wanted a wrapped requeue error, but got nil.")
} else if ok, _ := controller.IsRequeueKey(err); !ok {
t.Errorf("expected no error reconciling valid TaskRun but got %v", err)
}

Expand Down Expand Up @@ -1774,7 +1784,7 @@ func TestReconcileTaskRunWithPermanentError(t *testing.T) {
// Such TaskRun enters Reconciler and from within the isDone block, marks the run success so that
// reconciler does not keep trying to reconcile
if reconcileErr != nil {
t.Fatalf("Expected to see no error when reconciling TaskRun with Permanent Error but was not none")
t.Fatalf("Expected to see error when reconciling TaskRun with Permanent Error but was not none")
}

// Check actions
Expand Down Expand Up @@ -1877,7 +1887,9 @@ func TestReconcilePodUpdateStatus(t *testing.T) {
c := testAssets.Controller
clients := testAssets.Clients

if err := c.Reconciler.Reconcile(testAssets.Ctx, getRunName(taskRun)); err != nil {
if err := c.Reconciler.Reconcile(testAssets.Ctx, getRunName(taskRun)); err == nil {
t.Error("Wanted a wrapped requeue error, but got nil.")
} else if ok, _ := controller.IsRequeueKey(err); !ok {
t.Fatalf("Unexpected error when Reconcile() : %v", err)
}
newTr, err := clients.Pipeline.TektonV1beta1().TaskRuns(taskRun.Namespace).Get(testAssets.Ctx, taskRun.Name, metav1.GetOptions{})
Expand Down Expand Up @@ -1913,7 +1925,9 @@ func TestReconcilePodUpdateStatus(t *testing.T) {
// lister cache is update to reflect the result of the previous Reconcile.
testAssets.Informers.TaskRun.Informer().GetIndexer().Add(newTr)

if err := c.Reconciler.Reconcile(testAssets.Ctx, getRunName(taskRun)); err != nil {
if err := c.Reconciler.Reconcile(testAssets.Ctx, getRunName(taskRun)); err == nil {
t.Error("Wanted a wrapped requeue error, but got nil.")
} else if ok, _ := controller.IsRequeueKey(err); !ok {
t.Fatalf("Unexpected error when Reconcile(): %v", err)
}

Expand Down Expand Up @@ -2178,13 +2192,10 @@ func TestHandlePodCreationError(t *testing.T) {
taskLister: testAssets.Informers.Task.Lister(),
clusterTaskLister: testAssets.Informers.ClusterTask.Lister(),
resourceLister: testAssets.Informers.PipelineResource.Lister(),
snooze: func(acc kmeta.Accessor, amnt time.Duration) {
t.Error("Unexpected call to snooze.")
},
cloudEventClient: testAssets.Clients.CloudEvents,
metrics: nil, // Not used
entrypointCache: nil, // Not used
pvcHandler: volumeclaim.NewPVCHandler(testAssets.Clients.Kube, testAssets.Logger),
cloudEventClient: testAssets.Clients.CloudEvents,
metrics: nil, // Not used
entrypointCache: nil, // Not used
pvcHandler: volumeclaim.NewPVCHandler(testAssets.Clients.Kube, testAssets.Logger),
}

testcases := []struct {
Expand Down Expand Up @@ -2393,7 +2404,9 @@ func TestReconcileCloudEvents(t *testing.T) {
t.Fatal(err)
}

if err := c.Reconciler.Reconcile(testAssets.Ctx, getRunName(tc.taskRun)); err != nil {
if err := c.Reconciler.Reconcile(testAssets.Ctx, getRunName(tc.taskRun)); err == nil {
// No error is ok.
} else if ok, _ := controller.IsRequeueKey(err); !ok {
t.Errorf("expected no error. Got error %v", err)
}

Expand Down Expand Up @@ -2611,7 +2624,9 @@ func TestReconcileValidDefaultWorkspace(t *testing.T) {
t.Fatal(err)
}

if err := testAssets.Controller.Reconciler.Reconcile(testAssets.Ctx, getRunName(taskRun)); err != nil {
if err := testAssets.Controller.Reconciler.Reconcile(testAssets.Ctx, getRunName(taskRun)); err == nil {
// No error is ok.
} else if ok, _ := controller.IsRequeueKey(err); !ok {
t.Errorf("Expected no error reconciling valid TaskRun but got %v", err)
}

Expand Down Expand Up @@ -2734,7 +2749,9 @@ func TestReconcileValidDefaultWorkspaceOmittedOptionalWorkspace(t *testing.T) {
t.Fatal(err)
}

if err := testAssets.Controller.Reconciler.Reconcile(context.Background(), getRunName(taskRunOmittingWorkspace)); err != nil {
if err := testAssets.Controller.Reconciler.Reconcile(context.Background(), getRunName(taskRunOmittingWorkspace)); err == nil {
t.Error("Wanted a wrapped requeue error, but got nil.")
} else if ok, _ := controller.IsRequeueKey(err); !ok {
t.Errorf("Unexpected reconcile error for TaskRun %q: %v", taskRunOmittingWorkspace.Name, err)
}

Expand Down Expand Up @@ -2963,7 +2980,9 @@ func TestReconcileWorkspaceWithVolumeClaimTemplate(t *testing.T) {
t.Fatal(err)
}

if err := testAssets.Controller.Reconciler.Reconcile(context.Background(), getRunName(taskRun)); err != nil {
if err := testAssets.Controller.Reconciler.Reconcile(context.Background(), getRunName(taskRun)); err == nil {
t.Error("Wanted a wrapped requeue error, but got nil.")
} else if ok, _ := controller.IsRequeueKey(err); !ok {
t.Errorf("expected no error reconciling valid TaskRun but got %v", err)
}

Expand Down Expand Up @@ -3231,13 +3250,10 @@ func TestFailTaskRun(t *testing.T) {
taskLister: testAssets.Informers.Task.Lister(),
clusterTaskLister: testAssets.Informers.ClusterTask.Lister(),
resourceLister: testAssets.Informers.PipelineResource.Lister(),
snooze: func(acc kmeta.Accessor, amnt time.Duration) {
t.Error("Unexpected call to snooze.")
},
cloudEventClient: testAssets.Clients.CloudEvents,
metrics: nil, // Not used
entrypointCache: nil, // Not used
pvcHandler: volumeclaim.NewPVCHandler(testAssets.Clients.Kube, testAssets.Logger),
cloudEventClient: testAssets.Clients.CloudEvents,
metrics: nil, // Not used
entrypointCache: nil, // Not used
pvcHandler: volumeclaim.NewPVCHandler(testAssets.Clients.Kube, testAssets.Logger),
}

err := c.failTaskRun(testAssets.Ctx, tc.taskRun, tc.reason, tc.message)
Expand Down Expand Up @@ -3378,7 +3394,9 @@ func TestPodAdoption(t *testing.T) {
}

// Reconcile the TaskRun. This creates a Pod.
if err := c.Reconciler.Reconcile(testAssets.Ctx, getRunName(tr)); err != nil {
if err := c.Reconciler.Reconcile(testAssets.Ctx, getRunName(tr)); err == nil {
t.Error("Wanted a wrapped requeue error, but got nil.")
} else if ok, _ := controller.IsRequeueKey(err); !ok {
t.Errorf("Error reconciling TaskRun. Got error %v", err)
}

Expand Down Expand Up @@ -3411,7 +3429,9 @@ func TestPodAdoption(t *testing.T) {
}

// Reconcile the TaskRun again.
if err := c.Reconciler.Reconcile(testAssets.Ctx, getRunName(tr)); err != nil {
if err := c.Reconciler.Reconcile(testAssets.Ctx, getRunName(tr)); err == nil {
t.Error("Wanted a wrapped requeue error, but got nil.")
} else if ok, _ := controller.IsRequeueKey(err); !ok {
t.Errorf("Error reconciling TaskRun again. Got error %v", err)
}

Expand Down

0 comments on commit 011ebc8

Please sign in to comment.