From 6fcaf266480c5e7fe258a67d028cb5c69b835b39 Mon Sep 17 00:00:00 2001 From: dferstay Date: Wed, 8 Jul 2020 14:17:41 -0700 Subject: [PATCH] Reduce the size of the MessageID structs by one word on 64-bit arch (#316) An int occupies one word of memory; on 64-bit machines, this is 8 bytes. As a result, the messageID struct is 56-bytes: * ledgerID - 8 bytes * entryID - 8 bytes * batchIdx - 8 bytes * partitionIdx - 8 bytes * tracker - 8 bytes * consumer - 16 bytes (1 word for type, 1 word for data address) This commit changes the type of batchIdx and partitionIdx fields to int32 which saves one word of memory and maintains alignment of struct fields. Reducing the size of the MessageID structs is important as they are currently allocated on the heap for every message produced or consumed. Signed-off-by: Daniel Ferstay Co-authored-by: Daniel Ferstay --- pulsar/consumer_impl.go | 2 +- pulsar/consumer_partition.go | 8 ++++---- pulsar/impl_message.go | 18 +++++++++--------- pulsar/impl_message_test.go | 4 ++-- pulsar/producer_partition.go | 6 +++--- 5 files changed, 19 insertions(+), 19 deletions(-) diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go index 0a3220a78e..20dc1afe8f 100644 --- a/pulsar/consumer_impl.go +++ b/pulsar/consumer_impl.go @@ -459,7 +459,7 @@ func (c *consumer) messageID(msgID MessageID) (*messageID, bool) { return nil, false } - partition := mid.partitionIdx + partition := int(mid.partitionIdx) // did we receive a valid partition index? if partition < 0 || partition >= len(c.consumers) { c.log.Warnf("invalid partition index %d expected a partition between [0-%d]", diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go index b4498f6c8f..cffc2b43e2 100644 --- a/pulsar/consumer_partition.go +++ b/pulsar/consumer_partition.go @@ -83,7 +83,7 @@ type partitionConsumer struct { topic string name string consumerID uint64 - partitionIdx int + partitionIdx int32 // shared channel messageCh chan ConsumerMessage @@ -120,7 +120,7 @@ func newPartitionConsumer(parent Consumer, client *client, options *partitionCon topic: options.topic, name: options.consumerName, consumerID: client.rpcClient.NewConsumerID(), - partitionIdx: options.partitionIdx, + partitionIdx: int32(options.partitionIdx), eventsCh: make(chan interface{}, 3), queueSize: int32(options.receiverQueueSize), queueCh: make(chan []*message, options.receiverQueueSize), @@ -400,7 +400,7 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header msgID := newTrackingMessageID( int64(pbMsgID.GetLedgerId()), int64(pbMsgID.GetEntryId()), - i, + int32(i), pc.partitionIdx, ackTracker) @@ -923,7 +923,7 @@ func convertToMessageID(id *pb.MessageIdData) *messageID { } if id.BatchIndex != nil { - msgID.batchIdx = int(*id.BatchIndex) + msgID.batchIdx = *id.BatchIndex } return msgID diff --git a/pulsar/impl_message.go b/pulsar/impl_message.go index 6aa42235a8..d9574cde1e 100644 --- a/pulsar/impl_message.go +++ b/pulsar/impl_message.go @@ -31,8 +31,8 @@ import ( type messageID struct { ledgerID int64 entryID int64 - batchIdx int - partitionIdx int + batchIdx int32 + partitionIdx int32 tracker *ackTracker consumer acker @@ -56,7 +56,7 @@ func (id *messageID) Nack() { func (id *messageID) ack() bool { if id.tracker != nil && id.batchIdx > -1 { - return id.tracker.ack(id.batchIdx) + return id.tracker.ack(int(id.batchIdx)) } return true } @@ -87,8 +87,8 @@ func (id *messageID) Serialize() []byte { msgID := &pb.MessageIdData{ LedgerId: proto.Uint64(uint64(id.ledgerID)), EntryId: proto.Uint64(uint64(id.entryID)), - BatchIndex: proto.Int(id.batchIdx), - Partition: proto.Int(id.partitionIdx), + BatchIndex: proto.Int(int(id.batchIdx)), + Partition: proto.Int(int(id.partitionIdx)), } data, _ := proto.Marshal(msgID) return data @@ -103,13 +103,13 @@ func deserializeMessageID(data []byte) (MessageID, error) { id := newMessageID( int64(msgID.GetLedgerId()), int64(msgID.GetEntryId()), - int(msgID.GetBatchIndex()), - int(msgID.GetPartition()), + msgID.GetBatchIndex(), + msgID.GetPartition(), ) return id, nil } -func newMessageID(ledgerID int64, entryID int64, batchIdx int, partitionIdx int) MessageID { +func newMessageID(ledgerID int64, entryID int64, batchIdx int32, partitionIdx int32) MessageID { return &messageID{ ledgerID: ledgerID, entryID: entryID, @@ -118,7 +118,7 @@ func newMessageID(ledgerID int64, entryID int64, batchIdx int, partitionIdx int) } } -func newTrackingMessageID(ledgerID int64, entryID int64, batchIdx int, partitionIdx int, +func newTrackingMessageID(ledgerID int64, entryID int64, batchIdx int32, partitionIdx int32, tracker *ackTracker) *messageID { return &messageID{ ledgerID: ledgerID, diff --git a/pulsar/impl_message_test.go b/pulsar/impl_message_test.go index 54c3bee092..164cff6fc9 100644 --- a/pulsar/impl_message_test.go +++ b/pulsar/impl_message_test.go @@ -33,8 +33,8 @@ func TestMessageId(t *testing.T) { assert.Equal(t, int64(1), id2.(*messageID).ledgerID) assert.Equal(t, int64(2), id2.(*messageID).entryID) - assert.Equal(t, 3, id2.(*messageID).batchIdx) - assert.Equal(t, 4, id2.(*messageID).partitionIdx) + assert.Equal(t, int32(3), id2.(*messageID).batchIdx) + assert.Equal(t, int32(4), id2.(*messageID).partitionIdx) id, err = DeserializeMessageID(nil) assert.Error(t, err) diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index e4765fc480..239fcd6b4c 100644 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -69,7 +69,7 @@ type partitionProducer struct { pendingQueue internal.BlockingQueue lastSequenceID int64 - partitionIdx int + partitionIdx int32 } func newPartitionProducer(client *client, topic string, options *ProducerOptions, partitionIdx int) ( @@ -105,7 +105,7 @@ func newPartitionProducer(client *client, topic string, options *ProducerOptions publishSemaphore: internal.NewSemaphore(int32(maxPendingMessages)), pendingQueue: internal.NewBlockingQueue(maxPendingMessages), lastSequenceID: -1, - partitionIdx: partitionIdx, + partitionIdx: int32(partitionIdx), } if options.Name != "" { @@ -442,7 +442,7 @@ func (p *partitionProducer) ReceivedSendReceipt(response *pb.CommandSendReceipt) msgID := newMessageID( int64(response.MessageId.GetLedgerId()), int64(response.MessageId.GetEntryId()), - idx, + int32(idx), p.partitionIdx, ) sr.callback(msgID, sr.msg, nil)