Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(pubsub): fix bug with AckWithResult with exactly once disabled #7319

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 8 additions & 5 deletions pubsub/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,13 @@ func testPublishAndReceive(t *testing.T, client *Client, maxMsgs int, synchronou
timeout := 3 * time.Minute
timeoutCtx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
gotMsgs, err := pullN(timeoutCtx, sub, len(want), func(ctx context.Context, m *Message) {
gotMsgs, err := pullN(timeoutCtx, sub, len(want), 2, func(ctx context.Context, m *Message) {
if exactlyOnceDelivery {
if _, err := m.AckWithResult().Get(ctx); err != nil {
t.Fatalf("failed to ack message with exactly once delivery: %v", err)
}
return
}
m.Ack()
})
if err != nil {
Expand Down Expand Up @@ -2003,16 +2009,13 @@ func TestIntegration_TopicRetention(t *testing.T) {
}
}

func TestExactlyOnceDelivery_PublishReceive(t *testing.T) {
func TestIntegration_ExactlyOnceDelivery_PublishReceive(t *testing.T) {
ctx := context.Background()
client := integrationTestClient(ctx, t)

for _, maxMsgs := range []int{0, 3, -1} { // MaxOutstandingMessages = default, 3, unlimited
testPublishAndReceive(t, client, maxMsgs, false, true, 10, 0)
}

// Tests for large messages (larger than the 4MB gRPC limit).
testPublishAndReceive(t, client, 0, false, true, 1, 5*1024*1024)
}

func TestIntegration_TopicUpdateSchema(t *testing.T) {
Expand Down
8 changes: 4 additions & 4 deletions pubsub/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,20 +157,20 @@ func (ah *psAckHandler) OnNack() {
}

func (ah *psAckHandler) OnAckWithResult() *AckResult {
// call done with true to indicate ack.
ah.done(true)
if !ah.exactlyOnceDelivery {
return newSuccessAckResult()
}
// call done with true to indicate ack.
ah.done(true)
return ah.ackResult
}

func (ah *psAckHandler) OnNackWithResult() *AckResult {
// call done with false to indicate nack.
ah.done(false)
if !ah.exactlyOnceDelivery {
return newSuccessAckResult()
}
// call done with false to indicate nack.
ah.done(false)
return ah.ackResult
}

Expand Down
12 changes: 8 additions & 4 deletions pubsub/streaming_pull_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func TestStreamingPullMultipleFetches(t *testing.T) {

func testStreamingPullIteration(t *testing.T, client *Client, server *mockServer, msgs []*pb.ReceivedMessage) {
sub := client.Subscription("S")
gotMsgs, err := pullN(context.Background(), sub, len(msgs), func(_ context.Context, m *Message) {
gotMsgs, err := pullN(context.Background(), sub, len(msgs), 0, func(_ context.Context, m *Message) {
id, err := strconv.Atoi(msgAckID(m))
if err != nil {
t.Fatalf("pullN err: %v", err)
Expand Down Expand Up @@ -196,7 +196,7 @@ func TestStreamingPullRetry(t *testing.T) {

sub := client.Subscription("S")
sub.ReceiveSettings.NumGoroutines = 1
gotMsgs, err := pullN(context.Background(), sub, len(testMessages), func(_ context.Context, m *Message) {
gotMsgs, err := pullN(context.Background(), sub, len(testMessages), 0, func(_ context.Context, m *Message) {
id, err := strconv.Atoi(msgAckID(m))
if err != nil {
t.Fatalf("pullN err: %v", err)
Expand Down Expand Up @@ -297,7 +297,7 @@ func TestStreamingPullConcurrent(t *testing.T) {
sub := client.Subscription("S")
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
gotMsgs, err := pullN(ctx, sub, nMessages, func(ctx context.Context, m *Message) {
gotMsgs, err := pullN(ctx, sub, nMessages, 0, func(ctx context.Context, m *Message) {
m.Ack()
})
if c := status.Convert(err); err != nil && c.Code() != codes.Canceled {
Expand Down Expand Up @@ -513,7 +513,8 @@ func newMock(t *testing.T) (*Client, *mockServer) {
}

// pullN calls sub.Receive until at least n messages are received.
func pullN(ctx context.Context, sub *Subscription, n int, f func(context.Context, *Message)) ([]*Message, error) {
// Wait a provided number of seconds before cancelling.
func pullN(ctx context.Context, sub *Subscription, n, wait int, f func(context.Context, *Message)) ([]*Message, error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the wait as int seems like an odd choice (wrong type, no indication of the base interval). Maybe just make it time.Duration?

var (
mu sync.Mutex
msgs []*Message
Expand All @@ -526,6 +527,9 @@ func pullN(ctx context.Context, sub *Subscription, n int, f func(context.Context
mu.Unlock()
f(ctx, m)
if nSeen >= n {
// Wait a specified amount of time so that for exactly once delivery,
// Acks aren't cancelled immediately.
time.Sleep(time.Duration(wait) * time.Second)
cancel()
}
})
Expand Down
4 changes: 2 additions & 2 deletions pubsub/subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -397,8 +397,8 @@ type SubscriptionConfig struct {
// by Pub/Sub and have distinct MessageID values.
//
// Lastly, to guarantee messages have been acked or nacked properly, you must
// call Message.AckWithResponse() or Message.NackWithResponse(). These return an
// AckResponse which will be ready if the message has been acked (or failed to be acked).
// call Message.AckWithResult() or Message.NackWithResult(). These return an
// AckResult which will be ready if the message has been acked (or failed to be acked).
EnableExactlyOnceDelivery bool

// State indicates whether or not the subscription can receive messages.
Expand Down
2 changes: 1 addition & 1 deletion pubsub/subscription_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ func testReceive(t *testing.T, synchronous, exactlyOnceDelivery bool) {
srv.Publish(topic.name, []byte{byte(i)}, nil)
}
sub.ReceiveSettings.Synchronous = synchronous
msgs, err := pullN(ctx, sub, 256, func(_ context.Context, m *Message) {
msgs, err := pullN(ctx, sub, 256, 0, func(_ context.Context, m *Message) {
if exactlyOnceDelivery {
ar := m.AckWithResult()
// Don't use the above ctx here since that will get cancelled.
Expand Down