diff --git a/docs/resources.md b/docs/resources.md index 764874245cd..87c55fdcb58 100644 --- a/docs/resources.md +++ b/docs/resources.md @@ -52,6 +52,7 @@ The following `PipelineResources` are currently supported: - [Storage Resource](#storage-resource) - [GCS Storage Resource](#gcs-storage-resource) - [BuildGCS Storage Resource](#buildgcs-storage-resource) +- [Cloud Event Resource](#cloud-event-resource) ### Git Resource @@ -617,6 +618,68 @@ the container image [gcr.io/cloud-builders//gcs-fetcher](https://github.com/GoogleCloudPlatform/cloud-builders/tree/master/gcs-fetcher) does not support configuring secrets. +### Cloud Event Resource + +The Cloud Event Resource represents a [cloud event](https://github.com/cloudevents/spec) +that is sent to a target `URI` upon completion of a `TaskRun`. +The Cloud Event Resource sends Tekton specific events; the body of the event includes +the entire TaskRun spec plus status; the types of events defined for now are: + +- dev.tekton.event.task.unknown +- dev.tekton.event.task.successful +- dev.tekton.event.task.failed + +Cloud event resources are useful to notify a third party upon the completion and +status of a `TaskRun`. In combinations with the [Tekton triggers](https://github.com/tektoncd/triggers) +project they can be used to link `Task/PipelineRuns` asynchronously. + +To create a CloudEvent resource using the `PipelineResource` CRD: + +```yaml +apiVersion: tekton.dev/v1alpha1 +kind: PipelineResource +metadata: + name: event-to-sink +spec: + type: cloudevent + params: + - name: targetURI + value: http://sink:8080 +``` + +The content of an event is for example: + +```yaml +Context Attributes, + SpecVersion: 0.2 + Type: dev.tekton.event.task.successful + Source: /apis/tekton.dev/v1alpha1/namespaces/default/taskruns/pipeline-run-api-16aa55-source-to-image-task-rpndl + ID: pipeline-run-api-16aa55-source-to-image-task-rpndl + Time: 2019-07-04T11:03:53.058694712Z + ContentType: application/json +Transport Context, + URI: / + Host: my-sink.default.my-cluster.containers.appdomain.cloud + Method: POST +Data, + { + "taskRun": { + "metadata": {...} + "spec": { + "inputs": {...} + "outputs": {...} + "serviceAccount": "default", + "taskRef": { + "name": "source-to-image", + "kind": "Task" + }, + "timeout": "1h0m0s" + }, + "status": {...} + } + } +``` + Except as otherwise noted, the content of this page is licensed under the [Creative Commons Attribution 4.0 License](https://creativecommons.org/licenses/by/4.0/), and code samples are licensed under the diff --git a/examples/taskruns/taskrun-cloud-event.yaml b/examples/taskruns/taskrun-cloud-event.yaml new file mode 100644 index 00000000000..eb8c1d6d716 --- /dev/null +++ b/examples/taskruns/taskrun-cloud-event.yaml @@ -0,0 +1,111 @@ +--- +apiVersion: v1 +kind: Service +metadata: + name: sink + namespace: default +spec: + selector: + app: cloudevent + ports: + - protocol: TCP + port: 8080 + targetPort: 8080 +--- +apiVersion: v1 +kind: Pod +metadata: + labels: + app: cloudevent + name: message-sink + namespace: default +spec: + containers: + - env: + - name: PORT + value: "8080" + name: cloudeventlistener + image: python:3-alpine + imagePullPolicy: IfNotPresent + command: ["/bin/sh"] + args: + - -ce + - | + cat <

POST!

") + + if __name__ == "__main__": + httpd = HTTPServer(('', $PORT), PostHandler) + print('Starting httpd...') + httpd.serve_forever() + EOF + ports: + - containerPort: 8080 + name: user-port + protocol: TCP +--- +apiVersion: tekton.dev/v1alpha1 +kind: PipelineResource +metadata: + name: to-message-sink +spec: + type: cloudEvent + params: + - name: targetURI + value: http://sink.default:8080 +--- +apiVersion: tekton.dev/v1alpha1 +kind: PipelineResource +metadata: + name: rules-branch +spec: + type: git + params: + - name: revision + value: master + - name: url + value: https://github.com/bazelbuild/rules_docker +--- +apiVersion: tekton.dev/v1alpha1 +kind: Task +metadata: + name: send-cloud-event-task +spec: + inputs: + resources: + - name: workspace + type: git + outputs: + resources: + - name: notification + type: cloudEvent + steps: + - name: list + image: ubuntu + command: ["/bin/bash"] + args: ['-c', 'ls -al /workspace'] +--- +apiVersion: tekton.dev/v1alpha1 +kind: TaskRun +metadata: + name: send-cloud-event +spec: + inputs: + resources: + - name: workspace + resourceRef: + name: rules-branch + outputs: + resources: + - name: notification + resourceRef: + name: to-message-sink + taskRef: + name: send-cloud-event-task diff --git a/pkg/reconciler/v1alpha1/taskrun/controller.go b/pkg/reconciler/v1alpha1/taskrun/controller.go index 2c6e2618507..1f77b050cbd 100644 --- a/pkg/reconciler/v1alpha1/taskrun/controller.go +++ b/pkg/reconciler/v1alpha1/taskrun/controller.go @@ -27,6 +27,7 @@ import ( taskruninformer "github.com/tektoncd/pipeline/pkg/client/injection/informers/pipeline/v1alpha1/taskrun" "github.com/tektoncd/pipeline/pkg/reconciler" "github.com/tektoncd/pipeline/pkg/reconciler/v1alpha1/taskrun/entrypoint" + cloudeventclient "github.com/tektoncd/pipeline/pkg/reconciler/v1alpha1/taskrun/resources/cloudevent" "k8s.io/client-go/tools/cache" "knative.dev/pkg/configmap" "knative.dev/pkg/controller" @@ -69,6 +70,7 @@ func NewController( clusterTaskLister: clusterTaskInformer.Lister(), resourceLister: resourceInformer.Lister(), timeoutHandler: timeoutHandler, + cloudEventClient: cloudeventclient.Get(ctx), } impl := controller.NewImpl(c, c.Logger, taskRunControllerName) diff --git a/pkg/reconciler/v1alpha1/taskrun/taskrun.go b/pkg/reconciler/v1alpha1/taskrun/taskrun.go index e43f09ee721..c7f6151f089 100644 --- a/pkg/reconciler/v1alpha1/taskrun/taskrun.go +++ b/pkg/reconciler/v1alpha1/taskrun/taskrun.go @@ -23,6 +23,7 @@ import ( "strings" "time" + "github.com/hashicorp/go-multierror" "github.com/tektoncd/pipeline/pkg/apis/config" "github.com/tektoncd/pipeline/pkg/apis/pipeline" "github.com/tektoncd/pipeline/pkg/apis/pipeline/v1alpha1" @@ -30,6 +31,7 @@ import ( "github.com/tektoncd/pipeline/pkg/reconciler" "github.com/tektoncd/pipeline/pkg/reconciler/v1alpha1/taskrun/entrypoint" "github.com/tektoncd/pipeline/pkg/reconciler/v1alpha1/taskrun/resources" + "github.com/tektoncd/pipeline/pkg/reconciler/v1alpha1/taskrun/resources/cloudevent" "github.com/tektoncd/pipeline/pkg/reconciler/v1alpha1/taskrun/sidecars" "github.com/tektoncd/pipeline/pkg/status" "go.uber.org/zap" @@ -65,6 +67,7 @@ type Reconciler struct { taskLister listers.TaskLister clusterTaskLister listers.ClusterTaskLister resourceLister listers.PipelineResourceLister + cloudEventClient cloudevent.CEClient tracker tracker.Interface cache *entrypoint.Cache timeoutHandler *reconciler.TimeoutSet @@ -108,25 +111,42 @@ func (c *Reconciler) Reconcile(ctx context.Context, key string) error { } if tr.IsDone() { + var merr *multierror.Error + // Try to send cloud events first + cloudEventErr := c.sendCloudEvents(tr) + // Regardless of `err`, we must write back any status update that may have + // been generated by `sendCloudEvents` + updateErr := c.updateStatusLabelsAndAnnotations(tr, original) + merr = multierror.Append(cloudEventErr, updateErr) + if cloudEventErr != nil { + // Let's keep timeouts and sidecars running as long as we're trying to + // send cloud events. So we stop here an return errors encountered this far. + return merr.ErrorOrNil() + } c.timeoutHandler.Release(tr) pod, err := c.KubeClientSet.CoreV1().Pods(tr.Namespace).Get(tr.Status.PodName, metav1.GetOptions{}) if err == nil { err = sidecars.Stop(pod, c.KubeClientSet.CoreV1().Pods(tr.Namespace).Update) } else if errors.IsNotFound(err) { - return nil + return merr.ErrorOrNil() } if err != nil { c.Logger.Errorf("Error stopping sidecars for TaskRun %q: %v", name, err) + merr = multierror.Append(merr, err) } - return err + return merr.ErrorOrNil() } - // Reconcile this copy of the task run and then write back any status // updates regardless of whether the reconciliation errored out. if err := c.reconcile(ctx, tr); err != nil { c.Logger.Errorf("Reconcile error: %v", err.Error()) return err } + return c.updateStatusLabelsAndAnnotations(tr, original) +} + +func (c *Reconciler) updateStatusLabelsAndAnnotations(tr, original *v1alpha1.TaskRun) error { + var err error if equality.Semantic.DeepEqual(original.Status, tr.Status) { // If we didn't change anything then don't call updateStatus. // This is important because the copy we loaded from the informer's @@ -144,7 +164,6 @@ func (c *Reconciler) Reconcile(ctx context.Context, key string) error { return err } } - return err } @@ -253,6 +272,25 @@ func (c *Reconciler) reconcile(ctx context.Context, tr *v1alpha1.TaskRun) error return nil } + // Initialize the cloud events if at least a CloudEventResource is defined + // and they have not been initialized yet. + c.Logger.Infof("Cloud Events: %s", tr.Status.CloudEvents) + // FIXME(afrittoli) If there are no events this is run every time + if len(tr.Status.CloudEvents) == 0 { + targets := make([]string, len(rtr.Outputs)) + idx := 0 + for _, output := range rtr.Outputs { + if output.Spec.Type == v1alpha1.PipelineResourceTypeCloudEvent { + cer, _ := v1alpha1.NewCloudEventResource(output) + targets[idx] = cer.TargetURI + idx++ + } + } + if idx > 0 { + tr.Status.InitializeCloudEvents(targets) + } + } + // Get the TaskRun's Pod if it should have one. Otherwise, create the Pod. pod, err := resources.TryGetPod(tr.Status, c.KubeClientSet.CoreV1().Pods(tr.Namespace).Get) if err != nil { @@ -293,12 +331,41 @@ func (c *Reconciler) reconcile(ctx context.Context, tr *v1alpha1.TaskRun) error } reconciler.EmitEvent(c.Recorder, before, after, tr) - c.Logger.Infof("Successfully reconciled taskrun %s/%s with status: %#v", tr.Name, tr.Namespace, after) return nil } +func (c *Reconciler) sendCloudEvents(tr *v1alpha1.TaskRun) error { + // The TaskRun is complete. Time to send cloud events (if any). + var merr *multierror.Error + for idx, cloudEventDelivery := range tr.Status.CloudEvents { + eventStatus := &(tr.Status.CloudEvents[idx].Status) + // Skip events that have already been sent (successfully or unsuccessfully) + // Ensure we try to send all events once (possibly through different reconcile calls) + if eventStatus.Condition != v1alpha1.CloudEventConditionUnknown || eventStatus.RetryCount > 0 { + continue + } + _, err := cloudevent.SendTaskRunCloudEvent(cloudEventDelivery.Target, tr, c.Logger, c.cloudEventClient) + eventStatus.SentAt = &metav1.Time{Time: time.Now()} + eventStatus.RetryCount = eventStatus.RetryCount + 1 + if err != nil { + merr = multierror.Append(merr, err) + eventStatus.Condition = v1alpha1.CloudEventConditionFailed + eventStatus.Error = merr.Error() + } else { + c.Logger.Infof("Sent event for target %s", cloudEventDelivery.Target) + eventStatus.Condition = v1alpha1.CloudEventConditionSent + } + } + if merr != nil && merr.Len() > 0 { + c.Logger.Errorf("Failed to send %d cloud events for TaskRun %s", merr.Len(), tr.Name) + // Return all send error + return merr + } + return merr +} + func (c *Reconciler) handlePodCreationError(tr *v1alpha1.TaskRun, err error) { var reason, msg string var succeededStatus corev1.ConditionStatus diff --git a/pkg/reconciler/v1alpha1/taskrun/taskrun_test.go b/pkg/reconciler/v1alpha1/taskrun/taskrun_test.go index eb2e607b6a3..71e1e7ed450 100644 --- a/pkg/reconciler/v1alpha1/taskrun/taskrun_test.go +++ b/pkg/reconciler/v1alpha1/taskrun/taskrun_test.go @@ -46,6 +46,7 @@ import ( "github.com/tektoncd/pipeline/pkg/apis/pipeline/v1alpha1" "github.com/tektoncd/pipeline/pkg/reconciler/v1alpha1/taskrun/entrypoint" "github.com/tektoncd/pipeline/pkg/reconciler/v1alpha1/taskrun/resources" + "github.com/tektoncd/pipeline/pkg/reconciler/v1alpha1/taskrun/resources/cloudevent" "github.com/tektoncd/pipeline/pkg/status" "github.com/tektoncd/pipeline/pkg/system" "github.com/tektoncd/pipeline/test" @@ -73,6 +74,8 @@ var ( resourceQuantityCmp = cmp.Comparer(func(x, y resource.Quantity) bool { return x.Cmp(y) == 0 }) + cloudEventTarget1 = "https://foo" + cloudEventTarget2 = "https://bar" simpleStep = tb.Step("simple-step", "foo", tb.Command("/mycmd")) simpleTask = tb.Task("test-task", "foo", tb.TaskSpec(simpleStep)) @@ -134,6 +137,13 @@ var ( })), )) + twoOutputsTask = tb.Task("test-two-output-task", "foo", tb.TaskSpec( + simpleStep, tb.TaskOutputs( + tb.OutputsResource(cloudEventResource.Name, v1alpha1.PipelineResourceTypeCloudEvent), + tb.OutputsResource(anotherCloudEventResource.Name, v1alpha1.PipelineResourceTypeCloudEvent), + ), + )) + gitResource = tb.PipelineResource("git-resource", "foo", tb.PipelineResourceSpec( v1alpha1.PipelineResourceTypeGit, tb.PipelineResourceSpecParam("URL", "https://foo.git"), )) @@ -143,6 +153,12 @@ var ( imageResource = tb.PipelineResource("image-resource", "foo", tb.PipelineResourceSpec( v1alpha1.PipelineResourceTypeImage, tb.PipelineResourceSpecParam("URL", "gcr.io/kristoff/sven"), )) + cloudEventResource = tb.PipelineResource("cloud-event-resource", "foo", tb.PipelineResourceSpec( + v1alpha1.PipelineResourceTypeCloudEvent, tb.PipelineResourceSpecParam("TargetURI", cloudEventTarget1), + )) + anotherCloudEventResource = tb.PipelineResource("another-cloud-event-resource", "foo", tb.PipelineResourceSpec( + v1alpha1.PipelineResourceTypeCloudEvent, tb.PipelineResourceSpecParam("TargetURI", cloudEventTarget2), + )) toolsVolume = corev1.Volume{ Name: "tools", @@ -241,6 +257,10 @@ func getRunName(tr *v1alpha1.TaskRun) string { func getTaskRunController(t *testing.T, d test.Data) (test.TestAssets, func()) { ctx, _ := rtesting.SetupFakeContext(t) ctx, cancel := context.WithCancel(ctx) + cloudEventClientBehaviour := cloudevent.FakeClientBehaviour{ + SendSuccessfully: true, + } + ctx = cloudevent.WithClient(ctx, &cloudEventClientBehaviour) entrypointCache, _ = entrypoint.NewCache() c, _ := test.SeedTestData(t, ctx, d) configMapWatcher := configmap.NewInformedWatcher(c.Kube, system.GetNamespace()) @@ -1616,3 +1636,195 @@ func TestHandlePodCreationError(t *testing.T) { }) } } + +func TestReconcileCloudEvens(t *testing.T) { + + taskRunWithNoCEResources := tb.TaskRun("test-taskrun-no-ce-resources", "foo", + tb.TaskRunSpec( + tb.TaskRunTaskRef(simpleTask.Name, tb.TaskRefAPIVersion("a1")), + )) + taskRunWithTwoCEResourcesNoInit := tb.TaskRun("test-taskrun-two-ce-resources-no-init", "foo", + tb.TaskRunSpec( + tb.TaskRunTaskRef(twoOutputsTask.Name), + tb.TaskRunOutputs( + tb.TaskRunOutputsResource(cloudEventResource.Name), + tb.TaskRunOutputsResource(anotherCloudEventResource.Name), + ), + ), + ) + taskRunWithTwoCEResourcesInit := tb.TaskRun("test-taskrun-two-ce-resources-init", "foo", + tb.TaskRunSpec( + tb.TaskRunTaskRef(twoOutputsTask.Name), + tb.TaskRunOutputs( + tb.TaskRunOutputsResource(cloudEventResource.Name), + tb.TaskRunOutputsResource(anotherCloudEventResource.Name), + ), + ), + tb.TaskRunStatus( + tb.TaskRunCloudEvent(cloudEventTarget1, "", 0, v1alpha1.CloudEventConditionUnknown), + tb.TaskRunCloudEvent(cloudEventTarget2, "", 0, v1alpha1.CloudEventConditionUnknown), + ), + ) + taskRunWithCESucceded := tb.TaskRun("test-taskrun-ce-succeeded", "foo", + tb.TaskRunSelfLink("/task/1234"), + tb.TaskRunSpec( + tb.TaskRunTaskRef(twoOutputsTask.Name), + tb.TaskRunOutputs( + tb.TaskRunOutputsResource(cloudEventResource.Name), + tb.TaskRunOutputsResource(anotherCloudEventResource.Name), + ), + ), + tb.TaskRunStatus( + tb.StatusCondition(apis.Condition{ + Type: apis.ConditionSucceeded, + Status: corev1.ConditionTrue, + }), + tb.TaskRunCloudEvent(cloudEventTarget1, "", 0, v1alpha1.CloudEventConditionUnknown), + tb.TaskRunCloudEvent(cloudEventTarget2, "", 0, v1alpha1.CloudEventConditionUnknown), + ), + ) + taskRunWithCEFailed := tb.TaskRun("test-taskrun-ce-failed", "foo", + tb.TaskRunSelfLink("/task/1234"), + tb.TaskRunSpec( + tb.TaskRunTaskRef(twoOutputsTask.Name), + tb.TaskRunOutputs( + tb.TaskRunOutputsResource(cloudEventResource.Name), + tb.TaskRunOutputsResource(anotherCloudEventResource.Name), + ), + ), + tb.TaskRunStatus( + tb.StatusCondition(apis.Condition{ + Type: apis.ConditionSucceeded, + Status: corev1.ConditionFalse, + }), + tb.TaskRunCloudEvent(cloudEventTarget1, "", 0, v1alpha1.CloudEventConditionUnknown), + tb.TaskRunCloudEvent(cloudEventTarget2, "", 0, v1alpha1.CloudEventConditionUnknown), + ), + ) + taskRunWithCESuccededOneAttempt := tb.TaskRun("test-taskrun-ce-succeeded-one-attempt", "foo", + tb.TaskRunSelfLink("/task/1234"), + tb.TaskRunSpec( + tb.TaskRunTaskRef(twoOutputsTask.Name), + tb.TaskRunOutputs( + tb.TaskRunOutputsResource(cloudEventResource.Name), + tb.TaskRunOutputsResource(anotherCloudEventResource.Name), + ), + ), + tb.TaskRunStatus( + tb.StatusCondition(apis.Condition{ + Type: apis.ConditionSucceeded, + Status: corev1.ConditionTrue, + }), + tb.TaskRunCloudEvent(cloudEventTarget1, "", 1, v1alpha1.CloudEventConditionUnknown), + tb.TaskRunCloudEvent(cloudEventTarget2, "fakemessage", 0, v1alpha1.CloudEventConditionUnknown), + ), + ) + taskruns := []*v1alpha1.TaskRun{ + taskRunWithNoCEResources, taskRunWithTwoCEResourcesNoInit, + taskRunWithTwoCEResourcesInit, taskRunWithCESucceded, taskRunWithCEFailed, + taskRunWithCESuccededOneAttempt, + } + + d := test.Data{ + TaskRuns: taskruns, + Tasks: []*v1alpha1.Task{simpleTask, twoOutputsTask}, + ClusterTasks: []*v1alpha1.ClusterTask{}, + PipelineResources: []*v1alpha1.PipelineResource{cloudEventResource, anotherCloudEventResource}, + } + for _, tc := range []struct { + name string + taskRun *v1alpha1.TaskRun + wantCloudEvents []v1alpha1.CloudEventDelivery + }{{ + name: "no-ce-resources", + taskRun: taskRunWithNoCEResources, + wantCloudEvents: taskRunWithNoCEResources.Status.CloudEvents, + }, { + name: "ce-resources-no-init", + taskRun: taskRunWithTwoCEResourcesNoInit, + wantCloudEvents: tb.TaskRun("want", "foo", tb.TaskRunStatus( + tb.TaskRunCloudEvent(cloudEventTarget1, "", 0, v1alpha1.CloudEventConditionUnknown), + tb.TaskRunCloudEvent(cloudEventTarget2, "", 0, v1alpha1.CloudEventConditionUnknown), + )).Status.CloudEvents, + }, { + name: "ce-resources-init", + taskRun: taskRunWithTwoCEResourcesInit, + wantCloudEvents: tb.TaskRun("want2", "foo", tb.TaskRunStatus( + tb.TaskRunCloudEvent(cloudEventTarget1, "", 0, v1alpha1.CloudEventConditionUnknown), + tb.TaskRunCloudEvent(cloudEventTarget2, "", 0, v1alpha1.CloudEventConditionUnknown), + )).Status.CloudEvents, + }, { + name: "ce-resources-init-task-successful", + taskRun: taskRunWithCESucceded, + wantCloudEvents: tb.TaskRun("want3", "foo", tb.TaskRunStatus( + tb.TaskRunCloudEvent(cloudEventTarget1, "", 1, v1alpha1.CloudEventConditionSent), + tb.TaskRunCloudEvent(cloudEventTarget2, "", 1, v1alpha1.CloudEventConditionSent), + )).Status.CloudEvents, + }, { + name: "ce-resources-init-task-failed", + taskRun: taskRunWithCEFailed, + wantCloudEvents: tb.TaskRun("want4", "foo", tb.TaskRunStatus( + tb.TaskRunCloudEvent(cloudEventTarget1, "", 1, v1alpha1.CloudEventConditionSent), + tb.TaskRunCloudEvent(cloudEventTarget2, "", 1, v1alpha1.CloudEventConditionSent), + )).Status.CloudEvents, + }, { + name: "ce-resources-init-task-successful-one-attempt", + taskRun: taskRunWithCESuccededOneAttempt, + wantCloudEvents: tb.TaskRun("want5", "foo", tb.TaskRunStatus( + tb.TaskRunCloudEvent(cloudEventTarget1, "", 1, v1alpha1.CloudEventConditionUnknown), + tb.TaskRunCloudEvent(cloudEventTarget2, "fakemessage", 1, v1alpha1.CloudEventConditionSent), + )).Status.CloudEvents, + }} { + t.Run(tc.name, func(t *testing.T) { + names.TestingSeed() + testAssets, cancel := getTaskRunController(t, d) + defer cancel() + c := testAssets.Controller + clients := testAssets.Clients + + saName := tc.taskRun.Spec.ServiceAccount + if saName == "" { + saName = "default" + } + if _, err := clients.Kube.CoreV1().ServiceAccounts(tc.taskRun.Namespace).Create(&corev1.ServiceAccount{ + ObjectMeta: metav1.ObjectMeta{ + Name: saName, + Namespace: tc.taskRun.Namespace, + }, + }); err != nil { + t.Fatal(err) + } + + if err := c.Reconciler.Reconcile(context.Background(), getRunName(tc.taskRun)); err != nil { + t.Errorf("expected no error. Got error %v", err) + } + namespace, name, err := cache.SplitMetaNamespaceKey(tc.taskRun.Name) + if err != nil { + t.Errorf("Invalid resource key: %v", err) + } + + tr, err := clients.Pipeline.TektonV1alpha1().TaskRuns(namespace).Get(name, metav1.GetOptions{}) + if err != nil { + t.Fatalf("getting updated taskrun: %v", err) + } + + // Setup cmp options + cloudDeliveryStateCompare := func(x, y v1alpha1.CloudEventDeliveryState) bool { + return cmp.Equal(x.Condition, y.Condition) && cmp.Equal(x.Error, y.Error) && cmp.Equal(x.RetryCount, y.RetryCount) + } + less := func(x, y v1alpha1.CloudEventDelivery) bool { + return strings.Compare(x.Target, y.Target) < 0 || (strings.Compare(x.Target, y.Target) == 0 && x.Status.SentAt.Before(y.Status.SentAt)) + } + opts := []cmp.Option{ + cmpopts.SortSlices(less), + cmp.Comparer(func(x, y v1alpha1.CloudEventDelivery) bool { + return (strings.Compare(x.Target, y.Target) == 0) && cloudDeliveryStateCompare(x.Status, y.Status) + }), + } + t.Log(tr.Status.CloudEvents) + if diff := cmp.Diff(tc.wantCloudEvents, tr.Status.CloudEvents, opts...); diff != "" { + t.Errorf("Unexpected status of cloud events (-want +got) = %s", diff) + } + }) + } +} diff --git a/test/builder/task.go b/test/builder/task.go index 9bb7c6f531f..64dcf91ec22 100644 --- a/test/builder/task.go +++ b/test/builder/task.go @@ -315,6 +315,24 @@ func TaskRunStartTime(startTime time.Time) TaskRunStatusOp { } } +// TaskRunCloudEvent adds an event to the TaskRunStatus. +func TaskRunCloudEvent(target, error string, retryCount int32, condition v1alpha1.CloudEventCondition) TaskRunStatusOp { + return func(s *v1alpha1.TaskRunStatus) { + if len(s.CloudEvents) == 0 { + s.CloudEvents = make([]v1alpha1.CloudEventDelivery, 0) + } + cloudEvent := v1alpha1.CloudEventDelivery{ + Target: target, + Status: v1alpha1.CloudEventDeliveryState{ + Condition: condition, + RetryCount: retryCount, + Error: error, + }, + } + s.CloudEvents = append(s.CloudEvents, cloudEvent) + } +} + // TaskRunTimeout sets the timeout duration to the TaskRunSpec. func TaskRunTimeout(d time.Duration) TaskRunSpecOp { return func(spec *v1alpha1.TaskRunSpec) { @@ -397,6 +415,13 @@ func TaskRunAnnotation(key, value string) TaskRunOp { } } +// TaskRunSelfLink adds a SelfLink +func TaskRunSelfLink(selflink string) TaskRunOp { + return func(tr *v1alpha1.TaskRun) { + tr.ObjectMeta.SelfLink = selflink + } +} + // TaskRunSpec sets the specified spec of the TaskRun. // Any number of TaskRunSpec modifier can be passed to transform it. func TaskRunSpec(ops ...TaskRunSpecOp) TaskRunOp {