Skip to content

Commit

Permalink
nsq: add backoff to Go reader library
Browse files Browse the repository at this point in the history
  • Loading branch information
mreiferson committed Jan 12, 2013
1 parent 5891dd9 commit 82a05da
Show file tree
Hide file tree
Showing 2 changed files with 130 additions and 21 deletions.
146 changes: 126 additions & 20 deletions reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,11 @@ type nsqConn struct {
readTimeout time.Duration
writeTimeout time.Duration
stopper sync.Once
dying chan struct{}
drainReady chan struct{}
dying chan int
drainReady chan int
readyChan chan int
exitChan chan int
backoffCounter int32
}

func newNSQConn(addr string, readTimeout time.Duration, writeTimeout time.Duration) (*nsqConn, error) {
Expand All @@ -97,8 +100,10 @@ func newNSQConn(addr string, readTimeout time.Duration, writeTimeout time.Durati
finishedMessages: make(chan *FinishedMessage),
readTimeout: readTimeout,
writeTimeout: writeTimeout,
dying: make(chan struct{}, 1),
drainReady: make(chan struct{}),
dying: make(chan int, 1),
drainReady: make(chan int),
readyChan: make(chan int, 1),
exitChan: make(chan int),
}

nc.SetWriteDeadline(time.Now().Add(nc.writeTimeout))
Expand Down Expand Up @@ -162,6 +167,8 @@ type Reader struct {
ExitChan chan int // read from this channel to block your main loop

// internal variables
maxBackoffDuration time.Duration
maxBackoffCount int32
maxInFlight int
incomingMessages chan *incomingMessage
nsqConnections map[string]*nsqConn
Expand Down Expand Up @@ -209,6 +216,7 @@ func NewReader(topic string, channel string) (*Reader, error) {
WriteTimeout: time.Second,
maxInFlight: 1,
}
q.SetMaxBackoffDuration(120 * time.Second)
return q, nil
}

Expand Down Expand Up @@ -255,10 +263,19 @@ func (q *Reader) SetMaxInFlight(maxInFlight int) {
q.maxInFlight = maxInFlight

for _, c := range q.nsqConnections {
q.updateReady(c)
select {
case c.readyChan <- 1:
default:
}
}
}

// SetMaxBackoffDuration sets the maximum duration a connection will back off
// from message processing.
//
// It also derives maxBackoffCount, the ceiling for the per-connection backoff
// counter, as ceil(log2(duration in seconds)) with a floor of 1. Because the
// backoff delay doubles with every increment of the counter, this is the
// smallest count whose delay reaches the configured maximum.
func (q *Reader) SetMaxBackoffDuration(duration time.Duration) {
	q.maxBackoffDuration = duration

	// Durations of one second or less (including zero and negative values)
	// use the floor of 1. Only take the logarithm for values above one second:
	// math.Log2 yields NaN for negative input, and converting NaN to int32
	// gives an implementation-dependent result.
	count := int32(1)
	if secs := duration.Seconds(); secs > 1 {
		count = int32(math.Ceil(math.Log2(secs)))
	}
	q.maxBackoffCount = count
}

// MaxInFlight returns the configured maximum number of messages to allow in-flight.
func (q *Reader) MaxInFlight() int {
return q.maxInFlight
Expand Down Expand Up @@ -381,6 +398,7 @@ func (q *Reader) ConnectToNSQ(addr string) error {

go q.readLoop(connection)
go q.finishLoop(connection)
go q.rdyLoop(connection)

return nil
}
Expand All @@ -396,7 +414,7 @@ func handleError(q *Reader, c *nsqConn, errMsg string) {

func (q *Reader) readLoop(c *nsqConn) {
// prime our ready state
err := q.updateReady(c)
err := q.updateRDY(c)
if err != nil {
handleError(q, c, fmt.Sprintf("[%s] failed to send initial ready - %s", c, err.Error()))
q.stopFinishLoop(c)
Expand All @@ -412,7 +430,7 @@ func (q *Reader) readLoop(c *nsqConn) {
log.Printf("[%s] delaying close of FinishedMesages channel; %d outstanding messages", c, c.messagesInFlight)
}
log.Printf("[%s] stopped read loop ", c)
break
goto exit
}

resp, err := ReadResponse(c)
Expand Down Expand Up @@ -460,7 +478,7 @@ func (q *Reader) readLoop(c *nsqConn) {
err := c.sendCommand(&buf, Nop())
if err != nil {
handleError(q, c, fmt.Sprintf("[%s] error sending NOP - %s", c, err.Error()))
return
goto exit
}
}
case FrameTypeError:
Expand All @@ -469,49 +487,82 @@ func (q *Reader) readLoop(c *nsqConn) {
log.Printf("[%s] unknown message type %d", c, frameType)
}

q.updateReady(c)
select {
case c.readyChan <- 1:
default:
}
}

exit:
log.Printf("[%s] readLoop exiting", c)
}

func (q *Reader) finishLoop(c *nsqConn) {
var buf bytes.Buffer
var backoffCounter int32
var backoffUpdated bool
var backoffDeadline time.Time

for {
select {
case <-c.dying:
log.Printf("[%s] breaking out of finish loop ", c)
// Indicate drainReady because we will not pull any more off finishedMessages
c.drainReady <- struct{}{}
return
c.drainReady <- 1
goto exit
case msg := <-c.finishedMessages:
// Decrement this here so it is correct even if we can't respond to nsqd
atomic.AddInt64(&q.messagesInFlight, -1)
atomic.AddInt64(&c.messagesInFlight, -1)
now := time.Now()

if msg.Success {
if q.VerboseLogging {
log.Printf("[%s] finishing %s", c, msg.Id)
}

err := c.sendCommand(&buf, Finish(msg.Id))
if err != nil {
log.Printf("[%s] error finishing %s - %s", c, msg.Id, err.Error())
q.stopFinishLoop(c)
continue
}

atomic.AddUint64(&c.messagesFinished, 1)
atomic.AddUint64(&q.MessagesFinished, 1)

if backoffCounter > 0 && now.After(backoffDeadline) {
backoffCounter--
backoffUpdated = true
}
} else {
if q.VerboseLogging {
log.Printf("[%s] requeuing %s", c, msg.Id)
}

err := c.sendCommand(&buf, Requeue(msg.Id, msg.RequeueDelayMs))
if err != nil {
log.Printf("[%s] error requeueing %s - %s", c, msg.Id, err.Error())
q.stopFinishLoop(c)
continue
}

atomic.AddUint64(&c.messagesRequeued, 1)
atomic.AddUint64(&q.MessagesRequeued, 1)

if backoffCounter < q.maxBackoffCount && now.After(backoffDeadline) {
backoffCounter++
backoffUpdated = true
}
}

atomic.StoreInt32(&c.backoffCounter, backoffCounter)
// prevent many async failures/successes from immediately resulting in
// max backoff/normal rate (by ensuring that we don't continually increment
// or decrement the counter during a backoff period)
if backoffCounter > 0 && backoffUpdated {
backoffDuration := q.backoffDuration(backoffCounter)
backoffDeadline = now.Add(backoffDuration)
}

if atomic.LoadInt64(&c.messagesInFlight) == 0 &&
Expand All @@ -521,13 +572,16 @@ func (q *Reader) finishLoop(c *nsqConn) {
}
}
}

exit:
log.Printf("[%s] finishLoop exiting", c)
}

func (q *Reader) stopFinishLoop(c *nsqConn) {
c.stopper.Do(func() {
log.Printf("[%s] beginning stopFinishLoop logic", c)
// This doesn't block because dying has buffer of 1
c.dying <- struct{}{}
c.dying <- 1

// Drain the finishedMessages channel
go func() {
Expand All @@ -536,6 +590,7 @@ func (q *Reader) stopFinishLoop(c *nsqConn) {
<-c.finishedMessages
}
}()
close(c.exitChan)
c.Close()
delete(q.nsqConnections, c.String())

Expand Down Expand Up @@ -564,9 +619,54 @@ func (q *Reader) stopFinishLoop(c *nsqConn) {
})
}

func (q *Reader) updateReady(c *nsqConn) error {
var buf bytes.Buffer
// backoffDuration returns how long to back off for the given backoff count:
// 2^count seconds, capped at the Reader's configured maxBackoffDuration.
func (q *Reader) backoffDuration(count int32) time.Duration {
	seconds := math.Pow(2, float64(count))
	d := time.Duration(seconds) * time.Second
	if d <= q.maxBackoffDuration {
		return d
	}
	return q.maxBackoffDuration
}

// rdyLoop decides when RDY is sent for connection c. It runs until
// c.exitChan is closed.
//
// Each signal on c.readyChan is handled according to the connection's current
// backoffCounter:
//   - zero: RDY is refreshed immediately via updateRDY;
//   - non-zero: a timer is armed for backoffDuration(counter) and further
//     ready signals are ignored until it fires, at which point RDY 1 is sent.
func (q *Reader) rdyLoop(c *nsqConn) {
	var (
		timer  *time.Timer
		timerC <-chan time.Time
	)

	// signals is swapped to nil while a backoff timer is pending; a nil
	// channel never becomes ready in a select, so ready notifications are
	// left unread until the timer expires.
	signals := c.readyChan

loop:
	for {
		select {
		case <-timerC:
			log.Printf("[%s] backoff time expired, continuing with RDY 1...", c)
			// while in backoff only ever let 1 message at a time through
			q.sendRDY(c, 1)
			signals = c.readyChan

		case <-signals:
			count := atomic.LoadInt32(&c.backoffCounter)
			if count == 0 {
				// not backing off: send ready immediately
				q.updateRDY(c)
			} else {
				wait := q.backoffDuration(count)
				timer = time.NewTimer(wait)
				timerC = timer.C
				signals = nil

				log.Printf("[%s] backing off for %.02f seconds", c, wait.Seconds())
			}

		case <-c.exitChan:
			if timer != nil {
				timer.Stop()
			}
			break loop
		}
	}

	log.Printf("[%s] rdyLoop exiting", c)
}

func (q *Reader) updateRDY(c *nsqConn) error {
if atomic.LoadInt32(&c.stopFlag) != 0 {
return nil
}
Expand All @@ -578,12 +678,7 @@ func (q *Reader) updateReady(c *nsqConn) error {
if q.VerboseLogging {
log.Printf("[%s] sending RDY %d (%d remain)", c, mif, remain)
}
atomic.StoreInt64(&c.rdyCount, int64(mif))
err := c.sendCommand(&buf, Ready(mif))
if err != nil {
handleError(q, c, fmt.Sprintf("[%s] error sending RDY %d - %s", c, mif, err.Error()))
return err
}
q.sendRDY(c, mif)
} else {
if q.VerboseLogging {
log.Printf("[%s] skip sending RDY (%d remain out of %d)", c, remain, mif)
Expand All @@ -593,6 +688,17 @@ func (q *Reader) updateReady(c *nsqConn) error {
return nil
}

// sendRDY records count as the connection's rdyCount and then writes a RDY
// command for that count to c. The count is stored before the command is
// sent, so it is updated even if the send fails.
//
// On a send failure the error is reported through handleError and returned;
// otherwise sendRDY returns nil.
func (q *Reader) sendRDY(c *nsqConn, count int) error {
	atomic.StoreInt64(&c.rdyCount, int64(count))

	var scratch bytes.Buffer
	if err := c.sendCommand(&scratch, Ready(count)); err != nil {
		handleError(q, c, fmt.Sprintf("[%s] error sending RDY %d - %s", c, count, err.Error()))
		return err
	}
	return nil
}

// Stop will gracefully stop the Reader
func (q *Reader) Stop() {
var buf bytes.Buffer
Expand Down
5 changes: 4 additions & 1 deletion reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,10 @@ func TestQueuereader(t *testing.T) {
topicName := "reader_test" + strconv.Itoa(int(time.Now().Unix()))
q, _ := NewReader(topicName, "ch")
q.VerboseLogging = true
q.DefaultRequeueDelay = 0 // so that the test can simulate reaching max requeues and a call to LogFailedMessage
// so that the test can simulate reaching max requeues and a call to LogFailedMessage
q.DefaultRequeueDelay = 0
// so that the test won't time out from backing off
q.SetMaxBackoffDuration(time.Millisecond * 50)

h := &MyTestHandler{
t: t,
Expand Down

0 comments on commit 82a05da

Please sign in to comment.