Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

asynchronized send timeout checking without pending queue lock #460

Merged
merged 1 commit into from
Feb 9, 2021
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
118 changes: 81 additions & 37 deletions pulsar/producer_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,9 @@ func newPartitionProducer(client *client, topic string, options *ProducerOptions
p.log.WithField("cnx", p.cnx.ID()).Info("Created producer")
p.setProducerState(producerReady)

if p.options.SendTimeout > 0 {
go p.failTimeoutMessages()
}
go p.runEventsLoop()

return p, nil
Expand Down Expand Up @@ -427,10 +430,6 @@ type pendingItem struct {
}

func (p *partitionProducer) internalFlushCurrentBatch() {
if p.options.SendTimeout > 0 {
p.failTimeoutMessages()
}

batchData, sequenceID, callbacks := p.batchBuilder.Flush()
if batchData == nil {
return
Expand All @@ -446,46 +445,91 @@ func (p *partitionProducer) internalFlushCurrentBatch() {
}

func (p *partitionProducer) failTimeoutMessages() {
// since Closing/Closed connection couldn't be reopen, load and compare is safe
state := p.getProducerState()
if state == producerClosing || state == producerClosed {
return
diff := func(sentAt time.Time) time.Duration {
return p.options.SendTimeout - time.Since(sentAt)
}

item := p.pendingQueue.Peek()
if item == nil {
// pending queue is empty
return
}
t := time.NewTimer(p.options.SendTimeout)
defer t.Stop()

pi := item.(*pendingItem)
if time.Since(pi.sentAt) < p.options.SendTimeout {
// pending messages not timeout yet
return
}
for range t.C {
state := p.getProducerState()
if state == producerClosing || state == producerClosed {
return
}

item := p.pendingQueue.Peek()
if item == nil {
// pending queue is empty
t.Reset(p.options.SendTimeout)
continue
}
oldestItem := item.(*pendingItem)
if nextWaiting := diff(oldestItem.sentAt); nextWaiting > 0 {
// none of these pending messages have timed out, wait and retry
t.Reset(nextWaiting)
continue
}

p.log.Infof("Failing %d messages", p.pendingQueue.Size())
for p.pendingQueue.Size() > 0 {
pi = p.pendingQueue.Poll().(*pendingItem)
pi.Lock()
for _, i := range pi.sendRequests {
sr := i.(*sendRequest)
if sr.msg != nil {
size := len(sr.msg.Payload)
p.publishSemaphore.Release()
p.metrics.MessagesPending.Dec()
p.metrics.BytesPending.Sub(float64(size))
p.metrics.PublishErrorsTimeout.Inc()
p.log.WithError(errSendTimeout).
WithField("size", size).
WithField("properties", sr.msg.Properties)
// since pending queue is not thread safe because of there is no global iteration lock
// to control poll from pending queue, current goroutine and connection receipt handler
// iterate pending queue at the same time, this maybe a performance trade-off
// see https://github.com/apache/pulsar-client-go/pull/301
curViewItems := p.pendingQueue.ReadableSlice()
viewSize := len(curViewItems)
if viewSize <= 0 {
// double check
t.Reset(p.options.SendTimeout)
continue
}
p.log.Infof("Failing %d messages", viewSize)
lastViewItem := curViewItems[viewSize-1].(*pendingItem)

// iterate at most viewSize items
for i := 0; i < viewSize; i++ {
item := p.pendingQueue.Poll()
if item == nil {
t.Reset(p.options.SendTimeout)
break
}
if sr.callback != nil {
sr.callback(nil, sr.msg, errSendTimeout)

pi := item.(*pendingItem)
pi.Lock()
if nextWaiting := diff(pi.sentAt); nextWaiting > 0 {
// current and subsequent items not timeout yet, stop iterating
t.Reset(nextWaiting)
pi.Unlock()
break
}

for _, i := range pi.sendRequests {
sr := i.(*sendRequest)
if sr.msg != nil {
size := len(sr.msg.Payload)
p.publishSemaphore.Release()
p.metrics.MessagesPending.Dec()
p.metrics.BytesPending.Sub(float64(size))
p.metrics.PublishErrorsTimeout.Inc()
p.log.WithError(errSendTimeout).
WithField("size", size).
WithField("properties", sr.msg.Properties)
}
if sr.callback != nil {
sr.callback(nil, sr.msg, errSendTimeout)
}
}

// flag the send has completed with error, flush make no effect
pi.completed = true
buffersPool.Put(pi.batchData)
pi.Unlock()

// finally reached the last view item, current iteration ends
if pi == lastViewItem {
t.Reset(p.options.SendTimeout)
break
}
}
buffersPool.Put(pi.batchData)
pi.Unlock()
}
}

Expand Down