Skip to content

Commit

Permalink
Added multiple policies for renaming nodes
Browse files Browse the repository at this point in the history
  • Loading branch information
pierresouchay committed Nov 27, 2018
1 parent 71a0aa5 commit a10683e
Show file tree
Hide file tree
Showing 19 changed files with 276 additions and 66 deletions.
3 changes: 3 additions & 0 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -992,6 +992,9 @@ func (a *Agent) consulConfig() (*consul.Config, error) {
if a.config.NonVotingServer {
base.NonVoter = a.config.NonVotingServer
}
if a.config.NodeRenamingPolicy != "" {
base.NodeRenamingPolicy = a.config.NodeRenamingPolicy
}

// These are fully specified in the agent defaults, so we can simply
// copy them over.
Expand Down
1 change: 1 addition & 0 deletions agent/config/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -790,6 +790,7 @@ func (b *Builder) Build() (rt RuntimeConfig, err error) {
NodeID: types.NodeID(b.stringVal(c.NodeID)),
NodeMeta: c.NodeMeta,
NodeName: b.nodeName(c.NodeName),
NodeRenamingPolicy: b.stringValWithDefault(c.NodeRenamingPolicy, "legacy"),
NonVotingServer: b.boolVal(c.NonVotingServer),
PidFile: b.stringVal(c.PidFile),
PrimaryDatacenter: primaryDatacenter,
Expand Down
1 change: 1 addition & 0 deletions agent/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,7 @@ type Config struct {
NodeID *string `json:"node_id,omitempty" hcl:"node_id" mapstructure:"node_id"`
NodeMeta map[string]string `json:"node_meta,omitempty" hcl:"node_meta" mapstructure:"node_meta"`
NodeName *string `json:"node_name,omitempty" hcl:"node_name" mapstructure:"node_name"`
NodeRenamingPolicy *string `json:"node_renaming_policy,omitempty" hcl:"node_renaming_policy" mapstructure:"node_renaming_policy"`
NonVotingServer *bool `json:"non_voting_server,omitempty" hcl:"non_voting_server" mapstructure:"non_voting_server"`
Performance Performance `json:"performance,omitempty" hcl:"performance" mapstructure:"performance"`
PidFile *string `json:"pid_file,omitempty" hcl:"pid_file" mapstructure:"pid_file"`
Expand Down
1 change: 1 addition & 0 deletions agent/config/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ func AddFlags(fs *flag.FlagSet, f *Flags) {
add(&f.Config.NodeName, "node", "Name of this node. Must be unique in the cluster.")
add(&f.Config.NodeID, "node-id", "A unique ID for this node across space and time. Defaults to a randomly-generated ID that persists in the data-dir.")
add(&f.Config.NodeMeta, "node-meta", "An arbitrary metadata key/value pair for this node, of the format `key:value`. Can be specified multiple times.")
add(&f.Config.NodeRenamingPolicy, "node-renaming-policy", "Choose between behaviours when renaming nodes, can be any of legacy|dead|strict")
add(&f.Config.NonVotingServer, "non-voting-server", "(Enterprise-only) This flag is used to make the server not participate in the Raft quorum, and have it only receive the data replication stream. This can be used to add read scalability to a cluster in cases where a high volume of reads to servers are needed.")
add(&f.Config.PidFile, "pid-file", "Path to file to store agent PID.")
add(&f.Config.RPCProtocol, "protocol", "Sets the protocol version. Defaults to latest.")
Expand Down
6 changes: 6 additions & 0 deletions agent/config/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -819,6 +819,12 @@ type RuntimeConfig struct {
// flag: -node-meta "key:value" -node-meta "key:value" ...
NodeMeta map[string]string

// NodeRenamingPolicy exposes how renaming is handled.
//
// hcl: node_renaming_policy = (legacy|dead|strict)
// flag: -node-renaming-policy
NodeRenamingPolicy string

// NonVotingServer is whether this server will act as a non-voting member
// of the cluster to help provide read scalability. (Enterprise-only)
//
Expand Down
13 changes: 13 additions & 0 deletions agent/config/runtime_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -500,6 +500,17 @@ func TestConfigFlagsAndEdgecases(t *testing.T) {
rt.DataDir = dataDir
},
},
{
desc: "-node-renaming-policy",
args: []string{
`-node-renaming-policy=dead`,
`-data-dir=` + dataDir,
},
patch: func(rt *RuntimeConfig) {
rt.NodeRenamingPolicy = "dead"
rt.DataDir = dataDir
},
},
{
desc: "-non-voting-server",
args: []string{
Expand Down Expand Up @@ -4196,6 +4207,7 @@ func TestFullConfig(t *testing.T) {
NodeID: types.NodeID("AsUIlw99"),
NodeMeta: map[string]string{"5mgGQMBk": "mJLtVMSG", "A7ynFMJB": "0Nx6RGab"},
NodeName: "otlLxGaI",
NodeRenamingPolicy: "legacy",
NonVotingServer: true,
PidFile: "43xN80Km",
PrimaryDatacenter: "ejtmd43d",
Expand Down Expand Up @@ -4994,6 +5006,7 @@ func TestSanitize(t *testing.T) {
"NodeID": "",
"NodeMeta": {},
"NodeName": "",
"NodeRenamingPolicy": "",
"NonVotingServer": false,
"PidFile": "",
"PrimaryDatacenter": "",
Expand Down
2 changes: 1 addition & 1 deletion agent/connect/ca/provider_consul_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func (c *consulCAMockDelegate) ApplyCARequest(req *structs.CARequest) error {
}

func newMockDelegate(t *testing.T, conf *structs.CAConfiguration) *consulCAMockDelegate {
s, err := state.NewStateStore(nil)
s, err := state.NewStateStore(nil, state.NodeRenamingDefault)
if err != nil {
t.Fatalf("err: %s", err)
}
Expand Down
3 changes: 3 additions & 0 deletions agent/consul/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,9 @@ type Config struct {
// warning and discard the remaining updates.
CoordinateUpdateMaxBatches int

// NodeRenamingPolicy allow to handle node renaming behavior
NodeRenamingPolicy string

// RPCHoldTimeout is how long an RPC can be "held" before it is errored.
// This is used to paper over a loss of leadership by instead holding RPCs,
// so that the caller experiences a slow response rather than an error.
Expand Down
44 changes: 22 additions & 22 deletions agent/consul/fsm/commands_oss_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func generateRandomCoordinate() *coordinate.Coordinate {

func TestFSM_RegisterNode(t *testing.T) {
t.Parallel()
fsm, err := New(nil, os.Stderr)
fsm, err := New(nil, os.Stderr, "")
if err != nil {
t.Fatalf("err: %v", err)
}
Expand Down Expand Up @@ -85,7 +85,7 @@ func TestFSM_RegisterNode(t *testing.T) {

func TestFSM_RegisterNode_Service(t *testing.T) {
t.Parallel()
fsm, err := New(nil, os.Stderr)
fsm, err := New(nil, os.Stderr, "")
if err != nil {
t.Fatalf("err: %v", err)
}
Expand Down Expand Up @@ -148,7 +148,7 @@ func TestFSM_RegisterNode_Service(t *testing.T) {

func TestFSM_DeregisterService(t *testing.T) {
t.Parallel()
fsm, err := New(nil, os.Stderr)
fsm, err := New(nil, os.Stderr, "")
if err != nil {
t.Fatalf("err: %v", err)
}
Expand Down Expand Up @@ -210,7 +210,7 @@ func TestFSM_DeregisterService(t *testing.T) {

func TestFSM_DeregisterCheck(t *testing.T) {
t.Parallel()
fsm, err := New(nil, os.Stderr)
fsm, err := New(nil, os.Stderr, "")
if err != nil {
t.Fatalf("err: %v", err)
}
Expand Down Expand Up @@ -272,7 +272,7 @@ func TestFSM_DeregisterCheck(t *testing.T) {

func TestFSM_DeregisterNode(t *testing.T) {
t.Parallel()
fsm, err := New(nil, os.Stderr)
fsm, err := New(nil, os.Stderr, "")
if err != nil {
t.Fatalf("err: %v", err)
}
Expand Down Expand Up @@ -349,7 +349,7 @@ func TestFSM_DeregisterNode(t *testing.T) {

func TestFSM_KVSDelete(t *testing.T) {
t.Parallel()
fsm, err := New(nil, os.Stderr)
fsm, err := New(nil, os.Stderr, "")
if err != nil {
t.Fatalf("err: %v", err)
}
Expand Down Expand Up @@ -395,7 +395,7 @@ func TestFSM_KVSDelete(t *testing.T) {

func TestFSM_KVSDeleteTree(t *testing.T) {
t.Parallel()
fsm, err := New(nil, os.Stderr)
fsm, err := New(nil, os.Stderr, "")
if err != nil {
t.Fatalf("err: %v", err)
}
Expand Down Expand Up @@ -442,7 +442,7 @@ func TestFSM_KVSDeleteTree(t *testing.T) {

func TestFSM_KVSDeleteCheckAndSet(t *testing.T) {
t.Parallel()
fsm, err := New(nil, os.Stderr)
fsm, err := New(nil, os.Stderr, "")
if err != nil {
t.Fatalf("err: %v", err)
}
Expand Down Expand Up @@ -498,7 +498,7 @@ func TestFSM_KVSDeleteCheckAndSet(t *testing.T) {

func TestFSM_KVSCheckAndSet(t *testing.T) {
t.Parallel()
fsm, err := New(nil, os.Stderr)
fsm, err := New(nil, os.Stderr, "")
if err != nil {
t.Fatalf("err: %v", err)
}
Expand Down Expand Up @@ -555,7 +555,7 @@ func TestFSM_KVSCheckAndSet(t *testing.T) {

func TestFSM_KVSLock(t *testing.T) {
t.Parallel()
fsm, err := New(nil, os.Stderr)
fsm, err := New(nil, os.Stderr, "")
if err != nil {
t.Fatalf("err: %v", err)
}
Expand Down Expand Up @@ -600,7 +600,7 @@ func TestFSM_KVSLock(t *testing.T) {

func TestFSM_KVSUnlock(t *testing.T) {
t.Parallel()
fsm, err := New(nil, os.Stderr)
fsm, err := New(nil, os.Stderr, "")
if err != nil {
t.Fatalf("err: %v", err)
}
Expand Down Expand Up @@ -663,7 +663,7 @@ func TestFSM_KVSUnlock(t *testing.T) {

func TestFSM_CoordinateUpdate(t *testing.T) {
t.Parallel()
fsm, err := New(nil, os.Stderr)
fsm, err := New(nil, os.Stderr, "")
if err != nil {
t.Fatalf("err: %v", err)
}
Expand Down Expand Up @@ -704,7 +704,7 @@ func TestFSM_CoordinateUpdate(t *testing.T) {

func TestFSM_SessionCreate_Destroy(t *testing.T) {
t.Parallel()
fsm, err := New(nil, os.Stderr)
fsm, err := New(nil, os.Stderr, "")
if err != nil {
t.Fatalf("err: %v", err)
}
Expand Down Expand Up @@ -784,7 +784,7 @@ func TestFSM_SessionCreate_Destroy(t *testing.T) {

func TestFSM_ACL_CRUD(t *testing.T) {
t.Parallel()
fsm, err := New(nil, os.Stderr)
fsm, err := New(nil, os.Stderr, "")
if err != nil {
t.Fatalf("err: %v", err)
}
Expand Down Expand Up @@ -902,7 +902,7 @@ func TestFSM_ACL_CRUD(t *testing.T) {

func TestFSM_PreparedQuery_CRUD(t *testing.T) {
t.Parallel()
fsm, err := New(nil, os.Stderr)
fsm, err := New(nil, os.Stderr, "")
if err != nil {
t.Fatalf("err: %v", err)
}
Expand Down Expand Up @@ -1000,7 +1000,7 @@ func TestFSM_PreparedQuery_CRUD(t *testing.T) {

func TestFSM_TombstoneReap(t *testing.T) {
t.Parallel()
fsm, err := New(nil, os.Stderr)
fsm, err := New(nil, os.Stderr, "")
if err != nil {
t.Fatalf("err: %v", err)
}
Expand Down Expand Up @@ -1048,7 +1048,7 @@ func TestFSM_TombstoneReap(t *testing.T) {

func TestFSM_Txn(t *testing.T) {
t.Parallel()
fsm, err := New(nil, os.Stderr)
fsm, err := New(nil, os.Stderr, "")
if err != nil {
t.Fatalf("err: %v", err)
}
Expand Down Expand Up @@ -1090,7 +1090,7 @@ func TestFSM_Txn(t *testing.T) {

func TestFSM_Autopilot(t *testing.T) {
t.Parallel()
fsm, err := New(nil, os.Stderr)
fsm, err := New(nil, os.Stderr, "")
if err != nil {
t.Fatalf("err: %v", err)
}
Expand Down Expand Up @@ -1154,7 +1154,7 @@ func TestFSM_Intention_CRUD(t *testing.T) {
t.Parallel()

assert := assert.New(t)
fsm, err := New(nil, os.Stderr)
fsm, err := New(nil, os.Stderr, "")
assert.Nil(err)

// Create a new intention.
Expand Down Expand Up @@ -1223,7 +1223,7 @@ func TestFSM_CAConfig(t *testing.T) {
t.Parallel()

assert := assert.New(t)
fsm, err := New(nil, os.Stderr)
fsm, err := New(nil, os.Stderr, "")
assert.Nil(err)

// Set the autopilot config using a request.
Expand Down Expand Up @@ -1290,7 +1290,7 @@ func TestFSM_CARoots(t *testing.T) {
t.Parallel()

assert := assert.New(t)
fsm, err := New(nil, os.Stderr)
fsm, err := New(nil, os.Stderr, "")
assert.Nil(err)

// Roots
Expand Down Expand Up @@ -1322,7 +1322,7 @@ func TestFSM_CABuiltinProvider(t *testing.T) {
t.Parallel()

assert := assert.New(t)
fsm, err := New(nil, os.Stderr)
fsm, err := New(nil, os.Stderr, "")
assert.Nil(err)

// Provider state.
Expand Down
9 changes: 5 additions & 4 deletions agent/consul/fsm/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,13 @@ type FSM struct {
stateLock sync.RWMutex
state *state.Store

gc *state.TombstoneGC
gc *state.TombstoneGC
nodeRenamingPolicy string
}

// New is used to construct a new FSM with a blank state.
func New(gc *state.TombstoneGC, logOutput io.Writer) (*FSM, error) {
stateNew, err := state.NewStateStore(gc)
func New(gc *state.TombstoneGC, logOutput io.Writer, nodeRenamingPolicy string) (*FSM, error) {
stateNew, err := state.NewStateStore(gc, nodeRenamingPolicy)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -136,7 +137,7 @@ func (c *FSM) Restore(old io.ReadCloser) error {
defer old.Close()

// Create a new state store.
stateNew, err := state.NewStateStore(c.gc)
stateNew, err := state.NewStateStore(c.gc, c.nodeRenamingPolicy)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion agent/consul/fsm/fsm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func makeLog(buf []byte) *raft.Log {

func TestFSM_IgnoreUnknown(t *testing.T) {
t.Parallel()
fsm, err := New(nil, os.Stderr)
fsm, err := New(nil, os.Stderr, "")
assert.Nil(t, err)

// Create a new reap request
Expand Down
6 changes: 3 additions & 3 deletions agent/consul/fsm/snapshot_oss_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func TestFSM_SnapshotRestore_OSS(t *testing.T) {
t.Parallel()

assert := assert.New(t)
fsm, err := New(nil, os.Stderr)
fsm, err := New(nil, os.Stderr, "")
if err != nil {
t.Fatalf("err: %v", err)
}
Expand Down Expand Up @@ -217,7 +217,7 @@ func TestFSM_SnapshotRestore_OSS(t *testing.T) {
}

// Try to restore on a new FSM
fsm2, err := New(nil, os.Stderr)
fsm2, err := New(nil, os.Stderr, "")
if err != nil {
t.Fatalf("err: %v", err)
}
Expand Down Expand Up @@ -418,7 +418,7 @@ func TestFSM_SnapshotRestore_OSS(t *testing.T) {
func TestFSM_BadRestore_OSS(t *testing.T) {
t.Parallel()
// Create an FSM with some state.
fsm, err := New(nil, os.Stderr)
fsm, err := New(nil, os.Stderr, "")
if err != nil {
t.Fatalf("err: %v", err)
}
Expand Down
2 changes: 1 addition & 1 deletion agent/consul/issue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func makeLog(buf []byte) *raft.Log {
// Testing for GH-300 and GH-279
func TestHealthCheckRace(t *testing.T) {
t.Parallel()
fsm, err := consulfsm.New(nil, os.Stderr)
fsm, err := consulfsm.New(nil, os.Stderr, "")
if err != nil {
t.Fatalf("err: %v", err)
}
Expand Down
4 changes: 2 additions & 2 deletions agent/consul/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -496,7 +496,7 @@ func (s *Server) setupRaft() error {

// Create the FSM.
var err error
s.fsm, err = fsm.New(s.tombstoneGC, s.config.LogOutput)
s.fsm, err = fsm.New(s.tombstoneGC, s.config.LogOutput, s.config.NodeRenamingPolicy)
if err != nil {
return err
}
Expand Down Expand Up @@ -603,7 +603,7 @@ func (s *Server) setupRaft() error {
return fmt.Errorf("recovery failed to parse peers.json: %v", err)
}

tmpFsm, err := fsm.New(s.tombstoneGC, s.config.LogOutput)
tmpFsm, err := fsm.New(s.tombstoneGC, s.config.LogOutput, s.config.NodeRenamingPolicy)
if err != nil {
return fmt.Errorf("recovery failed to make temp FSM: %v", err)
}
Expand Down
Loading

0 comments on commit a10683e

Please sign in to comment.