Skip to content

Commit

Permalink
WIP: Cloud Event pipeline resource
Browse files Browse the repository at this point in the history
Implements in the TaskRun controller the logic to provide
the cloud event pipeline resource.

This commits puts together the API, cloud event helper and
resource definition from four pull requests:
- tektoncd#1090
- tektoncd#1091
- tektoncd#1092

TBD: Test for interaction with the image digest export

It adds unit tests for the new code and one E2E YAML test.
The YAML test runs a simple http server that can receive the
cloudevent for test purposes.

The list of cloud events to be sent is added to the TaskRun
status and processed by the TaskRun controller once the pod
associated to the TaskRun completes its execution.
The `isDone` definition of the TaskRun is not altered, the
reconciler checks for events to be sent once the
TaskRun.isDone is true.

Retries are not implemented yet in the sense that every
scheduled event will be attempted exactly once, but it may
be that those attempts happen across different invocations
of Reconcile.

Signed-off-by: Andrea Frittoli <andrea.frittoli@uk.ibm.com>
  • Loading branch information
afrittoli committed Aug 8, 2019
1 parent 6e11b60 commit 3dae89f
Show file tree
Hide file tree
Showing 6 changed files with 485 additions and 5 deletions.
63 changes: 63 additions & 0 deletions docs/resources.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ The following `PipelineResources` are currently supported:
- [Storage Resource](#storage-resource)
- [GCS Storage Resource](#gcs-storage-resource)
- [BuildGCS Storage Resource](#buildgcs-storage-resource)
- [Cloud Event Resource](#cloud-event-resource)

### Git Resource

Expand Down Expand Up @@ -617,6 +618,68 @@ the container image
[gcr.io/cloud-builders//gcs-fetcher](https://github.com/GoogleCloudPlatform/cloud-builders/tree/master/gcs-fetcher)
does not support configuring secrets.

### Cloud Event Resource

The Cloud Event Resource represents a [cloud event](https://github.com/cloudevents/spec)
that is sent to a target `URI` upon completion of a `TaskRun`.
The Cloud Event Resource sends Tekton specific events; the body of the event includes
the entire TaskRun spec plus status; the types of events defined for now are:

- dev.tekton.event.task.unknown
- dev.tekton.event.task.successful
- dev.tekton.event.task.failed

Cloud event resources are useful to notify a third party upon the completion and
status of a `TaskRun`. In combinations with the [Tekton triggers](https://github.com/tektoncd/triggers)
project they can be used to link `Task/PipelineRuns` asynchronously.

To create a CloudEvent resource using the `PipelineResource` CRD:

```yaml
apiVersion: tekton.dev/v1alpha1
kind: PipelineResource
metadata:
name: event-to-sink
spec:
type: cloudevent
params:
- name: targetURI
value: http://sink:8080
```

The content of an event is for example:

```yaml
Context Attributes,
SpecVersion: 0.2
Type: dev.tekton.event.task.successful
Source: /apis/tekton.dev/v1alpha1/namespaces/default/taskruns/pipeline-run-api-16aa55-source-to-image-task-rpndl
ID: pipeline-run-api-16aa55-source-to-image-task-rpndl
Time: 2019-07-04T11:03:53.058694712Z
ContentType: application/json
Transport Context,
URI: /
Host: my-sink.default.my-cluster.containers.appdomain.cloud
Method: POST
Data,
{
"taskRun": {
"metadata": {...}
"spec": {
"inputs": {...}
"outputs": {...}
"serviceAccount": "default",
"taskRef": {
"name": "source-to-image",
"kind": "Task"
},
"timeout": "1h0m0s"
},
"status": {...}
}
}
```

Except as otherwise noted, the content of this page is licensed under the
[Creative Commons Attribution 4.0 License](https://creativecommons.org/licenses/by/4.0/),
and code samples are licensed under the
Expand Down
111 changes: 111 additions & 0 deletions examples/taskruns/taskrun-cloud-event.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
---
apiVersion: v1
kind: Service
metadata:
name: sink
namespace: default
spec:
selector:
app: cloudevent
ports:
- protocol: TCP
port: 8080
targetPort: 8080
---
apiVersion: v1
kind: Pod
metadata:
labels:
app: cloudevent
name: message-sink
namespace: default
spec:
containers:
- env:
- name: PORT
value: "8080"
name: cloudeventlistener
image: python:3-alpine
imagePullPolicy: IfNotPresent
command: ["/bin/sh"]
args:
- -ce
- |
cat <<EOF | python
from http.server import BaseHTTPRequestHandler, HTTPServer
class PostHandler(BaseHTTPRequestHandler):
def do_POST(self):
self.send_response(200)
self.send_header('Content-type', 'text/html')
self.end_headers()
self.wfile.write("<html><body><h1>POST!</h1></body></html>")
if __name__ == "__main__":
httpd = HTTPServer(('', $PORT), PostHandler)
print('Starting httpd...')
httpd.serve_forever()
EOF
ports:
- containerPort: 8080
name: user-port
protocol: TCP
---
apiVersion: tekton.dev/v1alpha1
kind: PipelineResource
metadata:
name: to-message-sink
spec:
type: cloudEvent
params:
- name: targetURI
value: http://sink.default:8080
---
apiVersion: tekton.dev/v1alpha1
kind: PipelineResource
metadata:
name: rules-branch
spec:
type: git
params:
- name: revision
value: master
- name: url
value: https://github.com/bazelbuild/rules_docker
---
apiVersion: tekton.dev/v1alpha1
kind: Task
metadata:
name: send-cloud-event-task
spec:
inputs:
resources:
- name: workspace
type: git
outputs:
resources:
- name: notification
type: cloudEvent
steps:
- name: list
image: ubuntu
command: ["/bin/bash"]
args: ['-c', 'ls -al /workspace']
---
apiVersion: tekton.dev/v1alpha1
kind: TaskRun
metadata:
name: send-cloud-event
spec:
inputs:
resources:
- name: workspace
resourceRef:
name: rules-branch
outputs:
resources:
- name: notification
resourceRef:
name: to-message-sink
taskRef:
name: send-cloud-event-task
2 changes: 2 additions & 0 deletions pkg/reconciler/v1alpha1/taskrun/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
taskruninformer "github.com/tektoncd/pipeline/pkg/client/injection/informers/pipeline/v1alpha1/taskrun"
"github.com/tektoncd/pipeline/pkg/reconciler"
"github.com/tektoncd/pipeline/pkg/reconciler/v1alpha1/taskrun/entrypoint"
cloudeventclient "github.com/tektoncd/pipeline/pkg/reconciler/v1alpha1/taskrun/resources/cloudevent"
"k8s.io/client-go/tools/cache"
"knative.dev/pkg/configmap"
"knative.dev/pkg/controller"
Expand Down Expand Up @@ -69,6 +70,7 @@ func NewController(
clusterTaskLister: clusterTaskInformer.Lister(),
resourceLister: resourceInformer.Lister(),
timeoutHandler: timeoutHandler,
cloudEventClient: cloudeventclient.Get(ctx),
}
impl := controller.NewImpl(c, c.Logger, taskRunControllerName)

Expand Down
77 changes: 72 additions & 5 deletions pkg/reconciler/v1alpha1/taskrun/taskrun.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,15 @@ import (
"strings"
"time"

"github.com/hashicorp/go-multierror"
"github.com/tektoncd/pipeline/pkg/apis/config"
"github.com/tektoncd/pipeline/pkg/apis/pipeline"
"github.com/tektoncd/pipeline/pkg/apis/pipeline/v1alpha1"
listers "github.com/tektoncd/pipeline/pkg/client/listers/pipeline/v1alpha1"
"github.com/tektoncd/pipeline/pkg/reconciler"
"github.com/tektoncd/pipeline/pkg/reconciler/v1alpha1/taskrun/entrypoint"
"github.com/tektoncd/pipeline/pkg/reconciler/v1alpha1/taskrun/resources"
"github.com/tektoncd/pipeline/pkg/reconciler/v1alpha1/taskrun/resources/cloudevent"
"github.com/tektoncd/pipeline/pkg/reconciler/v1alpha1/taskrun/sidecars"
"github.com/tektoncd/pipeline/pkg/status"
"go.uber.org/zap"
Expand Down Expand Up @@ -65,6 +67,7 @@ type Reconciler struct {
taskLister listers.TaskLister
clusterTaskLister listers.ClusterTaskLister
resourceLister listers.PipelineResourceLister
cloudEventClient cloudevent.CEClient
tracker tracker.Interface
cache *entrypoint.Cache
timeoutHandler *reconciler.TimeoutSet
Expand Down Expand Up @@ -108,25 +111,42 @@ func (c *Reconciler) Reconcile(ctx context.Context, key string) error {
}

if tr.IsDone() {
var merr *multierror.Error
// Try to send cloud events first
cloudEventErr := c.sendCloudEvents(tr)
// Regardless of `err`, we must write back any status update that may have
// been generated by `sendCloudEvents`
updateErr := c.updateStatusLabelsAndAnnotations(tr, original)
merr = multierror.Append(cloudEventErr, updateErr)
if cloudEventErr != nil {
// Let's keep timeouts and sidecars running as long as we're trying to
// send cloud events. So we stop here an return errors encountered this far.
return merr.ErrorOrNil()
}
c.timeoutHandler.Release(tr)
pod, err := c.KubeClientSet.CoreV1().Pods(tr.Namespace).Get(tr.Status.PodName, metav1.GetOptions{})
if err == nil {
err = sidecars.Stop(pod, c.KubeClientSet.CoreV1().Pods(tr.Namespace).Update)
} else if errors.IsNotFound(err) {
return nil
return merr.ErrorOrNil()
}
if err != nil {
c.Logger.Errorf("Error stopping sidecars for TaskRun %q: %v", name, err)
merr = multierror.Append(merr, err)
}
return err
return merr.ErrorOrNil()
}

// Reconcile this copy of the task run and then write back any status
// updates regardless of whether the reconciliation errored out.
if err := c.reconcile(ctx, tr); err != nil {
c.Logger.Errorf("Reconcile error: %v", err.Error())
return err
}
return c.updateStatusLabelsAndAnnotations(tr, original)
}

func (c *Reconciler) updateStatusLabelsAndAnnotations(tr, original *v1alpha1.TaskRun) error {
var err error
if equality.Semantic.DeepEqual(original.Status, tr.Status) {
// If we didn't change anything then don't call updateStatus.
// This is important because the copy we loaded from the informer's
Expand All @@ -144,7 +164,6 @@ func (c *Reconciler) Reconcile(ctx context.Context, key string) error {
return err
}
}

return err
}

Expand Down Expand Up @@ -253,6 +272,25 @@ func (c *Reconciler) reconcile(ctx context.Context, tr *v1alpha1.TaskRun) error
return nil
}

// Initialize the cloud events if at least a CloudEventResource is defined
// and they have not been initialized yet.
c.Logger.Infof("Cloud Events: %s", tr.Status.CloudEvents)
// FIXME(afrittoli) If there are no events this is run every time
if len(tr.Status.CloudEvents) == 0 {
targets := make([]string, len(rtr.Outputs))
idx := 0
for _, output := range rtr.Outputs {
if output.Spec.Type == v1alpha1.PipelineResourceTypeCloudEvent {
cer, _ := v1alpha1.NewCloudEventResource(output)
targets[idx] = cer.TargetURI
idx++
}
}
if idx > 0 {
tr.Status.InitializeCloudEvents(targets)
}
}

// Get the TaskRun's Pod if it should have one. Otherwise, create the Pod.
pod, err := resources.TryGetPod(tr.Status, c.KubeClientSet.CoreV1().Pods(tr.Namespace).Get)
if err != nil {
Expand Down Expand Up @@ -293,12 +331,41 @@ func (c *Reconciler) reconcile(ctx context.Context, tr *v1alpha1.TaskRun) error
}

reconciler.EmitEvent(c.Recorder, before, after, tr)

c.Logger.Infof("Successfully reconciled taskrun %s/%s with status: %#v", tr.Name, tr.Namespace, after)

return nil
}

func (c *Reconciler) sendCloudEvents(tr *v1alpha1.TaskRun) error {
// The TaskRun is complete. Time to send cloud events (if any).
var merr *multierror.Error
for idx, cloudEventDelivery := range tr.Status.CloudEvents {
eventStatus := &(tr.Status.CloudEvents[idx].Status)
// Skip events that have already been sent (successfully or unsuccessfully)
// Ensure we try to send all events once (possibly through different reconcile calls)
if eventStatus.Condition != v1alpha1.CloudEventConditionUnknown || eventStatus.RetryCount > 0 {
continue
}
_, err := cloudevent.SendTaskRunCloudEvent(cloudEventDelivery.Target, tr, c.Logger, c.cloudEventClient)
eventStatus.SentAt = &metav1.Time{Time: time.Now()}
eventStatus.RetryCount = eventStatus.RetryCount + 1
if err != nil {
merr = multierror.Append(merr, err)
eventStatus.Condition = v1alpha1.CloudEventConditionFailed
eventStatus.Error = merr.Error()
} else {
c.Logger.Infof("Sent event for target %s", cloudEventDelivery.Target)
eventStatus.Condition = v1alpha1.CloudEventConditionSent
}
}
if merr != nil && merr.Len() > 0 {
c.Logger.Errorf("Failed to send %d cloud events for TaskRun %s", merr.Len(), tr.Name)
// Return all send error
return merr
}
return merr
}

func (c *Reconciler) handlePodCreationError(tr *v1alpha1.TaskRun, err error) {
var reason, msg string
var succeededStatus corev1.ConditionStatus
Expand Down
Loading

0 comments on commit 3dae89f

Please sign in to comment.