Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implementation for event driven tink #301

Merged
merged 13 commits into from
Dec 7, 2020
1 change: 1 addition & 0 deletions .codespell-whitelist
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
alls
ba
cas
eventtypes
60 changes: 60 additions & 0 deletions client/informers/informer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package informers
mmlb marked this conversation as resolved.
Show resolved Hide resolved

import (
"context"
"io"

"github.com/tinkerbell/tink/client"
"github.com/tinkerbell/tink/protos/events"
)

// Informer is the base informer
type Informer interface {
Start(ctx context.Context, req *events.WatchRequest, fn func(e *events.Event) error) error
}

type sharedInformer struct {
eventsCh chan *events.Event
errCh chan error
}

// New returns an instance of event informer
func New() Informer {
return &sharedInformer{
eventsCh: make(chan *events.Event),
errCh: make(chan error),
}
}

func (s *sharedInformer) Start(ctx context.Context, req *events.WatchRequest, fn func(e *events.Event) error) error {
defer close(s.errCh)
stream, err := client.EventsClient.Watch(ctx, req)
if err != nil {
return err
}

go processEvents(s.eventsCh, s.errCh, fn)
mmlb marked this conversation as resolved.
Show resolved Hide resolved

var event *events.Event
for event, err = stream.Recv(); err == nil && event != nil; event, err = stream.Recv() {
if err == io.EOF {
return nil
}
s.eventsCh <- event
}
if err != nil {
return err
}
close(s.eventsCh)
return <-s.errCh
}

func processEvents(eventsCh <-chan *events.Event, errCh chan<- error, fn func(e *events.Event) error) {
for event := range eventsCh {
err := fn(event)
if err != nil {
errCh <- err
return
}
}
}
61 changes: 61 additions & 0 deletions client/informers/informer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package informers

import (
"errors"
"testing"

"github.com/stretchr/testify/assert"
"github.com/tinkerbell/tink/protos/events"
)

func TestProcessEvents(t *testing.T) {
var eventsCh chan *events.Event
var counter int
testCases := map[string]struct {
eventGenerator func()
fn func(e *events.Event) error
expectedCounter int
}{
"no error": {
eventGenerator: func() {
for i := 0; i < 5; i++ {
eventsCh <- &events.Event{}
}
},
fn: func(e *events.Event) error {
counter++
return nil
},
expectedCounter: 5,
},
"error": {
eventGenerator: func() {
for i := 0; i <= 5; i++ {
if i == 3 {
break
}
eventsCh <- &events.Event{}
}
},
fn: func(e *events.Event) error {
if counter == 2 {
return errors.New("event processing error")
}
counter++
return nil
},
expectedCounter: 2,
},
}

for name, tc := range testCases {
eventsCh = make(chan *events.Event)
counter = 0

t.Run(name, func(t *testing.T) {
go processEvents(eventsCh, nil, tc.fn)
tc.eventGenerator()
assert.Equal(t, tc.expectedCounter, counter)
})
}
}
34 changes: 34 additions & 0 deletions client/informers/mock.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package informers

import "github.com/tinkerbell/tink/protos/events"

type watchRequestModifier func(*events.WatchRequest)

func watchRequest(m ...watchRequestModifier) *events.WatchRequest {
req := &events.WatchRequest{
EventTypes: []events.EventType{},
ResourceTypes: []events.ResourceType{},
}
for _, fn := range m {
fn(req)
}
return req
}

func withEventTypes(ets []events.EventType) watchRequestModifier {
return func(e *events.WatchRequest) {
e.EventTypes = ets
}
}

func withResourceTypes(rts []events.ResourceType) watchRequestModifier {
return func(e *events.WatchRequest) {
e.ResourceTypes = rts
}
}

func withResourceID(id string) watchRequestModifier {
return func(e *events.WatchRequest) {
e.ResourceId = id
}
}
114 changes: 114 additions & 0 deletions client/informers/notification.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
package informers

import (
"encoding/json"
"time"

"github.com/golang/protobuf/ptypes"
"github.com/tinkerbell/tink/protos/events"
)

// Notification represents an event notification
type Notification struct {
ID string `json:"id,omitempty"`
ResourceID string `json:"resource_id,omitempty"`
ResourceType string `json:"resource_type,omitempty"`
EventType string `json:"event_type,omitempty"`
Data interface{} `json:"data,omitempty"`
CreatedAt *time.Time `json:"created_at,omitempty"`
}

// ToEvent converts a notification into events.Event type
func (n Notification) ToEvent() (*events.Event, error) {
d, err := json.Marshal(n.Data)
if err != nil {
return nil, err
}

createdAt, err := ptypes.TimestampProto(*n.CreatedAt)
if err != nil {
return nil, err
}

return &events.Event{
Id: n.ID,
ResourceId: n.ResourceID,
ResourceType: ResourceType(n.ResourceType),
EventType: EventType(n.EventType),
Data: d,
CreatedAt: createdAt,
}, nil
}

// Prefix adds prefix to notification's resource and event type
func (n *Notification) Prefix() {
const (
resourceTypePrefix = "RESOURCE_TYPE_"
eventTypePrefix = "EVENT_TYPE_"
)
n.ResourceType = resourceTypePrefix + n.ResourceType
n.EventType = eventTypePrefix + n.EventType
}

// Filter a notification based on given reducer.
func Filter(n *Notification, reducer func(n *Notification) bool) bool {
return reducer(n)
}

// ResourceType returns events.ResourceType for a given key
func ResourceType(name string) events.ResourceType {
switch name {
case events.ResourceType_RESOURCE_TYPE_TEMPLATE.String():
return events.ResourceType_RESOURCE_TYPE_TEMPLATE
case events.ResourceType_RESOURCE_TYPE_HARDWARE.String():
return events.ResourceType_RESOURCE_TYPE_HARDWARE
case events.ResourceType_RESOURCE_TYPE_WORKFLOW.String():
return events.ResourceType_RESOURCE_TYPE_WORKFLOW
default:
return events.ResourceType_RESOURCE_TYPE_UNKNOWN
}
}

// EventType returns events.EventType for a given key
func EventType(name string) events.EventType {
switch name {
case events.EventType_EVENT_TYPE_CREATED.String():
return events.EventType_EVENT_TYPE_CREATED
case events.EventType_EVENT_TYPE_UPDATED.String():
return events.EventType_EVENT_TYPE_UPDATED
case events.EventType_EVENT_TYPE_DELETED.String():
return events.EventType_EVENT_TYPE_DELETED
default:
return events.EventType_EVENT_TYPE_UNKNOWN
}
}

// Reduce returns a closure to filter notifications.
func Reduce(req *events.WatchRequest) func(n *Notification) bool {
return func(n *Notification) bool {
if req.ResourceId != "" && n.ResourceID != req.ResourceId {
return true
}

eType := EventType(n.EventType)
for i, t := range req.EventTypes {
if t == eType {
break
}
if i == len(req.EventTypes)-1 && t != eType {
return true
}
}

rType := ResourceType(n.ResourceType)
for i, t := range req.ResourceTypes {
if t == rType {
break
}
if i == len(req.ResourceTypes)-1 && t != rType {
return true
}
}
return false
}
}
Loading