Fix possible deadlock when AckWithResponse is true due to queueCh being full #1310

Closed
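The deadlock addressed here arises when MessageReceived blocks on a full, bounded `queueCh`: the same connection read loop that delivers incoming messages also delivers ack responses, so a consumer using `AckWithResponse` can wait forever for its ack while the read loop waits for queue space. The patch replaces the bounded channel with an unbounded `container/list` buffer pumped between `queueInCh` and `queueOutCh`. The following standalone sketch illustrates that pump pattern with hypothetical names (`startPump`, `inCh`, `outCh`); it is a simplified illustration, not the client's actual code:

package main

import (
	"container/list"
	"fmt"
)

// startPump moves values from inCh to outCh through an unbounded FIFO,
// so sends on inCh never block even if the reader of outCh is slow or stuck.
func startPump(inCh <-chan int, outCh chan<- int, closeCh <-chan struct{}) {
	go func() {
		pending := list.New()
		for {
			front := pending.Front()
			if front == nil {
				// Nothing buffered: wait for new input or shutdown.
				select {
				case v := <-inCh:
					pending.PushBack(v)
				case <-closeCh:
					return
				}
			} else {
				// Something buffered: try to deliver it while still accepting input.
				select {
				case outCh <- front.Value.(int):
					pending.Remove(front)
				case v := <-inCh:
					pending.PushBack(v)
				case <-closeCh:
					return
				}
			}
		}
	}()
}

func main() {
	inCh := make(chan int)
	outCh := make(chan int)
	closeCh := make(chan struct{})
	startPump(inCh, outCh, closeCh)

	// The sending side (analogous to the connection read loop) never blocks,
	// even though nothing has read from outCh yet.
	for i := 0; i < 100; i++ {
		inCh <- i
	}
	fmt.Println("first buffered value:", <-outCh)
	close(closeCh)
}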
71 changes: 57 additions & 14 deletions pulsar/consumer_partition.go
@@ -155,8 +155,17 @@ type partitionConsumer struct {
availablePermits *availablePermits

// the size of the queue channel for buffering messages
maxQueueSize int32
queueCh chan *message
maxQueueSize int32

// pendingMessages queues all messages received from the broker that have not yet been delivered to the user via
// the Chan() or Receive() methods.
// A background goroutine moves messages from the connection into `pendingMessages` via `queueInCh` and drains
// `pendingMessages` into `queueOutCh`, from which the `dispatcher` goroutine reads.
pendingMessages *list.List
queueInCh chan *message
queueOutCh chan *message

startMessageID atomicMessageID
lastDequeuedMsg *trackingMessageID

@@ -354,7 +363,6 @@ func newPartitionConsumer(parent Consumer, client *client, options *partitionCon
partitionIdx: int32(options.partitionIdx),
eventsCh: make(chan interface{}, 10),
maxQueueSize: int32(options.receiverQueueSize),
queueCh: make(chan *message, options.receiverQueueSize),
startMessageID: atomicMessageID{msgID: options.startMessageID},
connectedCh: make(chan struct{}),
messageCh: messageCh,
@@ -419,6 +427,7 @@ func newPartitionConsumer(parent Consumer, client *client, options *partitionCon
}
pc.log.Info("Created consumer")
pc.setConsumerState(consumerReady)
pc.startQueueMessagesFromBroker()

startingMessageID := pc.startMessageID.get()
if pc.options.startMessageIDInclusive && startingMessageID != nil && startingMessageID.equal(latestMessageID) {
@@ -1171,7 +1180,7 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header
pc.markScaleIfNeed()
}

pc.queueCh <- &message{
pc.queueInCh <- &message{
publishTime: timeFromUnixTimestampMillis(msgMeta.GetPublishTime()),
eventTime: timeFromUnixTimestampMillis(msgMeta.GetEventTime()),
key: msgMeta.GetPartitionKey(),
@@ -1373,7 +1382,7 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header
pc.markScaleIfNeed()
}

pc.queueCh <- msg
pc.queueInCh <- msg
}

if skippedMessages > 0 {
@@ -1537,12 +1546,14 @@ func (pc *partitionConsumer) dispatcher() {
}()
var queueMsg *message
for {
var queueCh chan *message
queueMsgCh := pc.queueOutCh
var messageCh chan ConsumerMessage
var nextMessage ConsumerMessage
var nextMessageSize int

if queueMsg != nil {
// Do not read from the queued message channel since a message was already polled in the previous iteration
queueMsgCh = nil
nextMessage = ConsumerMessage{
Consumer: pc.parentConsumer,
Message: queueMsg,
@@ -1563,8 +1574,6 @@
} else {
pc.log.Debug("skip dispatching messages when seeking")
}
} else {
queueCh = pc.queueCh
}

select {
@@ -1602,7 +1611,7 @@
pc.log.Debug("received dispatcherSeekingControlCh, set isSeek to true")
pc.isSeeking.Store(true)

case msg, ok := <-queueCh:
case msg, ok := <-queueMsgCh:
if !ok {
return
}
@@ -1625,11 +1634,9 @@
// drain the message queue on any new connection by sending a
// special nil message to the channel so we know when to stop dropping messages
var nextMessageInQueue *trackingMessageID
go func() {
pc.queueCh <- nil
}()
pc.queueInCh <- nil

for m := range pc.queueCh {
for m := range pc.queueOutCh {
// the queue has been drained
if m == nil {
break
@@ -2077,7 +2084,7 @@ func (pc *partitionConsumer) expectMoreIncomingMessages() {
}

func (pc *partitionConsumer) markScaleIfNeed() {
// availablePermits + incomingMessages (messages in queueCh) is the number of prefetched messages
// availablePermits + incomingMessages (messages in pendingMessages) is the number of prefetched messages
// The expected result of auto-scaling is that currentQueueSize is slightly larger than the number of prefetched messages
prev := pc.scaleReceiverQueueHint.Swap(pc.availablePermits.get()+pc.incomingMessages.Load() >=
pc.currentQueueSize.Load())
@@ -2217,6 +2224,42 @@ func (pc *partitionConsumer) _getConn() internal.Connection {
return *pc.conn.Load()
}

func (pc *partitionConsumer) startQueueMessagesFromBroker() {
pc.queueInCh = make(chan *message)
pc.queueOutCh = make(chan *message)
pc.pendingMessages = list.New()

go func() {
defer func() {
close(pc.queueInCh)
close(pc.queueOutCh)
pc.log.Debug("exiting queueMessagesFromBroker")
}()

for {
front := pc.pendingMessages.Front()
if front == nil {
select {
case msg := <-pc.queueInCh:
pc.pendingMessages.PushBack(msg)
case <-pc.closeCh:
return
}
} else {
msg := front.Value.(*message)
select {
case pc.queueOutCh <- msg:
pc.pendingMessages.Remove(front)
case msg := <-pc.queueInCh:
pc.pendingMessages.PushBack(msg)
case <-pc.closeCh:
return
}
}
}
}()
}

func convertToMessageIDData(msgID *trackingMessageID) *pb.MessageIdData {
if msgID == nil {
return nil
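One visible consequence in the dispatcher's reconnect path above: draining the prefetched messages used to require wrapping `pc.queueCh <- nil` in a goroutine because the bounded channel could be full, whereas the new `pc.queueInCh <- nil` can be sent directly since the pump always accepts input. Below is a compact, self-contained sketch of that drain-with-nil-sentinel pattern, using hypothetical names (`in`, `out`, `done`); it is a simplified illustration, not the client's code:

package main

import (
	"container/list"
	"fmt"
)

func main() {
	in := make(chan *string)
	out := make(chan *string)
	done := make(chan struct{})

	// Unbounded pump with the same shape as startQueueMessagesFromBroker above.
	go func() {
		pending := list.New()
		for {
			if f := pending.Front(); f == nil {
				select {
				case v := <-in:
					pending.PushBack(v)
				case <-done:
					return
				}
			} else {
				select {
				case out <- f.Value.(*string):
					pending.Remove(f)
				case v := <-in:
					pending.PushBack(v)
				case <-done:
					return
				}
			}
		}
	}()

	// Simulate prefetched messages that became stale after a reconnect, then
	// mark the drain point with a nil sentinel. The send cannot block, so no
	// wrapper goroutine is needed (unlike the old bounded queueCh).
	for _, s := range []string{"m1", "m2", "m3"} {
		v := s
		in <- &v
	}
	in <- nil

	dropped := 0
	for m := range out {
		if m == nil {
			break
		}
		dropped++
	}
	fmt.Println("dropped", dropped, "stale messages")
	close(done)
}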
26 changes: 17 additions & 9 deletions pulsar/consumer_partition_test.go
@@ -20,6 +20,7 @@ package pulsar
import (
"sync"
"testing"
"time"

"github.com/apache/pulsar-client-go/pulsar/internal"
"github.com/apache/pulsar-client-go/pulsar/internal/crypto"
@@ -30,7 +31,7 @@ import (
func TestSingleMessageIDNoAckTracker(t *testing.T) {
eventsCh := make(chan interface{}, 1)
pc := partitionConsumer{
queueCh: make(chan *message, 1),
closeCh: make(chan struct{}),
eventsCh: eventsCh,
compressionProviders: sync.Map{},
options: &partitionConsumerOpts{},
@@ -40,14 +41,15 @@ func TestSingleMessageIDNoAckTracker(t *testing.T) {
pc.availablePermits = &availablePermits{pc: &pc}
pc.ackGroupingTracker = newAckGroupingTracker(&AckGroupingOptions{MaxSize: 0},
func(id MessageID) { pc.sendIndividualAck(id) }, nil, nil)
pc.startQueueMessagesFromBroker()

headersAndPayload := internal.NewBufferWrapper(rawCompatSingleMessage)
if err := pc.MessageReceived(nil, headersAndPayload); err != nil {
t.Fatal(err)
}

// ensure the tracker was set on the message id
message := <-pc.queueCh
message := <-pc.queueOutCh
id := message.ID().(*trackingMessageID)
assert.Nil(t, id.tracker)

@@ -68,7 +70,7 @@ func newTestMetrics() *internal.LeveledMetrics {
func TestBatchMessageIDNoAckTracker(t *testing.T) {
eventsCh := make(chan interface{}, 1)
pc := partitionConsumer{
queueCh: make(chan *message, 1),
closeCh: make(chan struct{}),
eventsCh: eventsCh,
compressionProviders: sync.Map{},
options: &partitionConsumerOpts{},
@@ -78,14 +80,15 @@ func TestBatchMessageIDNoAckTracker(t *testing.T) {
pc.availablePermits = &availablePermits{pc: &pc}
pc.ackGroupingTracker = newAckGroupingTracker(&AckGroupingOptions{MaxSize: 0},
func(id MessageID) { pc.sendIndividualAck(id) }, nil, nil)
pc.startQueueMessagesFromBroker()

headersAndPayload := internal.NewBufferWrapper(rawBatchMessage1)
if err := pc.MessageReceived(nil, headersAndPayload); err != nil {
t.Fatal(err)
}

// ensure the tracker was set on the message id
message := <-pc.queueCh
message := <-pc.queueOutCh
id := message.ID().(*trackingMessageID)
assert.Nil(t, id.tracker)

@@ -103,7 +106,7 @@ func TestBatchMessageIDNoAckTracker(t *testing.T) {
func TestBatchMessageIDWithAckTracker(t *testing.T) {
eventsCh := make(chan interface{}, 1)
pc := partitionConsumer{
queueCh: make(chan *message, 10),
closeCh: make(chan struct{}),
eventsCh: eventsCh,
compressionProviders: sync.Map{},
options: &partitionConsumerOpts{},
@@ -113,6 +116,7 @@ func TestBatchMessageIDWithAckTracker(t *testing.T) {
pc.availablePermits = &availablePermits{pc: &pc}
pc.ackGroupingTracker = newAckGroupingTracker(&AckGroupingOptions{MaxSize: 0},
func(id MessageID) { pc.sendIndividualAck(id) }, nil, nil)
pc.startQueueMessagesFromBroker()

headersAndPayload := internal.NewBufferWrapper(rawBatchMessage10)
if err := pc.MessageReceived(nil, headersAndPayload); err != nil {
@@ -121,14 +125,18 @@

// ensure the tracker was set on the message id
var messageIDs []*trackingMessageID
for i := 0; i < 10; i++ {
running := true
for running {
select {
case m := <-pc.queueCh:
case m := <-pc.queueOutCh:
id := m.ID().(*trackingMessageID)
assert.NotNil(t, id.tracker)
messageIDs = append(messageIDs, id)
default:
break
if len(messageIDs) == 10 {
running = false
}
case <-time.After(5 * time.Second):
running = false
}
}

63 changes: 63 additions & 0 deletions pulsar/consumer_test.go
@@ -4877,3 +4877,66 @@ func receiveMessages(t *testing.T, consumer Consumer, numMessages int) []Message
assert.Equal(t, numMessages, len(msgs))
return msgs
}

func TestAckResponseNotBlocked(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: lookupURL,
OperationTimeout: 5 * time.Second,
})
assert.Nil(t, err)
defer client.Close()

topic := fmt.Sprintf("test-ack-response-not-blocked-%v", time.Now().Nanosecond())
assert.Nil(t, createPartitionedTopic(topic, 10))

producer, err := client.CreateProducer(ProducerOptions{
Topic: topic,
})
assert.Nil(t, err)

ctx := context.Background()
numMessages := 1000
for i := 0; i < numMessages; i++ {
producer.SendAsync(ctx, &ProducerMessage{
Payload: []byte(fmt.Sprintf("value-%d", i)),
}, func(_ MessageID, _ *ProducerMessage, err error) {
if err != nil {
t.Fatal(err)
}
})
if i%100 == 99 {
assert.Nil(t, producer.Flush())
}
}
producer.Flush()
producer.Close()

// Use a small receiver queue size so that ack responses would block if the internal `queueCh`
// were still a bounded channel of the same size
consumer, err := client.Subscribe(ConsumerOptions{
Topic: topic,
SubscriptionName: "my-sub",
SubscriptionInitialPosition: SubscriptionPositionEarliest,
Type: KeyShared,
EnableBatchIndexAcknowledgment: true,
AckWithResponse: true,
ReceiverQueueSize: 5,
})
assert.Nil(t, err)
msgIDs := make([]MessageID, 0)
for i := 0; i < numMessages; i++ {
if msg, err := consumer.Receive(context.Background()); err != nil {
t.Fatal(err)
} else {
msgIDs = append(msgIDs, msg.ID())
if len(msgIDs) >= 10 {
if err := consumer.AckIDList(msgIDs); err != nil {
t.Fatal("Failed to ack messages: ", msgIDs, " ", err)
} else {
t.Log("Acked messages: ", msgIDs)
}
msgIDs = msgIDs[:0]
}
}
}
}