diff --git a/e2e/Cargo.toml b/e2e/Cargo.toml index 8716a1887..a2b33331d 100644 --- a/e2e/Cargo.toml +++ b/e2e/Cargo.toml @@ -5,6 +5,7 @@ rust-version = "1.70.0" edition = "2021" [dependencies] +kawa = "0.6.3" futures = "^0.3.28" hyper = { version = "^0.14.27", features = ["client", "http1"] } hyper-rustls = { version = "^0.24.1", default-features = false, features = ["webpki-tokio", "http1", "tls12", "logging"] } diff --git a/e2e/src/http_utils/mod.rs b/e2e/src/http_utils/mod.rs index 7ca7a5bbd..f0afbc6bf 100644 --- a/e2e/src/http_utils/mod.rs +++ b/e2e/src/http_utils/mod.rs @@ -24,14 +24,20 @@ pub fn http_request, S2: Into, S3: Into, S4: In ) } -// the default value for the 404 error, as provided in the command lib, -// used as default for listeners -pub fn default_404_answer() -> String { - String::from(include_str!("../../../command/assets/404.html")) -} +use std::io::Write; +use kawa; -// the default value for the 503 error, as provided in the command lib, -// used as default for listeners -pub fn default_503_answer() -> String { - String::from(include_str!("../../../command/assets/503.html")) +/// the default kawa answer for the error code provided, converted to HTTP/1.1 +pub fn default_answer(code: u16) -> String { + let mut kawa_answer = kawa::Kawa::new( + kawa::Kind::Response, + kawa::Buffer::new(kawa::SliceBuffer(&mut [])), + ); + sozu_lib::protocol::mux::fill_default_answer(&mut kawa_answer, code); + kawa_answer.prepare(&mut kawa::h1::converter::H1BlockConverter); + let out = kawa_answer.as_io_slice(); + let mut writer = std::io::BufWriter::new(Vec::new()); + writer.write_vectored(&out).expect("WRITE"); + let result = unsafe { std::str::from_utf8_unchecked(writer.buffer()) }; + result.to_string() } diff --git a/e2e/src/tests/tests.rs b/e2e/src/tests/tests.rs index 46a9da079..e8935f4c2 100644 --- a/e2e/src/tests/tests.rs +++ b/e2e/src/tests/tests.rs @@ -17,7 +17,7 @@ use sozu_command_lib::{ }; use crate::{ - http_utils::{default_404_answer, default_503_answer, http_ok_response, http_request}, + http_utils::{default_answer, http_ok_response, http_request}, mock::{ aggregator::SimpleAggregator, async_backend::BackendHandle as AsyncBackend, @@ -672,7 +672,7 @@ fn try_http_behaviors() -> State { let response = client.receive(); println!("response: {response:?}"); - assert_eq!(response, Some(default_404_answer())); + assert_eq!(response, Some(default_answer(404))); assert_eq!(client.receive(), None); worker.send_proxy_request_type(RequestType::AddHttpFrontend(RequestHttpFrontend { @@ -687,7 +687,7 @@ fn try_http_behaviors() -> State { let response = client.receive(); println!("response: {response:?}"); - assert_eq!(response, Some(default_503_answer())); + assert_eq!(response, Some(default_answer(503))); assert_eq!(client.receive(), None); let back_address = create_local_address(); @@ -705,12 +705,9 @@ fn try_http_behaviors() -> State { client.connect(); client.send(); - let expected_response = String::from( - "HTTP/1.1 400 Bad Request\r\nCache-Control: no-cache\r\nConnection: close\r\n\r\n", - ); let response = client.receive(); println!("response: {response:?}"); - assert_eq!(response, Some(expected_response)); + assert_eq!(response, Some(default_answer(400))); assert_eq!(client.receive(), None); let mut backend = SyncBackend::new("backend", back_address, "TEST\r\n\r\n"); @@ -724,13 +721,10 @@ fn try_http_behaviors() -> State { let request = backend.receive(0); backend.send(0); - let expected_response = String::from( - "HTTP/1.1 502 Bad Gateway\r\nCache-Control: no-cache\r\nConnection: close\r\n\r\n", - ); let response = client.receive(); println!("request: {request:?}"); println!("response: {response:?}"); - assert_eq!(response, Some(expected_response)); + assert_eq!(response, Some(default_answer(502))); assert_eq!(client.receive(), None); info!("expecting 200"); @@ -782,7 +776,8 @@ fn try_http_behaviors() -> State { && response.ends_with(&expected_response_end) ); - info!("server closes, expecting 503"); + // FIXME: do we want 502 or 503??? + info!("server closes, expecting 502"); // TODO: what if the client continue to use the closed stream client.connect(); client.send(); @@ -793,7 +788,7 @@ fn try_http_behaviors() -> State { let response = client.receive(); println!("request: {request:?}"); println!("response: {response:?}"); - assert_eq!(response, Some(default_503_answer())); + assert_eq!(response, Some(default_answer(502))); assert_eq!(client.receive(), None); worker.send_proxy_request_type(RequestType::RemoveBackend(RemoveBackend { @@ -1200,7 +1195,9 @@ pub fn try_stick() -> State { backend1.send(0); let response = client.receive(); println!("response: {response:?}"); - assert!(request.unwrap().starts_with("GET /api HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\nCookie: foo=bar\r\nX-Forwarded-For:")); + assert!(request.unwrap().starts_with( + "GET /api HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\nCookie: foo=bar\r\n" + )); assert!(response.unwrap().starts_with("HTTP/1.1 200 OK\r\nContent-Length: 5\r\nSet-Cookie: SOZUBALANCEID=sticky_cluster_0-0; Path=/\r\nSozu-Id:")); // invalid sticky_session @@ -1213,7 +1210,9 @@ pub fn try_stick() -> State { backend2.send(0); let response = client.receive(); println!("response: {response:?}"); - assert!(request.unwrap().starts_with("GET /api HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\nCookie: foo=bar\r\nX-Forwarded-For:")); + assert!(request.unwrap().starts_with( + "GET /api HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\nCookie: foo=bar\r\n" + )); assert!(response.unwrap().starts_with("HTTP/1.1 200 OK\r\nContent-Length: 5\r\nSet-Cookie: SOZUBALANCEID=sticky_cluster_0-1; Path=/\r\nSozu-Id:")); // good sticky_session (force use backend2, round-robin would have chosen backend1) @@ -1226,7 +1225,9 @@ pub fn try_stick() -> State { backend2.send(0); let response = client.receive(); println!("response: {response:?}"); - assert!(request.unwrap().starts_with("GET /api HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\nCookie: foo=bar\r\nX-Forwarded-For:")); + assert!(request.unwrap().starts_with( + "GET /api HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\nCookie: foo=bar\r\n" + )); assert!(response .unwrap() .starts_with("HTTP/1.1 200 OK\r\nContent-Length: 5\r\nSozu-Id:")); diff --git a/lib/src/http.rs b/lib/src/http.rs index 99f697a6a..73ff18bb4 100644 --- a/lib/src/http.rs +++ b/lib/src/http.rs @@ -39,6 +39,7 @@ use crate::{ answers::HttpAnswers, parser::{hostname_and_port, Method}, }, + mux::{self, Mux}, proxy_protocol::expect::ExpectProxyProtocol, Http, Pipe, SessionState, }, @@ -66,7 +67,8 @@ StateMachineBuilder! { /// 3. WebSocket (passthrough) enum HttpStateMachine impl SessionState { Expect(ExpectProxyProtocol), - Http(Http), + // Http(Http), + Mux(Mux), WebSocket(Pipe), } } @@ -125,22 +127,36 @@ impl HttpSession { gauge_add!("protocol.http", 1); let session_address = sock.peer_addr().ok(); - HttpStateMachine::Http(Http::new( - answers.clone(), - configured_backend_timeout, - configured_connect_timeout, - configured_frontend_timeout, - container_frontend_timeout, - sock, - token, - listener.clone(), - pool.clone(), - Protocol::HTTP, + let mut context = mux::Context::new(pool.clone()); + context + .create_stream(request_id, 1 << 16) + .ok_or(AcceptError::BufferCapacityReached)?; + let frontend = mux::Connection::new_h1_server(sock); + HttpStateMachine::Mux(Mux { + frontend_token: token, + frontend, + router: mux::Router::new(listener.clone()), public_address, - request_id, - session_address, - sticky_name.clone(), - )?) + peer_address: session_address, + sticky_name: sticky_name.clone(), + context, + }) + // HttpStateMachine::Http(Http::new( + // answers.clone(), + // configured_backend_timeout, + // configured_connect_timeout, + // configured_frontend_timeout, + // container_frontend_timeout, + // sock, + // token, + // listener.clone(), + // pool.clone(), + // Protocol::HTTP, + // public_address, + // request_id, + // session_address, + // sticky_name.clone(), + // )?) }; let metrics = SessionMetrics::new(Some(wait_time)); @@ -164,7 +180,8 @@ impl HttpSession { pub fn upgrade(&mut self) -> SessionIsToBeClosed { debug!("HTTP::upgrade"); let new_state = match self.state.take() { - HttpStateMachine::Http(http) => self.upgrade_http(http), + // HttpStateMachine::Http(http) => self.upgrade_http(http), + HttpStateMachine::Mux(mux) => self.upgrade_mux(mux), HttpStateMachine::Expect(expect) => self.upgrade_expect(expect), HttpStateMachine::WebSocket(ws) => self.upgrade_websocket(ws), HttpStateMachine::FailedUpgrade(_) => unreachable!(), @@ -212,7 +229,8 @@ impl HttpSession { gauge_add!("protocol.proxy.expect", -1); gauge_add!("protocol.http", 1); - Some(HttpStateMachine::Http(http)) + unimplemented!(); + // Some(HttpStateMachine::Http(http)) } _ => None, } @@ -222,7 +240,7 @@ impl HttpSession { debug!("http switching to ws"); let front_token = self.frontend_token; let back_token = unwrap_msg!(http.backend_token); - let ws_context = http.websocket_context(); + let ws_context = http.context.websocket_context(); let mut container_frontend_timeout = http.container_frontend_timeout; let mut container_backend_timeout = http.container_backend_timeout; @@ -258,6 +276,69 @@ impl HttpSession { Some(HttpStateMachine::WebSocket(pipe)) } + fn upgrade_mux(&mut self, mut mux: Mux) -> Option { + debug!("mux switching to ws"); + let stream = mux.context.streams.pop().unwrap(); + + let (frontend_readiness, frontend_socket) = match mux.frontend { + mux::Connection::H1(mux::ConnectionH1 { + readiness, socket, .. + }) => (readiness, socket), + // only h1<->h1 connections can upgrade to websocket + mux::Connection::H2(_) => unreachable!(), + }; + + let mux::StreamState::Linked(back_token) = stream.state else { unreachable!() }; + let backend = mux.router.backends.remove(&back_token).unwrap(); + let (cluster_id, backend_readiness, backend_socket) = match backend { + mux::Connection::H1(mux::ConnectionH1 { + position: mux::Position::Client(mux::BackendStatus::Connected(cluster_id)), + readiness, + socket, + .. + }) => (cluster_id, readiness, socket), + // the backend disconnected just after upgrade, abort + mux::Connection::H1(_) => return None, + // only h1<->h1 connections can upgrade to websocket + mux::Connection::H2(_) => unreachable!(), + }; + + let ws_context = stream.context.websocket_context(); + + // let mut container_frontend_timeout = http.container_frontend_timeout; + // let mut container_backend_timeout = http.container_backend_timeout; + // container_frontend_timeout.reset(); + // container_backend_timeout.reset(); + + let mut pipe = Pipe::new( + stream.back.storage.buffer, + None, + Some(backend_socket), + None, + None, + None, + Some(cluster_id), + stream.front.storage.buffer, + self.frontend_token, + frontend_socket, + self.listener.clone(), + Protocol::HTTP, + stream.context.id, + stream.context.session_address, + Some(ws_context), + ); + + pipe.frontend_readiness.event = frontend_readiness.event; + pipe.backend_readiness.event = backend_readiness.event; + pipe.set_back_token(back_token); + + gauge_add!("protocol.http", -1); + gauge_add!("protocol.ws", 1); + gauge_add!("http.active_requests", -1); + gauge_add!("websocket.active_requests", 1); + Some(HttpStateMachine::WebSocket(pipe)) + } + fn upgrade_websocket(&self, ws: Pipe) -> Option { // what do we do here? error!("Upgrade called on WS, this should not happen"); @@ -277,7 +358,8 @@ impl ProxySession for HttpSession { // Restore gauges match self.state.marker() { StateMarker::Expect => gauge_add!("protocol.proxy.expect", -1), - StateMarker::Http => gauge_add!("protocol.http", -1), + // StateMarker::Http => gauge_add!("protocol.http", -1), + StateMarker::Mux => gauge_add!("protocol.http", -1), StateMarker::WebSocket => { gauge_add!("protocol.ws", -1); gauge_add!("websocket.active_requests", -1); @@ -1393,7 +1475,7 @@ mod tests { ); println!("http client write: {w:?}"); - let expected_answer = "HTTP/1.1 301 Moved Permanently\r\nContent-Length: 0\r\nLocation: https://localhost/redirected?true\r\n\r\n"; + let expected_answer = "HTTP/1.1 301 Moved Permanently\r\nLocation: https://localhost/redirected?true\r\nContent-Length: 0\r\n\r\n"; let mut buffer = [0; 4096]; let mut index = 0; loop { diff --git a/lib/src/https.rs b/lib/src/https.rs index 512dd4187..2877a7112 100644 --- a/lib/src/https.rs +++ b/lib/src/https.rs @@ -54,7 +54,7 @@ use crate::{ answers::HttpAnswers, parser::{hostname_and_port, Method}, }, - mux::Mux, + mux::{self, Mux}, proxy_protocol::expect::ExpectProxyProtocol, rustls::TlsHandshake, Http, Pipe, SessionState, @@ -87,7 +87,7 @@ StateMachineBuilder! { enum HttpsStateMachine impl SessionState { Expect(ExpectProxyProtocol, ServerConnection), Handshake(TlsHandshake), - Mux(Mux), + Mux(Mux), Http(Http), WebSocket(Pipe), Http2(Http2) -> todo!("H2"), @@ -279,7 +279,6 @@ impl HttpsSession { gauge_add!("protocol.tls.handshake", -1); - use crate::protocol::mux; let mut context = mux::Context::new(self.pool.clone()); let mut frontend = match alpn { AlpnProtocol::Http11 => { @@ -293,10 +292,7 @@ impl HttpsSession { frontend_token: self.frontend_token, frontend, context, - router: mux::Router { - listener: self.listener.clone(), - backends: HashMap::new(), - }, + router: mux::Router::new(self.listener.clone()), public_address: self.public_address, peer_address: self.peer_address, sticky_name: self.sticky_name.clone(), @@ -307,7 +303,7 @@ impl HttpsSession { debug!("https switching to wss"); let front_token = self.frontend_token; let back_token = unwrap_msg!(http.backend_token); - let ws_context = http.websocket_context(); + let ws_context = http.context.websocket_context(); let mut container_frontend_timeout = http.container_frontend_timeout; let mut container_backend_timeout = http.container_backend_timeout; diff --git a/lib/src/lib.rs b/lib/src/lib.rs index 749742ecf..820b51475 100644 --- a/lib/src/lib.rs +++ b/lib/src/lib.rs @@ -637,6 +637,8 @@ pub enum RetrieveClusterError { NoPath, #[error("unauthorized route")] UnauthorizedRoute, + #[error("https redirect")] + HttpsRedirect, #[error("failed to retrieve the frontend for the request: {0}")] RetrieveFrontend(FrontendFromRequestError), } diff --git a/lib/src/protocol/kawa_h1/editor.rs b/lib/src/protocol/kawa_h1/editor.rs index 261d9ee8d..441b9d977 100644 --- a/lib/src/protocol/kawa_h1/editor.rs +++ b/lib/src/protocol/kawa_h1/editor.rs @@ -6,6 +6,7 @@ use std::{ use rusty_ulid::Ulid; use crate::{ + logs::Endpoint, pool::Checkout, protocol::http::{parser::compare_no_case, GenericHttpStream, Method}, Protocol, RetrieveClusterError, @@ -317,6 +318,18 @@ impl HttpContext { Ok((given_authority, given_path, given_method)) } + /// Format the context of the websocket into a loggable String + pub fn websocket_context(&self) -> String { + Endpoint::Http { + method: self.method.as_ref(), + authority: self.authority.as_deref(), + path: self.path.as_deref(), + status: self.status, + reason: self.reason.as_deref(), + } + .to_string() + } + pub fn reset(&mut self) { self.keep_alive_backend = true; self.sticky_session_found = None; diff --git a/lib/src/protocol/kawa_h1/mod.rs b/lib/src/protocol/kawa_h1/mod.rs index 5a0b5cb82..cf9ec3d01 100644 --- a/lib/src/protocol/kawa_h1/mod.rs +++ b/lib/src/protocol/kawa_h1/mod.rs @@ -718,20 +718,6 @@ impl Http String { - format!( - "{}", - Endpoint::Http { - method: self.context.method.as_ref(), - authority: self.context.authority.as_deref(), - path: self.context.path.as_deref(), - status: self.context.status, - reason: self.context.reason.as_deref(), - } - ) - } - pub fn log_request(&self, metrics: &SessionMetrics, message: Option<&str>) { let listener = self.listener.borrow(); let tags = self.context.authority.as_ref().and_then(|host| { diff --git a/lib/src/protocol/mux/h1.rs b/lib/src/protocol/mux/h1.rs index 7a499f195..33c806e31 100644 --- a/lib/src/protocol/mux/h1.rs +++ b/lib/src/protocol/mux/h1.rs @@ -77,6 +77,12 @@ impl ConnectionH1 { .insert(Ready::WRITABLE) } Position::Server => { + if let StreamState::Linked(token) = stream.state { + endpoint + .readiness_mut(token) + .interest + .insert(Ready::WRITABLE) + } if was_initial { self.requests += 1; println_!("REQUESTS: {}", self.requests); @@ -112,15 +118,56 @@ impl ConnectionH1 { match self.position { Position::Client(_) => self.readiness.interest.insert(Ready::READABLE), Position::Server => { - stream.context.reset(); - stream.back.clear(); - stream.back.storage.clear(); - stream.front.clear(); - // do not clear stream.front.storage because of H1 pipelining - stream.attempts = 0; + if stream.context.closing { + return MuxResult::CloseSession; + } + let kawa = &mut stream.back; + match kawa.detached.status_line { + kawa::StatusLine::Response { code: 101, .. } => { + println!("============== HANDLE UPGRADE!"); + // unimplemented!(); + return MuxResult::Upgrade; + } + kawa::StatusLine::Response { code: 100, .. } => { + println!("============== HANDLE CONTINUE!"); + // after a 100 continue, we expect the client to continue with its request + self.readiness.interest.insert(Ready::READABLE); + kawa.clear(); + return MuxResult::Continue; + } + kawa::StatusLine::Response { code: 103, .. } => { + println!("============== HANDLE EARLY HINT!"); + if let StreamState::Linked(token) = stream.state { + // after a 103 early hints, we expect the server to send its response + endpoint + .readiness_mut(token) + .interest + .insert(Ready::READABLE); + kawa.clear(); + return MuxResult::Continue; + } else { + return MuxResult::CloseSession; + } + } + _ => {} + } let old_state = std::mem::replace(&mut stream.state, StreamState::Unlinked); - if let StreamState::Linked(token) = old_state { - endpoint.end_stream(token, self.stream, context); + if stream.context.keep_alive_frontend { + println!("{old_state:?} {:?}", self.readiness); + if let StreamState::Linked(token) = old_state { + println!("{:?}", endpoint.readiness(token)); + endpoint.end_stream(token, self.stream, context); + } + self.readiness.interest.insert(Ready::READABLE); + let stream = &mut context.streams[self.stream]; + stream.context.reset(); + stream.back.clear(); + stream.back.storage.clear(); + stream.front.clear(); + // do not clear stream.front.storage because of H1 pipelining + stream.attempts = 0; + } else { + return MuxResult::CloseSession; } } } @@ -128,6 +175,17 @@ impl ConnectionH1 { MuxResult::Continue } + fn force_disconnect(&mut self) -> MuxResult { + match self.position { + Position::Client(_) => { + self.position = Position::Client(BackendStatus::Disconnecting); + self.readiness.event = Ready::HUP; + MuxResult::Continue + } + Position::Server => MuxResult::CloseSession, + } + } + pub fn close(&mut self, context: &mut Context, mut endpoint: E) where E: Endpoint, @@ -156,10 +214,11 @@ impl ConnectionH1 { Position::Client(BackendStatus::Connected(cluster_id)) | Position::Client(BackendStatus::Connecting(cluster_id)) => { self.stream = usize::MAX; - self.position = if stream_context.keep_alive_backend { - Position::Client(BackendStatus::KeepAlive(std::mem::take(cluster_id))) + if stream_context.keep_alive_backend { + self.position = + Position::Client(BackendStatus::KeepAlive(std::mem::take(cluster_id))) } else { - Position::Client(BackendStatus::Disconnecting) + self.force_disconnect(); } } Position::Client(BackendStatus::KeepAlive(_)) @@ -177,6 +236,8 @@ impl ConnectionH1 { } } (true, false) => { + // we do not have an answer, but the request has already been partially consumed + // so we can't retry, send a 502 bad gateway instead set_default_answer(stream, &mut self.readiness, 502); } (false, false) => { @@ -191,6 +252,7 @@ impl ConnectionH1 { pub fn start_stream(&mut self, stream: GlobalStreamId, context: &mut Context) { println_!("start H1 stream {stream} {:?}", self.readiness); + self.readiness.interest.insert(Ready::ALL); self.stream = stream; match &mut self.position { Position::Client(BackendStatus::KeepAlive(cluster_id)) => { diff --git a/lib/src/protocol/mux/mod.rs b/lib/src/protocol/mux/mod.rs index 3e15a4c14..d6b42da78 100644 --- a/lib/src/protocol/mux/mod.rs +++ b/lib/src/protocol/mux/mod.rs @@ -8,7 +8,7 @@ use std::{ use mio::{net::TcpStream, Interest, Token}; use rusty_ulid::Ulid; -use sozu_command::ready::Ready; +use sozu_command::{proto::command::ListenerType, ready::Ready}; mod converter; mod h1; @@ -19,23 +19,21 @@ mod serializer; use crate::{ backends::{Backend, BackendError}, - https::HttpsListener, pool::{Checkout, Pool}, protocol::{ http::editor::HttpContext, - mux::{ - h1::ConnectionH1, - h2::{ConnectionH2, H2Settings, H2State, H2StreamId}, - }, + mux::h2::{H2Settings, H2State, H2StreamId}, SessionState, }, router::Route, server::CONN_RETRIES, - socket::{FrontRustls, SocketHandler, SocketResult}, + socket::{SocketHandler, SocketResult}, BackendConnectionError, L7ListenerHandler, L7Proxy, ProxySession, Readiness, - RetrieveClusterError, SessionMetrics, SessionResult, StateResult, + RetrieveClusterError, SessionIsToBeClosed, SessionMetrics, SessionResult, StateResult, }; +pub use crate::protocol::mux::{h1::ConnectionH1, h2::ConnectionH2}; + #[macro_export] macro_rules! println_ { ($($t:expr),*) => { @@ -53,11 +51,22 @@ type GenericHttpStream = kawa::Kawa; type StreamId = u32; type GlobalStreamId = usize; -/// Replace the content of the kawa message with a default Sozu answer for a given status code -fn set_default_answer(stream: &mut Stream, readiness: &mut Readiness, code: u16) { - let kawa = &mut stream.back; - kawa.clear(); - kawa.storage.clear(); +pub fn fill_default_301_answer(kawa: &mut kawa::Kawa, host: &str, uri: &str) { + kawa.detached.status_line = kawa::StatusLine::Response { + version: kawa::Version::V20, + code: 301, + status: kawa::Store::Static(b"301"), + reason: kawa::Store::Static(b"Moved Permanently"), + }; + kawa.push_block(kawa::Block::StatusLine); + kawa.push_block(kawa::Block::Header(kawa::Pair { + key: kawa::Store::Static(b"Location"), + val: kawa::Store::from_string(format!("https://{host}{uri}")), + })); + terminate_default_answer(kawa, false); +} + +pub fn fill_default_answer(kawa: &mut kawa::Kawa, code: u16) { kawa.detached.status_line = kawa::StatusLine::Response { version: kawa::Version::V20, code, @@ -65,14 +74,20 @@ fn set_default_answer(stream: &mut Stream, readiness: &mut Readiness, code: u16) reason: kawa::Store::Static(b"Sozu Default Answer"), }; kawa.push_block(kawa::Block::StatusLine); - kawa.push_block(kawa::Block::Header(kawa::Pair { - key: kawa::Store::Static(b"Cache-Control"), - val: kawa::Store::Static(b"no-cache"), - })); - kawa.push_block(kawa::Block::Header(kawa::Pair { - key: kawa::Store::Static(b"Connection"), - val: kawa::Store::Static(b"close"), - })); + terminate_default_answer(kawa, true); +} + +pub fn terminate_default_answer(kawa: &mut kawa::Kawa, close: bool) { + if close { + kawa.push_block(kawa::Block::Header(kawa::Pair { + key: kawa::Store::Static(b"Cache-Control"), + val: kawa::Store::Static(b"no-cache"), + })); + kawa.push_block(kawa::Block::Header(kawa::Pair { + key: kawa::Store::Static(b"Connection"), + val: kawa::Store::Static(b"close"), + })); + } kawa.push_block(kawa::Block::Header(kawa::Pair { key: kawa::Store::Static(b"Content-Length"), val: kawa::Store::Static(b"0"), @@ -84,6 +99,20 @@ fn set_default_answer(stream: &mut Stream, readiness: &mut Readiness, code: u16) end_stream: true, })); kawa.parsing_phase = kawa::ParsingPhase::Terminated; +} + +/// Replace the content of the kawa message with a default Sozu answer for a given status code +fn set_default_answer(stream: &mut Stream, readiness: &mut Readiness, code: u16) { + let kawa = &mut stream.back; + kawa.clear(); + kawa.storage.clear(); + if code == 301 { + let host = stream.context.authority.as_deref().unwrap(); + let uri = stream.context.path.as_deref().unwrap(); + fill_default_301_answer(kawa, host, uri); + } else { + fill_default_answer(kawa, code); + } stream.state = StreamState::Unlinked; readiness.interest.insert(Ready::WRITABLE); } @@ -120,6 +149,7 @@ pub enum BackendStatus { pub enum MuxResult { Continue, + Upgrade, CloseSession, } @@ -238,13 +268,13 @@ impl Connection { Connection::H2(c) => &mut c.readiness, } } - fn position(&self) -> &Position { + pub fn position(&self) -> &Position { match self { Connection::H1(c) => &c.position, Connection::H2(c) => &c.position, } } - fn position_mut(&mut self) -> &mut Position { + pub fn position_mut(&mut self) -> &mut Position { match self { Connection::H1(c) => &mut c.position, Connection::H2(c) => &mut c.position, @@ -300,12 +330,12 @@ impl Connection { } } -struct EndpointServer<'a>(&'a mut Connection); +struct EndpointServer<'a, Front: SocketHandler>(&'a mut Connection); struct EndpointClient<'a>(&'a mut Router); // note: EndpointServer are used by client Connection, they do not know the frontend Token // they will use the Stream's Token which is their backend token -impl<'a> Endpoint for EndpointServer<'a> { +impl<'a, Front: SocketHandler> Endpoint for EndpointServer<'a, Front> { fn readiness(&self, _token: Token) -> &Readiness { self.0.readiness() } @@ -442,8 +472,8 @@ pub struct Stream { pub window: i32, pub attempts: u8, pub state: StreamState, - front: GenericHttpStream, - back: GenericHttpStream, + pub front: GenericHttpStream, + pub back: GenericHttpStream, pub context: HttpContext, } @@ -560,11 +590,18 @@ impl Context { } pub struct Router { - pub listener: Rc>, + pub listener: Rc>, pub backends: HashMap>, } impl Router { + pub fn new(listener: Rc>) -> Self { + Self { + listener, + backends: HashMap::new(), + } + } + fn connect( &mut self, stream_id: GlobalStreamId, @@ -587,12 +624,18 @@ impl Router { .route_from_request(stream_context, proxy.clone()) .map_err(BackendConnectionError::RetrieveClusterError)?; - let frontend_should_stick = proxy + let (frontend_should_stick, frontend_should_redirect_https) = proxy .borrow() .clusters() .get(&cluster_id) - .map(|cluster| cluster.sticky_session) - .unwrap_or(false); + .map(|cluster| (cluster.sticky_session, cluster.https_redirect)) + .unwrap_or((false, false)); + + if frontend_should_redirect_https && matches!(proxy.borrow().kind(), ListenerType::Http) { + return Err(BackendConnectionError::RetrieveClusterError( + RetrieveClusterError::HttpsRedirect, + )); + } let mut reuse_token = None; // let mut priority = 0; @@ -794,9 +837,9 @@ impl Router { } } -pub struct Mux { +pub struct Mux { pub frontend_token: Token, - pub frontend: Connection, + pub frontend: Connection, pub router: Router, pub public_address: SocketAddr, pub peer_address: Option, @@ -804,13 +847,13 @@ pub struct Mux { pub context: Context, } -impl Mux { +impl Mux { pub fn front_socket(&self) -> &TcpStream { self.frontend.socket() } } -impl SessionState for Mux { +impl SessionState for Mux { fn ready( &mut self, session: Rc>, @@ -836,6 +879,7 @@ impl SessionState for Mux { { MuxResult::Continue => {} MuxResult::CloseSession => return SessionResult::Close, + MuxResult::Upgrade => return SessionResult::Upgrade, } } @@ -844,6 +888,7 @@ impl SessionState for Mux { let mut dead_backends = Vec::new(); for (token, backend) in self.router.backends.iter_mut() { let readiness = backend.readiness_mut(); + println!("{token:?} -> {readiness:?}"); let dead = readiness.filter_interest().is_hup() || readiness.filter_interest().is_error(); if dead { @@ -863,6 +908,7 @@ impl SessionState for Mux { } match backend.writable(context, EndpointServer(&mut self.frontend)) { MuxResult::Continue => {} + MuxResult::Upgrade => unreachable!(), // only frontend can upgrade MuxResult::CloseSession => return SessionResult::Close, } } @@ -870,6 +916,7 @@ impl SessionState for Mux { if backend.readiness().filter_interest().is_readable() { match backend.readable(context, EndpointServer(&mut self.frontend)) { MuxResult::Continue => {} + MuxResult::Upgrade => unreachable!(), // only frontend can upgrade MuxResult::CloseSession => return SessionResult::Close, } } @@ -888,7 +935,7 @@ impl SessionState for Mux { for token in &dead_backends { self.router.backends.remove(token); } - println_!("FRONTEND: {:#?}", &self.frontend); + println_!("FRONTEND: {:#?}", self.frontend); println_!("BACKENDS: {:#?}", self.router.backends); } @@ -900,6 +947,7 @@ impl SessionState for Mux { { MuxResult::Continue => {} MuxResult::CloseSession => return SessionResult::Close, + MuxResult::Upgrade => return SessionResult::Upgrade, } } @@ -951,6 +999,9 @@ impl SessionState for Mux { ) => { set_default_answer(stream, front_readiness, 401); } + BE::RetrieveClusterError(RetrieveClusterError::HttpsRedirect) => { + set_default_answer(stream, front_readiness, 301); + } BE::Backend(_) => {} BE::RetrieveClusterError(_) => unreachable!(), @@ -996,6 +1047,7 @@ impl SessionState for Mux { self.frontend.readiness() ); } + fn close(&mut self, _proxy: Rc>, _metrics: &mut SessionMetrics) { let s = match &mut self.frontend { Connection::H1(c) => &mut c.socket, @@ -1017,4 +1069,32 @@ impl SessionState for Mux { } } } + + fn shutting_down(&mut self) -> SessionIsToBeClosed { + let mut can_stop = true; + for stream in &mut self.context.streams { + match stream.state { + StreamState::Linked(_) => { + can_stop = false; + } + StreamState::Unlinked => { + let front = &stream.front; + let back = &stream.back; + kawa::debug_kawa(front); + kawa::debug_kawa(back); + if front.is_initial() + && front.storage.is_empty() + && back.is_initial() + && back.storage.is_empty() + { + continue; + } + stream.context.closing = true; + can_stop = false; + } + _ => {} + } + } + can_stop + } }