Skip to content

Commit

Permalink
Refactor events to decouple k8s event and cloud event
Browse files Browse the repository at this point in the history
This commit refactors the code in events to decouple the k8s events emit
and cloud events emit. This commit fixes #4404.
  • Loading branch information
Yongxuanzhang committed Dec 1, 2022
1 parent 1ff593c commit 3b1b392
Show file tree
Hide file tree
Showing 10 changed files with 495 additions and 310 deletions.
40 changes: 40 additions & 0 deletions pkg/reconciler/events/cloudevent/cloud_event_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,19 @@ import (

cloudevents "github.com/cloudevents/sdk-go/v2"
"github.com/hashicorp/go-multierror"
"github.com/tektoncd/pipeline/pkg/apis/config"
"github.com/tektoncd/pipeline/pkg/apis/pipeline/v1alpha1"
"github.com/tektoncd/pipeline/pkg/apis/pipeline/v1beta1"
resource "github.com/tektoncd/pipeline/pkg/apis/resource/v1alpha1"
"github.com/tektoncd/pipeline/pkg/apis/resource/v1alpha1/cloudevent"
"github.com/tektoncd/pipeline/pkg/reconciler/events/cache"
"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/utils/clock"
"knative.dev/pkg/apis"
controller "knative.dev/pkg/controller"
"knative.dev/pkg/logging"
)
Expand Down Expand Up @@ -73,6 +76,43 @@ func cloudEventDeliveryFromTargets(targets []string) []v1beta1.CloudEventDeliver
return nil
}

// EmitCloudEvents emits CloudEvents (only) for object
func EmitCloudEvents(ctx context.Context, object runtime.Object) {
logger := logging.FromContext(ctx)
configs := config.FromContextOrDefaults(ctx)
sendCloudEvents := (configs.Defaults.DefaultCloudEventsSink != "")
if sendCloudEvents {
ctx = cloudevents.ContextWithTarget(ctx, configs.Defaults.DefaultCloudEventsSink)
}

if sendCloudEvents {
err := SendCloudEventWithRetries(ctx, object)
if err != nil {
logger.Warnf("Failed to emit cloud events %v", err.Error())
}
}
}

// EmitCloudEventsWhenConditionChange emits CloudEvents when there is a change in condition
func EmitCloudEventsWhenConditionChange(ctx context.Context, beforeCondition *apis.Condition, afterCondition *apis.Condition, object runtime.Object) {
logger := logging.FromContext(ctx)
configs := config.FromContextOrDefaults(ctx)
sendCloudEvents := (configs.Defaults.DefaultCloudEventsSink != "")
if sendCloudEvents {
ctx = cloudevents.ContextWithTarget(ctx, configs.Defaults.DefaultCloudEventsSink)
}

if sendCloudEvents {
// Only send events if the new condition represents a change
if !equality.Semantic.DeepEqual(beforeCondition, afterCondition) {
err := SendCloudEventWithRetries(ctx, object)
if err != nil {
logger.Warnf("Failed to emit cloud events %v", err.Error())
}
}
}
}

// SendCloudEvents is used by the TaskRun controller to send cloud events once
// the TaskRun is complete. `tr` is used to obtain the list of targets
func SendCloudEvents(tr *v1beta1.TaskRun, ceclient CEClient, logger *zap.SugaredLogger, c clock.PassiveClock) error {
Expand Down
100 changes: 98 additions & 2 deletions pkg/reconciler/events/cloudevent/cloud_event_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,12 @@ import (
"time"

"github.com/google/go-cmp/cmp"
"github.com/tektoncd/pipeline/pkg/apis/config"
"github.com/tektoncd/pipeline/pkg/apis/pipeline/v1alpha1"
"github.com/tektoncd/pipeline/pkg/apis/pipeline/v1beta1"
resourcev1alpha1 "github.com/tektoncd/pipeline/pkg/apis/resource/v1alpha1"
"github.com/tektoncd/pipeline/pkg/reconciler/events/k8sevent"
"github.com/tektoncd/pipeline/test/diff"
eventstest "github.com/tektoncd/pipeline/test/events"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/tools/record"
Expand Down Expand Up @@ -621,7 +622,7 @@ func TestSendCloudEventWithRetries(t *testing.T) {
ceClient := Get(ctx).(FakeClient)
ceClient.CheckCloudEventsUnordered(t, tc.name, tc.wantCEvents)
recorder := controller.GetEventRecorder(ctx).(*record.FakeRecorder)
if err := eventstest.CheckEventsOrdered(t, recorder.Events, tc.name, tc.wantEvents); err != nil {
if err := k8sevent.CheckEventsOrdered(t, recorder.Events, tc.name, tc.wantEvents); err != nil {
t.Fatalf(err.Error())
}
})
Expand Down Expand Up @@ -677,6 +678,101 @@ func TestSendCloudEventWithRetriesNoClient(t *testing.T) {
}
}

func TestEmitCloudEvents(t *testing.T) {

object := &v1alpha1.Run{
ObjectMeta: metav1.ObjectMeta{
SelfLink: "/run/test1",
},
Status: v1alpha1.RunStatus{},
}
testcases := []struct {
name string
data map[string]string
wantEvents []string
wantCloudEvents []string
}{{
name: "without sink",
data: map[string]string{},
wantEvents: []string{},
wantCloudEvents: []string{},
}, {
name: "with empty string sink",
data: map[string]string{"default-cloud-events-sink": ""},
wantEvents: []string{},
wantCloudEvents: []string{},
}, {
name: "with sink",
data: map[string]string{"default-cloud-events-sink": "http://mysink"},
wantEvents: []string{},
wantCloudEvents: []string{`(?s)dev.tekton.event.run.started.v1.*test1`},
}}

for _, tc := range testcases {
// Setup the context and seed test data
ctx, _ := rtesting.SetupFakeContext(t)
ctx = WithClient(ctx, &FakeClientBehaviour{SendSuccessfully: true}, len(tc.wantCloudEvents))
fakeClient := Get(ctx).(FakeClient)

// Setup the config and add it to the context
defaults, _ := config.NewDefaultsFromMap(tc.data)
featureFlags, _ := config.NewFeatureFlagsFromMap(map[string]string{})
cfg := &config.Config{
Defaults: defaults,
FeatureFlags: featureFlags,
}
ctx = config.ToContext(ctx, cfg)

recorder := controller.GetEventRecorder(ctx).(*record.FakeRecorder)
EmitCloudEvents(ctx, object)
if err := k8sevent.CheckEventsOrdered(t, recorder.Events, tc.name, tc.wantEvents); err != nil {
t.Fatalf(err.Error())
}
fakeClient.CheckCloudEventsUnordered(t, tc.name, tc.wantCloudEvents)
}
}

func TestEmitCloudEventsWhenConditionChange(t *testing.T) {
objectStatus := duckv1.Status{
Conditions: []apis.Condition{{
Type: apis.ConditionSucceeded,
Status: corev1.ConditionUnknown,
Reason: v1beta1.PipelineRunReasonStarted.String(),
}},
}
object := &v1beta1.PipelineRun{
ObjectMeta: metav1.ObjectMeta{
SelfLink: "/pipelineruns/test1",
},
Status: v1beta1.PipelineRunStatus{Status: objectStatus},
}
after := &apis.Condition{
Type: apis.ConditionSucceeded,
Status: corev1.ConditionUnknown,
Message: "just starting",
}

data := map[string]string{"default-cloud-events-sink": "http://mysink"}
wantCloudEvents := []string{`(?s)dev.tekton.event.pipelinerun.started.v1.*test1`}

// Setup the context and seed test data
ctx, _ := rtesting.SetupFakeContext(t)
ctx = WithClient(ctx, &FakeClientBehaviour{SendSuccessfully: true}, len(wantCloudEvents))
fakeClient := Get(ctx).(FakeClient)

// Setup the config and add it to the context
defaults, _ := config.NewDefaultsFromMap(data)
featureFlags, _ := config.NewFeatureFlagsFromMap(map[string]string{})
cfg := &config.Config{
Defaults: defaults,
FeatureFlags: featureFlags,
}
ctx = config.ToContext(ctx, cfg)

EmitCloudEventsWhenConditionChange(ctx, nil, after, object)
fakeClient.CheckCloudEventsUnordered(t, "with sink", wantCloudEvents)
}

func setupFakeContext(t *testing.T, behaviour FakeClientBehaviour, withClient bool, expectedEventCount int) context.Context {
var ctx context.Context
ctx, _ = rtesting.SetupFakeContext(t)
Expand Down
97 changes: 7 additions & 90 deletions pkg/reconciler/events/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,27 +19,10 @@ package events
import (
"context"

cloudevents "github.com/cloudevents/sdk-go/v2"
"github.com/tektoncd/pipeline/pkg/apis/config"
"github.com/tektoncd/pipeline/pkg/reconciler/events/cloudevent"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
"github.com/tektoncd/pipeline/pkg/reconciler/events/k8sevent"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/tools/record"
"knative.dev/pkg/apis"
"knative.dev/pkg/controller"
"knative.dev/pkg/logging"
)

const (
// EventReasonSucceded is the reason set for events about successful completion of TaskRuns / PipelineRuns
EventReasonSucceded = "Succeeded"
// EventReasonFailed is the reason set for events about unsuccessful completion of TaskRuns / PipelineRuns
EventReasonFailed = "Failed"
// EventReasonStarted is the reason set for events about the start of TaskRuns / PipelineRuns
EventReasonStarted = "Started"
// EventReasonError is the reason set for events related to TaskRuns / PipelineRuns reconcile errors
EventReasonError = "Error"
)

// Emit emits events for object
Expand All @@ -48,78 +31,12 @@ const (
// k8s events are always sent if afterCondition is different from beforeCondition
// Cloud events are always sent if enabled, i.e. if a sink is available
func Emit(ctx context.Context, beforeCondition *apis.Condition, afterCondition *apis.Condition, object runtime.Object) {
recorder := controller.GetEventRecorder(ctx)
logger := logging.FromContext(ctx)
configs := config.FromContextOrDefaults(ctx)
sendCloudEvents := (configs.Defaults.DefaultCloudEventsSink != "")
if sendCloudEvents {
ctx = cloudevents.ContextWithTarget(ctx, configs.Defaults.DefaultCloudEventsSink)
}

sendKubernetesEvents(recorder, beforeCondition, afterCondition, object)

if sendCloudEvents {
// Only send events if the new condition represents a change
if !equality.Semantic.DeepEqual(beforeCondition, afterCondition) {
err := cloudevent.SendCloudEventWithRetries(ctx, object)
if err != nil {
logger.Warnf("Failed to emit cloud events %v", err.Error())
}
}
}
k8sevent.EmitK8sEvents(ctx, beforeCondition, afterCondition, object)
cloudevent.EmitCloudEventsWhenConditionChange(ctx, beforeCondition, afterCondition, object)
}

// EmitCloudEvents emits CloudEvents (only) for object
func EmitCloudEvents(ctx context.Context, object runtime.Object) {
logger := logging.FromContext(ctx)
configs := config.FromContextOrDefaults(ctx)
sendCloudEvents := (configs.Defaults.DefaultCloudEventsSink != "")
if sendCloudEvents {
ctx = cloudevents.ContextWithTarget(ctx, configs.Defaults.DefaultCloudEventsSink)
}
// EmitCloudEvents is refactored to cloudevent, this is to avoid breaking change
var EmitCloudEvents = cloudevent.EmitCloudEvents

if sendCloudEvents {
err := cloudevent.SendCloudEventWithRetries(ctx, object)
if err != nil {
logger.Warnf("Failed to emit cloud events %v", err.Error())
}
}
}

func sendKubernetesEvents(c record.EventRecorder, beforeCondition *apis.Condition, afterCondition *apis.Condition, object runtime.Object) {
// Events that are going to be sent
//
// Status "ConditionUnknown":
// beforeCondition == nil, emit EventReasonStarted
// beforeCondition != nil, emit afterCondition.Reason
//
// Status "ConditionTrue": emit EventReasonSucceded
// Status "ConditionFalse": emit EventReasonFailed
if !equality.Semantic.DeepEqual(beforeCondition, afterCondition) && afterCondition != nil {
// If the condition changed, and the target condition is not empty, we send an event
switch afterCondition.Status {
case corev1.ConditionTrue:
c.Event(object, corev1.EventTypeNormal, EventReasonSucceded, afterCondition.Message)
case corev1.ConditionFalse:
c.Event(object, corev1.EventTypeWarning, EventReasonFailed, afterCondition.Message)
case corev1.ConditionUnknown:
if beforeCondition == nil {
// If the condition changed, the status is "unknown", and there was no condition before,
// we emit the "Started event". We ignore further updates of the "unknown" status.
c.Event(object, corev1.EventTypeNormal, EventReasonStarted, "")
} else {
// If the condition changed, the status is "unknown", and there was a condition before,
// we emit an event that matches the reason and message of the condition.
// This is used for instance to signal the transition from "started" to "running"
c.Event(object, corev1.EventTypeNormal, afterCondition.Reason, afterCondition.Message)
}
}
}
}

// EmitError emits a failure associated to an error
func EmitError(c record.EventRecorder, err error, object runtime.Object) {
if err != nil {
c.Event(object, corev1.EventTypeWarning, EventReasonError, err.Error())
}
}
// EmitError is refactored to k8sevent, this is to avoid breaking change
var EmitError = k8sevent.EmitError
Loading

0 comments on commit 3b1b392

Please sign in to comment.