-
Notifications
You must be signed in to change notification settings - Fork 400
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Experimental MessageBus component #281
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,14 @@ | ||
package bus | ||
|
||
import ( | ||
"context" | ||
|
||
"github.com/ThreeDotsLabs/watermill/message" | ||
) | ||
|
||
type MessageMarshaler interface { | ||
Marshal(ctx context.Context, v any) (*message.Message, error) | ||
Unmarshal(msg *message.Message, v any) (err error) | ||
Type(v any) string | ||
TypeFromMessage(msg *message.Message) string | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,54 @@ | ||
package bus | ||
|
||
import ( | ||
"context" | ||
"encoding/json" | ||
|
||
"github.com/ThreeDotsLabs/watermill" | ||
"github.com/ThreeDotsLabs/watermill/message" | ||
) | ||
|
||
type JSONMarshaler struct { | ||
NewUUID func() string | ||
GenerateType func(v any) string | ||
} | ||
|
||
func (m JSONMarshaler) Marshal(ctx context.Context, v any) (*message.Message, error) { | ||
b, err := json.Marshal(v) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
msg := message.NewMessage( | ||
m.newUUID(), | ||
b, | ||
) | ||
msg.Metadata.Set("type", m.Type(v)) | ||
|
||
return msg, nil | ||
} | ||
|
||
func (m JSONMarshaler) newUUID() string { | ||
if m.NewUUID != nil { | ||
return m.NewUUID() | ||
} | ||
|
||
// default | ||
return watermill.NewUUID() | ||
} | ||
|
||
func (JSONMarshaler) Unmarshal(msg *message.Message, v any) (err error) { | ||
return json.Unmarshal(msg.Payload, v) | ||
} | ||
|
||
func (m JSONMarshaler) Type(message any) string { | ||
if m.GenerateType != nil { | ||
return m.GenerateType(message) | ||
} | ||
|
||
return StructName(message) | ||
} | ||
|
||
func (m JSONMarshaler) TypeFromMessage(msg *message.Message) string { | ||
return msg.Metadata.Get("type") | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,57 @@ | ||
package bus_test | ||
|
||
import ( | ||
"context" | ||
"testing" | ||
"time" | ||
|
||
"github.com/stretchr/testify/assert" | ||
"github.com/stretchr/testify/require" | ||
|
||
"github.com/ThreeDotsLabs/watermill" | ||
"github.com/ThreeDotsLabs/watermill/components/bus" | ||
) | ||
|
||
var jsonEventToMarshal = TestEvent{ | ||
ID: watermill.NewULID(), | ||
When: time.Date(2016, time.August, 15, 14, 13, 12, 0, time.UTC), | ||
} | ||
|
||
func TestJsonMarshaler(t *testing.T) { | ||
marshaler := bus.JSONMarshaler{} | ||
|
||
msg, err := marshaler.Marshal(context.Background(), jsonEventToMarshal) | ||
require.NoError(t, err) | ||
|
||
eventToUnmarshal := TestEvent{} | ||
err = marshaler.Unmarshal(msg, &eventToUnmarshal) | ||
require.NoError(t, err) | ||
|
||
assert.EqualValues(t, jsonEventToMarshal, eventToUnmarshal) | ||
} | ||
|
||
func TestJSONMarshaler_Marshal_new_uuid_set(t *testing.T) { | ||
marshaler := bus.JSONMarshaler{ | ||
NewUUID: func() string { | ||
return "foo" | ||
}, | ||
} | ||
|
||
msg, err := marshaler.Marshal(context.Background(), jsonEventToMarshal) | ||
require.NoError(t, err) | ||
|
||
assert.Equal(t, msg.UUID, "foo") | ||
} | ||
|
||
func TestJSONMarshaler_Marshal_generate_name(t *testing.T) { | ||
marshaler := bus.JSONMarshaler{ | ||
GenerateType: func(v any) string { | ||
return "foo" | ||
}, | ||
} | ||
|
||
msg, err := marshaler.Marshal(context.Background(), jsonEventToMarshal) | ||
require.NoError(t, err) | ||
|
||
assert.Equal(t, msg.Metadata.Get("name"), "foo") | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,79 @@ | ||
package bus | ||
|
||
import ( | ||
"context" | ||
"reflect" | ||
|
||
"github.com/golang/protobuf/proto" | ||
"github.com/pkg/errors" | ||
|
||
"github.com/ThreeDotsLabs/watermill" | ||
"github.com/ThreeDotsLabs/watermill/message" | ||
) | ||
|
||
// ProtobufMarshaler is the default Protocol Buffers marshaler. | ||
type ProtobufMarshaler struct { | ||
NewUUID func() string | ||
GenerateType func(v any) string | ||
} | ||
|
||
// NoProtoMessageError is returned when the given value does not implement proto.Message. | ||
type NoProtoMessageError struct { | ||
v any | ||
} | ||
|
||
func (e NoProtoMessageError) Error() string { | ||
rv := reflect.ValueOf(e.v) | ||
if rv.Kind() != reflect.Ptr { | ||
return "v is not proto.Message, you must pass pointer value to implement proto.Message" | ||
} | ||
|
||
return "v is not proto.Message" | ||
} | ||
|
||
// Marshal marshals the given protobuf's message into watermill's Message. | ||
func (m ProtobufMarshaler) Marshal(ctx context.Context, v any) (*message.Message, error) { | ||
protoMsg, ok := v.(proto.Message) | ||
if !ok { | ||
return nil, errors.WithStack(NoProtoMessageError{v}) | ||
} | ||
|
||
b, err := proto.Marshal(protoMsg) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
msg := message.NewMessage( | ||
m.newUUID(), | ||
b, | ||
) | ||
msg.Metadata.Set("type", m.Type(v)) | ||
|
||
return msg, nil | ||
} | ||
|
||
func (m ProtobufMarshaler) newUUID() string { | ||
if m.NewUUID != nil { | ||
return m.NewUUID() | ||
} | ||
|
||
// default | ||
return watermill.NewUUID() | ||
} | ||
|
||
// Unmarshal unmarshals given watermill's Message into protobuf's message. | ||
func (ProtobufMarshaler) Unmarshal(msg *message.Message, v any) (err error) { | ||
return proto.Unmarshal(msg.Payload, v.(proto.Message)) | ||
} | ||
|
||
func (m ProtobufMarshaler) Type(message any) string { | ||
if m.GenerateType != nil { | ||
return m.GenerateType(message) | ||
} | ||
|
||
return StructName(message) | ||
} | ||
|
||
func (m ProtobufMarshaler) TypeFromMessage(msg *message.Message) string { | ||
return msg.Metadata.Get("type") | ||
} |
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,55 @@ | ||
package bus_test | ||
|
||
import ( | ||
"context" | ||
"testing" | ||
"time" | ||
|
||
"github.com/golang/protobuf/ptypes" | ||
"github.com/stretchr/testify/assert" | ||
"github.com/stretchr/testify/require" | ||
|
||
"github.com/ThreeDotsLabs/watermill" | ||
"github.com/ThreeDotsLabs/watermill/components/bus" | ||
) | ||
|
||
func TestProtobufMarshaler(t *testing.T) { | ||
marshaler := bus.ProtobufMarshaler{} | ||
|
||
when, err := ptypes.TimestampProto(time.Now()) | ||
require.NoError(t, err) | ||
eventToMarshal := &TestProtobufEvent{ | ||
Id: watermill.NewULID(), | ||
When: when, | ||
} | ||
|
||
msg, err := marshaler.Marshal(context.Background(), eventToMarshal) | ||
require.NoError(t, err) | ||
|
||
eventToUnmarshal := &TestProtobufEvent{} | ||
err = marshaler.Unmarshal(msg, eventToUnmarshal) | ||
Comment on lines
+29
to
+30
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. eventToUnmarshal := TestProtobufEvent{}
err = marshaler.Unmarshal(msg, &eventToUnmarshal) looks like a more common syntax in Go, isn't it? |
||
require.NoError(t, err) | ||
|
||
assert.EqualValues(t, eventToMarshal.String(), eventToUnmarshal.String()) | ||
assert.Equal(t, msg.Metadata.Get("name"), "cqrs_test.TestProtobufEvent") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Probably a copypaste, the package name shouldn't be |
||
} | ||
|
||
func TestProtobufMarshaler_Marshal_generated_name(t *testing.T) { | ||
marshaler := bus.ProtobufMarshaler{ | ||
NewUUID: func() string { | ||
return "foo" | ||
}, | ||
} | ||
|
||
when, err := ptypes.TimestampProto(time.Now()) | ||
require.NoError(t, err) | ||
eventToMarshal := &TestProtobufEvent{ | ||
Id: watermill.NewULID(), | ||
When: when, | ||
} | ||
|
||
msg, err := marshaler.Marshal(context.Background(), eventToMarshal) | ||
require.NoError(t, err) | ||
|
||
assert.Equal(t, msg.UUID, "foo") | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We don't handle the possible panic if the casting fails.
We could drop
NoProtoMessageError
:Or we could make the caller assume responsibility for passing the correct value, idk.
Usually we rather catch those kinds of exceptions.