Skip to content

Commit

Permalink
nsqd topic/channel: reset health on successful backend write
Browse files Browse the repository at this point in the history
- channel and topic put now call ctx.nsqd.SetHealth after every backend write, so a successful write resets health
- change nsqd SetHealth/GetError to use atomic.Value; skip allocation in
  SetHealth if attempting to set an already healthy queue to healthy
- nsqd_test.go: change `exp` to `nexp` in `nequal` output
- relates to nsqio#594
  • Loading branch information
judwhite committed Oct 13, 2015
1 parent 0dd5d5c commit 0fe4d2b
Show file tree
Hide file tree
Showing 6 changed files with 120 additions and 15 deletions.
4 changes: 2 additions & 2 deletions nsqd/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,10 +320,10 @@ func (c *Channel) put(m *Message) error {
b := bufferPoolGet()
err := writeMessageToBackend(b, m, c.backend)
bufferPoolPut(b)
c.ctx.nsqd.SetHealth(err)
if err != nil {
c.ctx.nsqd.logf("CHANNEL(%s) ERROR: failed to write message to backend - %s",
c.name, err)
c.ctx.nsqd.SetHealth(err)
return err
}
}
Expand Down Expand Up @@ -571,7 +571,7 @@ func (c *Channel) messagePump() {
atomic.StoreInt32(&c.bufferedCount, 1)
c.clientMsgChan <- msg
atomic.StoreInt32(&c.bufferedCount, 0)
// the client will call back to mark as in-flight w/ it's info
// the client will call back to mark as in-flight w/ its info
}

exit:
Expand Down
57 changes: 57 additions & 0 deletions nsqd/channel_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package nsqd

import (
"fmt"
"io/ioutil"
"net/http"
"os"
"strconv"
"testing"
Expand Down Expand Up @@ -166,5 +169,59 @@ func TestChannelEmptyConsumer(t *testing.T) {
stats := cl.Stats()
equal(t, stats.InFlightCount, int64(0))
}
}

// TestChannelHealth checks that a failed channel backend write makes the
// /ping endpoint report an error, and that a subsequent successful backend
// write makes /ping report OK again.
func TestChannelHealth(t *testing.T) {
	opts := NewOptions()
	opts.Logger = newTestLogger(t)
	opts.MemQueueSize = 2

	_, httpAddr, nsqd := mustStartNSQD(opts)
	defer os.RemoveAll(opts.DataPath)
	defer nsqd.Exit()

	topic := nsqd.GetTopic("test")

	channel := topic.GetChannel("channel")
	// cause channel.messagePump to exit so we can set channel.backend without
	// a data race. side effect is it closes clientMsgChan, and messagePump is
	// never restarted. note this isn't the intended usage of exitChan but gets
	// around the data race without more invasive changes to how channel.backend
	// is set/loaded.
	channel.exitChan <- 1

	channel.backend = &errorBackendQueue{}

	// NOTE(review): the first two puts presumably land in the in-memory queue
	// (MemQueueSize is 2) and never touch the failing backend - confirm.
	msg := NewMessage(<-nsqd.idChan, make([]byte, 100))
	err := channel.PutMessage(msg)
	equal(t, err, nil)

	msg = NewMessage(<-nsqd.idChan, make([]byte, 100))
	err = channel.PutMessage(msg)
	equal(t, err, nil)

	// the third put is expected to fail
	msg = NewMessage(<-nsqd.idChan, make([]byte, 100))
	err = channel.PutMessage(msg)
	nequal(t, err, nil)

	url := fmt.Sprintf("http://%s/ping", httpAddr)
	resp, err := http.Get(url)
	equal(t, err, nil)
	equal(t, resp.StatusCode, 500)
	body, err := ioutil.ReadAll(resp.Body)
	resp.Body.Close()
	// fail on a broken read instead of on a misleading body mismatch below
	equal(t, err, nil)
	equal(t, string(body), "NOK - never gonna happen")

	// swap in a backend whose Put succeeds; the next write should reset health
	channel.backend = &errorRecoveredBackendQueue{}

	msg = NewMessage(<-nsqd.idChan, make([]byte, 100))
	err = channel.PutMessage(msg)
	equal(t, err, nil)

	resp, err = http.Get(url)
	equal(t, err, nil)
	equal(t, resp.StatusCode, 200)
	body, err = ioutil.ReadAll(resp.Body)
	resp.Body.Close()
	equal(t, err, nil)
	equal(t, string(body), "OK")
}
29 changes: 18 additions & 11 deletions nsqd/nsqd.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ const (
flagLoading
)

// errStore wraps an error so it can be held in NSQD.errValue, which is an
// atomic.Value: SetHealth stores errStore values and GetError unwraps them.
// NOTE(review): the wrapper is presumably needed because atomic.Value cannot
// store nil and requires every stored value to have the same concrete type,
// which a bare error interface would not guarantee - confirm.
type errStore struct {
err error
}

type NSQD struct {
// 64bit atomic vars need to be first for proper alignment on 32bit platforms
clientIDSequence int64
Expand All @@ -48,8 +52,7 @@ type NSQD struct {

dl *dirlock.DirLock
flag int32
errMtx sync.RWMutex
err error
errValue atomic.Value
startTime time.Time

topicMap map[string]*Topic
Expand Down Expand Up @@ -203,13 +206,14 @@ func (n *NSQD) getFlag(f int32) bool {
}

func (n *NSQD) SetHealth(err error) {
n.errMtx.Lock()
defer n.errMtx.Unlock()
n.err = err
if err != nil {
n.setFlag(flagHealthy, false)
n.errValue.Store(errStore{err: err})
} else {
n.setFlag(flagHealthy, true)
if !n.getFlag(flagHealthy) {
n.setFlag(flagHealthy, true)
n.errValue.Store(errStore{err: nil})
}
}
}

Expand All @@ -218,14 +222,17 @@ func (n *NSQD) IsHealthy() bool {
}

func (n *NSQD) GetError() error {
n.errMtx.RLock()
defer n.errMtx.RUnlock()
return n.err
errValue := n.errValue.Load()
if errValue == nil {
return nil
}
return errValue.(errStore).err
}

func (n *NSQD) GetHealth() string {
if !n.IsHealthy() {
return fmt.Sprintf("NOK - %s", n.GetError())
err := n.GetError()
if err != nil {
return fmt.Sprintf("NOK - %s", err)
}
return "OK"
}
Expand Down
26 changes: 25 additions & 1 deletion nsqd/nsqd_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package nsqd

import (
"errors"
"fmt"
"io/ioutil"
"net"
Expand Down Expand Up @@ -39,7 +40,7 @@ func equal(t *testing.T, act, exp interface{}) {
func nequal(t *testing.T, act, exp interface{}) {
if reflect.DeepEqual(exp, act) {
_, file, line, _ := runtime.Caller(1)
t.Logf("\033[31m%s:%d:\n\n\texp: %#v\n\n\tgot: %#v\033[39m\n\n",
t.Logf("\033[31m%s:%d:\n\n\tnexp: %#v\n\n\tgot: %#v\033[39m\n\n",
filepath.Base(file), line, exp, act)
t.FailNow()
}
Expand Down Expand Up @@ -438,3 +439,26 @@ func TestCluster(t *testing.T) {
producers, _ = data.Get("channel:" + topicName + ":ch").Array()
equal(t, len(producers), 0)
}

// TestSetHealth walks an NSQD instance through healthy -> unhealthy ->
// healthy via SetHealth and checks GetError, GetHealth and IsHealthy at
// each step.
func TestSetHealth(t *testing.T) {
	options := NewOptions()
	options.Logger = newTestLogger(t)
	n := New(options)

	// a freshly constructed instance reports no error and is healthy
	equal(t, n.GetError(), nil)
	equal(t, n.IsHealthy(), true)

	// setting healthy on an already healthy instance changes nothing
	n.SetHealth(nil)
	equal(t, n.GetError(), nil)
	equal(t, n.IsHealthy(), true)

	// a non-nil error flips the instance to unhealthy and is reported
	healthErr := errors.New("health error")
	n.SetHealth(healthErr)
	nequal(t, n.GetError(), nil)
	equal(t, n.GetHealth(), "NOK - health error")
	equal(t, n.IsHealthy(), false)

	// clearing the error restores the healthy state
	n.SetHealth(nil)
	equal(t, n.GetError(), nil)
	equal(t, n.GetHealth(), "OK")
	equal(t, n.IsHealthy(), true)
}
2 changes: 1 addition & 1 deletion nsqd/topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,11 +190,11 @@ func (t *Topic) put(m *Message) error {
b := bufferPoolGet()
err := writeMessageToBackend(b, m, t.backend)
bufferPoolPut(b)
t.ctx.nsqd.SetHealth(err)
if err != nil {
t.ctx.nsqd.logf(
"TOPIC(%s) ERROR: failed to write message to backend - %s",
t.name, err)
t.ctx.nsqd.SetHealth(err)
return err
}
}
Expand Down
17 changes: 17 additions & 0 deletions nsqd/topic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@ func (d *errorBackendQueue) Delete() error { return nil }
func (d *errorBackendQueue) Depth() int64 { return 0 }
func (d *errorBackendQueue) Empty() error { return nil }

// errorRecoveredBackendQueue embeds errorBackendQueue and overrides Put so
// that writes succeed. The tests swap it in to simulate a backend that has
// recovered. NOTE(review): presumably errorBackendQueue.Put returns an
// error - confirm against its definition.
type errorRecoveredBackendQueue struct {
	errorBackendQueue
}

// Put ignores its argument and always reports success.
func (d *errorRecoveredBackendQueue) Put([]byte) error {
	return nil
}

func TestHealth(t *testing.T) {
opts := NewOptions()
opts.Logger = newTestLogger(t)
Expand Down Expand Up @@ -93,6 +97,19 @@ func TestHealth(t *testing.T) {
body, _ := ioutil.ReadAll(resp.Body)
resp.Body.Close()
equal(t, string(body), "NOK - never gonna happen")

topic.backend = &errorRecoveredBackendQueue{}

msg = NewMessage(<-nsqd.idChan, make([]byte, 100))
err = topic.PutMessages([]*Message{msg})
equal(t, err, nil)

resp, err = http.Get(url)
equal(t, err, nil)
equal(t, resp.StatusCode, 200)
body, _ = ioutil.ReadAll(resp.Body)
resp.Body.Close()
equal(t, string(body), "OK")
}

func TestDeletes(t *testing.T) {
Expand Down

0 comments on commit 0fe4d2b

Please sign in to comment.