diff --git a/client.go b/client.go index b73116b4a..4b1e1e709 100644 --- a/client.go +++ b/client.go @@ -483,23 +483,30 @@ func (c *Client) dispatch(ctx context.Context, req *Request, resp *Response) err } defer c.deleteStream(s) + var msg *streamMessage select { case <-ctx.Done(): return ctx.Err() case <-c.ctx.Done(): return ErrClosed case <-s.recvClose: - return s.recvErr - case msg := <-s.recv: - if msg.header.Type == messageTypeResponse { - err = proto.Unmarshal(msg.payload[:msg.header.Length], resp) - } else { - err = fmt.Errorf("unexpected %q message received: %w", msg.header.Type, ErrProtocol) + // If recv has a pending message, process that first + select { + case msg = <-s.recv: + default: + return s.recvErr } + case msg = <-s.recv: + } - // return the payload buffer for reuse - c.channel.putmbuf(msg.payload) - - return err + if msg.header.Type == messageTypeResponse { + err = proto.Unmarshal(msg.payload[:msg.header.Length], resp) + } else { + err = fmt.Errorf("unexpected %q message received: %w", msg.header.Type, ErrProtocol) } + + // return the payload buffer for reuse + c.channel.putmbuf(msg.payload) + + return err }