Skip to content

Commit

Permalink
Merge pull request #926 from segmentio/parallel-plans-upstream
Browse files Browse the repository at this point in the history
Add support for parallel plans
  • Loading branch information
lkysow authored May 25, 2020
2 parents f109edc + 67ef5fc commit 6418b57
Show file tree
Hide file tree
Showing 25 changed files with 630 additions and 179 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ require (
github.com/pelletier/go-toml v1.0.0 // indirect
github.com/petergtz/pegomock v2.7.0+incompatible
github.com/pkg/errors v0.8.0
github.com/remeh/sizedwaitgroup v1.0.0
github.com/shurcooL/githubv4 v0.0.0-20191127044304-8f68eb5628d0
github.com/shurcooL/graphql v0.0.0-20181231061246-d48a9a75455f // indirect
github.com/sirupsen/logrus v1.2.0 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,8 @@ github.com/pkg/errors v0.8.0 h1:WdK/asTD0HN+q6hsWO3/vpuAkAr+tw6aNJNDFFf0+qw=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/remeh/sizedwaitgroup v1.0.0 h1:VNGGFwNo/R5+MJBf6yrsr110p0m4/OX4S3DCy7Kyl5E=
github.com/remeh/sizedwaitgroup v1.0.0/go.mod h1:3j2R4OIe/SeS6YDhICBy22RWjJC5eNCJ1V+9+NVNYlo=
github.com/sergi/go-diff v1.0.0 h1:Kpca3qRNrduNnOQeazBd0ysaKrUJiIuISHxogkT9RPQ=
github.com/sergi/go-diff v1.0.0/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAmXWZgo=
github.com/shurcooL/githubv4 v0.0.0-20191127044304-8f68eb5628d0 h1:T9uus1QvcPgeLShS30YOnnzk3r9Vvygp45muhlrufgY=
Expand Down
70 changes: 68 additions & 2 deletions server/events/command_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,12 @@ package events

import (
"fmt"
"sync"

"github.com/google/go-github/v28/github"
"github.com/mcdafydd/go-azuredevops/azuredevops"
"github.com/pkg/errors"
"github.com/remeh/sizedwaitgroup"
"github.com/runatlantis/atlantis/server/events/db"
"github.com/runatlantis/atlantis/server/events/models"
"github.com/runatlantis/atlantis/server/events/vcs"
Expand Down Expand Up @@ -143,7 +145,15 @@ func (c *DefaultCommandRunner) RunAutoplanCommand(baseRepo models.Repo, headRepo
ctx.Log.Warn("unable to update commit status: %s", err)
}

result := c.runProjectCmds(projectCmds, models.PlanCommand)
// Only run commands in parallel if enabled
var result CommandResult
if c.parallelPlanEnabled(ctx, projectCmds) {
ctx.Log.Info("Running plans in parallel")
result = c.runProjectCmdsParallel(projectCmds, models.PlanCommand)
} else {
result = c.runProjectCmds(projectCmds, models.PlanCommand)
}

if c.automergeEnabled(ctx, projectCmds) && result.HasErrors() {
ctx.Log.Info("deleting plans because there were errors and automerge requires all plans succeed")
c.deletePlans(ctx)
Expand Down Expand Up @@ -254,7 +264,18 @@ func (c *DefaultCommandRunner) RunCommentCommand(baseRepo models.Repo, maybeHead
return
}

result := c.runProjectCmds(projectCmds, cmd.Name)
// Only run commands in parallel if enabled
var result CommandResult
if cmd.Name == models.ApplyCommand && c.parallelApplyEnabled(ctx, projectCmds) {
ctx.Log.Info("Running applies in parallel")
result = c.runProjectCmdsParallel(projectCmds, cmd.Name)
} else if cmd.Name == models.PlanCommand && c.parallelPlanEnabled(ctx, projectCmds) {
ctx.Log.Info("Running plans in parallel")
result = c.runProjectCmdsParallel(projectCmds, cmd.Name)
} else {
result = c.runProjectCmds(projectCmds, cmd.Name)
}

if cmd.Name == models.PlanCommand && c.automergeEnabled(ctx, projectCmds) && result.HasErrors() {
ctx.Log.Info("deleting plans because there were errors and automerge requires all plans succeed")
c.deletePlans(ctx)
Expand Down Expand Up @@ -339,6 +360,41 @@ func (c *DefaultCommandRunner) automerge(ctx *CommandContext, pullStatus models.
}
}

func (c *DefaultCommandRunner) runProjectCmdsParallel(cmds []models.ProjectCommandContext, cmdName models.CommandName) CommandResult {
var results []models.ProjectResult
mux := &sync.Mutex{}

wg := sizedwaitgroup.New(15)
for _, pCmd := range cmds {
pCmd := pCmd
var execute func()
wg.Add()

switch cmdName {
case models.PlanCommand:
execute = func() {
defer wg.Done()
res := c.ProjectCommandRunner.Plan(pCmd)
mux.Lock()
results = append(results, res)
mux.Unlock()
}
case models.ApplyCommand:
execute = func() {
defer wg.Done()
res := c.ProjectCommandRunner.Apply(pCmd)
mux.Lock()
results = append(results, res)
mux.Unlock()
}
}
go execute()
}

wg.Wait()
return CommandResult{ProjectResults: results}
}

func (c *DefaultCommandRunner) runProjectCmds(cmds []models.ProjectCommandContext, cmdName models.CommandName) CommandResult {
var results []models.ProjectResult
for _, pCmd := range cmds {
Expand Down Expand Up @@ -496,6 +552,16 @@ func (c *DefaultCommandRunner) automergeEnabled(ctx *CommandContext, projectCmds
(len(projectCmds) > 0 && projectCmds[0].AutomergeEnabled)
}

// parallelApplyEnabled returns true if parallel apply is enabled in this context.
func (c *DefaultCommandRunner) parallelApplyEnabled(ctx *CommandContext, projectCmds []models.ProjectCommandContext) bool {
return len(projectCmds) > 0 && projectCmds[0].ParallelApplyEnabled
}

// parallelPlanEnabled returns true if parallel plan is enabled in this context.
func (c *DefaultCommandRunner) parallelPlanEnabled(ctx *CommandContext, projectCmds []models.ProjectCommandContext) bool {
return len(projectCmds) > 0 && projectCmds[0].ParallelPlanEnabled
}

// automergeComment is the comment that gets posted when Atlantis automatically
// merges the PR.
var automergeComment = `Automatically merging because all plans have been successfully applied.`
Expand Down
6 changes: 5 additions & 1 deletion server/events/models/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,9 +305,13 @@ type ProjectCommandContext struct {
// ApplyRequirements is the list of requirements that must be satisfied
// before we will run the apply stage.
ApplyRequirements []string
// AutoplanEnabled is true if automerge is enabled for the repo that this
// AutomergeEnabled is true if automerge is enabled for the repo that this
// project is in.
AutomergeEnabled bool
// ParallelApplyEnabled is true if parallel apply is enabled for this project.
ParallelApplyEnabled bool
// ParallelPlanEnabled is true if parallel plan is enabled for this project.
ParallelPlanEnabled bool
// AutoplanEnabled is true if autoplanning is enabled for this project.
AutoplanEnabled bool
// BaseRepo is the repository that the pull request will be merged into.
Expand Down
56 changes: 34 additions & 22 deletions server/events/project_command_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ const (
DefaultWorkspace = "default"
// DefaultAutomergeEnabled is the default for the automerge setting.
DefaultAutomergeEnabled = false
// DefaultParallelApplyEnabled is the default for the parallel apply setting.
DefaultParallelApplyEnabled = false
// DefaultParallelPlanEnabled is the default for the parallel plan setting.
DefaultParallelPlanEnabled = false
)

//go:generate pegomock generate -m --use-experimental-model-gen --package mocks -o mocks/mock_project_command_builder.go ProjectCommandBuilder
Expand Down Expand Up @@ -141,7 +145,7 @@ func (p *DefaultProjectCommandBuilder) buildPlanAllCommands(ctx *CommandContext,
for _, mp := range matchingProjects {
ctx.Log.Debug("determining config for project at dir: %q workspace: %q", mp.Dir, mp.Workspace)
mergedCfg := p.GlobalCfg.MergeProjectCfg(ctx.Log, ctx.BaseRepo.ID(), mp, repoCfg)
projCtxs = append(projCtxs, p.buildCtx(ctx, models.PlanCommand, mergedCfg, commentFlags, repoCfg.Automerge, verbose, repoDir))
projCtxs = append(projCtxs, p.buildCtx(ctx, models.PlanCommand, mergedCfg, commentFlags, repoCfg.Automerge, repoCfg.ParallelApply, repoCfg.ParallelPlan, verbose, repoDir))
}
} else {
// If there is no config file, then we'll plan each project that
Expand All @@ -152,7 +156,7 @@ func (p *DefaultProjectCommandBuilder) buildPlanAllCommands(ctx *CommandContext,
for _, mp := range modifiedProjects {
ctx.Log.Debug("determining config for project at dir: %q", mp.Path)
pCfg := p.GlobalCfg.DefaultProjCfg(ctx.Log, ctx.BaseRepo.ID(), mp.Path, DefaultWorkspace)
projCtxs = append(projCtxs, p.buildCtx(ctx, models.PlanCommand, pCfg, commentFlags, DefaultAutomergeEnabled, verbose, repoDir))
projCtxs = append(projCtxs, p.buildCtx(ctx, models.PlanCommand, pCfg, commentFlags, DefaultAutomergeEnabled, DefaultParallelApplyEnabled, DefaultParallelPlanEnabled, verbose, repoDir))
}
}

Expand Down Expand Up @@ -283,10 +287,14 @@ func (p *DefaultProjectCommandBuilder) buildProjectCommandCtx(
}

automerge := DefaultAutomergeEnabled
parallelApply := DefaultParallelApplyEnabled
parallelPlan := DefaultParallelPlanEnabled
if repoCfgPtr != nil {
automerge = repoCfgPtr.Automerge
parallelApply = repoCfgPtr.ParallelApply
parallelPlan = repoCfgPtr.ParallelPlan
}
return p.buildCtx(ctx, cmd, projCfg, commentFlags, automerge, verbose, repoDir), nil
return p.buildCtx(ctx, cmd, projCfg, commentFlags, automerge, parallelApply, parallelPlan, verbose, repoDir), nil
}

// getCfg returns the atlantis.yaml config (if it exists) for this project. If
Expand Down Expand Up @@ -375,6 +383,8 @@ func (p *DefaultProjectCommandBuilder) buildCtx(ctx *CommandContext,
projCfg valid.MergedProjectCfg,
commentArgs []string,
automergeEnabled bool,
parallelApplyEnabled bool,
parallelPlanEnabled bool,
verbose bool,
absRepoDir string) models.ProjectCommandContext {

Expand All @@ -393,25 +403,27 @@ func (p *DefaultProjectCommandBuilder) buildCtx(ctx *CommandContext,
}

return models.ProjectCommandContext{
ApplyCmd: p.CommentBuilder.BuildApplyComment(projCfg.RepoRelDir, projCfg.Workspace, projCfg.Name),
BaseRepo: ctx.BaseRepo,
EscapedCommentArgs: p.escapeArgs(commentArgs),
AutomergeEnabled: automergeEnabled,
AutoplanEnabled: projCfg.AutoplanEnabled,
Steps: steps,
HeadRepo: ctx.HeadRepo,
Log: ctx.Log,
PullMergeable: ctx.PullMergeable,
Pull: ctx.Pull,
ProjectName: projCfg.Name,
ApplyRequirements: projCfg.ApplyRequirements,
RePlanCmd: p.CommentBuilder.BuildPlanComment(projCfg.RepoRelDir, projCfg.Workspace, projCfg.Name, commentArgs),
RepoRelDir: projCfg.RepoRelDir,
RepoConfigVersion: projCfg.RepoCfgVersion,
TerraformVersion: projCfg.TerraformVersion,
User: ctx.User,
Verbose: verbose,
Workspace: projCfg.Workspace,
ApplyCmd: p.CommentBuilder.BuildApplyComment(projCfg.RepoRelDir, projCfg.Workspace, projCfg.Name),
BaseRepo: ctx.BaseRepo,
EscapedCommentArgs: p.escapeArgs(commentArgs),
AutomergeEnabled: automergeEnabled,
ParallelApplyEnabled: parallelApplyEnabled,
ParallelPlanEnabled: parallelPlanEnabled,
AutoplanEnabled: projCfg.AutoplanEnabled,
Steps: steps,
HeadRepo: ctx.HeadRepo,
Log: ctx.Log,
PullMergeable: ctx.PullMergeable,
Pull: ctx.Pull,
ProjectName: projCfg.Name,
ApplyRequirements: projCfg.ApplyRequirements,
RePlanCmd: p.CommentBuilder.BuildPlanComment(projCfg.RepoRelDir, projCfg.Workspace, projCfg.Name, commentArgs),
RepoRelDir: projCfg.RepoRelDir,
RepoConfigVersion: projCfg.RepoCfgVersion,
TerraformVersion: projCfg.TerraformVersion,
User: ctx.User,
Verbose: verbose,
Workspace: projCfg.Workspace,
}
}

Expand Down
11 changes: 9 additions & 2 deletions server/events/terraform/terraform_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,9 @@ type DefaultClient struct {

// versionsLock is used to ensure versions isn't being concurrently written to.
versionsLock *sync.Mutex

// usePluginCache determines whether or not to set the TF_PLUGIN_CACHE_DIR env var
usePluginCache bool
}

//go:generate pegomock generate -m --use-experimental-model-gen --package mocks -o mocks/mock_downloader.go Downloader
Expand Down Expand Up @@ -107,7 +110,8 @@ func NewClient(
defaultVersionStr string,
defaultVersionFlagName string,
tfDownloadURL string,
tfDownloader Downloader) (*DefaultClient, error) {
tfDownloader Downloader,
usePluginCache bool) (*DefaultClient, error) {
var finalDefaultVersion *version.Version
var localVersion *version.Version
versions := make(map[string]string)
Expand Down Expand Up @@ -179,6 +183,7 @@ func NewClient(
downloadBaseURL: tfDownloadURL,
versionsLock: &versionsLock,
versions: versions,
usePluginCache: usePluginCache,
}, nil
}

Expand Down Expand Up @@ -259,11 +264,13 @@ func (c *DefaultClient) prepCmd(log *logging.SimpleLogger, v *version.Version, w
// Will de-emphasize specific commands to run in output.
"TF_IN_AUTOMATION=true",
// Cache plugins so terraform init runs faster.
fmt.Sprintf("TF_PLUGIN_CACHE_DIR=%s", c.terraformPluginCacheDir),
fmt.Sprintf("WORKSPACE=%s", workspace),
fmt.Sprintf("ATLANTIS_TERRAFORM_VERSION=%s", v.String()),
fmt.Sprintf("DIR=%s", path),
}
if c.usePluginCache {
envVars = append(envVars, fmt.Sprintf("TF_PLUGIN_CACHE_DIR=%s", c.terraformPluginCacheDir))
}
// Append current Atlantis process's environment variables, ex.
// AWS_ACCESS_KEY.
envVars = append(envVars, os.Environ()...)
Expand Down
2 changes: 2 additions & 0 deletions server/events/terraform/terraform_client_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ func TestDefaultClient_RunCommandWithVersion_EnvVars(t *testing.T) {
defaultVersion: v,
terraformPluginCacheDir: tmp,
overrideTF: "echo",
usePluginCache: true,
}

args := []string{
Expand Down Expand Up @@ -143,6 +144,7 @@ func TestDefaultClient_RunCommandAsync_Success(t *testing.T) {
defaultVersion: v,
terraformPluginCacheDir: tmp,
overrideTF: "echo",
usePluginCache: true,
}

args := []string{
Expand Down
18 changes: 9 additions & 9 deletions server/events/terraform/terraform_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ is 0.11.13. You can update by downloading from www.terraform.io/downloads.html
Ok(t, err)
defer tempSetEnv(t, "PATH", fmt.Sprintf("%s:%s", tmp, os.Getenv("PATH")))()

c, err := terraform.NewClient(nil, tmp, "", "", "", cmd.DefaultTFVersionFlag, cmd.DefaultTFDownloadURL, nil)
c, err := terraform.NewClient(nil, tmp, "", "", "", cmd.DefaultTFVersionFlag, cmd.DefaultTFDownloadURL, nil, true)
Ok(t, err)

Ok(t, err)
Expand Down Expand Up @@ -96,7 +96,7 @@ is 0.11.13. You can update by downloading from www.terraform.io/downloads.html
Ok(t, err)
defer tempSetEnv(t, "PATH", fmt.Sprintf("%s:%s", tmp, os.Getenv("PATH")))()

c, err := terraform.NewClient(nil, tmp, "", "", "0.11.10", cmd.DefaultTFVersionFlag, cmd.DefaultTFDownloadURL, nil)
c, err := terraform.NewClient(nil, tmp, "", "", "0.11.10", cmd.DefaultTFVersionFlag, cmd.DefaultTFDownloadURL, nil, true)
Ok(t, err)

Ok(t, err)
Expand All @@ -116,7 +116,7 @@ func TestNewClient_NoTF(t *testing.T) {
// Set PATH to only include our empty directory.
defer tempSetEnv(t, "PATH", tmp)()

_, err := terraform.NewClient(nil, tmp, "", "", "", cmd.DefaultTFVersionFlag, cmd.DefaultTFDownloadURL, nil)
_, err := terraform.NewClient(nil, tmp, "", "", "", cmd.DefaultTFVersionFlag, cmd.DefaultTFDownloadURL, nil, true)
ErrEquals(t, "terraform not found in $PATH. Set --default-tf-version or download terraform from https://www.terraform.io/downloads.html", err)
}

Expand All @@ -133,7 +133,7 @@ func TestNewClient_DefaultTFFlagInPath(t *testing.T) {
Ok(t, err)
defer tempSetEnv(t, "PATH", fmt.Sprintf("%s:%s", tmp, os.Getenv("PATH")))()

c, err := terraform.NewClient(nil, tmp, "", "", "0.11.10", cmd.DefaultTFVersionFlag, cmd.DefaultTFDownloadURL, nil)
c, err := terraform.NewClient(nil, tmp, "", "", "0.11.10", cmd.DefaultTFVersionFlag, cmd.DefaultTFDownloadURL, nil, true)
Ok(t, err)

Ok(t, err)
Expand All @@ -157,7 +157,7 @@ func TestNewClient_DefaultTFFlagInBinDir(t *testing.T) {
Ok(t, err)
defer tempSetEnv(t, "PATH", fmt.Sprintf("%s:%s", tmp, os.Getenv("PATH")))()

c, err := terraform.NewClient(logging.NewNoopLogger(), tmp, "", "", "0.11.10", cmd.DefaultTFVersionFlag, cmd.DefaultTFDownloadURL, nil)
c, err := terraform.NewClient(logging.NewNoopLogger(), tmp, "", "", "0.11.10", cmd.DefaultTFVersionFlag, cmd.DefaultTFDownloadURL, nil, true)
Ok(t, err)

Ok(t, err)
Expand All @@ -183,7 +183,7 @@ func TestNewClient_DefaultTFFlagDownload(t *testing.T) {
err := ioutil.WriteFile(params[0].(string), []byte("#!/bin/sh\necho '\nTerraform v0.11.10\n'"), 0755)
return []pegomock.ReturnValue{err}
})
c, err := terraform.NewClient(nil, tmp, "", "", "0.11.10", cmd.DefaultTFVersionFlag, "https://my-mirror.releases.mycompany.com", mockDownloader)
c, err := terraform.NewClient(nil, tmp, "", "", "0.11.10", cmd.DefaultTFVersionFlag, "https://my-mirror.releases.mycompany.com", mockDownloader, true)
Ok(t, err)

Ok(t, err)
Expand All @@ -207,7 +207,7 @@ func TestNewClient_DefaultTFFlagDownload(t *testing.T) {
func TestNewClient_BadVersion(t *testing.T) {
tmp, cleanup := TempDir(t)
defer cleanup()
_, err := terraform.NewClient(nil, tmp, "", "", "malformed", cmd.DefaultTFVersionFlag, cmd.DefaultTFDownloadURL, nil)
_, err := terraform.NewClient(nil, tmp, "", "", "malformed", cmd.DefaultTFVersionFlag, cmd.DefaultTFDownloadURL, nil, true)
ErrEquals(t, "Malformed version: malformed", err)
}

Expand All @@ -230,7 +230,7 @@ func TestRunCommandWithVersion_DLsTF(t *testing.T) {
return []pegomock.ReturnValue{err}
})

c, err := terraform.NewClient(nil, tmp, "", "", "0.11.10", cmd.DefaultTFVersionFlag, cmd.DefaultTFDownloadURL, mockDownloader)
c, err := terraform.NewClient(nil, tmp, "", "", "0.11.10", cmd.DefaultTFVersionFlag, cmd.DefaultTFDownloadURL, mockDownloader, true)
Ok(t, err)
Equals(t, "0.11.10", c.DefaultVersion().String())

Expand All @@ -249,7 +249,7 @@ func TestEnsureVersion_downloaded(t *testing.T) {

mockDownloader := mocks.NewMockDownloader()

c, err := terraform.NewClient(nil, tmp, "", "", "0.11.10", cmd.DefaultTFVersionFlag, cmd.DefaultTFDownloadURL, mockDownloader)
c, err := terraform.NewClient(nil, tmp, "", "", "0.11.10", cmd.DefaultTFVersionFlag, cmd.DefaultTFDownloadURL, mockDownloader, true)
Ok(t, err)

Equals(t, "0.11.10", c.DefaultVersion().String())
Expand Down
Loading

0 comments on commit 6418b57

Please sign in to comment.