Skip to content

Commit

Permalink
tfexec: add InterruptChannel option to allow a user triggered interrupt
Browse files Browse the repository at this point in the history
This is an alternative to hashicorp#332, adding an option that enables the user
to implement graceful shutdowns for Apply and Destroy operations.

Rather than adding an option that changes the behavior of the input
context, we instead add an option that specifically sends an interrupt
to the terraform process. The input context behavior remains unchanged.

This requires the caller to do a bit more orchestration work for
timeouts, but keeps context truer to the "abandon work" intent. This
also allows users to force quit _even if_ they are in the middle of a
graceful shutdown, rathern than having one behavior mutually exclusive
with the other.
  • Loading branch information
twmb committed Aug 31, 2022
1 parent 24e3216 commit bfad2c5
Show file tree
Hide file tree
Showing 11 changed files with 208 additions and 13 deletions.
19 changes: 14 additions & 5 deletions tfexec/apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
)

type applyConfig struct {
interruptCh <-chan struct{}

backup string
dirOrPlan string
lock bool
Expand Down Expand Up @@ -42,6 +44,10 @@ func (opt *ParallelismOption) configureApply(conf *applyConfig) {
conf.parallelism = opt.parallelism
}

func (opt *InterruptSignalOption) configureApply(conf *applyConfig) {
conf.interruptCh = opt.interrupt
}

func (opt *BackupOption) configureApply(conf *applyConfig) {
conf.backup = opt.path
}
Expand Down Expand Up @@ -92,14 +98,17 @@ func (opt *ReattachOption) configureApply(conf *applyConfig) {

// Apply represents the terraform apply subcommand.
func (tf *Terraform) Apply(ctx context.Context, opts ...ApplyOption) error {
cmd, err := tf.applyCmd(ctx, opts...)
cmd, cfg, err := tf.applyCmd(ctx, opts...)
if err != nil {
return err
}
if cfg.interruptCh != nil {
ctx = context.WithValue(ctx, interruptContext, cfg.interruptCh)
}
return tf.runTerraformCmd(ctx, cmd)
}

func (tf *Terraform) applyCmd(ctx context.Context, opts ...ApplyOption) (*exec.Cmd, error) {
func (tf *Terraform) applyCmd(ctx context.Context, opts ...ApplyOption) (*exec.Cmd, *applyConfig, error) {
c := defaultApplyOptions

for _, o := range opts {
Expand Down Expand Up @@ -134,7 +143,7 @@ func (tf *Terraform) applyCmd(ctx context.Context, opts ...ApplyOption) (*exec.C
if c.replaceAddrs != nil {
err := tf.compatible(ctx, tf0_15_2, nil)
if err != nil {
return nil, fmt.Errorf("replace option was introduced in Terraform 0.15.2: %w", err)
return nil, nil, fmt.Errorf("replace option was introduced in Terraform 0.15.2: %w", err)
}
for _, addr := range c.replaceAddrs {
args = append(args, "-replace="+addr)
Expand All @@ -160,10 +169,10 @@ func (tf *Terraform) applyCmd(ctx context.Context, opts ...ApplyOption) (*exec.C
if c.reattachInfo != nil {
reattachStr, err := c.reattachInfo.marshalString()
if err != nil {
return nil, err
return nil, nil, err
}
mergeEnv[reattachEnvVar] = reattachStr
}

return tf.buildTerraformCmd(ctx, mergeEnv, args...), nil
return tf.buildTerraformCmd(ctx, mergeEnv, args...), &c, nil
}
7 changes: 6 additions & 1 deletion tfexec/apply_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func TestApplyCmd(t *testing.T) {
tf.SetEnv(map[string]string{})

t.Run("basic", func(t *testing.T) {
applyCmd, err := tf.applyCmd(context.Background(),
applyCmd, cfg, err := tf.applyCmd(context.Background(),
Backup("testbackup"),
LockTimeout("200s"),
State("teststate"),
Expand All @@ -36,6 +36,7 @@ func TestApplyCmd(t *testing.T) {
Var("var1=foo"),
Var("var2=bar"),
DirOrPlan("testfile"),
InterruptSignal(make(chan struct{})),
)
if err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -63,5 +64,9 @@ func TestApplyCmd(t *testing.T) {
"-var", "var2=bar",
"testfile",
}, nil, applyCmd)

if cfg.interruptCh == nil {
t.Fatal("interrupt signal is unexpectedly nil")
}
})
}
8 changes: 7 additions & 1 deletion tfexec/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,12 @@ import (
"github.com/hashicorp/terraform-exec/internal/version"
)

// If using the InterruptSignal option, we stuff the interrupt channel into the
// context to keep our APIs simpler (and non-changing).
//
// context.WithValue(ctx, interruptContext, interruptCh)
var interruptContext = new(struct{})

const (
checkpointDisableEnvVar = "CHECKPOINT_DISABLE"
cliArgsEnvVar = "TF_CLI_ARGS"
Expand Down Expand Up @@ -191,7 +197,7 @@ func (tf *Terraform) buildTerraformCmd(ctx context.Context, mergeEnv map[string]
}

func (tf *Terraform) runTerraformCmdJSON(ctx context.Context, cmd *exec.Cmd, v interface{}) error {
var outbuf = bytes.Buffer{}
var outbuf bytes.Buffer
cmd.Stdout = mergeWriters(cmd.Stdout, &outbuf)

err := tf.runTerraformCmd(ctx, cmd)
Expand Down
14 changes: 14 additions & 0 deletions tfexec/cmd_default.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package tfexec

import (
"context"
"os"
"os/exec"
"strings"
"sync"
Expand Down Expand Up @@ -47,6 +48,19 @@ func (tf *Terraform) runTerraformCmd(ctx context.Context, cmd *exec.Cmd) error {
return tf.wrapExitError(ctx, err, "")
}

if interruptCh := ctx.Value(interruptContext); interruptCh != nil {
exited := make(chan struct{})
defer close(exited)
go func() {
select {
case <-interruptCh.(<-chan struct{}):
cmd.Process.Signal(os.Interrupt)
case <-exited:
case <-ctx.Done():
}
}()
}

var errStdout, errStderr error
var wg sync.WaitGroup
wg.Add(1)
Expand Down
24 changes: 24 additions & 0 deletions tfexec/cmd_default_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package tfexec
import (
"bytes"
"context"
"errors"
"log"
"strings"
"testing"
Expand Down Expand Up @@ -37,3 +38,26 @@ func Test_runTerraformCmd_default(t *testing.T) {
t.Fatal("canceling context should not lead to logging an error")
}
}

func Test_runTerraformCmdCancel_default(t *testing.T) {
var buf bytes.Buffer

tf := &Terraform{
logger: log.New(&buf, "", 0),
execPath: "sleep",
}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

cmd := tf.buildTerraformCmd(ctx, nil, "10")
go func() {
time.Sleep(time.Second)
cancel()
}()

err := tf.runTerraformCmd(ctx, cmd)
if !errors.Is(err, context.Canceled) {
t.Fatalf("expected context.Canceled, got %T %s", err, err)
}
}
13 changes: 13 additions & 0 deletions tfexec/cmd_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,19 @@ func (tf *Terraform) runTerraformCmd(ctx context.Context, cmd *exec.Cmd) error {
return tf.wrapExitError(ctx, err, "")
}

if interruptCh := ctx.Value(interruptContext); interruptCh != nil {
exited := make(chan struct{})
defer close(exited)
go func() {
select {
case <-interruptCh.(<-chan struct{}):
cmd.Process.Signal(os.Interrupt)
case <-exited:
case <-ctx.Done():
}
}()
}

var errStdout, errStderr error
var wg sync.WaitGroup
wg.Add(1)
Expand Down
17 changes: 13 additions & 4 deletions tfexec/destroy.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
)

type destroyConfig struct {
interruptCh <-chan struct{}

backup string
dir string
lock bool
Expand Down Expand Up @@ -46,6 +48,10 @@ func (opt *ParallelismOption) configureDestroy(conf *destroyConfig) {
conf.parallelism = opt.parallelism
}

func (opt *InterruptSignalOption) configureDestroy(conf *destroyConfig) {
conf.interruptCh = opt.interrupt
}

func (opt *BackupOption) configureDestroy(conf *destroyConfig) {
conf.backup = opt.path
}
Expand Down Expand Up @@ -88,14 +94,17 @@ func (opt *ReattachOption) configureDestroy(conf *destroyConfig) {

// Destroy represents the terraform destroy subcommand.
func (tf *Terraform) Destroy(ctx context.Context, opts ...DestroyOption) error {
cmd, err := tf.destroyCmd(ctx, opts...)
cmd, cfg, err := tf.destroyCmd(ctx, opts...)
if err != nil {
return err
}
if cfg.interruptCh != nil {
ctx = context.WithValue(ctx, interruptContext, cfg.interruptCh)
}
return tf.runTerraformCmd(ctx, cmd)
}

func (tf *Terraform) destroyCmd(ctx context.Context, opts ...DestroyOption) (*exec.Cmd, error) {
func (tf *Terraform) destroyCmd(ctx context.Context, opts ...DestroyOption) (*exec.Cmd, *destroyConfig, error) {
c := defaultDestroyOptions

for _, o := range opts {
Expand Down Expand Up @@ -147,10 +156,10 @@ func (tf *Terraform) destroyCmd(ctx context.Context, opts ...DestroyOption) (*ex
if c.reattachInfo != nil {
reattachStr, err := c.reattachInfo.marshalString()
if err != nil {
return nil, err
return nil, nil, err
}
mergeEnv[reattachEnvVar] = reattachStr
}

return tf.buildTerraformCmd(ctx, mergeEnv, args...), nil
return tf.buildTerraformCmd(ctx, mergeEnv, args...), &c, nil
}
12 changes: 10 additions & 2 deletions tfexec/destroy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func TestDestroyCmd(t *testing.T) {
tf.SetEnv(map[string]string{})

t.Run("defaults", func(t *testing.T) {
destroyCmd, err := tf.destroyCmd(context.Background())
destroyCmd, cfg, err := tf.destroyCmd(context.Background())
if err != nil {
t.Fatal(err)
}
Expand All @@ -34,10 +34,14 @@ func TestDestroyCmd(t *testing.T) {
"-parallelism=10",
"-refresh=true",
}, nil, destroyCmd)

if cfg.interruptCh != nil {
t.Fatal("interrupt signal is unexpectedly non-nil")
}
})

t.Run("override all defaults", func(t *testing.T) {
destroyCmd, err := tf.destroyCmd(context.Background(), Backup("testbackup"), LockTimeout("200s"), State("teststate"), StateOut("teststateout"), VarFile("testvarfile"), Lock(false), Parallelism(99), Refresh(false), Target("target1"), Target("target2"), Var("var1=foo"), Var("var2=bar"), Dir("destroydir"))
destroyCmd, cfg, err := tf.destroyCmd(context.Background(), Backup("testbackup"), LockTimeout("200s"), State("teststate"), StateOut("teststateout"), VarFile("testvarfile"), Lock(false), Parallelism(99), Refresh(false), Target("target1"), Target("target2"), Var("var1=foo"), Var("var2=bar"), Dir("destroydir"), InterruptSignal(make(chan struct{})))
if err != nil {
t.Fatal(err)
}
Expand All @@ -61,5 +65,9 @@ func TestDestroyCmd(t *testing.T) {
"-var", "var2=bar",
"destroydir",
}, nil, destroyCmd)

if cfg.interruptCh == nil {
t.Fatal("interrupt signal is unexpectedly nil")
}
})
}
84 changes: 84 additions & 0 deletions tfexec/internal/e2etest/graceful_termination_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package e2etest

import (
"bytes"
"context"
"strings"
"testing"
"time"

"github.com/hashicorp/go-version"
"github.com/hashicorp/terraform-exec/tfexec"
"github.com/hashicorp/terraform-exec/tfexec/internal/testutil"
)

func Test_gracefulTerminationRunTerraformCmd(t *testing.T) {
runTestVersions(t, []string{testutil.Latest_v1_1}, "infinite_loop", func(t *testing.T, tfv *version.Version, tf *tfexec.Terraform) {
var bufStdout bytes.Buffer
var bufStderr bytes.Buffer
tf.SetStderr(&bufStdout)
tf.SetStdout(&bufStderr)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

err := tf.Init(ctx)
if err != nil {
t.Fatalf("error running Init in test directory: %s", err)
}

doneCh := make(chan error)
shutdown := make(chan struct{})
go func() {
doneCh <- tf.Apply(ctx, tfexec.InterruptChannel(shutdown))
}()

time.Sleep(3 * time.Second)
close(shutdown)
err = <-doneCh
close(doneCh)
if err != nil {
t.Log(err)
}
output := bufStderr.String() + bufStdout.String()
t.Log(output)
if !strings.Contains(output, "Gracefully shutting down...") {
t.Fatal("canceling context should gracefully shut terraform down")
}
})
}

func Test_gracefulTerminationRunTerraformCmdWithNoGracefulShutdownTimeout(t *testing.T) {
runTestVersions(t, []string{testutil.Latest_v1_1}, "infinite_loop", func(t *testing.T, tfv *version.Version, tf *tfexec.Terraform) {
var bufStdout bytes.Buffer
var bufStderr bytes.Buffer
tf.SetStderr(&bufStdout)
tf.SetStdout(&bufStderr)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

err := tf.Init(ctx)
if err != nil {
t.Fatalf("error running Init in test directory: %s", err)
}

doneCh := make(chan error)
go func() {
doneCh <- tf.Apply(ctx, tfexec.InterruptChannel(make(chan struct{})))
}()

time.Sleep(3 * time.Second)
cancel()
err = <-doneCh
close(doneCh)
if err != nil {
t.Log(err)
}
output := bufStderr.String() + bufStdout.String()
t.Log(output)
if strings.Contains(output, "Gracefully shutting down...") {
t.Fatal("canceling context with no graceful shutdown timeout should immediately kill the process and not start a graceful cancellation")
}
})
}
8 changes: 8 additions & 0 deletions tfexec/internal/e2etest/testdata/infinite_loop/main.tf
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
resource "null_resource" "example1" {
triggers = {
always_run = "${timestamp()}"
}
provisioner "local-exec" {
command = " while true; do echo 'Hit CTRL+C'; sleep 1; done"
}
}
15 changes: 15 additions & 0 deletions tfexec/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,21 @@ func GraphPlan(file string) *GraphPlanOption {
return &GraphPlanOption{file}
}

type InterruptSignalOption struct {
interrupt <-chan struct{}
}

// InterruptChannel accepts a channel that can trigger an interrupt signal to
// be sent to the running Terraform process. This allows you to orchestrate a a
// graceful shutdown as a caller: close the interrupt channel to begin a
// graceful shutdown, and if enough time has elapsed, cancel the context to
// force a hard shutdown.
//
// The interrupt channel is received from once.
func InterruptChannel(interrupt <-chan struct{}) *InterruptSignalOption {
return &InterruptSignalOption{interrupt}
}

type PlatformOption struct {
platform string
}
Expand Down

0 comments on commit bfad2c5

Please sign in to comment.