From cbf268dd5bff7d40dabc6d6b09919f90513c268b Mon Sep 17 00:00:00 2001 From: Keith Null Date: Tue, 26 May 2020 18:05:31 +0800 Subject: [PATCH] Add check for max message size 1. When creating a connection, try to get maxMessageSize from handshake response command. If it's not set, then use the default maxMessageSize value defined in the client side. 2. When sending a message, check whether the size of payload exceeds maxMessageSize. If so, return error immediately without adding this meesage into sending queue. 3. To implement these, I made some tiny modifications in Connection interface and added a field in its implementation struct. --- pulsar/internal/batch_builder.go | 2 -- pulsar/internal/commands.go | 4 ++++ pulsar/internal/connection.go | 15 ++++++++++++++- pulsar/producer_partition.go | 15 ++++++++++++++- 4 files changed, 32 insertions(+), 4 deletions(-) diff --git a/pulsar/internal/batch_builder.go b/pulsar/internal/batch_builder.go index 80d8a001fc..3b54eba8df 100644 --- a/pulsar/internal/batch_builder.go +++ b/pulsar/internal/batch_builder.go @@ -29,8 +29,6 @@ import ( ) const ( - // MaxMessageSize limit message size for transfer - MaxMessageSize = 5 * 1024 * 1024 // MaxBatchSize will be the largest size for a batch sent from this particular producer. // This is used as a baseline to allocate a new buffer that can hold the entire batch // without needing costly re-allocations. diff --git a/pulsar/internal/commands.go b/pulsar/internal/commands.go index d9f2a1f783..71256a6d10 100644 --- a/pulsar/internal/commands.go +++ b/pulsar/internal/commands.go @@ -31,6 +31,10 @@ import ( const ( // MaxFrameSize limit the maximum size that pulsar allows for messages to be sent. MaxFrameSize = 5 * 1024 * 1024 + // MessageFramePadding is for metadata and other frame headers + MessageFramePadding = 10 * 1024 + // MaxMessageSize limit message size for transfer + MaxMessageSize = MaxFrameSize - MessageFramePadding magicCrc32c uint16 = 0x0e01 ) diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go index d02dedad6b..86e1f23c8f 100644 --- a/pulsar/internal/connection.go +++ b/pulsar/internal/connection.go @@ -71,6 +71,7 @@ type Connection interface { AddConsumeHandler(id uint64, handler ConsumerHandler) DeleteConsumeHandler(id uint64) ID() string + GetMaxMessageSize() int32 Close() } @@ -157,6 +158,8 @@ type connection struct { tlsOptions *TLSOptions auth auth.Provider + + maxMessageSize int32 } func newConnection(logicalAddr *url.URL, physicalAddr *url.URL, tlsOptions *TLSOptions, @@ -282,7 +285,13 @@ func (c *connection) doHandshake() bool { cmd.Type) return false } - + if cmd.Connected.MaxMessageSize != nil { + c.log.Debug("Got MaxMessageSize from handshake response:", *cmd.Connected.MaxMessageSize) + c.maxMessageSize = *cmd.Connected.MaxMessageSize + } else{ + c.log.Debug("No MaxMessageSize from handshake response, use default: ", MaxMessageSize) + c.maxMessageSize = MaxMessageSize + } c.log.Info("Connection is ready") c.changeState(connectionReady) return true @@ -749,3 +758,7 @@ func (c *connection) consumerHandler(id uint64) (ConsumerHandler, bool) { func (c *connection) ID() string { return fmt.Sprintf("%s -> %s", c.cnx.LocalAddr(), c.cnx.RemoteAddr()) } + +func (c *connection) GetMaxMessageSize() int32{ + return c.maxMessageSize +} diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index 63f7bf4bc4..e0fbe47e18 100644 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -40,7 +40,10 @@ const ( producerClosed ) -var errFailAddBatch = errors.New("message send failed") +var ( + errFailAddBatch = errors.New("message send failed") + errMessageTooLarge = errors.New("message size exceeds MaxMessageSize") +) type partitionProducer struct { state int32 @@ -236,6 +239,16 @@ func (p *partitionProducer) internalSend(request *sendRequest) { msg := request.msg + // if msg is too large + if len(msg.Payload) > int(p.cnx.GetMaxMessageSize()){ + p.publishSemaphore.Release() + request.callback(nil, request.msg, errMessageTooLarge) + p.log.WithField("size", len(msg.Payload)). + WithField("properties", msg.Properties). + Error("message size exceeds MaxMessageSize") + return + } + deliverAt := msg.DeliverAt if msg.DeliverAfter.Nanoseconds() > 0 { deliverAt = time.Now().Add(msg.DeliverAfter)