Skip to content

Commit

Permalink
Add event framework
Browse files Browse the repository at this point in the history
Introduces an event framework for driving defined system events
through a sink interface. A logrus logging sink is implemented
initially for logging event locally but event could also be
dispatched to alternative metrics/logging mechanisms.

The initial event which is added for this is to log and
identify potential loss of data between polling intervals.
This lossy feed problem is detailed in issue #83 and whilst
this doesn't fix the problem, it can help to identify and monitor
occurances of this issue within a long running production environment.
  • Loading branch information
Qinusty committed Apr 16, 2021
1 parent a331ce2 commit 78af451
Show file tree
Hide file tree
Showing 7 changed files with 383 additions and 7 deletions.
117 changes: 117 additions & 0 deletions events/handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
package events

import (
"fmt"
)

const (
// Event Types
LOSSY_FEED_EVENT = "LOSSY_FEED"

// Components
FEEDS_COMPONENT = "Feeds"
)

type Sink interface {
AddEvent(e Event) error
}

type Event interface {
GetComponent() string
GetType() string
GetMessage() string
}

type LossyFeedEvent struct {
Feed string

Event
}

type Filter struct {
enabledEventTypes []string
disabledEventTypes []string

enabledComponents []string
}

type Handler struct {
eventSink Sink
eventFilter Filter
}

func NewHandler(sink Sink, filter Filter) *Handler {
return &Handler{
eventSink: sink,
eventFilter: filter,
}
}

// Creates a filter for use with an event handler, nil can be provided for non values.
func NewFilter(enabledEventTypes, disabledEventTypes, enabledComponents []string) *Filter {
if enabledEventTypes == nil {
enabledEventTypes = []string{}
}
if disabledEventTypes == nil {
disabledEventTypes = []string{}
}
if enabledComponents == nil {
enabledComponents = []string{}
}
return &Filter{
enabledEventTypes: enabledEventTypes,
disabledEventTypes: disabledEventTypes,
enabledComponents: enabledComponents,
}
}

// Dispatches an event to the configured sink if it passes the configured filter.
// Filters are applied as follows:
// - disabled event types are always disabled.
// - enabled event types are enabled
// - enabled components are enabled except for disabled event types
func (h Handler) DispatchEvent(e Event) error {
dispatch := false
filter := h.eventFilter

eComponent := e.GetComponent()
eType := e.GetType()

// Enable components
if stringExistsInSlice(eComponent, filter.enabledComponents) {
dispatch = true
}
// Handle specific event types
if stringExistsInSlice(eType, filter.enabledEventTypes) {
dispatch = true
} else if stringExistsInSlice(eType, filter.disabledEventTypes) {
dispatch = false
}

if dispatch {
return h.eventSink.AddEvent(e)
}
return nil
}

// Checks for existance of a string within a slice of strings
func stringExistsInSlice(s string, slice []string) bool {
for _, sliceStr := range slice {
if s == sliceStr {
return true
}
}
return false
}

func (e LossyFeedEvent) GetComponent() string {
return FEEDS_COMPONENT
}

func (e LossyFeedEvent) GetType() string {
return LOSSY_FEED_EVENT
}

func (e LossyFeedEvent) GetMessage() string {
return fmt.Sprintf("detected potential missing package data when polling %v feed", e.Feed)
}
85 changes: 85 additions & 0 deletions events/handler_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package events

import (
"testing"
)

func TestHandlerDispatchEventNoFilterConfigured(t *testing.T) {
sink := &MockSink{}
filter := NewFilter(nil, nil, nil)

handler := NewHandler(sink, *filter)

event := &LossyFeedEvent{
Feed: "Foo",
}

err := handler.DispatchEvent(event)
if err != nil {
t.Fatal(err)
}

if len(sink.events) != 0 {
t.Error("LossyFeedEvent was dispatched despite not being enabled")
}
}

func TestHandlerDispatchEventFilterAllowLossyFeed(t *testing.T) {
sink := &MockSink{}
filter := NewFilter([]string{LOSSY_FEED_EVENT}, nil, nil)

handler := NewHandler(sink, *filter)

event := &LossyFeedEvent{
Feed: "Foo",
}

err := handler.DispatchEvent(event)
if err != nil {
t.Fatal(err)
}

if len(sink.events) != 1 {
t.Error("LossyFeedEvent was not dispatched despite being configured to allow dispatch")
}
}

func TestHandlerDispatchEventFilterAllowFeedComponent(t *testing.T) {
sink := &MockSink{}
filter := NewFilter(nil, nil, []string{FEEDS_COMPONENT})

handler := NewHandler(sink, *filter)

event := &LossyFeedEvent{
Feed: "Foo",
}

err := handler.DispatchEvent(event)
if err != nil {
t.Fatal(err)
}

if len(sink.events) != 1 {
t.Error("LossyFeedEvent was not dispatched despite feeds component being allowed")
}
}

func TestHandlerDispatchEventFilterDisableLossyFeed(t *testing.T) {
sink := &MockSink{}
filter := NewFilter(nil, []string{LOSSY_FEED_EVENT}, []string{FEEDS_COMPONENT})

handler := NewHandler(sink, *filter)

event := &LossyFeedEvent{
Feed: "Foo",
}

err := handler.DispatchEvent(event)
if err != nil {
t.Fatal(err)
}

if len(sink.events) != 0 {
t.Error("LossyFeedEvent was dispatched despite being configured to disable dispatch")
}
}
26 changes: 26 additions & 0 deletions events/logrus_sink.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package events

import (
"github.com/sirupsen/logrus"
)

type LoggingEventSink struct {
logger *logrus.Logger
}

// Creates an event sink which logs events using a provided logrus logger,
// fields "component" and "event_type" are applied to the logger and
// warnings are logged for each event.
func NewLoggingEventSink(logger *logrus.Logger) *LoggingEventSink {
return &LoggingEventSink{
logger: logger,
}
}

func (sink LoggingEventSink) AddEvent(e Event) error {
sink.logger.WithFields(logrus.Fields{
"event_type": e.GetType(),
"component": e.GetComponent(),
}).Warn(e.GetMessage())
return nil
}
43 changes: 43 additions & 0 deletions events/logrus_sink_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package events

import (
"testing"

"github.com/sirupsen/logrus/hooks/test"
)

func TestLogrusSink(t *testing.T) {
log, hook := test.NewNullLogger()

sink := NewLoggingEventSink(log)

event := LossyFeedEvent{
Feed: "Foo",
}

err := sink.AddEvent(event)
if err != nil {
t.Error(err)
}

logEntry := hook.LastEntry()
if logEntry == nil {
t.Error("Log entry was not added to the configured logger")
}

if logEntry.Data["event_type"] != event.GetType() {
t.Errorf(
"Log entry had incorrect event_type field '%v' when '%v' was expected",
logEntry.Data["event_type"],
event.GetType(),
)
}

if logEntry.Data["component"] != event.GetComponent() {
t.Errorf(
"Log entry had incorrect component field '%v' when '%v' was expected",
logEntry.Data["component"],
event.GetComponent(),
)
}
}
14 changes: 14 additions & 0 deletions events/mocks.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package events

type MockSink struct {
events []Event
}

func (s *MockSink) GetEvents() []Event {
return s.events
}

func (s *MockSink) AddEvent(e Event) error {
s.events = append(s.events, e)
return nil
}
40 changes: 36 additions & 4 deletions feeds/scheduler/scheduler.go
Original file line number Diff line number Diff line change
@@ -1,21 +1,28 @@
package scheduler

import (
"github.com/ossf/package-feeds/feeds"
"sort"
"time"

"github.com/ossf/package-feeds/events"
"github.com/ossf/package-feeds/feeds"

log "github.com/sirupsen/logrus"
)

// Scheduler is a registry of feeds that should be run on a schedule
type Scheduler struct {
registry map[string]feeds.ScheduledFeed
registry map[string]feeds.ScheduledFeed
previousPollResults map[string]pollResult
eventHandler events.Handler
}

// New returns a new Scheduler with configured feeds registered
func New(feeds map[string]feeds.ScheduledFeed) *Scheduler {
func New(feeds map[string]feeds.ScheduledFeed, handler events.Handler) *Scheduler {
return &Scheduler{
registry: feeds,
registry: feeds,
previousPollResults: map[string]pollResult{},
eventHandler: handler,
}
}

Expand Down Expand Up @@ -43,6 +50,11 @@ func (s *Scheduler) Poll(cutoff time.Time) ([]*feeds.Package, []error) {
packages := []*feeds.Package{}
for i := 0; i < len(s.registry); i++ {
result := <-results
// Ensure packages are sorted by CreatedDate in order of most recent
sort.SliceStable(result.packages, func(i, j int) bool {
return result.packages[j].CreatedDate.Before(result.packages[i].CreatedDate)
})

logger := log.WithField("feed", result.name)
if result.err != nil {
logger.WithError(result.err).Error("error fetching packages")
Expand All @@ -51,6 +63,26 @@ func (s *Scheduler) Poll(cutoff time.Time) ([]*feeds.Package, []error) {
}
packages = append(packages, result.packages...)
logger.WithField("num_processed", len(result.packages)).Print("processed packages")

previousResult, ok := s.previousPollResults[result.name]
nonZeroResults := len(previousResult.packages) > 0 && len(result.packages) > 0
if ok && nonZeroResults {
if !result.isOverlappingWith(s.previousPollResults[result.name]) {
s.eventHandler.DispatchEvent(events.LossyFeedEvent{
Feed: result.name,
})
}
}
s.previousPollResults[result.name] = result
}
return packages, errs
}

// Checks whether there is an overlap in package creation date between a result
// and a previous result. This assumes that pollResult.packages is sorted by
// CreatedDate in order of most recent first.
func (r pollResult) isOverlappingWith(previousResult pollResult) bool {
rOldestResult := r.packages[len(r.packages)-1]
previousResultMostRecent := previousResult.packages[0]
return previousResultMostRecent.CreatedDate.After(rOldestResult.CreatedDate)
}
Loading

0 comments on commit 78af451

Please sign in to comment.