Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support ordering key #16

Merged
merged 3 commits into from
Oct 9, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 60 additions & 2 deletions pkg/googlecloud/marshaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ type MarshalerUnmarshaler interface {
Unmarshaler
}

func (m DefaultMarshalerUnmarshaler) Marshal(topic string, msg *message.Message) (*pubsub.Message, error) {
func (DefaultMarshalerUnmarshaler) Marshal(topic string, msg *message.Message) (*pubsub.Message, error) {
if value := msg.Metadata.Get(UUIDHeaderKey); value != "" {
return nil, errors.Errorf("metadata %s is reserved by watermill for message UUID", UUIDHeaderKey)
}
Expand All @@ -51,7 +51,7 @@ func (m DefaultMarshalerUnmarshaler) Marshal(topic string, msg *message.Message)
return marshaledMsg, nil
}

func (u DefaultMarshalerUnmarshaler) Unmarshal(pubsubMsg *pubsub.Message) (*message.Message, error) {
func (DefaultMarshalerUnmarshaler) Unmarshal(pubsubMsg *pubsub.Message) (*message.Message, error) {
metadata := make(message.Metadata, len(pubsubMsg.Attributes))

var id string
Expand All @@ -70,3 +70,61 @@ func (u DefaultMarshalerUnmarshaler) Unmarshal(pubsubMsg *pubsub.Message) (*mess

return msg, nil
}

type GenerateOrderingKey func(topic string, msg *message.Message) (string, error)

type orderingMarshaler struct {
Marshaler

generateOrderingKey GenerateOrderingKey
}

func NewOrderingMarshaler(generateOrderingKey GenerateOrderingKey) Marshaler {
return &orderingMarshaler{
Marshaler: DefaultMarshalerUnmarshaler{},
generateOrderingKey: generateOrderingKey,
}
}

func (om orderingMarshaler) Marshal(topic string, msg *message.Message) (*pubsub.Message, error) {
marshaledMsg, err := om.Marshaler.Marshal(topic, msg)
if err != nil {
return nil, err
}

orderingKey, err := om.generateOrderingKey(topic, msg)
if err != nil {
return nil, errors.Wrap(err, "cannot generate ordering key")
}
marshaledMsg.OrderingKey = orderingKey

return marshaledMsg, nil
}

type ExtractOrderingKey func(orderingKey string, msg *message.Message) error

type orderingUnmarshaler struct {
Unmarshaler

extractOrderingKey ExtractOrderingKey
}

func NewOrderingUnmarshaler(extractOrderingKey ExtractOrderingKey) Unmarshaler {
return &orderingUnmarshaler{
Unmarshaler: DefaultMarshalerUnmarshaler{},
extractOrderingKey: extractOrderingKey,
}
}

func (ou orderingUnmarshaler) Unmarshal(pubsubMsg *pubsub.Message) (*message.Message, error) {
msg, err := ou.Unmarshaler.Unmarshal(pubsubMsg)
if err != nil {
return nil, err
}

if err := ou.extractOrderingKey(pubsubMsg.OrderingKey, msg); err != nil {
return nil, errors.Wrap(err, "cannot extract ordering key")
}

return msg, nil
}
3 changes: 3 additions & 0 deletions pkg/googlecloud/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ type PublisherConfig struct {
// If false (default), `Publisher` tries to create a topic if there is none with the requested name.
// Otherwise, trying to subscribe to non-existent subscription results in `ErrTopicDoesNotExist`.
DoNotCreateTopicIfMissing bool
// Enables the topic message ordering
EnableMessageOrdering bool

// ConnectTimeout defines the timeout for connecting to Pub/Sub
ConnectTimeout time.Duration
Expand Down Expand Up @@ -170,6 +172,7 @@ func (p *Publisher) topic(ctx context.Context, topic string) (t *pubsub.Topic, e
}()

t = p.client.Topic(topic)
t.EnableMessageOrdering = p.config.EnableMessageOrdering

// todo: theoretically, one could want different publish settings per topic, which is supported by the client lib
// different instances of publisher may be used then
Expand Down
66 changes: 59 additions & 7 deletions pkg/googlecloud/pubsub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,13 @@ import (

// Run `docker-compose up` and set PUBSUB_EMULATOR_HOST=localhost:8085 for this to work

func newPubSub(t *testing.T, marshaler googlecloud.MarshalerUnmarshaler, subscriptionName googlecloud.SubscriptionNameFn) (message.Publisher, message.Subscriber) {
func newPubSub(t *testing.T, enableMessageOrdering bool, marshaler googlecloud.Marshaler, unmarshaler googlecloud.Unmarshaler, subscriptionName googlecloud.SubscriptionNameFn) (message.Publisher, message.Subscriber) {
logger := watermill.NewStdLogger(true, true)

publisher, err := googlecloud.NewPublisher(
googlecloud.PublisherConfig{
Marshaler: marshaler,
EnableMessageOrdering: enableMessageOrdering,
Marshaler: marshaler,
},
logger,
)
Expand All @@ -34,9 +35,10 @@ func newPubSub(t *testing.T, marshaler googlecloud.MarshalerUnmarshaler, subscri
googlecloud.SubscriberConfig{
GenerateSubscriptionName: subscriptionName,
SubscriptionConfig: pubsub.SubscriptionConfig{
RetainAckedMessages: false,
RetainAckedMessages: false,
EnableMessageOrdering: enableMessageOrdering,
},
Unmarshaler: marshaler,
Unmarshaler: unmarshaler,
},
logger,
)
Expand All @@ -45,14 +47,44 @@ func newPubSub(t *testing.T, marshaler googlecloud.MarshalerUnmarshaler, subscri
return publisher, subscriber
}

func createPubSub(t *testing.T) (message.Publisher, message.Subscriber) {
var defaultMarshalerUnmarshaler googlecloud.DefaultMarshalerUnmarshaler
return newPubSub(t, false, defaultMarshalerUnmarshaler, defaultMarshalerUnmarshaler, googlecloud.TopicSubscriptionName)
}

func createPubSubWithSubscriptionName(t *testing.T, subscriptionName string) (message.Publisher, message.Subscriber) {
return newPubSub(t, googlecloud.DefaultMarshalerUnmarshaler{},
var defaultMarshalerUnmarshaler googlecloud.DefaultMarshalerUnmarshaler
return newPubSub(t, false, defaultMarshalerUnmarshaler, defaultMarshalerUnmarshaler,
googlecloud.TopicSubscriptionNameWithSuffix(subscriptionName),
)
}

func createPubSub(t *testing.T) (message.Publisher, message.Subscriber) {
return newPubSub(t, googlecloud.DefaultMarshalerUnmarshaler{}, googlecloud.TopicSubscriptionName)
func createPubSubWithOrdering(t *testing.T) (message.Publisher, message.Subscriber) {
return newPubSub(
t,
true,
googlecloud.NewOrderingMarshaler(func(topic string, msg *message.Message) (string, error) {
return "ordering_key", nil
}),
googlecloud.NewOrderingUnmarshaler(func(orderingKey string, msg *message.Message) error {
return nil
}),
googlecloud.TopicSubscriptionName,
)
}

func createPubSubWithSubscriptionNameWithOrdering(t *testing.T, subscriptionName string) (message.Publisher, message.Subscriber) {
return newPubSub(
t,
true,
googlecloud.NewOrderingMarshaler(func(topic string, msg *message.Message) (string, error) {
return "ordering_key", nil
}),
googlecloud.NewOrderingUnmarshaler(func(orderingKey string, msg *message.Message) error {
return nil
}),
googlecloud.TopicSubscriptionNameWithSuffix(subscriptionName),
)
}

func TestPublishSubscribe(t *testing.T) {
Expand All @@ -69,6 +101,26 @@ func TestPublishSubscribe(t *testing.T) {
)
}

func TestPublishSubscribeOrdering(t *testing.T) {
t.Skip("skipping because the emulator does not currently redeliver nacked messages when ordering is enabled")

if testing.Short() {
t.Skip("skipping long tests")
}

tests.TestPubSub(
m110 marked this conversation as resolved.
Show resolved Hide resolved
t,
tests.Features{
ConsumerGroups: true,
ExactlyOnceDelivery: false,
GuaranteedOrder: true,
Persistent: true,
},
createPubSubWithOrdering,
createPubSubWithSubscriptionNameWithOrdering,
)
}

func TestSubscriberUnexpectedTopicForSubscription(t *testing.T) {
rand.Seed(time.Now().Unix())
testNumber := rand.Int()
Expand Down