Skip to content

Commit

Permalink
agent: Replace client/server with delegate interface
Browse files Browse the repository at this point in the history
This patch adds a new internal interface clientServer
which defines the common methods of consul.Client and
consul.Server. This allows to replace the following
code

    if a.server != nil {
        a.server.do()
    } else {
        a.client.do()
    }

with

    a.delegate.do()

In case a specific type is required a type check can
be performed:

    if srv, ok := a.delegate.(*consul.Server); ok {
        srv.doSrv()
    }
  • Loading branch information
magiconair committed May 15, 2017
1 parent 9b1bd51 commit e2c37b4
Show file tree
Hide file tree
Showing 7 changed files with 69 additions and 95 deletions.
130 changes: 52 additions & 78 deletions command/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,19 +48,31 @@ const (
"service, but no reason was provided. This is a default message."
)

var (
// dnsNameRe checks if a name or tag is dns-compatible.
dnsNameRe = regexp.MustCompile(`^[a-zA-Z0-9\-]+$`)
)
// dnsNameRe checks if a name or tag is dns-compatible.
var dnsNameRe = regexp.MustCompile(`^[a-zA-Z0-9\-]+$`)

// clientServer defines the interface shared by both
// consul.Client and consul.Server.
type clientServer interface {
Encrypted() bool
GetLANCoordinate() (*coordinate.Coordinate, error)
Leave() error
LANMembers() []serf.Member
LocalMember() serf.Member
JoinLAN(addrs []string) (n int, err error)
RemoveFailedNode(node string) error
RPC(method string, args interface{}, reply interface{}) error
SnapshotRPC(args *structs.SnapshotRequest, in io.Reader, out io.Writer, replyFn consul.SnapshotReplyFn) error
Shutdown() error
Stats() map[string]map[string]string
}

/*
The agent is the long running process that is run on every machine.
It exposes an RPC interface that is used by the CLI to control the
agent. The agent runs the query interfaces like HTTP, DNS, and RPC.
However, it can run in either a client, or server mode. In server
mode, it runs a full Consul server. In client-only mode, it only forwards
requests to other Consul servers.
*/
// The agent is the long running process that is run on every machine.
// It exposes an RPC interface that is used by the CLI to control the
// agent. The agent runs the query interfaces like HTTP, DNS, and RPC.
// However, it can run in either a client, or server mode. In server
// mode, it runs a full Consul server. In client-only mode, it only forwards
// requests to other Consul servers.
type Agent struct {
config *Config

Expand All @@ -73,10 +85,9 @@ type Agent struct {
// Used for streaming logs to
logWriter *logger.LogWriter

// We have one of a client or a server, depending
// on our configuration
server *consul.Server
client *consul.Client
// delegate is either a *consul.Server or *consul.Client
// depending on the configuration
delegate clientServer

// acls is an object that helps manage local ACL enforcement.
acls *aclManager
Expand Down Expand Up @@ -187,7 +198,7 @@ func Create(config *Config, logOutput io.Writer, logWriter *logger.LogWriter, re
// Setup either the client or the server.
if config.Server {
err = agent.setupServer()
agent.state.SetIface(agent.server)
agent.state.SetIface(agent.delegate)

// Automatically register the "consul" service on server nodes
consulService := structs.NodeService{
Expand All @@ -200,7 +211,7 @@ func Create(config *Config, logOutput io.Writer, logWriter *logger.LogWriter, re
agent.state.AddService(&consulService, agent.config.GetTokenForAgent())
} else {
err = agent.setupClient()
agent.state.SetIface(agent.client)
agent.state.SetIface(agent.delegate)
}
if err != nil {
return nil, err
Expand Down Expand Up @@ -605,7 +616,7 @@ func (a *Agent) setupServer() error {
if err != nil {
return fmt.Errorf("Failed to start Consul server: %v", err)
}
a.server = server
a.delegate = server
return nil
}

Expand All @@ -622,7 +633,7 @@ func (a *Agent) setupClient() error {
if err != nil {
return fmt.Errorf("Failed to start Consul client: %v", err)
}
a.client = client
a.delegate = client
return nil
}

Expand Down Expand Up @@ -784,10 +795,7 @@ LOAD:
// RPC is used to make an RPC call to the Consul servers
// This allows the agent to implement the Consul.Interface
func (a *Agent) RPC(method string, args interface{}, reply interface{}) error {
if a.server != nil {
return a.server.RPC(method, args, reply)
}
return a.client.RPC(method, args, reply)
return a.delegate.RPC(method, args, reply)
}

// SnapshotRPC performs the requested snapshot RPC against the Consul server in
Expand All @@ -796,19 +804,12 @@ func (a *Agent) RPC(method string, args interface{}, reply interface{}) error {
// return payload will be written to out.
func (a *Agent) SnapshotRPC(args *structs.SnapshotRequest, in io.Reader, out io.Writer,
replyFn consul.SnapshotReplyFn) error {

if a.server != nil {
return a.server.SnapshotRPC(args, in, out, replyFn)
}
return a.client.SnapshotRPC(args, in, out, replyFn)
return a.delegate.SnapshotRPC(args, in, out, replyFn)
}

// Leave is used to prepare the agent for a graceful shutdown
func (a *Agent) Leave() error {
if a.server != nil {
return a.server.Leave()
}
return a.client.Leave()
return a.delegate.Leave()
}

// Shutdown is used to hard stop the agent. Should be
Expand Down Expand Up @@ -840,12 +841,7 @@ func (a *Agent) Shutdown() error {
}

a.logger.Println("[INFO] agent: requesting shutdown")
var err error
if a.server != nil {
err = a.server.Shutdown()
} else {
err = a.client.Shutdown()
}
err := a.delegate.Shutdown()

pidErr := a.deletePid()
if pidErr != nil {
Expand All @@ -867,20 +863,16 @@ func (a *Agent) ShutdownCh() <-chan struct{} {
// JoinLAN is used to have the agent join a LAN cluster
func (a *Agent) JoinLAN(addrs []string) (n int, err error) {
a.logger.Printf("[INFO] agent: (LAN) joining: %v", addrs)
if a.server != nil {
n, err = a.server.JoinLAN(addrs)
} else {
n, err = a.client.JoinLAN(addrs)
}
n, err = a.delegate.JoinLAN(addrs)
a.logger.Printf("[INFO] agent: (LAN) joined: %d Err: %v", n, err)
return
}

// JoinWAN is used to have the agent join a WAN cluster
func (a *Agent) JoinWAN(addrs []string) (n int, err error) {
a.logger.Printf("[INFO] agent: (WAN) joining: %v", addrs)
if a.server != nil {
n, err = a.server.JoinWAN(addrs)
if srv, ok := a.delegate.(*consul.Server); ok {
n, err = srv.JoinWAN(addrs)
} else {
err = fmt.Errorf("Must be a server to join WAN cluster")
}
Expand All @@ -891,11 +883,7 @@ func (a *Agent) JoinWAN(addrs []string) (n int, err error) {
// ForceLeave is used to remove a failed node from the cluster
func (a *Agent) ForceLeave(node string) (err error) {
a.logger.Printf("[INFO] Force leaving node: %v", node)
if a.server != nil {
err = a.server.RemoveFailedNode(node)
} else {
err = a.client.RemoveFailedNode(node)
}
err = a.delegate.RemoveFailedNode(node)
if err != nil {
a.logger.Printf("[WARN] Failed to remove node: %v", err)
}
Expand All @@ -904,24 +892,18 @@ func (a *Agent) ForceLeave(node string) (err error) {

// LocalMember is used to return the local node
func (a *Agent) LocalMember() serf.Member {
if a.server != nil {
return a.server.LocalMember()
}
return a.client.LocalMember()
return a.delegate.LocalMember()
}

// LANMembers is used to retrieve the LAN members
func (a *Agent) LANMembers() []serf.Member {
if a.server != nil {
return a.server.LANMembers()
}
return a.client.LANMembers()
return a.delegate.LANMembers()
}

// WANMembers is used to retrieve the WAN members
func (a *Agent) WANMembers() []serf.Member {
if a.server != nil {
return a.server.WANMembers()
if srv, ok := a.delegate.(*consul.Server); ok {
return srv.WANMembers()
}
return nil
}
Expand All @@ -943,13 +925,10 @@ func (a *Agent) ResumeSync() {
a.state.Resume()
}

// Returns the coordinate of this node in the local pool (assumes coordinates
// GetLANCoordinate returns the coordinate of this node in the local pool (assumes coordinates
// are enabled, so check that before calling).
func (a *Agent) GetCoordinate() (*coordinate.Coordinate, error) {
if a.config.Server {
return a.server.GetLANCoordinate()
}
return a.client.GetCoordinate()
func (a *Agent) GetLANCoordinate() (*coordinate.Coordinate, error) {
return a.delegate.GetLANCoordinate()
}

// sendCoordinate is a long-running loop that periodically sends our coordinate
Expand All @@ -974,7 +953,7 @@ func (a *Agent) sendCoordinate() {
continue
}

c, err := a.GetCoordinate()
c, err := a.GetLANCoordinate()
if err != nil {
a.logger.Printf("[ERR] agent: failed to get coordinate: %s", err)
continue
Expand Down Expand Up @@ -1205,7 +1184,7 @@ func (a *Agent) AddService(service *structs.NodeService, chkTypes CheckTypes, pe
// The agent will make a best effort to ensure it is deregistered
func (a *Agent) RemoveService(serviceID string, persist bool) error {
// Protect "consul" service from deletion by a user
if a.server != nil && serviceID == consul.ConsulServiceID {
if _, ok := a.delegate.(*consul.Server); ok && serviceID == consul.ConsulServiceID {
return fmt.Errorf(
"Deregistering the %s service is not allowed",
consul.ConsulServiceID)
Expand Down Expand Up @@ -1563,12 +1542,7 @@ func (a *Agent) Stats() map[string]map[string]string {
toString := func(v uint64) string {
return strconv.FormatUint(v, 10)
}
var stats map[string]map[string]string
if a.server != nil {
stats = a.server.Stats()
} else {
stats = a.client.Stats()
}
stats := a.delegate.Stats()
stats["agent"] = map[string]string{
"check_monitors": toString(uint64(len(a.checkMonitors))),
"check_ttls": toString(uint64(len(a.checkTTLs))),
Expand Down Expand Up @@ -1955,11 +1929,11 @@ func (a *Agent) DisableNodeMaintenance() {
// that not all agent methods use this mechanism, and that is should only
// be used for testing.
func (a *Agent) InjectEndpoint(endpoint string, handler interface{}) error {
if a.server == nil {
srv, ok := a.delegate.(*consul.Server)
if !ok {
return fmt.Errorf("agent must be a server")
}

if err := a.server.InjectEndpoint(handler); err != nil {
if err := srv.InjectEndpoint(handler); err != nil {
return err
}
name := reflect.Indirect(reflect.ValueOf(handler)).Type().Name()
Expand Down
2 changes: 1 addition & 1 deletion command/agent/agent_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func (s *HTTPServer) AgentSelf(resp http.ResponseWriter, req *http.Request) (int
var c *coordinate.Coordinate
if !s.agent.config.DisableCoordinates {
var err error
if c, err = s.agent.GetCoordinate(); err != nil {
if c, err = s.agent.GetLANCoordinate(); err != nil {
return nil, err
}
}
Expand Down
2 changes: 1 addition & 1 deletion command/agent/agent_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ func TestAgent_Self(t *testing.T) {
t.Fatalf("incorrect port: %v", obj)
}

c, err := srv.agent.server.GetLANCoordinate()
c, err := srv.agent.GetLANCoordinate()
if err != nil {
t.Fatalf("err: %v", err)
}
Expand Down
2 changes: 1 addition & 1 deletion command/agent/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1978,7 +1978,7 @@ func TestAgent_GetCoordinate(t *testing.T) {
// sure that the agent chooses the correct Serf instance,
// depending on how it's configured as a client or a server.
// If it chooses the wrong one, this will crash.
if _, err := agent.GetCoordinate(); err != nil {
if _, err := agent.GetLANCoordinate(); err != nil {
t.Fatalf("err: %s", err)
}
}
Expand Down
20 changes: 9 additions & 11 deletions command/agent/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/armon/go-metrics/circonus"
"github.com/armon/go-metrics/datadog"
"github.com/hashicorp/consul/command/base"
"github.com/hashicorp/consul/consul"
"github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/consul/lib"
"github.com/hashicorp/consul/logger"
Expand Down Expand Up @@ -657,13 +658,15 @@ func (c *Command) gossipEncrypted() bool {
return true
}

server := c.agent.server
if server != nil {
server, ok := c.agent.delegate.(*consul.Server)
if ok {
return server.KeyManagerLAN() != nil || server.KeyManagerWAN() != nil
}

client := c.agent.client
return client != nil && client.KeyManagerLAN() != nil
client, ok := c.agent.delegate.(*consul.Client)
if ok {
return client != nil && client.KeyManagerLAN() != nil
}
panic(fmt.Sprintf("delegate is neither server nor client: %T", c.agent.delegate))
}

func (c *Command) Run(args []string) int {
Expand Down Expand Up @@ -846,12 +849,7 @@ func (c *Command) Run(args []string) int {
}

// Figure out if gossip is encrypted
var gossipEncrypted bool
if config.Server {
gossipEncrypted = c.agent.server.Encrypted()
} else {
gossipEncrypted = c.agent.client.Encrypted()
}
gossipEncrypted := c.agent.delegate.Encrypted()

// Let the agent know we've finished registration
c.agent.StartSync()
Expand Down
4 changes: 3 additions & 1 deletion command/agent/keyring.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"os"
"path/filepath"

"github.com/hashicorp/consul/consul"
"github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/memberlist"
"github.com/hashicorp/serf/serf"
Expand Down Expand Up @@ -111,7 +112,8 @@ func loadKeyringFile(c *serf.Config) error {
// performing various operations on the encryption keyring.
func (a *Agent) keyringProcess(args *structs.KeyringRequest) (*structs.KeyringResponses, error) {
var reply structs.KeyringResponses
if a.server == nil {

if _, ok := a.delegate.(*consul.Server); !ok {
return nil, fmt.Errorf("keyring operations must run against a server node")
}
if err := a.RPC("Internal.KeyringOperation", args, &reply); err != nil {
Expand Down
4 changes: 2 additions & 2 deletions consul/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -407,8 +407,8 @@ func (c *Client) Stats() map[string]map[string]string {
return stats
}

// GetCoordinate returns the network coordinate of the current node, as
// GetLANCoordinate returns the network coordinate of the current node, as
// maintained by Serf.
func (c *Client) GetCoordinate() (*coordinate.Coordinate, error) {
func (c *Client) GetLANCoordinate() (*coordinate.Coordinate, error) {
return c.serf.GetCoordinate()
}

0 comments on commit e2c37b4

Please sign in to comment.