Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

'Client.Endpoint()' returns 'context canceled' while reading the Body #773

Closed
xescugc opened this issue Oct 9, 2018 · 5 comments · Fixed by #776
Closed

'Client.Endpoint()' returns 'context canceled' while reading the Body #773

xescugc opened this issue Oct 9, 2018 · 5 comments · Fixed by #776
Labels

Comments

@xescugc
Copy link
Contributor

xescugc commented Oct 9, 2018

Hello! I have a question which IDK if it's a missus on my part and there is another way to do it.

I'm writing a client which consumes a MS that returns binary data. I'm using the kithttp.BufferedStream(true) while NewClient to make sure the body is not closed so I can read it when I want and not have it buffered.

This works fine with my implementation of the decode* that I use on the NewClient except if the body is longer than 3966 which then stops reading from the body.

This is my decode*:

func decode(_ context.Context, r *http.Response) (interface{}, error) {
        pr, pw := io.Pipe()

        go func() {
                defer r.Body.Close()
                defer pw.Close()
                i, err := io.Copy(pw, r.Body)
                fmt.Println(i, err)
        }()
        // IOR is a io.Reader
        return response{IOR: pr}, nil
}

The intention is to only read from the r.Body when I need it, that's why I'm using a io.Pipe, and also because I have to r.Body.Close() when it ends (if not, just passing the r.Body to the response.IOR would work, but then the r.Body wold not closed, no?).

This code works for "small" r.Body but when the it's bigger than 3966 the io.Copy stops and the output form the print is 3966 context canceled. So basically the

defer cancel()
is called and the body lost.

Maybe I'm overcomplicating it using the io.Pipe but I thought it was the best solution for what I had in mind.

Maybe the defer cancel() should be called once the r.Body.Close() is called as it's I have the BufferedStream option no? IDK if that's even possible haha.

Thanks!

@xescugc
Copy link
Contributor Author

xescugc commented Oct 9, 2018

So I've refactored the code to maybe be more clean, and instead of using a io.Reader as type I'm using a io.ReadCloser so I can remove all this logic with io.Pipe (which at the end was of no use as with the r.Body I can do the same) and added the corresponding logic to the code to Close the io.ReadCloser.

But now the error is random 👍 . I know that it's due to that defer cancel() (if I comment it all tests pass).

I would like to know how to read from a r.Body when I want (so not read it all and buffer it) without being cancelled by the context.

Ideally the BufferedStream(true) prevents the body from being closed (yes it does), but then when the cancel() is called the Request is cancelled and the body also cancelled, so not ideal.

That's what I think it's happening, but I'm not 100% sure.

@xescugc
Copy link
Contributor Author

xescugc commented Oct 12, 2018

Ok, so I had some reading on the topic https://medium.com/the-software-firehose/canceling-requests-in-go-using-context-b4b28a118da8 and https://blog.cloudflare.com/the-complete-guide-to-golang-net-http-timeouts/ and I think that my assumptions where correct, if the http.Request is cancelled the http.Response.Body is closed too.

So then I tried to reproduce it on the go-kit test to make sure that It was, in fact, something not related to my code (which could be too 😄) and I did achieve to reproduce it on the test for BufferedStream basically adding a big testbody https://github.com/go-kit/kit/blob/master/transport/http/client_test.go#L102 (like len(testbody) == 8400) and a time.Sleep(1 *time.Second) (to simulate work here https://github.com/go-kit/kit/blob/master/transport/http/client_test.go#L133) the output was:

--- FAIL: TestHTTPClientBufferedStream (1.00s)
    client_test.go:139: want "EOF", have "context canceled"
FAIL

And some times (IDK why If I change the sleep time or something it switches from one to the other):

--- FAIL: TestHTTPClientBufferedStream (1.00s)
    client_test.go:144: want "EOF", have "read tcp 127.0.0.1:47002->127.0.0.1:34551: use of closed network connection"                                                                                             
FAIL

Removing the logic for cancelling (just for testing purposes) it returns another error:

--- FAIL: TestHTTPClientBufferedStream (1.00s)
    client_test.go:139: want "EOF", have %!q(<nil>)
FAIL

Which makes sense with the io.Reader documentation:

[...] An instance of this general case is that a Reader returning a non-zero number of bytes at the end of the input stream may return either err == EOF or err == nil. The next Read should return 0, EOF. [...]

And if I repeat https://github.com/go-kit/kit/blob/master/transport/http/client_test.go#L135 again it works (the read bytes of the first one is all of them 8400 <nil>, and the second on is 0 EOF so as expected).

So now that I know that this is an error, I tried to find a solution for it. First of all I did try to find a way to know when the Response.Body was closed, so then we can call the cancel(), as with the BufferedStream we left the Response.Body open and it has to be closed by the user, or the context. So I find no way to do it and the the solution would be to wrap the Body with a custom Close() that when it's call it calls the Request.Body.Close() and also cancel().

The idea would be something like this:

type bodyWithCancel struct {
        io.ReadCloser

        cancel context.CancelFunc
}

func (bwc bodyWithCancel) Close() error {
        bwc.ReadCloser.Close()
        bwc.cancel()
        return nil
}

In terms of use this would mean that when the BufferedStream is used, the user should call Close() to the Body, which should not be that bad as with this option you are taking care of the Body already.

In terms of code it would mean that on the Client.Endpoint we would have to call cancel() manually on all the possible errors (which is not nice), and when BufferedStream is validated, if it's true then we'll replace the Response.Body by:

if c.bufferedStream {
  resp.Body = bodyWithCancel{ReadCloser: resp.Body, cancel: cancel}
} else {
  defer resp.Body.Close()
}

I'm commenting all this, because IDK if it may have potential side effects that I'm not aware of, or if it's a good solution (or wanted solution). So I prefer to comment it before opening a PR. Maybe there is another solution for the problem but I did not see any other 😢

@xescugc
Copy link
Contributor Author

xescugc commented Oct 18, 2018

@peterbourgon WDYT about this? Does this make sense?

@peterbourgon
Copy link
Member

It makes rough sense to me when typed out. Can you make a proposed PR?

@xescugc
Copy link
Contributor Author

xescugc commented Oct 18, 2018

Yes ofc! It was to know if It was ok to continue with it (open PR) or I had something wrong.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging a pull request may close this issue.

2 participants