Skip to content

Commit

Permalink
fix: failTimeoutMessages cannot delete outdated messages (#1247)
Browse files Browse the repository at this point in the history
* fix: failTimeoutMessages cannot delete outdated messages

* Fix slice pass

(cherry picked from commit a42cc24)
  • Loading branch information
nodece authored and RobertIndie committed Jul 31, 2024
1 parent 3cbb353 commit 13a7868
Showing 1 changed file with 15 additions and 21 deletions.
36 changes: 15 additions & 21 deletions pulsar/producer_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -372,8 +372,7 @@ func (p *partitionProducer) grabCnx(assignedBrokerURL string) error {
continue
}
pi := item.(*pendingItem)
// when resending pending batches, we update the sendAt timestamp and put to the back of queue
// to avoid pending item been removed by failTimeoutMessages and cause race condition
// when resending pending batches, we update the sendAt timestamp to record the metric.
pi.Lock()
pi.sentAt = time.Now()
pi.Unlock()
Expand Down Expand Up @@ -814,19 +813,14 @@ func (p *partitionProducer) internalSingleSend(
return
}

p.pendingQueue.Put(&pendingItem{
sentAt: time.Now(),
buffer: buffer,
sequenceID: sid,
sendRequests: []interface{}{sr},
})
p._getConn().WriteData(buffer)
p.writeData(buffer, sid, []interface{}{sr})
}

type pendingItem struct {
sync.Mutex
buffer internal.Buffer
sequenceID uint64
createdAt time.Time
sentAt time.Time
sendRequests []interface{}
isDone bool
Expand Down Expand Up @@ -868,13 +862,19 @@ func (p *partitionProducer) internalFlushCurrentBatch() {
return
}

p.writeData(batchData, sequenceID, callbacks)
}

func (p *partitionProducer) writeData(buffer internal.Buffer, sequenceID uint64, callbacks []interface{}) {
now := time.Now()
p.pendingQueue.Put(&pendingItem{
sentAt: time.Now(),
buffer: batchData,
createdAt: now,
sentAt: now,
buffer: buffer,
sequenceID: sequenceID,
sendRequests: callbacks,
})
p._getConn().WriteData(batchData)
p._getConn().WriteData(buffer)
}

func (p *partitionProducer) failTimeoutMessages() {
Expand All @@ -898,7 +898,7 @@ func (p *partitionProducer) failTimeoutMessages() {
continue
}
oldestItem := item.(*pendingItem)
if nextWaiting := diff(oldestItem.sentAt); nextWaiting > 0 {
if nextWaiting := diff(oldestItem.createdAt); nextWaiting > 0 {
// none of these pending messages have timed out, wait and retry
t.Reset(nextWaiting)
continue
Expand Down Expand Up @@ -930,7 +930,7 @@ func (p *partitionProducer) failTimeoutMessages() {
pi := m.(*pendingItem)
pi.Lock()
defer pi.Unlock()
if nextWaiting := diff(pi.sentAt); nextWaiting > 0 {
if nextWaiting := diff(pi.createdAt); nextWaiting > 0 {
// current and subsequent items not timeout yet, stop iterating
tickerNeedWaiting = nextWaiting
return false
Expand Down Expand Up @@ -995,13 +995,7 @@ func (p *partitionProducer) internalFlushCurrentBatches() {
if b.BatchData == nil {
continue
}
p.pendingQueue.Put(&pendingItem{
sentAt: time.Now(),
buffer: b.BatchData,
sequenceID: b.SequenceID,
sendRequests: b.Callbacks,
})
p._getConn().WriteData(b.BatchData)
p.writeData(b.BatchData, b.SequenceID, b.Callbacks)
}

}
Expand Down

0 comments on commit 13a7868

Please sign in to comment.