Skip to content

Commit

Permalink
Different MessageID implementations for message Production and Consum…
Browse files Browse the repository at this point in the history
…ption (#324)

This change splits the `MessageID` implementation in two:
1. `messageID` - A 24 byte structure that contains message identification
                 information only; to be used during message production
2. `trackingMessageID` - A 72 byte structucture that shares the same
                         message identification information as `messageID`
                         and adds `ackTracker`, `acker`, and `receivedTime`
                         fields; to be used during message consumption

Micro benchmarks show that passing arguments by value that are less-than
four words of memory are optimized by the Go runtime.  Results from the
pulsar/impl_message_bench_test.go module are below.

```
name            time/op
ProducerCall    1.46ns ± 5%
ProducerCall-4  1.47ns ± 5%
ConsumerCall    7.62ns ± 1%
ConsumerCall-4  7.53ns ± 5%
```

Co-authored-by: Daniel Ferstay <dferstay@splunk.com>
  • Loading branch information
dferstay and Daniel Ferstay authored Jul 24, 2020
1 parent c75aa62 commit c6d905d
Show file tree
Hide file tree
Showing 10 changed files with 180 additions and 93 deletions.
14 changes: 7 additions & 7 deletions pulsar/consumer_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,8 @@ var ErrConsumerClosed = errors.New("consumer closed")
const defaultNackRedeliveryDelay = 1 * time.Minute

type acker interface {
AckID(id messageID)
NackID(id messageID)
AckID(id trackingMessageID)
NackID(id trackingMessageID)
}

type consumer struct {
Expand Down Expand Up @@ -263,7 +263,7 @@ func (c *consumer) internalTopicSubscribeToPartitions() error {
nackRedeliveryDelay: nackRedeliveryDelay,
metadata: metadata,
replicateSubscriptionState: c.options.ReplicateSubscriptionState,
startMessageID: messageID{},
startMessageID: trackingMessageID{},
subscriptionMode: durable,
readCompacted: c.options.ReadCompacted,
interceptors: c.options.Interceptors,
Expand Down Expand Up @@ -488,19 +488,19 @@ func toProtoInitialPosition(p SubscriptionInitialPosition) pb.CommandSubscribe_I
return pb.CommandSubscribe_Latest
}

func (c *consumer) messageID(msgID MessageID) (messageID, bool) {
mid, ok := msgID.(messageID)
func (c *consumer) messageID(msgID MessageID) (trackingMessageID, bool) {
mid, ok := toTrackingMessageID(msgID)
if !ok {
c.log.Warnf("invalid message id type %T", msgID)
return messageID{}, false
return trackingMessageID{}, false
}

partition := int(mid.partitionIdx)
// did we receive a valid partition index?
if partition < 0 || partition >= len(c.consumers) {
c.log.Warnf("invalid partition index %d expected a partition between [0-%d]",
partition, len(c.consumers))
return messageID{}, false
return trackingMessageID{}, false
}

return mid, true
Expand Down
4 changes: 2 additions & 2 deletions pulsar/consumer_multitopic.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func (c *multiTopicConsumer) Ack(msg Message) {

// Ack the consumption of a single message, identified by its MessageID
func (c *multiTopicConsumer) AckID(msgID MessageID) {
mid, ok := msgID.(messageID)
mid, ok := toTrackingMessageID(msgID)
if !ok {
c.log.Warnf("invalid message id type %T", msgID)
return
Expand All @@ -139,7 +139,7 @@ func (c *multiTopicConsumer) Nack(msg Message) {
}

func (c *multiTopicConsumer) NackID(msgID MessageID) {
mid, ok := msgID.(messageID)
mid, ok := toTrackingMessageID(msgID)
if !ok {
c.log.Warnf("invalid message id type %T", msgID)
return
Expand Down
104 changes: 58 additions & 46 deletions pulsar/consumer_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ type partitionConsumerOpts struct {
nackRedeliveryDelay time.Duration
metadata map[string]string
replicateSubscriptionState bool
startMessageID messageID
startMessageID trackingMessageID
startMessageIDInclusive bool
subscriptionMode subscriptionMode
readCompacted bool
Expand Down Expand Up @@ -141,13 +141,13 @@ type partitionConsumer struct {
// the size of the queue channel for buffering messages
queueSize int32
queueCh chan []*message
startMessageID messageID
lastDequeuedMsg messageID
startMessageID trackingMessageID
lastDequeuedMsg trackingMessageID

eventsCh chan interface{}
connectedCh chan struct{}
closeCh chan struct{}
clearQueueCh chan func(id messageID)
clearQueueCh chan func(id trackingMessageID)

nackTracker *negativeAcksTracker
dlq *dlqRouter
Expand Down Expand Up @@ -175,7 +175,7 @@ func newPartitionConsumer(parent Consumer, client *client, options *partitionCon
connectedCh: make(chan struct{}),
messageCh: messageCh,
closeCh: make(chan struct{}),
clearQueueCh: make(chan func(id messageID)),
clearQueueCh: make(chan func(id trackingMessageID)),
compressionProviders: make(map[pb.CompressionType]compression.Provider),
dlq: dlq,
log: log.WithField("topic", options.topic),
Expand Down Expand Up @@ -241,7 +241,7 @@ func (pc *partitionConsumer) internalUnsubscribe(unsub *unsubscribeRequest) {
pc.state = consumerClosed
}

func (pc *partitionConsumer) getLastMessageID() (messageID, error) {
func (pc *partitionConsumer) getLastMessageID() (trackingMessageID, error) {
req := &getLastMsgIDRequest{doneCh: make(chan struct{})}
pc.eventsCh <- req

Expand Down Expand Up @@ -269,8 +269,8 @@ func (pc *partitionConsumer) internalGetLastMessageID(req *getLastMsgIDRequest)
}
}

func (pc *partitionConsumer) AckID(msgID messageID) {
if !msgID.IsZero() && msgID.ack() {
func (pc *partitionConsumer) AckID(msgID trackingMessageID) {
if !msgID.Undefined() && msgID.ack() {
acksCounter.Inc()
processingTime.Observe(float64(time.Now().UnixNano()-msgID.receivedTime.UnixNano()) / 1.0e9)
req := &ackRequest{
Expand All @@ -282,8 +282,8 @@ func (pc *partitionConsumer) AckID(msgID messageID) {
}
}

func (pc *partitionConsumer) NackID(msgID messageID) {
pc.nackTracker.Add(msgID)
func (pc *partitionConsumer) NackID(msgID trackingMessageID) {
pc.nackTracker.Add(msgID.messageID)
nacksCounter.Inc()
}

Expand Down Expand Up @@ -328,7 +328,7 @@ func (pc *partitionConsumer) Close() {
<-req.doneCh
}

func (pc *partitionConsumer) Seek(msgID messageID) error {
func (pc *partitionConsumer) Seek(msgID trackingMessageID) error {
req := &seekRequest{
doneCh: make(chan struct{}),
msgID: msgID,
Expand Down Expand Up @@ -522,17 +522,17 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header
return nil
}

func (pc *partitionConsumer) messageShouldBeDiscarded(msgID messageID) bool {
if pc.startMessageID.IsZero() {
func (pc *partitionConsumer) messageShouldBeDiscarded(msgID trackingMessageID) bool {
if pc.startMessageID.Undefined() {
return false
}

if pc.options.startMessageIDInclusive {
return pc.startMessageID.greater(msgID)
return pc.startMessageID.greater(msgID.messageID)
}

// Non inclusive
return pc.startMessageID.greaterEqual(msgID)
return pc.startMessageID.greaterEqual(msgID.messageID)
}

func (pc *partitionConsumer) ConnectionClosed() {
Expand Down Expand Up @@ -647,16 +647,16 @@ func (pc *partitionConsumer) dispatcher() {
case clearQueueCb := <-pc.clearQueueCh:
// drain the message queue on any new connection by sending a
// special nil message to the channel so we know when to stop dropping messages
var nextMessageInQueue messageID
var nextMessageInQueue trackingMessageID
go func() {
pc.queueCh <- nil
}()
for m := range pc.queueCh {
// the queue has been drained
if m == nil {
break
} else if nextMessageInQueue.IsZero() {
nextMessageInQueue = m[0].msgID.(messageID)
} else if nextMessageInQueue.Undefined() {
nextMessageInQueue = m[0].msgID.(trackingMessageID)
}
}

Expand All @@ -666,7 +666,7 @@ func (pc *partitionConsumer) dispatcher() {
}

type ackRequest struct {
msgID messageID
msgID trackingMessageID
}

type unsubscribeRequest struct {
Expand All @@ -684,13 +684,13 @@ type redeliveryRequest struct {

type getLastMsgIDRequest struct {
doneCh chan struct{}
msgID messageID
msgID trackingMessageID
err error
}

type seekRequest struct {
doneCh chan struct{}
msgID messageID
msgID trackingMessageID
err error
}

Expand Down Expand Up @@ -870,15 +870,15 @@ func (pc *partitionConsumer) grabConn() error {
}
}

func (pc *partitionConsumer) clearQueueAndGetNextMessage() messageID {
func (pc *partitionConsumer) clearQueueAndGetNextMessage() trackingMessageID {
if pc.state != consumerReady {
return messageID{}
return trackingMessageID{}
}
wg := &sync.WaitGroup{}
wg.Add(1)
var msgID messageID
var msgID trackingMessageID

pc.clearQueueCh <- func(id messageID) {
pc.clearQueueCh <- func(id trackingMessageID) {
msgID = id
wg.Done()
}
Expand All @@ -891,12 +891,12 @@ func (pc *partitionConsumer) clearQueueAndGetNextMessage() messageID {
* Clear the internal receiver queue and returns the message id of what was the 1st message in the queue that was
* not seen by the application
*/
func (pc *partitionConsumer) clearReceiverQueue() messageID {
func (pc *partitionConsumer) clearReceiverQueue() trackingMessageID {
nextMessageInQueue := pc.clearQueueAndGetNextMessage()

if !nextMessageInQueue.IsZero() {
if !nextMessageInQueue.Undefined() {
return getPreviousMessage(nextMessageInQueue)
} else if !pc.lastDequeuedMsg.IsZero() {
} else if !pc.lastDequeuedMsg.Undefined() {
// If the queue was empty we need to restart from the message just after the last one that has been dequeued
// in the past
return pc.lastDequeuedMsg
Expand All @@ -906,22 +906,32 @@ func (pc *partitionConsumer) clearReceiverQueue() messageID {
}
}

func getPreviousMessage(mid messageID) messageID {
func getPreviousMessage(mid trackingMessageID) trackingMessageID {
if mid.batchIdx >= 0 {
return messageID{
ledgerID: mid.ledgerID,
entryID: mid.entryID,
batchIdx: mid.batchIdx - 1,
partitionIdx: mid.partitionIdx,
return trackingMessageID{
messageID: messageID{
ledgerID: mid.ledgerID,
entryID: mid.entryID,
batchIdx: mid.batchIdx - 1,
partitionIdx: mid.partitionIdx,
},
tracker: mid.tracker,
consumer: mid.consumer,
receivedTime: mid.receivedTime,
}
}

// Get on previous message in previous entry
return messageID{
ledgerID: mid.ledgerID,
entryID: mid.entryID - 1,
batchIdx: mid.batchIdx,
partitionIdx: mid.partitionIdx,
return trackingMessageID{
messageID: messageID{
ledgerID: mid.ledgerID,
entryID: mid.entryID - 1,
batchIdx: mid.batchIdx,
partitionIdx: mid.partitionIdx,
},
tracker: mid.tracker,
consumer: mid.consumer,
receivedTime: mid.receivedTime,
}
}

Expand Down Expand Up @@ -977,8 +987,8 @@ func (pc *partitionConsumer) discardCorruptedMessage(msgID *pb.MessageIdData,
})
}

func convertToMessageIDData(msgID messageID) *pb.MessageIdData {
if msgID.IsZero() {
func convertToMessageIDData(msgID trackingMessageID) *pb.MessageIdData {
if msgID.Undefined() {
return nil
}

Expand All @@ -988,14 +998,16 @@ func convertToMessageIDData(msgID messageID) *pb.MessageIdData {
}
}

func convertToMessageID(id *pb.MessageIdData) messageID {
func convertToMessageID(id *pb.MessageIdData) trackingMessageID {
if id == nil {
return messageID{}
return trackingMessageID{}
}

msgID := messageID{
ledgerID: int64(*id.LedgerId),
entryID: int64(*id.EntryId),
msgID := trackingMessageID{
messageID: messageID{
ledgerID: int64(*id.LedgerId),
entryID: int64(*id.EntryId),
},
}
if id.BatchIndex != nil {
msgID.batchIdx = *id.BatchIndex
Expand Down
14 changes: 7 additions & 7 deletions pulsar/consumer_partition_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,11 @@ func TestSingleMessageIDNoAckTracker(t *testing.T) {
// ensure the tracker was set on the message id
messages := <-pc.queueCh
for _, m := range messages {
assert.Nil(t, m.ID().(messageID).tracker)
assert.Nil(t, m.ID().(trackingMessageID).tracker)
}

// ack the message id
pc.AckID(messages[0].msgID.(messageID))
pc.AckID(messages[0].msgID.(trackingMessageID))

select {
case <-eventsCh:
Expand All @@ -75,11 +75,11 @@ func TestBatchMessageIDNoAckTracker(t *testing.T) {
// ensure the tracker was set on the message id
messages := <-pc.queueCh
for _, m := range messages {
assert.Nil(t, m.ID().(messageID).tracker)
assert.Nil(t, m.ID().(trackingMessageID).tracker)
}

// ack the message id
pc.AckID(messages[0].msgID.(messageID))
pc.AckID(messages[0].msgID.(trackingMessageID))

select {
case <-eventsCh:
Expand All @@ -105,12 +105,12 @@ func TestBatchMessageIDWithAckTracker(t *testing.T) {
// ensure the tracker was set on the message id
messages := <-pc.queueCh
for _, m := range messages {
assert.NotNil(t, m.ID().(messageID).tracker)
assert.NotNil(t, m.ID().(trackingMessageID).tracker)
}

// ack all message ids except the last one
for i := 0; i < 9; i++ {
pc.AckID(messages[i].msgID.(messageID))
pc.AckID(messages[i].msgID.(trackingMessageID))
}

select {
Expand All @@ -120,7 +120,7 @@ func TestBatchMessageIDWithAckTracker(t *testing.T) {
}

// ack last message
pc.AckID(messages[9].msgID.(messageID))
pc.AckID(messages[9].msgID.(trackingMessageID))

select {
case <-eventsCh:
Expand Down
4 changes: 2 additions & 2 deletions pulsar/consumer_regex.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ func (c *regexConsumer) Ack(msg Message) {

// Ack the consumption of a single message, identified by its MessageID
func (c *regexConsumer) AckID(msgID MessageID) {
mid, ok := msgID.(messageID)
mid, ok := toTrackingMessageID(msgID)
if !ok {
c.log.Warnf("invalid message id type %T", msgID)
return
Expand All @@ -184,7 +184,7 @@ func (c *regexConsumer) Nack(msg Message) {
}

func (c *regexConsumer) NackID(msgID MessageID) {
mid, ok := msgID.(messageID)
mid, ok := toTrackingMessageID(msgID)
if !ok {
c.log.Warnf("invalid message id type %T", msgID)
return
Expand Down
Loading

0 comments on commit c6d905d

Please sign in to comment.