Skip to content

Commit

Permalink
Support for "boot" jobs that are run to completion before "regular" jobs
Browse files Browse the repository at this point in the history
  • Loading branch information
martin-helmich committed Jul 30, 2020
1 parent 03f8ce6 commit 3410424
Show file tree
Hide file tree
Showing 9 changed files with 197 additions and 23 deletions.
15 changes: 15 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,21 @@ If there is a `lazy` block present, the process itself will only be started when
When no connections have been active for a duration of at least `.lazy.coolDownTimeout`, mittnite will terminate the process again.
#### Boot Jobs
Boot jobs are "special" jobs that are executed before regular `job` definitions. Boot jobs are required to run to completion before any regular jobs are started.
```hcl
boot "setup" {
command = "/bin/bash"
args = ["/init-script.sh"]
timeout = "30s"
env = [
"FOO=bar"
]
}
```
#### File
Possible directives to use in a file definition.
Expand Down
5 changes: 4 additions & 1 deletion cmd/renderfiles.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,11 @@ func init() {
rootCmd.AddCommand(renderFiles)
}

// TODO(@hermsi1337): WTH do we even need this for!?
var renderFiles = &cobra.Command{
Use: "renderfiles",
Use: "renderfiles",
Short: "Renders configuration files",
Long: "This command renders the configured configuration files, before (optionally) starting another process",
Run: func(cmd *cobra.Command, args []string) {
ignitionConfig := &config.Ignition{
Probes: nil,
Expand Down
17 changes: 12 additions & 5 deletions cmd/up.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ func init() {
}

var up = &cobra.Command{
Use: "up",
Use: "up",
Short: "Render config files, start probes and processes",
Long: "This sub-command renders the configuration files, starts the probes and launches all configured processes",
Run: func(cmd *cobra.Command, args []string) {
ignitionConfig := &config.Ignition{
Probes: nil,
Expand Down Expand Up @@ -75,15 +77,20 @@ var up = &cobra.Command{
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

runner := proc.NewRunner(ctx, ignitionConfig)
runner := proc.NewRunner(ignitionConfig)
go func() {
<-procSignals
cancel()
}()

err = runner.Run()
if err != nil {
log.Fatalf("service runner stopped with error: %s", err)
if err := runner.Boot(ctx); err != nil {
log.WithError(err).Fatal("runner error'ed during initialization")
} else {
log.Info("initialization complete")
}

if err := runner.Run(ctx); err != nil {
log.WithError(err).Fatal("service runner stopped with error")
} else {
log.Print("service runner stopped without error")
}
Expand Down
5 changes: 5 additions & 0 deletions examples/mittnite.d/jobs/sleep.hcl
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,9 @@ job sleep {
args = ["500s"]
canFail = true
oneTime = true
}

boot hello {
command = "cowsay"
args = ["moo"]
}
4 changes: 4 additions & 0 deletions internal/config/ignitionconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ func (ignitionConfig *Ignition) GenerateFromConfigDir(configDir string) error {
}

for _, job := range ignitionConfig.Jobs {
if job.OneTime {
log.Warnf("field oneTime in job %s is deprecated in favor of 'bootJob' directies", job.Name)
}

if job.MaxAttempts_ != 0 {
log.Warnf("field max_attempts in job %s is deprecated in favor of maxAttempts", job.Name)
job.MaxAttempts = job.MaxAttempts_
Expand Down
20 changes: 16 additions & 4 deletions internal/config/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,18 @@ type JobConfig struct {
Laziness *Laziness `hcl:"lazy"`
Listeners []Listener `hcl:"listen"`
CanFail bool `hcl:"canFail"`
OneTime bool `hcl:"oneTime"`

// DEPRECATED: Use "BootJob"s instead
OneTime bool `hcl:"oneTime"`
}

type BootJobConfig struct {
Name string `hcl:",key"`
Command string `hcl:"command"`
Args []string `hcl:"args"`
Env []string `hcl:"env"`
CanFail bool `hcl:"canFail"`
Timeout string `hcl:"timeout"`
}

type File struct {
Expand All @@ -98,7 +109,8 @@ type File struct {
}

type Ignition struct {
Probes []Probe `hcl:"probe"`
Files []File `hcl:"file"`
Jobs []JobConfig `hcl:"job"`
Probes []Probe `hcl:"probe"`
Files []File `hcl:"file"`
Jobs []JobConfig `hcl:"job"`
BootJobs []BootJobConfig `hcl:"boot"`
}
55 changes: 55 additions & 0 deletions pkg/proc/bootjob.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package proc

import (
"context"
"fmt"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"os"
"os/exec"
)

func (job *BootJob) Run(ctx context.Context) error {
l := log.WithField("job.name", job.Config.Name)

ctx, job.cancelProcess = context.WithCancel(ctx)

job.cmd = exec.CommandContext(ctx, job.Config.Command, job.Config.Args...)
job.cmd.Stdout = os.Stdout
job.cmd.Stderr = os.Stderr
if job.Config.Env != nil {
job.cmd.Env = append(os.Environ(), job.Config.Env...)
}

l.Info("starting boot job")

err := job.cmd.Start()
if err != nil {
return fmt.Errorf("failed to start job %s: %s", job.Config.Name, err.Error())
}

job.process = job.cmd.Process

err = job.cmd.Wait()
if err != nil {
l.WithError(err).Error("job exited with error")
} else {
l.Info("boot job completed")
}

if ctx.Err() != nil { // execution cancelled
return ctx.Err()
}

if err != nil {
if job.Config.CanFail {
l.WithError(err).Warn("job failed, but is allowed to fail")
return nil
}

l.WithError(err).Error("boot job failed")
return errors.Wrapf(err, "error while exec'ing boot job '%s'", job.Config.Name)
}

return nil
}
70 changes: 58 additions & 12 deletions pkg/proc/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,66 @@ import (
log "github.com/sirupsen/logrus"
)

func NewRunner(ctx context.Context, ignitionConfig *config.Ignition) *Runner {
func NewRunner(ignitionConfig *config.Ignition) *Runner {
return &Runner{
IgnitionConfig: ignitionConfig,
jobs: []*Job{},
ctx: ctx,
jobs: make([]*Job, 0, len(ignitionConfig.Jobs)),
bootJobs: make([]*BootJob, 0, len(ignitionConfig.BootJobs)),
}
}

func waitGroupToChannel(wg *sync.WaitGroup) <-chan struct{} {
d := make(chan struct{})
go func() {
wg.Wait()
close(d)
}()

return d
}

func (r *Runner) Boot(ctx context.Context) error {
wg := sync.WaitGroup{}
errs := make(chan error)

for j := range r.IgnitionConfig.BootJobs {
job, err := NewBootJob(&r.IgnitionConfig.BootJobs[j])
if err != nil {
return err
}

r.bootJobs = append(r.bootJobs, job)
}

for _, job := range r.bootJobs {
wg.Add(1)
go func(job *BootJob) {
defer wg.Done()

if err := job.Run(ctx); err != nil {
errs <- err
}
}(job)
}

select {
case <-waitGroupToChannel(&wg):
return nil

case <-ctx.Done():
log.Warn("context cancelled")
return ctx.Err()

case err, ok := <-errs:
if ok && err != nil {
log.Error("job return error, shutting down other services")
return err
}
}

return nil
}

func (r *Runner) exec(ctx context.Context, wg *sync.WaitGroup, errChan chan<- error) {
for j := range r.IgnitionConfig.Jobs {
job, err := NewJob(&r.IgnitionConfig.Jobs[j])
Expand All @@ -42,24 +94,18 @@ func (r *Runner) exec(ctx context.Context, wg *sync.WaitGroup, errChan chan<- er
}
}

func (r *Runner) Run() error {
func (r *Runner) Run(ctx context.Context) error {
errChan := make(chan error)
ticker := time.NewTicker(5 * time.Second)

wg := sync.WaitGroup{}

r.exec(r.ctx, &wg, errChan)

allDone := make(chan struct{})
go func() {
wg.Wait()
close(allDone)
}()
r.exec(ctx, &wg, errChan)

for {
select {
// wait for them all to finish, or one to fail
case <-allDone:
case <-waitGroupToChannel(&wg):
return nil

// watch files
Expand Down
29 changes: 28 additions & 1 deletion pkg/proc/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,15 @@ import (
type Runner struct {
IgnitionConfig *config.Ignition
jobs []*Job
ctx context.Context
bootJobs []*BootJob
}

type BootJob struct {
Config *config.BootJobConfig
cmd *exec.Cmd
process *os.Process
timeout time.Duration
cancelProcess context.CancelFunc
}

type Job struct {
Expand Down Expand Up @@ -63,3 +71,22 @@ func NewJob(c *config.JobConfig) (*Job, error) {

return &j, nil
}

func NewBootJob(c *config.BootJobConfig) (*BootJob, error) {
bj := BootJob{
Config: c,
}

if ts := c.Timeout; ts != "" {
t, err := time.ParseDuration(ts)
if err != nil {
return nil, err
}

bj.timeout = t
} else {
bj.timeout = 30 * time.Second
}

return &bj, nil
}

0 comments on commit 3410424

Please sign in to comment.