Skip to content

Commit

Permalink
fix fail to add batchbuilder
Browse files Browse the repository at this point in the history
  • Loading branch information
yexijun committed May 22, 2020
1 parent 7055076 commit d04e8fd
Showing 1 changed file with 16 additions and 22 deletions.
38 changes: 16 additions & 22 deletions pulsar/producer_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package pulsar

import (
"context"
"errors"
"sync"
"sync/atomic"
"time"
Expand All @@ -39,6 +40,8 @@ const (
producerClosed
)

var errFailAddBatch = errors.New("fail add to batch builder")

type partitionProducer struct {
state int32
client *client
Expand Down Expand Up @@ -265,35 +268,26 @@ func (p *partitionProducer) internalSend(request *sendRequest) {
sequenceID = internal.GetAndAdd(p.sequenceIDGenerator, 1)
}

if sendAsBatch {
added := p.batchBuilder.Add(smm, sequenceID, msg.Payload, request,
msg.ReplicationClusters, deliverAt)
if !added {
// The current batch is full.. flush it and retry
p.internalFlushCurrentBatch()
added := p.batchBuilder.Add(smm, sequenceID, msg.Payload, request,
msg.ReplicationClusters, deliverAt)
if !added {
// The current batch is full.. flush it and retry
p.internalFlushCurrentBatch()

// after flushing try again to add the current payload
if ok := p.batchBuilder.Add(smm, sequenceID, msg.Payload, request,
msg.ReplicationClusters, deliverAt); !ok {
p.log.WithField("size", len(msg.Payload)).
WithField("sequenceID", sequenceID).
WithField("properties", msg.Properties).
Error("unable to add message to batch")
}
}
} else {
// Send individually
if added := p.batchBuilder.Add(smm, sequenceID, msg.Payload, request,
msg.ReplicationClusters, deliverAt); !added {
// after flushing try again to add the current payload
if ok := p.batchBuilder.Add(smm, sequenceID, msg.Payload, request,
msg.ReplicationClusters, deliverAt); !ok {
p.publishSemaphore.Release()
request.callback(nil, request.msg, errFailAddBatch)
p.log.WithField("size", len(msg.Payload)).
WithField("sequenceID", sequenceID).
WithField("properties", msg.Properties).
Error("unable to send single message")
Error("unable to add message to batch")
return
}
p.internalFlushCurrentBatch()
}

if request.flushImmediately {
if !sendAsBatch || request.flushImmediately {
p.internalFlushCurrentBatch()
}
}
Expand Down

0 comments on commit d04e8fd

Please sign in to comment.