From 71bd273b8a77ed22c41a1284813ee59eb6820bda Mon Sep 17 00:00:00 2001 From: Bohdan Ivashko <20084245+arriven@users.noreply.github.com> Date: Fri, 1 Jul 2022 23:40:44 +0300 Subject: [PATCH] feat(pubsub/pstest): subscription message ordering (#6257) Co-authored-by: Alex Hong <9397363+hongalex@users.noreply.github.com> --- pubsub/pstest/fake.go | 30 ++++++++++++++++++++++++++-- pubsub/pstest/fake_test.go | 40 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 68 insertions(+), 2 deletions(-) diff --git a/pubsub/pstest/fake.go b/pubsub/pstest/fake.go index 964b692642ae..a362a63cc4d2 100644 --- a/pubsub/pstest/fake.go +++ b/pubsub/pstest/fake.go @@ -1006,7 +1006,7 @@ func (s *subscription) pull(max int) []*pb.ReceivedMessage { now := s.timeNowFunc() s.maintainMessages(now) var msgs []*pb.ReceivedMessage - for id, m := range s.msgs { + for id, m := range filterMsgs(s.msgs, s.proto.EnableMessageOrdering) { if m.outstanding() { continue } @@ -1028,6 +1028,32 @@ func (s *subscription) pull(max int) []*pb.ReceivedMessage { return msgs } +func filterMsgs(msgs map[string]*message, enableMessageOrdering bool) map[string]*message { + if !enableMessageOrdering { + return msgs + } + result := make(map[string]*message) + + type msg struct { + id string + m *message + } + orderingKeyMap := make(map[string]msg) + for id, m := range msgs { + orderingKey := m.proto.Message.OrderingKey + if orderingKey == "" { + orderingKey = id + } + if val, ok := orderingKeyMap[orderingKey]; !ok || m.proto.Message.PublishTime.AsTime().Before(val.m.proto.Message.PublishTime.AsTime()) { + orderingKeyMap[orderingKey] = msg{m: m, id: id} + } + } + for _, val := range orderingKeyMap { + result[val.id] = val.m + } + return result +} + func (s *subscription) deliver() { s.mu.Lock() defer s.mu.Unlock() @@ -1036,7 +1062,7 @@ func (s *subscription) deliver() { s.maintainMessages(now) // Try to deliver each remaining message. curIndex := 0 - for id, m := range s.msgs { + for id, m := range filterMsgs(s.msgs, s.proto.EnableMessageOrdering) { if m.outstanding() { continue } diff --git a/pubsub/pstest/fake_test.go b/pubsub/pstest/fake_test.go index ee5f604b5747..d061cc63aaba 100644 --- a/pubsub/pstest/fake_test.go +++ b/pubsub/pstest/fake_test.go @@ -1513,3 +1513,43 @@ func TestSubscriptionPushPull(t *testing.T) { t.Errorf("sub.BigqueryConfig should be zero value\n%s", diff) } } + +func TestSubscriptionMessageOrdering(t *testing.T) { + ctx := context.Background() + + s := NewServer() + defer s.Close() + + top, err := s.GServer.CreateTopic(ctx, &pb.Topic{Name: "projects/p/topics/t"}) + if err != nil { + t.Errorf("Failed to init pubsub topic: %v", err) + } + sub, err := s.GServer.CreateSubscription(ctx, &pb.Subscription{ + Name: "projects/p/subscriptions/s", + Topic: top.Name, + AckDeadlineSeconds: 30, + EnableMessageOrdering: true, + }) + if err != nil { + t.Errorf("Failed to init pubsub subscription: %v", err) + } + + const orderingKey = "ordering-key" + var ids []string + for i := 0; i < 1000; i++ { + ids = append(ids, s.PublishOrdered("projects/p/topics/t", []byte("hello"), nil, orderingKey)) + } + for len(ids) > 0 { + pull, err := s.GServer.Pull(ctx, &pb.PullRequest{Subscription: sub.Name}) + if err != nil { + t.Errorf("Failed to pull from server: %v", err) + } + for i, msg := range pull.ReceivedMessages { + if msg.Message.MessageId != ids[i] { + t.Errorf("want %s, got %s", ids[i], msg.AckId) + } + s.GServer.Acknowledge(ctx, &pb.AcknowledgeRequest{Subscription: sub.Name, AckIds: []string{msg.AckId}}) + } + ids = ids[len(pull.ReceivedMessages):] + } +}