Skip to content

Commit

Permalink
persist rangeset topic/channel state
Browse files Browse the repository at this point in the history
  • Loading branch information
mreiferson committed Sep 17, 2015
1 parent 4022eba commit f64364c
Show file tree
Hide file tree
Showing 7 changed files with 123 additions and 57 deletions.
53 changes: 49 additions & 4 deletions nsqd/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,20 @@ package nsqd

import (
"container/heap"
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"math"
"math/rand"
"os"
"path"
"sync"
"sync/atomic"
"time"

"github.com/mreiferson/wal"
"github.com/nsqio/nsq/internal/atomic_rename"
"github.com/nsqio/nsq/internal/pqueue"
"github.com/nsqio/nsq/internal/quantile"
)
Expand Down Expand Up @@ -70,7 +77,7 @@ type Channel struct {
}

// NewChannel creates a new instance of the Channel type and returns a pointer
func NewChannel(topic *Topic, channelName string, cursor wal.Cursor, ctx *context) *Channel {
func NewChannel(topic *Topic, channelName string, ctx *context) *Channel {
c := &Channel{
topic: topic,
name: channelName,
Expand All @@ -79,7 +86,6 @@ func NewChannel(topic *Topic, channelName string, cursor wal.Cursor, ctx *contex
exitChan: make(chan int),
clients: make(map[int64]Consumer),
ctx: ctx,
cursor: cursor,
}
if len(ctx.nsqd.getOpts().E2EProcessingLatencyPercentiles) > 0 {
c.e2eProcessingLatencyStream = quantile.New(
Expand All @@ -88,6 +94,26 @@ func NewChannel(topic *Topic, channelName string, cursor wal.Cursor, ctx *contex
)
}

fn := fmt.Sprintf(path.Join(ctx.nsqd.getOpts().DataPath, "meta.%s;%s.dat"), topic.name, c.name)
data, err := ioutil.ReadFile(fn)
if err != nil {
if !os.IsNotExist(err) {
c.ctx.nsqd.logf("ERROR: failed to read channel metadata from %s - %s", fn, err)
}
} else {
err := json.Unmarshal(data, &c.rs)
if err != nil {
c.ctx.nsqd.logf("ERROR: failed to decode channel metadata - %s", err)
}
}

var startIdx uint64
if c.rs.Len() > 0 {
startIdx = uint64(c.rs.Ranges[0].High) + 1
}
cursor, _ := c.topic.wal.GetCursor(startIdx)
c.cursor = cursor

c.initPQ()

go c.messagePump()
Expand Down Expand Up @@ -208,8 +234,27 @@ func (c *Channel) flush() error {
}

finish:
// TODO: (WAL) flush cursor state
return nil
data, err := json.Marshal(&c.rs)
if err != nil {
return err
}

fn := fmt.Sprintf(path.Join(c.ctx.nsqd.getOpts().DataPath, "meta.%s;%s.dat"), c.topic.name, c.name)
tmpFn := fmt.Sprintf("%s.%d.tmp", fn, rand.Int())
f, err := os.OpenFile(tmpFn, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0600)
if err != nil {
return err
}

_, err = f.Write(data)
if err != nil {
f.Close()
return err
}
f.Sync()
f.Close()

return atomic_rename.Rename(tmpFn, fn)
}

func (c *Channel) Depth() uint64 {
Expand Down
9 changes: 0 additions & 9 deletions nsqd/dqname.go

This file was deleted.

10 changes: 0 additions & 10 deletions nsqd/dqname_windows.go

This file was deleted.

17 changes: 6 additions & 11 deletions nsqd/nsqd.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ func (n *NSQD) LoadMetadata() {
data, err := ioutil.ReadFile(fn)
if err != nil {
if !os.IsNotExist(err) {
n.logf("ERROR: failed to read channel metadata from %s - %s", fn, err)
n.logf("ERROR: failed to read nsqd metadata from %s - %s", fn, err)
}
return
}
Expand Down Expand Up @@ -328,8 +328,8 @@ func (n *NSQD) LoadMetadata() {
func (n *NSQD) PersistMetadata() error {
// persist metadata about what topics/channels we have
// so that upon restart we can get back to the same state
fileName := fmt.Sprintf(path.Join(n.getOpts().DataPath, "nsqd.%d.dat"), n.getOpts().ID)
n.logf("NSQ: persisting topic/channel metadata to %s", fileName)
fn := fmt.Sprintf(path.Join(n.getOpts().DataPath, "nsqd.%d.dat"), n.getOpts().ID)
n.logf("NSQ: persisting topic/channel metadata to %s", fn)

js := make(map[string]interface{})
topics := []interface{}{}
Expand Down Expand Up @@ -366,8 +366,8 @@ func (n *NSQD) PersistMetadata() error {
return err
}

tmpFileName := fmt.Sprintf("%s.%d.tmp", fileName, rand.Int())
f, err := os.OpenFile(tmpFileName, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0600)
tmpFn := fmt.Sprintf("%s.%d.tmp", fn, rand.Int())
f, err := os.OpenFile(tmpFn, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0600)
if err != nil {
return err
}
Expand All @@ -380,12 +380,7 @@ func (n *NSQD) PersistMetadata() error {
f.Sync()
f.Close()

err = atomic_rename.Rename(tmpFileName, fileName)
if err != nil {
return err
}

return nil
return atomic_rename.Rename(tmpFn, fn)
}

func (n *NSQD) Exit() {
Expand Down
30 changes: 14 additions & 16 deletions nsqd/nsqd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"encoding/json"
"fmt"
"io/ioutil"
"log"
"net"
"os"
"path"
Expand Down Expand Up @@ -83,7 +82,7 @@ func TestStartup(t *testing.T) {
doneExitChan := make(chan int)

opts := NewOptions()
opts.Logger = log.New(os.Stderr, "", log.LstdFlags)
opts.Logger = newTestLogger(t)
opts.MemQueueSize = 100
opts.MaxBytesPerFile = 10240
_, _, nsqd := mustStartNSQD(opts)
Expand Down Expand Up @@ -117,30 +116,28 @@ func TestStartup(t *testing.T) {
topic.Pub([][]byte{body})
}

log.Printf("pulling from channel")
t.Logf("pulling from channel")
channel1 := topic.GetChannel("ch1")
log.Printf("ch1 depth: %d", channel1.Depth())
t.Logf("ch1 depth: %d", channel1.Depth())

log.Printf("reading %d msgs", iterations/2)
t.Logf("reading %d msgs", iterations/2)
for i := 0; i < iterations/2; i++ {
msg := <-channel1.clientMsgChan
log.Printf("read message %d", i+1)
channel1.StartInFlightTimeout(msg, 0, time.Second*60)
channel1.FinishMessage(0, msg.ID)
t.Logf("read message %d", i+1)
equal(t, msg.Body, body)
}

// for {
// if channel1.Depth() == int64(iterations/2) {
// break
// }
// time.Sleep(50 * time.Millisecond)
// }

// make sure metadata shows the topic
m, err = getMetadata(nsqd)
equal(t, err, nil)
equal(t, len(m.Topics), 1)
equal(t, m.Topics[0].Name, topicName)
equal(t, err, nil)
observedChannelName, err := metaData.Get("topics").GetIndex(0).Get("channels").GetIndex(0).Get("name").String()
equal(t, observedChannelName, "ch1")
equal(t, err, nil)

exitChan <- 1
<-doneExitChan
Expand Down Expand Up @@ -176,13 +173,14 @@ func TestStartup(t *testing.T) {
// read the other half of the messages
for i := 0; i < iterations/2; i++ {
msg := <-channel1.clientMsgChan
log.Printf("read message %d", i+1)
channel1.StartInFlightTimeout(msg, 0, time.Second*60)
channel1.FinishMessage(0, msg.ID)
t.Logf("read message %d", i+1)
equal(t, msg.Body, body)
}

// verify we drained things
// equal(t, len(topic.memoryMsgChan), 0)
// equal(t, topic.wal.Depth(), int64(0))
equal(t, channel1.Depth(), uint64(0))

exitChan <- 1
<-doneExitChan
Expand Down
9 changes: 7 additions & 2 deletions nsqd/range_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,12 @@ import (
)

type Range struct {
Low, High int64
Low int64 `json:"low"`
High int64 `json:"high"`
}

type RangeSet struct {
Ranges []Range
Ranges []Range `json:"ranges"`
}

func (rs *RangeSet) AddInts(nums ...int64) {
Expand Down Expand Up @@ -181,6 +182,10 @@ func (rs *RangeSet) contains(num int64) bool {
return false
}

func (rs *RangeSet) Len() int {
return len(rs.Ranges)
}

// helpers

func contains(r Range, num int64) bool {
Expand Down
52 changes: 47 additions & 5 deletions nsqd/topic.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,19 @@
package nsqd

import (
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"math/rand"
"os"
"path"
"strings"
"sync"
"sync/atomic"

"github.com/mreiferson/wal"
"github.com/nsqio/nsq/internal/atomic_rename"
"github.com/nsqio/nsq/internal/quantile"
)

Expand Down Expand Up @@ -54,6 +61,19 @@ func NewTopic(topicName string, ctx *context, deleteCallback func(*Topic)) *Topi
ctx.nsqd.getOpts().Logger)
}

fn := fmt.Sprintf(path.Join(ctx.nsqd.getOpts().DataPath, "meta.%s.dat"), t.name)
data, err := ioutil.ReadFile(fn)
if err != nil {
if !os.IsNotExist(err) {
t.ctx.nsqd.logf("ERROR: failed to read topic metadata from %s - %s", fn, err)
}
} else {
err := json.Unmarshal(data, &t.rs)
if err != nil {
t.ctx.nsqd.logf("ERROR: failed to decode topic metadata - %s", err)
}
}

t.ctx.nsqd.Notify(t)

return t
Expand All @@ -78,9 +98,7 @@ func (t *Topic) GetChannel(channelName string) *Channel {
func (t *Topic) getOrCreateChannel(channelName string) (*Channel, bool) {
channel, ok := t.channelMap[channelName]
if !ok {
// TODO: (WAL) bootstrap idx
cursor, _ := t.wal.GetCursor(0)
channel = NewChannel(t, channelName, cursor, t.ctx)
channel = NewChannel(t, channelName, t.ctx)
t.channelMap[channelName] = channel
t.ctx.nsqd.logf("TOPIC(%s): new channel(%s)", t.name, channel.name)
return channel, true
Expand Down Expand Up @@ -134,8 +152,7 @@ func (t *Topic) Pub(data [][]byte) error {
if err != nil {
return err
}
t.rs.AddRange(Range{int64(startIdx), int64(endIdx)})
t.ctx.nsqd.logf("range: %+v (%d)", t.rs, t.rs.Count())
t.rs.AddRange(Range{High: int64(startIdx), Low: int64(endIdx)})
atomic.AddUint64(&t.messageCount, uint64(len(data)))
return nil
}
Expand Down Expand Up @@ -197,6 +214,31 @@ func (t *Topic) exit(deleted bool) error {
}
}

data, err := json.Marshal(&t.rs)
if err != nil {
return err
}

fn := fmt.Sprintf(path.Join(t.ctx.nsqd.getOpts().DataPath, "meta.%s.dat"), t.name)
tmpFn := fmt.Sprintf("%s.%d.tmp", fn, rand.Int())
f, err := os.OpenFile(tmpFn, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0600)
if err != nil {
return err
}

_, err = f.Write(data)
if err != nil {
f.Close()
return err
}
f.Sync()
f.Close()

err = atomic_rename.Rename(tmpFn, fn)
if err != nil {
return err
}

return t.wal.Close()
}

Expand Down

0 comments on commit f64364c

Please sign in to comment.