Skip to content

Commit 42ded0d

Browse files
Optimize performance by passing MessageID implementations by pointers (#968)
### Motivation Currently there are three implementations of the `MessageID` interface: - `messageID`: 24 bytes - `trackingMessageID`: 64 bytes - `chunkMessageID`: 80 bytes However, for all methods of them, the receiver is a value rather than a pointer. It's inefficient because each time a method is called, the copy would happen. Reference: https://go.dev/tour/methods/8 ### Modifications - Change the receiver from value to pointer for all `MessageID` implementations. - Use pointers as the returned values and function parameters for these implementations everywhere. The `trackingMessageID.Undefined` method is removed because it's never used now. Though it's a public method, the struct and its factory function are not exposed, so I think it's reasonable. Remove the benchmark added in #324. The result is obvious and this test is meaningless. I tried passing the `trackingMessageID` by pointer and the result reduced from 8.548 ns/op to 1.628 ns/op. It's obvious because a pointer is only 8 bytes while a `trackingMessageID` is 64 bytes. The overhead of accessing by pointers is far less than copying the extra bytes.
1 parent 5fa431d commit 42ded0d

16 files changed

+216
-275
lines changed

pulsar/ack_grouping_tracker_test.go

+8-8
Original file line numberDiff line numberDiff line change
@@ -184,14 +184,14 @@ func TestTimedTrackerCumulativeAck(t *testing.T) {
184184
func TestTimedTrackerIsDuplicate(t *testing.T) {
185185
tracker := newAckGroupingTracker(nil, func(id MessageID) {}, func(id MessageID) {})
186186

187-
tracker.add(messageID{batchIdx: 0, batchSize: 3})
188-
tracker.add(messageID{batchIdx: 2, batchSize: 3})
189-
assert.True(t, tracker.isDuplicate(messageID{batchIdx: 0, batchSize: 3}))
190-
assert.False(t, tracker.isDuplicate(messageID{batchIdx: 1, batchSize: 3}))
191-
assert.True(t, tracker.isDuplicate(messageID{batchIdx: 2, batchSize: 3}))
187+
tracker.add(&messageID{batchIdx: 0, batchSize: 3})
188+
tracker.add(&messageID{batchIdx: 2, batchSize: 3})
189+
assert.True(t, tracker.isDuplicate(&messageID{batchIdx: 0, batchSize: 3}))
190+
assert.False(t, tracker.isDuplicate(&messageID{batchIdx: 1, batchSize: 3}))
191+
assert.True(t, tracker.isDuplicate(&messageID{batchIdx: 2, batchSize: 3}))
192192

193193
tracker.flush()
194-
assert.False(t, tracker.isDuplicate(messageID{batchIdx: 0, batchSize: 3}))
195-
assert.False(t, tracker.isDuplicate(messageID{batchIdx: 1, batchSize: 3}))
196-
assert.False(t, tracker.isDuplicate(messageID{batchIdx: 2, batchSize: 3}))
194+
assert.False(t, tracker.isDuplicate(&messageID{batchIdx: 0, batchSize: 3}))
195+
assert.False(t, tracker.isDuplicate(&messageID{batchIdx: 1, batchSize: 3}))
196+
assert.False(t, tracker.isDuplicate(&messageID{batchIdx: 2, batchSize: 3}))
197197
}

pulsar/consumer_impl.go

+11-11
Original file line numberDiff line numberDiff line change
@@ -383,7 +383,7 @@ func (c *consumer) internalTopicSubscribeToPartitions() error {
383383
metadata: metadata,
384384
subProperties: subProperties,
385385
replicateSubscriptionState: c.options.ReplicateSubscriptionState,
386-
startMessageID: trackingMessageID{},
386+
startMessageID: nil,
387387
subscriptionMode: durable,
388388
readCompacted: c.options.ReadCompacted,
389389
interceptors: c.options.Interceptors,
@@ -531,8 +531,8 @@ func (c *consumer) ReconsumeLaterWithCustomProperties(msg Message, customPropert
531531
if delay < 0 {
532532
delay = 0
533533
}
534-
msgID, ok := c.messageID(msg.ID())
535-
if !ok {
534+
msgID := c.messageID(msg.ID())
535+
if msgID == nil {
536536
return
537537
}
538538
props := make(map[string]string)
@@ -581,8 +581,8 @@ func (c *consumer) ReconsumeLaterWithCustomProperties(msg Message, customPropert
581581

582582
func (c *consumer) Nack(msg Message) {
583583
if c.options.EnableDefaultNackBackoffPolicy || c.options.NackBackoffPolicy != nil {
584-
mid, ok := c.messageID(msg.ID())
585-
if !ok {
584+
mid := c.messageID(msg.ID())
585+
if mid == nil {
586586
return
587587
}
588588

@@ -743,22 +743,22 @@ func toProtoInitialPosition(p SubscriptionInitialPosition) pb.CommandSubscribe_I
743743
return pb.CommandSubscribe_Latest
744744
}
745745

746-
func (c *consumer) messageID(msgID MessageID) (trackingMessageID, bool) {
747-
mid, ok := toTrackingMessageID(msgID)
748-
if !ok {
746+
func (c *consumer) messageID(msgID MessageID) *trackingMessageID {
747+
mid := toTrackingMessageID(msgID)
748+
if mid == nil {
749749
c.log.Warnf("invalid message id type %T", msgID)
750-
return trackingMessageID{}, false
750+
return nil
751751
}
752752

753753
partition := int(mid.partitionIdx)
754754
// did we receive a valid partition index?
755755
if partition < 0 || partition >= len(c.consumers) {
756756
c.log.Warnf("invalid partition index %d expected a partition between [0-%d]",
757757
partition, len(c.consumers))
758-
return trackingMessageID{}, false
758+
return nil
759759
}
760760

761-
return mid, true
761+
return mid
762762
}
763763

764764
func addMessageCryptoIfMissing(client *client, options *ConsumerOptions, topics interface{}) error {

pulsar/consumer_multitopic.go

+8-8
Original file line numberDiff line numberDiff line change
@@ -125,8 +125,8 @@ func (c *multiTopicConsumer) Ack(msg Message) error {
125125

126126
// AckID the consumption of a single message, identified by its MessageID
127127
func (c *multiTopicConsumer) AckID(msgID MessageID) error {
128-
mid, ok := toTrackingMessageID(msgID)
129-
if !ok {
128+
mid := toTrackingMessageID(msgID)
129+
if mid == nil {
130130
c.log.Warnf("invalid message id type %T", msgID)
131131
return errors.New("invalid message id type in multi_consumer")
132132
}
@@ -152,8 +152,8 @@ func (c *multiTopicConsumer) AckCumulative(msg Message) error {
152152
// AckIDCumulative the reception of all the messages in the stream up to (and including)
153153
// the provided message, identified by its MessageID
154154
func (c *multiTopicConsumer) AckIDCumulative(msgID MessageID) error {
155-
mid, ok := toTrackingMessageID(msgID)
156-
if !ok {
155+
mid := toTrackingMessageID(msgID)
156+
if mid == nil {
157157
c.log.Warnf("invalid message id type %T", msgID)
158158
return errors.New("invalid message id type in multi_consumer")
159159
}
@@ -203,8 +203,8 @@ func (c *multiTopicConsumer) ReconsumeLaterWithCustomProperties(msg Message, cus
203203
func (c *multiTopicConsumer) Nack(msg Message) {
204204
if c.options.EnableDefaultNackBackoffPolicy || c.options.NackBackoffPolicy != nil {
205205
msgID := msg.ID()
206-
mid, ok := toTrackingMessageID(msgID)
207-
if !ok {
206+
mid := toTrackingMessageID(msgID)
207+
if mid == nil {
208208
c.log.Warnf("invalid message id type %T", msgID)
209209
return
210210
}
@@ -221,8 +221,8 @@ func (c *multiTopicConsumer) Nack(msg Message) {
221221
}
222222

223223
func (c *multiTopicConsumer) NackID(msgID MessageID) {
224-
mid, ok := toTrackingMessageID(msgID)
225-
if !ok {
224+
mid := toTrackingMessageID(msgID)
225+
if mid == nil {
226226
c.log.Warnf("invalid message id type %T", msgID)
227227
return
228228
}

0 commit comments

Comments
 (0)