Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat: Provide mechanism for Pipeline to call onCancel #303

Open
1 task done
Tracked by #306
mattp-swirldslabs opened this issue Oct 10, 2024 · 9 comments · May be fixed by #320
Open
1 task done
Tracked by #306

Feat: Provide mechanism for Pipeline to call onCancel #303

mattp-swirldslabs opened this issue Oct 10, 2024 · 9 comments · May be fixed by #320
Assignees
Labels
Improvement Code changes driven by non business requirements.

Comments

@mattp-swirldslabs
Copy link
Contributor

mattp-swirldslabs commented Oct 10, 2024

Problem

As an application developer using the helidon plugin framework
I want a mechanism to register Runnable callbacks for onCancel and onClose events
So that I can deallocate resources when a client closes or cancels a connection

Solution

Provide a registration mechanism like grpc.io's setOnCancelHandler() and setOnCloseHandler() so the application can register Runnable callbacks.

Alternatives

No response

Tasks

  1. rbair23
@mattp-swirldslabs mattp-swirldslabs added the Improvement Code changes driven by non business requirements. label Oct 10, 2024
@litt3 litt3 self-assigned this Oct 10, 2024
@mattp-swirldslabs
Copy link
Contributor Author

A candidate for where cancel events could invoke a registered Runnable:

public void cancel() {
// FUTURE: Look into implementing this
}

@mattp-swirldslabs
Copy link
Contributor Author

Example of how the registered callback is invoked in ServerCalls in grpc.io:
image

@litt3
Copy link
Contributor

litt3 commented Oct 24, 2024

@mattp-swirldslabs A couple clarifications:

  • Is there a specific need to have both onCancel and onClose callbacks?
  • When you refer to a client canceling, do you mean sending an RST_STREAM frame, or are there other ways to "cancel"?
  • Looking at the case of a client canceling: are you imagining both the onCancel and the onClose callbacks running? Or would it be either-or?

@rbair23
Copy link
Member

rbair23 commented Oct 25, 2024

I don't understand the ask. Since at this time PBJ is not generating the grpc service endpoints, but these are created by the application, you have access to cancel and close using the Java Flow APIs? Why do we need to add something, and where would it be added?

@rbair23
Copy link
Member

rbair23 commented Oct 25, 2024

Is it on the UnaryBuilder and other builders that you want these methods added?

@rbair23
Copy link
Member

rbair23 commented Oct 25, 2024

It should be that PBJ auto-generates service interfaces, such as in this example (GreeterService.java) And then the application would implement this interface, such as in this test.

There are 4 types of methods:

  • Unary
  • Client-Streaming
  • Server-Streaming
  • Bidirectional-Streaming

Unary requests have no streams, and therefore no Flow. They just take the protobuf request object as a Java record, and return a protobuf response object as a Java record.

Client-Streaming is where the client is going to send a whole stream of objects, and in this case, the server needs to know when the connection is terminated so it can clean up resources.

Server-Streaming is where the server is streaming results to the client. In this case, the server could benefit from some callback or exception that indicates the client has closed the connection. An exception in this case might be more useful.

Bidirection-Streaming has to worry about both cases -- the client closing the stream being sent to the server, and the client closing the connection being sent from the server.

Suppose you are implementing GreeterService.java and your method signature is:

public Flow.Subscriber<? super HelloRequest> sayHelloStreamRequest(
                Flow.Subscriber<? super HelloReply> replies) {
            // ...
        }

The replies object is the one you will push replies to, and the Flow.Subscriber that you return is the one the client will send requests to you on. This is an example of the "bidirectional-streaming" case. But actually the same method signature is used for the server-streaming case. So I would say, this is oddity #1: the server-streaming case should maybe be given a Consumer style lambda instead of a Flow, since it is only expected to send a single response.

In either client-streaming or bidirection-streaming, the Flow.Subscriber returned from this method is one that will be given requests from the client over and over. This flow object has the following methods:

            return new Flow.Subscriber<>() {
                @Override
                public void onSubscribe(Flow.Subscription subscription) {
                    subscription.request(Long.MAX_VALUE); // turn off flow control
                }

                @Override
                public void onNext(HelloRequest item) {
                    // ...
                }

                @Override
                public void onError(Throwable throwable) {
                    // ...
                }

                @Override
                public void onComplete() {
                    // ...
                }
            };

If the client cancels the request, or completes it, the appropriate override method is called. So we already have a mechanism by which the server handler knows when the client has been closed.

For the responses, I think there is a gap, where during the response unless some exception is thrown, a server could send a stream of responses to a broken client connection. And I don't remember any method by which the server can be told "hey, the client is gone, you should stop sending". Maybe an exception is the most reasonable, but the Flow API doesn't provide for it. We could subclass the Flow.Subscriber API to add a "throws" clause, or documentation, that says onNext might through some exception if the connection is closed, allowing the server code to use a try/catch block and clean up resources.

So in summary, I think we already have the mechanism asked for when it comes to client connections closing on the request side, but maybe not when the connection is closed on the response side, which impacts server-streaming the most (bidirectional-streaming should get a notice when the client connection closes).

@mattp-swirldslabs
Copy link
Contributor Author

mattp-swirldslabs commented Oct 29, 2024

Thanks for the comments. I've been trying different scenarios with bidi and server streaming. @rbair23 as you noted with this comment:

For the responses, I think there is a gap, where during the response unless some exception is thrown, a server could send a stream of responses to a broken client connection. And I don't remember any method by which the server can be told "hey, the client is gone, you should stop sending"

If a consumer is streaming response with a server streaming connection and they drop the connection, a SocketException is thrown in PbjProtocolHandler.onNext(). Right now this just fills the logs with error messages (see screenshots). Calling something like, pipeline.error() does not work because it invokes the SendToClientSubscriber.error(). Instead, it would be helpful if I can register a Runnable on the Flow.Subscriber<? super SubscribeStreamResponse> passed when the application code is first invoked via Pipelines.serverStreaming().method().

I think there's a similar issue in bidi connections if the socket times out because the producer is not sending any data. A SocketException is thrown and we need to set a Runnable to be invoked when the exception is caught.

@mattp-swirldslabs
Copy link
Contributor Author

mattp-swirldslabs commented Oct 29, 2024

In this case, a SocketException is thrown when trying to pass data onNext() since Helidon removed the consumer. Here we need a way to invoke an application-defined action to unsubscribe consumers, etc.
image

@mattp-swirldslabs
Copy link
Contributor Author

mattp-swirldslabs commented Oct 29, 2024

Logs on the server fill with errors since application-level objects, like the RingBuffer, were never told to remove the consumer so they keep trying to send responses.
image

@litt3 litt3 removed their assignment Nov 6, 2024
@mattp-swirldslabs mattp-swirldslabs changed the title Feat: Provide mechanism to register onCancel and onClose handlers Feat: Provide mechanism for Pipeline to call onCancel Nov 7, 2024
@mattp-swirldslabs mattp-swirldslabs self-assigned this Nov 7, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Improvement Code changes driven by non business requirements.
Projects
None yet
3 participants