Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Experimental MessageBus component #281

Closed
wants to merge 4 commits into from
Closed

Experimental MessageBus component #281

wants to merge 4 commits into from

Conversation

m110
Copy link
Member

@m110 m110 commented Apr 18, 2022

Context

Apart from low-level Publisher and Subscriber interfaces, Watermill users can use either Router or the CQRS component.

After considering how we were using Watermill over the years and talking to other people in the community, it seems to me that:

  • Router is still a low-level API compared to the CQRS component.
    • It makes users responsible for marshaling and unmarshaling, often repetitive.
    • It's a nice API for routing messages from one Pub/Sub to another with little processing. However, event-driven applications rarely use such semantics (hence the weirdly named NoPublisherHandlerFunc we've added quite late).
  • The CQRS component addresses the issues above but seems to be quite complex for what most people use.
    • Separating commands and events is a rare use case. It seems that commands come most often from synchronous APIs, like HTTP or gRPC handlers. You can use just events or commands if you want, but it makes the APIs a bit weird overall (like CommandOrEvent in multiple places).
    • The Facade API doesn't provide a great experience. It's not modular, has a massive config, and mixes all concepts (publishers and subscribers, commands and queries). It makes you provide a publisher even if you don't need one.
    • In theory, the EventHandlers constructor makes it possible to pass dependencies to the handlers. But when using the component, we've been injecting dependencies on another level anyway, not using it.

I feel we could use a high-level API that's more pleasant to use in event-driven services than the Router and is easier to grasp than the CQRS Facade.

One example: it's not trivial right now to run handlers within consumer groups, even though it's a super popular use case. (Two different handlers within the same service consuming the same event type.)

Proposal

  • Add a MessageBus component (name to be discussed).
  • It works with typed messages (structs) and doesn't differentiate between commands and events.
  • Leverage generics to get rid of the weird NewEvent methods.
  • Keep the API closer to that of Router. AddHandler method over creating structs.
  • Decouple Publisher as a separate struct.
  • Replace gogo with golang/protobuf in the default Protobuf marshaler.
  • Add context.Context to Marshal to let users extend metadata with things like correlation ID.
  • Replace "event name" with "message type".
  • Changed default message type to struct name instead of full qualified struct name.

API Proposals

Option 1

Note:

  • We need bus.NewMessageHandler because methods can't be generic.
  • bus.NewMessageHandler is generic but the type is inferred from the handler func passed as the argument.
messageBus, err := bus.NewMessageBus(router, generateTopic, subscriberConstructor, watermill.NopLogger{})

err = messageBus.AddHandler(bus.NewMessageHandler(
	"testHandler",
	func(ctx context.Context, event ExampleEvent) error {
		fmt.Println("Handling", event.ID)
		return nil
	},
	marshaler,
	watermill.NopLogger{},
))

Option 2

messageBus, err := bus.NewMessageBus(router, bus.JSONMarshaler{}, generateTopic, subscriberConstructor, watermill.NopLogger{})

err = bus.AddHandler(
	messageBus,
	"testHandler",
	func(ctx context.Context, event ExampleEvent) error {
		fmt.Println("Handling", event.ID)
		return nil
	},
)

Option 3

Decouple consumer groups handling from generic handlers.

err = router.AddConsumerGroupNoPublisherHandler(
	"testHandler",
	subscriberConstructor,
	message.NewGenericHandler(func(ctx context.Context, event ExampleEvent) error {
		fmt.Println("Handling", event.ID)
		return nil
	}, marshaler, logger),
)


// Unmarshal unmarshals given watermill's Message into protobuf's message.
func (ProtobufMarshaler) Unmarshal(msg *message.Message, v any) (err error) {
return proto.Unmarshal(msg.Payload, v.(proto.Message))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't handle the possible panic if the casting fails.
We could drop NoProtoMessageError:

m, ok := v.(proto.Message)
if !ok {
    return NoProtoMessageError{v}
}
return proto.Unmarshal(msg.Payload, m)

Or we could make the caller assume responsibility for passing the correct value, idk.
Usually we rather catch those kinds of exceptions.

Comment on lines +29 to +30
eventToUnmarshal := &TestProtobufEvent{}
err = marshaler.Unmarshal(msg, eventToUnmarshal)
Copy link
Contributor

@maclav3 maclav3 Apr 18, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

eventToUnmarshal := TestProtobufEvent{}
err = marshaler.Unmarshal(msg, &eventToUnmarshal)

looks like a more common syntax in Go, isn't it?

require.NoError(t, err)

assert.EqualValues(t, eventToMarshal.String(), eventToUnmarshal.String())
assert.Equal(t, msg.Metadata.Get("name"), "cqrs_test.TestProtobufEvent")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably a copypaste, the package name shouldn't be cqrs_test.
Or, according to the PR description, there shouldn't be a package name at all

}, nil
}

func AddHandler[M any](
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shouldn't we maybe guard the new API with a build tag //go:build go1.18 ?
And provide an alternative API for the previous versions?

Also, there are some concerns about the performance impact of generics, like https://planetscale.com/blog/generics-can-make-your-go-code-slower (I didn't really read the whole article carefully, don't have a strong opinion), so we could make it something like //go:build go1.18 && generics to make people enable generics consciously before we are comfortable with production use of generics-based code.

That being said, would it make sense to add a non-generic version? How would we attain that anyway, wouldn't we have to use reflect for example (which probably voids any concerns we may have over performance, doesn't it)?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Router and CQRS interfaces will still be there if people need it. The new API would be optional (but recommended).

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@maclav3 The article is good enough to explain some cases that you should careful to use generics in Go. And I recently reviewed the Go 1.20 changes that I belive Go 1.20 will fix some generics performance issues. E.g. golang/go#54238. But in some cases, it's still be more significant performance degradation by design.

"github.com/ThreeDotsLabs/watermill/message"
)

func TestNewEventProcessor(t *testing.T) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TestNewMessageBus. Also, do we have tests for the functioning of the MessageBus?
publisher_test.go looks like it provides some, but it uses cqrs.EventBus and not MessageBus.

// Name() string
// }
// It ignores if the value is a pointer or not.
func NamedStruct(fallback func(v any) string) func(v any) string {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I imagine we could panic early in NamedStruct if fallback == nil (and not in the closure in L40)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or, we could make an opinionated choice like

if fallback == nil {
    fallback = StructName
}

@m110
Copy link
Member Author

m110 commented Jul 2, 2023

Replaced by #367

@m110 m110 closed this Jul 2, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants