Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

External event recorder adhere to EventRecorder #161

Merged
merged 3 commits into from
Oct 19, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 0 additions & 107 deletions runtime/controller/events.go

This file was deleted.

3 changes: 3 additions & 0 deletions runtime/events/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ const (
EventSeverityError string = "error"
)

// EventTypeTrace represents a trace event.
const EventTypeTrace string = "Trace"

// Event is a report of an event issued by a controller.
// +kubebuilder:object:generate=true
type Event struct {
Expand Down
144 changes: 106 additions & 38 deletions runtime/events/recorder.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,34 @@ import (
"os"
"time"

"github.com/go-logr/logr"
"github.com/hashicorp/go-retryablehttp"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
kuberecorder "k8s.io/client-go/tools/record"
"k8s.io/client-go/tools/reference"
ctrl "sigs.k8s.io/controller-runtime"
)

// Recorder posts events to the webhook address.
// Recorder posts events to the Kubernetes API and any other event recorder webhook address, like the GitOps Toolkit
// notification-controller.
//
// Use it by embedding EventRecorder in reconciler struct:
//
// import (
// ...
// kuberecorder "k8s.io/client-go/tools/record"
// ...
// )
//
// type MyTypeReconciler {
// client.Client
// // ... etc.
// kuberecorder.EventRecorder
// }
//
// Use NewRecorder to create a working Recorder.
type Recorder struct {
// URL address of the events endpoint.
Webhook string
Expand All @@ -40,11 +62,23 @@ type Recorder struct {

// Retryable HTTP client.
Client *retryablehttp.Client

// EventRecorder is the Kubernetes event recorder.
EventRecorder kuberecorder.EventRecorder

// Scheme to look up the recorded objects.
Scheme *runtime.Scheme

// Log is the recorder logger.
Log logr.Logger
}

// NewRecorder creates an event Recorder with default settings.
// The recorder performs automatic retries for connection errors and 500-range response codes.
func NewRecorder(webhook, reportingController string) (*Recorder, error) {
var _ kuberecorder.EventRecorder = &Recorder{}

// NewRecorder creates an event Recorder with a Kubernetes event recorder and an external event recorder based on the
// given webhook. The recorder performs automatic retries for connection errors and 500-range response codes from the
// external recorder.
func NewRecorder(mgr ctrl.Manager, log logr.Logger, webhook, reportingController string) (*Recorder, error) {
if _, err := url.Parse(webhook); err != nil {
return nil, err
}
Expand All @@ -55,89 +89,123 @@ func NewRecorder(webhook, reportingController string) (*Recorder, error) {
httpClient.Logger = nil

return &Recorder{
Scheme: mgr.GetScheme(),
Webhook: webhook,
ReportingController: reportingController,
Client: httpClient,
EventRecorder: mgr.GetEventRecorderFor(reportingController),
Log: log,
}, nil
}

// EventInfof records an event with information severity.
func (r *Recorder) EventInfof(
object corev1.ObjectReference,
metadata map[string]string,
reason string, messageFmt string, args ...interface{}) error {
return r.Eventf(object, metadata, EventSeverityInfo, reason, messageFmt, args...)
// Event records an event in the webhook address.
func (r *Recorder) Event(object runtime.Object, eventtype, reason, message string) {
r.AnnotatedEventf(object, nil, eventtype, reason, message)
}

// EventErrorf records an event with error severity.
func (r *Recorder) EventErrorf(
object corev1.ObjectReference,
metadata map[string]string,
reason string, messageFmt string, args ...interface{}) error {
return r.Eventf(object, metadata, EventSeverityError, reason, messageFmt, args...)
// Event records an event in the webhook address.
func (r *Recorder) Eventf(object runtime.Object, eventtype, reason, messageFmt string, args ...interface{}) {
r.AnnotatedEventf(object, nil, eventtype, reason, messageFmt, args...)
}

// Eventf constructs an event from the given information and performs a HTTP POST to the webhook address.
func (r *Recorder) Eventf(
object corev1.ObjectReference,
metadata map[string]string,
severity, reason string,
messageFmt string, args ...interface{}) error {
// AnnotatedEventf constructs an event from the given information and performs a HTTP POST to the webhook address.
func (r *Recorder) AnnotatedEventf(
object runtime.Object,
annotations map[string]string,
eventtype, reason string,
messageFmt string, args ...interface{}) {

ref, err := reference.GetReference(r.Scheme, object)
if err != nil {
r.Log.Error(err, "failed to get object reference")
}

// Add object info in the logger.
log := r.Log.WithValues("name", ref.Name, "namespace", ref.Namespace, "reconciler kind", ref.Kind)

// Convert the eventType to severity.
severity := eventTypeToSeverity(eventtype)

// Do not send trace events to notification controller,
// traces are persisted as Kubernetes events only.
// traces are persisted as Kubernetes events only as normal events.
if severity == EventSeverityTrace {
return nil
r.EventRecorder.AnnotatedEventf(object, annotations, corev1.EventTypeNormal, reason, messageFmt, args...)
return
}

// Forward the event to the Kubernetes recorder.
r.EventRecorder.AnnotatedEventf(object, annotations, eventtype, reason, messageFmt, args...)

if r.Client == nil {
return fmt.Errorf("retryable HTTP client has not been initialized")
err := fmt.Errorf("retryable HTTP client has not been initialized")
log.Error(err, "unable to record event")
return
}

message := fmt.Sprintf(messageFmt, args...)

if object.Kind == "" {
return fmt.Errorf("failed to get object kind")
if ref.Kind == "" {
err := fmt.Errorf("failed to get object kind")
log.Error(err, "unable to record event")
return
}

if object.Name == "" {
return fmt.Errorf("failed to get object name")
if ref.Name == "" {
err := fmt.Errorf("failed to get object name")
log.Error(err, "unable to record event")
return
}

if object.Namespace == "" {
return fmt.Errorf("failed to get object namespace")
if ref.Namespace == "" {
err := fmt.Errorf("failed to get object namespace")
log.Error(err, "unable to record event")
return
}

hostname, err := os.Hostname()
if err != nil {
return err
log.Error(err, "failed to get hostname")
return
}

event := Event{
InvolvedObject: object,
InvolvedObject: *ref,
Severity: severity,
Timestamp: metav1.Now(),
Message: message,
Reason: reason,
Metadata: metadata,
Metadata: annotations,
ReportingController: r.ReportingController,
ReportingInstance: hostname,
}

body, err := json.Marshal(event)
if err != nil {
return fmt.Errorf("failed to marshal object into json, error: %w", err)
log.Error(err, "failed to marshal object into json")
return
}

// avoid retrying rate limited requests
if res, _ := r.Client.HTTPClient.Post(r.Webhook, "application/json", bytes.NewReader(body)); res != nil &&
(res.StatusCode == http.StatusTooManyRequests || res.StatusCode == http.StatusAccepted) {
return nil
return
}

if _, err := r.Client.Post(r.Webhook, "application/json", body); err != nil {
return err
log.Error(err, "unable to record event")
return
darkowlzz marked this conversation as resolved.
Show resolved Hide resolved
}
}

return nil
// eventTypeToSeverity maps the given eventType string to a GOTK event severity
// type.
func eventTypeToSeverity(eventType string) string {
switch eventType {
case corev1.EventTypeWarning:
return EventSeverityError
case EventTypeTrace:
return EventSeverityTrace
default:
return EventSeverityInfo
}
}
Loading