Skip to content

Commit

Permalink
Add events framework
Browse files Browse the repository at this point in the history
This PR introduces an event framework for driving defined system events
through a sink interface. A logrus logging sink is implemented
initially for logging event locally but event could also be
dispatched to alternative metrics/logging mechanisms.

The initial event which is added for this is to log and
identify potential loss of data between polling intervals.
This lossy feed problem is detailed in issue #83 and whilst
this doesn't fix the problem, it can help to identify and monitor
occurances of this issue within a long running production environment.
Lossy feed detection occurs on a feed by feed basis and is up to the
implementor to enable through the use of a utility struct `LossyFeedAlerter`.
  • Loading branch information
Qinusty committed Apr 20, 2021
1 parent 2a58fc5 commit c66676b
Show file tree
Hide file tree
Showing 10 changed files with 443 additions and 4 deletions.
21 changes: 21 additions & 0 deletions config/scheduledfeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"os"
"strconv"

"github.com/ossf/package-feeds/events"
"github.com/ossf/package-feeds/feeds"
"github.com/ossf/package-feeds/feeds/crates"
"github.com/ossf/package-feeds/feeds/goproxy"
Expand All @@ -24,6 +25,7 @@ import (
"gopkg.in/yaml.v3"

"github.com/mitchellh/mapstructure"
log "github.com/sirupsen/logrus"
)

// Loads a ScheduledFeedConfig struct from a yaml config file
Expand Down Expand Up @@ -105,6 +107,25 @@ func (sConfig *ScheduledFeedConfig) GetScheduledFeeds() (map[string]feeds.Schedu
return scheduledFeeds, nil
}

func (sConfig *ScheduledFeedConfig) GetEventHandler() (*events.Handler, error) {
if sConfig.EventsConfig == nil {
sConfig.eventHandler = events.NewNullHandler()
} else if sConfig.eventHandler == nil {
var sink events.Sink
switch sConfig.EventsConfig.Sink {
case events.LOGGING_SINK_TYPE:
sink = events.NewLoggingEventSink(log.New())
default:
return nil, fmt.Errorf(
"unknown sink type '%v' provided to events configuration",
sConfig.EventsConfig.Sink,
)
}
sConfig.eventHandler = events.NewHandler(sink, sConfig.EventsConfig.EventFilter)
}
return sConfig.eventHandler, nil
}

// Produces a Publisher object from the provided PublisherConfig
// The PublisherConfig.Type value is evaluated and the appropriate Publisher is
// constructed from the Config field.
Expand Down
24 changes: 20 additions & 4 deletions config/structs.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,29 @@
package config

import "github.com/ossf/package-feeds/events"

type ScheduledFeedConfig struct {
PubConfig PublisherConfig `yaml:"publisher"`
EnabledFeeds []string `yaml:"enabled_feeds"`
HttpPort int `yaml:"http_port,omitempty"`
CutoffDelta string `yaml:"cutoff_delta"`
// Configures the publisher for pushing packages after polling
PubConfig PublisherConfig `yaml:"publisher"`

// Configures the feeds to be used for polling from package repositories
EnabledFeeds []string `yaml:"enabled_feeds"`

HttpPort int `yaml:"http_port,omitempty"`
CutoffDelta string `yaml:"cutoff_delta"`

// Configures the EventHandler instance to be used throughout the package-feeds application
EventsConfig *EventsConfig `yaml:"events"`

eventHandler *events.Handler
}

type PublisherConfig struct {
Type string `mapstructure:"type"`
Config interface{} `mapstructure:"config"`
}

type EventsConfig struct {
Sink string `mapstructure:"sink"`
EventFilter events.Filter `mapstructure:"filter"`
}
102 changes: 102 additions & 0 deletions events/handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
package events

const (
// Event Types
LOSSY_FEED_EVENT = "LOSSY_FEED"

// Components
FEEDS_COMPONENT = "Feeds"
)

type Sink interface {
AddEvent(e Event) error
}

type Event interface {
GetComponent() string
GetType() string
GetMessage() string
}

type Filter struct {
enabledEventTypes []string
disabledEventTypes []string

enabledComponents []string
}

type Handler struct {
eventSink Sink
eventFilter Filter
}

func NewHandler(sink Sink, filter Filter) *Handler {
return &Handler{
eventSink: sink,
eventFilter: filter,
}
}

func NewNullHandler() *Handler {
return &Handler{}
}

// Creates a filter for use with an event handler, nil can be provided for non values.
func NewFilter(enabledEventTypes, disabledEventTypes, enabledComponents []string) *Filter {
if enabledEventTypes == nil {
enabledEventTypes = []string{}
}
if disabledEventTypes == nil {
disabledEventTypes = []string{}
}
if enabledComponents == nil {
enabledComponents = []string{}
}
return &Filter{
enabledEventTypes: enabledEventTypes,
disabledEventTypes: disabledEventTypes,
enabledComponents: enabledComponents,
}
}

// Dispatches an event to the configured sink if it passes the configured filter.
// Filters are applied as follows:
// - disabled event types are always disabled.
// - enabled event types are enabled
// - enabled components are enabled except for disabled event types
func (h Handler) DispatchEvent(e Event) error {
if h.eventSink == nil {
return nil
}
dispatch := false
filter := h.eventFilter

eComponent := e.GetComponent()
eType := e.GetType()

// Enable components
if stringExistsInSlice(eComponent, filter.enabledComponents) {
dispatch = true
}
// Handle specific event types
if stringExistsInSlice(eType, filter.enabledEventTypes) {
dispatch = true
} else if stringExistsInSlice(eType, filter.disabledEventTypes) {
dispatch = false
}

if dispatch {
return h.eventSink.AddEvent(e)
}
return nil
}

// Checks for existance of a string within a slice of strings
func stringExistsInSlice(s string, slice []string) bool {
for _, sliceStr := range slice {
if s == sliceStr {
return true
}
}
return false
}
93 changes: 93 additions & 0 deletions events/handler_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package events

import (
"testing"
)

func TestHandlerDispatchEventNoFilterConfigured(t *testing.T) {
t.Parallel()

sink := &MockSink{}
filter := NewFilter(nil, nil, nil)

handler := NewHandler(sink, *filter)

event := &LossyFeedEvent{
Feed: "Foo",
}

err := handler.DispatchEvent(event)
if err != nil {
t.Fatal(err)
}

if len(sink.events) != 0 {
t.Error("LossyFeedEvent was dispatched despite not being enabled")
}
}

func TestHandlerDispatchEventFilterAllowLossyFeed(t *testing.T) {
t.Parallel()

sink := &MockSink{}
filter := NewFilter([]string{LOSSY_FEED_EVENT}, nil, nil)

handler := NewHandler(sink, *filter)

event := &LossyFeedEvent{
Feed: "Foo",
}

err := handler.DispatchEvent(event)
if err != nil {
t.Fatal(err)
}

if len(sink.events) != 1 {
t.Error("LossyFeedEvent was not dispatched despite being configured to allow dispatch")
}
}

func TestHandlerDispatchEventFilterAllowFeedComponent(t *testing.T) {
t.Parallel()

sink := &MockSink{}
filter := NewFilter(nil, nil, []string{FEEDS_COMPONENT})

handler := NewHandler(sink, *filter)

event := &LossyFeedEvent{
Feed: "Foo",
}

err := handler.DispatchEvent(event)
if err != nil {
t.Fatal(err)
}

if len(sink.events) != 1 {
t.Error("LossyFeedEvent was not dispatched despite feeds component being allowed")
}
}

func TestHandlerDispatchEventFilterDisableLossyFeed(t *testing.T) {
t.Parallel()

sink := &MockSink{}
filter := NewFilter(nil, []string{LOSSY_FEED_EVENT}, []string{FEEDS_COMPONENT})

handler := NewHandler(sink, *filter)

event := &LossyFeedEvent{
Feed: "Foo",
}

err := handler.DispatchEvent(event)
if err != nil {
t.Fatal(err)
}

if len(sink.events) != 0 {
t.Error("LossyFeedEvent was dispatched despite being configured to disable dispatch")
}
}
30 changes: 30 additions & 0 deletions events/logrus_sink.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package events

import (
"github.com/sirupsen/logrus"
)

const (
LOGGING_SINK_TYPE = "stdout"
)

type LoggingEventSink struct {
logger *logrus.Logger
}

// Creates an event sink which logs events using a provided logrus logger,
// fields "component" and "event_type" are applied to the logger and
// warnings are logged for each event.
func NewLoggingEventSink(logger *logrus.Logger) *LoggingEventSink {
return &LoggingEventSink{
logger: logger,
}
}

func (sink LoggingEventSink) AddEvent(e Event) error {
sink.logger.WithFields(logrus.Fields{
"event_type": e.GetType(),
"component": e.GetComponent(),
}).Warn(e.GetMessage())
return nil
}
45 changes: 45 additions & 0 deletions events/logrus_sink_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package events

import (
"testing"

"github.com/sirupsen/logrus/hooks/test"
)

func TestLogrusSink(t *testing.T) {
t.Parallel()

log, hook := test.NewNullLogger()

sink := NewLoggingEventSink(log)

event := LossyFeedEvent{
Feed: "Foo",
}

err := sink.AddEvent(event)
if err != nil {
t.Error(err)
}

logEntry := hook.LastEntry()
if logEntry == nil {
t.Error("Log entry was not added to the configured logger")
}

if logEntry.Data["event_type"] != event.GetType() {
t.Errorf(
"Log entry had incorrect event_type field '%v' when '%v' was expected",
logEntry.Data["event_type"],
event.GetType(),
)
}

if logEntry.Data["component"] != event.GetComponent() {
t.Errorf(
"Log entry had incorrect component field '%v' when '%v' was expected",
logEntry.Data["component"],
event.GetComponent(),
)
}
}
23 changes: 23 additions & 0 deletions events/lossy_feed_event.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package events

import (
"fmt"
)

type LossyFeedEvent struct {
Feed string

Event
}

func (e LossyFeedEvent) GetComponent() string {
return FEEDS_COMPONENT
}

func (e LossyFeedEvent) GetType() string {
return LOSSY_FEED_EVENT
}

func (e LossyFeedEvent) GetMessage() string {
return fmt.Sprintf("detected potential missing package data when polling %v feed", e.Feed)
}
14 changes: 14 additions & 0 deletions events/mocks.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package events

type MockSink struct {
events []Event
}

func (s *MockSink) GetEvents() []Event {
return s.events
}

func (s *MockSink) AddEvent(e Event) error {
s.events = append(s.events, e)
return nil
}
Loading

0 comments on commit c66676b

Please sign in to comment.