Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support incremental cluster joins #3478

Merged
merged 9 commits into from
Jul 28, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
- [#3376](https://github.com/influxdb/influxdb/pull/3376): Support for remote shard query mapping
- [#3372](https://github.com/influxdb/influxdb/pull/3372): Support joining nodes to existing cluster
- [#3426](https://github.com/influxdb/influxdb/pull/3426): Additional logging for continuous queries. Thanks @jhorwit2
- [#3478](https://github.com/influxdb/influxdb/pull/3478): Support incremental cluster joins

### Bugfixes
- [#3405](https://github.com/influxdb/influxdb/pull/3405): Prevent database panic when fields are missing. Thanks @jhorwit2
Expand Down
3 changes: 2 additions & 1 deletion cmd/influxd/run/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"path/filepath"
"runtime"
"strconv"
"strings"

"github.com/BurntSushi/toml"
)
Expand Down Expand Up @@ -83,7 +84,7 @@ func (cmd *Command) Run(args ...string) error {
}

if options.Join != "" {
config.Meta.Join = options.Join
config.Meta.Peers = strings.Split(options.Join, ",")
}

// Validate the configuration.
Expand Down
6 changes: 6 additions & 0 deletions cmd/influxd/run/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -373,8 +373,14 @@ func (s *Server) Close() error {
// startServerReporting starts periodic server reporting.
func (s *Server) startServerReporting() {
for {
select {
case <-s.closing:
return
default:
}
if err := s.MetaStore.WaitForLeader(30 * time.Second); err != nil {
log.Printf("no leader available for reporting: %s", err.Error())
time.Sleep(time.Second)
continue
}
s.reportServer()
Expand Down
3 changes: 0 additions & 3 deletions meta/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,6 @@ type Config struct {
LeaderLeaseTimeout toml.Duration `toml:"leader-lease-timeout"`
CommitTimeout toml.Duration `toml:"commit-timeout"`
ClusterTracing bool `toml:"cluster-tracing"`

// The join command-line argument
Join string `toml:"-"`
}

func NewConfig() *Config {
Expand Down
51 changes: 33 additions & 18 deletions meta/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"time"

"github.com/gogo/protobuf/proto"
"github.com/hashicorp/raft"
"github.com/influxdb/influxdb/meta/internal"
)

Expand All @@ -29,7 +30,7 @@ type rpc struct {
cachedData() *Data
IsLeader() bool
Leader() string
Peers() []string
Peers() ([]string, error)
AddPeer(host string) error
CreateNode(host string) (*NodeInfo, error)
NodeByHost(host string) (*NodeInfo, error)
Expand Down Expand Up @@ -215,26 +216,33 @@ func (r *rpc) handleJoinRequest(req *internal.JoinRequest) (*internal.JoinRespon
r.traceCluster("join request from: %v", *req.Addr)

node, err := func() (*NodeInfo, error) {

// attempt to create the node
node, err := r.store.CreateNode(*req.Addr)
// if it exists, return the existing node
if err == ErrNodeExists {
return r.store.NodeByHost(*req.Addr)
node, err = r.store.NodeByHost(*req.Addr)
if err != nil {
return node, err
}
r.logger.Printf("existing node re-joined: id=%v addr=%v", node.ID, node.Host)
} else if err != nil {
return nil, fmt.Errorf("create node: %v", err)
}

// FIXME: jwilder: adding raft nodes is tricky since going
// from 1 node (leader) to two kills the cluster because
// quorum is lost after adding the second node. For now,
// can only add non-raft enabled nodes

// If we have less than 3 nodes, add them as raft peers
// if len(r.store.Peers()) < MaxRaftNodes {
// if err = r.store.AddPeer(*req.Addr); err != nil {
// return node, fmt.Errorf("add peer: %v", err)
// }
// }
peers, err := r.store.Peers()
if err != nil {
return nil, fmt.Errorf("list peers: %v", err)
}

// If we have less than 3 nodes, add them as raft peers if they are not
// already a peer
if len(peers) < MaxRaftNodes && !raft.PeerContained(peers, *req.Addr) {
r.logger.Printf("adding new raft peer: nodeId=%v addr=%v", node.ID, *req.Addr)
if err = r.store.AddPeer(*req.Addr); err != nil {
return node, fmt.Errorf("add peer: %v", err)
}
}
return node, err
}()

Expand All @@ -247,13 +255,18 @@ func (r *rpc) handleJoinRequest(req *internal.JoinRequest) (*internal.JoinRespon
return nil, err
}

// get the current raft peers
peers, err := r.store.Peers()
if err != nil {
return nil, fmt.Errorf("list peers: %v", err)
}

return &internal.JoinResponse{
Header: &internal.ResponseHeader{
OK: proto.Bool(true),
},
//EnableRaft: proto.Bool(contains(r.store.Peers(), *req.Addr)),
EnableRaft: proto.Bool(false),
RaftNodes: r.store.Peers(),
EnableRaft: proto.Bool(raft.PeerContained(peers, *req.Addr)),
RaftNodes: peers,
NodeID: proto.Uint64(nodeID),
}, err

Expand Down Expand Up @@ -355,7 +368,7 @@ func (r *rpc) call(dest string, req proto.Message) (proto.Message, error) {
// Create a connection to the leader.
conn, err := net.DialTimeout("tcp", dest, leaderDialTimeout)
if err != nil {
return nil, err
return nil, fmt.Errorf("rpc dial: %v", err)
}
defer conn.Close()

Expand All @@ -382,11 +395,13 @@ func (r *rpc) call(dest string, req proto.Message) (proto.Message, error) {

// Should always have a size and type
if exp := 16; len(data) < exp {
r.traceCluster("recv: %v", string(data))
return nil, fmt.Errorf("rpc %v failed: short read: got %v, exp %v", rpcType, len(data), exp)
}

sz := btou64(data[0:8])
if len(data[8:]) != int(sz) {
r.traceCluster("recv: %v", string(data))
return nil, fmt.Errorf("rpc %v failed: short read: got %v, exp %v", rpcType, len(data[8:]), sz)
}

Expand Down Expand Up @@ -421,7 +436,7 @@ func (r *rpc) call(dest string, req proto.Message) (proto.Message, error) {

// traceCluster logs a cluster-RPC trace message, but only when tracing
// has been enabled on this rpc instance. Callers pass Printf-style
// format strings (e.g. "recv: %v", "join request from: %v").
func (r *rpc) traceCluster(msg string, args ...interface{}) {
	if r.tracingEnabled {
		// The generic "rpc: " prefix is used (not "rpc error: ") because
		// traced messages are not necessarily errors.
		r.logger.Printf("rpc: "+msg, args...)
	}
}

Expand Down
4 changes: 2 additions & 2 deletions meta/rpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ func TestRPCJoin(t *testing.T) {
t.Fatalf("failed to join: %v", err)
}

if exp := false; res.RaftEnabled != false {
if exp := true; res.RaftEnabled != true {
t.Fatalf("raft enabled mismatch: got %v, exp %v", res.RaftEnabled, exp)
}

Expand Down Expand Up @@ -230,7 +230,7 @@ func (f *fakeStore) cachedData() *Data {

func (f *fakeStore) IsLeader() bool { return true }
func (f *fakeStore) Leader() string { return f.leader }
func (f *fakeStore) Peers() []string { return []string{f.leader} }
func (f *fakeStore) Peers() ([]string, error) { return []string{f.leader}, nil }
func (f *fakeStore) AddPeer(host string) error { return nil }
func (f *fakeStore) CreateNode(host string) (*NodeInfo, error) {
return &NodeInfo{ID: f.newNodeID, Host: host}, nil
Expand Down
77 changes: 68 additions & 9 deletions meta/state.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
package meta

import (
"bytes"
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"math/rand"
"net"
"os"
Expand All @@ -17,13 +20,15 @@ import (
// across local or remote nodes. It is a form of the state design pattern and allows
// the meta.Store to change its behavior with the raft layer at runtime.
type raftState interface {
openRaft() error
open() error
remove() error
initialize() error
leader() string
isLeader() bool
sync(index uint64, timeout time.Duration) error
setPeers(addrs []string) error
addPeer(addr string) error
peers() ([]string, error)
invalidate() error
close() error
lastIndex() uint64
Expand All @@ -42,6 +47,19 @@ type localRaft struct {
raftLayer *raftLayer
}

// remove deletes all on-disk raft state kept under the store's path:
// the raft log (raft.db), the cached peer list (peers.json), and the
// snapshot directory. Missing entries are not an error (RemoveAll
// semantics).
func (r *localRaft) remove() error {
	artifacts := []string{"raft.db", "peers.json", "snapshots"}
	for _, name := range artifacts {
		if err := os.RemoveAll(filepath.Join(r.store.path, name)); err != nil {
			return err
		}
	}
	return nil
}

func (r *localRaft) updateMetaData(ms *Data) {
if ms == nil {
return
Expand Down Expand Up @@ -76,7 +94,7 @@ func (r *localRaft) invalidate() error {
return nil
}

func (r *localRaft) openRaft() error {
func (r *localRaft) open() error {
s := r.store
// Setup raft configuration.
config := raft.DefaultConfig()
Expand All @@ -89,11 +107,6 @@ func (r *localRaft) openRaft() error {
// If no peers are set in the config then start as a single server.
config.EnableSingleNode = (len(s.peers) == 0)

// Ensure our addr is in the peer list
if config.EnableSingleNode {
s.peers = append(s.peers, s.Addr.String())
}

// Build raft layer to multiplex listener.
r.raftLayer = newRaftLayer(s.RaftListener, s.Addr)

Expand Down Expand Up @@ -246,6 +259,10 @@ func (r *localRaft) setPeers(addrs []string) error {
return r.raft.SetPeers(a).Error()
}

// peers returns the raft peer addresses recorded in the underlying
// peer store.
func (r *localRaft) peers() ([]string, error) {
	return r.peerStore.Peers()
}

func (r *localRaft) leader() string {
if r.raft == nil {
return ""
Expand All @@ -269,6 +286,10 @@ type remoteRaft struct {
store *Store
}

// remove is a no-op in the remote-raft state: this node keeps no raft
// log or snapshots of its own to delete.
// NOTE(review): the locally cached peers.json written by setPeers is
// intentionally left in place — confirm it should survive removal.
func (r *remoteRaft) remove() error {
	return nil
}

func (r *remoteRaft) updateMetaData(ms *Data) {
if ms == nil {
return
Expand Down Expand Up @@ -300,15 +321,31 @@ func (r *remoteRaft) invalidate() error {
}

// setPeers caches the cluster's raft peer addresses locally so a
// non-raft node can answer peers() without contacting the raft nodes.
// The list is persisted as JSON to peers.json under the store's path.
// (Removed a leftover `return nil` that made the body unreachable.)
func (r *remoteRaft) setPeers(addrs []string) error {
	// Encode the address list as JSON.
	var buf bytes.Buffer
	enc := json.NewEncoder(&buf)
	if err := enc.Encode(addrs); err != nil {
		return err
	}

	// Write out as JSON for later reads by peers().
	// NOTE(review): 0755 is unusual for a plain data file (0644 is
	// typical) — kept as-is to preserve existing on-disk behavior.
	return ioutil.WriteFile(filepath.Join(r.store.path, "peers.json"), buf.Bytes(), 0755)
}

// addPeer always fails in the remote-raft state: this node does not
// participate in raft consensus, so peers cannot be added through it.
func (r *remoteRaft) addPeer(addr string) error {
	return fmt.Errorf("cannot add peer using remote raft")
}

func (r *remoteRaft) openRaft() error {
// peers returns the cached peer list read from peers.json under the
// store's path (written by setPeers). A missing or empty file yields a
// nil slice with no error.
func (r *remoteRaft) peers() ([]string, error) {
	return readPeersJSON(filepath.Join(r.store.path, "peers.json"))
}

func (r *remoteRaft) open() error {
if err := r.setPeers(r.store.peers); err != nil {
return err
}

go func() {
for {
select {
Expand Down Expand Up @@ -366,3 +403,25 @@ func (r *remoteRaft) sync(index uint64, timeout time.Duration) error {
// snapshot is unsupported in the remote-raft state; the error string
// mirrors the other remote-raft stubs above.
func (r *remoteRaft) snapshot() error {
	return fmt.Errorf("cannot snapshot while in remote raft state")
}

func readPeersJSON(path string) ([]string, error) {
// Read the file
buf, err := ioutil.ReadFile(path)
if err != nil && !os.IsNotExist(err) {
return nil, err
}

// Check for no peers
if len(buf) == 0 {
return nil, nil
}

// Decode the peers
var peers []string
dec := json.NewDecoder(bytes.NewReader(buf))
if err := dec.Decode(&peers); err != nil {
return nil, err
}

return peers, nil
}
10 changes: 8 additions & 2 deletions meta/statement_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
type StatementExecutor struct {
Store interface {
Nodes() ([]NodeInfo, error)
Peers() ([]string, error)

Database(name string) (*DatabaseInfo, error)
Databases() ([]DatabaseInfo, error)
Expand Down Expand Up @@ -127,9 +128,14 @@ func (e *StatementExecutor) executeShowServersStatement(q *influxql.ShowServersS
return &influxql.Result{Err: err}
}

row := &influxql.Row{Columns: []string{"id", "url"}}
peers, err := e.Store.Peers()
if err != nil {
return &influxql.Result{Err: err}
}

row := &influxql.Row{Columns: []string{"id", "url", "raft"}}
for _, ni := range nis {
row.Values = append(row.Values, []interface{}{ni.ID, "http://" + ni.Host})
row.Values = append(row.Values, []interface{}{ni.ID, "http://" + ni.Host, contains(peers, ni.Host)})
}
return &influxql.Result{Series: []*influxql.Row{row}}
}
Expand Down
Loading