Skip to content

Commit

Permalink
Code review fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
jwilder committed Jul 20, 2015
1 parent 52cceb6 commit 8dfe86f
Show file tree
Hide file tree
Showing 6 changed files with 24 additions and 22 deletions.
6 changes: 3 additions & 3 deletions meta/internal/meta.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 3 additions & 2 deletions meta/internal/meta.proto
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ message Command {
UpdateUserCommand = 15;
SetPrivilegeCommand = 16;
SetDataCommand = 17;
SetAdminPrivilegeCommand = 18;
SetAdminPrivilegeCommand = 18;
}

required Type type = 1;
Expand Down Expand Up @@ -301,9 +301,10 @@ message JoinResponse {

// Indicates that this node should take part in the raft cluster.
optional bool EnableRaft = 2;

// The addresses of raft peers to use if joining as a raft member. If not joining
// as a raft member, these are the nodes running raft.
repeated string Peers = 3;
repeated string RaftNodes = 3;

// The node ID assigned to the requesting node.
optional uint64 NodeID = 4;
Expand Down
16 changes: 9 additions & 7 deletions meta/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,10 @@ import (
)

// Max size of a message before we treat the size as invalid
const MaxMessageSize = 1024 * 1024 * 1024
const (
MaxMessageSize = 1024 * 1024 * 1024
leaderDialTimeout = 10 * time.Second
)

// RPC handles request/response style messaging between cluster nodes
type RPC struct {
Expand All @@ -36,7 +39,7 @@ type RPC struct {

type JoinResult struct {
RaftEnabled bool
Peers []string
RaftNodes []string
NodeID uint64
}

Expand All @@ -51,7 +54,7 @@ func (r *RPC) proxyLeader(conn *net.TCPConn) {
return
}

leaderConn, err := net.DialTimeout("tcp", r.store.Leader(), 10*time.Second)
leaderConn, err := net.DialTimeout("tcp", r.store.Leader(), leaderDialTimeout)
if err != nil {
r.sendError(conn, fmt.Sprintf("dial leader: %v", err))
return
Expand Down Expand Up @@ -250,7 +253,7 @@ func (r *RPC) handleJoinRequest(req *internal.JoinRequest) (*internal.JoinRespon
},
//EnableRaft: proto.Bool(contains(r.store.Peers(), *req.Addr)),
EnableRaft: proto.Bool(false),
Peers: r.store.Peers(),
RaftNodes: r.store.Peers(),
NodeID: proto.Uint64(nodeID),
}, err

Expand Down Expand Up @@ -325,7 +328,7 @@ func (r *RPC) join(localAddr, remoteAddr string) (*JoinResult, error) {
case *internal.JoinResponse:
return &JoinResult{
RaftEnabled: t.GetEnableRaft(),
Peers: t.GetPeers(),
RaftNodes: t.GetRaftNodes(),
NodeID: t.GetNodeID(),
}, nil
case *internal.ErrorResponse:
Expand All @@ -338,7 +341,6 @@ func (r *RPC) join(localAddr, remoteAddr string) (*JoinResult, error) {
// call sends an encoded request to the remote leader and returns
// an encoded response value.
func (r *RPC) call(dest string, req proto.Message) (proto.Message, error) {

// Determine type of request
var rpcType internal.RPCType
switch t := req.(type) {
Expand All @@ -351,7 +353,7 @@ func (r *RPC) call(dest string, req proto.Message) (proto.Message, error) {
}

// Create a connection to the leader.
conn, err := net.DialTimeout("tcp", dest, 10*time.Second)
conn, err := net.DialTimeout("tcp", dest, leaderDialTimeout)
if err != nil {
return nil, err
}
Expand Down
8 changes: 4 additions & 4 deletions meta/rpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,12 +161,12 @@ func TestRPCJoin(t *testing.T) {
t.Fatalf("raft enabled mismatch: got %v, exp %v", res.RaftEnabled, exp)
}

if exp := 1; len(res.Peers) != exp {
t.Fatalf("raft peer mismatch: got %v, exp %v", len(res.Peers), exp)
if exp := 1; len(res.RaftNodes) != exp {
t.Fatalf("raft peer mismatch: got %v, exp %v", len(res.RaftNodes), exp)
}

if exp := "1.2.3.4:1234"; res.Peers[0] != exp {
t.Fatalf("raft peer mismatch: got %v, exp %v", res.Peers[0], exp)
if exp := "1.2.3.4:1234"; res.RaftNodes[0] != exp {
t.Fatalf("raft peer mismatch: got %v, exp %v", res.RaftNodes[0], exp)
}

if exp := uint64(100); res.NodeID != exp {
Expand Down
4 changes: 2 additions & 2 deletions meta/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (

// raftState abstracts the interaction of the raft consensus layer
// across local or remote nodes. It is a form of the state design pattern and allows
// the meta.Store to change how its behavior with the raft layer at runtime.
// the meta.Store to change its behavior with the raft layer at runtime.
type raftState interface {
openRaft() error
initialize() error
Expand All @@ -31,7 +31,7 @@ type raftState interface {
snapshot() error
}

// localRaft is a consensus strategy that uses a local raft implementation fo
// localRaft is a consensus strategy that uses a local raft implementation for
// consensus operations.
type localRaft struct {
store *Store
Expand Down
7 changes: 3 additions & 4 deletions meta/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,9 +184,9 @@ func (s *Store) Open() error {
joined = true

s.Logger.Printf("joined remote node %v", join)
s.Logger.Printf("raftEnabled=%v peers=%v", res.RaftEnabled, res.Peers)
s.Logger.Printf("raftEnabled=%v raftNodes=%v", res.RaftEnabled, res.RaftNodes)

s.peers = res.Peers
s.peers = res.RaftNodes
s.id = res.NodeID

if err := s.writeNodeID(res.NodeID); err != nil {
Expand Down Expand Up @@ -573,9 +573,8 @@ func (s *Store) handleExecConn(conn net.Conn) {
// serveRPCListener processes remote exec connections.
// This function runs in a separate goroutine.
func (s *Store) serveRPCListener() {
<-s.ready

defer s.wg.Done()
<-s.ready

for {
// Accept next TCP connection.
Expand Down

0 comments on commit 8dfe86f

Please sign in to comment.