diff --git a/pulsar/internal/commands.go b/pulsar/internal/commands.go index a4d5e5f77e..1adfba469a 100644 --- a/pulsar/internal/commands.go +++ b/pulsar/internal/commands.go @@ -28,13 +28,13 @@ import ( ) const ( - // MaxFrameSize limit the maximum size that pulsar allows for messages to be sent. - MaxFrameSize = 5 * 1024 * 1024 + // MaxMessageSize limit message size for transfer + MaxMessageSize = 5 * 1024 * 1024 // MessageFramePadding is for metadata and other frame headers MessageFramePadding = 10 * 1024 - // MaxMessageSize limit message size for transfer - MaxMessageSize = MaxFrameSize - MessageFramePadding - magicCrc32c uint16 = 0x0e01 + // MaxFrameSize limit the maximum size that pulsar allows for messages to be sent. + MaxFrameSize = MaxMessageSize + MessageFramePadding + magicCrc32c uint16 = 0x0e01 ) // ErrCorruptedMessage is the error returned by ReadMessageData when it has detected corrupted data. diff --git a/pulsar/internal/connection_reader.go b/pulsar/internal/connection_reader.go index 60c179d127..803532475f 100644 --- a/pulsar/internal/connection_reader.go +++ b/pulsar/internal/connection_reader.go @@ -73,7 +73,7 @@ func (r *connectionReader) readSingleCommand() (cmd *pb.BaseCommand, headersAndP // We have enough to read frame size frameSize := r.buffer.ReadUint32() - if frameSize > MaxFrameSize { + if r.cnx.maxMessageSize != 0 && int32(frameSize) > (r.cnx.maxMessageSize+MessageFramePadding) { r.cnx.log.Warnf("Received too big frame size. size=%d", frameSize) r.cnx.TriggerClose() return nil, nil, errors.New("Frame size too big")