Skip to content

Commit

Permalink
Retryable event handling
Browse files Browse the repository at this point in the history
  • Loading branch information
maxekman committed Mar 5, 2021
1 parent 3e1a09e commit 76601be
Show file tree
Hide file tree
Showing 17 changed files with 217 additions and 80 deletions.
28 changes: 27 additions & 1 deletion eventbus/acceptance_testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ func AcceptanceTest(t *testing.T, bus1, bus2 eh.EventBus, timeout time.Duration)

// Test async errors from handlers.
errorHandler := mocks.NewEventHandler("error_handler")
errorHandler.Err = errors.New("handler error")
errorHandler.ErrOnce = errors.New("handler error")
if err := bus1.AddHandler(ctx, eh.MatchAll{}, errorHandler); err != nil {
t.Fatal("there should be no error:", err)
}
Expand All @@ -234,9 +234,35 @@ func AcceptanceTest(t *testing.T, bus1, bus2 eh.EventBus, timeout time.Duration)
// Good case.
if err.Error() != "could not handle event (error_handler): handler error: (Event@3)" {
t.Error("incorrect error sent on event bus:", err)
t.Logf("%#v", err.Event)
}
}

// Retryable events.
retryHandler := mocks.NewEventHandler("retry_handler")
retryHandler.ErrOnce = eh.RetryableEventError{Err: errors.New("retryable error")}
bus1.AddHandler(ctx, eh.MatchAll{}, retryHandler)

time.Sleep(timeout) // Need to wait here for handlers to be added.

event4 := eh.NewEvent(mocks.EventType, &mocks.EventData{Content: "event4"}, timestamp,
eh.ForAggregate(mocks.AggregateType, id, 4),
eh.WithMetadata(map[string]interface{}{"meta": "data", "num": int32(42)}),
)
if err := bus1.HandleEvent(ctx, event4); err != nil {
t.Error("there should be no error:", err)
}
select {
case <-time.After(timeout):
t.Error("there should be a retried event in time")
case <-retryHandler.Recv:
}
retryHandler.Lock()
if retryHandler.NumHandleEvent != 2 {
t.Error("the handler should have been called twice")
}
retryHandler.Unlock()

// Cancel all handlers and wait.
cancel()
bus1.Wait()
Expand Down
9 changes: 7 additions & 2 deletions eventbus/gcp/eventbus.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,14 +253,19 @@ func (b *EventBus) handler(m eh.EventMatcher, h eh.EventHandler) func(ctx contex

// Handle the event if it did match.
if err := h.HandleEvent(ctx, event); err != nil {
// Retryable errors are not logged and will be retried.
if _, ok := err.(eh.RetryableEventError); ok {
msg.Nack()
return
}

// Log unhandled events, they will NOT be retried.
err = fmt.Errorf("could not handle event (%s): %w", h.HandlerType(), err)
select {
case b.errCh <- eh.EventBusError{Err: err, Ctx: ctx, Event: event}:
default:
log.Printf("eventhorizon: missed error in GCP event bus: %s", err)
}
msg.Nack()
return
}

msg.Ack()
Expand Down
9 changes: 7 additions & 2 deletions eventbus/jetstream/eventbus.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,14 +216,19 @@ func (b *EventBus) handler(ctx context.Context, m eh.EventMatcher, h eh.EventHan

// Handle the event if it did match.
if err := h.HandleEvent(ctx, event); err != nil {
// Retryable errors are not logged and will be retried.
if _, ok := err.(eh.RetryableEventError); ok {
msg.Nak()
return
}

// Log unhandled events, they will NOT be retried.
err = fmt.Errorf("could not handle event (%s): %w", h.HandlerType(), err)
select {
case b.errCh <- eh.EventBusError{Err: err, Ctx: ctx, Event: event}:
default:
log.Printf("eventhorizon: missed error in Jetstream event bus: %s", err)
}
msg.Nak()
return
}

msg.AckSync()
Expand Down
7 changes: 6 additions & 1 deletion eventbus/kafka/eventbus.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,13 +240,18 @@ func (b *EventBus) handler(m eh.EventMatcher, h eh.EventHandler, r *kafka.Reader

// Handle the event if it did match.
if err := h.HandleEvent(ctx, event); err != nil {
// Retryable errors are not logged and will be retried.
if _, ok := err.(eh.RetryableEventError); ok {
return
}

// Log unhandled events, they will NOT be retried.
err = fmt.Errorf("could not handle event (%s): %w", h.HandlerType(), err)
select {
case b.errCh <- eh.EventBusError{Err: err, Ctx: ctx, Event: event}:
default:
log.Printf("eventhorizon: missed error in Kafka event bus: %s", err)
}
return
}

r.CommitMessages(ctx, msg)
Expand Down
18 changes: 15 additions & 3 deletions eventbus/local/eventbus.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ type evt struct {
}

// Handles all events coming in on the channel.
func (b *EventBus) handle(ctx context.Context, m eh.EventMatcher, h eh.EventHandler, ch <-chan []byte) {
func (b *EventBus) handle(ctx context.Context, m eh.EventMatcher, h eh.EventHandler, ch chan []byte) {
defer b.wg.Done()

for {
Expand All @@ -161,7 +161,19 @@ func (b *EventBus) handle(ctx context.Context, m eh.EventMatcher, h eh.EventHand

// Handle the event if it did match.
if err := h.HandleEvent(ctx, event); err != nil {
err = fmt.Errorf("could not handle event (%s): %s", h.HandlerType(), err.Error())
// Retryable errors are not logged and will be retried.
if _, ok := err.(eh.RetryableEventError); ok {
select {
case ch <- data:
// Retry event by putting it back on the bus.
default:
log.Printf("eventhorizon: publish queue full for retry in local event bus")
}
continue
}

// Log unhandled events, they will NOT be retried.
err = fmt.Errorf("could not handle event (%s): %w", h.HandlerType(), err)
select {
case b.errCh <- eh.EventBusError{Err: err, Ctx: ctx, Event: event}:
default:
Expand All @@ -187,7 +199,7 @@ func NewGroup() *Group {
}
}

func (g *Group) channel(id string) <-chan []byte {
func (g *Group) channel(id string) chan []byte {
g.busMu.Lock()
defer g.busMu.Unlock()

Expand Down
19 changes: 16 additions & 3 deletions eventbus/redis/eventbus.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,11 +190,12 @@ func (b *EventBus) handle(ctx context.Context, m eh.EventMatcher, h eh.EventHand
defer b.wg.Done()

msgHandler := b.handler(m, h, groupName)
readOpt := ">"
for {
streams, err := b.client.XReadGroup(ctx, &redis.XReadGroupArgs{
Group: groupName,
Consumer: groupName + "_" + b.clientID,
Streams: []string{b.streamName, ">"},
Streams: []string{b.streamName, readOpt},
}).Result()
if err != nil && err != context.Canceled {
err = fmt.Errorf("could not receive: %w", err)
Expand All @@ -219,6 +220,13 @@ func (b *EventBus) handle(ctx context.Context, m eh.EventMatcher, h eh.EventHand
msgHandler(ctx, &msg)
}
}

// Flip flop the read option to read new and non-acked messages every other time.
if readOpt == ">" {
readOpt = "0"
} else {
readOpt = ">"
}
}
}

Expand Down Expand Up @@ -253,14 +261,19 @@ func (b *EventBus) handler(m eh.EventMatcher, h eh.EventHandler, groupName strin

// Handle the event if it did match.
if err := h.HandleEvent(ctx, event); err != nil {
// Retryable errors are not logged and will be retried.
if _, ok := err.(eh.RetryableEventError); ok {
// TODO: Nack if possible.
return
}

// Log unhandled events, they will NOT be retried.
err = fmt.Errorf("could not handle event (%s): %w", h.HandlerType(), err)
select {
case b.errCh <- eh.EventBusError{Err: err, Ctx: ctx, Event: event}:
default:
log.Printf("eventhorizon: missed error in Redis event bus: %s", err)
}
// TODO: Nack if possible.
return
}

_, err = b.client.XAck(ctx, b.streamName, groupName, msg.ID).Result()
Expand Down
19 changes: 19 additions & 0 deletions eventhandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package eventhorizon

import (
"context"
"fmt"
"reflect"
"runtime"
"strings"
Expand All @@ -40,6 +41,24 @@ type EventHandler interface {
HandleEvent(context.Context, Event) error
}

// RetryableEventError is a "soft" error that handlers should return if they want the
// handler to be retried. This will often be the case when handling events (for
// example in a saga) where related read models have not yet been projected.
// NOTE: The retry behavior is dependent on the eventbus implementation used.
type RetryableEventError struct {
Err error
}

// Error implements the Error method of the error interface.
func (e RetryableEventError) Error() string {
return fmt.Sprintf("retryable: %s", e.Err)
}

// Cause returns the cause of this error.
func (e RetryableEventError) Cause() error {
return e.Err
}

// EventHandlerFunc is a function that can be used as a event handler.
type EventHandlerFunc func(context.Context, Event) error

Expand Down
77 changes: 51 additions & 26 deletions eventhandler/projector/eventhandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,12 @@ func (e Error) Cause() error {
// ErrModelNotSet is when a model factory is not set on the EventHandler.
var ErrModelNotSet = errors.New("model not set")

// ErrIncorrectEntityVersion is when an entity has an incorrect version.
var ErrIncorrectEntityVersion = errors.New("incorrect entity version")

// ErrIncorrectProjectedEntityVersion is when an entity has an incorrect version after projection.
var ErrIncorrectProjectedEntityVersion = errors.New("incorrect projected entity version")

// NewEventHandler creates a new EventHandler.
func NewEventHandler(projector Projector, repo eh.ReadWriteRepo, options ...Option) *EventHandler {
h := &EventHandler{
Expand Down Expand Up @@ -124,23 +130,38 @@ func (h *EventHandler) HandleEvent(ctx context.Context, event eh.Event) error {
defer cancel()
}
entity, err := h.repo.Find(findCtx, event.AggregateID())
if rrErr, ok := err.(eh.RepoError); ok && rrErr.Err == eh.ErrEntityNotFound {
if h.factoryFn == nil {
if err != nil {
if errors.Is(err, eh.ErrEntityNotFound) {
// Create the model if there was no previous.
// TODO: Consider that the event can still have been projected elsewhere
// but not yet available in this find. Handle this before/when saving!
if h.factoryFn == nil {
return Error{
Err: ErrModelNotSet,
Projector: h.projector.ProjectorType().String(),
Namespace: eh.NamespaceFromContext(ctx),
EventVersion: event.Version(),
}
}
entity = h.factoryFn()
} else if errors.Is(err, version.ErrIncorrectLoadedEntityVersion) {
// Retry handling the event if model had the incorrect version.
return eh.RetryableEventError{
Err: Error{
Err: err,
Projector: h.projector.ProjectorType().String(),
Namespace: eh.NamespaceFromContext(ctx),
EventVersion: event.Version(),
},
}
} else {
return Error{
Err: ErrModelNotSet,
Err: err,
Projector: h.projector.ProjectorType().String(),
Namespace: eh.NamespaceFromContext(ctx),
EventVersion: event.Version(),
}
}
entity = h.factoryFn()
} else if err != nil {
return Error{
Err: err,
Projector: h.projector.ProjectorType().String(),
Namespace: eh.NamespaceFromContext(ctx),
EventVersion: event.Version(),
}
}

// The entity should be one version behind the event.
Expand All @@ -149,17 +170,19 @@ func (h *EventHandler) HandleEvent(ctx context.Context, event eh.Event) error {
entityVersion = entity.AggregateVersion()

// Ignore old/duplicate events.
if entity.AggregateVersion() >= event.Version() {
if event.Version() <= entity.AggregateVersion() {
return nil
}

if entity.AggregateVersion()+1 != event.Version() {
return Error{
Err: eh.ErrIncorrectEntityVersion,
Projector: h.projector.ProjectorType().String(),
Namespace: eh.NamespaceFromContext(ctx),
EventVersion: event.Version(),
EntityVersion: entityVersion,
if event.Version() != entity.AggregateVersion()+1 {
return eh.RetryableEventError{
Err: Error{
Err: ErrIncorrectEntityVersion,
Projector: h.projector.ProjectorType().String(),
Namespace: eh.NamespaceFromContext(ctx),
EventVersion: event.Version(),
EntityVersion: entityVersion,
},
}
}
}
Expand All @@ -181,7 +204,7 @@ func (h *EventHandler) HandleEvent(ctx context.Context, event eh.Event) error {
entityVersion = newEntity.AggregateVersion()
if newEntity.AggregateVersion() != event.Version() {
return Error{
Err: eh.ErrIncorrectEntityVersion,
Err: ErrIncorrectProjectedEntityVersion,
Projector: h.projector.ProjectorType().String(),
Namespace: eh.NamespaceFromContext(ctx),
EventVersion: event.Version(),
Expand All @@ -203,12 +226,14 @@ func (h *EventHandler) HandleEvent(ctx context.Context, event eh.Event) error {
}
} else {
if err := h.repo.Remove(ctx, event.AggregateID()); err != nil {
return Error{
Err: err,
Projector: h.projector.ProjectorType().String(),
Namespace: eh.NamespaceFromContext(ctx),
EventVersion: event.Version(),
EntityVersion: entityVersion,
return eh.RetryableEventError{
Err: Error{
Err: err,
Projector: h.projector.ProjectorType().String(),
Namespace: eh.NamespaceFromContext(ctx),
EventVersion: event.Version(),
EntityVersion: entityVersion,
},
}
}
}
Expand Down
7 changes: 7 additions & 0 deletions eventhandler/projector/eventhandler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,13 @@ func TestEventHandler_SaveError(t *testing.T) {
timestamp := time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC)
event := eh.NewEvent(mocks.EventType, eventData, timestamp,
eh.ForAggregate(mocks.AggregateType, id, 1))
repo.LoadErr = eh.RepoError{
Err: eh.ErrEntityNotFound,
}
projector.newEntity = &mocks.SimpleModel{
ID: id,
}

saveErr := errors.New("save error")
repo.SaveErr = saveErr
expectedErr := Error{
Expand Down
7 changes: 3 additions & 4 deletions middleware/commandhandler/tracing/middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,17 +29,16 @@ func NewMiddleware() eh.CommandHandlerMiddleware {
return eh.CommandHandlerFunc(func(ctx context.Context, cmd eh.Command) error {
opName := fmt.Sprintf("Command(%s)", cmd.CommandType())
sp, ctx := opentracing.StartSpanFromContext(ctx, opName)

err := h.HandleCommand(ctx, cmd)

sp.SetTag("eh.command_type", cmd.CommandType())
sp.SetTag("eh.aggregate_type", cmd.AggregateType())
sp.SetTag("eh.aggregate_id", cmd.AggregateID())

err := h.HandleCommand(ctx, cmd)
if err != nil {
ext.LogError(sp, err)
}
sp.Finish()

sp.Finish()
return err
})
})
Expand Down
2 changes: 1 addition & 1 deletion middleware/eventhandler/async/middleware_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func TestEventHandler(t *testing.T) {
m, errCh = NewMiddleware()
h = eh.UseEventHandlerMiddleware(inner, m)
handlingErr := errors.New("handling error")
inner.Err = handlingErr
inner.ErrOnce = handlingErr
ctx := context.Background()
if err := h.HandleEvent(ctx, event); err != nil {
t.Error("there should never be an error:", err)
Expand Down
Loading

0 comments on commit 76601be

Please sign in to comment.