Skip to content

Commit

Permalink
Make closeWithError() atomic
Browse files Browse the repository at this point in the history
Signed-off-by: Henry Wang <henwang@amazon.com>
  • Loading branch information
henry118 committed Apr 28, 2023
1 parent 98b5f64 commit e212204
Showing 1 changed file with 14 additions and 11 deletions.
25 changes: 14 additions & 11 deletions stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ type stream struct {
sender sender
recv chan *streamMessage

closeOnce sync.Once
recvErr error
mut sync.Mutex
recvErr error
}

func newStream(id streamID, send sender) *stream {
Expand All @@ -46,17 +46,17 @@ func newStream(id streamID, send sender) *stream {
}

func (s *stream) closeWithError(err error) error {
s.closeOnce.Do(func() {
if s.recv != nil {
close(s.recv)
if err != nil {
s.recvErr = err
} else {
s.recvErr = ErrClosed
}
s.mut.Lock()
defer s.mut.Unlock()

if s.recvErr == nil && s.recv != nil {
close(s.recv)
if err != nil {
s.recvErr = err
} else {
s.recvErr = ErrClosed
}
})
}
return nil
}

Expand All @@ -65,6 +65,9 @@ func (s *stream) send(mt messageType, flags uint8, b []byte) error {
}

func (s *stream) receive(ctx context.Context, msg *streamMessage) error {
s.mut.Lock()
defer s.mut.Unlock()

if s.recvErr != nil {
return s.recvErr
}
Expand Down

0 comments on commit e212204

Please sign in to comment.