Skip to content

Commit

Permalink
Merge pull request moby#1359 from LK4D4/return_picker
Browse files Browse the repository at this point in the history
manager: use leader picker only for controlapi
  • Loading branch information
aaronlehmann authored Aug 10, 2016
2 parents 3eaa764 + dae7cfb commit ad6fb9e
Show file tree
Hide file tree
Showing 11 changed files with 191 additions and 19 deletions.
8 changes: 4 additions & 4 deletions api/ca.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions api/control.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions api/dispatcher.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions api/health.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 4 additions & 4 deletions api/raft.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 12 additions & 0 deletions manager/controlapi/hackpicker/cluster.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package hackpicker

// AddrSelector is interface which should track cluster for its leader address.
type AddrSelector interface {
LeaderAddr() (string, error)
}

// RaftCluster is interface which combines useful methods for clustering.
type RaftCluster interface {
AddrSelector
IsLeader() bool
}
141 changes: 141 additions & 0 deletions manager/controlapi/hackpicker/raftpicker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
// Package hackpicker is temporary solution to provide more seamless experience
// for controlapi. It has drawback of slow reaction to leader change, but it
// tracks leader automatically without erroring out to client.
package hackpicker

import (
"sync"

"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/transport"
)

// picker always picks address of cluster leader.
type picker struct {
mu sync.Mutex
addr string
raft AddrSelector
conn *grpc.Conn
cc *grpc.ClientConn
}

// Init does initial processing for the Picker, e.g., initiate some connections.
func (p *picker) Init(cc *grpc.ClientConn) error {
p.cc = cc
return nil
}

func (p *picker) initConn() error {
if p.conn == nil {
conn, err := grpc.NewConn(p.cc)
if err != nil {
return err
}
p.conn = conn
}
return nil
}

// Pick blocks until either a transport.ClientTransport is ready for the upcoming RPC
// or some error happens.
func (p *picker) Pick(ctx context.Context) (transport.ClientTransport, error) {
p.mu.Lock()
if err := p.initConn(); err != nil {
p.mu.Unlock()
return nil, err
}
p.mu.Unlock()

addr, err := p.raft.LeaderAddr()
if err != nil {
return nil, err
}
p.mu.Lock()
if p.addr != addr {
p.addr = addr
p.conn.NotifyReset()
}
p.mu.Unlock()
return p.conn.Wait(ctx)
}

// PickAddr picks a peer address for connecting. This will be called repeated for
// connecting/reconnecting.
func (p *picker) PickAddr() (string, error) {
addr, err := p.raft.LeaderAddr()
if err != nil {
return "", err
}
p.mu.Lock()
p.addr = addr
p.mu.Unlock()
return addr, nil
}

// State returns the connectivity state of the underlying connections.
func (p *picker) State() (grpc.ConnectivityState, error) {
return p.conn.State(), nil
}

// WaitForStateChange blocks until the state changes to something other than
// the sourceState. It returns the new state or error.
func (p *picker) WaitForStateChange(ctx context.Context, sourceState grpc.ConnectivityState) (grpc.ConnectivityState, error) {
return p.conn.WaitForStateChange(ctx, sourceState)
}

// Reset the current connection and force a reconnect to another address.
func (p *picker) Reset() error {
p.conn.NotifyReset()
return nil
}

// Close closes all the Conn's owned by this Picker.
func (p *picker) Close() error {
return p.conn.Close()
}

// ConnSelector is struct for obtaining connection with raftpicker.
type ConnSelector struct {
mu sync.Mutex
cc *grpc.ClientConn
cluster RaftCluster
opts []grpc.DialOption
}

// NewConnSelector returns new ConnSelector with cluster and grpc.DialOpts which
// will be used for Dial on first call of Conn.
func NewConnSelector(cluster RaftCluster, opts ...grpc.DialOption) *ConnSelector {
return &ConnSelector{
cluster: cluster,
opts: opts,
}
}

// Conn returns *grpc.ClientConn with picker which picks raft cluster leader.
// Internal connection estabilished lazily on this call.
// It can return error if cluster wasn't ready at the moment of initial call.
func (c *ConnSelector) Conn() (*grpc.ClientConn, error) {
c.mu.Lock()
defer c.mu.Unlock()
if c.cc != nil {
return c.cc, nil
}
addr, err := c.cluster.LeaderAddr()
if err != nil {
return nil, err
}
picker := &picker{raft: c.cluster, addr: addr}
opts := append(c.opts, grpc.WithPicker(picker))
cc, err := grpc.Dial(addr, opts...)
if err != nil {
return nil, err
}
c.cc = cc
return c.cc, nil
}

// Reset does nothing for hackpicker.
func (c *ConnSelector) Reset() error {
return nil
}
14 changes: 13 additions & 1 deletion manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/docker/swarmkit/log"
"github.com/docker/swarmkit/manager/allocator"
"github.com/docker/swarmkit/manager/controlapi"
"github.com/docker/swarmkit/manager/controlapi/hackpicker"
"github.com/docker/swarmkit/manager/dispatcher"
"github.com/docker/swarmkit/manager/health"
"github.com/docker/swarmkit/manager/keymanager"
Expand Down Expand Up @@ -442,6 +443,17 @@ func (m *Manager) Run(parent context.Context) error {
cs := raftpicker.NewConnSelector(m.RaftNode, proxyOpts...)
m.connSelector = cs

// We need special connSelector for controlapi because it provides automatic
// leader tracking.
// Other APIs are using connSelector which errors out on leader change, but
// allows to react quickly to reelections.
controlAPIProxyOpts := []grpc.DialOption{
grpc.WithBackoffMaxDelay(time.Second),
grpc.WithTransportCredentials(m.config.SecurityConfig.ClientTLSCreds),
}

controlAPIConnSelector := hackpicker.NewConnSelector(m.RaftNode, controlAPIProxyOpts...)

authorize := func(ctx context.Context, roles []string) error {
// Authorize the remote roles, ensure they can only be forwarded by managers
_, err := ca.AuthorizeForwardedRoleAndOrg(ctx, roles, []string{ca.ManagerRole}, m.config.SecurityConfig.ClientTLSCreds.Organization())
Expand Down Expand Up @@ -471,7 +483,7 @@ func (m *Manager) Run(parent context.Context) error {
// this manager rather than forwarded requests (it has no TLS
// information to put in the metadata map).
forwardAsOwnRequest := func(ctx context.Context) (context.Context, error) { return ctx, nil }
localProxyControlAPI := api.NewRaftProxyControlServer(baseControlAPI, cs, m.RaftNode, forwardAsOwnRequest)
localProxyControlAPI := api.NewRaftProxyControlServer(baseControlAPI, controlAPIConnSelector, m.RaftNode, forwardAsOwnRequest)

// Everything registered on m.server should be an authenticated
// wrapper, or a proxy wrapping an authenticated wrapper!
Expand Down
7 changes: 7 additions & 0 deletions manager/raftpicker/raftpicker.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,13 @@ import (
"google.golang.org/grpc"
)

// Interface is interface to replace implementation with controlapi/hackpicker.
// TODO: it should be done cooler.
type Interface interface {
Conn() (*grpc.ClientConn, error)
Reset() error
}

// ConnSelector is struct for obtaining connection connected to cluster leader.
type ConnSelector struct {
mu sync.Mutex
Expand Down
4 changes: 2 additions & 2 deletions protobuf/plugin/raftproxy/raftproxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,14 @@ func (g *raftProxyGen) Name() string {
func (g *raftProxyGen) genProxyStruct(s *descriptor.ServiceDescriptorProto) {
g.gen.P("type " + serviceTypeName(s) + " struct {")
g.gen.P("\tlocal " + s.GetName() + "Server")
g.gen.P("\tconnSelector *raftpicker.ConnSelector")
g.gen.P("\tconnSelector raftpicker.Interface")
g.gen.P("\tcluster raftpicker.RaftCluster")
g.gen.P("\tctxMods []func(context.Context)(context.Context, error)")
g.gen.P("}")
}

func (g *raftProxyGen) genProxyConstructor(s *descriptor.ServiceDescriptorProto) {
g.gen.P("func NewRaftProxy" + s.GetName() + "Server(local " + s.GetName() + "Server, connSelector *raftpicker.ConnSelector, cluster raftpicker.RaftCluster, ctxMod func(context.Context)(context.Context, error)) " + s.GetName() + "Server {")
g.gen.P("func NewRaftProxy" + s.GetName() + "Server(local " + s.GetName() + "Server, connSelector raftpicker.Interface, cluster raftpicker.RaftCluster, ctxMod func(context.Context)(context.Context, error)) " + s.GetName() + "Server {")
g.gen.P(`redirectChecker := func(ctx context.Context)(context.Context, error) {
s, ok := transport.StreamFromContext(ctx)
if !ok {
Expand Down
4 changes: 2 additions & 2 deletions protobuf/plugin/raftproxy/test/service.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit ad6fb9e

Please sign in to comment.