Skip to content

Commit

Permalink
Use Mux State in HTTP Session:
Browse files Browse the repository at this point in the history
- Parameterize Mux with the Front type
- Replace kawa_h1 State by mux in HttpStateMachine
- Handle 1xx in Server ConnectionH1
- Add Upgrade to MuxResult to allow h1 to ws upgrade (implement
  upgrade_mux in HTTP Session)
- Implement Mux::shutting_down
- Handle https redirection in mux::Router::connect
- Implement a working 301 default answer
- Use the new default answer factory in e2e tests
- Fix front/back readiness for h1<->h1 Mux connections

Signed-off-by: Eloi DEMOLIS <eloi.demolis@clever-cloud.com>
  • Loading branch information
Wonshtrum committed Sep 19, 2023
1 parent 60ee968 commit 9814a6b
Show file tree
Hide file tree
Showing 10 changed files with 344 additions and 115 deletions.
1 change: 1 addition & 0 deletions e2e/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ rust-version = "1.70.0"
edition = "2021"

[dependencies]
kawa = "0.6.3"
futures = "^0.3.28"
hyper = { version = "^0.14.27", features = ["client", "http1"] }
hyper-rustls = { version = "^0.24.1", default-features = false, features = ["webpki-tokio", "http1", "tls12", "logging"] }
Expand Down
24 changes: 15 additions & 9 deletions e2e/src/http_utils/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,20 @@ pub fn http_request<S1: Into<String>, S2: Into<String>, S3: Into<String>, S4: In
)
}

// the default value for the 404 error, as provided in the command lib,
// used as default for listeners
pub fn default_404_answer() -> String {
String::from(include_str!("../../../command/assets/404.html"))
}
use std::io::Write;
use kawa;

// the default value for the 503 error, as provided in the command lib,
// used as default for listeners
pub fn default_503_answer() -> String {
String::from(include_str!("../../../command/assets/503.html"))
/// the default kawa answer for the error code provided, converted to HTTP/1.1
pub fn default_answer(code: u16) -> String {
let mut kawa_answer = kawa::Kawa::new(
kawa::Kind::Response,
kawa::Buffer::new(kawa::SliceBuffer(&mut [])),
);
sozu_lib::protocol::mux::fill_default_answer(&mut kawa_answer, code);
kawa_answer.prepare(&mut kawa::h1::converter::H1BlockConverter);
let out = kawa_answer.as_io_slice();
let mut writer = std::io::BufWriter::new(Vec::new());
writer.write_vectored(&out).expect("WRITE");
let result = unsafe { std::str::from_utf8_unchecked(writer.buffer()) };
result.to_string()
}
33 changes: 17 additions & 16 deletions e2e/src/tests/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use sozu_command_lib::{
};

use crate::{
http_utils::{default_404_answer, default_503_answer, http_ok_response, http_request},
http_utils::{default_answer, http_ok_response, http_request},
mock::{
aggregator::SimpleAggregator,
async_backend::BackendHandle as AsyncBackend,
Expand Down Expand Up @@ -672,7 +672,7 @@ fn try_http_behaviors() -> State {

let response = client.receive();
println!("response: {response:?}");
assert_eq!(response, Some(default_404_answer()));
assert_eq!(response, Some(default_answer(404)));
assert_eq!(client.receive(), None);

worker.send_proxy_request_type(RequestType::AddHttpFrontend(RequestHttpFrontend {
Expand All @@ -687,7 +687,7 @@ fn try_http_behaviors() -> State {

let response = client.receive();
println!("response: {response:?}");
assert_eq!(response, Some(default_503_answer()));
assert_eq!(response, Some(default_answer(503)));
assert_eq!(client.receive(), None);

let back_address = create_local_address();
Expand All @@ -705,12 +705,9 @@ fn try_http_behaviors() -> State {
client.connect();
client.send();

let expected_response = String::from(
"HTTP/1.1 400 Bad Request\r\nCache-Control: no-cache\r\nConnection: close\r\n\r\n",
);
let response = client.receive();
println!("response: {response:?}");
assert_eq!(response, Some(expected_response));
assert_eq!(response, Some(default_answer(400)));
assert_eq!(client.receive(), None);

let mut backend = SyncBackend::new("backend", back_address, "TEST\r\n\r\n");
Expand All @@ -724,13 +721,10 @@ fn try_http_behaviors() -> State {
let request = backend.receive(0);
backend.send(0);

let expected_response = String::from(
"HTTP/1.1 502 Bad Gateway\r\nCache-Control: no-cache\r\nConnection: close\r\n\r\n",
);
let response = client.receive();
println!("request: {request:?}");
println!("response: {response:?}");
assert_eq!(response, Some(expected_response));
assert_eq!(response, Some(default_answer(502)));
assert_eq!(client.receive(), None);

info!("expecting 200");
Expand Down Expand Up @@ -782,7 +776,8 @@ fn try_http_behaviors() -> State {
&& response.ends_with(&expected_response_end)
);

info!("server closes, expecting 503");
// FIXME: do we want 502 or 503???
info!("server closes, expecting 502");
// TODO: what if the client continue to use the closed stream
client.connect();
client.send();
Expand All @@ -793,7 +788,7 @@ fn try_http_behaviors() -> State {
let response = client.receive();
println!("request: {request:?}");
println!("response: {response:?}");
assert_eq!(response, Some(default_503_answer()));
assert_eq!(response, Some(default_answer(502)));
assert_eq!(client.receive(), None);

worker.send_proxy_request_type(RequestType::RemoveBackend(RemoveBackend {
Expand Down Expand Up @@ -1200,7 +1195,9 @@ pub fn try_stick() -> State {
backend1.send(0);
let response = client.receive();
println!("response: {response:?}");
assert!(request.unwrap().starts_with("GET /api HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\nCookie: foo=bar\r\nX-Forwarded-For:"));
assert!(request.unwrap().starts_with(
"GET /api HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\nCookie: foo=bar\r\n"
));
assert!(response.unwrap().starts_with("HTTP/1.1 200 OK\r\nContent-Length: 5\r\nSet-Cookie: SOZUBALANCEID=sticky_cluster_0-0; Path=/\r\nSozu-Id:"));

// invalid sticky_session
Expand All @@ -1213,7 +1210,9 @@ pub fn try_stick() -> State {
backend2.send(0);
let response = client.receive();
println!("response: {response:?}");
assert!(request.unwrap().starts_with("GET /api HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\nCookie: foo=bar\r\nX-Forwarded-For:"));
assert!(request.unwrap().starts_with(
"GET /api HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\nCookie: foo=bar\r\n"
));
assert!(response.unwrap().starts_with("HTTP/1.1 200 OK\r\nContent-Length: 5\r\nSet-Cookie: SOZUBALANCEID=sticky_cluster_0-1; Path=/\r\nSozu-Id:"));

// good sticky_session (force use backend2, round-robin would have chosen backend1)
Expand All @@ -1226,7 +1225,9 @@ pub fn try_stick() -> State {
backend2.send(0);
let response = client.receive();
println!("response: {response:?}");
assert!(request.unwrap().starts_with("GET /api HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\nCookie: foo=bar\r\nX-Forwarded-For:"));
assert!(request.unwrap().starts_with(
"GET /api HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\nCookie: foo=bar\r\n"
));
assert!(response
.unwrap()
.starts_with("HTTP/1.1 200 OK\r\nContent-Length: 5\r\nSozu-Id:"));
Expand Down
124 changes: 103 additions & 21 deletions lib/src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ use crate::{
answers::HttpAnswers,
parser::{hostname_and_port, Method},
},
mux::{self, Mux},
proxy_protocol::expect::ExpectProxyProtocol,
Http, Pipe, SessionState,
},
Expand Down Expand Up @@ -66,7 +67,8 @@ StateMachineBuilder! {
/// 3. WebSocket (passthrough)
enum HttpStateMachine impl SessionState {
Expect(ExpectProxyProtocol<TcpStream>),
Http(Http<TcpStream, HttpListener>),
// Http(Http<TcpStream, HttpListener>),
Mux(Mux<TcpStream>),
WebSocket(Pipe<TcpStream, HttpListener>),
}
}
Expand Down Expand Up @@ -125,22 +127,36 @@ impl HttpSession {
gauge_add!("protocol.http", 1);
let session_address = sock.peer_addr().ok();

HttpStateMachine::Http(Http::new(
answers.clone(),
configured_backend_timeout,
configured_connect_timeout,
configured_frontend_timeout,
container_frontend_timeout,
sock,
token,
listener.clone(),
pool.clone(),
Protocol::HTTP,
let mut context = mux::Context::new(pool.clone());
context
.create_stream(request_id, 1 << 16)
.ok_or(AcceptError::BufferCapacityReached)?;
let frontend = mux::Connection::new_h1_server(sock);
HttpStateMachine::Mux(Mux {
frontend_token: token,
frontend,
router: mux::Router::new(listener.clone()),
public_address,
request_id,
session_address,
sticky_name.clone(),
)?)
peer_address: session_address,
sticky_name: sticky_name.clone(),
context,
})
// HttpStateMachine::Http(Http::new(
// answers.clone(),
// configured_backend_timeout,
// configured_connect_timeout,
// configured_frontend_timeout,
// container_frontend_timeout,
// sock,
// token,
// listener.clone(),
// pool.clone(),
// Protocol::HTTP,
// public_address,
// request_id,
// session_address,
// sticky_name.clone(),
// )?)
};

let metrics = SessionMetrics::new(Some(wait_time));
Expand All @@ -164,7 +180,8 @@ impl HttpSession {
pub fn upgrade(&mut self) -> SessionIsToBeClosed {
debug!("HTTP::upgrade");
let new_state = match self.state.take() {
HttpStateMachine::Http(http) => self.upgrade_http(http),
// HttpStateMachine::Http(http) => self.upgrade_http(http),
HttpStateMachine::Mux(mux) => self.upgrade_mux(mux),
HttpStateMachine::Expect(expect) => self.upgrade_expect(expect),
HttpStateMachine::WebSocket(ws) => self.upgrade_websocket(ws),
HttpStateMachine::FailedUpgrade(_) => unreachable!(),
Expand Down Expand Up @@ -212,7 +229,8 @@ impl HttpSession {

gauge_add!("protocol.proxy.expect", -1);
gauge_add!("protocol.http", 1);
Some(HttpStateMachine::Http(http))
unimplemented!();
// Some(HttpStateMachine::Http(http))
}
_ => None,
}
Expand All @@ -222,7 +240,7 @@ impl HttpSession {
debug!("http switching to ws");
let front_token = self.frontend_token;
let back_token = unwrap_msg!(http.backend_token);
let ws_context = http.websocket_context();
let ws_context = http.context.websocket_context();

let mut container_frontend_timeout = http.container_frontend_timeout;
let mut container_backend_timeout = http.container_backend_timeout;
Expand Down Expand Up @@ -258,6 +276,69 @@ impl HttpSession {
Some(HttpStateMachine::WebSocket(pipe))
}

fn upgrade_mux(&mut self, mut mux: Mux<TcpStream>) -> Option<HttpStateMachine> {
debug!("mux switching to ws");
let stream = mux.context.streams.pop().unwrap();

let (frontend_readiness, frontend_socket) = match mux.frontend {
mux::Connection::H1(mux::ConnectionH1 {
readiness, socket, ..
}) => (readiness, socket),
// only h1<->h1 connections can upgrade to websocket
mux::Connection::H2(_) => unreachable!(),
};

let mux::StreamState::Linked(back_token) = stream.state else { unreachable!() };
let backend = mux.router.backends.remove(&back_token).unwrap();
let (cluster_id, backend_readiness, backend_socket) = match backend {
mux::Connection::H1(mux::ConnectionH1 {
position: mux::Position::Client(mux::BackendStatus::Connected(cluster_id)),
readiness,
socket,
..
}) => (cluster_id, readiness, socket),
// the backend disconnected just after upgrade, abort
mux::Connection::H1(_) => return None,
// only h1<->h1 connections can upgrade to websocket
mux::Connection::H2(_) => unreachable!(),
};

let ws_context = stream.context.websocket_context();

// let mut container_frontend_timeout = http.container_frontend_timeout;
// let mut container_backend_timeout = http.container_backend_timeout;
// container_frontend_timeout.reset();
// container_backend_timeout.reset();

let mut pipe = Pipe::new(
stream.back.storage.buffer,
None,
Some(backend_socket),
None,
None,
None,
Some(cluster_id),
stream.front.storage.buffer,
self.frontend_token,
frontend_socket,
self.listener.clone(),
Protocol::HTTP,
stream.context.id,
stream.context.session_address,
Some(ws_context),
);

pipe.frontend_readiness.event = frontend_readiness.event;
pipe.backend_readiness.event = backend_readiness.event;
pipe.set_back_token(back_token);

gauge_add!("protocol.http", -1);
gauge_add!("protocol.ws", 1);
gauge_add!("http.active_requests", -1);
gauge_add!("websocket.active_requests", 1);
Some(HttpStateMachine::WebSocket(pipe))
}

fn upgrade_websocket(&self, ws: Pipe<TcpStream, HttpListener>) -> Option<HttpStateMachine> {
// what do we do here?
error!("Upgrade called on WS, this should not happen");
Expand All @@ -277,7 +358,8 @@ impl ProxySession for HttpSession {
// Restore gauges
match self.state.marker() {
StateMarker::Expect => gauge_add!("protocol.proxy.expect", -1),
StateMarker::Http => gauge_add!("protocol.http", -1),
// StateMarker::Http => gauge_add!("protocol.http", -1),
StateMarker::Mux => gauge_add!("protocol.http", -1),
StateMarker::WebSocket => {
gauge_add!("protocol.ws", -1);
gauge_add!("websocket.active_requests", -1);
Expand Down Expand Up @@ -1393,7 +1475,7 @@ mod tests {
);
println!("http client write: {w:?}");

let expected_answer = "HTTP/1.1 301 Moved Permanently\r\nContent-Length: 0\r\nLocation: https://localhost/redirected?true\r\n\r\n";
let expected_answer = "HTTP/1.1 301 Moved Permanently\r\nLocation: https://localhost/redirected?true\r\nContent-Length: 0\r\n\r\n";
let mut buffer = [0; 4096];
let mut index = 0;
loop {
Expand Down
12 changes: 4 additions & 8 deletions lib/src/https.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ use crate::{
answers::HttpAnswers,
parser::{hostname_and_port, Method},
},
mux::Mux,
mux::{self, Mux},
proxy_protocol::expect::ExpectProxyProtocol,
rustls::TlsHandshake,
Http, Pipe, SessionState,
Expand Down Expand Up @@ -87,7 +87,7 @@ StateMachineBuilder! {
enum HttpsStateMachine impl SessionState {
Expect(ExpectProxyProtocol<MioTcpStream>, ServerConnection),
Handshake(TlsHandshake),
Mux(Mux),
Mux(Mux<FrontRustls>),
Http(Http<FrontRustls, HttpsListener>),
WebSocket(Pipe<FrontRustls, HttpsListener>),
Http2(Http2<FrontRustls>) -> todo!("H2"),
Expand Down Expand Up @@ -279,7 +279,6 @@ impl HttpsSession {

gauge_add!("protocol.tls.handshake", -1);

use crate::protocol::mux;
let mut context = mux::Context::new(self.pool.clone());
let mut frontend = match alpn {
AlpnProtocol::Http11 => {
Expand All @@ -293,10 +292,7 @@ impl HttpsSession {
frontend_token: self.frontend_token,
frontend,
context,
router: mux::Router {
listener: self.listener.clone(),
backends: HashMap::new(),
},
router: mux::Router::new(self.listener.clone()),
public_address: self.public_address,
peer_address: self.peer_address,
sticky_name: self.sticky_name.clone(),
Expand All @@ -307,7 +303,7 @@ impl HttpsSession {
debug!("https switching to wss");
let front_token = self.frontend_token;
let back_token = unwrap_msg!(http.backend_token);
let ws_context = http.websocket_context();
let ws_context = http.context.websocket_context();

let mut container_frontend_timeout = http.container_frontend_timeout;
let mut container_backend_timeout = http.container_backend_timeout;
Expand Down
2 changes: 2 additions & 0 deletions lib/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -637,6 +637,8 @@ pub enum RetrieveClusterError {
NoPath,
#[error("unauthorized route")]
UnauthorizedRoute,
#[error("https redirect")]
HttpsRedirect,
#[error("failed to retrieve the frontend for the request: {0}")]
RetrieveFrontend(FrontendFromRequestError),
}
Expand Down
Loading

0 comments on commit 9814a6b

Please sign in to comment.