diff --git a/neqo-http3-server/Cargo.toml b/neqo-http3-server/Cargo.toml index 50105f6eca..d6b3afa181 100644 --- a/neqo-http3-server/Cargo.toml +++ b/neqo-http3-server/Cargo.toml @@ -12,6 +12,7 @@ neqo-common = { path="./../neqo-common" } neqo-http3 = { path = "./../neqo-http3" } neqo-qpack = { path = "./../neqo-qpack" } structopt = "0.3.7" +regex = "1" mio = "0.6.17" mio-extras = "2.0.5" log = {version = "0.4.0", default-features = false} diff --git a/neqo-http3-server/src/main.rs b/neqo-http3-server/src/main.rs index c01eae947b..fb3a46a255 100644 --- a/neqo-http3-server/src/main.rs +++ b/neqo-http3-server/src/main.rs @@ -10,6 +10,7 @@ use std::cell::RefCell; use std::collections::HashMap; use std::env; +use std::fmt::Display; use std::fs::OpenOptions; use std::io; use std::io::Read; @@ -22,13 +23,15 @@ use std::time::{Duration, Instant}; use mio::net::UdpSocket; use mio::{Events, Poll, PollOpt, Ready, Token}; use mio_extras::timer::{Builder, Timeout, Timer}; +use regex::Regex; use structopt::StructOpt; use neqo_common::{qdebug, qinfo, Datagram}; -use neqo_crypto::{init_db, AntiReplay}; +use neqo_crypto::{init_db, AntiReplay, ZeroRttCheckResult, ZeroRttChecker}; use neqo_http3::{Error, Http3Server, Http3ServerEvent}; use neqo_qpack::QpackSettings; -use neqo_transport::{FixedConnectionIdManager, Output}; +use neqo_transport::server::{ActiveConnectionRef, Server}; +use neqo_transport::{ConnectionEvent, ConnectionIdManager, FixedConnectionIdManager, Output}; const TIMER_TOKEN: Token = Token(0xffff_ffff); @@ -83,6 +86,10 @@ struct Args { #[structopt(name = "qns-mode", long)] qns_mode: bool, + + #[structopt(name = "use-old-http", short = "o", long)] + /// Use http 0.9 instead of HTTP/3 + use_old_http: bool, } impl Args { @@ -95,98 +102,268 @@ impl Args { } } -fn process_events(server: &mut Http3Server, args: &Args) { - while let Some(event) = server.next_event() { - eprintln!("Event: {:?}", event); - match event { - Http3ServerEvent::Headers { - mut request, - headers, - fin, - } => { - println!("Headers (request={} fin={}): {:?}", request, fin, headers); - - let default_ret = b"Hello World".to_vec(); - - let response = headers.and_then(|h| { - h.iter().find(|&(k, _)| k == ":path").and_then(|(_, path)| { - if args.qns_mode { - let mut file_path = PathBuf::from("/www"); - file_path.push(path.trim_matches(|p| p == '/')); - - OpenOptions::new() - .read(true) - .open(&file_path) - .map_err(|_e| eprintln!("Could not open {}", file_path.display())) - .ok() - .and_then(|mut f| { - let mut data = Vec::new(); - match f.read_to_end(&mut data) { - Ok(sz) => { - println!( - "{} bytes read from {}", - sz, - file_path.display() - ); - Some(data) - } - Err(e) => { - eprintln!("Error reading data: {:?}", e); - None - } - } - }) - } else { - match path.trim_matches(|p| p == '/').parse::() { - Ok(v) => Some(vec![b'a'; v]), - Err(_) => Some(default_ret), +fn emit_packets(sockets: &mut Vec, out_dgrams: &HashMap>) { + for s in sockets { + if let Some(dgrams) = out_dgrams.get(&s.local_addr().unwrap()) { + for d in dgrams { + let sent = s + .send_to(d, &d.destination()) + .expect("Error sending datagram"); + if sent != d.len() { + eprintln!("Unable to send all {} bytes of datagram", d.len()); + } + } + } + } +} + +fn qns_read_response(filename: &str) -> Option> { + let mut file_path = PathBuf::from("/www"); + file_path.push(filename.trim_matches(|p| p == '/')); + + OpenOptions::new() + .read(true) + .open(&file_path) + .map_err(|_e| eprintln!("Could not open {}", file_path.display())) + .ok() + .and_then(|mut f| { + let mut data = Vec::new(); + match f.read_to_end(&mut data) { + Ok(sz) => { + println!("{} bytes read from {}", sz, file_path.display()); + Some(data) + } + Err(e) => { + eprintln!("Error reading data: {:?}", e); + None + } + } + }) +} + +trait HttpServer: Display { + fn process(&mut self, dgram: Option, now: Instant) -> Output; + fn process_events(&mut self, args: &Args); + fn set_qlog_dir(&mut self, dir: Option); +} + +impl HttpServer for Http3Server { + fn process(&mut self, dgram: Option, now: Instant) -> Output { + self.process(dgram, now) + } + + fn process_events(&mut self, args: &Args) { + while let Some(event) = self.next_event() { + match event { + Http3ServerEvent::Headers { + mut request, + headers, + fin, + } => { + println!("Headers (request={} fin={}): {:?}", request, fin, headers); + + let default_ret = b"Hello World".to_vec(); + + let response = headers.and_then(|h| { + h.iter().find(|&(k, _)| k == ":path").and_then(|(_, path)| { + if args.qns_mode { + qns_read_response(path) + } else { + match path.trim_matches(|p| p == '/').parse::() { + Ok(v) => Some(vec![b'a'; v]), + Err(_) => Some(default_ret), + } } - } - }) - }); + }) + }); - if response.is_none() { - let _ = request.stream_reset(Error::HttpRequestIncomplete.code()); - continue; + if response.is_none() { + let _ = request.stream_reset(Error::HttpRequestIncomplete.code()); + continue; + } + + let response = response.unwrap(); + + request + .set_response( + &[ + (String::from(":status"), String::from("200")), + (String::from("content-length"), response.len().to_string()), + ], + &response, + ) + .unwrap(); + } + Http3ServerEvent::Data { request, data, fin } => { + println!("Data (request={} fin={}): {:?}", request, fin, data); } + _ => {} + } + } + } + + fn set_qlog_dir(&mut self, dir: Option) { + Http3Server::set_qlog_dir(self, dir) + } +} + +#[derive(Clone, Debug)] +struct DenyZeroRttChecker {} + +impl ZeroRttChecker for DenyZeroRttChecker { + fn check(&self, _token: &[u8]) -> ZeroRttCheckResult { + ZeroRttCheckResult::Reject + } +} + +#[derive(Default)] +struct Http09ConnState { + writable: bool, + data_to_send: Option<(Vec, usize)>, +} - let response = response.unwrap(); - - request - .set_response( - &[ - (String::from(":status"), String::from("200")), - (String::from("content-length"), response.len().to_string()), - ], - &response, - ) - .unwrap(); +struct Http09Server { + server: Server, + conn_state: HashMap<(ActiveConnectionRef, u64), Http09ConnState>, +} + +impl Http09Server { + fn new( + now: Instant, + certs: &[impl AsRef], + protocols: &[impl AsRef], + anti_replay: AntiReplay, + cid_manager: Rc>, + ) -> Result { + Ok(Self { + server: Server::new( + now, + certs, + protocols, + anti_replay, + Box::new(DenyZeroRttChecker {}), + cid_manager, + )?, + conn_state: HashMap::new(), + }) + } + + fn stream_readable(&mut self, stream_id: u64, mut conn: &mut ActiveConnectionRef, args: &Args) { + if stream_id % 4 != 0 { + eprintln!("Stream {} not client-initiated bidi, ignoring", stream_id); + return; + } + let mut data = vec![0; 4000]; + conn.borrow_mut() + .stream_recv(stream_id, &mut data) + .expect("Read should succeed"); + let msg = match String::from_utf8(data) { + Ok(s) => s, + Err(_e) => { + eprintln!("invalid string. Is this HTTP 0.9?"); + conn.borrow_mut().stream_close_send(stream_id).unwrap(); + return; } - Http3ServerEvent::Data { request, data, fin } => { - println!("Data (request={} fin={}): {:?}", request, fin, data); + }; + let re = if args.qns_mode { + Regex::new(r"GET +/(\S+)(\r)?\n").unwrap() + } else { + Regex::new(r"GET +/(\d+)(\r)?\n").unwrap() + }; + let m = re.captures(&msg); + let resp = match m.and_then(|m| m.get(1)) { + None => Some(b"Hello World".to_vec()), + Some(path) => { + let path = path.as_str(); + eprintln!("Path = '{}'", path); + if args.qns_mode { + qns_read_response(path) + } else { + let count = usize::from_str_radix(path, 10).unwrap(); + Some(vec![b'a'; count]) + } + } + }; + let conn_state = self.conn_state.get_mut(&(conn.clone(), stream_id)).unwrap(); + conn_state.data_to_send = resp.map(|r| (r, 0)); + if conn_state.writable { + self.stream_writable(stream_id, &mut conn); + } + } + + fn stream_writable(&mut self, stream_id: u64, conn: &mut ActiveConnectionRef) { + match self.conn_state.get_mut(&(conn.clone(), stream_id)) { + None => { + eprintln!("Unknown stream {}, ignoring event", stream_id); + } + Some(conn_state) => { + conn_state.writable = true; + if let Some((data, mut offset)) = &mut conn_state.data_to_send { + let sent = conn + .borrow_mut() + .stream_send(stream_id, &data[offset..]) + .unwrap(); + eprintln!("Wrote {}", sent); + offset += sent; + if offset == data.len() { + eprintln!("Sent {} on {}, closing", sent, stream_id); + conn.borrow_mut().stream_close_send(stream_id).unwrap(); + self.conn_state.remove(&(conn.clone(), stream_id)); + } else { + conn_state.writable = false; + } + } } - _ => {} } } } -fn emit_packets(sockets: &mut Vec, out_dgrams: &HashMap>) { - for s in sockets { - if let Some(dgrams) = out_dgrams.get(&s.local_addr().unwrap()) { - for d in dgrams { - let sent = s - .send_to(d, &d.destination()) - .expect("Error sending datagram"); - if sent != d.len() { - eprintln!("Unable to send all {} bytes of datagram", d.len()); +impl HttpServer for Http09Server { + fn process(&mut self, dgram: Option, now: Instant) -> Output { + self.server.process(dgram, now) + } + + fn process_events(&mut self, args: &Args) { + let active_conns = self.server.active_connections(); + for mut acr in active_conns { + loop { + let event = match acr.borrow_mut().next_event() { + None => break, + Some(e) => e, + }; + match event { + ConnectionEvent::NewStream { stream_id } => { + self.conn_state.insert( + (acr.clone(), stream_id.as_u64()), + Http09ConnState::default(), + ); + } + ConnectionEvent::RecvStreamReadable { stream_id } => { + self.stream_readable(stream_id, &mut acr, args); + } + ConnectionEvent::SendStreamWritable { stream_id } => { + self.stream_writable(stream_id.as_u64(), &mut acr); + } + ConnectionEvent::StateChange { .. } => {} + e => eprintln!("unhandled event {:?}", e), } } } } + + fn set_qlog_dir(&mut self, dir: Option) { + self.server.set_qlog_dir(dir) + } +} + +impl Display for Http09Server { + fn fmt(&self, f: &mut ::std::fmt::Formatter) -> ::std::fmt::Result { + write!(f, "Http 0.9 server ") + } } fn process( - server: &mut Http3Server, + server: &mut Box, svr_timeout: &mut Option, inx: usize, mut dgram: Option, @@ -223,6 +400,11 @@ fn main() -> Result<(), io::Error> { if args.qns_mode { match env::var("TESTCASE") { Ok(s) if s == "http3" => {} + Ok(s) if s == "handshake" || s == "transfer" || s == "retry" => { + args.use_old_http = true; + args.alpn = "hq-29".to_string(); + } + Ok(_) => exit(127), Err(_) => exit(1), } @@ -285,20 +467,36 @@ fn main() -> Result<(), io::Error> { local_addr, ( { - let mut svr = Http3Server::new( - Instant::now(), - &[args.key.clone()], - &[args.alpn.clone()], - AntiReplay::new(Instant::now(), Duration::from_secs(10), 7, 14) - .expect("unable to setup anti-replay"), - Rc::new(RefCell::new(FixedConnectionIdManager::new(10))), - QpackSettings { - max_table_size_encoder: args.max_table_size_encoder, - max_table_size_decoder: args.max_table_size_decoder, - max_blocked_streams: args.max_blocked_streams, - }, - ) - .expect("We cannot make a server!"); + let mut svr: Box = if args.use_old_http { + Box::new( + Http09Server::new( + Instant::now(), + &[args.key.clone()], + &[args.alpn.clone()], + AntiReplay::new(Instant::now(), Duration::from_secs(10), 7, 14) + .expect("unable to setup anti-replay"), + Rc::new(RefCell::new(FixedConnectionIdManager::new(10))), + ) + .expect("We cannot make a server!"), + ) + } else { + Box::new( + Http3Server::new( + Instant::now(), + &[args.key.clone()], + &[args.alpn.clone()], + AntiReplay::new(Instant::now(), Duration::from_secs(10), 7, 14) + .expect("unable to setup anti-replay"), + Rc::new(RefCell::new(FixedConnectionIdManager::new(10))), + QpackSettings { + max_table_size_encoder: args.max_table_size_encoder, + max_table_size_decoder: args.max_table_size_decoder, + max_blocked_streams: args.max_blocked_streams, + }, + ) + .expect("We cannot make a server!"), + ) + }; svr.set_qlog_dir(args.qlog_dir.clone()); svr }, @@ -372,7 +570,7 @@ fn main() -> Result<(), io::Error> { out, &mut timer, ); - process_events(server, &args); + server.process_events(&args); process(server, svr_timeout, event.token().0, None, out, &mut timer); } }