Skip to content

Commit

Permalink
Switch to using the external autopilot module
Browse files Browse the repository at this point in the history
  • Loading branch information
mkeeler committed Nov 5, 2020
1 parent 52f3714 commit 3c73d4a
Show file tree
Hide file tree
Showing 91 changed files with 14,726 additions and 1,602 deletions.
4 changes: 4 additions & 0 deletions agent/config/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -1154,6 +1154,10 @@ func (b *Builder) Validate(rt RuntimeConfig) error {
// check required params we cannot recover from first
//

if rt.RaftProtocol != 3 {
return fmt.Errorf("raft_protocol version %d is not supported by this version of Consul", rt.RaftProtocol)
}

if err := validateBasicName("datacenter", rt.Datacenter, false); err != nil {
return err
}
Expand Down
1 change: 1 addition & 0 deletions agent/config/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ func DefaultSource() Source {
expose_min_port = 21500
expose_max_port = 21755
}
raft_protocol = 3
telemetry = {
metrics_prefix = "consul"
filter_default = true
Expand Down
21 changes: 15 additions & 6 deletions agent/config/runtime_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -662,14 +662,22 @@ func TestBuilder_BuildAndValidate_ConfigFlagsAndEdgecases(t *testing.T) {
{
desc: "-raft-protocol",
args: []string{
`-raft-protocol=1`,
`-raft-protocol=3`,
`-data-dir=` + dataDir,
},
patch: func(rt *RuntimeConfig) {
rt.RaftProtocol = 1
rt.RaftProtocol = 3
rt.DataDir = dataDir
},
},
{
desc: "-raft-protocol unsupported",
args: []string{
`-raft-protocol=2`,
`-data-dir=` + dataDir,
},
err: "raft_protocol version 2 is not supported by this version of Consul",
},
{
desc: "-recursor",
args: []string{
Expand Down Expand Up @@ -5302,7 +5310,7 @@ func TestFullConfig(t *testing.T) {
"primary_datacenter": "ejtmd43d",
"primary_gateways": [ "aej8eeZo", "roh2KahS" ],
"primary_gateways_interval": "18866s",
"raft_protocol": 19016,
"raft_protocol": 3,
"raft_snapshot_threshold": 16384,
"raft_snapshot_interval": "30s",
"raft_trailing_logs": 83749,
Expand Down Expand Up @@ -5991,7 +5999,7 @@ func TestFullConfig(t *testing.T) {
primary_datacenter = "ejtmd43d"
primary_gateways = [ "aej8eeZo", "roh2KahS" ]
primary_gateways_interval = "18866s"
raft_protocol = 19016
raft_protocol = 3
raft_snapshot_threshold = 16384
raft_snapshot_interval = "30s"
raft_trailing_logs = 83749
Expand Down Expand Up @@ -6753,7 +6761,7 @@ func TestFullConfig(t *testing.T) {
RPCRateLimit: 12029.43,
RPCMaxBurst: 44848,
RPCMaxConnsPerClient: 2954,
RaftProtocol: 19016,
RaftProtocol: 3,
RaftSnapshotThreshold: 16384,
RaftSnapshotInterval: 30 * time.Second,
RaftTrailingLogs: 83749,
Expand Down Expand Up @@ -7434,6 +7442,7 @@ func TestSanitize(t *testing.T) {
EntryFetchRate: 0.334,
},
ConsulCoordinateUpdatePeriod: 15 * time.Second,
RaftProtocol: 3,
RetryJoinLAN: []string{
"foo=bar key=baz secret=boom bang=bar",
},
Expand Down Expand Up @@ -7690,7 +7699,7 @@ func TestSanitize(t *testing.T) {
"RPCConfig": {
"EnableStreaming": false
},
"RaftProtocol": 0,
"RaftProtocol": 3,
"RaftSnapshotInterval": "0s",
"RaftSnapshotThreshold": 0,
"RaftTrailingLogs": 0,
Expand Down
132 changes: 87 additions & 45 deletions agent/consul/autopilot.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,12 @@ package consul
import (
"context"
"fmt"
"net"
"strconv"

"github.com/armon/go-metrics"
"github.com/hashicorp/consul/agent/consul/autopilot"
"github.com/hashicorp/consul/agent/metadata"
"github.com/hashicorp/consul/types"
"github.com/hashicorp/raft"
autopilot "github.com/hashicorp/raft-autopilot"
"github.com/hashicorp/serf/serf"
)

Expand All @@ -19,68 +18,111 @@ type AutopilotDelegate struct {
}

// AutopilotConfig returns the autopilot configuration derived from the
// server's getOrCreateAutopilotConfig.
// NOTE(review): two return statements appear here back to back; the second
// additionally converts the result with ToAutopilotLibraryConfig. Confirm
// which one is meant to be live.
func (d *AutopilotDelegate) AutopilotConfig() *autopilot.Config {
return d.server.getOrCreateAutopilotConfig()
return d.server.getOrCreateAutopilotConfig().ToAutopilotLibraryConfig()
}

func (d *AutopilotDelegate) FetchStats(ctx context.Context, servers []serf.Member) map[string]*autopilot.ServerStats {
return d.server.statsFetcher.Fetch(ctx, servers)
// KnownServers returns the servers reported by the underlying server's
// autopilotServers, keyed by raft server ID.
func (d *AutopilotDelegate) KnownServers() map[raft.ServerID]*autopilot.Server {
	known := d.server.autopilotServers()
	return known
}

func (d *AutopilotDelegate) IsServer(m serf.Member) (*autopilot.ServerInfo, error) {
if m.Tags["role"] != "consul" {
return nil, nil
}

portStr := m.Tags["port"]
port, err := strconv.Atoi(portStr)
if err != nil {
return nil, err
}

buildVersion, err := metadata.Build(&m)
if err != nil {
return nil, err
}

server := &autopilot.ServerInfo{
Name: m.Name,
ID: m.Tags["id"],
Addr: &net.TCPAddr{IP: m.Addr, Port: port},
Build: *buildVersion,
Status: m.Status,
}
return server, nil
// FetchServerStats passes ctx and the given servers to the server's stats
// fetcher and returns the stats it produces, keyed by raft server ID.
func (d *AutopilotDelegate) FetchServerStats(ctx context.Context, servers map[raft.ServerID]*autopilot.Server) map[raft.ServerID]*autopilot.ServerStats {
	stats := d.server.statsFetcher.Fetch(ctx, servers)
	return stats
}

// Heartbeat a metric for monitoring if we're the leader
func (d *AutopilotDelegate) NotifyHealth(health autopilot.OperatorHealthReply) {
func (d *AutopilotDelegate) NotifyState(state *autopilot.State) {
// Gauges are only emitted while this server's raft state is Leader:
// "autopilot.failure_tolerance" carries the reported failure tolerance and
// "autopilot.healthy" is set to 1 when the state is healthy, 0 otherwise.
if d.server.raft.State() == raft.Leader {
metrics.SetGauge([]string{"autopilot", "failure_tolerance"}, float32(health.FailureTolerance))
if health.Healthy {
metrics.SetGauge([]string{"autopilot", "failure_tolerance"}, float32(state.FailureTolerance))
if state.Healthy {
metrics.SetGauge([]string{"autopilot", "healthy"}, 1)
} else {
metrics.SetGauge([]string{"autopilot", "healthy"}, 0)
}
}
}

func (d *AutopilotDelegate) PromoteNonVoters(conf *autopilot.Config, health autopilot.OperatorHealthReply) ([]raft.Server, error) {
future := d.server.raft.GetConfiguration()
if err := future.Error(); err != nil {
return nil, fmt.Errorf("failed to get raft configuration: %v", err)
// RemoveFailedServer asks the server to remove the node named srv.Name via
// RemoveFailedNode. A failure is returned wrapped with the prefix
// "failed to remove server"; otherwise nil is returned.
// NOTE(review): the meaning of the `false` argument to RemoveFailedNode is
// not visible here — confirm against its definition.
func (d *AutopilotDelegate) RemoveFailedServer(srv *autopilot.Server) error {
if err := d.server.RemoveFailedNode(srv.Name, false); err != nil {
return fmt.Errorf("failed to remove server: %w", err)
}

return autopilot.PromoteStableServers(conf, health, future.Configuration().Servers), nil
return nil
}

func (d *AutopilotDelegate) Raft() *raft.Raft {
return d.server.raft
// initAutopilot constructs the autopilot instance for this server and stores
// it in s.autopilot. The instance is wired to the server's raft handle, uses
// an AutopilotDelegate wrapping s as its delegate, logs through s.logger,
// takes its reconcile and update intervals from config.AutopilotInterval and
// config.ServerHealthInterval, and uses the promoter from s.autopilotPromoter.
func (s *Server) initAutopilot(config *Config) {
	s.autopilot = autopilot.New(
		s.raft,
		&AutopilotDelegate{s},
		autopilot.WithLogger(s.logger),
		autopilot.WithReconcileInterval(config.AutopilotInterval),
		autopilot.WithUpdateInterval(config.ServerHealthInterval),
		autopilot.WithPromoter(s.autopilotPromoter()),
	)
}

func (d *AutopilotDelegate) SerfLAN() *serf.Serf {
return d.server.serfLAN
// autopilotServers converts the current LAN serf members into autopilot
// server records keyed by raft server ID.
//
// A member whose conversion fails is logged at warn level and left out. A
// member for which autopilotServer yields no record and no error (it is not
// a Consul server) is silently left out as well.
func (s *Server) autopilotServers() map[raft.ServerID]*autopilot.Server {
	result := make(map[raft.ServerID]*autopilot.Server)

	for _, m := range s.serfLAN.Members() {
		parsed, err := s.autopilotServer(m)
		switch {
		case err != nil:
			s.logger.Warn("Error parsing server info", "name", m.Name, "error", err)
		case parsed != nil:
			result[parsed.ID] = parsed
		}
	}

	return result
}

// autopilotServer builds the autopilot record for a single serf member.
// When metadata.IsConsulServer does not recognize the member as a server,
// both results are nil.
func (s *Server) autopilotServer(m serf.Member) (*autopilot.Server, error) {
	if isServer, meta := metadata.IsConsulServer(m); isServer {
		return s.autopilotServerFromMetadata(meta)
	}

	return nil, nil
}

func (d *AutopilotDelegate) SerfWAN() *serf.Serf {
return d.server.serfWAN
// autopilotServerFromMetadata translates a metadata.Server into the
// autopilot library's Server representation.
//
// Name, ID, address, version and raft version are copied from srv, Ext comes
// from s.autopilotServerExt, and the serf member status is mapped onto an
// autopilot node status. Node meta is filled in from the state store when a
// node with this ID exists there. A state store lookup failure is returned
// wrapped and no server is returned.
func (s *Server) autopilotServerFromMetadata(srv *metadata.Server) (*autopilot.Server, error) {
	out := &autopilot.Server{
		Name:        srv.ShortName,
		ID:          raft.ServerID(srv.ID),
		Address:     raft.ServerAddress(srv.Addr.String()),
		Version:     srv.Build.String(),
		RaftVersion: srv.RaftVersion,
		Ext:         s.autopilotServerExt(srv),
	}

	// Start from "unknown" and override for the serf statuses we recognize.
	out.NodeStatus = autopilot.NodeUnknown
	switch srv.Status {
	case serf.StatusAlive, serf.StatusLeaving:
		// A leaving member still counts as alive so that autopilot does not
		// remove the node prematurely.
		out.NodeStatus = autopilot.NodeAlive
	case serf.StatusFailed:
		out.NodeStatus = autopilot.NodeFailed
	case serf.StatusLeft:
		out.NodeStatus = autopilot.NodeLeft
	}

	// The catalog entry may be missing: a server that has only just joined,
	// or one hitting ACL issues, may not have registered itself yet. In that
	// case Meta is simply left unset.
	_, catalogNode, err := s.fsm.State().GetNodeID(types.NodeID(srv.ID))
	switch {
	case err != nil:
		return nil, fmt.Errorf("error retrieving node from state store: %w", err)
	case catalogNode != nil:
		out.Meta = catalogNode.Meta
	}

	return out, nil
}
Loading

0 comments on commit 3c73d4a

Please sign in to comment.