poll_shutdown is called in a busy loop when returning Poll::Pending #831

Closed
bdbai opened this issue Jan 17, 2025 · 1 comment · Fixed by #834

bdbai commented Jan 17, 2025

Code to reproduce

use h2::client;

use http::{Method, Request};
use std::{
    future::Future,
    pin::Pin,
    task::{ready, Context, Poll},
    time::Duration,
};
use tokio::{
    io::{AsyncRead, AsyncWrite, AsyncWriteExt, ReadBuf},
    net::TcpStream,
    time::sleep,
};
use tokio_native_tls::native_tls::TlsConnector;

type Result<T> = std::io::Result<T>;

/// Wraps a stream, delays `poll_shutdown` by 100 ms, and counts how often it is polled.
struct ShutdownDelayedStream<T> {
    shutdown_poll_count: u32,
    shutdown_delay_fut: Option<Pin<Box<dyn Future<Output = ()> + Send>>>,
    inner: T,
}

impl<T> ShutdownDelayedStream<T> {
    fn new(inner: T) -> Self {
        Self {
            shutdown_poll_count: 0,
            shutdown_delay_fut: Some(Box::pin(async {
                sleep(Duration::from_millis(100)).await;
            })),
            inner,
        }
    }
}

impl<T: AsyncRead + Unpin> AsyncRead for ShutdownDelayedStream<T> {
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<std::io::Result<()>> {
        Pin::new(&mut self.get_mut().inner).poll_read(cx, buf)
    }
}

impl<T: AsyncWrite + Unpin> AsyncWrite for ShutdownDelayedStream<T> {
    fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<Result<usize>> {
        Pin::new(&mut self.get_mut().inner).poll_write(cx, buf)
    }

    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
        Pin::new(&mut self.get_mut().inner).poll_flush(cx)
    }

    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
        let this = self.get_mut();
        this.shutdown_poll_count = this.shutdown_poll_count.wrapping_add(1);
        println!(
            "poll_shutdown: shutdown_poll_count={}",
            this.shutdown_poll_count
        );
        if let Some(delay_fut) = this.shutdown_delay_fut.as_mut() {
            ready!(delay_fut.as_mut().poll(cx));
            this.shutdown_delay_fut = None;
        }
        println!("poll_shutdown: calling inner poll_shutdown");
        Pin::new(&mut this.inner).poll_shutdown(cx)
    }
}

#[tokio::main]
pub async fn main() -> Result<()> {
    // Swap in `_main_tcp().await` to see the expected behavior over plain TCP.
    main_h2().await
}
pub async fn _main_tcp() -> Result<()> {
    let stream = TcpStream::connect("1.1.1.1:443").await?;
    println!("Connected");
    let mut stream = ShutdownDelayedStream::new(stream);
    stream.shutdown().await?;
    println!("Shutdown done");
    Ok(())
}

pub async fn main_h2() -> Result<()> {
    let (mut h2, conn_task) = {
        // Establish TCP connection to the server.
        let stream = TcpStream::connect("1.1.1.1:443").await?;
        let builder = tokio_native_tls::TlsConnector::from(
            TlsConnector::builder()
                .request_alpns(&["h2"])
                .build()
                .unwrap(),
        );
        let stream = builder.connect("1.1.1.1", stream).await.unwrap();
        let stream = ShutdownDelayedStream::new(stream);
        let (h2, connection) = client::handshake(stream).await.expect("handshake");

        let conn_task = tokio::spawn(async move {
            connection.await.unwrap();
        });
        let h2 = h2.ready().await.expect("h2 ready");
        (h2, conn_task)
    };
    {
        // Prepare the HTTP request to send to the server.
        let request = Request::builder()
            .method(Method::GET)
            .uri("https://1.1.1.1/")
            .body(())
            .unwrap();

        // Send the request. The second tuple item allows the caller
        // to stream a request body.
        let (response, _send) = h2.send_request(request, true).unwrap();

        let (head, _) = response.await.expect("response").into_parts();

        println!("Received response: {:?}", head);
    }
    drop(h2);
    println!("Waiting for connection task to finish");
    conn_task.await?;
    println!("Connection task finished");
    Ok(())
}

Actual result

poll_shutdown is called thousands of times, as if in a busy loop, until the timer expires.

(...omitted 2k lines of logs...)
poll_shutdown: shutdown_poll_count=2269
poll_shutdown: shutdown_poll_count=2270
poll_shutdown: shutdown_poll_count=2271
poll_shutdown: shutdown_poll_count=2272
poll_shutdown: shutdown_poll_count=2273
poll_shutdown: calling inner poll_shutdown
Connection task finished

Expected result

When poll_shutdown returns Poll::Pending, the task should not be woken up again immediately; it should be polled again only after its waker fires. Calling _main_tcp produces the following output:

Connected
poll_shutdown: shutdown_poll_count=1
poll_shutdown: shutdown_poll_count=2
poll_shutdown: calling inner poll_shutdown
Shutdown done

I expect to see similar output when using main_h2.
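To make the expectation concrete, here is a minimal std-only sketch of the difference between a caller that waits for the waker and one that re-polls immediately. It is not h2's code and says nothing about where the bug actually lives; `DelayedShutdown`, `ThreadWaker`, and `count_polls` are hypothetical names invented for this illustration, and the 50 ms timer thread merely stands in for a slow `poll_shutdown`.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread;
use std::time::Duration;

/// Hypothetical stand-in for a slow `poll_shutdown`: pending until a timer thread fires.
struct DelayedShutdown {
    done: Arc<AtomicBool>,
    started: bool,
    polls: u64,
}

impl Future for DelayedShutdown {
    /// Resolves to the number of polls it took to complete.
    type Output = u64;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<u64> {
        let this = self.get_mut();
        this.polls += 1;
        if this.done.load(Ordering::Acquire) {
            return Poll::Ready(this.polls);
        }
        if !this.started {
            this.started = true;
            // Simplification: the waker is captured once, which is fine here
            // because the caller below always passes the same waker.
            let (done, waker) = (this.done.clone(), cx.waker().clone());
            thread::spawn(move || {
                thread::sleep(Duration::from_millis(50));
                done.store(true, Ordering::Release);
                waker.wake(); // the only legitimate wake-up
            });
        }
        Poll::Pending
    }
}

/// Waker that records the wake-up and unparks the polling thread.
struct ThreadWaker {
    woken: AtomicBool,
    thread: thread::Thread,
}

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.woken.store(true, Ordering::Release);
        self.thread.unpark();
    }
}

/// Drives the future to completion and returns how many times it was polled.
/// With `busy == true` the caller ignores the waker and re-polls right away.
fn count_polls(busy: bool) -> u64 {
    let state = Arc::new(ThreadWaker {
        woken: AtomicBool::new(false),
        thread: thread::current(),
    });
    let waker = Waker::from(state.clone());
    let mut cx = Context::from_waker(&waker);
    let mut fut = DelayedShutdown {
        done: Arc::new(AtomicBool::new(false)),
        started: false,
        polls: 0,
    };
    loop {
        if let Poll::Ready(n) = Pin::new(&mut fut).poll(&mut cx) {
            return n;
        }
        if !busy {
            // Well-behaved caller: sleep until the waker has actually fired.
            while !state.woken.swap(false, Ordering::Acquire) {
                thread::park();
            }
        }
    }
}

fn main() {
    println!("well-behaved caller: {} polls", count_polls(false));
    println!("busy-looping caller: {} polls", count_polls(true));
}
```

The well-behaved caller polls exactly twice (once to start the timer, once after the wake-up), which mirrors the `_main_tcp` log above. The busy-looping caller keeps polling for the whole delay, which is the pattern the `main_h2` log shows.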

Reference

compio-rs/cyper#25

I first noticed this as CPU usage spikes on a server running cyper on the compio runtime, which uses io_uring under the hood. In compio, submitted IO requests normally do not complete immediately, so poll_shutdown returns Poll::Pending and triggers the busy loop described here.

@seanmonstar

Thanks for the report! I've got a fix pending in #834.
