Skip to content

Commit

Permalink
Move Http09Server to its own file
Browse files Browse the repository at this point in the history
  • Loading branch information
Andy Grover committed Jul 14, 2020
1 parent 4063034 commit e928348
Show file tree
Hide file tree
Showing 2 changed files with 186 additions and 159 deletions.
165 changes: 6 additions & 159 deletions neqo-http3-server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,20 @@ use std::time::{Duration, Instant};
use mio::net::UdpSocket;
use mio::{Events, Poll, PollOpt, Ready, Token};
use mio_extras::timer::{Builder, Timeout, Timer};
use regex::Regex;
use structopt::StructOpt;

use neqo_common::{qdebug, qinfo, Datagram};
use neqo_crypto::{init_db, AntiReplay, ZeroRttCheckResult, ZeroRttChecker};
use neqo_crypto::{init_db, AntiReplay};
use neqo_http3::{Error, Http3Server, Http3ServerEvent};
use neqo_qpack::QpackSettings;
use neqo_transport::server::{ActiveConnectionRef, Server};
use neqo_transport::{ConnectionEvent, ConnectionIdManager, FixedConnectionIdManager, Output};
use neqo_transport::{FixedConnectionIdManager, Output};

use crate::old_https::Http09Server;

const TIMER_TOKEN: Token = Token(0xffff_ffff);

mod old_https;

#[derive(Debug, StructOpt)]
#[structopt(name = "neqo-http3-server", about = "A basic HTTP3 server.")]
struct Args {
Expand Down Expand Up @@ -207,161 +209,6 @@ impl HttpServer for Http3Server {
}
}

#[derive(Clone, Debug)]
struct DenyZeroRttChecker {}

impl ZeroRttChecker for DenyZeroRttChecker {
fn check(&self, _token: &[u8]) -> ZeroRttCheckResult {
ZeroRttCheckResult::Reject
}
}

#[derive(Default)]
struct Http09ConnState {
writable: bool,
data_to_send: Option<(Vec<u8>, usize)>,
}

struct Http09Server {
server: Server,
conn_state: HashMap<(ActiveConnectionRef, u64), Http09ConnState>,
}

impl Http09Server {
fn new(
now: Instant,
certs: &[impl AsRef<str>],
protocols: &[impl AsRef<str>],
anti_replay: AntiReplay,
cid_manager: Rc<RefCell<dyn ConnectionIdManager>>,
) -> Result<Self, Error> {
Ok(Self {
server: Server::new(
now,
certs,
protocols,
anti_replay,
Box::new(DenyZeroRttChecker {}),
cid_manager,
)?,
conn_state: HashMap::new(),
})
}

fn stream_readable(&mut self, stream_id: u64, mut conn: &mut ActiveConnectionRef, args: &Args) {
if stream_id % 4 != 0 {
eprintln!("Stream {} not client-initiated bidi, ignoring", stream_id);
return;
}
let mut data = vec![0; 4000];
conn.borrow_mut()
.stream_recv(stream_id, &mut data)
.expect("Read should succeed");
let msg = match String::from_utf8(data) {
Ok(s) => s,
Err(_e) => {
eprintln!("invalid string. Is this HTTP 0.9?");
conn.borrow_mut().stream_close_send(stream_id).unwrap();
return;
}
};
let re = if args.qns_mode {
Regex::new(r"GET +/(\S+)(\r)?\n").unwrap()
} else {
Regex::new(r"GET +/(\d+)(\r)?\n").unwrap()
};
let m = re.captures(&msg);
let resp = match m.and_then(|m| m.get(1)) {
None => Some(b"Hello World".to_vec()),
Some(path) => {
let path = path.as_str();
eprintln!("Path = '{}'", path);
if args.qns_mode {
qns_read_response(path)
} else {
let count = usize::from_str_radix(path, 10).unwrap();
Some(vec![b'a'; count])
}
}
};
let conn_state = self.conn_state.get_mut(&(conn.clone(), stream_id)).unwrap();
conn_state.data_to_send = resp.map(|r| (r, 0));
if conn_state.writable {
self.stream_writable(stream_id, &mut conn);
}
}

fn stream_writable(&mut self, stream_id: u64, conn: &mut ActiveConnectionRef) {
match self.conn_state.get_mut(&(conn.clone(), stream_id)) {
None => {
eprintln!("Unknown stream {}, ignoring event", stream_id);
}
Some(conn_state) => {
conn_state.writable = true;
if let Some((data, mut offset)) = &mut conn_state.data_to_send {
let sent = conn
.borrow_mut()
.stream_send(stream_id, &data[offset..])
.unwrap();
eprintln!("Wrote {}", sent);
offset += sent;
if offset == data.len() {
eprintln!("Sent {} on {}, closing", sent, stream_id);
conn.borrow_mut().stream_close_send(stream_id).unwrap();
self.conn_state.remove(&(conn.clone(), stream_id));
} else {
conn_state.writable = false;
}
}
}
}
}
}

impl HttpServer for Http09Server {
fn process(&mut self, dgram: Option<Datagram>, now: Instant) -> Output {
self.server.process(dgram, now)
}

fn process_events(&mut self, args: &Args) {
let active_conns = self.server.active_connections();
for mut acr in active_conns {
loop {
let event = match acr.borrow_mut().next_event() {
None => break,
Some(e) => e,
};
match event {
ConnectionEvent::NewStream { stream_id } => {
self.conn_state.insert(
(acr.clone(), stream_id.as_u64()),
Http09ConnState::default(),
);
}
ConnectionEvent::RecvStreamReadable { stream_id } => {
self.stream_readable(stream_id, &mut acr, args);
}
ConnectionEvent::SendStreamWritable { stream_id } => {
self.stream_writable(stream_id.as_u64(), &mut acr);
}
ConnectionEvent::StateChange { .. } => {}
e => eprintln!("unhandled event {:?}", e),
}
}
}
}

fn set_qlog_dir(&mut self, dir: Option<PathBuf>) {
self.server.set_qlog_dir(dir)
}
}

impl Display for Http09Server {
fn fmt(&self, f: &mut ::std::fmt::Formatter) -> ::std::fmt::Result {
write!(f, "Http 0.9 server ")
}
}

fn process(
server: &mut Box<dyn HttpServer>,
svr_timeout: &mut Option<Timeout>,
Expand Down
180 changes: 180 additions & 0 deletions neqo-http3-server/src/old_https.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.

#![cfg_attr(feature = "deny-warnings", deny(warnings))]
#![warn(clippy::use_self)]

use std::cell::RefCell;
use std::collections::HashMap;
use std::fmt::Display;
use std::path::PathBuf;
use std::rc::Rc;
use std::time::Instant;

use regex::Regex;

use neqo_common::Datagram;
use neqo_crypto::{AntiReplay, ZeroRttCheckResult, ZeroRttChecker};
use neqo_http3::Error;
use neqo_transport::server::{ActiveConnectionRef, Server};
use neqo_transport::{ConnectionEvent, ConnectionIdManager, Output};

use super::{qns_read_response, Args, HttpServer};

#[derive(Clone, Debug)]
struct DenyZeroRttChecker {}

impl ZeroRttChecker for DenyZeroRttChecker {
fn check(&self, _token: &[u8]) -> ZeroRttCheckResult {
ZeroRttCheckResult::Reject
}
}

#[derive(Default)]
struct Http09ConnState {
writable: bool,
data_to_send: Option<(Vec<u8>, usize)>,
}

pub struct Http09Server {
server: Server,
conn_state: HashMap<(ActiveConnectionRef, u64), Http09ConnState>,
}

impl Http09Server {
pub fn new(
now: Instant,
certs: &[impl AsRef<str>],
protocols: &[impl AsRef<str>],
anti_replay: AntiReplay,
cid_manager: Rc<RefCell<dyn ConnectionIdManager>>,
) -> Result<Self, Error> {
Ok(Self {
server: Server::new(
now,
certs,
protocols,
anti_replay,
Box::new(DenyZeroRttChecker {}),
cid_manager,
)?,
conn_state: HashMap::new(),
})
}

fn stream_readable(&mut self, stream_id: u64, mut conn: &mut ActiveConnectionRef, args: &Args) {
if stream_id % 4 != 0 {
eprintln!("Stream {} not client-initiated bidi, ignoring", stream_id);
return;
}
let mut data = vec![0; 4000];
conn.borrow_mut()
.stream_recv(stream_id, &mut data)
.expect("Read should succeed");
let msg = match String::from_utf8(data) {
Ok(s) => s,
Err(_e) => {
eprintln!("invalid string. Is this HTTP 0.9?");
conn.borrow_mut().stream_close_send(stream_id).unwrap();
return;
}
};
let re = if args.qns_mode {
Regex::new(r"GET +/(\S+)(\r)?\n").unwrap()
} else {
Regex::new(r"GET +/(\d+)(\r)?\n").unwrap()
};
let m = re.captures(&msg);
let resp = match m.and_then(|m| m.get(1)) {
None => Some(b"Hello World".to_vec()),
Some(path) => {
let path = path.as_str();
eprintln!("Path = '{}'", path);
if args.qns_mode {
qns_read_response(path)
} else {
let count = usize::from_str_radix(path, 10).unwrap();
Some(vec![b'a'; count])
}
}
};
let conn_state = self.conn_state.get_mut(&(conn.clone(), stream_id)).unwrap();
conn_state.data_to_send = resp.map(|r| (r, 0));
if conn_state.writable {
self.stream_writable(stream_id, &mut conn);
}
}

fn stream_writable(&mut self, stream_id: u64, conn: &mut ActiveConnectionRef) {
match self.conn_state.get_mut(&(conn.clone(), stream_id)) {
None => {
eprintln!("Unknown stream {}, ignoring event", stream_id);
}
Some(conn_state) => {
conn_state.writable = true;
if let Some((data, mut offset)) = &mut conn_state.data_to_send {
let sent = conn
.borrow_mut()
.stream_send(stream_id, &data[offset..])
.unwrap();
eprintln!("Wrote {}", sent);
offset += sent;
if offset == data.len() {
eprintln!("Sent {} on {}, closing", sent, stream_id);
conn.borrow_mut().stream_close_send(stream_id).unwrap();
self.conn_state.remove(&(conn.clone(), stream_id));
} else {
conn_state.writable = false;
}
}
}
}
}
}

impl HttpServer for Http09Server {
fn process(&mut self, dgram: Option<Datagram>, now: Instant) -> Output {
self.server.process(dgram, now)
}

fn process_events(&mut self, args: &Args) {
let active_conns = self.server.active_connections();
for mut acr in active_conns {
loop {
let event = match acr.borrow_mut().next_event() {
None => break,
Some(e) => e,
};
match event {
ConnectionEvent::NewStream { stream_id } => {
self.conn_state.insert(
(acr.clone(), stream_id.as_u64()),
Http09ConnState::default(),
);
}
ConnectionEvent::RecvStreamReadable { stream_id } => {
self.stream_readable(stream_id, &mut acr, args);
}
ConnectionEvent::SendStreamWritable { stream_id } => {
self.stream_writable(stream_id.as_u64(), &mut acr);
}
ConnectionEvent::StateChange { .. } => {}
e => eprintln!("unhandled event {:?}", e),
}
}
}
}

fn set_qlog_dir(&mut self, dir: Option<PathBuf>) {
self.server.set_qlog_dir(dir)
}
}

impl Display for Http09Server {
fn fmt(&self, f: &mut ::std::fmt::Formatter) -> ::std::fmt::Result {
write!(f, "Http 0.9 server ")
}
}

0 comments on commit e928348

Please sign in to comment.