Skip to content

Commit

Permalink
Merge pull request #33 from ThreeDotsLabs/bump-watermill-3
Browse files Browse the repository at this point in the history
Bump watermill and fix races in tests
  • Loading branch information
m110 authored Aug 26, 2024
2 parents 5e491bb + 3cf9aa1 commit cef4dd1
Show file tree
Hide file tree
Showing 5 changed files with 69 additions and 8 deletions.
4 changes: 1 addition & 3 deletions docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
# for Watermill development purposes.
# For Watermill based application docker please check https://watermill.io/docs/getting-started/

version: '3'
services:
googlecloud:
image: google/cloud-sdk:360.0.0
image: google/cloud-sdk:489.0.0
entrypoint: gcloud --quiet beta emulators pubsub start --host-port=0.0.0.0:8085 --verbosity=debug --log-http
ports:
- 8085:8085
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@ toolchain go1.22.5
require (
cloud.google.com/go v0.115.1 // indirect
cloud.google.com/go/pubsub v1.42.0
github.com/ThreeDotsLabs/watermill v1.3.6
github.com/ThreeDotsLabs/watermill v1.3.7
github.com/cenkalti/backoff/v3 v3.2.2
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/google/uuid v1.6.0
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-multierror v1.1.1
github.com/lithammer/shortuuid/v3 v3.0.7 // indirect
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ cloud.google.com/go/longrunning v0.5.12/go.mod h1:S5hMV8CDJ6r50t2ubVJSKQVv5u0rmi
cloud.google.com/go/pubsub v1.42.0 h1:PVTbzorLryFL5ue8esTS2BfehUs0ahyNOY9qcd+HMOs=
cloud.google.com/go/pubsub v1.42.0/go.mod h1:KADJ6s4MbTwhXmse/50SebEhE4SmUwHi48z3/dHar1Y=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/ThreeDotsLabs/watermill v1.3.6 h1:Lhw7/dKFrFh3rAvBt6r+x/KceQtYXYWrBsaWihR8LM0=
github.com/ThreeDotsLabs/watermill v1.3.6/go.mod h1:lBnrLbxOjeMRgcJbv+UiZr8Ylz8RkJ4m6i/VN/Nk+to=
github.com/ThreeDotsLabs/watermill v1.3.7 h1:NV0PSTmuACVEOV4dMxRnmGXrmbz8U83LENOvpHekN7o=
github.com/ThreeDotsLabs/watermill v1.3.7/go.mod h1:lBnrLbxOjeMRgcJbv+UiZr8Ylz8RkJ4m6i/VN/Nk+to=
github.com/cenkalti/backoff/v3 v3.2.2 h1:cfUAAO3yvKMYKPrvhDuHSwQnhZNk/RMHKdZqKTxfm6M=
github.com/cenkalti/backoff/v3 v3.2.2/go.mod h1:cIeZDE3IrqwwJl6VUwCN6trj1oXrTS4rc0ij+ULvLYs=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
Expand Down
2 changes: 1 addition & 1 deletion pkg/googlecloud/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,6 @@ func (p *Publisher) Publish(topic string, messages ...*message.Message) error {
if err != nil {
return err
}
t.EnableMessageOrdering = p.config.EnableMessageOrdering

logFields := make(watermill.LogFields, 2)
logFields["topic"] = topic
Expand Down Expand Up @@ -212,6 +211,7 @@ func (p *Publisher) topic(ctx context.Context, topic string) (t *pubsub.Topic, e
p.topicsLock.Lock()
defer func() {
if err == nil {
t.EnableMessageOrdering = p.config.EnableMessageOrdering
p.topics[topic] = t
}
p.topicsLock.Unlock()
Expand Down
63 changes: 63 additions & 0 deletions pkg/googlecloud/pubsub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@ import (
"time"

"cloud.google.com/go/pubsub"
"github.com/google/uuid"
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/ThreeDotsLabs/watermill"
Expand Down Expand Up @@ -276,3 +278,64 @@ func produceMessages(t *testing.T, topic string, howMany int) {

require.NoError(t, pub.Publish(topic, messages...))
}

func TestPublishOrdering(t *testing.T) {
pub, sub := newPubSub(
t,
true,
googlecloud.NewOrderingMarshaler(func(topic string, msg *message.Message) (string, error) {
return msg.Metadata["ordering"], nil
}),
googlecloud.NewOrderingUnmarshaler(func(orderingKey string, msg *message.Message) error {
return nil
}),
googlecloud.TopicSubscriptionNameWithSuffix("TestPublishOrdering"),
)

defer func() {
_ = pub.Close()
_ = sub.Close()
}()

topic := fmt.Sprintf("topic_ordering_%v", uuid.NewString())

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

messages, err := sub.Subscribe(ctx, topic)
require.NoError(t, err)

newMsg := func(id string, ordering string) *message.Message {
msg := message.NewMessage(id, []byte{})
msg.Metadata["ordering"] = ordering
return msg
}

toPublish := []*message.Message{
newMsg("1", "A"),
newMsg("2", "A"),
newMsg("3", "B"),
newMsg("4", "B"),
}

for i := range toPublish {
err := pub.Publish(topic, toPublish[i])
require.NoError(t, err)
}

received := map[string][]string{}

for i := 0; i < len(toPublish); i++ {
select {
case msg := <-messages:
key := msg.Metadata["ordering"]
received[key] = append(received[key], msg.UUID)
msg.Ack()
case <-time.After(5 * time.Second):
t.Fatal("timeout")
}
}

assert.Equal(t, []string{"1", "2"}, received["A"])
assert.Equal(t, []string{"3", "4"}, received["B"])
}

0 comments on commit cef4dd1

Please sign in to comment.