
Fix possible deadlock when AckWithResponse is true due to queueCh being full #1310

Closed

Conversation


@BewareMyPower BewareMyPower commented Nov 21, 2024

Motivation

When AckWithResponse is enabled, a deadlock can easily happen if both queueCh and messagesCh of partitionConsumer are full. In this case, if the consumer receives new messages from the broker, MessageReceived will block at the send to queueCh.

The stack could look like:

3  0x0000000100ea83c0 in github.com/apache/pulsar-client-go/pulsar.(*partitionConsumer).MessageReceived
   at ./consumer_partition.go:1376
4  0x0000000100b1c73c in github.com/apache/pulsar-client-go/pulsar/internal.(*connection).handleMessage
   at ./internal/connection.go:751
5  0x0000000100b1ab7c in github.com/apache/pulsar-client-go/pulsar/internal.(*connection).internalReceivedCommand
   at ./internal/connection.go:590

As shown in the stacks above, the connection.run() goroutine is blocked, so it can no longer handle any new commands, including ACK responses. The ACK-related methods then fail with "request timed out". The deadlock cannot be resolved unless the application receives messages again so that messagesCh is no longer full; the dispatcher can then move messages from queueCh to messagesCh, freeing space in queueCh.

The root cause is that queueCh is a buffered channel with a fixed size of ReceiverQueueSize. However, the broker can dispatch more messages than ReceiverQueueSize because the permits in Flow requests only limit the number of entries to read, not the number of messages. Hence this issue can easily be reproduced by reducing ReceiverQueueSize and sending many messages with a large batch size. See how TestAckResponseNotBlocked reproduces this issue.
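As a rough, self-contained sketch of this failure mode (not the actual client code), the snippet below uses a single goroutine as a stand-in for connection.run(), which must both enqueue received messages and deliver ACK responses; once a send to the bounded queue blocks, the ACK response can never arrive:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	queueCh := make(chan int, 1) // stand-in for queueCh with a tiny ReceiverQueueSize
	ackResp := make(chan struct{})

	// Stand-in for connection.run(): the same goroutine that enqueues
	// messages would also deliver ACK responses.
	go func() {
		// The broker dispatched more messages than the queue can hold
		// (e.g. a large batch); the second send blocks forever because
		// nothing drains queueCh (messagesCh is full too).
		for msg := 0; msg < 3; msg++ {
			queueCh <- msg
		}
		ackResp <- struct{}{} // never reached: the ACK response is stuck behind the send
	}()

	select {
	case <-ackResp:
		fmt.Println("ack response received")
	case <-time.After(2 * time.Second):
		fmt.Println("request timed out") // what the ACK methods end up reporting
	}
}
```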

Modifications

Add a list of messages (pendingMessages) to support queueing an unlimited number of messages for partitionConsumer. Flow control is still enforced by availablePermits and the related Flow requests, so the queue size won't grow too large (it depends on the batch size).

Add two channels (queueInCh and queueOutCh) for the following loops:

  • Run a background goroutine that forwards messages from queueInCh -> pendingMessages -> queueOutCh
  • In MessageReceived, send the received message to queueInCh so that it is queued into pendingMessages. This send can never block.
  • In dispatcher(), receive messages from queueOutCh, which yields the first message of pendingMessages.

The background goroutine will exit via close(pc.closeCh) when the consumer is closed. This happens after the CloseConsumer RPC completes in internalClose, so there should be no MessageReceived call that sends messages to the closed queueInCh.
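Below is a minimal sketch of this pattern, assuming simplified stand-in types (only the channel names mirror the PR): a background goroutine shuttles messages from queueInCh through an unbounded list into queueOutCh, so the producer-side send never blocks:

```go
package main

import "container/list"

type message struct{ id int }

func runQueueLoop(queueInCh <-chan *message, queueOutCh chan<- *message, closeCh <-chan struct{}) {
	pendingMessages := list.New()
	for {
		if pendingMessages.Len() == 0 {
			// Nothing buffered yet: wait only for input or close.
			select {
			case m := <-queueInCh:
				pendingMessages.PushBack(m)
			case <-closeCh:
				return
			}
			continue
		}
		front := pendingMessages.Front()
		select {
		case m := <-queueInCh: // MessageReceived side: always ready to accept
			pendingMessages.PushBack(m)
		case queueOutCh <- front.Value.(*message): // dispatcher side: FIFO delivery
			pendingMessages.Remove(front)
		case <-closeCh:
			return
		}
	}
}

func main() {
	in, out := make(chan *message), make(chan *message)
	closeCh := make(chan struct{})
	go runQueueLoop(in, out, closeCh)

	// Producer never blocks even though nothing drains `out` yet.
	for i := 0; i < 100; i++ {
		in <- &message{id: i}
	}
	// Consumer drains in FIFO order.
	for i := 0; i < 100; i++ {
		if m := <-out; m.id != i {
			panic("out of order")
		}
	}
	close(closeCh) // mirrors close(pc.closeCh) in internalClose
}
```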

Add TestAckResponseNotBlocked to verify the fix works with a very small ReceiverQueueSize.

@BewareMyPower BewareMyPower marked this pull request as draft November 21, 2024 11:56
@BewareMyPower BewareMyPower marked this pull request as ready for review November 21, 2024 12:23

@fracasula fracasula left a comment


I reviewed this PR because it was not in a draft state when I started.

I also wanted to comment that perhaps using two channels, queueInCh and queueOutCh, might help make the code more understandable and easier to maintain.

What is being proposed here is to use a more complex object with an embedded list, three channels, and its own goroutine running in the background.

@BewareMyPower BewareMyPower marked this pull request as draft November 25, 2024 03:17
@BewareMyPower BewareMyPower marked this pull request as ready for review November 25, 2024 12:39

BewareMyPower commented Nov 25, 2024

Hi @fracasula, thanks for your comments; I've addressed all of them. You can check out the design in the updated Modifications section of the PR description.

@shibd @RobertIndie Please also take a look.

@shibd shibd requested a review from fracasula November 26, 2024 03:36
shibd added a commit to shibd/pulsar-client-go that referenced this pull request Nov 26, 2024

shibd commented Nov 26, 2024

Hi @BewareMyPower, I'm putting our discussion here.

Before #1283, the issue mentioned in this PR could not happen because we could ensure that the pulled messages would never overflow queueCh. Previously, queueCh held arrays: if an entry contained batched messages, they were stored together as a single array.

Furthermore, the availablePermits mechanism ensures that the number of pulled entries does not exceed the size set by pendingQueueSize.
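As a rough illustration of that invariant (simplified stand-in types, not the actual client code): when each queueCh entry holds a whole batch as one slice, one Flow permit maps to one slot, so permits capped at pendingQueueSize mean the sends can never block:

```go
package main

import "fmt"

type message struct{ id int }

func main() {
	const pendingQueueSize = 2
	// Pre-#1283 shape: a channel of slices, one slot per broker entry.
	queueCh := make(chan []*message, pendingQueueSize)

	// A 3-message batch still occupies a single slot.
	queueCh <- []*message{{1}, {2}, {3}}
	queueCh <- []*message{{4}}

	// With outstanding permits bounded by pendingQueueSize, the sends
	// above can never exceed the capacity, so connection.run() never blocks.
	fmt.Println(len(queueCh), "entries queued for 4 messages")
}
```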


shibd commented Nov 26, 2024

I have reverted #1283 and copied the unit test from this PR: #1311

This is an alternative solution to the issue.

The current PR offers a method similar to an unbounded channel.

PR #1311 maintains the original behavior, using availablePermits to ensure the channel never fills up.

We can compare which approach is more user-friendly. From my perspective, #1311 is simpler and the code is more readable.

BewareMyPower pushed a commit that referenced this pull request Nov 26, 2024
### Motivation

For the issue, please refer to the PR description: #1310

Here is an analysis: #1310 (comment)

### Modifications
- Revert #1283
- Add the test from #1310 to verify that `queueCh` never becomes full.