Skip to content

Commit

Permalink
fix: allow projects in separate directories to run in parallel (#2131)
Browse files Browse the repository at this point in the history
* fix: add path to DefaultWorkingDirLocker.TryLock()

In the release notes for v0.13.0: https://github.com/runatlantis/atlantis/blob/master/CHANGELOG.md#features-4

> Running in parallel is only supported if you're using workspaces to separate your projects. Projects in separate directories can not be run in parallel currently.

This commit adds `path` as an argument to `DefaultWorkingDirLocker.TryLock()` and includes `path` in the `workspaceKey` used to check if a project is locked. This should allow projects in separate directories to run in parallel.

To my knowledge, there is no functional reason that projects in separate directories cannot run in parallel.

All calls to `DefaultWorkingDirLocker.TryLock()` have been updated. A new unit test `TestTryLockWithDifferentPaths` has been added to test the behavior of locking two separate directories with the same workspace name.

* Add documntation for parallel_plan and parallel_apply options

Co-authored-by: Kevin Snyder <kevinsnyder@ip-192-168-4-61.ec2.internal>
Co-authored-by: Kevin Snyder <kevinsnyder@ip-10-60-10-94.ec2.internal>
  • Loading branch information
3 people authored Mar 21, 2022
1 parent 90e92e3 commit 5288389
Show file tree
Hide file tree
Showing 12 changed files with 101 additions and 66 deletions.
18 changes: 18 additions & 0 deletions runatlantis.io/docs/repo-level-atlantis-yaml.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ need to be defined.
version: 3
automerge: true
delete_source_branch_on_merge: true
parallel_plan: true
parallel_apply: true
projects:
- name: my-project-name
dir: .
Expand Down Expand Up @@ -88,6 +90,22 @@ projects:
This will stop Atlantis automatically running plan when `project1/` is updated
in a pull request.

### Run plans and applies in parallel

```yaml
version: 3
parallel_plan: true
parallel_apply: true
```

This will run plans and applies for all of your projects in parallel.

Enabling these options can significantly reduce the duration of plans and applies, especially for repositories with many projects.

Use the `--parallel-pool-size` to configure the max number of plans and applies that can run in parallel. The default is 15.

Parallel plans and applies work across both multiple directories and multiple workspaces.

### Configuring Planning

Given the directory structure:
Expand Down
2 changes: 1 addition & 1 deletion server/controllers/locks_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ func (l *LocksController) DeleteLock(w http.ResponseWriter, r *http.Request) {
// installations of Atlantis will have locks in their DB that do not have
// this field on PullRequest. We skip commenting in this case.
if lock.Pull.BaseRepo != (models.Repo{}) {
unlock, err := l.WorkingDirLocker.TryLock(lock.Pull.BaseRepo.FullName, lock.Pull.Num, lock.Workspace)
unlock, err := l.WorkingDirLocker.TryLock(lock.Pull.BaseRepo.FullName, lock.Pull.Num, lock.Workspace, lock.Project.Path)
if err != nil {
l.Logger.Err("unable to obtain working dir lock when trying to delete old plans: %s", err)
} else {
Expand Down
2 changes: 1 addition & 1 deletion server/events/delete_lock_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func (l *DefaultDeleteLockCommand) deleteWorkingDir(lock models.ProjectLock) {
l.Logger.Debug("Not deleting the working dir.")
return
}
unlock, err := l.WorkingDirLocker.TryLock(lock.Pull.BaseRepo.FullName, lock.Pull.Num, lock.Workspace)
unlock, err := l.WorkingDirLocker.TryLock(lock.Pull.BaseRepo.FullName, lock.Pull.Num, lock.Workspace, lock.Project.Path)
if err != nil {
l.Logger.Err("unable to obtain working dir lock when trying to delete old plans: %s", err)
} else {
Expand Down
4 changes: 2 additions & 2 deletions server/events/mocks/mock_working_dir_locker.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion server/events/post_workflow_hooks_command_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func (w *DefaultPostWorkflowHooksCommandRunner) RunPostHooks(

log.Debug("post-hooks configured, running...")

unlockFn, err := w.WorkingDirLocker.TryLock(baseRepo.FullName, pull.Num, DefaultWorkspace)
unlockFn, err := w.WorkingDirLocker.TryLock(baseRepo.FullName, pull.Num, DefaultWorkspace, DefaultRepoRelDir)
if err != nil {
return err
}
Expand Down
8 changes: 4 additions & 4 deletions server/events/post_workflow_hooks_command_runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func TestRunPostHooks_Clone(t *testing.T) {

postWh.GlobalCfg = globalCfg

When(postWhWorkingDirLocker.TryLock(fixtures.GithubRepo.FullName, newPull.Num, events.DefaultWorkspace)).ThenReturn(unlockFn, nil)
When(postWhWorkingDirLocker.TryLock(fixtures.GithubRepo.FullName, newPull.Num, events.DefaultWorkspace, events.DefaultRepoRelDir)).ThenReturn(unlockFn, nil)
When(postWhWorkingDir.Clone(log, fixtures.GithubRepo, newPull, events.DefaultWorkspace)).ThenReturn(repoDir, false, nil)
When(whPostWorkflowHookRunner.Run(pCtx, testHook.RunCommand, repoDir)).ThenReturn(result, nil)

Expand Down Expand Up @@ -145,7 +145,7 @@ func TestRunPostHooks_Clone(t *testing.T) {

postWh.GlobalCfg = globalCfg

When(postWhWorkingDirLocker.TryLock(fixtures.GithubRepo.FullName, newPull.Num, events.DefaultWorkspace)).ThenReturn(func() {}, errors.New("some error"))
When(postWhWorkingDirLocker.TryLock(fixtures.GithubRepo.FullName, newPull.Num, events.DefaultWorkspace, events.DefaultRepoRelDir)).ThenReturn(func() {}, errors.New("some error"))

err := postWh.RunPostHooks(ctx)

Expand Down Expand Up @@ -175,7 +175,7 @@ func TestRunPostHooks_Clone(t *testing.T) {

postWh.GlobalCfg = globalCfg

When(postWhWorkingDirLocker.TryLock(fixtures.GithubRepo.FullName, newPull.Num, events.DefaultWorkspace)).ThenReturn(unlockFn, nil)
When(postWhWorkingDirLocker.TryLock(fixtures.GithubRepo.FullName, newPull.Num, events.DefaultWorkspace, events.DefaultRepoRelDir)).ThenReturn(unlockFn, nil)
When(postWhWorkingDir.Clone(log, fixtures.GithubRepo, newPull, events.DefaultWorkspace)).ThenReturn(repoDir, false, errors.New("some error"))

err := postWh.RunPostHooks(ctx)
Expand Down Expand Up @@ -207,7 +207,7 @@ func TestRunPostHooks_Clone(t *testing.T) {

postWh.GlobalCfg = globalCfg

When(postWhWorkingDirLocker.TryLock(fixtures.GithubRepo.FullName, newPull.Num, events.DefaultWorkspace)).ThenReturn(unlockFn, nil)
When(postWhWorkingDirLocker.TryLock(fixtures.GithubRepo.FullName, newPull.Num, events.DefaultWorkspace, events.DefaultRepoRelDir)).ThenReturn(unlockFn, nil)
When(postWhWorkingDir.Clone(log, fixtures.GithubRepo, newPull, events.DefaultWorkspace)).ThenReturn(repoDir, false, nil)
When(whPostWorkflowHookRunner.Run(pCtx, testHook.RunCommand, repoDir)).ThenReturn(result, errors.New("some error"))

Expand Down
2 changes: 1 addition & 1 deletion server/events/pre_workflow_hooks_command_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func (w *DefaultPreWorkflowHooksCommandRunner) RunPreHooks(

log.Debug("pre-hooks configured, running...")

unlockFn, err := w.WorkingDirLocker.TryLock(baseRepo.FullName, pull.Num, DefaultWorkspace)
unlockFn, err := w.WorkingDirLocker.TryLock(baseRepo.FullName, pull.Num, DefaultWorkspace, DefaultRepoRelDir)
if err != nil {
return err
}
Expand Down
8 changes: 4 additions & 4 deletions server/events/pre_workflow_hooks_command_runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func TestRunPreHooks_Clone(t *testing.T) {

preWh.GlobalCfg = globalCfg

When(preWhWorkingDirLocker.TryLock(fixtures.GithubRepo.FullName, newPull.Num, events.DefaultWorkspace)).ThenReturn(unlockFn, nil)
When(preWhWorkingDirLocker.TryLock(fixtures.GithubRepo.FullName, newPull.Num, events.DefaultWorkspace, events.DefaultRepoRelDir)).ThenReturn(unlockFn, nil)
When(preWhWorkingDir.Clone(log, fixtures.GithubRepo, newPull, events.DefaultWorkspace)).ThenReturn(repoDir, false, nil)
When(whPreWorkflowHookRunner.Run(pCtx, testHook.RunCommand, repoDir)).ThenReturn(result, nil)

Expand Down Expand Up @@ -148,7 +148,7 @@ func TestRunPreHooks_Clone(t *testing.T) {

preWh.GlobalCfg = globalCfg

When(preWhWorkingDirLocker.TryLock(fixtures.GithubRepo.FullName, newPull.Num, events.DefaultWorkspace)).ThenReturn(func() {}, errors.New("some error"))
When(preWhWorkingDirLocker.TryLock(fixtures.GithubRepo.FullName, newPull.Num, events.DefaultWorkspace, events.DefaultRepoRelDir)).ThenReturn(func() {}, errors.New("some error"))

err := preWh.RunPreHooks(ctx)

Expand Down Expand Up @@ -178,7 +178,7 @@ func TestRunPreHooks_Clone(t *testing.T) {

preWh.GlobalCfg = globalCfg

When(preWhWorkingDirLocker.TryLock(fixtures.GithubRepo.FullName, newPull.Num, events.DefaultWorkspace)).ThenReturn(unlockFn, nil)
When(preWhWorkingDirLocker.TryLock(fixtures.GithubRepo.FullName, newPull.Num, events.DefaultWorkspace, events.DefaultRepoRelDir)).ThenReturn(unlockFn, nil)
When(preWhWorkingDir.Clone(log, fixtures.GithubRepo, newPull, events.DefaultWorkspace)).ThenReturn(repoDir, false, errors.New("some error"))

err := preWh.RunPreHooks(ctx)
Expand Down Expand Up @@ -210,7 +210,7 @@ func TestRunPreHooks_Clone(t *testing.T) {

preWh.GlobalCfg = globalCfg

When(preWhWorkingDirLocker.TryLock(fixtures.GithubRepo.FullName, newPull.Num, events.DefaultWorkspace)).ThenReturn(unlockFn, nil)
When(preWhWorkingDirLocker.TryLock(fixtures.GithubRepo.FullName, newPull.Num, events.DefaultWorkspace, events.DefaultRepoRelDir)).ThenReturn(unlockFn, nil)
When(preWhWorkingDir.Clone(log, fixtures.GithubRepo, newPull, events.DefaultWorkspace)).ThenReturn(repoDir, false, nil)
When(whPreWorkflowHookRunner.Run(pCtx, testHook.RunCommand, repoDir)).ThenReturn(result, errors.New("some error"))

Expand Down
8 changes: 4 additions & 4 deletions server/events/project_command_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ func (p *DefaultProjectCommandBuilder) buildPlanAllCommands(ctx *command.Context
// Need to lock the workspace we're about to clone to.
workspace := DefaultWorkspace

unlockFn, err := p.WorkingDirLocker.TryLock(ctx.Pull.BaseRepo.FullName, ctx.Pull.Num, workspace)
unlockFn, err := p.WorkingDirLocker.TryLock(ctx.Pull.BaseRepo.FullName, ctx.Pull.Num, workspace, DefaultRepoRelDir)
if err != nil {
ctx.Log.Warn("workspace was locked")
return nil, err
Expand Down Expand Up @@ -342,7 +342,7 @@ func (p *DefaultProjectCommandBuilder) buildProjectPlanCommand(ctx *command.Cont

var pcc []command.ProjectContext
ctx.Log.Debug("building plan command")
unlockFn, err := p.WorkingDirLocker.TryLock(ctx.Pull.BaseRepo.FullName, ctx.Pull.Num, workspace)
unlockFn, err := p.WorkingDirLocker.TryLock(ctx.Pull.BaseRepo.FullName, ctx.Pull.Num, workspace, DefaultRepoRelDir)
if err != nil {
return pcc, err
}
Expand Down Expand Up @@ -478,7 +478,7 @@ func (p *DefaultProjectCommandBuilder) buildProjectApplyCommand(ctx *command.Con
}

var projCtx []command.ProjectContext
unlockFn, err := p.WorkingDirLocker.TryLock(ctx.Pull.BaseRepo.FullName, ctx.Pull.Num, workspace)
unlockFn, err := p.WorkingDirLocker.TryLock(ctx.Pull.BaseRepo.FullName, ctx.Pull.Num, workspace, DefaultRepoRelDir)
if err != nil {
return projCtx, err
}
Expand Down Expand Up @@ -519,7 +519,7 @@ func (p *DefaultProjectCommandBuilder) buildProjectVersionCommand(ctx *command.C
}

var projCtx []command.ProjectContext
unlockFn, err := p.WorkingDirLocker.TryLock(ctx.Pull.BaseRepo.FullName, ctx.Pull.Num, workspace)
unlockFn, err := p.WorkingDirLocker.TryLock(ctx.Pull.BaseRepo.FullName, ctx.Pull.Num, workspace, DefaultRepoRelDir)
if err != nil {
return projCtx, err
}
Expand Down
8 changes: 4 additions & 4 deletions server/events/project_command_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@ func (p *DefaultProjectCommandRunner) doPolicyCheck(ctx command.ProjectContext)
// Acquire internal lock for the directory we're going to operate in.
// We should refactor this to keep the lock for the duration of plan and policy check since as of now
// there is a small gap where we don't have the lock and if we can't get this here, we should just unlock the PR.
unlockFn, err := p.WorkingDirLocker.TryLock(ctx.Pull.BaseRepo.FullName, ctx.Pull.Num, ctx.Workspace)
unlockFn, err := p.WorkingDirLocker.TryLock(ctx.Pull.BaseRepo.FullName, ctx.Pull.Num, ctx.Workspace, ctx.RepoRelDir)
if err != nil {
return nil, "", err
}
Expand Down Expand Up @@ -357,7 +357,7 @@ func (p *DefaultProjectCommandRunner) doPlan(ctx command.ProjectContext) (*model
ctx.Log.Debug("acquired lock for project")

// Acquire internal lock for the directory we're going to operate in.
unlockFn, err := p.WorkingDirLocker.TryLock(ctx.Pull.BaseRepo.FullName, ctx.Pull.Num, ctx.Workspace)
unlockFn, err := p.WorkingDirLocker.TryLock(ctx.Pull.BaseRepo.FullName, ctx.Pull.Num, ctx.Workspace, ctx.RepoRelDir)
if err != nil {
return nil, "", err
}
Expand Down Expand Up @@ -413,7 +413,7 @@ func (p *DefaultProjectCommandRunner) doApply(ctx command.ProjectContext) (apply
}

// Acquire internal lock for the directory we're going to operate in.
unlockFn, err := p.WorkingDirLocker.TryLock(ctx.Pull.BaseRepo.FullName, ctx.Pull.Num, ctx.Workspace)
unlockFn, err := p.WorkingDirLocker.TryLock(ctx.Pull.BaseRepo.FullName, ctx.Pull.Num, ctx.Workspace, ctx.RepoRelDir)
if err != nil {
return "", "", err
}
Expand Down Expand Up @@ -451,7 +451,7 @@ func (p *DefaultProjectCommandRunner) doVersion(ctx command.ProjectContext) (ver
}

// Acquire internal lock for the directory we're going to operate in.
unlockFn, err := p.WorkingDirLocker.TryLock(ctx.Pull.BaseRepo.FullName, ctx.Pull.Num, ctx.Workspace)
unlockFn, err := p.WorkingDirLocker.TryLock(ctx.Pull.BaseRepo.FullName, ctx.Pull.Num, ctx.Workspace, ctx.RepoRelDir)
if err != nil {
return "", "", err
}
Expand Down
22 changes: 11 additions & 11 deletions server/events/working_dir_locker.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,11 @@ import (
// on disk and we haven't written Atlantis (yet) to handle concurrent execution
// within this workspace.
type WorkingDirLocker interface {
// TryLock tries to acquire a lock for this repo, workspace and pull.
// TryLock tries to acquire a lock for this repo, pull, workspace, and path.
// It returns a function that should be used to unlock the workspace and
// an error if the workspace is already locked. The error is expected to
// be printed to the pull request.
TryLock(repoFullName string, pullNum int, workspace string) (func(), error)
TryLock(repoFullName string, pullNum int, workspace string, path string) (func(), error)
// TryLockPull tries to acquire a lock for all the workspaces in this repo
// and pull.
// It returns a function that should be used to unlock the workspace and
Expand Down Expand Up @@ -74,31 +74,31 @@ func (d *DefaultWorkingDirLocker) TryLockPull(repoFullName string, pullNum int)
}, nil
}

func (d *DefaultWorkingDirLocker) TryLock(repoFullName string, pullNum int, workspace string) (func(), error) {
func (d *DefaultWorkingDirLocker) TryLock(repoFullName string, pullNum int, workspace string, path string) (func(), error) {
d.mutex.Lock()
defer d.mutex.Unlock()

pullKey := d.pullKey(repoFullName, pullNum)
workspaceKey := d.workspaceKey(repoFullName, pullNum, workspace)
workspaceKey := d.workspaceKey(repoFullName, pullNum, workspace, path)
for _, l := range d.locks {
if l == pullKey || l == workspaceKey {
return func() {}, fmt.Errorf("The %s workspace is currently locked by another"+
return func() {}, fmt.Errorf("The %s workspace at path %s is currently locked by another"+
" command that is running for this pull request.\n"+
"Wait until the previous command is complete and try again.", workspace)
"Wait until the previous command is complete and try again.", workspace, path)
}
}
d.locks = append(d.locks, workspaceKey)
return func() {
d.unlock(repoFullName, pullNum, workspace)
d.unlock(repoFullName, pullNum, workspace, path)
}, nil
}

// Unlock unlocks the workspace for this pull.
func (d *DefaultWorkingDirLocker) unlock(repoFullName string, pullNum int, workspace string) {
func (d *DefaultWorkingDirLocker) unlock(repoFullName string, pullNum int, workspace string, path string) {
d.mutex.Lock()
defer d.mutex.Unlock()

workspaceKey := d.workspaceKey(repoFullName, pullNum, workspace)
workspaceKey := d.workspaceKey(repoFullName, pullNum, workspace, path)
d.removeLock(workspaceKey)
}

Expand All @@ -121,8 +121,8 @@ func (d *DefaultWorkingDirLocker) removeLock(key string) {
d.locks = newLocks
}

func (d *DefaultWorkingDirLocker) workspaceKey(repo string, pull int, workspace string) string {
return fmt.Sprintf("%s/%s", d.pullKey(repo, pull), workspace)
func (d *DefaultWorkingDirLocker) workspaceKey(repo string, pull int, workspace string, path string) string {
return fmt.Sprintf("%s/%s/%s", d.pullKey(repo, pull), workspace, path)
}

func (d *DefaultWorkingDirLocker) pullKey(repo string, pull int) string {
Expand Down
Loading

0 comments on commit 5288389

Please sign in to comment.