diff --git a/reader.go b/reader.go index c7259f33..7dda65a5 100644 --- a/reader.go +++ b/reader.go @@ -80,8 +80,11 @@ type nsqConn struct { readTimeout time.Duration writeTimeout time.Duration stopper sync.Once - dying chan struct{} - drainReady chan struct{} + dying chan int + drainReady chan int + readyChan chan int + exitChan chan int + backoffCounter int32 } func newNSQConn(addr string, readTimeout time.Duration, writeTimeout time.Duration) (*nsqConn, error) { @@ -97,8 +100,10 @@ func newNSQConn(addr string, readTimeout time.Duration, writeTimeout time.Durati finishedMessages: make(chan *FinishedMessage), readTimeout: readTimeout, writeTimeout: writeTimeout, - dying: make(chan struct{}, 1), - drainReady: make(chan struct{}), + dying: make(chan int, 1), + drainReady: make(chan int), + readyChan: make(chan int, 1), + exitChan: make(chan int), } nc.SetWriteDeadline(time.Now().Add(nc.writeTimeout)) @@ -162,6 +167,8 @@ type Reader struct { ExitChan chan int // read from this channel to block your main loop // internal variables + maxBackoffDuration time.Duration + maxBackoffCount int32 maxInFlight int incomingMessages chan *incomingMessage nsqConnections map[string]*nsqConn @@ -209,6 +216,7 @@ func NewReader(topic string, channel string) (*Reader, error) { WriteTimeout: time.Second, maxInFlight: 1, } + q.SetMaxBackoffDuration(120 * time.Second) return q, nil } @@ -255,10 +263,19 @@ func (q *Reader) SetMaxInFlight(maxInFlight int) { q.maxInFlight = maxInFlight for _, c := range q.nsqConnections { - q.updateReady(c) + select { + case c.readyChan <- 1: + default: + } } } +// SetMaxBackoffDuration sets the maximum duration a connection will backoff from message processing +func (q *Reader) SetMaxBackoffDuration(duration time.Duration) { + q.maxBackoffDuration = duration + q.maxBackoffCount = int32(math.Max(1, math.Ceil(math.Log2(duration.Seconds())))) +} + // MaxInFlight returns the configured maximum number of messages to allow in-flight. func (q *Reader) MaxInFlight() int { return q.maxInFlight @@ -381,6 +398,7 @@ func (q *Reader) ConnectToNSQ(addr string) error { go q.readLoop(connection) go q.finishLoop(connection) + go q.rdyLoop(connection) return nil } @@ -396,7 +414,7 @@ func handleError(q *Reader, c *nsqConn, errMsg string) { func (q *Reader) readLoop(c *nsqConn) { // prime our ready state - err := q.updateReady(c) + err := q.updateRDY(c) if err != nil { handleError(q, c, fmt.Sprintf("[%s] failed to send initial ready - %s", c, err.Error())) q.stopFinishLoop(c) @@ -412,7 +430,7 @@ func (q *Reader) readLoop(c *nsqConn) { log.Printf("[%s] delaying close of FinishedMesages channel; %d outstanding messages", c, c.messagesInFlight) } log.Printf("[%s] stopped read loop ", c) - break + goto exit } resp, err := ReadResponse(c) @@ -460,7 +478,7 @@ func (q *Reader) readLoop(c *nsqConn) { err := c.sendCommand(&buf, Nop()) if err != nil { handleError(q, c, fmt.Sprintf("[%s] error sending NOP - %s", c, err.Error())) - return + goto exit } } case FrameTypeError: @@ -469,49 +487,82 @@ func (q *Reader) readLoop(c *nsqConn) { log.Printf("[%s] unknown message type %d", c, frameType) } - q.updateReady(c) + select { + case c.readyChan <- 1: + default: + } } + +exit: + log.Printf("[%s] readLoop exiting", c) } func (q *Reader) finishLoop(c *nsqConn) { var buf bytes.Buffer + var backoffCounter int32 + var backoffUpdated bool + var backoffDeadline time.Time for { select { case <-c.dying: log.Printf("[%s] breaking out of finish loop ", c) // Indicate drainReady because we will not pull any more off finishedMessages - c.drainReady <- struct{}{} - return + c.drainReady <- 1 + goto exit case msg := <-c.finishedMessages: // Decrement this here so it is correct even if we can't respond to nsqd atomic.AddInt64(&q.messagesInFlight, -1) atomic.AddInt64(&c.messagesInFlight, -1) + now := time.Now() if msg.Success { if q.VerboseLogging { log.Printf("[%s] finishing %s", c, msg.Id) } + err := c.sendCommand(&buf, Finish(msg.Id)) if err != nil { log.Printf("[%s] error finishing %s - %s", c, msg.Id, err.Error()) q.stopFinishLoop(c) continue } + atomic.AddUint64(&c.messagesFinished, 1) atomic.AddUint64(&q.MessagesFinished, 1) + + if backoffCounter > 0 && now.After(backoffDeadline) { + backoffCounter-- + backoffUpdated = true + } } else { if q.VerboseLogging { log.Printf("[%s] requeuing %s", c, msg.Id) } + err := c.sendCommand(&buf, Requeue(msg.Id, msg.RequeueDelayMs)) if err != nil { log.Printf("[%s] error requeueing %s - %s", c, msg.Id, err.Error()) q.stopFinishLoop(c) continue } + atomic.AddUint64(&c.messagesRequeued, 1) atomic.AddUint64(&q.MessagesRequeued, 1) + + if backoffCounter < q.maxBackoffCount && now.After(backoffDeadline) { + backoffCounter++ + backoffUpdated = true + } + } + + atomic.StoreInt32(&c.backoffCounter, backoffCounter) + // prevent many async failures/successes from immediately resulting in + // max backoff/normal rate (by ensuring that we dont continually incr/decr + // the counter during a backoff period) + if backoffCounter > 0 && backoffUpdated { + backoffDuration := q.backoffDuration(backoffCounter) + backoffDeadline = now.Add(backoffDuration) } if atomic.LoadInt64(&c.messagesInFlight) == 0 && @@ -521,13 +572,16 @@ func (q *Reader) finishLoop(c *nsqConn) { } } } + +exit: + log.Printf("[%s] finishLoop exiting", c) } func (q *Reader) stopFinishLoop(c *nsqConn) { c.stopper.Do(func() { log.Printf("[%s] beginning stopFinishLoop logic", c) // This doesn't block because dying has buffer of 1 - c.dying <- struct{}{} + c.dying <- 1 // Drain the finishedMessages channel go func() { @@ -536,6 +590,7 @@ func (q *Reader) stopFinishLoop(c *nsqConn) { <-c.finishedMessages } }() + close(c.exitChan) c.Close() delete(q.nsqConnections, c.String()) @@ -564,9 +619,54 @@ func (q *Reader) stopFinishLoop(c *nsqConn) { }) } -func (q *Reader) updateReady(c *nsqConn) error { - var buf bytes.Buffer +func (q *Reader) backoffDuration(count int32) time.Duration { + backoffDuration := time.Second * time.Duration(math.Pow(2, float64(count))) + if backoffDuration > q.maxBackoffDuration { + backoffDuration = q.maxBackoffDuration + } + return backoffDuration +} + +func (q *Reader) rdyLoop(c *nsqConn) { + readyChan := c.readyChan + var backoffTimer *time.Timer + var backoffTimerChan <-chan time.Time + + for { + select { + case <-backoffTimerChan: + log.Printf("[%s] backoff time expired, continuing with RDY 1...", c) + // while in backoff only ever let 1 message at a time through + q.sendRDY(c, 1) + readyChan = c.readyChan + case <-readyChan: + backoffCounter := atomic.LoadInt32(&c.backoffCounter) + + // send ready immediately + if backoffCounter == 0 { + q.updateRDY(c) + continue + } + + backoffDuration := q.backoffDuration(backoffCounter) + backoffTimer = time.NewTimer(backoffDuration) + backoffTimerChan = backoffTimer.C + readyChan = nil + + log.Printf("[%s] backing off for %.02f seconds", c, backoffDuration.Seconds()) + case <-c.exitChan: + if backoffTimer != nil { + backoffTimer.Stop() + } + goto exit + } + } +exit: + log.Printf("[%s] rdyLoop exiting", c) +} + +func (q *Reader) updateRDY(c *nsqConn) error { if atomic.LoadInt32(&c.stopFlag) != 0 { return nil } @@ -578,12 +678,7 @@ func (q *Reader) updateReady(c *nsqConn) error { if q.VerboseLogging { log.Printf("[%s] sending RDY %d (%d remain)", c, mif, remain) } - atomic.StoreInt64(&c.rdyCount, int64(mif)) - err := c.sendCommand(&buf, Ready(mif)) - if err != nil { - handleError(q, c, fmt.Sprintf("[%s] error sending RDY %d - %s", c, mif, err.Error())) - return err - } + q.sendRDY(c, mif) } else { if q.VerboseLogging { log.Printf("[%s] skip sending RDY (%d remain out of %d)", c, remain, mif) @@ -593,6 +688,17 @@ func (q *Reader) updateReady(c *nsqConn) error { return nil } +func (q *Reader) sendRDY(c *nsqConn, count int) error { + var buf bytes.Buffer + atomic.StoreInt64(&c.rdyCount, int64(count)) + err := c.sendCommand(&buf, Ready(count)) + if err != nil { + handleError(q, c, fmt.Sprintf("[%s] error sending RDY %d - %s", c, count, err.Error())) + return err + } + return nil +} + // Stop will gracefully stop the Reader func (q *Reader) Stop() { var buf bytes.Buffer diff --git a/reader_test.go b/reader_test.go index 620ab346..20ee0b60 100644 --- a/reader_test.go +++ b/reader_test.go @@ -66,7 +66,10 @@ func TestQueuereader(t *testing.T) { topicName := "reader_test" + strconv.Itoa(int(time.Now().Unix())) q, _ := NewReader(topicName, "ch") q.VerboseLogging = true - q.DefaultRequeueDelay = 0 // so that the test can simulate reaching max requeues and a call to LogFailedMessage + // so that the test can simulate reaching max requeues and a call to LogFailedMessage + q.DefaultRequeueDelay = 0 + // so that the test wont timeout from backing off + q.SetMaxBackoffDuration(time.Millisecond * 50) h := &MyTestHandler{ t: t,