Skip to content

Commit

Permalink
Add event framework
Browse files Browse the repository at this point in the history
Introduces an event framework for driving defined system events
through a sink interface. A logrus logging sink is implemented
initially for logging event locally but event could also be
dispatched to alternative metrics/logging mechanisms.

The initial event which is added for this is to log and
identify potential loss of data between polling intervals.
This lossy feed problem is detailed in issue #83 and whilst
this doesn't fix the problem, it can help to identify and monitor
occurances of this issue within a long running production environment.
  • Loading branch information
Qinusty committed Apr 16, 2021
1 parent a331ce2 commit 1c704e7
Show file tree
Hide file tree
Showing 9 changed files with 403 additions and 9 deletions.
8 changes: 7 additions & 1 deletion cmd/scheduled-feed/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"time"

"github.com/ossf/package-feeds/config"
"github.com/ossf/package-feeds/events"
"github.com/ossf/package-feeds/feeds/scheduler"
"github.com/ossf/package-feeds/publisher"

Expand Down Expand Up @@ -85,7 +86,12 @@ func main() {
if err != nil {
log.Fatal(err)
}
sched := scheduler.New(feeds)
// TODO: use base logger throughout package-feeds
eventHandler := events.NewHandler(
events.NewLoggingEventSink(log.New()),
*events.NewFilter(nil,nil,[]string{events.FEEDS_COMPONENT}),
)
sched := scheduler.New(feeds, *eventHandler)

log.Printf("listening on port %v", appConfig.HttpPort)
delta, err := time.ParseDuration(appConfig.CutoffDelta)
Expand Down
6 changes: 5 additions & 1 deletion config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,11 @@ import (
"testing"

"github.com/ossf/package-feeds/config"
"github.com/ossf/package-feeds/events"
"github.com/ossf/package-feeds/feeds/scheduler"
"github.com/ossf/package-feeds/publisher/stdout"


)

const (
Expand Down Expand Up @@ -43,7 +46,8 @@ func TestDefault(t *testing.T) {
if err != nil {
t.Fatalf("failed to initialize feeds: %v", err)
}
_ = scheduler.New(feeds)
eventHandler := events.NewNullHandler()
_ = scheduler.New(feeds, *eventHandler)
}

func TestGetScheduledFeeds(t *testing.T) {
Expand Down
124 changes: 124 additions & 0 deletions events/handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
package events

import (
"fmt"
)

const (
// Event Types
LOSSY_FEED_EVENT = "LOSSY_FEED"

// Components
FEEDS_COMPONENT = "Feeds"
)

type Sink interface {
AddEvent(e Event) error
}

type Event interface {
GetComponent() string
GetType() string
GetMessage() string
}

type LossyFeedEvent struct {
Feed string

Event
}

type Filter struct {
enabledEventTypes []string
disabledEventTypes []string

enabledComponents []string
}

type Handler struct {
eventSink Sink
eventFilter Filter
}

func NewHandler(sink Sink, filter Filter) *Handler {
return &Handler{
eventSink: sink,
eventFilter: filter,
}
}

func NewNullHandler() *Handler {
return &Handler{}
}

// Creates a filter for use with an event handler, nil can be provided for non values.
func NewFilter(enabledEventTypes, disabledEventTypes, enabledComponents []string) *Filter {
if enabledEventTypes == nil {
enabledEventTypes = []string{}
}
if disabledEventTypes == nil {
disabledEventTypes = []string{}
}
if enabledComponents == nil {
enabledComponents = []string{}
}
return &Filter{
enabledEventTypes: enabledEventTypes,
disabledEventTypes: disabledEventTypes,
enabledComponents: enabledComponents,
}
}

// Dispatches an event to the configured sink if it passes the configured filter.
// Filters are applied as follows:
// - disabled event types are always disabled.
// - enabled event types are enabled
// - enabled components are enabled except for disabled event types
func (h Handler) DispatchEvent(e Event) error {
if h.eventSink == nil {
return nil
}
dispatch := false
filter := h.eventFilter

eComponent := e.GetComponent()
eType := e.GetType()

// Enable components
if stringExistsInSlice(eComponent, filter.enabledComponents) {
dispatch = true
}
// Handle specific event types
if stringExistsInSlice(eType, filter.enabledEventTypes) {
dispatch = true
} else if stringExistsInSlice(eType, filter.disabledEventTypes) {
dispatch = false
}

if dispatch {
return h.eventSink.AddEvent(e)
}
return nil
}

// Checks for existance of a string within a slice of strings
func stringExistsInSlice(s string, slice []string) bool {
for _, sliceStr := range slice {
if s == sliceStr {
return true
}
}
return false
}

func (e LossyFeedEvent) GetComponent() string {
return FEEDS_COMPONENT
}

func (e LossyFeedEvent) GetType() string {
return LOSSY_FEED_EVENT
}

func (e LossyFeedEvent) GetMessage() string {
return fmt.Sprintf("detected potential missing package data when polling %v feed", e.Feed)
}
85 changes: 85 additions & 0 deletions events/handler_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package events

import (
"testing"
)

func TestHandlerDispatchEventNoFilterConfigured(t *testing.T) {
sink := &MockSink{}
filter := NewFilter(nil, nil, nil)

handler := NewHandler(sink, *filter)

event := &LossyFeedEvent{
Feed: "Foo",
}

err := handler.DispatchEvent(event)
if err != nil {
t.Fatal(err)
}

if len(sink.events) != 0 {
t.Error("LossyFeedEvent was dispatched despite not being enabled")
}
}

func TestHandlerDispatchEventFilterAllowLossyFeed(t *testing.T) {
sink := &MockSink{}
filter := NewFilter([]string{LOSSY_FEED_EVENT}, nil, nil)

handler := NewHandler(sink, *filter)

event := &LossyFeedEvent{
Feed: "Foo",
}

err := handler.DispatchEvent(event)
if err != nil {
t.Fatal(err)
}

if len(sink.events) != 1 {
t.Error("LossyFeedEvent was not dispatched despite being configured to allow dispatch")
}
}

func TestHandlerDispatchEventFilterAllowFeedComponent(t *testing.T) {
sink := &MockSink{}
filter := NewFilter(nil, nil, []string{FEEDS_COMPONENT})

handler := NewHandler(sink, *filter)

event := &LossyFeedEvent{
Feed: "Foo",
}

err := handler.DispatchEvent(event)
if err != nil {
t.Fatal(err)
}

if len(sink.events) != 1 {
t.Error("LossyFeedEvent was not dispatched despite feeds component being allowed")
}
}

func TestHandlerDispatchEventFilterDisableLossyFeed(t *testing.T) {
sink := &MockSink{}
filter := NewFilter(nil, []string{LOSSY_FEED_EVENT}, []string{FEEDS_COMPONENT})

handler := NewHandler(sink, *filter)

event := &LossyFeedEvent{
Feed: "Foo",
}

err := handler.DispatchEvent(event)
if err != nil {
t.Fatal(err)
}

if len(sink.events) != 0 {
t.Error("LossyFeedEvent was dispatched despite being configured to disable dispatch")
}
}
26 changes: 26 additions & 0 deletions events/logrus_sink.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package events

import (
"github.com/sirupsen/logrus"
)

type LoggingEventSink struct {
logger *logrus.Logger
}

// Creates an event sink which logs events using a provided logrus logger,
// fields "component" and "event_type" are applied to the logger and
// warnings are logged for each event.
func NewLoggingEventSink(logger *logrus.Logger) *LoggingEventSink {
return &LoggingEventSink{
logger: logger,
}
}

func (sink LoggingEventSink) AddEvent(e Event) error {
sink.logger.WithFields(logrus.Fields{
"event_type": e.GetType(),
"component": e.GetComponent(),
}).Warn(e.GetMessage())
return nil
}
43 changes: 43 additions & 0 deletions events/logrus_sink_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package events

import (
"testing"

"github.com/sirupsen/logrus/hooks/test"
)

func TestLogrusSink(t *testing.T) {
log, hook := test.NewNullLogger()

sink := NewLoggingEventSink(log)

event := LossyFeedEvent{
Feed: "Foo",
}

err := sink.AddEvent(event)
if err != nil {
t.Error(err)
}

logEntry := hook.LastEntry()
if logEntry == nil {
t.Error("Log entry was not added to the configured logger")
}

if logEntry.Data["event_type"] != event.GetType() {
t.Errorf(
"Log entry had incorrect event_type field '%v' when '%v' was expected",
logEntry.Data["event_type"],
event.GetType(),
)
}

if logEntry.Data["component"] != event.GetComponent() {
t.Errorf(
"Log entry had incorrect component field '%v' when '%v' was expected",
logEntry.Data["component"],
event.GetComponent(),
)
}
}
14 changes: 14 additions & 0 deletions events/mocks.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package events

type MockSink struct {
events []Event
}

func (s *MockSink) GetEvents() []Event {
return s.events
}

func (s *MockSink) AddEvent(e Event) error {
s.events = append(s.events, e)
return nil
}
Loading

0 comments on commit 1c704e7

Please sign in to comment.