Skip to content

Commit

Permalink
Merge pull request #3478 from influxdb/jw-cluster
Browse files Browse the repository at this point in the history
Support incremental cluster joins
  • Loading branch information
jwilder committed Jul 28, 2015
2 parents f994a97 + c12b556 commit 1536cd5
Show file tree
Hide file tree
Showing 11 changed files with 446 additions and 109 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
- [#3376](https://github.com/influxdb/influxdb/pull/3376): Support for remote shard query mapping
- [#3372](https://github.com/influxdb/influxdb/pull/3372): Support joining nodes to existing cluster
- [#3426](https://github.com/influxdb/influxdb/pull/3426): Additional logging for continuous queries. Thanks @jhorwit2
- [#3478](https://github.com/influxdb/influxdb/pull/3478): Support incremental cluster joins

### Bugfixes
- [#3405](https://github.com/influxdb/influxdb/pull/3405): Prevent database panic when fields are missing. Thanks @jhorwit2
Expand Down
3 changes: 2 additions & 1 deletion cmd/influxd/run/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"path/filepath"
"runtime"
"strconv"
"strings"

"github.com/BurntSushi/toml"
)
Expand Down Expand Up @@ -83,7 +84,7 @@ func (cmd *Command) Run(args ...string) error {
}

if options.Join != "" {
config.Meta.Join = options.Join
config.Meta.Peers = strings.Split(options.Join, ",")
}

// Validate the configuration.
Expand Down
6 changes: 6 additions & 0 deletions cmd/influxd/run/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -373,8 +373,14 @@ func (s *Server) Close() error {
// startServerReporting starts periodic server reporting.
func (s *Server) startServerReporting() {
for {
select {
case <-s.closing:
return
default:
}
if err := s.MetaStore.WaitForLeader(30 * time.Second); err != nil {
log.Printf("no leader available for reporting: %s", err.Error())
time.Sleep(time.Second)
continue
}
s.reportServer()
Expand Down
3 changes: 0 additions & 3 deletions meta/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,6 @@ type Config struct {
LeaderLeaseTimeout toml.Duration `toml:"leader-lease-timeout"`
CommitTimeout toml.Duration `toml:"commit-timeout"`
ClusterTracing bool `toml:"cluster-tracing"`

// The join command-line argument
Join string `toml:"-"`
}

func NewConfig() *Config {
Expand Down
51 changes: 33 additions & 18 deletions meta/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"time"

"github.com/gogo/protobuf/proto"
"github.com/hashicorp/raft"
"github.com/influxdb/influxdb/meta/internal"
)

Expand All @@ -29,7 +30,7 @@ type rpc struct {
cachedData() *Data
IsLeader() bool
Leader() string
Peers() []string
Peers() ([]string, error)
AddPeer(host string) error
CreateNode(host string) (*NodeInfo, error)
NodeByHost(host string) (*NodeInfo, error)
Expand Down Expand Up @@ -215,26 +216,33 @@ func (r *rpc) handleJoinRequest(req *internal.JoinRequest) (*internal.JoinRespon
r.traceCluster("join request from: %v", *req.Addr)

node, err := func() (*NodeInfo, error) {

// attempt to create the node
node, err := r.store.CreateNode(*req.Addr)
// if it exists, return the existing node
if err == ErrNodeExists {
return r.store.NodeByHost(*req.Addr)
node, err = r.store.NodeByHost(*req.Addr)
if err != nil {
return node, err
}
r.logger.Printf("existing node re-joined: id=%v addr=%v", node.ID, node.Host)
} else if err != nil {
return nil, fmt.Errorf("create node: %v", err)
}

// FIXME: jwilder: adding raft nodes is tricky since going
// from 1 node (leader) to two kills the cluster because
// quorum is lost after adding the second node. For now,
// can only add non-raft enabled nodes

// If we have less than 3 nodes, add them as raft peers
// if len(r.store.Peers()) < MaxRaftNodes {
// if err = r.store.AddPeer(*req.Addr); err != nil {
// return node, fmt.Errorf("add peer: %v", err)
// }
// }
peers, err := r.store.Peers()
if err != nil {
return nil, fmt.Errorf("list peers: %v", err)
}

// If we have less than 3 nodes, add them as raft peers if they are not
// already a peer
if len(peers) < MaxRaftNodes && !raft.PeerContained(peers, *req.Addr) {
r.logger.Printf("adding new raft peer: nodeId=%v addr=%v", node.ID, *req.Addr)
if err = r.store.AddPeer(*req.Addr); err != nil {
return node, fmt.Errorf("add peer: %v", err)
}
}
return node, err
}()

Expand All @@ -247,13 +255,18 @@ func (r *rpc) handleJoinRequest(req *internal.JoinRequest) (*internal.JoinRespon
return nil, err
}

// get the current raft peers
peers, err := r.store.Peers()
if err != nil {
return nil, fmt.Errorf("list peers: %v", err)
}

return &internal.JoinResponse{
Header: &internal.ResponseHeader{
OK: proto.Bool(true),
},
//EnableRaft: proto.Bool(contains(r.store.Peers(), *req.Addr)),
EnableRaft: proto.Bool(false),
RaftNodes: r.store.Peers(),
EnableRaft: proto.Bool(raft.PeerContained(peers, *req.Addr)),
RaftNodes: peers,
NodeID: proto.Uint64(nodeID),
}, err

Expand Down Expand Up @@ -355,7 +368,7 @@ func (r *rpc) call(dest string, req proto.Message) (proto.Message, error) {
// Create a connection to the leader.
conn, err := net.DialTimeout("tcp", dest, leaderDialTimeout)
if err != nil {
return nil, err
return nil, fmt.Errorf("rpc dial: %v", err)
}
defer conn.Close()

Expand All @@ -382,11 +395,13 @@ func (r *rpc) call(dest string, req proto.Message) (proto.Message, error) {

// Should always have a size and type
if exp := 16; len(data) < exp {
r.traceCluster("recv: %v", string(data))
return nil, fmt.Errorf("rpc %v failed: short read: got %v, exp %v", rpcType, len(data), exp)
}

sz := btou64(data[0:8])
if len(data[8:]) != int(sz) {
r.traceCluster("recv: %v", string(data))
return nil, fmt.Errorf("rpc %v failed: short read: got %v, exp %v", rpcType, len(data[8:]), sz)
}

Expand Down Expand Up @@ -421,7 +436,7 @@ func (r *rpc) call(dest string, req proto.Message) (proto.Message, error) {

func (r *rpc) traceCluster(msg string, args ...interface{}) {
if r.tracingEnabled {
r.logger.Printf("rpc error: "+msg, args...)
r.logger.Printf("rpc: "+msg, args...)
}
}

Expand Down
4 changes: 2 additions & 2 deletions meta/rpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ func TestRPCJoin(t *testing.T) {
t.Fatalf("failed to join: %v", err)
}

if exp := false; res.RaftEnabled != false {
if exp := true; res.RaftEnabled != true {
t.Fatalf("raft enabled mismatch: got %v, exp %v", res.RaftEnabled, exp)
}

Expand Down Expand Up @@ -230,7 +230,7 @@ func (f *fakeStore) cachedData() *Data {

func (f *fakeStore) IsLeader() bool { return true }
func (f *fakeStore) Leader() string { return f.leader }
func (f *fakeStore) Peers() []string { return []string{f.leader} }
func (f *fakeStore) Peers() ([]string, error) { return []string{f.leader}, nil }
func (f *fakeStore) AddPeer(host string) error { return nil }
func (f *fakeStore) CreateNode(host string) (*NodeInfo, error) {
return &NodeInfo{ID: f.newNodeID, Host: host}, nil
Expand Down
77 changes: 68 additions & 9 deletions meta/state.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
package meta

import (
"bytes"
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"math/rand"
"net"
"os"
Expand All @@ -17,13 +20,15 @@ import (
// across local or remote nodes. It is a form of the state design pattern and allows
// the meta.Store to change its behavior with the raft layer at runtime.
type raftState interface {
openRaft() error
open() error
remove() error
initialize() error
leader() string
isLeader() bool
sync(index uint64, timeout time.Duration) error
setPeers(addrs []string) error
addPeer(addr string) error
peers() ([]string, error)
invalidate() error
close() error
lastIndex() uint64
Expand All @@ -42,6 +47,19 @@ type localRaft struct {
raftLayer *raftLayer
}

// remove deletes this node's on-disk raft state — the raft log, the
// persisted peer list, and all snapshots — so the node can later rejoin
// a cluster from a clean slate. Paths that do not exist are ignored
// (os.RemoveAll returns nil for missing paths).
func (r *localRaft) remove() error {
	for _, name := range []string{"raft.db", "peers.json", "snapshots"} {
		if err := os.RemoveAll(filepath.Join(r.store.path, name)); err != nil {
			return err
		}
	}
	return nil
}

func (r *localRaft) updateMetaData(ms *Data) {
if ms == nil {
return
Expand Down Expand Up @@ -76,7 +94,7 @@ func (r *localRaft) invalidate() error {
return nil
}

func (r *localRaft) openRaft() error {
func (r *localRaft) open() error {
s := r.store
// Setup raft configuration.
config := raft.DefaultConfig()
Expand All @@ -89,11 +107,6 @@ func (r *localRaft) openRaft() error {
// If no peers are set in the config then start as a single server.
config.EnableSingleNode = (len(s.peers) == 0)

// Ensure our addr is in the peer list
if config.EnableSingleNode {
s.peers = append(s.peers, s.Addr.String())
}

// Build raft layer to multiplex listener.
r.raftLayer = newRaftLayer(s.RaftListener, s.Addr)

Expand Down Expand Up @@ -246,6 +259,10 @@ func (r *localRaft) setPeers(addrs []string) error {
return r.raft.SetPeers(a).Error()
}

// peers reports the raft peer addresses recorded in the local peer store.
func (r *localRaft) peers() (addrs []string, err error) {
	addrs, err = r.peerStore.Peers()
	return addrs, err
}

func (r *localRaft) leader() string {
if r.raft == nil {
return ""
Expand All @@ -269,6 +286,10 @@ type remoteRaft struct {
store *Store
}

// remove is a no-op for a remote-raft node: it keeps no local raft
// state on disk, so there is nothing to delete.
func (r *remoteRaft) remove() error {
	return nil
}

func (r *remoteRaft) updateMetaData(ms *Data) {
if ms == nil {
return
Expand Down Expand Up @@ -300,15 +321,31 @@ func (r *remoteRaft) invalidate() error {
}

// setPeers persists the cluster's raft peer addresses to peers.json in
// the store's data directory so they survive process restarts. Returns
// any JSON-encoding or file-write error.
func (r *remoteRaft) setPeers(addrs []string) error {
	// Serialize the addresses as JSON (Encode appends a trailing newline).
	var buf bytes.Buffer
	enc := json.NewEncoder(&buf)
	if err := enc.Encode(addrs); err != nil {
		return err
	}

	// 0644: peers.json is plain data and does not need execute bits.
	return ioutil.WriteFile(filepath.Join(r.store.path, "peers.json"), buf.Bytes(), 0644)
}

// addPeer rejects membership changes: a node in the remote-raft state
// does not run raft locally and therefore cannot add peers.
func (r *remoteRaft) addPeer(addr string) error {
	return errors.New("cannot add peer using remote raft")
}

func (r *remoteRaft) openRaft() error {
// peers returns the peer addresses previously persisted to peers.json
// in the store's data directory.
func (r *remoteRaft) peers() ([]string, error) {
	peersPath := filepath.Join(r.store.path, "peers.json")
	return readPeersJSON(peersPath)
}

func (r *remoteRaft) open() error {
if err := r.setPeers(r.store.peers); err != nil {
return err
}

go func() {
for {
select {
Expand Down Expand Up @@ -366,3 +403,25 @@ func (r *remoteRaft) sync(index uint64, timeout time.Duration) error {
// snapshot always fails: only nodes that run the raft state machine
// locally can take snapshots, not nodes in the remote-raft state.
func (r *remoteRaft) snapshot() error {
	return fmt.Errorf("cannot snapshot while in remote raft state")
}

func readPeersJSON(path string) ([]string, error) {
// Read the file
buf, err := ioutil.ReadFile(path)
if err != nil && !os.IsNotExist(err) {
return nil, err
}

// Check for no peers
if len(buf) == 0 {
return nil, nil
}

// Decode the peers
var peers []string
dec := json.NewDecoder(bytes.NewReader(buf))
if err := dec.Decode(&peers); err != nil {
return nil, err
}

return peers, nil
}
10 changes: 8 additions & 2 deletions meta/statement_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
type StatementExecutor struct {
Store interface {
Nodes() ([]NodeInfo, error)
Peers() ([]string, error)

Database(name string) (*DatabaseInfo, error)
Databases() ([]DatabaseInfo, error)
Expand Down Expand Up @@ -127,9 +128,14 @@ func (e *StatementExecutor) executeShowServersStatement(q *influxql.ShowServersS
return &influxql.Result{Err: err}
}

row := &influxql.Row{Columns: []string{"id", "url"}}
peers, err := e.Store.Peers()
if err != nil {
return &influxql.Result{Err: err}
}

row := &influxql.Row{Columns: []string{"id", "url", "raft"}}
for _, ni := range nis {
row.Values = append(row.Values, []interface{}{ni.ID, "http://" + ni.Host})
row.Values = append(row.Values, []interface{}{ni.ID, "http://" + ni.Host, contains(peers, ni.Host)})
}
return &influxql.Result{Series: []*influxql.Row{row}}
}
Expand Down
Loading

0 comments on commit 1536cd5

Please sign in to comment.