Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Request/reply support #397

Merged
merged 16 commits into from
Sep 24, 2023
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 15 additions & 1 deletion components/cqrs/command_bus.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,12 +114,26 @@ func NewCommandBus(

// Send sends command to the command bus.
func (c CommandBus) Send(ctx context.Context, cmd any) error {
return c.SendWithModifiedMessage(ctx, cmd, nil)
}

func (c CommandBus) SendWithModifiedMessage(ctx context.Context, cmd any, modify func(*message.Message) error) error {
msg, topicName, err := c.newMessage(ctx, cmd)
if err != nil {
return err
}

return c.publisher.Publish(topicName, msg)
if modify != nil {
if err := modify(msg); err != nil {
return errors.Wrap(err, "cannot modify message")
}
}

if err := c.publisher.Publish(topicName, msg); err != nil {
return err
}

return nil
}

func (c CommandBus) newMessage(ctx context.Context, command any) (*message.Message, string, error) {
Expand Down
5 changes: 4 additions & 1 deletion components/cqrs/command_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,10 @@ type genericCommandHandler[Command any] struct {

// NewCommandHandler creates a new CommandHandler implementation based on provided function
// and command type inferred from function argument.
func NewCommandHandler[Command any](handlerName string, handleFunc func(ctx context.Context, cmd *Command) error) CommandHandler {
func NewCommandHandler[Command any](
handlerName string,
handleFunc func(ctx context.Context, cmd *Command) error,
) CommandHandler {
return &genericCommandHandler[Command]{
handleFunc: handleFunc,
handlerName: handlerName,
Expand Down
9 changes: 7 additions & 2 deletions components/cqrs/command_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,9 @@ type CommandProcessorConfig struct {
Logger watermill.LoggerAdapter

// If true, CommandProcessor will ack messages even if CommandHandler returns an error.
// If RequestReplyEnabled is enabled and sending reply fails, the message will be nack-ed anyway.
// If RequestReplyBackend is not null and sending reply fails, the message will be nack-ed anyway. todo: verify
// todo: test if it works properly with RequestReply (it should nack by default as well?)
// todo: describe that it doesn't affect RequestReplyBackend filures - sending reply failure will always nack the message
AckCommandHandlingErrors bool
m110 marked this conversation as resolved.
Show resolved Hide resolved

// disableRouterAutoAddHandlers is used to keep backwards compatibility.
Expand Down Expand Up @@ -314,12 +316,15 @@ func (p CommandProcessor) routerHandlerFunc(handler CommandHandler, logger water
"received_command_type": messageCmdName,
})

ctx := CtxWithOriginalMessage(msg.Context(), msg)
msg.SetContext(ctx)

if err := p.config.Marshaler.Unmarshal(msg, cmd); err != nil {
return err
}

handle := func(params CommandProcessorOnHandleParams) (err error) {
return params.Handler.Handle(params.Message.Context(), params.Command)
return params.Handler.Handle(ctx, params.Command)
m110 marked this conversation as resolved.
Show resolved Hide resolved
}
if p.config.OnHandle != nil {
handle = p.config.OnHandle
Expand Down
64 changes: 64 additions & 0 deletions components/cqrs/command_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -449,3 +449,67 @@ func TestCommandProcessor_AddHandlersToRouter_without_disableRouterAutoAddHandle
err = cp.AddHandlersToRouter(router)
assert.ErrorContains(t, err, "AddHandlersToRouter should be called only when using deprecated NewCommandProcessor")
}

func TestCommandProcessor_original_msg_set_to_ctx(t *testing.T) {
logger := watermill.NewCaptureLogger()

marshaler := cqrs.JSONMarshaler{}

msgToSend, err := marshaler.Marshal(&TestCommand{ID: "1"})
require.NoError(t, err)

mockSub := &mockSubscriber{
MessagesToSend: []*message.Message{
msgToSend,
},
}

router, err := message.NewRouter(message.RouterConfig{}, logger)
require.NoError(t, err)

commandProcessor, err := cqrs.NewCommandProcessorWithConfig(
router,
cqrs.CommandProcessorConfig{
GenerateSubscribeTopic: func(params cqrs.CommandProcessorGenerateSubscribeTopicParams) (string, error) {
return "commands", nil
},
SubscriberConstructor: func(params cqrs.CommandProcessorSubscriberConstructorParams) (message.Subscriber, error) {
return mockSub, nil
},
Marshaler: marshaler,
Logger: logger,
AckCommandHandlingErrors: true,
},
)
require.NoError(t, err)

var msgFromCtx *message.Message

err = commandProcessor.AddHandlers(cqrs.NewCommandHandler(
"handler", func(ctx context.Context, cmd *TestCommand) error {
msgFromCtx = cqrs.OriginalMessageFromCtx(ctx)
return nil
}),
)
require.NoError(t, err)

go func() {
err := router.Run(context.Background())
assert.NoError(t, err)
}()

<-router.Running()

select {
case <-msgToSend.Acked():
// ok
case <-msgToSend.Nacked():
// nack received
t.Fatal("nack received, message should be acked")
case <-time.After(1 * time.Second):
t.Fatal("timeout waiting for ack")
}

require.NotNil(t, msgFromCtx)
assert.Equal(t, msgToSend, msgFromCtx)
}
27 changes: 27 additions & 0 deletions components/cqrs/ctx.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package cqrs

import (
"context"

"github.com/ThreeDotsLabs/watermill/message"
)

type ctxKey string

const (
originalMessage ctxKey = "original_message"
)

// OriginalMessageFromCtx returns the original message that was received by the event/command handler.
func OriginalMessageFromCtx(ctx context.Context) *message.Message {
val, ok := ctx.Value(originalMessage).(*message.Message)
if !ok {
return nil
}
return val
}

// CtxWithOriginalMessage returns a new context with the original message attached.
func CtxWithOriginalMessage(ctx context.Context, msg *message.Message) context.Context {
return context.WithValue(ctx, originalMessage, msg)
}
5 changes: 4 additions & 1 deletion components/cqrs/event_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,10 @@ type genericEventHandler[T any] struct {

// NewEventHandler creates a new EventHandler implementation based on provided function
// and event type inferred from function argument.
func NewEventHandler[T any](handlerName string, handleFunc func(ctx context.Context, event *T) error) EventHandler {
func NewEventHandler[T any](
handlerName string,
handleFunc func(ctx context.Context, event *T) error,
) EventHandler {
return &genericEventHandler[T]{
handleFunc: handleFunc,
handlerName: handlerName,
Expand Down
5 changes: 4 additions & 1 deletion components/cqrs/event_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -327,12 +327,15 @@ func (p EventProcessor) routerHandlerFunc(handler EventHandler, logger watermill
"received_event_type": messageEventName,
})

ctx := CtxWithOriginalMessage(msg.Context(), msg)
msg.SetContext(ctx)

if err := p.config.Marshaler.Unmarshal(msg, event); err != nil {
return err
}

handle := func(params EventProcessorOnHandleParams) error {
return params.Handler.Handle(params.Message.Context(), params.Event)
return params.Handler.Handle(ctx, params.Event)
}
if p.config.OnHandle != nil {
handle = p.config.OnHandle
Expand Down
5 changes: 4 additions & 1 deletion components/cqrs/event_processor_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,12 +223,15 @@ func (p EventGroupProcessor) routerHandlerGroupFunc(handlers []GroupEventHandler
"received_event_type": messageEventName,
})

ctx := CtxWithOriginalMessage(msg.Context(), msg)
msg.SetContext(ctx)

if err := p.config.Marshaler.Unmarshal(msg, event); err != nil {
return err
}

handle := func(params EventGroupProcessorOnHandleParams) error {
return params.Handler.Handle(params.Message.Context(), params.Event)
return params.Handler.Handle(ctx, params.Event)
}
if p.config.OnHandle != nil {
handle = p.config.OnHandle
Expand Down
64 changes: 64 additions & 0 deletions components/cqrs/event_processor_group_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -385,3 +385,67 @@ func TestEventProcessor_handler_group(t *testing.T) {
assert.Equal(t, 1, handler1Calls)
assert.Equal(t, 1, handler2Calls)
}

func TestEventGroupProcessor_original_msg_set_to_ctx(t *testing.T) {
ts := NewTestServices()

msg, err := ts.Marshaler.Marshal(&TestEvent{})
require.NoError(t, err)

mockSub := &mockSubscriber{
MessagesToSend: []*message.Message{
msg,
},
}

router, err := message.NewRouter(message.RouterConfig{}, ts.Logger)
require.NoError(t, err)

cp, err := cqrs.NewEventGroupProcessorWithConfig(
router,
cqrs.EventGroupProcessorConfig{
GenerateSubscribeTopic: func(params cqrs.EventGroupProcessorGenerateSubscribeTopicParams) (string, error) {
return "events", nil
},
SubscriberConstructor: func(params cqrs.EventGroupProcessorSubscriberConstructorParams) (message.Subscriber, error) {
return mockSub, nil
},
AckOnUnknownEvent: true,
Marshaler: ts.Marshaler,
Logger: ts.Logger,
},
)
require.NoError(t, err)

var msgFromCtx *message.Message

err = cp.AddHandlersGroup(
"some_group",
cqrs.NewGroupEventHandler(
func(ctx context.Context, cmd *TestEvent) error {
msgFromCtx = cqrs.OriginalMessageFromCtx(ctx)
return nil
}),
)
require.NoError(t, err)

go func() {
err := router.Run(context.Background())
assert.NoError(t, err)
}()

<-router.Running()

select {
case <-msg.Acked():
// ok
case <-msg.Nacked():
// nack received
t.Fatal("nack received, message should be acked")
case <-time.After(1 * time.Second):
t.Fatal("timeout waiting for ack")
}

require.NotNil(t, msgFromCtx)
assert.Equal(t, msg, msgFromCtx)
}
63 changes: 63 additions & 0 deletions components/cqrs/event_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"sync/atomic"
"testing"
"time"

"github.com/pkg/errors"

Expand Down Expand Up @@ -474,3 +475,65 @@ func TestEventProcessor_AddHandlersToRouter_without_disableRouterAutoAddHandlers
err = cp.AddHandlersToRouter(router)
assert.ErrorContains(t, err, "AddHandlersToRouter should be called only when using deprecated NewEventProcessor")
}

func TestEventProcessor_original_msg_set_to_ctx(t *testing.T) {
ts := NewTestServices()

msg, err := ts.Marshaler.Marshal(&TestEvent{})
require.NoError(t, err)

mockSub := &mockSubscriber{
MessagesToSend: []*message.Message{
msg,
},
}

router, err := message.NewRouter(message.RouterConfig{}, ts.Logger)
require.NoError(t, err)

cp, err := cqrs.NewEventProcessorWithConfig(
router,
cqrs.EventProcessorConfig{
GenerateSubscribeTopic: func(params cqrs.EventProcessorGenerateSubscribeTopicParams) (string, error) {
return "events", nil
},
SubscriberConstructor: func(params cqrs.EventProcessorSubscriberConstructorParams) (message.Subscriber, error) {
return mockSub, nil
},
AckOnUnknownEvent: true,
Marshaler: ts.Marshaler,
Logger: ts.Logger,
},
)
require.NoError(t, err)

var msgFromCtx *message.Message

err = cp.AddHandlers(cqrs.NewEventHandler(
"handler", func(ctx context.Context, cmd *TestEvent) error {
msgFromCtx = cqrs.OriginalMessageFromCtx(ctx)
return nil
}),
)
require.NoError(t, err)

go func() {
err := router.Run(context.Background())
assert.NoError(t, err)
}()

<-router.Running()

select {
case <-msg.Acked():
// ok
case <-msg.Nacked():
// nack received
t.Fatal("nack received, message should be acked")
case <-time.After(1 * time.Second):
t.Fatal("timeout waiting for ack")
}

require.NotNil(t, msgFromCtx)
assert.Equal(t, msg, msgFromCtx)
}
Loading
Loading