Skip to content

Commit

Permalink
Make meta RPC private
Browse files Browse the repository at this point in the history
  • Loading branch information
jwilder committed Jul 23, 2015
1 parent aa520d4 commit 2263eee
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 30 deletions.
34 changes: 17 additions & 17 deletions meta/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ const (
leaderDialTimeout = 10 * time.Second
)

// RPC handles request/response style messaging between cluster nodes
type RPC struct {
Logger *log.Logger
// rpc handles request/response style messaging between cluster nodes
type rpc struct {
logger *log.Logger
tracingEnabled bool

store interface {
Expand All @@ -48,7 +48,7 @@ type Reply interface {
}

// proxyLeader proxies the connection to the current raft leader
func (r *RPC) proxyLeader(conn *net.TCPConn) {
func (r *rpc) proxyLeader(conn *net.TCPConn) {
if r.store.Leader() == "" {
r.sendError(conn, "no leader")
return
Expand All @@ -68,7 +68,7 @@ func (r *RPC) proxyLeader(conn *net.TCPConn) {
}

// handleRPCConn reads a command from the connection and executes it.
func (r *RPC) handleRPCConn(conn net.Conn) {
func (r *rpc) handleRPCConn(conn net.Conn) {
defer conn.Close()
// RPC connections should execute on the leader. If we are not the leader,
// proxy the connection to the leader so that clients an connect to any node
Expand Down Expand Up @@ -148,21 +148,21 @@ func (r *RPC) handleRPCConn(conn net.Conn) {
r.sendResponse(conn, typ, resp)
}

func (r *RPC) sendResponse(conn net.Conn, typ internal.RPCType, resp proto.Message) {
func (r *rpc) sendResponse(conn net.Conn, typ internal.RPCType, resp proto.Message) {
// Marshal the response back to a protobuf
buf, err := proto.Marshal(resp)
if err != nil {
r.Logger.Printf("unable to marshal response: %v", err)
r.logger.Printf("unable to marshal response: %v", err)
return
}

// Encode response back to connection.
if _, err := conn.Write(r.pack(typ, buf)); err != nil {
r.Logger.Printf("unable to write rpc response: %s", err)
r.logger.Printf("unable to write rpc response: %s", err)
}
}

func (r *RPC) sendError(conn net.Conn, msg string) {
func (r *rpc) sendError(conn net.Conn, msg string) {
r.traceCluster(msg)
resp := &internal.ErrorResponse{
Header: &internal.ResponseHeader{
Expand All @@ -175,7 +175,7 @@ func (r *RPC) sendError(conn net.Conn, msg string) {
}

// handleFetchData handles a request for the current nodes meta data
func (r *RPC) handleFetchData(req *internal.FetchDataRequest) (*internal.FetchDataResponse, error) {
func (r *rpc) handleFetchData(req *internal.FetchDataRequest) (*internal.FetchDataResponse, error) {
var (
b []byte
data *Data
Expand Down Expand Up @@ -211,7 +211,7 @@ func (r *RPC) handleFetchData(req *internal.FetchDataRequest) (*internal.FetchDa
}

// handleJoinRequest handles a request to join the cluster
func (r *RPC) handleJoinRequest(req *internal.JoinRequest) (*internal.JoinResponse, error) {
func (r *rpc) handleJoinRequest(req *internal.JoinRequest) (*internal.JoinResponse, error) {
r.traceCluster("join request from: %v", *req.Addr)

node, err := func() (*NodeInfo, error) {
Expand Down Expand Up @@ -261,7 +261,7 @@ func (r *RPC) handleJoinRequest(req *internal.JoinRequest) (*internal.JoinRespon

// pack returns a TLV style byte slice encoding the size of the payload, the RPC type
// and the RPC data
func (r *RPC) pack(typ internal.RPCType, b []byte) []byte {
func (r *rpc) pack(typ internal.RPCType, b []byte) []byte {
buf := u64tob(uint64(len(b)) + 8)
buf = append(buf, u64tob(uint64(typ))...)
buf = append(buf, b...)
Expand All @@ -270,7 +270,7 @@ func (r *RPC) pack(typ internal.RPCType, b []byte) []byte {

// fetchMetaData returns the latest copy of the meta store data from the current
// leader.
func (r *RPC) fetchMetaData(blocking bool) (*Data, error) {
func (r *rpc) fetchMetaData(blocking bool) (*Data, error) {
assert(r.store != nil, "store is nil")

// Retrieve the current known leader.
Expand Down Expand Up @@ -314,7 +314,7 @@ func (r *RPC) fetchMetaData(blocking bool) (*Data, error) {

// join attempts to join a cluster at remoteAddr using localAddr as the current
// node's cluster address
func (r *RPC) join(localAddr, remoteAddr string) (*JoinResult, error) {
func (r *rpc) join(localAddr, remoteAddr string) (*JoinResult, error) {
req := &internal.JoinRequest{
Addr: proto.String(localAddr),
}
Expand All @@ -340,7 +340,7 @@ func (r *RPC) join(localAddr, remoteAddr string) (*JoinResult, error) {

// call sends an encoded request to the remote leader and returns
// an encoded response value.
func (r *RPC) call(dest string, req proto.Message) (proto.Message, error) {
func (r *rpc) call(dest string, req proto.Message) (proto.Message, error) {
// Determine type of request
var rpcType internal.RPCType
switch t := req.(type) {
Expand Down Expand Up @@ -419,9 +419,9 @@ func (r *RPC) call(dest string, req proto.Message) (proto.Message, error) {
return resp, nil
}

func (r *RPC) traceCluster(msg string, args ...interface{}) {
func (r *rpc) traceCluster(msg string, args ...interface{}) {
if r.tracingEnabled {
r.Logger.Printf("rpc error: "+msg, args...)
r.logger.Printf("rpc error: "+msg, args...)
}
}

Expand Down
20 changes: 10 additions & 10 deletions meta/rpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (

func TestRPCFetchData(t *testing.T) {

serverRPC := &RPC{
serverRPC := &rpc{
store: &fakeStore{
md: &Data{Index: 99},
},
Expand All @@ -22,7 +22,7 @@ func TestRPCFetchData(t *testing.T) {
<-srv.Ready

// create a new RPC with no existing meta.Data cache
clientRPC := &RPC{
clientRPC := &rpc{
store: &fakeStore{
leader: srv.Listener.Addr().String(),
},
Expand All @@ -44,7 +44,7 @@ func TestRPCFetchData(t *testing.T) {
}

func TestRPCFetchDataMatchesLeader(t *testing.T) {
serverRPC := &RPC{
serverRPC := &rpc{
store: &fakeStore{
md: &Data{Index: 99},
},
Expand All @@ -58,7 +58,7 @@ func TestRPCFetchDataMatchesLeader(t *testing.T) {
<-srv.Ready

// create a new RPC with a matching index as the server
clientRPC := &RPC{
clientRPC := &rpc{
store: &fakeStore{
leader: srv.Listener.Addr().String(),
md: &Data{Index: 99},
Expand All @@ -81,7 +81,7 @@ func TestRPCFetchDataMatchesBlocking(t *testing.T) {
md: &Data{Index: 99},
blockChan: make(chan struct{}),
}
serverRPC := &RPC{
serverRPC := &rpc{
store: fs,
}

Expand All @@ -93,7 +93,7 @@ func TestRPCFetchDataMatchesBlocking(t *testing.T) {
<-srv.Ready

// create a new RPC with a matching index as the server
clientRPC := &RPC{
clientRPC := &rpc{
store: &fakeStore{
leader: srv.Listener.Addr().String(),
md: &Data{Index: 99},
Expand Down Expand Up @@ -135,7 +135,7 @@ func TestRPCJoin(t *testing.T) {
newNodeID: uint64(100),
blockChan: make(chan struct{}),
}
serverRPC := &RPC{
serverRPC := &rpc{
store: fs,
}

Expand All @@ -147,7 +147,7 @@ func TestRPCJoin(t *testing.T) {
<-srv.Ready

// create a new RPC with a matching index as the server
clientRPC := &RPC{
clientRPC := &rpc{
store: &fakeStore{
leader: srv.Listener.Addr().String(),
md: &Data{Index: 99},
Expand Down Expand Up @@ -187,11 +187,11 @@ type fakeStore struct {
type testServer struct {
Listener net.Listener
Ready chan struct{}
rpc *RPC
rpc *rpc
t *testing.T
}

func newTestServer(t *testing.T, rpc *RPC) *testServer {
func newTestServer(t *testing.T, rpc *rpc) *testServer {
ln, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
t.Fatalf("failed to listen: %v", err)
Expand Down
6 changes: 3 additions & 3 deletions meta/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ type Store struct {

data *Data

rpc *RPC
rpc *rpc

remoteAddr net.Addr

Expand Down Expand Up @@ -154,10 +154,10 @@ func NewStore(c *Config) *Store {
}

s.raftState = &localRaft{store: s}
s.rpc = &RPC{
s.rpc = &rpc{
store: s,
tracingEnabled: c.ClusterTracing,
Logger: s.Logger,
logger: s.Logger,
}
return s
}
Expand Down

0 comments on commit 2263eee

Please sign in to comment.