From d04e8fdf0f5c9b1d40886851d1ef0c223982f7f0 Mon Sep 17 00:00:00 2001 From: yexijun Date: Fri, 22 May 2020 14:37:59 +0800 Subject: [PATCH] fix fail to add batchbuilder --- pulsar/producer_partition.go | 38 +++++++++++++++--------------------- 1 file changed, 16 insertions(+), 22 deletions(-) diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index 30ab0d9ea8..1edb1d26a7 100644 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -19,6 +19,7 @@ package pulsar import ( "context" + "errors" "sync" "sync/atomic" "time" @@ -39,6 +40,8 @@ const ( producerClosed ) +var errFailAddBatch = errors.New("fail add to batch builder") + type partitionProducer struct { state int32 client *client @@ -265,35 +268,26 @@ func (p *partitionProducer) internalSend(request *sendRequest) { sequenceID = internal.GetAndAdd(p.sequenceIDGenerator, 1) } - if sendAsBatch { - added := p.batchBuilder.Add(smm, sequenceID, msg.Payload, request, - msg.ReplicationClusters, deliverAt) - if !added { - // The current batch is full.. flush it and retry - p.internalFlushCurrentBatch() + added := p.batchBuilder.Add(smm, sequenceID, msg.Payload, request, + msg.ReplicationClusters, deliverAt) + if !added { + // The current batch is full.. flush it and retry + p.internalFlushCurrentBatch() - // after flushing try again to add the current payload - if ok := p.batchBuilder.Add(smm, sequenceID, msg.Payload, request, - msg.ReplicationClusters, deliverAt); !ok { - p.log.WithField("size", len(msg.Payload)). - WithField("sequenceID", sequenceID). - WithField("properties", msg.Properties). - Error("unable to add message to batch") - } - } - } else { - // Send individually - if added := p.batchBuilder.Add(smm, sequenceID, msg.Payload, request, - msg.ReplicationClusters, deliverAt); !added { + // after flushing try again to add the current payload + if ok := p.batchBuilder.Add(smm, sequenceID, msg.Payload, request, + msg.ReplicationClusters, deliverAt); !ok { + p.publishSemaphore.Release() + request.callback(nil, request.msg, errFailAddBatch) p.log.WithField("size", len(msg.Payload)). WithField("sequenceID", sequenceID). WithField("properties", msg.Properties). - Error("unable to send single message") + Error("unable to add message to batch") + return } - p.internalFlushCurrentBatch() } - if request.flushImmediately { + if !sendAsBatch || request.flushImmediately { p.internalFlushCurrentBatch() } }