Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allows disabling WAN federation by setting serf WAN port to -1 #3984

Merged
merged 4 commits into from
Mar 27, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 19 additions & 13 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -703,16 +703,21 @@ func (a *Agent) consulConfig() (*consul.Config, error) {
base.SerfLANConfig.MemberlistConfig.ProbeTimeout = a.config.ConsulSerfLANProbeTimeout
base.SerfLANConfig.MemberlistConfig.SuspicionMult = a.config.ConsulSerfLANSuspicionMult

base.SerfWANConfig.MemberlistConfig.BindAddr = a.config.SerfBindAddrWAN.IP.String()
base.SerfWANConfig.MemberlistConfig.BindPort = a.config.SerfBindAddrWAN.Port
base.SerfWANConfig.MemberlistConfig.AdvertiseAddr = a.config.SerfAdvertiseAddrWAN.IP.String()
base.SerfWANConfig.MemberlistConfig.AdvertisePort = a.config.SerfAdvertiseAddrWAN.Port
base.SerfWANConfig.MemberlistConfig.GossipVerifyIncoming = a.config.EncryptVerifyIncoming
base.SerfWANConfig.MemberlistConfig.GossipVerifyOutgoing = a.config.EncryptVerifyOutgoing
base.SerfWANConfig.MemberlistConfig.GossipInterval = a.config.ConsulSerfWANGossipInterval
base.SerfWANConfig.MemberlistConfig.ProbeInterval = a.config.ConsulSerfWANProbeInterval
base.SerfWANConfig.MemberlistConfig.ProbeTimeout = a.config.ConsulSerfWANProbeTimeout
base.SerfWANConfig.MemberlistConfig.SuspicionMult = a.config.ConsulSerfWANSuspicionMult
if a.config.SerfBindAddrWAN != nil {
base.SerfWANConfig.MemberlistConfig.BindAddr = a.config.SerfBindAddrWAN.IP.String()
base.SerfWANConfig.MemberlistConfig.BindPort = a.config.SerfBindAddrWAN.Port
base.SerfWANConfig.MemberlistConfig.AdvertiseAddr = a.config.SerfAdvertiseAddrWAN.IP.String()
base.SerfWANConfig.MemberlistConfig.AdvertisePort = a.config.SerfAdvertiseAddrWAN.Port
base.SerfWANConfig.MemberlistConfig.GossipVerifyIncoming = a.config.EncryptVerifyIncoming
base.SerfWANConfig.MemberlistConfig.GossipVerifyOutgoing = a.config.EncryptVerifyOutgoing
base.SerfWANConfig.MemberlistConfig.GossipInterval = a.config.ConsulSerfWANGossipInterval
base.SerfWANConfig.MemberlistConfig.ProbeInterval = a.config.ConsulSerfWANProbeInterval
base.SerfWANConfig.MemberlistConfig.ProbeTimeout = a.config.ConsulSerfWANProbeTimeout
base.SerfWANConfig.MemberlistConfig.SuspicionMult = a.config.ConsulSerfWANSuspicionMult
} else {
// Disable serf WAN federation
base.SerfWANConfig = nil
}

base.RPCAddr = a.config.RPCBindAddr
base.RPCAdvertise = a.config.RPCAdvertiseAddr
Expand Down Expand Up @@ -1019,6 +1024,7 @@ func (a *Agent) setupNodeID(config *config.RuntimeConfig) error {
func (a *Agent) setupBaseKeyrings(config *consul.Config) error {
// If the keyring file is disabled then just poke the provided key
// into the in-memory keyring.
federationEnabled := config.SerfWANConfig != nil
if a.config.DisableKeyringFile {
if a.config.EncryptKey == "" {
return nil
Expand All @@ -1028,7 +1034,7 @@ func (a *Agent) setupBaseKeyrings(config *consul.Config) error {
if err := loadKeyring(config.SerfLANConfig, keys); err != nil {
return err
}
if a.config.ServerMode {
if a.config.ServerMode && federationEnabled {
if err := loadKeyring(config.SerfWANConfig, keys); err != nil {
return err
}
Expand All @@ -1048,7 +1054,7 @@ func (a *Agent) setupBaseKeyrings(config *consul.Config) error {
return err
}
}
if a.config.ServerMode {
if a.config.ServerMode && federationEnabled {
if _, err := os.Stat(fileWAN); err != nil {
if err := initKeyring(fileWAN, a.config.EncryptKey); err != nil {
return err
Expand All @@ -1063,7 +1069,7 @@ LOAD:
if err := loadKeyringFile(config.SerfLANConfig); err != nil {
return err
}
if a.config.ServerMode {
if a.config.ServerMode && federationEnabled {
if _, err := os.Stat(fileWAN); err == nil {
config.SerfWANConfig.KeyringFile = fileWAN
}
Expand Down
23 changes: 16 additions & 7 deletions agent/config/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -369,9 +369,6 @@ func (b *Builder) Build() (rt RuntimeConfig, err error) {
if ipaddr.IsAny(b.stringVal(c.AdvertiseAddrWAN)) {
return RuntimeConfig{}, fmt.Errorf("Advertise WAN address cannot be 0.0.0.0, :: or [::]")
}
if serfPortWAN < 0 {
return RuntimeConfig{}, fmt.Errorf("ports.serf_wan must be a valid port from 1 to 65535")
}

bindAddr := bindAddrs[0].(*net.IPAddr)
advertiseAddr := b.makeIPAddr(b.expandFirstIP("advertise_addr", c.AdvertiseAddrLAN), bindAddr)
Expand Down Expand Up @@ -411,14 +408,23 @@ func (b *Builder) Build() (rt RuntimeConfig, err error) {
// derive other bind addresses from the bindAddr
rpcBindAddr := b.makeTCPAddr(bindAddr, nil, serverPort)
serfBindAddrLAN := b.makeTCPAddr(b.expandFirstIP("serf_lan", c.SerfBindAddrLAN), bindAddr, serfPortLAN)
serfBindAddrWAN := b.makeTCPAddr(b.expandFirstIP("serf_wan", c.SerfBindAddrWAN), bindAddr, serfPortWAN)

// Only initialize serf WAN bind address when its enabled
var serfBindAddrWAN *net.TCPAddr
if serfPortWAN >= 0 {
serfBindAddrWAN = b.makeTCPAddr(b.expandFirstIP("serf_wan", c.SerfBindAddrWAN), bindAddr, serfPortWAN)
}

// derive other advertise addresses from the advertise address
advertiseAddrLAN := b.makeIPAddr(b.expandFirstIP("advertise_addr", c.AdvertiseAddrLAN), advertiseAddr)
advertiseAddrWAN := b.makeIPAddr(b.expandFirstIP("advertise_addr_wan", c.AdvertiseAddrWAN), advertiseAddrLAN)
rpcAdvertiseAddr := &net.TCPAddr{IP: advertiseAddrLAN.IP, Port: serverPort}
serfAdvertiseAddrLAN := &net.TCPAddr{IP: advertiseAddrLAN.IP, Port: serfPortLAN}
serfAdvertiseAddrWAN := &net.TCPAddr{IP: advertiseAddrWAN.IP, Port: serfPortWAN}
// Only initialize serf WAN advertise address when its enabled
var serfAdvertiseAddrWAN *net.TCPAddr
if serfPortWAN >= 0 {
serfAdvertiseAddrWAN = &net.TCPAddr{IP: advertiseAddrWAN.IP, Port: serfPortWAN}
}

// determine client addresses
clientAddrs := b.expandIPs("client_addr", c.ClientAddr)
Expand Down Expand Up @@ -869,8 +875,11 @@ func (b *Builder) Validate(rt RuntimeConfig) error {
if err := addrUnique(inuse, "Serf Advertise LAN", rt.SerfAdvertiseAddrLAN); err != nil {
return err
}
if err := addrUnique(inuse, "Serf Advertise WAN", rt.SerfAdvertiseAddrWAN); err != nil {
return err
// Validate serf WAN advertise address only when its set
if rt.SerfAdvertiseAddrWAN != nil {
if err := addrUnique(inuse, "Serf Advertise WAN", rt.SerfAdvertiseAddrWAN); err != nil {
return err
}
}
if b.err != nil {
return b.err
Expand Down
14 changes: 12 additions & 2 deletions agent/config/runtime_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1065,7 +1065,7 @@ func TestConfigFlagsAndEdgecases(t *testing.T) {
},
},
{
desc: "serf wan port > 0",
desc: "allow disabling serf wan port",
args: []string{`-data-dir=` + dataDir},
json: []string{`{
"ports": {
Expand All @@ -1079,7 +1079,17 @@ func TestConfigFlagsAndEdgecases(t *testing.T) {
}
advertise_addr_wan = "1.2.3.4"
`},
err: "ports.serf_wan must be a valid port from 1 to 65535",
patch: func(rt *RuntimeConfig) {
rt.AdvertiseAddrWAN = ipAddr("1.2.3.4")
rt.SerfAdvertiseAddrWAN = nil
rt.SerfBindAddrWAN = nil
rt.TaggedAddresses = map[string]string{
"lan": "10.0.0.1",
"wan": "1.2.3.4",
}
rt.DataDir = dataDir
rt.SerfPortWAN = -1
},
},
{
desc: "serf bind address lan template",
Expand Down
4 changes: 4 additions & 0 deletions agent/consul/catalog_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,10 @@ func (c *Catalog) ListDatacenters(args *struct{}, reply *[]string) error {
return err
}

if len(dcs) == 0 { // no WAN federation, so return the local data center name
dcs = []string{c.srv.config.Datacenter}
}

*reply = dcs
return nil
}
Expand Down
2 changes: 1 addition & 1 deletion agent/consul/internal_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ func (m *Internal) KeyringOperation(
}

// Only perform WAN keyring querying and RPC forwarding once
if !args.Forwarded {
if !args.Forwarded && m.srv.serfWAN != nil {
args.Forwarded = true
m.executeKeyringOp(args, reply, true)
return m.srv.globalRPC("Internal.KeyringOperation", args, reply)
Expand Down
82 changes: 49 additions & 33 deletions agent/consul/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ import (
"github.com/hashicorp/consul/types"
"github.com/hashicorp/raft"
raftboltdb "github.com/hashicorp/raft-boltdb"
"github.com/hashicorp/serf/coordinate"
"github.com/hashicorp/serf/serf"
)

Expand Down Expand Up @@ -75,6 +74,10 @@ const (
raftRemoveGracePeriod = 5 * time.Second
)

var (
ErrWANFederationDisabled = fmt.Errorf("WAN Federation is disabled")
)

// Server is Consul server which manages the service discovery,
// health checking, DC forwarding, Raft, and multiple Serf pools.
type Server struct {
Expand Down Expand Up @@ -344,21 +347,23 @@ func NewServerLogger(config *Config, logger *log.Logger, tokens *token.Store) (*
// created, so we can pull it out from there reliably, even though it's
// a little gross to be reading the updated config.

// Initialize the WAN Serf.
serfBindPortWAN := config.SerfWANConfig.MemberlistConfig.BindPort
s.serfWAN, err = s.setupSerf(config.SerfWANConfig, s.eventChWAN, serfWANSnapshot, true, serfBindPortWAN, "", s.Listener)
if err != nil {
s.Shutdown()
return nil, fmt.Errorf("Failed to start WAN Serf: %v", err)
}

// See big comment above why we are doing this.
if serfBindPortWAN == 0 {
// Initialize the WAN Serf if enabled
serfBindPortWAN := -1
if config.SerfWANConfig != nil {
serfBindPortWAN = config.SerfWANConfig.MemberlistConfig.BindPort
s.serfWAN, err = s.setupSerf(config.SerfWANConfig, s.eventChWAN, serfWANSnapshot, true, serfBindPortWAN, "", s.Listener)
if err != nil {
s.Shutdown()
return nil, fmt.Errorf("Failed to start WAN Serf: %v", err)
}
// See big comment above why we are doing this.
if serfBindPortWAN == 0 {
return nil, fmt.Errorf("Failed to get dynamic bind port for WAN Serf")
serfBindPortWAN = config.SerfWANConfig.MemberlistConfig.BindPort
if serfBindPortWAN == 0 {
return nil, fmt.Errorf("Failed to get dynamic bind port for WAN Serf")
}
s.logger.Printf("[INFO] agent: Serf WAN TCP bound to port %d", serfBindPortWAN)
}
s.logger.Printf("[INFO] agent: Serf WAN TCP bound to port %d", serfBindPortWAN)
}

// Initialize the LAN segments before the default LAN Serf so we have
Expand All @@ -380,20 +385,22 @@ func NewServerLogger(config *Config, logger *log.Logger, tokens *token.Store) (*
s.floodSegments(config)

// Add a "static route" to the WAN Serf and hook it up to Serf events.
if err := s.router.AddArea(types.AreaWAN, s.serfWAN, s.connPool, s.config.VerifyOutgoing); err != nil {
s.Shutdown()
return nil, fmt.Errorf("Failed to add WAN serf route: %v", err)
}
go router.HandleSerfEvents(s.logger, s.router, types.AreaWAN, s.serfWAN.ShutdownCh(), s.eventChWAN)
if s.serfWAN != nil {
if err := s.router.AddArea(types.AreaWAN, s.serfWAN, s.connPool, s.config.VerifyOutgoing); err != nil {
s.Shutdown()
return nil, fmt.Errorf("Failed to add WAN serf route: %v", err)
}
go router.HandleSerfEvents(s.logger, s.router, types.AreaWAN, s.serfWAN.ShutdownCh(), s.eventChWAN)

// Fire up the LAN <-> WAN join flooder.
portFn := func(s *metadata.Server) (int, bool) {
if s.WanJoinPort > 0 {
return s.WanJoinPort, true
// Fire up the LAN <-> WAN join flooder.
portFn := func(s *metadata.Server) (int, bool) {
if s.WanJoinPort > 0 {
return s.WanJoinPort, true
}
return 0, false
}
return 0, false
go s.Flood(nil, portFn, s.serfWAN)
}
go s.Flood(nil, portFn, s.serfWAN)

// Start monitoring leadership. This must happen after Serf is set up
// since it can fire events when leadership is obtained.
Expand Down Expand Up @@ -831,6 +838,9 @@ func (s *Server) JoinLAN(addrs []string) (int, error) {
// The target address should be another node listening on the
// Serf WAN address
func (s *Server) JoinWAN(addrs []string) (int, error) {
if s.serfWAN == nil {
return 0, ErrWANFederationDisabled
}
return s.serfWAN.Join(addrs, true)
}

Expand All @@ -846,6 +856,9 @@ func (s *Server) LANMembers() []serf.Member {

// WANMembers is used to return the members of the LAN cluster
func (s *Server) WANMembers() []serf.Member {
if s.serfWAN == nil {
return nil
}
return s.serfWAN.Members()
}

Expand All @@ -854,8 +867,10 @@ func (s *Server) RemoveFailedNode(node string) error {
if err := s.serfLAN.RemoveFailedNode(node); err != nil {
return err
}
if err := s.serfWAN.RemoveFailedNode(node); err != nil {
return err
if s.serfWAN != nil {
if err := s.serfWAN.RemoveFailedNode(node); err != nil {
return err
}
}
return nil
}
Expand All @@ -877,7 +892,11 @@ func (s *Server) KeyManagerWAN() *serf.KeyManager {

// Encrypted determines if gossip is encrypted
func (s *Server) Encrypted() bool {
return s.serfLAN.EncryptionEnabled() && s.serfWAN.EncryptionEnabled()
LANEncrypted := s.serfLAN.EncryptionEnabled()
if s.serfWAN == nil {
return LANEncrypted
}
return LANEncrypted && s.serfWAN.EncryptionEnabled()
}

// LANSegments returns a map of LAN segments by name
Expand Down Expand Up @@ -995,9 +1014,11 @@ func (s *Server) Stats() map[string]map[string]string {
},
"raft": s.raft.Stats(),
"serf_lan": s.serfLAN.Stats(),
"serf_wan": s.serfWAN.Stats(),
"runtime": runtimeStats(),
}
if s.serfWAN != nil {
stats["serf_wan"] = s.serfWAN.Stats()
}
return stats
}

Expand All @@ -1019,11 +1040,6 @@ func (s *Server) GetLANCoordinate() (lib.CoordinateSet, error) {
return cs, nil
}

// GetWANCoordinate returns the coordinate of the server in the WAN gossip pool.
func (s *Server) GetWANCoordinate() (*coordinate.Coordinate, error) {
return s.serfWAN.GetCoordinate()
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Was this not used anywhere?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yup


// Atomically sets a readiness state flag when leadership is obtained, to indicate that server is past its barrier write
func (s *Server) setConsistentReadReady() {
atomic.StoreInt32(&s.readyForConsistentReads, 1)
Expand Down
4 changes: 3 additions & 1 deletion agent/consul/server_serf.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,9 @@ func (s *Server) setupSerf(conf *serf.Config, ch chan serf.Event, path string, w
conf.NodeName = fmt.Sprintf("%s.%s", s.config.NodeName, s.config.Datacenter)
} else {
conf.NodeName = s.config.NodeName
conf.Tags["wan_join_port"] = fmt.Sprintf("%d", wanPort)
if wanPort > 0 {
conf.Tags["wan_join_port"] = fmt.Sprintf("%d", wanPort)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For the record, I checked and it seems the only consumer of that tag in our code already handles the case where it's not present:

wanJoinPortStr, ok := m.Tags["wan_join_port"]
if ok {
wanJoinPort, err = strconv.Atoi(wanJoinPortStr)
if err != nil {
return false, nil
}
}

}
}
conf.Tags["role"] = "consul"
conf.Tags["dc"] = s.config.Datacenter
Expand Down