Skip to content

Commit

Permalink
state: ensure the job submission table is persisted and restored. (#1…
Browse files Browse the repository at this point in the history
  • Loading branch information
jrasell authored Jan 5, 2024
1 parent 2abbd7e commit f3ed406
Show file tree
Hide file tree
Showing 8 changed files with 194 additions and 1 deletion.
3 changes: 3 additions & 0 deletions .changelog/19605.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:bug
core: Ensure job HCL submission data is persisted and restored during the FSM snapshot process
```
38 changes: 38 additions & 0 deletions nomad/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ const (
ACLAuthMethodSnapshot SnapshotType = 26
ACLBindingRuleSnapshot SnapshotType = 27
NodePoolSnapshot SnapshotType = 28
JobSubmissionSnapshot SnapshotType = 29

// Namespace appliers were moved from enterprise and therefore start at 64
NamespaceSnapshot SnapshotType = 64
Expand Down Expand Up @@ -1883,6 +1884,18 @@ func (n *nomadFSM) restoreImpl(old io.ReadCloser, filter *FSMFilter) error {
return err
}

case JobSubmissionSnapshot:
jobSubmissions := new(structs.JobSubmission)

if err := dec.Decode(jobSubmissions); err != nil {
return err
}

// Perform the restoration.
if err := restore.JobSubmissionRestore(jobSubmissions); err != nil {
return err
}

default:
// Check if this is an enterprise only object being restored
restorer, ok := n.enterpriseRestorers[snapType]
Expand Down Expand Up @@ -2454,6 +2467,10 @@ func (s *nomadSnapshot) Persist(sink raft.SnapshotSink) error {
sink.Cancel()
return err
}
if err := s.persistJobSubmissions(sink, encoder); err != nil {
sink.Cancel()
return err
}
return nil
}

Expand Down Expand Up @@ -3170,6 +3187,27 @@ func (s *nomadSnapshot) persistACLBindingRules(sink raft.SnapshotSink, encoder *
return nil
}

func (s *nomadSnapshot) persistJobSubmissions(sink raft.SnapshotSink, encoder *codec.Encoder) error {

// Get all the job submissions.
ws := memdb.NewWatchSet()
jobSubmissionsIter, err := s.snap.GetJobSubmissions(ws)
if err != nil {
return err
}

for raw := jobSubmissionsIter.Next(); raw != nil; raw = jobSubmissionsIter.Next() {
jobSubmission := raw.(*structs.JobSubmission)

// write the snapshot
sink.Write([]byte{byte(JobSubmissionSnapshot)})
if err := encoder.Encode(jobSubmission); err != nil {
return err
}
}
return nil
}

// Release is a no-op, as we just need to GC the pointer
// to the state store snapshot. There is nothing to explicitly
// cleanup.
Expand Down
54 changes: 54 additions & 0 deletions nomad/fsm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2934,6 +2934,60 @@ func TestFSM_SnapshotRestore_ACLBindingRules(t *testing.T) {
must.SliceContainsAll(t, restoredACLBindingRules, mockedACLBindingRoles)
}

func TestFSM_SnapshotRestore_JobSubmissions(t *testing.T) {
ci.Parallel(t)

// Create our initial FSM which will be snapshotted.
fsm := testFSM(t)
testState := fsm.State()

// Create a non-default namespace, so we can later create jobs and
// submissions within it.
mockNamespace := mock.Namespace()
mockNamespace.Name = "platform"

must.NoError(t, testState.UpsertNamespaces(10, []*structs.Namespace{mockNamespace}))

// Generate a some mocked jobs and submissions to insert directly into
// state.
mockJob1 := mock.Job()
mockJobSubmission1 := &structs.JobSubmission{
Source: "job{}",
Namespace: mockJob1.Namespace,
JobID: mockJob1.ID,
Version: mockJob1.Version,
JobModifyIndex: mockJob1.JobModifyIndex,
}

must.NoError(t, testState.UpsertJob(structs.MsgTypeTestSetup, mockJob1.ModifyIndex, mockJobSubmission1, mockJob1))

mockJob2 := mock.Job()
mockJob2.Namespace = mockNamespace.Name
mockJobSubmission2 := &structs.JobSubmission{
Source: "job{}",
Namespace: mockJob2.Namespace,
JobID: mockJob2.ID,
Version: mockJob2.Version,
JobModifyIndex: mockJob2.JobModifyIndex,
}

must.NoError(t, testState.UpsertJob(structs.MsgTypeTestSetup, mockJob2.ModifyIndex, mockJobSubmission2, mockJob2))

// Perform a snapshot restore.
restoredFSM := testSnapshotRestore(t, fsm)
restoredState := restoredFSM.State()

jobSubmission1Resp, err := restoredState.JobSubmission(
nil, mockJobSubmission1.Namespace, mockJobSubmission1.JobID, mockJobSubmission1.Version)
must.NoError(t, err)
must.Eq(t, mockJobSubmission1, jobSubmission1Resp)

jobSubmission2Resp, err := restoredState.JobSubmission(
nil, mockJobSubmission2.Namespace, mockJobSubmission2.JobID, mockJobSubmission2.Version)
must.NoError(t, err)
must.Eq(t, mockJobSubmission2, jobSubmission2Resp)
}

func TestFSM_ReconcileSummaries(t *testing.T) {
ci.Parallel(t)
// Add some state
Expand Down
3 changes: 2 additions & 1 deletion nomad/state/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ const (
TableACLAuthMethods = "acl_auth_methods"
TableACLBindingRules = "acl_binding_rules"
TableAllocs = "allocs"
TableJobSubmission = "job_submission"
)

const (
Expand Down Expand Up @@ -323,7 +324,7 @@ func jobVersionSchema() *memdb.TableSchema {
// which contain the original source material of each job, per version.
func jobSubmissionSchema() *memdb.TableSchema {
return &memdb.TableSchema{
Name: "job_submission",
Name: TableJobSubmission,
Indexes: map[string]*memdb.IndexSchema{
"id": {
Name: "id",
Expand Down
16 changes: 16 additions & 0 deletions nomad/state/state_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -2093,6 +2093,22 @@ func (s *StateStore) upsertJobVersion(index uint64, job *structs.Job, txn *txn)
return nil
}

// GetJobSubmissions returns an iterator that contains all job submissions
// stored within state. This is not currently exposed via RPC and is only used
// for snapshot persist and restore functionality.
func (s *StateStore) GetJobSubmissions(ws memdb.WatchSet) (memdb.ResultIterator, error) {
txn := s.db.ReadTxn()

// Walk the entire table to get all job submissions.
iter, err := txn.Get(TableJobSubmission, indexID)
if err != nil {
return nil, fmt.Errorf("job submissions lookup failed: %v", err)
}
ws.Add(iter.WatchCh())

return iter, nil
}

// JobSubmission returns the original HCL/Variables context of a job, if available.
//
// Note: it is a normal case for the submission context to be unavailable, in which case
Expand Down
9 changes: 9 additions & 0 deletions nomad/state/state_store_restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,3 +275,12 @@ func (r *StateRestore) ACLBindingRuleRestore(aclBindingRule *structs.ACLBindingR
}
return nil
}

// JobSubmissionRestore is used to restore a single job submission into the
// job_submission table.
func (r *StateRestore) JobSubmissionRestore(jobSubmission *structs.JobSubmission) error {
if err := r.txn.Insert(TableJobSubmission, jobSubmission); err != nil {
return fmt.Errorf("job submission insert failed: %v", err)
}
return nil
}
24 changes: 24 additions & 0 deletions nomad/state/state_store_restore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -555,3 +555,27 @@ func TestStateStore_ACLBindingRuleRestore(t *testing.T) {
must.NoError(t, err)
must.Eq(t, aclBindingRule, out)
}

func TestStateStore_JobSubmissionRestore(t *testing.T) {
ci.Parallel(t)
testState := testStateStore(t)

// Set up our test job submissions.
jobSubmission := structs.JobSubmission{
Source: "{job{}}",
Namespace: "default",
JobID: "example",
}

restore, err := testState.Restore()
must.NoError(t, err)
must.NoError(t, restore.JobSubmissionRestore(&jobSubmission))
must.NoError(t, restore.Commit())

// Check the state is now populated as we expect and that we can find the
// restored job submission.
ws := memdb.NewWatchSet()
out, err := testState.JobSubmission(ws, jobSubmission.Namespace, jobSubmission.JobID, jobSubmission.Version)
must.NoError(t, err)
must.Eq(t, jobSubmission, *out)
}
48 changes: 48 additions & 0 deletions nomad/state/state_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2566,6 +2566,54 @@ func TestStateStore_UpsertJob_submission(t *testing.T) {
must.Eq(t, 1007, sub.JobModifyIndex)
}

func TestStateStore_GetJobSubmissions(t *testing.T) {
ci.Parallel(t)

state := testStateStore(t)

// Generate some job submissions and upsert these into state.
mockJobSubmissions := []*structs.JobSubmission{
{
Source: "job{}",
Namespace: "default",
JobID: "example",
Version: 10,
JobModifyIndex: 20,
},
{
Source: "job{}",
Namespace: "platform",
JobID: "example",
Version: 20,
JobModifyIndex: 20,
},
}

txn := state.db.WriteTxn(20)

for _, mockSubmission := range mockJobSubmissions {
must.NoError(t, state.updateJobSubmission(
20, mockSubmission, mockSubmission.Namespace, mockSubmission.JobID, mockSubmission.Version, txn))
}

must.NoError(t, txn.Commit())

// List out all the job submissions in state and ensure they match the
// items we previously wrote.
ws := memdb.NewWatchSet()
iter, err := state.GetJobSubmissions(ws)
must.NoError(t, err)

var submissions []*structs.JobSubmission

for raw := iter.Next(); raw != nil; raw = iter.Next() {
submissions = append(submissions, raw.(*structs.JobSubmission))
}

must.SliceLen(t, 2, submissions)
must.Eq(t, mockJobSubmissions, submissions)
}

func TestStateStore_UpdateUpsertJob_JobVersion(t *testing.T) {
ci.Parallel(t)

Expand Down

0 comments on commit f3ed406

Please sign in to comment.