Skip to content

Commit

Permalink
fix(pubsub): move flow control release to callback completion (#9311)
Browse files Browse the repository at this point in the history
* fix(pubsub): move flow control release to end of callback rather done function

* make test failure faster, add exactly once logic back
  • Loading branch information
hongalex authored Jan 26, 2024
1 parent 97d62c7 commit 2b6b0da
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 10 deletions.
14 changes: 4 additions & 10 deletions pubsub/subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (

"cloud.google.com/go/iam"
"cloud.google.com/go/internal/optional"
ipubsub "cloud.google.com/go/internal/pubsub"
pb "cloud.google.com/go/pubsub/apiv1/pubsubpb"
"cloud.google.com/go/pubsub/internal/scheduler"
gax "github.com/googleapis/gax-go/v2"
Expand Down Expand Up @@ -1389,24 +1388,19 @@ func (s *Subscription) Receive(ctx context.Context, f func(context.Context, *Mes
return nil
}
iter.eoMu.RLock()
ackh, _ := msgAckHandler(msg, iter.enableExactlyOnceDelivery)
msgAckHandler(msg, iter.enableExactlyOnceDelivery)
iter.eoMu.RUnlock()
old := ackh.doneFunc
msgLen := len(msg.Data)
ackh.doneFunc = func(ackID string, ack bool, r *ipubsub.AckResult, receiveTime time.Time) {
defer fc.release(ctx, msgLen)
old(ackID, ack, r, receiveTime)
}

wg.Add(1)
// Make sure the subscription has ordering enabled before adding to scheduler.
var key string
if s.enableOrdering {
key = msg.OrderingKey
}
// TODO(deklerk): Can we have a generic handler at the
// constructor level?
msgLen := len(msg.Data)
if err := sched.Add(key, msg, func(msg interface{}) {
defer wg.Done()
defer fc.release(ctx, msgLen)
f(ctx2, msg.(*Message))
}); err != nil {
wg.Done()
Expand Down
50 changes: 50 additions & 0 deletions pubsub/subscription_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -787,3 +787,53 @@ func TestExactlyOnceDelivery_ReceiptModackError(t *testing.T) {
t.Fatal("expected message to not have been delivered when exactly once enabled")
})
}

func TestSubscribeMessageExpirationFlowControl(t *testing.T) {
t.Parallel()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
client, srv := newFake(t)
defer client.Close()
defer srv.Close()

topic := mustCreateTopic(t, client, "t")
subConfig := SubscriptionConfig{
Topic: topic,
}
s, err := client.CreateSubscription(ctx, "s", subConfig)
if err != nil {
t.Fatalf("create sub err: %v", err)
}

s.ReceiveSettings.NumGoroutines = 1
s.ReceiveSettings.MaxOutstandingMessages = 1
s.ReceiveSettings.MaxExtension = 10 * time.Second
s.ReceiveSettings.MaxExtensionPeriod = 10 * time.Second
r := topic.Publish(ctx, &Message{
Data: []byte("redelivered-message"),
})
if _, err := r.Get(ctx); err != nil {
t.Fatalf("failed to publish message: %v", err)
}

deliveryCount := 0
ctx, cancel = context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
err = s.Receive(ctx, func(ctx context.Context, msg *Message) {
// Only acknowledge the message on the 2nd invocation of the callback (2nd delivery).
if deliveryCount == 1 {
msg.Ack()
}
// Otherwise, do nothing and let the message expire.
deliveryCount++
if deliveryCount == 2 {
cancel()
}
})
if deliveryCount != 2 {
t.Fatalf("expected 2 iterations of the callback, got %d", deliveryCount)
}
if err != nil {
t.Fatalf("s.Receive err: %v", err)
}
}

0 comments on commit 2b6b0da

Please sign in to comment.