diff --git a/connectionbroker/broker.go b/connectionbroker/broker.go new file mode 100644 index 0000000000..f22726f2b1 --- /dev/null +++ b/connectionbroker/broker.go @@ -0,0 +1,105 @@ +// Package connectionbroker is a layer on top of remotes that returns +// a gRPC connection to a manager. The connection may be a local connection +// using a local socket such as a UNIX socket. +package connectionbroker + +import ( + "sync" + + "github.com/docker/swarmkit/api" + "github.com/docker/swarmkit/remotes" + "google.golang.org/grpc" +) + +// Broker is a simple connection broker. It can either return a fresh +// connection to a remote manager selected with weighted randomization, or a +// local gRPC connection to the local manager. +type Broker struct { + mu sync.Mutex + remotes remotes.Remotes + localConn *grpc.ClientConn +} + +// New creates a new connection broker. +func New(remotes remotes.Remotes) *Broker { + return &Broker{ + remotes: remotes, + } +} + +// SetLocalConn changes the local gRPC connection used by the connection broker. +func (b *Broker) SetLocalConn(localConn *grpc.ClientConn) { + b.mu.Lock() + defer b.mu.Unlock() + + b.localConn = localConn +} + +// Select a manager from the set of available managers, and return a connection. +func (b *Broker) Select(dialOpts ...grpc.DialOption) (*Conn, error) { + b.mu.Lock() + localConn := b.localConn + b.mu.Unlock() + + if localConn != nil { + return &Conn{ + ClientConn: localConn, + isLocal: true, + }, nil + } + + return b.SelectRemote(dialOpts...) +} + +// SelectRemote chooses a manager from the remotes, and returns a TCP +// connection. +func (b *Broker) SelectRemote(dialOpts ...grpc.DialOption) (*Conn, error) { + peer, err := b.remotes.Select() + if err != nil { + return nil, err + } + + cc, err := grpc.Dial(peer.Addr, dialOpts...) + if err != nil { + b.remotes.ObserveIfExists(peer, -remotes.DefaultObservationWeight) + return nil, err + } + + return &Conn{ + ClientConn: cc, + remotes: b.remotes, + peer: peer, + }, nil +} + +// Remotes returns the remotes interface used by the broker, so the caller +// can make observations or see weights directly. +func (b *Broker) Remotes() remotes.Remotes { + return b.remotes +} + +// Conn is a wrapper around a gRPC client connection. +type Conn struct { + *grpc.ClientConn + isLocal bool + remotes remotes.Remotes + peer api.Peer +} + +// Close closes the client connection if it is a remote connection. It also +// records a positive experience with the remote peer if success is true, +// otherwise it records a negative experience. If a local connection is in use, +// Close is a noop. +func (c *Conn) Close(success bool) error { + if c.isLocal { + return nil + } + + if success { + c.remotes.ObserveIfExists(c.peer, -remotes.DefaultObservationWeight) + } else { + c.remotes.ObserveIfExists(c.peer, remotes.DefaultObservationWeight) + } + + return c.ClientConn.Close() +}