Skip to content

Commit

Permalink
Fix maxMessageSize not effective even if aligned with broker (#381)
Browse files Browse the repository at this point in the history
### Motivation
- issue 1
    If broker updated `maxMessageSize`, client's `maxMessageSize` will be the same after handshaking.
    However, client still use the default `maxMessageSize` while reading command from connection.
    Lead to consumer can't receive message which payload exceed 5MB.

- issue 2
   According to [PIP-36](https://github.com/apache/pulsar/wiki/PIP-36%3A-Max-Message-Size), default *Size should be:
   ```
   maxMessageSize = 5MB
   framePaddingSize = 10KB
   maxFrameSize = maxMessageSize + framePaddingSize
   ```
   But they two are confused currently:
   ```
   maxFrameSize = 5MB
   framePaddingSize = 10KB
   maxMessageSize = maxFrameSize - framePaddingSize
   ```

### Modifications

- Use the aligned `maxMessageSize` instead of the default value.
- Correct `maxMessageSize` default value.



### Verifying this change

- [x] Make sure that the change passes the CI checks.
  • Loading branch information
wuYin authored Oct 20, 2020
1 parent 02b244e commit 85a9fe8
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 6 deletions.
10 changes: 5 additions & 5 deletions pulsar/internal/commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,13 @@ import (
)

const (
// MaxFrameSize limit the maximum size that pulsar allows for messages to be sent.
MaxFrameSize = 5 * 1024 * 1024
// MaxMessageSize limit message size for transfer
MaxMessageSize = 5 * 1024 * 1024
// MessageFramePadding is for metadata and other frame headers
MessageFramePadding = 10 * 1024
// MaxMessageSize limit message size for transfer
MaxMessageSize = MaxFrameSize - MessageFramePadding
magicCrc32c uint16 = 0x0e01
// MaxFrameSize limit the maximum size that pulsar allows for messages to be sent.
MaxFrameSize = MaxMessageSize + MessageFramePadding
magicCrc32c uint16 = 0x0e01
)

// ErrCorruptedMessage is the error returned by ReadMessageData when it has detected corrupted data.
Expand Down
2 changes: 1 addition & 1 deletion pulsar/internal/connection_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func (r *connectionReader) readSingleCommand() (cmd *pb.BaseCommand, headersAndP

// We have enough to read frame size
frameSize := r.buffer.ReadUint32()
if frameSize > MaxFrameSize {
if r.cnx.maxMessageSize != 0 && int32(frameSize) > (r.cnx.maxMessageSize+MessageFramePadding) {
r.cnx.log.Warnf("Received too big frame size. size=%d", frameSize)
r.cnx.TriggerClose()
return nil, nil, errors.New("Frame size too big")
Expand Down

0 comments on commit 85a9fe8

Please sign in to comment.