From 3dae89f89a4dc3498d4c93dd9c317ceda01eba16 Mon Sep 17 00:00:00 2001 From: Andrea Frittoli Date: Wed, 17 Jul 2019 03:29:14 +0900 Subject: [PATCH] WIP: Cloud Event pipeline resource Implements in the TaskRun controller the logic to provide the cloud event pipeline resource. This commits puts together the API, cloud event helper and resource definition from four pull requests: - https://github.com/tektoncd/pipeline/pull/1090 - https://github.com/tektoncd/pipeline/pull/1091 - https://github.com/tektoncd/pipeline/pull/1092 TBD: Test for interaction with the image digest export It adds unit tests for the new code and one E2E YAML test. The YAML test runs a simple http server that can receive the cloudevent for test purposes. The list of cloud events to be sent is added to the TaskRun status and processed by the TaskRun controller once the pod associated to the TaskRun completes its execution. The `isDone` definition of the TaskRun is not altered, the reconciler checks for events to be sent once the TaskRun.isDone is true. Retries are not implemented yet in the sense that every scheduled event will be attempted exactly once, but it may be that those attempts happen across different invocations of Reconcile. Signed-off-by: Andrea Frittoli --- docs/resources.md | 63 ++++++ examples/taskruns/taskrun-cloud-event.yaml | 111 +++++++++ pkg/reconciler/v1alpha1/taskrun/controller.go | 2 + pkg/reconciler/v1alpha1/taskrun/taskrun.go | 77 ++++++- .../v1alpha1/taskrun/taskrun_test.go | 212 ++++++++++++++++++ test/builder/task.go | 25 +++ 6 files changed, 485 insertions(+), 5 deletions(-) create mode 100644 examples/taskruns/taskrun-cloud-event.yaml diff --git a/docs/resources.md b/docs/resources.md index 764874245cd..87c55fdcb58 100644 --- a/docs/resources.md +++ b/docs/resources.md @@ -52,6 +52,7 @@ The following `PipelineResources` are currently supported: - [Storage Resource](#storage-resource) - [GCS Storage Resource](#gcs-storage-resource) - [BuildGCS Storage Resource](#buildgcs-storage-resource) +- [Cloud Event Resource](#cloud-event-resource) ### Git Resource @@ -617,6 +618,68 @@ the container image [gcr.io/cloud-builders//gcs-fetcher](https://github.com/GoogleCloudPlatform/cloud-builders/tree/master/gcs-fetcher) does not support configuring secrets. +### Cloud Event Resource + +The Cloud Event Resource represents a [cloud event](https://github.com/cloudevents/spec) +that is sent to a target `URI` upon completion of a `TaskRun`. +The Cloud Event Resource sends Tekton specific events; the body of the event includes +the entire TaskRun spec plus status; the types of events defined for now are: + +- dev.tekton.event.task.unknown +- dev.tekton.event.task.successful +- dev.tekton.event.task.failed + +Cloud event resources are useful to notify a third party upon the completion and +status of a `TaskRun`. In combinations with the [Tekton triggers](https://github.com/tektoncd/triggers) +project they can be used to link `Task/PipelineRuns` asynchronously. + +To create a CloudEvent resource using the `PipelineResource` CRD: + +```yaml +apiVersion: tekton.dev/v1alpha1 +kind: PipelineResource +metadata: + name: event-to-sink +spec: + type: cloudevent + params: + - name: targetURI + value: http://sink:8080 +``` + +The content of an event is for example: + +```yaml +Context Attributes, + SpecVersion: 0.2 + Type: dev.tekton.event.task.successful + Source: /apis/tekton.dev/v1alpha1/namespaces/default/taskruns/pipeline-run-api-16aa55-source-to-image-task-rpndl + ID: pipeline-run-api-16aa55-source-to-image-task-rpndl + Time: 2019-07-04T11:03:53.058694712Z + ContentType: application/json +Transport Context, + URI: / + Host: my-sink.default.my-cluster.containers.appdomain.cloud + Method: POST +Data, + { + "taskRun": { + "metadata": {...} + "spec": { + "inputs": {...} + "outputs": {...} + "serviceAccount": "default", + "taskRef": { + "name": "source-to-image", + "kind": "Task" + }, + "timeout": "1h0m0s" + }, + "status": {...} + } + } +``` + Except as otherwise noted, the content of this page is licensed under the [Creative Commons Attribution 4.0 License](https://creativecommons.org/licenses/by/4.0/), and code samples are licensed under the diff --git a/examples/taskruns/taskrun-cloud-event.yaml b/examples/taskruns/taskrun-cloud-event.yaml new file mode 100644 index 00000000000..eb8c1d6d716 --- /dev/null +++ b/examples/taskruns/taskrun-cloud-event.yaml @@ -0,0 +1,111 @@ +--- +apiVersion: v1 +kind: Service +metadata: + name: sink + namespace: default +spec: + selector: + app: cloudevent + ports: + - protocol: TCP + port: 8080 + targetPort: 8080 +--- +apiVersion: v1 +kind: Pod +metadata: + labels: + app: cloudevent + name: message-sink + namespace: default +spec: + containers: + - env: + - name: PORT + value: "8080" + name: cloudeventlistener + image: python:3-alpine + imagePullPolicy: IfNotPresent + command: ["/bin/sh"] + args: + - -ce + - | + cat <

POST!

") + + if __name__ == "__main__": + httpd = HTTPServer(('', $PORT), PostHandler) + print('Starting httpd...') + httpd.serve_forever() + EOF + ports: + - containerPort: 8080 + name: user-port + protocol: TCP +--- +apiVersion: tekton.dev/v1alpha1 +kind: PipelineResource +metadata: + name: to-message-sink +spec: + type: cloudEvent + params: + - name: targetURI + value: http://sink.default:8080 +--- +apiVersion: tekton.dev/v1alpha1 +kind: PipelineResource +metadata: + name: rules-branch +spec: + type: git + params: + - name: revision + value: master + - name: url + value: https://github.com/bazelbuild/rules_docker +--- +apiVersion: tekton.dev/v1alpha1 +kind: Task +metadata: + name: send-cloud-event-task +spec: + inputs: + resources: + - name: workspace + type: git + outputs: + resources: + - name: notification + type: cloudEvent + steps: + - name: list + image: ubuntu + command: ["/bin/bash"] + args: ['-c', 'ls -al /workspace'] +--- +apiVersion: tekton.dev/v1alpha1 +kind: TaskRun +metadata: + name: send-cloud-event +spec: + inputs: + resources: + - name: workspace + resourceRef: + name: rules-branch + outputs: + resources: + - name: notification + resourceRef: + name: to-message-sink + taskRef: + name: send-cloud-event-task diff --git a/pkg/reconciler/v1alpha1/taskrun/controller.go b/pkg/reconciler/v1alpha1/taskrun/controller.go index 2c6e2618507..1f77b050cbd 100644 --- a/pkg/reconciler/v1alpha1/taskrun/controller.go +++ b/pkg/reconciler/v1alpha1/taskrun/controller.go @@ -27,6 +27,7 @@ import ( taskruninformer "github.com/tektoncd/pipeline/pkg/client/injection/informers/pipeline/v1alpha1/taskrun" "github.com/tektoncd/pipeline/pkg/reconciler" "github.com/tektoncd/pipeline/pkg/reconciler/v1alpha1/taskrun/entrypoint" + cloudeventclient "github.com/tektoncd/pipeline/pkg/reconciler/v1alpha1/taskrun/resources/cloudevent" "k8s.io/client-go/tools/cache" "knative.dev/pkg/configmap" "knative.dev/pkg/controller" @@ -69,6 +70,7 @@ func NewController( clusterTaskLister: clusterTaskInformer.Lister(), resourceLister: resourceInformer.Lister(), timeoutHandler: timeoutHandler, + cloudEventClient: cloudeventclient.Get(ctx), } impl := controller.NewImpl(c, c.Logger, taskRunControllerName) diff --git a/pkg/reconciler/v1alpha1/taskrun/taskrun.go b/pkg/reconciler/v1alpha1/taskrun/taskrun.go index e43f09ee721..c7f6151f089 100644 --- a/pkg/reconciler/v1alpha1/taskrun/taskrun.go +++ b/pkg/reconciler/v1alpha1/taskrun/taskrun.go @@ -23,6 +23,7 @@ import ( "strings" "time" + "github.com/hashicorp/go-multierror" "github.com/tektoncd/pipeline/pkg/apis/config" "github.com/tektoncd/pipeline/pkg/apis/pipeline" "github.com/tektoncd/pipeline/pkg/apis/pipeline/v1alpha1" @@ -30,6 +31,7 @@ import ( "github.com/tektoncd/pipeline/pkg/reconciler" "github.com/tektoncd/pipeline/pkg/reconciler/v1alpha1/taskrun/entrypoint" "github.com/tektoncd/pipeline/pkg/reconciler/v1alpha1/taskrun/resources" + "github.com/tektoncd/pipeline/pkg/reconciler/v1alpha1/taskrun/resources/cloudevent" "github.com/tektoncd/pipeline/pkg/reconciler/v1alpha1/taskrun/sidecars" "github.com/tektoncd/pipeline/pkg/status" "go.uber.org/zap" @@ -65,6 +67,7 @@ type Reconciler struct { taskLister listers.TaskLister clusterTaskLister listers.ClusterTaskLister resourceLister listers.PipelineResourceLister + cloudEventClient cloudevent.CEClient tracker tracker.Interface cache *entrypoint.Cache timeoutHandler *reconciler.TimeoutSet @@ -108,25 +111,42 @@ func (c *Reconciler) Reconcile(ctx context.Context, key string) error { } if tr.IsDone() { + var merr *multierror.Error + // Try to send cloud events first + cloudEventErr := c.sendCloudEvents(tr) + // Regardless of `err`, we must write back any status update that may have + // been generated by `sendCloudEvents` + updateErr := c.updateStatusLabelsAndAnnotations(tr, original) + merr = multierror.Append(cloudEventErr, updateErr) + if cloudEventErr != nil { + // Let's keep timeouts and sidecars running as long as we're trying to + // send cloud events. So we stop here an return errors encountered this far. + return merr.ErrorOrNil() + } c.timeoutHandler.Release(tr) pod, err := c.KubeClientSet.CoreV1().Pods(tr.Namespace).Get(tr.Status.PodName, metav1.GetOptions{}) if err == nil { err = sidecars.Stop(pod, c.KubeClientSet.CoreV1().Pods(tr.Namespace).Update) } else if errors.IsNotFound(err) { - return nil + return merr.ErrorOrNil() } if err != nil { c.Logger.Errorf("Error stopping sidecars for TaskRun %q: %v", name, err) + merr = multierror.Append(merr, err) } - return err + return merr.ErrorOrNil() } - // Reconcile this copy of the task run and then write back any status // updates regardless of whether the reconciliation errored out. if err := c.reconcile(ctx, tr); err != nil { c.Logger.Errorf("Reconcile error: %v", err.Error()) return err } + return c.updateStatusLabelsAndAnnotations(tr, original) +} + +func (c *Reconciler) updateStatusLabelsAndAnnotations(tr, original *v1alpha1.TaskRun) error { + var err error if equality.Semantic.DeepEqual(original.Status, tr.Status) { // If we didn't change anything then don't call updateStatus. // This is important because the copy we loaded from the informer's @@ -144,7 +164,6 @@ func (c *Reconciler) Reconcile(ctx context.Context, key string) error { return err } } - return err } @@ -253,6 +272,25 @@ func (c *Reconciler) reconcile(ctx context.Context, tr *v1alpha1.TaskRun) error return nil } + // Initialize the cloud events if at least a CloudEventResource is defined + // and they have not been initialized yet. + c.Logger.Infof("Cloud Events: %s", tr.Status.CloudEvents) + // FIXME(afrittoli) If there are no events this is run every time + if len(tr.Status.CloudEvents) == 0 { + targets := make([]string, len(rtr.Outputs)) + idx := 0 + for _, output := range rtr.Outputs { + if output.Spec.Type == v1alpha1.PipelineResourceTypeCloudEvent { + cer, _ := v1alpha1.NewCloudEventResource(output) + targets[idx] = cer.TargetURI + idx++ + } + } + if idx > 0 { + tr.Status.InitializeCloudEvents(targets) + } + } + // Get the TaskRun's Pod if it should have one. Otherwise, create the Pod. pod, err := resources.TryGetPod(tr.Status, c.KubeClientSet.CoreV1().Pods(tr.Namespace).Get) if err != nil { @@ -293,12 +331,41 @@ func (c *Reconciler) reconcile(ctx context.Context, tr *v1alpha1.TaskRun) error } reconciler.EmitEvent(c.Recorder, before, after, tr) - c.Logger.Infof("Successfully reconciled taskrun %s/%s with status: %#v", tr.Name, tr.Namespace, after) return nil } +func (c *Reconciler) sendCloudEvents(tr *v1alpha1.TaskRun) error { + // The TaskRun is complete. Time to send cloud events (if any). + var merr *multierror.Error + for idx, cloudEventDelivery := range tr.Status.CloudEvents { + eventStatus := &(tr.Status.CloudEvents[idx].Status) + // Skip events that have already been sent (successfully or unsuccessfully) + // Ensure we try to send all events once (possibly through different reconcile calls) + if eventStatus.Condition != v1alpha1.CloudEventConditionUnknown || eventStatus.RetryCount > 0 { + continue + } + _, err := cloudevent.SendTaskRunCloudEvent(cloudEventDelivery.Target, tr, c.Logger, c.cloudEventClient) + eventStatus.SentAt = &metav1.Time{Time: time.Now()} + eventStatus.RetryCount = eventStatus.RetryCount + 1 + if err != nil { + merr = multierror.Append(merr, err) + eventStatus.Condition = v1alpha1.CloudEventConditionFailed + eventStatus.Error = merr.Error() + } else { + c.Logger.Infof("Sent event for target %s", cloudEventDelivery.Target) + eventStatus.Condition = v1alpha1.CloudEventConditionSent + } + } + if merr != nil && merr.Len() > 0 { + c.Logger.Errorf("Failed to send %d cloud events for TaskRun %s", merr.Len(), tr.Name) + // Return all send error + return merr + } + return merr +} + func (c *Reconciler) handlePodCreationError(tr *v1alpha1.TaskRun, err error) { var reason, msg string var succeededStatus corev1.ConditionStatus diff --git a/pkg/reconciler/v1alpha1/taskrun/taskrun_test.go b/pkg/reconciler/v1alpha1/taskrun/taskrun_test.go index eb2e607b6a3..71e1e7ed450 100644 --- a/pkg/reconciler/v1alpha1/taskrun/taskrun_test.go +++ b/pkg/reconciler/v1alpha1/taskrun/taskrun_test.go @@ -46,6 +46,7 @@ import ( "github.com/tektoncd/pipeline/pkg/apis/pipeline/v1alpha1" "github.com/tektoncd/pipeline/pkg/reconciler/v1alpha1/taskrun/entrypoint" "github.com/tektoncd/pipeline/pkg/reconciler/v1alpha1/taskrun/resources" + "github.com/tektoncd/pipeline/pkg/reconciler/v1alpha1/taskrun/resources/cloudevent" "github.com/tektoncd/pipeline/pkg/status" "github.com/tektoncd/pipeline/pkg/system" "github.com/tektoncd/pipeline/test" @@ -73,6 +74,8 @@ var ( resourceQuantityCmp = cmp.Comparer(func(x, y resource.Quantity) bool { return x.Cmp(y) == 0 }) + cloudEventTarget1 = "https://foo" + cloudEventTarget2 = "https://bar" simpleStep = tb.Step("simple-step", "foo", tb.Command("/mycmd")) simpleTask = tb.Task("test-task", "foo", tb.TaskSpec(simpleStep)) @@ -134,6 +137,13 @@ var ( })), )) + twoOutputsTask = tb.Task("test-two-output-task", "foo", tb.TaskSpec( + simpleStep, tb.TaskOutputs( + tb.OutputsResource(cloudEventResource.Name, v1alpha1.PipelineResourceTypeCloudEvent), + tb.OutputsResource(anotherCloudEventResource.Name, v1alpha1.PipelineResourceTypeCloudEvent), + ), + )) + gitResource = tb.PipelineResource("git-resource", "foo", tb.PipelineResourceSpec( v1alpha1.PipelineResourceTypeGit, tb.PipelineResourceSpecParam("URL", "https://foo.git"), )) @@ -143,6 +153,12 @@ var ( imageResource = tb.PipelineResource("image-resource", "foo", tb.PipelineResourceSpec( v1alpha1.PipelineResourceTypeImage, tb.PipelineResourceSpecParam("URL", "gcr.io/kristoff/sven"), )) + cloudEventResource = tb.PipelineResource("cloud-event-resource", "foo", tb.PipelineResourceSpec( + v1alpha1.PipelineResourceTypeCloudEvent, tb.PipelineResourceSpecParam("TargetURI", cloudEventTarget1), + )) + anotherCloudEventResource = tb.PipelineResource("another-cloud-event-resource", "foo", tb.PipelineResourceSpec( + v1alpha1.PipelineResourceTypeCloudEvent, tb.PipelineResourceSpecParam("TargetURI", cloudEventTarget2), + )) toolsVolume = corev1.Volume{ Name: "tools", @@ -241,6 +257,10 @@ func getRunName(tr *v1alpha1.TaskRun) string { func getTaskRunController(t *testing.T, d test.Data) (test.TestAssets, func()) { ctx, _ := rtesting.SetupFakeContext(t) ctx, cancel := context.WithCancel(ctx) + cloudEventClientBehaviour := cloudevent.FakeClientBehaviour{ + SendSuccessfully: true, + } + ctx = cloudevent.WithClient(ctx, &cloudEventClientBehaviour) entrypointCache, _ = entrypoint.NewCache() c, _ := test.SeedTestData(t, ctx, d) configMapWatcher := configmap.NewInformedWatcher(c.Kube, system.GetNamespace()) @@ -1616,3 +1636,195 @@ func TestHandlePodCreationError(t *testing.T) { }) } } + +func TestReconcileCloudEvens(t *testing.T) { + + taskRunWithNoCEResources := tb.TaskRun("test-taskrun-no-ce-resources", "foo", + tb.TaskRunSpec( + tb.TaskRunTaskRef(simpleTask.Name, tb.TaskRefAPIVersion("a1")), + )) + taskRunWithTwoCEResourcesNoInit := tb.TaskRun("test-taskrun-two-ce-resources-no-init", "foo", + tb.TaskRunSpec( + tb.TaskRunTaskRef(twoOutputsTask.Name), + tb.TaskRunOutputs( + tb.TaskRunOutputsResource(cloudEventResource.Name), + tb.TaskRunOutputsResource(anotherCloudEventResource.Name), + ), + ), + ) + taskRunWithTwoCEResourcesInit := tb.TaskRun("test-taskrun-two-ce-resources-init", "foo", + tb.TaskRunSpec( + tb.TaskRunTaskRef(twoOutputsTask.Name), + tb.TaskRunOutputs( + tb.TaskRunOutputsResource(cloudEventResource.Name), + tb.TaskRunOutputsResource(anotherCloudEventResource.Name), + ), + ), + tb.TaskRunStatus( + tb.TaskRunCloudEvent(cloudEventTarget1, "", 0, v1alpha1.CloudEventConditionUnknown), + tb.TaskRunCloudEvent(cloudEventTarget2, "", 0, v1alpha1.CloudEventConditionUnknown), + ), + ) + taskRunWithCESucceded := tb.TaskRun("test-taskrun-ce-succeeded", "foo", + tb.TaskRunSelfLink("/task/1234"), + tb.TaskRunSpec( + tb.TaskRunTaskRef(twoOutputsTask.Name), + tb.TaskRunOutputs( + tb.TaskRunOutputsResource(cloudEventResource.Name), + tb.TaskRunOutputsResource(anotherCloudEventResource.Name), + ), + ), + tb.TaskRunStatus( + tb.StatusCondition(apis.Condition{ + Type: apis.ConditionSucceeded, + Status: corev1.ConditionTrue, + }), + tb.TaskRunCloudEvent(cloudEventTarget1, "", 0, v1alpha1.CloudEventConditionUnknown), + tb.TaskRunCloudEvent(cloudEventTarget2, "", 0, v1alpha1.CloudEventConditionUnknown), + ), + ) + taskRunWithCEFailed := tb.TaskRun("test-taskrun-ce-failed", "foo", + tb.TaskRunSelfLink("/task/1234"), + tb.TaskRunSpec( + tb.TaskRunTaskRef(twoOutputsTask.Name), + tb.TaskRunOutputs( + tb.TaskRunOutputsResource(cloudEventResource.Name), + tb.TaskRunOutputsResource(anotherCloudEventResource.Name), + ), + ), + tb.TaskRunStatus( + tb.StatusCondition(apis.Condition{ + Type: apis.ConditionSucceeded, + Status: corev1.ConditionFalse, + }), + tb.TaskRunCloudEvent(cloudEventTarget1, "", 0, v1alpha1.CloudEventConditionUnknown), + tb.TaskRunCloudEvent(cloudEventTarget2, "", 0, v1alpha1.CloudEventConditionUnknown), + ), + ) + taskRunWithCESuccededOneAttempt := tb.TaskRun("test-taskrun-ce-succeeded-one-attempt", "foo", + tb.TaskRunSelfLink("/task/1234"), + tb.TaskRunSpec( + tb.TaskRunTaskRef(twoOutputsTask.Name), + tb.TaskRunOutputs( + tb.TaskRunOutputsResource(cloudEventResource.Name), + tb.TaskRunOutputsResource(anotherCloudEventResource.Name), + ), + ), + tb.TaskRunStatus( + tb.StatusCondition(apis.Condition{ + Type: apis.ConditionSucceeded, + Status: corev1.ConditionTrue, + }), + tb.TaskRunCloudEvent(cloudEventTarget1, "", 1, v1alpha1.CloudEventConditionUnknown), + tb.TaskRunCloudEvent(cloudEventTarget2, "fakemessage", 0, v1alpha1.CloudEventConditionUnknown), + ), + ) + taskruns := []*v1alpha1.TaskRun{ + taskRunWithNoCEResources, taskRunWithTwoCEResourcesNoInit, + taskRunWithTwoCEResourcesInit, taskRunWithCESucceded, taskRunWithCEFailed, + taskRunWithCESuccededOneAttempt, + } + + d := test.Data{ + TaskRuns: taskruns, + Tasks: []*v1alpha1.Task{simpleTask, twoOutputsTask}, + ClusterTasks: []*v1alpha1.ClusterTask{}, + PipelineResources: []*v1alpha1.PipelineResource{cloudEventResource, anotherCloudEventResource}, + } + for _, tc := range []struct { + name string + taskRun *v1alpha1.TaskRun + wantCloudEvents []v1alpha1.CloudEventDelivery + }{{ + name: "no-ce-resources", + taskRun: taskRunWithNoCEResources, + wantCloudEvents: taskRunWithNoCEResources.Status.CloudEvents, + }, { + name: "ce-resources-no-init", + taskRun: taskRunWithTwoCEResourcesNoInit, + wantCloudEvents: tb.TaskRun("want", "foo", tb.TaskRunStatus( + tb.TaskRunCloudEvent(cloudEventTarget1, "", 0, v1alpha1.CloudEventConditionUnknown), + tb.TaskRunCloudEvent(cloudEventTarget2, "", 0, v1alpha1.CloudEventConditionUnknown), + )).Status.CloudEvents, + }, { + name: "ce-resources-init", + taskRun: taskRunWithTwoCEResourcesInit, + wantCloudEvents: tb.TaskRun("want2", "foo", tb.TaskRunStatus( + tb.TaskRunCloudEvent(cloudEventTarget1, "", 0, v1alpha1.CloudEventConditionUnknown), + tb.TaskRunCloudEvent(cloudEventTarget2, "", 0, v1alpha1.CloudEventConditionUnknown), + )).Status.CloudEvents, + }, { + name: "ce-resources-init-task-successful", + taskRun: taskRunWithCESucceded, + wantCloudEvents: tb.TaskRun("want3", "foo", tb.TaskRunStatus( + tb.TaskRunCloudEvent(cloudEventTarget1, "", 1, v1alpha1.CloudEventConditionSent), + tb.TaskRunCloudEvent(cloudEventTarget2, "", 1, v1alpha1.CloudEventConditionSent), + )).Status.CloudEvents, + }, { + name: "ce-resources-init-task-failed", + taskRun: taskRunWithCEFailed, + wantCloudEvents: tb.TaskRun("want4", "foo", tb.TaskRunStatus( + tb.TaskRunCloudEvent(cloudEventTarget1, "", 1, v1alpha1.CloudEventConditionSent), + tb.TaskRunCloudEvent(cloudEventTarget2, "", 1, v1alpha1.CloudEventConditionSent), + )).Status.CloudEvents, + }, { + name: "ce-resources-init-task-successful-one-attempt", + taskRun: taskRunWithCESuccededOneAttempt, + wantCloudEvents: tb.TaskRun("want5", "foo", tb.TaskRunStatus( + tb.TaskRunCloudEvent(cloudEventTarget1, "", 1, v1alpha1.CloudEventConditionUnknown), + tb.TaskRunCloudEvent(cloudEventTarget2, "fakemessage", 1, v1alpha1.CloudEventConditionSent), + )).Status.CloudEvents, + }} { + t.Run(tc.name, func(t *testing.T) { + names.TestingSeed() + testAssets, cancel := getTaskRunController(t, d) + defer cancel() + c := testAssets.Controller + clients := testAssets.Clients + + saName := tc.taskRun.Spec.ServiceAccount + if saName == "" { + saName = "default" + } + if _, err := clients.Kube.CoreV1().ServiceAccounts(tc.taskRun.Namespace).Create(&corev1.ServiceAccount{ + ObjectMeta: metav1.ObjectMeta{ + Name: saName, + Namespace: tc.taskRun.Namespace, + }, + }); err != nil { + t.Fatal(err) + } + + if err := c.Reconciler.Reconcile(context.Background(), getRunName(tc.taskRun)); err != nil { + t.Errorf("expected no error. Got error %v", err) + } + namespace, name, err := cache.SplitMetaNamespaceKey(tc.taskRun.Name) + if err != nil { + t.Errorf("Invalid resource key: %v", err) + } + + tr, err := clients.Pipeline.TektonV1alpha1().TaskRuns(namespace).Get(name, metav1.GetOptions{}) + if err != nil { + t.Fatalf("getting updated taskrun: %v", err) + } + + // Setup cmp options + cloudDeliveryStateCompare := func(x, y v1alpha1.CloudEventDeliveryState) bool { + return cmp.Equal(x.Condition, y.Condition) && cmp.Equal(x.Error, y.Error) && cmp.Equal(x.RetryCount, y.RetryCount) + } + less := func(x, y v1alpha1.CloudEventDelivery) bool { + return strings.Compare(x.Target, y.Target) < 0 || (strings.Compare(x.Target, y.Target) == 0 && x.Status.SentAt.Before(y.Status.SentAt)) + } + opts := []cmp.Option{ + cmpopts.SortSlices(less), + cmp.Comparer(func(x, y v1alpha1.CloudEventDelivery) bool { + return (strings.Compare(x.Target, y.Target) == 0) && cloudDeliveryStateCompare(x.Status, y.Status) + }), + } + t.Log(tr.Status.CloudEvents) + if diff := cmp.Diff(tc.wantCloudEvents, tr.Status.CloudEvents, opts...); diff != "" { + t.Errorf("Unexpected status of cloud events (-want +got) = %s", diff) + } + }) + } +} diff --git a/test/builder/task.go b/test/builder/task.go index 9bb7c6f531f..64dcf91ec22 100644 --- a/test/builder/task.go +++ b/test/builder/task.go @@ -315,6 +315,24 @@ func TaskRunStartTime(startTime time.Time) TaskRunStatusOp { } } +// TaskRunCloudEvent adds an event to the TaskRunStatus. +func TaskRunCloudEvent(target, error string, retryCount int32, condition v1alpha1.CloudEventCondition) TaskRunStatusOp { + return func(s *v1alpha1.TaskRunStatus) { + if len(s.CloudEvents) == 0 { + s.CloudEvents = make([]v1alpha1.CloudEventDelivery, 0) + } + cloudEvent := v1alpha1.CloudEventDelivery{ + Target: target, + Status: v1alpha1.CloudEventDeliveryState{ + Condition: condition, + RetryCount: retryCount, + Error: error, + }, + } + s.CloudEvents = append(s.CloudEvents, cloudEvent) + } +} + // TaskRunTimeout sets the timeout duration to the TaskRunSpec. func TaskRunTimeout(d time.Duration) TaskRunSpecOp { return func(spec *v1alpha1.TaskRunSpec) { @@ -397,6 +415,13 @@ func TaskRunAnnotation(key, value string) TaskRunOp { } } +// TaskRunSelfLink adds a SelfLink +func TaskRunSelfLink(selflink string) TaskRunOp { + return func(tr *v1alpha1.TaskRun) { + tr.ObjectMeta.SelfLink = selflink + } +} + // TaskRunSpec sets the specified spec of the TaskRun. // Any number of TaskRunSpec modifier can be passed to transform it. func TaskRunSpec(ops ...TaskRunSpecOp) TaskRunOp {