From f64364c22fd307fd98cc2f334afe068a2cea8897 Mon Sep 17 00:00:00 2001 From: Matt Reiferson Date: Sun, 9 Aug 2015 17:19:09 -0700 Subject: [PATCH] persist rangeset topic/channel state --- nsqd/channel.go | 53 ++++++++++++++++++++++++++++++++++++++---- nsqd/dqname.go | 9 ------- nsqd/dqname_windows.go | 10 -------- nsqd/nsqd.go | 17 +++++--------- nsqd/nsqd_test.go | 30 +++++++++++------------- nsqd/range_set.go | 9 +++++-- nsqd/topic.go | 52 +++++++++++++++++++++++++++++++++++++---- 7 files changed, 123 insertions(+), 57 deletions(-) delete mode 100644 nsqd/dqname.go delete mode 100644 nsqd/dqname_windows.go diff --git a/nsqd/channel.go b/nsqd/channel.go index 87affd9d6..ba7faeaaf 100644 --- a/nsqd/channel.go +++ b/nsqd/channel.go @@ -2,13 +2,20 @@ package nsqd import ( "container/heap" + "encoding/json" "errors" + "fmt" + "io/ioutil" "math" + "math/rand" + "os" + "path" "sync" "sync/atomic" "time" "github.com/mreiferson/wal" + "github.com/nsqio/nsq/internal/atomic_rename" "github.com/nsqio/nsq/internal/pqueue" "github.com/nsqio/nsq/internal/quantile" ) @@ -70,7 +77,7 @@ type Channel struct { } // NewChannel creates a new instance of the Channel type and returns a pointer -func NewChannel(topic *Topic, channelName string, cursor wal.Cursor, ctx *context) *Channel { +func NewChannel(topic *Topic, channelName string, ctx *context) *Channel { c := &Channel{ topic: topic, name: channelName, @@ -79,7 +86,6 @@ func NewChannel(topic *Topic, channelName string, cursor wal.Cursor, ctx *contex exitChan: make(chan int), clients: make(map[int64]Consumer), ctx: ctx, - cursor: cursor, } if len(ctx.nsqd.getOpts().E2EProcessingLatencyPercentiles) > 0 { c.e2eProcessingLatencyStream = quantile.New( @@ -88,6 +94,26 @@ func NewChannel(topic *Topic, channelName string, cursor wal.Cursor, ctx *contex ) } + fn := fmt.Sprintf(path.Join(ctx.nsqd.getOpts().DataPath, "meta.%s;%s.dat"), topic.name, c.name) + data, err := ioutil.ReadFile(fn) + if err != nil { + if !os.IsNotExist(err) { + c.ctx.nsqd.logf("ERROR: failed to read channel metadata from %s - %s", fn, err) + } + } else { + err := json.Unmarshal(data, &c.rs) + if err != nil { + c.ctx.nsqd.logf("ERROR: failed to decode channel metadata - %s", err) + } + } + + var startIdx uint64 + if c.rs.Len() > 0 { + startIdx = uint64(c.rs.Ranges[0].High) + 1 + } + cursor, _ := c.topic.wal.GetCursor(startIdx) + c.cursor = cursor + c.initPQ() go c.messagePump() @@ -208,8 +234,27 @@ func (c *Channel) flush() error { } finish: - // TODO: (WAL) flush cursor state - return nil + data, err := json.Marshal(&c.rs) + if err != nil { + return err + } + + fn := fmt.Sprintf(path.Join(c.ctx.nsqd.getOpts().DataPath, "meta.%s;%s.dat"), c.topic.name, c.name) + tmpFn := fmt.Sprintf("%s.%d.tmp", fn, rand.Int()) + f, err := os.OpenFile(tmpFn, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0600) + if err != nil { + return err + } + + _, err = f.Write(data) + if err != nil { + f.Close() + return err + } + f.Sync() + f.Close() + + return atomic_rename.Rename(tmpFn, fn) } func (c *Channel) Depth() uint64 { diff --git a/nsqd/dqname.go b/nsqd/dqname.go deleted file mode 100644 index 54350086f..000000000 --- a/nsqd/dqname.go +++ /dev/null @@ -1,9 +0,0 @@ -// +build !windows - -package nsqd - -func getBackendName(topicName, channelName string) string { - // backend names, for uniqueness, automatically include the topic... : - backendName := topicName + ":" + channelName - return backendName -} diff --git a/nsqd/dqname_windows.go b/nsqd/dqname_windows.go deleted file mode 100644 index 22f4323c1..000000000 --- a/nsqd/dqname_windows.go +++ /dev/null @@ -1,10 +0,0 @@ -// +build windows - -package nsqd - -// On Windows, file names cannot contain colons. -func getBackendName(topicName, channelName string) string { - // backend names, for uniqueness, automatically include the topic... ; - backendName := topicName + ";" + channelName - return backendName -} diff --git a/nsqd/nsqd.go b/nsqd/nsqd.go index 1af845e53..56f9f6b63 100644 --- a/nsqd/nsqd.go +++ b/nsqd/nsqd.go @@ -292,7 +292,7 @@ func (n *NSQD) LoadMetadata() { data, err := ioutil.ReadFile(fn) if err != nil { if !os.IsNotExist(err) { - n.logf("ERROR: failed to read channel metadata from %s - %s", fn, err) + n.logf("ERROR: failed to read nsqd metadata from %s - %s", fn, err) } return } @@ -328,8 +328,8 @@ func (n *NSQD) LoadMetadata() { func (n *NSQD) PersistMetadata() error { // persist metadata about what topics/channels we have // so that upon restart we can get back to the same state - fileName := fmt.Sprintf(path.Join(n.getOpts().DataPath, "nsqd.%d.dat"), n.getOpts().ID) - n.logf("NSQ: persisting topic/channel metadata to %s", fileName) + fn := fmt.Sprintf(path.Join(n.getOpts().DataPath, "nsqd.%d.dat"), n.getOpts().ID) + n.logf("NSQ: persisting topic/channel metadata to %s", fn) js := make(map[string]interface{}) topics := []interface{}{} @@ -366,8 +366,8 @@ func (n *NSQD) PersistMetadata() error { return err } - tmpFileName := fmt.Sprintf("%s.%d.tmp", fileName, rand.Int()) - f, err := os.OpenFile(tmpFileName, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0600) + tmpFn := fmt.Sprintf("%s.%d.tmp", fn, rand.Int()) + f, err := os.OpenFile(tmpFn, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0600) if err != nil { return err } @@ -380,12 +380,7 @@ func (n *NSQD) PersistMetadata() error { f.Sync() f.Close() - err = atomic_rename.Rename(tmpFileName, fileName) - if err != nil { - return err - } - - return nil + return atomic_rename.Rename(tmpFn, fn) } func (n *NSQD) Exit() { diff --git a/nsqd/nsqd_test.go b/nsqd/nsqd_test.go index fdf74e82e..63b4473cb 100644 --- a/nsqd/nsqd_test.go +++ b/nsqd/nsqd_test.go @@ -4,7 +4,6 @@ import ( "encoding/json" "fmt" "io/ioutil" - "log" "net" "os" "path" @@ -83,7 +82,7 @@ func TestStartup(t *testing.T) { doneExitChan := make(chan int) opts := NewOptions() - opts.Logger = log.New(os.Stderr, "", log.LstdFlags) + opts.Logger = newTestLogger(t) opts.MemQueueSize = 100 opts.MaxBytesPerFile = 10240 _, _, nsqd := mustStartNSQD(opts) @@ -117,30 +116,28 @@ func TestStartup(t *testing.T) { topic.Pub([][]byte{body}) } - log.Printf("pulling from channel") + t.Logf("pulling from channel") channel1 := topic.GetChannel("ch1") - log.Printf("ch1 depth: %d", channel1.Depth()) + t.Logf("ch1 depth: %d", channel1.Depth()) - log.Printf("reading %d msgs", iterations/2) + t.Logf("reading %d msgs", iterations/2) for i := 0; i < iterations/2; i++ { msg := <-channel1.clientMsgChan - log.Printf("read message %d", i+1) + channel1.StartInFlightTimeout(msg, 0, time.Second*60) + channel1.FinishMessage(0, msg.ID) + t.Logf("read message %d", i+1) equal(t, msg.Body, body) } - // for { - // if channel1.Depth() == int64(iterations/2) { - // break - // } - // time.Sleep(50 * time.Millisecond) - // } - // make sure metadata shows the topic m, err = getMetadata(nsqd) equal(t, err, nil) equal(t, len(m.Topics), 1) equal(t, m.Topics[0].Name, topicName) equal(t, err, nil) + observedChannelName, err := metaData.Get("topics").GetIndex(0).Get("channels").GetIndex(0).Get("name").String() + equal(t, observedChannelName, "ch1") + equal(t, err, nil) exitChan <- 1 <-doneExitChan @@ -176,13 +173,14 @@ func TestStartup(t *testing.T) { // read the other half of the messages for i := 0; i < iterations/2; i++ { msg := <-channel1.clientMsgChan - log.Printf("read message %d", i+1) + channel1.StartInFlightTimeout(msg, 0, time.Second*60) + channel1.FinishMessage(0, msg.ID) + t.Logf("read message %d", i+1) equal(t, msg.Body, body) } // verify we drained things - // equal(t, len(topic.memoryMsgChan), 0) - // equal(t, topic.wal.Depth(), int64(0)) + equal(t, channel1.Depth(), uint64(0)) exitChan <- 1 <-doneExitChan diff --git a/nsqd/range_set.go b/nsqd/range_set.go index 1c15e39bb..429a6ecd4 100644 --- a/nsqd/range_set.go +++ b/nsqd/range_set.go @@ -5,11 +5,12 @@ import ( ) type Range struct { - Low, High int64 + Low int64 `json:"low"` + High int64 `json:"high"` } type RangeSet struct { - Ranges []Range + Ranges []Range `json:"ranges"` } func (rs *RangeSet) AddInts(nums ...int64) { @@ -181,6 +182,10 @@ func (rs *RangeSet) contains(num int64) bool { return false } +func (rs *RangeSet) Len() int { + return len(rs.Ranges) +} + // helpers func contains(r Range, num int64) bool { diff --git a/nsqd/topic.go b/nsqd/topic.go index 0b0f3d972..465a80b99 100644 --- a/nsqd/topic.go +++ b/nsqd/topic.go @@ -1,12 +1,19 @@ package nsqd import ( + "encoding/json" "errors" + "fmt" + "io/ioutil" + "math/rand" + "os" + "path" "strings" "sync" "sync/atomic" "github.com/mreiferson/wal" + "github.com/nsqio/nsq/internal/atomic_rename" "github.com/nsqio/nsq/internal/quantile" ) @@ -54,6 +61,19 @@ func NewTopic(topicName string, ctx *context, deleteCallback func(*Topic)) *Topi ctx.nsqd.getOpts().Logger) } + fn := fmt.Sprintf(path.Join(ctx.nsqd.getOpts().DataPath, "meta.%s.dat"), t.name) + data, err := ioutil.ReadFile(fn) + if err != nil { + if !os.IsNotExist(err) { + t.ctx.nsqd.logf("ERROR: failed to read topic metadata from %s - %s", fn, err) + } + } else { + err := json.Unmarshal(data, &t.rs) + if err != nil { + t.ctx.nsqd.logf("ERROR: failed to decode topic metadata - %s", err) + } + } + t.ctx.nsqd.Notify(t) return t @@ -78,9 +98,7 @@ func (t *Topic) GetChannel(channelName string) *Channel { func (t *Topic) getOrCreateChannel(channelName string) (*Channel, bool) { channel, ok := t.channelMap[channelName] if !ok { - // TODO: (WAL) bootstrap idx - cursor, _ := t.wal.GetCursor(0) - channel = NewChannel(t, channelName, cursor, t.ctx) + channel = NewChannel(t, channelName, t.ctx) t.channelMap[channelName] = channel t.ctx.nsqd.logf("TOPIC(%s): new channel(%s)", t.name, channel.name) return channel, true @@ -134,8 +152,7 @@ func (t *Topic) Pub(data [][]byte) error { if err != nil { return err } - t.rs.AddRange(Range{int64(startIdx), int64(endIdx)}) - t.ctx.nsqd.logf("range: %+v (%d)", t.rs, t.rs.Count()) + t.rs.AddRange(Range{High: int64(startIdx), Low: int64(endIdx)}) atomic.AddUint64(&t.messageCount, uint64(len(data))) return nil } @@ -197,6 +214,31 @@ func (t *Topic) exit(deleted bool) error { } } + data, err := json.Marshal(&t.rs) + if err != nil { + return err + } + + fn := fmt.Sprintf(path.Join(t.ctx.nsqd.getOpts().DataPath, "meta.%s.dat"), t.name) + tmpFn := fmt.Sprintf("%s.%d.tmp", fn, rand.Int()) + f, err := os.OpenFile(tmpFn, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0600) + if err != nil { + return err + } + + _, err = f.Write(data) + if err != nil { + f.Close() + return err + } + f.Sync() + f.Close() + + err = atomic_rename.Rename(tmpFn, fn) + if err != nil { + return err + } + return t.wal.Close() }