Skip to content

Commit

Permalink
Merge pull request #161 from darkowlzz/event-rec-to-k8s-upstream
Browse files Browse the repository at this point in the history
  • Loading branch information
hiddeco authored Oct 19, 2021
2 parents ecd2e49 + e418a53 commit 5e99e19
Show file tree
Hide file tree
Showing 5 changed files with 219 additions and 173 deletions.
107 changes: 0 additions & 107 deletions runtime/controller/events.go

This file was deleted.

3 changes: 3 additions & 0 deletions runtime/events/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ const (
EventSeverityError string = "error"
)

// EventTypeTrace represents a trace event.
const EventTypeTrace string = "Trace"

// Event is a report of an event issued by a controller.
// +kubebuilder:object:generate=true
type Event struct {
Expand Down
144 changes: 106 additions & 38 deletions runtime/events/recorder.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,34 @@ import (
"os"
"time"

"github.com/go-logr/logr"
"github.com/hashicorp/go-retryablehttp"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
kuberecorder "k8s.io/client-go/tools/record"
"k8s.io/client-go/tools/reference"
ctrl "sigs.k8s.io/controller-runtime"
)

// Recorder posts events to the webhook address.
// Recorder posts events to the Kubernetes API and any other event recorder webhook address, like the GitOps Toolkit
// notification-controller.
//
// Use it by embedding EventRecorder in reconciler struct:
//
// import (
// ...
// kuberecorder "k8s.io/client-go/tools/record"
// ...
// )
//
// type MyTypeReconciler {
// client.Client
// // ... etc.
// kuberecorder.EventRecorder
// }
//
// Use NewRecorder to create a working Recorder.
type Recorder struct {
// URL address of the events endpoint.
Webhook string
Expand All @@ -40,11 +62,23 @@ type Recorder struct {

// Retryable HTTP client.
Client *retryablehttp.Client

// EventRecorder is the Kubernetes event recorder.
EventRecorder kuberecorder.EventRecorder

// Scheme to look up the recorded objects.
Scheme *runtime.Scheme

// Log is the recorder logger.
Log logr.Logger
}

// NewRecorder creates an event Recorder with default settings.
// The recorder performs automatic retries for connection errors and 500-range response codes.
func NewRecorder(webhook, reportingController string) (*Recorder, error) {
var _ kuberecorder.EventRecorder = &Recorder{}

// NewRecorder creates an event Recorder with a Kubernetes event recorder and an external event recorder based on the
// given webhook. The recorder performs automatic retries for connection errors and 500-range response codes from the
// external recorder.
func NewRecorder(mgr ctrl.Manager, log logr.Logger, webhook, reportingController string) (*Recorder, error) {
if _, err := url.Parse(webhook); err != nil {
return nil, err
}
Expand All @@ -55,89 +89,123 @@ func NewRecorder(webhook, reportingController string) (*Recorder, error) {
httpClient.Logger = nil

return &Recorder{
Scheme: mgr.GetScheme(),
Webhook: webhook,
ReportingController: reportingController,
Client: httpClient,
EventRecorder: mgr.GetEventRecorderFor(reportingController),
Log: log,
}, nil
}

// EventInfof records an event with information severity.
func (r *Recorder) EventInfof(
object corev1.ObjectReference,
metadata map[string]string,
reason string, messageFmt string, args ...interface{}) error {
return r.Eventf(object, metadata, EventSeverityInfo, reason, messageFmt, args...)
// Event records an event in the webhook address.
func (r *Recorder) Event(object runtime.Object, eventtype, reason, message string) {
r.AnnotatedEventf(object, nil, eventtype, reason, message)
}

// EventErrorf records an event with error severity.
func (r *Recorder) EventErrorf(
object corev1.ObjectReference,
metadata map[string]string,
reason string, messageFmt string, args ...interface{}) error {
return r.Eventf(object, metadata, EventSeverityError, reason, messageFmt, args...)
// Event records an event in the webhook address.
func (r *Recorder) Eventf(object runtime.Object, eventtype, reason, messageFmt string, args ...interface{}) {
r.AnnotatedEventf(object, nil, eventtype, reason, messageFmt, args...)
}

// Eventf constructs an event from the given information and performs a HTTP POST to the webhook address.
func (r *Recorder) Eventf(
object corev1.ObjectReference,
metadata map[string]string,
severity, reason string,
messageFmt string, args ...interface{}) error {
// AnnotatedEventf constructs an event from the given information and performs a HTTP POST to the webhook address.
func (r *Recorder) AnnotatedEventf(
object runtime.Object,
annotations map[string]string,
eventtype, reason string,
messageFmt string, args ...interface{}) {

ref, err := reference.GetReference(r.Scheme, object)
if err != nil {
r.Log.Error(err, "failed to get object reference")
}

// Add object info in the logger.
log := r.Log.WithValues("name", ref.Name, "namespace", ref.Namespace, "reconciler kind", ref.Kind)

// Convert the eventType to severity.
severity := eventTypeToSeverity(eventtype)

// Do not send trace events to notification controller,
// traces are persisted as Kubernetes events only.
// traces are persisted as Kubernetes events only as normal events.
if severity == EventSeverityTrace {
return nil
r.EventRecorder.AnnotatedEventf(object, annotations, corev1.EventTypeNormal, reason, messageFmt, args...)
return
}

// Forward the event to the Kubernetes recorder.
r.EventRecorder.AnnotatedEventf(object, annotations, eventtype, reason, messageFmt, args...)

if r.Client == nil {
return fmt.Errorf("retryable HTTP client has not been initialized")
err := fmt.Errorf("retryable HTTP client has not been initialized")
log.Error(err, "unable to record event")
return
}

message := fmt.Sprintf(messageFmt, args...)

if object.Kind == "" {
return fmt.Errorf("failed to get object kind")
if ref.Kind == "" {
err := fmt.Errorf("failed to get object kind")
log.Error(err, "unable to record event")
return
}

if object.Name == "" {
return fmt.Errorf("failed to get object name")
if ref.Name == "" {
err := fmt.Errorf("failed to get object name")
log.Error(err, "unable to record event")
return
}

if object.Namespace == "" {
return fmt.Errorf("failed to get object namespace")
if ref.Namespace == "" {
err := fmt.Errorf("failed to get object namespace")
log.Error(err, "unable to record event")
return
}

hostname, err := os.Hostname()
if err != nil {
return err
log.Error(err, "failed to get hostname")
return
}

event := Event{
InvolvedObject: object,
InvolvedObject: *ref,
Severity: severity,
Timestamp: metav1.Now(),
Message: message,
Reason: reason,
Metadata: metadata,
Metadata: annotations,
ReportingController: r.ReportingController,
ReportingInstance: hostname,
}

body, err := json.Marshal(event)
if err != nil {
return fmt.Errorf("failed to marshal object into json, error: %w", err)
log.Error(err, "failed to marshal object into json")
return
}

// avoid retrying rate limited requests
if res, _ := r.Client.HTTPClient.Post(r.Webhook, "application/json", bytes.NewReader(body)); res != nil &&
(res.StatusCode == http.StatusTooManyRequests || res.StatusCode == http.StatusAccepted) {
return nil
return
}

if _, err := r.Client.Post(r.Webhook, "application/json", body); err != nil {
return err
log.Error(err, "unable to record event")
return
}
}

return nil
// eventTypeToSeverity maps the given eventType string to a GOTK event severity
// type.
func eventTypeToSeverity(eventType string) string {
switch eventType {
case corev1.EventTypeWarning:
return EventSeverityError
case EventTypeTrace:
return EventSeverityTrace
default:
return EventSeverityInfo
}
}
Loading

0 comments on commit 5e99e19

Please sign in to comment.