Batch Job Garbage Collection #3982

Merged · 4 commits · Mar 16, 2018

2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -9,6 +9,8 @@ __BACKWARDS INCOMPATIBILITIES:__
[restart stanza](https://www.nomadproject.io/docs/job-specification/restart.html) for more information.

IMPROVEMENTS:
* core: Servers can now service client HTTP endpoints [[GH-3892](https://github.com/hashicorp/nomad/issues/3892)]
* core: More efficient garbage collection of large batches of jobs [[GH-3982](https://github.com/hashicorp/nomad/issues/3982)]
* core: Allow upgrading/downgrading TLS via SIGHUP on both servers and clients [[GH-3492](https://github.com/hashicorp/nomad/issues/3492)]
* core: Node events are emitted for events such as node registration and
heartbeating [[GH-3945](https://github.com/hashicorp/nomad/issues/3945)]
59 changes: 47 additions & 12 deletions nomad/core_sched.go
@@ -151,25 +151,60 @@ OUTER:
return err
}

// Call to the leader to deregister the jobs.
for _, job := range gcJob {
req := structs.JobDeregisterRequest{
JobID: job.ID,
Purge: true,
// Reap the jobs
return c.jobReap(gcJob, eval.LeaderACL)
}

// jobReap contacts the leader and issues a reap on the passed jobs
func (c *CoreScheduler) jobReap(jobs []*structs.Job, leaderACL string) error {
// Call to the leader to issue the reap
for _, req := range c.partitionJobReap(jobs, leaderACL) {
var resp structs.JobBatchDeregisterResponse
if err := c.srv.RPC("Job.BatchDeregister", req, &resp); err != nil {
c.srv.logger.Printf("[ERR] sched.core: batch job reap failed: %v", err)
return err
}
}

return nil
}

// partitionJobReap returns a list of JobBatchDeregisterRequests to make,
// ensuring a single request does not contain too many jobs. This is necessary
// to ensure that the Raft transaction does not become too large.
func (c *CoreScheduler) partitionJobReap(jobs []*structs.Job, leaderACL string) []*structs.JobBatchDeregisterRequest {
option := &structs.JobDeregisterOptions{Purge: true}
var requests []*structs.JobBatchDeregisterRequest
submittedJobs := 0
for submittedJobs != len(jobs) {
req := &structs.JobBatchDeregisterRequest{
Jobs: make(map[structs.NamespacedID]*structs.JobDeregisterOptions),
WriteRequest: structs.WriteRequest{
Region: c.srv.config.Region,
Namespace: job.Namespace,
AuthToken: eval.LeaderACL,
AuthToken: leaderACL,
},
}
var resp structs.JobDeregisterResponse
if err := c.srv.RPC("Job.Deregister", &req, &resp); err != nil {
c.srv.logger.Printf("[ERR] sched.core: job deregister failed: %v", err)
return err
requests = append(requests, req)
available := maxIdsPerReap

if remaining := len(jobs) - submittedJobs; remaining > 0 {
if remaining <= available {
for _, job := range jobs[submittedJobs:] {
jns := structs.NamespacedID{ID: job.ID, Namespace: job.Namespace}
req.Jobs[jns] = option
Review comment (Member): Little nervous about sharing a pointer to the same struct here, but nothing mutates it and it seems unlikely anything would ever go back through and mutate it so I think it's safe to leave.

(A minimal sketch of this shared-pointer pattern follows this file's diff.)

}
submittedJobs += remaining
} else {
for _, job := range jobs[submittedJobs : submittedJobs+available] {
jns := structs.NamespacedID{ID: job.ID, Namespace: job.Namespace}
req.Jobs[jns] = option
}
submittedJobs += available
}
}
}

return nil
return requests
}

// evalGC is used to garbage collect old evaluations
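
The review comment in the diff above flags that every entry in req.Jobs shares a single *JobDeregisterOptions pointer. Below is a minimal standalone sketch of that pattern, using simplified stand-in types rather than the real nomad/structs definitions, showing why the "nothing mutates it" observation matters: a mutation made through any one map key is visible through every key.

```go
package main

import "fmt"

// Simplified stand-ins for structs.JobDeregisterOptions and structs.NamespacedID.
type JobDeregisterOptions struct {
	Purge bool
}

type NamespacedID struct {
	ID, Namespace string
}

func main() {
	// One options struct shared by every map entry, as in partitionJobReap.
	option := &JobDeregisterOptions{Purge: true}
	jobs := map[NamespacedID]*JobDeregisterOptions{
		{ID: "job-a", Namespace: "default"}: option,
		{ID: "job-b", Namespace: "default"}: option,
	}

	// This is safe only because nothing mutates the shared struct; a write
	// through one key changes what every other key sees.
	jobs[NamespacedID{ID: "job-a", Namespace: "default"}].Purge = false
	fmt.Println(jobs[NamespacedID{ID: "job-b", Namespace: "default"}].Purge) // prints: false
}
```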
27 changes: 27 additions & 0 deletions nomad/core_sched_test.go
@@ -1767,6 +1767,33 @@ func TestCoreScheduler_PartitionDeploymentReap(t *testing.T) {
}
}

func TestCoreScheduler_PartitionJobReap(t *testing.T) {
t.Parallel()
require := require.New(t)
s1 := TestServer(t, nil)
defer s1.Shutdown()
testutil.WaitForLeader(t, s1.RPC)

// Create a core scheduler
snap, err := s1.fsm.State().Snapshot()
if err != nil {
t.Fatalf("err: %v", err)
}
core := NewCoreScheduler(s1, snap)

// Set the max ids per reap to something lower.
maxIdsPerReap = 2

jobs := []*structs.Job{mock.Job(), mock.Job(), mock.Job()}
requests := core.(*CoreScheduler).partitionJobReap(jobs, "")
require.Len(requests, 2)

first := requests[0]
second := requests[1]
require.Len(first.Jobs, 2)
require.Len(second.Jobs, 1)
}

// Tests various scenarios when allocations are eligible to be GCed
func TestAllocation_GCEligible(t *testing.T) {
type testCase struct {
41 changes: 35 additions & 6 deletions nomad/fsm.go
@@ -238,6 +238,8 @@ func (n *nomadFSM) Apply(log *raft.Log) interface{} {
return n.applyAutopilotUpdate(buf[1:], log.Index)
case structs.UpsertNodeEventsType:
return n.applyUpsertNodeEvent(buf[1:], log.Index)
case structs.JobBatchDeregisterRequestType:
return n.applyBatchDeregisterJob(buf[1:], log.Index)
}

// Check enterprise only message types.
@@ -431,33 +433,60 @@ func (n *nomadFSM) applyDeregisterJob(buf []byte, index uint64) interface{} {
panic(fmt.Errorf("failed to decode request: %v", err))
}

if err := n.handleJobDeregister(index, req.JobID, req.Namespace, req.Purge); err != nil {
n.logger.Printf("[ERR] nomad.fsm: deregistering job failed: %v", err)
return err
}

return nil
}

func (n *nomadFSM) applyBatchDeregisterJob(buf []byte, index uint64) interface{} {
defer metrics.MeasureSince([]string{"nomad", "fsm", "batch_deregister_job"}, time.Now())
var req structs.JobBatchDeregisterRequest
if err := structs.Decode(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request: %v", err))
}

for jobNS, options := range req.Jobs {
if err := n.handleJobDeregister(index, jobNS.ID, jobNS.Namespace, options.Purge); err != nil {
n.logger.Printf("[ERR] nomad.fsm: deregistering %v failed: %v", jobNS, err)
return err
}
}

return n.upsertEvals(index, req.Evals)
}

// handleJobDeregister is used to deregister a job.
func (n *nomadFSM) handleJobDeregister(index uint64, jobID, namespace string, purge bool) error {
// If it is periodic remove it from the dispatcher
if err := n.periodicDispatcher.Remove(req.Namespace, req.JobID); err != nil {
if err := n.periodicDispatcher.Remove(namespace, jobID); err != nil {
n.logger.Printf("[ERR] nomad.fsm: periodicDispatcher.Remove failed: %v", err)
return err
}

if req.Purge {
if err := n.state.DeleteJob(index, req.Namespace, req.JobID); err != nil {
if purge {
if err := n.state.DeleteJob(index, namespace, jobID); err != nil {
n.logger.Printf("[ERR] nomad.fsm: DeleteJob failed: %v", err)
return err
}

// We always delete from the periodic launch table because it is possible that
// the job was updated to be non-periodic, thus checking if it is periodic
// doesn't ensure we clean it up properly.
n.state.DeletePeriodicLaunch(index, req.Namespace, req.JobID)
n.state.DeletePeriodicLaunch(index, namespace, jobID)
} else {
// Get the current job and mark it as stopped and re-insert it.
ws := memdb.NewWatchSet()
current, err := n.state.JobByID(ws, req.Namespace, req.JobID)
current, err := n.state.JobByID(ws, namespace, jobID)
if err != nil {
n.logger.Printf("[ERR] nomad.fsm: JobByID lookup failed: %v", err)
return err
}

if current == nil {
return fmt.Errorf("job %q in namespace %q doesn't exist to be deregistered", req.JobID, req.Namespace)
return fmt.Errorf("job %q in namespace %q doesn't exist to be deregistered", jobID, namespace)
}

stopped := current.Copy()
78 changes: 78 additions & 0 deletions nomad/fsm_test.go
@@ -612,6 +612,84 @@ func TestFSM_DeregisterJob_NoPurge(t *testing.T) {
}
}

func TestFSM_BatchDeregisterJob(t *testing.T) {
t.Parallel()
require := require.New(t)
fsm := testFSM(t)

job := mock.PeriodicJob()
req := structs.JobRegisterRequest{
Job: job,
WriteRequest: structs.WriteRequest{
Namespace: job.Namespace,
},
}
buf, err := structs.Encode(structs.JobRegisterRequestType, req)
require.Nil(err)
resp := fsm.Apply(makeLog(buf))
require.Nil(resp)

job2 := mock.Job()
req2 := structs.JobRegisterRequest{
Job: job2,
WriteRequest: structs.WriteRequest{
Namespace: job2.Namespace,
},
}

buf, err = structs.Encode(structs.JobRegisterRequestType, req2)
require.Nil(err)
resp = fsm.Apply(makeLog(buf))
require.Nil(resp)

req3 := structs.JobBatchDeregisterRequest{
Jobs: map[structs.NamespacedID]*structs.JobDeregisterOptions{
structs.NamespacedID{
ID: job.ID,
Namespace: job.Namespace,
}: &structs.JobDeregisterOptions{},
structs.NamespacedID{
ID: job2.ID,
Namespace: job2.Namespace,
}: &structs.JobDeregisterOptions{
Purge: true,
},
},
WriteRequest: structs.WriteRequest{
Namespace: job.Namespace,
},
}
buf, err = structs.Encode(structs.JobBatchDeregisterRequestType, req3)
require.Nil(err)

resp = fsm.Apply(makeLog(buf))
require.Nil(resp)

// Verify the first job is stopped but not purged
ws := memdb.NewWatchSet()
jobOut, err := fsm.State().JobByID(ws, req.Namespace, req.Job.ID)
require.Nil(err)
require.NotNil(jobOut)
require.True(jobOut.Stop)

// Verify it was removed from the periodic runner.
tuple := structs.NamespacedID{
ID: job.ID,
Namespace: job.Namespace,
}
require.NotContains(fsm.periodicDispatcher.tracked, tuple)

// Verify it was not removed from the periodic launch table.
launchOut, err := fsm.State().PeriodicLaunchByID(ws, job.Namespace, job.ID)
require.Nil(err)
require.NotNil(launchOut)

// Verify the other job was purged
jobOut2, err := fsm.State().JobByID(ws, job2.Namespace, job2.ID)
require.Nil(err)
require.Nil(jobOut2)
}

func TestFSM_UpdateEval(t *testing.T) {
t.Parallel()
fsm := testFSM(t)
85 changes: 83 additions & 2 deletions nomad/job_endpoint.go
@@ -615,8 +615,7 @@ func (j *Job) Deregister(args *structs.JobDeregisterRequest, reply *structs.JobD

// Create a new evaluation
// XXX: The job priority / type is strange for this, since it's not a high
// priority even if the job was. The scheduler itself also doesn't matter,
// since all should be able to handle deregistration in the same way.
// priority even if the job was.
eval := &structs.Evaluation{
ID: uuid.Generate(),
Namespace: args.RequestNamespace(),
@@ -646,6 +645,88 @@ func (j *Job) Deregister(args *structs.JobDeregisterRequest, reply *structs.JobD
return nil
}

// BatchDeregister is used to remove a set of jobs from the cluster.
func (j *Job) BatchDeregister(args *structs.JobBatchDeregisterRequest, reply *structs.JobBatchDeregisterResponse) error {
if done, err := j.srv.forward("Job.BatchDeregister", args, args, reply); done {
return err
}
defer metrics.MeasureSince([]string{"nomad", "job", "batch_deregister"}, time.Now())

// Resolve the ACL token
aclObj, err := j.srv.ResolveToken(args.AuthToken)
if err != nil {
return err
}

// Validate the arguments
if len(args.Jobs) == 0 {
return fmt.Errorf("given no jobs to deregister")
}
if len(args.Evals) != 0 {
return fmt.Errorf("evaluations should not be populated")
}

// Loop through checking for permissions
for jobNS := range args.Jobs {
// Check for submit-job permissions
if aclObj != nil && !aclObj.AllowNsOp(jobNS.Namespace, acl.NamespaceCapabilitySubmitJob) {
return structs.ErrPermissionDenied
}
}

// Grab a snapshot
snap, err := j.srv.fsm.State().Snapshot()
if err != nil {
return err
}

// Loop through to create evals
for jobNS, options := range args.Jobs {
if options == nil {
return fmt.Errorf("no deregister options provided for %v", jobNS)
}

job, err := snap.JobByID(nil, jobNS.Namespace, jobNS.ID)
if err != nil {
return err
}

// If the job is periodic or parameterized, we don't create an eval.
Review comment (Member): Why?

Reply (Contributor, author): The scheduler doesn't do anything for those job types. It does work for the derived batch jobs from them.

Reply (Member): Ah sure, that makes sense. Maybe in future comments add a "because ..." clause, as what the code is doing is fairly obvious, but why it's doing it is not.

(A minimal sketch of this filter follows this file's diff.)

if job != nil && (job.IsPeriodic() || job.IsParameterized()) {
continue
}

priority := structs.JobDefaultPriority
jtype := structs.JobTypeService
if job != nil {
priority = job.Priority
jtype = job.Type
}

// Create a new evaluation
eval := &structs.Evaluation{
ID: uuid.Generate(),
Namespace: jobNS.Namespace,
Priority: priority,
Type: jtype,
TriggeredBy: structs.EvalTriggerJobDeregister,
JobID: jobNS.ID,
Status: structs.EvalStatusPending,
}
args.Evals = append(args.Evals, eval)
}

// Commit this update via Raft
_, index, err := j.srv.raftApply(structs.JobBatchDeregisterRequestType, args)
if err != nil {
j.srv.logger.Printf("[ERR] nomad.job: batch deregister failed: %v", err)
return err
}

reply.Index = index
return nil
}

// GetJob is used to request information about a specific job
func (j *Job) GetJob(args *structs.JobSpecificRequest,
reply *structs.SingleJobResponse) error {
Expand Down