From 4caaa9b34d5052e4f9df36d7c15e1b4d6f30b005 Mon Sep 17 00:00:00 2001 From: Andrei Vydrin Date: Tue, 19 Mar 2024 16:04:52 +0700 Subject: [PATCH] feat: lock before plan --- Dockerfile | 4 +- .../events/events_controller_e2e_test.go | 1 + server/events/command_runner_test.go | 1 + server/events/plan_command_runner.go | 108 ++++++++++++++++-- server/server.go | 1 + 5 files changed, 103 insertions(+), 12 deletions(-) diff --git a/Dockerfile b/Dockerfile index 992e494900..48ef9cf845 100644 --- a/Dockerfile +++ b/Dockerfile @@ -10,7 +10,7 @@ ARG DEFAULT_CONFTEST_VERSION=0.46.0 # Stage 1: build artifact and download deps -FROM golang:1.22.0-alpine AS builder +FROM golang:1.22.1-alpine AS builder ARG ATLANTIS_VERSION=dev ENV ATLANTIS_VERSION=${ATLANTIS_VERSION} @@ -159,7 +159,7 @@ COPY docker-entrypoint.sh /usr/local/bin/docker-entrypoint.sh # Install packages needed to run Atlantis. # We place this last as it will bust less docker layer caches when packages update RUN apk add --no-cache \ - ca-certificates~=20230506 \ + ca-certificates \ curl~=8 \ git~=2 \ unzip~=6 \ diff --git a/server/controllers/events/events_controller_e2e_test.go b/server/controllers/events/events_controller_e2e_test.go index aaf7f8d8ab..f1a8c233de 100644 --- a/server/controllers/events/events_controller_e2e_test.go +++ b/server/controllers/events/events_controller_e2e_test.go @@ -1517,6 +1517,7 @@ func setupE2E(t *testing.T, repoDir string, opt setupOption) (events_controllers lockingClient, discardApprovalOnPlan, e2ePullReqStatusFetcher, + projectLocker, ) applyCommandRunner := events.NewApplyCommandRunner( diff --git a/server/events/command_runner_test.go b/server/events/command_runner_test.go index d9c8451570..5ab8c4c8a4 100644 --- a/server/events/command_runner_test.go +++ b/server/events/command_runner_test.go @@ -163,6 +163,7 @@ func setup(t *testing.T, options ...func(testConfig *TestConfig)) *vcsmocks.Mock lockingLocker, testConfig.discardApprovalOnPlan, pullReqStatusFetcher, + nil, ) applyCommandRunner = events.NewApplyCommandRunner( diff --git a/server/events/plan_command_runner.go b/server/events/plan_command_runner.go index 044a594233..e6cf93f8ff 100644 --- a/server/events/plan_command_runner.go +++ b/server/events/plan_command_runner.go @@ -1,10 +1,12 @@ package events import ( + "github.com/pkg/errors" "github.com/runatlantis/atlantis/server/core/locking" "github.com/runatlantis/atlantis/server/events/command" "github.com/runatlantis/atlantis/server/events/models" "github.com/runatlantis/atlantis/server/events/vcs" + "sync" ) func NewPlanCommandRunner( @@ -26,6 +28,7 @@ func NewPlanCommandRunner( lockingLocker locking.Locker, discardApprovalOnPlan bool, pullReqStatusFetcher vcs.PullReqStatusFetcher, + projectLocker ProjectLocker, ) *PlanCommandRunner { return &PlanCommandRunner{ silenceVCSStatusNoPlans: silenceVCSStatusNoPlans, @@ -46,6 +49,7 @@ func NewPlanCommandRunner( lockingLocker: lockingLocker, DiscardApprovalOnPlan: discardApprovalOnPlan, pullReqStatusFetcher: pullReqStatusFetcher, + projectLocker: projectLocker, } } @@ -72,6 +76,8 @@ type PlanCommandRunner struct { parallelPoolSize int pullStatusFetcher PullStatusFetcher lockingLocker locking.Locker + projectLocker ProjectLocker + mtx sync.Mutex // DiscardApprovalOnPlan controls if all already existing approvals should be removed/dismissed before executing // a plan. DiscardApprovalOnPlan bool @@ -126,13 +132,54 @@ func (p *PlanCommandRunner) runAutoplan(ctx *command.Context) { ctx.Log.Err("deleting locks: %s", err) } - // Only run commands in parallel if enabled + var projectResults []command.ProjectResult + if p.projectLocker != nil { + p.mtx.Lock() + for _, pctx := range projectCmds { + lockResult := command.ProjectResult{ + Command: command.Plan, + PlanSuccess: nil, + Error: nil, + Failure: "", + RepoRelDir: pctx.RepoRelDir, + Workspace: pctx.Workspace, + ProjectName: pctx.ProjectName, + } + + // Lock the project + lockResponse, err := p.projectLocker.TryLock(pctx.Log, pctx.Pull, pctx.User, pctx.Workspace, models.NewProject(pctx.Pull.BaseRepo.FullName, pctx.RepoRelDir, pctx.ProjectName), pctx.RepoLocking) + if err != nil { + pctx.Log.Err("locking project: %s", err) + lockResult.Error = errors.Wrap(err, "acquiring lock") + } else { + lockResult.Failure = lockResponse.LockFailureReason + } + if lockResult.Error != nil || lockResult.Failure != "" { + projectResults = append(projectResults, lockResult) + } + } + p.mtx.Unlock() + } + var result command.Result - if p.isParallelEnabled(projectCmds) { - ctx.Log.Info("Running plans in parallel") - result = runProjectCmdsParallelGroups(ctx, projectCmds, p.prjCmdRunner.Plan, p.parallelPoolSize) + + if len(projectResults) > 0 { + result = command.Result{ + ProjectResults: projectResults, + } + + _, err = p.lockingLocker.UnlockByPull(baseRepo.FullName, pull.Num) + if err != nil { + ctx.Log.Err("deleting locks: %s", err) + } } else { - result = runProjectCmds(projectCmds, p.prjCmdRunner.Plan) + // Only run commands in parallel if enabled + if p.isParallelEnabled(projectCmds) { + ctx.Log.Info("Running plans in parallel") + result = runProjectCmdsParallelGroups(ctx, projectCmds, p.prjCmdRunner.Plan, p.parallelPoolSize) + } else { + result = runProjectCmds(projectCmds, p.prjCmdRunner.Plan) + } } if p.autoMerger.automergeEnabled(projectCmds) && result.HasErrors() { @@ -253,13 +300,54 @@ func (p *PlanCommandRunner) run(ctx *command.Context, cmd *CommentCommand) { } } - // Only run commands in parallel if enabled + var projectResults []command.ProjectResult + if p.projectLocker != nil { + p.mtx.Lock() + for _, pctx := range projectCmds { + lockResult := command.ProjectResult{ + Command: command.Plan, + PlanSuccess: nil, + Error: nil, + Failure: "", + RepoRelDir: pctx.RepoRelDir, + Workspace: pctx.Workspace, + ProjectName: pctx.ProjectName, + } + + // Lock the project + lockResponse, err := p.projectLocker.TryLock(pctx.Log, pctx.Pull, pctx.User, pctx.Workspace, models.NewProject(pctx.Pull.BaseRepo.FullName, pctx.RepoRelDir, pctx.ProjectName), pctx.RepoLocking) + if err != nil { + pctx.Log.Err("locking project: %s", err) + lockResult.Error = errors.Wrap(err, "acquiring lock") + } else { + lockResult.Failure = lockResponse.LockFailureReason + } + if lockResult.Error != nil || lockResult.Failure != "" { + projectResults = append(projectResults, lockResult) + } + } + p.mtx.Unlock() + } + var result command.Result - if p.isParallelEnabled(projectCmds) { - ctx.Log.Info("Running plans in parallel") - result = runProjectCmdsParallelGroups(ctx, projectCmds, p.prjCmdRunner.Plan, p.parallelPoolSize) + + if len(projectResults) > 0 { + result = command.Result{ + ProjectResults: projectResults, + } + + _, err = p.lockingLocker.UnlockByPull(baseRepo.FullName, pull.Num) + if err != nil { + ctx.Log.Err("deleting locks: %s", err) + } } else { - result = runProjectCmds(projectCmds, p.prjCmdRunner.Plan) + // Only run commands in parallel if enabled + if p.isParallelEnabled(projectCmds) { + ctx.Log.Info("Running plans in parallel") + result = runProjectCmdsParallelGroups(ctx, projectCmds, p.prjCmdRunner.Plan, p.parallelPoolSize) + } else { + result = runProjectCmds(projectCmds, p.prjCmdRunner.Plan) + } } if p.autoMerger.automergeEnabled(projectCmds) && result.HasErrors() { diff --git a/server/server.go b/server/server.go index eeab9d732e..cf1b5f56b6 100644 --- a/server/server.go +++ b/server/server.go @@ -715,6 +715,7 @@ func NewServer(userConfig UserConfig, config Config) (*Server, error) { lockingClient, userConfig.DiscardApprovalOnPlanFlag, pullReqStatusFetcher, + projectLocker, ) applyCommandRunner := events.NewApplyCommandRunner(