Skip to content

Commit

Permalink
jobspec: time based task execution (#22201)
Browse files Browse the repository at this point in the history
this is the CE side of an Enterprise-only feature.
a job trying to use this in CE will fail to validate.

to enable daily-scheduled execution entirely client-side,
a job may now contain:

task "name" {
  schedule {
    cron {
      start    = "0 12 * * * *" # may not include "," or "/"
      end      = "0 16"         # partial cron, with only {minute} {hour}
      timezone = "EST"          # anything in your tzdb
    }
  }
...

and everything about the allocation will be placed as usual,
but if outside the specified schedule, the taskrunner will block
on the client, waiting on the schedule start, before proceeding
with the task driver execution, etc.

this includes a taskrunner hook, which watches for the end of
the schedule, at which point it will kill the task.

then, if the task's restart settings allow it, a new task will start
and again block waiting for start, and so on.

this also includes all the plumbing required to pipe API calls
through from command->api->agent->server->client, so that
tasks can be force-run, force-paused, or resume the schedule
on demand.
  • Loading branch information
gulducat authored May 22, 2024
1 parent 6a25c2f commit 4415fab
Show file tree
Hide file tree
Showing 40 changed files with 1,512 additions and 3 deletions.
3 changes: 3 additions & 0 deletions .changelog/22201.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:improvement
jobspec: Add a schedule{} block for time based task execution (Enterprise)
```
33 changes: 33 additions & 0 deletions api/allocations.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,27 @@ func (a *Allocations) Signal(alloc *Allocation, q *QueryOptions, task, signal st
return err
}

// SetPauseState sets the schedule behavior of one task in the allocation.
//
// It PUTs an AllocPauseRequest carrying the task name and the requested
// schedule state to the allocation's pause endpoint and returns any error
// from that request.
func (a *Allocations) SetPauseState(alloc *Allocation, q *QueryOptions, task, state string) error {
	var (
		endpoint = "/v1/client/allocation/" + alloc.ID + "/pause"
		body     = AllocPauseRequest{Task: task, ScheduleState: state}
		out      GenericResponse
	)

	_, err := a.client.putQuery(endpoint, &body, &out, q)
	return err
}

// GetPauseState gets the schedule behavior of one task in the allocation.
//
// The task name is sent to the allocation's pause endpoint as the
// ?task=<task> query parameter, which the endpoint requires. The name is
// appended to the path verbatim (it is not URL-escaped).
//
// It returns the schedule state decoded from the response, the query
// metadata, and any error from the request.
func (a *Allocations) GetPauseState(alloc *Allocation, q *QueryOptions, task string) (string, *QueryMeta, error) {
	var resp AllocGetPauseResponse
	// The task must be part of the request; without it the endpoint has no
	// way to know which task's schedule state is being asked for.
	path := "/v1/client/allocation/" + alloc.ID + "/pause?task=" + task
	qm, err := a.client.query(path, &resp, q)
	state := resp.ScheduleState
	return state, qm, err
}

// Services is used to return a list of service registrations associated to the
// specified allocID.
func (a *Allocations) Services(allocID string, q *QueryOptions) ([]*ServiceRegistration, *QueryMeta, error) {
Expand Down Expand Up @@ -517,6 +538,18 @@ type AllocSignalRequest struct {
Signal string
}

// AllocPauseRequest is the request body sent by Allocations.SetPauseState to
// change the schedule behavior of a single task in an allocation.
type AllocPauseRequest struct {
// Task is the name of the task whose schedule state is being set.
Task string

// ScheduleState must be one of "pause", "run", "scheduled".
ScheduleState string
}

// AllocGetPauseResponse is the response decoded by Allocations.GetPauseState
// when reading the schedule behavior of a task.
type AllocGetPauseResponse struct {
// ScheduleState will be one of "pause", "run", "scheduled".
ScheduleState string
}

// GenericResponse is used to respond to a request where no
// specific response information is needed.
type GenericResponse struct {
Expand Down
14 changes: 14 additions & 0 deletions api/task_sched.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: MPL-2.0

package api

// TaskSchedule is the jobspec "schedule" block of a task, used for time based
// task execution (Enterprise).
type TaskSchedule struct {
// Cron is the nested "cron" block describing when the task should run.
Cron *TaskScheduleCron `hcl:"cron,block"`
}

// TaskScheduleCron is the "cron" block nested inside a task's "schedule"
// block. All three attributes are optional at the HCL level.
type TaskScheduleCron struct {
// Start is the cron expression for when the schedule begins,
// e.g. "0 12 * * * *".
Start string `hcl:"start,optional"`
// End is the expression for when the schedule ends, e.g. "0 16".
// NOTE(review): the commit description says this is a partial cron with
// only {minute} {hour} — confirm against the validation code.
End string `hcl:"end,optional"`
// Timezone is the name of the timezone the schedule is evaluated in,
// e.g. "EST".
Timezone string `hcl:"timezone,optional"`
}
2 changes: 2 additions & 0 deletions api/tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -801,6 +801,8 @@ type Task struct {
Identities []*WorkloadIdentity `hcl:"identity,block"`

Actions []*Action `hcl:"action,block"`

Schedule *TaskSchedule `hcl:"schedule,block"`
}

func (t *Task) Canonicalize(tg *TaskGroup, job *Job) {
Expand Down
40 changes: 40 additions & 0 deletions client/alloc_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,46 @@ func (a *Allocations) Signal(args *nstructs.AllocSignalRequest, reply *nstructs.
return a.c.SignalAllocation(args.AllocID, args.Task, args.Signal)
}

// SetPauseState sets the schedule state of a task in an allocation running on
// this client.
//
// The allocation is looked up first so that its namespace can be used for the
// ACL check; the caller's token must allow the submit-job capability in that
// namespace. The call duration is recorded under the
// client.allocations.pause_set metric.
func (a *Allocations) SetPauseState(args *nstructs.AllocPauseRequest, reply *nstructs.GenericResponse) error {
	metricKey := []string{"client", "allocations", "pause_set"}
	defer metrics.MeasureSince(metricKey, time.Now())

	alloc, err := a.c.GetAlloc(args.AllocID)
	if err != nil {
		return err
	}

	// Resolve the token, then check the namespace capability.
	aclObj, err := a.c.ResolveToken(args.AuthToken)
	if err != nil {
		return err
	}
	if !aclObj.AllowNsOp(alloc.Namespace, acl.NamespaceCapabilitySubmitJob) {
		return nstructs.ErrPermissionDenied
	}

	return a.c.PauseAllocation(args.AllocID, args.Task, args.ScheduleState)
}

// GetPauseState reads the schedule state of a task in an allocation running
// on this client and stores it in reply.ScheduleState.
//
// The allocation is looked up first so that its namespace can be used for the
// ACL check; the caller's token must allow the read-job capability in that
// namespace. The call duration is recorded under the
// client.allocations.pause_get metric.
func (a *Allocations) GetPauseState(args *nstructs.AllocGetPauseStateRequest, reply *nstructs.AllocGetPauseStateResponse) error {
	metricKey := []string{"client", "allocations", "pause_get"}
	defer metrics.MeasureSince(metricKey, time.Now())

	alloc, err := a.c.GetAlloc(args.AllocID)
	if err != nil {
		return err
	}

	// Resolve the token, then check the namespace capability.
	aclObj, err := a.c.ResolveToken(args.AuthToken)
	if err != nil {
		return err
	}
	if !aclObj.AllowNsOp(alloc.Namespace, acl.NamespaceCapabilityReadJob) {
		return nstructs.ErrPermissionDenied
	}

	// Only touch the reply when the lookup succeeded.
	state, err := a.c.GetPauseAllocation(args.AllocID, args.Task)
	if err == nil {
		reply.ScheduleState = state
	}
	return err
}

// Restart is used to trigger a restart of an allocation or a subtask on a client.
func (a *Allocations) Restart(args *nstructs.AllocRestartRequest, reply *nstructs.GenericResponse) error {
defer metrics.MeasureSince([]string{"client", "allocations", "restart"}, time.Now())
Expand Down
21 changes: 21 additions & 0 deletions client/allocrunner/alloc_runner_ce.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1

//go:build !ent
// +build !ent

package allocrunner

import (
"fmt"

"github.com/hashicorp/nomad/nomad/structs"
)

// SetTaskPauseState is the non-Enterprise stub: it ignores both arguments and
// always returns an "Enterprise only" error.
func (ar *allocRunner) SetTaskPauseState(_ string, _ structs.TaskScheduleState) error {
	err := fmt.Errorf("Enterprise only")
	return err
}

// GetTaskPauseState is the non-Enterprise stub: it ignores the task name and
// always returns an empty state together with an "Enterprise only" error.
func (ar *allocRunner) GetTaskPauseState(taskName string) (structs.TaskScheduleState, error) {
	err := fmt.Errorf("Enterprise only")
	return "", err
}
2 changes: 2 additions & 0 deletions client/allocrunner/interfaces/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ type AllocRunner interface {
StatsReporter() AllocStatsReporter
Listener() *cstructs.AllocListener
GetAllocDir() allocdir.Interface
SetTaskPauseState(taskName string, ps structs.TaskScheduleState) error
GetTaskPauseState(taskName string) (structs.TaskScheduleState, error)
}

// TaskStateHandler exposes a handler to be called when a task's state changes
Expand Down
11 changes: 11 additions & 0 deletions client/allocrunner/taskrunner/sched_hook.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1

package taskrunner

const (
// taskPauseHookName is the name of the task pause schedule hook. Because
// this is an Enterprise-only feature, the implementation is split between
// sched_hook_ce.go and sched_hook_ent.
taskPauseHookName = "pause"
)
34 changes: 34 additions & 0 deletions client/allocrunner/taskrunner/sched_hook_ce.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1

//go:build !ent

package taskrunner

import (
"fmt"

"github.com/hashicorp/nomad/nomad/structs"
)

// pauseHook is the placeholder task pause schedule hook used in
// non-Enterprise builds. It holds no state.
type pauseHook struct{}

// Name returns the name of the pause hook.
func (pauseHook) Name() string {
	return taskPauseHookName
}

// newPauseHook accepts and ignores any arguments and returns an empty
// pauseHook.
func newPauseHook(_ ...any) pauseHook {
	var hook pauseHook
	return hook
}

// pauseGate is the placeholder pause gate used in non-Enterprise builds. It
// holds no state.
type pauseGate struct{}

// newPauseGate accepts and ignores any arguments and returns a new, empty
// pauseGate.
func newPauseGate(_ ...any) *pauseGate {
	return new(pauseGate)
}

// Wait returns immediately with a nil error.
func (*pauseGate) Wait() error {
	var err error
	return err
}

// SetTaskPauseState is the non-Enterprise stub: it ignores the requested
// state and always returns an "Enterprise only" error.
func (tr *TaskRunner) SetTaskPauseState(_ structs.TaskScheduleState) error {
	err := fmt.Errorf("Enterprise only")
	return err
}
14 changes: 14 additions & 0 deletions client/allocrunner/taskrunner/task_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,10 @@ type TaskRunner struct {

// users manages the pool of dynamic workload users
users dynamic.Pool

// pauser controls whether the task should be run or stopped based on a
// schedule. (Enterprise)
pauser *pauseGate
}

type Config struct {
Expand Down Expand Up @@ -426,6 +430,9 @@ func NewTaskRunner(config *Config) (*TaskRunner, error) {
// Create the logger based on the allocation ID
tr.logger = config.Logger.Named("task_runner").With("task", config.Task.Name)

// Create the pauser
tr.pauser = newPauseGate(tr)

// Pull out the task's resources
ares := tr.alloc.AllocatedResources
if ares == nil {
Expand Down Expand Up @@ -613,6 +620,13 @@ MAIN:
goto RESTART
}

// Unblocks when the task runner is allowed to continue. (Enterprise)
if err := tr.pauser.Wait(); err != nil {
tr.logger.Error("pause scheduled failed", "error", err)
tr.restartTracker.SetStartError(err)
break MAIN
}

// Check for a terminal allocation once more before proceeding as the
// prestart hooks may have been skipped.
if tr.shouldShutdown() {
Expand Down
5 changes: 5 additions & 0 deletions client/allocrunner/taskrunner/task_runner_hooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,11 @@ func (tr *TaskRunner) initHooks() {
if tr.driverCapabilities.RemoteTasks {
tr.runnerHooks = append(tr.runnerHooks, newRemoteTaskHook(tr, hookLogger))
}

// If this task has a pause schedule, initialize the pause (Enterprise)
if task.Schedule != nil {
tr.runnerHooks = append(tr.runnerHooks, newPauseHook(tr, hookLogger))
}
}

func (tr *TaskRunner) emitHookError(err error, hookName string) {
Expand Down
18 changes: 18 additions & 0 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -969,6 +969,24 @@ func (c *Client) SignalAllocation(allocID, task, signal string) error {
return ar.Signal(task, signal)
}

// PauseAllocation sets the pause state of the given task for the allocation.
// PauseAllocation sets the pause state of the given task for the allocation.
//
// It returns the error from looking up the allocation's runner, or otherwise
// whatever the runner's SetTaskPauseState returns.
func (c *Client) PauseAllocation(allocID, task string, scheduleState structs.TaskScheduleState) error {
	runner, lookupErr := c.getAllocRunner(allocID)
	if lookupErr != nil {
		return lookupErr
	}

	return runner.SetTaskPauseState(task, scheduleState)
}

// GetPauseAllocation gets the pause state of the given task for the allocation.
// GetPauseAllocation gets the pause state of the given task for the
// allocation.
//
// If the allocation's runner cannot be looked up, it returns an empty state
// and that error; otherwise it returns whatever the runner's
// GetTaskPauseState returns.
func (c *Client) GetPauseAllocation(allocID, task string) (structs.TaskScheduleState, error) {
	runner, lookupErr := c.getAllocRunner(allocID)
	if lookupErr != nil {
		return "", lookupErr
	}

	return runner.GetTaskPauseState(task)
}

// CollectAllocation garbage collects a single allocation on a node. Returns
// true if alloc was found and garbage collected; otherwise false.
func (c *Client) CollectAllocation(allocID string) bool {
Expand Down
8 changes: 8 additions & 0 deletions client/client_interface_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,3 +165,11 @@ func (ar *emptyAllocRunner) LatestAllocStats(taskFilter string) (*cstructs.Alloc
Timestamp: 0,
}, nil
}

// SetTaskPauseState is a no-op for the empty test runner; it always returns a
// nil error.
func (ar *emptyAllocRunner) SetTaskPauseState(taskName string, ps structs.TaskScheduleState) error {
	var err error
	return err
}

// GetTaskPauseState is a no-op for the empty test runner; it always returns
// an empty state and a nil error.
func (ar *emptyAllocRunner) GetTaskPauseState(taskName string) (structs.TaskScheduleState, error) {
	var err error
	return "", err
}
3 changes: 2 additions & 1 deletion command/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -1140,7 +1140,8 @@ func (a *Agent) setupClient() error {
a.consulCatalog, // self service discovery
a.consulProxiesFunc, // supported Envoy versions fingerprinting
a.consulServices, // workload service discovery
nil)
nil, // use the standard set of rpcs
)
if err != nil {
return fmt.Errorf("client setup failed: %v", err)
}
Expand Down
Loading

0 comments on commit 4415fab

Please sign in to comment.