From a44c9661ab3f00148ba25f403b9ebf087cf86f2e Mon Sep 17 00:00:00 2001 From: Isata Sankoh <85632667+isatasan@users.noreply.github.com> Date: Tue, 17 Aug 2021 13:49:46 -0400 Subject: [PATCH] Channel Refactoring (#91) * Began channel refactoring * Fixing refactoring logic * Replacing terraformOutputChan * Making suggested changes * Changing attribute declarations * Fixed channel logic * Formatting * Changing tests to utilize mocks * Adding go files * Adding suggested changes * Suggested changes before merging * Reformatting * Deleting unnecesary lines * Buffer cleanup logic * Buffer cleanup & Testing * Trying to make test pass * Suggested changes * Fixing test * Fixing failing test * Error checking * Comment improvements * Adding buffer clearing for apply workflow * Removing clearing for apply workflow --- go.mod | 4 + go.sum | 9 + .../events/events_controller_e2e_test.go | 18 +- server/controllers/logstreaming_controller.go | 120 ++-------- .../logstreaming_controller_test.go | 31 +-- .../controllers_websocketresponsewriter.go | 33 --- .../mocks/matchers/ptr_to_websocket_conn.go | 33 --- server/events/models/models.go | 2 +- server/events/project_command_runner.go | 5 +- server/events/project_command_runner_test.go | 20 +- server/events/terraform/terraform_client.go | 25 +-- .../terraform_client_internal_test.go | 52 ++--- .../events/terraform/terraform_client_test.go | 55 ++--- .../handlers_websocketresponsewriter.go | 33 +++ .../mocks/matchers/http_header.go | 0 .../mocks/matchers/http_responsewriter.go | 0 .../matchers/models_projectcommandcontext.go | 33 +++ .../mocks/matchers/ptr_to_http_request.go | 0 .../mocks/matchers/slice_of_byte.go | 0 .../mock_project_command_output_handler.go | 208 ++++++++++++++++++ .../mocks/mock_websocket_handler.go | 12 +- .../mocks/mock_websocket_response_writer.go | 2 +- .../project_command_output_handler.go | 136 ++++++++++++ .../project_command_output_handler_test.go | 140 ++++++++++++ server/handlers/websocket_handler.go | 34 +++ server/server.go | 28 ++- 26 files changed, 716 insertions(+), 317 deletions(-) delete mode 100644 server/controllers/mocks/matchers/controllers_websocketresponsewriter.go delete mode 100644 server/controllers/mocks/matchers/ptr_to_websocket_conn.go create mode 100644 server/handlers/mocks/matchers/handlers_websocketresponsewriter.go rename server/{controllers => handlers}/mocks/matchers/http_header.go (100%) rename server/{controllers => handlers}/mocks/matchers/http_responsewriter.go (100%) create mode 100644 server/handlers/mocks/matchers/models_projectcommandcontext.go rename server/{controllers => handlers}/mocks/matchers/ptr_to_http_request.go (100%) rename server/{controllers => handlers}/mocks/matchers/slice_of_byte.go (100%) create mode 100644 server/handlers/mocks/mock_project_command_output_handler.go rename server/{controllers => handlers}/mocks/mock_websocket_handler.go (89%) rename server/{controllers => handlers}/mocks/mock_websocket_response_writer.go (98%) create mode 100644 server/handlers/project_command_output_handler.go create mode 100644 server/handlers/project_command_output_handler_test.go create mode 100644 server/handlers/websocket_handler.go diff --git a/go.mod b/go.mod index d446249bab..190250b454 100644 --- a/go.mod +++ b/go.mod @@ -8,6 +8,8 @@ require ( github.com/Laisky/graphql v1.0.5 github.com/Masterminds/sprig/v3 v3.2.2 github.com/agext/levenshtein v1.2.3 // indirect + github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 // indirect + github.com/alecthomas/units v0.0.0-20210208195552-ff826a37aa15 // indirect github.com/asaskevich/govalidator v0.0.0-20190424111038-f61b66f89f4a // indirect github.com/aws/aws-sdk-go v1.31.15 // indirect github.com/bradleyfalzon/ghinstallation v1.1.1 @@ -44,6 +46,7 @@ require ( github.com/nlopes/slack v0.4.0 github.com/petergtz/pegomock v2.9.0+incompatible github.com/pkg/errors v0.9.1 + github.com/posener/wstest v1.2.0 github.com/remeh/sizedwaitgroup v1.0.0 github.com/shurcooL/githubv4 v0.0.0-20191127044304-8f68eb5628d0 github.com/shurcooL/graphql v0.0.0-20181231061246-d48a9a75455f // indirect @@ -59,6 +62,7 @@ require ( go.etcd.io/bbolt v1.3.6 go.uber.org/zap v1.17.0 golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9 + gopkg.in/alecthomas/kingpin.v2 v2.2.6 // indirect gopkg.in/go-playground/assert.v1 v1.2.1 // indirect gopkg.in/go-playground/validator.v9 v9.31.0 gopkg.in/yaml.v2 v2.4.0 diff --git a/go.sum b/go.sum index a3bf220b6b..2b76b39b5d 100644 --- a/go.sum +++ b/go.sum @@ -52,6 +52,10 @@ github.com/agext/levenshtein v1.2.1/go.mod h1:JEDfjyjHDjOF/1e4FlBE/PkbqA9OfWu2ki github.com/agext/levenshtein v1.2.2/go.mod h1:JEDfjyjHDjOF/1e4FlBE/PkbqA9OfWu2ki2W0IB5558= github.com/agext/levenshtein v1.2.3 h1:YB2fHEn0UJagG8T1rrWknE3ZQzWM06O8AMAatNn7lmo= github.com/agext/levenshtein v1.2.3/go.mod h1:JEDfjyjHDjOF/1e4FlBE/PkbqA9OfWu2ki2W0IB5558= +github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 h1:JYp7IbQjafoB+tBA3gMyHYHrpOtNuDiK/uB5uXxq5wM= +github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= +github.com/alecthomas/units v0.0.0-20210208195552-ff826a37aa15 h1:AUNCr9CiJuwrRYS3XieqF+Z9B9gNxo/eANAJCF2eiN4= +github.com/alecthomas/units v0.0.0-20210208195552-ff826a37aa15/go.mod h1:OMCwj8VM1Kc9e19TLln2VL61YJF0x1XFtfdL4JdbSyE= github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY= github.com/apparentlymart/go-dump v0.0.0-20180507223929-23540a00eaa3/go.mod h1:oL81AME2rN47vu18xqj1S1jPIPuN7afo62yKTNn3XMM= github.com/apparentlymart/go-textseg v1.0.0 h1:rRmlIsPEEhUTIKQb7T++Nz/A5Q6C9IuX2wFoYVvnCs0= @@ -204,6 +208,7 @@ github.com/gorilla/css v1.0.0 h1:BQqNyPTi50JCFMTw/b67hByjMVXZRwGha6wxVGkeihY= github.com/gorilla/css v1.0.0/go.mod h1:Dn721qIggHpt4+EFCcTLTU/vk5ySda2ReITrtgBl60c= github.com/gorilla/mux v1.8.0 h1:i40aqfkR1h2SlN9hojwV5ZA91wcXFOvkdNIeFDP5koI= github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So= +github.com/gorilla/websocket v1.4.1/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc= github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/graph-gophers/graphql-go v0.0.0-20200309224638-dae41bde9ef9/go.mod h1:9CQHMSxwO4MprSdzoIEobiHpoLtHm77vfxsvsIN5Vuc= @@ -352,6 +357,8 @@ github.com/pkg/sftp v1.10.1/go.mod h1:lYOWFsE0bwd1+KfKJaKeuokY15vzFx25BLbzYYoAxZ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/posener/complete v1.1.1/go.mod h1:em0nMJCgc9GFtwrmVmEMR/ZL6WyhyjMBndrE9hABlRI= +github.com/posener/wstest v1.2.0 h1:PAY0cRybxOjh0yqSDCrlAGUwtx+GNKpuUfid/08pv48= +github.com/posener/wstest v1.2.0/go.mod h1:GkplCx9zskpudjrMp23LyZHrSonab0aZzh2x0ACGRbU= github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= github.com/remeh/sizedwaitgroup v1.0.0 h1:VNGGFwNo/R5+MJBf6yrsr110p0m4/OX4S3DCy7Kyl5E= github.com/remeh/sizedwaitgroup v1.0.0/go.mod h1:3j2R4OIe/SeS6YDhICBy22RWjJC5eNCJ1V+9+NVNYlo= @@ -761,6 +768,8 @@ google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp0 google.golang.org/protobuf v1.26.0 h1:bxAC2xTBsZGibn2RTntX0oH50xLsqy1OxA9tTL3p/lk= google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= gopkg.in/airbrake/gobrake.v2 v2.0.9/go.mod h1:/h5ZAUhDkGaJfjzjKLSjv6zCL6O0LLBxU4K+aSYdM/U= +gopkg.in/alecthomas/kingpin.v2 v2.2.6 h1:jMFz6MfLP0/4fUyZle81rXUoxOBFi19VUFKVDOQfozc= +gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= diff --git a/server/controllers/events/events_controller_e2e_test.go b/server/controllers/events/events_controller_e2e_test.go index 7a85269a6e..47bb650a89 100644 --- a/server/controllers/events/events_controller_e2e_test.go +++ b/server/controllers/events/events_controller_e2e_test.go @@ -35,6 +35,7 @@ import ( "github.com/runatlantis/atlantis/server/events/webhooks" "github.com/runatlantis/atlantis/server/events/yaml" "github.com/runatlantis/atlantis/server/events/yaml/valid" + handlermocks "github.com/runatlantis/atlantis/server/handlers/mocks" "github.com/runatlantis/atlantis/server/logging" . "github.com/runatlantis/atlantis/testing" ) @@ -666,7 +667,7 @@ func setupE2E(t *testing.T, repoDir string) (events_controllers.VCSEventsControl e2eStatusUpdater := &events.DefaultCommitStatusUpdater{Client: e2eVCSClient} e2eGithubGetter := mocks.NewMockGithubPullGetter() e2eGitlabGetter := mocks.NewMockGitlabMergeRequestGetter() - tempchan := make(chan *models.TerraformOutputLine) + projectCmdOutputHandler := handlermocks.NewMockProjectCommandOutputHandler() // Real dependencies. logger := logging.NewNoopLogger(t) @@ -681,7 +682,7 @@ func setupE2E(t *testing.T, repoDir string) (events_controllers.VCSEventsControl GithubUser: "github-user", GitlabUser: "gitlab-user", } - terraformClient, err := terraform.NewClient(logger, binDir, cacheDir, "", "", "", "default-tf-version", "https://releases.hashicorp.com", &NoopTFDownloader{}, false, tempchan) + terraformClient, err := terraform.NewClient(logger, binDir, cacheDir, "", "", "", "default-tf-version", "https://releases.hashicorp.com", &NoopTFDownloader{}, false, projectCmdOutputHandler) Ok(t, err) boltdb, err := db.New(dataDir) Ok(t, err) @@ -789,14 +790,14 @@ func setupE2E(t *testing.T, repoDir string) (events_controllers.VCSEventsControl TerraformExecutor: terraformClient, DefaultTFVersion: defaultTFVersion, }, - WorkingDir: workingDir, - Webhooks: &mockWebhookSender{}, - WorkingDirLocker: locker, + WorkingDir: workingDir, + Webhooks: &mockWebhookSender{}, + WorkingDirLocker: locker, + ProjectCmdOutputHandler: projectCmdOutputHandler, AggregateApplyRequirements: &events.AggregateApplyRequirements{ PullApprovedChecker: e2eVCSClient, WorkingDir: workingDir, }, - TerraformOutputChan: tempchan, } dbUpdater := &events.DBUpdater{ @@ -898,11 +899,6 @@ func setupE2E(t *testing.T, repoDir string) (events_controllers.VCSEventsControl repoAllowlistChecker, err := events.NewRepoAllowlistChecker("*") Ok(t, err) - go func() { - for range tempchan { - } - }() - ctrl := events_controllers.VCSEventsController{ TestingMode: true, CommandRunner: commandRunner, diff --git a/server/controllers/logstreaming_controller.go b/server/controllers/logstreaming_controller.go index 6184dddd2d..45a5907b3c 100644 --- a/server/controllers/logstreaming_controller.go +++ b/server/controllers/logstreaming_controller.go @@ -4,7 +4,6 @@ import ( "fmt" "net/http" "net/url" - "sync" "strconv" @@ -13,35 +12,10 @@ import ( "github.com/runatlantis/atlantis/server/controllers/templates" "github.com/runatlantis/atlantis/server/events/db" "github.com/runatlantis/atlantis/server/events/models" + "github.com/runatlantis/atlantis/server/handlers" "github.com/runatlantis/atlantis/server/logging" ) -//go:generate pegomock generate -m --use-experimental-model-gen --package mocks -o mocks/mock_websocket_handler.go WebsocketHandler - -type WebsocketHandler interface { - Upgrade(w http.ResponseWriter, r *http.Request, responseHeader http.Header) (WebsocketResponseWriter, error) -} - -//go:generate pegomock generate -m --use-experimental-model-gen --package mocks -o mocks/mock_websocket_response_writer.go WebsocketResponseWriter -type WebsocketResponseWriter interface { - WriteMessage(messageType int, data []byte) error - Close() error -} - -type DefaultWebsocketHandler struct { - handler websocket.Upgrader -} - -func NewWebsocketHandler() WebsocketHandler { - return &DefaultWebsocketHandler{ - handler: websocket.Upgrader{}, - } -} - -func (wh *DefaultWebsocketHandler) Upgrade(w http.ResponseWriter, r *http.Request, responseHeader http.Header) (WebsocketResponseWriter, error) { - return wh.handler.Upgrade(w, r, responseHeader) -} - type LogStreamingController struct { AtlantisVersion string AtlantisURL *url.URL @@ -49,12 +23,9 @@ type LogStreamingController struct { LogStreamTemplate templates.TemplateWriter LogStreamErrorTemplate templates.TemplateWriter Db *db.BoltDB - TerraformOutputChan chan *models.TerraformOutputLine - logBuffers map[string][]string - wsChans map[string]map[chan string]bool - chanLock sync.RWMutex - WebsocketHandler WebsocketHandler + WebsocketHandler handlers.WebsocketHandler + ProjectCommandOutputHandler handlers.ProjectCommandOutputHandler } type PullInfo struct { @@ -64,74 +35,6 @@ type PullInfo struct { ProjectName string } -//Add channel to get Terraform output for current PR project -//Send output currently in buffer -func (j *LogStreamingController) addChan(pull string) chan string { - ch := make(chan string, 1000) - j.chanLock.Lock() - for _, line := range j.logBuffers[pull] { - ch <- line - } - if j.wsChans == nil { - j.wsChans = map[string]map[chan string]bool{} - } - if j.wsChans[pull] == nil { - j.wsChans[pull] = map[chan string]bool{} - } - j.wsChans[pull][ch] = true - j.chanLock.Unlock() - return ch -} - -//Remove channel, so client no longer receives Terraform output -func (j *LogStreamingController) removeChan(pull string, ch chan string) { - j.chanLock.Lock() - delete(j.wsChans[pull], ch) - j.chanLock.Unlock() -} - -//Add log line to buffer and send to all current channels -func (j *LogStreamingController) writeLogLine(pull string, line string) { - j.chanLock.Lock() - if j.logBuffers == nil { - j.logBuffers = map[string][]string{} - } - j.Logger.Info("Project info: %s, content: %s", pull, line) - - for ch := range j.wsChans[pull] { - select { - case ch <- line: - default: - delete(j.wsChans[pull], ch) - } - } - if j.logBuffers[pull] == nil { - j.logBuffers[pull] = []string{} - } - j.logBuffers[pull] = append(j.logBuffers[pull], line) - j.chanLock.Unlock() -} - -//Clear log lines in buffer -func (j *LogStreamingController) clearLogLines(pull string) { - j.chanLock.Lock() - delete(j.logBuffers, pull) - j.chanLock.Unlock() -} - -func (j *LogStreamingController) Listen() { - for msg := range j.TerraformOutputChan { - j.Logger.Info("Recieving message %s", msg.Line) - if msg.ClearBuffBefore { - j.clearLogLines(msg.ProjectInfo) - } - j.writeLogLine(msg.ProjectInfo, msg.Line) - if msg.ClearBuffAfter { - j.clearLogLines(msg.ProjectInfo) - } - } -} - func (p *PullInfo) String() string { return fmt.Sprintf("%s/%s/%d/%s", p.Org, p.Repo, p.Pull, p.ProjectName) } @@ -223,16 +126,21 @@ func (j *LogStreamingController) GetLogStreamWS(w http.ResponseWriter, r *http.R defer c.Close() pull := pullInfo.String() - ch := j.addChan(pull) - defer j.removeChan(pull, ch) - for msg := range ch { - j.Logger.Info(msg) + err = j.ProjectCommandOutputHandler.Receive(pull, func(msg string) error { if err := c.WriteMessage(websocket.BinaryMessage, []byte(msg+"\r\n\t")); err != nil { j.Logger.Warn("Failed to write ws message: %s", err) - return + return err } + return nil + }) + + if err != nil { + j.Logger.Warn("Failed to receive message: %s", err) + j.respond(w, logging.Error, http.StatusInternalServerError, err.Error()) + return } + } func (j *LogStreamingController) respond(w http.ResponseWriter, lvl logging.LogLevel, responseCode int, format string, args ...interface{}) { @@ -243,7 +151,7 @@ func (j *LogStreamingController) respond(w http.ResponseWriter, lvl logging.LogL } //repo, pull num, project name moved to db -func (j *LogStreamingController) RetrievePrStatus(pullInfo *PullInfo) (bool, error) { //either implement new func in boltdb +func (j *LogStreamingController) RetrievePrStatus(pullInfo *PullInfo) (bool, error) { pull := models.PullRequest{ Num: pullInfo.Pull, BaseRepo: models.Repo{ diff --git a/server/controllers/logstreaming_controller_test.go b/server/controllers/logstreaming_controller_test.go index a046f41342..3f10cca707 100644 --- a/server/controllers/logstreaming_controller_test.go +++ b/server/controllers/logstreaming_controller_test.go @@ -4,23 +4,21 @@ import ( "net/http" "net/http/httptest" "testing" - "time" "github.com/gorilla/mux" "github.com/runatlantis/atlantis/server/controllers" - "github.com/runatlantis/atlantis/server/events/models" "github.com/runatlantis/atlantis/server/logging" . "github.com/petergtz/pegomock" - "github.com/runatlantis/atlantis/server/controllers/mocks" - "github.com/runatlantis/atlantis/server/controllers/mocks/matchers" + "github.com/runatlantis/atlantis/server/handlers/mocks" + "github.com/runatlantis/atlantis/server/handlers/mocks/matchers" ) func TestGetLogStream_WebSockets(t *testing.T) { t.Run("Should Group by Project Info", func(t *testing.T) { RegisterMockTestingT(t) - tempchan := make(chan *models.TerraformOutputLine) websocketMock := mocks.NewMockWebsocketHandler() + projectOutputHandler := mocks.NewMockProjectCommandOutputHandler() logger := logging.NewNoopLogger(t) websocketWriterMock := mocks.NewMockWebsocketResponseWriter() params := map[string]string{ @@ -33,29 +31,14 @@ func TestGetLogStream_WebSockets(t *testing.T) { request = mux.SetURLVars(request, params) response := httptest.NewRecorder() logStreamingController := &controllers.LogStreamingController{ - Logger: logger, - TerraformOutputChan: tempchan, - WebsocketHandler: websocketMock, + Logger: logger, + WebsocketHandler: websocketMock, + ProjectCommandOutputHandler: projectOutputHandler, } When(websocketMock.Upgrade(matchers.AnyHttpResponseWriter(), matchers.AnyPtrToHttpRequest(), matchers.AnyHttpHeader())).ThenReturn(websocketWriterMock, nil) - go func() { - tempchan <- &models.TerraformOutputLine{ - ProjectInfo: "test-org/test-repo/1/test-project", - Line: "Test Terraform Output", - } - }() - - go func() { - logStreamingController.Listen() - }() - - go func() { - logStreamingController.GetLogStreamWS(response, request) - }() - - time.Sleep(1 * time.Second) + logStreamingController.GetLogStreamWS(response, request) websocketWriterMock.VerifyWasCalled(Once()) }) diff --git a/server/controllers/mocks/matchers/controllers_websocketresponsewriter.go b/server/controllers/mocks/matchers/controllers_websocketresponsewriter.go deleted file mode 100644 index 313c0eb127..0000000000 --- a/server/controllers/mocks/matchers/controllers_websocketresponsewriter.go +++ /dev/null @@ -1,33 +0,0 @@ -// Code generated by pegomock. DO NOT EDIT. -package matchers - -import ( - "github.com/petergtz/pegomock" - "reflect" - - controllers "github.com/runatlantis/atlantis/server/controllers" -) - -func AnyControllersWebsocketResponseWriter() controllers.WebsocketResponseWriter { - pegomock.RegisterMatcher(pegomock.NewAnyMatcher(reflect.TypeOf((*(controllers.WebsocketResponseWriter))(nil)).Elem())) - var nullValue controllers.WebsocketResponseWriter - return nullValue -} - -func EqControllersWebsocketResponseWriter(value controllers.WebsocketResponseWriter) controllers.WebsocketResponseWriter { - pegomock.RegisterMatcher(&pegomock.EqMatcher{Value: value}) - var nullValue controllers.WebsocketResponseWriter - return nullValue -} - -func NotEqControllersWebsocketResponseWriter(value controllers.WebsocketResponseWriter) controllers.WebsocketResponseWriter { - pegomock.RegisterMatcher(&pegomock.NotEqMatcher{Value: value}) - var nullValue controllers.WebsocketResponseWriter - return nullValue -} - -func ControllersWebsocketResponseWriterThat(matcher pegomock.ArgumentMatcher) controllers.WebsocketResponseWriter { - pegomock.RegisterMatcher(matcher) - var nullValue controllers.WebsocketResponseWriter - return nullValue -} diff --git a/server/controllers/mocks/matchers/ptr_to_websocket_conn.go b/server/controllers/mocks/matchers/ptr_to_websocket_conn.go deleted file mode 100644 index 97d2df6208..0000000000 --- a/server/controllers/mocks/matchers/ptr_to_websocket_conn.go +++ /dev/null @@ -1,33 +0,0 @@ -// Code generated by pegomock. DO NOT EDIT. -package matchers - -import ( - "github.com/petergtz/pegomock" - "reflect" - - websocket "github.com/gorilla/websocket" -) - -func AnyPtrToWebsocketConn() *websocket.Conn { - pegomock.RegisterMatcher(pegomock.NewAnyMatcher(reflect.TypeOf((*(*websocket.Conn))(nil)).Elem())) - var nullValue *websocket.Conn - return nullValue -} - -func EqPtrToWebsocketConn(value *websocket.Conn) *websocket.Conn { - pegomock.RegisterMatcher(&pegomock.EqMatcher{Value: value}) - var nullValue *websocket.Conn - return nullValue -} - -func NotEqPtrToWebsocketConn(value *websocket.Conn) *websocket.Conn { - pegomock.RegisterMatcher(&pegomock.NotEqMatcher{Value: value}) - var nullValue *websocket.Conn - return nullValue -} - -func PtrToWebsocketConnThat(matcher pegomock.ArgumentMatcher) *websocket.Conn { - pegomock.RegisterMatcher(matcher) - var nullValue *websocket.Conn - return nullValue -} diff --git a/server/events/models/models.go b/server/events/models/models.go index d7f6d7e0af..d9823dff1a 100644 --- a/server/events/models/models.go +++ b/server/events/models/models.go @@ -661,7 +661,7 @@ func (c CommandName) TitleString() string { return strings.Title(strings.ReplaceAll(strings.ToLower(c.String()), "_", " ")) } -type TerraformOutputLine struct { +type ProjectCmdOutputLine struct { ProjectInfo string Line string diff --git a/server/events/project_command_runner.go b/server/events/project_command_runner.go index f2d6a8eab9..2107c4ddef 100644 --- a/server/events/project_command_runner.go +++ b/server/events/project_command_runner.go @@ -23,6 +23,7 @@ import ( "github.com/runatlantis/atlantis/server/events/models" "github.com/runatlantis/atlantis/server/events/webhooks" "github.com/runatlantis/atlantis/server/events/yaml/valid" + "github.com/runatlantis/atlantis/server/handlers" "github.com/runatlantis/atlantis/server/logging" ) @@ -123,7 +124,8 @@ type DefaultProjectCommandRunner struct { Webhooks WebhooksSender WorkingDirLocker WorkingDirLocker AggregateApplyRequirements ApplyRequirement - TerraformOutputChan chan<- *models.TerraformOutputLine + ProjectCmdOutputLine models.ProjectCmdOutputLine + ProjectCmdOutputHandler handlers.ProjectCommandOutputHandler LogStreamURLGenerator LogStreamURLGenerator } @@ -294,6 +296,7 @@ func (p *DefaultProjectCommandRunner) doPlan(ctx models.ProjectCommandContext) ( return nil, "", DirNotExistErr{RepoRelDir: ctx.RepoRelDir} } + p.ProjectCmdOutputHandler.Clear(ctx) outputs, err := p.runSteps(ctx.Steps, ctx, projAbsPath) if err != nil { diff --git a/server/events/project_command_runner_test.go b/server/events/project_command_runner_test.go index 80349f5ae4..bd9f440369 100644 --- a/server/events/project_command_runner_test.go +++ b/server/events/project_command_runner_test.go @@ -27,6 +27,7 @@ import ( mocks2 "github.com/runatlantis/atlantis/server/events/runtime/mocks" tmocks "github.com/runatlantis/atlantis/server/events/terraform/mocks" "github.com/runatlantis/atlantis/server/events/yaml/valid" + handlermocks "github.com/runatlantis/atlantis/server/handlers/mocks" "github.com/runatlantis/atlantis/server/logging" . "github.com/runatlantis/atlantis/testing" ) @@ -41,7 +42,7 @@ func TestDefaultProjectCommandRunner_Plan(t *testing.T) { realEnv := runtime.EnvStepRunner{} mockWorkingDir := mocks.NewMockWorkingDir() mockLocker := mocks.NewMockProjectLocker() - mockChannel := make(chan *models.TerraformOutputLine) + projectCmdOutputHandler := handlermocks.NewMockProjectCommandOutputHandler() mockApplyReqHandler := mocks.NewMockApplyRequirement() runner := events.DefaultProjectCommandRunner{ @@ -55,8 +56,8 @@ func TestDefaultProjectCommandRunner_Plan(t *testing.T) { WorkingDir: mockWorkingDir, Webhooks: nil, WorkingDirLocker: events.NewDefaultWorkingDirLocker(), + ProjectCmdOutputHandler: projectCmdOutputHandler, AggregateApplyRequirements: mockApplyReqHandler, - TerraformOutputChan: mockChannel, } repoDir, cleanup := TempDir(t) @@ -392,13 +393,14 @@ func TestDefaultProjectCommandRunner_RunEnvSteps(t *testing.T) { mockLocker := mocks.NewMockProjectLocker() runner := events.DefaultProjectCommandRunner{ - Locker: mockLocker, - LockURLGenerator: mockURLGenerator{}, - RunStepRunner: &run, - EnvStepRunner: &env, - WorkingDir: mockWorkingDir, - Webhooks: nil, - WorkingDirLocker: events.NewDefaultWorkingDirLocker(), + Locker: mockLocker, + LockURLGenerator: mockURLGenerator{}, + RunStepRunner: &run, + EnvStepRunner: &env, + WorkingDir: mockWorkingDir, + Webhooks: nil, + WorkingDirLocker: events.NewDefaultWorkingDirLocker(), + ProjectCmdOutputHandler: handlermocks.NewMockProjectCommandOutputHandler(), } repoDir, cleanup := TempDir(t) diff --git a/server/events/terraform/terraform_client.go b/server/events/terraform/terraform_client.go index b983074ac4..1d344a280d 100644 --- a/server/events/terraform/terraform_client.go +++ b/server/events/terraform/terraform_client.go @@ -33,6 +33,7 @@ import ( "github.com/pkg/errors" "github.com/runatlantis/atlantis/server/events/models" + "github.com/runatlantis/atlantis/server/handlers" "github.com/runatlantis/atlantis/server/logging" ) @@ -73,7 +74,7 @@ type DefaultClient struct { // usePluginCache determines whether or not to set the TF_PLUGIN_CACHE_DIR env var usePluginCache bool - terraformOutputChan chan<- *models.TerraformOutputLine + projectCmdOutputHandler handlers.ProjectCommandOutputHandler } //go:generate pegomock generate -m --use-experimental-model-gen --package mocks -o mocks/mock_downloader.go Downloader @@ -105,7 +106,7 @@ func NewClientWithDefaultVersion( tfDownloader Downloader, usePluginCache bool, fetchAsync bool, - terraformOutputChan chan<- *models.TerraformOutputLine, + projectCmdOutputHandler handlers.ProjectCommandOutputHandler, ) (*DefaultClient, error) { var finalDefaultVersion *version.Version var localVersion *version.Version @@ -172,7 +173,7 @@ func NewClientWithDefaultVersion( versionsLock: &versionsLock, versions: versions, usePluginCache: usePluginCache, - terraformOutputChan: terraformOutputChan, + projectCmdOutputHandler: projectCmdOutputHandler, }, nil } @@ -188,7 +189,7 @@ func NewTestClient( tfDownloadURL string, tfDownloader Downloader, usePluginCache bool, - terraformOutputChan chan<- *models.TerraformOutputLine, + projectCmdOutputHandler handlers.ProjectCommandOutputHandler, ) (*DefaultClient, error) { return NewClientWithDefaultVersion( log, @@ -202,7 +203,7 @@ func NewTestClient( tfDownloader, usePluginCache, false, - terraformOutputChan, + projectCmdOutputHandler, ) } @@ -225,7 +226,7 @@ func NewClient( tfDownloadURL string, tfDownloader Downloader, usePluginCache bool, - terraformOutputChan chan<- *models.TerraformOutputLine, + projectCmdOutputHandler handlers.ProjectCommandOutputHandler, ) (*DefaultClient, error) { return NewClientWithDefaultVersion( log, @@ -239,7 +240,7 @@ func NewClient( tfDownloader, usePluginCache, true, - terraformOutputChan, + projectCmdOutputHandler, ) } @@ -407,10 +408,7 @@ func (c *DefaultClient) RunCommandAsync(ctx models.ProjectCommandContext, path s for s.Scan() { message := s.Text() outCh <- Line{Line: message} - c.terraformOutputChan <- &models.TerraformOutputLine{ - ProjectInfo: ctx.PullInfo(), - Line: message, - } + c.projectCmdOutputHandler.Send(ctx, message) } wg.Done() }() @@ -419,10 +417,7 @@ func (c *DefaultClient) RunCommandAsync(ctx models.ProjectCommandContext, path s for s.Scan() { message := s.Text() outCh <- Line{Line: message} - c.terraformOutputChan <- &models.TerraformOutputLine{ - ProjectInfo: ctx.PullInfo(), - Line: message, - } + c.projectCmdOutputHandler.Send(ctx, message) } wg.Done() }() diff --git a/server/events/terraform/terraform_client_internal_test.go b/server/events/terraform/terraform_client_internal_test.go index 4f40eddd4c..88291f5590 100644 --- a/server/events/terraform/terraform_client_internal_test.go +++ b/server/events/terraform/terraform_client_internal_test.go @@ -10,6 +10,7 @@ import ( version "github.com/hashicorp/go-version" "github.com/runatlantis/atlantis/server/events/models" + handlermocks "github.com/runatlantis/atlantis/server/handlers/mocks" "github.com/runatlantis/atlantis/server/logging" . "github.com/runatlantis/atlantis/testing" ) @@ -91,7 +92,8 @@ func TestDefaultClient_RunCommandWithVersion_EnvVars(t *testing.T) { Ok(t, err) tmp, cleanup := TempDir(t) logger := logging.NewNoopLogger(t) - tempchan := make(chan *models.TerraformOutputLine) + projectCmdOutputHandler := handlermocks.NewMockProjectCommandOutputHandler() + ctx := models.ProjectCommandContext{ Log: logger, Workspace: "default", @@ -114,7 +116,7 @@ func TestDefaultClient_RunCommandWithVersion_EnvVars(t *testing.T) { terraformPluginCacheDir: tmp, overrideTF: "echo", usePluginCache: true, - terraformOutputChan: tempchan, + projectCmdOutputHandler: projectCmdOutputHandler, } args := []string{ @@ -125,7 +127,6 @@ func TestDefaultClient_RunCommandWithVersion_EnvVars(t *testing.T) { "DIR=$DIR", } customEnvVars := map[string]string{} - waitTfStreaming(tempchan) out, err := client.RunCommandWithVersion(ctx, tmp, args, customEnvVars, nil, "workspace") Ok(t, err) exp := fmt.Sprintf("TF_IN_AUTOMATION=true TF_PLUGIN_CACHE_DIR=%s WORKSPACE=workspace ATLANTIS_TERRAFORM_VERSION=0.11.11 DIR=%s\n", tmp, tmp) @@ -138,7 +139,8 @@ func TestDefaultClient_RunCommandWithVersion_Error(t *testing.T) { Ok(t, err) tmp, cleanup := TempDir(t) logger := logging.NewNoopLogger(t) - tempchan := make(chan *models.TerraformOutputLine) + projectCmdOutputHandler := handlermocks.NewMockProjectCommandOutputHandler() + ctx := models.ProjectCommandContext{ Log: logger, Workspace: "default", @@ -160,7 +162,7 @@ func TestDefaultClient_RunCommandWithVersion_Error(t *testing.T) { defaultVersion: v, terraformPluginCacheDir: tmp, overrideTF: "echo", - terraformOutputChan: tempchan, + projectCmdOutputHandler: projectCmdOutputHandler, } args := []string{ @@ -169,7 +171,6 @@ func TestDefaultClient_RunCommandWithVersion_Error(t *testing.T) { "exit", "1", } - waitTfStreaming(tempchan) out, err := client.RunCommandWithVersion(ctx, tmp, args, map[string]string{}, nil, "workspace") ErrEquals(t, fmt.Sprintf(`running "echo dying && exit 1" in %q: exit status 1`, tmp), err) // Test that we still get our output. @@ -181,7 +182,8 @@ func TestDefaultClient_RunCommandAsync_Success(t *testing.T) { Ok(t, err) tmp, cleanup := TempDir(t) logger := logging.NewNoopLogger(t) - tempchan := make(chan *models.TerraformOutputLine) + projectCmdOutputHandler := handlermocks.NewMockProjectCommandOutputHandler() + ctx := models.ProjectCommandContext{ Log: logger, Workspace: "default", @@ -204,7 +206,7 @@ func TestDefaultClient_RunCommandAsync_Success(t *testing.T) { terraformPluginCacheDir: tmp, overrideTF: "echo", usePluginCache: true, - terraformOutputChan: tempchan, + projectCmdOutputHandler: projectCmdOutputHandler, } args := []string{ @@ -214,7 +216,6 @@ func TestDefaultClient_RunCommandAsync_Success(t *testing.T) { "ATLANTIS_TERRAFORM_VERSION=$ATLANTIS_TERRAFORM_VERSION", "DIR=$DIR", } - waitTfStreaming(tempchan) _, outCh := client.RunCommandAsync(ctx, tmp, args, map[string]string{}, nil, "workspace") out, err := waitCh(outCh) @@ -228,7 +229,8 @@ func TestDefaultClient_RunCommandAsync_BigOutput(t *testing.T) { Ok(t, err) tmp, cleanup := TempDir(t) logger := logging.NewNoopLogger(t) - tempchan := make(chan *models.TerraformOutputLine) + projectCmdOutputHandler := handlermocks.NewMockProjectCommandOutputHandler() + ctx := models.ProjectCommandContext{ Log: logger, Workspace: "default", @@ -250,7 +252,7 @@ func TestDefaultClient_RunCommandAsync_BigOutput(t *testing.T) { defaultVersion: v, terraformPluginCacheDir: tmp, overrideTF: "cat", - terraformOutputChan: tempchan, + projectCmdOutputHandler: projectCmdOutputHandler, } filename := filepath.Join(tmp, "data") f, err := os.OpenFile(filename, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) @@ -263,7 +265,6 @@ func TestDefaultClient_RunCommandAsync_BigOutput(t *testing.T) { _, err = f.WriteString(s) Ok(t, err) } - waitTfStreaming(tempchan) _, outCh := client.RunCommandAsync(ctx, tmp, []string{filename}, map[string]string{}, nil, "workspace") out, err := waitCh(outCh) @@ -276,7 +277,8 @@ func TestDefaultClient_RunCommandAsync_StderrOutput(t *testing.T) { Ok(t, err) tmp, cleanup := TempDir(t) logger := logging.NewNoopLogger(t) - tempchan := make(chan *models.TerraformOutputLine) + projectCmdOutputHandler := handlermocks.NewMockProjectCommandOutputHandler() + ctx := models.ProjectCommandContext{ Log: logger, Workspace: "default", @@ -298,9 +300,8 @@ func TestDefaultClient_RunCommandAsync_StderrOutput(t *testing.T) { defaultVersion: v, terraformPluginCacheDir: tmp, overrideTF: "echo", - terraformOutputChan: tempchan, + projectCmdOutputHandler: projectCmdOutputHandler, } - waitTfStreaming(tempchan) _, outCh := client.RunCommandAsync(ctx, tmp, []string{"stderr", ">&2"}, map[string]string{}, nil, "workspace") out, err := waitCh(outCh) @@ -313,7 +314,8 @@ func TestDefaultClient_RunCommandAsync_ExitOne(t *testing.T) { Ok(t, err) tmp, cleanup := TempDir(t) logger := logging.NewNoopLogger(t) - tempchan := make(chan *models.TerraformOutputLine) + projectCmdOutputHandler := handlermocks.NewMockProjectCommandOutputHandler() + ctx := models.ProjectCommandContext{ Log: logger, Workspace: "default", @@ -335,9 +337,8 @@ func TestDefaultClient_RunCommandAsync_ExitOne(t *testing.T) { defaultVersion: v, terraformPluginCacheDir: tmp, overrideTF: "echo", - terraformOutputChan: tempchan, + projectCmdOutputHandler: projectCmdOutputHandler, } - waitTfStreaming(tempchan) _, outCh := client.RunCommandAsync(ctx, tmp, []string{"dying", "&&", "exit", "1"}, map[string]string{}, nil, "workspace") out, err := waitCh(outCh) @@ -351,7 +352,8 @@ func TestDefaultClient_RunCommandAsync_Input(t *testing.T) { Ok(t, err) tmp, cleanup := TempDir(t) logger := logging.NewNoopLogger(t) - tempchan := make(chan *models.TerraformOutputLine) + projectCmdOutputHandler := handlermocks.NewMockProjectCommandOutputHandler() + ctx := models.ProjectCommandContext{ Log: logger, Workspace: "default", @@ -373,9 +375,9 @@ func TestDefaultClient_RunCommandAsync_Input(t *testing.T) { defaultVersion: v, terraformPluginCacheDir: tmp, overrideTF: "read", - terraformOutputChan: tempchan, + projectCmdOutputHandler: projectCmdOutputHandler, } - waitTfStreaming(tempchan) + inCh, outCh := client.RunCommandAsync(ctx, tmp, []string{"a", "&&", "echo", "$a"}, map[string]string{}, nil, "workspace") inCh <- "echo me\n" @@ -394,11 +396,3 @@ func waitCh(ch <-chan Line) (string, error) { } return strings.Join(ls, "\n"), nil } - -func waitTfStreaming(ch chan *models.TerraformOutputLine) { - go func() { - for range ch { - } - close(ch) - }() -} diff --git a/server/events/terraform/terraform_client_test.go b/server/events/terraform/terraform_client_test.go index 5d8f62fa19..9ba05356f1 100644 --- a/server/events/terraform/terraform_client_test.go +++ b/server/events/terraform/terraform_client_test.go @@ -29,6 +29,7 @@ import ( "github.com/runatlantis/atlantis/server/events/models" "github.com/runatlantis/atlantis/server/events/terraform" "github.com/runatlantis/atlantis/server/events/terraform/mocks" + handlermocks "github.com/runatlantis/atlantis/server/handlers/mocks" "github.com/runatlantis/atlantis/server/logging" . "github.com/runatlantis/atlantis/testing" ) @@ -61,7 +62,7 @@ Your version of Terraform is out of date! The latest version is 0.11.13. You can update by downloading from www.terraform.io/downloads.html ` tmp, binDir, cacheDir, cleanup := mkSubDirs(t) - tempchan := make(chan *models.TerraformOutputLine) + projectCmdOutputHandler := handlermocks.NewMockProjectCommandOutputHandler() ctx := models.ProjectCommandContext{ Log: logging.NewNoopLogger(t), Workspace: "default", @@ -77,14 +78,12 @@ is 0.11.13. You can update by downloading from www.terraform.io/downloads.html Ok(t, err) defer tempSetEnv(t, "PATH", fmt.Sprintf("%s:%s", tmp, os.Getenv("PATH")))() - c, err := terraform.NewClient(logger, binDir, cacheDir, "", "", "", cmd.DefaultTFVersionFlag, cmd.DefaultTFDownloadURL, nil, true, tempchan) + c, err := terraform.NewClient(logger, binDir, cacheDir, "", "", "", cmd.DefaultTFVersionFlag, cmd.DefaultTFDownloadURL, nil, true, projectCmdOutputHandler) Ok(t, err) Ok(t, err) Equals(t, "0.11.10", c.DefaultVersion().String()) - waitTfStreaming(tempchan) - output, err := c.RunCommandWithVersion(ctx, tmp, nil, map[string]string{"test": "123"}, nil, "") Ok(t, err) Equals(t, fakeBinOut+"\n", output) @@ -100,7 +99,7 @@ is 0.11.13. You can update by downloading from www.terraform.io/downloads.html ` logger := logging.NewNoopLogger(t) tmp, binDir, cacheDir, cleanup := mkSubDirs(t) - tempchan := make(chan *models.TerraformOutputLine) + projectCmdOutputHandler := handlermocks.NewMockProjectCommandOutputHandler() ctx := models.ProjectCommandContext{ Log: logging.NewNoopLogger(t), Workspace: "default", @@ -114,13 +113,12 @@ is 0.11.13. You can update by downloading from www.terraform.io/downloads.html Ok(t, err) defer tempSetEnv(t, "PATH", fmt.Sprintf("%s:%s", tmp, os.Getenv("PATH")))() - c, err := terraform.NewClient(logger, binDir, cacheDir, "", "", "0.11.10", cmd.DefaultTFVersionFlag, cmd.DefaultTFDownloadURL, nil, true, tempchan) + c, err := terraform.NewClient(logger, binDir, cacheDir, "", "", "0.11.10", cmd.DefaultTFVersionFlag, cmd.DefaultTFDownloadURL, nil, true, projectCmdOutputHandler) Ok(t, err) Ok(t, err) Equals(t, "0.11.10", c.DefaultVersion().String()) - waitTfStreaming(tempchan) output, err := c.RunCommandWithVersion(ctx, tmp, nil, map[string]string{}, nil, "") Ok(t, err) Equals(t, fakeBinOut+"\n", output) @@ -131,13 +129,13 @@ is 0.11.13. You can update by downloading from www.terraform.io/downloads.html func TestNewClient_NoTF(t *testing.T) { logger := logging.NewNoopLogger(t) tmp, binDir, cacheDir, cleanup := mkSubDirs(t) - tempchan := make(chan<- *models.TerraformOutputLine) + projectCmdOutputHandler := handlermocks.NewMockProjectCommandOutputHandler() defer cleanup() // Set PATH to only include our empty directory. defer tempSetEnv(t, "PATH", tmp)() - _, err := terraform.NewClient(logger, binDir, cacheDir, "", "", "", cmd.DefaultTFVersionFlag, cmd.DefaultTFDownloadURL, nil, true, tempchan) + _, err := terraform.NewClient(logger, binDir, cacheDir, "", "", "", cmd.DefaultTFVersionFlag, cmd.DefaultTFDownloadURL, nil, true, projectCmdOutputHandler) ErrEquals(t, "terraform not found in $PATH. Set --default-tf-version or download terraform from https://www.terraform.io/downloads.html", err) } @@ -147,7 +145,7 @@ func TestNewClient_DefaultTFFlagInPath(t *testing.T) { fakeBinOut := "Terraform v0.11.10\n" logger := logging.NewNoopLogger(t) tmp, binDir, cacheDir, cleanup := mkSubDirs(t) - tempchan := make(chan *models.TerraformOutputLine) + projectCmdOutputHandler := handlermocks.NewMockProjectCommandOutputHandler() ctx := models.ProjectCommandContext{ Log: logging.NewNoopLogger(t), Workspace: "default", @@ -161,14 +159,12 @@ func TestNewClient_DefaultTFFlagInPath(t *testing.T) { Ok(t, err) defer tempSetEnv(t, "PATH", fmt.Sprintf("%s:%s", tmp, os.Getenv("PATH")))() - c, err := terraform.NewClient(logger, binDir, cacheDir, "", "", "0.11.10", cmd.DefaultTFVersionFlag, cmd.DefaultTFDownloadURL, nil, true, tempchan) + c, err := terraform.NewClient(logger, binDir, cacheDir, "", "", "0.11.10", cmd.DefaultTFVersionFlag, cmd.DefaultTFDownloadURL, nil, true, projectCmdOutputHandler) Ok(t, err) Ok(t, err) Equals(t, "0.11.10", c.DefaultVersion().String()) - waitTfStreaming(tempchan) - output, err := c.RunCommandWithVersion(ctx, tmp, nil, map[string]string{}, nil, "") Ok(t, err) Equals(t, fakeBinOut+"\n", output) @@ -179,7 +175,7 @@ func TestNewClient_DefaultTFFlagInPath(t *testing.T) { func TestNewClient_DefaultTFFlagInBinDir(t *testing.T) { fakeBinOut := "Terraform v0.11.10\n" tmp, binDir, cacheDir, cleanup := mkSubDirs(t) - tempchan := make(chan *models.TerraformOutputLine) + projectCmdOutputHandler := handlermocks.NewMockProjectCommandOutputHandler() ctx := models.ProjectCommandContext{ Log: logging.NewNoopLogger(t), Workspace: "default", @@ -192,14 +188,12 @@ func TestNewClient_DefaultTFFlagInBinDir(t *testing.T) { Ok(t, err) defer tempSetEnv(t, "PATH", fmt.Sprintf("%s:%s", tmp, os.Getenv("PATH")))() - c, err := terraform.NewClient(logging.NewNoopLogger(t), binDir, cacheDir, "", "", "0.11.10", cmd.DefaultTFVersionFlag, cmd.DefaultTFDownloadURL, nil, true, tempchan) + c, err := terraform.NewClient(logging.NewNoopLogger(t), binDir, cacheDir, "", "", "0.11.10", cmd.DefaultTFVersionFlag, cmd.DefaultTFDownloadURL, nil, true, projectCmdOutputHandler) Ok(t, err) Ok(t, err) Equals(t, "0.11.10", c.DefaultVersion().String()) - waitTfStreaming(tempchan) - output, err := c.RunCommandWithVersion(ctx, tmp, nil, map[string]string{}, nil, "") Ok(t, err) Equals(t, fakeBinOut+"\n", output) @@ -210,7 +204,7 @@ func TestNewClient_DefaultTFFlagDownload(t *testing.T) { RegisterMockTestingT(t) logger := logging.NewNoopLogger(t) tmp, binDir, cacheDir, cleanup := mkSubDirs(t) - tempchan := make(chan *models.TerraformOutputLine) + projectCmdOutputHandler := handlermocks.NewMockProjectCommandOutputHandler() ctx := models.ProjectCommandContext{ Log: logging.NewNoopLogger(t), Workspace: "default", @@ -227,7 +221,7 @@ func TestNewClient_DefaultTFFlagDownload(t *testing.T) { err := ioutil.WriteFile(params[0].(string), []byte("#!/bin/sh\necho '\nTerraform v0.11.10\n'"), 0700) // #nosec G306 return []pegomock.ReturnValue{err} }) - c, err := terraform.NewClient(logger, binDir, cacheDir, "", "", "0.11.10", cmd.DefaultTFVersionFlag, "https://my-mirror.releases.mycompany.com", mockDownloader, true, tempchan) + c, err := terraform.NewClient(logger, binDir, cacheDir, "", "", "0.11.10", cmd.DefaultTFVersionFlag, "https://my-mirror.releases.mycompany.com", mockDownloader, true, projectCmdOutputHandler) Ok(t, err) Ok(t, err) @@ -243,7 +237,6 @@ func TestNewClient_DefaultTFFlagDownload(t *testing.T) { // Reset PATH so that it has sh. Ok(t, os.Setenv("PATH", orig)) - waitTfStreaming(tempchan) output, err := c.RunCommandWithVersion(ctx, tmp, nil, map[string]string{}, nil, "") Ok(t, err) Equals(t, "\nTerraform v0.11.10\n\n", output) @@ -253,9 +246,9 @@ func TestNewClient_DefaultTFFlagDownload(t *testing.T) { func TestNewClient_BadVersion(t *testing.T) { logger := logging.NewNoopLogger(t) _, binDir, cacheDir, cleanup := mkSubDirs(t) - tempchan := make(chan<- *models.TerraformOutputLine) + projectCmdOutputHandler := handlermocks.NewMockProjectCommandOutputHandler() defer cleanup() - _, err := terraform.NewClient(logger, binDir, cacheDir, "", "", "malformed", cmd.DefaultTFVersionFlag, cmd.DefaultTFDownloadURL, nil, true, tempchan) + _, err := terraform.NewClient(logger, binDir, cacheDir, "", "", "malformed", cmd.DefaultTFVersionFlag, cmd.DefaultTFDownloadURL, nil, true, projectCmdOutputHandler) ErrEquals(t, "Malformed version: malformed", err) } @@ -264,7 +257,7 @@ func TestRunCommandWithVersion_DLsTF(t *testing.T) { logger := logging.NewNoopLogger(t) RegisterMockTestingT(t) tmp, binDir, cacheDir, cleanup := mkSubDirs(t) - tempchan := make(chan *models.TerraformOutputLine) + projectCmdOutputHandler := handlermocks.NewMockProjectCommandOutputHandler() ctx := models.ProjectCommandContext{ Log: logging.NewNoopLogger(t), Workspace: "default", @@ -285,15 +278,13 @@ func TestRunCommandWithVersion_DLsTF(t *testing.T) { return []pegomock.ReturnValue{err} }) - c, err := terraform.NewClient(logger, binDir, cacheDir, "", "", "0.11.10", cmd.DefaultTFVersionFlag, cmd.DefaultTFDownloadURL, mockDownloader, true, tempchan) + c, err := terraform.NewClient(logger, binDir, cacheDir, "", "", "0.11.10", cmd.DefaultTFVersionFlag, cmd.DefaultTFDownloadURL, mockDownloader, true, projectCmdOutputHandler) Ok(t, err) Equals(t, "0.11.10", c.DefaultVersion().String()) v, err := version.NewVersion("99.99.99") Ok(t, err) - waitTfStreaming(tempchan) - output, err := c.RunCommandWithVersion(ctx, tmp, nil, map[string]string{}, v, "") Assert(t, err == nil, "err: %s: %s", err, output) @@ -305,12 +296,12 @@ func TestEnsureVersion_downloaded(t *testing.T) { logger := logging.NewNoopLogger(t) RegisterMockTestingT(t) tmp, binDir, cacheDir, cleanup := mkSubDirs(t) - tempchan := make(chan<- *models.TerraformOutputLine) + projectCmdOutputHandler := handlermocks.NewMockProjectCommandOutputHandler() defer cleanup() mockDownloader := mocks.NewMockDownloader() - c, err := terraform.NewTestClient(logger, binDir, cacheDir, "", "", "0.11.10", cmd.DefaultTFVersionFlag, cmd.DefaultTFDownloadURL, mockDownloader, true, tempchan) + c, err := terraform.NewTestClient(logger, binDir, cacheDir, "", "", "0.11.10", cmd.DefaultTFVersionFlag, cmd.DefaultTFDownloadURL, mockDownloader, true, projectCmdOutputHandler) Ok(t, err) Equals(t, "0.11.10", c.DefaultVersion().String()) @@ -352,11 +343,3 @@ func mkSubDirs(t *testing.T) (string, string, string, func()) { return tmp, binDir, cachedir, cleanup } - -func waitTfStreaming(ch chan *models.TerraformOutputLine) { - go func() { - for range ch { - } - close(ch) - }() -} diff --git a/server/handlers/mocks/matchers/handlers_websocketresponsewriter.go b/server/handlers/mocks/matchers/handlers_websocketresponsewriter.go new file mode 100644 index 0000000000..a2a1b4d443 --- /dev/null +++ b/server/handlers/mocks/matchers/handlers_websocketresponsewriter.go @@ -0,0 +1,33 @@ +// Code generated by pegomock. DO NOT EDIT. +package matchers + +import ( + "github.com/petergtz/pegomock" + "reflect" + + handlers "github.com/runatlantis/atlantis/server/handlers" +) + +func AnyHandlersWebsocketResponseWriter() handlers.WebsocketResponseWriter { + pegomock.RegisterMatcher(pegomock.NewAnyMatcher(reflect.TypeOf((*(handlers.WebsocketResponseWriter))(nil)).Elem())) + var nullValue handlers.WebsocketResponseWriter + return nullValue +} + +func EqHandlersWebsocketResponseWriter(value handlers.WebsocketResponseWriter) handlers.WebsocketResponseWriter { + pegomock.RegisterMatcher(&pegomock.EqMatcher{Value: value}) + var nullValue handlers.WebsocketResponseWriter + return nullValue +} + +func NotEqHandlersWebsocketResponseWriter(value handlers.WebsocketResponseWriter) handlers.WebsocketResponseWriter { + pegomock.RegisterMatcher(&pegomock.NotEqMatcher{Value: value}) + var nullValue handlers.WebsocketResponseWriter + return nullValue +} + +func HandlersWebsocketResponseWriterThat(matcher pegomock.ArgumentMatcher) handlers.WebsocketResponseWriter { + pegomock.RegisterMatcher(matcher) + var nullValue handlers.WebsocketResponseWriter + return nullValue +} diff --git a/server/controllers/mocks/matchers/http_header.go b/server/handlers/mocks/matchers/http_header.go similarity index 100% rename from server/controllers/mocks/matchers/http_header.go rename to server/handlers/mocks/matchers/http_header.go diff --git a/server/controllers/mocks/matchers/http_responsewriter.go b/server/handlers/mocks/matchers/http_responsewriter.go similarity index 100% rename from server/controllers/mocks/matchers/http_responsewriter.go rename to server/handlers/mocks/matchers/http_responsewriter.go diff --git a/server/handlers/mocks/matchers/models_projectcommandcontext.go b/server/handlers/mocks/matchers/models_projectcommandcontext.go new file mode 100644 index 0000000000..535f8b9671 --- /dev/null +++ b/server/handlers/mocks/matchers/models_projectcommandcontext.go @@ -0,0 +1,33 @@ +// Code generated by pegomock. DO NOT EDIT. +package matchers + +import ( + "github.com/petergtz/pegomock" + "reflect" + + models "github.com/runatlantis/atlantis/server/events/models" +) + +func AnyModelsProjectCommandContext() models.ProjectCommandContext { + pegomock.RegisterMatcher(pegomock.NewAnyMatcher(reflect.TypeOf((*(models.ProjectCommandContext))(nil)).Elem())) + var nullValue models.ProjectCommandContext + return nullValue +} + +func EqModelsProjectCommandContext(value models.ProjectCommandContext) models.ProjectCommandContext { + pegomock.RegisterMatcher(&pegomock.EqMatcher{Value: value}) + var nullValue models.ProjectCommandContext + return nullValue +} + +func NotEqModelsProjectCommandContext(value models.ProjectCommandContext) models.ProjectCommandContext { + pegomock.RegisterMatcher(&pegomock.NotEqMatcher{Value: value}) + var nullValue models.ProjectCommandContext + return nullValue +} + +func ModelsProjectCommandContextThat(matcher pegomock.ArgumentMatcher) models.ProjectCommandContext { + pegomock.RegisterMatcher(matcher) + var nullValue models.ProjectCommandContext + return nullValue +} diff --git a/server/controllers/mocks/matchers/ptr_to_http_request.go b/server/handlers/mocks/matchers/ptr_to_http_request.go similarity index 100% rename from server/controllers/mocks/matchers/ptr_to_http_request.go rename to server/handlers/mocks/matchers/ptr_to_http_request.go diff --git a/server/controllers/mocks/matchers/slice_of_byte.go b/server/handlers/mocks/matchers/slice_of_byte.go similarity index 100% rename from server/controllers/mocks/matchers/slice_of_byte.go rename to server/handlers/mocks/matchers/slice_of_byte.go diff --git a/server/handlers/mocks/mock_project_command_output_handler.go b/server/handlers/mocks/mock_project_command_output_handler.go new file mode 100644 index 0000000000..150cf4e55c --- /dev/null +++ b/server/handlers/mocks/mock_project_command_output_handler.go @@ -0,0 +1,208 @@ +// Code generated by pegomock. DO NOT EDIT. +// Source: github.com/runatlantis/atlantis/server/handlers (interfaces: ProjectCommandOutputHandler) + +package mocks + +import ( + pegomock "github.com/petergtz/pegomock" + models "github.com/runatlantis/atlantis/server/events/models" + "reflect" + "time" +) + +type MockProjectCommandOutputHandler struct { + fail func(message string, callerSkip ...int) +} + +func NewMockProjectCommandOutputHandler(options ...pegomock.Option) *MockProjectCommandOutputHandler { + mock := &MockProjectCommandOutputHandler{} + for _, option := range options { + option.Apply(mock) + } + return mock +} + +func (mock *MockProjectCommandOutputHandler) SetFailHandler(fh pegomock.FailHandler) { mock.fail = fh } +func (mock *MockProjectCommandOutputHandler) FailHandler() pegomock.FailHandler { return mock.fail } + +func (mock *MockProjectCommandOutputHandler) Send(ctx models.ProjectCommandContext, msg string) { + if mock == nil { + panic("mock must not be nil. Use myMock := NewMockProjectCommandOutputHandler().") + } + params := []pegomock.Param{ctx, msg} + pegomock.GetGenericMockFrom(mock).Invoke("Send", params, []reflect.Type{}) +} + +func (mock *MockProjectCommandOutputHandler) Clear(ctx models.ProjectCommandContext) { + if mock == nil { + panic("mock must not be nil. Use myMock := NewMockProjectCommandOutputHandler().") + } + params := []pegomock.Param{ctx} + pegomock.GetGenericMockFrom(mock).Invoke("Clear", params, []reflect.Type{}) +} + +func (mock *MockProjectCommandOutputHandler) Receive(projectInfo string, callback func(string) error) error { + if mock == nil { + panic("mock must not be nil. Use myMock := NewMockProjectCommandOutputHandler().") + } + params := []pegomock.Param{projectInfo, callback} + result := pegomock.GetGenericMockFrom(mock).Invoke("Receive", params, []reflect.Type{reflect.TypeOf((*error)(nil)).Elem()}) + var ret0 error + if len(result) != 0 { + if result[0] != nil { + ret0 = result[0].(error) + } + } + return ret0 +} + +func (mock *MockProjectCommandOutputHandler) Handle() { + if mock == nil { + panic("mock must not be nil. Use myMock := NewMockProjectCommandOutputHandler().") + } + params := []pegomock.Param{} + pegomock.GetGenericMockFrom(mock).Invoke("Handle", params, []reflect.Type{}) +} + +func (mock *MockProjectCommandOutputHandler) VerifyWasCalledOnce() *VerifierMockProjectCommandOutputHandler { + return &VerifierMockProjectCommandOutputHandler{ + mock: mock, + invocationCountMatcher: pegomock.Times(1), + } +} + +func (mock *MockProjectCommandOutputHandler) VerifyWasCalled(invocationCountMatcher pegomock.InvocationCountMatcher) *VerifierMockProjectCommandOutputHandler { + return &VerifierMockProjectCommandOutputHandler{ + mock: mock, + invocationCountMatcher: invocationCountMatcher, + } +} + +func (mock *MockProjectCommandOutputHandler) VerifyWasCalledInOrder(invocationCountMatcher pegomock.InvocationCountMatcher, inOrderContext *pegomock.InOrderContext) *VerifierMockProjectCommandOutputHandler { + return &VerifierMockProjectCommandOutputHandler{ + mock: mock, + invocationCountMatcher: invocationCountMatcher, + inOrderContext: inOrderContext, + } +} + +func (mock *MockProjectCommandOutputHandler) VerifyWasCalledEventually(invocationCountMatcher pegomock.InvocationCountMatcher, timeout time.Duration) *VerifierMockProjectCommandOutputHandler { + return &VerifierMockProjectCommandOutputHandler{ + mock: mock, + invocationCountMatcher: invocationCountMatcher, + timeout: timeout, + } +} + +type VerifierMockProjectCommandOutputHandler struct { + mock *MockProjectCommandOutputHandler + invocationCountMatcher pegomock.InvocationCountMatcher + inOrderContext *pegomock.InOrderContext + timeout time.Duration +} + +func (verifier *VerifierMockProjectCommandOutputHandler) Send(ctx models.ProjectCommandContext, msg string) *MockProjectCommandOutputHandler_Send_OngoingVerification { + params := []pegomock.Param{ctx, msg} + methodInvocations := pegomock.GetGenericMockFrom(verifier.mock).Verify(verifier.inOrderContext, verifier.invocationCountMatcher, "Send", params, verifier.timeout) + return &MockProjectCommandOutputHandler_Send_OngoingVerification{mock: verifier.mock, methodInvocations: methodInvocations} +} + +type MockProjectCommandOutputHandler_Send_OngoingVerification struct { + mock *MockProjectCommandOutputHandler + methodInvocations []pegomock.MethodInvocation +} + +func (c *MockProjectCommandOutputHandler_Send_OngoingVerification) GetCapturedArguments() (models.ProjectCommandContext, string) { + ctx, msg := c.GetAllCapturedArguments() + return ctx[len(ctx)-1], msg[len(msg)-1] +} + +func (c *MockProjectCommandOutputHandler_Send_OngoingVerification) GetAllCapturedArguments() (_param0 []models.ProjectCommandContext, _param1 []string) { + params := pegomock.GetGenericMockFrom(c.mock).GetInvocationParams(c.methodInvocations) + if len(params) > 0 { + _param0 = make([]models.ProjectCommandContext, len(c.methodInvocations)) + for u, param := range params[0] { + _param0[u] = param.(models.ProjectCommandContext) + } + _param1 = make([]string, len(c.methodInvocations)) + for u, param := range params[1] { + _param1[u] = param.(string) + } + } + return +} + +func (verifier *VerifierMockProjectCommandOutputHandler) Clear(ctx models.ProjectCommandContext) *MockProjectCommandOutputHandler_Clear_OngoingVerification { + params := []pegomock.Param{ctx} + methodInvocations := pegomock.GetGenericMockFrom(verifier.mock).Verify(verifier.inOrderContext, verifier.invocationCountMatcher, "Clear", params, verifier.timeout) + return &MockProjectCommandOutputHandler_Clear_OngoingVerification{mock: verifier.mock, methodInvocations: methodInvocations} +} + +type MockProjectCommandOutputHandler_Clear_OngoingVerification struct { + mock *MockProjectCommandOutputHandler + methodInvocations []pegomock.MethodInvocation +} + +func (c *MockProjectCommandOutputHandler_Clear_OngoingVerification) GetCapturedArguments() models.ProjectCommandContext { + ctx := c.GetAllCapturedArguments() + return ctx[len(ctx)-1] +} + +func (c *MockProjectCommandOutputHandler_Clear_OngoingVerification) GetAllCapturedArguments() (_param0 []models.ProjectCommandContext) { + params := pegomock.GetGenericMockFrom(c.mock).GetInvocationParams(c.methodInvocations) + if len(params) > 0 { + _param0 = make([]models.ProjectCommandContext, len(c.methodInvocations)) + for u, param := range params[0] { + _param0[u] = param.(models.ProjectCommandContext) + } + } + return +} + +func (verifier *VerifierMockProjectCommandOutputHandler) Receive(projectInfo string, callback func(string) error) *MockProjectCommandOutputHandler_Receive_OngoingVerification { + params := []pegomock.Param{projectInfo, callback} + methodInvocations := pegomock.GetGenericMockFrom(verifier.mock).Verify(verifier.inOrderContext, verifier.invocationCountMatcher, "Receive", params, verifier.timeout) + return &MockProjectCommandOutputHandler_Receive_OngoingVerification{mock: verifier.mock, methodInvocations: methodInvocations} +} + +type MockProjectCommandOutputHandler_Receive_OngoingVerification struct { + mock *MockProjectCommandOutputHandler + methodInvocations []pegomock.MethodInvocation +} + +func (c *MockProjectCommandOutputHandler_Receive_OngoingVerification) GetCapturedArguments() (string, func(string) error) { + projectInfo, callback := c.GetAllCapturedArguments() + return projectInfo[len(projectInfo)-1], callback[len(callback)-1] +} + +func (c *MockProjectCommandOutputHandler_Receive_OngoingVerification) GetAllCapturedArguments() (_param0 []string, _param1 []func(string) error) { + params := pegomock.GetGenericMockFrom(c.mock).GetInvocationParams(c.methodInvocations) + if len(params) > 0 { + _param0 = make([]string, len(c.methodInvocations)) + for u, param := range params[0] { + _param0[u] = param.(string) + } + _param1 = make([]func(string) error, len(c.methodInvocations)) + for u, param := range params[1] { + _param1[u] = param.(func(string) error) + } + } + return +} + +func (verifier *VerifierMockProjectCommandOutputHandler) Handle() *MockProjectCommandOutputHandler_Handle_OngoingVerification { + params := []pegomock.Param{} + methodInvocations := pegomock.GetGenericMockFrom(verifier.mock).Verify(verifier.inOrderContext, verifier.invocationCountMatcher, "Handle", params, verifier.timeout) + return &MockProjectCommandOutputHandler_Handle_OngoingVerification{mock: verifier.mock, methodInvocations: methodInvocations} +} + +type MockProjectCommandOutputHandler_Handle_OngoingVerification struct { + mock *MockProjectCommandOutputHandler + methodInvocations []pegomock.MethodInvocation +} + +func (c *MockProjectCommandOutputHandler_Handle_OngoingVerification) GetCapturedArguments() { +} + +func (c *MockProjectCommandOutputHandler_Handle_OngoingVerification) GetAllCapturedArguments() { +} diff --git a/server/controllers/mocks/mock_websocket_handler.go b/server/handlers/mocks/mock_websocket_handler.go similarity index 89% rename from server/controllers/mocks/mock_websocket_handler.go rename to server/handlers/mocks/mock_websocket_handler.go index 552848ba36..e57fb820b4 100644 --- a/server/controllers/mocks/mock_websocket_handler.go +++ b/server/handlers/mocks/mock_websocket_handler.go @@ -1,11 +1,11 @@ // Code generated by pegomock. DO NOT EDIT. -// Source: github.com/runatlantis/atlantis/server/controllers (interfaces: WebsocketHandler) +// Source: github.com/runatlantis/atlantis/server/handlers (interfaces: WebsocketHandler) package mocks import ( pegomock "github.com/petergtz/pegomock" - controllers "github.com/runatlantis/atlantis/server/controllers" + handlers "github.com/runatlantis/atlantis/server/handlers" http "net/http" "reflect" "time" @@ -26,17 +26,17 @@ func NewMockWebsocketHandler(options ...pegomock.Option) *MockWebsocketHandler { func (mock *MockWebsocketHandler) SetFailHandler(fh pegomock.FailHandler) { mock.fail = fh } func (mock *MockWebsocketHandler) FailHandler() pegomock.FailHandler { return mock.fail } -func (mock *MockWebsocketHandler) Upgrade(w http.ResponseWriter, r *http.Request, responseHeader http.Header) (controllers.WebsocketResponseWriter, error) { +func (mock *MockWebsocketHandler) Upgrade(w http.ResponseWriter, r *http.Request, responseHeader http.Header) (handlers.WebsocketResponseWriter, error) { if mock == nil { panic("mock must not be nil. Use myMock := NewMockWebsocketHandler().") } params := []pegomock.Param{w, r, responseHeader} - result := pegomock.GetGenericMockFrom(mock).Invoke("Upgrade", params, []reflect.Type{reflect.TypeOf((*controllers.WebsocketResponseWriter)(nil)).Elem(), reflect.TypeOf((*error)(nil)).Elem()}) - var ret0 controllers.WebsocketResponseWriter + result := pegomock.GetGenericMockFrom(mock).Invoke("Upgrade", params, []reflect.Type{reflect.TypeOf((*handlers.WebsocketResponseWriter)(nil)).Elem(), reflect.TypeOf((*error)(nil)).Elem()}) + var ret0 handlers.WebsocketResponseWriter var ret1 error if len(result) != 0 { if result[0] != nil { - ret0 = result[0].(controllers.WebsocketResponseWriter) + ret0 = result[0].(handlers.WebsocketResponseWriter) } if result[1] != nil { ret1 = result[1].(error) diff --git a/server/controllers/mocks/mock_websocket_response_writer.go b/server/handlers/mocks/mock_websocket_response_writer.go similarity index 98% rename from server/controllers/mocks/mock_websocket_response_writer.go rename to server/handlers/mocks/mock_websocket_response_writer.go index b8a7eee58f..819c8fe812 100644 --- a/server/controllers/mocks/mock_websocket_response_writer.go +++ b/server/handlers/mocks/mock_websocket_response_writer.go @@ -1,5 +1,5 @@ // Code generated by pegomock. DO NOT EDIT. -// Source: github.com/runatlantis/atlantis/server/controllers (interfaces: WebsocketResponseWriter) +// Source: github.com/runatlantis/atlantis/server/handlers (interfaces: WebsocketResponseWriter) package mocks diff --git a/server/handlers/project_command_output_handler.go b/server/handlers/project_command_output_handler.go new file mode 100644 index 0000000000..fbce4b86e8 --- /dev/null +++ b/server/handlers/project_command_output_handler.go @@ -0,0 +1,136 @@ +package handlers + +import ( + "fmt" + "sync" + + "github.com/runatlantis/atlantis/server/events/models" + "github.com/runatlantis/atlantis/server/logging" +) + +type DefaultProjectCommandOutputHandler struct { + // this is TerraformOutputChan + ProjectCmdOutput chan *models.ProjectCmdOutputLine + // this logBuffers + projectOutputBuffers map[string][]string + // this is wsChans + receiverBuffers map[string]map[chan string]bool + // same as chanLock + controllerBufferLock sync.RWMutex + logger logging.SimpleLogging +} + +//go:generate pegomock generate -m --use-experimental-model-gen --package mocks -o mocks/mock_project_command_output_handler.go ProjectCommandOutputHandler + +type ProjectCommandOutputHandler interface { + // Send will enqueue the msg and wait for Handle() to receive the message. + Send(ctx models.ProjectCommandContext, msg string) + + // Clears buffer for new project to run + Clear(ctx models.ProjectCommandContext) + + // Receive will create a channel for projectPullInfo and run a callback function argument when the new channel receives a message. + Receive(projectInfo string, callback func(msg string) error) error + + // Listens for msg from channel + Handle() +} + +func NewProjectCommandOutputHandler(projectCmdOutput chan *models.ProjectCmdOutputLine, logger logging.SimpleLogging) ProjectCommandOutputHandler { + return &DefaultProjectCommandOutputHandler{ + ProjectCmdOutput: projectCmdOutput, + logger: logger, + receiverBuffers: map[string]map[chan string]bool{}, + projectOutputBuffers: map[string][]string{}, + } +} + +func (p *DefaultProjectCommandOutputHandler) Send(ctx models.ProjectCommandContext, msg string) { + p.ProjectCmdOutput <- &models.ProjectCmdOutputLine{ + ProjectInfo: ctx.PullInfo(), + Line: msg, + } +} + +func (p *DefaultProjectCommandOutputHandler) Receive(projectInfo string, callback func(msg string) error) error { + ch := p.addChan(projectInfo) + defer p.removeChan(projectInfo, ch) + + for msg := range ch { + if err := callback(msg); err != nil { + return err + } + } + + return nil +} + +func (p *DefaultProjectCommandOutputHandler) Handle() { + fmt.Printf("Testing Handle func") + for msg := range p.ProjectCmdOutput { + p.logger.Info("Receiving message %s", msg.Line) + fmt.Printf("Receiving message %s", msg.Line) + if msg.ClearBuffBefore { + p.clearLogLines(msg.ProjectInfo) + } + p.writeLogLine(msg.ProjectInfo, msg.Line) + if msg.ClearBuffAfter { + p.clearLogLines(msg.ProjectInfo) + } + } +} + +func (p *DefaultProjectCommandOutputHandler) Clear(ctx models.ProjectCommandContext) { + p.ProjectCmdOutput <- &models.ProjectCmdOutputLine{ + ProjectInfo: ctx.PullInfo(), + Line: "", + ClearBuffBefore: true, + } +} + +func (p *DefaultProjectCommandOutputHandler) clearLogLines(pull string) { + p.controllerBufferLock.Lock() + delete(p.projectOutputBuffers, pull) + p.controllerBufferLock.Unlock() +} + +func (p *DefaultProjectCommandOutputHandler) addChan(pull string) chan string { + ch := make(chan string, 1000) + p.controllerBufferLock.Lock() + for _, line := range p.projectOutputBuffers[pull] { + ch <- line + } + if p.receiverBuffers[pull] == nil { + p.receiverBuffers[pull] = map[chan string]bool{} + } + p.receiverBuffers[pull][ch] = true + p.controllerBufferLock.Unlock() + return ch +} + +//Add log line to buffer and send to all current channels +func (p *DefaultProjectCommandOutputHandler) writeLogLine(pull string, line string) { + p.controllerBufferLock.Lock() + p.logger.Info("Project info: %s, content: %s", pull, line) + + for ch := range p.receiverBuffers[pull] { + select { + case ch <- line: + default: + delete(p.receiverBuffers[pull], ch) + } + } + if p.projectOutputBuffers[pull] == nil { + p.projectOutputBuffers[pull] = []string{} + } + p.projectOutputBuffers[pull] = append(p.projectOutputBuffers[pull], line) + p.controllerBufferLock.Unlock() +} + +//Remove channel, so client no longer receives Terraform output +func (p *DefaultProjectCommandOutputHandler) removeChan(pull string, ch chan string) { + p.controllerBufferLock.Lock() + delete(p.receiverBuffers[pull], ch) + close(ch) + p.controllerBufferLock.Unlock() +} diff --git a/server/handlers/project_command_output_handler_test.go b/server/handlers/project_command_output_handler_test.go new file mode 100644 index 0000000000..d979481feb --- /dev/null +++ b/server/handlers/project_command_output_handler_test.go @@ -0,0 +1,140 @@ +package handlers_test + +import ( + "fmt" + "sync" + "testing" + "time" + + "github.com/runatlantis/atlantis/server/events/models" + "github.com/runatlantis/atlantis/server/handlers" + "github.com/runatlantis/atlantis/server/logging" + . "github.com/runatlantis/atlantis/testing" +) + +func TestProjectCommandOutputHandler(t *testing.T) { + t.Run("Should Group by Project Info", func(t *testing.T) { + tempchan := make(chan *models.ProjectCmdOutputLine) + logger := logging.NewNoopLogger(t) + projectOutputHandler := handlers.NewProjectCommandOutputHandler(tempchan, logger) + ctx := models.ProjectCommandContext{ + BaseRepo: models.Repo{ + Name: "test-repo", + Owner: "test-org", + }, + HeadRepo: models.Repo{ + Name: "test-repo", + Owner: "test-org", + }, + Pull: models.PullRequest{ + Num: 1, + HeadBranch: "add-feat", + BaseBranch: "master", + Author: "acme", + }, + User: models.User{ + Username: "acme-user", + }, + Log: logger, + Workspace: "myworkspace", + RepoRelDir: "mydir", + ProjectName: "test-project", + } + fmt.Printf("Testing Handle func in unit test") + + var wg sync.WaitGroup + + go func() { + projectOutputHandler.Handle() + }() + + go func() { + projectOutputHandler.Send(ctx, "Test Terraform Output") + }() + + expectMsg := "" + wg.Add(1) + go func() { + err := projectOutputHandler.Receive(ctx.PullInfo(), func(msg string) error { + expectMsg = msg + wg.Done() + return nil + }) + Ok(t, err) + }() + wg.Wait() + Equals(t, expectMsg, "Test Terraform Output") + + time.Sleep(1 * time.Second) + + }) +} + +func TestProjectCommandOutputHandler_ClearBuff(t *testing.T) { + t.Run("Should Group by Project Info", func(t *testing.T) { + tempchan := make(chan *models.ProjectCmdOutputLine) + logger := logging.NewNoopLogger(t) + projectOutputHandler := handlers.NewProjectCommandOutputHandler(tempchan, logger) + ctx := models.ProjectCommandContext{ + BaseRepo: models.Repo{ + Name: "test-repo", + Owner: "test-org", + }, + HeadRepo: models.Repo{ + Name: "test-repo", + Owner: "test-org", + }, + Pull: models.PullRequest{ + Num: 1, + HeadBranch: "add-feat", + BaseBranch: "master", + Author: "acme", + }, + User: models.User{ + Username: "acme-user", + }, + Log: logger, + Workspace: "myworkspace", + RepoRelDir: "mydir", + ProjectName: "test-project", + } + fmt.Printf("Testing Handle func in unit test") + + var wg sync.WaitGroup + + go func() { + projectOutputHandler.Handle() + }() + + wg.Add(1) + go func() { + projectOutputHandler.Send(ctx, "Test Terraform Output") + }() + + wg.Add(1) + go func() { + projectOutputHandler.Clear(ctx) + }() + + wg.Add(1) + go func() { + projectOutputHandler.Send(ctx, "Test Terraform Output") + }() + + expectMsg := "" + go func() { + err := projectOutputHandler.Receive(ctx.PullInfo(), func(msg string) error { + expectMsg = msg + wg.Done() + return nil + }) + Ok(t, err) + }() + wg.Wait() + + Equals(t, expectMsg, "Test Terraform Output") + + time.Sleep(1 * time.Second) + + }) +} diff --git a/server/handlers/websocket_handler.go b/server/handlers/websocket_handler.go new file mode 100644 index 0000000000..90459aec13 --- /dev/null +++ b/server/handlers/websocket_handler.go @@ -0,0 +1,34 @@ +package handlers + +import ( + "net/http" + + "github.com/gorilla/websocket" +) + +//go:generate pegomock generate -m --use-experimental-model-gen --package mocks -o mocks/mock_websocket_handler.go WebsocketHandler + +type WebsocketHandler interface { + Upgrade(w http.ResponseWriter, r *http.Request, responseHeader http.Header) (WebsocketResponseWriter, error) +} + +//go:generate pegomock generate -m --use-experimental-model-gen --package mocks -o mocks/mock_websocket_response_writer.go WebsocketResponseWriter + +type WebsocketResponseWriter interface { + WriteMessage(messageType int, data []byte) error + Close() error +} + +type DefaultWebsocketHandler struct { + handler websocket.Upgrader +} + +func NewWebsocketHandler() WebsocketHandler { + return &DefaultWebsocketHandler{ + handler: websocket.Upgrader{}, + } +} + +func (wh *DefaultWebsocketHandler) Upgrade(w http.ResponseWriter, r *http.Request, responseHeader http.Header) (WebsocketResponseWriter, error) { + return wh.handler.Upgrade(w, r, responseHeader) +} diff --git a/server/server.go b/server/server.go index 245f6bb474..f6e42cfdbe 100644 --- a/server/server.go +++ b/server/server.go @@ -34,6 +34,7 @@ import ( "github.com/mitchellh/go-homedir" "github.com/runatlantis/atlantis/server/events/db" "github.com/runatlantis/atlantis/server/events/yaml/valid" + "github.com/runatlantis/atlantis/server/handlers" assetfs "github.com/elazarl/go-bindata-assetfs" "github.com/gorilla/mux" @@ -104,6 +105,7 @@ type Server struct { SSLKeyFile string Drainer *events.Drainer ScheduledExecutorService *ScheduledExecutorService + ProjectCmdOutputHandler handlers.ProjectCommandOutputHandler } // Config holds config for server that isn't passed in by the user. @@ -277,7 +279,8 @@ func NewServer(userConfig UserConfig, config Config) (*Server, error) { } vcsClient := vcs.NewClientProxy(githubClient, gitlabClient, bitbucketCloudClient, bitbucketServerClient, azuredevopsClient) commitStatusUpdater := &events.DefaultCommitStatusUpdater{Client: vcsClient, StatusName: userConfig.VCSStatusName} - terraformOutputChan := make(chan *models.TerraformOutputLine) + projectCmdOutput := make(chan *models.ProjectCmdOutputLine) + projectCmdOutputHandler := handlers.NewProjectCommandOutputHandler(projectCmdOutput, logger) binDir, err := mkSubDir(userConfig.DataDir, BinDirName) @@ -302,7 +305,7 @@ func NewServer(userConfig UserConfig, config Config) (*Server, error) { userConfig.TFDownloadURL, &terraform.DefaultDownloader{}, true, - terraformOutputChan) + projectCmdOutputHandler) // The flag.Lookup call is to detect if we're running in a unit test. If we // are, then we don't error out because we don't have/want terraform // installed on our CI system where the unit tests run. @@ -511,7 +514,7 @@ func NewServer(userConfig UserConfig, config Config) (*Server, error) { WorkingDir: workingDir, Webhooks: webhooksManager, WorkingDirLocker: workingDirLocker, - TerraformOutputChan: terraformOutputChan, + ProjectCmdOutputHandler: projectCmdOutputHandler, AggregateApplyRequirements: applyRequirementHandler, } @@ -637,14 +640,14 @@ func NewServer(userConfig UserConfig, config Config) (*Server, error) { } logStreamingController := &controllers.LogStreamingController{ - AtlantisVersion: config.AtlantisVersion, - AtlantisURL: parsedURL, - Logger: logger, - LogStreamTemplate: templates.LogStreamingTemplate, - LogStreamErrorTemplate: templates.LogStreamErrorTemplate, - Db: boltdb, - TerraformOutputChan: terraformOutputChan, - WebsocketHandler: controllers.NewWebsocketHandler(), + AtlantisVersion: config.AtlantisVersion, + AtlantisURL: parsedURL, + Logger: logger, + LogStreamTemplate: templates.LogStreamingTemplate, + LogStreamErrorTemplate: templates.LogStreamErrorTemplate, + Db: boltdb, + WebsocketHandler: handlers.NewWebsocketHandler(), + ProjectCommandOutputHandler: projectCmdOutputHandler, } eventsController := &events_controllers.VCSEventsController{ @@ -734,6 +737,7 @@ func NewServer(userConfig UserConfig, config Config) (*Server, error) { SSLCertFile: userConfig.SSLCertFile, Drainer: drainer, ScheduledExecutorService: scheduledExecutorService, + ProjectCmdOutputHandler: projectCmdOutputHandler, }, nil } @@ -773,7 +777,7 @@ func (s *Server) Start() error { go s.ScheduledExecutorService.Run() go func() { - s.LogStreamingController.Listen() + s.ProjectCmdOutputHandler.Handle() }() server := &http.Server{Addr: fmt.Sprintf(":%d", s.Port), Handler: n}