-
Notifications
You must be signed in to change notification settings - Fork 2.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add bidirectional streaming support by running Send() and Recv() concurrently #170
Conversation
return nil, metadata, err | ||
} | ||
metadata.HeaderMD = header | ||
return stream, metadata, <-sendErrs |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
- it blocks forever unless you close
sendErrs
? - it still blocks until the end of the goroutine even if you close
sendErrs
. So it is not interleaving?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
my intent was to block until the message is sent by the client as a sort of middle ground between just ignoring errors and interrupting the stream on any error. This was written mid-migraine so I'm happy to admit it's not as thought out as it could be. Open to suggestions.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I overlooked L260-L263. So actually it does not block forever.
I have two more questions.
- Maybe I'm missing something, but it does not seem to be something like "interleave" for me. What do you mean by "interleaving
Send
andRecv
"? - If it is intended to block until the message is sent, why did you need to run the function in a goroutine?
IIUC, the only difference made by the goroutine is that the goroutine function runs in parallel tostream.Header()
.
But I don't think the difference makes much sense.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is for the many messages situation (e.g. rpc Chat(stream ChatMessage) returns (stream ChatMessage)
). Before this change all Send()s have to run before Recv() gets called. by putting Send() in a goroutine concurrent Send() and Recv() calls can be made.
@yugui do you need further clarification? To restate, before this change a streaming handler did not start a goroutine and ran all Send()s before returning and having the caller call Recv(). This proposal is a middle ground of blocking until the first Send() to aid in surfacing issues with initial payloads. Perhaps setting errs to nil would be a nicer way to express the semantics of block until first execution. I'm open to suggestion here. |
I don't think it was changed with this PR because So in my understanding, the change made by this PR was
I think your intention
makes sense. But if my understanding above was correct, you didn't need the new goroutine. You could do the same thing in the same goroutine as caller. |
How would you accomplish this without a separate goroutine?
|
Did you miss that the first Send if successful will send nil and stop blocking?
|
Yes, I did. Sorry. Then, why do you treat the first item in the stream specially? Also I'm not sure if it is safe to start writing response body before reading the request body until the end. https://www.w3.org/Protocols/rfc2616/rfc2616-sec6.html#sec6
|
@yugui I was missing some things here (it was written in a rush). This now ensures CloseSend is called and cleans up the semantics a bit. One alternative would be to move the Send handling out and invoke it once to capture the initial error. Thoughts? |
@yugui I changed the implementation to be more clear. |
@tmc Thank you. That's much clearer. https://www.w3.org/Protocols/rfc2616/rfc2616-sec6.html#sec6
In my understanding, it requires us to receive the request before sending any response in the TCP session. So #168 looks to be a requirement of this PR. What do you think? |
dec := marshaler.NewDecoder(req.Body) | ||
handleSend := func() error { | ||
var protoReq {{.Method.RequestType.GoType .Method.Service.File.GoPkg.Path}} | ||
err = dec.Decode(&protoReq) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if err := dec.Decode(&protoReq); err != nil
would be more consistent?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i was following existing style from the 'client-streaming-request-func' template.
@yugui updated with feedback taken into consideration. This SO post deals with the state of response-before-request (early 401s in particular) http://stackoverflow.com/questions/14250991/is-it-acceptable-for-a-server-to-send-a-http-response-before-the-entire-request The most common case here would be clients that don't process the response until they've sent the request fully. For my use case (a websocket proxy on top of grpc-gateway) this isn't a concern as I control the http communication on both sides. I don't think having some misbehaving http clients should block this as it enables writing websocket proxies. Having this work in grpc-gateway closes the gap of bringing grpc semantics to browsers (with a little extra effort). |
@tmc. Thank you. LGTM. |
New to grpc-gateway. Have some questions related to this work. I take it by "enables writing websocket proxies" that grpc-gateway does not itself use websockets for streaming. Is that correct? Is there an external websocket proxy project? If not, what would one look like? Would it be generic or have to be generated from proto files? |
@karlkfi that is correct. I have an example websocket proxy here: https://github.com/tmc/grpcutils/blob/master/websocket_proxy.go it's a generic http.Handler that will perform a ws upgrade on the same url as the http endpoint. example use here: |
Add bidirectional streaming support by running Send() and Recv() concurrently
Fixes #169