Skip to content

Commit

Permalink
Refactor events to decouple k8s event and cloud event
Browse files Browse the repository at this point in the history
This commit refactors the code in events to decouple the k8s events emit
and cloud events emit. This commit fixes #4404.
  • Loading branch information
Yongxuanzhang committed Nov 30, 2022
1 parent 1ff593c commit 4e66060
Show file tree
Hide file tree
Showing 14 changed files with 454 additions and 391 deletions.
3 changes: 1 addition & 2 deletions pkg/reconciler/customrun/customrun.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"github.com/tektoncd/pipeline/pkg/apis/config"
"github.com/tektoncd/pipeline/pkg/apis/pipeline/v1beta1"
customrunreconciler "github.com/tektoncd/pipeline/pkg/client/injection/reconciler/pipeline/v1beta1/customrun"
"github.com/tektoncd/pipeline/pkg/reconciler/events"
"github.com/tektoncd/pipeline/pkg/reconciler/events/cache"
"github.com/tektoncd/pipeline/pkg/reconciler/events/cloudevent"
_ "github.com/tektoncd/pipeline/pkg/taskrunmetrics/fake" // Make sure the taskrunmetrics are setup
Expand Down Expand Up @@ -65,7 +64,7 @@ func (c *Reconciler) ReconcileKind(ctx context.Context, customRun *v1beta1.Custo
condition := customRunEvents.Status.GetCondition(apis.ConditionSucceeded)
logger.Debugf("Emitting cloudevent for %s, condition: %s", customRunEvents.Name, condition)

events.EmitCloudEvents(ctx, &customRunEvents)
cloudevent.EmitCloudEvents(ctx, &customRunEvents)
}

return nil
Expand Down
39 changes: 39 additions & 0 deletions pkg/reconciler/events/cloudevent/cloud_event_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,19 @@ import (

cloudevents "github.com/cloudevents/sdk-go/v2"
"github.com/hashicorp/go-multierror"
"github.com/tektoncd/pipeline/pkg/apis/config"
"github.com/tektoncd/pipeline/pkg/apis/pipeline/v1alpha1"
"github.com/tektoncd/pipeline/pkg/apis/pipeline/v1beta1"
resource "github.com/tektoncd/pipeline/pkg/apis/resource/v1alpha1"
"github.com/tektoncd/pipeline/pkg/apis/resource/v1alpha1/cloudevent"
"github.com/tektoncd/pipeline/pkg/reconciler/events/cache"
"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/utils/clock"
"knative.dev/pkg/apis"
controller "knative.dev/pkg/controller"
"knative.dev/pkg/logging"
)
Expand Down Expand Up @@ -73,6 +76,42 @@ func cloudEventDeliveryFromTargets(targets []string) []v1beta1.CloudEventDeliver
return nil
}

// EmitCloudEvents emits CloudEvents (only) for object
func EmitCloudEvents(ctx context.Context, object runtime.Object) {
logger := logging.FromContext(ctx)
configs := config.FromContextOrDefaults(ctx)
sendCloudEvents := (configs.Defaults.DefaultCloudEventsSink != "")
if sendCloudEvents {
ctx = cloudevents.ContextWithTarget(ctx, configs.Defaults.DefaultCloudEventsSink)
}

if sendCloudEvents {
err := SendCloudEventWithRetries(ctx, object)
if err != nil {
logger.Warnf("Failed to emit cloud events %v", err.Error())
}
}
}

func EmitCloudEventsWhenConditionChange(ctx context.Context, beforeCondition *apis.Condition, afterCondition *apis.Condition, object runtime.Object) {
logger := logging.FromContext(ctx)
configs := config.FromContextOrDefaults(ctx)
sendCloudEvents := (configs.Defaults.DefaultCloudEventsSink != "")
if sendCloudEvents {
ctx = cloudevents.ContextWithTarget(ctx, configs.Defaults.DefaultCloudEventsSink)
}

if sendCloudEvents {
// Only send events if the new condition represents a change
if !equality.Semantic.DeepEqual(beforeCondition, afterCondition) {
err := SendCloudEventWithRetries(ctx, object)
if err != nil {
logger.Warnf("Failed to emit cloud events %v", err.Error())
}
}
}
}

// SendCloudEvents is used by the TaskRun controller to send cloud events once
// the TaskRun is complete. `tr` is used to obtain the list of targets
func SendCloudEvents(tr *v1beta1.TaskRun, ceclient CEClient, logger *zap.SugaredLogger, c clock.PassiveClock) error {
Expand Down
60 changes: 58 additions & 2 deletions pkg/reconciler/events/cloudevent/cloud_event_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,12 @@ import (
"time"

"github.com/google/go-cmp/cmp"
"github.com/tektoncd/pipeline/pkg/apis/config"
"github.com/tektoncd/pipeline/pkg/apis/pipeline/v1alpha1"
"github.com/tektoncd/pipeline/pkg/apis/pipeline/v1beta1"
resourcev1alpha1 "github.com/tektoncd/pipeline/pkg/apis/resource/v1alpha1"
"github.com/tektoncd/pipeline/pkg/reconciler/events/k8sevent"
"github.com/tektoncd/pipeline/test/diff"
eventstest "github.com/tektoncd/pipeline/test/events"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/tools/record"
Expand Down Expand Up @@ -621,7 +622,7 @@ func TestSendCloudEventWithRetries(t *testing.T) {
ceClient := Get(ctx).(FakeClient)
ceClient.CheckCloudEventsUnordered(t, tc.name, tc.wantCEvents)
recorder := controller.GetEventRecorder(ctx).(*record.FakeRecorder)
if err := eventstest.CheckEventsOrdered(t, recorder.Events, tc.name, tc.wantEvents); err != nil {
if err := k8sevent.CheckEventsOrdered(t, recorder.Events, tc.name, tc.wantEvents); err != nil {
t.Fatalf(err.Error())
}
})
Expand Down Expand Up @@ -677,6 +678,61 @@ func TestSendCloudEventWithRetriesNoClient(t *testing.T) {
}
}

func TestEmitCloudEvents(t *testing.T) {

object := &v1alpha1.Run{
ObjectMeta: metav1.ObjectMeta{
SelfLink: "/run/test1",
},
Status: v1alpha1.RunStatus{},
}
testcases := []struct {
name string
data map[string]string
wantEvents []string
wantCloudEvents []string
}{{
name: "without sink",
data: map[string]string{},
wantEvents: []string{},
wantCloudEvents: []string{},
}, {
name: "with empty string sink",
data: map[string]string{"default-cloud-events-sink": ""},
wantEvents: []string{},
wantCloudEvents: []string{},
}, {
name: "with sink",
data: map[string]string{"default-cloud-events-sink": "http://mysink"},
wantEvents: []string{},
wantCloudEvents: []string{`(?s)dev.tekton.event.run.started.v1.*test1`},
}}

for _, tc := range testcases {
// Setup the context and seed test data
ctx, _ := rtesting.SetupFakeContext(t)
ctx = WithClient(ctx, &FakeClientBehaviour{SendSuccessfully: true}, len(tc.wantCloudEvents))
fakeClient := Get(ctx).(FakeClient)

// Setup the config and add it to the context
defaults, _ := config.NewDefaultsFromMap(tc.data)
featureFlags, _ := config.NewFeatureFlagsFromMap(map[string]string{})
cfg := &config.Config{
Defaults: defaults,
FeatureFlags: featureFlags,
}
ctx = config.ToContext(ctx, cfg)

recorder := controller.GetEventRecorder(ctx).(*record.FakeRecorder)
EmitCloudEvents(ctx, object)
if err := k8sevent.CheckEventsOrdered(t, recorder.Events, tc.name, tc.wantEvents); err != nil {
t.Fatalf(err.Error())
}
fakeClient.CheckCloudEventsUnordered(t, tc.name, tc.wantCloudEvents)
}
}


func setupFakeContext(t *testing.T, behaviour FakeClientBehaviour, withClient bool, expectedEventCount int) context.Context {
var ctx context.Context
ctx, _ = rtesting.SetupFakeContext(t)
Expand Down
94 changes: 3 additions & 91 deletions pkg/reconciler/events/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,27 +19,10 @@ package events
import (
"context"

cloudevents "github.com/cloudevents/sdk-go/v2"
"github.com/tektoncd/pipeline/pkg/apis/config"
"github.com/tektoncd/pipeline/pkg/reconciler/events/cloudevent"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
"github.com/tektoncd/pipeline/pkg/reconciler/events/k8sevent"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/tools/record"
"knative.dev/pkg/apis"
"knative.dev/pkg/controller"
"knative.dev/pkg/logging"
)

const (
// EventReasonSucceded is the reason set for events about successful completion of TaskRuns / PipelineRuns
EventReasonSucceded = "Succeeded"
// EventReasonFailed is the reason set for events about unsuccessful completion of TaskRuns / PipelineRuns
EventReasonFailed = "Failed"
// EventReasonStarted is the reason set for events about the start of TaskRuns / PipelineRuns
EventReasonStarted = "Started"
// EventReasonError is the reason set for events related to TaskRuns / PipelineRuns reconcile errors
EventReasonError = "Error"
)

// Emit emits events for object
Expand All @@ -48,78 +31,7 @@ const (
// k8s events are always sent if afterCondition is different from beforeCondition
// Cloud events are always sent if enabled, i.e. if a sink is available
func Emit(ctx context.Context, beforeCondition *apis.Condition, afterCondition *apis.Condition, object runtime.Object) {
recorder := controller.GetEventRecorder(ctx)
logger := logging.FromContext(ctx)
configs := config.FromContextOrDefaults(ctx)
sendCloudEvents := (configs.Defaults.DefaultCloudEventsSink != "")
if sendCloudEvents {
ctx = cloudevents.ContextWithTarget(ctx, configs.Defaults.DefaultCloudEventsSink)
}

sendKubernetesEvents(recorder, beforeCondition, afterCondition, object)

if sendCloudEvents {
// Only send events if the new condition represents a change
if !equality.Semantic.DeepEqual(beforeCondition, afterCondition) {
err := cloudevent.SendCloudEventWithRetries(ctx, object)
if err != nil {
logger.Warnf("Failed to emit cloud events %v", err.Error())
}
}
}
}

// EmitCloudEvents emits CloudEvents (only) for object
func EmitCloudEvents(ctx context.Context, object runtime.Object) {
logger := logging.FromContext(ctx)
configs := config.FromContextOrDefaults(ctx)
sendCloudEvents := (configs.Defaults.DefaultCloudEventsSink != "")
if sendCloudEvents {
ctx = cloudevents.ContextWithTarget(ctx, configs.Defaults.DefaultCloudEventsSink)
}

if sendCloudEvents {
err := cloudevent.SendCloudEventWithRetries(ctx, object)
if err != nil {
logger.Warnf("Failed to emit cloud events %v", err.Error())
}
}
k8sevent.EmitK8sEvents(ctx, beforeCondition, afterCondition, object)
cloudevent.EmitCloudEventsWhenConditionChange(ctx, beforeCondition, afterCondition, object)
}

func sendKubernetesEvents(c record.EventRecorder, beforeCondition *apis.Condition, afterCondition *apis.Condition, object runtime.Object) {
// Events that are going to be sent
//
// Status "ConditionUnknown":
// beforeCondition == nil, emit EventReasonStarted
// beforeCondition != nil, emit afterCondition.Reason
//
// Status "ConditionTrue": emit EventReasonSucceded
// Status "ConditionFalse": emit EventReasonFailed
if !equality.Semantic.DeepEqual(beforeCondition, afterCondition) && afterCondition != nil {
// If the condition changed, and the target condition is not empty, we send an event
switch afterCondition.Status {
case corev1.ConditionTrue:
c.Event(object, corev1.EventTypeNormal, EventReasonSucceded, afterCondition.Message)
case corev1.ConditionFalse:
c.Event(object, corev1.EventTypeWarning, EventReasonFailed, afterCondition.Message)
case corev1.ConditionUnknown:
if beforeCondition == nil {
// If the condition changed, the status is "unknown", and there was no condition before,
// we emit the "Started event". We ignore further updates of the "unknown" status.
c.Event(object, corev1.EventTypeNormal, EventReasonStarted, "")
} else {
// If the condition changed, the status is "unknown", and there was a condition before,
// we emit an event that matches the reason and message of the condition.
// This is used for instance to signal the transition from "started" to "running"
c.Event(object, corev1.EventTypeNormal, afterCondition.Reason, afterCondition.Message)
}
}
}
}

// EmitError emits a failure associated to an error
func EmitError(c record.EventRecorder, err error, object runtime.Object) {
if err != nil {
c.Event(object, corev1.EventTypeWarning, EventReasonError, err.Error())
}
}
Loading

0 comments on commit 4e66060

Please sign in to comment.