fix(logs) Buffer logs to prevent requests from timing out #659

Merged · 3 commits · May 27, 2021
5 changes: 3 additions & 2 deletions CHANGELOG.md
@@ -2,8 +2,9 @@

### To be Released

* fix(logs) Buffer logs to prevent requests from timing out [#659](https://github.com/Scalingo/cli/pull/659)
* feat(integration-link-create): better catch 401 from SCM API [#651](https://github.com/Scalingo/cli/pull/651)
* bump github.com/fatih/color from 1.10.0 to 1.11.0 [#654](https://github.com/Scalingo/cli/pull/654)

### 1.20.1

64 changes: 59 additions & 5 deletions logs/logs.go
@@ -8,6 +8,7 @@ import (
	"os/signal"
	"regexp"
	"strings"
	"sync"
	"time"
	"unicode"

@@ -22,6 +23,10 @@ import (
	errgo "gopkg.in/errgo.v1"
)

const (
	logsMaxBufferSize = 150000 // Size of the buffer when querying logs (in lines)
)

type WSEvent struct {
	Type string `json:"event"`
	Log  string `json:"log"`
@@ -46,17 +51,66 @@ func Dump(logsURL string, n int, filter string) error {
		return nil
	}

	// Create a buffered channel with a maximum size of the number of log lines
	// requested. On a medium to good internet connection, we fetch lines
	// faster than we can process them. This buffer is here to get the logs as
	// fast as possible, since the request will time out after 30s.
	buffSize := n
	if buffSize > logsMaxBufferSize { // Cap the buffer size (to prevent high memory allocation when the user specifies n=1_000_000)
		buffSize = logsMaxBufferSize
	}

	// This buffered channel will be used as a buffer between the network
	// connection and our logs processing pipeline.
	buff := make(chan string, buffSize)
	// This WaitGroup is used to ensure that the logs processing pipeline has
	// finished before exiting the method.
	wg := &sync.WaitGroup{}

	// Start a goroutine that reads from the buffered channel and sends those
	// lines to the logs processing pipeline.
	wg.Add(1)
	go func() {
		defer wg.Done()
		for bline := range buff {
			colorizeLogs(string(bline))
		}
	}()

	// Ensure that all lines are printed out before exiting this method.
	defer wg.Wait()

	// Here we use bufio to read from the response because we want to easily
	// split the response into lines.
	// Note: this can look like a duplicate of our buffered channel (buff),
	// but it is not. The reason is that ReadBytes fills the sr reader's
	// internal buffer only when it is empty. This means that the first
	// ReadBytes fetches a chunk of data (4KB by default) from the
	// connection, then serves from this internal buffer until it runs out.
	// However, if our logs processing pipeline is slow, it will never query
	// the next chunk of data. Hence the buffered channel.
	sr := bufio.NewReader(res.Body)

	for {
		// Read one line from the response
		bline, err := sr.ReadBytes('\n')
		if err != nil {
			// There was an error and we will exit, so close the buffered
			// channel and let the goroutine finish its work.
			close(buff)

			if err == stdio.EOF {
				// If the error is EOF, it means that we successfully read all
				// of the response body
				return nil
			}
			// Otherwise there was an error: return it
			return errgo.Notef(err, "fail to read logs")
		}
		// Send the line to the buffer
		buff <- string(bline)
	}

return nil
}

func Stream(logsRawURL string, filter string) error {