Skip to content

Commit

Permalink
fix: avoid assert panic (apache#73)
Browse files Browse the repository at this point in the history
- add assert check

Fixes apache#64

Change-Id: Ibd355440ccf3b06ac60575a9306cfb66cb80d759
  • Loading branch information
xujianhai666 authored and wolfstudy committed Oct 24, 2019
1 parent ad46ac8 commit 8a2ff05
Showing 1 changed file with 13 additions and 5 deletions.
18 changes: 13 additions & 5 deletions pulsar/impl_partition_producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,11 @@ import (
"sync/atomic"
"time"

"github.com/golang/protobuf/proto"

"github.com/apache/pulsar-client-go/pkg/pb"
"github.com/apache/pulsar-client-go/pulsar/internal"
"github.com/apache/pulsar-client-go/util"
"github.com/golang/protobuf/proto"

log "github.com/sirupsen/logrus"
)
Expand Down Expand Up @@ -293,7 +294,12 @@ func (p *partitionProducer) internalFlushCurrentBatch() {
func (p *partitionProducer) internalFlush(fr *flushRequest) {
p.internalFlushCurrentBatch()

pi := p.pendingQueue.PeekLast().(*pendingItem)
pi, ok := p.pendingQueue.PeekLast().(*pendingItem)
if !ok {
fr.waitGroup.Done()
return
}

pi.sendRequests = append(pi.sendRequests, &sendRequest{
msg: nil,
callback: func(id MessageID, message *ProducerMessage, e error) {
Expand Down Expand Up @@ -349,12 +355,14 @@ func (p *partitionProducer) internalSendAsync(ctx context.Context, msg *Producer
}

func (p *partitionProducer) ReceivedSendReceipt(response *pb.CommandSendReceipt) {
pi := p.pendingQueue.Peek().(*pendingItem)
pi, ok := p.pendingQueue.Peek().(*pendingItem)

if pi == nil {
if !ok {
p.log.Warnf("Received ack for %v although the pending queue is empty", response.GetMessageId())
return
} else if pi.sequenceID != response.GetSequenceId() {
}

if pi.sequenceID != response.GetSequenceId() {
p.log.Warnf("Received ack for %v on sequenceId %v - expected: %v", response.GetMessageId(),
response.GetSequenceId(), pi.sequenceID)
return
Expand Down

0 comments on commit 8a2ff05

Please sign in to comment.