Skip to content

Commit

Permalink
Reduce the size of the MessageID structs by one word on 64-bit arch (#…
Browse files Browse the repository at this point in the history
…316)

An int occupies one word of memory; on 64-bit machines, this is 8 bytes.

As a result, the messageID struct is 56-bytes:
* ledgerID - 8 bytes
* entryID - 8 bytes
* batchIdx - 8 bytes
* partitionIdx - 8 bytes
* tracker - 8 bytes
* consumer - 16 bytes (1 word for type, 1 word for data address)

This commit changes the type of batchIdx and partitionIdx fields to int32
which saves one word of memory and maintains alignment of struct fields.

Reducing the size of the MessageID structs is important as they are
currently allocated on the heap for every message produced or consumed.

Signed-off-by: Daniel Ferstay <dferstay@splunk.com>

Co-authored-by: Daniel Ferstay <dferstay@splunk.com>
  • Loading branch information
dferstay and Daniel Ferstay authored Jul 8, 2020
1 parent 6a80299 commit 6fcaf26
Show file tree
Hide file tree
Showing 5 changed files with 19 additions and 19 deletions.
2 changes: 1 addition & 1 deletion pulsar/consumer_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -459,7 +459,7 @@ func (c *consumer) messageID(msgID MessageID) (*messageID, bool) {
return nil, false
}

partition := mid.partitionIdx
partition := int(mid.partitionIdx)
// did we receive a valid partition index?
if partition < 0 || partition >= len(c.consumers) {
c.log.Warnf("invalid partition index %d expected a partition between [0-%d]",
Expand Down
8 changes: 4 additions & 4 deletions pulsar/consumer_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ type partitionConsumer struct {
topic string
name string
consumerID uint64
partitionIdx int
partitionIdx int32

// shared channel
messageCh chan ConsumerMessage
Expand Down Expand Up @@ -120,7 +120,7 @@ func newPartitionConsumer(parent Consumer, client *client, options *partitionCon
topic: options.topic,
name: options.consumerName,
consumerID: client.rpcClient.NewConsumerID(),
partitionIdx: options.partitionIdx,
partitionIdx: int32(options.partitionIdx),
eventsCh: make(chan interface{}, 3),
queueSize: int32(options.receiverQueueSize),
queueCh: make(chan []*message, options.receiverQueueSize),
Expand Down Expand Up @@ -400,7 +400,7 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header
msgID := newTrackingMessageID(
int64(pbMsgID.GetLedgerId()),
int64(pbMsgID.GetEntryId()),
i,
int32(i),
pc.partitionIdx,
ackTracker)

Expand Down Expand Up @@ -923,7 +923,7 @@ func convertToMessageID(id *pb.MessageIdData) *messageID {
}

if id.BatchIndex != nil {
msgID.batchIdx = int(*id.BatchIndex)
msgID.batchIdx = *id.BatchIndex
}

return msgID
Expand Down
18 changes: 9 additions & 9 deletions pulsar/impl_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ import (
type messageID struct {
ledgerID int64
entryID int64
batchIdx int
partitionIdx int
batchIdx int32
partitionIdx int32

tracker *ackTracker
consumer acker
Expand All @@ -56,7 +56,7 @@ func (id *messageID) Nack() {

func (id *messageID) ack() bool {
if id.tracker != nil && id.batchIdx > -1 {
return id.tracker.ack(id.batchIdx)
return id.tracker.ack(int(id.batchIdx))
}
return true
}
Expand Down Expand Up @@ -87,8 +87,8 @@ func (id *messageID) Serialize() []byte {
msgID := &pb.MessageIdData{
LedgerId: proto.Uint64(uint64(id.ledgerID)),
EntryId: proto.Uint64(uint64(id.entryID)),
BatchIndex: proto.Int(id.batchIdx),
Partition: proto.Int(id.partitionIdx),
BatchIndex: proto.Int(int(id.batchIdx)),
Partition: proto.Int(int(id.partitionIdx)),
}
data, _ := proto.Marshal(msgID)
return data
Expand All @@ -103,13 +103,13 @@ func deserializeMessageID(data []byte) (MessageID, error) {
id := newMessageID(
int64(msgID.GetLedgerId()),
int64(msgID.GetEntryId()),
int(msgID.GetBatchIndex()),
int(msgID.GetPartition()),
msgID.GetBatchIndex(),
msgID.GetPartition(),
)
return id, nil
}

func newMessageID(ledgerID int64, entryID int64, batchIdx int, partitionIdx int) MessageID {
func newMessageID(ledgerID int64, entryID int64, batchIdx int32, partitionIdx int32) MessageID {
return &messageID{
ledgerID: ledgerID,
entryID: entryID,
Expand All @@ -118,7 +118,7 @@ func newMessageID(ledgerID int64, entryID int64, batchIdx int, partitionIdx int)
}
}

func newTrackingMessageID(ledgerID int64, entryID int64, batchIdx int, partitionIdx int,
func newTrackingMessageID(ledgerID int64, entryID int64, batchIdx int32, partitionIdx int32,
tracker *ackTracker) *messageID {
return &messageID{
ledgerID: ledgerID,
Expand Down
4 changes: 2 additions & 2 deletions pulsar/impl_message_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ func TestMessageId(t *testing.T) {

assert.Equal(t, int64(1), id2.(*messageID).ledgerID)
assert.Equal(t, int64(2), id2.(*messageID).entryID)
assert.Equal(t, 3, id2.(*messageID).batchIdx)
assert.Equal(t, 4, id2.(*messageID).partitionIdx)
assert.Equal(t, int32(3), id2.(*messageID).batchIdx)
assert.Equal(t, int32(4), id2.(*messageID).partitionIdx)

id, err = DeserializeMessageID(nil)
assert.Error(t, err)
Expand Down
6 changes: 3 additions & 3 deletions pulsar/producer_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ type partitionProducer struct {
pendingQueue internal.BlockingQueue
lastSequenceID int64

partitionIdx int
partitionIdx int32
}

func newPartitionProducer(client *client, topic string, options *ProducerOptions, partitionIdx int) (
Expand Down Expand Up @@ -105,7 +105,7 @@ func newPartitionProducer(client *client, topic string, options *ProducerOptions
publishSemaphore: internal.NewSemaphore(int32(maxPendingMessages)),
pendingQueue: internal.NewBlockingQueue(maxPendingMessages),
lastSequenceID: -1,
partitionIdx: partitionIdx,
partitionIdx: int32(partitionIdx),
}

if options.Name != "" {
Expand Down Expand Up @@ -442,7 +442,7 @@ func (p *partitionProducer) ReceivedSendReceipt(response *pb.CommandSendReceipt)
msgID := newMessageID(
int64(response.MessageId.GetLedgerId()),
int64(response.MessageId.GetEntryId()),
idx,
int32(idx),
p.partitionIdx,
)
sr.callback(msgID, sr.msg, nil)
Expand Down

0 comments on commit 6fcaf26

Please sign in to comment.