Update log handler to close buffered channels when an operation is complete #170

Merged · 21 commits · Feb 2, 2022
31 changes: 4 additions & 27 deletions server/controllers/websocket/writer.go
@@ -31,18 +31,6 @@ func (w *Writer) Write(rw http.ResponseWriter, r *http.Request, input chan strin
return errors.Wrap(err, "upgrading websocket connection")
}

conn.SetCloseHandler(func(code int, text string) error {
// Close the channnel after websocket connection closed.
// Will gracefully exit the ProjectCommandOutputHandler.Register() call and cleanup.
// is it good practice to close at the receiver? Probably not, we should figure out a better
// way to handle this case
close(input)
return nil
})

// Add a reader goroutine to listen for socket.close() events.
go w.setReadHandler(conn)

// block on reading our input channel
for msg := range input {
if err := conn.WriteMessage(websocket.BinaryMessage, []byte("\r"+msg+"\n")); err != nil {
@@ -51,20 +39,9 @@ func (w *Writer) Write(rw http.ResponseWriter, r *http.Request, input chan strin
}
}

return nil
}

func (w *Writer) setReadHandler(c *websocket.Conn) {
for {
_, _, err := c.ReadMessage()
if err != nil {
// CloseGoingAway (1001) when a browser tab is closed.
// Expected behaviour since we have a CloseHandler(), log warning if not a CloseGoingAway
if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway) {
w.log.Warn("Failed to read WS message: %s", err)
}
return
}
// close ws conn after input channel is closed
if err = conn.Close(); err != nil {
w.log.Warn("Failed to close ws connection: %s", err)
}

return nil
}
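
Pieced together, the simplified Write flow after this change looks roughly like the following. This is a sketch, not the verbatim file: the `upgrader` field name and the error handling inside the write loop are assumptions, since that part of the hunk is collapsed above; the rest mirrors the diff.

```go
// Write upgrades the request to a websocket and streams lines from the input
// channel to the client. The output handler now closes the channel once the
// operation completes, so the loop below simply drains it and then closes the
// connection; no CloseHandler or reader goroutine is needed anymore.
func (w *Writer) Write(rw http.ResponseWriter, r *http.Request, input chan string) error {
	conn, err := w.upgrader.Upgrade(rw, r, nil) // upgrader field name is assumed
	if err != nil {
		return errors.Wrap(err, "upgrading websocket connection")
	}

	// Block until the sender closes the input channel.
	for msg := range input {
		if err := conn.WriteMessage(websocket.BinaryMessage, []byte("\r"+msg+"\n")); err != nil {
			return errors.Wrap(err, "writing to websocket") // assumed error handling
		}
	}

	// close ws conn after input channel is closed
	if err = conn.Close(); err != nil {
		w.log.Warn("Failed to close ws connection: %s", err)
	}

	return nil
}
```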
4 changes: 2 additions & 2 deletions server/core/terraform/async_client.go
@@ -93,7 +93,7 @@ func (c *AsyncClient) RunCommandAsyncWithInput(ctx models.ProjectCommandContext,
for s.Scan() {
message := s.Text()
outCh <- Line{Line: message}
c.projectCmdOutputHandler.Send(ctx, message)
c.projectCmdOutputHandler.Send(ctx, message, false)
}
wg.Done()
}()
@@ -102,7 +102,7 @@ func (c *AsyncClient) RunCommandAsyncWithInput(ctx models.ProjectCommandContext,
for s.Scan() {
message := s.Text()
outCh <- Line{Line: message}
c.projectCmdOutputHandler.Send(ctx, message)
c.projectCmdOutputHandler.Send(ctx, message, false)
}
wg.Done()
}()
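
In other words, every line of Terraform output is forwarded with `operationComplete` set to `false`; the single terminating `true` is sent by the command wrapper in the next file. A hedged sketch of one streaming goroutine, with the scanner and wait-group setup assumed from the surrounding, unchanged code:

```go
go func() {
	// s is a bufio.Scanner over the command's output pipe (assumed from context).
	for s.Scan() {
		message := s.Text()
		outCh <- Line{Line: message}
		// An individual log line never ends the job, so operationComplete is false.
		c.projectCmdOutputHandler.Send(ctx, message, false)
	}
	wg.Done()
}()
```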
15 changes: 13 additions & 2 deletions server/events/project_command_runner.go
@@ -126,11 +126,22 @@ type ProjectOutputWrapper struct {
func (p *ProjectOutputWrapper) Plan(ctx models.ProjectCommandContext) models.ProjectResult {
// Reset the buffer when running the plan. We only need to do this for plan,
// apply is a continuation of the same workflow
return p.updateProjectPRStatus(models.PlanCommand, ctx, p.ProjectCommandRunner.Plan)
result := p.updateProjectPRStatus(models.PlanCommand, ctx, p.ProjectCommandRunner.Plan)
p.updateOperationStatus(ctx, result)
return result
}

func (p *ProjectOutputWrapper) Apply(ctx models.ProjectCommandContext) models.ProjectResult {
return p.updateProjectPRStatus(models.ApplyCommand, ctx, p.ProjectCommandRunner.Apply)
result := p.updateProjectPRStatus(models.ApplyCommand, ctx, p.ProjectCommandRunner.Apply)
p.updateOperationStatus(ctx, result)
return result
}

func (p *ProjectOutputWrapper) updateOperationStatus(ctx models.ProjectCommandContext, result models.ProjectResult) {
// No need to mark the operation as complete if it failed.
if result.Failure == "" {
p.ProjectCmdOutputHandler.Send(ctx, "", true)
}
}

func (p *ProjectOutputWrapper) updateProjectPRStatus(commandName models.CommandName, ctx models.ProjectCommandContext, execute func(ctx models.ProjectCommandContext) models.ProjectResult) models.ProjectResult {
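
Taken together with the async client change above, one successful operation now produces a Send sequence like the sketch below. The helper is purely illustrative (it does not exist in the codebase) and only shows the calling convention of the updated `ProjectCommandOutputHandler` interface:

```go
// runAndStream is a hypothetical helper showing the Send calling convention.
func runAndStream(handler ProjectCommandOutputHandler, ctx models.ProjectCommandContext, lines []string) {
	// Each line of terraform output is streamed as it is produced.
	for _, line := range lines {
		handler.Send(ctx, line, false)
	}
	// After a successful plan/apply, ProjectOutputWrapper sends one final empty
	// message with operationComplete=true, which tells the handler to mark the
	// buffer complete and close every registered receiver channel.
	handler.Send(ctx, "", true)
}
```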
2 changes: 1 addition & 1 deletion server/events/pull_closed_executor_test.go
@@ -210,7 +210,7 @@ func TestCleanUpLogStreaming(t *testing.T) {
}

go prjCmdOutHandler.Handle()
prjCmdOutHandler.Send(ctx, "Test Message")
prjCmdOutHandler.Send(ctx, "Test Message", false)

// Create boltdb and add pull request.
var lockBucket = "bucket"
20 changes: 12 additions & 8 deletions server/handlers/mocks/mock_project_command_output_handler.go

Generated mock file; diff not rendered.

77 changes: 64 additions & 13 deletions server/handlers/project_command_output_handler.go
@@ -7,6 +7,11 @@ import (
"github.com/runatlantis/atlantis/server/logging"
)

type OutputBuffer struct {
OperationComplete bool
Buffer []string
}

type PullContext struct {
PullNum int
Repo string
@@ -24,15 +29,16 @@ type ProjectCmdOutputLine struct {

JobContext JobContext

Line string
Line string
OperationComplete bool
}

// AsyncProjectCommandOutputHandler is a handler to transport terraform client
// outputs to the front end.
type AsyncProjectCommandOutputHandler struct {
projectCmdOutput chan *ProjectCmdOutputLine

projectOutputBuffers map[string][]string
projectOutputBuffers map[string]OutputBuffer
projectOutputBuffersLock sync.RWMutex

receiverBuffers map[string]map[chan string]bool
@@ -66,7 +72,7 @@ type ProjectStatusUpdater interface {

type ProjectCommandOutputHandler interface {
// Send will enqueue the msg and wait for Handle() to receive the message.
Send(ctx models.ProjectCommandContext, msg string)
Send(ctx models.ProjectCommandContext, msg string, operationComplete bool)

// Register registers a channel and blocks until it is caught up. Callers should call this asynchronously when attempting
// to read the channel in the same goroutine
@@ -103,12 +109,12 @@ func NewAsyncProjectCommandOutputHandler(
receiverBuffers: map[string]map[chan string]bool{},
projectStatusUpdater: projectStatusUpdater,
projectJobURLGenerator: projectJobURLGenerator,
projectOutputBuffers: map[string][]string{},
projectOutputBuffers: map[string]OutputBuffer{},
pullToJobMapping: sync.Map{},
}
}

func (p *AsyncProjectCommandOutputHandler) Send(ctx models.ProjectCommandContext, msg string) {
func (p *AsyncProjectCommandOutputHandler) Send(ctx models.ProjectCommandContext, msg string, operationComplete bool) {
p.projectCmdOutput <- &ProjectCmdOutputLine{
JobID: ctx.JobID,
JobContext: JobContext{
@@ -120,7 +126,8 @@ func (p *AsyncProjectCommandOutputHandler) Send(ctx models.ProjectCommandContext
Workspace: ctx.Workspace,
},
},
Line: msg,
Line: msg,
OperationComplete: operationComplete,
}
}

@@ -130,6 +137,12 @@ func (p *AsyncProjectCommandOutputHandler) Register(jobID string, receiver chan

func (p *AsyncProjectCommandOutputHandler) Handle() {
for msg := range p.projectCmdOutput {
if msg.OperationComplete {
p.updateOutputBufferOperationStatus(msg.JobID)
p.closeChannelsForJob(msg.JobID)
continue
}

// Add job to pullToJob mapping
if _, ok := p.pullToJobMapping.Load(msg.JobContext.PullContext); !ok {
p.pullToJobMapping.Store(msg.JobContext.PullContext, map[string]bool{})
@@ -142,6 +155,32 @@ func (p *AsyncProjectCommandOutputHandler) Handle() {
}
}

// Sets the OperationComplete to true for the job
func (p *AsyncProjectCommandOutputHandler) updateOutputBufferOperationStatus(jobID string) {
p.projectOutputBuffersLock.Lock()
defer func() {
p.projectOutputBuffersLock.Unlock()
}()

if outputBuffer, ok := p.projectOutputBuffers[jobID]; ok {
outputBuffer.OperationComplete = true
p.projectOutputBuffers[jobID] = outputBuffer
}
}

// Closes all the buffered channels for the job
func (p *AsyncProjectCommandOutputHandler) closeChannelsForJob(jobID string) {
p.receiverBuffersLock.Lock()
defer func() {
p.receiverBuffersLock.Unlock()
}()
if openChannels, ok := p.receiverBuffers[jobID]; ok {
for ch := range openChannels {
close(ch)
}
}
}

func (p *AsyncProjectCommandOutputHandler) SetJobURLWithStatus(ctx models.ProjectCommandContext, cmdName models.CommandName, status models.CommitStatus) error {
url, err := p.projectJobURLGenerator.GenerateProjectJobURL(ctx)

@@ -153,13 +192,19 @@ func (p *AsyncProjectCommandOutputHandler) SetJobURLWithStatus(ctx models.Projec

func (p *AsyncProjectCommandOutputHandler) addChan(ch chan string, jobID string) {
p.projectOutputBuffersLock.RLock()
buffer := p.projectOutputBuffers[jobID]
outputBuffer := p.projectOutputBuffers[jobID]
p.projectOutputBuffersLock.RUnlock()

for _, line := range buffer {
for _, line := range outputBuffer.Buffer {
ch <- line
}

// No need to register the receiver since all the logs have already been streamed
if outputBuffer.OperationComplete {
close(ch)
return
}

// add the channel to our registry after we backfill the contents of the buffer,
// to prevent new messages coming in interleaving with this backfill.
p.receiverBuffersLock.Lock()
@@ -189,10 +234,16 @@ func (p *AsyncProjectCommandOutputHandler) writeLogLine(jobID string, line strin
p.receiverBuffersLock.Unlock()

p.projectOutputBuffersLock.Lock()
if p.projectOutputBuffers[jobID] == nil {
p.projectOutputBuffers[jobID] = []string{}
if _, ok := p.projectOutputBuffers[jobID]; !ok {
p.projectOutputBuffers[jobID] = OutputBuffer{
OperationComplete: false,
Buffer: []string{},
}
}
p.projectOutputBuffers[jobID] = append(p.projectOutputBuffers[jobID], line)
outputBuffer := p.projectOutputBuffers[jobID]
outputBuffer.Buffer = append(outputBuffer.Buffer, line)
p.projectOutputBuffers[jobID] = outputBuffer

p.projectOutputBuffersLock.Unlock()
}

@@ -208,7 +259,7 @@ func (p *AsyncProjectCommandOutputHandler) GetReceiverBufferForPull(jobID string
return p.receiverBuffers[jobID]
}

func (p *AsyncProjectCommandOutputHandler) GetProjectOutputBuffer(jobID string) []string {
func (p *AsyncProjectCommandOutputHandler) GetProjectOutputBuffer(jobID string) OutputBuffer {
return p.projectOutputBuffers[jobID]
}

@@ -243,7 +294,7 @@ func (p *AsyncProjectCommandOutputHandler) CleanUp(pullContext PullContext) {
// NoopProjectOutputHandler is a mock that doesn't do anything
type NoopProjectOutputHandler struct{}

func (p *NoopProjectOutputHandler) Send(ctx models.ProjectCommandContext, msg string) {
func (p *NoopProjectOutputHandler) Send(ctx models.ProjectCommandContext, msg string, isOperationComplete bool) {
}

func (p *NoopProjectOutputHandler) Register(jobID string, receiver chan string) {}
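
Putting the handler changes together: a completion message never touches the log buffer; it flips `OperationComplete` on the buffer and closes every channel currently registered for the job, while channels registered later get the backfilled buffer and are closed immediately in `addChan`. A minimal sketch of the dispatch path in `Handle()`, reconstructed from the hunks above (the pull-to-job bookkeeping and the final `writeLogLine` call are collapsed in the diff and assumed here):

```go
func (p *AsyncProjectCommandOutputHandler) Handle() {
	for msg := range p.projectCmdOutput {
		if msg.OperationComplete {
			// Mark the buffer complete so future Register() calls see it...
			p.updateOutputBufferOperationStatus(msg.JobID)
			// ...and close the channels of receivers already streaming this job,
			// which in turn unblocks the websocket Writer.Write loop.
			p.closeChannelsForJob(msg.JobID)
			continue
		}
		// Normal log lines are buffered and fanned out to receivers (assumed;
		// this part of Handle is collapsed in the diff).
		p.writeLogLine(msg.JobID, msg.Line)
	}
}
```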