Skip to content

Commit

Permalink
Different MessageID implementations for message Production and Consum…
Browse files Browse the repository at this point in the history
…ption

This change splits the `MessageID` implementation in two:
1. `messageID` - A 24 byte structure that contains message identification
                 information only; to be used during message production
2. `trackingMessageID` - A 72 byte structucture that shares the same
                         message identification information as `messageID`
                         and adds `ackTracker`, `acker`, and `receivedTime`
                         fields; to be used during message consumption

Micro benchmarks show that passing arguments by value that are less-than
four words of memory are optimized by the Go runtime.  Results from the
pulsar/impl_message_bench_test.go module are below.

```
name            time/op
ProducerCall    1.46ns ± 5%
ProducerCall-4  1.47ns ± 5%
ConsumerCall    7.62ns ± 1%
ConsumerCall-4  7.53ns ± 5%
```
  • Loading branch information
Daniel Ferstay committed Jul 14, 2020
1 parent 220dcb4 commit 593d7c1
Show file tree
Hide file tree
Showing 10 changed files with 180 additions and 93 deletions.
14 changes: 7 additions & 7 deletions pulsar/consumer_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,8 @@ var ErrConsumerClosed = errors.New("consumer closed")
const defaultNackRedeliveryDelay = 1 * time.Minute

type acker interface {
AckID(id messageID)
NackID(id messageID)
AckID(id trackingMessageID)
NackID(id trackingMessageID)
}

type consumer struct {
Expand Down Expand Up @@ -259,7 +259,7 @@ func (c *consumer) internalTopicSubscribeToPartitions() error {
nackRedeliveryDelay: nackRedeliveryDelay,
metadata: metadata,
replicateSubscriptionState: c.options.ReplicateSubscriptionState,
startMessageID: messageID{},
startMessageID: trackingMessageID{},
subscriptionMode: durable,
readCompacted: c.options.ReadCompacted,
}
Expand Down Expand Up @@ -483,19 +483,19 @@ func toProtoInitialPosition(p SubscriptionInitialPosition) pb.CommandSubscribe_I
return pb.CommandSubscribe_Latest
}

func (c *consumer) messageID(msgID MessageID) (messageID, bool) {
mid, ok := msgID.(messageID)
func (c *consumer) messageID(msgID MessageID) (trackingMessageID, bool) {
mid, ok := toTrackingMessageID(msgID)
if !ok {
c.log.Warnf("invalid message id type %T", msgID)
return messageID{}, false
return trackingMessageID{}, false
}

partition := int(mid.partitionIdx)
// did we receive a valid partition index?
if partition < 0 || partition >= len(c.consumers) {
c.log.Warnf("invalid partition index %d expected a partition between [0-%d]",
partition, len(c.consumers))
return messageID{}, false
return trackingMessageID{}, false
}

return mid, true
Expand Down
4 changes: 2 additions & 2 deletions pulsar/consumer_multitopic.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func (c *multiTopicConsumer) Ack(msg Message) {

// Ack the consumption of a single message, identified by its MessageID
func (c *multiTopicConsumer) AckID(msgID MessageID) {
mid, ok := msgID.(messageID)
mid, ok := toTrackingMessageID(msgID)
if !ok {
c.log.Warnf("invalid message id type %T", msgID)
return
Expand All @@ -139,7 +139,7 @@ func (c *multiTopicConsumer) Nack(msg Message) {
}

func (c *multiTopicConsumer) NackID(msgID MessageID) {
mid, ok := msgID.(messageID)
mid, ok := toTrackingMessageID(msgID)
if !ok {
c.log.Warnf("invalid message id type %T", msgID)
return
Expand Down
104 changes: 58 additions & 46 deletions pulsar/consumer_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ type partitionConsumerOpts struct {
nackRedeliveryDelay time.Duration
metadata map[string]string
replicateSubscriptionState bool
startMessageID messageID
startMessageID trackingMessageID
startMessageIDInclusive bool
subscriptionMode subscriptionMode
readCompacted bool
Expand Down Expand Up @@ -140,13 +140,13 @@ type partitionConsumer struct {
// the size of the queue channel for buffering messages
queueSize int32
queueCh chan []*message
startMessageID messageID
lastDequeuedMsg messageID
startMessageID trackingMessageID
lastDequeuedMsg trackingMessageID

eventsCh chan interface{}
connectedCh chan struct{}
closeCh chan struct{}
clearQueueCh chan func(id messageID)
clearQueueCh chan func(id trackingMessageID)

nackTracker *negativeAcksTracker
dlq *dlqRouter
Expand Down Expand Up @@ -174,7 +174,7 @@ func newPartitionConsumer(parent Consumer, client *client, options *partitionCon
connectedCh: make(chan struct{}),
messageCh: messageCh,
closeCh: make(chan struct{}),
clearQueueCh: make(chan func(id messageID)),
clearQueueCh: make(chan func(id trackingMessageID)),
compressionProviders: make(map[pb.CompressionType]compression.Provider),
dlq: dlq,
log: log.WithField("topic", options.topic),
Expand Down Expand Up @@ -238,7 +238,7 @@ func (pc *partitionConsumer) internalUnsubscribe(unsub *unsubscribeRequest) {
pc.state = consumerClosed
}

func (pc *partitionConsumer) getLastMessageID() (messageID, error) {
func (pc *partitionConsumer) getLastMessageID() (trackingMessageID, error) {
req := &getLastMsgIDRequest{doneCh: make(chan struct{})}
pc.eventsCh <- req

Expand Down Expand Up @@ -266,8 +266,8 @@ func (pc *partitionConsumer) internalGetLastMessageID(req *getLastMsgIDRequest)
}
}

func (pc *partitionConsumer) AckID(msgID messageID) {
if !msgID.IsZero() && msgID.ack() {
func (pc *partitionConsumer) AckID(msgID trackingMessageID) {
if !msgID.Undefined() && msgID.ack() {
acksCounter.Inc()
processingTime.Observe(float64(time.Now().UnixNano()-msgID.receivedTime.UnixNano()) / 1.0e9)
req := &ackRequest{
Expand All @@ -277,8 +277,8 @@ func (pc *partitionConsumer) AckID(msgID messageID) {
}
}

func (pc *partitionConsumer) NackID(msgID messageID) {
pc.nackTracker.Add(msgID)
func (pc *partitionConsumer) NackID(msgID trackingMessageID) {
pc.nackTracker.Add(msgID.messageID)
nacksCounter.Inc()
}

Expand Down Expand Up @@ -317,7 +317,7 @@ func (pc *partitionConsumer) Close() {
<-req.doneCh
}

func (pc *partitionConsumer) Seek(msgID messageID) error {
func (pc *partitionConsumer) Seek(msgID trackingMessageID) error {
req := &seekRequest{
doneCh: make(chan struct{}),
msgID: msgID,
Expand Down Expand Up @@ -506,17 +506,17 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header
return nil
}

func (pc *partitionConsumer) messageShouldBeDiscarded(msgID messageID) bool {
if pc.startMessageID.IsZero() {
func (pc *partitionConsumer) messageShouldBeDiscarded(msgID trackingMessageID) bool {
if pc.startMessageID.Undefined() {
return false
}

if pc.options.startMessageIDInclusive {
return pc.startMessageID.greater(msgID)
return pc.startMessageID.greater(msgID.messageID)
}

// Non inclusive
return pc.startMessageID.greaterEqual(msgID)
return pc.startMessageID.greaterEqual(msgID.messageID)
}

func (pc *partitionConsumer) ConnectionClosed() {
Expand Down Expand Up @@ -631,16 +631,16 @@ func (pc *partitionConsumer) dispatcher() {
case clearQueueCb := <-pc.clearQueueCh:
// drain the message queue on any new connection by sending a
// special nil message to the channel so we know when to stop dropping messages
var nextMessageInQueue messageID
var nextMessageInQueue trackingMessageID
go func() {
pc.queueCh <- nil
}()
for m := range pc.queueCh {
// the queue has been drained
if m == nil {
break
} else if nextMessageInQueue.IsZero() {
nextMessageInQueue = m[0].msgID.(messageID)
} else if nextMessageInQueue.Undefined() {
nextMessageInQueue = m[0].msgID.(trackingMessageID)
}
}

Expand All @@ -650,7 +650,7 @@ func (pc *partitionConsumer) dispatcher() {
}

type ackRequest struct {
msgID messageID
msgID trackingMessageID
}

type unsubscribeRequest struct {
Expand All @@ -668,13 +668,13 @@ type redeliveryRequest struct {

type getLastMsgIDRequest struct {
doneCh chan struct{}
msgID messageID
msgID trackingMessageID
err error
}

type seekRequest struct {
doneCh chan struct{}
msgID messageID
msgID trackingMessageID
err error
}

Expand Down Expand Up @@ -854,15 +854,15 @@ func (pc *partitionConsumer) grabConn() error {
}
}

func (pc *partitionConsumer) clearQueueAndGetNextMessage() messageID {
func (pc *partitionConsumer) clearQueueAndGetNextMessage() trackingMessageID {
if pc.state != consumerReady {
return messageID{}
return trackingMessageID{}
}
wg := &sync.WaitGroup{}
wg.Add(1)
var msgID messageID
var msgID trackingMessageID

pc.clearQueueCh <- func(id messageID) {
pc.clearQueueCh <- func(id trackingMessageID) {
msgID = id
wg.Done()
}
Expand All @@ -875,12 +875,12 @@ func (pc *partitionConsumer) clearQueueAndGetNextMessage() messageID {
* Clear the internal receiver queue and returns the message id of what was the 1st message in the queue that was
* not seen by the application
*/
func (pc *partitionConsumer) clearReceiverQueue() messageID {
func (pc *partitionConsumer) clearReceiverQueue() trackingMessageID {
nextMessageInQueue := pc.clearQueueAndGetNextMessage()

if !nextMessageInQueue.IsZero() {
if !nextMessageInQueue.Undefined() {
return getPreviousMessage(nextMessageInQueue)
} else if !pc.lastDequeuedMsg.IsZero() {
} else if !pc.lastDequeuedMsg.Undefined() {
// If the queue was empty we need to restart from the message just after the last one that has been dequeued
// in the past
return pc.lastDequeuedMsg
Expand All @@ -890,22 +890,32 @@ func (pc *partitionConsumer) clearReceiverQueue() messageID {
}
}

func getPreviousMessage(mid messageID) messageID {
func getPreviousMessage(mid trackingMessageID) trackingMessageID {
if mid.batchIdx >= 0 {
return messageID{
ledgerID: mid.ledgerID,
entryID: mid.entryID,
batchIdx: mid.batchIdx - 1,
partitionIdx: mid.partitionIdx,
return trackingMessageID{
messageID: messageID{
ledgerID: mid.ledgerID,
entryID: mid.entryID,
batchIdx: mid.batchIdx - 1,
partitionIdx: mid.partitionIdx,
},
tracker: mid.tracker,
consumer: mid.consumer,
receivedTime: mid.receivedTime,
}
}

// Get on previous message in previous entry
return messageID{
ledgerID: mid.ledgerID,
entryID: mid.entryID - 1,
batchIdx: mid.batchIdx,
partitionIdx: mid.partitionIdx,
return trackingMessageID{
messageID: messageID{
ledgerID: mid.ledgerID,
entryID: mid.entryID - 1,
batchIdx: mid.batchIdx,
partitionIdx: mid.partitionIdx,
},
tracker: mid.tracker,
consumer: mid.consumer,
receivedTime: mid.receivedTime,
}
}

Expand Down Expand Up @@ -961,8 +971,8 @@ func (pc *partitionConsumer) discardCorruptedMessage(msgID *pb.MessageIdData,
})
}

func convertToMessageIDData(msgID messageID) *pb.MessageIdData {
if msgID.IsZero() {
func convertToMessageIDData(msgID trackingMessageID) *pb.MessageIdData {
if msgID.Undefined() {
return nil
}

Expand All @@ -972,14 +982,16 @@ func convertToMessageIDData(msgID messageID) *pb.MessageIdData {
}
}

func convertToMessageID(id *pb.MessageIdData) messageID {
func convertToMessageID(id *pb.MessageIdData) trackingMessageID {
if id == nil {
return messageID{}
return trackingMessageID{}
}

msgID := messageID{
ledgerID: int64(*id.LedgerId),
entryID: int64(*id.EntryId),
msgID := trackingMessageID{
messageID: messageID{
ledgerID: int64(*id.LedgerId),
entryID: int64(*id.EntryId),
},
}
if id.BatchIndex != nil {
msgID.batchIdx = *id.BatchIndex
Expand Down
14 changes: 7 additions & 7 deletions pulsar/consumer_partition_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,11 @@ func TestSingleMessageIDNoAckTracker(t *testing.T) {
// ensure the tracker was set on the message id
messages := <-pc.queueCh
for _, m := range messages {
assert.Nil(t, m.ID().(messageID).tracker)
assert.Nil(t, m.ID().(trackingMessageID).tracker)
}

// ack the message id
pc.AckID(messages[0].msgID.(messageID))
pc.AckID(messages[0].msgID.(trackingMessageID))

select {
case <-eventsCh:
Expand All @@ -73,11 +73,11 @@ func TestBatchMessageIDNoAckTracker(t *testing.T) {
// ensure the tracker was set on the message id
messages := <-pc.queueCh
for _, m := range messages {
assert.Nil(t, m.ID().(messageID).tracker)
assert.Nil(t, m.ID().(trackingMessageID).tracker)
}

// ack the message id
pc.AckID(messages[0].msgID.(messageID))
pc.AckID(messages[0].msgID.(trackingMessageID))

select {
case <-eventsCh:
Expand All @@ -102,12 +102,12 @@ func TestBatchMessageIDWithAckTracker(t *testing.T) {
// ensure the tracker was set on the message id
messages := <-pc.queueCh
for _, m := range messages {
assert.NotNil(t, m.ID().(messageID).tracker)
assert.NotNil(t, m.ID().(trackingMessageID).tracker)
}

// ack all message ids except the last one
for i := 0; i < 9; i++ {
pc.AckID(messages[i].msgID.(messageID))
pc.AckID(messages[i].msgID.(trackingMessageID))
}

select {
Expand All @@ -117,7 +117,7 @@ func TestBatchMessageIDWithAckTracker(t *testing.T) {
}

// ack last message
pc.AckID(messages[9].msgID.(messageID))
pc.AckID(messages[9].msgID.(trackingMessageID))

select {
case <-eventsCh:
Expand Down
4 changes: 2 additions & 2 deletions pulsar/consumer_regex.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ func (c *regexConsumer) Ack(msg Message) {

// Ack the consumption of a single message, identified by its MessageID
func (c *regexConsumer) AckID(msgID MessageID) {
mid, ok := msgID.(messageID)
mid, ok := toTrackingMessageID(msgID)
if !ok {
c.log.Warnf("invalid message id type %T", msgID)
return
Expand All @@ -184,7 +184,7 @@ func (c *regexConsumer) Nack(msg Message) {
}

func (c *regexConsumer) NackID(msgID MessageID) {
mid, ok := msgID.(messageID)
mid, ok := toTrackingMessageID(msgID)
if !ok {
c.log.Warnf("invalid message id type %T", msgID)
return
Expand Down
Loading

0 comments on commit 593d7c1

Please sign in to comment.