From 69c1cfdedf44fcafe59a1fd403b9436ece8557f2 Mon Sep 17 00:00:00 2001 From: Andrea Frittoli Date: Wed, 17 Jul 2019 03:29:14 +0900 Subject: [PATCH] Cloud Event pipeline resource Implements in the TaskRun controller the logic to provide the cloud event pipeline resource. This commits puts together the API, cloud event helper and resource definition from four pull requests: - https://github.com/tektoncd/pipeline/pull/1090 - https://github.com/tektoncd/pipeline/pull/1091 - https://github.com/tektoncd/pipeline/pull/1092 It adds unit tests for the new code and one E2E YAML test. The YAML test runs a simple http server that can receive the cloudevent for test purposes. The list of cloud events to be sent is added to the TaskRun status and processed by the TaskRun controller once the pod associated to the TaskRun completes its execution. The `isDone` definition of the TaskRun is not altered, the reconciler checks for events to be sent once the TaskRun.isDone is true. Retries are not implemented yet in the sense that every scheduled event will be attempted exactly once, but it may be that those attempts happen across different invocations of Reconcile. Signed-off-by: Andrea Frittoli --- docs/resources.md | 63 ++++++ examples/taskruns/taskrun-cloud-event.yaml | 186 ++++++++++++++++ pkg/apis/pipeline/v1alpha1/taskrun_types.go | 20 -- .../pipeline/v1alpha1/taskrun_types_test.go | 50 ----- pkg/reconciler/v1alpha1/taskrun/controller.go | 2 + .../cloudevent/cloud_event_controller.go | 97 +++++++++ .../cloudevent/cloud_event_controller_test.go | 129 ++++++++++++ .../resources/cloudevent/cloudevent.go | 21 ++ pkg/reconciler/v1alpha1/taskrun/taskrun.go | 41 +++- .../v1alpha1/taskrun/taskrun_test.go | 199 ++++++++++++++++++ test/builder/task.go | 25 +++ 11 files changed, 758 insertions(+), 75 deletions(-) create mode 100644 examples/taskruns/taskrun-cloud-event.yaml create mode 100644 pkg/reconciler/v1alpha1/taskrun/resources/cloudevent/cloud_event_controller.go create mode 100644 pkg/reconciler/v1alpha1/taskrun/resources/cloudevent/cloud_event_controller_test.go diff --git a/docs/resources.md b/docs/resources.md index 764874245cd..87c55fdcb58 100644 --- a/docs/resources.md +++ b/docs/resources.md @@ -52,6 +52,7 @@ The following `PipelineResources` are currently supported: - [Storage Resource](#storage-resource) - [GCS Storage Resource](#gcs-storage-resource) - [BuildGCS Storage Resource](#buildgcs-storage-resource) +- [Cloud Event Resource](#cloud-event-resource) ### Git Resource @@ -617,6 +618,68 @@ the container image [gcr.io/cloud-builders//gcs-fetcher](https://github.com/GoogleCloudPlatform/cloud-builders/tree/master/gcs-fetcher) does not support configuring secrets. +### Cloud Event Resource + +The Cloud Event Resource represents a [cloud event](https://github.com/cloudevents/spec) +that is sent to a target `URI` upon completion of a `TaskRun`. +The Cloud Event Resource sends Tekton specific events; the body of the event includes +the entire TaskRun spec plus status; the types of events defined for now are: + +- dev.tekton.event.task.unknown +- dev.tekton.event.task.successful +- dev.tekton.event.task.failed + +Cloud event resources are useful to notify a third party upon the completion and +status of a `TaskRun`. In combinations with the [Tekton triggers](https://github.com/tektoncd/triggers) +project they can be used to link `Task/PipelineRuns` asynchronously. + +To create a CloudEvent resource using the `PipelineResource` CRD: + +```yaml +apiVersion: tekton.dev/v1alpha1 +kind: PipelineResource +metadata: + name: event-to-sink +spec: + type: cloudevent + params: + - name: targetURI + value: http://sink:8080 +``` + +The content of an event is for example: + +```yaml +Context Attributes, + SpecVersion: 0.2 + Type: dev.tekton.event.task.successful + Source: /apis/tekton.dev/v1alpha1/namespaces/default/taskruns/pipeline-run-api-16aa55-source-to-image-task-rpndl + ID: pipeline-run-api-16aa55-source-to-image-task-rpndl + Time: 2019-07-04T11:03:53.058694712Z + ContentType: application/json +Transport Context, + URI: / + Host: my-sink.default.my-cluster.containers.appdomain.cloud + Method: POST +Data, + { + "taskRun": { + "metadata": {...} + "spec": { + "inputs": {...} + "outputs": {...} + "serviceAccount": "default", + "taskRef": { + "name": "source-to-image", + "kind": "Task" + }, + "timeout": "1h0m0s" + }, + "status": {...} + } + } +``` + Except as otherwise noted, the content of this page is licensed under the [Creative Commons Attribution 4.0 License](https://creativecommons.org/licenses/by/4.0/), and code samples are licensed under the diff --git a/examples/taskruns/taskrun-cloud-event.yaml b/examples/taskruns/taskrun-cloud-event.yaml new file mode 100644 index 00000000000..2e4a3b92a0f --- /dev/null +++ b/examples/taskruns/taskrun-cloud-event.yaml @@ -0,0 +1,186 @@ +--- +apiVersion: v1 +kind: Service +metadata: + name: sink + namespace: default +spec: + selector: + app: cloudevent + ports: + - protocol: TCP + port: 8080 + targetPort: 8080 +--- +apiVersion: v1 +kind: Pod +metadata: + labels: + app: cloudevent + name: message-sink + namespace: default +spec: + containers: + - env: + - name: PORT + value: "8080" + name: cloudeventlistener + image: python:3-alpine + imagePullPolicy: IfNotPresent + command: ["/bin/sh"] + args: + - -ce + - | + cat <

POST!

') + + def do_GET(self): + with open("content.txt", mode="rb") as f: + content = f.read() + self.send_response(200 if content else 404) + self.send_header('Content-type', 'text/plain') + self.end_headers() + self.wfile.write(content) + + if __name__ == "__main__": + open("content.txt", 'a').close() + httpd = HTTPServer(('', $PORT), GetAndPostHandler) + print('Starting httpd...') + httpd.serve_forever() + EOF + ports: + - containerPort: 8080 + name: user-port + protocol: TCP +--- +apiVersion: tekton.dev/v1alpha1 +kind: PipelineResource +metadata: + name: to-message-sink +spec: + type: cloudEvent + params: + - name: targetURI + value: http://sink.default:8080 +--- +apiVersion: tekton.dev/v1alpha1 +kind: PipelineResource +metadata: + name: fake-image +spec: + type: image + params: + - name: url + value: fake-registry/test/fake-image +--- +apiVersion: tekton.dev/v1alpha1 +kind: Task +metadata: + name: send-cloud-event-task +spec: + outputs: + resources: + - name: myimage + type: image + - name: notification + type: cloudEvent + steps: + - name: build-index-json + image: busybox + command: + - /bin/sh + args: + - -ce + - | + set -e + mkdir -p /builder/home/image-outputs/myimage/ + cat < /builder/home/image-outputs/myimage/index.json + { + "schemaVersion": 2, + "manifests": [ + { + "mediaType": "application/vnd.oci.image.index.v1+json", + "size": 314, + "digest": "sha256:23308992a034634b391107fd4f5e03958a07cc4c28168a228ad8a9f8942473ca" + } + ] + } + EOF +--- +apiVersion: tekton.dev/v1alpha1 +kind: Task +metadata: + name: poll-for-content-task +spec: + steps: + - name: polling + image: python:3-alpine + imagePullPolicy: IfNotPresent + command: ["/bin/sh"] + args: + - -ce + - | + cat < 0 { - initialState := CloudEventDeliveryState{ - Condition: CloudEventConditionUnknown, - RetryCount: 0, - } - events := make([]CloudEventDelivery, len(targets)) - for idx, target := range targets { - events[idx] = CloudEventDelivery{ - Target: target, - Status: initialState, - } - } - tr.CloudEvents = events - } -} - // StepState reports the results of running a step in the Task. type StepState struct { corev1.ContainerState diff --git a/pkg/apis/pipeline/v1alpha1/taskrun_types_test.go b/pkg/apis/pipeline/v1alpha1/taskrun_types_test.go index aaf3fc69e04..0b0e46ea942 100644 --- a/pkg/apis/pipeline/v1alpha1/taskrun_types_test.go +++ b/pkg/apis/pipeline/v1alpha1/taskrun_types_test.go @@ -151,53 +151,3 @@ func TestTaskRunHasStarted(t *testing.T) { }) } } - -func TestInitializeCloudEvents(t *testing.T) { - tests := []struct { - name string - targets []string - wantCloudEvents []v1alpha1.CloudEventDelivery - }{{ - name: "testWithNilTarget", - targets: nil, - wantCloudEvents: nil, - }, { - name: "testWithEmptyListTarget", - targets: make([]string, 0), - wantCloudEvents: nil, - }, { - name: "testWithTwoTargets", - targets: []string{"target1", "target2"}, - wantCloudEvents: []v1alpha1.CloudEventDelivery{ - { - Target: "target1", - Status: v1alpha1.CloudEventDeliveryState{ - Condition: v1alpha1.CloudEventConditionUnknown, - SentAt: nil, - Error: "", - RetryCount: 0, - }, - }, - { - Target: "target2", - Status: v1alpha1.CloudEventDeliveryState{ - Condition: v1alpha1.CloudEventConditionUnknown, - SentAt: nil, - Error: "", - RetryCount: 0, - }, - }, - }, - }} - for _, tc := range tests { - t.Run(tc.name, func(t *testing.T) { - tr := tb.TaskRun("taskrunname", "testns", tb.TaskRunStatus()) - trs := tr.Status - trs.InitializeCloudEvents(tc.targets) - gotCloudEvents := trs.CloudEvents - if diff := cmp.Diff(tc.wantCloudEvents, gotCloudEvents); diff != "" { - t.Errorf("Wrong Cloud Events (-want +got) = %s", diff) - } - }) - } -} diff --git a/pkg/reconciler/v1alpha1/taskrun/controller.go b/pkg/reconciler/v1alpha1/taskrun/controller.go index 2c6e2618507..1f77b050cbd 100644 --- a/pkg/reconciler/v1alpha1/taskrun/controller.go +++ b/pkg/reconciler/v1alpha1/taskrun/controller.go @@ -27,6 +27,7 @@ import ( taskruninformer "github.com/tektoncd/pipeline/pkg/client/injection/informers/pipeline/v1alpha1/taskrun" "github.com/tektoncd/pipeline/pkg/reconciler" "github.com/tektoncd/pipeline/pkg/reconciler/v1alpha1/taskrun/entrypoint" + cloudeventclient "github.com/tektoncd/pipeline/pkg/reconciler/v1alpha1/taskrun/resources/cloudevent" "k8s.io/client-go/tools/cache" "knative.dev/pkg/configmap" "knative.dev/pkg/controller" @@ -69,6 +70,7 @@ func NewController( clusterTaskLister: clusterTaskInformer.Lister(), resourceLister: resourceInformer.Lister(), timeoutHandler: timeoutHandler, + cloudEventClient: cloudeventclient.Get(ctx), } impl := controller.NewImpl(c, c.Logger, taskRunControllerName) diff --git a/pkg/reconciler/v1alpha1/taskrun/resources/cloudevent/cloud_event_controller.go b/pkg/reconciler/v1alpha1/taskrun/resources/cloudevent/cloud_event_controller.go new file mode 100644 index 00000000000..48d07eea7e8 --- /dev/null +++ b/pkg/reconciler/v1alpha1/taskrun/resources/cloudevent/cloud_event_controller.go @@ -0,0 +1,97 @@ +/* +Copyright 2019 The Tekton Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cloudevent + +import ( + "time" + + "github.com/hashicorp/go-multierror" + "github.com/tektoncd/pipeline/pkg/apis/pipeline/v1alpha1" + "go.uber.org/zap" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +// InitializeCloudEvents initializes the CloudEvents part of the +// TaskRunStatus from the ResolvedTaskResources +func InitializeCloudEvents(tr *v1alpha1.TaskRun, prs []*v1alpha1.PipelineResource) { + // FIXME(afrittoli) If there are no events this is run every time + if len(tr.Status.CloudEvents) == 0 { + var targets []string + for _, output := range prs { + if output.Spec.Type == v1alpha1.PipelineResourceTypeCloudEvent { + cer, _ := v1alpha1.NewCloudEventResource(output) + targets = append(targets, cer.TargetURI) + } + } + if len(targets) > 0 { + tr.Status.CloudEvents = cloudEventDeliveryFromTargets(targets) + } + } +} + +func cloudEventDeliveryFromTargets(targets []string) []v1alpha1.CloudEventDelivery { + // len(nil slice) is 0 + if len(targets) > 0 { + initialState := v1alpha1.CloudEventDeliveryState{ + Condition: v1alpha1.CloudEventConditionUnknown, + RetryCount: 0, + } + events := make([]v1alpha1.CloudEventDelivery, len(targets)) + for idx, target := range targets { + events[idx] = v1alpha1.CloudEventDelivery{ + Target: target, + Status: initialState, + } + } + return events + } + return nil +} + +// SendCloudEvents is used by the TaskRun controller to send cloud events once +// the TaskRun is complete. `tr` is used to obtain the list of targets but also +// to construct the body of the +func SendCloudEvents(tr *v1alpha1.TaskRun, ceclient CEClient, logger *zap.SugaredLogger) error { + // Using multierror here so we can attempt to send all cloud events defined, + // regardless of whether they fail or not, and report all failed ones + var merr *multierror.Error + for idx, cloudEventDelivery := range tr.Status.CloudEvents { + eventStatus := &(tr.Status.CloudEvents[idx].Status) + // Skip events that have already been sent (successfully or unsuccessfully) + // Ensure we try to send all events once (possibly through different reconcile calls) + if eventStatus.Condition != v1alpha1.CloudEventConditionUnknown || eventStatus.RetryCount > 0 { + continue + } + _, err := SendTaskRunCloudEvent(cloudEventDelivery.Target, tr, logger, ceclient) + eventStatus.SentAt = &metav1.Time{Time: time.Now()} + eventStatus.RetryCount = eventStatus.RetryCount + 1 + if err != nil { + merr = multierror.Append(merr, err) + eventStatus.Condition = v1alpha1.CloudEventConditionFailed + eventStatus.Error = merr.Error() + } else { + logger.Infof("Sent event for target %s", cloudEventDelivery.Target) + eventStatus.Condition = v1alpha1.CloudEventConditionSent + } + } + if merr != nil && merr.Len() > 0 { + logger.Errorf("Failed to send %d cloud events for TaskRun %s", merr.Len(), tr.Name) + // Return all send error + return merr + } + return merr +} diff --git a/pkg/reconciler/v1alpha1/taskrun/resources/cloudevent/cloud_event_controller_test.go b/pkg/reconciler/v1alpha1/taskrun/resources/cloudevent/cloud_event_controller_test.go new file mode 100644 index 00000000000..5aa0f702430 --- /dev/null +++ b/pkg/reconciler/v1alpha1/taskrun/resources/cloudevent/cloud_event_controller_test.go @@ -0,0 +1,129 @@ +/* +Copyright 2019 The Tekton Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cloudevent + +import ( + "testing" + + "github.com/google/go-cmp/cmp" + "github.com/tektoncd/pipeline/pkg/apis/pipeline/v1alpha1" + "github.com/tektoncd/pipeline/pkg/logging" + tb "github.com/tektoncd/pipeline/test/builder" +) + +func TestInitializeCloudEvents(t *testing.T) { + tests := []struct { + name string + targets []string + wantCloudEvents []v1alpha1.CloudEventDelivery + }{{ + name: "testWithNilTarget", + targets: nil, + wantCloudEvents: nil, + }, { + name: "testWithEmptyListTarget", + targets: make([]string, 0), + wantCloudEvents: nil, + }, { + name: "testWithTwoTargets", + targets: []string{"target1", "target2"}, + wantCloudEvents: []v1alpha1.CloudEventDelivery{ + { + Target: "target1", + Status: v1alpha1.CloudEventDeliveryState{ + Condition: v1alpha1.CloudEventConditionUnknown, + SentAt: nil, + Error: "", + RetryCount: 0, + }, + }, + { + Target: "target2", + Status: v1alpha1.CloudEventDeliveryState{ + Condition: v1alpha1.CloudEventConditionUnknown, + SentAt: nil, + Error: "", + RetryCount: 0, + }, + }, + }, + }} + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + gotCloudEvents := cloudEventDeliveryFromTargets(tc.targets) + if diff := cmp.Diff(tc.wantCloudEvents, gotCloudEvents); diff != "" { + t.Errorf("Wrong Cloud Events (-want +got) = %s", diff) + } + }) + } +} + +func TestSendCloudEvents(t *testing.T) { + tests := []struct { + name string + taskRun *v1alpha1.TaskRun + wantTaskRun *v1alpha1.TaskRun + }{{ + name: "testWithMultipleMixedCloudEvents", + taskRun: tb.TaskRun("test-taskrun-multiple-cloudeventdelivery", "foo", + tb.TaskRunSelfLink("/task/1234"), + tb.TaskRunSpec( + tb.TaskRunTaskRef("fakeTaskName"), + ), + tb.TaskRunStatus( + tb.TaskRunCloudEvent("http//notattemptedunknown", "", 0, v1alpha1.CloudEventConditionUnknown), + tb.TaskRunCloudEvent("http//notattemptedfailed", "somehow", 0, v1alpha1.CloudEventConditionFailed), + tb.TaskRunCloudEvent("http//notattemptedsucceeded", "", 0, v1alpha1.CloudEventConditionSent), + tb.TaskRunCloudEvent("http//attemptedunknown", "", 1, v1alpha1.CloudEventConditionUnknown), + tb.TaskRunCloudEvent("http//attemptedfailed", "iknewit", 1, v1alpha1.CloudEventConditionFailed), + tb.TaskRunCloudEvent("http//attemptedsucceeded", "", 1, v1alpha1.CloudEventConditionSent), + ), + ), + wantTaskRun: tb.TaskRun("test-taskrun-multiple-cloudeventdelivery", "foo", + tb.TaskRunSpec( + tb.TaskRunTaskRef("fakeTaskName"), + ), + tb.TaskRunStatus( + tb.TaskRunCloudEvent("http//notattemptedunknown", "", 1, v1alpha1.CloudEventConditionSent), + tb.TaskRunCloudEvent("http//notattemptedfailed", "somehow", 0, v1alpha1.CloudEventConditionFailed), + tb.TaskRunCloudEvent("http//notattemptedsucceeded", "", 0, v1alpha1.CloudEventConditionSent), + tb.TaskRunCloudEvent("http//attemptedunknown", "", 1, v1alpha1.CloudEventConditionUnknown), + tb.TaskRunCloudEvent("http//attemptedfailed", "iknewit", 1, v1alpha1.CloudEventConditionFailed), + tb.TaskRunCloudEvent("http//attemptedsucceeded", "", 1, v1alpha1.CloudEventConditionSent), + ), + ), + }} + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + logger, _ := logging.NewLogger("", "") + successfulBehaviour := FakeClientBehaviour{ + SendSuccessfully: true, + } + err := SendCloudEvents(tc.taskRun, NewFakeClient(&successfulBehaviour), logger) + if err == nil { + t.Fatalf("Unexpected error sending cloud events: %v", err) + } + opts := GetCloudEventDeliveryCompareOptions() + if diff := cmp.Diff(tc.wantTaskRun.Status, tc.taskRun.Status, opts...); diff != "" { + t.Errorf("Wrong Cloud Events Status (-want +got) = %s", diff) + } + }) + } +} + +// TBD: TestSendCloudEvents error tests +// TBD: Test InitializeCloudEvents diff --git a/pkg/reconciler/v1alpha1/taskrun/resources/cloudevent/cloudevent.go b/pkg/reconciler/v1alpha1/taskrun/resources/cloudevent/cloudevent.go index ba742ad2811..0eeec6e5ae7 100644 --- a/pkg/reconciler/v1alpha1/taskrun/resources/cloudevent/cloudevent.go +++ b/pkg/reconciler/v1alpha1/taskrun/resources/cloudevent/cloudevent.go @@ -21,12 +21,15 @@ import ( "encoding/json" "errors" "fmt" + "strings" "time" "github.com/cloudevents/sdk-go/pkg/cloudevents" "github.com/cloudevents/sdk-go/pkg/cloudevents/client" cecontext "github.com/cloudevents/sdk-go/pkg/cloudevents/context" "github.com/cloudevents/sdk-go/pkg/cloudevents/types" + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" "go.uber.org/zap" "knative.dev/eventing-contrib/pkg/kncloudevents" "knative.dev/pkg/apis" @@ -125,3 +128,21 @@ func SendTaskRunCloudEvent(sinkURI string, taskRun *v1alpha1.TaskRun, logger *za event, err = SendCloudEvent(sinkURI, eventID, eventSourceURI, data, eventType, logger, cloudEventClient) return event, err } + +// GetCloudEventDeliveryCompareOptions returns compare options to sort +// and compare a list of CloudEventDelivery +func GetCloudEventDeliveryCompareOptions() []cmp.Option{ + // Setup cmp options + cloudDeliveryStateCompare := func(x, y v1alpha1.CloudEventDeliveryState) bool { + return cmp.Equal(x.Condition, y.Condition) && cmp.Equal(x.Error, y.Error) && cmp.Equal(x.RetryCount, y.RetryCount) + } + less := func(x, y v1alpha1.CloudEventDelivery) bool { + return strings.Compare(x.Target, y.Target) < 0 || (strings.Compare(x.Target, y.Target) == 0 && x.Status.SentAt.Before(y.Status.SentAt)) + } + return []cmp.Option{ + cmpopts.SortSlices(less), + cmp.Comparer(func(x, y v1alpha1.CloudEventDelivery) bool { + return (strings.Compare(x.Target, y.Target) == 0) && cloudDeliveryStateCompare(x.Status, y.Status) + }), + } +} diff --git a/pkg/reconciler/v1alpha1/taskrun/taskrun.go b/pkg/reconciler/v1alpha1/taskrun/taskrun.go index e43f09ee721..44301f84e59 100644 --- a/pkg/reconciler/v1alpha1/taskrun/taskrun.go +++ b/pkg/reconciler/v1alpha1/taskrun/taskrun.go @@ -23,6 +23,7 @@ import ( "strings" "time" + "github.com/hashicorp/go-multierror" "github.com/tektoncd/pipeline/pkg/apis/config" "github.com/tektoncd/pipeline/pkg/apis/pipeline" "github.com/tektoncd/pipeline/pkg/apis/pipeline/v1alpha1" @@ -30,6 +31,7 @@ import ( "github.com/tektoncd/pipeline/pkg/reconciler" "github.com/tektoncd/pipeline/pkg/reconciler/v1alpha1/taskrun/entrypoint" "github.com/tektoncd/pipeline/pkg/reconciler/v1alpha1/taskrun/resources" + "github.com/tektoncd/pipeline/pkg/reconciler/v1alpha1/taskrun/resources/cloudevent" "github.com/tektoncd/pipeline/pkg/reconciler/v1alpha1/taskrun/sidecars" "github.com/tektoncd/pipeline/pkg/status" "go.uber.org/zap" @@ -65,6 +67,7 @@ type Reconciler struct { taskLister listers.TaskLister clusterTaskLister listers.ClusterTaskLister resourceLister listers.PipelineResourceLister + cloudEventClient cloudevent.CEClient tracker tracker.Interface cache *entrypoint.Cache timeoutHandler *reconciler.TimeoutSet @@ -108,25 +111,42 @@ func (c *Reconciler) Reconcile(ctx context.Context, key string) error { } if tr.IsDone() { + var merr *multierror.Error + // Try to send cloud events first + cloudEventErr := cloudevent.SendCloudEvents(tr, c.cloudEventClient, c.Logger) + // Regardless of `err`, we must write back any status update that may have + // been generated by `sendCloudEvents` + updateErr := c.updateStatusLabelsAndAnnotations(tr, original) + merr = multierror.Append(cloudEventErr, updateErr) + if cloudEventErr != nil { + // Let's keep timeouts and sidecars running as long as we're trying to + // send cloud events. So we stop here an return errors encountered this far. + return merr.ErrorOrNil() + } c.timeoutHandler.Release(tr) pod, err := c.KubeClientSet.CoreV1().Pods(tr.Namespace).Get(tr.Status.PodName, metav1.GetOptions{}) if err == nil { err = sidecars.Stop(pod, c.KubeClientSet.CoreV1().Pods(tr.Namespace).Update) } else if errors.IsNotFound(err) { - return nil + return merr.ErrorOrNil() } if err != nil { c.Logger.Errorf("Error stopping sidecars for TaskRun %q: %v", name, err) + merr = multierror.Append(merr, err) } - return err + return merr.ErrorOrNil() } - // Reconcile this copy of the task run and then write back any status // updates regardless of whether the reconciliation errored out. if err := c.reconcile(ctx, tr); err != nil { c.Logger.Errorf("Reconcile error: %v", err.Error()) return err } + return c.updateStatusLabelsAndAnnotations(tr, original) +} + +func (c *Reconciler) updateStatusLabelsAndAnnotations(tr, original *v1alpha1.TaskRun) error { + var err error if equality.Semantic.DeepEqual(original.Status, tr.Status) { // If we didn't change anything then don't call updateStatus. // This is important because the copy we loaded from the informer's @@ -144,7 +164,6 @@ func (c *Reconciler) Reconcile(ctx context.Context, key string) error { return err } } - return err } @@ -253,6 +272,17 @@ func (c *Reconciler) reconcile(ctx context.Context, tr *v1alpha1.TaskRun) error return nil } + // Initialize the cloud events if at least a CloudEventResource is defined + // and they have not been initialized yet. + // FIXME(afrittoli) This resource specific logic will have to be replaced + // once we have a custom PipelineResource framework in place. + c.Logger.Infof("Cloud Events: %s", tr.Status.CloudEvents) + prs := make([]*v1alpha1.PipelineResource, 0, len(rtr.Outputs)) + for _, pr := range rtr.Outputs { + prs = append(prs, pr) + } + cloudevent.InitializeCloudEvents(tr, prs) + // Get the TaskRun's Pod if it should have one. Otherwise, create the Pod. pod, err := resources.TryGetPod(tr.Status, c.KubeClientSet.CoreV1().Pods(tr.Namespace).Get) if err != nil { @@ -293,12 +323,13 @@ func (c *Reconciler) reconcile(ctx context.Context, tr *v1alpha1.TaskRun) error } reconciler.EmitEvent(c.Recorder, before, after, tr) - c.Logger.Infof("Successfully reconciled taskrun %s/%s with status: %#v", tr.Name, tr.Namespace, after) return nil } + + func (c *Reconciler) handlePodCreationError(tr *v1alpha1.TaskRun, err error) { var reason, msg string var succeededStatus corev1.ConditionStatus diff --git a/pkg/reconciler/v1alpha1/taskrun/taskrun_test.go b/pkg/reconciler/v1alpha1/taskrun/taskrun_test.go index eb2e607b6a3..229e8c4e04a 100644 --- a/pkg/reconciler/v1alpha1/taskrun/taskrun_test.go +++ b/pkg/reconciler/v1alpha1/taskrun/taskrun_test.go @@ -46,6 +46,7 @@ import ( "github.com/tektoncd/pipeline/pkg/apis/pipeline/v1alpha1" "github.com/tektoncd/pipeline/pkg/reconciler/v1alpha1/taskrun/entrypoint" "github.com/tektoncd/pipeline/pkg/reconciler/v1alpha1/taskrun/resources" + "github.com/tektoncd/pipeline/pkg/reconciler/v1alpha1/taskrun/resources/cloudevent" "github.com/tektoncd/pipeline/pkg/status" "github.com/tektoncd/pipeline/pkg/system" "github.com/tektoncd/pipeline/test" @@ -73,6 +74,8 @@ var ( resourceQuantityCmp = cmp.Comparer(func(x, y resource.Quantity) bool { return x.Cmp(y) == 0 }) + cloudEventTarget1 = "https://foo" + cloudEventTarget2 = "https://bar" simpleStep = tb.Step("simple-step", "foo", tb.Command("/mycmd")) simpleTask = tb.Task("test-task", "foo", tb.TaskSpec(simpleStep)) @@ -134,6 +137,13 @@ var ( })), )) + twoOutputsTask = tb.Task("test-two-output-task", "foo", tb.TaskSpec( + simpleStep, tb.TaskOutputs( + tb.OutputsResource(cloudEventResource.Name, v1alpha1.PipelineResourceTypeCloudEvent), + tb.OutputsResource(anotherCloudEventResource.Name, v1alpha1.PipelineResourceTypeCloudEvent), + ), + )) + gitResource = tb.PipelineResource("git-resource", "foo", tb.PipelineResourceSpec( v1alpha1.PipelineResourceTypeGit, tb.PipelineResourceSpecParam("URL", "https://foo.git"), )) @@ -143,6 +153,12 @@ var ( imageResource = tb.PipelineResource("image-resource", "foo", tb.PipelineResourceSpec( v1alpha1.PipelineResourceTypeImage, tb.PipelineResourceSpecParam("URL", "gcr.io/kristoff/sven"), )) + cloudEventResource = tb.PipelineResource("cloud-event-resource", "foo", tb.PipelineResourceSpec( + v1alpha1.PipelineResourceTypeCloudEvent, tb.PipelineResourceSpecParam("TargetURI", cloudEventTarget1), + )) + anotherCloudEventResource = tb.PipelineResource("another-cloud-event-resource", "foo", tb.PipelineResourceSpec( + v1alpha1.PipelineResourceTypeCloudEvent, tb.PipelineResourceSpecParam("TargetURI", cloudEventTarget2), + )) toolsVolume = corev1.Volume{ Name: "tools", @@ -241,6 +257,10 @@ func getRunName(tr *v1alpha1.TaskRun) string { func getTaskRunController(t *testing.T, d test.Data) (test.TestAssets, func()) { ctx, _ := rtesting.SetupFakeContext(t) ctx, cancel := context.WithCancel(ctx) + cloudEventClientBehaviour := cloudevent.FakeClientBehaviour{ + SendSuccessfully: true, + } + ctx = cloudevent.WithClient(ctx, &cloudEventClientBehaviour) entrypointCache, _ = entrypoint.NewCache() c, _ := test.SeedTestData(t, ctx, d) configMapWatcher := configmap.NewInformedWatcher(c.Kube, system.GetNamespace()) @@ -1616,3 +1636,182 @@ func TestHandlePodCreationError(t *testing.T) { }) } } + +func TestReconcileCloudEvents(t *testing.T) { + + taskRunWithNoCEResources := tb.TaskRun("test-taskrun-no-ce-resources", "foo", + tb.TaskRunSpec( + tb.TaskRunTaskRef(simpleTask.Name, tb.TaskRefAPIVersion("a1")), + )) + taskRunWithTwoCEResourcesNoInit := tb.TaskRun("test-taskrun-two-ce-resources-no-init", "foo", + tb.TaskRunSpec( + tb.TaskRunTaskRef(twoOutputsTask.Name), + tb.TaskRunOutputs( + tb.TaskRunOutputsResource(cloudEventResource.Name), + tb.TaskRunOutputsResource(anotherCloudEventResource.Name), + ), + ), + ) + taskRunWithTwoCEResourcesInit := tb.TaskRun("test-taskrun-two-ce-resources-init", "foo", + tb.TaskRunSpec( + tb.TaskRunTaskRef(twoOutputsTask.Name), + tb.TaskRunOutputs( + tb.TaskRunOutputsResource(cloudEventResource.Name), + tb.TaskRunOutputsResource(anotherCloudEventResource.Name), + ), + ), + tb.TaskRunStatus( + tb.TaskRunCloudEvent(cloudEventTarget1, "", 0, v1alpha1.CloudEventConditionUnknown), + tb.TaskRunCloudEvent(cloudEventTarget2, "", 0, v1alpha1.CloudEventConditionUnknown), + ), + ) + taskRunWithCESucceded := tb.TaskRun("test-taskrun-ce-succeeded", "foo", + tb.TaskRunSelfLink("/task/1234"), + tb.TaskRunSpec( + tb.TaskRunTaskRef(twoOutputsTask.Name), + tb.TaskRunOutputs( + tb.TaskRunOutputsResource(cloudEventResource.Name), + tb.TaskRunOutputsResource(anotherCloudEventResource.Name), + ), + ), + tb.TaskRunStatus( + tb.StatusCondition(apis.Condition{ + Type: apis.ConditionSucceeded, + Status: corev1.ConditionTrue, + }), + tb.TaskRunCloudEvent(cloudEventTarget1, "", 0, v1alpha1.CloudEventConditionUnknown), + tb.TaskRunCloudEvent(cloudEventTarget2, "", 0, v1alpha1.CloudEventConditionUnknown), + ), + ) + taskRunWithCEFailed := tb.TaskRun("test-taskrun-ce-failed", "foo", + tb.TaskRunSelfLink("/task/1234"), + tb.TaskRunSpec( + tb.TaskRunTaskRef(twoOutputsTask.Name), + tb.TaskRunOutputs( + tb.TaskRunOutputsResource(cloudEventResource.Name), + tb.TaskRunOutputsResource(anotherCloudEventResource.Name), + ), + ), + tb.TaskRunStatus( + tb.StatusCondition(apis.Condition{ + Type: apis.ConditionSucceeded, + Status: corev1.ConditionFalse, + }), + tb.TaskRunCloudEvent(cloudEventTarget1, "", 0, v1alpha1.CloudEventConditionUnknown), + tb.TaskRunCloudEvent(cloudEventTarget2, "", 0, v1alpha1.CloudEventConditionUnknown), + ), + ) + taskRunWithCESuccededOneAttempt := tb.TaskRun("test-taskrun-ce-succeeded-one-attempt", "foo", + tb.TaskRunSelfLink("/task/1234"), + tb.TaskRunSpec( + tb.TaskRunTaskRef(twoOutputsTask.Name), + tb.TaskRunOutputs( + tb.TaskRunOutputsResource(cloudEventResource.Name), + tb.TaskRunOutputsResource(anotherCloudEventResource.Name), + ), + ), + tb.TaskRunStatus( + tb.StatusCondition(apis.Condition{ + Type: apis.ConditionSucceeded, + Status: corev1.ConditionTrue, + }), + tb.TaskRunCloudEvent(cloudEventTarget1, "", 1, v1alpha1.CloudEventConditionUnknown), + tb.TaskRunCloudEvent(cloudEventTarget2, "fakemessage", 0, v1alpha1.CloudEventConditionUnknown), + ), + ) + taskruns := []*v1alpha1.TaskRun{ + taskRunWithNoCEResources, taskRunWithTwoCEResourcesNoInit, + taskRunWithTwoCEResourcesInit, taskRunWithCESucceded, taskRunWithCEFailed, + taskRunWithCESuccededOneAttempt, + } + + d := test.Data{ + TaskRuns: taskruns, + Tasks: []*v1alpha1.Task{simpleTask, twoOutputsTask}, + ClusterTasks: []*v1alpha1.ClusterTask{}, + PipelineResources: []*v1alpha1.PipelineResource{cloudEventResource, anotherCloudEventResource}, + } + for _, tc := range []struct { + name string + taskRun *v1alpha1.TaskRun + wantCloudEvents []v1alpha1.CloudEventDelivery + }{{ + name: "no-ce-resources", + taskRun: taskRunWithNoCEResources, + wantCloudEvents: taskRunWithNoCEResources.Status.CloudEvents, + }, { + name: "ce-resources-no-init", + taskRun: taskRunWithTwoCEResourcesNoInit, + wantCloudEvents: tb.TaskRun("want", "foo", tb.TaskRunStatus( + tb.TaskRunCloudEvent(cloudEventTarget1, "", 0, v1alpha1.CloudEventConditionUnknown), + tb.TaskRunCloudEvent(cloudEventTarget2, "", 0, v1alpha1.CloudEventConditionUnknown), + )).Status.CloudEvents, + }, { + name: "ce-resources-init", + taskRun: taskRunWithTwoCEResourcesInit, + wantCloudEvents: tb.TaskRun("want2", "foo", tb.TaskRunStatus( + tb.TaskRunCloudEvent(cloudEventTarget1, "", 0, v1alpha1.CloudEventConditionUnknown), + tb.TaskRunCloudEvent(cloudEventTarget2, "", 0, v1alpha1.CloudEventConditionUnknown), + )).Status.CloudEvents, + }, { + name: "ce-resources-init-task-successful", + taskRun: taskRunWithCESucceded, + wantCloudEvents: tb.TaskRun("want3", "foo", tb.TaskRunStatus( + tb.TaskRunCloudEvent(cloudEventTarget1, "", 1, v1alpha1.CloudEventConditionSent), + tb.TaskRunCloudEvent(cloudEventTarget2, "", 1, v1alpha1.CloudEventConditionSent), + )).Status.CloudEvents, + }, { + name: "ce-resources-init-task-failed", + taskRun: taskRunWithCEFailed, + wantCloudEvents: tb.TaskRun("want4", "foo", tb.TaskRunStatus( + tb.TaskRunCloudEvent(cloudEventTarget1, "", 1, v1alpha1.CloudEventConditionSent), + tb.TaskRunCloudEvent(cloudEventTarget2, "", 1, v1alpha1.CloudEventConditionSent), + )).Status.CloudEvents, + }, { + name: "ce-resources-init-task-successful-one-attempt", + taskRun: taskRunWithCESuccededOneAttempt, + wantCloudEvents: tb.TaskRun("want5", "foo", tb.TaskRunStatus( + tb.TaskRunCloudEvent(cloudEventTarget1, "", 1, v1alpha1.CloudEventConditionUnknown), + tb.TaskRunCloudEvent(cloudEventTarget2, "fakemessage", 1, v1alpha1.CloudEventConditionSent), + )).Status.CloudEvents, + }} { + t.Run(tc.name, func(t *testing.T) { + names.TestingSeed() + testAssets, cancel := getTaskRunController(t, d) + defer cancel() + c := testAssets.Controller + clients := testAssets.Clients + + saName := tc.taskRun.Spec.ServiceAccount + if saName == "" { + saName = "default" + } + if _, err := clients.Kube.CoreV1().ServiceAccounts(tc.taskRun.Namespace).Create(&corev1.ServiceAccount{ + ObjectMeta: metav1.ObjectMeta{ + Name: saName, + Namespace: tc.taskRun.Namespace, + }, + }); err != nil { + t.Fatal(err) + } + + if err := c.Reconciler.Reconcile(context.Background(), getRunName(tc.taskRun)); err != nil { + t.Errorf("expected no error. Got error %v", err) + } + namespace, name, err := cache.SplitMetaNamespaceKey(tc.taskRun.Name) + if err != nil { + t.Errorf("Invalid resource key: %v", err) + } + + tr, err := clients.Pipeline.TektonV1alpha1().TaskRuns(namespace).Get(name, metav1.GetOptions{}) + if err != nil { + t.Fatalf("getting updated taskrun: %v", err) + } + opts := cloudevent.GetCloudEventDeliveryCompareOptions() + t.Log(tr.Status.CloudEvents) + if diff := cmp.Diff(tc.wantCloudEvents, tr.Status.CloudEvents, opts...); diff != "" { + t.Errorf("Unexpected status of cloud events (-want +got) = %s", diff) + } + }) + } +} diff --git a/test/builder/task.go b/test/builder/task.go index 9bb7c6f531f..64dcf91ec22 100644 --- a/test/builder/task.go +++ b/test/builder/task.go @@ -315,6 +315,24 @@ func TaskRunStartTime(startTime time.Time) TaskRunStatusOp { } } +// TaskRunCloudEvent adds an event to the TaskRunStatus. +func TaskRunCloudEvent(target, error string, retryCount int32, condition v1alpha1.CloudEventCondition) TaskRunStatusOp { + return func(s *v1alpha1.TaskRunStatus) { + if len(s.CloudEvents) == 0 { + s.CloudEvents = make([]v1alpha1.CloudEventDelivery, 0) + } + cloudEvent := v1alpha1.CloudEventDelivery{ + Target: target, + Status: v1alpha1.CloudEventDeliveryState{ + Condition: condition, + RetryCount: retryCount, + Error: error, + }, + } + s.CloudEvents = append(s.CloudEvents, cloudEvent) + } +} + // TaskRunTimeout sets the timeout duration to the TaskRunSpec. func TaskRunTimeout(d time.Duration) TaskRunSpecOp { return func(spec *v1alpha1.TaskRunSpec) { @@ -397,6 +415,13 @@ func TaskRunAnnotation(key, value string) TaskRunOp { } } +// TaskRunSelfLink adds a SelfLink +func TaskRunSelfLink(selflink string) TaskRunOp { + return func(tr *v1alpha1.TaskRun) { + tr.ObjectMeta.SelfLink = selflink + } +} + // TaskRunSpec sets the specified spec of the TaskRun. // Any number of TaskRunSpec modifier can be passed to transform it. func TaskRunSpec(ops ...TaskRunSpecOp) TaskRunOp {