Skip to content

Commit

Permalink
Merge pull request #1199 from hashicorp/f-scada-reload
Browse files Browse the repository at this point in the history
SCADA client is reload-able
  • Loading branch information
ryanuber committed Aug 26, 2015
2 parents 6ad48db + 5ad8bfb commit b0fcb6c
Show file tree
Hide file tree
Showing 7 changed files with 166 additions and 37 deletions.
2 changes: 2 additions & 0 deletions command/agent/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ func nextConfig() *Config {
idx := int(atomic.AddUint64(&offset, 1))
conf := DefaultConfig()

conf.Version = "a.b"
conf.VersionPrerelease = "c.d"
conf.AdvertiseAddr = "127.0.0.1"
conf.Bootstrap = true
conf.Datacenter = "dc1"
Expand Down
68 changes: 53 additions & 15 deletions command/agent/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ type Command struct {
httpServers []*HTTPServer
dnsServer *DNSServer
scadaProvider *scada.Provider
scadaHttp *HTTPServer
}

// readConfig is responsible for setup of our configuration using
Expand Down Expand Up @@ -345,20 +346,14 @@ func (c *Command) setupAgent(config *Config, logOutput io.Writer, logWriter *log
c.rpcServer = NewAgentRPC(agent, rpcListener, logOutput, logWriter)

// Enable the SCADA integration
var scadaList net.Listener
if config.AtlasInfrastructure != "" {
provider, list, err := NewProvider(config, logOutput)
if err != nil {
agent.Shutdown()
c.Ui.Error(fmt.Sprintf("Error starting SCADA connection: %s", err))
return err
}
c.scadaProvider = provider
scadaList = list
if err := c.setupScadaConn(config); err != nil {
agent.Shutdown()
c.Ui.Error(fmt.Sprintf("Error starting SCADA connection: %s", err))
return err
}

if config.Ports.HTTP > 0 || config.Ports.HTTPS > 0 || scadaList != nil {
servers, err := NewHTTPServers(agent, config, scadaList, logOutput)
if config.Ports.HTTP > 0 || config.Ports.HTTPS > 0 {
servers, err := NewHTTPServers(agent, config, logOutput)
if err != nil {
agent.Shutdown()
c.Ui.Error(fmt.Sprintf("Error starting http servers: %s", err))
Expand Down Expand Up @@ -684,9 +679,16 @@ AFTER_MIGRATE:
for _, server := range c.httpServers {
defer server.Shutdown()
}
if c.scadaProvider != nil {
defer c.scadaProvider.Shutdown()
}

// Check and shut down the SCADA listeners at the end
defer func() {
if c.scadaHttp != nil {
c.scadaHttp.Shutdown()
}
if c.scadaProvider != nil {
c.scadaProvider.Shutdown()
}
}()

// Join startup nodes if specified
if err := c.startupJoin(config); err != nil {
Expand Down Expand Up @@ -904,9 +906,45 @@ func (c *Command) handleReload(config *Config) *Config {
}(wp)
}

// Reload SCADA client if we have a change
if newConf.AtlasInfrastructure != config.AtlasInfrastructure ||
newConf.AtlasToken != config.AtlasToken {
if err := c.setupScadaConn(newConf); err != nil {
c.Ui.Error(fmt.Sprintf("Failed reloading SCADA client: %s", err))
return nil
}
}

return newConf
}

// startScadaClient is used to start a new SCADA provider and listener,
// replacing any existing listeners.
func (c *Command) setupScadaConn(config *Config) error {
// Shut down existing SCADA listeners
if c.scadaProvider != nil {
c.scadaProvider.Shutdown()
}
if c.scadaHttp != nil {
c.scadaHttp.Shutdown()
}

// No-op if we don't have an infrastructure
if config.AtlasInfrastructure == "" {
return nil
}

// Create the new provider and listener
c.Ui.Output("Connecting to Atlas: " + config.AtlasInfrastructure)
provider, list, err := NewProvider(config, c.logOutput)
if err != nil {
return err
}
c.scadaProvider = provider
c.scadaHttp = newScadaHttp(c.agent, list)
return nil
}

func (c *Command) Synopsis() string {
return "Runs a Consul agent"
}
Expand Down
53 changes: 53 additions & 0 deletions command/agent/command_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"io/ioutil"
"log"
"os"
"strings"
"testing"

"github.com/hashicorp/consul/testutil"
Expand Down Expand Up @@ -246,3 +247,55 @@ func TestSetupAgent_RPCUnixSocket_FileExists(t *testing.T) {
t.Fatalf("bad permissions: %s", fi.Mode())
}
}

func TestSetupScadaConn(t *testing.T) {
// Create a config and assign an infra name
conf1 := nextConfig()
conf1.AtlasInfrastructure = "hashicorp/test1"
conf1.AtlasToken = "abc"

dir, agent := makeAgent(t, conf1)
defer os.RemoveAll(dir)
defer agent.Shutdown()

cmd := &Command{
ShutdownCh: make(chan struct{}),
Ui: new(cli.MockUi),
agent: agent,
}

// First start creates the scada conn
if err := cmd.setupScadaConn(conf1); err != nil {
t.Fatalf("err: %s", err)
}
list := cmd.scadaHttp.listener.(*scadaListener)
if list == nil || list.addr.infra != "hashicorp/test1" {
t.Fatalf("bad: %#v", list)
}
http1 := cmd.scadaHttp
provider1 := cmd.scadaProvider

// Performing setup again tears down original and replaces
// with a new SCADA client.
conf2 := nextConfig()
conf2.AtlasInfrastructure = "hashicorp/test2"
conf2.AtlasToken = "123"
if err := cmd.setupScadaConn(conf2); err != nil {
t.Fatalf("err: %s", err)
}
if cmd.scadaHttp == http1 || cmd.scadaProvider == provider1 {
t.Fatalf("should change: %#v %#v", cmd.scadaHttp, cmd.scadaProvider)
}
list = cmd.scadaHttp.listener.(*scadaListener)
if list == nil || list.addr.infra != "hashicorp/test2" {
t.Fatalf("bad: %#v", list)
}

// Original provider and listener must be closed
if !provider1.IsShutdown() {
t.Fatalf("should be shutdown")
}
if _, err := http1.listener.Accept(); !strings.Contains(err.Error(), "closed") {
t.Fatalf("should be closed")
}
}
39 changes: 20 additions & 19 deletions command/agent/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ type HTTPServer struct {

// NewHTTPServers starts new HTTP servers to provide an interface to
// the agent.
func NewHTTPServers(agent *Agent, config *Config, scada net.Listener, logOutput io.Writer) ([]*HTTPServer, error) {
func NewHTTPServers(agent *Agent, config *Config, logOutput io.Writer) ([]*HTTPServer, error) {
var servers []*HTTPServer

if config.Ports.HTTPS > 0 {
Expand Down Expand Up @@ -142,27 +142,28 @@ func NewHTTPServers(agent *Agent, config *Config, scada net.Listener, logOutput
servers = append(servers, srv)
}

if scada != nil {
// Create the mux
mux := http.NewServeMux()

// Create the server
srv := &HTTPServer{
agent: agent,
mux: mux,
listener: scada,
logger: log.New(logOutput, "", log.LstdFlags),
uiDir: config.UiDir,
addr: scadaHTTPAddr,
}
srv.registerHandlers(false) // Never allow debug for SCADA
return servers, nil
}

// Start the server
go http.Serve(scada, mux)
servers = append(servers, srv)
// newScadaHttp creates a new HTTP server wrapping the SCADA
// listener such that HTTP calls can be sent from the brokers.
func newScadaHttp(agent *Agent, list net.Listener) *HTTPServer {
// Create the mux
mux := http.NewServeMux()

// Create the server
srv := &HTTPServer{
agent: agent,
mux: mux,
listener: list,
logger: agent.logger,
addr: scadaHTTPAddr,
}
srv.registerHandlers(false) // Never allow debug for SCADA

return servers, nil
// Start the server
go http.Serve(list, mux)
return srv
}

// tcpKeepAliveListener sets TCP keep-alive timeouts on accepted
Expand Down
37 changes: 35 additions & 2 deletions command/agent/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func makeHTTPServerWithConfig(t *testing.T, cb func(c *Config)) (string, *HTTPSe
t.Fatalf("err: %v", err)
}
conf.UiDir = uiDir
servers, err := NewHTTPServers(agent, conf, nil, agent.logOutput)
servers, err := NewHTTPServers(agent, conf, agent.logOutput)
if err != nil {
t.Fatalf("err: %v", err)
}
Expand Down Expand Up @@ -148,7 +148,7 @@ func TestHTTPServer_UnixSocket_FileExists(t *testing.T) {
defer os.RemoveAll(dir)

// Try to start the server with the same path anyways.
if _, err := NewHTTPServers(agent, conf, nil, agent.logOutput); err != nil {
if _, err := NewHTTPServers(agent, conf, agent.logOutput); err != nil {
t.Fatalf("err: %s", err)
}

Expand Down Expand Up @@ -516,6 +516,39 @@ func TestACLResolution(t *testing.T) {
})
}

func TestScadaHTTP(t *testing.T) {
// Create the agent
dir, agent := makeAgent(t, nextConfig())
defer os.RemoveAll(dir)
defer agent.Shutdown()

// Create a generic listener
list, err := net.Listen("tcp", ":0")
if err != nil {
t.Fatalf("err: %s", err)
}
defer list.Close()

// Create the SCADA HTTP server
scadaHttp := newScadaHttp(agent, list)

// Returned server uses the listener and scada addr
if scadaHttp.listener != list {
t.Fatalf("bad listener: %#v", scadaHttp)
}
if scadaHttp.addr != scadaHTTPAddr {
t.Fatalf("expected %v, got: %v", scadaHttp.addr, scadaHTTPAddr)
}

// Check that debug endpoints were not enabled. This will cause
// the serve mux to panic if the routes are already handled.
mockFn := func(w http.ResponseWriter, r *http.Request) {}
scadaHttp.mux.HandleFunc("/debug/pprof/", mockFn)
scadaHttp.mux.HandleFunc("/debug/pprof/cmdline", mockFn)
scadaHttp.mux.HandleFunc("/debug/pprof/profile", mockFn)
scadaHttp.mux.HandleFunc("/debug/pprof/symbol", mockFn)
}

// assertIndex tests that X-Consul-Index is set and non-zero
func assertIndex(t *testing.T, resp *httptest.ResponseRecorder) {
header := resp.Header().Get("X-Consul-Index")
Expand Down
2 changes: 1 addition & 1 deletion command/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func testAgentWithConfig(t *testing.T, cb func(c *agent.Config)) *agentWrapper {

conf.Addresses.HTTP = "127.0.0.1"
httpAddr := fmt.Sprintf("127.0.0.1:%d", conf.Ports.HTTP)
http, err := agent.NewHTTPServers(a, conf, nil, os.Stderr)
http, err := agent.NewHTTPServers(a, conf, os.Stderr)
if err != nil {
os.RemoveAll(dir)
t.Fatalf(fmt.Sprintf("err: %v", err))
Expand Down
2 changes: 2 additions & 0 deletions website/source/docs/agent/options.html.markdown
Original file line number Diff line number Diff line change
Expand Up @@ -643,3 +643,5 @@ items which are reloaded include:
* Services
* Watches
* HTTP Client Address
* Atlas Token
* Atlas Infrastructure

0 comments on commit b0fcb6c

Please sign in to comment.