diff --git a/agent/agent.go b/agent/agent.go index 434bdd75f9..d8cc595474 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -336,12 +336,10 @@ func (a *Agent) handleSessionMessage(ctx context.Context, message *api.SessionMe seen := map[api.Peer]struct{}{} for _, manager := range message.Managers { if manager.Peer.Addr == "" { - log.G(ctx).WithField("manager.addr", manager.Peer.Addr). - Warnf("skipping bad manager address") continue } - a.config.Managers.Observe(*manager.Peer, int(manager.Weight)) + a.config.ConnBroker.Remotes().Observe(*manager.Peer, int(manager.Weight)) seen[*manager.Peer] = struct{}{} } @@ -358,9 +356,9 @@ func (a *Agent) handleSessionMessage(ctx context.Context, message *api.SessionMe } // prune managers not in list. - for peer := range a.config.Managers.Weights() { + for peer := range a.config.ConnBroker.Remotes().Weights() { if _, ok := seen[peer]; !ok { - a.config.Managers.Remove(peer) + a.config.ConnBroker.Remotes().Remove(peer) } } @@ -468,7 +466,7 @@ func (a *Agent) Publisher(ctx context.Context, subscriptionID string) (exec.LogP ) err = a.withSession(ctx, func(session *session) error { - publisher, err = api.NewLogBrokerClient(session.conn).PublishLogs(ctx) + publisher, err = api.NewLogBrokerClient(session.conn.ClientConn).PublishLogs(ctx) return err }) if err != nil { diff --git a/agent/agent_test.go b/agent/agent_test.go index 398a9887bd..55a641f799 100644 --- a/agent/agent_test.go +++ b/agent/agent_test.go @@ -8,6 +8,7 @@ import ( "github.com/docker/swarmkit/api" "github.com/docker/swarmkit/ca" "github.com/docker/swarmkit/ca/testutils" + "github.com/docker/swarmkit/connectionbroker" "github.com/docker/swarmkit/remotes" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -73,7 +74,7 @@ func TestAgentStartStop(t *testing.T) { agent, err := New(&Config{ Executor: &NoopExecutor{}, - Managers: remotes, + ConnBroker: connectionbroker.New(remotes), Credentials: agentSecurityConfig.ClientTLSCreds, DB: db, }) @@ -147,7 +148,7 @@ func agentTestEnv(t *testing.T) (*Agent, func()) { agent, err := New(&Config{ Executor: &NoopExecutor{}, - Managers: remotes, + ConnBroker: connectionbroker.New(remotes), Credentials: agentSecurityConfig.ClientTLSCreds, DB: db, }) diff --git a/agent/config.go b/agent/config.go index d62e15c8d5..de9359842e 100644 --- a/agent/config.go +++ b/agent/config.go @@ -4,7 +4,7 @@ import ( "github.com/boltdb/bolt" "github.com/docker/swarmkit/agent/exec" "github.com/docker/swarmkit/api" - "github.com/docker/swarmkit/remotes" + "github.com/docker/swarmkit/connectionbroker" "github.com/pkg/errors" "google.golang.org/grpc/credentials" ) @@ -14,9 +14,9 @@ type Config struct { // Hostname the name of host for agent instance. Hostname string - // Managers provides the manager backend used by the agent. It will be - // updated with managers weights as observed by the agent. - Managers remotes.Remotes + // ConnBroker provides a connection broker for retrieving gRPC + // connections to managers. + ConnBroker *connectionbroker.Broker // Executor specifies the executor to use for the agent. Executor exec.Executor diff --git a/agent/resource.go b/agent/resource.go index eca7564aa0..8e88d2cd65 100644 --- a/agent/resource.go +++ b/agent/resource.go @@ -30,7 +30,7 @@ type ResourceAllocator interface { func (r *resourceAllocator) AttachNetwork(ctx context.Context, id, target string, addresses []string) (string, error) { var taskID string if err := r.agent.withSession(ctx, func(session *session) error { - client := api.NewResourceAllocatorClient(session.conn) + client := api.NewResourceAllocatorClient(session.conn.ClientConn) r, err := client.AttachNetwork(ctx, &api.AttachNetworkRequest{ Config: &api.NetworkAttachmentConfig{ Target: target, @@ -53,7 +53,7 @@ func (r *resourceAllocator) AttachNetwork(ctx context.Context, id, target string // DetachNetwork deletes a network attachment. func (r *resourceAllocator) DetachNetwork(ctx context.Context, aID string) error { return r.agent.withSession(ctx, func(session *session) error { - client := api.NewResourceAllocatorClient(session.conn) + client := api.NewResourceAllocatorClient(session.conn.ClientConn) _, err := client.DetachNetwork(ctx, &api.DetachNetworkRequest{ AttachmentID: aID, }) diff --git a/agent/session.go b/agent/session.go index a8f657ffa3..fc1dca0db6 100644 --- a/agent/session.go +++ b/agent/session.go @@ -7,9 +7,9 @@ import ( "github.com/Sirupsen/logrus" "github.com/docker/swarmkit/api" + "github.com/docker/swarmkit/connectionbroker" "github.com/docker/swarmkit/log" "github.com/docker/swarmkit/protobuf/ptypes" - "github.com/docker/swarmkit/remotes" "golang.org/x/net/context" "google.golang.org/grpc" "google.golang.org/grpc/codes" @@ -30,8 +30,7 @@ var ( // flow into the agent, such as task assignment, are called back into the // agent through errs, messages and tasks. type session struct { - conn *grpc.ClientConn - addr string + conn *connectionbroker.Conn agent *Agent sessionID string @@ -61,12 +60,7 @@ func newSession(ctx context.Context, agent *Agent, delay time.Duration, sessionI // TODO(stevvooe): Need to move connection management up a level or create // independent connection for log broker client. - peer, err := agent.config.Managers.Select() - if err != nil { - s.errs <- err - return s - } - cc, err := grpc.Dial(peer.Addr, + cc, err := agent.config.ConnBroker.Select( grpc.WithTransportCredentials(agent.config.Credentials), grpc.WithTimeout(dispatcherRPCTimeout), ) @@ -74,7 +68,6 @@ func newSession(ctx context.Context, agent *Agent, delay time.Duration, sessionI s.errs <- err return s } - s.addr = peer.Addr s.conn = cc go s.run(ctx, delay, description) @@ -127,7 +120,7 @@ func (s *session) start(ctx context.Context, description *api.NodeDescription) e // Need to run Session in a goroutine since there's no way to set a // timeout for an individual Recv call in a stream. go func() { - client := api.NewDispatcherClient(s.conn) + client := api.NewDispatcherClient(s.conn.ClientConn) stream, err = client.Session(sessionCtx, &api.SessionRequest{ Description: description, @@ -160,7 +153,7 @@ func (s *session) start(ctx context.Context, description *api.NodeDescription) e func (s *session) heartbeat(ctx context.Context) error { log.G(ctx).Debugf("(*session).heartbeat") - client := api.NewDispatcherClient(s.conn) + client := api.NewDispatcherClient(s.conn.ClientConn) heartbeat := time.NewTimer(1) // send out a heartbeat right away defer heartbeat.Stop() @@ -224,7 +217,7 @@ func (s *session) logSubscriptions(ctx context.Context) error { log := log.G(ctx).WithFields(logrus.Fields{"method": "(*session).logSubscriptions"}) log.Debugf("") - client := api.NewLogBrokerClient(s.conn) + client := api.NewLogBrokerClient(s.conn.ClientConn) subscriptions, err := client.ListenSubscriptions(ctx, &api.ListenSubscriptionsRequest{}) if err != nil { return err @@ -269,7 +262,7 @@ func (s *session) watch(ctx context.Context) error { err error ) - client := api.NewDispatcherClient(s.conn) + client := api.NewDispatcherClient(s.conn.ClientConn) for { // If this is the first time we're running the loop, or there was a reference mismatch // attempt to get the assignmentWatch @@ -344,7 +337,7 @@ func (s *session) watch(ctx context.Context) error { // sendTaskStatus uses the current session to send the status of a single task. func (s *session) sendTaskStatus(ctx context.Context, taskID string, status *api.TaskStatus) error { - client := api.NewDispatcherClient(s.conn) + client := api.NewDispatcherClient(s.conn.ClientConn) if _, err := client.UpdateTaskStatus(ctx, &api.UpdateTaskStatusRequest{ SessionID: s.sessionID, Updates: []*api.UpdateTaskStatusRequest_TaskStatusUpdate{ @@ -385,7 +378,7 @@ func (s *session) sendTaskStatuses(ctx context.Context, updates ...*api.UpdateTa return updates, ctx.Err() } - client := api.NewDispatcherClient(s.conn) + client := api.NewDispatcherClient(s.conn.ClientConn) n := batchSize if len(updates) < n { @@ -416,8 +409,7 @@ func (s *session) sendError(err error) { func (s *session) close() error { s.closeOnce.Do(func() { if s.conn != nil { - s.agent.config.Managers.ObserveIfExists(api.Peer{Addr: s.addr}, -remotes.DefaultObservationWeight) - s.conn.Close() + s.conn.Close(false) } close(s.closed) diff --git a/ca/certificates.go b/ca/certificates.go index 85b8bb9a6e..52537c89e4 100644 --- a/ca/certificates.go +++ b/ca/certificates.go @@ -22,8 +22,8 @@ import ( "github.com/cloudflare/cfssl/signer/local" "github.com/docker/go-events" "github.com/docker/swarmkit/api" + "github.com/docker/swarmkit/connectionbroker" "github.com/docker/swarmkit/ioutils" - "github.com/docker/swarmkit/remotes" "github.com/opencontainers/go-digest" "github.com/pkg/errors" "golang.org/x/net/context" @@ -169,6 +169,15 @@ func (rca *RootCA) RequestAndSaveNewCertificates(ctx context.Context, kw KeyWrit if err == nil { break } + + // If the first attempt fails, we should try a remote + // connection. The local node may be a manager that was + // demoted, so the local connection (which is preferred) may + // not work. If we are successful in renewing the certificate, + // the local connection will not be returned by the connection + // broker anymore. + config.ForceRemote = true + } if err != nil { return nil, err @@ -202,7 +211,7 @@ func (rca *RootCA) RequestAndSaveNewCertificates(ctx context.Context, kw KeyWrit var kekUpdate *KEKData for i := 0; i < 5; i++ { - kekUpdate, err = rca.getKEKUpdate(ctx, X509Cert, tlsKeyPair, config.Remotes) + kekUpdate, err = rca.getKEKUpdate(ctx, X509Cert, tlsKeyPair, config.ConnBroker) if err == nil { break } @@ -218,7 +227,7 @@ func (rca *RootCA) RequestAndSaveNewCertificates(ctx context.Context, kw KeyWrit return &tlsKeyPair, nil } -func (rca *RootCA) getKEKUpdate(ctx context.Context, cert *x509.Certificate, keypair tls.Certificate, r remotes.Remotes) (*KEKData, error) { +func (rca *RootCA) getKEKUpdate(ctx context.Context, cert *x509.Certificate, keypair tls.Certificate, connBroker *connectionbroker.Broker) (*KEKData, error) { var managerRole bool for _, ou := range cert.Subject.OrganizationalUnit { if ou == ManagerRole { @@ -229,25 +238,25 @@ func (rca *RootCA) getKEKUpdate(ctx context.Context, cert *x509.Certificate, key if managerRole { mtlsCreds := credentials.NewTLS(&tls.Config{ServerName: CARole, RootCAs: rca.Pool, Certificates: []tls.Certificate{keypair}}) - conn, peer, err := getGRPCConnection(mtlsCreds, r) + conn, err := getGRPCConnection(mtlsCreds, connBroker, false) if err != nil { return nil, err } - defer conn.Close() - client := api.NewCAClient(conn) + client := api.NewCAClient(conn.ClientConn) ctx, cancel := context.WithTimeout(ctx, 5*time.Second) defer cancel() response, err := client.GetUnlockKey(ctx, &api.GetUnlockKeyRequest{}) if err != nil { if grpc.Code(err) == codes.Unimplemented { // if the server does not support keks, return as if no encryption key was specified + conn.Close(true) return &KEKData{}, nil } - r.Observe(peer, -remotes.DefaultObservationWeight) + conn.Close(false) return nil, err } - r.Observe(peer, remotes.DefaultObservationWeight) + conn.Close(true) return &KEKData{KEK: response.UnlockKey, Version: response.Version.Index}, nil } @@ -440,45 +449,33 @@ func GetLocalRootCA(paths CertPaths) (RootCA, error) { return NewRootCA(cert, key, DefaultNodeCertExpiration) } -func getGRPCConnection(creds credentials.TransportCredentials, r remotes.Remotes) (*grpc.ClientConn, api.Peer, error) { - peer, err := r.Select() - if err != nil { - return nil, api.Peer{}, err - } - - opts := []grpc.DialOption{ +func getGRPCConnection(creds credentials.TransportCredentials, connBroker *connectionbroker.Broker, forceRemote bool) (*connectionbroker.Conn, error) { + dialOpts := []grpc.DialOption{ grpc.WithTransportCredentials(creds), grpc.WithTimeout(5 * time.Second), grpc.WithBackoffMaxDelay(5 * time.Second), } - - conn, err := grpc.Dial(peer.Addr, opts...) - if err != nil { - return nil, api.Peer{}, err + if forceRemote { + return connBroker.SelectRemote(dialOpts...) } - return conn, peer, nil + return connBroker.Select(dialOpts...) } // GetRemoteCA returns the remote endpoint's CA certificate -func GetRemoteCA(ctx context.Context, d digest.Digest, r remotes.Remotes) (RootCA, error) { +func GetRemoteCA(ctx context.Context, d digest.Digest, connBroker *connectionbroker.Broker) (RootCA, error) { // This TLS Config is intentionally using InsecureSkipVerify. We use the // digest instead to check the integrity of the CA certificate. insecureCreds := credentials.NewTLS(&tls.Config{InsecureSkipVerify: true}) - conn, peer, err := getGRPCConnection(insecureCreds, r) + conn, err := getGRPCConnection(insecureCreds, connBroker, false) if err != nil { return RootCA{}, err } - defer conn.Close() - client := api.NewCAClient(conn) + client := api.NewCAClient(conn.ClientConn) ctx, cancel := context.WithTimeout(ctx, 5*time.Second) defer cancel() defer func() { - if err != nil { - r.Observe(peer, -remotes.DefaultObservationWeight) - return - } - r.Observe(peer, remotes.DefaultObservationWeight) + conn.Close(err == nil) }() response, err := client.GetRootCACertificate(ctx, &api.GetRootCACertificateRequest{}) if err != nil { @@ -558,20 +555,22 @@ func GetRemoteSignedCertificate(ctx context.Context, csr []byte, rootCAPool *x50 creds = credentials.NewTLS(&tls.Config{ServerName: CARole, RootCAs: rootCAPool}) } - conn, peer, err := getGRPCConnection(creds, config.Remotes) + conn, err := getGRPCConnection(creds, config.ConnBroker, config.ForceRemote) if err != nil { return nil, err } - defer conn.Close() // Create a CAClient to retrieve a new Certificate - caClient := api.NewNodeCAClient(conn) + caClient := api.NewNodeCAClient(conn.ClientConn) + + issueCtx, issueCancel := context.WithTimeout(ctx, 5*time.Second) + defer issueCancel() // Send the Request and retrieve the request token issueRequest := &api.IssueNodeCertificateRequest{CSR: csr, Token: config.Token, Availability: config.Availability} - issueResponse, err := caClient.IssueNodeCertificate(ctx, issueRequest) + issueResponse, err := caClient.IssueNodeCertificate(issueCtx, issueRequest) if err != nil { - config.Remotes.Observe(peer, -remotes.DefaultObservationWeight) + conn.Close(false) return nil, err } @@ -589,13 +588,14 @@ func GetRemoteSignedCertificate(ctx context.Context, csr []byte, rootCAPool *x50 defer cancel() statusResponse, err := caClient.NodeCertificateStatus(ctx, statusRequest) if err != nil { - config.Remotes.Observe(peer, -remotes.DefaultObservationWeight) + conn.Close(false) return nil, err } // If the certificate was issued, return if statusResponse.Status.State == api.IssuanceStateIssued { if statusResponse.Certificate == nil { + conn.Close(false) return nil, errors.New("no certificate in CertificateStatus response") } @@ -605,7 +605,7 @@ func GetRemoteSignedCertificate(ctx context.Context, csr []byte, rootCAPool *x50 // retry until the certificate gets updated per our // current request. if bytes.Equal(statusResponse.Certificate.CSR, csr) { - config.Remotes.Observe(peer, remotes.DefaultObservationWeight) + conn.Close(true) return statusResponse.Certificate.Certificate, nil } } diff --git a/ca/certificates_test.go b/ca/certificates_test.go index 0d234397fa..449e98ebb3 100644 --- a/ca/certificates_test.go +++ b/ca/certificates_test.go @@ -258,7 +258,7 @@ func TestGetRemoteCA(t *testing.T) { d, err := digest.Parse("sha256:" + mdStr) assert.NoError(t, err) - cert, err := ca.GetRemoteCA(tc.Context, d, tc.Remotes) + cert, err := ca.GetRemoteCA(tc.Context, d, tc.ConnBroker) assert.NoError(t, err) assert.NotNil(t, cert) } @@ -276,7 +276,7 @@ func TestGetRemoteCAInvalidHash(t *testing.T) { tc := testutils.NewTestCA(t) defer tc.Stop() - _, err := ca.GetRemoteCA(tc.Context, "sha256:2d2f968475269f0dde5299427cf74348ee1d6115b95c6e3f283e5a4de8da445b", tc.Remotes) + _, err := ca.GetRemoteCA(tc.Context, "sha256:2d2f968475269f0dde5299427cf74348ee1d6115b95c6e3f283e5a4de8da445b", tc.ConnBroker) assert.Error(t, err) } @@ -288,8 +288,8 @@ func TestRequestAndSaveNewCertificates(t *testing.T) { rca := ca.RootCA{Cert: tc.RootCA.Cert, Pool: tc.RootCA.Pool} cert, err := rca.RequestAndSaveNewCertificates(tc.Context, tc.KeyReadWriter, ca.CertificateRequestConfig{ - Token: tc.ManagerToken, - Remotes: tc.Remotes, + Token: tc.ManagerToken, + ConnBroker: tc.ConnBroker, }) assert.NoError(t, err) assert.NotNil(t, cert) @@ -306,8 +306,8 @@ func TestRequestAndSaveNewCertificates(t *testing.T) { // the worker token is also unencrypted cert, err = rca.RequestAndSaveNewCertificates(tc.Context, tc.KeyReadWriter, ca.CertificateRequestConfig{ - Token: tc.WorkerToken, - Remotes: tc.Remotes, + Token: tc.WorkerToken, + ConnBroker: tc.ConnBroker, }) assert.NoError(t, err) assert.NotNil(t, cert) @@ -330,8 +330,8 @@ func TestRequestAndSaveNewCertificates(t *testing.T) { _, err = rca.RequestAndSaveNewCertificates(tc.Context, tc.KeyReadWriter, ca.CertificateRequestConfig{ - Token: tc.ManagerToken, - Remotes: tc.Remotes, + Token: tc.ManagerToken, + ConnBroker: tc.ConnBroker, }) assert.NoError(t, err) @@ -345,8 +345,8 @@ func TestRequestAndSaveNewCertificates(t *testing.T) { // if it's a worker though, the key is always unencrypted, even though the manager key is encrypted _, err = rca.RequestAndSaveNewCertificates(tc.Context, tc.KeyReadWriter, ca.CertificateRequestConfig{ - Token: tc.WorkerToken, - Remotes: tc.Remotes, + Token: tc.WorkerToken, + ConnBroker: tc.ConnBroker, }) assert.NoError(t, err) _, _, err = unencryptedKeyReader.Read() @@ -412,8 +412,8 @@ func TestGetRemoteSignedCertificate(t *testing.T) { certs, err := ca.GetRemoteSignedCertificate(context.Background(), csr, tc.RootCA.Pool, ca.CertificateRequestConfig{ - Token: tc.ManagerToken, - Remotes: tc.Remotes, + Token: tc.ManagerToken, + ConnBroker: tc.ConnBroker, }) assert.NoError(t, err) assert.NotNil(t, certs) @@ -429,8 +429,8 @@ func TestGetRemoteSignedCertificate(t *testing.T) { // Test the expiration for an worker certificate certs, err = ca.GetRemoteSignedCertificate(tc.Context, csr, tc.RootCA.Pool, ca.CertificateRequestConfig{ - Token: tc.WorkerToken, - Remotes: tc.Remotes, + Token: tc.WorkerToken, + ConnBroker: tc.ConnBroker, }) assert.NoError(t, err) assert.NotNil(t, certs) @@ -452,8 +452,8 @@ func TestGetRemoteSignedCertificateNodeInfo(t *testing.T) { cert, err := ca.GetRemoteSignedCertificate(context.Background(), csr, tc.RootCA.Pool, ca.CertificateRequestConfig{ - Token: tc.WorkerToken, - Remotes: tc.Remotes, + Token: tc.WorkerToken, + ConnBroker: tc.ConnBroker, }) assert.NoError(t, err) assert.NotNil(t, cert) @@ -476,8 +476,8 @@ func TestGetRemoteSignedCertificateWithPending(t *testing.T) { go func() { _, err := ca.GetRemoteSignedCertificate(context.Background(), csr, tc.RootCA.Pool, ca.CertificateRequestConfig{ - Token: tc.WorkerToken, - Remotes: tc.Remotes, + Token: tc.WorkerToken, + ConnBroker: tc.ConnBroker, }) completed <- err }() diff --git a/ca/config.go b/ca/config.go index 1afa536946..d2664bd635 100644 --- a/ca/config.go +++ b/ca/config.go @@ -16,9 +16,9 @@ import ( "github.com/Sirupsen/logrus" cfconfig "github.com/cloudflare/cfssl/config" "github.com/docker/swarmkit/api" + "github.com/docker/swarmkit/connectionbroker" "github.com/docker/swarmkit/identity" "github.com/docker/swarmkit/log" - "github.com/docker/swarmkit/remotes" "github.com/opencontainers/go-digest" "github.com/pkg/errors" "google.golang.org/grpc/credentials" @@ -200,7 +200,7 @@ func getCAHashFromToken(token string) (digest.Digest, error) { } // DownloadRootCA tries to retrieve a remote root CA and matches the digest against the provided token. -func DownloadRootCA(ctx context.Context, paths CertPaths, token string, r remotes.Remotes) (RootCA, error) { +func DownloadRootCA(ctx context.Context, paths CertPaths, token string, connBroker *connectionbroker.Broker) (RootCA, error) { var rootCA RootCA // Get a digest for the optional CA hash string that we've been provided // If we were provided a non-empty string, and it is an invalid hash, return @@ -221,7 +221,7 @@ func DownloadRootCA(ctx context.Context, paths CertPaths, token string, r remote // just been demoted, for example). for i := 0; i != 5; i++ { - rootCA, err = GetRemoteCA(ctx, d, r) + rootCA, err = GetRemoteCA(ctx, d, connBroker) if err == nil { break } @@ -313,11 +313,16 @@ type CertificateRequestConfig struct { Token string // Availability allows a user to control the current scheduling status of a node Availability api.NodeSpec_Availability - // Remotes is the set of remote CAs. - Remotes remotes.Remotes + // ConnBroker provides connections to CAs. + ConnBroker *connectionbroker.Broker // Credentials provides transport credentials for communicating with the // remote server. Credentials credentials.TransportCredentials + // ForceRemote specifies that only a remote (TCP) connection should + // be used to request the certificate. This may be necessary in cases + // where the local node is running a manager, but is in the process of + // being demoted. + ForceRemote bool } // CreateSecurityConfig creates a new key and cert for this node, either locally @@ -380,7 +385,7 @@ func (rootCA RootCA) CreateSecurityConfig(ctx context.Context, krw *KeyReadWrite // RenewTLSConfigNow gets a new TLS cert and key, and updates the security config if provided. This is similar to // RenewTLSConfig, except while that monitors for expiry, and periodically renews, this renews once and is blocking -func RenewTLSConfigNow(ctx context.Context, s *SecurityConfig, r remotes.Remotes) error { +func RenewTLSConfigNow(ctx context.Context, s *SecurityConfig, connBroker *connectionbroker.Broker) error { s.renewalMu.Lock() defer s.renewalMu.Unlock() @@ -395,7 +400,7 @@ func RenewTLSConfigNow(ctx context.Context, s *SecurityConfig, r remotes.Remotes tlsKeyPair, err := rootCA.RequestAndSaveNewCertificates(ctx, s.KeyWriter(), CertificateRequestConfig{ - Remotes: r, + ConnBroker: connBroker, Credentials: s.ClientTLSCreds, }) if err != nil { @@ -437,7 +442,7 @@ func RenewTLSConfigNow(ctx context.Context, s *SecurityConfig, r remotes.Remotes // RenewTLSConfig will continuously monitor for the necessity of renewing the local certificates, either by // issuing them locally if key-material is available, or requesting them from a remote CA. -func RenewTLSConfig(ctx context.Context, s *SecurityConfig, remotes remotes.Remotes, renew <-chan struct{}) <-chan CertificateUpdate { +func RenewTLSConfig(ctx context.Context, s *SecurityConfig, connBroker *connectionbroker.Broker, renew <-chan struct{}) <-chan CertificateUpdate { updates := make(chan CertificateUpdate) go func() { @@ -501,7 +506,7 @@ func RenewTLSConfig(ctx context.Context, s *SecurityConfig, remotes remotes.Remo // ignore errors - it will just try again later var certUpdate CertificateUpdate - if err := RenewTLSConfigNow(ctx, s, remotes); err != nil { + if err := RenewTLSConfigNow(ctx, s, connBroker); err != nil { certUpdate.Err = err } else { certUpdate.Role = s.ClientTLSCreds.Role() diff --git a/ca/config_test.go b/ca/config_test.go index 6bcde60739..2e5bea9f46 100644 --- a/ca/config_test.go +++ b/ca/config_test.go @@ -25,7 +25,7 @@ func TestDownloadRootCASuccess(t *testing.T) { // Remove the CA cert os.RemoveAll(tc.Paths.RootCA.Cert) - rootCA, err := ca.DownloadRootCA(tc.Context, tc.Paths.RootCA, tc.WorkerToken, tc.Remotes) + rootCA, err := ca.DownloadRootCA(tc.Context, tc.Paths.RootCA, tc.WorkerToken, tc.ConnBroker) require.NoError(t, err) require.NotNil(t, rootCA.Pool) require.NotNil(t, rootCA.Cert) @@ -37,7 +37,7 @@ func TestDownloadRootCASuccess(t *testing.T) { os.RemoveAll(tc.Paths.RootCA.Cert) // downloading without a join token also succeeds - rootCA, err = ca.DownloadRootCA(tc.Context, tc.Paths.RootCA, "", tc.Remotes) + rootCA, err = ca.DownloadRootCA(tc.Context, tc.Paths.RootCA, "", tc.ConnBroker) require.NoError(t, err) require.NotNil(t, rootCA.Pool) require.NotNil(t, rootCA.Cert) @@ -54,7 +54,7 @@ func TestDownloadRootCAWrongCAHash(t *testing.T) { os.RemoveAll(tc.Paths.RootCA.Cert) // invalid token - _, err := ca.DownloadRootCA(tc.Context, tc.Paths.RootCA, "invalidtoken", tc.Remotes) + _, err := ca.DownloadRootCA(tc.Context, tc.Paths.RootCA, "invalidtoken", tc.ConnBroker) require.Error(t, err) require.Contains(t, err.Error(), "invalid join token") @@ -65,7 +65,7 @@ func TestDownloadRootCAWrongCAHash(t *testing.T) { os.RemoveAll(tc.Paths.RootCA.Cert) - _, err = ca.DownloadRootCA(tc.Context, tc.Paths.RootCA, replacementToken, tc.Remotes) + _, err = ca.DownloadRootCA(tc.Context, tc.Paths.RootCA, replacementToken, tc.ConnBroker) require.Error(t, err) require.Contains(t, err.Error(), "remote CA does not match fingerprint.") } @@ -79,8 +79,8 @@ func TestCreateSecurityConfigEmptyDir(t *testing.T) { krw := ca.NewKeyReadWriter(tc.Paths.Node, nil, nil) nodeConfig, err := tc.RootCA.CreateSecurityConfig(tc.Context, krw, ca.CertificateRequestConfig{ - Token: tc.WorkerToken, - Remotes: tc.Remotes, + Token: tc.WorkerToken, + ConnBroker: tc.ConnBroker, }) assert.NoError(t, err) assert.NotNil(t, nodeConfig) @@ -99,8 +99,8 @@ func TestCreateSecurityConfigNoCerts(t *testing.T) { krw := ca.NewKeyReadWriter(tc.Paths.Node, nil, nil) nodeConfig, err := tc.RootCA.CreateSecurityConfig(tc.Context, krw, ca.CertificateRequestConfig{ - Token: tc.WorkerToken, - Remotes: tc.Remotes, + Token: tc.WorkerToken, + ConnBroker: tc.ConnBroker, }) assert.NoError(t, err) assert.NotNil(t, nodeConfig) @@ -115,8 +115,8 @@ func TestCreateSecurityConfigNoCerts(t *testing.T) { assert.NoError(t, err) nodeConfig, err = rootCA.CreateSecurityConfig(tc.Context, krw, ca.CertificateRequestConfig{ - Token: tc.WorkerToken, - Remotes: tc.Remotes, + Token: tc.WorkerToken, + ConnBroker: tc.ConnBroker, }) assert.NoError(t, err) assert.NotNil(t, nodeConfig) @@ -141,7 +141,7 @@ some random garbage\n nodeConfig, err := tc.RootCA.CreateSecurityConfig(tc.Context, krw, ca.CertificateRequestConfig{ - Remotes: tc.Remotes, + ConnBroker: tc.ConnBroker, }) assert.NoError(t, err) @@ -167,7 +167,7 @@ some random garbage\n nodeConfig, err := tc.RootCA.CreateSecurityConfig(tc.Context, krw, ca.CertificateRequestConfig{ - Remotes: tc.Remotes, + ConnBroker: tc.ConnBroker, }) assert.NoError(t, err) assert.NotNil(t, nodeConfig) @@ -232,7 +232,7 @@ func TestRenewTLSConfigWorker(t *testing.T) { assert.NoError(t, err) renew := make(chan struct{}) - updates := ca.RenewTLSConfig(ctx, nodeConfig, tc.Remotes, renew) + updates := ca.RenewTLSConfig(ctx, nodeConfig, tc.ConnBroker, renew) select { case <-time.After(10 * time.Second): assert.Fail(t, "TestRenewTLSConfig timed-out") @@ -288,7 +288,7 @@ func TestRenewTLSConfigManager(t *testing.T) { // Get a new nodeConfig with a TLS cert that has 1 minute to live renew := make(chan struct{}) - updates := ca.RenewTLSConfig(ctx, nodeConfig, tc.Remotes, renew) + updates := ca.RenewTLSConfig(ctx, nodeConfig, tc.ConnBroker, renew) select { case <-time.After(10 * time.Second): assert.Fail(t, "TestRenewTLSConfig timed-out") @@ -350,7 +350,7 @@ func TestRenewTLSConfigWithNoNode(t *testing.T) { assert.NoError(t, err) renew := make(chan struct{}) - updates := ca.RenewTLSConfig(ctx, nodeConfig, tc.Remotes, renew) + updates := ca.RenewTLSConfig(ctx, nodeConfig, tc.ConnBroker, renew) select { case <-time.After(10 * time.Second): assert.Fail(t, "TestRenewTLSConfig timed-out") @@ -374,7 +374,7 @@ func TestForceRenewTLSConfig(t *testing.T) { assert.NoError(t, err) renew := make(chan struct{}, 1) - updates := ca.RenewTLSConfig(ctx, nodeConfig, tc.Remotes, renew) + updates := ca.RenewTLSConfig(ctx, nodeConfig, tc.ConnBroker, renew) renew <- struct{}{} select { case <-time.After(10 * time.Second): diff --git a/ca/testutils/cautils.go b/ca/testutils/cautils.go index 8474a11c0f..bb161b75d6 100644 --- a/ca/testutils/cautils.go +++ b/ca/testutils/cautils.go @@ -18,6 +18,7 @@ import ( "github.com/cloudflare/cfssl/signer/local" "github.com/docker/swarmkit/api" "github.com/docker/swarmkit/ca" + "github.com/docker/swarmkit/connectionbroker" "github.com/docker/swarmkit/identity" "github.com/docker/swarmkit/ioutils" "github.com/docker/swarmkit/manager/state/store" @@ -45,7 +46,7 @@ type TestCA struct { Conns []*grpc.ClientConn WorkerToken string ManagerToken string - Remotes remotes.Remotes + ConnBroker *connectionbroker.Broker KeyReadWriter *ca.KeyReadWriter } @@ -199,7 +200,7 @@ func NewTestCA(t *testing.T, krwGenerators ...func(ca.CertPaths) *ca.KeyReadWrit CAServer: caServer, WorkerToken: workerToken, ManagerToken: managerToken, - Remotes: remotes, + ConnBroker: connectionbroker.New(remotes), KeyReadWriter: krw, } } diff --git a/manager/manager.go b/manager/manager.go index f852d4f6a6..b1c65aa14b 100644 --- a/manager/manager.go +++ b/manager/manager.go @@ -1,6 +1,7 @@ package manager import ( + "crypto/tls" "crypto/x509" "encoding/pem" "fmt" @@ -16,6 +17,7 @@ import ( "github.com/docker/go-events" "github.com/docker/swarmkit/api" "github.com/docker/swarmkit/ca" + "github.com/docker/swarmkit/connectionbroker" "github.com/docker/swarmkit/log" "github.com/docker/swarmkit/manager/allocator" "github.com/docker/swarmkit/manager/controlapi" @@ -38,6 +40,7 @@ import ( "github.com/pkg/errors" "golang.org/x/net/context" "google.golang.org/grpc" + "google.golang.org/grpc/credentials" ) const ( @@ -557,9 +560,6 @@ func (m *Manager) updateKEK(ctx context.Context, cluster *api.Cluster) error { "node.role": ca.ManagerRole, }) - // we are our own peer from which we get certs - try to connect over the local socket - r := remotes.NewRemotes(api.Peer{Addr: m.Addr(), NodeID: nodeID}) - kekData := ca.KEKData{Version: cluster.Meta.Version.Index} for _, encryptionKey := range cluster.UnlockKeys { if encryptionKey.Subsystem == ca.ManagerRole { @@ -579,8 +579,27 @@ func (m *Manager) updateKEK(ctx context.Context, cluster *api.Cluster) error { // a best effort attempt to update the TLS certificate - if it fails, it'll be updated the next time it renews; // don't wait because it might take a bit go func() { - if err := ca.RenewTLSConfigNow(ctx, securityConfig, r); err != nil { - logger.WithError(err).Errorf("failed to download new TLS certificate after locking the cluster") + insecureCreds := credentials.NewTLS(&tls.Config{InsecureSkipVerify: true}) + + conn, err := grpc.Dial( + m.config.ControlAPI, + grpc.WithTransportCredentials(insecureCreds), + grpc.WithDialer( + func(addr string, timeout time.Duration) (net.Conn, error) { + return xnet.DialTimeoutLocal(addr, timeout) + }), + ) + if err != nil { + logger.WithError(err).Error("failed to connect to local manager socket after locking the cluster") + return + } + + defer conn.Close() + + connBroker := connectionbroker.New(remotes.NewRemotes()) + connBroker.SetLocalConn(conn) + if err := ca.RenewTLSConfigNow(ctx, securityConfig, connBroker); err != nil { + logger.WithError(err).Error("failed to download new TLS certificate after locking the cluster") } }() } diff --git a/node/node.go b/node/node.go index cf1545c529..be776dc151 100644 --- a/node/node.go +++ b/node/node.go @@ -18,6 +18,7 @@ import ( "github.com/docker/swarmkit/agent/exec" "github.com/docker/swarmkit/api" "github.com/docker/swarmkit/ca" + "github.com/docker/swarmkit/connectionbroker" "github.com/docker/swarmkit/ioutils" "github.com/docker/swarmkit/log" "github.com/docker/swarmkit/manager" @@ -105,6 +106,7 @@ type Node struct { sync.RWMutex config *Config remotes *persistentRemotes + connBroker *connectionbroker.Broker role string roleCond *sync.Cond conn *grpc.ClientConn @@ -154,7 +156,6 @@ func New(c *Config) (*Node, error) { return nil, err } } - n := &Node{ remotes: newPersistentRemotes(stateFile, p...), role: ca.WorkerRole, @@ -174,6 +175,8 @@ func New(c *Config) (*Node, error) { } } + n.connBroker = connectionbroker.New(n.remotes) + n.roleCond = sync.NewCond(n.RLocker()) n.connCond = sync.NewCond(n.RLocker()) return n, nil @@ -261,7 +264,7 @@ func (n *Node) run(ctx context.Context) (err error) { } }() - updates := ca.RenewTLSConfig(ctx, securityConfig, n.remotes, forceCertRenewal) + updates := ca.RenewTLSConfig(ctx, securityConfig, n.connBroker, forceCertRenewal) go func() { for { select { @@ -368,17 +371,35 @@ func (n *Node) Err(ctx context.Context) error { } func (n *Node) runAgent(ctx context.Context, db *bolt.DB, creds credentials.TransportCredentials, ready chan<- struct{}) error { + waitCtx, waitCancel := context.WithCancel(ctx) + remotesCh := n.remotes.WaitSelect(ctx) + controlCh := n.ListenControlSocket(waitCtx) + +waitPeer: + for { + select { + case <-ctx.Done(): + break waitPeer + case <-remotesCh: + break waitPeer + case conn := <-controlCh: + if conn != nil { + break waitPeer + } + } + } + + waitCancel() + select { case <-ctx.Done(): - case <-n.remotes.WaitSelect(ctx): - } - if ctx.Err() != nil { return ctx.Err() + default: } a, err := agent.New(&agent.Config{ Hostname: n.config.Hostname, - Managers: n.remotes, + ConnBroker: n.connBroker, Executor: n.config.Executor, DB: db, NotifyNodeChange: n.notifyNodeChange, @@ -423,6 +444,7 @@ func (n *Node) setControlSocket(conn *grpc.ClientConn) { n.conn.Close() } n.conn = conn + n.connBroker.SetLocalConn(conn) n.connCond.Broadcast() n.Unlock() } @@ -447,15 +469,21 @@ func (n *Node) ListenControlSocket(ctx context.Context) <-chan *grpc.ClientConn defer close(done) defer n.RUnlock() for { - if ctx.Err() != nil { + select { + case <-ctx.Done(): return + default: } if conn == n.conn { n.connCond.Wait() continue } conn = n.conn - c <- conn + select { + case c <- conn: + case <-ctx.Done(): + return + } } }() return c @@ -532,7 +560,7 @@ func (n *Node) loadSecurityConfig(ctx context.Context) (*ca.SecurityConfig, erro } log.G(ctx).Debug("generated CA key and certificate") } else if err == ca.ErrNoLocalRootCA { // from previous error loading the root CA from disk - rootCA, err = ca.DownloadRootCA(ctx, paths.RootCA, n.config.JoinToken, n.remotes) + rootCA, err = ca.DownloadRootCA(ctx, paths.RootCA, n.config.JoinToken, n.connBroker) if err != nil { return nil, err } @@ -559,7 +587,7 @@ func (n *Node) loadSecurityConfig(ctx context.Context) (*ca.SecurityConfig, erro securityConfig, err = rootCA.CreateSecurityConfig(ctx, krw, ca.CertificateRequestConfig{ Token: n.config.JoinToken, Availability: n.config.Availability, - Remotes: n.remotes, + ConnBroker: n.connBroker, }) if err != nil { @@ -687,22 +715,6 @@ func (n *Node) runManager(ctx context.Context, securityConfig *ca.SecurityConfig go n.initManagerConnection(connCtx, ready) - // this happens only on initial start - if ready != nil { - go func(ready chan struct{}) { - select { - case <-ready: - addr, err := n.RemoteAPIAddr() - if err != nil { - log.G(ctx).WithError(err).Errorf("get remote api addr") - } else { - n.remotes.Observe(api.Peer{NodeID: n.NodeID(), Addr: addr}, remotes.DefaultObservationWeight) - } - case <-connCtx.Done(): - } - }(ready) - } - // wait for manager stop or for role change select { case <-done: diff --git a/node/node_test.go b/node/node_test.go index ef74912ef5..8ec421675b 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -101,7 +101,7 @@ func TestLoadSecurityConfigLoadFromDisk(t *testing.T) { tc := cautils.NewTestCA(t) defer tc.Stop() - peer, err := tc.Remotes.Select() + peer, err := tc.ConnBroker.Remotes().Select() require.NoError(t, err) // Load successfully with valid passphrase @@ -169,7 +169,7 @@ func TestLoadSecurityConfigDownloadAllCerts(t *testing.T) { tc := cautils.NewTestCA(t) defer tc.Stop() - peer, err := tc.Remotes.Select() + peer, err := tc.ConnBroker.Remotes().Select() require.NoError(t, err) node, err = New(&Config{