diff --git a/health/ALGORITHM.md b/health/ALGORITHM.md new file mode 100644 index 0000000000..a61583370f --- /dev/null +++ b/health/ALGORITHM.md @@ -0,0 +1,148 @@ +# The ALLCALL leader election algorithm. + +Jason E. Aten + +February 2017 + +definition, with example value. +----------- + +Let heartBeat = 3 sec. This is how frequently +we will assess cluster health by +sending out an allcall ping. + +givens +-------- + +* Given: Let each server have an integer rank that is distinct +and unique to that server. If necessary, an extremely long +true random number is used to break ties between server ranks, so +that we may assert, with probability 1, that all ranks are distinct. +Servers can therefore be put into a strict total order. + +* Rule: The lower the rank, the more preferred the server is as leader. + + +ALLCALL Algorithm +=========================== + +### I. In a continuous loop + +The server always accepts and responds +to allcall broadcasts from other cluster members. + +The allcall() ping asks each member of +the server cluster to reply. Replies +provide the responder's own assigned rank +and identity. + +### II. Election + +Elections are computed locally, after accumulating +responses. + +After issuing an allcall, the server +listens for a heartbeat interval. + +At the end of the interval, it sorts +the respondents by rank. Since +the replies are on a broadcast +channel, more than one simultaneous +allcall reply may be incorporated into +the set of respondents. + +The lowest ranking server is elected +as leader. + +## Safety/Convergence: ALLCALL converges to one leader + +Suppose two nodes are partitioned and so both are leaders on +their own side of the network. Then suppose the network +is joined again, so the two leaders are brought together +by a healing of the network, or by adding a new link +between the networks. The two nodes exchange Ids and +ranks by responding to allcalls, and compute +who is the new leader. + +Hence the two-leader situation persists for at +most one heartbeat term after the network join. + +## Liveness: a leader will be chosen + +Given the total order among nodes, exactly one +will have the lowest rank and thus be the preferred +leader. If the leader fails, the next +heartbeat of allcall will omit that +server from the candidate list, and +the next-ranked server will be chosen. + +Hence, with at least one live +node, the system can run for at most one +heartbeat term before electing a leader. +Since there is a total order on +all live (non-failed) servers, only +one will be chosen. + +## commentary + +ALLCALL does not guarantee that there will +never be more than one leader. Availability +in the face of network partition is +desirable in many cases, and ALLCALL is +appropriate for these. This is congruent +with the design of Nats as an always-on system. + +ALLCALL does not guarantee that a +leader will always be present, but +with live nodes it does provide +that the cluster will have a leader +after one heartbeat term has +been initiated and completed. + +By design, ALLCALL functions well +in a cluster with any number of nodes. +One and two nodes, or an even number +of nodes, will work just fine. + +Compared to quorum-based elections +like Raft and Paxos, where a majority +quorum (at least three nodes, to +tolerate a failure) is required to +make progress, this can be very desirable. + +ALLCALL is appropriate for AP, +rather than CP, style systems, where +availability is more important +than having a single writer. When +writes are idempotent or deduplicated +downstream, this is typically preferred.
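+
+To make the election step concrete, here
+is a minimal Go sketch of one local
+election round. It is illustrative only,
+not the gnatsd implementation; the Loc
+type and the lessThan and electLocally
+helpers are simplified stand-ins for the
+real AgentLoc and AgentLocLessThan (which
+also consider host, port, pid, and lease
+expiration).
+
+```go
+// Loc is a simplified stand-in for AgentLoc.
+type Loc struct {
+	ID   string
+	Rank int
+}
+
+// lessThan: lower rank is preferred; ties are
+// broken by ID, mirroring the rule that ranks
+// plus random IDs form a strict total order.
+func lessThan(a, b Loc) bool {
+	if a.Rank != b.Rank {
+		return a.Rank < b.Rank
+	}
+	return a.ID < b.ID
+}
+
+// electLocally picks the leader from the replies
+// gathered during one heartbeat interval. Every
+// node that saw the same respondent set arrives
+// at the same leader, with no further messages.
+func electLocally(respondents []Loc) (leader Loc, ok bool) {
+	if len(respondents) == 0 {
+		return Loc{}, false
+	}
+	leader = respondents[0]
+	for _, r := range respondents[1:] {
+		if lessThan(r, leader) {
+			leader = r
+		}
+	}
+	return leader, true
+}
+```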
+ +prior art +---------- + +ALLCALL is a simplified version of +the well-known Bully Algorithm[1][2] +for election in a distributed system +on an arbitrary graph. + +ALLCALL does less bullying, and lets +nodes arrive at their own conclusions. +The essential broadcast and ranking +mechanism, however, is identical. + +[1] Hector Garcia-Molina, Elections in a +Distributed Computing System, IEEE +Transactions on Computers, +Vol. C-31, No. 1, January 1982, pp. 48–59 + +[2] https://en.wikipedia.org/wiki/Bully_algorithm + +implementation +------------ + +ALLCALL is implemented on top of +the Nats (https://nats.io) system; see the health/ +subdirectory of + +https://github.com/nats-io/gnatsd + diff --git a/health/LICENSE b/health/LICENSE new file mode 100644 index 0000000000..53e972301e --- /dev/null +++ b/health/LICENSE @@ -0,0 +1,20 @@ +The MIT License (MIT) + +Copyright (c) 2017 Jason E. Aten + +Permission is hereby granted, free of charge, to any person obtaining a copy of +this software and associated documentation files (the "Software"), to deal in +the Software without restriction, including without limitation the rights to +use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of +the Software, and to permit persons to whom the Software is furnished to do so, +subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. diff --git a/health/agent.go b/health/agent.go new file mode 100644 index 0000000000..d29e04d988 --- /dev/null +++ b/health/agent.go @@ -0,0 +1,69 @@ +package health + +import ( + "fmt" + "net" + "time" + + "github.com/nats-io/gnatsd/server" +) + +// Agent implements the InternalClient interface. +// It provides health status checks and +// leader election from among the candidate +// gnatsd instances in a cluster. +type Agent struct { + opts *server.Options + mship *Membership +} + +// NewAgent makes a new Agent. +func NewAgent(opts *server.Options) *Agent { + return &Agent{ + opts: opts, + } +} + +// Name should identify the internal client for logging. +func (h *Agent) Name() string { + return "health-agent" +} + +// Start makes an internal, +// entirely in-process client that monitors +// cluster health and manages group +// membership functions. +// +func (h *Agent) Start( + info server.Info, + opts server.Options, + logger server.Logger, + +) (net.Conn, error) { + + cli, srv, err := NewInternalClientPair() + if err != nil { + return nil, fmt.Errorf("NewInternalClientPair() returned error: %s", err) + } + + rank := opts.HealthRank + beat := opts.HealthBeat + lease := opts.HealthLease + + cfg := &MembershipCfg{ + MaxClockSkew: time.Second, + BeatDur: beat, + LeaseTime: lease, + MyRank: rank, + CliConn: cli, + Log: logger, + } + h.mship = NewMembership(cfg) + go h.mship.Start() + return srv, nil +} + +// Stop halts the background goroutine.
+func (h *Agent) Stop() { + h.mship.Stop() +} diff --git a/health/aloc.go b/health/aloc.go new file mode 100644 index 0000000000..d0999001d3 --- /dev/null +++ b/health/aloc.go @@ -0,0 +1,89 @@ +package health + +import ( + "encoding/json" + "os" + "time" + + "github.com/nats-io/go-nats" +) + +// AgentLoc conveys to interested parties +// the Id and location of one gnatsd +// server in the cluster. +type AgentLoc struct { + ID string `json:"serverId"` + Host string `json:"host"` + Port int `json:"port"` + + // Are we the leader? + IsLeader bool `json:"leader"` + + // LeaseExpires is zero for any + // non-leader. For the leader, + // LeaseExpires tells you when + // the leader's lease expires. + LeaseExpires time.Time `json:"leaseExpires"` + + // lower rank is leader until lease + // expires. Ties are broken by ID. + // Rank should be assignable on the + // gnatsd command line with -rank to + // let the operator prioritize + // leadership for certain hosts. + Rank int `json:"rank"` + + // Pid, or process id, is sometimes + // the only way to tell two + // processes apart, if they share the + // same nats server. + // + // Pid is the one difference between + // a nats.ServerLoc and a health.AgentLoc. + // + Pid int `json:"pid"` +} + +func (s *AgentLoc) String() string { + by, err := json.Marshal(s) + panicOn(err) + return string(by) +} + +func (s *AgentLoc) fromBytes(by []byte) error { + return json.Unmarshal(by, s) +} + +func alocEqual(a, b *AgentLoc) bool { + aless := AgentLocLessThan(a, b) + bless := AgentLocLessThan(b, a) + return !aless && !bless +} + +func slocEqualIgnoreLease(a, b *AgentLoc) bool { + a0 := *a + b0 := *b + a0.LeaseExpires = time.Time{} + a0.IsLeader = false + b0.LeaseExpires = time.Time{} + b0.IsLeader = false + + aless := AgentLocLessThan(&a0, &b0) + bless := AgentLocLessThan(&b0, &a0) + return !aless && !bless +} + +// the 2 types should be kept in sync. +// We return a brand new &AgentLoc{} +// with contents filled from loc. +func natsLocConvert(loc *nats.ServerLoc) *AgentLoc { + return &AgentLoc{ + ID: loc.ID, + Host: loc.Host, + Port: loc.Port, + IsLeader: loc.IsLeader, + LeaseExpires: loc.LeaseExpires, + Rank: loc.Rank, + Pid: os.Getpid(), + } +} diff --git a/health/btree.go b/health/btree.go new file mode 100644 index 0000000000..f5562591f9 --- /dev/null +++ b/health/btree.go @@ -0,0 +1,161 @@ +package health + +import ( + "bytes" + "fmt" + "sync" + + "github.com/google/btree" +) + +// ranktree is an in-memory, sorted, +// balanced tree, implemented on top of +// a B-tree (github.com/google/btree). +// It holds AgentLoc +// from candidate servers in the cluster, +// sorting them based on +// AgentLocLessThan() so they are in +// priority order and deduplicated. +type ranktree struct { + *btree.BTree + tex sync.Mutex +} + +// Less is how the agents are sorted. +// The Less method is expected by the btree. +// See AgentLocLessThan for the standalone +// implementation. +func (a AgentLoc) Less(than btree.Item) bool { + b := than.(AgentLoc) + return AgentLocLessThan(&a, &b) +} + +// insert is idempotent, so it is safe +// to insert the same sloc multiple times and +// duplicates will be ignored. +func (t *ranktree) insert(j AgentLoc) { + t.tex.Lock() + t.ReplaceOrInsert(j) + t.tex.Unlock() +} + +// present locks; the underlying Has does not.
+func (t *ranktree) present(j AgentLoc) bool { + t.tex.Lock() + b := t.Has(j) + t.tex.Unlock() + return b +} + +func (t *ranktree) minrank() (min AgentLoc) { + t.tex.Lock() + min = t.Min().(AgentLoc) + t.tex.Unlock() + + return +} + +func (t *ranktree) deleteSloc(j AgentLoc) { + t.tex.Lock() + t.Delete(j) + t.tex.Unlock() +} + +func newRanktree() *ranktree { + return &ranktree{ + BTree: btree.New(2), + } +} + +func (t *ranktree) String() string { + t.tex.Lock() + + s := "[" + t.AscendLessThan(AgentLoc{}, func(item btree.Item) bool { + cur := item.(AgentLoc) + s += cur.String() + "," + return true + }) + t.tex.Unlock() + + // trim the trailing comma, then close with the bracket + n := len(s) + if n > 1 { + s = s[:n-1] + } + return s + "]" +} + +func (t *ranktree) clone() *ranktree { + r := newRanktree() + t.tex.Lock() + + t.AscendLessThan(AgentLoc{}, func(item btree.Item) bool { + cur := item.(AgentLoc) + r.insert(cur) + return true + }) + t.tex.Unlock() + return r +} + +func (t *ranktree) MarshalJSON() ([]byte, error) { + var buf bytes.Buffer + fmt.Fprintf(&buf, "%s", t) + return buf.Bytes(), nil +} + +// Len() is inherited from the btree, +// but isn't protected by mutex. Use +// size to avoid races. +func (t *ranktree) size() int { + t.tex.Lock() + n := t.Len() + t.tex.Unlock() + return n +} + +// return a minus b, where a and b are sets. +func setDiff(a, b *members) *members { + + res := a.DedupTree.clone() + a.DedupTree.tex.Lock() + b.DedupTree.tex.Lock() + + b.DedupTree.AscendLessThan(AgentLoc{}, func(item btree.Item) bool { + v := item.(AgentLoc) + res.deleteSloc(v) + return true // keep iterating + }) + + b.DedupTree.tex.Unlock() + a.DedupTree.tex.Unlock() + return &members{DedupTree: res} +} + +func setsEqual(a, b *members) bool { + a.DedupTree.tex.Lock() + b.DedupTree.tex.Lock() + defer b.DedupTree.tex.Unlock() + defer a.DedupTree.tex.Unlock() + + alen := a.DedupTree.Len() + if alen != b.DedupTree.Len() { + return false + } + // INVAR: len(a) == len(b) + if alen == 0 { + return true + } + + missing := false + a.DedupTree.AscendLessThan(AgentLoc{}, func(item btree.Item) bool { + v := item.(AgentLoc) + if !b.DedupTree.Has(v) { + missing = true + return false // stop iterating + } + return true // keep iterating + }) + return !missing +} diff --git a/health/btree_test.go b/health/btree_test.go new file mode 100644 index 0000000000..f214821c10 --- /dev/null +++ b/health/btree_test.go @@ -0,0 +1,66 @@ +package health + +import ( + "testing" +) + +func Test201BtreeInsertDisplay(t *testing.T) { + s1 := AgentLoc{ID: "abc"} + s2 := AgentLoc{ID: "xyz"} + r := newRanktree() + r.insert(s2) + r.insert(s1) + + sz := r.size() + if sz != 2 { + t.Fatalf("expected 2, saw sz=%v", sz) + } + s := r.String() + if s == "[]" { + t.Fatalf("missing serialization of set elements") + } + expect := `[{"serverId":"abc","host":"","port":0,"leader":false,"leaseExpires":"0001-01-01T00:00:00Z","rank":0,"pid":0},{"serverId":"xyz","host":"","port":0,"leader":false,"leaseExpires":"0001-01-01T00:00:00Z","rank":0,"pid":0}]` + if s != expect { + t.Fatalf("serial json didn't match expectations.\n expect:'%s'\n\n observe:'%s'", expect, s) + } +} + +func Test202BtreeEqual(t *testing.T) { + s1 := AgentLoc{ID: "abc"} + s2 := AgentLoc{ID: "xyz"} + r := newRanktree() + r.insert(s2) + r.insert(s1) + + s := r.clone() + same := setsEqual(&members{DedupTree: s}, &members{DedupTree: r}) + if !same { + t.Fatalf("expected setsEqual to be true") + } +} + +func Test203SetDiff(t *testing.T) { + s1 := AgentLoc{ID: "abc"} + s2 := AgentLoc{ID: "def"} +
s3 := AgentLoc{ID: "ghi"} + s4 := AgentLoc{ID: "jkl"} + + r1 := newRanktree() + r1.insert(s1) + r1.insert(s2) + r1.insert(s3) + r1.insert(s4) + + r2 := newRanktree() + r2.insert(s1) + r2.insert(s2) + + diff := setDiff(&members{DedupTree: r1}, &members{DedupTree: r2}) + if diff.DedupTree.size() != 2 { + t.Fatalf("setdiff was not the right size") + } + x := diff.DedupTree.minrank() + if !alocEqual(&x, &s3) { + t.Fatalf("setdiff was not the right element") + } +} diff --git a/health/config.go b/health/config.go new file mode 100644 index 0000000000..f089e30132 --- /dev/null +++ b/health/config.go @@ -0,0 +1,96 @@ +package health + +import ( + "log" + "net" + "time" + + "github.com/nats-io/gnatsd/logger" + "github.com/nats-io/gnatsd/server" +) + +// deafTrue means the node won't ping or pong. +const deafTrue = 1 + +// deafFalse means there is no simulated network +// partition, and pings/pongs proceed normally. +const deafFalse = 0 + +// MembershipCfg configures the +// Membership service, which is +// the health-agent implementation. +type MembershipCfg struct { + + // max we allow for clocks to be out of sync. + // defaults to 1 second if not set. + MaxClockSkew time.Duration + + // how often we heartbeat. defaults to 3 seconds + // if not set. + BeatDur time.Duration + + // NatsURL example "nats://127.0.0.1:4222" + NatsURL string + + // defaults to "_nats.cluster.members." + SysMemberPrefix string + + // LeaseTime is the minimum time the + // leader is elected for. Defaults to 12 sec. + LeaseTime time.Duration + + // provide a default until the server gives us rank + MyRank int + + // optional, if provided we will use this connection on + // the client side. + CliConn net.Conn + + // where we log stuff. + Log server.Logger + + // for testing under network partition + deaf int64 + + // how much history to save + historyCount int +} + +// SetDefaults fills in default values. +func (cfg *MembershipCfg) SetDefaults() { + if cfg.LeaseTime == 0 { + cfg.LeaseTime = time.Second * 12 + } + if cfg.SysMemberPrefix == "" { + cfg.SysMemberPrefix = "_nats.cluster.members." + } + if cfg.BeatDur == 0 { + cfg.BeatDur = 3000 * time.Millisecond + } + if cfg.MaxClockSkew == 0 { + cfg.MaxClockSkew = time.Second + } + if cfg.NatsURL == "" { + cfg.NatsURL = "nats://127.0.0.1:4222" + } + if cfg.Log == nil { + // stderr + cfg.Log = logger.NewStdLogger(micros, debug, trace, colors, pid, log.LUTC) + } +} + +// constants controlling log levels: + +const colors = false +const micros, pid = true, true +const trace = false + +//const debug = true +const debug = false + +// Dial allows us to replace a client's dial of +// an external TCP socket with an already established +// internal TCP connection. +func (cfg *MembershipCfg) Dial(network, address string) (net.Conn, error) { + return cfg.CliConn, nil +} diff --git a/health/health.go b/health/health.go new file mode 100644 index 0000000000..434ef897b0 --- /dev/null +++ b/health/health.go @@ -0,0 +1,945 @@ +package health + +import ( + "encoding/json" + "fmt" + "os" + "sync" + "sync/atomic" + "time" + + "github.com/nats-io/go-nats" +) + +// sysMemberPrefix creates a namespace +// for system cluster membership communication. +// This prefix aims to avoid collisions +// with user-level topics. Only system +// processes / internal clients should +// write to these topics, but everyone +// is welcome to listen on them. +// +// note: `_nats` is for now; it can easily +// be changed to `_SYS` later once +// we're sure everything is working.
+// +const sysMemberPrefix = "_nats.cluster.members." + +// Membership tracks the nats server cluster +// membership, issuing health checks and +// choosing a leader. +type Membership struct { + Cfg MembershipCfg + + // the pongCollector holds + // all the pongs received in + // response to allcall pings + // in the most recent heartbeat + // session. + pc *pongCollector + + // actually elected leaders, should + // change only after a lease term. + elec *leadHolder + nc *nats.Conn + myLoc AgentLoc + pid int + + subjAllCall string + subjAllReply string + subjMemberLost string + subjMemberAdded string + subjMembership string + + halt *halter + mu sync.Mutex + stopping bool + + needReconnect chan bool +} + +func (m *Membership) trace(f string, arg ...interface{}) { + m.Cfg.Log.Tracef(fmt.Sprintf("my.Port:%v. ", m.myLoc.Port)+f, arg...) +} + +func (m *Membership) dlog(f string, arg ...interface{}) { + m.Cfg.Log.Debugf(fmt.Sprintf("my.Port:%v. ", m.myLoc.Port)+f, arg...) +} + +func (m *Membership) getMyLocWithAnyLease() AgentLoc { + m.mu.Lock() + myLoc := m.myLoc + m.mu.Unlock() + + lead := m.elec.getLeader() + if slocEqualIgnoreLease(&lead, &myLoc) { + myLoc.LeaseExpires = lead.LeaseExpires + myLoc.IsLeader = true + } + return myLoc +} + +func (m *Membership) getMyLocWithZeroLease() AgentLoc { + m.mu.Lock() + myLoc := m.myLoc + m.mu.Unlock() + myLoc.LeaseExpires = time.Time{} + myLoc.IsLeader = false + return myLoc +} + +// deaf means we don't ping or pong. +// It is used to simulate network +// partition and healing. +// +func (m *Membership) deaf() bool { + v := atomic.LoadInt64(&m.Cfg.deaf) + return v == deafTrue +} + +func (m *Membership) setDeaf() { + atomic.StoreInt64(&m.Cfg.deaf, deafTrue) +} + +func (m *Membership) unDeaf() { + atomic.StoreInt64(&m.Cfg.deaf, deafFalse) +} + +// NewMembership creates a new Membership. +func NewMembership(cfg *MembershipCfg) *Membership { + m := &Membership{ + Cfg: *cfg, + halt: newHalter(), + pid: os.Getpid(), + // needReconnect should be sent on, not closed. + needReconnect: make(chan bool), + } + m.pc = m.newPongCollector() + m.elec = m.newLeadHolder(cfg.historyCount) + return m +} + +// leadHolder holds who is the current leader, +// and what their lease is. Used to synchronize +// access between various goroutines. +type leadHolder struct { + mu sync.Mutex + sloc AgentLoc + + myID string + myRank int + myLocHasBeenSet bool + + history *ringBuf + histsz int + + m *Membership +} + +func (m *Membership) newLeadHolder(histsz int) *leadHolder { + if histsz == 0 { + histsz = 100 + } + return &leadHolder{ + history: newRingBuf(histsz), + histsz: histsz, + m: m, + } +} + +func (e *leadHolder) setMyLoc(myLoc *AgentLoc) { + e.mu.Lock() + if e.myLocHasBeenSet { + panic("no double set!") + } + e.myLocHasBeenSet = true + e.myID = myLoc.ID + e.myRank = myLoc.Rank + e.mu.Unlock() +} + +// getLeader retrieves the stored e.sloc value. +func (e *leadHolder) getLeader() AgentLoc { + e.mu.Lock() + defer e.mu.Unlock() + return e.sloc +} + +// setLeader aims to copy sloc and store it +// for future getLeader() calls to access. +// +// However we reject any attempt to replace +// a leader with one that doesn't rank lower, where rank +// includes the LeaseExpires time +// (see the AgentLocLessThan() function). +// +// If we accept sloc +// we return slocWon true. If we reject sloc then +// we return slocWon false. In short, we will only +// accept sloc if AgentLocLessThan(sloc, e.sloc), +// and we return AgentLocLessThan(sloc, e.sloc).
+// +// If we return slocWon false, alt contains the +// value we favored, which is the current value +// of our retained e.sloc. If we return true, +// then alt contains a copy of sloc. We +// return a value in alt to avoid data races. +// +func (e *leadHolder) setLeader(sloc AgentLoc, now time.Time) (slocWon bool, alt AgentLoc) { + e.mu.Lock() + defer e.mu.Unlock() + + if sloc.ID == "" { + e.m.trace(`setLeader returning false because sloc.ID==""`) + return false, e.sloc + } + + // check on expired leases: any new leader must + // have a lease that is not expired. + nowu := now.UnixNano() + cure := e.sloc.LeaseExpires.UnixNano() + newe := sloc.LeaseExpires.UnixNano() + + curExpired := cure <= nowu + newExpired := newe <= nowu + bothExpired := curExpired && newExpired + neitherExpired := !curExpired && !newExpired + + var newWon, oldWon bool + + switch { + case bothExpired: + e.m.trace("22222 setLeader finds both expired") + return false, e.sloc + + case neitherExpired: + newWon = AgentLocLessThan(&sloc, &e.sloc) + oldWon = AgentLocLessThan(&e.sloc, &sloc) + + case newExpired: + e.m.trace("55555 setLeader is returning false because new has expired lease.") + return false, e.sloc + + case curExpired: + newWon = true + oldWon = false + e.m.trace("44444 setLeader finds old lease expired") + } + + switch { + case !newWon && !oldWon: + // they are equal, pick the longer lease + // so we allow lease renewal + if sloc.LeaseExpires.After(e.sloc.LeaseExpires) { + slocWon = true + alt = sloc + e.sloc = sloc + + e.m.trace("999999 setLeader: same leader, > lease, renewing lease for '%s'", &e.sloc) + } else { + slocWon = false + alt = e.sloc + + e.m.trace("000000 setLeader is failing to update the leader, rejecting the new contender. sloc='%s' >= prev:'%s'", &sloc, &e.sloc) + } + case newWon: + slocWon = true + alt = sloc + e.sloc = sloc + + e.m.trace("11111 setLeader updated the leader, accepting new proposal. sloc='%s' < prev:'%s'", &sloc, &e.sloc) + + default: + //oldWon + slocWon = false + alt = e.sloc + } + + // update history + if slocWon { + histcp := sloc + e.history.Append(&histcp) + } + + return +} + +func (e *leadHolder) copyLeadHistory() *ringBuf { + e.mu.Lock() + r := e.history.clone() + e.mu.Unlock() + return r +} + +func (e *leadHolder) getLeaderAsBytes() []byte { + lead := e.getLeader() + by, err := json.Marshal(&lead) + panicOn(err) + return by +} + +// Stop blocks until the Membership goroutine +// acknowledges the shutdown request. +func (m *Membership) Stop() { + m.mu.Lock() + if m.stopping { + m.mu.Unlock() + return + } + m.stopping = true + m.mu.Unlock() + m.halt.ReqStop.Close() + <-m.halt.Done.Chan +} + +// Start launches the Membership goroutine. +func (m *Membership) Start() error { + + m.Cfg.SetDefaults() + + err := m.setupNatsClient() + if err != nil { + m.halt.Done.Close() + return err + } + go m.start() + return nil +} + +func (m *Membership) start() { + + nc := m.nc + pc := m.pc + + defer func() { + m.halt.Done.Close() + }() + + m.Cfg.Log.Debugf("health-agent: Listening on [%s]\n", m.subjAllCall) + + prevCount, curCount := 0, 0 + prevMember := newMembers() + var curMember *members + var curLead AgentLoc + + var err error + var now time.Time + var expired bool + var prevLead AgentLoc + var nextLeadReportTm time.Time + + k := -1 + for { + k++ + // NB: replies to an + // allcall will only update + // the pongCollector's fromNoTime set, + // and won't change + // who the current leader + // is in elec.
+ m.trace("issuing k-th (k=%v) allcall", k) + err = m.allcall() + if err != nil { + // err could be: "write on closed buffer" + // typically means we are shutting down. + + m.trace("health-agent: "+ + "error on allcall, "+ + "shutting down the "+ + "health-agent: %s", + err) + return + } + + m.trace("SLEEPING for a heartbeat of %v", m.Cfg.BeatDur) + select { + case <-time.After(m.Cfg.BeatDur): + // continue below, latest heartbeat session done. + case <-m.needReconnect: + err := m.setupNatsClient() + if err != nil { + m.Cfg.Log.Debugf("health-agent: "+ + "fatal error: could not reconnect to, "+ + "our url '%s', error: %s", + m.Cfg.NatsURL, err) + + m.halt.ReqStop.Close() + return + } + case <-m.halt.ReqStop.Chan: + return + } + lastSeenLead := m.elec.getLeader() + + // cur responses should be back by now + // and we can compare prev and cur. + curCount, curMember = pc.getSetAndClear() + now = time.Now().UTC() + m.trace("k-th (k=%v) before doing leaderLeaseCheck, curMember='%s'", k, curMember) + + expired, curLead = curMember.leaderLeaseCheck( + now, + m.Cfg.LeaseTime, + lastSeenLead, + m.Cfg.MaxClockSkew, + m, + ) + + if expired { + // record in our history + won, alt := m.elec.setLeader(curLead, now) + if !won { + m.trace("k-th (k=%v) round "+ + "conclusion of trying to "+ + "setLeader: rejected '%s'"+ + " in favor of '%s'", + k, curLead, &alt) + curLead = alt + } else { + m.trace("k-th (k=%v) round "+ + "conclusion of trying to "+ + "setLeader: accepted as "+ + "new lead '%s'", + k, curLead) + } + } + + // logging + loc, _ := m.getNatsServerLocation() + if loc != nil { + if loc.ID == curLead.ID && + curLead.Pid == m.pid { + + if now.After(nextLeadReportTm) || + prevLead.ID == "" || + prevLead.ID != curLead.ID { + + left := curLead.LeaseExpires.Sub(now) + m.dlog("health-agent: "+ + "I am LEAD, my ID: '%s', "+ + "rank %v port %v host %v "+ + "pid %v. lease expires "+ + "in %s", + loc.ID, + loc.Rank, + loc.Port, + loc.Host, + m.pid, + left) + + nextLeadReportTm = now.Add(left).Add(m.Cfg.MaxClockSkew) + } + } else { + if prevLead.ID == loc.ID && + prevLead.Pid == m.pid { + + m.dlog("health-agent: "+ + "I am no longer lead, "+ + "new LEAD is '%s', rank %v. "+ + "port %v host %v pid %v lease expires in %s", + curLead.ID, + curLead.Rank, + curLead.Port, + curLead.Host, + curLead.Pid, + curLead.LeaseExpires.Sub(now)) + + } else { + if nextLeadReportTm.IsZero() || + now.After(nextLeadReportTm) { + + left := curLead.LeaseExpires.Sub(now) + if curLead.ID == "" { + m.dlog("health-agent: "+ + "I am '%s'/rank=%v. "+ + "port %v. lead is unknown.", + m.myLoc.ID, + m.myLoc.Rank, + m.myLoc.Port) + + } else { + m.dlog("health-agent: "+ + "I am not lead. lead is '%s', "+ + "rank %v host %v port %v pid %v for %v", + curLead.ID, + curLead.Rank, + curLead.Host, + curLead.Port, + curLead.Pid, + left) + + } + nextLeadReportTm = now.Add(left).Add(m.Cfg.MaxClockSkew) + } + } + } + } + + lost := setDiff(prevMember, curMember) + gained := setDiff(curMember, prevMember) + same := setsEqual(prevMember, curMember) + + if same { + // nothing more to do. + // This is the common case when nothing changes. + } else { + lostBytes := lost.mustJSONBytes() + if !lost.setEmpty() { + if !m.deaf() { + nc.Publish(m.subjMemberLost, lostBytes) + // ignore errors on purpose; + // don't crash mid-health-report + // if at all possible. + } + } + gainedBytes := gained.mustJSONBytes() + if !gained.setEmpty() { + if !m.deaf() { + nc.Publish(m.subjMemberAdded, gainedBytes) + // same error approach as above. 
+ } + } + } + if curCount < prevCount { + m.dlog("health-agent: ---- "+ + "PAGE PAGE PAGE!! we went "+ + "down a server, from %v -> %v. "+ + "lost: '%s'", + prevCount, + curCount, + lost) + + } else if curCount > prevCount && prevCount > 0 { + m.dlog("health-agent: ++++ "+ + "MORE ROBUSTNESS GAINED; "+ + "we went from %v -> %v. "+ + "gained: '%s'", + prevCount, + curCount, + gained) + + } + + if expired { + curBytes := curMember.mustJSONBytes() + if !m.deaf() { + nc.Publish(m.subjMembership, curBytes) + } + } + + // done with compare, now loop + prevCount = curCount + prevMember = curMember + prevLead = curLead + } +} + +func pong(nc *nats.Conn, subj string, msg []byte) { + nc.Publish(subj, msg) + nc.Flush() + // ignore errors, probably shutting down. + // e.g. "nats: connection closed on shutdown." + // or "nats: connection closed" +} + +// allcall sends out a health ping on the +// subjAllCall topic. +// +// The ping consists of sending the AgentLoc +// for the current leader, which provides lease +// and full contact info for the leader. +// +// This gives a round-trip connectivity check. +// +func (m *Membership) allcall() error { + lead := m.elec.getLeader() + m.trace("ISSUING ALLCALL on '%s' with leader '%s'\n", m.subjAllCall, &lead) + + leadby, err := json.Marshal(&lead) + panicOn(err) + + // allcall broadcasts the current leader + lease + return m.nc.PublishRequest(m.subjAllCall, m.subjAllReply, leadby) +} + +// pongCollector collects the responses +// from an allcall request. +type pongCollector struct { + replies int + + fromNoTime *members + + mu sync.Mutex + mship *Membership +} + +func (m *Membership) newPongCollector() *pongCollector { + return &pongCollector{ + fromNoTime: newMembers(), + mship: m, + } +} + +func (pc *pongCollector) insert(sloc AgentLoc) { + // insert into our dedup tree, with the + // lease time and leader flag zeroed out + // so deduplication works. + cp := sloc + cp.LeaseExpires = time.Time{} + cp.IsLeader = false + pc.fromNoTime.insert(cp) +} + +// accumulate pong responses +func (pc *pongCollector) receivePong(msg *nats.Msg) { + pc.mu.Lock() + + pc.replies++ + + var loc AgentLoc + err := loc.fromBytes(msg.Data) + if err == nil { + pc.insert(loc) + } else { + panic(err) + } + pc.mship.trace("PONG COLLECTOR RECEIVED ALLCALL REPLY '%s'", &loc) + + pc.mu.Unlock() +} + +func (pc *pongCollector) clear() { + pc.mu.Lock() + pc.fromNoTime.clear() + pc.mu.Unlock() +} + +// getSetAndClear returns the count and set so far, then +// clears the set, emptying it, and then adds +// back just myLoc. +func (pc *pongCollector) getSetAndClear() (int, *members) { + + mem := pc.fromNoTime.clone() + pc.clear() + + // we don't necessarily need to seed, + // since we'll hear our own allcall. + // But we may want to seed in case + // the connection to gnatsd is down. + // This happens under test when we + // are deaf. Hence we seed so we + // can elect ourselves when the + // connection to gnatsd is not available. + // + // add myLoc to pc.fromNoTime as a part of the reset: + myLoc := pc.mship.getMyLocWithZeroLease() + pc.insert(myLoc) + + pc.mship.trace("in getSetAndClear, here are the contents of mem.DedupTree: '%s'", mem.DedupTree) + + // return the old member set + return mem.DedupTree.Len(), mem +} + +// leaderLeaseCheck evaluates the lease as of now, +// and returns the leader or best candidate. Returns +// expired == true if any prior leader lease has +// lapsed. In this case we return the best new +// leader with its IsLeader bit set and its +// LeaseExpires set to now + lease.
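+// For example, with the default
+// leaseLen of 12 seconds and
+// maxClockSkew of 1 second: a leader
+// elected at time t0 gets
+// LeaseExpires = t0+12s, and the other
+// nodes honor it until t0+13s, only
+// then electing the lowest-ranked
+// respondent as the new leader.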
+// +// If expired == false then we return +// the current leader in lead. +// +// PRE: there are only 0 or 1 leaders in mems.DedupTree +// who have a non-zero LeaseExpires field. +// +// If mems.DedupTree is empty, we return (false, prevLead). +// +// This method is where the actual "election" +// happens. See the AgentLocLessThan() +// function below for exactly how +// we rank candidates. +// +func (mems *members) leaderLeaseCheck( + now time.Time, + leaseLen time.Duration, + prevLead AgentLoc, + maxClockSkew time.Duration, + m *Membership, + +) (expired bool, lead AgentLoc) { + + if prevLead.LeaseExpires.Add(maxClockSkew).After(now) { + // honor the leases until they expire + m.trace("leaderLeaseCheck: honoring outstanding lease") + return false, prevLead + } + + if mems.DedupTree.Len() == 0 { + m.trace("leaderLeaseCheck: mems.DedupTree.Len is 0") + return false, prevLead + } + + // INVAR: any lease has expired. + expired = true + lead = mems.DedupTree.minrank() + lead.IsLeader = true + lead.LeaseExpires = now.Add(leaseLen).UTC() + + return +} + +type byRankThenID struct { + s []*AgentLoc + now time.Time +} + +func (p byRankThenID) Len() int { return len(p.s) } +func (p byRankThenID) Swap(i, j int) { p.s[i], p.s[j] = p.s[j], p.s[i] } + +// Less must be stable and computable locally yet +// applicable globally: it is how we choose a leader +// in a stable fashion. +func (p byRankThenID) Less(i, j int) bool { + return AgentLocLessThan(p.s[i], p.s[j]) +} + +// AgentLocLessThan returns true iff i < j, in +// terms of leader preference where lowest is +// more electable/preferred as leader. +func AgentLocLessThan(i, j *AgentLoc) bool { + + // recognize empty AgentLoc and sort them high, not low. + iempt := i.ID == "" + jempt := j.ID == "" + if iempt && jempt { + return false // "" == "" + } + if jempt { + return true // "123" < "" + } + if iempt { + return false // "" > "123" + } + + if i.Rank != j.Rank { + return i.Rank < j.Rank + } + if i.ID != j.ID { + return lessThanString(i.ID, j.ID) + } + if i.Host != j.Host { + return lessThanString(i.Host, j.Host) + } + if i.Port != j.Port { + return i.Port < j.Port + } + if i.Pid != j.Pid { + return i.Pid < j.Pid + } + itm := i.LeaseExpires.UnixNano() + jtm := j.LeaseExpires.UnixNano() + return itm > jtm // want the later expiration to have priority +} + +// return i < j where empty strings are big not small. +func lessThanString(i, j string) bool { + iempt := i == "" + jempt := j == "" + if iempt || jempt { + if jempt { + return true // "123" < "" + } + return false + } + return i < j +} + +func (m *Membership) setupNatsClient() error { + pc := m.pc + + discon := func(nc *nats.Conn) { + select { + case m.needReconnect <- true: + case <-m.halt.ReqStop.Chan: + return + } + } + optdis := nats.DisconnectHandler(discon) + norand := nats.DontRandomize() + + // We don't want to get connected to + // some different server in the pool, + // so any reconnect, if needed, will + // need to be handled manually by us by + // attempting to contact the + // exact same address as we are + // configured with; see the m.needReconnect + // channel. + // Otherwise we are monitoring + // the health of the wrong server. + // + optrecon := nats.NoReconnect() + + opts := []nats.Option{optdis, optrecon, norand} + if m.Cfg.CliConn != nil { + opts = append(opts, nats.Dialer(&m.Cfg)) + } + + nc, err := nats.Connect(m.Cfg.NatsURL, opts...)
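+	// Note: when Cfg.CliConn is set, the
+	// nats.Dialer(&m.Cfg) option above routes
+	// the Connect call through MembershipCfg.Dial,
+	// which hands back the pre-established
+	// in-process connection instead of dialing
+	// a new TCP socket to the configured URL.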
+ if err != nil { + msg := fmt.Errorf("Can't connect to "+ + "nats on url '%s': %v", + m.Cfg.NatsURL, + err) + m.Cfg.Log.Errorf(msg.Error()) + return msg + } + m.nc = nc + loc, err := m.getNatsServerLocation() + if err != nil { + return err + } + m.setLoc(loc) + m.Cfg.Log.Debugf("health-agent: HELLOWORLD: "+ + "I am '%s' at '%v:%v'. "+ + "rank %v", + m.myLoc.ID, + m.myLoc.Host, + m.myLoc.Port, + m.myLoc.Rank) + + m.subjAllCall = sysMemberPrefix + "allcall" + m.subjAllReply = sysMemberPrefix + "allreply" + m.subjMemberLost = sysMemberPrefix + "lost" + m.subjMemberAdded = sysMemberPrefix + "added" + m.subjMembership = sysMemberPrefix + "list" + + nc.Subscribe(m.subjAllReply, func(msg *nats.Msg) { + if m.deaf() { + return + } + pc.receivePong(msg) + }) + + // allcall says: "who is out there? Are you a lead?" + nc.Subscribe(m.subjAllCall, func(msg *nats.Msg) { + m.trace("ALLCALL RECEIVED. msg:'%s'", string(msg.Data)) + if m.deaf() { + return + } + + // sanity check that we haven't moved. + loc, err := m.getNatsServerLocation() + if err != nil { + return // try again next time. + } + + // did we accidentally change + // server locations? + // Yikes, we don't want to do that! + // We are supposed to be monitoring + // just our own server. + if m.locDifferent(loc) { + panic(fmt.Sprintf("\n very bad! health-agent "+ + "changed locations! "+ + "first: '%s',\n\nvs\n now:'%s'\n", + &m.myLoc, + loc)) + } + // Done with sanity check. + // INVAR: we haven't moved, and + // loc matches m.myLoc. + + locWithLease := m.getMyLocWithAnyLease() + + hp, err := json.Marshal(&locWithLease) + panicOn(err) + if !m.deaf() { + m.trace("REPLYING TO ALLCALL on '%s' with my details: '%s'", msg.Reply, &locWithLease) + pong(nc, msg.Reply, hp) + } + }) + + // reporting + nc.Subscribe(m.subjMemberLost, func(msg *nats.Msg) { + if m.deaf() { + return + } + m.Cfg.Log.Tracef("health-agent: "+ + "Received on [%s]: '%s'", + msg.Subject, + string(msg.Data)) + }) + + // reporting + nc.Subscribe(m.subjMemberAdded, func(msg *nats.Msg) { + if m.deaf() { + return + } + m.Cfg.Log.Tracef("health-agent: Received on [%s]: '%s'", + msg.Subject, string(msg.Data)) + }) + + // reporting + nc.Subscribe(m.subjMembership, func(msg *nats.Msg) { + if m.deaf() { + return + } + m.Cfg.Log.Tracef("health-agent: "+ + "Received on [%s]: '%s'", + msg.Subject, + string(msg.Data)) + }) + + return nil +} + +func (m *Membership) locDifferent(b *nats.ServerLoc) bool { + m.mu.Lock() + defer m.mu.Unlock() + if b.ID != m.myLoc.ID { + return true + } + if b.Rank != m.myLoc.Rank { + return true + } + if b.Host != m.myLoc.Host { + return true + } + if b.Port != m.myLoc.Port { + return true + } + return false +} + +func (m *Membership) setLoc(b *nats.ServerLoc) { + m.mu.Lock() + m.myLoc.ID = b.ID + m.myLoc.Rank = b.Rank + m.myLoc.Host = b.Host + m.myLoc.Port = b.Port + m.myLoc.Pid = os.Getpid() + m.mu.Unlock() + m.elec.setMyLoc(&m.myLoc) +} + +func (m *Membership) getNatsServerLocation() (*nats.ServerLoc, error) { + loc, err := m.nc.ServerLocation() + if err != nil { + return nil, err + } + // fill in the rank because the server + // doesn't have the rank correct under + // various test scenarios where we + // spin up an embedded gnatsd. + // + // This is still correct in non-test, + // since the health-agent will + // have read from the command line + // the -rank option and then + // configured Cfg.MyRank when running + // embedded as an internal client.
loc.Rank = m.Cfg.MyRank + return loc, nil +} diff --git a/health/health_test.go b/health/health_test.go new file mode 100644 index 0000000000..a55230ccaf --- /dev/null +++ b/health/health_test.go @@ -0,0 +1,623 @@ +package health + +import ( + "encoding/json" + "fmt" + "log" + "math/rand" + "net" + "testing" + "time" + + cv "github.com/glycerine/goconvey/convey" + "github.com/nats-io/gnatsd/logger" + "github.com/nats-io/gnatsd/server" + gnatsd "github.com/nats-io/gnatsd/test" + "github.com/nats-io/go-nats" +) + +const testPort = 8198 +const DefaultTimeout = 2 * time.Second + +// warning: don't assume testPort is correct now... +// we use a dynamic port to avoid test conflicts. +var cliOpts = nats.Options{ + Url: fmt.Sprintf("nats://localhost:%d", testPort), + AllowReconnect: true, + MaxReconnect: 10, + ReconnectWait: 10 * time.Millisecond, + Timeout: DefaultTimeout, +} + +// DefaultTestOptions are default options for the unit tests. +// Warning: don't assume testPort is correct now... +// we use a dynamic port to avoid test conflicts. +var serverOpts = server.Options{ + Host: "localhost", + Port: testPort, + NoLog: true, + NoSigs: true, + MaxControlLine: 256, +} + +func Test101StressTestManyClients(t *testing.T) { + + cv.Convey("when stress testing with 50 clients coming up and shutting down, we should survive and prosper", t, func() { + + tport, ls := getAvailPort() + ls.Close() + s := RunServerOnPort(tport) + defer s.Shutdown() + + n := 50 + var ms []*Membership + for i := 0; i < n; i++ { + cli, srv, err := NewInternalClientPair() + panicOn(err) + + s.InternalCliRegisterCallback(srv) + cfg := &MembershipCfg{ + CliConn: cli, + MaxClockSkew: 1 * time.Nanosecond, + LeaseTime: 30 * time.Millisecond, + BeatDur: 10 * time.Millisecond, + NatsURL: fmt.Sprintf("nats://localhost:%v", tport), + MyRank: i, // ranks 0..n-1 + } + + m := NewMembership(cfg) + err = m.Start() + if err != nil { + panic(err) + } + ms = append(ms, m) + defer m.Stop() + } + // the test here is basically that we didn't crash + // or hang. So if we got here, success. + cv.So(true, cv.ShouldBeTrue) + }) +} + +func Test102ConvergenceToOneLowRankLeaderAndLiveness(t *testing.T) { + + cv.Convey("Given a cluster of one server with rank 0, no matter what other servers arrive thinking they are the leader (say, after a partition is healed), as long as those other nodes have rank 1 or higher, our rank 0 process will persist in leading and all other arrivals will give up their leadership claims (after their leases expire). In addition to safety, this is also a liveness check: After a single lease term + clockskew, a leader will have been chosen.", t, func() { + + tport, ls := getAvailPort() + ls.Close() + + s := RunServerOnPort(tport) + defer func() { + p("starting gnatsd shutdown...") + s.Shutdown() + }() + + n := 10 + tot := 50 + pause := make([]int, n) + for i := 0; i < n; i++ { + pause[i] = 20 + rand.Intn(50) + tot += pause[i] + } + + var ms []*Membership + for i := 0; i < n; i++ { + + cfg := &MembershipCfg{ + MaxClockSkew: 1 * time.Nanosecond, + LeaseTime: 800 * time.Millisecond, + BeatDur: 200 * time.Millisecond, + NatsURL: fmt.Sprintf("nats://localhost:%v", tport), + MyRank: i, // ranks 0,1,2,3,...
deaf: deafTrue, // don't ping or pong + historyCount: 10000, + } + + cli, srv, err := NewInternalClientPair() + panicOn(err) + + s.InternalCliRegisterCallback(srv) + cfg.CliConn = cli + + if i == 0 { + cfg.deaf = deafFalse + aLogger := logger.NewStdLogger(micros, debug, trace, colors, pid, log.LUTC) + _ = aLogger + // to follow the prints, uncomment: + cfg.Log = aLogger + } + + m := NewMembership(cfg) + err = m.Start() + if err != nil { + panic(err) + } + ms = append(ms, m) + defer func() { + m.Stop() + }() + } + + // let them all get past init phase. + time.Sleep(5 * (ms[0].Cfg.LeaseTime + ms[0].Cfg.MaxClockSkew)) + + // verify liveness, a leader exists. + //p("verifying everyone thinks there is a leader:") + for i := 0; i < n; i++ { + h := ms[i].elec.copyLeadHistory() + //fmt.Printf("verifying %v thinks there is a leader. avail = %v\n", i, h.Avail()) + cv.So(h.Avail(), cv.ShouldBeGreaterThan, 0) + } + + // bring in jobs after their random pause time + for i := 0; i < n; i++ { + dur := time.Duration(pause[i]) * time.Millisecond + //p("%v on i = %v/dur=%v ", time.Now().UTC(), i, dur) + time.Sleep(dur) + ms[i].unDeaf() + } + + // let the laggards get in a few more cycles, so + // we get enough history to evaluate. + time.Sleep(20 * (ms[0].Cfg.LeaseTime + ms[0].Cfg.MaxClockSkew)) + + // check that the history from the rank 0 + // always shows rank 0 as lead. + h := ms[0].elec.copyLeadHistory() + av := h.Avail() + //p("ms[0].myLoc.Port = %v", ms[0].myLoc.Port) + cv.So(ms[0].myLoc.ID, cv.ShouldNotEqual, "") + cv.So(av, cv.ShouldBeGreaterThan, 9) + //p("av: available history len = %v", av) + + // prints first: + + for i := 0; i < av; i++ { + sloc := h.A[h.Kth(i)].(*AgentLoc) + _ = sloc + //fmt.Printf("history print i = %v. sloc.ID=%v / sloc.Rank=%v, port=%v\n", i, sloc.ID, sloc.Rank, sloc.Port) + } + // checks second: + for i := 0; i < av; i++ { + sloc := h.A[h.Kth(i)].(*AgentLoc) + //fmt.Printf("history check ID at i = %v. sloc.ID=%v\n", i, sloc.ID) + cv.So(sloc.ID, cv.ShouldEqual, ms[0].myLoc.ID) + // ports will be the only thing different when + // running off of the one gnatsd that has the + // same rank and ID for all clients. + cv.So(sloc.Port, cv.ShouldEqual, ms[0].myLoc.Port) + } + + for i := 0; i < av; i++ { + sloc := h.A[h.Kth(i)].(*AgentLoc) + //p("history check Rank at i = %v. sloc.Rank=%v", i, sloc.Rank) + cv.So(sloc.Rank, cv.ShouldEqual, 0) + } + + // check that the other ranks have + // histories that converge + // on the rank 0 process quickly + for j := 1; j < n; j++ { + h := ms[j].elec.copyLeadHistory() + av := h.Avail() + //p("ms[j=%v].myLoc.Port = %v has history av = %v", j, ms[j].myLoc.Port, av) + cv.So(ms[j].myLoc.ID, cv.ShouldNotEqual, "") + cv.So(av, cv.ShouldBeGreaterThan, 12) + //p("av: available history len = %v", av) + + // prints first: + + for i := 0; i < av; i++ { + sloc := h.A[h.Kth(i)].(*AgentLoc) + _ = sloc + //fmt.Printf("history print i = %v. sloc.ID=%v / sloc.Rank=%v, port=%v\n", i, sloc.ID, sloc.Rank, sloc.Port) + } + // checks second: + + // after the preamble of heartbeats, everybody + // should have chosen the rank 0 leader. + // start scanning from 10,... + for i := 10; i < av; i++ { + sloc := h.A[h.Kth(i)].(*AgentLoc) + //fmt.Printf("j=%v, history check ID at i = %v. sloc.Port=%v/rank %v vs. ms[0].myLoc.Port=%v/rank %v\n", j, i, sloc.Port, sloc.Rank, ms[0].myLoc.Port, ms[0].myLoc.Rank) + + // ports will be the only thing different when + // running off of the one gnatsd that has the + // same rank and ID for all clients.
+ cv.So(sloc.Port, cv.ShouldEqual, ms[0].myLoc.Port) + } + + for i := 10; i < av; i++ { + sloc := h.A[h.Kth(i)].(*AgentLoc) + //p("j=%v history check Rank at i = %v. sloc.Rank=%v", j, i, sloc.Rank) + cv.So(sloc.Rank, cv.ShouldEqual, 0) + } + } + }) +} + +func Test103TiedRanksUseIdAndDoNotAlternate(t *testing.T) { + + cv.Convey("Given a cluster of two servers with rank 0 and different IDs, one should win after the initial period, and they should not alternate leadership as they carry forward.", t, func() { + + tport, ls := getAvailPort() + ls.Close() + + s := RunServerOnPort(tport) + defer func() { + p("starting gnatsd shutdown...") + s.Shutdown() + }() + + n := 2 + + aLogger := logger.NewStdLogger(micros, debug, trace, colors, pid, log.LUTC) + var ms []*Membership + for i := 0; i < n; i++ { + + cfg := &MembershipCfg{ + MaxClockSkew: 1 * time.Nanosecond, + LeaseTime: 400 * time.Millisecond, + BeatDur: 100 * time.Millisecond, + NatsURL: fmt.Sprintf("nats://localhost:%v", tport), + MyRank: 0, + historyCount: 10000, + } + + cli, srv, err := NewInternalClientPair() + panicOn(err) + + s.InternalCliRegisterCallback(srv) + cfg.CliConn = cli + + cfg.Log = aLogger + + m := NewMembership(cfg) + err = m.Start() + if err != nil { + panic(err) + } + ms = append(ms, m) + defer m.Stop() + } + + // let them get past init phase. + time.Sleep(2 * (ms[0].Cfg.LeaseTime + ms[0].Cfg.MaxClockSkew)) + + // verify liveness, a leader exists. + p("at %v, verifying everyone thinks there is a leader:", time.Now().UTC()) + for i := 0; i < n; i++ { + h := ms[i].elec.copyLeadHistory() + //fmt.Printf("verifying %v thinks there is a leader, avail history len= %v\n", i, h.Avail()) + cv.So(h.Avail(), cv.ShouldBeGreaterThan, 0) + } + + rounds := 10 + // sleep for rounds lease cycles - check for alternation + time.Sleep(time.Duration(rounds+1) * (ms[0].Cfg.LeaseTime + ms[0].Cfg.MaxClockSkew)) + + // who should be winner after lease expiration... + zeroWins := AgentLocLessThan(&ms[0].myLoc, &ms[1].myLoc) + p("zeroWins: %v, [0].myLoc=%v [1].myLoc=%v", zeroWins, &ms[0].myLoc, &ms[1].myLoc) + winner := &ms[1].myLoc + if zeroWins { + winner = &ms[0].myLoc + } + + for j := 0; j < n; j++ { + + // check that the history doesn't alternate + // between ports / servers. + h := ms[j].elec.copyLeadHistory() + av := h.Avail() + p("ms[j=%v].myLoc.ID = %v", j, ms[j].myLoc.ID) + p("av: j=%v, available history len = %v", j, av) + cv.So(av, cv.ShouldBeGreaterThan, rounds) + + // prints first: + for i := 0; i < av; i++ { + sloc := h.A[h.Kth(i)].(*AgentLoc) + _ = sloc + //fmt.Printf("server j=%v, history print i = %v. / sloc.Port=%v, winner.Port=%v\n", j, i, sloc.Port, winner.Port) + } + } + for j := 0; j < n; j++ { + + // check that the history doesn't alternate + // between ports / servers. + h := ms[j].elec.copyLeadHistory() + av := h.Avail() + + // checks second: + for i := 0; i < av; i++ { + sloc := h.A[h.Kth(i)].(*AgentLoc) + //fmt.Printf("server j=%v, history check ID at i = %v. 
sloc.Port=%v, winner.Port=%v\n", j, i, sloc.Port, winner.Port) + cv.So(sloc.Port, cv.ShouldEqual, winner.Port) + } + + } + }) +} + +func RunServerOnPort(port int) *server.Server { + opts := serverOpts + opts.Port = port + return gnatsd.RunServer(&opts) +} + +func StartClusterOnPort(port, clusterPort int) *server.Server { + opts := serverOpts + opts.Port = port + + opts.Cluster = server.ClusterOpts{ + Port: clusterPort, + Host: opts.Host, + } + return gnatsd.RunServer(&opts) +} +func AddToClusterOnPort( + port, clusterPort int, routesStr string, +) *server.Server { + + opts := serverOpts + opts.Port = port + opts.Routes = server.RoutesFromStr(routesStr) + opts.Cluster = server.ClusterOpts{ + Port: clusterPort, + Host: opts.Host, + } + return gnatsd.RunServer(&opts) +} + +func min(a, b int) int { + if a < b { + return a + } + return b +} + +func max(a, b int) int { + if a > b { + return a + } + return b +} + +func Test104ReceiveOwnSends(t *testing.T) { + + cv.Convey("If we transmit on a topic we are subscribed to, then we should receive our own send.", t, func() { + + tport, ls := getAvailPort() + ls.Close() + + s := RunServerOnPort(tport) + defer func() { + p("starting gnatsd shutdown...") + s.Shutdown() + }() + cli, srv, err := NewInternalClientPair() + panicOn(err) + s.InternalCliRegisterCallback(srv) + + cfg := &MembershipCfg{ + CliConn: cli, + MaxClockSkew: 1 * time.Nanosecond, + LeaseTime: 30 * time.Millisecond, + BeatDur: 10 * time.Millisecond, + NatsURL: fmt.Sprintf("nats://localhost:%v", tport), + MyRank: 0, + } + + aLogger := logger.NewStdLogger(micros, debug, trace, colors, pid, log.LUTC) + cfg.Log = aLogger + + m := NewMembership(cfg) + + // like m.Start() but manually: + m.Cfg.SetDefaults() + + // unroll setupNatsClient... + + discon := func(nc *nats.Conn) { + m.Cfg.Log.Tracef("health-agent: Disconnected from nats!") + } + optdis := nats.DisconnectHandler(discon) + norand := nats.DontRandomize() + + recon := func(nc *nats.Conn) { + loc, err := nc.ServerLocation() + panicOn(err) + m.Cfg.Log.Tracef("health-agent: Reconnect to nats!: loc = '%s'", loc) + } + optrecon := nats.ReconnectHandler(recon) + + opts := []nats.Option{optdis, optrecon, norand} + if m.Cfg.CliConn != nil { + opts = append(opts, nats.Dialer(&m.Cfg)) + } + + nc, err := nats.Connect(m.Cfg.NatsURL, opts...) 
+ panicOn(err) + + loc, err := nc.ServerLocation() + panicOn(err) + loc.Rank = m.Cfg.MyRank + m.setLoc(loc) + + m.subjAllCall = sysMemberPrefix + "allcall" + m.subjAllReply = sysMemberPrefix + "allreply" + m.subjMemberLost = sysMemberPrefix + "lost" + m.subjMemberAdded = sysMemberPrefix + "added" + m.subjMembership = sysMemberPrefix + "list" + m.nc = nc + + gotAllCall := make(chan bool) + repliedInAllCall := make(chan bool) + gotAllRep := make(chan bool) + nc.Subscribe(m.subjAllReply, func(msg *nats.Msg) { + p("I received on subjAllReply: msg='%#v'", string(msg.Data)) + close(gotAllRep) + }) + + nc.Subscribe(m.subjAllCall, func(msg *nats.Msg) { + close(gotAllCall) + p("test104, port %v, at 999 allcall received '%s'", m.myLoc.Port, string(msg.Data)) + loc, err := nc.ServerLocation() + panicOn(err) + hp, err := json.Marshal(loc) + panicOn(err) + + p("test104, port %v, at 222 in allcall handler: replying to allcall with our loc: '%s'", m.myLoc.Port, loc) + pong(nc, msg.Reply, hp) + close(repliedInAllCall) + }) + + now := time.Now().UTC() + // send on subjAllCall + sl := AgentLoc{ + ID: "abc", + Host: "here", + Port: 99, + Rank: -100, + LeaseExpires: now.Add(time.Hour), + } + won, _ := m.elec.setLeader(sl, now) + if !won { + panic("must be able to set leader") + } + m.allcall() + <-gotAllCall + <-repliedInAllCall + // expect to have gotAllRep closed. + <-gotAllRep + }) +} + +func Test105OnlyConnectToOriginalGnatsd(t *testing.T) { + + cv.Convey("If a health-agent is disconnected from gnatsd, it should only ever reconnect to that same gnatsd--the server whose health it is responsible for monitoring.", t, func() { + tport, ls := getAvailPort() + ls.Close() + tport2, ls2 := getAvailPort() + ls2.Close() + + cluster1Port, lsn1 := getAvailPort() + cluster2Port, lsn2 := getAvailPort() + // now that we've bound different available ports, + // we can close the listeners to free these up. + lsn1.Close() + lsn2.Close() + routesString := fmt.Sprintf("nats://127.0.0.1:%v", cluster1Port) + s := StartClusterOnPort(tport, cluster1Port) + s2 := AddToClusterOnPort(tport2, cluster2Port, routesString) + defer s2.Shutdown() + + cli, srv, err := NewInternalClientPair() + panicOn(err) + s.InternalCliRegisterCallback(srv) + cfg := &MembershipCfg{ + CliConn: cli, + MaxClockSkew: 1 * time.Nanosecond, + LeaseTime: 30 * time.Millisecond, + BeatDur: 10 * time.Millisecond, + NatsURL: fmt.Sprintf("nats://localhost:%v", tport), + } + aLogger := logger.NewStdLogger(micros, debug, trace, colors, pid, log.LUTC) + cfg.Log = aLogger + + m := NewMembership(cfg) + err = m.Start() + panicOn(err) + defer m.Stop() + + _, err = m.nc.ServerLocation() + panicOn(err) + + time.Sleep(100 * time.Millisecond) + s.Shutdown() + // allow time for any unwanted + // auto-reconnect to be attempted; + // we are testing that no reconnect + // happens.
+ time.Sleep(300 * time.Millisecond) + + _, err = m.nc.ServerLocation() + p("attempting to contact failed server; err is '%v'", err) + // this *should* *have* *failed* + cv.So(err, cv.ShouldNotBeNil) + cv.So(err.Error(), cv.ShouldEqual, "nats: invalid connection") + select { + case <-m.halt.Done.Chan: + p("good: Membership shut itself down.") + } + }) +} + +func getAvailPort() (int, net.Listener) { + l, _ := net.Listen("tcp", ":0") + r := l.Addr() + return r.(*net.TCPAddr).Port, l +} + +func Test106AgentLocLessThan(t *testing.T) { + + cv.Convey("To properly handle empty AgentLoc, AgentLocLessThan should sort an sloc with an ID as smaller (more preferred) compared to an sloc (AgentLoc) with an empty-string ID, even if their ranks are different.", t, func() { + var s1, s2 AgentLoc + s1.ID = "a" + s1.Rank = 1 + cv.So(AgentLocLessThan(&s1, &s2), cv.ShouldBeTrue) + }) +} + +func Test107OneNodeAloneWaitsLeaseTermBeforeRenewal(t *testing.T) { + + cv.Convey("Given a cluster of one server, it should elect itself leader and then wait a full lease term before considering who to elect again", t, func() { + + tport, ls := getAvailPort() + ls.Close() + + s := RunServerOnPort(tport) + defer func() { + p("starting gnatsd shutdown...") + s.Shutdown() + }() + + cfg := &MembershipCfg{ + MaxClockSkew: 1 * time.Nanosecond, + LeaseTime: 3000 * time.Millisecond, + BeatDur: 1000 * time.Millisecond, + NatsURL: fmt.Sprintf("nats://localhost:%v", tport), + MyRank: 0, + historyCount: 10000, + } + + cli, srv, err := NewInternalClientPair() + panicOn(err) + + s.InternalCliRegisterCallback(srv) + cfg.CliConn = cli + + aLogger := logger.NewStdLogger(micros, debug, trace, colors, pid, log.LUTC) + _ = aLogger + // to follow the prints, uncomment: + cfg.Log = aLogger + + m := NewMembership(cfg) + err = m.Start() + if err != nil { + panic(err) + } + defer func() { + m.Stop() + }() + + // let it get past init phase. + time.Sleep(3 * (m.Cfg.LeaseTime + m.Cfg.MaxClockSkew)) + + h := m.elec.copyLeadHistory() + av := h.Avail() + fmt.Printf("verifying at most 3 leader changes: %v\n", av) + cv.So(av, cv.ShouldBeLessThan, 4) + + }) +} diff --git a/health/healthcmd/main.go b/health/healthcmd/main.go new file mode 100644 index 0000000000..8f60470445 --- /dev/null +++ b/health/healthcmd/main.go @@ -0,0 +1,61 @@ +package main + +import ( + "flag" + "fmt" + "log" + "os" + "strconv" + + "github.com/nats-io/gnatsd/health" + "github.com/nats-io/gnatsd/logger" +) + +// healthcmd runs an allcall election from a standalone +// command line nats client. It exercises the same +// gnatsd/health library code that runs as an internal client +// in process with gnatsd.
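+//
+// For example (hypothetical address and rank;
+// any reachable gnatsd works):
+//
+//	healthcmd 127.0.0.1:4222 1
+//
+// joins the allcall election against the gnatsd
+// listening on 127.0.0.1:4222, claiming rank 1.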
+ +func usage() { + log.Fatalf("use: healthcmd {host}:port {rank}\n") +} + +func main() { + + log.SetFlags(0) + flag.Usage = usage + flag.Parse() + + args := flag.Args() + if len(args) < 1 { + usage() + } + + rank := 0 + var err error + if len(args) >= 2 { + rank, err = strconv.Atoi(args[1]) + if err != nil { + fmt.Fprintf(os.Stderr, "2nd arg should be our numeric rank\n") + } + } + + const colors = false + const micros, pid = true, true + const trace = false + const debug = true + aLogger := logger.NewStdLogger(micros, debug, trace, colors, pid, log.LUTC) + + cfg := &health.MembershipCfg{ + NatsURL: "nats://" + args[0], // "nats://127.0.0.1:4222" + MyRank: rank, + Log: aLogger, + } + m := health.NewMembership(cfg) + err = m.Start() + if err != nil { + panic(err) + } + + select {} +} diff --git a/health/icc.go b/health/icc.go new file mode 100644 index 0000000000..d338331cd6 --- /dev/null +++ b/health/icc.go @@ -0,0 +1,64 @@ +package health + +import ( + "net" +) + +// Icc allows the server to +// detect a net.Conn as +// an internal client connection +// by checking if it implements the +// LocalInternalClient interface. +// +type Icc struct { + *net.TCPConn +} + +// IsInternal satisfies the LocalInternalClient interface. +func (c *Icc) IsInternal() {} + +// NewInternalClientPair constructs a client/server +// pair that wraps TCP endpoints in Icc to let +// the server recognize them as internal. +// +func NewInternalClientPair() (cli, srv *Icc, err error) { + + lsn, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + return + } + + srvDone := make(chan struct{}) + go func() { + s, err2 := lsn.Accept() + if err2 == nil { + srv = &Icc{TCPConn: s.(*net.TCPConn)} + } else { + err = err2 + } + lsn.Close() + close(srvDone) + }() + + addr := lsn.Addr() + c, err3 := net.Dial(addr.Network(), addr.String()) + <-srvDone + + if err3 != nil { + err = err3 + if srv != nil { + srv.Close() + srv = nil + } + return + } + cli = &Icc{TCPConn: c.(*net.TCPConn)} + // INVAR: cli ok. + if err != nil { + cli.Close() + cli = nil + return + } + // INVAR: srv ok. + return +} diff --git a/health/icc_test.go b/health/icc_test.go new file mode 100644 index 0000000000..4ffae665c4 --- /dev/null +++ b/health/icc_test.go @@ -0,0 +1,48 @@ +package health + +import ( + "net" + "testing" + + "github.com/nats-io/gnatsd/server" +) + +func TestIccTypeSwitchWorks(t *testing.T) { + var nc net.Conn = &Icc{} + _, isIcc := nc.(server.LocalInternalClient) + if !isIcc { + t.Fatalf("nc was not LocalInternalClient, as it should be!") + } +} + +func TestIccAsNetConn(t *testing.T) { + + // write to a, read from b + a, b, err := NewInternalClientPair() + if err != nil { + panic(err) + } + + msg := "hello-world" + + n, err := a.Write([]byte(msg)) + if err != nil { + t.Errorf("err = %v", err) + } + if n != len(msg) { + t.Errorf("Write truncated at %v < %v", n, len(msg)) + } + + readbuf := make([]byte, len(msg)) + m, err := b.Read(readbuf) + if err != nil { + t.Errorf("err = %v", err) + } + if m != n { + t.Errorf("Read truncated at %v != %v", m, n) + } + back := string(readbuf[:m]) + if back != msg { + t.Errorf("msg corrupted, wrote '%v', read '%v'", msg, back) + } +} diff --git a/health/idem.go b/health/idem.go new file mode 100644 index 0000000000..a2c739f53a --- /dev/null +++ b/health/idem.go @@ -0,0 +1,106 @@ +package health + +import ( + "fmt" + "sync" +) + +// Copyright (c) 2017 Jason E. Aten, Ph.D. +// https://github.com/glycerine/idem +// MIT license.
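+
+// Typical use of the halter below (a sketch;
+// it mirrors how Membership.Stop closes
+// halt.ReqStop and then waits on halt.Done):
+//
+//	h := newHalter()
+//	go func() {
+//		defer h.MarkDone()
+//		for {
+//			select {
+//			case <-h.ReqStop.Chan:
+//				return
+//			default:
+//				// ... do a unit of work ...
+//			}
+//		}
+//	}()
+//	// elsewhere, to shut down and wait:
+//	h.RequestStop()
+//	<-h.Done.Chan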
+
+// idemCloseChan can have Close() called on it
+// multiple times, and it will only close
+// Chan once.
+type idemCloseChan struct {
+	Chan   chan bool
+	closed bool
+	mut    sync.Mutex
+}
+
+// Reinit re-allocates the Chan, assigning
+// a new channel and resetting the state
+// as if brand new.
+func (c *idemCloseChan) Reinit() {
+	c.mut.Lock()
+	defer c.mut.Unlock()
+	c.Chan = make(chan bool)
+	c.closed = false
+}
+
+// newIdemCloseChan makes a new idemCloseChan.
+func newIdemCloseChan() *idemCloseChan {
+	return &idemCloseChan{
+		Chan: make(chan bool),
+	}
+}
+
+var errAlreadyClosed = fmt.Errorf("Chan already closed")
+
+// Close returns nil the first time it is called,
+// and errAlreadyClosed on every call after that.
+// It never closes c.Chan more than once, so it
+// is safe to ignore the returned error value.
+// Close() is safe for concurrent access by
+// multiple goroutines.
+func (c *idemCloseChan) Close() error {
+	c.mut.Lock()
+	defer c.mut.Unlock()
+	if !c.closed {
+		close(c.Chan)
+		c.closed = true
+		return nil
+	}
+	return errAlreadyClosed
+}
+
+// IsClosed tells you if Chan is already closed or not.
+func (c *idemCloseChan) IsClosed() bool {
+	c.mut.Lock()
+	defer c.mut.Unlock()
+	return c.closed
+}
+
+// halter helps shut down a goroutine.
+type halter struct {
+	// The owning goroutine should call Done.Close() as its
+	// last act once it has received the ReqStop signal.
+	Done idemCloseChan
+
+	// Other goroutines call ReqStop.Close() in order
+	// to request that the owning goroutine stop immediately.
+	// The owning goroutine should select on ReqStop.Chan
+	// in order to recognize shutdown requests.
+	ReqStop idemCloseChan
+}
+
+func newHalter() *halter {
+	return &halter{
+		Done:    *newIdemCloseChan(),
+		ReqStop: *newIdemCloseChan(),
+	}
+}
+
+// RequestStop closes the h.ReqStop channel
+// if it has not already done so. Safe for
+// multiple goroutine access.
+func (h *halter) RequestStop() {
+	h.ReqStop.Close()
+}
+
+// MarkDone closes the h.Done channel
+// if it has not already done so. Safe for
+// multiple goroutine access.
+func (h *halter) MarkDone() {
+	h.Done.Close()
+}
+
+// IsStopRequested returns true iff h.ReqStop has been closed.
+func (h *halter) IsStopRequested() bool {
+	return h.ReqStop.IsClosed()
+}
+
+// IsDone returns true iff h.Done has been closed.
+func (h *halter) IsDone() bool {
+	return h.Done.IsClosed()
+}
diff --git a/health/rbuf.go b/health/rbuf.go
new file mode 100644
index 0000000000..7330aca246
--- /dev/null
+++ b/health/rbuf.go
@@ -0,0 +1,139 @@
+package health
+
+// https://github.com/glycerine/rbuf
+// copyright (c) 2014, Jason E. Aten
+// license: MIT
+
+import "io"
+
+// ringBuf:
+//
+// a fixed-size circular ring buffer. Just what it says.
+//
+type ringBuf struct {
+	A        []interface{}
+	N        int // the total capacity (number of slots) in A
+	Beg      int // start of data in A
+	Readable int // number of items available to read in A
+}
+
+// newRingBuf constructs a new ringBuf.
+func newRingBuf(maxViewInBytes int) *ringBuf {
+	n := maxViewInBytes
+	r := &ringBuf{
+		N:        n,
+		Beg:      0,
+		Readable: 0,
+	}
+	r.A = make([]interface{}, n, n)
+
+	return r
+}
+
+// clone makes a copy of b.
+func (b *ringBuf) clone() *ringBuf {
+	a := &ringBuf{}
+	for i := range b.A {
+		a.A = append(a.A, b.A[i])
+	}
+	a.N = b.N
+	a.Beg = b.Beg
+	a.Readable = b.Readable
+	return a
+}
+
+// Reset quickly forgets any data stored in the ring buffer. The
+// data is still there, but the ring buffer will ignore it and
+// overwrite those slots as new data comes in.
+func (b *ringBuf) Reset() {
+	b.Beg = 0
+	b.Readable = 0
+}
+
+// Advance(): non-standard, but better than Next(),
+// because we don't have to unwrap our buffer and pay the CPU time
+// for the copy that unwrapping may need. Useful in conjunction
+// with/after the original rbuf library's ReadWithoutAdvance().
+func (b *ringBuf) Advance(n int) {
+	if n <= 0 {
+		return
+	}
+	if n > b.Readable {
+		n = b.Readable
+	}
+	b.Readable -= n
+	b.Beg = (b.Beg + n) % b.N
+}
+
+func intMin(a, b int) int {
+	if a < b {
+		return a
+	}
+	return b
+}
+
+func (b *ringBuf) Avail() int {
+	return b.Readable
+}
+
+// First returns the earliest index, or -1 if
+// the ring is empty.
+func (b *ringBuf) First() int {
+	if b.Readable == 0 {
+		return -1
+	}
+	return b.Beg
+}
+
+// Last returns the index of the last element,
+// or -1 if the ring is empty.
+func (b *ringBuf) Last() int {
+	if b.Readable == 0 {
+		return -1
+	}
+
+	last := b.Beg + b.Readable - 1
+	if last < b.N {
+		// we fit without wrapping
+		return last
+	}
+
+	return last % b.N
+}
+
+// Kth presents the contents of the
+// ring as a strictly linear sequence,
+// so the user doesn't need to think
+// about modular arithmetic. Here k indexes from
+// [0, b.Readable-1], assuming b.Avail()
+// is greater than 0. Kth() returns an
+// actual index where the logical k-th
+// element, starting from b.Beg, resides.
+// b.Beg itself lives at k = 0. If k is
+// out of bounds, or the ring is empty,
+// -1 is returned.
+func (b *ringBuf) Kth(k int) int {
+	if b.Readable == 0 || k < 0 || k >= b.Readable {
+		return -1
+	}
+	return (b.Beg + k) % b.N
+}
+
+//
+// Append returns an error if there is no more
+// space in the ring. Otherwise it returns nil
+// and writes p into the ring in the last position.
+//
+func (b *ringBuf) Append(p interface{}) error {
+	writeCapacity := b.N - b.Readable
+	if writeCapacity <= 0 {
+		// we are all full up already.
+		return io.ErrShortWrite
+	}
+
+	writeStart := (b.Beg + b.Readable) % b.N
+	b.A[writeStart] = p
+
+	b.Readable++
+	return nil
+}
diff --git a/health/setutil.go b/health/setutil.go
new file mode 100644
index 0000000000..a3ca10e289
--- /dev/null
+++ b/health/setutil.go
@@ -0,0 +1,63 @@
+package health
+
+import (
+	"fmt"
+)
+
+// utilities and set-related helpers
+
+// p is a shortcut for a call to fmt.Printf that implicitly starts
+// and ends its message with a newline.
+func p(format string, stuff ...interface{}) {
+	fmt.Printf("\n "+format+"\n", stuff...)
+} + +func panicOn(err error) { + if err != nil { + panic(err) + } +} + +type members struct { + DedupTree *ranktree `json:"Mem"` +} + +func (m *members) insert(s AgentLoc) { + m.DedupTree.insert(s) +} +func (m *members) clear() { + m.DedupTree = newRanktree() +} + +func (m *members) minrank() AgentLoc { + return m.DedupTree.minrank() +} + +func (m *members) clone() *members { + cp := newMembers() + if m.DedupTree == nil { + return cp + } + cp.DedupTree = m.DedupTree.clone() + return cp +} + +func (m *members) setEmpty() bool { + return m.DedupTree.Len() == 0 +} + +func (m *members) String() string { + return string(m.mustJSONBytes()) +} + +func newMembers() *members { + return &members{ + DedupTree: newRanktree(), + } +} + +func (m *members) mustJSONBytes() []byte { + by, err := m.DedupTree.MarshalJSON() + panicOn(err) + return by +} diff --git a/logger/log.go b/logger/log.go index 485ae12e7e..e9c63696b6 100644 --- a/logger/log.go +++ b/logger/log.go @@ -22,8 +22,7 @@ type Logger struct { } // NewStdLogger creates a logger with output directed to Stderr -func NewStdLogger(time, debug, trace, colors, pid bool) *Logger { - flags := 0 +func NewStdLogger(time, debug, trace, colors, pid bool, flags int) *Logger { if time { flags = log.LstdFlags | log.Lmicroseconds } @@ -49,14 +48,13 @@ func NewStdLogger(time, debug, trace, colors, pid bool) *Logger { } // NewFileLogger creates a logger with output directed to a file -func NewFileLogger(filename string, time, debug, trace, pid bool) *Logger { +func NewFileLogger(filename string, time, debug, trace, pid bool, flags int) *Logger { fileflags := os.O_WRONLY | os.O_APPEND | os.O_CREATE f, err := os.OpenFile(filename, fileflags, 0660) if err != nil { log.Fatalf("error opening file: %v", err) } - flags := 0 if time { flags = log.LstdFlags | log.Lmicroseconds } diff --git a/logger/log_test.go b/logger/log_test.go index 0bb9dfe4ad..afd3085ca7 100644 --- a/logger/log_test.go +++ b/logger/log_test.go @@ -13,7 +13,7 @@ import ( ) func TestStdLogger(t *testing.T) { - logger := NewStdLogger(false, false, false, false, false) + logger := NewStdLogger(false, false, false, false, false, 0) flags := logger.logger.Flags() if flags != 0 { @@ -30,7 +30,7 @@ func TestStdLogger(t *testing.T) { } func TestStdLoggerWithDebugTraceAndTime(t *testing.T) { - logger := NewStdLogger(true, true, true, false, false) + logger := NewStdLogger(true, true, true, false, false, 0) flags := logger.logger.Flags() if flags != log.LstdFlags|log.Lmicroseconds { @@ -48,42 +48,42 @@ func TestStdLoggerWithDebugTraceAndTime(t *testing.T) { func TestStdLoggerNotice(t *testing.T) { expectOutput(t, func() { - logger := NewStdLogger(false, false, false, false, false) + logger := NewStdLogger(false, false, false, false, false, 0) logger.Noticef("foo") }, "[INF] foo\n") } func TestStdLoggerNoticeWithColor(t *testing.T) { expectOutput(t, func() { - logger := NewStdLogger(false, false, false, true, false) + logger := NewStdLogger(false, false, false, true, false, 0) logger.Noticef("foo") }, "[\x1b[32mINF\x1b[0m] foo\n") } func TestStdLoggerDebug(t *testing.T) { expectOutput(t, func() { - logger := NewStdLogger(false, true, false, false, false) + logger := NewStdLogger(false, true, false, false, false, 0) logger.Debugf("foo %s", "bar") }, "[DBG] foo bar\n") } func TestStdLoggerDebugWithOutDebug(t *testing.T) { expectOutput(t, func() { - logger := NewStdLogger(false, false, false, false, false) + logger := NewStdLogger(false, false, false, false, false, 0) logger.Debugf("foo") }, "") } func 
TestStdLoggerTrace(t *testing.T) { expectOutput(t, func() { - logger := NewStdLogger(false, false, true, false, false) + logger := NewStdLogger(false, false, true, false, false, 0) logger.Tracef("foo") }, "[TRC] foo\n") } func TestStdLoggerTraceWithOutDebug(t *testing.T) { expectOutput(t, func() { - logger := NewStdLogger(false, false, false, false, false) + logger := NewStdLogger(false, false, false, false, false, 0) logger.Tracef("foo") }, "") } @@ -101,7 +101,7 @@ func TestFileLogger(t *testing.T) { } file.Close() - logger := NewFileLogger(file.Name(), false, false, false, false) + logger := NewFileLogger(file.Name(), false, false, false, false, 0) logger.Noticef("foo") buf, err := ioutil.ReadFile(file.Name()) @@ -122,7 +122,7 @@ func TestFileLogger(t *testing.T) { } file.Close() - logger = NewFileLogger(file.Name(), true, true, true, true) + logger = NewFileLogger(file.Name(), true, true, true, true, 0) logger.Errorf("foo") buf, err = ioutil.ReadFile(file.Name()) diff --git a/main.go b/main.go index 425372390f..5fe761f596 100644 --- a/main.go +++ b/main.go @@ -8,8 +8,10 @@ import ( "net" "net/url" "os" + "time" "github.com/nats-io/gnatsd/auth" + "github.com/nats-io/gnatsd/health" "github.com/nats-io/gnatsd/logger" "github.com/nats-io/gnatsd/server" ) @@ -52,6 +54,11 @@ Cluster Options: --no_advertise Advertise known cluster IPs to clients --connect_retries For implicit routes, number of connect retries +Cluster Health Monitor: + --health Run the health monitoring/leader election agent + --lease Duration of leader leases. default: 12s + --beat Time between heartbeats (want 3-4/lease). default: 3s + --rank Smaller rank gives priority in leader election Common Options: -h, --help Show this message @@ -118,6 +125,10 @@ func main() { flag.StringVar(&opts.TLSCert, "tlscert", "", "Server certificate file.") flag.StringVar(&opts.TLSKey, "tlskey", "", "Private key for server certificate.") flag.StringVar(&opts.TLSCaCert, "tlscacert", "", "Client certificate CA for verification.") + flag.BoolVar(&opts.HealthAgent, "health", false, "Run the health agent, elect a leader.") + flag.IntVar(&opts.HealthRank, "rank", 7, "leader election priority: the smaller the rank, the more preferred the server is as a leader. Negative ranks are allowed. Ties are broken by the random ServerId.") + flag.DurationVar(&opts.HealthLease, "lease", time.Second*12, "leader lease duration (should allow 3-4 beats within a lease)") + flag.DurationVar(&opts.HealthBeat, "beat", time.Second*3, "heart beat every this often (should get 3-4 beats within a lease)") flag.Usage = func() { fmt.Printf("%s\n", usageStr) @@ -175,6 +186,10 @@ func main() { server.PrintAndDie(err.Error()) } + if opts.HealthAgent { + opts.InternalCli = append(opts.InternalCli, health.NewAgent(&opts)) + } + // Create the server with appropriate options. 
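 	// (Editor's note: with the health flags added above, a node
 	// might be launched as, e.g.,
 	//
 	//     gnatsd --health --rank 1 --lease 12s --beat 3s
 	//
 	// The rank of 1 is illustrative; 12s and 3s are the flag
 	// defaults, giving the suggested 3-4 beats per lease.)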
 	s := server.New(&opts)
@@ -220,7 +235,7 @@ func configureLogger(s *server.Server, opts *server.Options) {
 	var log server.Logger
 
 	if opts.LogFile != "" {
-		log = logger.NewFileLogger(opts.LogFile, opts.Logtime, opts.Debug, opts.Trace, true)
+		log = logger.NewFileLogger(opts.LogFile, opts.Logtime, opts.Debug, opts.Trace, true, 0)
 	} else if opts.RemoteSyslog != "" {
 		log = logger.NewRemoteSysLogger(opts.RemoteSyslog, opts.Debug, opts.Trace)
 	} else if opts.Syslog {
@@ -233,7 +248,7 @@ func configureLogger(s *server.Server, opts *server.Options) {
 		if err != nil || (stat.Mode()&os.ModeCharDevice) == 0 {
 			colors = false
 		}
-		log = logger.NewStdLogger(opts.Logtime, opts.Debug, opts.Trace, colors, true)
+		log = logger.NewStdLogger(opts.Logtime, opts.Debug, opts.Trace, colors, true, 0)
 	}
 
 	s.SetLogger(log, opts.Debug, opts.Trace)
diff --git a/server/client.go b/server/client.go
index 5a64a79c35..901749d0ef 100644
--- a/server/client.go
+++ b/server/client.go
@@ -20,6 +20,9 @@ const (
 	CLIENT = iota
 	// ROUTER is another router in the cluster.
 	ROUTER
+	// INTERNALCLI is an internal client.
+	// An example is the health-agent.
+	INTERNALCLI
 )
 
 const (
@@ -216,6 +219,8 @@ func (c *client) initClient() {
 		c.ncs = fmt.Sprintf("%s - cid:%d", conn, c.cid)
 	case ROUTER:
 		c.ncs = fmt.Sprintf("%s - rid:%d", conn, c.cid)
+	case INTERNALCLI:
+		c.ncs = fmt.Sprintf("internal:0 - hid:%d", c.cid)
 	}
 }
 
@@ -407,6 +412,8 @@ func (c *client) processErr(errStr string) {
 		c.Errorf("Client Error %s", errStr)
 	case ROUTER:
 		c.Errorf("Route Error %s", errStr)
+	case INTERNALCLI:
+		c.Errorf("InternalClient Error %s", errStr)
 	}
 	c.closeConnection()
 }
diff --git a/server/icli.go b/server/icli.go
new file mode 100644
index 0000000000..3b5374b543
--- /dev/null
+++ b/server/icli.go
@@ -0,0 +1,89 @@
+package server
+
+import (
+	"net"
+	"sync"
+)
+
+// LocalInternalClient is a trait interface.
+// The net.Conn implementations of
+// internal clients provided over
+// the accept() callback (see Start below)
+// should implement it to tell the server to ignore
+// TLS and auth for internal clients.
+//
+type LocalInternalClient interface {
+	IsInternal()
+}
+
+// iCli tracks the internal
+// clients.
+//
+type iCli struct {
+	configured []InternalClient
+	mu         sync.Mutex
+}
+
+// InternalClient provides
+// a plugin-like interface,
+// supporting internal clients that live
+// in-process with the Server
+// on their own goroutines.
+//
+// An example of an internal client
+// is the health monitoring client.
+// In order to be effective, its lifetime
+// must exactly match that of the
+// server it monitors.
+//
+type InternalClient interface {
+
+	// Name should return a readable
+	// human name for the InternalClient;
+	// it will be invoked as a part of
+	// startup/shutdown/error logging.
+	//
+	Name() string
+
+	// Start should run the client on
+	// a background goroutine.
+	//
+	// The Server s will invoke Start()
+	// as a part of its own init and setup.
+	//
+	// The info and opts values are
+	// snapshots taken while the Server
+	// lock is held, and so can be read
+	// without worrying about data races.
+	//
+	// Any returned error will be logged.
+	// This will not prevent the Server
+	// from calling Stop() on termination,
+	// and Stop() must be expected (and
+	// not block) no matter what.
+	//
+	// By returning a net.Conn the client
+	// provides the server with the
+	// equivalent of a Listen/Accept-created
+	// net.Conn for communication with
+	// the client.
+	//
+	// The internal client should log via the provided logger.
+	//
+	Start(info Info,
+		opts Options,
+		logger Logger) (net.Conn, error)
+
+	// Stop should shut down the goroutine(s)
+	// of the internal client.
+	// The Server will invoke Stop() as a part
+	// of its own shutdown process, *even* if
+	// Start() failed to start the background
+	// goroutine. Authors should take care
+	// to allow Stop() to be called even
+	// on a failed start.
+	//
+	// Stop is expected not to block for long.
+	//
+	Stop()
+}
diff --git a/server/log.go b/server/log.go
index 26176d3eb8..5713bf21d9 100644
--- a/server/log.go
+++ b/server/log.go
@@ -72,7 +72,7 @@ func (s *Server) ReOpenLogFile() {
 		Noticef("File log re-open ignored, not a file logger")
 	} else {
 		fileLog := logger.NewFileLogger(s.opts.LogFile,
-			s.opts.Logtime, s.opts.Debug, s.opts.Trace, true)
+			s.opts.Logtime, s.opts.Debug, s.opts.Trace, true, 0)
 		s.SetLogger(fileLog, s.opts.Debug, s.opts.Trace)
 		Noticef("File log re-opened")
 	}
diff --git a/server/monitor.go b/server/monitor.go
index 6cce144b7f..dc473cc204 100644
--- a/server/monitor.go
+++ b/server/monitor.go
@@ -473,10 +473,11 @@ func (s *Server) HandleVarz(w http.ResponseWriter, r *http.Request) {
 	v.TotalConnections = s.totalClients
 	v.Routes = len(s.routes)
 	v.Remotes = len(s.remotes)
-	v.InMsgs = s.inMsgs
-	v.InBytes = s.inBytes
-	v.OutMsgs = s.outMsgs
-	v.OutBytes = s.outBytes
+	// atomic loads avoid data races with client.go:298,951
+	v.InMsgs = atomic.LoadInt64(&s.inMsgs)
+	v.InBytes = atomic.LoadInt64(&s.inBytes)
+	v.OutMsgs = atomic.LoadInt64(&s.outMsgs)
+	v.OutBytes = atomic.LoadInt64(&s.outBytes)
 	v.SlowConsumers = s.slowConsumers
 	v.Subscriptions = s.sl.Count()
 	s.httpReqStats[VarzPath]++
diff --git a/server/opts.go b/server/opts.go
index 8b97fa451a..3a7a0e8e2f 100644
--- a/server/opts.go
+++ b/server/opts.go
@@ -83,6 +83,12 @@ type Options struct {
 	TLSCaCert     string        `json:"-"`
 	TLSConfig     *tls.Config   `json:"-"`
 	WriteDeadline time.Duration `json:"-"`
+
+	InternalCli []InternalClient `json:"-"`
+	HealthAgent bool             `json:"health_agent"`
+	HealthRank  int              `json:"health_rank"`
+	HealthLease time.Duration    `json:"health_lease"`
+	HealthBeat  time.Duration    `json:"health_beat"`
 }
 
 // Configuration file authorization section.
@@ -225,6 +231,14 @@ func ProcessConfigFile(configFile string) (*Options, error) {
 			opts.PingInterval = time.Duration(int(v.(int64))) * time.Second
 		case "ping_max":
 			opts.MaxPingsOut = int(v.(int64))
+		case "health_rank":
+			opts.HealthRank = int(v.(int64))
+		case "health_lease":
+			opts.HealthLease = v.(time.Duration)
+		case "health_beat":
+			opts.HealthBeat = v.(time.Duration)
+		case "health_agent":
+			opts.HealthAgent = v.(bool)
 		case "tls":
 			tlsm := v.(map[string]interface{})
 			tc, err := parseTLS(tlsm)
diff --git a/server/server.go b/server/server.go
index 7d616d2b0d..b8d4d808f2 100644
--- a/server/server.go
+++ b/server/server.go
@@ -39,6 +39,7 @@ type Info struct {
 	MaxPayload        int      `json:"max_payload"`
 	IP                string   `json:"ip,omitempty"`
 	ClientConnectURLs []string `json:"connect_urls,omitempty"` // Contains URLs a client can connect to.
+	ServerRank        int      `json:"server_rank"` // lowest rank wins leader election.
 
 	// Used internally for quick look-ups.
 	clientConnectURLs map[string]struct{}
@@ -77,6 +78,7 @@ type Server struct {
 	grRunning bool
 	grWG      sync.WaitGroup // to wait on various go routines
 	cproto    int64          // number of clients supporting async INFO
+	icli      iCli           // in-process internal clients
 }
 
 // Make sure all are 64bits for atomic use
@@ -118,6 +120,7 @@ func New(opts *Options) *Server {
 		trace: opts.Trace,
 		done:  make(chan bool, 1),
 		start: time.Now(),
+		icli:  iCli{configured: opts.InternalCli},
 	}
 
 	s.mu.Lock()
@@ -267,10 +270,53 @@ func (s *Server) Start() {
 		s.StartProfiler()
 	}
 
+	// Run the internal clients in
+	// s.icli.configured.
+	//
+	// Each client that Start()s without
+	// error is registered with the server.
+	//
+	s.icli.mu.Lock()
+	go func(info Info, opts Options) {
+		defer s.icli.mu.Unlock()
+
+		// wait for server to be accepting clients
+		select {
+		case <-s.rcQuit:
+			return
+		case <-clientListenReady:
+		}
+
+		n := len(s.icli.configured)
+		if n == 0 {
+			return
+		}
+		Debugf("Starting the %v internal client(s).", n)
+		for _, ic := range s.icli.configured {
+			srv, err := ic.Start(info, opts, log.logger)
+			if err == nil {
+				if srv != nil {
+					s.InternalCliRegisterCallback(srv)
+				}
+				Noticef("InternalClient ['%s'] started.", ic.Name())
+			} else {
+				Errorf("InternalClient ['%s'] failed to Start(): %s", ic.Name(), err)
+			}
+		}
+	}(s.info, *s.opts)
+
 	// Wait for clients.
 	s.AcceptLoop(clientListenReady)
 }
 
+// InternalCliRegisterCallback is public only for testing.
+func (s *Server) InternalCliRegisterCallback(srv net.Conn) {
+	s.startGoRoutine(func() {
+		s.createClient(srv)
+		s.grWG.Done()
+	})
+}
+
 // Shutdown will shutdown the server instance by kicking out the AcceptLoop
 // and closing all associated clients.
 func (s *Server) Shutdown() {
@@ -287,6 +333,13 @@ func (s *Server) Shutdown() {
 	s.grRunning = false
 	s.grMu.Unlock()
 
+	// stop any internal clients
+	s.icli.mu.Lock()
+	for _, ic := range s.icli.configured {
+		ic.Stop()
+	}
+	s.icli.mu.Unlock()
+
 	conns := make(map[uint64]*client)
 
 	// Copy off the clients
@@ -531,6 +584,11 @@ func (s *Server) startMonitoring(secure bool) {
 func (s *Server) createClient(conn net.Conn) *client {
 	c := &client{srv: s, nc: conn, opts: defaultOpts, mpay: s.info.MaxPayload, start: time.Now()}
 
+	_, isInternal := conn.(LocalInternalClient)
+	if isInternal {
+		c.typ = INTERNALCLI
+	}
+
 	// Grab JSON info string
 	s.mu.Lock()
 	info := s.infoJSON
@@ -548,7 +606,7 @@ func (s *Server) createClient(conn net.Conn) *client {
 	c.Debugf("Client connection created")
 
 	// Check for Auth
-	if authRequired {
+	if !isInternal && authRequired {
 		c.setAuthTimer(secondsToDuration(s.opts.AuthTimeout))
 	}
 
@@ -582,7 +640,7 @@ func (s *Server) createClient(conn net.Conn) *client {
 	c.mu.Lock()
 
 	// Check for TLS
-	if tlsRequired {
+	if !isInternal && tlsRequired {
 		c.Debugf("Starting TLS client connection handshake")
 		c.nc = tls.Server(c.nc, s.opts.TLSConfig)
 		conn := c.nc.(*tls.Conn)
@@ -613,7 +671,7 @@ func (s *Server) createClient(conn net.Conn) *client {
 		return c
 	}
 
-	if tlsRequired {
+	if !isInternal && tlsRequired {
 		// Rewrap bw
 		c.bw = bufio.NewWriterSize(c.nc, startBufSize)
 	}
@@ -621,12 +679,14 @@ func (s *Server) createClient(conn net.Conn) *client {
 	// Do final client initialization
 
 	// Set the Ping timer
-	c.setPingTimer()
+	if !isInternal {
+		c.setPingTimer()
+	}
 
 	// Spin up the read loop.
s.startGoRoutine(func() { c.readLoop() }) - if tlsRequired { + if !isInternal && tlsRequired { c.Debugf("TLS handshake complete") cs := c.nc.(*tls.Conn).ConnectionState() c.Debugf("TLS version %s, cipher suite %s", tlsVersion(cs.Version), tlsCipher(cs.CipherSuite)) @@ -755,6 +815,8 @@ func (s *Server) checkAuth(c *client) bool { return s.checkClientAuth(c) case ROUTER: return s.checkRouterAuth(c) + case INTERNALCLI: + return true default: return false } diff --git a/vendor/github.com/google/btree/LICENSE b/vendor/github.com/google/btree/LICENSE new file mode 100644 index 0000000000..d645695673 --- /dev/null +++ b/vendor/github.com/google/btree/LICENSE @@ -0,0 +1,202 @@ + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. 
For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. 
The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. 
+ + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/vendor/github.com/google/btree/README.md b/vendor/github.com/google/btree/README.md new file mode 100644 index 0000000000..6062a4dacd --- /dev/null +++ b/vendor/github.com/google/btree/README.md @@ -0,0 +1,12 @@ +# BTree implementation for Go + +![Travis CI Build Status](https://api.travis-ci.org/google/btree.svg?branch=master) + +This package provides an in-memory B-Tree implementation for Go, useful as +an ordered, mutable data structure. + +The API is based off of the wonderful +http://godoc.org/github.com/petar/GoLLRB/llrb, and is meant to allow btree to +act as a drop-in replacement for gollrb trees. + +See http://godoc.org/github.com/google/btree for documentation. diff --git a/vendor/github.com/google/btree/btree.go b/vendor/github.com/google/btree/btree.go new file mode 100644 index 0000000000..eb74b1d39c --- /dev/null +++ b/vendor/github.com/google/btree/btree.go @@ -0,0 +1,821 @@ +// Copyright 2014 Google Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package btree implements in-memory B-Trees of arbitrary degree. +// +// btree implements an in-memory B-Tree for use as an ordered data structure. +// It is not meant for persistent storage solutions. +// +// It has a flatter structure than an equivalent red-black or other binary tree, +// which in some cases yields better memory usage and/or performance. +// See some discussion on the matter here: +// http://google-opensource.blogspot.com/2013/01/c-containers-that-save-memory-and-time.html +// Note, though, that this project is in no way related to the C++ B-Tree +// implementation written about there. +// +// Within this tree, each node contains a slice of items and a (possibly nil) +// slice of children. 
For basic numeric values or raw structs, this can cause +// efficiency differences when compared to equivalent C++ template code that +// stores values in arrays within the node: +// * Due to the overhead of storing values as interfaces (each +// value needs to be stored as the value itself, then 2 words for the +// interface pointing to that value and its type), resulting in higher +// memory use. +// * Since interfaces can point to values anywhere in memory, values are +// most likely not stored in contiguous blocks, resulting in a higher +// number of cache misses. +// These issues don't tend to matter, though, when working with strings or other +// heap-allocated structures, since C++-equivalent structures also must store +// pointers and also distribute their values across the heap. +// +// This implementation is designed to be a drop-in replacement to gollrb.LLRB +// trees, (http://github.com/petar/gollrb), an excellent and probably the most +// widely used ordered tree implementation in the Go ecosystem currently. +// Its functions, therefore, exactly mirror those of +// llrb.LLRB where possible. Unlike gollrb, though, we currently don't +// support storing multiple equivalent values. +package btree + +import ( + "fmt" + "io" + "sort" + "strings" + "sync" +) + +// Item represents a single object in the tree. +type Item interface { + // Less tests whether the current item is less than the given argument. + // + // This must provide a strict weak ordering. + // If !a.Less(b) && !b.Less(a), we treat this to mean a == b (i.e. we can only + // hold one of either a or b in the tree). + Less(than Item) bool +} + +const ( + DefaultFreeListSize = 32 +) + +var ( + nilItems = make(items, 16) + nilChildren = make(children, 16) +) + +// FreeList represents a free list of btree nodes. By default each +// BTree has its own FreeList, but multiple BTrees can share the same +// FreeList. +// Two Btrees using the same freelist are safe for concurrent write access. +type FreeList struct { + mu sync.Mutex + freelist []*node +} + +// NewFreeList creates a new free list. +// size is the maximum size of the returned free list. +func NewFreeList(size int) *FreeList { + return &FreeList{freelist: make([]*node, 0, size)} +} + +func (f *FreeList) newNode() (n *node) { + f.mu.Lock() + index := len(f.freelist) - 1 + if index < 0 { + f.mu.Unlock() + return new(node) + } + n = f.freelist[index] + f.freelist[index] = nil + f.freelist = f.freelist[:index] + f.mu.Unlock() + return +} + +func (f *FreeList) freeNode(n *node) { + f.mu.Lock() + if len(f.freelist) < cap(f.freelist) { + f.freelist = append(f.freelist, n) + } + f.mu.Unlock() +} + +// ItemIterator allows callers of Ascend* to iterate in-order over portions of +// the tree. When this function returns false, iteration will stop and the +// associated Ascend* function will immediately return. +type ItemIterator func(i Item) bool + +// New creates a new B-Tree with the given degree. +// +// New(2), for example, will create a 2-3-4 tree (each node contains 1-3 items +// and 2-4 children). +func New(degree int) *BTree { + return NewWithFreeList(degree, NewFreeList(DefaultFreeListSize)) +} + +// NewWithFreeList creates a new B-Tree that uses the given node free list. +func NewWithFreeList(degree int, f *FreeList) *BTree { + if degree <= 1 { + panic("bad degree") + } + return &BTree{ + degree: degree, + cow: ©OnWriteContext{freelist: f}, + } +} + +// items stores items in a node. 
+type items []Item + +// insertAt inserts a value into the given index, pushing all subsequent values +// forward. +func (s *items) insertAt(index int, item Item) { + *s = append(*s, nil) + if index < len(*s) { + copy((*s)[index+1:], (*s)[index:]) + } + (*s)[index] = item +} + +// removeAt removes a value at a given index, pulling all subsequent values +// back. +func (s *items) removeAt(index int) Item { + item := (*s)[index] + copy((*s)[index:], (*s)[index+1:]) + (*s)[len(*s)-1] = nil + *s = (*s)[:len(*s)-1] + return item +} + +// pop removes and returns the last element in the list. +func (s *items) pop() (out Item) { + index := len(*s) - 1 + out = (*s)[index] + (*s)[index] = nil + *s = (*s)[:index] + return +} + +// truncate truncates this instance at index so that it contains only the +// first index items. index must be less than or equal to length. +func (s *items) truncate(index int) { + var toClear items + *s, toClear = (*s)[:index], (*s)[index:] + for len(toClear) > 0 { + toClear = toClear[copy(toClear, nilItems):] + } +} + +// find returns the index where the given item should be inserted into this +// list. 'found' is true if the item already exists in the list at the given +// index. +func (s items) find(item Item) (index int, found bool) { + i := sort.Search(len(s), func(i int) bool { + return item.Less(s[i]) + }) + if i > 0 && !s[i-1].Less(item) { + return i - 1, true + } + return i, false +} + +// children stores child nodes in a node. +type children []*node + +// insertAt inserts a value into the given index, pushing all subsequent values +// forward. +func (s *children) insertAt(index int, n *node) { + *s = append(*s, nil) + if index < len(*s) { + copy((*s)[index+1:], (*s)[index:]) + } + (*s)[index] = n +} + +// removeAt removes a value at a given index, pulling all subsequent values +// back. +func (s *children) removeAt(index int) *node { + n := (*s)[index] + copy((*s)[index:], (*s)[index+1:]) + (*s)[len(*s)-1] = nil + *s = (*s)[:len(*s)-1] + return n +} + +// pop removes and returns the last element in the list. +func (s *children) pop() (out *node) { + index := len(*s) - 1 + out = (*s)[index] + (*s)[index] = nil + *s = (*s)[:index] + return +} + +// truncate truncates this instance at index so that it contains only the +// first index children. index must be less than or equal to length. +func (s *children) truncate(index int) { + var toClear children + *s, toClear = (*s)[:index], (*s)[index:] + for len(toClear) > 0 { + toClear = toClear[copy(toClear, nilChildren):] + } +} + +// node is an internal node in a tree. +// +// It must at all times maintain the invariant that either +// * len(children) == 0, len(items) unconstrained +// * len(children) == len(items) + 1 +type node struct { + items items + children children + cow *copyOnWriteContext +} + +func (n *node) mutableFor(cow *copyOnWriteContext) *node { + if n.cow == cow { + return n + } + out := cow.newNode() + if cap(out.items) >= len(n.items) { + out.items = out.items[:len(n.items)] + } else { + out.items = make(items, len(n.items), cap(n.items)) + } + copy(out.items, n.items) + // Copy children + if cap(out.children) >= len(n.children) { + out.children = out.children[:len(n.children)] + } else { + out.children = make(children, len(n.children), cap(n.children)) + } + copy(out.children, n.children) + return out +} + +func (n *node) mutableChild(i int) *node { + c := n.children[i].mutableFor(n.cow) + n.children[i] = c + return c +} + +// split splits the given node at the given index. 
The current node shrinks, +// and this function returns the item that existed at that index and a new node +// containing all items/children after it. +func (n *node) split(i int) (Item, *node) { + item := n.items[i] + next := n.cow.newNode() + next.items = append(next.items, n.items[i+1:]...) + n.items.truncate(i) + if len(n.children) > 0 { + next.children = append(next.children, n.children[i+1:]...) + n.children.truncate(i + 1) + } + return item, next +} + +// maybeSplitChild checks if a child should be split, and if so splits it. +// Returns whether or not a split occurred. +func (n *node) maybeSplitChild(i, maxItems int) bool { + if len(n.children[i].items) < maxItems { + return false + } + first := n.mutableChild(i) + item, second := first.split(maxItems / 2) + n.items.insertAt(i, item) + n.children.insertAt(i+1, second) + return true +} + +// insert inserts an item into the subtree rooted at this node, making sure +// no nodes in the subtree exceed maxItems items. Should an equivalent item be +// be found/replaced by insert, it will be returned. +func (n *node) insert(item Item, maxItems int) Item { + i, found := n.items.find(item) + if found { + out := n.items[i] + n.items[i] = item + return out + } + if len(n.children) == 0 { + n.items.insertAt(i, item) + return nil + } + if n.maybeSplitChild(i, maxItems) { + inTree := n.items[i] + switch { + case item.Less(inTree): + // no change, we want first split node + case inTree.Less(item): + i++ // we want second split node + default: + out := n.items[i] + n.items[i] = item + return out + } + } + return n.mutableChild(i).insert(item, maxItems) +} + +// get finds the given key in the subtree and returns it. +func (n *node) get(key Item) Item { + i, found := n.items.find(key) + if found { + return n.items[i] + } else if len(n.children) > 0 { + return n.children[i].get(key) + } + return nil +} + +// min returns the first item in the subtree. +func min(n *node) Item { + if n == nil { + return nil + } + for len(n.children) > 0 { + n = n.children[0] + } + if len(n.items) == 0 { + return nil + } + return n.items[0] +} + +// max returns the last item in the subtree. +func max(n *node) Item { + if n == nil { + return nil + } + for len(n.children) > 0 { + n = n.children[len(n.children)-1] + } + if len(n.items) == 0 { + return nil + } + return n.items[len(n.items)-1] +} + +// toRemove details what item to remove in a node.remove call. +type toRemove int + +const ( + removeItem toRemove = iota // removes the given item + removeMin // removes smallest item in the subtree + removeMax // removes largest item in the subtree +) + +// remove removes an item from the subtree rooted at this node. +func (n *node) remove(item Item, minItems int, typ toRemove) Item { + var i int + var found bool + switch typ { + case removeMax: + if len(n.children) == 0 { + return n.items.pop() + } + i = len(n.items) + case removeMin: + if len(n.children) == 0 { + return n.items.removeAt(0) + } + i = 0 + case removeItem: + i, found = n.items.find(item) + if len(n.children) == 0 { + if found { + return n.items.removeAt(i) + } + return nil + } + default: + panic("invalid type") + } + // If we get to here, we have children. + if len(n.children[i].items) <= minItems { + return n.growChildAndRemove(i, item, minItems, typ) + } + child := n.mutableChild(i) + // Either we had enough items to begin with, or we've done some + // merging/stealing, because we've got enough now and we're ready to return + // stuff. 
+ if found { + // The item exists at index 'i', and the child we've selected can give us a + // predecessor, since if we've gotten here it's got > minItems items in it. + out := n.items[i] + // We use our special-case 'remove' call with typ=maxItem to pull the + // predecessor of item i (the rightmost leaf of our immediate left child) + // and set it into where we pulled the item from. + n.items[i] = child.remove(nil, minItems, removeMax) + return out + } + // Final recursive call. Once we're here, we know that the item isn't in this + // node and that the child is big enough to remove from. + return child.remove(item, minItems, typ) +} + +// growChildAndRemove grows child 'i' to make sure it's possible to remove an +// item from it while keeping it at minItems, then calls remove to actually +// remove it. +// +// Most documentation says we have to do two sets of special casing: +// 1) item is in this node +// 2) item is in child +// In both cases, we need to handle the two subcases: +// A) node has enough values that it can spare one +// B) node doesn't have enough values +// For the latter, we have to check: +// a) left sibling has node to spare +// b) right sibling has node to spare +// c) we must merge +// To simplify our code here, we handle cases #1 and #2 the same: +// If a node doesn't have enough items, we make sure it does (using a,b,c). +// We then simply redo our remove call, and the second time (regardless of +// whether we're in case 1 or 2), we'll have enough items and can guarantee +// that we hit case A. +func (n *node) growChildAndRemove(i int, item Item, minItems int, typ toRemove) Item { + if i > 0 && len(n.children[i-1].items) > minItems { + // Steal from left child + child := n.mutableChild(i) + stealFrom := n.mutableChild(i - 1) + stolenItem := stealFrom.items.pop() + child.items.insertAt(0, n.items[i-1]) + n.items[i-1] = stolenItem + if len(stealFrom.children) > 0 { + child.children.insertAt(0, stealFrom.children.pop()) + } + } else if i < len(n.items) && len(n.children[i+1].items) > minItems { + // steal from right child + child := n.mutableChild(i) + stealFrom := n.mutableChild(i + 1) + stolenItem := stealFrom.items.removeAt(0) + child.items = append(child.items, n.items[i]) + n.items[i] = stolenItem + if len(stealFrom.children) > 0 { + child.children = append(child.children, stealFrom.children.removeAt(0)) + } + } else { + if i >= len(n.items) { + i-- + } + child := n.mutableChild(i) + // merge with right child + mergeItem := n.items.removeAt(i) + mergeChild := n.children.removeAt(i + 1) + child.items = append(child.items, mergeItem) + child.items = append(child.items, mergeChild.items...) + child.children = append(child.children, mergeChild.children...) + n.cow.freeNode(mergeChild) + } + return n.remove(item, minItems, typ) +} + +type direction int + +const ( + descend = direction(-1) + ascend = direction(+1) +) + +// iterate provides a simple method for iterating over elements in the tree. +// +// When ascending, the 'start' should be less than 'stop' and when descending, +// the 'start' should be greater than 'stop'. Setting 'includeStart' to true +// will force the iterator to include the first item when it equals 'start', +// thus creating a "greaterOrEqual" or "lessThanEqual" rather than just a +// "greaterThan" or "lessThan" queries. 
+func (n *node) iterate(dir direction, start, stop Item, includeStart bool, hit bool, iter ItemIterator) (bool, bool) { + var ok bool + switch dir { + case ascend: + for i := 0; i < len(n.items); i++ { + if start != nil && n.items[i].Less(start) { + continue + } + if len(n.children) > 0 { + if hit, ok = n.children[i].iterate(dir, start, stop, includeStart, hit, iter); !ok { + return hit, false + } + } + if !includeStart && !hit && start != nil && !start.Less(n.items[i]) { + hit = true + continue + } + hit = true + if stop != nil && !n.items[i].Less(stop) { + return hit, false + } + if !iter(n.items[i]) { + return hit, false + } + } + if len(n.children) > 0 { + if hit, ok = n.children[len(n.children)-1].iterate(dir, start, stop, includeStart, hit, iter); !ok { + return hit, false + } + } + case descend: + for i := len(n.items) - 1; i >= 0; i-- { + if start != nil && !n.items[i].Less(start) { + if !includeStart || hit || start.Less(n.items[i]) { + continue + } + } + if len(n.children) > 0 { + if hit, ok = n.children[i+1].iterate(dir, start, stop, includeStart, hit, iter); !ok { + return hit, false + } + } + if stop != nil && !stop.Less(n.items[i]) { + return hit, false // continue + } + hit = true + if !iter(n.items[i]) { + return hit, false + } + } + if len(n.children) > 0 { + if hit, ok = n.children[0].iterate(dir, start, stop, includeStart, hit, iter); !ok { + return hit, false + } + } + } + return hit, true +} + +// Used for testing/debugging purposes. +func (n *node) print(w io.Writer, level int) { + fmt.Fprintf(w, "%sNODE:%v\n", strings.Repeat(" ", level), n.items) + for _, c := range n.children { + c.print(w, level+1) + } +} + +// BTree is an implementation of a B-Tree. +// +// BTree stores Item instances in an ordered structure, allowing easy insertion, +// removal, and iteration. +// +// Write operations are not safe for concurrent mutation by multiple +// goroutines, but Read operations are. +type BTree struct { + degree int + length int + root *node + cow *copyOnWriteContext +} + +// copyOnWriteContext pointers determine node ownership... a tree with a write +// context equivalent to a node's write context is allowed to modify that node. +// A tree whose write context does not match a node's is not allowed to modify +// it, and must create a new, writable copy (IE: it's a Clone). +// +// When doing any write operation, we maintain the invariant that the current +// node's context is equal to the context of the tree that requested the write. +// We do this by, before we descend into any node, creating a copy with the +// correct context if the contexts don't match. +// +// Since the node we're currently visiting on any write has the requesting +// tree's context, that node is modifiable in place. Children of that node may +// not share context, but before we descend into them, we'll make a mutable +// copy. +type copyOnWriteContext struct { + freelist *FreeList +} + +// Clone clones the btree, lazily. Clone should not be called concurrently, +// but the original tree (t) and the new tree (t2) can be used concurrently +// once the Clone call completes. +// +// The internal tree structure of b is marked read-only and shared between t and +// t2. Writes to both t and t2 use copy-on-write logic, creating new nodes +// whenever one of b's original nodes would have been modified. Read operations +// should have no performance degredation. 
Write operations for both t and t2 +// will initially experience minor slow-downs caused by additional allocs and +// copies due to the aforementioned copy-on-write logic, but should converge to +// the original performance characteristics of the original tree. +func (t *BTree) Clone() (t2 *BTree) { + // Create two entirely new copy-on-write contexts. + // This operation effectively creates three trees: + // the original, shared nodes (old b.cow) + // the new b.cow nodes + // the new out.cow nodes + cow1, cow2 := *t.cow, *t.cow + out := *t + t.cow = &cow1 + out.cow = &cow2 + return &out +} + +// maxItems returns the max number of items to allow per node. +func (t *BTree) maxItems() int { + return t.degree*2 - 1 +} + +// minItems returns the min number of items to allow per node (ignored for the +// root node). +func (t *BTree) minItems() int { + return t.degree - 1 +} + +func (c *copyOnWriteContext) newNode() (n *node) { + n = c.freelist.newNode() + n.cow = c + return +} + +func (c *copyOnWriteContext) freeNode(n *node) { + if n.cow == c { + // clear to allow GC + n.items.truncate(0) + n.children.truncate(0) + n.cow = nil + c.freelist.freeNode(n) + } +} + +// ReplaceOrInsert adds the given item to the tree. If an item in the tree +// already equals the given one, it is removed from the tree and returned. +// Otherwise, nil is returned. +// +// nil cannot be added to the tree (will panic). +func (t *BTree) ReplaceOrInsert(item Item) Item { + if item == nil { + panic("nil item being added to BTree") + } + if t.root == nil { + t.root = t.cow.newNode() + t.root.items = append(t.root.items, item) + t.length++ + return nil + } else { + t.root = t.root.mutableFor(t.cow) + if len(t.root.items) >= t.maxItems() { + item2, second := t.root.split(t.maxItems() / 2) + oldroot := t.root + t.root = t.cow.newNode() + t.root.items = append(t.root.items, item2) + t.root.children = append(t.root.children, oldroot, second) + } + } + out := t.root.insert(item, t.maxItems()) + if out == nil { + t.length++ + } + return out +} + +// Delete removes an item equal to the passed in item from the tree, returning +// it. If no such item exists, returns nil. +func (t *BTree) Delete(item Item) Item { + return t.deleteItem(item, removeItem) +} + +// DeleteMin removes the smallest item in the tree and returns it. +// If no such item exists, returns nil. +func (t *BTree) DeleteMin() Item { + return t.deleteItem(nil, removeMin) +} + +// DeleteMax removes the largest item in the tree and returns it. +// If no such item exists, returns nil. +func (t *BTree) DeleteMax() Item { + return t.deleteItem(nil, removeMax) +} + +func (t *BTree) deleteItem(item Item, typ toRemove) Item { + if t.root == nil || len(t.root.items) == 0 { + return nil + } + t.root = t.root.mutableFor(t.cow) + out := t.root.remove(item, t.minItems(), typ) + if len(t.root.items) == 0 && len(t.root.children) > 0 { + oldroot := t.root + t.root = t.root.children[0] + t.cow.freeNode(oldroot) + } + if out != nil { + t.length-- + } + return out +} + +// AscendRange calls the iterator for every value in the tree within the range +// [greaterOrEqual, lessThan), until iterator returns false. +func (t *BTree) AscendRange(greaterOrEqual, lessThan Item, iterator ItemIterator) { + if t.root == nil { + return + } + t.root.iterate(ascend, greaterOrEqual, lessThan, true, false, iterator) +} + +// AscendLessThan calls the iterator for every value in the tree within the range +// [first, pivot), until iterator returns false. 
+func (t *BTree) AscendLessThan(pivot Item, iterator ItemIterator) { + if t.root == nil { + return + } + t.root.iterate(ascend, nil, pivot, false, false, iterator) +} + +// AscendGreaterOrEqual calls the iterator for every value in the tree within +// the range [pivot, last], until iterator returns false. +func (t *BTree) AscendGreaterOrEqual(pivot Item, iterator ItemIterator) { + if t.root == nil { + return + } + t.root.iterate(ascend, pivot, nil, true, false, iterator) +} + +// Ascend calls the iterator for every value in the tree within the range +// [first, last], until iterator returns false. +func (t *BTree) Ascend(iterator ItemIterator) { + if t.root == nil { + return + } + t.root.iterate(ascend, nil, nil, false, false, iterator) +} + +// DescendRange calls the iterator for every value in the tree within the range +// [lessOrEqual, greaterThan), until iterator returns false. +func (t *BTree) DescendRange(lessOrEqual, greaterThan Item, iterator ItemIterator) { + if t.root == nil { + return + } + t.root.iterate(descend, lessOrEqual, greaterThan, true, false, iterator) +} + +// DescendLessOrEqual calls the iterator for every value in the tree within the range +// [pivot, first], until iterator returns false. +func (t *BTree) DescendLessOrEqual(pivot Item, iterator ItemIterator) { + if t.root == nil { + return + } + t.root.iterate(descend, pivot, nil, true, false, iterator) +} + +// DescendGreaterThan calls the iterator for every value in the tree within +// the range (pivot, last], until iterator returns false. +func (t *BTree) DescendGreaterThan(pivot Item, iterator ItemIterator) { + if t.root == nil { + return + } + t.root.iterate(descend, nil, pivot, false, false, iterator) +} + +// Descend calls the iterator for every value in the tree within the range +// [last, first], until iterator returns false. +func (t *BTree) Descend(iterator ItemIterator) { + if t.root == nil { + return + } + t.root.iterate(descend, nil, nil, false, false, iterator) +} + +// Get looks for the key item in the tree, returning it. It returns nil if +// unable to find that item. +func (t *BTree) Get(key Item) Item { + if t.root == nil { + return nil + } + return t.root.get(key) +} + +// Min returns the smallest item in the tree, or nil if the tree is empty. +func (t *BTree) Min() Item { + return min(t.root) +} + +// Max returns the largest item in the tree, or nil if the tree is empty. +func (t *BTree) Max() Item { + return max(t.root) +} + +// Has returns true if the given key is in the tree. +func (t *BTree) Has(key Item) bool { + return t.Get(key) != nil +} + +// Len returns the number of items currently in the tree. +func (t *BTree) Len() int { + return t.length +} + +// Int implements the Item interface for integers. +type Int int + +// Less returns true if int(a) < int(b). +func (a Int) Less(b Item) bool { + return a < b.(Int) +} diff --git a/vendor/github.com/google/btree/btree_mem.go b/vendor/github.com/google/btree/btree_mem.go new file mode 100644 index 0000000000..cb95b7fa1b --- /dev/null +++ b/vendor/github.com/google/btree/btree_mem.go @@ -0,0 +1,76 @@ +// Copyright 2014 Google Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// +build ignore
+
+// This binary compares memory usage between btree and gollrb.
+package main
+
+import (
+	"flag"
+	"fmt"
+	"math/rand"
+	"runtime"
+	"time"
+
+	"github.com/google/btree"
+	"github.com/petar/GoLLRB/llrb"
+)
+
+var (
+	size   = flag.Int("size", 1000000, "size of the tree to build")
+	degree = flag.Int("degree", 8, "degree of btree")
+	gollrb = flag.Bool("llrb", false, "use llrb instead of btree")
+)
+
+func main() {
+	flag.Parse()
+	vals := rand.Perm(*size)
+	var t, v interface{}
+	v = vals
+	var stats runtime.MemStats
+	for i := 0; i < 10; i++ {
+		runtime.GC()
+	}
+	fmt.Println("-------- BEFORE ----------")
+	runtime.ReadMemStats(&stats)
+	fmt.Printf("%+v\n", stats)
+	start := time.Now()
+	if *gollrb {
+		tr := llrb.New()
+		for _, v := range vals {
+			tr.ReplaceOrInsert(llrb.Int(v))
+		}
+		t = tr // keep it around
+	} else {
+		tr := btree.New(*degree)
+		for _, v := range vals {
+			tr.ReplaceOrInsert(btree.Int(v))
+		}
+		t = tr // keep it around
+	}
+	fmt.Printf("%v inserts in %v\n", *size, time.Since(start))
+	fmt.Println("-------- AFTER ----------")
+	runtime.ReadMemStats(&stats)
+	fmt.Printf("%+v\n", stats)
+	for i := 0; i < 10; i++ {
+		runtime.GC()
+	}
+	fmt.Println("-------- AFTER GC ----------")
+	runtime.ReadMemStats(&stats)
+	fmt.Printf("%+v\n", stats)
+	if t == v {
+		fmt.Println("to make sure vals and tree aren't GC'd")
+	}
+}
diff --git a/vendor/github.com/google/btree/btree_test.go b/vendor/github.com/google/btree/btree_test.go
new file mode 100644
index 0000000000..5da9d8b694
--- /dev/null
+++ b/vendor/github.com/google/btree/btree_test.go
@@ -0,0 +1,689 @@
+// Copyright 2014 Google Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package btree
+
+import (
+	"flag"
+	"fmt"
+	"math/rand"
+	"reflect"
+	"sort"
+	"sync"
+	"testing"
+	"time"
+)
+
+func init() {
+	seed := time.Now().Unix()
+	fmt.Println(seed)
+	rand.Seed(seed)
+}
+
+// perm returns a random permutation of n Int items in the range [0, n).
+func perm(n int) (out []Item) {
+	for _, v := range rand.Perm(n) {
+		out = append(out, Int(v))
+	}
+	return
+}
+
+// rang returns an ordered list of Int items in the range [0, n).
+func rang(n int) (out []Item) {
+	for i := 0; i < n; i++ {
+		out = append(out, Int(i))
+	}
+	return
+}
+
+// all extracts all items from a tree in order as a slice.
+func all(t *BTree) (out []Item) {
+	t.Ascend(func(a Item) bool {
+		out = append(out, a)
+		return true
+	})
+	return
+}
+
+// rangrev returns a reversed ordered list of Int items in the range [0, n).
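+// It mirrors rang and is used to check the order of the Descend* traversals.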
+func rangrev(n int) (out []Item) {
+	for i := n - 1; i >= 0; i-- {
+		out = append(out, Int(i))
+	}
+	return
+}
+
+// allrev extracts all items from a tree in reverse order as a slice.
+func allrev(t *BTree) (out []Item) {
+	t.Descend(func(a Item) bool {
+		out = append(out, a)
+		return true
+	})
+	return
+}
+
+var btreeDegree = flag.Int("degree", 32, "B-Tree degree")
+
+func TestBTree(t *testing.T) {
+	tr := New(*btreeDegree)
+	const treeSize = 10000
+	for i := 0; i < 10; i++ {
+		if min := tr.Min(); min != nil {
+			t.Fatalf("empty min, got %+v", min)
+		}
+		if max := tr.Max(); max != nil {
+			t.Fatalf("empty max, got %+v", max)
+		}
+		for _, item := range perm(treeSize) {
+			if x := tr.ReplaceOrInsert(item); x != nil {
+				t.Fatal("insert found item", item)
+			}
+		}
+		for _, item := range perm(treeSize) {
+			if x := tr.ReplaceOrInsert(item); x == nil {
+				t.Fatal("insert didn't find item", item)
+			}
+		}
+		if min, want := tr.Min(), Item(Int(0)); min != want {
+			t.Fatalf("min: want %+v, got %+v", want, min)
+		}
+		if max, want := tr.Max(), Item(Int(treeSize-1)); max != want {
+			t.Fatalf("max: want %+v, got %+v", want, max)
+		}
+		got := all(tr)
+		want := rang(treeSize)
+		if !reflect.DeepEqual(got, want) {
+			t.Fatalf("mismatch:\n got: %v\nwant: %v", got, want)
+		}
+
+		gotrev := allrev(tr)
+		wantrev := rangrev(treeSize)
+		if !reflect.DeepEqual(gotrev, wantrev) {
+			t.Fatalf("mismatch:\n got: %v\nwant: %v", gotrev, wantrev)
+		}
+
+		for _, item := range perm(treeSize) {
+			if x := tr.Delete(item); x == nil {
+				t.Fatalf("didn't find %v", item)
+			}
+		}
+		if got = all(tr); len(got) > 0 {
+			t.Fatalf("some left!: %v", got)
+		}
+	}
+}
+
+func ExampleBTree() {
+	tr := New(*btreeDegree)
+	for i := Int(0); i < 10; i++ {
+		tr.ReplaceOrInsert(i)
+	}
+	fmt.Println("len:       ", tr.Len())
+	fmt.Println("get3:      ", tr.Get(Int(3)))
+	fmt.Println("get100:    ", tr.Get(Int(100)))
+	fmt.Println("del4:      ", tr.Delete(Int(4)))
+	fmt.Println("del100:    ", tr.Delete(Int(100)))
+	fmt.Println("replace5:  ", tr.ReplaceOrInsert(Int(5)))
+	fmt.Println("replace100:", tr.ReplaceOrInsert(Int(100)))
+	fmt.Println("min:       ", tr.Min())
+	fmt.Println("delmin:    ", tr.DeleteMin())
+	fmt.Println("max:       ", tr.Max())
+	fmt.Println("delmax:    ", tr.DeleteMax())
+	fmt.Println("len:       ", tr.Len())
+	// Output:
+	// len:        10
+	// get3:       3
+	// get100:     <nil>
+	// del4:       4
+	// del100:     <nil>
+	// replace5:   5
+	// replace100: <nil>
+	// min:        0
+	// delmin:     0
+	// max:        100
+	// delmax:     100
+	// len:        8
+}
+
+func TestDeleteMin(t *testing.T) {
+	tr := New(3)
+	for _, v := range perm(100) {
+		tr.ReplaceOrInsert(v)
+	}
+	var got []Item
+	for v := tr.DeleteMin(); v != nil; v = tr.DeleteMin() {
+		got = append(got, v)
+	}
+	if want := rang(100); !reflect.DeepEqual(got, want) {
+		t.Fatalf("deletemin:\n got: %v\nwant: %v", got, want)
+	}
+}
+
+func TestDeleteMax(t *testing.T) {
+	tr := New(3)
+	for _, v := range perm(100) {
+		tr.ReplaceOrInsert(v)
+	}
+	var got []Item
+	for v := tr.DeleteMax(); v != nil; v = tr.DeleteMax() {
+		got = append(got, v)
+	}
+	// Reverse our list.
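+	// (DeleteMax drains the tree in descending order, so reversing the
+	// collected slice should yield ascending order again.)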
+	for i := 0; i < len(got)/2; i++ {
+		got[i], got[len(got)-i-1] = got[len(got)-i-1], got[i]
+	}
+	if want := rang(100); !reflect.DeepEqual(got, want) {
+		t.Fatalf("deletemax:\n got: %v\nwant: %v", got, want)
+	}
+}
+
+func TestAscendRange(t *testing.T) {
+	tr := New(2)
+	for _, v := range perm(100) {
+		tr.ReplaceOrInsert(v)
+	}
+	var got []Item
+	tr.AscendRange(Int(40), Int(60), func(a Item) bool {
+		got = append(got, a)
+		return true
+	})
+	if want := rang(100)[40:60]; !reflect.DeepEqual(got, want) {
+		t.Fatalf("ascendrange:\n got: %v\nwant: %v", got, want)
+	}
+	got = got[:0]
+	tr.AscendRange(Int(40), Int(60), func(a Item) bool {
+		if a.(Int) > 50 {
+			return false
+		}
+		got = append(got, a)
+		return true
+	})
+	if want := rang(100)[40:51]; !reflect.DeepEqual(got, want) {
+		t.Fatalf("ascendrange:\n got: %v\nwant: %v", got, want)
+	}
+}
+
+func TestDescendRange(t *testing.T) {
+	tr := New(2)
+	for _, v := range perm(100) {
+		tr.ReplaceOrInsert(v)
+	}
+	var got []Item
+	tr.DescendRange(Int(60), Int(40), func(a Item) bool {
+		got = append(got, a)
+		return true
+	})
+	if want := rangrev(100)[39:59]; !reflect.DeepEqual(got, want) {
+		t.Fatalf("descendrange:\n got: %v\nwant: %v", got, want)
+	}
+	got = got[:0]
+	tr.DescendRange(Int(60), Int(40), func(a Item) bool {
+		if a.(Int) < 50 {
+			return false
+		}
+		got = append(got, a)
+		return true
+	})
+	if want := rangrev(100)[39:50]; !reflect.DeepEqual(got, want) {
+		t.Fatalf("descendrange:\n got: %v\nwant: %v", got, want)
+	}
+}
+
+func TestAscendLessThan(t *testing.T) {
+	tr := New(*btreeDegree)
+	for _, v := range perm(100) {
+		tr.ReplaceOrInsert(v)
+	}
+	var got []Item
+	tr.AscendLessThan(Int(60), func(a Item) bool {
+		got = append(got, a)
+		return true
+	})
+	if want := rang(100)[:60]; !reflect.DeepEqual(got, want) {
+		t.Fatalf("ascendlessthan:\n got: %v\nwant: %v", got, want)
+	}
+	got = got[:0]
+	tr.AscendLessThan(Int(60), func(a Item) bool {
+		if a.(Int) > 50 {
+			return false
+		}
+		got = append(got, a)
+		return true
+	})
+	if want := rang(100)[:51]; !reflect.DeepEqual(got, want) {
+		t.Fatalf("ascendlessthan:\n got: %v\nwant: %v", got, want)
+	}
+}
+
+func TestDescendLessOrEqual(t *testing.T) {
+	tr := New(*btreeDegree)
+	for _, v := range perm(100) {
+		tr.ReplaceOrInsert(v)
+	}
+	var got []Item
+	tr.DescendLessOrEqual(Int(40), func(a Item) bool {
+		got = append(got, a)
+		return true
+	})
+	if want := rangrev(100)[59:]; !reflect.DeepEqual(got, want) {
+		t.Fatalf("descendlessorequal:\n got: %v\nwant: %v", got, want)
+	}
+	got = got[:0]
+	tr.DescendLessOrEqual(Int(60), func(a Item) bool {
+		if a.(Int) < 50 {
+			return false
+		}
+		got = append(got, a)
+		return true
+	})
+	if want := rangrev(100)[39:50]; !reflect.DeepEqual(got, want) {
+		t.Fatalf("descendlessorequal:\n got: %v\nwant: %v", got, want)
+	}
+}
+
+func TestAscendGreaterOrEqual(t *testing.T) {
+	tr := New(*btreeDegree)
+	for _, v := range perm(100) {
+		tr.ReplaceOrInsert(v)
+	}
+	var got []Item
+	tr.AscendGreaterOrEqual(Int(40), func(a Item) bool {
+		got = append(got, a)
+		return true
+	})
+	if want := rang(100)[40:]; !reflect.DeepEqual(got, want) {
+		t.Fatalf("ascendgreaterorequal:\n got: %v\nwant: %v", got, want)
+	}
+	got = got[:0]
+	tr.AscendGreaterOrEqual(Int(40), func(a Item) bool {
+		if a.(Int) > 50 {
+			return false
+		}
+		got = append(got, a)
+		return true
+	})
+	if want := rang(100)[40:51]; !reflect.DeepEqual(got, want) {
+		t.Fatalf("ascendgreaterorequal:\n got: %v\nwant: %v", got, want)
+	}
+}
+
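+// Note the bound conventions exercised here: DescendGreaterThan treats its
+// pivot as an exclusive lower bound, (pivot, last], while DescendLessOrEqual
+// above is inclusive, [pivot, first].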
+func TestDescendGreaterThan(t *testing.T) {
+	tr := New(*btreeDegree)
+	for _, v := range perm(100) {
+		tr.ReplaceOrInsert(v)
+	}
+	var got []Item
+	tr.DescendGreaterThan(Int(40), func(a Item) bool {
+		got = append(got, a)
+		return true
+	})
+	if want := rangrev(100)[:59]; !reflect.DeepEqual(got, want) {
+		t.Fatalf("descendgreaterthan:\n got: %v\nwant: %v", got, want)
+	}
+	got = got[:0]
+	tr.DescendGreaterThan(Int(40), func(a Item) bool {
+		if a.(Int) < 50 {
+			return false
+		}
+		got = append(got, a)
+		return true
+	})
+	if want := rangrev(100)[:50]; !reflect.DeepEqual(got, want) {
+		t.Fatalf("descendgreaterthan:\n got: %v\nwant: %v", got, want)
+	}
+}
+
+const benchmarkTreeSize = 10000
+
+func BenchmarkInsert(b *testing.B) {
+	b.StopTimer()
+	insertP := perm(benchmarkTreeSize)
+	b.StartTimer()
+	i := 0
+	for i < b.N {
+		tr := New(*btreeDegree)
+		for _, item := range insertP {
+			tr.ReplaceOrInsert(item)
+			i++
+			if i >= b.N {
+				return
+			}
+		}
+	}
+}
+
+func BenchmarkDeleteInsert(b *testing.B) {
+	b.StopTimer()
+	insertP := perm(benchmarkTreeSize)
+	tr := New(*btreeDegree)
+	for _, item := range insertP {
+		tr.ReplaceOrInsert(item)
+	}
+	b.StartTimer()
+	for i := 0; i < b.N; i++ {
+		tr.Delete(insertP[i%benchmarkTreeSize])
+		tr.ReplaceOrInsert(insertP[i%benchmarkTreeSize])
+	}
+}
+
+func BenchmarkDeleteInsertCloneOnce(b *testing.B) {
+	b.StopTimer()
+	insertP := perm(benchmarkTreeSize)
+	tr := New(*btreeDegree)
+	for _, item := range insertP {
+		tr.ReplaceOrInsert(item)
+	}
+	tr = tr.Clone()
+	b.StartTimer()
+	for i := 0; i < b.N; i++ {
+		tr.Delete(insertP[i%benchmarkTreeSize])
+		tr.ReplaceOrInsert(insertP[i%benchmarkTreeSize])
+	}
+}
+
+func BenchmarkDeleteInsertCloneEachTime(b *testing.B) {
+	b.StopTimer()
+	insertP := perm(benchmarkTreeSize)
+	tr := New(*btreeDegree)
+	for _, item := range insertP {
+		tr.ReplaceOrInsert(item)
+	}
+	b.StartTimer()
+	for i := 0; i < b.N; i++ {
+		tr = tr.Clone()
+		tr.Delete(insertP[i%benchmarkTreeSize])
+		tr.ReplaceOrInsert(insertP[i%benchmarkTreeSize])
+	}
+}
+
+func BenchmarkDelete(b *testing.B) {
+	b.StopTimer()
+	insertP := perm(benchmarkTreeSize)
+	removeP := perm(benchmarkTreeSize)
+	b.StartTimer()
+	i := 0
+	for i < b.N {
+		b.StopTimer()
+		tr := New(*btreeDegree)
+		for _, v := range insertP {
+			tr.ReplaceOrInsert(v)
+		}
+		b.StartTimer()
+		for _, item := range removeP {
+			tr.Delete(item)
+			i++
+			if i >= b.N {
+				return
+			}
+		}
+		if tr.Len() > 0 {
+			panic(tr.Len())
+		}
+	}
+}
+
+func BenchmarkGet(b *testing.B) {
+	b.StopTimer()
+	insertP := perm(benchmarkTreeSize)
+	removeP := perm(benchmarkTreeSize)
+	b.StartTimer()
+	i := 0
+	for i < b.N {
+		b.StopTimer()
+		tr := New(*btreeDegree)
+		for _, v := range insertP {
+			tr.ReplaceOrInsert(v)
+		}
+		b.StartTimer()
+		for _, item := range removeP {
+			tr.Get(item)
+			i++
+			if i >= b.N {
+				return
+			}
+		}
+	}
+}
+
+func BenchmarkGetCloneEachTime(b *testing.B) {
+	b.StopTimer()
+	insertP := perm(benchmarkTreeSize)
+	removeP := perm(benchmarkTreeSize)
+	b.StartTimer()
+	i := 0
+	for i < b.N {
+		b.StopTimer()
+		tr := New(*btreeDegree)
+		for _, v := range insertP {
+			tr.ReplaceOrInsert(v)
+		}
+		b.StartTimer()
+		for _, item := range removeP {
+			tr = tr.Clone()
+			tr.Get(item)
+			i++
+			if i >= b.N {
+				return
+			}
+		}
+	}
+}
+
+type byInts []Item
+
+func (a byInts) Len() int {
+	return len(a)
+}
+
+func (a byInts) Less(i, j int) bool {
+	return a[i].(Int) < a[j].(Int)
+}
+
+func (a byInts) Swap(i, j int) {
+	a[i], a[j] = a[j], a[i]
+}
+
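+// The iteration benchmarks below sort a reference slice via byInts and then
+// check each tree traversal against it element by element, so they measure
+// traversal cost while also re-verifying ordering.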
+func BenchmarkAscend(b *testing.B) {
+	arr := perm(benchmarkTreeSize)
+	tr := New(*btreeDegree)
+	for _, v := range arr {
+		tr.ReplaceOrInsert(v)
+	}
+	sort.Sort(byInts(arr))
+	b.ResetTimer()
+	for i := 0; i < b.N; i++ {
+		j := 0
+		tr.Ascend(func(item Item) bool {
+			if item.(Int) != arr[j].(Int) {
+				b.Fatalf("mismatch: expected: %v, got %v", arr[j].(Int), item.(Int))
+			}
+			j++
+			return true
+		})
+	}
+}
+
+func BenchmarkDescend(b *testing.B) {
+	arr := perm(benchmarkTreeSize)
+	tr := New(*btreeDegree)
+	for _, v := range arr {
+		tr.ReplaceOrInsert(v)
+	}
+	sort.Sort(byInts(arr))
+	b.ResetTimer()
+	for i := 0; i < b.N; i++ {
+		j := len(arr) - 1
+		tr.Descend(func(item Item) bool {
+			if item.(Int) != arr[j].(Int) {
+				b.Fatalf("mismatch: expected: %v, got %v", arr[j].(Int), item.(Int))
+			}
+			j--
+			return true
+		})
+	}
+}
+
+func BenchmarkAscendRange(b *testing.B) {
+	arr := perm(benchmarkTreeSize)
+	tr := New(*btreeDegree)
+	for _, v := range arr {
+		tr.ReplaceOrInsert(v)
+	}
+	sort.Sort(byInts(arr))
+	b.ResetTimer()
+	for i := 0; i < b.N; i++ {
+		j := 100
+		tr.AscendRange(Int(100), arr[len(arr)-100], func(item Item) bool {
+			if item.(Int) != arr[j].(Int) {
+				b.Fatalf("mismatch: expected: %v, got %v", arr[j].(Int), item.(Int))
+			}
+			j++
+			return true
+		})
+		if j != len(arr)-100 {
+			b.Fatalf("expected: %v, got %v", len(arr)-100, j)
+		}
+	}
+}
+
+func BenchmarkDescendRange(b *testing.B) {
+	arr := perm(benchmarkTreeSize)
+	tr := New(*btreeDegree)
+	for _, v := range arr {
+		tr.ReplaceOrInsert(v)
+	}
+	sort.Sort(byInts(arr))
+	b.ResetTimer()
+	for i := 0; i < b.N; i++ {
+		j := len(arr) - 100
+		tr.DescendRange(arr[len(arr)-100], Int(100), func(item Item) bool {
+			if item.(Int) != arr[j].(Int) {
+				b.Fatalf("mismatch: expected: %v, got %v", arr[j].(Int), item.(Int))
+			}
+			j--
+			return true
+		})
+		if j != 100 {
+			b.Fatalf("expected: %v, got %v", 100, j)
+		}
+	}
+}
+
+func BenchmarkAscendGreaterOrEqual(b *testing.B) {
+	arr := perm(benchmarkTreeSize)
+	tr := New(*btreeDegree)
+	for _, v := range arr {
+		tr.ReplaceOrInsert(v)
+	}
+	sort.Sort(byInts(arr))
+	b.ResetTimer()
+	for i := 0; i < b.N; i++ {
+		j := 100
+		k := 0
+		tr.AscendGreaterOrEqual(Int(100), func(item Item) bool {
+			if item.(Int) != arr[j].(Int) {
+				b.Fatalf("mismatch: expected: %v, got %v", arr[j].(Int), item.(Int))
+			}
+			j++
+			k++
+			return true
+		})
+		if j != len(arr) {
+			b.Fatalf("expected: %v, got %v", len(arr), j)
+		}
+		if k != len(arr)-100 {
+			b.Fatalf("expected: %v, got %v", len(arr)-100, k)
+		}
+	}
+}
+
+func BenchmarkDescendLessOrEqual(b *testing.B) {
+	arr := perm(benchmarkTreeSize)
+	tr := New(*btreeDegree)
+	for _, v := range arr {
+		tr.ReplaceOrInsert(v)
+	}
+	sort.Sort(byInts(arr))
+	b.ResetTimer()
+	for i := 0; i < b.N; i++ {
+		j := len(arr) - 100
+		k := len(arr)
+		tr.DescendLessOrEqual(arr[len(arr)-100], func(item Item) bool {
+			if item.(Int) != arr[j].(Int) {
+				b.Fatalf("mismatch: expected: %v, got %v", arr[j].(Int), item.(Int))
+			}
+			j--
+			k--
+			return true
+		})
+		if j != -1 {
+			b.Fatalf("expected: %v, got %v", -1, j)
+		}
+		if k != 99 {
+			b.Fatalf("expected: %v, got %v", 99, k)
+		}
+	}
+}
+
+const cloneTestSize = 10000
+
+func cloneTest(t *testing.T, b *BTree, start int, p []Item, wg *sync.WaitGroup, trees *[]*BTree) {
+	t.Logf("Starting new clone at %v", start)
+	*trees = append(*trees, b)
+	for i := start; i < cloneTestSize; i++ {
+		b.ReplaceOrInsert(p[i])
+		if i%(cloneTestSize/5) == 0 {
+			wg.Add(1)
+			go cloneTest(t, b.Clone(), i+1, p, wg, trees)
+		}
+	}
+	wg.Done()
+}
+
+func TestCloneConcurrentOperations(t *testing.T) {
+	b := New(*btreeDegree)
+	trees := []*BTree{}
+	p := perm(cloneTestSize)
+	var wg sync.WaitGroup
+	wg.Add(1)
+	go cloneTest(t, b, 0, p, &wg, &trees)
+	wg.Wait()
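+	// Every spawned clone has finished inserting, so each captured tree
+	// should now contain the complete item set.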
+	want := rang(cloneTestSize)
+	t.Logf("Starting equality checks on %d trees", len(trees))
+	for i, tree := range trees {
+		if !reflect.DeepEqual(want, all(tree)) {
+			t.Errorf("tree %v mismatch", i)
+		}
+	}
+	t.Log("Removing half of the items from the first half of the trees")
+	toRemove := rang(cloneTestSize)[cloneTestSize/2:]
+	for i := 0; i < len(trees)/2; i++ {
+		tree := trees[i]
+		wg.Add(1)
+		go func() {
+			for _, item := range toRemove {
+				tree.Delete(item)
+			}
+			wg.Done()
+		}()
+	}
+	wg.Wait()
+	t.Log("Checking all values again")
+	for i, tree := range trees {
+		var wantpart []Item
+		if i < len(trees)/2 {
+			wantpart = want[:cloneTestSize/2]
+		} else {
+			wantpart = want
+		}
+		if got := all(tree); !reflect.DeepEqual(wantpart, got) {
+			t.Errorf("tree %v mismatch, want len %v got len %v", i, len(want), len(got))
+		}
+	}
+}
diff --git a/vendor/github.com/nats-io/nuid/.gitignore b/vendor/github.com/nats-io/nuid/.gitignore
new file mode 100644
index 0000000000..daf913b1b3
--- /dev/null
+++ b/vendor/github.com/nats-io/nuid/.gitignore
@@ -0,0 +1,24 @@
+# Compiled Object files, Static and Dynamic libs (Shared Objects)
+*.o
+*.a
+*.so
+
+# Folders
+_obj
+_test
+
+# Architecture specific extensions/prefixes
+*.[568vq]
+[568vq].out
+
+*.cgo1.go
+*.cgo2.c
+_cgo_defun.c
+_cgo_gotypes.go
+_cgo_export.*
+
+_testmain.go
+
+*.exe
+*.test
+*.prof
diff --git a/vendor/github.com/nats-io/nuid/README.md b/vendor/github.com/nats-io/nuid/README.md
new file mode 100644
index 0000000000..73d42e14ac
--- /dev/null
+++ b/vendor/github.com/nats-io/nuid/README.md
@@ -0,0 +1,66 @@
+# NUID
+
+[![License MIT](https://img.shields.io/npm/l/express.svg)](http://opensource.org/licenses/MIT)
+[![ReportCard](http://goreportcard.com/badge/nats-io/nuid)](http://goreportcard.com/report/nats-io/nuid)
+[![Build Status](https://travis-ci.org/nats-io/nuid.svg?branch=master)](http://travis-ci.org/nats-io/nuid)
+[![Release](https://img.shields.io/badge/release-v1.0.0-1eb0fc.svg)](https://github.com/nats-io/nuid/releases/tag/v1.0.0)
+[![GoDoc](http://godoc.org/github.com/nats-io/nuid?status.png)](http://godoc.org/github.com/nats-io/nuid)
+[![Coverage Status](https://coveralls.io/repos/github/nats-io/nuid/badge.svg?branch=master)](https://coveralls.io/github/nats-io/nuid?branch=master)
+
+A highly performant unique identifier generator.
+
+## Installation
+
+Use the `go` command:
+
+    $ go get github.com/nats-io/nuid
+
+## Basic Usage
+```go
+
+// Utilize the global locked instance
+nuid := nuid.Next()
+
+// Create an instance; these are not locked.
+n := nuid.New()
+nuid = n.Next()
+
+// Generate a new crypto/rand seeded prefix.
+// Generally not needed; it happens automatically.
+n.RandomizePrefix()
+```
+
+## Performance
+NUID needs to be very fast to generate and truly unique, all while being entropy pool friendly.
+NUID uses 12 bytes of crypto-generated data (entropy draining), and 10 bytes of pseudo-random
+sequential data that increments with a pseudo-random increment.
+
+The total length of a NUID string is 22 bytes of base-36 ASCII text, giving 36^22 or
+17324272922341479351919144385642496 possibilities.
+
+NUID can generate identifiers as fast as one every ~60ns, or ~16 million per second. There is an
+associated benchmark you can use to test performance on your own hardware.
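+
+For scale, 36^22 is roughly 1.7×10^34, i.e. about 113.7 bits of identifier space. A quick,
+illustrative usage sketch (it assumes only the API shown in Basic Usage above):
+
+```go
+package main
+
+import (
+	"fmt"
+
+	"github.com/nats-io/nuid"
+)
+
+func main() {
+	// Global locked instance: safe to call from any goroutine.
+	id := nuid.Next()
+	fmt.Println(id, len(id)) // a 22-character base-36 string
+
+	// Per-goroutine instance: not locked, so keep it confined.
+	n := nuid.New()
+	fmt.Println(n.Next())
+}
+```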
+## License
+
+(The MIT License)
+
+Copyright (c) 2016 Apcera Inc.
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to
+deal in the Software without restriction, including without limitation the
+rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
+sell copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in
+all copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
+IN THE SOFTWARE.
diff --git a/vendor/github.com/nats-io/nuid/nuid_test.go b/vendor/github.com/nats-io/nuid/nuid_test.go
new file mode 100644
index 0000000000..c59677f283
--- /dev/null
+++ b/vendor/github.com/nats-io/nuid/nuid_test.go
@@ -0,0 +1,79 @@
+package nuid
+
+import (
+	"bytes"
+	"testing"
+)
+
+func TestDigits(t *testing.T) {
+	if len(digits) != base {
+		t.Fatalf("digits length does not match base")
+	}
+}
+
+func TestGlobalNUIDInit(t *testing.T) {
+	if globalNUID == nil {
+		t.Fatalf("Expected globalNUID to be non-nil\n")
+	}
+	if globalNUID.pre == nil || len(globalNUID.pre) != preLen {
+		t.Fatalf("Expected prefix to be initialized\n")
+	}
+	if globalNUID.seq == 0 {
+		t.Fatalf("Expected seq to be non-zero\n")
+	}
+}
+
+func TestNUIDRollover(t *testing.T) {
+	globalNUID.seq = maxSeq
+	// copy
+	oldPre := append([]byte{}, globalNUID.pre...)
+	Next()
+	if bytes.Equal(globalNUID.pre, oldPre) {
+		t.Fatalf("Expected new pre, got the old one\n")
+	}
+}
+
+func TestGUIDLen(t *testing.T) {
+	nuid := Next()
+	if len(nuid) != totalLen {
+		t.Fatalf("Expected len of %d, got %d\n", totalLen, len(nuid))
+	}
+}
+
+func TestProperPrefix(t *testing.T) {
+	min := byte(255)
+	max := byte(0)
+	for i := 0; i < len(digits); i++ {
+		if digits[i] < min {
+			min = digits[i]
+		}
+		if digits[i] > max {
+			max = digits[i]
+		}
+	}
+	total := 100000
+	for i := 0; i < total; i++ {
+		n := New()
+		for j := 0; j < preLen; j++ {
+			if n.pre[j] < min || n.pre[j] > max {
+				t.Fatalf("Iter %d. Valid range for bytes prefix: [%d..%d]\nIncorrect prefix at pos %d: %v (%s)",
+					i, min, max, j, n.pre, string(n.pre))
+			}
+		}
+	}
+}
+
+func BenchmarkNUIDSpeed(b *testing.B) {
+	n := New()
+	b.ReportAllocs()
+	for i := 0; i < b.N; i++ {
+		n.Next()
+	}
+}
+
+func BenchmarkGlobalNUIDSpeed(b *testing.B) {
+	b.ReportAllocs()
+	for i := 0; i < b.N; i++ {
+		Next()
+	}
+}
diff --git a/vendor/github.com/nats-io/nuid/unique_test.go b/vendor/github.com/nats-io/nuid/unique_test.go
new file mode 100644
index 0000000000..6714a33357
--- /dev/null
+++ b/vendor/github.com/nats-io/nuid/unique_test.go
@@ -0,0 +1,19 @@
+// +build !race
+
+package nuid
+
+import (
+	"testing"
+)
+
+func TestBasicUniqueness(t *testing.T) {
+	n := 10000000
+	m := make(map[string]struct{}, n)
+	for i := 0; i < n; i++ {
+		n := Next()
+		if _, ok := m[n]; ok {
+			t.Fatalf("Duplicate NUID found: %v\n", n)
+		}
+		m[n] = struct{}{}
+	}
+}
diff --git a/vendor/manifest b/vendor/manifest
index 14aa048a8e..e19ad5766e 100644
--- a/vendor/manifest
+++ b/vendor/manifest
@@ -35,6 +35,14 @@
 			"branch": "master",
 			"path": "windows",
 			"notests": true
-		}
+		},
+		{
+			"importpath": "github.com/google/btree",
+			"repository": "https://github.com/google/btree",
+			"vcs": "git",
+			"revision": "316fb6d3f031ae8f4d457c6c5186b9e3ded70435",
+			"branch": "master",
+			"notests": true
+		}
 	]
 }
\ No newline at end of file