@@ -2122,6 +2122,82 @@ mod conn {
21222122 . expect_err ( "client should be closed" ) ;
21232123 }
21242124
2125+ #[ tokio:: test( flavor = "multi_thread" , worker_threads = 2 ) ]
2126+ async fn http2_connect_detect_close ( ) {
2127+ // Regression test for failure to fully close connections when using HTTP2 CONNECT
2128+ // We send 2 requests and then drop them. We should see the connection gracefully close.
2129+ use futures_util:: future;
2130+ let ( listener, addr) = setup_tk_test_server ( ) . await ;
2131+ let ( tx, rxx) = oneshot:: channel :: < ( ) > ( ) ;
2132+
2133+ tokio:: task:: spawn ( async move {
2134+ use hyper:: server:: conn:: http2;
2135+ use hyper:: service:: service_fn;
2136+
2137+ let res = listener. accept ( ) . await ;
2138+ let ( stream, _) = res. unwrap ( ) ;
2139+ let stream = TokioIo :: new ( stream) ;
2140+
2141+ let service = service_fn ( move |req : Request < hyper:: body:: Incoming > | {
2142+ tokio:: task:: spawn ( async move {
2143+ let io = & mut TokioIo :: new ( hyper:: upgrade:: on ( req) . await . unwrap ( ) ) ;
2144+ io. write_all ( b"hello\n " ) . await . unwrap ( ) ;
2145+ } ) ;
2146+
2147+ future:: ok :: < _ , hyper:: Error > ( Response :: new ( Empty :: < Bytes > :: new ( ) ) )
2148+ } ) ;
2149+
2150+ tokio:: task:: spawn ( async move {
2151+ let conn = http2:: Builder :: new ( TokioExecutor ) . serve_connection ( stream, service) ;
2152+ let _ = conn. await ;
2153+ tx. send ( ( ) ) . unwrap ( ) ;
2154+ } ) ;
2155+ } ) ;
2156+
2157+ let io = tcp_connect ( & addr) . await . expect ( "tcp connect" ) ;
2158+ let ( mut client, conn) = conn:: http2:: Builder :: new ( TokioExecutor )
2159+ . handshake ( io)
2160+ . await
2161+ . expect ( "http handshake" ) ;
2162+
2163+ tokio:: task:: spawn ( async move {
2164+ conn. await . expect ( "client conn" ) ;
2165+ } ) ;
2166+
2167+ // Sanity check that client is ready
2168+ future:: poll_fn ( |ctx| client. poll_ready ( ctx) )
2169+ . await
2170+ . expect ( "client poll ready sanity" ) ;
2171+ let requests = 2 ;
2172+ let mut clients = vec ! [ client. clone( ) , client] ;
2173+ let ( tx, rx) = oneshot:: channel :: < ( ) > ( ) ;
2174+ let ( tx2, rx2) = oneshot:: channel :: < ( ) > ( ) ;
2175+ let mut rxs = vec ! [ rx, rx2] ;
2176+ for _i in 0 ..requests {
2177+ let mut client = clients. pop ( ) . unwrap ( ) ;
2178+ let rx = rxs. pop ( ) . unwrap ( ) ;
2179+ let req = Request :: builder ( )
2180+ . method ( Method :: CONNECT )
2181+ . uri ( format ! ( "{}" , addr) )
2182+ . body ( Empty :: < Bytes > :: new ( ) )
2183+ . expect ( "request builder" ) ;
2184+
2185+ let resp = client. send_request ( req) . await . expect ( "req1 send" ) ;
2186+ assert_eq ! ( resp. status( ) , 200 ) ;
2187+ let upgrade = hyper:: upgrade:: on ( resp) . await . unwrap ( ) ;
2188+ tokio:: task:: spawn ( async move {
2189+ let _ = rx. await ;
2190+ drop ( upgrade) ;
2191+ } ) ;
2192+ }
2193+ drop ( tx) ;
2194+ drop ( tx2) ;
2195+ tokio:: time:: timeout ( Duration :: from_secs ( 1 ) , rxx)
2196+ . await
2197+ . expect ( "drop with 1s" )
2198+ . expect ( "tx dropped without sending" ) ;
2199+ }
2200+
21252201 #[ tokio:: test]
21262202 async fn http2_keep_alive_detects_unresponsive_server ( ) {
21272203 let ( listener, addr) = setup_tk_test_server ( ) . await ;
0 commit comments