diff --git a/transport/http/client.go b/transport/http/client.go index eca566300..4cd8f27a8 100644 --- a/transport/http/client.go +++ b/transport/http/client.go @@ -5,6 +5,7 @@ import ( "context" "encoding/json" "encoding/xml" + "io" "io/ioutil" "net/http" "net/url" @@ -84,6 +85,7 @@ func ClientFinalizer(f ...ClientFinalizerFunc) ClientOption { // BufferedStream sets whether the Response.Body is left open, allowing it // to be read from later. Useful for transporting a file as a buffered stream. +// That body has to be Closed to propery end the request. func BufferedStream(buffered bool) ClientOption { return func(c *Client) { c.bufferedStream = buffered } } @@ -92,7 +94,6 @@ func BufferedStream(buffered bool) ClientOption { func (c Client) Endpoint() endpoint.Endpoint { return func(ctx context.Context, request interface{}) (interface{}, error) { ctx, cancel := context.WithCancel(ctx) - defer cancel() var ( resp *http.Response @@ -112,10 +113,12 @@ func (c Client) Endpoint() endpoint.Endpoint { req, err := http.NewRequest(c.method, c.tgt.String(), nil) if err != nil { + cancel() return nil, err } if err = c.enc(ctx, req, request); err != nil { + cancel() return nil, err } @@ -126,11 +129,17 @@ func (c Client) Endpoint() endpoint.Endpoint { resp, err = c.client.Do(req.WithContext(ctx)) if err != nil { + cancel() return nil, err } - if !c.bufferedStream { + // If we expect a buffered stream, we don't cancel the context when the endpoint returns. + // Instead, we should call the cancel func when closing the response body. + if c.bufferedStream { + resp.Body = bodyWithCancel{ReadCloser: resp.Body, cancel: cancel} + } else { defer resp.Body.Close() + defer cancel() } for _, f := range c.after { @@ -146,6 +155,20 @@ func (c Client) Endpoint() endpoint.Endpoint { } } +// bodyWithCancel is a wrapper for an io.ReadCloser with also a +// cancel function which is called when the Close is used +type bodyWithCancel struct { + io.ReadCloser + + cancel context.CancelFunc +} + +func (bwc bodyWithCancel) Close() error { + bwc.ReadCloser.Close() + bwc.cancel() + return nil +} + // ClientFinalizerFunc can be used to perform work at the end of a client HTTP // request, after the response is returned. The principal // intended use is for error logging. Additional response parameters are diff --git a/transport/http/client_test.go b/transport/http/client_test.go index e31d201cc..9ec1a6caf 100644 --- a/transport/http/client_test.go +++ b/transport/http/client_test.go @@ -98,8 +98,12 @@ func TestHTTPClient(t *testing.T) { } func TestHTTPClientBufferedStream(t *testing.T) { + // bodysize has a size big enought to make the resopnse.Body not an instant read + // so if the response is cancelled it wount be all readed and the test would fail + // The 6000 has not a particular meaning, it big enough to fulfill the usecase. + const bodysize = 6000 var ( - testbody = "testbody" + testbody = string(make([]byte, bodysize)) encode = func(context.Context, *http.Request, interface{}) error { return nil } decode = func(_ context.Context, r *http.Response) (interface{}, error) { return TestResponse{r.Body, ""}, nil @@ -129,6 +133,9 @@ func TestHTTPClientBufferedStream(t *testing.T) { if !ok { t.Fatal("response should be TestResponse") } + defer response.Body.Close() + // Faking work + time.Sleep(time.Second * 1) // Check that response body was NOT closed b := make([]byte, len(testbody))