
Fix detection of closed connections in cluster connection pools #146

Merged: 6 commits merged into main from detect-closed-connections on Aug 28, 2021

Conversation

@XA21X XA21X commented Aug 23, 2021

Add a shutdown signal so the Rx process can tell the Tx process to stop, when the remote side has closed the connection.
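For context, a minimal, self-contained sketch of this pattern (illustrative names and types, not the PR's actual code) could look like the following, assuming plain byte payloads instead of the crate's Request type:

// A minimal sketch of the pattern, not the PR's actual code: the Rx task owns
// `closed_tx` and the Tx task selects on `closed_rx`, so the writer stops as
// soon as the remote peer closes the connection.
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream;
use tokio::sync::{mpsc, oneshot};

fn spawn_rw(stream: TcpStream) -> mpsc::UnboundedSender<Vec<u8>> {
    let (mut read, mut write) = stream.into_split();
    let (out_tx, mut out_rx) = mpsc::unbounded_channel::<Vec<u8>>();
    let (closed_tx, mut closed_rx) = oneshot::channel::<()>();

    // Rx task: runs until the remote sends EOF (a read of length 0), then signals.
    tokio::spawn(async move {
        let mut buf = [0u8; 1024];
        while matches!(read.read(&mut buf).await, Ok(n) if n > 0) {}
        let _ = closed_tx.send(());
    });

    // Tx task: forwards outgoing bytes, but stops when told the peer has closed.
    tokio::spawn(async move {
        loop {
            tokio::select! {
                maybe_bytes = out_rx.recv() => match maybe_bytes {
                    Some(bytes) => { let _ = write.write_all(&bytes).await; }
                    None => break, // the local side dropped the sender
                },
                _ = &mut closed_rx => break, // the remote side closed the connection
            }
        }
        // Dropping `write` here shuts down the write half of the TcpStream.
    });

    out_tx
}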

@rukai rukai self-requested a review August 23, 2021 05:09

@rukai rukai left a comment


Still need to properly understand the new error handling/shutdown process.
But here's the feedback so far.


@rukai rukai left a comment


I have not investigated this thoroughly but could we implement it like this:

pub fn spawn_from_stream<C: Codec + 'static>(
    codec: &C,
    stream: TcpStream,
) -> UnboundedSender<Request> {
    let (read, write) = stream.into_split();
    let (out_tx, out_rx) = tokio::sync::mpsc::unbounded_channel::<Request>();
    let (return_tx, return_rx) = tokio::sync::mpsc::unbounded_channel::<Request>();

    let codec_clone1 = codec.clone();
    let codec_clone2 = codec.clone();

    tokio::spawn(async move {
        tokio::select! {
            result = tx_process(write, out_rx, return_tx, codec_clone1) => if let Err(e) = result {
                trace!("connection write-closed with error: {:?}", e);
            } else {
                trace!("connection write-closed gracefully");
            },
            result = rx_process(read, return_rx, codec_clone2) => if let Err(e) = result {
                trace!("connection read-closed with error: {:?}", e);
            } else {
                trace!("connection read-closed gracefully");
            }
        }
    });

    out_tx
}

Or do we need each in a separate task for performance reasons or something?

let (read, write) = stream.into_split();
let (out_tx, out_rx) = tokio::sync::mpsc::unbounded_channel::<Request>();
let (return_tx, return_rx) = tokio::sync::mpsc::unbounded_channel::<Request>();
let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel();

Let's name it remote_connection_closed_tx, remote_connection_closed_rx.


@XA21X XA21X Aug 24, 2021


I renamed it to closed_tx/rx instead because it's not necessarily a "remote" thing? i.e. we don't know if the rx_process finished gracefully or with errors. 🤔

@XA21X XA21X commented Aug 24, 2021

> Or do we need each in a separate task for performance reasons or something?

My understanding is that it's for performance reasons, to allow duplex operation: send and receive on separate threads at the same time. I haven't checked whether duplex is actually possible or implemented correctly, but it would seem like a good reason to keep them separate?

@rukai rukai commented Aug 24, 2021

ah!
Can we select! on the spawn calls?

@rukai rukai commented Aug 24, 2021

Seems not because then we would be blocking in spawn_from_stream.

My next question is do we need to handle shutting down the reader? Or is that already handled by some other mechanism?

@XA21X XA21X commented Aug 24, 2021

> Seems not because then we would be blocking in spawn_from_stream.
>
> My next question is do we need to handle shutting down the reader? Or is that already handled by some other mechanism?

    /// **Note:** Dropping the write half will shut down the write half of the TCP
    /// stream. This is equivalent to calling [`shutdown()`] on the `TcpStream`.

When the transform closes the connection (out_tx), this propagates to: out_rx -> rx_stream -> in_w -> write.
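A hedged sketch of that propagation, with an assumed signature rather than the crate's actual tx_process: once every sender has been dropped, the forwarding loop ends, and dropping the write half shuts down the write direction of the TcpStream.

// Hedged sketch (assumed signature, not the crate's actual tx_process): once every
// `out_tx` clone is dropped, `recv()` returns None, the loop ends, and dropping
// `write` shuts down the write half of the TcpStream, which sends a FIN to the peer.
use tokio::io::AsyncWriteExt;
use tokio::net::tcp::OwnedWriteHalf;
use tokio::sync::mpsc::UnboundedReceiver;

async fn forward_until_closed(mut write: OwnedWriteHalf, mut out_rx: UnboundedReceiver<Vec<u8>>) {
    while let Some(bytes) = out_rx.recv().await {
        if write.write_all(&bytes).await.is_err() {
            break; // the peer is gone; stop forwarding
        }
    }
    // `write` is dropped here, equivalent to calling shutdown() on the write half.
}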

@XA21X XA21X linked an issue Aug 24, 2021 that may be closed by this pull request
@XA21X XA21X changed the title from "Fix detection of closed connections" to "Fix detection of closed connections in cluster connection pools" Aug 24, 2021
@XA21X XA21X requested a review from rukai August 24, 2021 11:19
@rukai rukai commented Aug 25, 2021

ah, so my understanding is that shutdown of the rx_process task works like this:

  1. The tx_process task shuts down and signals the end of transmission to the redis instance.
  2. The redis instance receives this and then signals end of transmission to the rx_process task, which then shuts down.

Is this correct?

@XA21X XA21X commented Aug 25, 2021

It turns out I've misread the comment I quoted above (which is still a bit confusing to read), but yes, you are correct.

/// To shut down the stream in the write direction, you can call the
/// [`shutdown()`] method. This will cause the other peer to receive a read of
/// length 0, indicating that no more data will be sent. This only closes
/// the stream in one direction

Dug through the kernel to confirm how it's implemented:

https://github.com/torvalds/linux/blob/6e764bcd1cf72a2846c0e53d3975a09b242c04c9/net/socket.c#L2231
https://github.com/torvalds/linux/blob/6e764bcd1cf72a2846c0e53d3975a09b242c04c9/net/ipv4/tcp.c#L2688
https://github.com/torvalds/linux/blob/6e764bcd1cf72a2846c0e53d3975a09b242c04c9/include/net/sock.h#L1439

A FIN gets sent when a TCP socket is shut down on the write side.
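A small, self-contained example (not from the PR) demonstrates this behaviour with tokio: after the client shuts down its write half, the server's read completes with 0 bytes, i.e. EOF.

// Minimal demonstration (not from the PR): shutting down the write half sends a
// FIN, which the peer observes as a read of length 0 (EOF).
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::{TcpListener, TcpStream};

#[tokio::main]
async fn main() -> std::io::Result<()> {
    let listener = TcpListener::bind("127.0.0.1:0").await?;
    let addr = listener.local_addr()?;

    let server = tokio::spawn(async move {
        let (mut peer, _) = listener.accept().await.unwrap();
        let mut buf = [0u8; 1];
        // A read of 0 bytes means the client shut down its write side (FIN received).
        let n = peer.read(&mut buf).await.unwrap();
        assert_eq!(n, 0);
    });

    let mut client = TcpStream::connect(addr).await?;
    // Shut down only the write direction; the kernel sends a FIN to the peer.
    client.shutdown().await?;

    server.await.unwrap();
    Ok(())
}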


@rukai rukai left a comment


Wow, that's a thorough investigation, it's a lot clearer now, thanks!

While investigating the problem I found the Abortable type provided by the futures crate, which gives slightly more type safety at the cost of flexibility. We can use it as follows:

pub fn spawn_from_stream<C: Codec + 'static>(
    codec: &C,
    stream: TcpStream,
) -> UnboundedSender<Request> {
    let (read, write) = stream.into_split();
    let (out_tx, out_rx) = tokio::sync::mpsc::unbounded_channel::<Request>();
    let (return_tx, return_rx) = tokio::sync::mpsc::unbounded_channel::<Request>();

    use futures::future::{AbortHandle, Abortable};
    let (handle, registration) = AbortHandle::new_pair();

    let codec_clone = codec.clone();
    tokio::spawn(
        Abortable::new(
            async move {
                if let Err(e) = tx_process(write, out_rx, return_tx, codec_clone).await {
                    trace!("connection write-closed with error: {:?}", e);
                } else {
                    trace!("connection write-closed gracefully");
                }
            },
            registration
        )
    );

    let codec_clone = codec.clone();
    tokio::spawn(async move {
        if let Err(e) = rx_process(read, return_rx, codec_clone).await {
            trace!("connection read-closed with error: {:?}", e);
        } else {
            trace!("connection read-closed gracefully");
        }

        // Signal the writer to also exit, which then closes `out_tx` - what we consider 'the connection'.
        handle.abort()
    });

    out_tx
}

However, I don't really see much value in using it; tokio::select! is more flexible if our needs change.
Feel free to use it if you want, but I think we should just proceed as is, and maybe delete the TODO comment.
LGTM

When the remote peer closes a connection, this causes the rx task to
stop, but the tx side keeps running until it is used to send a message,
which is guaranteed to fail. The fix is to add a 'closed' signal to
tell the tx task to stop after the rx task finishes.
@benbromhead

So to confirm, we split into two different tasks for perf reasons.

@benbromhead benbromhead commented Aug 27, 2021

I also spent some time looking through this and figured I needed a test / some code to better understand the behaviour, and I don't know if it's quite doing what we think it will.

With the test below I got the following trace:

Aug 27 13:22:47.056 TRACE mio::poll: registering event source with poller: token=Token(1), interests=READABLE | WRITABLE
Aug 27 13:22:47.056 TRACE mio::poll: registering event source with poller: token=Token(2), interests=READABLE | WRITABLE
Aug 27 13:22:47.056 TRACE mio::poll: registering event source with poller: token=Token(3), interests=READABLE | WRITABLE
Aug 27 13:22:47.056 TRACE tokio_util::codec::framed_impl: flushing framed transport
Aug 27 13:22:47.056 TRACE tokio_util::codec::framed_impl: framed transport flushed
Aug 27 13:22:47.056 TRACE mio::poll: deregistering event source from poller
Aug 27 13:22:47.056 TRACE mio::poll: deregistering event source from poller
Aug 27 13:22:47.056 TRACE shotover_proxy::protocols::redis_codec: frames [] - remaining 0
Aug 27 13:22:47.056 TRACE shotover_proxy::transforms::util::cluster_connection_pool: connection read-closed gracefully
Aug 27 13:22:47.056 TRACE mio::poll: deregistering event source from poller
Aug 27 13:22:47.056 TRACE shotover_proxy::transforms::util::cluster_connection_pool: connection write-closed with error: channel closed

Which shows that on graceful shutdown the write side of the socket logs an error, because tx_process returns an error (from the forward) rather than receiving the shutdown from the oneshot (this may be a timing thing that behaves as expected on higher-latency connections).
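A hedged sketch of the race being described (the tx task's internals are assumptions; only the trace messages, closed_rx, and the "channel closed" error come from this PR), with the hypothetical forward_requests() standing in for the real forwarding future:

// Hedged sketch: `forward_requests()` is a stand-in for the real forwarding
// future inside the tx task; it is not the crate's actual code. When the rx
// task tears down first, the forwarding future can complete with a
// "channel closed" error in the same poll in which `closed_rx` becomes ready,
// so whichever select branch wins decides which trace line gets logged.
tokio::select! {
    result = forward_requests() => match result {
        Ok(()) => trace!("connection write-closed gracefully"),
        Err(e) => trace!("connection write-closed with error: {:?}", e),
    },
    _ = closed_rx => trace!("connection write-closed by remote upstream"),
}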

I also would have expected the other end of the channel returned from spawn_from_stream to have been dropped and thus caused sending on it to error out (given that both tx and rx processes have stopped and dropped their receivers...). Though I'm not 100% on this behavior so please correct me if I'm wrong.

@XA21X could you investigate a little further and maybe include a test to show that shutdown is more or less doing what we expect and that we can't send on the channel if the socket backing it has been dropped / shutdown.

mod test {
    use crate::message::Message;
    use crate::protocols::redis_codec::RedisCodec;
    use crate::protocols::RawFrame;
    use crate::transforms::util::cluster_connection_pool::spawn_from_stream;
    use crate::transforms::util::Request;
    use anyhow::anyhow;
    use std::net::SocketAddr;
    use tokio::io::AsyncReadExt;
    use tokio::net::TcpListener;
    use tokio::net::TcpStream;

    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn test_shutdown() {
        // Keep the non-blocking writer's guard alive for the duration of the test.
        let (non_blocking, _guard) = tracing_appender::non_blocking(std::io::stdout());

        let builder = tracing_subscriber::fmt()
            .with_writer(non_blocking)
            .with_env_filter("TRACE")
            .with_filter_reloading();
        let _handle = builder.reload_handle();

        // To avoid unit tests that run in the same executable blowing up when they try to reinitialize tracing, we ignore the result returned by try_init.
        // Currently the implementation of try_init will only fail when it is called multiple times.
        builder.try_init().ok();

        let listener = TcpListener::bind("127.0.0.1:2222".parse::<SocketAddr>().unwrap())
            .await
            .unwrap();

        let (closed_tx, closed_rx) = tokio::sync::oneshot::channel();

        let remote = tokio::spawn(async move {
            let (mut socket, _) = listener.accept().await.unwrap();
            // Fail if the socket becomes readable (data or EOF) before we are told
            // to close; succeed once `closed_rx` fires.
            tokio::select! {
                _ = socket.read_u8() => Err(anyhow!("uh oh")),
                _ = closed_rx => Ok(()),
            }
        });

        let stream = TcpStream::connect("127.0.0.1:2222").await.unwrap();
        let codec = RedisCodec::new(true, 3);

        let sender = spawn_from_stream(&codec, stream);

        closed_tx.send(1).unwrap();

        // `remote` is a JoinHandle wrapping a Result, so unwrap the join error first
        // and then check that the remote task itself returned Ok.
        assert!(remote.await.unwrap().is_ok());

        assert!(
            sender
                .send(Request {
                    messages: Message::new_bypass(RawFrame::None),
                    return_chan: None,
                    message_id: None
                })
                .is_err(),
            "channel still up"
        );
    }
}

@XA21X XA21X commented Aug 27, 2021

I was able to reproduce the error with @benbromhead's test, so I've added a test for signalling in each direction.

When the remote TcpStream gets dropped, it closes the socket file descriptor (libc::close), which is handled by the TCP stack (e.g. Linux kernel) by sending an RST/FIN. As the packet is effectively async to the application, the test manages to call sender.send before the local TcpStream-FramedRead processes the packet as an EOF and returns None.

Adding a 10 millisecond delay before the send was sufficient to allow the read process to detect the closed connection and tell the write process to shut down, closing the other side of the sender, before the test attempts a send.

It turns out we can block on that condition (sender.closed().await), so I could remove the timeouts I put into my test while debugging (to fail-early instead of blocking), but there's also no harm leaving them in as a responsiveness check.
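For reference, the blocking-on-closure idea might look roughly like this in a test (assuming sender is the UnboundedSender returned by spawn_from_stream; the timeout is only a responsiveness guard):

// Rough sketch of waiting for the connection to be considered closed before
// asserting (assumes `sender` is the UnboundedSender returned by spawn_from_stream).
use std::time::Duration;

tokio::time::timeout(Duration::from_secs(10), sender.closed())
    .await
    .expect("connection was not detected as closed in time");

assert!(sender.is_closed());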

Test Output - Remote shutdown closes local sender:

Aug 27 19:51:13.058 TRACE mio::poll: registering event source with poller: token=Token(1), interests=READABLE | WRITABLE    
Aug 27 19:51:13.060 TRACE mio::poll: registering event source with poller: token=Token(2), interests=READABLE | WRITABLE    
Aug 27 19:51:13.060 TRACE mio::poll: registering event source with poller: token=Token(3), interests=READABLE | WRITABLE    
Aug 27 19:51:13.060 TRACE mio::poll: deregistering event source from poller    
Aug 27 19:51:13.060 TRACE mio::poll: deregistering event source from poller    
Aug 27 19:51:13.060 TRACE tokio_util::codec::framed_impl: flushing framed transport    
Aug 27 19:51:13.061 TRACE tokio_util::codec::framed_impl: framed transport flushed    
Aug 27 19:51:13.061 TRACE shotover_proxy::protocols::redis_codec: frames [] - remaining 0
Aug 27 19:51:13.061 TRACE shotover_proxy::transforms::util::cluster_connection_pool: connection read-closed gracefully
Aug 27 19:51:13.061 TRACE tokio_util::codec::framed_impl: flushing framed transport    
Aug 27 19:51:13.061 TRACE tokio_util::codec::framed_impl: framed transport flushed    
Aug 27 19:51:13.061 TRACE mio::poll: deregistering event source from poller    
Aug 27 19:51:13.061 TRACE shotover_proxy::transforms::util::cluster_connection_pool: connection write-closed by remote upstream
test transforms::util::cluster_connection_pool::test::test_remote_shutdown ... ok

Test Output - Closing local sender causes remote shutdown:

Aug 27 19:52:33.577 TRACE mio::poll: registering event source with poller: token=Token(1), interests=READABLE | WRITABLE    
Aug 27 19:52:33.578 TRACE mio::poll: registering event source with poller: token=Token(2), interests=READABLE | WRITABLE    
Aug 27 19:52:33.579 TRACE mio::poll: registering event source with poller: token=Token(3), interests=READABLE | WRITABLE    
Aug 27 19:52:33.579 TRACE tokio_util::codec::framed_impl: flushing framed transport    
Aug 27 19:52:33.579 TRACE tokio_util::codec::framed_impl: framed transport flushed    
Aug 27 19:52:33.580 TRACE shotover_proxy::transforms::util::cluster_connection_pool: connection write-closed gracefully
Aug 27 19:52:33.580 TRACE mio::poll: deregistering event source from poller    
Aug 27 19:52:33.580 TRACE mio::poll: deregistering event source from poller    
Aug 27 19:52:33.580 TRACE mio::poll: deregistering event source from poller    
test transforms::util::cluster_connection_pool::test::test_local_shutdown ... ok


@benbromhead benbromhead left a comment


LGTM - I did a quick ninja fix to change the log level in the tests to INFO. I'll merge once the test rerun is green

@benbromhead benbromhead merged commit 4281614 into main Aug 28, 2021
@XA21X XA21X deleted the detect-closed-connections branch August 28, 2021 23:45