From f5705aebe184c26c9056fbdaedd3855f1e8cdd68 Mon Sep 17 00:00:00 2001 From: Jason Wilder Date: Thu, 23 Jul 2015 10:49:43 -0600 Subject: [PATCH 1/9] Rename raftState.openRaft to open --- meta/state.go | 6 +++--- meta/store.go | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/meta/state.go b/meta/state.go index be7647ce483..dc37edef264 100644 --- a/meta/state.go +++ b/meta/state.go @@ -17,7 +17,7 @@ import ( // across local or remote nodes. It is a form of the state design pattern and allows // the meta.Store to change its behavior with the raft layer at runtime. type raftState interface { - openRaft() error + open() error initialize() error leader() string isLeader() bool @@ -76,7 +76,7 @@ func (r *localRaft) invalidate() error { return nil } -func (r *localRaft) openRaft() error { +func (r *localRaft) open() error { s := r.store // Setup raft configuration. config := raft.DefaultConfig() @@ -308,7 +308,7 @@ func (r *remoteRaft) addPeer(addr string) error { return fmt.Errorf("cannot add peer using remote raft") } -func (r *remoteRaft) openRaft() error { +func (r *remoteRaft) open() error { go func() { for { select { diff --git a/meta/store.go b/meta/store.go index 87d4ad12e2a..034ebd9d5b3 100644 --- a/meta/store.go +++ b/meta/store.go @@ -276,7 +276,7 @@ func (s *Store) Open() error { // openRaft initializes the raft store. func (s *Store) openRaft() error { - return s.raftState.openRaft() + return s.raftState.open() } // initialize attempts to bootstrap the raft store if there are no committed entries. From c93e46d56932162bca15a3850be9a0f4bd08dba6 Mon Sep 17 00:00:00 2001 From: Jason Wilder Date: Thu, 23 Jul 2015 15:53:39 -0600 Subject: [PATCH 2/9] Support add new raft nodes This change adds the first 3 nodes to the cluster as raft peers. Other nodes are data-only. --- meta/rpc.go | 49 +++++++++----- meta/rpc_test.go | 4 +- meta/state.go | 71 ++++++++++++++++++-- meta/store.go | 171 +++++++++++++++++++++++++++++++++++------------ 4 files changed, 228 insertions(+), 67 deletions(-) diff --git a/meta/rpc.go b/meta/rpc.go index 2e8fad99af9..c8c01165dfd 100644 --- a/meta/rpc.go +++ b/meta/rpc.go @@ -11,6 +11,7 @@ import ( "time" "github.com/gogo/protobuf/proto" + "github.com/hashicorp/raft" "github.com/influxdb/influxdb/meta/internal" ) @@ -29,7 +30,7 @@ type rpc struct { cachedData() *Data IsLeader() bool Leader() string - Peers() []string + Peers() ([]string, error) AddPeer(host string) error CreateNode(host string) (*NodeInfo, error) NodeByHost(host string) (*NodeInfo, error) @@ -215,26 +216,33 @@ func (r *rpc) handleJoinRequest(req *internal.JoinRequest) (*internal.JoinRespon r.traceCluster("join request from: %v", *req.Addr) node, err := func() (*NodeInfo, error) { + // attempt to create the node node, err := r.store.CreateNode(*req.Addr) // if it exists, return the existing node if err == ErrNodeExists { - return r.store.NodeByHost(*req.Addr) + node, err = r.store.NodeByHost(*req.Addr) + if err != nil { + return node, err + } + r.logger.Printf("existing node re-joined: id=%v addr=%v", node.ID, node.Host) } else if err != nil { return nil, fmt.Errorf("create node: %v", err) } - // FIXME: jwilder: adding raft nodes is tricky since going - // from 1 node (leader) to two kills the cluster because - // quorum is lost after adding the second node. 
For now, - // can only add non-raft enabled nodes - - // If we have less than 3 nodes, add them as raft peers - // if len(r.store.Peers()) < MaxRaftNodes { - // if err = r.store.AddPeer(*req.Addr); err != nil { - // return node, fmt.Errorf("add peer: %v", err) - // } - // } + peers, err := r.store.Peers() + if err != nil { + return nil, fmt.Errorf("list peers: %v", err) + } + + // If we have less than 3 nodes, add them as raft peers if they are not + // already a peer + if len(peers) < MaxRaftNodes && !raft.PeerContained(peers, *req.Addr) { + r.logger.Printf("adding new raft peer: nodeId=%v addr=%v", node.ID, *req.Addr) + if err = r.store.AddPeer(*req.Addr); err != nil { + return node, fmt.Errorf("add peer: %v", err) + } + } return node, err }() @@ -247,13 +255,18 @@ func (r *rpc) handleJoinRequest(req *internal.JoinRequest) (*internal.JoinRespon return nil, err } + // get the current raft peers + peers, err := r.store.Peers() + if err != nil { + return nil, fmt.Errorf("list peers: %v", err) + } + return &internal.JoinResponse{ Header: &internal.ResponseHeader{ OK: proto.Bool(true), }, - //EnableRaft: proto.Bool(contains(r.store.Peers(), *req.Addr)), - EnableRaft: proto.Bool(false), - RaftNodes: r.store.Peers(), + EnableRaft: proto.Bool(raft.PeerContained(peers, *req.Addr)), + RaftNodes: peers, NodeID: proto.Uint64(nodeID), }, err @@ -355,7 +368,7 @@ func (r *rpc) call(dest string, req proto.Message) (proto.Message, error) { // Create a connection to the leader. conn, err := net.DialTimeout("tcp", dest, leaderDialTimeout) if err != nil { - return nil, err + return nil, fmt.Errorf("rpc dial: %v", err) } defer conn.Close() @@ -421,7 +434,7 @@ func (r *rpc) call(dest string, req proto.Message) (proto.Message, error) { func (r *rpc) traceCluster(msg string, args ...interface{}) { if r.tracingEnabled { - r.logger.Printf("rpc error: "+msg, args...) + r.logger.Printf("rpc: "+msg, args...) } } diff --git a/meta/rpc_test.go b/meta/rpc_test.go index 2fb5886950c..3f60c6bd05c 100644 --- a/meta/rpc_test.go +++ b/meta/rpc_test.go @@ -159,7 +159,7 @@ func TestRPCJoin(t *testing.T) { t.Fatalf("failed to join: %v", err) } - if exp := false; res.RaftEnabled != false { + if exp := true; res.RaftEnabled != true { t.Fatalf("raft enabled mismatch: got %v, exp %v", res.RaftEnabled, exp) } @@ -230,7 +230,7 @@ func (f *fakeStore) cachedData() *Data { func (f *fakeStore) IsLeader() bool { return true } func (f *fakeStore) Leader() string { return f.leader } -func (f *fakeStore) Peers() []string { return []string{f.leader} } +func (f *fakeStore) Peers() ([]string, error) { return []string{f.leader}, nil } func (f *fakeStore) AddPeer(host string) error { return nil } func (f *fakeStore) CreateNode(host string) (*NodeInfo, error) { return &NodeInfo{ID: f.newNodeID, Host: host}, nil diff --git a/meta/state.go b/meta/state.go index dc37edef264..732ca355b01 100644 --- a/meta/state.go +++ b/meta/state.go @@ -1,8 +1,11 @@ package meta import ( + "bytes" + "encoding/json" "errors" "fmt" + "io/ioutil" "math/rand" "net" "os" @@ -18,12 +21,14 @@ import ( // the meta.Store to change its behavior with the raft layer at runtime. 
type raftState interface { open() error + remove() error initialize() error leader() string isLeader() bool sync(index uint64, timeout time.Duration) error setPeers(addrs []string) error addPeer(addr string) error + peers() ([]string, error) invalidate() error close() error lastIndex() uint64 @@ -42,6 +47,19 @@ type localRaft struct { raftLayer *raftLayer } +func (r *localRaft) remove() error { + if err := os.RemoveAll(filepath.Join(r.store.path, "raft.db")); err != nil { + return err + } + if err := os.RemoveAll(filepath.Join(r.store.path, "peers.json")); err != nil { + return err + } + if err := os.RemoveAll(filepath.Join(r.store.path, "snapshots")); err != nil { + return err + } + return nil +} + func (r *localRaft) updateMetaData(ms *Data) { if ms == nil { return @@ -89,11 +107,6 @@ func (r *localRaft) open() error { // If no peers are set in the config then start as a single server. config.EnableSingleNode = (len(s.peers) == 0) - // Ensure our addr is in the peer list - if config.EnableSingleNode { - s.peers = append(s.peers, s.Addr.String()) - } - // Build raft layer to multiplex listener. r.raftLayer = newRaftLayer(s.RaftListener, s.Addr) @@ -246,6 +259,10 @@ func (r *localRaft) setPeers(addrs []string) error { return r.raft.SetPeers(a).Error() } +func (r *localRaft) peers() ([]string, error) { + return r.peerStore.Peers() +} + func (r *localRaft) leader() string { if r.raft == nil { return "" @@ -269,6 +286,10 @@ type remoteRaft struct { store *Store } +func (r *remoteRaft) remove() error { + return nil +} + func (r *remoteRaft) updateMetaData(ms *Data) { if ms == nil { return @@ -300,7 +321,15 @@ func (r *remoteRaft) invalidate() error { } func (r *remoteRaft) setPeers(addrs []string) error { - return nil + // Convert to JSON + var buf bytes.Buffer + enc := json.NewEncoder(&buf) + if err := enc.Encode(addrs); err != nil { + return err + } + + // Write out as JSON + return ioutil.WriteFile(filepath.Join(r.store.path, "peers.json"), buf.Bytes(), 0755) } // addPeer adds addr to the list of peers in the cluster. @@ -308,7 +337,15 @@ func (r *remoteRaft) addPeer(addr string) error { return fmt.Errorf("cannot add peer using remote raft") } +func (r *remoteRaft) peers() ([]string, error) { + return readPeersJSON(filepath.Join(r.store.path, "peers.json")) +} + func (r *remoteRaft) open() error { + if err := r.setPeers(r.store.peers); err != nil { + return err + } + go func() { for { select { @@ -366,3 +403,25 @@ func (r *remoteRaft) sync(index uint64, timeout time.Duration) error { func (r *remoteRaft) snapshot() error { return fmt.Errorf("cannot snapshot while in remote raft state") } + +func readPeersJSON(path string) ([]string, error) { + // Read the file + buf, err := ioutil.ReadFile(path) + if err != nil && !os.IsNotExist(err) { + return nil, err + } + + // Check for no peers + if len(buf) == 0 { + return nil, nil + } + + // Decode the peers + var peers []string + dec := json.NewDecoder(bytes.NewReader(buf)) + if err := dec.Decode(&peers); err != nil { + return nil, err + } + + return peers, nil +} diff --git a/meta/store.go b/meta/store.go index 034ebd9d5b3..5abd072dd33 100644 --- a/meta/store.go +++ b/meta/store.go @@ -171,41 +171,6 @@ func (s *Store) IDPath() string { return filepath.Join(s.path, "id") } // Open opens and initializes the raft store. 
func (s *Store) Open() error { - // If we have a join addr, attempt to join - if s.join != "" { - - joined := false - for _, join := range strings.Split(s.join, ",") { - res, err := s.rpc.join(s.Addr.String(), join) - if err != nil { - s.Logger.Printf("join failed: %v", err) - continue - } - joined = true - - s.Logger.Printf("joined remote node %v", join) - s.Logger.Printf("raftEnabled=%v raftNodes=%v", res.RaftEnabled, res.RaftNodes) - - s.peers = res.RaftNodes - s.id = res.NodeID - - if err := s.writeNodeID(res.NodeID); err != nil { - return err - } - - if !res.RaftEnabled { - s.raftState = &remoteRaft{s} - if err := s.invalidate(); err != nil { - return err - } - } - } - - if !joined { - return fmt.Errorf("failed to join existing cluster at %v", s.join) - } - } - // Verify that no more than 3 peers. // https://github.com/influxdb/influxdb/issues/2750 if len(s.peers) > MaxRaftNodes { @@ -231,6 +196,11 @@ func (s *Store) Open() error { } s.opened = true + // load our raft state + if err := s.loadState(); err != nil { + return err + } + // Create the root directory if it doesn't already exist. if err := s.createRootDir(); err != nil { return fmt.Errorf("mkdir all: %s", err) @@ -264,6 +234,11 @@ func (s *Store) Open() error { s.wg.Add(1) go s.serveRPCListener() + // Join an existing cluster if we needed + if err := s.joinCluster(); err != nil { + return fmt.Errorf("join: %v", err) + } + // If the ID doesn't exist then create a new node. if s.id == 0 { go s.init() @@ -274,6 +249,121 @@ func (s *Store) Open() error { return nil } +// loadState sets the appropriate raftState from our persistent storage +func (s *Store) loadState() error { + peers, err := readPeersJSON(filepath.Join(s.path, "peers.json")) + if err != nil { + return err + } + + // If we have existing peers, use those. This will override what's in the + // config. + if len(peers) > 0 { + s.peers = peers + } + + // if no peers on disk, we need to start raft in order to initialize a new + // cluster or join an existing one. + if len(peers) == 0 { + s.raftState = &localRaft{store: s} + // if we have a raft database, (maybe restored), we should start raft locally + } else if _, err := os.Stat(filepath.Join(s.path, "raft.db")); err == nil { + s.raftState = &localRaft{store: s} + // otherwise, we should use remote raft + } else { + s.raftState = &remoteRaft{store: s} + } + return nil +} + +func (s *Store) joinCluster() error { + // No join options, so nothing to do + if s.join == "" { + return nil + } + + // We already have a node ID so were already part of a cluster, + // don't join again so we can use our existing state. 
+ if s.id != 0 { + s.Logger.Printf("skipping join: already member of cluster: nodeId=%v raftEnabled=%v raftNodes=%v", + s.id, raft.PeerContained(s.peers, s.Addr.String()), s.peers) + return nil + } + + s.Logger.Printf("joining cluster at: %v", s.join) + for { + for _, join := range strings.Split(s.join, ",") { + res, err := s.rpc.join(s.Addr.String(), join) + if err != nil { + s.Logger.Printf("join failed: %v", err) + continue + } + + s.Logger.Printf("joined remote node %v", join) + s.Logger.Printf("nodeId=%v raftEnabled=%v raftNodes=%v", res.NodeID, res.RaftEnabled, res.RaftNodes) + + s.peers = res.RaftNodes + s.id = res.NodeID + + if err := s.writeNodeID(res.NodeID); err != nil { + s.Logger.Printf("write node id failed: %v", err) + break + } + + if !res.RaftEnabled { + // Shutdown our local raft and transition to a remote raft state + if err := s.enableRemoteRaft(); err != nil { + s.Logger.Printf("enable remote raft failed: %v", err) + break + } + } + return nil + } + + s.Logger.Printf("join failed: retrying...") + time.Sleep(time.Second) + } +} + +func (s *Store) enableLocalRaft() error { + if _, ok := s.raftState.(*localRaft); ok { + return nil + } + s.Logger.Printf("switching to local raft") + + lr := &localRaft{store: s} + return s.changeState(lr) +} + +func (s *Store) enableRemoteRaft() error { + if _, ok := s.raftState.(*remoteRaft); ok { + return nil + } + + s.Logger.Printf("switching to remote raft") + rr := &remoteRaft{store: s} + return s.changeState(rr) +} + +func (s *Store) changeState(state raftState) error { + if err := s.raftState.close(); err != nil { + return err + } + + // Clear out any persistent state + if err := s.raftState.remove(); err != nil { + return err + } + + s.raftState = state + + if err := s.raftState.open(); err != nil { + return err + } + + return nil +} + // openRaft initializes the raft store. func (s *Store) openRaft() error { return s.raftState.open() @@ -404,10 +494,6 @@ func (s *Store) Snapshot() error { // WaitForLeader sleeps until a leader is found or a timeout occurs. // timeout == 0 means to wait forever. func (s *Store) WaitForLeader(timeout time.Duration) error { - if s.Leader() != "" { - return nil - } - // Begin timeout timer. timer := time.NewTimer(timeout) defer timer.Stop() @@ -452,6 +538,9 @@ func (s *Store) IsLeader() bool { func (s *Store) Leader() string { s.mu.RLock() defer s.mu.RUnlock() + if s.raftState == nil { + return "" + } return s.raftState.leader() } @@ -466,10 +555,10 @@ func (s *Store) AddPeer(addr string) error { } // Peers returns the list of peers in the cluster. -func (s *Store) Peers() []string { +func (s *Store) Peers() ([]string, error) { s.mu.RLock() defer s.mu.RUnlock() - return s.peers + return s.raftState.peers() } // serveExecListener processes remote exec connections. From 2938601e9e5695d687321877814edf987966d434 Mon Sep 17 00:00:00 2001 From: Jason Wilder Date: Mon, 27 Jul 2015 10:45:03 -0600 Subject: [PATCH 3/9] Add more meta store cluster tests * Test add new nodes that become raft peers * Test restarting a cluster w/ 3 raft nodes and 3 non-raft nodes --- meta/store_test.go | 203 +++++++++++++++++++++++++++++++++++++++------ 1 file changed, 178 insertions(+), 25 deletions(-) diff --git a/meta/store_test.go b/meta/store_test.go index 22d85e26476..13ffcabe8b4 100644 --- a/meta/store_test.go +++ b/meta/store_test.go @@ -787,33 +787,101 @@ func TestCluster_Open(t *testing.T) { t.Fatal("no leader found") } - // Add a database to each node. 
- for i, s := range c.Stores { - if di, err := s.CreateDatabase(fmt.Sprintf("db%d", i)); err != nil { - t.Fatal(err) - } else if di == nil { - t.Fatal("expected database") + // ensure all the nodes see the same metastore data + assertDatabaseReplicated(t, c) +} + +// Ensure a multi-node cluster can start, join the cluster, and the first three members are raft nodes. +func TestCluster_OpenRaft(t *testing.T) { + // Start a single node. + c := MustOpenCluster(1) + defer c.Close() + + // Check that the node becomes leader. + if s := c.Leader(); s == nil { + t.Fatal("no leader found") + } + + // Add 5 more nodes. + for i := 0; i < 5; i++ { + if err := c.Join(); err != nil { + t.Fatalf("failed to join cluster: %v", err) } } - // Verify that each store has all databases. - for i := 0; i < len(c.Stores); i++ { - for _, s := range c.Stores { - if di, err := s.Database(fmt.Sprintf("db%d", i)); err != nil { - t.Fatal(err) - } else if di == nil { - t.Fatal("expected database") - } + // ensure we have 3 raft nodes + assertRaftPeerNodes(t, c, 3) + + // ensure all the nodes see the same metastore data + assertDatabaseReplicated(t, c) +} + +// Ensure a multi-node cluster can restart +func TestCluster_Restart(t *testing.T) { + // Start a single node. + c := MustOpenCluster(1) + defer c.Close() + + // Check that one node is leader. + if s := c.Leader(); s == nil { + t.Fatal("no leader found") + } + + // Add 5 more ndes, 2 should become raft peers, 3 remote raft clients + for i := 0; i < 5; i++ { + if err := c.Join(); err != nil { + t.Fatalf("failed to join cluster: %v", err) } } + + // The tests use a host host assigned listener port. We need to re-use + // the original ports when the new cluster is restarted so that the existing + // peer store addresses can be reached. + addrs := []string{} + + // Make sure we keep files on disk when we shutdown as well as record the + // current cluster IP addresses + for _, s := range c.Stores { + s.LeaveFiles = true + addrs = append(addrs, s.Addr.String()) + } + + // Stop the cluster + if err := c.Close(); err != nil { + t.Fatalf("failed to close cluster: %v", err) + } + + // Wait a bit to avoid spurious port in use conflict errors from trying to + // start the new cluster to fast + time.Sleep(100 * time.Millisecond) + + // Re-create the cluster nodes from existing disk paths and addresses + stores := []*Store{} + for i, s := range c.Stores { + store := MustOpenStoreWithPath(addrs[i], s.Path()) + stores = append(stores, store) + } + c.Stores = stores + + // Wait for the cluster to stabilize + if err := c.WaitForLeader(); err != nil { + t.Fatal("no leader found") + } + + // ensure we have 3 raft nodes + assertRaftPeerNodes(t, c, 3) + + // ensure all the nodes see the same metastore data + assertDatabaseReplicated(t, c) } // Store is a test wrapper for meta.Store. type Store struct { *meta.Store - Listener net.Listener - Stderr bytes.Buffer - LeaveFiles bool // set to true to leave temporary files on close + BindAddress string + Listener net.Listener + Stderr bytes.Buffer + LeaveFiles bool // set to true to leave temporary files on close } // NewStore returns a new test wrapper for Store. @@ -828,7 +896,16 @@ func NewStore(c *meta.Config) *Store { // MustOpenStore opens a store in a temporary path. Panic on error. func MustOpenStore() *Store { - s := NewStore(NewConfig(MustTempFile())) + return MustOpenStoreWithPath("", MustTempFile()) +} + +// MustOpenStoreWith opens a store from a given path. Panic on error. 
+func MustOpenStoreWithPath(addr, path string) *Store { + c := NewConfig(path) + s := NewStore(c) + if addr != "" { + s.BindAddress = addr + } if err := s.Open(); err != nil { panic(err) } @@ -845,8 +922,13 @@ func MustOpenStore() *Store { // Open opens the store on a random TCP port. func (s *Store) Open() error { + + addr := "127.0.0.1:0" + if s.BindAddress != "" { + addr = s.BindAddress + } // Open a TCP port. - ln, err := net.Listen("tcp", "127.0.0.1:0") + ln, err := net.Listen("tcp", addr) if err != nil { return fmt.Errorf("listen: %s", err) } @@ -895,17 +977,21 @@ func NewConfig(path string) *meta.Config { // Cluster represents a group of stores joined as a raft cluster. type Cluster struct { + path string Stores []*Store } // NewCluster returns a cluster of n stores within path. func NewCluster(path string, n int) *Cluster { - c := &Cluster{} - - // Construct a list of temporary peers. - peers := make([]string, n) - for i := range peers { - peers[i] = "127.0.0.1:0" + c := &Cluster{path: path} + + peers := []string{} + if n > 1 { + // Construct a list of temporary peers. + peers := make([]string, n) + for i := range peers { + peers[i] = "127.0.0.1:0" + } } // Create new stores with temporary peers. @@ -937,6 +1023,23 @@ func MustOpenCluster(n int) *Cluster { return c } +func (c *Cluster) Join() error { + config := NewConfig(filepath.Join(c.path, strconv.Itoa(len(c.Stores)))) + config.Join = c.Stores[0].Addr.String() + s := NewStore(config) + if err := s.Open(); err != nil { + return err + } + select { + case err := <-s.Err(): + panic(fmt.Sprintf("store: i=%d, addr=%s, err=%s", len(c.Stores), s.Addr.String(), err)) + case <-s.Ready(): + } + + c.Stores = append(c.Stores, s) + return nil +} + // Open opens and initializes all stores in the cluster. func (c *Cluster) Open() error { if err := func() error { @@ -972,6 +1075,15 @@ func (c *Cluster) Close() error { return nil } +func (c *Cluster) WaitForLeader() error { + for _, s := range c.Stores { + if err := s.WaitForLeader(5 * time.Second); err != nil { + return err + } + } + return nil +} + // Leader returns the store that is currently leader. func (c *Cluster) Leader() *Store { for _, s := range c.Stores { @@ -994,3 +1106,44 @@ func MustTempFile() string { func mockHashPassword(password string) ([]byte, error) { return []byte(password), nil } + +// assertRaftPeerNodes counts the number of nodes running with a local raft +// database and asserts that the count is equal to n +func assertRaftPeerNodes(t *testing.T, c *Cluster, n int) { + // Ensure we have the required number of raft nodes + raftCount := 0 + for _, s := range c.Stores { + if _, err := os.Stat(filepath.Join(s.Path(), "raft.db")); err == nil { + raftCount += 1 + } + } + + if raftCount != n { + t.Errorf("raft nodes mismatch: got %v, exp %v", raftCount, n) + } +} + +// assertDatabaseReplicated creates a new database named after each node and +// then verifies that each node can see all the created databases from their +// local meta data +func assertDatabaseReplicated(t *testing.T, c *Cluster) { + // Add a database to each node. + for i, s := range c.Stores { + if di, err := s.CreateDatabase(fmt.Sprintf("db%d", i)); err != nil { + t.Fatal(err) + } else if di == nil { + t.Fatal("expected database") + } + } + + // Verify that each store has all databases. 
+ for i := 0; i < len(c.Stores); i++ { + for _, s := range c.Stores { + if di, err := s.Database(fmt.Sprintf("db%d", i)); err != nil { + t.Fatal(err) + } else if di == nil { + t.Fatal("expected database") + } + } + } +} From 06d8ff7c1300c9d2011f79b7d1ad2cd292ba27af Mon Sep 17 00:00:00 2001 From: Jason Wilder Date: Mon, 27 Jul 2015 15:02:27 -0600 Subject: [PATCH 4/9] Use config.Peers when passing -join flag Removes the two separate variables in the meta.Config. -join will now override the Peers var. --- cmd/influxd/run/command.go | 3 ++- meta/config.go | 3 --- meta/rpc.go | 2 ++ meta/store.go | 10 ++++------ meta/store_test.go | 2 +- 5 files changed, 9 insertions(+), 11 deletions(-) diff --git a/cmd/influxd/run/command.go b/cmd/influxd/run/command.go index b424e392d18..7961f9dd965 100644 --- a/cmd/influxd/run/command.go +++ b/cmd/influxd/run/command.go @@ -10,6 +10,7 @@ import ( "path/filepath" "runtime" "strconv" + "strings" "github.com/BurntSushi/toml" ) @@ -83,7 +84,7 @@ func (cmd *Command) Run(args ...string) error { } if options.Join != "" { - config.Meta.Join = options.Join + config.Meta.Peers = strings.Split(options.Join, ",") } // Validate the configuration. diff --git a/meta/config.go b/meta/config.go index 7fd0ab07917..310d601bdd8 100644 --- a/meta/config.go +++ b/meta/config.go @@ -38,9 +38,6 @@ type Config struct { LeaderLeaseTimeout toml.Duration `toml:"leader-lease-timeout"` CommitTimeout toml.Duration `toml:"commit-timeout"` ClusterTracing bool `toml:"cluster-tracing"` - - // The join command-line argument - Join string `toml:"-"` } func NewConfig() *Config { diff --git a/meta/rpc.go b/meta/rpc.go index c8c01165dfd..d52c0b0a74f 100644 --- a/meta/rpc.go +++ b/meta/rpc.go @@ -395,11 +395,13 @@ func (r *rpc) call(dest string, req proto.Message) (proto.Message, error) { // Should always have a size and type if exp := 16; len(data) < exp { + r.traceCluster("recv: %v", string(data)) return nil, fmt.Errorf("rpc %v failed: short read: got %v, exp %v", rpcType, len(data), exp) } sz := btou64(data[0:8]) if len(data[8:]) != int(sz) { + r.traceCluster("recv: %v", string(data)) return nil, fmt.Errorf("rpc %v failed: short read: got %v, exp %v", rpcType, len(data[8:]), sz) } diff --git a/meta/store.go b/meta/store.go index 5abd072dd33..ef8646c4661 100644 --- a/meta/store.go +++ b/meta/store.go @@ -66,7 +66,6 @@ type Store struct { // All peers in cluster. Used during bootstrapping. peers []string - join string data *Data @@ -131,7 +130,6 @@ func NewStore(c *Config) *Store { s := &Store{ path: c.Dir, peers: c.Peers, - join: c.Join, data: &Data{}, ready: make(chan struct{}), @@ -277,8 +275,9 @@ func (s *Store) loadState() error { } func (s *Store) joinCluster() error { + // No join options, so nothing to do - if s.join == "" { + if len(s.peers) == 0 { return nil } @@ -290,9 +289,9 @@ func (s *Store) joinCluster() error { return nil } - s.Logger.Printf("joining cluster at: %v", s.join) + s.Logger.Printf("joining cluster at: %v", s.peers) for { - for _, join := range strings.Split(s.join, ",") { + for _, join := range s.peers { res, err := s.rpc.join(s.Addr.String(), join) if err != nil { s.Logger.Printf("join failed: %v", err) @@ -663,7 +662,6 @@ func (s *Store) handleExecConn(conn net.Conn) { // This function runs in a separate goroutine. func (s *Store) serveRPCListener() { defer s.wg.Done() - <-s.ready for { // Accept next TCP connection. 
diff --git a/meta/store_test.go b/meta/store_test.go
index 13ffcabe8b4..b07306617a9 100644
--- a/meta/store_test.go
+++ b/meta/store_test.go
@@ -1025,7 +1025,7 @@ func MustOpenCluster(n int) *Cluster {
 
 func (c *Cluster) Join() error {
 	config := NewConfig(filepath.Join(c.path, strconv.Itoa(len(c.Stores))))
-	config.Join = c.Stores[0].Addr.String()
+	config.Peers = []string{c.Stores[0].Addr.String()}
 	s := NewStore(config)
 	if err := s.Open(); err != nil {
 		return err

From f5d86b95b3c097ed3c4336e98afebd9e670137f8 Mon Sep 17 00:00:00 2001
From: Jason Wilder
Date: Mon, 27 Jul 2015 15:13:03 -0600
Subject: [PATCH 5/9] Add raft column to show servers statement

Reports whether the node is part of the raft consensus cluster or not.
---
 meta/statement_executor.go      | 10 ++++++++--
 meta/statement_executor_test.go | 14 +++++++++++---
 2 files changed, 19 insertions(+), 5 deletions(-)

diff --git a/meta/statement_executor.go b/meta/statement_executor.go
index 58f86393ca4..d3a4681b400 100644
--- a/meta/statement_executor.go
+++ b/meta/statement_executor.go
@@ -10,6 +10,7 @@ import (
 type StatementExecutor struct {
 	Store interface {
 		Nodes() ([]NodeInfo, error)
+		Peers() ([]string, error)
 
 		Database(name string) (*DatabaseInfo, error)
 		Databases() ([]DatabaseInfo, error)
@@ -127,9 +128,14 @@ func (e *StatementExecutor) executeShowServersStatement(q *influxql.ShowServersS
 		return &influxql.Result{Err: err}
 	}
 
-	row := &influxql.Row{Columns: []string{"id", "url"}}
+	peers, err := e.Store.Peers()
+	if err != nil {
+		return &influxql.Result{Err: err}
+	}
+
+	row := &influxql.Row{Columns: []string{"id", "url", "raft"}}
 	for _, ni := range nis {
-		row.Values = append(row.Values, []interface{}{ni.ID, "http://" + ni.Host})
+		row.Values = append(row.Values, []interface{}{ni.ID, "http://" + ni.Host, contains(peers, ni.Host)})
 	}
 	return &influxql.Result{Series: []*influxql.Row{row}}
 }
diff --git a/meta/statement_executor_test.go b/meta/statement_executor_test.go
index b382a09f685..99fcea36e2b 100644
--- a/meta/statement_executor_test.go
+++ b/meta/statement_executor_test.go
@@ -121,15 +121,18 @@ func TestStatementExecutor_ExecuteStatement_ShowServers(t *testing.T) {
 			{ID: 2, Host: "node1"},
 		}, nil
 	}
+	e.Store.PeersFn = func() ([]string, error) {
+		return []string{"node0"}, nil
+	}
 
 	if res := e.ExecuteStatement(influxql.MustParseStatement(`SHOW SERVERS`)); res.Err != nil {
 		t.Fatal(res.Err)
 	} else if !reflect.DeepEqual(res.Series, influxql.Rows{
 		{
-			Columns: []string{"id", "url"},
+			Columns: []string{"id", "url", "raft"},
 			Values: [][]interface{}{
-				{uint64(1), "http://node0"},
-				{uint64(2), "http://node1"},
+				{uint64(1), "http://node0", true},
+				{uint64(2), "http://node1", false},
 			},
 		},
 	}) {
@@ -778,6 +781,7 @@ func NewStatementExecutor() *StatementExecutor {
 // StatementExecutorStore represents a mock implementation of StatementExecutor.Store.
 type StatementExecutorStore struct {
 	NodesFn          func() ([]meta.NodeInfo, error)
+	PeersFn          func() ([]string, error)
 	DatabaseFn       func(name string) (*meta.DatabaseInfo, error)
 	DatabasesFn      func() ([]meta.DatabaseInfo, error)
 	CreateDatabaseFn func(name string) (*meta.DatabaseInfo, error)
@@ -804,6 +808,10 @@ func (s *StatementExecutorStore) Nodes() ([]meta.NodeInfo, error) {
 	return s.NodesFn()
 }
 
+func (s *StatementExecutorStore) Peers() ([]string, error) {
+	return s.PeersFn()
+}
+
 func (s *StatementExecutorStore) Database(name string) (*meta.DatabaseInfo, error) {
 	return s.DatabaseFn(name)
 }

From 514f36cf540955d2d81738e91216826360b1a97e Mon Sep 17 00:00:00 2001
From: Jason Wilder
Date: Mon, 27 Jul 2015 15:24:49 -0600
Subject: [PATCH 6/9] Exit report goroutine if server is closing

---
 cmd/influxd/run/server.go | 5 +++++
 1 file changed, 5 insertions(+)

diff --git a/cmd/influxd/run/server.go b/cmd/influxd/run/server.go
index f9c17ba6a5d..50c12fb72a3 100644
--- a/cmd/influxd/run/server.go
+++ b/cmd/influxd/run/server.go
@@ -373,6 +373,11 @@ func (s *Server) Close() error {
 // startServerReporting starts periodic server reporting.
 func (s *Server) startServerReporting() {
 	for {
+		select {
+		case <-s.closing:
+			return
+		default:
+		}
 		if err := s.MetaStore.WaitForLeader(30 * time.Second); err != nil {
 			log.Printf("no leader available for reporting: %s", err.Error())
 			continue

From 95c98d1ab7bb8eb06811d10f1eda3257dc8aad9c Mon Sep 17 00:00:00 2001
From: Jason Wilder
Date: Mon, 27 Jul 2015 15:30:22 -0600
Subject: [PATCH 7/9] Fix data race in WaitForDataChanged

---
 meta/store.go | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/meta/store.go b/meta/store.go
index ef8646c4661..0eea28ef8ec 100644
--- a/meta/store.go
+++ b/meta/store.go
@@ -383,11 +383,15 @@ func (s *Store) Close() error {
 // WaitForDataChanged will block the current goroutine until the metastore index has
 // be updated.
 func (s *Store) WaitForDataChanged() error {
+	s.mu.RLock()
+	changed := s.changed
+	s.mu.RUnlock()
+
 	for {
 		select {
 		case <-s.closing:
 			return errors.New("closing")
-		case <-s.changed:
+		case <-changed:
 			return nil
 		}
 	}

From b6c95925fb68be48da368e528e8fa3809720dbe8 Mon Sep 17 00:00:00 2001
From: Jason Wilder
Date: Mon, 27 Jul 2015 15:35:02 -0600
Subject: [PATCH 8/9] Add retry delay for report service loop when error occurs

There is a race when stopping servers where the meta.Store is closing but the
server has not signaled it is closing, so the reporting goroutine repeatedly
errors out in a fast loop during this time. It creates a lot of noise in the
logs.
---
 cmd/influxd/run/server.go | 1 +
 1 file changed, 1 insertion(+)

diff --git a/cmd/influxd/run/server.go b/cmd/influxd/run/server.go
index 50c12fb72a3..59c5ba9bc91 100644
--- a/cmd/influxd/run/server.go
+++ b/cmd/influxd/run/server.go
@@ -380,6 +380,7 @@ func (s *Server) startServerReporting() {
 		}
 		if err := s.MetaStore.WaitForLeader(30 * time.Second); err != nil {
 			log.Printf("no leader available for reporting: %s", err.Error())
+			time.Sleep(time.Second)
 			continue
 		}
 		s.reportServer()

From c12b556e5b89645b5f7588652b597a86ebe9d137 Mon Sep 17 00:00:00 2001
From: Jason Wilder
Date: Mon, 27 Jul 2015 15:55:07 -0600
Subject: [PATCH 9/9] Update changelog

---
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 099f49e9855..6d229a68c16 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -4,6 +4,7 @@
 - [#3376](https://github.com/influxdb/influxdb/pull/3376): Support for remote shard query mapping
 - [#3372](https://github.com/influxdb/influxdb/pull/3372): Support joining nodes to existing cluster
 - [#3426](https://github.com/influxdb/influxdb/pull/3426): Additional logging for continuous queries. Thanks @jhorwit2
+- [#3478](https://github.com/influxdb/influxdb/pull/3478): Support incremental cluster joins
 
 ### Bugfixes
 - [#3405](https://github.com/influxdb/influxdb/pull/3405): Prevent database panic when fields are missing. Thanks @jhorwit2
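
The peers.json handling introduced in meta/state.go is what lets a data-only node pick the right state after a restart: remoteRaft.setPeers persists the peer list learned from the join response, and Store.loadState later reads it back through readPeersJSON before choosing between localRaft and remoteRaft. A minimal, self-contained sketch of that round trip follows; the temporary directory and the example peer addresses are illustrative only and not taken from the patches.

package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"io/ioutil"
	"os"
	"path/filepath"
)

// writePeersJSON mirrors what remoteRaft.setPeers does: encode the peer
// addresses as JSON and write them alongside the meta store's other files.
func writePeersJSON(path string, addrs []string) error {
	var buf bytes.Buffer
	if err := json.NewEncoder(&buf).Encode(addrs); err != nil {
		return err
	}
	return ioutil.WriteFile(path, buf.Bytes(), 0755)
}

// readPeersJSON mirrors the helper added in meta/state.go: a missing or
// empty file simply means no peers are known yet.
func readPeersJSON(path string) ([]string, error) {
	buf, err := ioutil.ReadFile(path)
	if err != nil && !os.IsNotExist(err) {
		return nil, err
	}
	if len(buf) == 0 {
		return nil, nil
	}
	var peers []string
	if err := json.NewDecoder(bytes.NewReader(buf)).Decode(&peers); err != nil {
		return nil, err
	}
	return peers, nil
}

func main() {
	dir, err := ioutil.TempDir("", "meta-peers")
	if err != nil {
		panic(err)
	}
	defer os.RemoveAll(dir)

	path := filepath.Join(dir, "peers.json")

	// A data-only node records the raft peers returned in the join response.
	if err := writePeersJSON(path, []string{"10.0.0.1:8088", "10.0.0.2:8088", "10.0.0.3:8088"}); err != nil {
		panic(err)
	}

	// On restart, loadState reads this file back; with peers on disk and no
	// local raft.db, the store comes up in the remoteRaft state.
	peers, err := readPeersJSON(path)
	if err != nil {
		panic(err)
	}
	fmt.Println(peers)
}

This is also why assertRaftPeerNodes in the tests can count raft members by checking for raft.db on disk: only the first three joiners are added as peers and keep a local raft database, while later joiners persist just the peer list.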