diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go index 3af3e3d67..803375f47 100644 --- a/pulsar/consumer_partition.go +++ b/pulsar/consumer_partition.go @@ -155,8 +155,17 @@ type partitionConsumer struct { availablePermits *availablePermits // the size of the queue channel for buffering messages - maxQueueSize int32 - queueCh *unboundedChannel[*message] + maxQueueSize int32 + + // pendingMessages queues all messages received from the broker but not delivered to the user via Chan() or + // Receive() methods. + // There is a background goroutine that sends messages from the connection to `pendingMessages` via `queueInCh` and + // reads messages from `pendingMessages` via `queueOutCh` so that the `dispatcher` goroutine can read messages from + // the `queueOutCh`. + pendingMessages *list.List + queueInCh chan *message + queueOutCh chan *message + startMessageID atomicMessageID lastDequeuedMsg *trackingMessageID @@ -354,7 +363,6 @@ func newPartitionConsumer(parent Consumer, client *client, options *partitionCon partitionIdx: int32(options.partitionIdx), eventsCh: make(chan interface{}, 10), maxQueueSize: int32(options.receiverQueueSize), - queueCh: newUnboundedChannel[*message](), startMessageID: atomicMessageID{msgID: options.startMessageID}, connectedCh: make(chan struct{}), messageCh: messageCh, @@ -419,6 +427,7 @@ func newPartitionConsumer(parent Consumer, client *client, options *partitionCon } pc.log.Info("Created consumer") pc.setConsumerState(consumerReady) + pc.startQueueMessagesFromBroker() startingMessageID := pc.startMessageID.get() if pc.options.startMessageIDInclusive && startingMessageID != nil && startingMessageID.equal(latestMessageID) { @@ -949,11 +958,6 @@ func (pc *partitionConsumer) Close() { // wait for request to finish <-req.doneCh - - // It will close `queueCh.in`. If `MessageReceived` was called after that, it will panic because new messages - // will be sent to a closed channel. However, generally it's impossible because the broker will not be able to - // dispatch messages to this consumer after receiving the close request. - pc.queueCh.stop() } func (pc *partitionConsumer) Seek(msgID MessageID) error { @@ -1176,7 +1180,7 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header pc.markScaleIfNeed() } - pc.queueCh.inCh <- &message{ + pc.queueInCh <- &message{ publishTime: timeFromUnixTimestampMillis(msgMeta.GetPublishTime()), eventTime: timeFromUnixTimestampMillis(msgMeta.GetEventTime()), key: msgMeta.GetPartitionKey(), @@ -1378,7 +1382,7 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header pc.markScaleIfNeed() } - pc.queueCh.inCh <- msg + pc.queueInCh <- msg } if skippedMessages > 0 { @@ -1542,12 +1546,14 @@ func (pc *partitionConsumer) dispatcher() { }() var queueMsg *message for { - var queueCh <-chan *message + queueMsgCh := pc.queueOutCh var messageCh chan ConsumerMessage var nextMessage ConsumerMessage var nextMessageSize int if queueMsg != nil { + // Do not read from the queued message channel since there is already a message polled in the last loop + queueMsgCh = nil nextMessage = ConsumerMessage{ Consumer: pc.parentConsumer, Message: queueMsg, @@ -1568,8 +1574,6 @@ func (pc *partitionConsumer) dispatcher() { } else { pc.log.Debug("skip dispatching messages when seeking") } - } else { - queueCh = pc.queueCh.outCh } select { @@ -1607,7 +1611,7 @@ func (pc *partitionConsumer) dispatcher() { pc.log.Debug("received dispatcherSeekingControlCh, set isSeek to true") pc.isSeeking.Store(true) - case msg, ok := <-queueCh: + case msg, ok := <-queueMsgCh: if !ok { return } @@ -1630,9 +1634,9 @@ func (pc *partitionConsumer) dispatcher() { // drain the message queue on any new connection by sending a // special nil message to the channel so we know when to stop dropping messages var nextMessageInQueue *trackingMessageID - pc.queueCh.inCh <- nil + pc.queueInCh <- nil - for m := range pc.queueCh.outCh { + for m := range pc.queueOutCh { // the queue has been drained if m == nil { break @@ -2080,7 +2084,7 @@ func (pc *partitionConsumer) expectMoreIncomingMessages() { } func (pc *partitionConsumer) markScaleIfNeed() { - // availablePermits + incomingMessages (messages in queueCh) is the number of prefetched messages + // availablePermits + incomingMessages (messages in pendingMessages) is the number of prefetched messages // The result of auto-scale we expected is currentQueueSize is slightly bigger than prefetched messages prev := pc.scaleReceiverQueueHint.Swap(pc.availablePermits.get()+pc.incomingMessages.Load() >= pc.currentQueueSize.Load()) @@ -2220,6 +2224,42 @@ func (pc *partitionConsumer) _getConn() internal.Connection { return *pc.conn.Load() } +func (pc *partitionConsumer) startQueueMessagesFromBroker() { + pc.queueInCh = make(chan *message) + pc.queueOutCh = make(chan *message) + pc.pendingMessages = list.New() + + go func() { + defer func() { + close(pc.queueInCh) + close(pc.queueOutCh) + pc.log.Debug("exiting queueMessagesFromBroker") + }() + + for { + front := pc.pendingMessages.Front() + if front == nil { + select { + case msg := <-pc.queueInCh: + pc.pendingMessages.PushBack(msg) + case <-pc.closeCh: + return + } + } else { + msg := front.Value.(*message) + select { + case pc.queueOutCh <- msg: + pc.pendingMessages.Remove(front) + case msg := <-pc.queueInCh: + pc.pendingMessages.PushBack(msg) + case <-pc.closeCh: + return + } + } + } + }() +} + func convertToMessageIDData(msgID *trackingMessageID) *pb.MessageIdData { if msgID == nil { return nil diff --git a/pulsar/consumer_partition_test.go b/pulsar/consumer_partition_test.go index 6c2a7a153..ec8edc21a 100644 --- a/pulsar/consumer_partition_test.go +++ b/pulsar/consumer_partition_test.go @@ -31,7 +31,7 @@ import ( func TestSingleMessageIDNoAckTracker(t *testing.T) { eventsCh := make(chan interface{}, 1) pc := partitionConsumer{ - queueCh: newUnboundedChannel[*message](), + closeCh: make(chan struct{}), eventsCh: eventsCh, compressionProviders: sync.Map{}, options: &partitionConsumerOpts{}, @@ -41,6 +41,7 @@ func TestSingleMessageIDNoAckTracker(t *testing.T) { pc.availablePermits = &availablePermits{pc: &pc} pc.ackGroupingTracker = newAckGroupingTracker(&AckGroupingOptions{MaxSize: 0}, func(id MessageID) { pc.sendIndividualAck(id) }, nil, nil) + pc.startQueueMessagesFromBroker() headersAndPayload := internal.NewBufferWrapper(rawCompatSingleMessage) if err := pc.MessageReceived(nil, headersAndPayload); err != nil { @@ -48,7 +49,7 @@ func TestSingleMessageIDNoAckTracker(t *testing.T) { } // ensure the tracker was set on the message id - message := <-pc.queueCh.outCh + message := <-pc.queueOutCh id := message.ID().(*trackingMessageID) assert.Nil(t, id.tracker) @@ -69,7 +70,7 @@ func newTestMetrics() *internal.LeveledMetrics { func TestBatchMessageIDNoAckTracker(t *testing.T) { eventsCh := make(chan interface{}, 1) pc := partitionConsumer{ - queueCh: newUnboundedChannel[*message](), + closeCh: make(chan struct{}), eventsCh: eventsCh, compressionProviders: sync.Map{}, options: &partitionConsumerOpts{}, @@ -79,6 +80,7 @@ func TestBatchMessageIDNoAckTracker(t *testing.T) { pc.availablePermits = &availablePermits{pc: &pc} pc.ackGroupingTracker = newAckGroupingTracker(&AckGroupingOptions{MaxSize: 0}, func(id MessageID) { pc.sendIndividualAck(id) }, nil, nil) + pc.startQueueMessagesFromBroker() headersAndPayload := internal.NewBufferWrapper(rawBatchMessage1) if err := pc.MessageReceived(nil, headersAndPayload); err != nil { @@ -86,7 +88,7 @@ func TestBatchMessageIDNoAckTracker(t *testing.T) { } // ensure the tracker was set on the message id - message := <-pc.queueCh.outCh + message := <-pc.queueOutCh id := message.ID().(*trackingMessageID) assert.Nil(t, id.tracker) @@ -104,7 +106,7 @@ func TestBatchMessageIDNoAckTracker(t *testing.T) { func TestBatchMessageIDWithAckTracker(t *testing.T) { eventsCh := make(chan interface{}, 1) pc := partitionConsumer{ - queueCh: newUnboundedChannel[*message](), + closeCh: make(chan struct{}), eventsCh: eventsCh, compressionProviders: sync.Map{}, options: &partitionConsumerOpts{}, @@ -114,6 +116,7 @@ func TestBatchMessageIDWithAckTracker(t *testing.T) { pc.availablePermits = &availablePermits{pc: &pc} pc.ackGroupingTracker = newAckGroupingTracker(&AckGroupingOptions{MaxSize: 0}, func(id MessageID) { pc.sendIndividualAck(id) }, nil, nil) + pc.startQueueMessagesFromBroker() headersAndPayload := internal.NewBufferWrapper(rawBatchMessage10) if err := pc.MessageReceived(nil, headersAndPayload); err != nil { @@ -125,7 +128,7 @@ func TestBatchMessageIDWithAckTracker(t *testing.T) { running := true for running { select { - case m := <-pc.queueCh.outCh: + case m := <-pc.queueOutCh: id := m.ID().(*trackingMessageID) assert.NotNil(t, id.tracker) messageIDs = append(messageIDs, id) diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go index 72e3910ed..e3bf9bfb8 100644 --- a/pulsar/consumer_test.go +++ b/pulsar/consumer_test.go @@ -4887,6 +4887,7 @@ func TestAckResponseNotBlocked(t *testing.T) { defer client.Close() topic := fmt.Sprintf("test-ack-response-not-blocked-%v", time.Now().Nanosecond()) + assert.Nil(t, createPartitionedTopic(topic, 10)) producer, err := client.CreateProducer(ProducerOptions{ Topic: topic, @@ -4894,7 +4895,7 @@ func TestAckResponseNotBlocked(t *testing.T) { assert.Nil(t, err) ctx := context.Background() - numMessages := 100 + numMessages := 1000 for i := 0; i < numMessages; i++ { producer.SendAsync(ctx, &ProducerMessage{ Payload: []byte(fmt.Sprintf("value-%d", i)), @@ -4903,7 +4904,9 @@ func TestAckResponseNotBlocked(t *testing.T) { t.Fatal(err) } }) - time.Sleep(1 * time.Millisecond) + if i%100 == 99 { + assert.Nil(t, producer.Flush()) + } } producer.Flush() producer.Close() @@ -4917,7 +4920,7 @@ func TestAckResponseNotBlocked(t *testing.T) { Type: KeyShared, EnableBatchIndexAcknowledgment: true, AckWithResponse: true, - ReceiverQueueSize: 10, + ReceiverQueueSize: 5, }) assert.Nil(t, err) msgIDs := make([]MessageID, 0) @@ -4925,7 +4928,6 @@ func TestAckResponseNotBlocked(t *testing.T) { if msg, err := consumer.Receive(context.Background()); err != nil { t.Fatal(err) } else { - t.Log("Received message: ", msg.ID()) msgIDs = append(msgIDs, msg.ID()) if len(msgIDs) >= 10 { if err := consumer.AckIDList(msgIDs); err != nil { diff --git a/pulsar/unbounded_channel.go b/pulsar/unbounded_channel.go deleted file mode 100644 index 902252540..000000000 --- a/pulsar/unbounded_channel.go +++ /dev/null @@ -1,68 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package pulsar - -import ( - "container/list" -) - -type unboundedChannel[T interface{}] struct { - values *list.List - inCh chan<- T - outCh <-chan T - closeCh chan struct{} -} - -func newUnboundedChannel[T interface{}]() *unboundedChannel[T] { - inCh := make(chan T) - outCh := make(chan T) - c := &unboundedChannel[T]{ - values: list.New(), - inCh: inCh, - outCh: outCh, - closeCh: make(chan struct{}), - } - go func() { - for { - front := c.values.Front() - var ch chan T - var value T - if front != nil { - value = front.Value.(T) - ch = outCh - } - // A send to a nil channel blocks forever so when no values are available, - // it would never send a value to ch - select { - case v := <-inCh: - c.values.PushBack(v) - case ch <- value: - c.values.Remove(front) - case <-c.closeCh: - close(inCh) - close(outCh) - return - } - } - }() - return c -} - -func (c *unboundedChannel[T]) stop() { - c.closeCh <- struct{}{} -} diff --git a/pulsar/unbounded_channel_test.go b/pulsar/unbounded_channel_test.go deleted file mode 100644 index a6ef8d0a6..000000000 --- a/pulsar/unbounded_channel_test.go +++ /dev/null @@ -1,60 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package pulsar - -import ( - "testing" - "time" - - "github.com/stretchr/testify/assert" -) - -func TestUnboundedChannel(t *testing.T) { - c := newUnboundedChannel[int]() - defer c.stop() - go func() { - for i := 0; i < 10; i++ { - c.inCh <- i - } - }() - - for i := 0; i < 10; i++ { - v := <-c.outCh - assert.Equal(t, i, v) - } - - go func() { - time.Sleep(1 * time.Second) - c.inCh <- -1 - }() - start := time.Now() - v := <-c.outCh - elapsed := time.Since(start) - assert.Equal(t, v, -1) - // Verify the read blocks for at least 500ms - assert.True(t, elapsed >= 500*time.Millisecond) - - // Verify the send values will not be blocked - for i := 0; i < 10000; i++ { - c.inCh <- i - } - for i := 0; i < 10000; i++ { - v := <-c.outCh - assert.Equal(t, i, v) - } -}