Skip to content

Commit

Permalink
Update external event recorder to upstream k8s
Browse files Browse the repository at this point in the history
Update external event recorder to be compatible with the upstream k8s
EventRecorder interface.
Since the EventRecorder interface methods don't return any error, add a
logger to the external event recorder to log any errors.

Add an event type constant for trace events.

Update `controller.Events.EventWithMetaf()` to send the available
metadata information to the kubernetes event recorder with
`AnnotatedEventf()`.

Update the tests to work with the function signature changes.

Signed-off-by: Sunny <darkowlzz@protonmail.com>
  • Loading branch information
darkowlzz committed Oct 10, 2021
1 parent 7cdf669 commit c300f37
Show file tree
Hide file tree
Showing 4 changed files with 114 additions and 74 deletions.
14 changes: 2 additions & 12 deletions runtime/controller/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,9 @@ package controller
import (
"context"

"github.com/go-logr/logr"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
kuberecorder "k8s.io/client-go/tools/record"
"k8s.io/client-go/tools/reference"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"

Expand Down Expand Up @@ -80,18 +78,10 @@ func (e Events) EventWithMeta(ctx context.Context, obj client.Object, metadata m
// EventWithMetaf emits a Kubernetes event, and forwards the event and metadata to the ExternalEventRecorder if configured.
func (e Events) EventWithMetaf(ctx context.Context, obj client.Object, metadata map[string]string, severity, reason, msgFmt string, args ...interface{}) {
if e.EventRecorder != nil {
e.EventRecorder.Eventf(obj, severityToEventType(severity), reason, msgFmt, args...)
e.EventRecorder.AnnotatedEventf(obj, metadata, severityToEventType(severity), reason, msgFmt, args...)
}
if e.ExternalEventRecorder != nil {
ref, err := reference.GetReference(e.Scheme, obj)
if err != nil {
logr.FromContextOrDiscard(ctx).Error(err, "unable to get object reference to send event")
return
}
if err := e.ExternalEventRecorder.Eventf(*ref, metadata, severity, reason, msgFmt, args...); err != nil {
logr.FromContextOrDiscard(ctx).Error(err, "unable to send event")
return
}
e.ExternalEventRecorder.AnnotatedEventf(obj, metadata, severityToEventType(severity), reason, msgFmt, args...)
}
}

Expand Down
3 changes: 3 additions & 0 deletions runtime/events/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ const (
EventSeverityError string = "error"
)

// EventTypeTrace represents a trace event.
const EventTypeTrace string = "Trace"

// Event is a report of an event issued by a controller.
// +kubebuilder:object:generate=true
type Event struct {
Expand Down
101 changes: 67 additions & 34 deletions runtime/events/recorder.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,13 @@ import (
"os"
"time"

"github.com/go-logr/logr"
"github.com/hashicorp/go-retryablehttp"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
kuberecorder "k8s.io/client-go/tools/record"
"k8s.io/client-go/tools/reference"
)

// Recorder posts events to the webhook address.
Expand All @@ -40,11 +44,19 @@ type Recorder struct {

// Retryable HTTP client.
Client *retryablehttp.Client

// Scheme of the recorded objects.
Scheme *runtime.Scheme

// Log is the recorder logger.
Log logr.Logger
}

var _ kuberecorder.EventRecorder = &Recorder{}

// NewRecorder creates an event Recorder with default settings.
// The recorder performs automatic retries for connection errors and 500-range response codes.
func NewRecorder(webhook, reportingController string) (*Recorder, error) {
func NewRecorder(scheme *runtime.Scheme, log logr.Logger, webhook, reportingController string) (*Recorder, error) {
if _, err := url.Parse(webhook); err != nil {
return nil, err
}
Expand All @@ -55,89 +67,110 @@ func NewRecorder(webhook, reportingController string) (*Recorder, error) {
httpClient.Logger = nil

return &Recorder{
Scheme: scheme,
Webhook: webhook,
ReportingController: reportingController,
Client: httpClient,
Log: log,
}, nil
}

// EventInfof records an event with information severity.
func (r *Recorder) EventInfof(
object corev1.ObjectReference,
metadata map[string]string,
reason string, messageFmt string, args ...interface{}) error {
return r.Eventf(object, metadata, EventSeverityInfo, reason, messageFmt, args...)
// Event records an event in the webhook address.
func (r *Recorder) Event(object runtime.Object, eventtype, reason, message string) {
r.AnnotatedEventf(object, nil, eventtype, reason, message)
}

// EventErrorf records an event with error severity.
func (r *Recorder) EventErrorf(
object corev1.ObjectReference,
metadata map[string]string,
reason string, messageFmt string, args ...interface{}) error {
return r.Eventf(object, metadata, EventSeverityError, reason, messageFmt, args...)
// Event records an event in the webhook address.
func (r *Recorder) Eventf(object runtime.Object, eventtype, reason, messageFmt string, args ...interface{}) {
r.AnnotatedEventf(object, nil, eventtype, reason, messageFmt, args...)
}

// Eventf constructs an event from the given information and performs a HTTP POST to the webhook address.
func (r *Recorder) Eventf(
object corev1.ObjectReference,
metadata map[string]string,
severity, reason string,
messageFmt string, args ...interface{}) error {
// AnnotatedEventf constructs an event from the given information and performs a HTTP POST to the webhook address.
func (r *Recorder) AnnotatedEventf(
object runtime.Object,
annotations map[string]string,
eventtype, reason string,
messageFmt string, args ...interface{}) {

ref, err := reference.GetReference(r.Scheme, object)
if err != nil {
r.Log.Error(err, "failed to get object reference")
}

// Convert the eventType to severity.
severity := eventTypeToSeverity(eventtype)

// Do not send trace events to notification controller,
// traces are persisted as Kubernetes events only.
if severity == EventSeverityTrace {
return nil
return
}

if r.Client == nil {
return fmt.Errorf("retryable HTTP client has not been initialized")
r.Log.Error(nil, "retryable HTTP client has not been initialized")
return
}

message := fmt.Sprintf(messageFmt, args...)

if object.Kind == "" {
return fmt.Errorf("failed to get object kind")
if ref.Kind == "" {
r.Log.Error(nil, "failed to get object kind")
return
}

if object.Name == "" {
return fmt.Errorf("failed to get object name")
if ref.Name == "" {
r.Log.Error(nil, "failed to get object name")
return
}

if object.Namespace == "" {
return fmt.Errorf("failed to get object namespace")
if ref.Namespace == "" {
r.Log.Error(nil, "failed to get object namespace")
return
}

hostname, err := os.Hostname()
if err != nil {
return err
r.Log.Error(err, "failed to get hostname")
return
}

event := Event{
InvolvedObject: object,
InvolvedObject: *ref,
Severity: severity,
Timestamp: metav1.Now(),
Message: message,
Reason: reason,
Metadata: metadata,
Metadata: annotations,
ReportingController: r.ReportingController,
ReportingInstance: hostname,
}

body, err := json.Marshal(event)
if err != nil {
return fmt.Errorf("failed to marshal object into json, error: %w", err)
r.Log.Error(err, "failed to marshal object into json")
return
}

// avoid retrying rate limited requests
if res, _ := r.Client.HTTPClient.Post(r.Webhook, "application/json", bytes.NewReader(body)); res != nil &&
(res.StatusCode == http.StatusTooManyRequests || res.StatusCode == http.StatusAccepted) {
return nil
return
}

if _, err := r.Client.Post(r.Webhook, "application/json", body); err != nil {
return err
return
}
}

return nil
// eventTypeToSeverity maps the given eventType string to a gotk event severity
// type.
func eventTypeToSeverity(eventType string) string {
switch eventType {
case corev1.EventTypeWarning:
return EventSeverityError
case EventTypeTrace:
return EventSeverityTrace
default:
return EventSeverityInfo
}
}
70 changes: 42 additions & 28 deletions runtime/events/recorder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,18 +25,23 @@ import (

"github.com/stretchr/testify/require"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
ctrl "sigs.k8s.io/controller-runtime"
)

func TestEventRecorder_Eventf(t *testing.T) {
func TestEventRecorder_AnnotatedEventf(t *testing.T) {
requestCount := 0
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
requestCount++
b, err := io.ReadAll(r.Body)
require.NoError(t, err)

var payload Event
err = json.Unmarshal(b, &payload)
require.NoError(t, err)

require.Equal(t, "GitRepository", payload.InvolvedObject.Kind)
require.Equal(t, "ConfigMap", payload.InvolvedObject.Kind)
require.Equal(t, "webapp", payload.InvolvedObject.Name)
require.Equal(t, "gitops-system", payload.InvolvedObject.Namespace)
require.Equal(t, "true", payload.Metadata["test"])
Expand All @@ -45,24 +50,31 @@ func TestEventRecorder_Eventf(t *testing.T) {
}))
defer ts.Close()

eventRecorder, err := NewRecorder(ts.URL, "test-controller")
scheme := runtime.NewScheme()
require.NoError(t, clientgoscheme.AddToScheme(scheme))
eventRecorder, err := NewRecorder(scheme, ctrl.Log, ts.URL, "test-controller")
require.NoError(t, err)

obj := corev1.ObjectReference{
Kind: "GitRepository",
Namespace: "gitops-system",
Name: "webapp",
}
obj := &corev1.ConfigMap{}
obj.Namespace = "gitops-system"
obj.Name = "webapp"

meta := map[string]string{
"test": "true",
}

err = eventRecorder.EventInfof(obj, meta, "sync", "sync %s", obj.Name)
require.NoError(t, err)
eventRecorder.AnnotatedEventf(obj, meta, corev1.EventTypeNormal, "sync", "sync %s", obj.Name)
require.Equal(t, 2, requestCount)

// When a trace event is sent, it's dropped, no new request.
eventRecorder.AnnotatedEventf(obj, meta, EventTypeTrace, "sync", "sync %s", obj.Name)
require.Equal(t, 2, requestCount)
}

func TestEventRecorder_Eventf_Retry(t *testing.T) {
func TestEventRecorder_AnnotatedEventf_Retry(t *testing.T) {
requestCount := 0
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
requestCount++
b, err := io.ReadAll(r.Body)
require.NoError(t, err)

Expand All @@ -74,22 +86,24 @@ func TestEventRecorder_Eventf_Retry(t *testing.T) {
}))
defer ts.Close()

eventRecorder, err := NewRecorder(ts.URL, "test-controller")
scheme := runtime.NewScheme()
require.NoError(t, clientgoscheme.AddToScheme(scheme))
eventRecorder, err := NewRecorder(scheme, ctrl.Log, ts.URL, "test-controller")
require.NoError(t, err)
eventRecorder.Client.RetryMax = 2

obj := corev1.ObjectReference{
Kind: "GitRepository",
Namespace: "gitops-system",
Name: "webapp",
}
obj := &corev1.ConfigMap{}
obj.Namespace = "gitops-system"
obj.Name = "webapp"

err = eventRecorder.EventErrorf(obj, nil, "sync", "sync %s", obj.Name)
require.Error(t, err)
eventRecorder.AnnotatedEventf(obj, nil, corev1.EventTypeNormal, "sync", "sync %s", obj.Name)
require.True(t, requestCount > 1)
}

func TestEventRecorder_Eventf_RateLimited(t *testing.T) {
func TestEventRecorder_AnnotatedEventf_RateLimited(t *testing.T) {
requestCount := 0
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
requestCount++
b, err := io.ReadAll(r.Body)
require.NoError(t, err)

Expand All @@ -101,16 +115,16 @@ func TestEventRecorder_Eventf_RateLimited(t *testing.T) {
}))
defer ts.Close()

eventRecorder, err := NewRecorder(ts.URL, "test-controller")
scheme := runtime.NewScheme()
require.NoError(t, clientgoscheme.AddToScheme(scheme))
eventRecorder, err := NewRecorder(scheme, ctrl.Log, ts.URL, "test-controller")
require.NoError(t, err)
eventRecorder.Client.RetryMax = 2

obj := corev1.ObjectReference{
Kind: "GitRepository",
Namespace: "gitops-system",
Name: "webapp",
}
obj := &corev1.ConfigMap{}
obj.Namespace = "gitops-system"
obj.Name = "webapp"

err = eventRecorder.EventInfof(obj, nil, "sync", "sync %s", obj.Name)
require.NoError(t, err)
eventRecorder.AnnotatedEventf(obj, nil, "sync", "sync %s", obj.Name)
require.Equal(t, 1, requestCount)
}

0 comments on commit c300f37

Please sign in to comment.