From 3b1b392b3933c86e55f07201aba3c9859d393c23 Mon Sep 17 00:00:00 2001 From: Yongxuan Zhang Date: Wed, 30 Nov 2022 19:57:09 +0000 Subject: [PATCH] Refactor events to decouple k8s event and cloud event This commit refactors the code in events to decouple the k8s events emit and cloud events emit. This commit fixes #4404. --- .../cloudevent/cloud_event_controller.go | 40 +++ .../cloudevent/cloud_event_controller_test.go | 100 +++++++- pkg/reconciler/events/event.go | 97 +------- pkg/reconciler/events/event_test.go | 202 +-------------- pkg/reconciler/events/k8sevent/doc.go | 17 ++ pkg/reconciler/events/k8sevent/event.go | 80 ++++++ pkg/reconciler/events/k8sevent/event_test.go | 233 ++++++++++++++++++ .../reconciler/events/k8sevent}/events.go | 4 +- .../pipelinerun/pipelinerun_test.go | 8 +- pkg/reconciler/taskrun/taskrun_test.go | 24 +- 10 files changed, 495 insertions(+), 310 deletions(-) create mode 100644 pkg/reconciler/events/k8sevent/doc.go create mode 100644 pkg/reconciler/events/k8sevent/event.go create mode 100644 pkg/reconciler/events/k8sevent/event_test.go rename {test/events => pkg/reconciler/events/k8sevent}/events.go (97%) diff --git a/pkg/reconciler/events/cloudevent/cloud_event_controller.go b/pkg/reconciler/events/cloudevent/cloud_event_controller.go index 0a3d3b27eff..85087f84301 100644 --- a/pkg/reconciler/events/cloudevent/cloud_event_controller.go +++ b/pkg/reconciler/events/cloudevent/cloud_event_controller.go @@ -23,6 +23,7 @@ import ( cloudevents "github.com/cloudevents/sdk-go/v2" "github.com/hashicorp/go-multierror" + "github.com/tektoncd/pipeline/pkg/apis/config" "github.com/tektoncd/pipeline/pkg/apis/pipeline/v1alpha1" "github.com/tektoncd/pipeline/pkg/apis/pipeline/v1beta1" resource "github.com/tektoncd/pipeline/pkg/apis/resource/v1alpha1" @@ -30,9 +31,11 @@ import ( "github.com/tektoncd/pipeline/pkg/reconciler/events/cache" "go.uber.org/zap" corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/equality" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/utils/clock" + "knative.dev/pkg/apis" controller "knative.dev/pkg/controller" "knative.dev/pkg/logging" ) @@ -73,6 +76,43 @@ func cloudEventDeliveryFromTargets(targets []string) []v1beta1.CloudEventDeliver return nil } +// EmitCloudEvents emits CloudEvents (only) for object +func EmitCloudEvents(ctx context.Context, object runtime.Object) { + logger := logging.FromContext(ctx) + configs := config.FromContextOrDefaults(ctx) + sendCloudEvents := (configs.Defaults.DefaultCloudEventsSink != "") + if sendCloudEvents { + ctx = cloudevents.ContextWithTarget(ctx, configs.Defaults.DefaultCloudEventsSink) + } + + if sendCloudEvents { + err := SendCloudEventWithRetries(ctx, object) + if err != nil { + logger.Warnf("Failed to emit cloud events %v", err.Error()) + } + } +} + +// EmitCloudEventsWhenConditionChange emits CloudEvents when there is a change in condition +func EmitCloudEventsWhenConditionChange(ctx context.Context, beforeCondition *apis.Condition, afterCondition *apis.Condition, object runtime.Object) { + logger := logging.FromContext(ctx) + configs := config.FromContextOrDefaults(ctx) + sendCloudEvents := (configs.Defaults.DefaultCloudEventsSink != "") + if sendCloudEvents { + ctx = cloudevents.ContextWithTarget(ctx, configs.Defaults.DefaultCloudEventsSink) + } + + if sendCloudEvents { + // Only send events if the new condition represents a change + if !equality.Semantic.DeepEqual(beforeCondition, afterCondition) { + err := SendCloudEventWithRetries(ctx, object) + if err != nil { + logger.Warnf("Failed to emit cloud events %v", err.Error()) + } + } + } +} + // SendCloudEvents is used by the TaskRun controller to send cloud events once // the TaskRun is complete. `tr` is used to obtain the list of targets func SendCloudEvents(tr *v1beta1.TaskRun, ceclient CEClient, logger *zap.SugaredLogger, c clock.PassiveClock) error { diff --git a/pkg/reconciler/events/cloudevent/cloud_event_controller_test.go b/pkg/reconciler/events/cloudevent/cloud_event_controller_test.go index 24a2dca945f..d09c0a42cea 100644 --- a/pkg/reconciler/events/cloudevent/cloud_event_controller_test.go +++ b/pkg/reconciler/events/cloudevent/cloud_event_controller_test.go @@ -22,11 +22,12 @@ import ( "time" "github.com/google/go-cmp/cmp" + "github.com/tektoncd/pipeline/pkg/apis/config" "github.com/tektoncd/pipeline/pkg/apis/pipeline/v1alpha1" "github.com/tektoncd/pipeline/pkg/apis/pipeline/v1beta1" resourcev1alpha1 "github.com/tektoncd/pipeline/pkg/apis/resource/v1alpha1" + "github.com/tektoncd/pipeline/pkg/reconciler/events/k8sevent" "github.com/tektoncd/pipeline/test/diff" - eventstest "github.com/tektoncd/pipeline/test/events" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/tools/record" @@ -621,7 +622,7 @@ func TestSendCloudEventWithRetries(t *testing.T) { ceClient := Get(ctx).(FakeClient) ceClient.CheckCloudEventsUnordered(t, tc.name, tc.wantCEvents) recorder := controller.GetEventRecorder(ctx).(*record.FakeRecorder) - if err := eventstest.CheckEventsOrdered(t, recorder.Events, tc.name, tc.wantEvents); err != nil { + if err := k8sevent.CheckEventsOrdered(t, recorder.Events, tc.name, tc.wantEvents); err != nil { t.Fatalf(err.Error()) } }) @@ -677,6 +678,101 @@ func TestSendCloudEventWithRetriesNoClient(t *testing.T) { } } +func TestEmitCloudEvents(t *testing.T) { + + object := &v1alpha1.Run{ + ObjectMeta: metav1.ObjectMeta{ + SelfLink: "/run/test1", + }, + Status: v1alpha1.RunStatus{}, + } + testcases := []struct { + name string + data map[string]string + wantEvents []string + wantCloudEvents []string + }{{ + name: "without sink", + data: map[string]string{}, + wantEvents: []string{}, + wantCloudEvents: []string{}, + }, { + name: "with empty string sink", + data: map[string]string{"default-cloud-events-sink": ""}, + wantEvents: []string{}, + wantCloudEvents: []string{}, + }, { + name: "with sink", + data: map[string]string{"default-cloud-events-sink": "http://mysink"}, + wantEvents: []string{}, + wantCloudEvents: []string{`(?s)dev.tekton.event.run.started.v1.*test1`}, + }} + + for _, tc := range testcases { + // Setup the context and seed test data + ctx, _ := rtesting.SetupFakeContext(t) + ctx = WithClient(ctx, &FakeClientBehaviour{SendSuccessfully: true}, len(tc.wantCloudEvents)) + fakeClient := Get(ctx).(FakeClient) + + // Setup the config and add it to the context + defaults, _ := config.NewDefaultsFromMap(tc.data) + featureFlags, _ := config.NewFeatureFlagsFromMap(map[string]string{}) + cfg := &config.Config{ + Defaults: defaults, + FeatureFlags: featureFlags, + } + ctx = config.ToContext(ctx, cfg) + + recorder := controller.GetEventRecorder(ctx).(*record.FakeRecorder) + EmitCloudEvents(ctx, object) + if err := k8sevent.CheckEventsOrdered(t, recorder.Events, tc.name, tc.wantEvents); err != nil { + t.Fatalf(err.Error()) + } + fakeClient.CheckCloudEventsUnordered(t, tc.name, tc.wantCloudEvents) + } +} + +func TestEmitCloudEventsWhenConditionChange(t *testing.T) { + objectStatus := duckv1.Status{ + Conditions: []apis.Condition{{ + Type: apis.ConditionSucceeded, + Status: corev1.ConditionUnknown, + Reason: v1beta1.PipelineRunReasonStarted.String(), + }}, + } + object := &v1beta1.PipelineRun{ + ObjectMeta: metav1.ObjectMeta{ + SelfLink: "/pipelineruns/test1", + }, + Status: v1beta1.PipelineRunStatus{Status: objectStatus}, + } + after := &apis.Condition{ + Type: apis.ConditionSucceeded, + Status: corev1.ConditionUnknown, + Message: "just starting", + } + + data := map[string]string{"default-cloud-events-sink": "http://mysink"} + wantCloudEvents := []string{`(?s)dev.tekton.event.pipelinerun.started.v1.*test1`} + + // Setup the context and seed test data + ctx, _ := rtesting.SetupFakeContext(t) + ctx = WithClient(ctx, &FakeClientBehaviour{SendSuccessfully: true}, len(wantCloudEvents)) + fakeClient := Get(ctx).(FakeClient) + + // Setup the config and add it to the context + defaults, _ := config.NewDefaultsFromMap(data) + featureFlags, _ := config.NewFeatureFlagsFromMap(map[string]string{}) + cfg := &config.Config{ + Defaults: defaults, + FeatureFlags: featureFlags, + } + ctx = config.ToContext(ctx, cfg) + + EmitCloudEventsWhenConditionChange(ctx, nil, after, object) + fakeClient.CheckCloudEventsUnordered(t, "with sink", wantCloudEvents) +} + func setupFakeContext(t *testing.T, behaviour FakeClientBehaviour, withClient bool, expectedEventCount int) context.Context { var ctx context.Context ctx, _ = rtesting.SetupFakeContext(t) diff --git a/pkg/reconciler/events/event.go b/pkg/reconciler/events/event.go index e2bb638b1a3..c4af716f7c9 100644 --- a/pkg/reconciler/events/event.go +++ b/pkg/reconciler/events/event.go @@ -19,27 +19,10 @@ package events import ( "context" - cloudevents "github.com/cloudevents/sdk-go/v2" - "github.com/tektoncd/pipeline/pkg/apis/config" "github.com/tektoncd/pipeline/pkg/reconciler/events/cloudevent" - corev1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/api/equality" + "github.com/tektoncd/pipeline/pkg/reconciler/events/k8sevent" "k8s.io/apimachinery/pkg/runtime" - "k8s.io/client-go/tools/record" "knative.dev/pkg/apis" - "knative.dev/pkg/controller" - "knative.dev/pkg/logging" -) - -const ( - // EventReasonSucceded is the reason set for events about successful completion of TaskRuns / PipelineRuns - EventReasonSucceded = "Succeeded" - // EventReasonFailed is the reason set for events about unsuccessful completion of TaskRuns / PipelineRuns - EventReasonFailed = "Failed" - // EventReasonStarted is the reason set for events about the start of TaskRuns / PipelineRuns - EventReasonStarted = "Started" - // EventReasonError is the reason set for events related to TaskRuns / PipelineRuns reconcile errors - EventReasonError = "Error" ) // Emit emits events for object @@ -48,78 +31,12 @@ const ( // k8s events are always sent if afterCondition is different from beforeCondition // Cloud events are always sent if enabled, i.e. if a sink is available func Emit(ctx context.Context, beforeCondition *apis.Condition, afterCondition *apis.Condition, object runtime.Object) { - recorder := controller.GetEventRecorder(ctx) - logger := logging.FromContext(ctx) - configs := config.FromContextOrDefaults(ctx) - sendCloudEvents := (configs.Defaults.DefaultCloudEventsSink != "") - if sendCloudEvents { - ctx = cloudevents.ContextWithTarget(ctx, configs.Defaults.DefaultCloudEventsSink) - } - - sendKubernetesEvents(recorder, beforeCondition, afterCondition, object) - - if sendCloudEvents { - // Only send events if the new condition represents a change - if !equality.Semantic.DeepEqual(beforeCondition, afterCondition) { - err := cloudevent.SendCloudEventWithRetries(ctx, object) - if err != nil { - logger.Warnf("Failed to emit cloud events %v", err.Error()) - } - } - } + k8sevent.EmitK8sEvents(ctx, beforeCondition, afterCondition, object) + cloudevent.EmitCloudEventsWhenConditionChange(ctx, beforeCondition, afterCondition, object) } -// EmitCloudEvents emits CloudEvents (only) for object -func EmitCloudEvents(ctx context.Context, object runtime.Object) { - logger := logging.FromContext(ctx) - configs := config.FromContextOrDefaults(ctx) - sendCloudEvents := (configs.Defaults.DefaultCloudEventsSink != "") - if sendCloudEvents { - ctx = cloudevents.ContextWithTarget(ctx, configs.Defaults.DefaultCloudEventsSink) - } +// EmitCloudEvents is refactored to cloudevent, this is to avoid breaking change +var EmitCloudEvents = cloudevent.EmitCloudEvents - if sendCloudEvents { - err := cloudevent.SendCloudEventWithRetries(ctx, object) - if err != nil { - logger.Warnf("Failed to emit cloud events %v", err.Error()) - } - } -} - -func sendKubernetesEvents(c record.EventRecorder, beforeCondition *apis.Condition, afterCondition *apis.Condition, object runtime.Object) { - // Events that are going to be sent - // - // Status "ConditionUnknown": - // beforeCondition == nil, emit EventReasonStarted - // beforeCondition != nil, emit afterCondition.Reason - // - // Status "ConditionTrue": emit EventReasonSucceded - // Status "ConditionFalse": emit EventReasonFailed - if !equality.Semantic.DeepEqual(beforeCondition, afterCondition) && afterCondition != nil { - // If the condition changed, and the target condition is not empty, we send an event - switch afterCondition.Status { - case corev1.ConditionTrue: - c.Event(object, corev1.EventTypeNormal, EventReasonSucceded, afterCondition.Message) - case corev1.ConditionFalse: - c.Event(object, corev1.EventTypeWarning, EventReasonFailed, afterCondition.Message) - case corev1.ConditionUnknown: - if beforeCondition == nil { - // If the condition changed, the status is "unknown", and there was no condition before, - // we emit the "Started event". We ignore further updates of the "unknown" status. - c.Event(object, corev1.EventTypeNormal, EventReasonStarted, "") - } else { - // If the condition changed, the status is "unknown", and there was a condition before, - // we emit an event that matches the reason and message of the condition. - // This is used for instance to signal the transition from "started" to "running" - c.Event(object, corev1.EventTypeNormal, afterCondition.Reason, afterCondition.Message) - } - } - } -} - -// EmitError emits a failure associated to an error -func EmitError(c record.EventRecorder, err error, object runtime.Object) { - if err != nil { - c.Event(object, corev1.EventTypeWarning, EventReasonError, err.Error()) - } -} +// EmitError is refactored to k8sevent, this is to avoid breaking change +var EmitError = k8sevent.EmitError diff --git a/pkg/reconciler/events/event_test.go b/pkg/reconciler/events/event_test.go index 25f0b64be39..93919a3b416 100644 --- a/pkg/reconciler/events/event_test.go +++ b/pkg/reconciler/events/event_test.go @@ -17,15 +17,12 @@ limitations under the License. package events import ( - "errors" "testing" - "time" "github.com/tektoncd/pipeline/pkg/apis/config" - "github.com/tektoncd/pipeline/pkg/apis/pipeline/v1alpha1" "github.com/tektoncd/pipeline/pkg/apis/pipeline/v1beta1" "github.com/tektoncd/pipeline/pkg/reconciler/events/cloudevent" - eventstest "github.com/tektoncd/pipeline/test/events" + "github.com/tektoncd/pipeline/pkg/reconciler/events/k8sevent" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/tools/record" @@ -35,147 +32,6 @@ import ( rtesting "knative.dev/pkg/reconciler/testing" ) -func TestSendKubernetesEvents(t *testing.T) { - testcases := []struct { - name string - before *apis.Condition - after *apis.Condition - wantEvents []string - }{{ - name: "unknown to true with message", - before: &apis.Condition{ - Type: apis.ConditionSucceeded, - Status: corev1.ConditionUnknown, - }, - after: &apis.Condition{ - Type: apis.ConditionSucceeded, - Status: corev1.ConditionTrue, - Message: "all done", - }, - wantEvents: []string{"Normal Succeeded all done"}, - }, { - name: "true to true", - before: &apis.Condition{ - Type: apis.ConditionSucceeded, - Status: corev1.ConditionTrue, - LastTransitionTime: apis.VolatileTime{Inner: metav1.NewTime(time.Now())}, - }, - after: &apis.Condition{ - Type: apis.ConditionSucceeded, - Status: corev1.ConditionTrue, - LastTransitionTime: apis.VolatileTime{Inner: metav1.NewTime(time.Now().Add(5 * time.Minute))}, - }, - wantEvents: []string{}, - }, { - name: "false to false", - before: &apis.Condition{ - Type: apis.ConditionSucceeded, - Status: corev1.ConditionFalse, - }, - after: &apis.Condition{ - Type: apis.ConditionSucceeded, - Status: corev1.ConditionFalse, - }, - wantEvents: []string{}, - }, { - name: "unknown to unknown", - before: &apis.Condition{ - Type: apis.ConditionSucceeded, - Status: corev1.ConditionUnknown, - Reason: "", - Message: "", - }, - after: &apis.Condition{ - Type: apis.ConditionSucceeded, - Status: corev1.ConditionUnknown, - Reason: "foo", - Message: "bar", - }, - wantEvents: []string{"Normal foo bar"}, - }, { - name: "true to nil", - after: nil, - before: &apis.Condition{ - Type: apis.ConditionSucceeded, - Status: corev1.ConditionTrue, - }, - wantEvents: []string{}, - }, { - name: "nil to true", - before: nil, - after: &apis.Condition{ - Type: apis.ConditionSucceeded, - Status: corev1.ConditionTrue, - }, - wantEvents: []string{"Normal Succeeded "}, - }, { - name: "nil to unknown with message", - before: nil, - after: &apis.Condition{ - Type: apis.ConditionSucceeded, - Status: corev1.ConditionUnknown, - Message: "just starting", - }, - wantEvents: []string{"Normal Started "}, - }, { - name: "unknown to false with message", - before: &apis.Condition{ - Type: apis.ConditionSucceeded, - Status: corev1.ConditionUnknown, - }, - after: &apis.Condition{ - Type: apis.ConditionSucceeded, - Status: corev1.ConditionFalse, - Message: "really bad", - }, - wantEvents: []string{"Warning Failed really bad"}, - }, { - name: "nil to false", - before: nil, - after: &apis.Condition{ - Type: apis.ConditionSucceeded, - Status: corev1.ConditionFalse, - }, - wantEvents: []string{"Warning Failed "}, - }} - - for _, ts := range testcases { - fr := record.NewFakeRecorder(1) - tr := &corev1.Pod{} - sendKubernetesEvents(fr, ts.before, ts.after, tr) - err := eventstest.CheckEventsOrdered(t, fr.Events, ts.name, ts.wantEvents) - if err != nil { - t.Errorf(err.Error()) - } - } -} - -func TestEmitError(t *testing.T) { - testcases := []struct { - name string - err error - wantEvents []string - }{{ - name: "with error", - err: errors.New("something went wrong"), - wantEvents: []string{"Warning Error something went wrong"}, - }, { - name: "without error", - err: nil, - wantEvents: []string{}, - }} - - for _, ts := range testcases { - fr := record.NewFakeRecorder(1) - tr := &corev1.Pod{} - EmitError(fr, ts.err, tr) - err := eventstest.CheckEventsOrdered(t, fr.Events, ts.name, ts.wantEvents) - if err != nil { - t.Errorf(err.Error()) - } - } -} - func TestEmit(t *testing.T) { objectStatus := duckv1.Status{ Conditions: []apis.Condition{{ @@ -234,61 +90,7 @@ func TestEmit(t *testing.T) { recorder := controller.GetEventRecorder(ctx).(*record.FakeRecorder) Emit(ctx, nil, after, object) - if err := eventstest.CheckEventsOrdered(t, recorder.Events, tc.name, tc.wantEvents); err != nil { - t.Fatalf(err.Error()) - } - fakeClient.CheckCloudEventsUnordered(t, tc.name, tc.wantCloudEvents) - } -} - -func TestEmitCloudEvents(t *testing.T) { - - object := &v1alpha1.Run{ - ObjectMeta: metav1.ObjectMeta{ - SelfLink: "/run/test1", - }, - Status: v1alpha1.RunStatus{}, - } - testcases := []struct { - name string - data map[string]string - wantEvents []string - wantCloudEvents []string - }{{ - name: "without sink", - data: map[string]string{}, - wantEvents: []string{}, - wantCloudEvents: []string{}, - }, { - name: "with empty string sink", - data: map[string]string{"default-cloud-events-sink": ""}, - wantEvents: []string{}, - wantCloudEvents: []string{}, - }, { - name: "with sink", - data: map[string]string{"default-cloud-events-sink": "http://mysink"}, - wantEvents: []string{}, - wantCloudEvents: []string{`(?s)dev.tekton.event.run.started.v1.*test1`}, - }} - - for _, tc := range testcases { - // Setup the context and seed test data - ctx, _ := rtesting.SetupFakeContext(t) - ctx = cloudevent.WithClient(ctx, &cloudevent.FakeClientBehaviour{SendSuccessfully: true}, len(tc.wantCloudEvents)) - fakeClient := cloudevent.Get(ctx).(cloudevent.FakeClient) - - // Setup the config and add it to the context - defaults, _ := config.NewDefaultsFromMap(tc.data) - featureFlags, _ := config.NewFeatureFlagsFromMap(map[string]string{}) - cfg := &config.Config{ - Defaults: defaults, - FeatureFlags: featureFlags, - } - ctx = config.ToContext(ctx, cfg) - - recorder := controller.GetEventRecorder(ctx).(*record.FakeRecorder) - EmitCloudEvents(ctx, object) - if err := eventstest.CheckEventsOrdered(t, recorder.Events, tc.name, tc.wantEvents); err != nil { + if err := k8sevent.CheckEventsOrdered(t, recorder.Events, tc.name, tc.wantEvents); err != nil { t.Fatalf(err.Error()) } fakeClient.CheckCloudEventsUnordered(t, tc.name, tc.wantCloudEvents) diff --git a/pkg/reconciler/events/k8sevent/doc.go b/pkg/reconciler/events/k8sevent/doc.go new file mode 100644 index 00000000000..9a6f44b22aa --- /dev/null +++ b/pkg/reconciler/events/k8sevent/doc.go @@ -0,0 +1,17 @@ +/* +Copyright 2022 The Tekton Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package k8sevent diff --git a/pkg/reconciler/events/k8sevent/event.go b/pkg/reconciler/events/k8sevent/event.go new file mode 100644 index 00000000000..f0e1b390935 --- /dev/null +++ b/pkg/reconciler/events/k8sevent/event.go @@ -0,0 +1,80 @@ +/* +Copyright 2022 The Tekton Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package k8sevent + +import ( + "context" + + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/equality" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/tools/record" + "knative.dev/pkg/apis" + "knative.dev/pkg/controller" +) + +const ( + // EventReasonSucceded is the reason set for events about successful completion of TaskRuns / PipelineRuns + EventReasonSucceded = "Succeeded" + // EventReasonFailed is the reason set for events about unsuccessful completion of TaskRuns / PipelineRuns + EventReasonFailed = "Failed" + // EventReasonStarted is the reason set for events about the start of TaskRuns / PipelineRuns + EventReasonStarted = "Started" + // EventReasonError is the reason set for events related to TaskRuns / PipelineRuns reconcile errors + EventReasonError = "Error" +) + +// EmitK8sEvents emits kubernetes events for object +// k8s events are always sent if afterCondition is different from beforeCondition +func EmitK8sEvents(ctx context.Context, beforeCondition *apis.Condition, afterCondition *apis.Condition, object runtime.Object) { + recorder := controller.GetEventRecorder(ctx) + // Events that are going to be sent + // + // Status "ConditionUnknown": + // beforeCondition == nil, emit EventReasonStarted + // beforeCondition != nil, emit afterCondition.Reason + // + // Status "ConditionTrue": emit EventReasonSucceded + // Status "ConditionFalse": emit EventReasonFailed + if !equality.Semantic.DeepEqual(beforeCondition, afterCondition) && afterCondition != nil { + // If the condition changed, and the target condition is not empty, we send an event + switch afterCondition.Status { + case corev1.ConditionTrue: + recorder.Event(object, corev1.EventTypeNormal, EventReasonSucceded, afterCondition.Message) + case corev1.ConditionFalse: + recorder.Event(object, corev1.EventTypeWarning, EventReasonFailed, afterCondition.Message) + case corev1.ConditionUnknown: + if beforeCondition == nil { + // If the condition changed, the status is "unknown", and there was no condition before, + // we emit the "Started event". We ignore further updates of the "unknown" status. + recorder.Event(object, corev1.EventTypeNormal, EventReasonStarted, "") + } else { + // If the condition changed, the status is "unknown", and there was a condition before, + // we emit an event that matches the reason and message of the condition. + // This is used for instance to signal the transition from "started" to "running" + recorder.Event(object, corev1.EventTypeNormal, afterCondition.Reason, afterCondition.Message) + } + } + } +} + +// EmitError emits a failure associated to an error +func EmitError(c record.EventRecorder, err error, object runtime.Object) { + if err != nil { + c.Event(object, corev1.EventTypeWarning, EventReasonError, err.Error()) + } +} diff --git a/pkg/reconciler/events/k8sevent/event_test.go b/pkg/reconciler/events/k8sevent/event_test.go new file mode 100644 index 00000000000..650b64bf275 --- /dev/null +++ b/pkg/reconciler/events/k8sevent/event_test.go @@ -0,0 +1,233 @@ +/* +Copyright 2022 The Tekton Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package k8sevent + +import ( + "errors" + "testing" + "time" + + "github.com/tektoncd/pipeline/pkg/apis/config" + "github.com/tektoncd/pipeline/pkg/apis/pipeline/v1beta1" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/tools/record" + "knative.dev/pkg/apis" + duckv1 "knative.dev/pkg/apis/duck/v1" + "knative.dev/pkg/controller" + rtesting "knative.dev/pkg/reconciler/testing" +) + +func TestEmitK8sEventsOnConditions(t *testing.T) { + testcases := []struct { + name string + before *apis.Condition + after *apis.Condition + wantEvents []string + }{{ + name: "unknown to true with message", + before: &apis.Condition{ + Type: apis.ConditionSucceeded, + Status: corev1.ConditionUnknown, + }, + after: &apis.Condition{ + Type: apis.ConditionSucceeded, + Status: corev1.ConditionTrue, + Message: "all done", + }, + wantEvents: []string{"Normal Succeeded all done"}, + }, { + name: "true to true", + before: &apis.Condition{ + Type: apis.ConditionSucceeded, + Status: corev1.ConditionTrue, + LastTransitionTime: apis.VolatileTime{Inner: metav1.NewTime(time.Now())}, + }, + after: &apis.Condition{ + Type: apis.ConditionSucceeded, + Status: corev1.ConditionTrue, + LastTransitionTime: apis.VolatileTime{Inner: metav1.NewTime(time.Now().Add(5 * time.Minute))}, + }, + wantEvents: []string{}, + }, { + name: "false to false", + before: &apis.Condition{ + Type: apis.ConditionSucceeded, + Status: corev1.ConditionFalse, + }, + after: &apis.Condition{ + Type: apis.ConditionSucceeded, + Status: corev1.ConditionFalse, + }, + wantEvents: []string{}, + }, { + name: "unknown to unknown", + before: &apis.Condition{ + Type: apis.ConditionSucceeded, + Status: corev1.ConditionUnknown, + Reason: "", + Message: "", + }, + after: &apis.Condition{ + Type: apis.ConditionSucceeded, + Status: corev1.ConditionUnknown, + Reason: "foo", + Message: "bar", + }, + wantEvents: []string{"Normal foo bar"}, + }, { + name: "true to nil", + after: nil, + before: &apis.Condition{ + Type: apis.ConditionSucceeded, + Status: corev1.ConditionTrue, + }, + wantEvents: []string{}, + }, { + name: "nil to true", + before: nil, + after: &apis.Condition{ + Type: apis.ConditionSucceeded, + Status: corev1.ConditionTrue, + }, + wantEvents: []string{"Normal Succeeded "}, + }, { + name: "nil to unknown with message", + before: nil, + after: &apis.Condition{ + Type: apis.ConditionSucceeded, + Status: corev1.ConditionUnknown, + Message: "just starting", + }, + wantEvents: []string{"Normal Started "}, + }, { + name: "unknown to false with message", + before: &apis.Condition{ + Type: apis.ConditionSucceeded, + Status: corev1.ConditionUnknown, + }, + after: &apis.Condition{ + Type: apis.ConditionSucceeded, + Status: corev1.ConditionFalse, + Message: "really bad", + }, + wantEvents: []string{"Warning Failed really bad"}, + }, { + name: "nil to false", + before: nil, + after: &apis.Condition{ + Type: apis.ConditionSucceeded, + Status: corev1.ConditionFalse, + }, + wantEvents: []string{"Warning Failed "}, + }} + + for _, ts := range testcases { + tr := &corev1.Pod{} + ctx, _ := rtesting.SetupFakeContext(t) + recorder := controller.GetEventRecorder(ctx).(*record.FakeRecorder) + EmitK8sEvents(ctx, ts.before, ts.after, tr) + err := CheckEventsOrdered(t, recorder.Events, ts.name, ts.wantEvents) + if err != nil { + t.Errorf(err.Error()) + } + } +} + +func TestEmitK8sEvents(t *testing.T) { + objectStatus := duckv1.Status{ + Conditions: []apis.Condition{{ + Type: apis.ConditionSucceeded, + Status: corev1.ConditionUnknown, + Reason: v1beta1.PipelineRunReasonStarted.String(), + }}, + } + object := &v1beta1.PipelineRun{ + ObjectMeta: metav1.ObjectMeta{ + SelfLink: "/pipelineruns/test1", + }, + Status: v1beta1.PipelineRunStatus{Status: objectStatus}, + } + after := &apis.Condition{ + Type: apis.ConditionSucceeded, + Status: corev1.ConditionUnknown, + Message: "just starting", + } + testcases := []struct { + name string + data map[string]string + wantEvents []string + }{{ + name: "without sink", + data: map[string]string{}, + wantEvents: []string{"Normal Started"}, + }, { + name: "with empty string sink", + data: map[string]string{"default-cloud-events-sink": ""}, + wantEvents: []string{"Normal Started"}, + }, { + name: "with sink", + data: map[string]string{"default-cloud-events-sink": "http://mysink"}, + wantEvents: []string{"Normal Started"}, + }} + + for _, tc := range testcases { + // Setup the context and seed test data + ctx, _ := rtesting.SetupFakeContext(t) + + // Setup the config and add it to the context + defaults, _ := config.NewDefaultsFromMap(tc.data) + featureFlags, _ := config.NewFeatureFlagsFromMap(map[string]string{}) + cfg := &config.Config{ + Defaults: defaults, + FeatureFlags: featureFlags, + } + ctx = config.ToContext(ctx, cfg) + + recorder := controller.GetEventRecorder(ctx).(*record.FakeRecorder) + EmitK8sEvents(ctx, nil, after, object) + if err := CheckEventsOrdered(t, recorder.Events, tc.name, tc.wantEvents); err != nil { + t.Fatalf(err.Error()) + } + } +} + +func TestEmitError(t *testing.T) { + testcases := []struct { + name string + err error + wantEvents []string + }{{ + name: "with error", + err: errors.New("something went wrong"), + wantEvents: []string{"Warning Error something went wrong"}, + }, { + name: "without error", + err: nil, + wantEvents: []string{}, + }} + + for _, ts := range testcases { + fr := record.NewFakeRecorder(1) + tr := &corev1.Pod{} + EmitError(fr, ts.err, tr) + err := CheckEventsOrdered(t, fr.Events, ts.name, ts.wantEvents) + if err != nil { + t.Errorf(err.Error()) + } + } +} diff --git a/test/events/events.go b/pkg/reconciler/events/k8sevent/events.go similarity index 97% rename from test/events/events.go rename to pkg/reconciler/events/k8sevent/events.go index f4a321c9099..0d03323c2e9 100644 --- a/test/events/events.go +++ b/pkg/reconciler/events/k8sevent/events.go @@ -5,7 +5,7 @@ Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 + http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, @@ -13,7 +13,7 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ -package events +package k8sevent import ( "fmt" diff --git a/pkg/reconciler/pipelinerun/pipelinerun_test.go b/pkg/reconciler/pipelinerun/pipelinerun_test.go index d5b70c8bf2b..1de4a6d6de7 100644 --- a/pkg/reconciler/pipelinerun/pipelinerun_test.go +++ b/pkg/reconciler/pipelinerun/pipelinerun_test.go @@ -39,13 +39,13 @@ import ( resourcev1alpha1 "github.com/tektoncd/pipeline/pkg/apis/resource/v1alpha1" resolutionutil "github.com/tektoncd/pipeline/pkg/internal/resolution" "github.com/tektoncd/pipeline/pkg/reconciler/events/cloudevent" + "github.com/tektoncd/pipeline/pkg/reconciler/events/k8sevent" "github.com/tektoncd/pipeline/pkg/reconciler/pipelinerun/resources" ttesting "github.com/tektoncd/pipeline/pkg/reconciler/testing" "github.com/tektoncd/pipeline/pkg/reconciler/volumeclaim" resolutioncommon "github.com/tektoncd/pipeline/pkg/resolution/common" "github.com/tektoncd/pipeline/test" "github.com/tektoncd/pipeline/test/diff" - eventstest "github.com/tektoncd/pipeline/test/events" "github.com/tektoncd/pipeline/test/names" "github.com/tektoncd/pipeline/test/parse" "gomodules.xyz/jsonpatch/v2" @@ -2951,7 +2951,7 @@ spec: "Normal PipelineRunCouldntCancel PipelineRun \"test-pipeline-fails-to-cancel\" was cancelled but had errors trying to cancel TaskRuns", "Warning InternalError 1 error occurred", } - err = eventstest.CheckEventsOrdered(t, testAssets.Recorder.Events, prName, wantEvents) + err = k8sevent.CheckEventsOrdered(t, testAssets.Recorder.Events, prName, wantEvents) if err != nil { t.Errorf(err.Error()) } @@ -3067,7 +3067,7 @@ spec: "Normal PipelineRunCouldntTimeOut PipelineRun \"test-pipeline-fails-to-timeout\" was timed out but had errors trying to time out TaskRuns and/or Runs", "Warning InternalError 1 error occurred", } - err = eventstest.CheckEventsOrdered(t, testAssets.Recorder.Events, prName, wantEvents) + err = k8sevent.CheckEventsOrdered(t, testAssets.Recorder.Events, prName, wantEvents) if err != nil { t.Errorf(err.Error()) } @@ -7082,7 +7082,7 @@ func (prt PipelineRunTest) reconcileRun(namespace, pipelineRunName string, wantE // Check generated events match what's expected if len(wantEvents) > 0 { - if err := eventstest.CheckEventsOrdered(prt.Test, prt.TestAssets.Recorder.Events, pipelineRunName, wantEvents); err != nil { + if err := k8sevent.CheckEventsOrdered(prt.Test, prt.TestAssets.Recorder.Events, pipelineRunName, wantEvents); err != nil { prt.Test.Errorf(err.Error()) } } diff --git a/pkg/reconciler/taskrun/taskrun_test.go b/pkg/reconciler/taskrun/taskrun_test.go index 0d4fe769ef5..53b9b4bb3e5 100644 --- a/pkg/reconciler/taskrun/taskrun_test.go +++ b/pkg/reconciler/taskrun/taskrun_test.go @@ -41,6 +41,7 @@ import ( resolutionutil "github.com/tektoncd/pipeline/pkg/internal/resolution" podconvert "github.com/tektoncd/pipeline/pkg/pod" "github.com/tektoncd/pipeline/pkg/reconciler/events/cloudevent" + "github.com/tektoncd/pipeline/pkg/reconciler/events/k8sevent" "github.com/tektoncd/pipeline/pkg/reconciler/taskrun/resources" ttesting "github.com/tektoncd/pipeline/pkg/reconciler/testing" "github.com/tektoncd/pipeline/pkg/reconciler/volumeclaim" @@ -48,7 +49,6 @@ import ( "github.com/tektoncd/pipeline/pkg/workspace" "github.com/tektoncd/pipeline/test" "github.com/tektoncd/pipeline/test/diff" - eventstest "github.com/tektoncd/pipeline/test/events" "github.com/tektoncd/pipeline/test/names" "github.com/tektoncd/pipeline/test/parse" corev1 "k8s.io/api/core/v1" @@ -701,7 +701,7 @@ spec: t.Errorf("Expected reason %q but was %s", v1beta1.TaskRunReasonRunning.String(), condition.Reason) } - err = eventstest.CheckEventsOrdered(t, testAssets.Recorder.Events, "reconcile-cloud-events", wantEvents) + err = k8sevent.CheckEventsOrdered(t, testAssets.Recorder.Events, "reconcile-cloud-events", wantEvents) if !(err == nil) { t.Errorf(err.Error()) } @@ -1193,7 +1193,7 @@ spec: t.Fatalf("Expected actions to be logged in the kubeclient, got none") } - err = eventstest.CheckEventsOrdered(t, testAssets.Recorder.Events, tc.name, tc.wantEvents) + err = k8sevent.CheckEventsOrdered(t, testAssets.Recorder.Events, tc.name, tc.wantEvents) if err != nil { t.Errorf(err.Error()) } @@ -1348,7 +1348,7 @@ spec: t.Fatalf("Expected actions to be logged in the kubeclient, got none") } - err = eventstest.CheckEventsOrdered(t, testAssets.Recorder.Events, tc.name, tc.wantEvents) + err = k8sevent.CheckEventsOrdered(t, testAssets.Recorder.Events, tc.name, tc.wantEvents) if err != nil { t.Errorf(err.Error()) } @@ -1687,7 +1687,7 @@ spec: t.Errorf("expected 2 actions, got %d. Actions: %#v", len(actions), actions) } - err := eventstest.CheckEventsOrdered(t, testAssets.Recorder.Events, tc.name, tc.wantEvents) + err := k8sevent.CheckEventsOrdered(t, testAssets.Recorder.Events, tc.name, tc.wantEvents) if !(err == nil) { t.Errorf(err.Error()) } @@ -1965,7 +1965,7 @@ status: "Normal Running Not all Steps", "Normal Succeeded", } - err = eventstest.CheckEventsOrdered(t, testAssets.Recorder.Events, "test-reconcile-pod-updateStatus", wantEvents) + err = k8sevent.CheckEventsOrdered(t, testAssets.Recorder.Events, "test-reconcile-pod-updateStatus", wantEvents) if !(err == nil) { t.Errorf(err.Error()) } @@ -2064,7 +2064,7 @@ status: "Normal Started", "Warning Failed TaskRun \"test-taskrun-run-cancelled\" was cancelled", } - err = eventstest.CheckEventsOrdered(t, testAssets.Recorder.Events, "test-reconcile-on-cancelled-taskrun", wantEvents) + err = k8sevent.CheckEventsOrdered(t, testAssets.Recorder.Events, "test-reconcile-on-cancelled-taskrun", wantEvents) if !(err == nil) { t.Errorf(err.Error()) } @@ -2137,7 +2137,7 @@ status: "Normal Started", "Warning Failed TaskRun \"test-taskrun-run-timedout\" was cancelled. TaskRun cancelled as pipeline has been cancelled.", } - err = eventstest.CheckEventsOrdered(t, testAssets.Recorder.Events, "test-reconcile-on-timedout-taskrun", wantEvents) + err = k8sevent.CheckEventsOrdered(t, testAssets.Recorder.Events, "test-reconcile-on-timedout-taskrun", wantEvents) if !(err == nil) { t.Errorf(err.Error()) } @@ -2208,7 +2208,7 @@ status: if d := cmp.Diff(expectedStatus, condition, ignoreLastTransitionTime); d != "" { t.Fatalf("Did not get expected condition %s", diff.PrintWantGot(d)) } - err = eventstest.CheckEventsOrdered(t, testAssets.Recorder.Events, taskRun.Name, wantEvents) + err = k8sevent.CheckEventsOrdered(t, testAssets.Recorder.Events, taskRun.Name, wantEvents) if err != nil { t.Errorf(err.Error()) } @@ -2280,7 +2280,7 @@ status: if d := cmp.Diff(expectedStatus, condition, ignoreLastTransitionTime); d != "" { t.Fatalf("Did not get expected condition %s", diff.PrintWantGot(d)) } - err = eventstest.CheckEventsOrdered(t, testAssets.Recorder.Events, taskRun.Name, wantEvents) + err = k8sevent.CheckEventsOrdered(t, testAssets.Recorder.Events, taskRun.Name, wantEvents) if err != nil { t.Errorf(err.Error()) } @@ -2395,7 +2395,7 @@ status: if d := cmp.Diff(tc.expectedStatus, condition, ignoreLastTransitionTime); d != "" { t.Fatalf("Did not get expected condition %s", diff.PrintWantGot(d)) } - err = eventstest.CheckEventsOrdered(t, testAssets.Recorder.Events, tc.taskRun.Name, tc.wantEvents) + err = k8sevent.CheckEventsOrdered(t, testAssets.Recorder.Events, tc.taskRun.Name, tc.wantEvents) if !(err == nil) { t.Errorf(err.Error()) } @@ -3543,7 +3543,7 @@ spec: } } - err = eventstest.CheckEventsOrdered(t, testAssets.Recorder.Events, tt.desc, tt.wantEvents) + err = k8sevent.CheckEventsOrdered(t, testAssets.Recorder.Events, tt.desc, tt.wantEvents) if !(err == nil) { t.Errorf(err.Error()) }