diff --git a/CHANGELOG.md b/CHANGELOG.md
index e23e1d43de9..f0a4f3066d4 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -2,6 +2,7 @@ BUG FIXES:
  * core: Fix search endpoint forwarding for multi-region clusters [[GH-3680](https://github.com/hashicorp/nomad/issues/3680)]
+ * core: Allow upgrading/downgrading TLS via SIGHUP on both servers and clients [[GH-3492](https://github.com/hashicorp/nomad/issues/3492)]
  * core: Fix an issue in which batch jobs with queued placements and lost allocations could result in improper placement counts [[GH-3717](https://github.com/hashicorp/nomad/issues/3717)]
  * config: Revert minimum CPU limit back to 20 from 100.
diff --git a/client/client.go b/client/client.go
index 1dca496ed92..314b9dda405 100644
--- a/client/client.go
+++ b/client/client.go
@@ -30,6 +30,7 @@ import (
 	"github.com/hashicorp/nomad/helper/uuid"
 	"github.com/hashicorp/nomad/nomad"
 	"github.com/hashicorp/nomad/nomad/structs"
+	nconfig "github.com/hashicorp/nomad/nomad/structs/config"
 	vaultapi "github.com/hashicorp/vault/api"
 	"github.com/mitchellh/hashstructure"
 	"github.com/shirou/gopsutil/host"
@@ -364,6 +365,34 @@ func (c *Client) init() error {
 	return nil
 }
 
+// reloadTLSConnections allows a client to reload its TLS configuration on the
+// fly
+func (c *Client) reloadTLSConnections(newConfig *nconfig.TLSConfig) error {
+	var tlsWrap tlsutil.RegionWrapper
+	if newConfig != nil && newConfig.EnableRPC {
+		tw, err := tlsutil.NewTLSConfiguration(newConfig).OutgoingTLSWrapper()
+		if err != nil {
+			return err
+		}
+		tlsWrap = tw
+	}
+
+	// Keep the client configuration up to date as we use configuration values to
+	// decide on what type of connections to accept
+	c.configLock.Lock()
+	c.config.TLSConfig = newConfig
+	c.configLock.Unlock()
+
+	c.connPool.ReloadTLS(tlsWrap)
+
+	return nil
+}
+
+// Reload allows a client to reload its configuration on the fly
+func (c *Client) Reload(newConfig *config.Config) error {
+	return c.reloadTLSConnections(newConfig.TLSConfig)
+}
+
 // Leave is used to prepare the client to leave the cluster
 func (c *Client) Leave() error {
 	// TODO
diff --git a/client/client_test.go b/client/client_test.go
index 3492557f7f9..95ff480d357 100644
--- a/client/client_test.go
+++ b/client/client_test.go
@@ -1001,3 +1001,156 @@ func TestClient_ValidateMigrateToken_ACLDisabled(t *testing.T) {
 
 	assert.Equal(c.ValidateMigrateToken("", ""), true)
 }
+
+func TestClient_ReloadTLS_UpgradePlaintextToTLS(t *testing.T) {
+	t.Parallel()
+	assert := assert.New(t)
+
+	s1, addr := testServer(t, func(c *nomad.Config) {
+		c.Region = "regionFoo"
+	})
+	defer s1.Shutdown()
+	testutil.WaitForLeader(t, s1.RPC)
+
+	const (
+		cafile  = "../helper/tlsutil/testdata/ca.pem"
+		foocert = "../helper/tlsutil/testdata/nomad-foo.pem"
+		fookey  = "../helper/tlsutil/testdata/nomad-foo-key.pem"
+	)
+
+	c1 := testClient(t, func(c *config.Config) {
+		c.Servers = []string{addr}
+	})
+	defer c1.Shutdown()
+
+	// Registering a node over plaintext should succeed
+	{
+		req := structs.NodeSpecificRequest{
+			NodeID:       c1.Node().ID,
+			QueryOptions: structs.QueryOptions{Region: "regionFoo"},
+		}
+
+		testutil.WaitForResult(func() (bool, error) {
+			var out structs.SingleNodeResponse
+			err := c1.RPC("Node.GetNode", &req, &out)
+			if err != nil {
+				return false, fmt.Errorf("client RPC failed when it should have succeeded:\n%+v", err)
+			}
+			return true, nil
+		},
+			func(err error) {
+				t.Fatalf(err.Error())
+			},
+		)
+	}
+
+	newConfig := &nconfig.TLSConfig{
+		EnableHTTP:           true,
+		EnableRPC:            true,
+		VerifyServerHostname: true,
+		CAFile:               cafile,
+		CertFile:             foocert,
+		KeyFile:              fookey,
+	}
+
+	err := c1.reloadTLSConnections(newConfig)
+	assert.Nil(err)
+
+	// Registering a node over plaintext should fail after the node has upgraded
+	// to TLS
+	{
+		req := structs.NodeSpecificRequest{
+			NodeID:       c1.Node().ID,
+			QueryOptions: structs.QueryOptions{Region: "regionFoo"},
+		}
+		testutil.WaitForResult(func() (bool, error) {
+			var out structs.SingleNodeResponse
+			err := c1.RPC("Node.GetNode", &req, &out)
+			if err == nil {
+				return false, fmt.Errorf("client RPC succeeded when it should have failed:\n%+v", err)
+			}
+			return true, nil
+		},
+			func(err error) {
+				t.Fatalf(err.Error())
+			},
+		)
+	}
+}
+
+func TestClient_ReloadTLS_DowngradeTLSToPlaintext(t *testing.T) {
+	t.Parallel()
+	assert := assert.New(t)
+
+	s1, addr := testServer(t, func(c *nomad.Config) {
+		c.Region = "regionFoo"
+	})
+	defer s1.Shutdown()
+	testutil.WaitForLeader(t, s1.RPC)
+
+	const (
+		cafile  = "../helper/tlsutil/testdata/ca.pem"
+		foocert = "../helper/tlsutil/testdata/nomad-foo.pem"
+		fookey  = "../helper/tlsutil/testdata/nomad-foo-key.pem"
+	)
+
+	c1 := testClient(t, func(c *config.Config) {
+		c.Servers = []string{addr}
+		c.TLSConfig = &nconfig.TLSConfig{
+			EnableHTTP:           true,
+			EnableRPC:            true,
+			VerifyServerHostname: true,
+			CAFile:               cafile,
+			CertFile:             foocert,
+			KeyFile:              fookey,
+		}
+	})
+	defer c1.Shutdown()
+
+	// assert that when one node is running in encrypted mode, an RPC request to a
+	// node running in plaintext mode should fail
+	{
+		req := structs.NodeSpecificRequest{
+			NodeID:       c1.Node().ID,
+			QueryOptions: structs.QueryOptions{Region: "regionFoo"},
+		}
+		testutil.WaitForResult(func() (bool, error) {
+			var out structs.SingleNodeResponse
+			err := c1.RPC("Node.GetNode", &req, &out)
+			if err == nil {
+				return false, fmt.Errorf("client RPC succeeded when it should have failed:\n%+v", err)
+			}
+			return true, nil
+		},
+			func(err error) {
+				t.Fatalf(err.Error())
+			},
+		)
+	}
+
+	newConfig := &nconfig.TLSConfig{}
+
+	err := c1.reloadTLSConnections(newConfig)
+	assert.Nil(err)
+
+	// assert that when both nodes are in plaintext mode, an RPC request should
+	// succeed
+	{
+		req := structs.NodeSpecificRequest{
+			NodeID:       c1.Node().ID,
+			QueryOptions: structs.QueryOptions{Region: "regionFoo"},
+		}
+		testutil.WaitForResult(func() (bool, error) {
+			var out structs.SingleNodeResponse
+			err := c1.RPC("Node.GetNode", &req, &out)
+			if err != nil {
+				return false, fmt.Errorf("client RPC failed when it should have succeeded:\n%+v", err)
+			}
+			return true, nil
+		},
+			func(err error) {
+				t.Fatalf(err.Error())
+			},
+		)
+	}
+}
diff --git a/client/config/config.go b/client/config/config.go
index 1f89e4033c6..8a16eb8bebe 100644
--- a/client/config/config.go
+++ b/client/config/config.go
@@ -347,9 +347,10 @@ func (c *Config) ReadStringListToMapDefault(key, defaultValue string) map[string
 	return list
 }
 
-// TLSConfig returns a TLSUtil Config based on the client configuration
+// TLSConfiguration returns a TLSUtil Config based on the existing client
+// configuration
 func (c *Config) TLSConfiguration() *tlsutil.Config {
-	tlsConf := &tlsutil.Config{
+	return &tlsutil.Config{
 		VerifyIncoming:       true,
 		VerifyOutgoing:       true,
 		VerifyServerHostname: c.TLSConfig.VerifyServerHostname,
@@ -358,5 +359,4 @@
 		KeyFile:              c.TLSConfig.KeyFile,
 		KeyLoader:            c.TLSConfig.GetKeyLoader(),
 	}
-	return tlsConf
 }
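The client-side pieces above compose into a single entry point: build the new TLS settings and hand them to `Reload`, exactly as the upgrade test does. A minimal sketch of that call path, assuming an already-running `*client.Client` and placeholder certificate paths (the helper name and paths are illustrative, not part of this patch):

```go
package reloadexample

import (
	"log"

	"github.com/hashicorp/nomad/client"
	"github.com/hashicorp/nomad/client/config"
	nconfig "github.com/hashicorp/nomad/nomad/structs/config"
)

// upgradeClientToTLS swaps a running client onto TLS. Any CA/cert/key trio
// issued for the client's region works; these paths are placeholders.
func upgradeClientToTLS(c *client.Client) error {
	newConfig := &config.Config{
		TLSConfig: &nconfig.TLSConfig{
			EnableHTTP:           true,
			EnableRPC:            true,
			VerifyServerHostname: true,
			CAFile:               "/etc/nomad/ca.pem",
			CertFile:             "/etc/nomad/client.pem",
			KeyFile:              "/etc/nomad/client-key.pem",
		},
	}
	// Reload tears down pooled RPC connections and rebuilds them with the
	// new TLS wrapper (see Client.reloadTLSConnections above).
	if err := c.Reload(newConfig); err != nil {
		log.Printf("[ERR] client TLS reload failed: %v", err)
		return err
	}
	return nil
}
```

The same call with an empty `TLSConfig` performs the downgrade back to plaintext, as the second test exercises.

diff --git a/command/agent/agent.go b/command/agent/agent.go
index de05400fa3d..3ec9e00f0b4 100644
--- a/command/agent/agent.go
+++ b/command/agent/agent.go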
@@ -730,37 +730,66 @@ func (a *Agent) Stats() map[string]map[string]string {
 	return stats
 }
 
+// ShouldReload determines if we should reload the configuration and agent
+// connections. If the TLS configuration has not changed, we shouldn't reload.
+func (a *Agent) ShouldReload(newConfig *Config) (bool, bool) {
+	a.configLock.Lock()
+	defer a.configLock.Unlock()
+	if a.config.TLSConfig.Equals(newConfig.TLSConfig) {
+		return false, false
+	}
+
+	return true, true // requires a reload of both agent and http server
+}
+
 // Reload handles configuration changes for the agent. Provides a method that
 // is easier to unit test, as this action is invoked via SIGHUP.
 func (a *Agent) Reload(newConfig *Config) error {
 	a.configLock.Lock()
 	defer a.configLock.Unlock()
 
-	if newConfig.TLSConfig != nil {
-
-		// TODO(chelseakomlo) In a later PR, we will introduce the ability to reload
-		// TLS configuration if the agent is not running with TLS enabled.
-		if a.config.TLSConfig != nil {
-			// Reload the certificates on the keyloader and on success store the
-			// updated TLS config. It is important to reuse the same keyloader
-			// as this allows us to dynamically reload configurations not only
-			// on the Agent but on the Server and Client too (they are
-			// referencing the same keyloader).
-			keyloader := a.config.TLSConfig.GetKeyLoader()
-			_, err := keyloader.LoadKeyPair(newConfig.TLSConfig.CertFile, newConfig.TLSConfig.KeyFile)
-			if err != nil {
-				return err
-			}
-			a.config.TLSConfig = newConfig.TLSConfig
-			a.config.TLSConfig.KeyLoader = keyloader
+	if newConfig == nil || newConfig.TLSConfig == nil {
+		return fmt.Errorf("cannot reload agent with nil configuration")
+	}
+
+	// This is just a TLS configuration reload; we don't need to refresh
+	// existing network connections
+	if !a.config.TLSConfig.IsEmpty() && !newConfig.TLSConfig.IsEmpty() {
+
+		// Reload the certificates on the keyloader and on success store the
+		// updated TLS config. It is important to reuse the same keyloader
+		// as this allows us to dynamically reload configurations not only
+		// on the Agent but on the Server and Client too (they are
+		// referencing the same keyloader).
+		keyloader := a.config.TLSConfig.GetKeyLoader()
+		_, err := keyloader.LoadKeyPair(newConfig.TLSConfig.CertFile, newConfig.TLSConfig.KeyFile)
+		if err != nil {
+			return err
 		}
+		a.config.TLSConfig = newConfig.TLSConfig
+		a.config.TLSConfig.KeyLoader = keyloader
+		return nil
+	}
+
+	// Completely reload the agent's TLS configuration (moving from non-TLS to
+	// TLS, or vice versa).
+	// This does not handle errors in loading the new TLS configuration
+	a.config.TLSConfig = newConfig.TLSConfig.Copy()
+
+	if newConfig.TLSConfig.IsEmpty() {
+		a.logger.Println("[WARN] agent: Downgrading agent's existing TLS configuration to plaintext")
+	} else {
+		a.logger.Println("[INFO] agent: Upgrading from plaintext configuration to TLS")
 	}
 
 	return nil
 }
 
-// GetConfigCopy creates a replica of the agent's config, excluding locks
+// GetConfig returns the agent's config, holding the config lock while the
+// value is read
 func (a *Agent) GetConfig() *Config {
+	a.configLock.Lock()
+	defer a.configLock.Unlock()
+
 	return a.config
 }
diff --git a/command/agent/agent_test.go b/command/agent/agent_test.go
index 5dc8d8ce737..196b00d7759 100644
--- a/command/agent/agent_test.go
+++ b/command/agent/agent_test.go
@@ -746,3 +746,197 @@ func Test_GetConfig(t *testing.T) {
 	actualAgentConfig := agent.GetConfig()
 	assert.Equal(actualAgentConfig, agentConfig)
 }
+
+func TestServer_Reload_TLS_WithNilConfiguration(t *testing.T) {
+	t.Parallel()
+	assert := assert.New(t)
+
+	logger := log.New(ioutil.Discard, "", 0)
+
+	agent := &Agent{
+		logger: logger,
+		config: &Config{},
+	}
+
+	err := agent.Reload(nil)
+	assert.NotNil(err)
+	assert.Equal(err.Error(), "cannot reload agent with nil configuration")
+}
+
+func TestServer_Reload_TLS_UpgradeToTLS(t *testing.T) {
+	t.Parallel()
+	assert := assert.New(t)
+
+	const (
+		cafile  = "../../helper/tlsutil/testdata/ca.pem"
+		foocert = "../../helper/tlsutil/testdata/nomad-foo.pem"
+		fookey  = "../../helper/tlsutil/testdata/nomad-foo-key.pem"
+	)
+	dir := tmpDir(t)
+	defer os.RemoveAll(dir)
+
+	logger := log.New(ioutil.Discard, "", 0)
+
+	agentConfig := &Config{
+		TLSConfig: &sconfig.TLSConfig{},
+	}
+
+	agent := &Agent{
+		logger: logger,
+		config: agentConfig,
+	}
+
+	newConfig := &Config{
+		TLSConfig: &sconfig.TLSConfig{
+			EnableHTTP:           true,
+			EnableRPC:            true,
+			VerifyServerHostname: true,
+			CAFile:               cafile,
+			CertFile:             foocert,
+			KeyFile:              fookey,
+		},
+	}
+
+	err := agent.Reload(newConfig)
+	assert.Nil(err)
+
+	assert.Equal(agent.config.TLSConfig.CAFile, newConfig.TLSConfig.CAFile)
+	assert.Equal(agent.config.TLSConfig.CertFile, newConfig.TLSConfig.CertFile)
+	assert.Equal(agent.config.TLSConfig.KeyFile, newConfig.TLSConfig.KeyFile)
+}
+
+func TestServer_Reload_TLS_DowngradeFromTLS(t *testing.T) {
+	t.Parallel()
+	assert := assert.New(t)
+
+	const (
+		cafile  = "../../helper/tlsutil/testdata/ca.pem"
+		foocert = "../../helper/tlsutil/testdata/nomad-foo.pem"
+		fookey  = "../../helper/tlsutil/testdata/nomad-foo-key.pem"
+	)
+	dir := tmpDir(t)
+	defer os.RemoveAll(dir)
+
+	logger := log.New(ioutil.Discard, "", 0)
+
+	agentConfig := &Config{
+		TLSConfig: &sconfig.TLSConfig{
+			EnableHTTP:           true,
+			EnableRPC:            true,
+			VerifyServerHostname: true,
+			CAFile:               cafile,
+			CertFile:             foocert,
+			KeyFile:              fookey,
+		},
+	}
+
+	agent := &Agent{
+		logger: logger,
+		config: agentConfig,
+	}
+
+	newConfig := &Config{
+		TLSConfig: &sconfig.TLSConfig{},
+	}
+
+	assert.False(agentConfig.TLSConfig.IsEmpty())
+
+	err := agent.Reload(newConfig)
+	assert.Nil(err)
+
+	assert.True(agentConfig.TLSConfig.IsEmpty())
+}
+
+func TestServer_ShouldReload_ReturnFalseForNoChanges(t *testing.T) {
+	t.Parallel()
+	assert := assert.New(t)
+
+	const (
+		cafile  = "../../helper/tlsutil/testdata/ca.pem"
+		foocert = "../../helper/tlsutil/testdata/nomad-foo.pem"
+		fookey  = "../../helper/tlsutil/testdata/nomad-foo-key.pem"
+	)
+	dir := tmpDir(t)
+	defer os.RemoveAll(dir)
+
+	logger := log.New(ioutil.Discard, "", 0)
+
+	agentConfig := &Config{
+		TLSConfig: &sconfig.TLSConfig{
+			EnableHTTP:           true,
+			EnableRPC:            true,
+			VerifyServerHostname: true,
+			CAFile:               cafile,
+			CertFile:             foocert,
+			KeyFile:              fookey,
+		},
+	}
+
+	sameAgentConfig := &Config{
+		TLSConfig: &sconfig.TLSConfig{
+			EnableHTTP:           true,
+			EnableRPC:            true,
+			VerifyServerHostname: true,
+			CAFile:               cafile,
+			CertFile:             foocert,
+			KeyFile:              fookey,
+		},
+	}
+
+	agent := &Agent{
+		logger: logger,
+		config: agentConfig,
+	}
+
+	shouldReloadAgent, shouldReloadHTTPServer := agent.ShouldReload(sameAgentConfig)
+	assert.False(shouldReloadAgent)
+	assert.False(shouldReloadHTTPServer)
+}
+
+func TestServer_ShouldReload_ReturnTrueForConfigChanges(t *testing.T) {
+	t.Parallel()
+	assert := assert.New(t)
+
+	const (
+		cafile   = "../../helper/tlsutil/testdata/ca.pem"
+		foocert  = "../../helper/tlsutil/testdata/nomad-foo.pem"
+		fookey   = "../../helper/tlsutil/testdata/nomad-foo-key.pem"
+		foocert2 = "any_cert_path"
+		fookey2  = "any_key_path"
+	)
+	dir := tmpDir(t)
+	defer os.RemoveAll(dir)
+
+	logger := log.New(ioutil.Discard, "", 0)
+
+	agentConfig := &Config{
+		TLSConfig: &sconfig.TLSConfig{
+			EnableHTTP:           true,
+			EnableRPC:            true,
+			VerifyServerHostname: true,
+			CAFile:               cafile,
+			CertFile:             foocert,
+			KeyFile:              fookey,
+		},
+	}
+
+	newConfig := &Config{
+		TLSConfig: &sconfig.TLSConfig{
+			EnableHTTP:           true,
+			EnableRPC:            true,
+			VerifyServerHostname: true,
+			CAFile:               cafile,
+			CertFile:             foocert2,
+			KeyFile:              fookey2,
+		},
+	}
+
+	agent := &Agent{
+		logger: logger,
+		config: agentConfig,
+	}
+
+	shouldReloadAgent, shouldReloadHTTPServer := agent.ShouldReload(newConfig)
+	assert.True(shouldReloadAgent)
+	assert.True(shouldReloadHTTPServer)
+}
diff --git a/command/agent/command.go b/command/agent/command.go
index c1be2286e05..74bbf96149e 100644
--- a/command/agent/command.go
+++ b/command/agent/command.go
@@ -598,6 +598,22 @@ WAIT:
 	}
 }
 
+// reloadHTTPServer shuts down the existing HTTP server and restarts it. This
+// is helpful when reloading the agent configuration.
+func (c *Command) reloadHTTPServer() error {
+	c.agent.logger.Println("[INFO] agent: Reloading HTTP server with new TLS configuration")
+
+	c.httpServer.Shutdown()
+
+	http, err := NewHTTPServer(c.agent, c.agent.config)
+	if err != nil {
+		return err
+	}
+	c.httpServer = http
+
+	return nil
+}
+
 // handleReload is invoked when we should reload our configs, e.g. SIGHUP
 func (c *Command) handleReload() {
 	c.Ui.Output("Reloading configuration...")
@@ -620,20 +636,52 @@ func (c *Command) handleReload() {
 		newConf.LogLevel = c.agent.GetConfig().LogLevel
 	}
 
-	// Reloads configuration for an agent running in both client and server mode
-	err := c.agent.Reload(newConf)
-	if err != nil {
-		c.agent.logger.Printf("[ERR] agent: failed to reload the config: %v", err)
+	shouldReloadAgent, shouldReloadHTTPServer := c.agent.ShouldReload(newConf)
+	if shouldReloadAgent {
+		c.agent.logger.Printf("[DEBUG] agent: starting reload of agent config")
+		err := c.agent.Reload(newConf)
+		if err != nil {
+			c.agent.logger.Printf("[ERR] agent: failed to reload the config: %v", err)
+			return
+		}
+
+		if s := c.agent.Server(); s != nil {
+			sconf, err := convertServerConfig(newConf, c.logOutput)
+			c.agent.logger.Printf("[DEBUG] agent: starting reload of server config")
+			if err != nil {
+				c.agent.logger.Printf("[ERR] agent: failed to convert server config: %v", err)
+				return
+			} else {
+				if err := s.Reload(sconf); err != nil {
+					c.agent.logger.Printf("[ERR] agent: reloading server config failed: %v", err)
+					return
+				}
+			}
+		}
+
+		if s := c.agent.Client(); s != nil {
+			clientConfig, err := c.agent.clientConfig()
+			c.agent.logger.Printf("[DEBUG] agent: starting reload of client config")
+			if err != nil {
+				c.agent.logger.Printf("[ERR] agent: failed to generate client config: %v", err)
+				return
+			}
+			if err := s.Reload(clientConfig); err != nil {
+				c.agent.logger.Printf("[ERR] agent: reloading client config failed: %v", err)
+				return
+			}
+		}
 	}
 
-	if s := c.agent.Server(); s != nil {
-		sconf, err := convertServerConfig(newConf, c.logOutput)
+	// reload HTTP server after we have reloaded both client and server, in case
+	// we error in either of the above cases. For example, reloading the http
+	// server to a TLS connection could succeed, while reloading the server's rpc
+	// connections could fail.
+	if shouldReloadHTTPServer {
+		err := c.reloadHTTPServer()
 		if err != nil {
-			c.agent.logger.Printf("[ERR] agent: failed to convert server config: %v", err)
-		} else {
-			if err := s.Reload(sconf); err != nil {
-				c.agent.logger.Printf("[ERR] agent: reloading server config failed: %v", err)
-			}
+			c.agent.logger.Printf("[ERR] http: failed to reload the config: %v", err)
+			return
 		}
 	}
 }
diff --git a/command/agent/http.go b/command/agent/http.go
index 4146cffd4f6..b38aac7db59 100644
--- a/command/agent/http.go
+++ b/command/agent/http.go
@@ -47,11 +47,12 @@ var (
 
 // HTTPServer is used to wrap an Agent and expose it over an HTTP interface
 type HTTPServer struct {
-	agent    *Agent
-	mux      *http.ServeMux
-	listener net.Listener
-	logger   *log.Logger
-	Addr     string
+	agent      *Agent
+	mux        *http.ServeMux
+	listener   net.Listener
+	listenerCh chan struct{}
+	logger     *log.Logger
+	Addr       string
 }
 
 // NewHTTPServer starts new HTTP server over the agent
@@ -89,11 +90,12 @@ func NewHTTPServer(agent *Agent, config *Config) (*HTTPServer, error) {
 
 	// Create the server
 	srv := &HTTPServer{
-		agent:    agent,
-		mux:      mux,
-		listener: ln,
-		logger:   agent.logger,
-		Addr:     ln.Addr().String(),
+		agent:      agent,
+		mux:        mux,
+		listener:   ln,
+		listenerCh: make(chan struct{}),
+		logger:     agent.logger,
+		Addr:       ln.Addr().String(),
 	}
 	srv.registerHandlers(config.EnableDebug)
@@ -103,7 +105,10 @@ func NewHTTPServer(agent *Agent, config *Config) (*HTTPServer, error) {
 		return nil, err
 	}
 
-	go http.Serve(ln, gzip(mux))
+	go func() {
+		defer close(srv.listenerCh)
+		http.Serve(ln, gzip(mux))
+	}()
 
 	return srv, nil
 }
@@ -130,6 +135,7 @@ func (s *HTTPServer) Shutdown() {
 	if s != nil {
 		s.logger.Printf("[DEBUG] http: Shutting down http server")
 		s.listener.Close()
+		<-s.listenerCh // block until http.Serve has returned.
 	}
 }
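The `listenerCh` handshake above is a small but important pattern: closing the listener alone doesn't guarantee `http.Serve` has returned, so `Shutdown` blocks on a channel the serve goroutine closes on exit. A stripped-down, self-contained sketch of the same idea, independent of the agent types (names here are illustrative):

```go
package main

import (
	"fmt"
	"net"
	"net/http"
)

func main() {
	ln, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		panic(err)
	}

	done := make(chan struct{})
	go func() {
		defer close(done) // signal that Serve has fully returned
		http.Serve(ln, http.NewServeMux())
	}()

	// Shutdown: close the listener, then wait for the serve loop to exit
	// before (for example) binding a replacement server to the same address.
	ln.Close()
	<-done
	fmt.Println("http server fully stopped")
}
```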
diff --git a/helper/tlsutil/config.go b/helper/tlsutil/config.go
index 3de5b925a7f..8f6b1f01df3 100644
--- a/helper/tlsutil/config.go
+++ b/helper/tlsutil/config.go
@@ -67,6 +67,18 @@ type Config struct {
 	KeyLoader *config.KeyLoader
 }
 
+func NewTLSConfiguration(newConf *config.TLSConfig) *Config {
+	return &Config{
+		VerifyIncoming:       true,
+		VerifyOutgoing:       true,
+		VerifyServerHostname: newConf.VerifyServerHostname,
+		CAFile:               newConf.CAFile,
+		CertFile:             newConf.CertFile,
+		KeyFile:              newConf.KeyFile,
+		KeyLoader:            newConf.GetKeyLoader(),
+	}
+}
+
 // AppendCA opens and parses the CA file and adds the certificates to
 // the provided CertPool.
 func (c *Config) AppendCA(pool *x509.CertPool) error {
diff --git a/nomad/pool.go b/nomad/pool.go
index 320e7a3200d..017621c99e8 100644
--- a/nomad/pool.go
+++ b/nomad/pool.go
@@ -10,7 +10,7 @@ import (
 	"sync/atomic"
 	"time"
 
-	"github.com/hashicorp/net-rpc-msgpackrpc"
+	msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc"
 	"github.com/hashicorp/nomad/helper/tlsutil"
 	"github.com/hashicorp/yamux"
 )
@@ -175,6 +175,19 @@ func (p *ConnPool) Shutdown() error {
 	return nil
 }
 
+// ReloadTLS reloads TLS configuration on the fly
+func (p *ConnPool) ReloadTLS(tlsWrap tlsutil.RegionWrapper) {
+	p.Lock()
+	defer p.Unlock()
+
+	oldPool := p.pool
+	for _, conn := range oldPool {
+		conn.Close()
+	}
+	p.pool = make(map[string]*Conn)
+	p.tlsWrap = tlsWrap
+}
+
 // Acquire is used to get a connection that is
 // pooled or to return a new connection
 func (p *ConnPool) acquire(region string, addr net.Addr, version int) (*Conn, error) {
diff --git a/nomad/raft_rpc.go b/nomad/raft_rpc.go
index 31ac4d0a7d8..e7f73357d57 100644
--- a/nomad/raft_rpc.go
+++ b/nomad/raft_rpc.go
@@ -1,6 +1,7 @@
 package nomad
 
 import (
+	"context"
 	"fmt"
 	"net"
 	"sync"
@@ -20,7 +21,8 @@ type RaftLayer struct {
 	connCh chan net.Conn
 
 	// TLS wrapper
-	tlsWrap tlsutil.Wrapper
+	tlsWrap     tlsutil.Wrapper
+	tlsWrapLock sync.RWMutex
 
 	// Tracks if we are closed
 	closed bool
@@ -43,12 +45,14 @@ func NewRaftLayer(addr net.Addr, tlsWrap tlsutil.Wrapper) *RaftLayer {
 
 // Handoff is used to hand off a connection to the
 // RaftLayer. This allows it to be Accept()'ed
-func (l *RaftLayer) Handoff(c net.Conn) error {
+func (l *RaftLayer) Handoff(ctx context.Context, c net.Conn) error {
 	select {
 	case l.connCh <- c:
 		return nil
 	case <-l.closeCh:
 		return fmt.Errorf("Raft RPC layer closed")
+	case <-ctx.Done():
+		return nil
 	}
 }
@@ -75,6 +79,21 @@ func (l *RaftLayer) Close() error {
 	return nil
 }
 
+// getTLSWrapper is used to retrieve the current TLS wrapper
+func (l *RaftLayer) getTLSWrapper() tlsutil.Wrapper {
+	l.tlsWrapLock.RLock()
+	defer l.tlsWrapLock.RUnlock()
+	return l.tlsWrap
+}
+
+// ReloadTLS swaps the TLS wrapper. This is useful when upgrading or
+// downgrading TLS connections.
+func (l *RaftLayer) ReloadTLS(tlsWrap tlsutil.Wrapper) {
+	l.tlsWrapLock.Lock()
+	defer l.tlsWrapLock.Unlock()
+	l.tlsWrap = tlsWrap
+}
+
 // Addr is used to return the address of the listener
 func (l *RaftLayer) Addr() net.Addr {
 	return l.addr
@@ -87,8 +106,10 @@ func (l *RaftLayer) Dial(address raft.ServerAddress, timeout time.Duration) (net
 		return nil, err
 	}
 
+	tlsWrapper := l.getTLSWrapper()
+
 	// Check for tls mode
-	if l.tlsWrap != nil {
+	if tlsWrapper != nil {
 		// Switch the connection into TLS mode
 		if _, err := conn.Write([]byte{byte(rpcTLS)}); err != nil {
 			conn.Close()
@@ -96,7 +117,7 @@ func (l *RaftLayer) Dial(address raft.ServerAddress, timeout time.Duration) (net
 		}
 
 		// Wrap the connection in a TLS client
-		conn, err = l.tlsWrap(conn)
+		conn, err = tlsWrapper(conn)
 		if err != nil {
 			return nil, err
 		}
diff --git a/nomad/rpc.go b/nomad/rpc.go
index 828ee0c94c0..1b7e8bccab8 100644
--- a/nomad/rpc.go
+++ b/nomad/rpc.go
@@ -68,26 +68,40 @@ func NewServerCodec(conn io.ReadWriteCloser) rpc.ServerCodec {
 }
 
 // listen is used to listen for incoming RPC connections
-func (s *Server) listen() {
+func (s *Server) listen(ctx context.Context) {
 	for {
+		select {
+		case <-ctx.Done():
+			s.logger.Println("[INFO] nomad.rpc: Closing server RPC connection")
+			return
+		default:
+		}
+
 		// Accept a connection
 		conn, err := s.rpcListener.Accept()
 		if err != nil {
 			if s.shutdown {
 				return
 			}
+
+			select {
+			case <-ctx.Done():
+				return
+			default:
+			}
+
 			s.logger.Printf("[ERR] nomad.rpc: failed to accept RPC conn: %v", err)
 			continue
 		}
 
-		go s.handleConn(conn, false)
+		go s.handleConn(ctx, conn, false)
 		metrics.IncrCounter([]string{"nomad", "rpc", "accept_conn"}, 1)
 	}
 }
 
 // handleConn is used to determine if this is a Raft or
 // Nomad type RPC connection and invoke the correct handler
-func (s *Server) handleConn(conn net.Conn, isTLS bool) {
+func (s *Server) handleConn(ctx context.Context, conn net.Conn, isTLS bool) {
 	// Read a single byte
 	buf := make([]byte, 1)
 	if _, err := conn.Read(buf); err != nil {
@@ -110,14 +124,14 @@
 	// Switch on the byte
 	switch RPCType(buf[0]) {
 	case rpcNomad:
-		s.handleNomadConn(conn)
+		s.handleNomadConn(ctx, conn)
 
 	case rpcRaft:
 		metrics.IncrCounter([]string{"nomad", "rpc", "raft_handoff"}, 1)
-		s.raftLayer.Handoff(conn)
+		s.raftLayer.Handoff(ctx, conn)
 
 	case rpcMultiplex:
-		s.handleMultiplex(conn)
+		s.handleMultiplex(ctx, conn)
 
 	case rpcTLS:
 		if s.rpcTLS == nil {
@@ -126,7 +140,7 @@
 			return
 		}
 		conn = tls.Server(conn, s.rpcTLS)
-		s.handleConn(conn, true)
+		s.handleConn(ctx, conn, true)
 
 	default:
 		s.logger.Printf("[ERR] nomad.rpc: unrecognized RPC byte: %v", buf[0])
@@ -137,7 +151,7 @@
 
 // handleMultiplex is used to multiplex a single incoming connection
 // using the Yamux multiplexer
-func (s *Server) handleMultiplex(conn net.Conn) {
+func (s *Server) handleMultiplex(ctx context.Context, conn net.Conn) {
 	defer conn.Close()
 	conf := yamux.DefaultConfig()
 	conf.LogOutput = s.config.LogOutput
@@ -150,16 +164,19 @@
 			}
 			return
 		}
-		go s.handleNomadConn(sub)
+		go s.handleNomadConn(ctx, sub)
 	}
 }
 
 // handleNomadConn is used to service a single Nomad RPC connection
-func (s *Server) handleNomadConn(conn net.Conn) {
+func (s *Server) handleNomadConn(ctx context.Context, conn net.Conn) {
 	defer conn.Close()
 	rpcCodec := NewServerCodec(conn)
 	for {
 		select {
nomad.rpc: Closing server RPC connection") + return case <-s.shutdownCh: return default: diff --git a/nomad/server.go b/nomad/server.go index 7648c74360f..41e58ded0bc 100644 --- a/nomad/server.go +++ b/nomad/server.go @@ -1,6 +1,7 @@ package nomad import ( + "context" "crypto/tls" "errors" "fmt" @@ -26,6 +27,7 @@ import ( "github.com/hashicorp/nomad/nomad/deploymentwatcher" "github.com/hashicorp/nomad/nomad/state" "github.com/hashicorp/nomad/nomad/structs" + "github.com/hashicorp/nomad/nomad/structs/config" "github.com/hashicorp/raft" raftboltdb "github.com/hashicorp/raft-boltdb" "github.com/hashicorp/serf/serf" @@ -83,6 +85,7 @@ const ( // schedulers, and notification bus for agents. type Server struct { config *Config + logger *log.Logger // Connection pool to other Nomad servers @@ -104,12 +107,15 @@ type Server struct { fsm *nomadFSM // rpcListener is used to listen for incoming connections - rpcListener net.Listener + rpcListener net.Listener + listenerCh chan struct{} + rpcServer *rpc.Server rpcAdvertise net.Addr // rpcTLS is the TLS config for incoming TLS requests - rpcTLS *tls.Config + rpcTLS *tls.Config + rpcCancel context.CancelFunc // peers is used to track the known Nomad servers. This is // used for region forwarding and clustering. @@ -226,21 +232,10 @@ func NewServer(config *Config, consulCatalog consul.CatalogAPI, logger *log.Logg } // Configure TLS - var tlsWrap tlsutil.RegionWrapper - var incomingTLS *tls.Config - if config.TLSConfig.EnableRPC { - tlsConf := config.tlsConfig() - tw, err := tlsConf.OutgoingTLSWrapper() - if err != nil { - return nil, err - } - tlsWrap = tw - - itls, err := tlsConf.IncomingTLSConfig() - if err != nil { - return nil, err - } - incomingTLS = itls + tlsConf := config.tlsConfig() + incomingTLS, tlsWrap, err := getTLSConf(config.TLSConfig.EnableRPC, tlsConf) + if err != nil { + return nil, err } // Create the ACL object cache @@ -328,8 +323,8 @@ func NewServer(config *Config, consulCatalog consul.CatalogAPI, logger *log.Logg // Start ingesting events for Serf go s.serfEventHandler() - // Start the RPC listeners - go s.listen() + // start the RPC listener for the server + s.startRPCListener() // Emit metrics for the eval broker go evalBroker.EmitStats(time.Second, s.shutdownCh) @@ -353,6 +348,98 @@ func NewServer(config *Config, consulCatalog consul.CatalogAPI, logger *log.Logg return s, nil } +// startRPCListener starts the server's the RPC listener +func (s *Server) startRPCListener() { + ctx, cancel := context.WithCancel(context.Background()) + s.rpcCancel = cancel + go func() { + defer close(s.listenerCh) + s.listen(ctx) + }() +} + +// createRPCListener creates the server's RPC listener +func (s *Server) createRPCListener() (*net.TCPListener, error) { + s.listenerCh = make(chan struct{}) + listener, err := net.ListenTCP("tcp", s.config.RPCAddr) + if err != nil { + s.logger.Printf("[ERR] nomad: error when initializing TLS listener %s", err) + return listener, err + } + + s.rpcListener = listener + return listener, nil +} + +// getTLSConf gets the server's TLS configuration based on the config supplied +// by the operator +func getTLSConf(enableRPC bool, tlsConf *tlsutil.Config) (*tls.Config, tlsutil.RegionWrapper, error) { + var tlsWrap tlsutil.RegionWrapper + var incomingTLS *tls.Config + if enableRPC { + tw, err := tlsConf.OutgoingTLSWrapper() + if err != nil { + return nil, nil, err + } + tlsWrap = tw + + itls, err := tlsConf.IncomingTLSConfig() + if err != nil { + return nil, nil, err + } + incomingTLS = itls + } + return incomingTLS, 
tlsWrap, nil +} + +// reloadTLSConnections updates a server's TLS configuration and reloads RPC +// connections. +func (s *Server) reloadTLSConnections(newTLSConfig *config.TLSConfig) error { + s.logger.Printf("[INFO] nomad: reloading server connections due to configuration changes") + + tlsConf := tlsutil.NewTLSConfiguration(newTLSConfig) + incomingTLS, tlsWrap, err := getTLSConf(newTLSConfig.EnableRPC, tlsConf) + if err != nil { + s.logger.Printf("[ERR] nomad: unable to reset TLS context %s", err) + return err + } + + if s.rpcCancel == nil { + err = fmt.Errorf("No existing RPC server to reset.") + s.logger.Printf("[ERR] nomad: %s", err) + return err + } + + s.rpcCancel() + + // Keeping configuration in sync is important for other places that require + // access to config information, such as rpc.go, where we decide on what kind + // of network connections to accept depending on the server configuration + s.config.TLSConfig = newTLSConfig + + s.rpcTLS = incomingTLS + s.connPool.ReloadTLS(tlsWrap) + + // reinitialize our rpc listener + s.rpcListener.Close() + <-s.listenerCh + s.startRPCListener() + + listener, err := s.createRPCListener() + if err != nil { + listener.Close() + return err + } + + // Close and reload existing Raft connections + wrapper := tlsutil.RegionSpecificWrapper(s.config.Region, tlsWrap) + s.raftLayer.ReloadTLS(wrapper) + s.raftTransport.CloseStreams() + + s.logger.Printf("[DEBUG] nomad: finished reloading server connections") + return nil +} + // Shutdown is used to shutdown the server func (s *Server) Shutdown() error { s.logger.Printf("[INFO] nomad: shutting down server") @@ -497,9 +584,10 @@ func (s *Server) Leave() error { return nil } -// Reload handles a config reload. Not all config fields can handle a reload. -func (s *Server) Reload(config *Config) error { - if config == nil { +// Reload handles a config reload specific to server-only configuration. Not +// all config fields can handle a reload. +func (s *Server) Reload(newConfig *Config) error { + if newConfig == nil { return fmt.Errorf("Reload given a nil config") } @@ -507,7 +595,14 @@ func (s *Server) Reload(config *Config) error { // Handle the Vault reload. Vault should never be nil but just guard. 
 	if s.vault != nil {
-		if err := s.vault.SetConfig(config.VaultConfig); err != nil {
+		if err := s.vault.SetConfig(newConfig.VaultConfig); err != nil {
+			multierror.Append(&mErr, err)
+		}
+	}
+
+	if !newConfig.TLSConfig.Equals(s.config.TLSConfig) {
+		if err := s.reloadTLSConnections(newConfig.TLSConfig); err != nil {
+			s.logger.Printf("[ERR] nomad: error reloading server TLS configuration: %s", err)
 			multierror.Append(&mErr, err)
 		}
 	}
@@ -771,11 +866,11 @@ func (s *Server) setupRPC(tlsWrap tlsutil.RegionWrapper) error {
 	s.rpcServer.Register(s.endpoints.Search)
 	s.endpoints.Enterprise.Register(s)
 
-	list, err := net.ListenTCP("tcp", s.config.RPCAddr)
+	listener, err := s.createRPCListener()
 	if err != nil {
+		listener.Close()
 		return err
 	}
-	s.rpcListener = list
 
 	if s.config.RPCAdvertise != nil {
 		s.rpcAdvertise = s.config.RPCAdvertise
@@ -786,11 +881,11 @@ func (s *Server) setupRPC(tlsWrap tlsutil.RegionWrapper) error {
 	// Verify that we have a usable advertise address
 	addr, ok := s.rpcAdvertise.(*net.TCPAddr)
 	if !ok {
-		list.Close()
+		listener.Close()
 		return fmt.Errorf("RPC advertise address is not a TCP Address: %v", addr)
 	}
 	if addr.IP.IsUnspecified() {
-		list.Close()
+		listener.Close()
 		return fmt.Errorf("RPC advertise address is not advertisable: %v", addr)
 	}
diff --git a/nomad/server_test.go b/nomad/server_test.go
index 04175a2900a..bb3381293a1 100644
--- a/nomad/server_test.go
+++ b/nomad/server_test.go
@@ -8,17 +8,20 @@ import (
 	"net"
 	"os"
 	"path"
+	"strings"
 	"sync/atomic"
 	"testing"
 	"time"
 
 	"github.com/hashicorp/consul/lib/freeport"
+	msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc"
 	"github.com/hashicorp/nomad/command/agent/consul"
 	"github.com/hashicorp/nomad/helper/uuid"
 	"github.com/hashicorp/nomad/nomad/mock"
 	"github.com/hashicorp/nomad/nomad/structs"
 	"github.com/hashicorp/nomad/nomad/structs/config"
 	"github.com/hashicorp/nomad/testutil"
+	"github.com/stretchr/testify/assert"
 )
 
 var (
@@ -276,3 +279,186 @@ func TestServer_Reload_Vault(t *testing.T) {
 		t.Fatalf("Vault client should be running")
 	}
 }
+
+func connectionReset(msg string) bool {
+	return strings.Contains(msg, "EOF") || strings.Contains(msg, "connection reset by peer")
+}
+
+// Tests that the server will successfully reload its network connections,
+// upgrading from plaintext to TLS if the server's TLS configuration changes.
+func TestServer_Reload_TLSConnections_PlaintextToTLS(t *testing.T) {
+	t.Parallel()
+	assert := assert.New(t)
+
+	const (
+		cafile  = "../helper/tlsutil/testdata/ca.pem"
+		foocert = "../helper/tlsutil/testdata/nomad-foo.pem"
+		fookey  = "../helper/tlsutil/testdata/nomad-foo-key.pem"
+	)
+	dir := tmpDir(t)
+	defer os.RemoveAll(dir)
+
+	s1 := testServer(t, func(c *Config) {
+		c.DataDir = path.Join(dir, "nodeA")
+	})
+	defer s1.Shutdown()
+
+	// assert that the server started in plaintext mode
+	assert.Equal(s1.config.TLSConfig.CertFile, "")
+
+	newTLSConfig := &config.TLSConfig{
+		EnableHTTP:           true,
+		EnableRPC:            true,
+		VerifyServerHostname: true,
+		CAFile:               cafile,
+		CertFile:             foocert,
+		KeyFile:              fookey,
+	}
+
+	err := s1.reloadTLSConnections(newTLSConfig)
+	assert.Nil(err)
+	assert.True(s1.config.TLSConfig.Equals(newTLSConfig))
+
+	codec := rpcClient(t, s1)
+
+	node := mock.Node()
+	req := &structs.NodeRegisterRequest{
+		Node:         node,
+		WriteRequest: structs.WriteRequest{Region: "global"},
+	}
+
+	var resp structs.GenericResponse
+	err = msgpackrpc.CallWithCodec(codec, "Node.Register", req, &resp)
+	assert.NotNil(err)
+	assert.True(connectionReset(err.Error()))
+}
+
+// Tests that the server will successfully reload its network connections,
+// downgrading from TLS to plaintext if the server's TLS configuration changes.
+func TestServer_Reload_TLSConnections_TLSToPlaintext_RPC(t *testing.T) {
+	t.Parallel()
+	assert := assert.New(t)
+
+	const (
+		cafile  = "../helper/tlsutil/testdata/ca.pem"
+		foocert = "../helper/tlsutil/testdata/nomad-foo.pem"
+		fookey  = "../helper/tlsutil/testdata/nomad-foo-key.pem"
+	)
+
+	dir := tmpDir(t)
+	defer os.RemoveAll(dir)
+
+	s1 := testServer(t, func(c *Config) {
+		c.DataDir = path.Join(dir, "nodeB")
+		c.TLSConfig = &config.TLSConfig{
+			EnableHTTP:           true,
+			EnableRPC:            true,
+			VerifyServerHostname: true,
+			CAFile:               cafile,
+			CertFile:             foocert,
+			KeyFile:              fookey,
+		}
+	})
+	defer s1.Shutdown()
+
+	newTLSConfig := &config.TLSConfig{}
+
+	err := s1.reloadTLSConnections(newTLSConfig)
+	assert.Nil(err)
+	assert.True(s1.config.TLSConfig.Equals(newTLSConfig))
+
+	codec := rpcClient(t, s1)
+
+	node := mock.Node()
+	req := &structs.NodeRegisterRequest{
+		Node:         node,
+		WriteRequest: structs.WriteRequest{Region: "global"},
+	}
+
+	var resp structs.GenericResponse
+	err = msgpackrpc.CallWithCodec(codec, "Node.Register", req, &resp)
+	assert.Nil(err)
+}
+
+// Test that Raft connections are reloaded as expected when a Nomad server is
+// upgraded from plaintext to TLS
+func TestServer_Reload_TLSConnections_Raft(t *testing.T) {
+	assert := assert.New(t)
+	t.Parallel()
+	const (
+		cafile  = "../helper/tlsutil/testdata/ca.pem"
+		foocert = "../helper/tlsutil/testdata/nomad-foo.pem"
+		fookey  = "../helper/tlsutil/testdata/nomad-foo-key.pem"
+		barcert = "../dev/tls_cluster/certs/nomad.pem"
+		barkey  = "../dev/tls_cluster/certs/nomad-key.pem"
+	)
+	dir := tmpDir(t)
+	defer os.RemoveAll(dir)
+
+	s1 := testServer(t, func(c *Config) {
+		c.BootstrapExpect = 2
+		c.DevMode = false
+		c.DevDisableBootstrap = true
+		c.DataDir = path.Join(dir, "node1")
+		c.NodeName = "node1"
+		c.Region = "regionFoo"
+	})
+	defer s1.Shutdown()
+
+	s2 := testServer(t, func(c *Config) {
+		c.BootstrapExpect = 2
+		c.DevMode = false
+		c.DevDisableBootstrap = true
+		c.DataDir = path.Join(dir, "node2")
+		c.NodeName = "node2"
+		c.Region = "regionFoo"
+	})
+	defer s2.Shutdown()
+
+	testJoin(t, s1, s2)
+	servers := []*Server{s1, s2}
+
+	testutil.WaitForLeader(t, s1.RPC)
+
+	newTLSConfig := &config.TLSConfig{
+		EnableHTTP:        true,
+		VerifyHTTPSClient: true,
+		CAFile:            cafile,
+		CertFile:          foocert,
+		KeyFile:           fookey,
+	}
+
+	err := s1.reloadTLSConnections(newTLSConfig)
+	assert.Nil(err)
+
+	{
+		for _, serv := range servers {
+			testutil.WaitForResult(func() (bool, error) {
+				args := &structs.GenericRequest{}
+				var leader string
+				err := serv.RPC("Status.Leader", args, &leader)
+				if leader != "" && err != nil {
+					return false, fmt.Errorf("Should not have found leader but got %s", leader)
+				}
+				return true, nil
+			}, func(err error) {
+				t.Fatalf("err: %v", err)
+			})
+		}
+	}
+
+	secondNewTLSConfig := &config.TLSConfig{
+		EnableHTTP:        true,
+		VerifyHTTPSClient: true,
+		CAFile:            cafile,
+		CertFile:          barcert,
+		KeyFile:           barkey,
+	}
+
+	// Now, transition the other server to TLS, which should restore their
+	// ability to communicate.
+	err = s2.reloadTLSConnections(secondNewTLSConfig)
+	assert.Nil(err)
+
+	testutil.WaitForLeader(t, s2.RPC)
+}
diff --git a/nomad/structs/config/tls.go b/nomad/structs/config/tls.go
index 65178168523..be1e8f42a1b 100644
--- a/nomad/structs/config/tls.go
+++ b/nomad/structs/config/tls.go
@@ -141,6 +141,20 @@ func (t *TLSConfig) Copy() *TLSConfig {
 	return new
 }
 
+func (t *TLSConfig) IsEmpty() bool {
+	if t == nil {
+		return true
+	}
+
+	return t.EnableHTTP == false &&
+		t.EnableRPC == false &&
+		t.VerifyServerHostname == false &&
+		t.CAFile == "" &&
+		t.CertFile == "" &&
+		t.KeyFile == "" &&
+		t.VerifyHTTPSClient == false
+}
+
 // Merge is used to merge two TLS configs together
 func (t *TLSConfig) Merge(b *TLSConfig) *TLSConfig {
 	result := t.Copy()
@@ -171,3 +185,21 @@ func (t *TLSConfig) Merge(b *TLSConfig) *TLSConfig {
 	}
 	return result
 }
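One subtlety in `Equals` (below): it compares only the fields relevant to RPC-level reloads; `EnableHTTP` and `VerifyServerHostname`, for example, are not part of the comparison, while `IsEmpty` looks at every flag. A small runnable illustration (field values are arbitrary):

```go
package main

import (
	"fmt"

	"github.com/hashicorp/nomad/nomad/structs/config"
)

func main() {
	a := &config.TLSConfig{EnableHTTP: true, CAFile: "ca.pem"}
	b := &config.TLSConfig{EnableHTTP: false, CAFile: "ca.pem"}
	// EnableHTTP is not among the compared fields, so these count as equal:
	fmt.Println(a.Equals(b)) // true

	// IsEmpty treats a nil receiver as an empty (plaintext) configuration:
	var c *config.TLSConfig
	fmt.Println(c.IsEmpty()) // true
}
```

+
+// Equals compares the fields of two TLS configuration objects, returning a
+// boolean indicating if they are the same.
+// It is possible for either the calling TLSConfig to be nil, or the TLSConfig
+// that it is being compared against, so we need to handle both cases. See
+// server.go Reload for example.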
+func (t *TLSConfig) Equals(newConfig *TLSConfig) bool {
+	if t == nil || newConfig == nil {
+		return t == newConfig
+	}
+
+	return t.EnableRPC == newConfig.EnableRPC &&
+		t.CAFile == newConfig.CAFile &&
+		t.CertFile == newConfig.CertFile &&
+		t.KeyFile == newConfig.KeyFile &&
+		t.RPCUpgradeMode == newConfig.RPCUpgradeMode &&
+		t.VerifyHTTPSClient == newConfig.VerifyHTTPSClient
+}
diff --git a/nomad/structs/config/tls_test.go b/nomad/structs/config/tls_test.go
index b76360ba84f..e6dc3636340 100644
--- a/nomad/structs/config/tls_test.go
+++ b/nomad/structs/config/tls_test.go
@@ -25,3 +25,39 @@ func TestTLSConfig_Merge(t *testing.T) {
 	new := a.Merge(b)
 	assert.Equal(b, new)
 }
+
+func TestTLS_Equals_TrueWhenEmpty(t *testing.T) {
+	assert := assert.New(t)
+	a := &TLSConfig{}
+	b := &TLSConfig{}
+	assert.True(a.Equals(b))
+}
+
+func TestTLS_Equals_FalseWhenUnequal(t *testing.T) {
+	assert := assert.New(t)
+	a := &TLSConfig{CAFile: "abc", CertFile: "def", KeyFile: "ghi"}
+	b := &TLSConfig{CAFile: "jkl", CertFile: "def", KeyFile: "ghi"}
+	assert.False(a.Equals(b))
+}
+
+func TestTLS_Equals_TrueWhenEqual(t *testing.T) {
+	assert := assert.New(t)
+	a := &TLSConfig{CAFile: "abc", CertFile: "def", KeyFile: "ghi"}
+	b := &TLSConfig{CAFile: "abc", CertFile: "def", KeyFile: "ghi"}
+	assert.True(a.Equals(b))
+}
+
+func TestTLS_Copy(t *testing.T) {
+	assert := assert.New(t)
+	a := &TLSConfig{CAFile: "abc", CertFile: "def", KeyFile: "ghi"}
+	aCopy := a.Copy()
+	assert.True(a.Equals(aCopy))
+}
+
+// GetKeyLoader should always return an initialized KeyLoader for a TLSConfig
+// object
+func TestTLS_GetKeyloader(t *testing.T) {
+	assert := assert.New(t)
+	a := &TLSConfig{}
+	assert.NotNil(a.GetKeyLoader())
+}
diff --git a/vendor/github.com/hashicorp/raft/Makefile b/vendor/github.com/hashicorp/raft/Makefile
index 0910e2ceb21..75d947f13df 100644
--- a/vendor/github.com/hashicorp/raft/Makefile
+++ b/vendor/github.com/hashicorp/raft/Makefile
@@ -1,11 +1,14 @@
 DEPS = $(go list -f '{{range .TestImports}}{{.}} {{end}}' ./...)
 
 test:
-	go test -timeout=60s ./...
+	go test -timeout=60s .
 
 integ: test
-	INTEG_TESTS=yes go test -timeout=25s -run=Integ ./...
+	INTEG_TESTS=yes go test -timeout=25s -run=Integ .
+
+fuzz:
+	go test -timeout=300s ./fuzzy
 
 deps:
 	go get -d -v ./...
 	echo $(DEPS) | xargs -n1 go get -d
diff --git a/vendor/github.com/hashicorp/raft/README.md b/vendor/github.com/hashicorp/raft/README.md
index 8778b13dc5c..a70ec8a08b5 100644
--- a/vendor/github.com/hashicorp/raft/README.md
+++ b/vendor/github.com/hashicorp/raft/README.md
@@ -3,7 +3,7 @@ raft [![Build Status](https://travis-ci.org/hashicorp/raft.png)](https://travis-
 
 raft is a [Go](http://www.golang.org) library that manages a replicated
 log and can be used with an FSM to manage replicated state machines. It
-is library for providing [consensus](http://en.wikipedia.org/wiki/Consensus_(computer_science)).
+is a library for providing [consensus](http://en.wikipedia.org/wiki/Consensus_(computer_science)).
 
 The use cases for such a library are far-reaching as replicated state
 machines are a key component of many distributed systems. They enable
@@ -32,6 +32,24 @@ A pure Go backend using [BoltDB](https://github.com/boltdb/bolt) is also availab
 [raft-boltdb](https://github.com/hashicorp/raft-boltdb). It can also be used as
 a `LogStore` and `StableStore`.
 
+## Tagged Releases
+
+As of September 2017, Hashicorp will start using tags for this library to clearly indicate
+major version updates. We recommend you vendor your application's dependency on this library.
+
+* v0.1.0 is the original stable version of the library that was in master and has been maintained
+with no breaking API changes. This was in use by Consul prior to version 0.7.0.
+
+* v1.0.0 takes the changes that were staged in the library-v2-stage-one branch. This version
+manages server identities using a UUID, so introduces some breaking API changes. It also versions
+the Raft protocol, and requires some special steps when interoperating with Raft servers running
+older versions of the library (see the detailed comment in config.go about version compatibility).
+You can reference https://github.com/hashicorp/consul/pull/2222 for an idea of what was required
+to port Consul to these new interfaces.
+
+    This version includes some new features as well, including non voting servers, a new address
+    provider abstraction in the transport layer, and more resilient snapshots.
+
 ## Protocol
 
 raft is based on ["Raft: In Search of an Understandable Consensus Algorithm"](https://ramcloud.stanford.edu/wiki/download/attachments/11370504/raft.pdf)
diff --git a/vendor/github.com/hashicorp/raft/api.go b/vendor/github.com/hashicorp/raft/api.go
index 147cde295c1..73f057c9858 100644
--- a/vendor/github.com/hashicorp/raft/api.go
+++ b/vendor/github.com/hashicorp/raft/api.go
@@ -492,6 +492,7 @@ func NewRaft(conf *Config, fsm FSM, logs LogStore, stable StableStore, snaps Sna
 		}
 		r.processConfigurationLogEntry(&entry)
 	}
+	r.logger.Printf("[INFO] raft: Initial configuration (index=%d): %+v", r.configurations.latestIndex, r.configurations.latest.Servers)
 
diff --git a/vendor/github.com/hashicorp/raft/configuration.go b/vendor/github.com/hashicorp/raft/configuration.go
index 74508c5e530..8afc38bd93e 100644
--- a/vendor/github.com/hashicorp/raft/configuration.go
+++ b/vendor/github.com/hashicorp/raft/configuration.go
@@ -283,7 +283,7 @@ func encodePeers(configuration Configuration, trans Transport) []byte {
 	var encPeers [][]byte
 	for _, server := range configuration.Servers {
 		if server.Suffrage == Voter {
-			encPeers = append(encPeers, trans.EncodePeer(server.Address))
+			encPeers = append(encPeers, trans.EncodePeer(server.ID, server.Address))
 		}
 	}
 
diff --git a/vendor/github.com/hashicorp/raft/file_snapshot.go b/vendor/github.com/hashicorp/raft/file_snapshot.go
index 119bfd308db..ffc9414542f 100644
--- a/vendor/github.com/hashicorp/raft/file_snapshot.go
+++ b/vendor/github.com/hashicorp/raft/file_snapshot.go
@@ -12,6 +12,7 @@ import (
 	"log"
 	"os"
 	"path/filepath"
+	"runtime"
 	"sort"
 	"strings"
 	"time"
@@ -406,17 +407,18 @@ func (s *FileSnapshotSink) Close() error {
 		return err
 	}
 
-	// fsync the parent directory, to sync directory edits to disk
-	parentFH, err := os.Open(s.parentDir)
-	defer parentFH.Close()
-	if err != nil {
-		s.logger.Printf("[ERR] snapshot: Failed to open snapshot parent directory %v, error: %v", s.parentDir, err)
-		return err
-	}
+	if runtime.GOOS != "windows" { //skipping fsync for directory entry edits on Windows, only needed for *nix style file systems
+		parentFH, err := os.Open(s.parentDir)
+		defer parentFH.Close()
+		if err != nil {
+			s.logger.Printf("[ERR] snapshot: Failed to open snapshot parent directory %v, error: %v", s.parentDir, err)
+			return err
+		}
 
-	if err = parentFH.Sync(); err != nil {
-		s.logger.Printf("[ERR] snapshot: Failed syncing parent directory %v, error: %v", s.parentDir, err)
-		return err
+		if err = parentFH.Sync(); err != nil {
+			s.logger.Printf("[ERR] snapshot: Failed syncing parent directory %v, error: %v", s.parentDir, err)
+			return err
+		}
 	}
 
 	// Reap any old snapshots
diff --git a/vendor/github.com/hashicorp/raft/inmem_transport.go b/vendor/github.com/hashicorp/raft/inmem_transport.go
index 3693cd5ad1e..ce37f63aa84 100644
--- a/vendor/github.com/hashicorp/raft/inmem_transport.go
+++ b/vendor/github.com/hashicorp/raft/inmem_transport.go
@@ -75,7 +75,7 @@ func (i *InmemTransport) LocalAddr() ServerAddress {
 
 // AppendEntriesPipeline returns an interface that can be used to pipeline
 // AppendEntries requests.
-func (i *InmemTransport) AppendEntriesPipeline(target ServerAddress) (AppendPipeline, error) {
+func (i *InmemTransport) AppendEntriesPipeline(id ServerID, target ServerAddress) (AppendPipeline, error) {
 	i.RLock()
 	peer, ok := i.peers[target]
 	i.RUnlock()
@@ -90,7 +90,7 @@ func (i *InmemTransport) AppendEntriesPipeline(target ServerAddress) (AppendPipe
 }
 
 // AppendEntries implements the Transport interface.
-func (i *InmemTransport) AppendEntries(target ServerAddress, args *AppendEntriesRequest, resp *AppendEntriesResponse) error {
+func (i *InmemTransport) AppendEntries(id ServerID, target ServerAddress, args *AppendEntriesRequest, resp *AppendEntriesResponse) error {
 	rpcResp, err := i.makeRPC(target, args, nil, i.timeout)
 	if err != nil {
 		return err
@@ -103,7 +103,7 @@ func (i *InmemTransport) AppendEntries(target ServerAddress, args *AppendEntries
 }
 
 // RequestVote implements the Transport interface.
-func (i *InmemTransport) RequestVote(target ServerAddress, args *RequestVoteRequest, resp *RequestVoteResponse) error {
+func (i *InmemTransport) RequestVote(id ServerID, target ServerAddress, args *RequestVoteRequest, resp *RequestVoteResponse) error {
 	rpcResp, err := i.makeRPC(target, args, nil, i.timeout)
 	if err != nil {
 		return err
@@ -116,7 +116,7 @@ func (i *InmemTransport) RequestVote(target ServerAddress, args *RequestVoteRequ
 }
 
 // InstallSnapshot implements the Transport interface.
-func (i *InmemTransport) InstallSnapshot(target ServerAddress, args *InstallSnapshotRequest, resp *InstallSnapshotResponse, data io.Reader) error {
+func (i *InmemTransport) InstallSnapshot(id ServerID, target ServerAddress, args *InstallSnapshotRequest, resp *InstallSnapshotResponse, data io.Reader) error {
 	rpcResp, err := i.makeRPC(target, args, data, 10*i.timeout)
 	if err != nil {
 		return err
@@ -159,7 +159,7 @@ func (i *InmemTransport) makeRPC(target ServerAddress, args interface{}, r io.Re
 }
 
 // EncodePeer implements the Transport interface.
-func (i *InmemTransport) EncodePeer(p ServerAddress) []byte {
+func (i *InmemTransport) EncodePeer(id ServerID, p ServerAddress) []byte {
 	return []byte(p)
 }
 
diff --git a/vendor/github.com/hashicorp/raft/net_transport.go b/vendor/github.com/hashicorp/raft/net_transport.go
index 7c55ac5371f..454fc2a577b 100644
--- a/vendor/github.com/hashicorp/raft/net_transport.go
+++ b/vendor/github.com/hashicorp/raft/net_transport.go
@@ -2,6 +2,7 @@ package raft
 
 import (
 	"bufio"
+	"context"
 	"errors"
 	"fmt"
 	"io"
@@ -68,16 +69,45 @@ type NetworkTransport struct {
 
 	maxPool int
 
+	serverAddressProvider ServerAddressProvider
+
 	shutdown     bool
 	shutdownCh   chan struct{}
 	shutdownLock sync.Mutex
 
 	stream StreamLayer
 
+	// streamCtx is used to cancel existing connection handlers.
+	streamCtx     context.Context
+	streamCancel  context.CancelFunc
+	streamCtxLock sync.RWMutex
+
 	timeout      time.Duration
 	TimeoutScale int
 }
+// NetworkTransportConfig encapsulates configuration for the network transport layer.
+type NetworkTransportConfig struct {
+	// ServerAddressProvider is used to override the target address when establishing a connection to invoke an RPC
+	ServerAddressProvider ServerAddressProvider
+
+	Logger *log.Logger
+
+	// Dialer
+	Stream StreamLayer
+
+	// MaxPool controls how many connections we will pool
+	MaxPool int
+
+	// Timeout is used to apply I/O deadlines. For InstallSnapshot, we multiply
+	// the timeout by (SnapshotSize / TimeoutScale).
+	Timeout time.Duration
+}
+
+type ServerAddressProvider interface {
+	ServerAddr(id ServerID) (ServerAddress, error)
+}
+
 // StreamLayer is used with the NetworkTransport to provide
 // the low level stream abstraction.
 type StreamLayer interface {
@@ -112,6 +142,32 @@ type netPipeline struct {
 	shutdownLock sync.Mutex
 }
 
+// NewNetworkTransportWithConfig creates a new network transport with the given config struct
+func NewNetworkTransportWithConfig(
+	config *NetworkTransportConfig,
+) *NetworkTransport {
+	if config.Logger == nil {
+		config.Logger = log.New(os.Stderr, "", log.LstdFlags)
+	}
+	trans := &NetworkTransport{
+		connPool:              make(map[ServerAddress][]*netConn),
+		consumeCh:             make(chan RPC),
+		logger:                config.Logger,
+		maxPool:               config.MaxPool,
+		shutdownCh:            make(chan struct{}),
+		stream:                config.Stream,
+		timeout:               config.Timeout,
+		TimeoutScale:          DefaultTimeoutScale,
+		serverAddressProvider: config.ServerAddressProvider,
+	}
+
+	// Create the connection context and then start our listener.
+	trans.setupStreamContext()
+	go trans.listen()
+
+	return trans
+}
+
 // NewNetworkTransport creates a new network transport with the given dialer
 // and listener. The maxPool controls how many connections we will pool. The
 // timeout is used to apply I/O deadlines. For InstallSnapshot, we multiply
@@ -125,10 +181,12 @@ func NewNetworkTransport(
 	if logOutput == nil {
 		logOutput = os.Stderr
 	}
-	return NewNetworkTransportWithLogger(stream, maxPool, timeout, log.New(logOutput, "", log.LstdFlags))
+	logger := log.New(logOutput, "", log.LstdFlags)
+	config := &NetworkTransportConfig{Stream: stream, MaxPool: maxPool, Timeout: timeout, Logger: logger}
+	return NewNetworkTransportWithConfig(config)
 }
 
-// NewNetworkTransportWithLogger creates a new network transport with the given dialer
+// NewNetworkTransportWithLogger creates a new network transport with the given logger, dialer
 // and listener. The maxPool controls how many connections we will pool. The
 // timeout is used to apply I/O deadlines. For InstallSnapshot, we multiply
 // the timeout by (SnapshotSize / TimeoutScale).
@@ -138,21 +196,23 @@ func NewNetworkTransportWithLogger(
 	timeout time.Duration,
 	logger *log.Logger,
 ) *NetworkTransport {
-	if logger == nil {
-		logger = log.New(os.Stderr, "", log.LstdFlags)
-	}
-	trans := &NetworkTransport{
-		connPool:     make(map[ServerAddress][]*netConn),
-		consumeCh:    make(chan RPC),
-		logger:       logger,
-		maxPool:      maxPool,
-		shutdownCh:   make(chan struct{}),
-		stream:       stream,
-		timeout:      timeout,
-		TimeoutScale: DefaultTimeoutScale,
-	}
-	go trans.listen()
-	return trans
+	config := &NetworkTransportConfig{Stream: stream, MaxPool: maxPool, Timeout: timeout, Logger: logger}
+	return NewNetworkTransportWithConfig(config)
+}
+
+// setupStreamContext is used to create a new stream context. This should be
+// called with the stream lock held.
+func (n *NetworkTransport) setupStreamContext() {
+	ctx, cancel := context.WithCancel(context.Background())
+	n.streamCtx = ctx
+	n.streamCancel = cancel
+}
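`setupStreamContext` and `CloseStreams` (further below) together implement a cancel-and-replace scheme: every accepted connection captures the context that was current when it arrived, and a reload cancels exactly those handlers while later connections pick up a fresh context. A stripped-down, self-contained sketch of the same pattern, independent of the raft types (all names here are illustrative):

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

type swapper struct {
	mu     sync.RWMutex
	ctx    context.Context
	cancel context.CancelFunc
}

func (s *swapper) reset() {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.cancel != nil {
		s.cancel() // tear down handlers started under the old context
	}
	s.ctx, s.cancel = context.WithCancel(context.Background())
}

func (s *swapper) current() context.Context {
	s.mu.RLock()
	defer s.mu.RUnlock()
	return s.ctx
}

func main() {
	s := &swapper{}
	s.reset()

	old := s.current()
	go func() {
		<-old.Done() // a "handler" bound to the context at accept time
		fmt.Println("old handler stopped")
	}()

	s.reset() // like CloseStreams: cancels old handlers only
	time.Sleep(50 * time.Millisecond)
	fmt.Println(s.current().Err()) // <nil>: the fresh context is live
}
```

+
+// getStreamContext is used to retrieve the current stream context.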
+func (n *NetworkTransport) getStreamContext() context.Context {
+	n.streamCtxLock.RLock()
+	defer n.streamCtxLock.RUnlock()
+	return n.streamCtx
 }
 
 // SetHeartbeatHandler is used to setup a heartbeat handler
@@ -164,6 +224,31 @@ func (n *NetworkTransport) SetHeartbeatHandler(cb func(rpc RPC)) {
 	n.heartbeatFn = cb
 }
 
+// CloseStreams closes the current streams.
+func (n *NetworkTransport) CloseStreams() {
+	n.connPoolLock.Lock()
+	defer n.connPoolLock.Unlock()
+
+	// Close all the connections in the connection pool and then remove their
+	// entry.
+	for k, e := range n.connPool {
+		for _, conn := range e {
+			conn.Release()
+		}
+
+		delete(n.connPool, k)
+	}
+
+	// Cancel the existing connections and create a new context. Both these
+	// operations must always be done with the lock held otherwise we can create
+	// connection handlers that are holding a context that will never be
+	// cancelable.
+	n.streamCtxLock.Lock()
+	n.streamCancel()
+	n.setupStreamContext()
+	n.streamCtxLock.Unlock()
+}
+
 // Close is used to stop the network transport.
 func (n *NetworkTransport) Close() error {
 	n.shutdownLock.Lock()
@@ -214,6 +299,24 @@ func (n *NetworkTransport) getPooledConn(target ServerAddress) *netConn {
 	return conn
 }
 
+// getConnFromAddressProvider returns a connection from the server address provider if available, or defaults to a connection using the target server address
+func (n *NetworkTransport) getConnFromAddressProvider(id ServerID, target ServerAddress) (*netConn, error) {
+	address := n.getProviderAddressOrFallback(id, target)
+	return n.getConn(address)
+}
+
+func (n *NetworkTransport) getProviderAddressOrFallback(id ServerID, target ServerAddress) ServerAddress {
+	if n.serverAddressProvider != nil {
+		serverAddressOverride, err := n.serverAddressProvider.ServerAddr(id)
+		if err != nil {
+			n.logger.Printf("[WARN] Unable to get address for server id %v, using fallback address %v: %v", id, target, err)
+		} else {
+			return serverAddressOverride
+		}
+	}
+	return target
+}
+
 // getConn is used to get a connection from the pool.
 func (n *NetworkTransport) getConn(target ServerAddress) (*netConn, error) {
 	// Check for a pooled conn
@@ -260,9 +363,9 @@ func (n *NetworkTransport) returnConn(conn *netConn) {
 
 // AppendEntriesPipeline returns an interface that can be used to pipeline
 // AppendEntries requests.
-func (n *NetworkTransport) AppendEntriesPipeline(target ServerAddress) (AppendPipeline, error) {
+func (n *NetworkTransport) AppendEntriesPipeline(id ServerID, target ServerAddress) (AppendPipeline, error) {
 	// Get a connection
-	conn, err := n.getConn(target)
+	conn, err := n.getConnFromAddressProvider(id, target)
 	if err != nil {
 		return nil, err
 	}
@@ -272,19 +375,19 @@ func (n *NetworkTransport) AppendEntriesPipeline(target ServerAddress) (AppendPi
 }
 
 // AppendEntries implements the Transport interface.
-func (n *NetworkTransport) AppendEntries(target ServerAddress, args *AppendEntriesRequest, resp *AppendEntriesResponse) error {
-	return n.genericRPC(target, rpcAppendEntries, args, resp)
+func (n *NetworkTransport) AppendEntries(id ServerID, target ServerAddress, args *AppendEntriesRequest, resp *AppendEntriesResponse) error {
+	return n.genericRPC(id, target, rpcAppendEntries, args, resp)
 }
 
 // RequestVote implements the Transport interface.
-func (n *NetworkTransport) RequestVote(target ServerAddress, args *RequestVoteRequest, resp *RequestVoteResponse) error { - return n.genericRPC(target, rpcRequestVote, args, resp) +func (n *NetworkTransport) RequestVote(id ServerID, target ServerAddress, args *RequestVoteRequest, resp *RequestVoteResponse) error { + return n.genericRPC(id, target, rpcRequestVote, args, resp) } // genericRPC handles a simple request/response RPC. -func (n *NetworkTransport) genericRPC(target ServerAddress, rpcType uint8, args interface{}, resp interface{}) error { +func (n *NetworkTransport) genericRPC(id ServerID, target ServerAddress, rpcType uint8, args interface{}, resp interface{}) error { // Get a conn - conn, err := n.getConn(target) + conn, err := n.getConnFromAddressProvider(id, target) if err != nil { return err } @@ -308,9 +411,9 @@ func (n *NetworkTransport) genericRPC(target ServerAddress, rpcType uint8, args } // InstallSnapshot implements the Transport interface. -func (n *NetworkTransport) InstallSnapshot(target ServerAddress, args *InstallSnapshotRequest, resp *InstallSnapshotResponse, data io.Reader) error { +func (n *NetworkTransport) InstallSnapshot(id ServerID, target ServerAddress, args *InstallSnapshotRequest, resp *InstallSnapshotResponse, data io.Reader) error { // Get a conn, always close for InstallSnapshot - conn, err := n.getConn(target) + conn, err := n.getConnFromAddressProvider(id, target) if err != nil { return err } @@ -346,8 +449,9 @@ func (n *NetworkTransport) InstallSnapshot(target ServerAddress, args *InstallSn } // EncodePeer implements the Transport interface. -func (n *NetworkTransport) EncodePeer(p ServerAddress) []byte { - return []byte(p) +func (n *NetworkTransport) EncodePeer(id ServerID, p ServerAddress) []byte { + address := n.getProviderAddressOrFallback(id, p) + return []byte(address) } // DecodePeer implements the Transport interface. @@ -370,12 +474,14 @@ func (n *NetworkTransport) listen() { n.logger.Printf("[DEBUG] raft-net: %v accepted connection from: %v", n.LocalAddr(), conn.RemoteAddr()) // Handle the connection in dedicated routine - go n.handleConn(conn) + go n.handleConn(n.getStreamContext(), conn) } } -// handleConn is used to handle an inbound connection for its lifespan. -func (n *NetworkTransport) handleConn(conn net.Conn) { +// handleConn is used to handle an inbound connection for its lifespan. The +// handler will exit when the passed context is cancelled or the connection is +// closed. +func (n *NetworkTransport) handleConn(connCtx context.Context, conn net.Conn) { defer conn.Close() r := bufio.NewReader(conn) w := bufio.NewWriter(conn) @@ -383,6 +489,13 @@ func (n *NetworkTransport) handleConn(conn net.Conn) { enc := codec.NewEncoder(w, &codec.MsgpackHandle{}) for { + select { + case <-connCtx.Done(): + n.logger.Println("[DEBUG] raft-net: stream layer is closed") + return + default: + } + if err := n.handleCommand(r, dec, enc); err != nil { if err != io.EOF { n.logger.Printf("[ERR] raft-net: Failed to decode incoming command: %v", err) diff --git a/vendor/github.com/hashicorp/raft/observer.go b/vendor/github.com/hashicorp/raft/observer.go index 76c4d555dfa..bce17ef19a4 100644 --- a/vendor/github.com/hashicorp/raft/observer.go +++ b/vendor/github.com/hashicorp/raft/observer.go @@ -13,6 +13,11 @@ type Observation struct { Data interface{} } +// LeaderObservation is used for the data when leadership changes. 
+type LeaderObservation struct { + leader ServerAddress +} + // nextObserverId is used to provide a unique ID for each observer to aid in // deregistration. var nextObserverID uint64 diff --git a/vendor/github.com/hashicorp/raft/raft.go b/vendor/github.com/hashicorp/raft/raft.go index 0f89ccaa4d6..b5cc9ca980c 100644 --- a/vendor/github.com/hashicorp/raft/raft.go +++ b/vendor/github.com/hashicorp/raft/raft.go @@ -88,8 +88,12 @@ type leaderState struct { // setLeader is used to modify the current leader of the cluster func (r *Raft) setLeader(leader ServerAddress) { r.leaderLock.Lock() + oldLeader := r.leader r.leader = leader r.leaderLock.Unlock() + if oldLeader != leader { + r.observe(LeaderObservation{leader: leader}) + } } // requestConfigChange is a helper for the above functions that make @@ -1379,7 +1383,7 @@ func (r *Raft) electSelf() <-chan *voteResult { req := &RequestVoteRequest{ RPCHeader: r.getRPCHeader(), Term: r.getCurrentTerm(), - Candidate: r.trans.EncodePeer(r.localAddr), + Candidate: r.trans.EncodePeer(r.localID, r.localAddr), LastLogIndex: lastIdx, LastLogTerm: lastTerm, } @@ -1389,7 +1393,7 @@ func (r *Raft) electSelf() <-chan *voteResult { r.goFunc(func() { defer metrics.MeasureSince([]string{"raft", "candidate", "electSelf"}, time.Now()) resp := &voteResult{voterID: peer.ID} - err := r.trans.RequestVote(peer.Address, req, &resp.RequestVoteResponse) + err := r.trans.RequestVote(peer.ID, peer.Address, req, &resp.RequestVoteResponse) if err != nil { r.logger.Printf("[ERR] raft: Failed to make RequestVote RPC to %v: %v", peer, err) resp.Term = req.Term diff --git a/vendor/github.com/hashicorp/raft/replication.go b/vendor/github.com/hashicorp/raft/replication.go index 68392734397..e631b5a09ba 100644 --- a/vendor/github.com/hashicorp/raft/replication.go +++ b/vendor/github.com/hashicorp/raft/replication.go @@ -157,7 +157,7 @@ PIPELINE: goto RPC } -// replicateTo is a hepler to replicate(), used to replicate the logs up to a +// replicateTo is a helper to replicate(), used to replicate the logs up to a // given last index. // If the follower log is behind, we take care to bring them up to date. 
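Because `setLeader` publishes a `LeaderObservation` through raft's existing observer channel, consumers can react to leadership changes without polling. Below is a minimal sketch using the library's `NewObserver`/`RegisterObserver` API; note that the `leader` field is unexported at this revision, so an external package can only detect that a change occurred, not read the new address:

```go
package raftexample

import (
	"log"

	"github.com/hashicorp/raft"
)

// watchLeadership logs every leadership change observed by the given node.
func watchLeadership(r *raft.Raft) {
	obsCh := make(chan raft.Observation, 16)
	// The filter runs on raft's observation path, so keep it cheap: only
	// pass leadership-change observations through to the channel.
	observer := raft.NewObserver(obsCh, false, func(o *raft.Observation) bool {
		_, ok := o.Data.(raft.LeaderObservation)
		return ok
	})
	r.RegisterObserver(observer)

	go func() {
		for range obsCh {
			log.Println("[INFO] raft: leadership changed")
		}
	}()
}
```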
func (r *Raft) replicateTo(s *followerReplication, lastIndex uint64) (shouldStop bool) { @@ -183,7 +183,7 @@ START: // Make the RPC call start = time.Now() - if err := r.trans.AppendEntries(s.peer.Address, &req, &resp); err != nil { + if err := r.trans.AppendEntries(s.peer.ID, s.peer.Address, &req, &resp); err != nil { r.logger.Printf("[ERR] raft: Failed to AppendEntries to %v: %v", s.peer, err) s.failures++ return @@ -278,7 +278,7 @@ func (r *Raft) sendLatestSnapshot(s *followerReplication) (bool, error) { RPCHeader: r.getRPCHeader(), SnapshotVersion: meta.Version, Term: s.currentTerm, - Leader: r.trans.EncodePeer(r.localAddr), + Leader: r.trans.EncodePeer(r.localID, r.localAddr), LastLogIndex: meta.Index, LastLogTerm: meta.Term, Peers: meta.Peers, @@ -290,7 +290,7 @@ func (r *Raft) sendLatestSnapshot(s *followerReplication) (bool, error) { // Make the call start := time.Now() var resp InstallSnapshotResponse - if err := r.trans.InstallSnapshot(s.peer.Address, &req, &resp, snapshot); err != nil { + if err := r.trans.InstallSnapshot(s.peer.ID, s.peer.Address, &req, &resp, snapshot); err != nil { r.logger.Printf("[ERR] raft: Failed to install snapshot %v: %v", snapID, err) s.failures++ return false, err @@ -332,7 +332,7 @@ func (r *Raft) heartbeat(s *followerReplication, stopCh chan struct{}) { req := AppendEntriesRequest{ RPCHeader: r.getRPCHeader(), Term: s.currentTerm, - Leader: r.trans.EncodePeer(r.localAddr), + Leader: r.trans.EncodePeer(r.localID, r.localAddr), } var resp AppendEntriesResponse for { @@ -345,7 +345,7 @@ func (r *Raft) heartbeat(s *followerReplication, stopCh chan struct{}) { } start := time.Now() - if err := r.trans.AppendEntries(s.peer.Address, &req, &resp); err != nil { + if err := r.trans.AppendEntries(s.peer.ID, s.peer.Address, &req, &resp); err != nil { r.logger.Printf("[ERR] raft: Failed to heartbeat to %v: %v", s.peer.Address, err) failures++ select { @@ -367,7 +367,7 @@ func (r *Raft) heartbeat(s *followerReplication, stopCh chan struct{}) { // back to the standard replication which can handle more complex situations. func (r *Raft) pipelineReplicate(s *followerReplication) error { // Create a new pipeline - pipeline, err := r.trans.AppendEntriesPipeline(s.peer.Address) + pipeline, err := r.trans.AppendEntriesPipeline(s.peer.ID, s.peer.Address) if err != nil { return err } @@ -476,7 +476,7 @@ func (r *Raft) pipelineDecode(s *followerReplication, p AppendPipeline, stopCh, func (r *Raft) setupAppendEntries(s *followerReplication, req *AppendEntriesRequest, nextIndex, lastIndex uint64) error { req.RPCHeader = r.getRPCHeader() req.Term = s.currentTerm - req.Leader = r.trans.EncodePeer(r.localAddr) + req.Leader = r.trans.EncodePeer(r.localID, r.localAddr) req.LeaderCommitIndex = r.getCommitIndex() if err := r.setPreviousLog(req, nextIndex); err != nil { return err diff --git a/vendor/github.com/hashicorp/raft/tag.sh b/vendor/github.com/hashicorp/raft/tag.sh new file mode 100755 index 00000000000..cd16623a70d --- /dev/null +++ b/vendor/github.com/hashicorp/raft/tag.sh @@ -0,0 +1,16 @@ +#!/usr/bin/env bash +set -e + +# The version must be supplied from the environment. Do not include the +# leading "v". +if [ -z $VERSION ]; then + echo "Please specify a version." + exit 1 +fi + +# Generate the tag. +echo "==> Tagging version $VERSION..." 
+git commit --allow-empty -a --gpg-sign=348FFC4C -m "Release v$VERSION" +git tag -a -m "Version $VERSION" -s -u 348FFC4C "v${VERSION}" master + +exit 0 diff --git a/vendor/github.com/hashicorp/raft/tcp_transport.go b/vendor/github.com/hashicorp/raft/tcp_transport.go index 9281508a050..29b2740f624 100644 --- a/vendor/github.com/hashicorp/raft/tcp_transport.go +++ b/vendor/github.com/hashicorp/raft/tcp_transport.go @@ -28,7 +28,7 @@ func NewTCPTransport( timeout time.Duration, logOutput io.Writer, ) (*NetworkTransport, error) { - return newTCPTransport(bindAddr, advertise, maxPool, timeout, func(stream StreamLayer) *NetworkTransport { + return newTCPTransport(bindAddr, advertise, func(stream StreamLayer) *NetworkTransport { return NewNetworkTransport(stream, maxPool, timeout, logOutput) }) } @@ -42,15 +42,26 @@ func NewTCPTransportWithLogger( timeout time.Duration, logger *log.Logger, ) (*NetworkTransport, error) { - return newTCPTransport(bindAddr, advertise, maxPool, timeout, func(stream StreamLayer) *NetworkTransport { + return newTCPTransport(bindAddr, advertise, func(stream StreamLayer) *NetworkTransport { return NewNetworkTransportWithLogger(stream, maxPool, timeout, logger) }) } +// NewTCPTransportWithConfig returns a NetworkTransport that is built on top of +// a TCP streaming transport layer, using the given network transport config +// (which carries the logger and the server address provider). +func NewTCPTransportWithConfig( + bindAddr string, + advertise net.Addr, + config *NetworkTransportConfig, +) (*NetworkTransport, error) { + return newTCPTransport(bindAddr, advertise, func(stream StreamLayer) *NetworkTransport { + config.Stream = stream + return NewNetworkTransportWithConfig(config) + }) +} + func newTCPTransport(bindAddr string, advertise net.Addr, - maxPool int, - timeout time.Duration, transportCreator func(stream StreamLayer) *NetworkTransport) (*NetworkTransport, error) { // Try to bind list, err := net.Listen("tcp", bindAddr) diff --git a/vendor/github.com/hashicorp/raft/transport.go b/vendor/github.com/hashicorp/raft/transport.go index 633f97a8c5c..85459b221d1 100644 --- a/vendor/github.com/hashicorp/raft/transport.go +++ b/vendor/github.com/hashicorp/raft/transport.go @@ -35,20 +35,20 @@ type Transport interface { // AppendEntriesPipeline returns an interface that can be used to pipeline // AppendEntries requests. - AppendEntriesPipeline(target ServerAddress) (AppendPipeline, error) + AppendEntriesPipeline(id ServerID, target ServerAddress) (AppendPipeline, error) // AppendEntries sends the appropriate RPC to the target node. - AppendEntries(target ServerAddress, args *AppendEntriesRequest, resp *AppendEntriesResponse) error + AppendEntries(id ServerID, target ServerAddress, args *AppendEntriesRequest, resp *AppendEntriesResponse) error // RequestVote sends the appropriate RPC to the target node. - RequestVote(target ServerAddress, args *RequestVoteRequest, resp *RequestVoteResponse) error + RequestVote(id ServerID, target ServerAddress, args *RequestVoteRequest, resp *RequestVoteResponse) error // InstallSnapshot is used to push a snapshot down to a follower. The data is read from // the ReadCloser and streamed to the client. - InstallSnapshot(target ServerAddress, args *InstallSnapshotRequest, resp *InstallSnapshotResponse, data io.Reader) error + InstallSnapshot(id ServerID, target ServerAddress, args *InstallSnapshotRequest, resp *InstallSnapshotResponse, data io.Reader) error
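The new constructor is driven entirely by the config struct, which is how a server address provider gets attached to the transport. A hedged usage sketch follows; the `MaxPool`, `Timeout`, `Logger`, and `ServerAddressProvider` fields are taken from `NetworkTransportConfig` at this revision, and `Stream` is filled in by the constructor itself:

```go
package raftexample

import (
	"log"
	"os"
	"time"

	"github.com/hashicorp/raft"
)

// buildTransport wires a custom address provider into a TCP transport.
func buildTransport(provider raft.ServerAddressProvider) (*raft.NetworkTransport, error) {
	config := &raft.NetworkTransportConfig{
		MaxPool:               3,
		Timeout:               10 * time.Second,
		Logger:                log.New(os.Stderr, "raft-net: ", log.LstdFlags),
		ServerAddressProvider: provider,
	}
	// Stream is set by NewTCPTransportWithConfig once the listener is bound.
	return raft.NewTCPTransportWithConfig("127.0.0.1:0", nil, config)
}
```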
// EncodePeer is used to serialize a peer's address. - EncodePeer(ServerAddress) []byte + EncodePeer(id ServerID, addr ServerAddress) []byte // DecodePeer is used to deserialize a peer's address. DecodePeer([]byte) ServerAddress diff --git a/vendor/vendor.json b/vendor/vendor.json index c1f64dda14a..672ee1484b0 100644 --- a/vendor/vendor.json +++ b/vendor/vendor.json @@ -162,7 +162,7 @@ {"path":"github.com/hashicorp/logutils","revision":"0dc08b1671f34c4250ce212759ebd880f743d883"}, {"path":"github.com/hashicorp/memberlist","checksumSHA1":"1zk7IeGClUqBo+Phsx89p7fQ/rQ=","revision":"23ad4b7d7b38496cd64c241dfd4c60b7794c254a","revisionTime":"2017-02-08T21:15:06Z"}, {"path":"github.com/hashicorp/net-rpc-msgpackrpc","revision":"a14192a58a694c123d8fe5481d4a4727d6ae82f3"}, - {"path":"github.com/hashicorp/raft","checksumSHA1":"ecpaHOImbL/NaivWrUDUUe5461E=","revision":"3a6f3bdfe4fc69e300c6d122b1a92051af6f0b95","revisionTime":"2017-08-07T22:22:24Z"}, + {"path":"github.com/hashicorp/raft","checksumSHA1":"zkA9uvbj1BdlveyqXpVTh1N6ers=","revision":"077966dbc90f342107eb723ec52fdb0463ec789b","revisionTime":"2018-01-17T20:29:25Z","version":"master","versionExact":"master"}, {"path":"github.com/hashicorp/raft-boltdb","checksumSHA1":"QAxukkv54/iIvLfsUP6IK4R0m/A=","revision":"d1e82c1ec3f15ee991f7cc7ffd5b67ff6f5bbaee","revisionTime":"2015-02-01T20:08:39Z"}, {"path":"github.com/hashicorp/serf/coordinate","checksumSHA1":"/oss17GO4hXGM7QnUdI3VzcAHzA=","revision":"bbeddf0b3ab3072a60525afbd6b6f47d33839eee","revisionTime":"2017-07-14T18:26:01Z"}, {"path":"github.com/hashicorp/serf/serf","checksumSHA1":"pvLOzocYsZtxuJ9pREHRTxYnoa4=","revision":"bbeddf0b3ab3072a60525afbd6b6f47d33839eee","revisionTime":"2017-07-14T18:26:01Z"}, diff --git a/website/source/guides/securing-nomad.html.md b/website/source/guides/securing-nomad.html.md index d4898f3dd62..e3a379405fc 100644 --- a/website/source/guides/securing-nomad.html.md +++ b/website/source/guides/securing-nomad.html.md @@ -472,6 +472,18 @@ NOTE: Dynamically reloading certificates will _not_ close existing connections. If you need to rotate certificates due to a security incident, you will still need to completely shutdown and restart the Nomad agent. +## Migrating a cluster to TLS + +Nomad supports dynamically reloading its TLS configuration. To reload Nomad's +configuration, first update the configuration file and then send the Nomad +agent a SIGHUP signal. Note that only a subset of the configuration file, +including the TLS configuration, is reloaded in this way. + +If the TLS configuration has changed when the configuration is reloaded, the +agent will reset all of its network connections and will use the new TLS +configuration when establishing new ones. This process works for both +upgrading and downgrading TLS (though we recommend upgrading). + [cfssl]: https://cfssl.org/ [cfssl.json]: https://raw.githubusercontent.com/hashicorp/nomad/master/demo/vagrant/cfssl.json
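To make the reload flow in the guide concrete, here is a minimal sketch of how an agent might wire SIGHUP to a reload; the `reloader` interface and `watchSIGHUP` helper are illustrative stand-ins, not Nomad's actual agent internals:

```go
package agentexample

import (
	"log"
	"os"
	"os/signal"
	"syscall"
)

// reloader stands in for any component (server or client) that can re-read
// its configuration file and swap TLS settings at runtime.
type reloader interface {
	Reload(configPath string) error
}

// watchSIGHUP blocks, applying the configuration again on every SIGHUP.
func watchSIGHUP(agent reloader, configPath string) {
	sigCh := make(chan os.Signal, 1)
	signal.Notify(sigCh, syscall.SIGHUP)

	for range sigCh {
		log.Printf("[INFO] caught SIGHUP, reloading configuration from %s", configPath)
		if err := agent.Reload(configPath); err != nil {
			log.Printf("[ERR] failed to apply new configuration: %v", err)
		}
	}
}
```

Existing connections are reset only when the TLS block actually changed, which is what makes an in-place migration from plaintext to TLS possible.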