Skip to content

Commit

Permalink
Merge pull request #144 from Iceber/stream_recv_channel
Browse files Browse the repository at this point in the history
First process the pending messages in recv channel
  • Loading branch information
fuweid authored May 9, 2023
2 parents 0ca69a9 + c51165f commit ac26f8c
Showing 1 changed file with 17 additions and 10 deletions.
27 changes: 17 additions & 10 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -483,23 +483,30 @@ func (c *Client) dispatch(ctx context.Context, req *Request, resp *Response) err
}
defer c.deleteStream(s)

var msg *streamMessage
select {
case <-ctx.Done():
return ctx.Err()
case <-c.ctx.Done():
return ErrClosed
case <-s.recvClose:
return s.recvErr
case msg := <-s.recv:
if msg.header.Type == messageTypeResponse {
err = proto.Unmarshal(msg.payload[:msg.header.Length], resp)
} else {
err = fmt.Errorf("unexpected %q message received: %w", msg.header.Type, ErrProtocol)
// If recv has a pending message, process that first
select {
case msg = <-s.recv:
default:
return s.recvErr
}
case msg = <-s.recv:
}

// return the payload buffer for reuse
c.channel.putmbuf(msg.payload)

return err
if msg.header.Type == messageTypeResponse {
err = proto.Unmarshal(msg.payload[:msg.header.Length], resp)
} else {
err = fmt.Errorf("unexpected %q message received: %w", msg.header.Type, ErrProtocol)
}

// return the payload buffer for reuse
c.channel.putmbuf(msg.payload)

return err
}

0 comments on commit ac26f8c

Please sign in to comment.