Skip to content

Commit

Permalink
raft: transport package
Browse files Browse the repository at this point in the history
Signed-off-by: Alexander Morozov <lk4d4math@gmail.com>
  • Loading branch information
LK4D4 committed Nov 16, 2016
1 parent 2eaae1a commit 542aad9
Show file tree
Hide file tree
Showing 7 changed files with 911 additions and 5 deletions.
2 changes: 2 additions & 0 deletions manager/state/raft/membership/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ var (
ErrConfigChangeInvalid = errors.New("membership: ConfChange type should be either AddNode, RemoveNode or UpdateNode")
// ErrCannotUnmarshalConfig is thrown when a node cannot unmarshal a configuration change
ErrCannotUnmarshalConfig = errors.New("membership: cannot unmarshal configuration change")
// ErrMemberRemoved is thrown when a node was removed from the cluster
ErrMemberRemoved = errors.New("raft: member was removed from the cluster")
)

// deferredConn used to store removed members connection for some time.
Expand Down
8 changes: 3 additions & 5 deletions manager/state/raft/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,6 @@ var (
ErrRequestTooLarge = errors.New("raft: raft message is too large and can't be sent")
// ErrCannotRemoveMember is thrown when we try to remove a member from the cluster but this would result in a loss of quorum
ErrCannotRemoveMember = errors.New("raft: member cannot be removed, because removing it may result in loss of quorum")
// ErrMemberRemoved is thrown when a node was removed from the cluster
ErrMemberRemoved = errors.New("raft: member was removed from the cluster")
// ErrNoClusterLeader is thrown when the cluster has no elected leader
ErrNoClusterLeader = errors.New("raft: no elected cluster leader")
// ErrMemberUnknown is sent in response to a message from an
Expand Down Expand Up @@ -501,7 +499,7 @@ func (n *Node) Run(ctx context.Context) error {
// If the node was removed from other members,
// send back an error to the caller to start
// the shutdown process.
return ErrMemberRemoved
return membership.ErrMemberRemoved
case <-ctx.Done():
return nil
}
Expand Down Expand Up @@ -829,7 +827,7 @@ func (n *Node) ProcessRaftMessage(ctx context.Context, msg *api.ProcessRaftMessa
// Don't process the message if this comes from
// a node in the remove set
if n.cluster.IsIDRemoved(msg.Message.From) {
return nil, ErrMemberRemoved
return nil, membership.ErrMemberRemoved
}

var sourceHost string
Expand Down Expand Up @@ -1246,7 +1244,7 @@ func (n *Node) sendToMember(ctx context.Context, members map[uint64]*membership.

_, err := api.NewRaftClient(conn.Conn).ProcessRaftMessage(ctx, &api.ProcessRaftMessageRequest{Message: &m})
if err != nil {
if grpc.ErrorDesc(err) == ErrMemberRemoved.Error() {
if grpc.ErrorDesc(err) == membership.ErrMemberRemoved.Error() {
n.removeRaftFunc()
}
if m.Type == raftpb.MsgSnap {
Expand Down
162 changes: 162 additions & 0 deletions manager/state/raft/transport/mock_raft_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
package transport

import (
"net"
"time"

"golang.org/x/net/context"

"github.com/coreos/etcd/raft"
"github.com/coreos/etcd/raft/raftpb"
"github.com/docker/swarmkit/api"
"github.com/docker/swarmkit/manager/health"
"github.com/docker/swarmkit/manager/state/raft/membership"

"google.golang.org/grpc"
)

type snapshotReport struct {
id uint64
status raft.SnapshotStatus
}

type mockRaft struct {
lis net.Listener
s *grpc.Server
tr *Transport

cancel context.CancelFunc

nodeRemovedSignal chan struct{}

removed map[uint64]bool

processedMessages chan *raftpb.Message
processedSnapshots chan snapshotReport

reportedUnreachables chan uint64
}

func newMockRaft(ctx context.Context) (*mockRaft, error) {
l, err := net.Listen("tcp", "0.0.0.0:0")
if err != nil {
return nil, err
}
ctx, cancel := context.WithCancel(ctx)
mr := &mockRaft{
lis: l,
s: grpc.NewServer(),
cancel: cancel,
removed: make(map[uint64]bool),
nodeRemovedSignal: make(chan struct{}),
processedMessages: make(chan *raftpb.Message, 4096),
processedSnapshots: make(chan snapshotReport, 4096),
reportedUnreachables: make(chan uint64, 4096),
}
cfg := &Config{
SendTimeout: 2 * time.Second,
Raft: mr,
}
tr := New(ctx, cfg)
mr.tr = tr
hs := health.NewHealthServer()
hs.SetServingStatus("Raft", api.HealthCheckResponse_SERVING)
api.RegisterRaftServer(mr.s, mr)
api.RegisterHealthServer(mr.s, hs)
go mr.s.Serve(l)
return mr, nil
}

func (r *mockRaft) Addr() string {
return r.lis.Addr().String()
}

func (r *mockRaft) Stop() {
r.cancel()
r.s.Stop()
}

func (r *mockRaft) RemovePeer(id uint64) error {
r.removed[id] = true
return r.tr.RemovePeer(id)
}

func (r *mockRaft) ProcessRaftMessage(ctx context.Context, req *api.ProcessRaftMessageRequest) (*api.ProcessRaftMessageResponse, error) {
if r.removed[req.Message.From] {
return nil, membership.ErrMemberRemoved
}
r.processedMessages <- req.Message
return &api.ProcessRaftMessageResponse{}, nil
}

func (r *mockRaft) ResolveAddress(ctx context.Context, req *api.ResolveAddressRequest) (*api.ResolveAddressResponse, error) {
addr, err := r.tr.GetPeerAddr(req.RaftID)
if err != nil {
return nil, err
}
return &api.ResolveAddressResponse{
Addr: addr,
}, nil
}

func (r *mockRaft) ReportUnreachable(id uint64) {
r.reportedUnreachables <- id
}

func (r *mockRaft) IsIDRemoved(id uint64) bool {
return r.removed[id]
}

func (r *mockRaft) ReportSnapshot(id uint64, status raft.SnapshotStatus) {
r.processedSnapshots <- snapshotReport{
id: id,
status: status,
}
}

func (r *mockRaft) NodeRemoved() {
close(r.nodeRemovedSignal)
}

type mockCluster struct {
rafts map[uint64]*mockRaft
ctx context.Context
cancel context.CancelFunc
}

func newCluster(ctx context.Context) *mockCluster {
ctx, cancel := context.WithCancel(ctx)
return &mockCluster{
rafts: make(map[uint64]*mockRaft),
ctx: ctx,
cancel: cancel,
}
}

func (c *mockCluster) Stop() {
c.cancel()
for _, r := range c.rafts {
r.s.Stop()
}
}

func (c *mockCluster) Add(id uint64) error {
mr, err := newMockRaft(c.ctx)
if err != nil {
return err
}
for otherID, otherRaft := range c.rafts {
if err := mr.tr.AddPeer(c.ctx, otherID, otherRaft.Addr()); err != nil {
return err
}
if err := otherRaft.tr.AddPeer(c.ctx, id, mr.Addr()); err != nil {
return err
}
}
c.rafts[id] = mr
return nil
}

func (c *mockCluster) Get(id uint64) *mockRaft {
return c.rafts[id]
}
175 changes: 175 additions & 0 deletions manager/state/raft/transport/peer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
package transport

import (
"sync"
"time"

"golang.org/x/net/context"

"google.golang.org/grpc"

"github.com/coreos/etcd/raft"
"github.com/coreos/etcd/raft/raftpb"
"github.com/docker/swarmkit/api"
"github.com/docker/swarmkit/log"
"github.com/docker/swarmkit/manager/state/raft/membership"
"github.com/pkg/errors"
)

type peer struct {
id uint64

tr *Transport

msgc chan raftpb.Message

ctx context.Context
cancel context.CancelFunc
done chan struct{}

mu sync.Mutex
cc *grpc.ClientConn
addr string

active bool
becameActive time.Time
}

func newPeer(ctx context.Context, id uint64, addr string, tr *Transport) (*peer, error) {
cc, err := tr.dial(ctx, addr)
if err != nil {
return nil, errors.Wrapf(err, "failed to create conn for %x with addr %s", id, addr)
}
ctx, cancel := context.WithCancel(tr.ctx)
p := &peer{
id: id,
addr: addr,
cc: cc,
tr: tr,
ctx: ctx,
cancel: cancel,
msgc: make(chan raftpb.Message, 4096),
done: make(chan struct{}),
active: true,
becameActive: time.Now(),
}
go p.run(ctx)
return p, nil
}

func (p *peer) send(ctx context.Context, m raftpb.Message) (err error) {
defer func() {
if err != nil {
p.mu.Lock()
p.active = false
p.mu.Unlock()
}
}()
select {
case <-ctx.Done():
return ctx.Err()
case <-p.ctx.Done():
return p.ctx.Err()
default:
}
select {
case p.msgc <- m:
case <-ctx.Done():
return ctx.Err()
case <-p.ctx.Done():
return p.ctx.Err()
default:
p.tr.config.Raft.ReportUnreachable(m.To)
return errors.Errorf("peer is unreachable")
}
return nil
}

func (p *peer) update(ctx context.Context, addr string) error {
p.mu.Lock()
defer p.mu.Unlock()
if addr == p.addr {
return nil
}
cc, err := p.tr.dial(ctx, addr)
if err != nil {
return err
}
p.cc.Close()
p.cc = cc
p.addr = addr
return nil
}

func (p *peer) conn() *grpc.ClientConn {
p.mu.Lock()
defer p.mu.Unlock()
return p.cc
}

func (p *peer) address() string {
p.mu.Lock()
defer p.mu.Unlock()
return p.addr
}

func (p *peer) resolveAddr(ctx context.Context, id uint64) (string, error) {
resp, err := api.NewRaftClient(p.conn()).ResolveAddress(ctx, &api.ResolveAddressRequest{RaftID: id})
if err != nil {
return "", errors.Wrap(err, "failed to resolve address")
}
return resp.Addr, nil
}

func (p *peer) sendProcessMessage(ctx context.Context, m raftpb.Message) error {
if _, err := api.NewRaftClient(p.conn()).ProcessRaftMessage(ctx, &api.ProcessRaftMessageRequest{Message: &m}); err != nil {
p.mu.Lock()
p.active = false
p.mu.Unlock()
if m.Type == raftpb.MsgSnap {
p.tr.config.Raft.ReportSnapshot(m.To, raft.SnapshotFailure)
}
p.tr.config.Raft.ReportUnreachable(m.To)
if grpc.ErrorDesc(err) == membership.ErrMemberRemoved.Error() {
p.tr.config.Raft.NodeRemoved()
}
return err
}
if m.Type == raftpb.MsgSnap {
p.tr.config.Raft.ReportSnapshot(m.To, raft.SnapshotFinish)
}
p.mu.Lock()
if !p.active {
p.active = true
p.becameActive = time.Now()
}
p.mu.Unlock()
return nil
}

func (p *peer) run(ctx context.Context) {
defer func() {
close(p.done)
p.mu.Lock()
p.cc.Close()
p.mu.Unlock()
}()
for {
select {
case <-ctx.Done():
return
default:
}
select {
case m := <-p.msgc:
ctx, cancel := context.WithTimeout(ctx, p.tr.config.SendTimeout)
err := p.sendProcessMessage(ctx, m)
cancel()
if err != nil {
log.G(ctx).WithError(err).Errorf("failed to send message %s to peer %x", m.Type, m.To)
}
case <-ctx.Done():
return
}
}
}
Loading

0 comments on commit 542aad9

Please sign in to comment.