Skip to content

Commit

Permalink
fix: batch flush method (#476)
Browse files Browse the repository at this point in the history
Signed-off-by: jonyhy96 <hy352144278@gmail.com>

Fixes #475

### Motivation

Fix wrong batch flush method bug.

### Modifications

Check if batchBuilder is multi batches before flush.
  • Loading branch information
jonyhy96 authored Mar 5, 2021
1 parent 86fab51 commit cd210a1
Showing 1 changed file with 5 additions and 1 deletion.
6 changes: 5 additions & 1 deletion pulsar/producer_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -397,7 +397,11 @@ func (p *partitionProducer) internalSend(request *sendRequest) {
msg.ReplicationClusters, deliverAt)
if !added {
// The current batch is full.. flush it and retry
p.internalFlushCurrentBatch()
if p.batchBuilder.IsMultiBatches() {
p.internalFlushCurrentBatches()
} else {
p.internalFlushCurrentBatch()
}

// after flushing try again to add the current payload
if ok := p.batchBuilder.Add(smm, p.sequenceIDGenerator, payload, request,
Expand Down

0 comments on commit cd210a1

Please sign in to comment.