From dae7cfbb814f4da925d04717093b3b4569613dbd Mon Sep 17 00:00:00 2001 From: Alexander Morozov Date: Wed, 10 Aug 2016 10:12:26 -0700 Subject: [PATCH] manager: use leader picker only for controlapi Signed-off-by: Alexander Morozov --- api/ca.pb.go | 8 +- api/control.pb.go | 4 +- api/dispatcher.pb.go | 4 +- api/health.pb.go | 4 +- api/raft.pb.go | 8 +- manager/controlapi/hackpicker/cluster.go | 12 ++ manager/controlapi/hackpicker/raftpicker.go | 141 +++++++++++++++++++ manager/manager.go | 14 +- manager/raftpicker/raftpicker.go | 7 + protobuf/plugin/raftproxy/raftproxy.go | 4 +- protobuf/plugin/raftproxy/test/service.pb.go | 4 +- 11 files changed, 191 insertions(+), 19 deletions(-) create mode 100644 manager/controlapi/hackpicker/cluster.go create mode 100644 manager/controlapi/hackpicker/raftpicker.go diff --git a/api/ca.pb.go b/api/ca.pb.go index 1f5bf3e1bc70b..0fb608c9658fb 100644 --- a/api/ca.pb.go +++ b/api/ca.pb.go @@ -668,12 +668,12 @@ func encodeVarintCa(data []byte, offset int, v uint64) int { type raftProxyCAServer struct { local CAServer - connSelector *raftpicker.ConnSelector + connSelector raftpicker.Interface cluster raftpicker.RaftCluster ctxMods []func(context.Context) (context.Context, error) } -func NewRaftProxyCAServer(local CAServer, connSelector *raftpicker.ConnSelector, cluster raftpicker.RaftCluster, ctxMod func(context.Context) (context.Context, error)) CAServer { +func NewRaftProxyCAServer(local CAServer, connSelector raftpicker.Interface, cluster raftpicker.RaftCluster, ctxMod func(context.Context) (context.Context, error)) CAServer { redirectChecker := func(ctx context.Context) (context.Context, error) { s, ok := transport.StreamFromContext(ctx) if !ok { @@ -742,12 +742,12 @@ func (p *raftProxyCAServer) GetRootCACertificate(ctx context.Context, r *GetRoot type raftProxyNodeCAServer struct { local NodeCAServer - connSelector *raftpicker.ConnSelector + connSelector raftpicker.Interface cluster raftpicker.RaftCluster ctxMods []func(context.Context) (context.Context, error) } -func NewRaftProxyNodeCAServer(local NodeCAServer, connSelector *raftpicker.ConnSelector, cluster raftpicker.RaftCluster, ctxMod func(context.Context) (context.Context, error)) NodeCAServer { +func NewRaftProxyNodeCAServer(local NodeCAServer, connSelector raftpicker.Interface, cluster raftpicker.RaftCluster, ctxMod func(context.Context) (context.Context, error)) NodeCAServer { redirectChecker := func(ctx context.Context) (context.Context, error) { s, ok := transport.StreamFromContext(ctx) if !ok { diff --git a/api/control.pb.go b/api/control.pb.go index 4d66435638979..f634ca5f5069c 100644 --- a/api/control.pb.go +++ b/api/control.pb.go @@ -4239,12 +4239,12 @@ func encodeVarintControl(data []byte, offset int, v uint64) int { type raftProxyControlServer struct { local ControlServer - connSelector *raftpicker.ConnSelector + connSelector raftpicker.Interface cluster raftpicker.RaftCluster ctxMods []func(context.Context) (context.Context, error) } -func NewRaftProxyControlServer(local ControlServer, connSelector *raftpicker.ConnSelector, cluster raftpicker.RaftCluster, ctxMod func(context.Context) (context.Context, error)) ControlServer { +func NewRaftProxyControlServer(local ControlServer, connSelector raftpicker.Interface, cluster raftpicker.RaftCluster, ctxMod func(context.Context) (context.Context, error)) ControlServer { redirectChecker := func(ctx context.Context) (context.Context, error) { s, ok := transport.StreamFromContext(ctx) if !ok { diff --git a/api/dispatcher.pb.go b/api/dispatcher.pb.go index face317f975f9..edaffda232d0e 100644 --- a/api/dispatcher.pb.go +++ b/api/dispatcher.pb.go @@ -1072,12 +1072,12 @@ func encodeVarintDispatcher(data []byte, offset int, v uint64) int { type raftProxyDispatcherServer struct { local DispatcherServer - connSelector *raftpicker.ConnSelector + connSelector raftpicker.Interface cluster raftpicker.RaftCluster ctxMods []func(context.Context) (context.Context, error) } -func NewRaftProxyDispatcherServer(local DispatcherServer, connSelector *raftpicker.ConnSelector, cluster raftpicker.RaftCluster, ctxMod func(context.Context) (context.Context, error)) DispatcherServer { +func NewRaftProxyDispatcherServer(local DispatcherServer, connSelector raftpicker.Interface, cluster raftpicker.RaftCluster, ctxMod func(context.Context) (context.Context, error)) DispatcherServer { redirectChecker := func(ctx context.Context) (context.Context, error) { s, ok := transport.StreamFromContext(ctx) if !ok { diff --git a/api/health.pb.go b/api/health.pb.go index 693836637e8b9..604535307c9f1 100644 --- a/api/health.pb.go +++ b/api/health.pb.go @@ -319,12 +319,12 @@ func encodeVarintHealth(data []byte, offset int, v uint64) int { type raftProxyHealthServer struct { local HealthServer - connSelector *raftpicker.ConnSelector + connSelector raftpicker.Interface cluster raftpicker.RaftCluster ctxMods []func(context.Context) (context.Context, error) } -func NewRaftProxyHealthServer(local HealthServer, connSelector *raftpicker.ConnSelector, cluster raftpicker.RaftCluster, ctxMod func(context.Context) (context.Context, error)) HealthServer { +func NewRaftProxyHealthServer(local HealthServer, connSelector raftpicker.Interface, cluster raftpicker.RaftCluster, ctxMod func(context.Context) (context.Context, error)) HealthServer { redirectChecker := func(ctx context.Context) (context.Context, error) { s, ok := transport.StreamFromContext(ctx) if !ok { diff --git a/api/raft.pb.go b/api/raft.pb.go index 0acd404d64e20..e3bc24b9e63f2 100644 --- a/api/raft.pb.go +++ b/api/raft.pb.go @@ -1438,12 +1438,12 @@ func encodeVarintRaft(data []byte, offset int, v uint64) int { type raftProxyRaftServer struct { local RaftServer - connSelector *raftpicker.ConnSelector + connSelector raftpicker.Interface cluster raftpicker.RaftCluster ctxMods []func(context.Context) (context.Context, error) } -func NewRaftProxyRaftServer(local RaftServer, connSelector *raftpicker.ConnSelector, cluster raftpicker.RaftCluster, ctxMod func(context.Context) (context.Context, error)) RaftServer { +func NewRaftProxyRaftServer(local RaftServer, connSelector raftpicker.Interface, cluster raftpicker.RaftCluster, ctxMod func(context.Context) (context.Context, error)) RaftServer { redirectChecker := func(ctx context.Context) (context.Context, error) { s, ok := transport.StreamFromContext(ctx) if !ok { @@ -1541,12 +1541,12 @@ func (p *raftProxyRaftServer) ResolveAddress(ctx context.Context, r *ResolveAddr type raftProxyRaftMembershipServer struct { local RaftMembershipServer - connSelector *raftpicker.ConnSelector + connSelector raftpicker.Interface cluster raftpicker.RaftCluster ctxMods []func(context.Context) (context.Context, error) } -func NewRaftProxyRaftMembershipServer(local RaftMembershipServer, connSelector *raftpicker.ConnSelector, cluster raftpicker.RaftCluster, ctxMod func(context.Context) (context.Context, error)) RaftMembershipServer { +func NewRaftProxyRaftMembershipServer(local RaftMembershipServer, connSelector raftpicker.Interface, cluster raftpicker.RaftCluster, ctxMod func(context.Context) (context.Context, error)) RaftMembershipServer { redirectChecker := func(ctx context.Context) (context.Context, error) { s, ok := transport.StreamFromContext(ctx) if !ok { diff --git a/manager/controlapi/hackpicker/cluster.go b/manager/controlapi/hackpicker/cluster.go new file mode 100644 index 0000000000000..60d9ba6cddbf6 --- /dev/null +++ b/manager/controlapi/hackpicker/cluster.go @@ -0,0 +1,12 @@ +package hackpicker + +// AddrSelector is interface which should track cluster for its leader address. +type AddrSelector interface { + LeaderAddr() (string, error) +} + +// RaftCluster is interface which combines useful methods for clustering. +type RaftCluster interface { + AddrSelector + IsLeader() bool +} diff --git a/manager/controlapi/hackpicker/raftpicker.go b/manager/controlapi/hackpicker/raftpicker.go new file mode 100644 index 0000000000000..baa11542f34ea --- /dev/null +++ b/manager/controlapi/hackpicker/raftpicker.go @@ -0,0 +1,141 @@ +// Package hackpicker is temporary solution to provide more seamless experience +// for controlapi. It has drawback of slow reaction to leader change, but it +// tracks leader automatically without erroring out to client. +package hackpicker + +import ( + "sync" + + "golang.org/x/net/context" + "google.golang.org/grpc" + "google.golang.org/grpc/transport" +) + +// picker always picks address of cluster leader. +type picker struct { + mu sync.Mutex + addr string + raft AddrSelector + conn *grpc.Conn + cc *grpc.ClientConn +} + +// Init does initial processing for the Picker, e.g., initiate some connections. +func (p *picker) Init(cc *grpc.ClientConn) error { + p.cc = cc + return nil +} + +func (p *picker) initConn() error { + if p.conn == nil { + conn, err := grpc.NewConn(p.cc) + if err != nil { + return err + } + p.conn = conn + } + return nil +} + +// Pick blocks until either a transport.ClientTransport is ready for the upcoming RPC +// or some error happens. +func (p *picker) Pick(ctx context.Context) (transport.ClientTransport, error) { + p.mu.Lock() + if err := p.initConn(); err != nil { + p.mu.Unlock() + return nil, err + } + p.mu.Unlock() + + addr, err := p.raft.LeaderAddr() + if err != nil { + return nil, err + } + p.mu.Lock() + if p.addr != addr { + p.addr = addr + p.conn.NotifyReset() + } + p.mu.Unlock() + return p.conn.Wait(ctx) +} + +// PickAddr picks a peer address for connecting. This will be called repeated for +// connecting/reconnecting. +func (p *picker) PickAddr() (string, error) { + addr, err := p.raft.LeaderAddr() + if err != nil { + return "", err + } + p.mu.Lock() + p.addr = addr + p.mu.Unlock() + return addr, nil +} + +// State returns the connectivity state of the underlying connections. +func (p *picker) State() (grpc.ConnectivityState, error) { + return p.conn.State(), nil +} + +// WaitForStateChange blocks until the state changes to something other than +// the sourceState. It returns the new state or error. +func (p *picker) WaitForStateChange(ctx context.Context, sourceState grpc.ConnectivityState) (grpc.ConnectivityState, error) { + return p.conn.WaitForStateChange(ctx, sourceState) +} + +// Reset the current connection and force a reconnect to another address. +func (p *picker) Reset() error { + p.conn.NotifyReset() + return nil +} + +// Close closes all the Conn's owned by this Picker. +func (p *picker) Close() error { + return p.conn.Close() +} + +// ConnSelector is struct for obtaining connection with raftpicker. +type ConnSelector struct { + mu sync.Mutex + cc *grpc.ClientConn + cluster RaftCluster + opts []grpc.DialOption +} + +// NewConnSelector returns new ConnSelector with cluster and grpc.DialOpts which +// will be used for Dial on first call of Conn. +func NewConnSelector(cluster RaftCluster, opts ...grpc.DialOption) *ConnSelector { + return &ConnSelector{ + cluster: cluster, + opts: opts, + } +} + +// Conn returns *grpc.ClientConn with picker which picks raft cluster leader. +// Internal connection estabilished lazily on this call. +// It can return error if cluster wasn't ready at the moment of initial call. +func (c *ConnSelector) Conn() (*grpc.ClientConn, error) { + c.mu.Lock() + defer c.mu.Unlock() + if c.cc != nil { + return c.cc, nil + } + addr, err := c.cluster.LeaderAddr() + if err != nil { + return nil, err + } + picker := &picker{raft: c.cluster, addr: addr} + opts := append(c.opts, grpc.WithPicker(picker)) + cc, err := grpc.Dial(addr, opts...) + if err != nil { + return nil, err + } + c.cc = cc + return c.cc, nil +} + +// Reset does nothing for hackpicker. +func (c *ConnSelector) Reset() error { + return nil +} diff --git a/manager/manager.go b/manager/manager.go index ba7263a2fdf50..957485ee7e21c 100644 --- a/manager/manager.go +++ b/manager/manager.go @@ -17,6 +17,7 @@ import ( "github.com/docker/swarmkit/log" "github.com/docker/swarmkit/manager/allocator" "github.com/docker/swarmkit/manager/controlapi" + "github.com/docker/swarmkit/manager/controlapi/hackpicker" "github.com/docker/swarmkit/manager/dispatcher" "github.com/docker/swarmkit/manager/health" "github.com/docker/swarmkit/manager/keymanager" @@ -442,6 +443,17 @@ func (m *Manager) Run(parent context.Context) error { cs := raftpicker.NewConnSelector(m.RaftNode, proxyOpts...) m.connSelector = cs + // We need special connSelector for controlapi because it provides automatic + // leader tracking. + // Other APIs are using connSelector which errors out on leader change, but + // allows to react quickly to reelections. + controlAPIProxyOpts := []grpc.DialOption{ + grpc.WithBackoffMaxDelay(time.Second), + grpc.WithTransportCredentials(m.config.SecurityConfig.ClientTLSCreds), + } + + controlAPIConnSelector := hackpicker.NewConnSelector(m.RaftNode, controlAPIProxyOpts...) + authorize := func(ctx context.Context, roles []string) error { // Authorize the remote roles, ensure they can only be forwarded by managers _, err := ca.AuthorizeForwardedRoleAndOrg(ctx, roles, []string{ca.ManagerRole}, m.config.SecurityConfig.ClientTLSCreds.Organization()) @@ -471,7 +483,7 @@ func (m *Manager) Run(parent context.Context) error { // this manager rather than forwarded requests (it has no TLS // information to put in the metadata map). forwardAsOwnRequest := func(ctx context.Context) (context.Context, error) { return ctx, nil } - localProxyControlAPI := api.NewRaftProxyControlServer(baseControlAPI, cs, m.RaftNode, forwardAsOwnRequest) + localProxyControlAPI := api.NewRaftProxyControlServer(baseControlAPI, controlAPIConnSelector, m.RaftNode, forwardAsOwnRequest) // Everything registered on m.server should be an authenticated // wrapper, or a proxy wrapping an authenticated wrapper! diff --git a/manager/raftpicker/raftpicker.go b/manager/raftpicker/raftpicker.go index 3322a39a6d9bf..03b75d097b19b 100644 --- a/manager/raftpicker/raftpicker.go +++ b/manager/raftpicker/raftpicker.go @@ -9,6 +9,13 @@ import ( "google.golang.org/grpc" ) +// Interface is interface to replace implementation with controlapi/hackpicker. +// TODO: it should be done cooler. +type Interface interface { + Conn() (*grpc.ClientConn, error) + Reset() error +} + // ConnSelector is struct for obtaining connection connected to cluster leader. type ConnSelector struct { mu sync.Mutex diff --git a/protobuf/plugin/raftproxy/raftproxy.go b/protobuf/plugin/raftproxy/raftproxy.go index 7af804910c0f3..dcfbd047b0610 100644 --- a/protobuf/plugin/raftproxy/raftproxy.go +++ b/protobuf/plugin/raftproxy/raftproxy.go @@ -40,14 +40,14 @@ func (g *raftProxyGen) Name() string { func (g *raftProxyGen) genProxyStruct(s *descriptor.ServiceDescriptorProto) { g.gen.P("type " + serviceTypeName(s) + " struct {") g.gen.P("\tlocal " + s.GetName() + "Server") - g.gen.P("\tconnSelector *raftpicker.ConnSelector") + g.gen.P("\tconnSelector raftpicker.Interface") g.gen.P("\tcluster raftpicker.RaftCluster") g.gen.P("\tctxMods []func(context.Context)(context.Context, error)") g.gen.P("}") } func (g *raftProxyGen) genProxyConstructor(s *descriptor.ServiceDescriptorProto) { - g.gen.P("func NewRaftProxy" + s.GetName() + "Server(local " + s.GetName() + "Server, connSelector *raftpicker.ConnSelector, cluster raftpicker.RaftCluster, ctxMod func(context.Context)(context.Context, error)) " + s.GetName() + "Server {") + g.gen.P("func NewRaftProxy" + s.GetName() + "Server(local " + s.GetName() + "Server, connSelector raftpicker.Interface, cluster raftpicker.RaftCluster, ctxMod func(context.Context)(context.Context, error)) " + s.GetName() + "Server {") g.gen.P(`redirectChecker := func(ctx context.Context)(context.Context, error) { s, ok := transport.StreamFromContext(ctx) if !ok { diff --git a/protobuf/plugin/raftproxy/test/service.pb.go b/protobuf/plugin/raftproxy/test/service.pb.go index ee4b5f433442c..e0887665b990c 100644 --- a/protobuf/plugin/raftproxy/test/service.pb.go +++ b/protobuf/plugin/raftproxy/test/service.pb.go @@ -724,12 +724,12 @@ func encodeVarintService(data []byte, offset int, v uint64) int { type raftProxyRouteGuideServer struct { local RouteGuideServer - connSelector *raftpicker.ConnSelector + connSelector raftpicker.Interface cluster raftpicker.RaftCluster ctxMods []func(context.Context) (context.Context, error) } -func NewRaftProxyRouteGuideServer(local RouteGuideServer, connSelector *raftpicker.ConnSelector, cluster raftpicker.RaftCluster, ctxMod func(context.Context) (context.Context, error)) RouteGuideServer { +func NewRaftProxyRouteGuideServer(local RouteGuideServer, connSelector raftpicker.Interface, cluster raftpicker.RaftCluster, ctxMod func(context.Context) (context.Context, error)) RouteGuideServer { redirectChecker := func(ctx context.Context) (context.Context, error) { s, ok := transport.StreamFromContext(ctx) if !ok {