Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for parallel plans #926

Merged
merged 11 commits into from
May 25, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ require (
github.com/pelletier/go-toml v1.0.0 // indirect
github.com/petergtz/pegomock v2.7.0+incompatible
github.com/pkg/errors v0.8.0
github.com/remeh/sizedwaitgroup v1.0.0
github.com/shurcooL/githubv4 v0.0.0-20191127044304-8f68eb5628d0
github.com/shurcooL/graphql v0.0.0-20181231061246-d48a9a75455f // indirect
github.com/sirupsen/logrus v1.2.0 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,8 @@ github.com/pkg/errors v0.8.0 h1:WdK/asTD0HN+q6hsWO3/vpuAkAr+tw6aNJNDFFf0+qw=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/remeh/sizedwaitgroup v1.0.0 h1:VNGGFwNo/R5+MJBf6yrsr110p0m4/OX4S3DCy7Kyl5E=
github.com/remeh/sizedwaitgroup v1.0.0/go.mod h1:3j2R4OIe/SeS6YDhICBy22RWjJC5eNCJ1V+9+NVNYlo=
github.com/sergi/go-diff v1.0.0 h1:Kpca3qRNrduNnOQeazBd0ysaKrUJiIuISHxogkT9RPQ=
github.com/sergi/go-diff v1.0.0/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAmXWZgo=
github.com/shurcooL/githubv4 v0.0.0-20191127044304-8f68eb5628d0 h1:T9uus1QvcPgeLShS30YOnnzk3r9Vvygp45muhlrufgY=
Expand Down
70 changes: 68 additions & 2 deletions server/events/command_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,12 @@ package events

import (
"fmt"
"sync"

"github.com/google/go-github/v28/github"
"github.com/mcdafydd/go-azuredevops/azuredevops"
"github.com/pkg/errors"
"github.com/remeh/sizedwaitgroup"
"github.com/runatlantis/atlantis/server/events/db"
"github.com/runatlantis/atlantis/server/events/models"
"github.com/runatlantis/atlantis/server/events/vcs"
Expand Down Expand Up @@ -143,7 +145,15 @@ func (c *DefaultCommandRunner) RunAutoplanCommand(baseRepo models.Repo, headRepo
ctx.Log.Warn("unable to update commit status: %s", err)
}

result := c.runProjectCmds(projectCmds, models.PlanCommand)
// Only run commands in parallel if enabled
var result CommandResult
if c.parallelPlanEnabled(ctx, projectCmds) {
ctx.Log.Info("Running plans in parallel")
result = c.runProjectCmdsParallel(projectCmds, models.PlanCommand)
} else {
result = c.runProjectCmds(projectCmds, models.PlanCommand)
}

if c.automergeEnabled(ctx, projectCmds) && result.HasErrors() {
ctx.Log.Info("deleting plans because there were errors and automerge requires all plans succeed")
c.deletePlans(ctx)
Expand Down Expand Up @@ -254,7 +264,18 @@ func (c *DefaultCommandRunner) RunCommentCommand(baseRepo models.Repo, maybeHead
return
}

result := c.runProjectCmds(projectCmds, cmd.Name)
// Only run commands in parallel if enabled
var result CommandResult
if cmd.Name == models.ApplyCommand && c.parallelApplyEnabled(ctx, projectCmds) {
ctx.Log.Info("Running applies in parallel")
result = c.runProjectCmdsParallel(projectCmds, cmd.Name)
} else if cmd.Name == models.PlanCommand && c.parallelPlanEnabled(ctx, projectCmds) {
ctx.Log.Info("Running plans in parallel")
result = c.runProjectCmdsParallel(projectCmds, cmd.Name)
} else {
result = c.runProjectCmds(projectCmds, cmd.Name)
}

if cmd.Name == models.PlanCommand && c.automergeEnabled(ctx, projectCmds) && result.HasErrors() {
ctx.Log.Info("deleting plans because there were errors and automerge requires all plans succeed")
c.deletePlans(ctx)
Expand Down Expand Up @@ -339,6 +360,41 @@ func (c *DefaultCommandRunner) automerge(ctx *CommandContext, pullStatus models.
}
}

func (c *DefaultCommandRunner) runProjectCmdsParallel(cmds []models.ProjectCommandContext, cmdName models.CommandName) CommandResult {
var results []models.ProjectResult
mux := &sync.Mutex{}

wg := sizedwaitgroup.New(15)
for _, pCmd := range cmds {
pCmd := pCmd
var execute func()
wg.Add()

switch cmdName {
case models.PlanCommand:
execute = func() {
defer wg.Done()
res := c.ProjectCommandRunner.Plan(pCmd)
mux.Lock()
results = append(results, res)
mux.Unlock()
}
case models.ApplyCommand:
Fauzyy marked this conversation as resolved.
Show resolved Hide resolved
execute = func() {
defer wg.Done()
res := c.ProjectCommandRunner.Apply(pCmd)
mux.Lock()
results = append(results, res)
mux.Unlock()
}
}
go execute()
}

wg.Wait()
return CommandResult{ProjectResults: results}
}

func (c *DefaultCommandRunner) runProjectCmds(cmds []models.ProjectCommandContext, cmdName models.CommandName) CommandResult {
var results []models.ProjectResult
for _, pCmd := range cmds {
Expand Down Expand Up @@ -496,6 +552,16 @@ func (c *DefaultCommandRunner) automergeEnabled(ctx *CommandContext, projectCmds
(len(projectCmds) > 0 && projectCmds[0].AutomergeEnabled)
}

// parallelApplyEnabled returns true if parallel apply is enabled in this context.
func (c *DefaultCommandRunner) parallelApplyEnabled(ctx *CommandContext, projectCmds []models.ProjectCommandContext) bool {
return len(projectCmds) > 0 && projectCmds[0].ParallelApplyEnabled
}

// parallelPlanEnabled returns true if parallel plan is enabled in this context.
func (c *DefaultCommandRunner) parallelPlanEnabled(ctx *CommandContext, projectCmds []models.ProjectCommandContext) bool {
return len(projectCmds) > 0 && projectCmds[0].ParallelPlanEnabled
}

// automergeComment is the comment that gets posted when Atlantis automatically
// merges the PR.
var automergeComment = `Automatically merging because all plans have been successfully applied.`
Expand Down
6 changes: 5 additions & 1 deletion server/events/models/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,9 +305,13 @@ type ProjectCommandContext struct {
// ApplyRequirements is the list of requirements that must be satisfied
// before we will run the apply stage.
ApplyRequirements []string
// AutoplanEnabled is true if automerge is enabled for the repo that this
// AutomergeEnabled is true if automerge is enabled for the repo that this
// project is in.
AutomergeEnabled bool
// ParallelApplyEnabled is true if parallel apply is enabled for this project.
ParallelApplyEnabled bool
// ParallelPlanEnabled is true if parallel plan is enabled for this project.
ParallelPlanEnabled bool
// AutoplanEnabled is true if autoplanning is enabled for this project.
AutoplanEnabled bool
// BaseRepo is the repository that the pull request will be merged into.
Expand Down
56 changes: 34 additions & 22 deletions server/events/project_command_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ const (
DefaultWorkspace = "default"
// DefaultAutomergeEnabled is the default for the automerge setting.
DefaultAutomergeEnabled = false
// DefaultParallelApplyEnabled is the default for the parallel apply setting.
DefaultParallelApplyEnabled = false
// DefaultParallelPlanEnabled is the default for the parallel plan setting.
DefaultParallelPlanEnabled = false
)

//go:generate pegomock generate -m --use-experimental-model-gen --package mocks -o mocks/mock_project_command_builder.go ProjectCommandBuilder
Expand Down Expand Up @@ -141,7 +145,7 @@ func (p *DefaultProjectCommandBuilder) buildPlanAllCommands(ctx *CommandContext,
for _, mp := range matchingProjects {
ctx.Log.Debug("determining config for project at dir: %q workspace: %q", mp.Dir, mp.Workspace)
mergedCfg := p.GlobalCfg.MergeProjectCfg(ctx.Log, ctx.BaseRepo.ID(), mp, repoCfg)
projCtxs = append(projCtxs, p.buildCtx(ctx, models.PlanCommand, mergedCfg, commentFlags, repoCfg.Automerge, verbose, repoDir))
projCtxs = append(projCtxs, p.buildCtx(ctx, models.PlanCommand, mergedCfg, commentFlags, repoCfg.Automerge, repoCfg.ParallelApply, repoCfg.ParallelPlan, verbose, repoDir))
}
} else {
// If there is no config file, then we'll plan each project that
Expand All @@ -152,7 +156,7 @@ func (p *DefaultProjectCommandBuilder) buildPlanAllCommands(ctx *CommandContext,
for _, mp := range modifiedProjects {
ctx.Log.Debug("determining config for project at dir: %q", mp.Path)
pCfg := p.GlobalCfg.DefaultProjCfg(ctx.Log, ctx.BaseRepo.ID(), mp.Path, DefaultWorkspace)
projCtxs = append(projCtxs, p.buildCtx(ctx, models.PlanCommand, pCfg, commentFlags, DefaultAutomergeEnabled, verbose, repoDir))
projCtxs = append(projCtxs, p.buildCtx(ctx, models.PlanCommand, pCfg, commentFlags, DefaultAutomergeEnabled, DefaultParallelApplyEnabled, DefaultParallelPlanEnabled, verbose, repoDir))
}
}

Expand Down Expand Up @@ -283,10 +287,14 @@ func (p *DefaultProjectCommandBuilder) buildProjectCommandCtx(
}

automerge := DefaultAutomergeEnabled
parallelApply := DefaultParallelApplyEnabled
parallelPlan := DefaultParallelPlanEnabled
if repoCfgPtr != nil {
automerge = repoCfgPtr.Automerge
parallelApply = repoCfgPtr.ParallelApply
parallelPlan = repoCfgPtr.ParallelPlan
}
return p.buildCtx(ctx, cmd, projCfg, commentFlags, automerge, verbose, repoDir), nil
return p.buildCtx(ctx, cmd, projCfg, commentFlags, automerge, parallelApply, parallelPlan, verbose, repoDir), nil
}

// getCfg returns the atlantis.yaml config (if it exists) for this project. If
Expand Down Expand Up @@ -375,6 +383,8 @@ func (p *DefaultProjectCommandBuilder) buildCtx(ctx *CommandContext,
projCfg valid.MergedProjectCfg,
commentArgs []string,
automergeEnabled bool,
parallelApplyEnabled bool,
parallelPlanEnabled bool,
verbose bool,
absRepoDir string) models.ProjectCommandContext {

Expand All @@ -393,25 +403,27 @@ func (p *DefaultProjectCommandBuilder) buildCtx(ctx *CommandContext,
}

return models.ProjectCommandContext{
ApplyCmd: p.CommentBuilder.BuildApplyComment(projCfg.RepoRelDir, projCfg.Workspace, projCfg.Name),
BaseRepo: ctx.BaseRepo,
EscapedCommentArgs: p.escapeArgs(commentArgs),
AutomergeEnabled: automergeEnabled,
AutoplanEnabled: projCfg.AutoplanEnabled,
Steps: steps,
HeadRepo: ctx.HeadRepo,
Log: ctx.Log,
PullMergeable: ctx.PullMergeable,
Pull: ctx.Pull,
ProjectName: projCfg.Name,
ApplyRequirements: projCfg.ApplyRequirements,
RePlanCmd: p.CommentBuilder.BuildPlanComment(projCfg.RepoRelDir, projCfg.Workspace, projCfg.Name, commentArgs),
RepoRelDir: projCfg.RepoRelDir,
RepoConfigVersion: projCfg.RepoCfgVersion,
TerraformVersion: projCfg.TerraformVersion,
User: ctx.User,
Verbose: verbose,
Workspace: projCfg.Workspace,
ApplyCmd: p.CommentBuilder.BuildApplyComment(projCfg.RepoRelDir, projCfg.Workspace, projCfg.Name),
BaseRepo: ctx.BaseRepo,
EscapedCommentArgs: p.escapeArgs(commentArgs),
AutomergeEnabled: automergeEnabled,
ParallelApplyEnabled: parallelApplyEnabled,
ParallelPlanEnabled: parallelPlanEnabled,
AutoplanEnabled: projCfg.AutoplanEnabled,
Steps: steps,
HeadRepo: ctx.HeadRepo,
Log: ctx.Log,
PullMergeable: ctx.PullMergeable,
Pull: ctx.Pull,
ProjectName: projCfg.Name,
ApplyRequirements: projCfg.ApplyRequirements,
RePlanCmd: p.CommentBuilder.BuildPlanComment(projCfg.RepoRelDir, projCfg.Workspace, projCfg.Name, commentArgs),
RepoRelDir: projCfg.RepoRelDir,
RepoConfigVersion: projCfg.RepoCfgVersion,
TerraformVersion: projCfg.TerraformVersion,
User: ctx.User,
Verbose: verbose,
Workspace: projCfg.Workspace,
}
}

Expand Down
11 changes: 9 additions & 2 deletions server/events/terraform/terraform_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,9 @@ type DefaultClient struct {

// versionsLock is used to ensure versions isn't being concurrently written to.
versionsLock *sync.Mutex

// usePluginCache determines whether or not to set the TF_PLUGIN_CACHE_DIR env var
usePluginCache bool
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are you going to expose this via a config file flag? Curious as to your use case for why you want to be able to turn this off.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I could definitely expose this as a config file flag, this was just to fix the e2e test suite for parallel plans. They would transiently fail with the plugin cache enabled, we never saw this error in production however. The error mentions that the null_resource provider is missing when a plan is run, I wonder if the nature of these tests exacerbates the problem

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ahh okay

}

//go:generate pegomock generate -m --use-experimental-model-gen --package mocks -o mocks/mock_downloader.go Downloader
Expand Down Expand Up @@ -107,7 +110,8 @@ func NewClient(
defaultVersionStr string,
defaultVersionFlagName string,
tfDownloadURL string,
tfDownloader Downloader) (*DefaultClient, error) {
tfDownloader Downloader,
usePluginCache bool) (*DefaultClient, error) {
var finalDefaultVersion *version.Version
var localVersion *version.Version
versions := make(map[string]string)
Expand Down Expand Up @@ -179,6 +183,7 @@ func NewClient(
downloadBaseURL: tfDownloadURL,
versionsLock: &versionsLock,
versions: versions,
usePluginCache: usePluginCache,
}, nil
}

Expand Down Expand Up @@ -259,11 +264,13 @@ func (c *DefaultClient) prepCmd(log *logging.SimpleLogger, v *version.Version, w
// Will de-emphasize specific commands to run in output.
"TF_IN_AUTOMATION=true",
// Cache plugins so terraform init runs faster.
fmt.Sprintf("TF_PLUGIN_CACHE_DIR=%s", c.terraformPluginCacheDir),
fmt.Sprintf("WORKSPACE=%s", workspace),
fmt.Sprintf("ATLANTIS_TERRAFORM_VERSION=%s", v.String()),
fmt.Sprintf("DIR=%s", path),
}
if c.usePluginCache {
envVars = append(envVars, fmt.Sprintf("TF_PLUGIN_CACHE_DIR=%s", c.terraformPluginCacheDir))
}
// Append current Atlantis process's environment variables, ex.
// AWS_ACCESS_KEY.
envVars = append(envVars, os.Environ()...)
Expand Down
2 changes: 2 additions & 0 deletions server/events/terraform/terraform_client_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ func TestDefaultClient_RunCommandWithVersion_EnvVars(t *testing.T) {
defaultVersion: v,
terraformPluginCacheDir: tmp,
overrideTF: "echo",
usePluginCache: true,
}

args := []string{
Expand Down Expand Up @@ -143,6 +144,7 @@ func TestDefaultClient_RunCommandAsync_Success(t *testing.T) {
defaultVersion: v,
terraformPluginCacheDir: tmp,
overrideTF: "echo",
usePluginCache: true,
}

args := []string{
Expand Down
18 changes: 9 additions & 9 deletions server/events/terraform/terraform_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ is 0.11.13. You can update by downloading from www.terraform.io/downloads.html
Ok(t, err)
defer tempSetEnv(t, "PATH", fmt.Sprintf("%s:%s", tmp, os.Getenv("PATH")))()

c, err := terraform.NewClient(nil, tmp, "", "", "", cmd.DefaultTFVersionFlag, cmd.DefaultTFDownloadURL, nil)
c, err := terraform.NewClient(nil, tmp, "", "", "", cmd.DefaultTFVersionFlag, cmd.DefaultTFDownloadURL, nil, true)
Ok(t, err)

Ok(t, err)
Expand Down Expand Up @@ -96,7 +96,7 @@ is 0.11.13. You can update by downloading from www.terraform.io/downloads.html
Ok(t, err)
defer tempSetEnv(t, "PATH", fmt.Sprintf("%s:%s", tmp, os.Getenv("PATH")))()

c, err := terraform.NewClient(nil, tmp, "", "", "0.11.10", cmd.DefaultTFVersionFlag, cmd.DefaultTFDownloadURL, nil)
c, err := terraform.NewClient(nil, tmp, "", "", "0.11.10", cmd.DefaultTFVersionFlag, cmd.DefaultTFDownloadURL, nil, true)
Ok(t, err)

Ok(t, err)
Expand All @@ -116,7 +116,7 @@ func TestNewClient_NoTF(t *testing.T) {
// Set PATH to only include our empty directory.
defer tempSetEnv(t, "PATH", tmp)()

_, err := terraform.NewClient(nil, tmp, "", "", "", cmd.DefaultTFVersionFlag, cmd.DefaultTFDownloadURL, nil)
_, err := terraform.NewClient(nil, tmp, "", "", "", cmd.DefaultTFVersionFlag, cmd.DefaultTFDownloadURL, nil, true)
ErrEquals(t, "terraform not found in $PATH. Set --default-tf-version or download terraform from https://www.terraform.io/downloads.html", err)
}

Expand All @@ -133,7 +133,7 @@ func TestNewClient_DefaultTFFlagInPath(t *testing.T) {
Ok(t, err)
defer tempSetEnv(t, "PATH", fmt.Sprintf("%s:%s", tmp, os.Getenv("PATH")))()

c, err := terraform.NewClient(nil, tmp, "", "", "0.11.10", cmd.DefaultTFVersionFlag, cmd.DefaultTFDownloadURL, nil)
c, err := terraform.NewClient(nil, tmp, "", "", "0.11.10", cmd.DefaultTFVersionFlag, cmd.DefaultTFDownloadURL, nil, true)
Ok(t, err)

Ok(t, err)
Expand All @@ -157,7 +157,7 @@ func TestNewClient_DefaultTFFlagInBinDir(t *testing.T) {
Ok(t, err)
defer tempSetEnv(t, "PATH", fmt.Sprintf("%s:%s", tmp, os.Getenv("PATH")))()

c, err := terraform.NewClient(logging.NewNoopLogger(), tmp, "", "", "0.11.10", cmd.DefaultTFVersionFlag, cmd.DefaultTFDownloadURL, nil)
c, err := terraform.NewClient(logging.NewNoopLogger(), tmp, "", "", "0.11.10", cmd.DefaultTFVersionFlag, cmd.DefaultTFDownloadURL, nil, true)
Ok(t, err)

Ok(t, err)
Expand All @@ -183,7 +183,7 @@ func TestNewClient_DefaultTFFlagDownload(t *testing.T) {
err := ioutil.WriteFile(params[0].(string), []byte("#!/bin/sh\necho '\nTerraform v0.11.10\n'"), 0755)
return []pegomock.ReturnValue{err}
})
c, err := terraform.NewClient(nil, tmp, "", "", "0.11.10", cmd.DefaultTFVersionFlag, "https://my-mirror.releases.mycompany.com", mockDownloader)
c, err := terraform.NewClient(nil, tmp, "", "", "0.11.10", cmd.DefaultTFVersionFlag, "https://my-mirror.releases.mycompany.com", mockDownloader, true)
Ok(t, err)

Ok(t, err)
Expand All @@ -207,7 +207,7 @@ func TestNewClient_DefaultTFFlagDownload(t *testing.T) {
func TestNewClient_BadVersion(t *testing.T) {
tmp, cleanup := TempDir(t)
defer cleanup()
_, err := terraform.NewClient(nil, tmp, "", "", "malformed", cmd.DefaultTFVersionFlag, cmd.DefaultTFDownloadURL, nil)
_, err := terraform.NewClient(nil, tmp, "", "", "malformed", cmd.DefaultTFVersionFlag, cmd.DefaultTFDownloadURL, nil, true)
ErrEquals(t, "Malformed version: malformed", err)
}

Expand All @@ -230,7 +230,7 @@ func TestRunCommandWithVersion_DLsTF(t *testing.T) {
return []pegomock.ReturnValue{err}
})

c, err := terraform.NewClient(nil, tmp, "", "", "0.11.10", cmd.DefaultTFVersionFlag, cmd.DefaultTFDownloadURL, mockDownloader)
c, err := terraform.NewClient(nil, tmp, "", "", "0.11.10", cmd.DefaultTFVersionFlag, cmd.DefaultTFDownloadURL, mockDownloader, true)
Ok(t, err)
Equals(t, "0.11.10", c.DefaultVersion().String())

Expand All @@ -249,7 +249,7 @@ func TestEnsureVersion_downloaded(t *testing.T) {

mockDownloader := mocks.NewMockDownloader()

c, err := terraform.NewClient(nil, tmp, "", "", "0.11.10", cmd.DefaultTFVersionFlag, cmd.DefaultTFDownloadURL, mockDownloader)
c, err := terraform.NewClient(nil, tmp, "", "", "0.11.10", cmd.DefaultTFVersionFlag, cmd.DefaultTFDownloadURL, mockDownloader, true)
Ok(t, err)

Equals(t, "0.11.10", c.DefaultVersion().String())
Expand Down
Loading