diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index 1b2fae729c..78f1f3ccb8 100755 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -372,8 +372,7 @@ func (p *partitionProducer) grabCnx(assignedBrokerURL string) error { continue } pi := item.(*pendingItem) - // when resending pending batches, we update the sendAt timestamp and put to the back of queue - // to avoid pending item been removed by failTimeoutMessages and cause race condition + // when resending pending batches, we update the sendAt timestamp to record the metric. pi.Lock() pi.sentAt = time.Now() pi.Unlock() @@ -814,19 +813,14 @@ func (p *partitionProducer) internalSingleSend( return } - p.pendingQueue.Put(&pendingItem{ - sentAt: time.Now(), - buffer: buffer, - sequenceID: sid, - sendRequests: []interface{}{sr}, - }) - p._getConn().WriteData(buffer) + p.writeData(buffer, sid, []interface{}{sr}) } type pendingItem struct { sync.Mutex buffer internal.Buffer sequenceID uint64 + createdAt time.Time sentAt time.Time sendRequests []interface{} isDone bool @@ -868,13 +862,19 @@ func (p *partitionProducer) internalFlushCurrentBatch() { return } + p.writeData(batchData, sequenceID, callbacks) +} + +func (p *partitionProducer) writeData(buffer internal.Buffer, sequenceID uint64, callbacks []interface{}) { + now := time.Now() p.pendingQueue.Put(&pendingItem{ - sentAt: time.Now(), - buffer: batchData, + createdAt: now, + sentAt: now, + buffer: buffer, sequenceID: sequenceID, sendRequests: callbacks, }) - p._getConn().WriteData(batchData) + p._getConn().WriteData(buffer) } func (p *partitionProducer) failTimeoutMessages() { @@ -898,7 +898,7 @@ func (p *partitionProducer) failTimeoutMessages() { continue } oldestItem := item.(*pendingItem) - if nextWaiting := diff(oldestItem.sentAt); nextWaiting > 0 { + if nextWaiting := diff(oldestItem.createdAt); nextWaiting > 0 { // none of these pending messages have timed out, wait and retry t.Reset(nextWaiting) continue @@ -930,7 +930,7 @@ func (p *partitionProducer) failTimeoutMessages() { pi := m.(*pendingItem) pi.Lock() defer pi.Unlock() - if nextWaiting := diff(pi.sentAt); nextWaiting > 0 { + if nextWaiting := diff(pi.createdAt); nextWaiting > 0 { // current and subsequent items not timeout yet, stop iterating tickerNeedWaiting = nextWaiting return false @@ -995,13 +995,7 @@ func (p *partitionProducer) internalFlushCurrentBatches() { if b.BatchData == nil { continue } - p.pendingQueue.Put(&pendingItem{ - sentAt: time.Now(), - buffer: b.BatchData, - sequenceID: b.SequenceID, - sendRequests: b.Callbacks, - }) - p._getConn().WriteData(b.BatchData) + p.writeData(b.BatchData, b.SequenceID, b.Callbacks) } }