Skip to content

Commit

Permalink
Add check for max message size
Browse files Browse the repository at this point in the history
1. When creating a connection, try to get maxMessageSize from handshake
response command. If it's not set, then use the default maxMessageSize
value defined in the client side.
2. When sending a message, check whether the size of payload exceeds
maxMessageSize. If so, return error immediately without adding this
meesage into sending queue.
3. To implement these, I made some tiny modifications in Connection
interface and added a field in its implementation struct.
  • Loading branch information
izackwu committed May 26, 2020
1 parent e31d474 commit cbf268d
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 4 deletions.
2 changes: 0 additions & 2 deletions pulsar/internal/batch_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@ import (
)

const (
// MaxMessageSize limit message size for transfer
MaxMessageSize = 5 * 1024 * 1024
// MaxBatchSize will be the largest size for a batch sent from this particular producer.
// This is used as a baseline to allocate a new buffer that can hold the entire batch
// without needing costly re-allocations.
Expand Down
4 changes: 4 additions & 0 deletions pulsar/internal/commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ import (
const (
// MaxFrameSize limit the maximum size that pulsar allows for messages to be sent.
MaxFrameSize = 5 * 1024 * 1024
// MessageFramePadding is for metadata and other frame headers
MessageFramePadding = 10 * 1024
// MaxMessageSize limit message size for transfer
MaxMessageSize = MaxFrameSize - MessageFramePadding
magicCrc32c uint16 = 0x0e01
)

Expand Down
15 changes: 14 additions & 1 deletion pulsar/internal/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ type Connection interface {
AddConsumeHandler(id uint64, handler ConsumerHandler)
DeleteConsumeHandler(id uint64)
ID() string
GetMaxMessageSize() int32
Close()
}

Expand Down Expand Up @@ -157,6 +158,8 @@ type connection struct {

tlsOptions *TLSOptions
auth auth.Provider

maxMessageSize int32
}

func newConnection(logicalAddr *url.URL, physicalAddr *url.URL, tlsOptions *TLSOptions,
Expand Down Expand Up @@ -282,7 +285,13 @@ func (c *connection) doHandshake() bool {
cmd.Type)
return false
}

if cmd.Connected.MaxMessageSize != nil {
c.log.Debug("Got MaxMessageSize from handshake response:", *cmd.Connected.MaxMessageSize)
c.maxMessageSize = *cmd.Connected.MaxMessageSize
} else{
c.log.Debug("No MaxMessageSize from handshake response, use default: ", MaxMessageSize)
c.maxMessageSize = MaxMessageSize
}
c.log.Info("Connection is ready")
c.changeState(connectionReady)
return true
Expand Down Expand Up @@ -749,3 +758,7 @@ func (c *connection) consumerHandler(id uint64) (ConsumerHandler, bool) {
func (c *connection) ID() string {
return fmt.Sprintf("%s -> %s", c.cnx.LocalAddr(), c.cnx.RemoteAddr())
}

func (c *connection) GetMaxMessageSize() int32{
return c.maxMessageSize
}
15 changes: 14 additions & 1 deletion pulsar/producer_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,10 @@ const (
producerClosed
)

var errFailAddBatch = errors.New("message send failed")
var (
errFailAddBatch = errors.New("message send failed")
errMessageTooLarge = errors.New("message size exceeds MaxMessageSize")
)

type partitionProducer struct {
state int32
Expand Down Expand Up @@ -236,6 +239,16 @@ func (p *partitionProducer) internalSend(request *sendRequest) {

msg := request.msg

// if msg is too large
if len(msg.Payload) > int(p.cnx.GetMaxMessageSize()){
p.publishSemaphore.Release()
request.callback(nil, request.msg, errMessageTooLarge)
p.log.WithField("size", len(msg.Payload)).
WithField("properties", msg.Properties).
Error("message size exceeds MaxMessageSize")
return
}

deliverAt := msg.DeliverAt
if msg.DeliverAfter.Nanoseconds() > 0 {
deliverAt = time.Now().Add(msg.DeliverAfter)
Expand Down

0 comments on commit cbf268d

Please sign in to comment.