From 06beb79601d83cb77d21c10575aeda425b333706 Mon Sep 17 00:00:00 2001 From: Travis Bischel Date: Wed, 31 Aug 2022 13:54:50 -0600 Subject: [PATCH] hfexec: add InterruptChannel option to allow a user triggered interrupt This is an alternative to #332, adding an option that enables the user to implement graceful shutdowns for Apply and Destroy operations. Rather than adding an option that changes the behavior of the input context, we instead add an option that specifically sends an interrupt to the terraform process. The input context behavior remains unchanged. This requires the caller to do a bit more orchestration work for timeouts, but keeps context truer to the "abandon work" intent. This also allows users to force quit _even if_ they are in the middle of a graceful shutdown, rathern than having one behavior mutually exclusive with the other. --- tfexec/apply.go | 19 +++-- tfexec/apply_test.go | 7 +- tfexec/cmd.go | 8 +- tfexec/cmd_default.go | 14 ++++ tfexec/cmd_default_test.go | 24 ++++++ tfexec/cmd_linux.go | 14 ++++ tfexec/destroy.go | 17 +++- tfexec/destroy_test.go | 12 ++- .../e2etest/graceful_termination_test.go | 84 +++++++++++++++++++ .../e2etest/testdata/infinite_loop/main.tf | 8 ++ tfexec/options.go | 15 ++++ 11 files changed, 209 insertions(+), 13 deletions(-) create mode 100644 tfexec/internal/e2etest/graceful_termination_test.go create mode 100644 tfexec/internal/e2etest/testdata/infinite_loop/main.tf diff --git a/tfexec/apply.go b/tfexec/apply.go index 40d9e69b..dbe02f31 100644 --- a/tfexec/apply.go +++ b/tfexec/apply.go @@ -8,6 +8,8 @@ import ( ) type applyConfig struct { + interruptCh <-chan struct{} + backup string dirOrPlan string lock bool @@ -42,6 +44,10 @@ func (opt *ParallelismOption) configureApply(conf *applyConfig) { conf.parallelism = opt.parallelism } +func (opt *InterruptChannelOption) configureApply(conf *applyConfig) { + conf.interruptCh = opt.interrupt +} + func (opt *BackupOption) configureApply(conf *applyConfig) { conf.backup = opt.path } @@ -92,14 +98,17 @@ func (opt *ReattachOption) configureApply(conf *applyConfig) { // Apply represents the terraform apply subcommand. func (tf *Terraform) Apply(ctx context.Context, opts ...ApplyOption) error { - cmd, err := tf.applyCmd(ctx, opts...) + cmd, cfg, err := tf.applyCmd(ctx, opts...) if err != nil { return err } + if cfg.interruptCh != nil { + ctx = context.WithValue(ctx, interruptContext, cfg.interruptCh) + } return tf.runTerraformCmd(ctx, cmd) } -func (tf *Terraform) applyCmd(ctx context.Context, opts ...ApplyOption) (*exec.Cmd, error) { +func (tf *Terraform) applyCmd(ctx context.Context, opts ...ApplyOption) (*exec.Cmd, *applyConfig, error) { c := defaultApplyOptions for _, o := range opts { @@ -134,7 +143,7 @@ func (tf *Terraform) applyCmd(ctx context.Context, opts ...ApplyOption) (*exec.C if c.replaceAddrs != nil { err := tf.compatible(ctx, tf0_15_2, nil) if err != nil { - return nil, fmt.Errorf("replace option was introduced in Terraform 0.15.2: %w", err) + return nil, nil, fmt.Errorf("replace option was introduced in Terraform 0.15.2: %w", err) } for _, addr := range c.replaceAddrs { args = append(args, "-replace="+addr) @@ -160,10 +169,10 @@ func (tf *Terraform) applyCmd(ctx context.Context, opts ...ApplyOption) (*exec.C if c.reattachInfo != nil { reattachStr, err := c.reattachInfo.marshalString() if err != nil { - return nil, err + return nil, nil, err } mergeEnv[reattachEnvVar] = reattachStr } - return tf.buildTerraformCmd(ctx, mergeEnv, args...), nil + return tf.buildTerraformCmd(ctx, mergeEnv, args...), &c, nil } diff --git a/tfexec/apply_test.go b/tfexec/apply_test.go index 1cf2f562..f8f751e4 100644 --- a/tfexec/apply_test.go +++ b/tfexec/apply_test.go @@ -19,7 +19,7 @@ func TestApplyCmd(t *testing.T) { tf.SetEnv(map[string]string{}) t.Run("basic", func(t *testing.T) { - applyCmd, err := tf.applyCmd(context.Background(), + applyCmd, cfg, err := tf.applyCmd(context.Background(), Backup("testbackup"), LockTimeout("200s"), State("teststate"), @@ -36,6 +36,7 @@ func TestApplyCmd(t *testing.T) { Var("var1=foo"), Var("var2=bar"), DirOrPlan("testfile"), + InterruptChannel(make(chan struct{})), ) if err != nil { t.Fatal(err) @@ -63,5 +64,9 @@ func TestApplyCmd(t *testing.T) { "-var", "var2=bar", "testfile", }, nil, applyCmd) + + if cfg.interruptCh == nil { + t.Fatal("interrupt signal is unexpectedly nil") + } }) } diff --git a/tfexec/cmd.go b/tfexec/cmd.go index 56393a00..a8cb82d5 100644 --- a/tfexec/cmd.go +++ b/tfexec/cmd.go @@ -16,6 +16,12 @@ import ( "github.com/hashicorp/terraform-exec/internal/version" ) +// If using the InterruptSignal option, we stuff the interrupt channel into the +// context to keep our APIs simpler (and non-changing). +// +// context.WithValue(ctx, interruptContext, interruptCh) +var interruptContext = new(struct{}) + const ( checkpointDisableEnvVar = "CHECKPOINT_DISABLE" cliArgsEnvVar = "TF_CLI_ARGS" @@ -191,7 +197,7 @@ func (tf *Terraform) buildTerraformCmd(ctx context.Context, mergeEnv map[string] } func (tf *Terraform) runTerraformCmdJSON(ctx context.Context, cmd *exec.Cmd, v interface{}) error { - var outbuf = bytes.Buffer{} + var outbuf bytes.Buffer cmd.Stdout = mergeWriters(cmd.Stdout, &outbuf) err := tf.runTerraformCmd(ctx, cmd) diff --git a/tfexec/cmd_default.go b/tfexec/cmd_default.go index 6d7b768e..66ef7434 100644 --- a/tfexec/cmd_default.go +++ b/tfexec/cmd_default.go @@ -5,6 +5,7 @@ package tfexec import ( "context" + "os" "os/exec" "strings" "sync" @@ -47,6 +48,19 @@ func (tf *Terraform) runTerraformCmd(ctx context.Context, cmd *exec.Cmd) error { return tf.wrapExitError(ctx, err, "") } + if interruptCh := ctx.Value(interruptContext); interruptCh != nil { + exited := make(chan struct{}) + defer close(exited) + go func() { + select { + case <-interruptCh.(<-chan struct{}): + cmd.Process.Signal(os.Interrupt) + case <-exited: + case <-ctx.Done(): + } + }() + } + var errStdout, errStderr error var wg sync.WaitGroup wg.Add(1) diff --git a/tfexec/cmd_default_test.go b/tfexec/cmd_default_test.go index 0245ddaa..0f8b7760 100644 --- a/tfexec/cmd_default_test.go +++ b/tfexec/cmd_default_test.go @@ -6,6 +6,7 @@ package tfexec import ( "bytes" "context" + "errors" "log" "strings" "testing" @@ -37,3 +38,26 @@ func Test_runTerraformCmd_default(t *testing.T) { t.Fatal("canceling context should not lead to logging an error") } } + +func Test_runTerraformCmdCancel_default(t *testing.T) { + var buf bytes.Buffer + + tf := &Terraform{ + logger: log.New(&buf, "", 0), + execPath: "sleep", + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + cmd := tf.buildTerraformCmd(ctx, nil, "10") + go func() { + time.Sleep(time.Second) + cancel() + }() + + err := tf.runTerraformCmd(ctx, cmd) + if !errors.Is(err, context.Canceled) { + t.Fatalf("expected context.Canceled, got %T %s", err, err) + } +} diff --git a/tfexec/cmd_linux.go b/tfexec/cmd_linux.go index 6fa40e0a..d44977d0 100644 --- a/tfexec/cmd_linux.go +++ b/tfexec/cmd_linux.go @@ -2,6 +2,7 @@ package tfexec import ( "context" + "os" "os/exec" "strings" "sync" @@ -52,6 +53,19 @@ func (tf *Terraform) runTerraformCmd(ctx context.Context, cmd *exec.Cmd) error { return tf.wrapExitError(ctx, err, "") } + if interruptCh := ctx.Value(interruptContext); interruptCh != nil { + exited := make(chan struct{}) + defer close(exited) + go func() { + select { + case <-interruptCh.(<-chan struct{}): + cmd.Process.Signal(os.Interrupt) + case <-exited: + case <-ctx.Done(): + } + }() + } + var errStdout, errStderr error var wg sync.WaitGroup wg.Add(1) diff --git a/tfexec/destroy.go b/tfexec/destroy.go index 8011c0ba..17fc1f04 100644 --- a/tfexec/destroy.go +++ b/tfexec/destroy.go @@ -8,6 +8,8 @@ import ( ) type destroyConfig struct { + interruptCh <-chan struct{} + backup string dir string lock bool @@ -46,6 +48,10 @@ func (opt *ParallelismOption) configureDestroy(conf *destroyConfig) { conf.parallelism = opt.parallelism } +func (opt *InterruptChannelOption) configureDestroy(conf *destroyConfig) { + conf.interruptCh = opt.interrupt +} + func (opt *BackupOption) configureDestroy(conf *destroyConfig) { conf.backup = opt.path } @@ -88,14 +94,17 @@ func (opt *ReattachOption) configureDestroy(conf *destroyConfig) { // Destroy represents the terraform destroy subcommand. func (tf *Terraform) Destroy(ctx context.Context, opts ...DestroyOption) error { - cmd, err := tf.destroyCmd(ctx, opts...) + cmd, cfg, err := tf.destroyCmd(ctx, opts...) if err != nil { return err } + if cfg.interruptCh != nil { + ctx = context.WithValue(ctx, interruptContext, cfg.interruptCh) + } return tf.runTerraformCmd(ctx, cmd) } -func (tf *Terraform) destroyCmd(ctx context.Context, opts ...DestroyOption) (*exec.Cmd, error) { +func (tf *Terraform) destroyCmd(ctx context.Context, opts ...DestroyOption) (*exec.Cmd, *destroyConfig, error) { c := defaultDestroyOptions for _, o := range opts { @@ -147,10 +156,10 @@ func (tf *Terraform) destroyCmd(ctx context.Context, opts ...DestroyOption) (*ex if c.reattachInfo != nil { reattachStr, err := c.reattachInfo.marshalString() if err != nil { - return nil, err + return nil, nil, err } mergeEnv[reattachEnvVar] = reattachStr } - return tf.buildTerraformCmd(ctx, mergeEnv, args...), nil + return tf.buildTerraformCmd(ctx, mergeEnv, args...), &c, nil } diff --git a/tfexec/destroy_test.go b/tfexec/destroy_test.go index eb28f58a..e429eafb 100644 --- a/tfexec/destroy_test.go +++ b/tfexec/destroy_test.go @@ -19,7 +19,7 @@ func TestDestroyCmd(t *testing.T) { tf.SetEnv(map[string]string{}) t.Run("defaults", func(t *testing.T) { - destroyCmd, err := tf.destroyCmd(context.Background()) + destroyCmd, cfg, err := tf.destroyCmd(context.Background()) if err != nil { t.Fatal(err) } @@ -34,10 +34,14 @@ func TestDestroyCmd(t *testing.T) { "-parallelism=10", "-refresh=true", }, nil, destroyCmd) + + if cfg.interruptCh != nil { + t.Fatal("interrupt signal is unexpectedly non-nil") + } }) t.Run("override all defaults", func(t *testing.T) { - destroyCmd, err := tf.destroyCmd(context.Background(), Backup("testbackup"), LockTimeout("200s"), State("teststate"), StateOut("teststateout"), VarFile("testvarfile"), Lock(false), Parallelism(99), Refresh(false), Target("target1"), Target("target2"), Var("var1=foo"), Var("var2=bar"), Dir("destroydir")) + destroyCmd, cfg, err := tf.destroyCmd(context.Background(), Backup("testbackup"), LockTimeout("200s"), State("teststate"), StateOut("teststateout"), VarFile("testvarfile"), Lock(false), Parallelism(99), Refresh(false), Target("target1"), Target("target2"), Var("var1=foo"), Var("var2=bar"), Dir("destroydir"), InterruptChannel(make(chan struct{}))) if err != nil { t.Fatal(err) } @@ -61,5 +65,9 @@ func TestDestroyCmd(t *testing.T) { "-var", "var2=bar", "destroydir", }, nil, destroyCmd) + + if cfg.interruptCh == nil { + t.Fatal("interrupt signal is unexpectedly nil") + } }) } diff --git a/tfexec/internal/e2etest/graceful_termination_test.go b/tfexec/internal/e2etest/graceful_termination_test.go new file mode 100644 index 00000000..99bcdc7b --- /dev/null +++ b/tfexec/internal/e2etest/graceful_termination_test.go @@ -0,0 +1,84 @@ +package e2etest + +import ( + "bytes" + "context" + "strings" + "testing" + "time" + + "github.com/hashicorp/go-version" + "github.com/hashicorp/terraform-exec/tfexec" + "github.com/hashicorp/terraform-exec/tfexec/internal/testutil" +) + +func Test_gracefulTerminationRunTerraformCmd(t *testing.T) { + runTestVersions(t, []string{testutil.Latest_v1_1}, "infinite_loop", func(t *testing.T, tfv *version.Version, tf *tfexec.Terraform) { + var bufStdout bytes.Buffer + var bufStderr bytes.Buffer + tf.SetStderr(&bufStdout) + tf.SetStdout(&bufStderr) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + err := tf.Init(ctx) + if err != nil { + t.Fatalf("error running Init in test directory: %s", err) + } + + doneCh := make(chan error) + shutdown := make(chan struct{}) + go func() { + doneCh <- tf.Apply(ctx, tfexec.InterruptChannel(shutdown)) + }() + + time.Sleep(3 * time.Second) + close(shutdown) + err = <-doneCh + close(doneCh) + if err != nil { + t.Log(err) + } + output := bufStderr.String() + bufStdout.String() + t.Log(output) + if !strings.Contains(output, "Gracefully shutting down...") { + t.Fatal("canceling context should gracefully shut terraform down") + } + }) +} + +func Test_gracefulTerminationRunTerraformCmdWithNoGracefulShutdownTimeout(t *testing.T) { + runTestVersions(t, []string{testutil.Latest_v1_1}, "infinite_loop", func(t *testing.T, tfv *version.Version, tf *tfexec.Terraform) { + var bufStdout bytes.Buffer + var bufStderr bytes.Buffer + tf.SetStderr(&bufStdout) + tf.SetStdout(&bufStderr) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + err := tf.Init(ctx) + if err != nil { + t.Fatalf("error running Init in test directory: %s", err) + } + + doneCh := make(chan error) + go func() { + doneCh <- tf.Apply(ctx, tfexec.InterruptChannel(make(chan struct{}))) + }() + + time.Sleep(3 * time.Second) + cancel() + err = <-doneCh + close(doneCh) + if err != nil { + t.Log(err) + } + output := bufStderr.String() + bufStdout.String() + t.Log(output) + if strings.Contains(output, "Gracefully shutting down...") { + t.Fatal("canceling context with no graceful shutdown timeout should immediately kill the process and not start a graceful cancellation") + } + }) +} diff --git a/tfexec/internal/e2etest/testdata/infinite_loop/main.tf b/tfexec/internal/e2etest/testdata/infinite_loop/main.tf new file mode 100644 index 00000000..1fc938ee --- /dev/null +++ b/tfexec/internal/e2etest/testdata/infinite_loop/main.tf @@ -0,0 +1,8 @@ +resource "null_resource" "example1" { + triggers = { + always_run = "${timestamp()}" + } + provisioner "local-exec" { + command = " while true; do echo 'Hit CTRL+C'; sleep 1; done" + } +} diff --git a/tfexec/options.go b/tfexec/options.go index ad3cc65c..d05f4f34 100644 --- a/tfexec/options.go +++ b/tfexec/options.go @@ -240,6 +240,21 @@ func GraphPlan(file string) *GraphPlanOption { return &GraphPlanOption{file} } +type InterruptChannelOption struct { + interrupt <-chan struct{} +} + +// InterruptChannel accepts a channel that can trigger an interrupt signal to +// be sent to the running Terraform process. This allows you to orchestrate a +// graceful shutdown as a caller: close the interrupt channel to begin a +// graceful shutdown, and if enough time has elapsed, cancel the context to +// force a hard shutdown. +// +// The interrupt channel is received from once. +func InterruptChannel(interrupt <-chan struct{}) *InterruptChannelOption { + return &InterruptChannelOption{interrupt} +} + type PlatformOption struct { platform string }