Skip to content

Commit

Permalink
Merge pull request #168 from kevpar/deadlock
Browse files Browse the repository at this point in the history
client: Fix deadlock when writing to pipe blocks
  • Loading branch information
kevpar authored May 13, 2024
2 parents aa5f2d4 + 1b4f6f8 commit ef57342
Showing 1 changed file with 28 additions and 9 deletions.
37 changes: 28 additions & 9 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -386,25 +386,44 @@ func (c *Client) receiveLoop() error {
// createStream creates a new stream and registers it with the client
// Introduce stream types for multiple or single response
func (c *Client) createStream(flags uint8, b []byte) (*stream, error) {
c.streamLock.Lock()
// sendLock must be held across both allocation of the stream ID and sending it across the wire.
// This ensures that new stream IDs sent on the wire are always increasing, which is a
// requirement of the TTRPC protocol.
// This use of sendLock could be split into another mutex that covers stream creation + first send,
// and just use sendLock to guard writing to the wire, but for now it seems simpler to have fewer mutexes.
c.sendLock.Lock()
defer c.sendLock.Unlock()

// Check if closed since lock acquired to prevent adding
// anything after cleanup completes
select {
case <-c.ctx.Done():
c.streamLock.Unlock()
return nil, ErrClosed
default:
}

// Stream ID should be allocated at same time
s := newStream(c.nextStreamID, c)
c.streams[s.id] = s
c.nextStreamID = c.nextStreamID + 2
var s *stream
if err := func() error {
// In the future this could be replaced with a sync.Map instead of streamLock+map.
c.streamLock.Lock()
defer c.streamLock.Unlock()

c.sendLock.Lock()
defer c.sendLock.Unlock()
c.streamLock.Unlock()
// Check if closed since lock acquired to prevent adding
// anything after cleanup completes
select {
case <-c.ctx.Done():
return ErrClosed
default:
}

s = newStream(c.nextStreamID, c)
c.streams[s.id] = s
c.nextStreamID = c.nextStreamID + 2

return nil
}(); err != nil {
return nil, err
}

if err := c.channel.send(uint32(s.id), messageTypeRequest, flags, b); err != nil {
return s, filterCloseErr(err)
Expand Down

0 comments on commit ef57342

Please sign in to comment.