Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Configured Resume prevents closing of connection on RejectedSetupException (Regression 1.1.3->1.1.5) #1121

Open
mdindoffer opened this issue Feb 11, 2025 · 0 comments

Comments

@mdindoffer
Copy link

mdindoffer commented Feb 11, 2025

This is basically a followup from #1092 (comment)
RSocket 1.1.5 did fix the premature closing of the TCP connection when server responds to SETUP message with a RejectedSetupException. Unfortunately, the authentication behaviour is still broken with the Resume functionality enabled. In this case Server will not send the Error frame to the Client and the connection is left hanging.

Expected Behavior

Returning Mono.error(new RejectedSetupException("")) in the SetupAcceptor should lead to Client receiving an Error frame and closure of the connection.

Actual Behavior

Instead, Resume somehow prevents the Server from sending the Error frame and the Client is not notified of the rejected setup.

Steps to Reproduce

This can be easily reproduced by the very same example I provided in #1092 (comment) with one small change - adding Resume configuration to the Client and Server. This reproduction test is passing with RSocket 1.1.3, but fails due to a timeout with 1.1.5.

public class AuthenticationWithResumeTest {

    private static final Logger LOG = LoggerFactory.getLogger(AuthenticationWithResumeTest.class);
    private static final int PORT = 23200;

    @Test
    void authTest() {
        createServer().block();
        RSocket rsocketClient = createClient().block();

        StepVerifier.create(
                        rsocketClient.requestResponse(DefaultPayload.create("Client: Hello"))
                )
                .expectError(RejectedSetupException.class)
                .verify(Duration.ofSeconds(5));
    }

    private static Mono<CloseableChannel> createServer() {
        LOG.info("Starting server at port {}", PORT);
        RSocketServer rSocketServer = RSocketServer.create((connectionSetupPayload, rSocket) -> Mono.just(new MyServerRsocket()));

        TcpServer tcpServer = TcpServer.create()
                .host("localhost")
                .port(PORT);

        return rSocketServer
                .interceptors(interceptorRegistry -> interceptorRegistry.forSocketAcceptor(socketAcceptor -> (setup, sendingSocket) -> {
                    if (true) {//TODO here would be an authentication check based on the setup payload
                        return Mono.error(new RejectedSetupException("ACCESS_DENIED"));
                    } else {
                        return socketAcceptor.accept(setup, sendingSocket);
                    }
                }))
                .resume(new Resume().retry(Retry.fixedDelay(Long.MAX_VALUE, Duration.ofSeconds(5))))
                .bind(TcpServerTransport.create(tcpServer))
                .doOnNext(closeableChannel -> LOG.info("RSocket server started."));
    }

    private static Mono<RSocket> createClient() {
        LOG.info("Connecting....");
        return RSocketConnector.create()
                .resume(new Resume().retry(Retry.fixedDelay(Long.MAX_VALUE, Duration.ofSeconds(5))
                        .doAfterRetry(retrySignal -> {
                            if (retrySignal.failure() != null) {
                                LOG.warn("Reconnecting (for resume) to server failed. Will retry...");
                            } else {
                                LOG.info("Successfully reconnected (resume) to server");
                            }
                        })))
                .connect(TcpClientTransport.create(TcpClient.create()
                        .host("localhost")
                        .port(PORT)))
                .doOnNext(rSocket -> LOG.info("Successfully connected to server"))
                .doOnError(throwable -> LOG.error("Failed to connect to server"));
    }

    public static class MyServerRsocket implements RSocket {
        private static final Logger LOG = LoggerFactory.getLogger(MyServerRsocket.class);

        @Override
        public Mono<Payload> requestResponse(Payload payload) {
            LOG.info("Got a Response request with payload: {}", payload.getDataUtf8());
            return Mono.just("Response data blah blah blah")
                    .map(DefaultPayload::create);
        }
    }
}

Output:

[main] INFO  AuthenticationWithResumeTest - Starting server at port 23200
[reactor-tcp-epoll-1] INFO  AuthenticationWithResumeTest - RSocket server started.
[main] INFO  AuthenticationWithResumeTest - Connecting....
[reactor-tcp-epoll-3] DEBUG i.r.FrameLogger - sending -> 
Frame => Stream ID: 0 Type: SETUP Flags: 0b10000000 Length: 74
Data:

[reactor-tcp-epoll-3] DEBUG i.r.r.ResumableDuplexConnection - Side[client]|Session[դw�p�M�����-6Ǝ]|DuplexConnection[1]. Connecting
[reactor-tcp-epoll-3] DEBUG i.r.r.InMemoryResumableFramesStore - Side[client]|Session[դw�p�M�����-6Ǝ]. Resumed at position[0]
[reactor-tcp-epoll-3] DEBUG i.r.r.InMemoryResumableFramesStore - Side[client]|Session[դw�p�M�����-6Ǝ]. Connected at Position[0] and ImpliedPosition[0]
[reactor-tcp-epoll-3] INFO  AuthenticationWithResumeTest - Successfully connected to server
[reactor-tcp-epoll-4] DEBUG i.r.FrameLogger - receiving -> 
Frame => Stream ID: 0 Type: SETUP Flags: 0b10000000 Length: 74
Data:

[main] DEBUG i.r.FrameLogger - sending -> 
Frame => Stream ID: 1 Type: REQUEST_RESPONSE Flags: 0b0 Length: 19
Data:
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 43 6c 69 65 6e 74 3a 20 48 65 6c 6c 6f          |Client: Hello   |
+--------+-------------------------------------------------+----------------+
[reactor-tcp-epoll-4] DEBUG i.r.r.ResumableDuplexConnection - Side[server]|Session[դw�p�M�����-6Ǝ]|DuplexConnection[1]. Connecting
[reactor-tcp-epoll-4] DEBUG i.r.r.InMemoryResumableFramesStore - Side[server]|Session[դw�p�M�����-6Ǝ]. Resumed at position[0]

java.lang.AssertionError: VerifySubscriber timed out on RequestResponseRequesterMono

Your Environment

  • openjdk 21.0.6
  • RSocket 1.1.5 (via rsocket-bom)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant