
pubsub: expired messages clog up flow control #9309

Closed
hongalex opened this issue Jan 26, 2024 · 6 comments · Fixed by #9311
Assignees: hongalex
Labels: api: pubsub (Issues related to the Pub/Sub API.) · priority: p1 (Important issue which blocks shipping the next release. Will be fixed prior to next release.) · type: bug (Error or flaw in code with unintended results or allowing sub-optimal usage patterns.)

Comments

@hongalex
Member

Client

PubSub

Environment

all

Code

	// Repro assumes an existing *pubsub.Subscription (sub) and context.Context (ctx),
	// with "fmt", "sync/atomic", "time", and "cloud.google.com/go/pubsub" imported.
	sub.ReceiveSettings.NumGoroutines = 1
	sub.ReceiveSettings.MaxOutstandingMessages = 1
	sub.ReceiveSettings.MaxExtension = 30 * time.Second
	var received int32

	err = sub.Receive(ctx, func(_ context.Context, msg *pubsub.Message) {
		// Use the value returned by AddInt32 to avoid a racy re-read of received.
		n := atomic.AddInt32(&received, 1)
		fmt.Printf("got message %q (count %d)\n", string(msg.Data), n)
		if n == 1 {
			// Deliberately neither Ack nor Nack the first message, so its lease
			// expires after MaxExtension while it still holds the only flow
			// control slot.
		} else {
			msg.Ack()
		}
		fmt.Println("done")
	})

Expected behavior

With low flow control limits set (e.g. MaxOutstandingMessages = 1), expired messages should not clog up flow control.

Actual behavior

Flow control stays blocked because the flow controller's release is never called, so no new messages can be processed.

Additional context

Currently, when a message expires, the lease management mechanism deletes it from the map that tracks how long to keep processing it. However, cleanup of resources (including releasing flow control resources) happens only when the message's done function is called.

Flow control release should happen after the end of the user callback (passed into Receive).
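For reference, a minimal sketch (not from this issue) of the callback pattern the subscriber is designed around, where every message is acked or nacked before the callback returns so its flow control slot is released promptly; processMessage is an illustrative placeholder, not a library function:

	err = sub.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
		// Ack or Nack inside the callback itself, before returning.
		if err := processMessage(ctx, msg.Data); err != nil {
			msg.Nack() // redeliver later
			return
		}
		msg.Ack()
	})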

@hongalex hongalex added the triage me I really want to be triaged. label Jan 26, 2024
@hongalex hongalex self-assigned this Jan 26, 2024
@product-auto-label product-auto-label bot added the api: pubsub Issues related to the Pub/Sub API. label Jan 26, 2024
@hongalex hongalex added type: bug Error or flaw in code with unintended results or allowing sub-optimal usage patterns. priority: p1 Important issue which blocks shipping the next release. Will be fixed prior to next release. and removed api: pubsub Issues related to the Pub/Sub API. triage me I really want to be triaged. labels Jan 26, 2024
@product-auto-label product-auto-label bot added the api: pubsub Issues related to the Pub/Sub API. label Jan 26, 2024
@nh731685

Can this be added to https://github.com/googleapis/google-cloud-go/blob/main/pubsub/CHANGES.md as a breaking change?

I understand the importance of this bug fix, and that the compatible API usage is documented for versions 1.19+.
However, my team has consumed this library since version 1.3.1, and there was nothing in CHANGES.md to indicate we would need to rework our code to safely upgrade to version 1.36+.

@hongalex hongalex reopened this May 29, 2024
@hongalex
Member Author

@nh731685 Sorry this caused a problem for you. The fix was intended only to ensure that flow control resources aren't held indefinitely when Ack and Nack are never called. Can you describe in more detail how this breaks your code? Is there a significant amount of time between calling Ack and when your callback completes?

@nh731685

In our flow, the callback completes before Ack or Nack. Ack or Nack is called from another goroutine, anywhere from tens of milliseconds up to 10 minutes after the callback completes; that other goroutine is a gRPC handler.

We relied on the flow control of MaxOutstandingMessages to protect the system from becoming overloaded. With this bug fix, MaxOutstandingMessages is no longer honored when Ack is called asynchronously.

I understand that as a safe library pattern, protecting the user from losing PubSub messages and eventually deadlocking is desirable.

Hypothetically, we can work around this change by adjusting the callback to wait for some Ack/Nack hint on a channel before letting the callback return.
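A rough sketch of that workaround, assuming the asynchronous processor can be handed a completion hook; sendToGRPCHandler is an illustrative placeholder rather than a real API:

	err = sub.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
		done := make(chan struct{})
		// Hand the message to the asynchronous processor along with a hook
		// that acks or nacks and then signals completion.
		sendToGRPCHandler(msg, func(ok bool) {
			if ok {
				msg.Ack()
			} else {
				msg.Nack()
			}
			close(done)
		})
		// Block the callback until Ack/Nack has been called, so
		// MaxOutstandingMessages keeps bounding in-flight work.
		select {
		case <-done:
		case <-ctx.Done():
			msg.Nack()
		}
	})

With this shape the callback returns only after the message has been acked or nacked, so flow control behaves as it did before the fix.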

@hongalex
Member Author

I see. Calling Ack/Nack in a separate goroutine is undefined behavior and not supported. We started calling this out a few years back (noting this was after you started using the library), in response to an issue reported to us (reference for my future self: internal issue 222259118). In summary, a user was acking messages in a goroutine, like you are doing. However, when they cancelled the Receive call, they were not able to properly ack messages. The reason is that the subscriber's shutdown only blocks on callbacks completing, not on all messages having been acked/nacked. This means the goroutine that sends out acks is shut down immediately once there are no callbacks left, even while the processing of a message is still happening in another goroutine.

I do have plans to support acking/nacking outside the callback, tracked in #8200. In theory, this would require rewriting the lease management system to be "aware" of in-flight acks/nacks and of when messages expire. This new system would need to guard against dropping messages while acks are still in flight and also free up flow control when messages expire.

@nh731685

Thank you, Alex. It sounds like my team should use a workaround such as the one described above so we can take library updates.

@hongalex
Member Author

Yeah I think the workaround sounds reasonable. I'll close out this issue, but recommend commenting on #8200 if you have more concerns here.
