Skip to content

Commit

Permalink
Channel Refactoring (#91)
Browse files Browse the repository at this point in the history
* Began channel refactoring

* Fixing refactoring logic

* Replacing terraformOutputChan

* Making suggested changes

* Changing attribute declarations

* Fixed channel logic

* Formatting

* Changing tests to utilize mocks

* Adding go files

* Adding suggested changes

* Suggested changes before merging

* Reformatting

* Deleting unnecesary lines

* Buffer cleanup logic

* Buffer cleanup & Testing

* Trying to make test pass

* Suggested changes

* Fixing test

* Fixing failing test

* Error checking

* Comment improvements

* Adding buffer clearing for apply workflow

* Removing clearing for apply workflow
  • Loading branch information
isatasan authored Aug 17, 2021
1 parent beebc22 commit a44c966
Show file tree
Hide file tree
Showing 26 changed files with 716 additions and 317 deletions.
4 changes: 4 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ require (
github.com/Laisky/graphql v1.0.5
github.com/Masterminds/sprig/v3 v3.2.2
github.com/agext/levenshtein v1.2.3 // indirect
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 // indirect
github.com/alecthomas/units v0.0.0-20210208195552-ff826a37aa15 // indirect
github.com/asaskevich/govalidator v0.0.0-20190424111038-f61b66f89f4a // indirect
github.com/aws/aws-sdk-go v1.31.15 // indirect
github.com/bradleyfalzon/ghinstallation v1.1.1
Expand Down Expand Up @@ -44,6 +46,7 @@ require (
github.com/nlopes/slack v0.4.0
github.com/petergtz/pegomock v2.9.0+incompatible
github.com/pkg/errors v0.9.1
github.com/posener/wstest v1.2.0
github.com/remeh/sizedwaitgroup v1.0.0
github.com/shurcooL/githubv4 v0.0.0-20191127044304-8f68eb5628d0
github.com/shurcooL/graphql v0.0.0-20181231061246-d48a9a75455f // indirect
Expand All @@ -59,6 +62,7 @@ require (
go.etcd.io/bbolt v1.3.6
go.uber.org/zap v1.17.0
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9
gopkg.in/alecthomas/kingpin.v2 v2.2.6 // indirect
gopkg.in/go-playground/assert.v1 v1.2.1 // indirect
gopkg.in/go-playground/validator.v9 v9.31.0
gopkg.in/yaml.v2 v2.4.0
Expand Down
9 changes: 9 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ github.com/agext/levenshtein v1.2.1/go.mod h1:JEDfjyjHDjOF/1e4FlBE/PkbqA9OfWu2ki
github.com/agext/levenshtein v1.2.2/go.mod h1:JEDfjyjHDjOF/1e4FlBE/PkbqA9OfWu2ki2W0IB5558=
github.com/agext/levenshtein v1.2.3 h1:YB2fHEn0UJagG8T1rrWknE3ZQzWM06O8AMAatNn7lmo=
github.com/agext/levenshtein v1.2.3/go.mod h1:JEDfjyjHDjOF/1e4FlBE/PkbqA9OfWu2ki2W0IB5558=
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 h1:JYp7IbQjafoB+tBA3gMyHYHrpOtNuDiK/uB5uXxq5wM=
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/units v0.0.0-20210208195552-ff826a37aa15 h1:AUNCr9CiJuwrRYS3XieqF+Z9B9gNxo/eANAJCF2eiN4=
github.com/alecthomas/units v0.0.0-20210208195552-ff826a37aa15/go.mod h1:OMCwj8VM1Kc9e19TLln2VL61YJF0x1XFtfdL4JdbSyE=
github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY=
github.com/apparentlymart/go-dump v0.0.0-20180507223929-23540a00eaa3/go.mod h1:oL81AME2rN47vu18xqj1S1jPIPuN7afo62yKTNn3XMM=
github.com/apparentlymart/go-textseg v1.0.0 h1:rRmlIsPEEhUTIKQb7T++Nz/A5Q6C9IuX2wFoYVvnCs0=
Expand Down Expand Up @@ -204,6 +208,7 @@ github.com/gorilla/css v1.0.0 h1:BQqNyPTi50JCFMTw/b67hByjMVXZRwGha6wxVGkeihY=
github.com/gorilla/css v1.0.0/go.mod h1:Dn721qIggHpt4+EFCcTLTU/vk5ySda2ReITrtgBl60c=
github.com/gorilla/mux v1.8.0 h1:i40aqfkR1h2SlN9hojwV5ZA91wcXFOvkdNIeFDP5koI=
github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So=
github.com/gorilla/websocket v1.4.1/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc=
github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/graph-gophers/graphql-go v0.0.0-20200309224638-dae41bde9ef9/go.mod h1:9CQHMSxwO4MprSdzoIEobiHpoLtHm77vfxsvsIN5Vuc=
Expand Down Expand Up @@ -352,6 +357,8 @@ github.com/pkg/sftp v1.10.1/go.mod h1:lYOWFsE0bwd1+KfKJaKeuokY15vzFx25BLbzYYoAxZ
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/posener/complete v1.1.1/go.mod h1:em0nMJCgc9GFtwrmVmEMR/ZL6WyhyjMBndrE9hABlRI=
github.com/posener/wstest v1.2.0 h1:PAY0cRybxOjh0yqSDCrlAGUwtx+GNKpuUfid/08pv48=
github.com/posener/wstest v1.2.0/go.mod h1:GkplCx9zskpudjrMp23LyZHrSonab0aZzh2x0ACGRbU=
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
github.com/remeh/sizedwaitgroup v1.0.0 h1:VNGGFwNo/R5+MJBf6yrsr110p0m4/OX4S3DCy7Kyl5E=
github.com/remeh/sizedwaitgroup v1.0.0/go.mod h1:3j2R4OIe/SeS6YDhICBy22RWjJC5eNCJ1V+9+NVNYlo=
Expand Down Expand Up @@ -761,6 +768,8 @@ google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp0
google.golang.org/protobuf v1.26.0 h1:bxAC2xTBsZGibn2RTntX0oH50xLsqy1OxA9tTL3p/lk=
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
gopkg.in/airbrake/gobrake.v2 v2.0.9/go.mod h1:/h5ZAUhDkGaJfjzjKLSjv6zCL6O0LLBxU4K+aSYdM/U=
gopkg.in/alecthomas/kingpin.v2 v2.2.6 h1:jMFz6MfLP0/4fUyZle81rXUoxOBFi19VUFKVDOQfozc=
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
Expand Down
18 changes: 7 additions & 11 deletions server/controllers/events/events_controller_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/runatlantis/atlantis/server/events/webhooks"
"github.com/runatlantis/atlantis/server/events/yaml"
"github.com/runatlantis/atlantis/server/events/yaml/valid"
handlermocks "github.com/runatlantis/atlantis/server/handlers/mocks"
"github.com/runatlantis/atlantis/server/logging"
. "github.com/runatlantis/atlantis/testing"
)
Expand Down Expand Up @@ -666,7 +667,7 @@ func setupE2E(t *testing.T, repoDir string) (events_controllers.VCSEventsControl
e2eStatusUpdater := &events.DefaultCommitStatusUpdater{Client: e2eVCSClient}
e2eGithubGetter := mocks.NewMockGithubPullGetter()
e2eGitlabGetter := mocks.NewMockGitlabMergeRequestGetter()
tempchan := make(chan *models.TerraformOutputLine)
projectCmdOutputHandler := handlermocks.NewMockProjectCommandOutputHandler()

// Real dependencies.
logger := logging.NewNoopLogger(t)
Expand All @@ -681,7 +682,7 @@ func setupE2E(t *testing.T, repoDir string) (events_controllers.VCSEventsControl
GithubUser: "github-user",
GitlabUser: "gitlab-user",
}
terraformClient, err := terraform.NewClient(logger, binDir, cacheDir, "", "", "", "default-tf-version", "https://releases.hashicorp.com", &NoopTFDownloader{}, false, tempchan)
terraformClient, err := terraform.NewClient(logger, binDir, cacheDir, "", "", "", "default-tf-version", "https://releases.hashicorp.com", &NoopTFDownloader{}, false, projectCmdOutputHandler)
Ok(t, err)
boltdb, err := db.New(dataDir)
Ok(t, err)
Expand Down Expand Up @@ -789,14 +790,14 @@ func setupE2E(t *testing.T, repoDir string) (events_controllers.VCSEventsControl
TerraformExecutor: terraformClient,
DefaultTFVersion: defaultTFVersion,
},
WorkingDir: workingDir,
Webhooks: &mockWebhookSender{},
WorkingDirLocker: locker,
WorkingDir: workingDir,
Webhooks: &mockWebhookSender{},
WorkingDirLocker: locker,
ProjectCmdOutputHandler: projectCmdOutputHandler,
AggregateApplyRequirements: &events.AggregateApplyRequirements{
PullApprovedChecker: e2eVCSClient,
WorkingDir: workingDir,
},
TerraformOutputChan: tempchan,
}

dbUpdater := &events.DBUpdater{
Expand Down Expand Up @@ -898,11 +899,6 @@ func setupE2E(t *testing.T, repoDir string) (events_controllers.VCSEventsControl
repoAllowlistChecker, err := events.NewRepoAllowlistChecker("*")
Ok(t, err)

go func() {
for range tempchan {
}
}()

ctrl := events_controllers.VCSEventsController{
TestingMode: true,
CommandRunner: commandRunner,
Expand Down
120 changes: 14 additions & 106 deletions server/controllers/logstreaming_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"fmt"
"net/http"
"net/url"
"sync"

"strconv"

Expand All @@ -13,48 +12,20 @@ import (
"github.com/runatlantis/atlantis/server/controllers/templates"
"github.com/runatlantis/atlantis/server/events/db"
"github.com/runatlantis/atlantis/server/events/models"
"github.com/runatlantis/atlantis/server/handlers"
"github.com/runatlantis/atlantis/server/logging"
)

//go:generate pegomock generate -m --use-experimental-model-gen --package mocks -o mocks/mock_websocket_handler.go WebsocketHandler

type WebsocketHandler interface {
Upgrade(w http.ResponseWriter, r *http.Request, responseHeader http.Header) (WebsocketResponseWriter, error)
}

//go:generate pegomock generate -m --use-experimental-model-gen --package mocks -o mocks/mock_websocket_response_writer.go WebsocketResponseWriter
type WebsocketResponseWriter interface {
WriteMessage(messageType int, data []byte) error
Close() error
}

type DefaultWebsocketHandler struct {
handler websocket.Upgrader
}

func NewWebsocketHandler() WebsocketHandler {
return &DefaultWebsocketHandler{
handler: websocket.Upgrader{},
}
}

func (wh *DefaultWebsocketHandler) Upgrade(w http.ResponseWriter, r *http.Request, responseHeader http.Header) (WebsocketResponseWriter, error) {
return wh.handler.Upgrade(w, r, responseHeader)
}

type LogStreamingController struct {
AtlantisVersion string
AtlantisURL *url.URL
Logger logging.SimpleLogging
LogStreamTemplate templates.TemplateWriter
LogStreamErrorTemplate templates.TemplateWriter
Db *db.BoltDB
TerraformOutputChan chan *models.TerraformOutputLine

logBuffers map[string][]string
wsChans map[string]map[chan string]bool
chanLock sync.RWMutex
WebsocketHandler WebsocketHandler
WebsocketHandler handlers.WebsocketHandler
ProjectCommandOutputHandler handlers.ProjectCommandOutputHandler
}

type PullInfo struct {
Expand All @@ -64,74 +35,6 @@ type PullInfo struct {
ProjectName string
}

//Add channel to get Terraform output for current PR project
//Send output currently in buffer
func (j *LogStreamingController) addChan(pull string) chan string {
ch := make(chan string, 1000)
j.chanLock.Lock()
for _, line := range j.logBuffers[pull] {
ch <- line
}
if j.wsChans == nil {
j.wsChans = map[string]map[chan string]bool{}
}
if j.wsChans[pull] == nil {
j.wsChans[pull] = map[chan string]bool{}
}
j.wsChans[pull][ch] = true
j.chanLock.Unlock()
return ch
}

//Remove channel, so client no longer receives Terraform output
func (j *LogStreamingController) removeChan(pull string, ch chan string) {
j.chanLock.Lock()
delete(j.wsChans[pull], ch)
j.chanLock.Unlock()
}

//Add log line to buffer and send to all current channels
func (j *LogStreamingController) writeLogLine(pull string, line string) {
j.chanLock.Lock()
if j.logBuffers == nil {
j.logBuffers = map[string][]string{}
}
j.Logger.Info("Project info: %s, content: %s", pull, line)

for ch := range j.wsChans[pull] {
select {
case ch <- line:
default:
delete(j.wsChans[pull], ch)
}
}
if j.logBuffers[pull] == nil {
j.logBuffers[pull] = []string{}
}
j.logBuffers[pull] = append(j.logBuffers[pull], line)
j.chanLock.Unlock()
}

//Clear log lines in buffer
func (j *LogStreamingController) clearLogLines(pull string) {
j.chanLock.Lock()
delete(j.logBuffers, pull)
j.chanLock.Unlock()
}

func (j *LogStreamingController) Listen() {
for msg := range j.TerraformOutputChan {
j.Logger.Info("Recieving message %s", msg.Line)
if msg.ClearBuffBefore {
j.clearLogLines(msg.ProjectInfo)
}
j.writeLogLine(msg.ProjectInfo, msg.Line)
if msg.ClearBuffAfter {
j.clearLogLines(msg.ProjectInfo)
}
}
}

func (p *PullInfo) String() string {
return fmt.Sprintf("%s/%s/%d/%s", p.Org, p.Repo, p.Pull, p.ProjectName)
}
Expand Down Expand Up @@ -223,16 +126,21 @@ func (j *LogStreamingController) GetLogStreamWS(w http.ResponseWriter, r *http.R
defer c.Close()

pull := pullInfo.String()
ch := j.addChan(pull)
defer j.removeChan(pull, ch)

for msg := range ch {
j.Logger.Info(msg)
err = j.ProjectCommandOutputHandler.Receive(pull, func(msg string) error {
if err := c.WriteMessage(websocket.BinaryMessage, []byte(msg+"\r\n\t")); err != nil {
j.Logger.Warn("Failed to write ws message: %s", err)
return
return err
}
return nil
})

if err != nil {
j.Logger.Warn("Failed to receive message: %s", err)
j.respond(w, logging.Error, http.StatusInternalServerError, err.Error())
return
}

}

func (j *LogStreamingController) respond(w http.ResponseWriter, lvl logging.LogLevel, responseCode int, format string, args ...interface{}) {
Expand All @@ -243,7 +151,7 @@ func (j *LogStreamingController) respond(w http.ResponseWriter, lvl logging.LogL
}

//repo, pull num, project name moved to db
func (j *LogStreamingController) RetrievePrStatus(pullInfo *PullInfo) (bool, error) { //either implement new func in boltdb
func (j *LogStreamingController) RetrievePrStatus(pullInfo *PullInfo) (bool, error) {
pull := models.PullRequest{
Num: pullInfo.Pull,
BaseRepo: models.Repo{
Expand Down
31 changes: 7 additions & 24 deletions server/controllers/logstreaming_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,23 +4,21 @@ import (
"net/http"
"net/http/httptest"
"testing"
"time"

"github.com/gorilla/mux"
"github.com/runatlantis/atlantis/server/controllers"
"github.com/runatlantis/atlantis/server/events/models"
"github.com/runatlantis/atlantis/server/logging"

. "github.com/petergtz/pegomock"
"github.com/runatlantis/atlantis/server/controllers/mocks"
"github.com/runatlantis/atlantis/server/controllers/mocks/matchers"
"github.com/runatlantis/atlantis/server/handlers/mocks"
"github.com/runatlantis/atlantis/server/handlers/mocks/matchers"
)

func TestGetLogStream_WebSockets(t *testing.T) {
t.Run("Should Group by Project Info", func(t *testing.T) {
RegisterMockTestingT(t)
tempchan := make(chan *models.TerraformOutputLine)
websocketMock := mocks.NewMockWebsocketHandler()
projectOutputHandler := mocks.NewMockProjectCommandOutputHandler()
logger := logging.NewNoopLogger(t)
websocketWriterMock := mocks.NewMockWebsocketResponseWriter()
params := map[string]string{
Expand All @@ -33,29 +31,14 @@ func TestGetLogStream_WebSockets(t *testing.T) {
request = mux.SetURLVars(request, params)
response := httptest.NewRecorder()
logStreamingController := &controllers.LogStreamingController{
Logger: logger,
TerraformOutputChan: tempchan,
WebsocketHandler: websocketMock,
Logger: logger,
WebsocketHandler: websocketMock,
ProjectCommandOutputHandler: projectOutputHandler,
}

When(websocketMock.Upgrade(matchers.AnyHttpResponseWriter(), matchers.AnyPtrToHttpRequest(), matchers.AnyHttpHeader())).ThenReturn(websocketWriterMock, nil)

go func() {
tempchan <- &models.TerraformOutputLine{
ProjectInfo: "test-org/test-repo/1/test-project",
Line: "Test Terraform Output",
}
}()

go func() {
logStreamingController.Listen()
}()

go func() {
logStreamingController.GetLogStreamWS(response, request)
}()

time.Sleep(1 * time.Second)
logStreamingController.GetLogStreamWS(response, request)

websocketWriterMock.VerifyWasCalled(Once())
})
Expand Down

This file was deleted.

Loading

0 comments on commit a44c966

Please sign in to comment.