Skip to content

Commit

Permalink
Merge pull request #19 from martin-helmich/feature/lazy-activation
Browse files Browse the repository at this point in the history
Lazy activation for jobs
  • Loading branch information
martin-helmich authored Mar 19, 2020
2 parents 1e172c9 + 4739c46 commit d2ba37e
Show file tree
Hide file tree
Showing 9 changed files with 472 additions and 60 deletions.
66 changes: 63 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -88,27 +88,66 @@ CMD ["up","--config-dir", "/etc/mittnite.d"]
```
## Configuration
The directory specified with `--config-dir`, or the shorthand `-c`, can contain any number of `.hcl` configuration files.
The directory specified with `--config-dir`, or the shorthand `-c`, can contain any number of `.hcl` configuration files.
All files in that directory are loaded by `mittnite` on startup and can contain any of the configuration directives.
### Directives
#### Job
Possible directives to use in a job definition.
A `Job` describes a runnable process that should be started by mittnite on startup.
A `Job` consists of a `command` and (optional) `args`. When a process started by a Job fails, it will be restarted for a maximum of `maxAttempts` attempts. If it fails for more than `maxAttempts` time, mittnite itself will terminate to allow your container runtime to handle the failure.
```hcl
job "foo" {
command = "/usr/local/bin/foo"
args = "bar"
args = ["bar"]
maxAttempts = 3
canFail = false
}
```
You can configure a Job to watch files and to send a signal to the managed process if that file changes. This can be used, for example, to send a `SIGHUP` to a process to reload its configuration file when it changes.
```hcl
job "foo" {
// ...
watch "/etc/conf.d/barfoo" {
signal = 12
}
}
```
You can also configure a Job to start its process only on the first incoming request (a bit like [systemd's socket activation](https://www.freedesktop.org/software/systemd/man/systemd.socket.html)). In order to configure this, you need a `listener` and a `lazy` configuration:
```hcl
job "foo" {
command = "http-server"
args = ["-p8081", "-a127.0.0.1"]

lazy {
spinUpTimeout = "5s"
coolDownTimeout = "15m"
}

listen "0.0.0.0:8080" {
forward = "127.0.0.1:8081"
}
}
```
The `listen` block will instruct mittnite itself to listen on the specified address; each connection accepted on that port will be forwarded to the address specified by `forward` (**NOTE**: mittnite will do some simple layer-4 forwarding; if your upstream service depends on working with the actual client IP addresses, you'll only see the local IP address).
If there is a `lazy` block present, the process itself will only be started when the first connection is opened. If the process takes some time to start up, the connection will be held for that time (the client will not notice any of this, except for the time the process needs to spin up). mittnite will wait for a duration of at most `.lazy.spinUpTimeout` for the process to accept connection; if this timeout is exceeded, the client connection will be closed.
When no connections have been active for a duration of at least `.lazy.coolDownTimeout`, mittnite will terminate the process again.
#### File
Possible directives to use in a file definition.
```hcl
Expand All @@ -129,7 +168,9 @@ file "/path/to/second_file.txt" {
```
#### Probe
Possible directives to use in a probe definition.
```hcl
probe "probe-name" {
wait = true
Expand Down Expand Up @@ -192,6 +233,7 @@ probe "probe-name" {
Specifying a `port` is optional and defaults to the services default port.
### HCL examples
#### Start a process
```hcl
Expand All @@ -204,6 +246,24 @@ job webserver {
}
```
#### Start a process lazily on first request
```hcl
job webserver {
command = "/usr/bin/http-server"
args = ["-p8081", "-a127.0.0.1"]
lazy {
spinUpTimeout = "5s"
coolDownTimeout = "15m"
}
listen "0.0.0.0:8080" {
forward = "127.0.0.1:8081"
}
}
```
#### Render a file on startup
```hcl
Expand Down
10 changes: 8 additions & 2 deletions examples/test.d/database.hcl
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,21 @@ probe mysql {
mysql {
user = "test"
password = "test"
host = "localhost"
host {
hostname = "localhost"
port = 3306
}
database = "test"
}
}

probe redis {
wait = true
redis {
host = "localhost"
host {
hostname = "localhost"
port = 6379
}
}
}

Expand Down
20 changes: 16 additions & 4 deletions examples/test.d/processes.hcl
Original file line number Diff line number Diff line change
@@ -1,23 +1,35 @@
file test.txt {
from = "test.d/test.txt.tpl"
file "test.txt" {
from = "examples/test.d/test.txt.tpl"

params = {
foo = "bar"
}
}

job webserver {
command = "/usr/bin/http-server"
command = "/usr/local/bin/http-server"
args = ["-p", "8080", "-a", "127.0.0.1"]

watch "./test.txt" {
signal = 12 # USR2
}

// lazy {
// spinUpTimeout = "5s"
// coolDownTimeout = "1m"
// }

listen "0.0.0.0:8081" {
forward = "127.0.0.1:8080"
}
}

probe http {
wait = true
http {
host = "google.de"
host {
hostname = "google.de"
}
timeout = "3s"
}
}
29 changes: 21 additions & 8 deletions internal/config/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,15 +57,28 @@ type Watch struct {
Signal int
}

type Listener struct {
Address string `hcl:",key"`
Forward string `hcl:"forward"`
Protocol string `hcl:"protocol"`
}

type Laziness struct {
SpinUpTimeout string `hcl:"spinUpTimeout"`
CoolDownTimeout string `hcl:"coolDownTimeout"`
}

type JobConfig struct {
Name string `hcl:",key"`
Command string `hcl:"command"`
Args []string `hcl:"args"`
Watches []Watch `hcl:"watch"`
MaxAttempts_ int `hcl:"max_attempts"` // deprecated
MaxAttempts int `hcl:"maxAttempts"`
CanFail bool `hcl:"canFail"`
OneTime bool `hcl:"oneTime"`
Name string `hcl:",key"`
Command string `hcl:"command"`
Args []string `hcl:"args"`
Watches []Watch `hcl:"watch"`
MaxAttempts_ int `hcl:"max_attempts"` // deprecated
MaxAttempts int `hcl:"maxAttempts"`
Laziness *Laziness `hcl:"lazy"`
Listeners []Listener `hcl:"listen"`
CanFail bool `hcl:"canFail"`
OneTime bool `hcl:"oneTime"`
}

type File struct {
Expand Down
104 changes: 65 additions & 39 deletions pkg/proc/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ import (
"context"
"fmt"
"os"
"os/exec"
"path/filepath"
"sync"
"syscall"
"time"

Expand Down Expand Up @@ -33,54 +33,76 @@ func (job *Job) Init() {
}
}

func (job *Job) Run(ctx context.Context) error {
attempts := 0
maxAttempts := job.Config.MaxAttempts
func (job *Job) Run(ctx context.Context, errors chan<- error) error {
ctx, job.cancelAll = context.WithCancel(ctx)

if maxAttempts == 0 {
maxAttempts = 3
}

for { // restart failed jobs as long mittnite is running

log.Infof("starting job %s", job.Config.Name)
job.cmd = exec.CommandContext(ctx, job.Config.Command, job.Config.Args...)
job.cmd.Stdout = os.Stdout
job.cmd.Stderr = os.Stderr
listerWaitGroup := sync.WaitGroup{}
defer listerWaitGroup.Wait()

err := job.cmd.Start()
for i := range job.Config.Listeners {
listener, err := NewListener(job, &job.Config.Listeners[i])
if err != nil {
return fmt.Errorf("failed to start job %s: %s", job.Config.Name, err.Error())
return err
}

err = job.cmd.Wait()
if err != nil {
log.Errorf("job %s exited with error: %s", job.Config.Name, err)
} else {
if job.Config.OneTime {
log.Infof("one-time job %s has ended successfully", job.Config.Name)
return nil
}
log.Warnf("job %s exited without errors", job.Config.Name)
}
listerWaitGroup.Add(1)

if ctx.Err() != nil { // execution cancelled
return nil
}
go func() {
listerWaitGroup.Wait()
}()

attempts++
if attempts < maxAttempts {
log.Infof("job %s has %d attempts remaining", job.Config.Name, maxAttempts-attempts)
continue
}
go func() {
if err := listener.Run(ctx); err != nil {
log.WithError(err).Error("listener stopped with error")
errors <- err
}

if job.Config.CanFail {
log.Warnf("")
return nil
}
listerWaitGroup.Done()
}()
}

if job.CanStartLazily() {
job.startProcessReaper(ctx)

return fmt.Errorf("reached max retries for job %s", job.Config.Name)
log.Infof("holding off starting job %s until first request", job.Config.Name)
return nil
}

p := make(chan *os.Process)
go func() {
job.process = <-p
}()

return job.start(ctx, p)
}

func (job *Job) startProcessReaper(ctx context.Context) {
ticker := time.NewTicker(5 * time.Minute)
go func() {
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
if job.activeConnections > 0 {
continue
}

diff := time.Since(job.lastConnectionClosed)
if diff < job.coolDownTimeout {
continue
}

job.lazyStartLock.Lock()

if job.cancelProcess != nil {
job.cancelProcess()
}

job.lazyStartLock.Unlock()
}
}
}()
}

func (job *Job) Signal(sig os.Signal) {
Expand All @@ -90,6 +112,10 @@ func (job *Job) Signal(sig os.Signal) {
}
}

if sig == syscall.SIGTERM && job.cancelAll != nil {
job.cancelAll()
}

if job.cmd == nil || job.cmd.Process == nil {
errFunc(
fmt.Errorf("job is not running"),
Expand Down
Loading

0 comments on commit d2ba37e

Please sign in to comment.