Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix bug in log framer only affecting Windows #3608

Merged
merged 5 commits into from
Dec 8, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ BUG FIXES:
* cli: Fix panic when running `keyring` commands [GH-3509]
* client: Fix a panic when restoring an allocation with a dead leader task
[GH-3502]
* client: Fix crash when following logs from a Windows node [GH-3608]
* client: Fix allocation accounting in GC and trigger GCs on allocation
updates [GH-3445]
* driver/rkt: Remove pods on shutdown [GH-3562]
Expand Down
84 changes: 57 additions & 27 deletions command/agent/fs_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,7 @@ type StreamFramer struct {
// Captures whether the framer is running and any error that occurred to
// cause it to stop.
running bool
Err error
err error
}

// NewStreamFramer creates a new stream framer that will output StreamFrames to
Expand Down Expand Up @@ -379,16 +379,23 @@ func (s *StreamFramer) ExitCh() <-chan struct{} {
return s.exitCh
}

// Err returns the error that caused the StreamFramer to exit
func (s *StreamFramer) Err() error {
s.l.Lock()
defer s.l.Unlock()
return s.err
}

// run is the internal run method. It exits if Destroy is called or an error
// occurs, in which case the exit channel is closed.
func (s *StreamFramer) run() {
var err error
defer func() {
close(s.exitCh)
s.l.Lock()
s.running = false
s.Err = err
s.err = err
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be set using a SetErr() method that has guards against concurrent access?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The appropriate lock is acquired on L394 and this should be the only place that sets s.err so I don't want to add a helper to encourage other uses.

s.l.Unlock()
close(s.exitCh)
}()

OUTER:
Expand Down Expand Up @@ -466,8 +473,8 @@ func (s *StreamFramer) Send(file, fileEvent string, data []byte, offset int64) e
// If we are not running, return the error that caused us to not run or
// indicated that it was never started.
if !s.running {
if s.Err != nil {
return s.Err
if s.err != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be s.Err()? Otherwise add a comment about why this can be accessed directly.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same for line 477

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method holds the appropriate lock (L470)

return s.err
}

return fmt.Errorf("StreamFramer not running")
Expand Down Expand Up @@ -608,6 +615,35 @@ func (s *HTTPServer) Stream(resp http.ResponseWriter, req *http.Request) (interf
return nil, nil
}

// parseFramerErr takes an error and returns an error. The error will
// potentially change if it was caused by the connection being closed.
func parseFramerErr(err error) error {
if err == nil {
return nil
}

errMsg := err.Error()

if strings.Contains(errMsg, io.ErrClosedPipe.Error()) {
// The pipe check is for tests
return syscall.EPIPE
}

// The connection was closed by our peer
if strings.Contains(errMsg, syscall.EPIPE.Error()) || strings.Contains(errMsg, syscall.ECONNRESET.Error()) {
return syscall.EPIPE
}

// Windows version of ECONNRESET
//XXX(schmichael) I could find no existing error or constant to
// compare this against.
if strings.Contains(errMsg, "forcibly closed") {
return syscall.EPIPE
}

return err
}

// stream is the internal method to stream the content of a file. eofCancelCh is
// used to cancel the stream if triggered while at EOF. If the connection is
// broken an EPIPE error is returned
Expand All @@ -629,26 +665,6 @@ func (s *HTTPServer) stream(offset int64, path string,
t.Done()
}()

// parseFramerErr takes an error and returns an error. The error will
// potentially change if it was caused by the connection being closed.
parseFramerErr := func(e error) error {
if e == nil {
return nil
}

if strings.Contains(e.Error(), io.ErrClosedPipe.Error()) {
// The pipe check is for tests
return syscall.EPIPE
}

// The connection was closed by our peer
if strings.Contains(e.Error(), syscall.EPIPE.Error()) || strings.Contains(e.Error(), syscall.ECONNRESET.Error()) {
return syscall.EPIPE
}

return err
}

// Create a variable to allow setting the last event
var lastEvent string

Expand Down Expand Up @@ -721,7 +737,7 @@ OUTER:
lastEvent = truncateEvent
continue OUTER
case <-framer.ExitCh():
return parseFramerErr(framer.Err)
return parseFramerErr(framer.Err())
case err, ok := <-eofCancelCh:
if !ok {
return nil
Expand Down Expand Up @@ -916,7 +932,21 @@ func (s *HTTPServer) logs(follow, plain bool, offset int64,
return nil
}

//Since we successfully streamed, update the overall offset/idx.
// defensively check to make sure StreamFramer hasn't stopped
// running to avoid tight loops with goroutine leaks as in
// #3342
select {
case <-framer.ExitCh():
err := parseFramerErr(framer.Err())
if err == syscall.EPIPE {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this logic be in parseFramerErr instead, since it already has logic related to translating errors? I.e, does the caller need to do the translation of syscall.EPIPE -> nil error directly?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it returns the error from parseFramerErr in line 740, but has this extra logic here where it turns that to nil. Good question though - why do we treat the error two different ways?

Copy link
Contributor

@preetapan preetapan Dec 4, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reading this again, I am not convinced this block is necessary (saw your comment about defensive code). But if its here, I would make it return the value of parseFramerErr rather than nil to make handling exit channel events identical in both cases.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change makes more sense in the actual code and not in the code review I promise. :) The code review cuts out the code 10 lines up that follow similar return behavior when handling the error from s.stream()

The error returned from s.stream() is already passed through parseFramerErr(), then line 924 converts EPIPE to nil in this func.

So this defensive code converts the framer error with the helper func and then follows the same if err == EPIPE { return nil } behavior as above.

Sorry the code is obtuse but the good news is that it only determines whether or not an error is logged when a client disconnects (EPIPE errors are not logged as they're expected).

The important thing in this defensive code is that we return from this infinite for{} loop if the framer has exited. The error code only matters for logging.

// EPIPE just means the connection was closed
return nil
}
return err
default:
}

// Since we successfully streamed, update the overall offset/idx.
offset = int64(0)
nextIdx = idx + 1
}
Expand Down
2 changes: 1 addition & 1 deletion vendor/github.com/hpcloud/tail/watch/filechanges.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 8 additions & 1 deletion vendor/github.com/hpcloud/tail/watch/inotify.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

86 changes: 37 additions & 49 deletions vendor/github.com/hpcloud/tail/watch/inotify_tracker.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

24 changes: 24 additions & 0 deletions vendor/golang.org/x/sys/unix/dev_darwin.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading