Skip to content

Commit

Permalink
Optimize performance by passing MessageID implementations by pointers
Browse files Browse the repository at this point in the history
### Motivation

Currently there are three implementations of the `MessageID` interface:
- `messageID`: 24 bytes
- `trackingMessageID`: 64 bytes
- `chunkMessageID`: 80 bytes

However, for all methods of them, the receiver is a value rather than a
pointer. It's inefficient because each time a method is called, the copy
would happen.

Reference: https://go.dev/tour/methods/8

### Modifications

- Change the receiver from value to pointer for all `MessageID`
  implementations.
- Use pointers as the returned values and function parameters for these
  implementations everywhere.

The `trackingMessageID.Undefined` method is removed because it's never
used now. Though it's a public method, the struct and its factory
function are not exposed, so I think it's reasonable.

Remove the benchmark added in
apache#324. The result is
obvious and this test is meaningless. I tried passing the
`trackingMessageID` by pointer and the result reduced from 8.548 ns/op
to 1.628 ns/op. It's obvious because a pointer is only 8 bytes while a
`trackingMessageID` is 64 bytes. The overhead of accessing by pointers
is far less than copying the extra bytes.
  • Loading branch information
BewareMyPower committed Feb 27, 2023
1 parent e2ea255 commit 75ca0ae
Show file tree
Hide file tree
Showing 16 changed files with 210 additions and 272 deletions.
16 changes: 8 additions & 8 deletions pulsar/ack_grouping_tracker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,14 +184,14 @@ func TestTimedTrackerCumulativeAck(t *testing.T) {
func TestTimedTrackerIsDuplicate(t *testing.T) {
tracker := newAckGroupingTracker(nil, func(id MessageID) {}, func(id MessageID) {})

tracker.add(messageID{batchIdx: 0, batchSize: 3})
tracker.add(messageID{batchIdx: 2, batchSize: 3})
assert.True(t, tracker.isDuplicate(messageID{batchIdx: 0, batchSize: 3}))
assert.False(t, tracker.isDuplicate(messageID{batchIdx: 1, batchSize: 3}))
assert.True(t, tracker.isDuplicate(messageID{batchIdx: 2, batchSize: 3}))
tracker.add(&messageID{batchIdx: 0, batchSize: 3})
tracker.add(&messageID{batchIdx: 2, batchSize: 3})
assert.True(t, tracker.isDuplicate(&messageID{batchIdx: 0, batchSize: 3}))
assert.False(t, tracker.isDuplicate(&messageID{batchIdx: 1, batchSize: 3}))
assert.True(t, tracker.isDuplicate(&messageID{batchIdx: 2, batchSize: 3}))

tracker.flush()
assert.False(t, tracker.isDuplicate(messageID{batchIdx: 0, batchSize: 3}))
assert.False(t, tracker.isDuplicate(messageID{batchIdx: 1, batchSize: 3}))
assert.False(t, tracker.isDuplicate(messageID{batchIdx: 2, batchSize: 3}))
assert.False(t, tracker.isDuplicate(&messageID{batchIdx: 0, batchSize: 3}))
assert.False(t, tracker.isDuplicate(&messageID{batchIdx: 1, batchSize: 3}))
assert.False(t, tracker.isDuplicate(&messageID{batchIdx: 2, batchSize: 3}))
}
22 changes: 11 additions & 11 deletions pulsar/consumer_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -383,7 +383,7 @@ func (c *consumer) internalTopicSubscribeToPartitions() error {
metadata: metadata,
subProperties: subProperties,
replicateSubscriptionState: c.options.ReplicateSubscriptionState,
startMessageID: trackingMessageID{},
startMessageID: nil,
subscriptionMode: durable,
readCompacted: c.options.ReadCompacted,
interceptors: c.options.Interceptors,
Expand Down Expand Up @@ -531,8 +531,8 @@ func (c *consumer) ReconsumeLaterWithCustomProperties(msg Message, customPropert
if delay < 0 {
delay = 0
}
msgID, ok := c.messageID(msg.ID())
if !ok {
msgID := c.messageID(msg.ID())
if msgID == nil {
return
}
props := make(map[string]string)
Expand Down Expand Up @@ -581,8 +581,8 @@ func (c *consumer) ReconsumeLaterWithCustomProperties(msg Message, customPropert

func (c *consumer) Nack(msg Message) {
if c.options.EnableDefaultNackBackoffPolicy || c.options.NackBackoffPolicy != nil {
mid, ok := c.messageID(msg.ID())
if !ok {
mid := c.messageID(msg.ID())
if mid == nil {
return
}

Expand Down Expand Up @@ -743,22 +743,22 @@ func toProtoInitialPosition(p SubscriptionInitialPosition) pb.CommandSubscribe_I
return pb.CommandSubscribe_Latest
}

func (c *consumer) messageID(msgID MessageID) (trackingMessageID, bool) {
mid, ok := toTrackingMessageID(msgID)
if !ok {
func (c *consumer) messageID(msgID MessageID) *trackingMessageID {
mid := toTrackingMessageID(msgID)
if mid == nil {
c.log.Warnf("invalid message id type %T", msgID)
return trackingMessageID{}, false
return nil
}

partition := int(mid.partitionIdx)
// did we receive a valid partition index?
if partition < 0 || partition >= len(c.consumers) {
c.log.Warnf("invalid partition index %d expected a partition between [0-%d]",
partition, len(c.consumers))
return trackingMessageID{}, false
return nil
}

return mid, true
return mid
}

func addMessageCryptoIfMissing(client *client, options *ConsumerOptions, topics interface{}) error {
Expand Down
16 changes: 8 additions & 8 deletions pulsar/consumer_multitopic.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,8 +125,8 @@ func (c *multiTopicConsumer) Ack(msg Message) error {

// AckID the consumption of a single message, identified by its MessageID
func (c *multiTopicConsumer) AckID(msgID MessageID) error {
mid, ok := toTrackingMessageID(msgID)
if !ok {
mid := toTrackingMessageID(msgID)
if mid == nil {
c.log.Warnf("invalid message id type %T", msgID)
return errors.New("invalid message id type in multi_consumer")
}
Expand All @@ -152,8 +152,8 @@ func (c *multiTopicConsumer) AckCumulative(msg Message) error {
// AckIDCumulative the reception of all the messages in the stream up to (and including)
// the provided message, identified by its MessageID
func (c *multiTopicConsumer) AckIDCumulative(msgID MessageID) error {
mid, ok := toTrackingMessageID(msgID)
if !ok {
mid := toTrackingMessageID(msgID)
if mid == nil {
c.log.Warnf("invalid message id type %T", msgID)
return errors.New("invalid message id type in multi_consumer")
}
Expand Down Expand Up @@ -203,8 +203,8 @@ func (c *multiTopicConsumer) ReconsumeLaterWithCustomProperties(msg Message, cus
func (c *multiTopicConsumer) Nack(msg Message) {
if c.options.EnableDefaultNackBackoffPolicy || c.options.NackBackoffPolicy != nil {
msgID := msg.ID()
mid, ok := toTrackingMessageID(msgID)
if !ok {
mid := toTrackingMessageID(msgID)
if mid == nil {
c.log.Warnf("invalid message id type %T", msgID)
return
}
Expand All @@ -221,8 +221,8 @@ func (c *multiTopicConsumer) Nack(msg Message) {
}

func (c *multiTopicConsumer) NackID(msgID MessageID) {
mid, ok := toTrackingMessageID(msgID)
if !ok {
mid := toTrackingMessageID(msgID)
if mid == nil {
c.log.Warnf("invalid message id type %T", msgID)
return
}
Expand Down
Loading

0 comments on commit 75ca0ae

Please sign in to comment.