Skip to content

Commit

Permalink
Fix handling of unencodable messages
Browse files Browse the repository at this point in the history
Move `Key.Encode()` and `Value.Encode()` calls slightly earlier (to
`groupAndFilter`) where we have access to the batch in order to be able to
remove them from consideration on error. Otherwise failed messages would not be
removed from the batch and could end up returned twice.

Add cache members to the ProducerMessage struct to store the results until we
actually need them.

Fixes #449.

This is perhaps not the most elegant solution. However it is correct and a
better solution would be a lot more invasive. This will do in order to ship the
fix in a 1.5.1 patch release.
  • Loading branch information
eapache committed Sep 24, 2015
1 parent 6f369be commit 2840a37
Showing 1 changed file with 22 additions and 16 deletions.
38 changes: 22 additions & 16 deletions async_producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,8 @@ type ProducerMessage struct {

retries int
flags flagSet

keyCache, valueCache []byte
}

func (m *ProducerMessage) byteSize() int {
Expand All @@ -135,6 +137,8 @@ func (m *ProducerMessage) byteSize() int {
func (m *ProducerMessage) clear() {
m.flags = 0
m.retries = 0
m.keyCache = nil
m.valueCache = nil
}

// ProducerError is the type of error generated when the producer fails to deliver a message.
Expand Down Expand Up @@ -660,6 +664,7 @@ func (f *flusher) run() {
}

func (f *flusher) groupAndFilter(batch []*ProducerMessage) map[string]map[int32][]*ProducerMessage {
var err error
msgSets := make(map[string]map[int32][]*ProducerMessage)

for i, msg := range batch {
Expand All @@ -679,6 +684,22 @@ func (f *flusher) groupAndFilter(batch []*ProducerMessage) map[string]map[int32]
continue
}

if msg.Key != nil {
if msg.keyCache, err = msg.Key.Encode(); err != nil {
f.parent.returnError(msg, err)
batch[i] = nil
continue
}
}

if msg.Value != nil {
if msg.valueCache, err = msg.Value.Encode(); err != nil {
f.parent.returnError(msg, err)
batch[i] = nil
continue
}
}

partitionSet := msgSets[msg.Topic]
if partitionSet == nil {
partitionSet = make(map[int32][]*ProducerMessage)
Expand Down Expand Up @@ -786,21 +807,6 @@ func (p *asyncProducer) buildRequest(batch map[string]map[int32][]*ProducerMessa
setToSend := new(MessageSet)
setSize := 0
for _, msg := range msgSet {
var keyBytes, valBytes []byte
var err error
if msg.Key != nil {
if keyBytes, err = msg.Key.Encode(); err != nil {
p.returnError(msg, err)
continue
}
}
if msg.Value != nil {
if valBytes, err = msg.Value.Encode(); err != nil {
p.returnError(msg, err)
continue
}
}

if p.conf.Producer.Compression != CompressionNone && setSize+msg.byteSize() > p.conf.Producer.MaxMessageBytes {
// compression causes message-sets to be wrapped as single messages, which have tighter
// size requirements, so we have to respect those limits
Expand All @@ -815,7 +821,7 @@ func (p *asyncProducer) buildRequest(batch map[string]map[int32][]*ProducerMessa
}
setSize += msg.byteSize()

setToSend.addMessage(&Message{Codec: CompressionNone, Key: keyBytes, Value: valBytes})
setToSend.addMessage(&Message{Codec: CompressionNone, Key: msg.keyCache, Value: msg.valueCache})
empty = false
}

Expand Down

0 comments on commit 2840a37

Please sign in to comment.