Stream.Write() waits indefinitely if stream is full. Cancellations apply to entire stream, not messages. #1229
Comments
It's not clear how we can cancel a blocked write without cancelling the entire stream.
@ejona86 I don't think this is a protocol issue at all. This is an issue with blocking writes in general; if the application can't be unblocked then we have a problem. I do agree that if the write can't complete, possibly for flow-control reasons, then the RST_STREAM might be necessary to unblock if partial writes have occurred, but that's already an allowed feature in the spec. This problem becomes exacerbated for streaming RPCs, where the semantics of the call deadlines are ill-defined.
@louiscryan Right, that is absolutely correct. The OP's requirement of cancelling writes without resetting the stream was what I was referring to; having a cancellable API is not a protocol issue. However, I think doing that is going more than halfway down the path of adding a fully async API, so if we were to do that, one might as well go all the way and make it return a Future.
Returning a Future would be even better.
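grpc-go has no Future type, so a sketch of what a Future-like asynchronous send could look like there has to improvise; the SendResult type and sendAsync helper below are invented for illustration and simply wrap the existing blocking SendMsg:

```go
// SendResult stands in for a Future: it yields exactly one error (possibly nil)
// once the underlying send has finished.
type SendResult <-chan error

// sendAsync is hypothetical -- grpc-go exposes no such API. It runs the blocking
// SendMsg in a goroutine and reports the outcome on a buffered channel.
func sendAsync(stream grpc.ServerStream, m interface{}) SendResult {
	ch := make(chan error, 1)
	go func() {
		ch <- stream.SendMsg(m)
	}()
	return ch
}
```

A caller could then select on the result together with a timer or a context, which is essentially what the workarounds further down in this thread do.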
Sorry for closing this issue too quickly. We thought the whole issue was about trying to salvage the stream even though the receiver isn't reading off the channel. It does seem reasonable to have a way of cancelling a send without relying upon the context's deadline, which the server can't influence. If I understand your objective correctly, it's possible to achieve the behavior of adding a deadline to an individual SendMsg call as follows:

```go
done := make(chan struct{})
go func() {
	stream.SendMsg(response)
	close(done)
}()
t := time.NewTimer(5 * time.Second)
select {
case <-t.C:
	return status.Errorf(codes.DeadlineExceeded, "too slow")
case <-done:
	if !t.Stop() {
		<-t.C
	}
}
```

This is clearly too much boilerplate for sending a message with a timeout. We could simplify this significantly if we added an explicit stream.Cancel method:

```go
t := time.AfterFunc(5*time.Second, stream.Cancel)
stream.SendMsg(response)
// Timers created by AfterFunc don't use t.C, so just stop it.
t.Stop()
```

Would adding the ability to cancel the stream without needing to return from the handler be sufficient to resolve this issue? (Note that the client side can already implement this pattern, because it created the context used to perform the call, so it can use the associated cancel function.)
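For reference, a rough sketch of that client-side pattern; pb.ServiceClient, MyMethod, and the message type are placeholders for whatever streaming client is actually in use:

```go
func callWithSendTimeout(client pb.ServiceClient, msg *pb.Request) error {
	// The client created the context, so it can cancel the whole stream.
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	stream, err := client.MyMethod(ctx)
	if err != nil {
		return err
	}

	// If the send blocks for more than 5s (e.g. the flow-control window is
	// exhausted), cancel the stream; SendMsg then returns with an error.
	t := time.AfterFunc(5*time.Second, cancel)
	err = stream.SendMsg(msg)
	t.Stop()
	return err
}
```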
The last option is more usable, and I agree that cancellation is very general purpose. However, for the specific use case,
Only the client creates the stream, so this wouldn't work for the server. If we do something to make timeouts first-class, it should really be set per-send. Unfortunately, our hands are a bit tied here: any change will break the API, because {Client,Server}Stream are interfaces -- even adding a variadic SendOptions-type parameter to Send, which would be backward compatible if the stream wasn't in an interface.
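Purely to illustrate the kind of change being ruled out here, a hypothetical per-send option might look like the sketch below; none of these names exist in grpc-go, and implementers of the stream interfaces would still be broken by the signature change:

```go
// SendOption is hypothetical: a functional option applied to a single send.
type SendOption func(*sendOptions)

type sendOptions struct {
	timeout time.Duration
}

// WithSendTimeout would bound how long one Send/SendMsg call may block.
func WithSendTimeout(d time.Duration) SendOption {
	return func(o *sendOptions) { o.timeout = d }
}

// The variadic parameter keeps existing call sites compiling:
//
//	Send(m *pb.Message, opts ...SendOption) error
//
// but every type implementing ClientStream/ServerStream would have to change.
```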
@dfawley can we get your option but using a context, with the err optionally returning the context's error? This also means that all cleanups on context cancellation would apply.
Retrying on a stream after a timeout is basically impossible. Technically it is possible in certain circumstances, but if you sent part of the message on the wire before the timeout, and not all of it, the stream must be reset (i.e. closed). Because of this, we would need to distinguish to the caller whether or not that happened, which is confusing. And if a timeout ever happens, it means the receiver isn't receiving its pending data for whatever reason, so what would be the goal of continuing to use the stream in that case? End the stream and let the client initiate a new one if desired. It's also somewhat arbitrary to give up in this case if the goal is not to end the stream: if there is sufficient flow control for the message, the send will return quickly, but there's no guarantee the client ever requests the data.

Using a context to specify timeouts is fine, but I think for now the best idea is to wrap the code in #1229 (comment) in a function and call it where desired. It can be written fairly generically, too:

```go
// DoWithTimeout runs f and returns its error. If the deadline d elapses first,
// it returns a grpc DeadlineExceeded error instead.
func DoWithTimeout(f func() error, d time.Duration) error {
	errChan := make(chan error, 1)
	go func() {
		errChan <- f()
		close(errChan)
	}()
	t := time.NewTimer(d)
	select {
	case <-t.C:
		return status.Errorf(codes.DeadlineExceeded, "too slow")
	case err := <-errChan:
		if !t.Stop() {
			<-t.C
		}
		return err
	}
}

func (s *server) MyHandler(stream pb.Service_MyHandlerServer) error {
	for <work to do> {
		if err := DoWithTimeout(func() error { return stream.SendMsg(<data>) }, 5*time.Second); err != nil {
			return err
		}
	}
	return nil
}
```

Or with a context instead:

```go
// DoWithContext runs f and returns its error. If the context is cancelled or
// times out first, it returns the context's error instead.
func DoWithContext(ctx context.Context, f func() error) error {
	errChan := make(chan error, 1)
	go func() {
		errChan <- f()
		close(errChan)
	}()
	select {
	case <-ctx.Done():
		return ctx.Err()
	case err := <-errChan:
		return err
	}
}
```

tl;dr: this is something we would really like to support, but it requires breaking the API, and there is a fairly reasonable workaround. This makes it both hard and low priority, so I'm going to close this for now.
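A sketch of how the DoWithContext variant above might be used from a handler, reusing the thread's placeholders and deriving the per-send deadline from the stream's own context, so a client disconnect also unblocks the wait:

```go
func (s *server) MyHandler(stream pb.Service_MyHandlerServer) error {
	for <work to do> {
		// Bound each send to 5s, and also give up if the RPC's own context
		// ends first (deadline hit, client gone, etc.).
		sendCtx, cancel := context.WithTimeout(stream.Context(), 5*time.Second)
		err := DoWithContext(sendCtx, func() error { return stream.SendMsg(<data>) })
		cancel()
		if err != nil {
			return err
		}
	}
	return nil
}
```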
Just in case someone runs into this issue: the DoWithContext example referenced a timer t that was never defined.
Good catch, thanks. I updated the examples.
I have adopted the workaround, but it still doesn't solve the problem completely: in the case where the client has become unresponsive, the goroutine remains blocked indefinitely on SendMsg.
You can use the examples in this comment as ways to add a timeout to your send attempt. Note the important bits of starting a timer and exiting the RPC handler. When the handler returns, the RPC ends and all Send/Recv calls in any goroutines will unblock with errors indicating the stream is closed.
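To make that concrete, here is the same handler shape as the earlier example, reusing the DoWithTimeout helper, with comments marking where the unblocking happens (placeholders as before):

```go
func (s *server) MyHandler(stream pb.Service_MyHandlerServer) error {
	for <work to do> {
		if err := DoWithTimeout(func() error { return stream.SendMsg(<data>) }, 5*time.Second); err != nil {
			// Returning ends the RPC. The goroutine DoWithTimeout left parked
			// inside SendMsg is then woken with a stream-closed error, so it
			// does not leak past the life of the handler.
			return err
		}
	}
	return nil
}
```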
We ran into an indefinite wait issue:
https://github.com/istio/mixer/blob/a15700f0423e8fc0d96c154675bc37ff70aae0aa/pkg/api/grpcServer.go#L138
The issue was that the gRPC stream was full. The stream was being protected by a mutex, and the goroutine actively writing a message blocked indefinitely. The only cancellation available is on the entire stream. It would be useful to have:
stream.Write(msg, timeout)
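A minimal sketch of the pattern being described, with names invented for illustration: a shared stream guarded by a mutex, where one send that cannot make flow-control progress stalls every other writer as well:

```go
type guardedStream struct {
	mu     sync.Mutex
	stream grpc.ServerStream
}

func (g *guardedStream) Write(msg interface{}) error {
	g.mu.Lock()
	defer g.mu.Unlock()
	// If the receiver has stopped reading and the HTTP/2 flow-control window
	// is exhausted, SendMsg blocks here indefinitely, and because the mutex is
	// held, every other goroutine trying to Write blocks behind it.
	return g.stream.SendMsg(msg)
}
```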