From 78af451efaf6876c00f4320955672c058ecbc205 Mon Sep 17 00:00:00 2001 From: Josh Smith Date: Thu, 15 Apr 2021 16:09:52 +0100 Subject: [PATCH] Add event framework Introduces an event framework for driving defined system events through a sink interface. A logrus logging sink is implemented initially for logging event locally but event could also be dispatched to alternative metrics/logging mechanisms. The initial event which is added for this is to log and identify potential loss of data between polling intervals. This lossy feed problem is detailed in issue #83 and whilst this doesn't fix the problem, it can help to identify and monitor occurances of this issue within a long running production environment. --- events/handler.go | 117 ++++++++++++++++++++++++++++++ events/handler_test.go | 85 ++++++++++++++++++++++ events/logrus_sink.go | 26 +++++++ events/logrus_sink_test.go | 43 +++++++++++ events/mocks.go | 14 ++++ feeds/scheduler/scheduler.go | 40 +++++++++- feeds/scheduler/scheduler_test.go | 65 ++++++++++++++++- 7 files changed, 383 insertions(+), 7 deletions(-) create mode 100644 events/handler.go create mode 100644 events/handler_test.go create mode 100644 events/logrus_sink.go create mode 100644 events/logrus_sink_test.go create mode 100644 events/mocks.go diff --git a/events/handler.go b/events/handler.go new file mode 100644 index 00000000..430fe6f7 --- /dev/null +++ b/events/handler.go @@ -0,0 +1,117 @@ +package events + +import ( + "fmt" +) + +const ( + // Event Types + LOSSY_FEED_EVENT = "LOSSY_FEED" + + // Components + FEEDS_COMPONENT = "Feeds" +) + +type Sink interface { + AddEvent(e Event) error +} + +type Event interface { + GetComponent() string + GetType() string + GetMessage() string +} + +type LossyFeedEvent struct { + Feed string + + Event +} + +type Filter struct { + enabledEventTypes []string + disabledEventTypes []string + + enabledComponents []string +} + +type Handler struct { + eventSink Sink + eventFilter Filter +} + +func NewHandler(sink Sink, filter Filter) *Handler { + return &Handler{ + eventSink: sink, + eventFilter: filter, + } +} + +// Creates a filter for use with an event handler, nil can be provided for non values. +func NewFilter(enabledEventTypes, disabledEventTypes, enabledComponents []string) *Filter { + if enabledEventTypes == nil { + enabledEventTypes = []string{} + } + if disabledEventTypes == nil { + disabledEventTypes = []string{} + } + if enabledComponents == nil { + enabledComponents = []string{} + } + return &Filter{ + enabledEventTypes: enabledEventTypes, + disabledEventTypes: disabledEventTypes, + enabledComponents: enabledComponents, + } +} + +// Dispatches an event to the configured sink if it passes the configured filter. +// Filters are applied as follows: +// - disabled event types are always disabled. +// - enabled event types are enabled +// - enabled components are enabled except for disabled event types +func (h Handler) DispatchEvent(e Event) error { + dispatch := false + filter := h.eventFilter + + eComponent := e.GetComponent() + eType := e.GetType() + + // Enable components + if stringExistsInSlice(eComponent, filter.enabledComponents) { + dispatch = true + } + // Handle specific event types + if stringExistsInSlice(eType, filter.enabledEventTypes) { + dispatch = true + } else if stringExistsInSlice(eType, filter.disabledEventTypes) { + dispatch = false + } + + if dispatch { + return h.eventSink.AddEvent(e) + } + return nil +} + +// Checks for existance of a string within a slice of strings +func stringExistsInSlice(s string, slice []string) bool { + for _, sliceStr := range slice { + if s == sliceStr { + return true + } + } + return false +} + +func (e LossyFeedEvent) GetComponent() string { + return FEEDS_COMPONENT +} + +func (e LossyFeedEvent) GetType() string { + return LOSSY_FEED_EVENT +} + +func (e LossyFeedEvent) GetMessage() string { + return fmt.Sprintf("detected potential missing package data when polling %v feed", e.Feed) +} diff --git a/events/handler_test.go b/events/handler_test.go new file mode 100644 index 00000000..638bbc0a --- /dev/null +++ b/events/handler_test.go @@ -0,0 +1,85 @@ +package events + +import ( + "testing" +) + +func TestHandlerDispatchEventNoFilterConfigured(t *testing.T) { + sink := &MockSink{} + filter := NewFilter(nil, nil, nil) + + handler := NewHandler(sink, *filter) + + event := &LossyFeedEvent{ + Feed: "Foo", + } + + err := handler.DispatchEvent(event) + if err != nil { + t.Fatal(err) + } + + if len(sink.events) != 0 { + t.Error("LossyFeedEvent was dispatched despite not being enabled") + } +} + +func TestHandlerDispatchEventFilterAllowLossyFeed(t *testing.T) { + sink := &MockSink{} + filter := NewFilter([]string{LOSSY_FEED_EVENT}, nil, nil) + + handler := NewHandler(sink, *filter) + + event := &LossyFeedEvent{ + Feed: "Foo", + } + + err := handler.DispatchEvent(event) + if err != nil { + t.Fatal(err) + } + + if len(sink.events) != 1 { + t.Error("LossyFeedEvent was not dispatched despite being configured to allow dispatch") + } +} + +func TestHandlerDispatchEventFilterAllowFeedComponent(t *testing.T) { + sink := &MockSink{} + filter := NewFilter(nil, nil, []string{FEEDS_COMPONENT}) + + handler := NewHandler(sink, *filter) + + event := &LossyFeedEvent{ + Feed: "Foo", + } + + err := handler.DispatchEvent(event) + if err != nil { + t.Fatal(err) + } + + if len(sink.events) != 1 { + t.Error("LossyFeedEvent was not dispatched despite feeds component being allowed") + } +} + +func TestHandlerDispatchEventFilterDisableLossyFeed(t *testing.T) { + sink := &MockSink{} + filter := NewFilter(nil, []string{LOSSY_FEED_EVENT}, []string{FEEDS_COMPONENT}) + + handler := NewHandler(sink, *filter) + + event := &LossyFeedEvent{ + Feed: "Foo", + } + + err := handler.DispatchEvent(event) + if err != nil { + t.Fatal(err) + } + + if len(sink.events) != 0 { + t.Error("LossyFeedEvent was dispatched despite being configured to disable dispatch") + } +} diff --git a/events/logrus_sink.go b/events/logrus_sink.go new file mode 100644 index 00000000..2d42f371 --- /dev/null +++ b/events/logrus_sink.go @@ -0,0 +1,26 @@ +package events + +import ( + "github.com/sirupsen/logrus" +) + +type LoggingEventSink struct { + logger *logrus.Logger +} + +// Creates an event sink which logs events using a provided logrus logger, +// fields "component" and "event_type" are applied to the logger and +// warnings are logged for each event. +func NewLoggingEventSink(logger *logrus.Logger) *LoggingEventSink { + return &LoggingEventSink{ + logger: logger, + } +} + +func (sink LoggingEventSink) AddEvent(e Event) error { + sink.logger.WithFields(logrus.Fields{ + "event_type": e.GetType(), + "component": e.GetComponent(), + }).Warn(e.GetMessage()) + return nil +} diff --git a/events/logrus_sink_test.go b/events/logrus_sink_test.go new file mode 100644 index 00000000..b0eeb957 --- /dev/null +++ b/events/logrus_sink_test.go @@ -0,0 +1,43 @@ +package events + +import ( + "testing" + + "github.com/sirupsen/logrus/hooks/test" +) + +func TestLogrusSink(t *testing.T) { + log, hook := test.NewNullLogger() + + sink := NewLoggingEventSink(log) + + event := LossyFeedEvent{ + Feed: "Foo", + } + + err := sink.AddEvent(event) + if err != nil { + t.Error(err) + } + + logEntry := hook.LastEntry() + if logEntry == nil { + t.Error("Log entry was not added to the configured logger") + } + + if logEntry.Data["event_type"] != event.GetType() { + t.Errorf( + "Log entry had incorrect event_type field '%v' when '%v' was expected", + logEntry.Data["event_type"], + event.GetType(), + ) + } + + if logEntry.Data["component"] != event.GetComponent() { + t.Errorf( + "Log entry had incorrect component field '%v' when '%v' was expected", + logEntry.Data["component"], + event.GetComponent(), + ) + } +} diff --git a/events/mocks.go b/events/mocks.go new file mode 100644 index 00000000..facbe856 --- /dev/null +++ b/events/mocks.go @@ -0,0 +1,14 @@ +package events + +type MockSink struct { + events []Event +} + +func (s *MockSink) GetEvents() []Event { + return s.events +} + +func (s *MockSink) AddEvent(e Event) error { + s.events = append(s.events, e) + return nil +} diff --git a/feeds/scheduler/scheduler.go b/feeds/scheduler/scheduler.go index dd2262e2..dcea37a9 100644 --- a/feeds/scheduler/scheduler.go +++ b/feeds/scheduler/scheduler.go @@ -1,21 +1,28 @@ package scheduler import ( - "github.com/ossf/package-feeds/feeds" + "sort" "time" + "github.com/ossf/package-feeds/events" + "github.com/ossf/package-feeds/feeds" + log "github.com/sirupsen/logrus" ) // Scheduler is a registry of feeds that should be run on a schedule type Scheduler struct { - registry map[string]feeds.ScheduledFeed + registry map[string]feeds.ScheduledFeed + previousPollResults map[string]pollResult + eventHandler events.Handler } // New returns a new Scheduler with configured feeds registered -func New(feeds map[string]feeds.ScheduledFeed) *Scheduler { +func New(feeds map[string]feeds.ScheduledFeed, handler events.Handler) *Scheduler { return &Scheduler{ - registry: feeds, + registry: feeds, + previousPollResults: map[string]pollResult{}, + eventHandler: handler, } } @@ -43,6 +50,11 @@ func (s *Scheduler) Poll(cutoff time.Time) ([]*feeds.Package, []error) { packages := []*feeds.Package{} for i := 0; i < len(s.registry); i++ { result := <-results + // Ensure packages are sorted by CreatedDate in order of most recent + sort.SliceStable(result.packages, func(i, j int) bool { + return result.packages[j].CreatedDate.Before(result.packages[i].CreatedDate) + }) + logger := log.WithField("feed", result.name) if result.err != nil { logger.WithError(result.err).Error("error fetching packages") @@ -51,6 +63,26 @@ func (s *Scheduler) Poll(cutoff time.Time) ([]*feeds.Package, []error) { } packages = append(packages, result.packages...) logger.WithField("num_processed", len(result.packages)).Print("processed packages") + + previousResult, ok := s.previousPollResults[result.name] + nonZeroResults := len(previousResult.packages) > 0 && len(result.packages) > 0 + if ok && nonZeroResults { + if !result.isOverlappingWith(s.previousPollResults[result.name]) { + s.eventHandler.DispatchEvent(events.LossyFeedEvent{ + Feed: result.name, + }) + } + } + s.previousPollResults[result.name] = result } return packages, errs } + +// Checks whether there is an overlap in package creation date between a result +// and a previous result. This assumes that pollResult.packages is sorted by +// CreatedDate in order of most recent first. +func (r pollResult) isOverlappingWith(previousResult pollResult) bool { + rOldestResult := r.packages[len(r.packages)-1] + previousResultMostRecent := previousResult.packages[0] + return previousResultMostRecent.CreatedDate.After(rOldestResult.CreatedDate) +} diff --git a/feeds/scheduler/scheduler_test.go b/feeds/scheduler/scheduler_test.go index 47dade52..7e07af2a 100644 --- a/feeds/scheduler/scheduler_test.go +++ b/feeds/scheduler/scheduler_test.go @@ -5,6 +5,7 @@ import ( "testing" "time" + "github.com/ossf/package-feeds/events" "github.com/ossf/package-feeds/feeds" ) @@ -30,10 +31,10 @@ func TestPoll(t *testing.T) { err: packageErr, }, } + mockSink := &events.MockSink{} + eventHandler := events.NewHandler(mockSink, *events.NewFilter(nil, nil, nil)) - sched := &Scheduler{ - registry: registry, - } + sched := New(registry, *eventHandler) gotPackages, gotErrs := sched.Poll(time.Now()) if len(gotErrs) != 1 { t.Fatalf("incorrect number of errors received. expected %d got %d", 1, len(gotErrs)) @@ -52,3 +53,61 @@ func TestPoll(t *testing.T) { } } } + +func TestPollLossyFeedWarnings(t *testing.T) { + mockSink := &events.MockSink{} + eventHandler := events.NewHandler( + mockSink, + *events.NewFilter([]string{events.LOSSY_FEED_EVENT}, nil, nil), + ) + + baseTime := time.Date(2021, time.April, 15, 12, 0, 0, 0, time.UTC) + firstPackageSet := []*feeds.Package{ + {Name: "fooPkg1", CreatedDate: baseTime.Add(time.Minute * 1)}, + {Name: "fooPkg2", CreatedDate: baseTime.Add(time.Minute * 2)}, + } + secondPackageSet := []*feeds.Package{ + {Name: "fooPkg3", CreatedDate: baseTime.Add(time.Minute * 10)}, + {Name: "fooPkg4", CreatedDate: baseTime.Add(time.Minute * 20)}, + } + registry1 := map[string]feeds.ScheduledFeed{ + "foo-feed": mockFeed{ + packages: firstPackageSet, + }, + } + registry2 := map[string]feeds.ScheduledFeed{ + "foo-feed": mockFeed{ + packages: secondPackageSet, + }, + } + sched := New(registry1, *eventHandler) + + _, errs := sched.Poll(time.Now()) + if len(errs) > 0 { + t.Fatalf("failed to poll scheduler: %v", errs) + } + sched.registry = registry2 + _, errs = sched.Poll(time.Now()) + if len(errs) > 0 { + t.Fatalf("failed to poll scheduler: %v", errs) + } + + sinkEvents := mockSink.GetEvents() + if len(sinkEvents) == 0 { + t.Fatal("lossy feed warnings not dispatched to the sink") + } + if sinkEvents[0].GetType() != events.LOSSY_FEED_EVENT { + t.Fatalf( + "sink event is not the correct type, expected '%v' but found '%v'", + events.LOSSY_FEED_EVENT, + sinkEvents[0].GetType(), + ) + } + if sinkEvents[0].GetComponent() != events.FEEDS_COMPONENT { + t.Fatalf( + "sink event is not the correct component, expected '%v' but found '%v'", + events.FEEDS_COMPONENT, + sinkEvents[0].GetComponent(), + ) + } +}