From 1fdbc9d7a1acdedd9459f3f0719b1dfd90a037d5 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Wed, 28 Feb 2018 12:49:14 -0800 Subject: [PATCH] WIP test for #415 --- proxy/tests/support/proxy.rs | 11 +++++++++-- proxy/tests/support/server.rs | 24 ++++++++++++++++++++--- proxy/tests/support/tcp.rs | 6 +++++- proxy/tests/transparency.rs | 36 +++++++++++++++++++++++++++++++++++ 4 files changed, 71 insertions(+), 6 deletions(-) diff --git a/proxy/tests/support/proxy.rs b/proxy/tests/support/proxy.rs index eb601a752c191..3cdcbe00251d6 100644 --- a/proxy/tests/support/proxy.rs +++ b/proxy/tests/support/proxy.rs @@ -23,6 +23,9 @@ pub struct Listening { pub inbound: SocketAddr, pub outbound: SocketAddr, + pub outbound_server: Option, + pub inbound_server: Option, + shutdown: Shutdown, } @@ -150,8 +153,8 @@ fn run(proxy: Proxy, mut env: config::TestEnv) -> Listening { .name("support proxy".into()) .spawn(move || { let _c = controller; - let _i = inbound; - let _o = outbound; + // let _i = inbound; + // let _o = outbound; let _ = running_tx.send(()); main.run_until(rx); @@ -165,6 +168,10 @@ fn run(proxy: Proxy, mut env: config::TestEnv) -> Listening { control: control_addr, inbound: inbound_addr, outbound: outbound_addr, + + outbound_server: outbound, + inbound_server: inbound, + shutdown: tx, } } diff --git a/proxy/tests/support/server.rs b/proxy/tests/support/server.rs index e12a29f52dfc8..c466b6ff029e1 100644 --- a/proxy/tests/support/server.rs +++ b/proxy/tests/support/server.rs @@ -1,5 +1,5 @@ use std::collections::HashMap; -use std::sync::Arc; +use std::sync::{Arc, Mutex}; use std::thread; use support::*; @@ -30,6 +30,13 @@ pub struct Server { pub struct Listening { pub addr: SocketAddr, pub(super) shutdown: Shutdown, + pub(super) conn_count: Arc>, +} + +impl Listening { + pub fn connections(&self) -> usize { + *self.conn_count.lock().unwrap() + } } impl Server { @@ -81,6 +88,8 @@ impl Server { pub fn run(self) -> Listening { let (tx, rx) = shutdown_signal(); let (addr_tx, addr_rx) = oneshot::channel(); + let conn_count = Arc::new(Mutex::new(0)); + let srv_conn_count = Arc::clone(&conn_count); ::std::thread::Builder::new().name("support server".into()).spawn(move || { let mut core = Core::new().unwrap(); let reactor = core.handle(); @@ -93,7 +102,11 @@ impl Server { Box::new(move |sock| { let h1_clone = h1.clone(); + let srv_conn_count = Arc::clone(&srv_conn_count); let conn = new_svc.new_service() + .inspect(move |_| { + *(srv_conn_count.lock().unwrap()) += 1; + }) .from_err() .and_then(move |svc| h1_clone.serve_connection(sock, svc)) .map(|_| ()) @@ -108,8 +121,12 @@ impl Server { reactor.clone(), ); Box::new(move |sock| { + let srv_conn_count = Arc::clone(&srv_conn_count); let conn = h2.serve(sock) - .map_err(|e| println!("server h2 error: {:?}", e)); + .map_err(|e| println!("server h2 error: {:?}", e)) + .inspect(move |_| { + *(srv_conn_count.lock().unwrap()) += 1; + }); Box::new(conn) }) }, @@ -122,7 +139,7 @@ impl Server { let _ = addr_tx.send(local_addr); let serve = bind.incoming() - .fold((srv, reactor), |(srv, reactor), (sock, _)| { + .fold((srv, reactor), move |(srv, reactor), (sock, _)| { if let Err(e) = sock.set_nodelay(true) { return Err(e); } @@ -145,6 +162,7 @@ impl Server { Listening { addr, shutdown: tx, + conn_count, } } } diff --git a/proxy/tests/support/tcp.rs b/proxy/tests/support/tcp.rs index 87256b4b0adfa..5b325c2f917f1 100644 --- a/proxy/tests/support/tcp.rs +++ b/proxy/tests/support/tcp.rs @@ -2,6 +2,7 @@ use support::*; use std::collections::VecDeque; use std::io; +use std::sync::{Arc, Mutex}; use self::futures::sync::{mpsc, oneshot}; use self::tokio_core::net::TcpStream; @@ -151,6 +152,8 @@ fn run_client(addr: SocketAddr) -> TcpSender { fn run_server(tcp: TcpServer) -> server::Listening { let (tx, rx) = shutdown_signal(); let (addr_tx, addr_rx) = oneshot::channel(); + let conn_count = Arc::new(Mutex::new(0)); + let srv_conn_count = Arc::clone(&conn_count); ::std::thread::Builder::new().name("support server".into()).spawn(move || { let mut core = Core::new().unwrap(); let reactor = core.handle(); @@ -165,7 +168,7 @@ fn run_server(tcp: TcpServer) -> server::Listening { let work = bind.incoming().for_each(move |(sock, _)| { let cb = accepts.pop_front().expect("no more accepts"); - + *(srv_conn_count.lock().unwrap()) += 1; let fut = tokio_io::io::read(sock, vec![0; 1024]) .and_then(move |(sock, mut vec, n)| { vec.truncate(n); @@ -186,5 +189,6 @@ fn run_server(tcp: TcpServer) -> server::Listening { server::Listening { addr, shutdown: tx, + conn_count, } } diff --git a/proxy/tests/transparency.rs b/proxy/tests/transparency.rs index 17e287142e2e4..c8b778e8cc5c1 100644 --- a/proxy/tests/transparency.rs +++ b/proxy/tests/transparency.rs @@ -313,3 +313,39 @@ fn http1_get_doesnt_add_transfer_encoding() { let client = client::http1(proxy.inbound, "transparency.test.svc.cluster.local"); assert_eq!(client.get("/"), "hello h1"); } + +#[test] +fn http1_one_connection_per_host() { + let _ = env_logger::try_init(); + + let srv = server::http1().route("/", "hello").run(); + let ctrl = controller::new() + .run(); + let proxy = proxy::new().controller(ctrl).inbound(srv).run(); + + let client = client::http1(proxy.inbound, "foo.bar"); + + let res1 = client.request(client.request_builder("/") + .version(http::Version::HTTP_11) + .header("host", "foo.bar") + ); + assert_eq!(res1.status(), http::StatusCode::OK); + assert_eq!(res1.version(), http::Version::HTTP_11); + let res1 = client.request(client.request_builder("/") + .version(http::Version::HTTP_11) + .header("host", "foo.bar") + ); + assert_eq!(res1.status(), http::StatusCode::OK); + assert_eq!(res1.version(), http::Version::HTTP_11); + + let client = client::http1(proxy.inbound, "bar.baz"); + let res2 = client.request(client.request_builder("/") + .version(http::Version::HTTP_11) + .header("host", "bar.baz")); + assert_eq!(res2.status(), http::StatusCode::OK); + assert_eq!(res2.version(), http::Version::HTTP_11); + + let inbound = &proxy.inbound_server.as_ref() + .expect("no inbound server!"); + assert_eq!(inbound.connections(), 2); +}