diff --git a/nsqd/channel.go b/nsqd/channel.go index 42a1df378..a016edbd1 100644 --- a/nsqd/channel.go +++ b/nsqd/channel.go @@ -320,10 +320,10 @@ func (c *Channel) put(m *Message) error { b := bufferPoolGet() err := writeMessageToBackend(b, m, c.backend) bufferPoolPut(b) + c.ctx.nsqd.SetHealth(err) if err != nil { c.ctx.nsqd.logf("CHANNEL(%s) ERROR: failed to write message to backend - %s", c.name, err) - c.ctx.nsqd.SetHealth(err) return err } } @@ -571,7 +571,7 @@ func (c *Channel) messagePump() { atomic.StoreInt32(&c.bufferedCount, 1) c.clientMsgChan <- msg atomic.StoreInt32(&c.bufferedCount, 0) - // the client will call back to mark as in-flight w/ it's info + // the client will call back to mark as in-flight w/ its info } exit: diff --git a/nsqd/channel_test.go b/nsqd/channel_test.go index 14fbcd6b1..b620a3dd4 100644 --- a/nsqd/channel_test.go +++ b/nsqd/channel_test.go @@ -1,6 +1,9 @@ package nsqd import ( + "fmt" + "io/ioutil" + "net/http" "os" "strconv" "testing" @@ -166,5 +169,59 @@ func TestChannelEmptyConsumer(t *testing.T) { stats := cl.Stats() equal(t, stats.InFlightCount, int64(0)) } +} +func TestChannelHealth(t *testing.T) { + opts := NewOptions() + opts.Logger = newTestLogger(t) + opts.MemQueueSize = 2 + + _, httpAddr, nsqd := mustStartNSQD(opts) + defer os.RemoveAll(opts.DataPath) + defer nsqd.Exit() + + topic := nsqd.GetTopic("test") + + channel := topic.GetChannel("channel") + // cause channel.messagePump to exit so we can set channel.backend without + // a data race. side effect is it closes clientMsgChan, and messagePump is + // never restarted. note this isn't the intended usage of exitChan but gets + // around the data race without more invasive changes to how channel.backend + // is set/loaded. + channel.exitChan <- 1 + + channel.backend = &errorBackendQueue{} + + msg := NewMessage(<-nsqd.idChan, make([]byte, 100)) + err := channel.PutMessage(msg) + equal(t, err, nil) + + msg = NewMessage(<-nsqd.idChan, make([]byte, 100)) + err = channel.PutMessage(msg) + equal(t, err, nil) + + msg = NewMessage(<-nsqd.idChan, make([]byte, 100)) + err = channel.PutMessage(msg) + nequal(t, err, nil) + + url := fmt.Sprintf("http://%s/ping", httpAddr) + resp, err := http.Get(url) + equal(t, err, nil) + equal(t, resp.StatusCode, 500) + body, _ := ioutil.ReadAll(resp.Body) + resp.Body.Close() + equal(t, string(body), "NOK - never gonna happen") + + channel.backend = &errorRecoveredBackendQueue{} + + msg = NewMessage(<-nsqd.idChan, make([]byte, 100)) + err = channel.PutMessage(msg) + equal(t, err, nil) + + resp, err = http.Get(url) + equal(t, err, nil) + equal(t, resp.StatusCode, 200) + body, _ = ioutil.ReadAll(resp.Body) + resp.Body.Close() + equal(t, string(body), "OK") } diff --git a/nsqd/nsqd.go b/nsqd/nsqd.go index b747a0826..e9c346dd3 100644 --- a/nsqd/nsqd.go +++ b/nsqd/nsqd.go @@ -38,6 +38,10 @@ const ( flagLoading ) +type errStore struct { + err error +} + type NSQD struct { // 64bit atomic vars need to be first for proper alignment on 32bit platforms clientIDSequence int64 @@ -48,8 +52,7 @@ type NSQD struct { dl *dirlock.DirLock flag int32 - errMtx sync.RWMutex - err error + errValue atomic.Value startTime time.Time topicMap map[string]*Topic @@ -203,13 +206,14 @@ func (n *NSQD) getFlag(f int32) bool { } func (n *NSQD) SetHealth(err error) { - n.errMtx.Lock() - defer n.errMtx.Unlock() - n.err = err if err != nil { n.setFlag(flagHealthy, false) + n.errValue.Store(errStore{err: err}) } else { - n.setFlag(flagHealthy, true) + if !n.getFlag(flagHealthy) { + n.setFlag(flagHealthy, true) + n.errValue.Store(errStore{err: nil}) + } } } @@ -218,14 +222,17 @@ func (n *NSQD) IsHealthy() bool { } func (n *NSQD) GetError() error { - n.errMtx.RLock() - defer n.errMtx.RUnlock() - return n.err + errValue := n.errValue.Load() + if errValue == nil { + return nil + } + return errValue.(errStore).err } func (n *NSQD) GetHealth() string { - if !n.IsHealthy() { - return fmt.Sprintf("NOK - %s", n.GetError()) + err := n.GetError() + if err != nil { + return fmt.Sprintf("NOK - %s", err) } return "OK" } diff --git a/nsqd/nsqd_test.go b/nsqd/nsqd_test.go index e3d45ce49..83541fd8e 100644 --- a/nsqd/nsqd_test.go +++ b/nsqd/nsqd_test.go @@ -1,6 +1,7 @@ package nsqd import ( + "errors" "fmt" "io/ioutil" "net" @@ -39,7 +40,7 @@ func equal(t *testing.T, act, exp interface{}) { func nequal(t *testing.T, act, exp interface{}) { if reflect.DeepEqual(exp, act) { _, file, line, _ := runtime.Caller(1) - t.Logf("\033[31m%s:%d:\n\n\texp: %#v\n\n\tgot: %#v\033[39m\n\n", + t.Logf("\033[31m%s:%d:\n\n\tnexp: %#v\n\n\tgot: %#v\033[39m\n\n", filepath.Base(file), line, exp, act) t.FailNow() } @@ -438,3 +439,26 @@ func TestCluster(t *testing.T) { producers, _ = data.Get("channel:" + topicName + ":ch").Array() equal(t, len(producers), 0) } + +func TestSetHealth(t *testing.T) { + opts := NewOptions() + opts.Logger = newTestLogger(t) + nsqd := New(opts) + + equal(t, nsqd.GetError(), nil) + equal(t, nsqd.IsHealthy(), true) + + nsqd.SetHealth(nil) + equal(t, nsqd.GetError(), nil) + equal(t, nsqd.IsHealthy(), true) + + nsqd.SetHealth(errors.New("health error")) + nequal(t, nsqd.GetError(), nil) + equal(t, nsqd.GetHealth(), "NOK - health error") + equal(t, nsqd.IsHealthy(), false) + + nsqd.SetHealth(nil) + equal(t, nsqd.GetError(), nil) + equal(t, nsqd.GetHealth(), "OK") + equal(t, nsqd.IsHealthy(), true) +} diff --git a/nsqd/topic.go b/nsqd/topic.go index e4bad6413..21b7d9e9f 100644 --- a/nsqd/topic.go +++ b/nsqd/topic.go @@ -190,11 +190,11 @@ func (t *Topic) put(m *Message) error { b := bufferPoolGet() err := writeMessageToBackend(b, m, t.backend) bufferPoolPut(b) + t.ctx.nsqd.SetHealth(err) if err != nil { t.ctx.nsqd.logf( "TOPIC(%s) ERROR: failed to write message to backend - %s", t.name, err) - t.ctx.nsqd.SetHealth(err) return err } } diff --git a/nsqd/topic_test.go b/nsqd/topic_test.go index ef540dc29..8340cbf15 100644 --- a/nsqd/topic_test.go +++ b/nsqd/topic_test.go @@ -59,6 +59,10 @@ func (d *errorBackendQueue) Delete() error { return nil } func (d *errorBackendQueue) Depth() int64 { return 0 } func (d *errorBackendQueue) Empty() error { return nil } +type errorRecoveredBackendQueue struct{ errorBackendQueue } + +func (d *errorRecoveredBackendQueue) Put([]byte) error { return nil } + func TestHealth(t *testing.T) { opts := NewOptions() opts.Logger = newTestLogger(t) @@ -93,6 +97,19 @@ func TestHealth(t *testing.T) { body, _ := ioutil.ReadAll(resp.Body) resp.Body.Close() equal(t, string(body), "NOK - never gonna happen") + + topic.backend = &errorRecoveredBackendQueue{} + + msg = NewMessage(<-nsqd.idChan, make([]byte, 100)) + err = topic.PutMessages([]*Message{msg}) + equal(t, err, nil) + + resp, err = http.Get(url) + equal(t, err, nil) + equal(t, resp.StatusCode, 200) + body, _ = ioutil.ReadAll(resp.Body) + resp.Body.Close() + equal(t, string(body), "OK") } func TestDeletes(t *testing.T) {