Skip to content

Commit

Permalink
WIP test for #415
Browse files Browse the repository at this point in the history
  • Loading branch information
hawkw committed Feb 28, 2018
1 parent 41bef41 commit 1fdbc9d
Show file tree
Hide file tree
Showing 4 changed files with 71 additions and 6 deletions.
11 changes: 9 additions & 2 deletions proxy/tests/support/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ pub struct Listening {
pub inbound: SocketAddr,
pub outbound: SocketAddr,

pub outbound_server: Option<server::Listening>,
pub inbound_server: Option<server::Listening>,

shutdown: Shutdown,
}

Expand Down Expand Up @@ -150,8 +153,8 @@ fn run(proxy: Proxy, mut env: config::TestEnv) -> Listening {
.name("support proxy".into())
.spawn(move || {
let _c = controller;
let _i = inbound;
let _o = outbound;
// let _i = inbound;
// let _o = outbound;

let _ = running_tx.send(());
main.run_until(rx);
Expand All @@ -165,6 +168,10 @@ fn run(proxy: Proxy, mut env: config::TestEnv) -> Listening {
control: control_addr,
inbound: inbound_addr,
outbound: outbound_addr,

outbound_server: outbound,
inbound_server: inbound,

shutdown: tx,
}
}
24 changes: 21 additions & 3 deletions proxy/tests/support/server.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use std::collections::HashMap;
use std::sync::Arc;
use std::sync::{Arc, Mutex};
use std::thread;

use support::*;
Expand Down Expand Up @@ -30,6 +30,13 @@ pub struct Server {
pub struct Listening {
pub addr: SocketAddr,
pub(super) shutdown: Shutdown,
pub(super) conn_count: Arc<Mutex<usize>>,
}

impl Listening {
pub fn connections(&self) -> usize {
*self.conn_count.lock().unwrap()
}
}

impl Server {
Expand Down Expand Up @@ -81,6 +88,8 @@ impl Server {
pub fn run(self) -> Listening {
let (tx, rx) = shutdown_signal();
let (addr_tx, addr_rx) = oneshot::channel();
let conn_count = Arc::new(Mutex::new(0));
let srv_conn_count = Arc::clone(&conn_count);
::std::thread::Builder::new().name("support server".into()).spawn(move || {
let mut core = Core::new().unwrap();
let reactor = core.handle();
Expand All @@ -93,7 +102,11 @@ impl Server {

Box::new(move |sock| {
let h1_clone = h1.clone();
let srv_conn_count = Arc::clone(&srv_conn_count);
let conn = new_svc.new_service()
.inspect(move |_| {
*(srv_conn_count.lock().unwrap()) += 1;
})
.from_err()
.and_then(move |svc| h1_clone.serve_connection(sock, svc))
.map(|_| ())
Expand All @@ -108,8 +121,12 @@ impl Server {
reactor.clone(),
);
Box::new(move |sock| {
let srv_conn_count = Arc::clone(&srv_conn_count);
let conn = h2.serve(sock)
.map_err(|e| println!("server h2 error: {:?}", e));
.map_err(|e| println!("server h2 error: {:?}", e))
.inspect(move |_| {
*(srv_conn_count.lock().unwrap()) += 1;
});
Box::new(conn)
})
},
Expand All @@ -122,7 +139,7 @@ impl Server {
let _ = addr_tx.send(local_addr);

let serve = bind.incoming()
.fold((srv, reactor), |(srv, reactor), (sock, _)| {
.fold((srv, reactor), move |(srv, reactor), (sock, _)| {
if let Err(e) = sock.set_nodelay(true) {
return Err(e);
}
Expand All @@ -145,6 +162,7 @@ impl Server {
Listening {
addr,
shutdown: tx,
conn_count,
}
}
}
Expand Down
6 changes: 5 additions & 1 deletion proxy/tests/support/tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use support::*;

use std::collections::VecDeque;
use std::io;
use std::sync::{Arc, Mutex};

use self::futures::sync::{mpsc, oneshot};
use self::tokio_core::net::TcpStream;
Expand Down Expand Up @@ -151,6 +152,8 @@ fn run_client(addr: SocketAddr) -> TcpSender {
fn run_server(tcp: TcpServer) -> server::Listening {
let (tx, rx) = shutdown_signal();
let (addr_tx, addr_rx) = oneshot::channel();
let conn_count = Arc::new(Mutex::new(0));
let srv_conn_count = Arc::clone(&conn_count);
::std::thread::Builder::new().name("support server".into()).spawn(move || {
let mut core = Core::new().unwrap();
let reactor = core.handle();
Expand All @@ -165,7 +168,7 @@ fn run_server(tcp: TcpServer) -> server::Listening {

let work = bind.incoming().for_each(move |(sock, _)| {
let cb = accepts.pop_front().expect("no more accepts");

*(srv_conn_count.lock().unwrap()) += 1;
let fut = tokio_io::io::read(sock, vec![0; 1024])
.and_then(move |(sock, mut vec, n)| {
vec.truncate(n);
Expand All @@ -186,5 +189,6 @@ fn run_server(tcp: TcpServer) -> server::Listening {
server::Listening {
addr,
shutdown: tx,
conn_count,
}
}
36 changes: 36 additions & 0 deletions proxy/tests/transparency.rs
Original file line number Diff line number Diff line change
Expand Up @@ -313,3 +313,39 @@ fn http1_get_doesnt_add_transfer_encoding() {
let client = client::http1(proxy.inbound, "transparency.test.svc.cluster.local");
assert_eq!(client.get("/"), "hello h1");
}

#[test]
fn http1_one_connection_per_host() {
let _ = env_logger::try_init();

let srv = server::http1().route("/", "hello").run();
let ctrl = controller::new()
.run();
let proxy = proxy::new().controller(ctrl).inbound(srv).run();

let client = client::http1(proxy.inbound, "foo.bar");

let res1 = client.request(client.request_builder("/")
.version(http::Version::HTTP_11)
.header("host", "foo.bar")
);
assert_eq!(res1.status(), http::StatusCode::OK);
assert_eq!(res1.version(), http::Version::HTTP_11);
let res1 = client.request(client.request_builder("/")
.version(http::Version::HTTP_11)
.header("host", "foo.bar")
);
assert_eq!(res1.status(), http::StatusCode::OK);
assert_eq!(res1.version(), http::Version::HTTP_11);

let client = client::http1(proxy.inbound, "bar.baz");
let res2 = client.request(client.request_builder("/")
.version(http::Version::HTTP_11)
.header("host", "bar.baz"));
assert_eq!(res2.status(), http::StatusCode::OK);
assert_eq!(res2.version(), http::Version::HTTP_11);

let inbound = &proxy.inbound_server.as_ref()
.expect("no inbound server!");
assert_eq!(inbound.connections(), 2);
}

0 comments on commit 1fdbc9d

Please sign in to comment.