Skip to content

Commit

Permalink
feat(pubsub/pstest): subscription message ordering (#6257)
Browse files Browse the repository at this point in the history
Co-authored-by: Alex Hong <9397363+hongalex@users.noreply.github.com>
  • Loading branch information
arriven and hongalex authored Jul 1, 2022
1 parent df178f8 commit 71bd273
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 2 deletions.
30 changes: 28 additions & 2 deletions pubsub/pstest/fake.go
Original file line number Diff line number Diff line change
Expand Up @@ -1006,7 +1006,7 @@ func (s *subscription) pull(max int) []*pb.ReceivedMessage {
now := s.timeNowFunc()
s.maintainMessages(now)
var msgs []*pb.ReceivedMessage
for id, m := range s.msgs {
for id, m := range filterMsgs(s.msgs, s.proto.EnableMessageOrdering) {
if m.outstanding() {
continue
}
Expand All @@ -1028,6 +1028,32 @@ func (s *subscription) pull(max int) []*pb.ReceivedMessage {
return msgs
}

func filterMsgs(msgs map[string]*message, enableMessageOrdering bool) map[string]*message {
if !enableMessageOrdering {
return msgs
}
result := make(map[string]*message)

type msg struct {
id string
m *message
}
orderingKeyMap := make(map[string]msg)
for id, m := range msgs {
orderingKey := m.proto.Message.OrderingKey
if orderingKey == "" {
orderingKey = id
}
if val, ok := orderingKeyMap[orderingKey]; !ok || m.proto.Message.PublishTime.AsTime().Before(val.m.proto.Message.PublishTime.AsTime()) {
orderingKeyMap[orderingKey] = msg{m: m, id: id}
}
}
for _, val := range orderingKeyMap {
result[val.id] = val.m
}
return result
}

func (s *subscription) deliver() {
s.mu.Lock()
defer s.mu.Unlock()
Expand All @@ -1036,7 +1062,7 @@ func (s *subscription) deliver() {
s.maintainMessages(now)
// Try to deliver each remaining message.
curIndex := 0
for id, m := range s.msgs {
for id, m := range filterMsgs(s.msgs, s.proto.EnableMessageOrdering) {
if m.outstanding() {
continue
}
Expand Down
40 changes: 40 additions & 0 deletions pubsub/pstest/fake_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1513,3 +1513,43 @@ func TestSubscriptionPushPull(t *testing.T) {
t.Errorf("sub.BigqueryConfig should be zero value\n%s", diff)
}
}

func TestSubscriptionMessageOrdering(t *testing.T) {
ctx := context.Background()

s := NewServer()
defer s.Close()

top, err := s.GServer.CreateTopic(ctx, &pb.Topic{Name: "projects/p/topics/t"})
if err != nil {
t.Errorf("Failed to init pubsub topic: %v", err)
}
sub, err := s.GServer.CreateSubscription(ctx, &pb.Subscription{
Name: "projects/p/subscriptions/s",
Topic: top.Name,
AckDeadlineSeconds: 30,
EnableMessageOrdering: true,
})
if err != nil {
t.Errorf("Failed to init pubsub subscription: %v", err)
}

const orderingKey = "ordering-key"
var ids []string
for i := 0; i < 1000; i++ {
ids = append(ids, s.PublishOrdered("projects/p/topics/t", []byte("hello"), nil, orderingKey))
}
for len(ids) > 0 {
pull, err := s.GServer.Pull(ctx, &pb.PullRequest{Subscription: sub.Name})
if err != nil {
t.Errorf("Failed to pull from server: %v", err)
}
for i, msg := range pull.ReceivedMessages {
if msg.Message.MessageId != ids[i] {
t.Errorf("want %s, got %s", ids[i], msg.AckId)
}
s.GServer.Acknowledge(ctx, &pb.AcknowledgeRequest{Subscription: sub.Name, AckIds: []string{msg.AckId}})
}
ids = ids[len(pull.ReceivedMessages):]
}
}

0 comments on commit 71bd273

Please sign in to comment.