-
Notifications
You must be signed in to change notification settings - Fork 24
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
add support for sending error codes on session close #121
base: master
Are you sure you want to change the base?
Changes from 10 commits
e7338b0
d8cf4e7
4b262c0
ea5605b
8adb9a8
f56b1c3
9190b78
5727def
43cd707
ede18a5
3eaea39
39abe7e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -46,9 +46,9 @@ var nullMemoryManager = &nullMemoryManagerImpl{} | |
type Session struct { | ||
rtt int64 // to be accessed atomically, in nanoseconds | ||
|
||
// remoteGoAway indicates the remote side does | ||
// remoteGoAwayNormal indicates the remote side does | ||
// not want futher connections. Must be first for alignment. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What depends on this alignment? Apparently nothing as this has not been first for years. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nothing. I don't need the field either. I have removed it. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. So the present semantics are, if you call So far as I know, we don't use it anywhere in go-libp2p. Since this will be a major version upgrade, we can change the semantics to make GoAway reset all the streams as a |
||
remoteGoAway int32 | ||
remoteGoAwayNormal int32 | ||
|
||
// localGoAway indicates that we should stop | ||
// accepting futher connections. Must be first for alignment. | ||
|
@@ -102,6 +102,8 @@ type Session struct { | |
// recvDoneCh is closed when recv() exits to avoid a race | ||
// between stream registration and stream shutdown | ||
recvDoneCh chan struct{} | ||
// recvErr is the error the receive loop ended with | ||
recvErr error | ||
|
||
// sendDoneCh is closed when send() exits to avoid a race | ||
// between returning from a Stream.Write and exiting from the send loop | ||
|
@@ -203,8 +205,8 @@ func (s *Session) OpenStream(ctx context.Context) (*Stream, error) { | |
if s.IsClosed() { | ||
return nil, s.shutdownErr | ||
} | ||
if atomic.LoadInt32(&s.remoteGoAway) == 1 { | ||
return nil, ErrRemoteGoAway | ||
if atomic.LoadInt32(&s.remoteGoAwayNormal) == 1 { | ||
return nil, ErrRemoteGoAwayNormal | ||
} | ||
|
||
// Block if we have too many inflight SYNs | ||
|
@@ -283,9 +285,23 @@ func (s *Session) AcceptStream() (*Stream, error) { | |
} | ||
} | ||
|
||
// Close is used to close the session and all streams. | ||
// Attempts to send a GoAway before closing the connection. | ||
// Close is used to close the session and all streams. It doesn't send a GoAway before | ||
// closing the connection. | ||
func (s *Session) Close() error { | ||
return s.close(ErrSessionShutdown, false, goAwayNormal) | ||
} | ||
|
||
// CloseWithError is used to close the session and all streams after sending a GoAway message with errCode. | ||
// Blocks for ConnectionWriteTimeout to write the GoAway message. | ||
// | ||
// The GoAway may not actually be sent depending on the semantics of the underlying net.Conn. | ||
// For TCP connections, it may be dropped depending on LINGER value or if there's unread data in the kernel | ||
// receive buffer. | ||
func (s *Session) CloseWithError(errCode uint32) error { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Have we updated the connection manager to be able to deal with potential blocking here? Also, we should probably document it. |
||
return s.close(&GoAwayError{Remote: false, ErrorCode: errCode}, true, errCode) | ||
} | ||
|
||
func (s *Session) close(shutdownErr error, sendGoAway bool, errCode uint32) error { | ||
s.shutdownLock.Lock() | ||
defer s.shutdownLock.Unlock() | ||
|
||
|
@@ -294,35 +310,38 @@ func (s *Session) Close() error { | |
} | ||
s.shutdown = true | ||
if s.shutdownErr == nil { | ||
s.shutdownErr = ErrSessionShutdown | ||
s.shutdownErr = shutdownErr | ||
} | ||
close(s.shutdownCh) | ||
s.conn.Close() | ||
s.stopKeepalive() | ||
<-s.recvDoneCh | ||
|
||
// Only send GoAway if we have an error code. | ||
if sendGoAway && errCode != goAwayNormal { | ||
// wait for write loop to exit | ||
// We need to write the current frame completely before sending a goaway. | ||
// This will wait for at most s.config.ConnectionWriteTimeout | ||
<-s.sendDoneCh | ||
ga := s.goAway(errCode) | ||
if err := s.conn.SetWriteDeadline(time.Now().Add(goAwayWaitTime)); err == nil { | ||
_, _ = s.conn.Write(ga[:]) // there's nothing we can do on error here | ||
} | ||
s.conn.SetWriteDeadline(time.Time{}) | ||
} | ||
|
||
s.conn.Close() | ||
<-s.sendDoneCh | ||
<-s.recvDoneCh | ||
|
||
s.streamLock.Lock() | ||
defer s.streamLock.Unlock() | ||
for id, stream := range s.streams { | ||
stream.forceClose() | ||
stream.forceClose(s.shutdownErr) | ||
delete(s.streams, id) | ||
stream.memorySpan.Done() | ||
} | ||
return nil | ||
} | ||
|
||
// exitErr is used to handle an error that is causing the | ||
// session to terminate. | ||
func (s *Session) exitErr(err error) { | ||
s.shutdownLock.Lock() | ||
if s.shutdownErr == nil { | ||
s.shutdownErr = err | ||
} | ||
s.shutdownLock.Unlock() | ||
s.Close() | ||
} | ||
|
||
// GoAway can be used to prevent accepting further | ||
// connections. It does not close the underlying conn. | ||
func (s *Session) GoAway() error { | ||
|
@@ -451,7 +470,7 @@ func (s *Session) startKeepalive() { | |
|
||
if err != nil { | ||
s.logger.Printf("[ERR] yamux: keepalive failed: %v", err) | ||
s.exitErr(ErrKeepAliveTimeout) | ||
s.close(ErrKeepAliveTimeout, false, 0) | ||
} | ||
}) | ||
} | ||
|
@@ -516,7 +535,19 @@ func (s *Session) sendMsg(hdr header, body []byte, deadline <-chan struct{}) err | |
// send is a long running goroutine that sends data | ||
func (s *Session) send() { | ||
if err := s.sendLoop(); err != nil { | ||
s.exitErr(err) | ||
// Prefer the recvLoop error over the sendLoop error. The receive loop might have the error code | ||
// received in a GoAway frame received just before the TCP RST that closed the sendLoop | ||
s.shutdownLock.Lock() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Add a comment as to why you are holding the shutdownLock around this section. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Added some comment, can you review once more? |
||
if s.shutdownErr == nil { | ||
s.conn.Close() | ||
<-s.recvDoneCh | ||
if _, ok := s.recvErr.(*GoAwayError); ok { | ||
err = s.recvErr | ||
} | ||
s.shutdownErr = err | ||
} | ||
s.shutdownLock.Unlock() | ||
s.close(err, false, 0) | ||
} | ||
} | ||
|
||
|
@@ -644,7 +675,7 @@ func (s *Session) sendLoop() (err error) { | |
// recv is a long running goroutine that accepts new data | ||
func (s *Session) recv() { | ||
if err := s.recvLoop(); err != nil { | ||
s.exitErr(err) | ||
s.close(err, false, 0) | ||
} | ||
} | ||
|
||
|
@@ -666,7 +697,10 @@ func (s *Session) recvLoop() (err error) { | |
err = fmt.Errorf("panic in yamux receive loop: %s", rerr) | ||
} | ||
}() | ||
defer close(s.recvDoneCh) | ||
defer func() { | ||
s.recvErr = err | ||
close(s.recvDoneCh) | ||
}() | ||
var hdr header | ||
for { | ||
// fmt.Printf("ReadFull from %#v\n", s.reader) | ||
|
@@ -781,18 +815,18 @@ func (s *Session) handleGoAway(hdr header) error { | |
code := hdr.Length() | ||
switch code { | ||
case goAwayNormal: | ||
atomic.SwapInt32(&s.remoteGoAway, 1) | ||
atomic.SwapInt32(&s.remoteGoAwayNormal, 1) | ||
// Don't close connection on normal go away. Let the existing streams | ||
// complete gracefully. | ||
return nil | ||
case goAwayProtoErr: | ||
s.logger.Printf("[ERR] yamux: received protocol error go away") | ||
return fmt.Errorf("yamux protocol error") | ||
case goAwayInternalErr: | ||
s.logger.Printf("[ERR] yamux: received internal error go away") | ||
return fmt.Errorf("remote yamux internal error") | ||
default: | ||
s.logger.Printf("[ERR] yamux: received unexpected go away") | ||
return fmt.Errorf("unexpected go away received") | ||
s.logger.Printf("[ERR] yamux: received go away with error code: %d", code) | ||
} | ||
return nil | ||
return &GoAwayError{Remote: true, ErrorCode: code} | ||
} | ||
|
||
// incomingStream is used to create a new incoming stream | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's add some text to explain what "normal" is here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It means a goAway frame was received with error code
goAwayNormal
. I changed the name back to the original to avoid changing a Public name.