Merge pull request #6707 from hashicorp/f-cluster-id
nomad: ensure a unique ClusterID exists when leader (gh-6702)
shoenig authored Dec 9, 2019
2 parents 4e0dd92 + 0fa99ad commit 13b46bc
Showing 10 changed files with 574 additions and 10 deletions.
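The excerpt below covers three of the ten changed files: the FSM apply/snapshot path in nomad/fsm.go, its tests in nomad/fsm_test.go, and the leader hook in nomad/leader.go. The s.ClusterID() accessor that the leader calls is defined in one of the changed files not shown here. As a rough, hedged sketch of how such an accessor could tie the pieces together (the lock field and exact error handling are assumptions, not taken from this diff):

// Sketch only: a plausible shape for the accessor the leader calls below.
// Field names such as clusterIDLock are assumed for illustration.
func (s *Server) ClusterID() (string, error) {
	s.clusterIDLock.Lock()
	defer s.clusterIDLock.Unlock()

	// If a ClusterMetadata object already exists in the state store, reuse it.
	meta, err := s.fsm.State().ClusterMetadata()
	if err != nil {
		return "", err
	}
	if meta != nil && meta.ClusterID != "" {
		return meta.ClusterID, nil
	}

	// Otherwise the leader writes a freshly generated ID through Raft so that
	// every server converges on the same value (see generateClusterID below).
	return s.generateClusterID()
}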
52 changes: 52 additions & 0 deletions nomad/fsm.go
@@ -45,6 +45,7 @@ const (
ACLPolicySnapshot
ACLTokenSnapshot
SchedulerConfigSnapshot
ClusterMetadataSnapshot
)

// LogApplier is the definition of a function that can apply a Raft log
@@ -251,6 +252,8 @@ func (n *nomadFSM) Apply(log *raft.Log) interface{} {
return n.applySchedulerConfigUpdate(buf[1:], log.Index)
case structs.NodeBatchDeregisterRequestType:
return n.applyDeregisterNodeBatch(buf[1:], log.Index)
case structs.ClusterMetadataRequestType:
return n.applyClusterMetadata(buf[1:], log.Index)
}

// Check enterprise only message types.
@@ -267,6 +270,24 @@ func (n *nomadFSM) Apply(log *raft.Log) interface{} {
panic(fmt.Errorf("failed to apply request: %#v", buf))
}

func (n *nomadFSM) applyClusterMetadata(buf []byte, index uint64) interface{} {
defer metrics.MeasureSince([]string{"nomad", "fsm", "cluster_meta"}, time.Now())

var req structs.ClusterMetadata
if err := structs.Decode(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request: %v", err))
}

if err := n.state.ClusterSetMetadata(index, &req); err != nil {
n.logger.Error("ClusterSetMetadata failed", "error", err)
return err
}

n.logger.Trace("ClusterSetMetadata", "cluster_id", req.ClusterID, "create_time", req.CreateTime)

return nil
}

func (n *nomadFSM) applyUpsertNode(buf []byte, index uint64) interface{} {
defer metrics.MeasureSince([]string{"nomad", "fsm", "register_node"}, time.Now())
var req structs.NodeRegisterRequest
@@ -1255,6 +1276,15 @@ func (n *nomadFSM) Restore(old io.ReadCloser) error {
return err
}

case ClusterMetadataSnapshot:
meta := new(structs.ClusterMetadata)
if err := dec.Decode(meta); err != nil {
return err
}
if err := restore.ClusterMetadataRestore(meta); err != nil {
return err
}

default:
// Check if this is an enterprise only object being restored
restorer, ok := n.enterpriseRestorers[snapType]
@@ -1527,6 +1557,10 @@ func (s *nomadSnapshot) Persist(sink raft.SnapshotSink) error {
sink.Cancel()
return err
}
if err := s.persistClusterMetadata(sink, encoder); err != nil {
sink.Cancel()
return err
}
return nil
}

@@ -1874,6 +1908,24 @@ func (s *nomadSnapshot) persistSchedulerConfig(sink raft.SnapshotSink,
return nil
}

func (s *nomadSnapshot) persistClusterMetadata(sink raft.SnapshotSink,
encoder *codec.Encoder) error {

// Get the cluster metadata
clusterMetadata, err := s.snap.ClusterMetadata()
if err != nil {
return err
}

// Write out the cluster metadata
sink.Write([]byte{byte(ClusterMetadataSnapshot)})
if err := encoder.Encode(clusterMetadata); err != nil {
return err
}

return nil
}

// Release is a no-op, as we just need to GC the pointer
// to the state store snapshot. There is nothing to explicitly
// cleanup.
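The FSM delegates to ClusterSetMetadata on the state store, which is also part of this PR but not shown in this excerpt. The fsm_test.go case below ("sanity check prevents accidental UUID regeneration") relies on that function rejecting a second, different cluster ID. A hedged sketch of what that guard could look like (the memdb table name and transaction helpers are assumptions):

// Sketch only: the real implementation lives in the state store code, which
// is not part of this excerpt; the "cluster_meta" table name is assumed.
func (s *StateStore) ClusterSetMetadata(index uint64, meta *structs.ClusterMetadata) error {
	txn := s.db.Txn(true)
	defer txn.Abort()

	// Refuse to overwrite an existing cluster ID with a different one; the ID
	// must stay stable for the lifetime of the cluster.
	existing, err := txn.First("cluster_meta", "id")
	if err != nil {
		return fmt.Errorf("cluster metadata lookup failed: %v", err)
	}
	if existing != nil {
		prev := existing.(*structs.ClusterMetadata).ClusterID
		if prev != meta.ClusterID {
			return fmt.Errorf("refusing to replace cluster id %q with %q", prev, meta.ClusterID)
		}
	}

	if err := txn.Insert("cluster_meta", meta); err != nil {
		return fmt.Errorf("set cluster metadata failed: %v", err)
	}
	txn.Commit()
	return nil
}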
58 changes: 57 additions & 1 deletion nomad/fsm_test.go
@@ -184,7 +184,7 @@ func TestFSM_UpsertNode_Canonicalize(t *testing.T) {
fsm := testFSM(t)
fsm.blockedEvals.SetEnabled(true)

// Setup a node without eligiblity
// Setup a node without eligibility
node := mock.Node()
node.SchedulingEligibility = ""

@@ -2698,7 +2698,25 @@ func TestFSM_SnapshotRestore_SchedulerConfiguration(t *testing.T) {
require.Nil(err)
require.EqualValues(1000, index)
require.Equal(schedConfig, out)
}

func TestFSM_SnapshotRestore_ClusterMetadata(t *testing.T) {
t.Parallel()

fsm := testFSM(t)
state := fsm.State()
clusterID := "12345678-1234-1234-1234-1234567890"
now := time.Now().UnixNano()
meta := &structs.ClusterMetadata{ClusterID: clusterID, CreateTime: now}
state.ClusterSetMetadata(1000, meta)

// Verify the contents
require := require.New(t)
fsm2 := testSnapshotRestore(t, fsm)
state2 := fsm2.State()
out, err := state2.ClusterMetadata()
require.NoError(err)
require.Equal(clusterID, out.ClusterID)
}

func TestFSM_ReconcileSummaries(t *testing.T) {
@@ -2972,3 +2990,41 @@ func TestFSM_SchedulerConfig(t *testing.T) {
require.True(config.PreemptionConfig.SystemSchedulerEnabled)
require.True(config.PreemptionConfig.BatchSchedulerEnabled)
}

func TestFSM_ClusterMetadata(t *testing.T) {
t.Parallel()
r := require.New(t)

fsm := testFSM(t)
clusterID := "12345678-1234-1234-1234-1234567890"
now := time.Now().UnixNano()
meta := structs.ClusterMetadata{
ClusterID: clusterID,
CreateTime: now,
}
buf, err := structs.Encode(structs.ClusterMetadataRequestType, meta)
r.NoError(err)

result := fsm.Apply(makeLog(buf))
r.Nil(result)

// Verify the clusterID is set directly in the state store
storedMetadata, err := fsm.state.ClusterMetadata()
r.NoError(err)
r.Equal(clusterID, storedMetadata.ClusterID)

// Check that the sanity check prevents accidental UUID regeneration
erroneous := structs.ClusterMetadata{
ClusterID: "99999999-9999-9999-9999-9999999999",
}
buf, err = structs.Encode(structs.ClusterMetadataRequestType, erroneous)
r.NoError(err)

result = fsm.Apply(makeLog(buf))
r.Error(result.(error))

storedMetadata, err = fsm.state.ClusterMetadata()
r.NoError(err)
r.Equal(clusterID, storedMetadata.ClusterID)
r.Equal(now, storedMetadata.CreateTime)
}
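TestFSM_ClusterMetadata above feeds the encoded request through fsm.Apply(makeLog(buf)). The makeLog helper is pre-existing test scaffolding that does not appear in this diff; it presumably just wraps the encoded bytes in a minimal Raft log entry, along the lines of:

// Sketch of the pre-existing test helper (not shown in this diff): wrap an
// already-encoded FSM message in a raft.Log so it can be passed to Apply.
func makeLog(buf []byte) *raft.Log {
	return &raft.Log{
		Index: 1,
		Term:  1,
		Type:  raft.LogCommand,
		Data:  buf,
	}
}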
26 changes: 24 additions & 2 deletions nomad/leader.go
@@ -6,13 +6,12 @@ import (
"fmt"
"math/rand"
"net"
"strings"
"sync"
"time"

"golang.org/x/time/rate"

"strings"

metrics "github.com/armon/go-metrics"
log "github.com/hashicorp/go-hclog"
memdb "github.com/hashicorp/go-memdb"
@@ -22,6 +21,7 @@ import (
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/raft"
"github.com/hashicorp/serf/serf"
"github.com/pkg/errors"
)

const (
@@ -44,6 +44,8 @@ var minAutopilotVersion = version.Must(version.NewVersion("0.8.0"))

var minSchedulerConfigVersion = version.Must(version.NewVersion("0.9.0"))

var minClusterIDVersion = version.Must(version.NewVersion("0.10.3"))

// Default configuration for scheduler with preemption enabled for system jobs
var defaultSchedulerConfig = &structs.SchedulerConfiguration{
PreemptionConfig: structs.PreemptionConfig{
@@ -201,6 +203,10 @@ func (s *Server) establishLeadership(stopCh chan struct{}) error {
// Initialize scheduler configuration
s.getOrCreateSchedulerConfig()

// Initialize the ClusterID
_, _ = s.ClusterID()
// todo: use cluster ID for stuff, later!

// Enable the plan queue, since we are now the leader
s.planQueue.SetEnabled(true)

@@ -1327,3 +1333,19 @@ func (s *Server) getOrCreateSchedulerConfig() *structs.SchedulerConfiguration {

return config
}

func (s *Server) generateClusterID() (string, error) {
if !ServersMeetMinimumVersion(s.Members(), minClusterIDVersion, false) {
s.logger.Named("core").Warn("cannot initialize cluster ID until all servers are above minimum version", "min_version", minClusterIDVersion)
return "", errors.Errorf("cluster ID cannot be created until all servers are above minimum version %s", minClusterIDVersion)
}

newMeta := structs.ClusterMetadata{ClusterID: uuid.Generate(), CreateTime: time.Now().UnixNano()}
if _, _, err := s.raftApply(structs.ClusterMetadataRequestType, newMeta); err != nil {
s.logger.Named("core").Error("failed to create cluster ID", "error", err)
return "", errors.Wrap(err, "failed to create cluster ID")
}

s.logger.Named("core").Info("established cluster id", "cluster_id", newMeta.ClusterID, "create_time", newMeta.CreateTime)
return newMeta.ClusterID, nil
}
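The "todo: use cluster ID for stuff, later!" note in establishLeadership leaves consumption of the ID to future work. As a purely hypothetical illustration of what that might look like (not part of this PR), a later feature could read the ID through the accessor and tag telemetry with it, treating the version-gate error as "no ID yet" rather than as a failure:

// Hypothetical follow-up, not part of this diff: tag emitted metrics with the
// stable cluster identity once it exists.
func (s *Server) emitClusterIdentityMetric() {
	id, err := s.ClusterID()
	if err != nil {
		// Clusters still below minClusterIDVersion have no ID yet; skip quietly
		// and let a later pass pick it up after all servers upgrade.
		return
	}
	metrics.SetGaugeWithLabels([]string{"nomad", "cluster", "identity"}, 1,
		[]metrics.Label{{Name: "cluster_id", Value: id}})
}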