From 66581c44f2309984475fe5053fa11707f6f93b8d Mon Sep 17 00:00:00 2001 From: Diego Becciolini Date: Fri, 17 May 2024 18:29:47 +0100 Subject: [PATCH] fix(pubsub): Closes #10094 - memory leak in pubsub receive (#10153) * fix(pubsub): make sure grpc stream gets closed * fix(pubsub): preserve cancellation error behaviour * fix(pubsub): the stream cancellation error is not necessarily a grpc error --------- Co-authored-by: Alex Hong <9397363+hongalex@users.noreply.github.com> --- pubsub/iterator.go | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/pubsub/iterator.go b/pubsub/iterator.go index 8e3155dca883..8dad53e8b897 100644 --- a/pubsub/iterator.go +++ b/pubsub/iterator.go @@ -163,6 +163,9 @@ func (it *messageIterator) stop() { it.checkDrained() it.mu.Unlock() it.wg.Wait() + if it.ps != nil { + it.ps.cancel() + } } // checkDrained closes the drained channel if the iterator has been stopped and all @@ -246,6 +249,14 @@ func (it *messageIterator) receive(maxToPull int32) ([]*Message, error) { rmsgs, err = it.pullMessages(maxToPull) } else { rmsgs, err = it.recvMessages() + // If stopping the iterator results in the grpc stream getting shut down and + // returning an error here, treat the same as above and return EOF. + // If the cancellation comes from the underlying grpc client getting closed, + // do propagate the cancellation error. + // See https://github.com/googleapis/google-cloud-go/pull/10153#discussion_r1600814775 + if err != nil && it.ps.ctx.Err() == context.Canceled { + err = io.EOF + } } // Any error here is fatal. if err != nil {