From 8faa16c999cf47f2c9260c87d6e8b17deca8a269 Mon Sep 17 00:00:00 2001 From: Eloi DEMOLIS <eloi.demolis@clever-cloud.com> Date: Fri, 1 Mar 2024 11:41:00 +0100 Subject: [PATCH] Restore context in logs by moving the fields in HttpContext Signed-off-by: Eloi DEMOLIS <eloi.demolis@clever-cloud.com> --- lib/src/http.rs | 9 +- lib/src/https.rs | 9 +- lib/src/protocol/kawa_h1/editor.rs | 12 +++ lib/src/protocol/kawa_h1/mod.rs | 134 +++++++++++++++-------------- 4 files changed, 92 insertions(+), 72 deletions(-) diff --git a/lib/src/http.rs b/lib/src/http.rs index 85b22f544..566cdf89a 100644 --- a/lib/src/http.rs +++ b/lib/src/http.rs @@ -220,7 +220,10 @@ impl HttpSession { let back_token = match http.backend_token { Some(back_token) => back_token, None => { - warn!("Could not upgrade http request on cluster '{:?}' ({:?}) using backend '{:?}' into websocket for request '{}'", http.cluster_id, self.frontend_token, http.backend_id, http.context.id); + warn!( + "Could not upgrade http request on cluster '{:?}' ({:?}) using backend '{:?}' into websocket for request '{}'", + http.context.cluster_id, self.frontend_token, http.context.backend_id, http.context.id + ); return None; } }; @@ -239,12 +242,12 @@ impl HttpSession { let mut pipe = Pipe::new( backend_buffer, - http.backend_id, + http.context.backend_id, http.backend_socket, http.backend, Some(container_backend_timeout), Some(container_frontend_timeout), - http.cluster_id, + http.context.cluster_id, http.request_stream.storage.buffer, front_token, http.frontend_socket, diff --git a/lib/src/https.rs b/lib/src/https.rs index 8311a2775..6db23fb06 100644 --- a/lib/src/https.rs +++ b/lib/src/https.rs @@ -346,7 +346,10 @@ impl HttpsSession { let back_token = match http.backend_token { Some(back_token) => back_token, None => { - warn!("Could not upgrade https request on cluster '{:?}' ({:?}) using backend '{:?}' into secure websocket for request '{}'", http.cluster_id, self.frontend_token, http.backend_id, http.context.id); + warn!( + "Could not upgrade https request on cluster '{:?}' ({:?}) using backend '{:?}' into secure websocket for request '{}'", + http.context.cluster_id, self.frontend_token, http.context.backend_id, http.context.id + ); return None; } }; @@ -365,12 +368,12 @@ impl HttpsSession { let mut pipe = Pipe::new( backend_buffer, - http.backend_id, + http.context.backend_id, http.backend_socket, http.backend, Some(container_backend_timeout), Some(container_frontend_timeout), - http.cluster_id, + http.context.cluster_id, http.request_stream.storage.buffer, front_token, http.frontend_socket, diff --git a/lib/src/protocol/kawa_h1/editor.rs b/lib/src/protocol/kawa_h1/editor.rs index ccb30cd5a..26a8d3658 100644 --- a/lib/src/protocol/kawa_h1/editor.rs +++ b/lib/src/protocol/kawa_h1/editor.rs @@ -11,6 +11,8 @@ use crate::{ Protocol, }; +use sozu_command_lib::logging::LogContext; + /// This is the container used to store and use information about the session from within a Kawa parser callback #[derive(Debug)] pub struct HttpContext { @@ -40,6 +42,8 @@ pub struct HttpContext { pub closing: bool, /// the value of the custom header, named "Sozu-Id", that Kawa should write (request and response) pub id: Ulid, + pub backend_id: Option<String>, + pub cluster_id: Option<String>, /// the value of the protocol Kawa should write in the Forwarded headers of the request pub protocol: Protocol, /// the value of the public address Kawa should write in the Forwarded headers of the request @@ -353,4 +357,12 @@ impl HttpContext { self.reason = None; self.user_agent = None; } + + pub fn log_context(&self) -> LogContext { + LogContext { + request_id: self.id, + cluster_id: self.cluster_id.as_deref(), + backend_id: self.backend_id.as_deref(), + } + } } diff --git a/lib/src/protocol/kawa_h1/mod.rs b/lib/src/protocol/kawa_h1/mod.rs index a5219a568..5bfcf4f08 100644 --- a/lib/src/protocol/kawa_h1/mod.rs +++ b/lib/src/protocol/kawa_h1/mod.rs @@ -23,6 +23,7 @@ use crate::{ pool::{Checkout, Pool}, protocol::{ http::{answers::DefaultAnswerStream, editor::HttpContext, parser::Method}, + pipe::WebSocketContext, SessionState, }, retry::RetryPolicy, @@ -36,8 +37,6 @@ use crate::{ RetrieveClusterError, SessionIsToBeClosed, SessionMetrics, SessionResult, StateResult, }; -use super::pipe::WebSocketContext; - /// Generic Http representation using the Kawa crate using the Checkout of Sozu as buffer type GenericHttpStream = kawa::Kawa<Checkout>; @@ -99,7 +98,6 @@ pub struct Http<Front: SocketHandler, L: ListenerHandler + L7ListenerHandler> { answers: Rc<RefCell<answers::HttpAnswers>>, pub backend: Option<Rc<RefCell<Backend>>>, backend_connection_status: BackendConnectionStatus, - pub backend_id: Option<String>, pub backend_readiness: Readiness, pub backend_socket: Option<TcpStream>, backend_stop: Option<Instant>, @@ -109,7 +107,6 @@ pub struct Http<Front: SocketHandler, L: ListenerHandler + L7ListenerHandler> { configured_backend_timeout: Duration, configured_connect_timeout: Duration, configured_frontend_timeout: Duration, - pub cluster_id: Option<String>, /// attempts to connect to the backends during the session connection_attempts: u8, pub frontend_readiness: Readiness, @@ -163,13 +160,11 @@ impl<Front: SocketHandler, L: ListenerHandler + L7ListenerHandler> Http<Front, L Ok(Http { answers, backend_connection_status: BackendConnectionStatus::NotConnected, - backend_id: None, backend_readiness: Readiness::new(), backend_socket: None, backend_stop: None, backend_token: None, backend: None, - cluster_id: None, configured_backend_timeout, configured_connect_timeout, configured_frontend_timeout, @@ -193,8 +188,11 @@ impl<Front: SocketHandler, L: ListenerHandler + L7ListenerHandler> Http<Front, L kawa::Buffer::new(back_buffer), )), context: HttpContext { - closing: false, id: request_id, + backend_id: None, + cluster_id: None, + + closing: false, keep_alive_backend: true, keep_alive_frontend: true, protocol, @@ -253,7 +251,7 @@ impl<Front: SocketHandler, L: ListenerHandler + L7ListenerHandler> Http<Front, L if !response_storage.is_empty() { warn!( "{} Leftover fragment from response: {}", - "self.log_context()", + self.context.log_context(), parser::view( response_storage.used(), 16, @@ -285,7 +283,7 @@ impl<Front: SocketHandler, L: ListenerHandler + L7ListenerHandler> Http<Front, L ResponseStream::DefaultAnswer(..) => { error!( "{}\tsending default answer, should not read from front socket", - self.log_context() + self.context.log_context() ); self.frontend_readiness.interest.remove(Ready::READABLE); self.frontend_readiness.interest.insert(Ready::WRITABLE); @@ -312,7 +310,10 @@ impl<Front: SocketHandler, L: ListenerHandler + L7ListenerHandler> Http<Front, L .socket_read(self.request_stream.storage.space()); debug!( "{}\tFRONT [{}->{:?}]: read {} bytes", - "self.log_context()", self.frontend_token.0, self.backend_token, size + self.context.log_context(), + self.frontend_token.0, + self.backend_token, + size ); if size > 0 { @@ -376,7 +377,7 @@ impl<Front: SocketHandler, L: ListenerHandler + L7ListenerHandler> Http<Front, L incr!("http.frontend_parse_errors"); warn!( "{} Parsing request error in {:?}: {}", - "self.log_context()", + self.context.log_context(), marker, match kind { kawa::ParsingErrorKind::Consuming { index } => { @@ -440,7 +441,10 @@ impl<Front: SocketHandler, L: ListenerHandler + L7ListenerHandler> Http<Front, L let (size, socket_state) = self.frontend_socket.socket_write_vectored(&bufs); debug!( "{}\tFRONT [{}<-{:?}]: wrote {} bytes", - "self.log_context()", self.frontend_token.0, self.backend_token, size + self.context.log_context(), + self.frontend_token.0, + self.backend_token, + size ); if size > 0 { @@ -477,7 +481,10 @@ impl<Front: SocketHandler, L: ListenerHandler + L7ListenerHandler> Http<Front, L metrics.reset(); if self.context.closing { - debug!("{} closing proxy, no keep alive", self.log_context()); + debug!( + "{} closing proxy, no keep alive", + self.context.log_context() + ); return StateResult::CloseSession; } @@ -531,17 +538,17 @@ impl<Front: SocketHandler, L: ListenerHandler + L7ListenerHandler> Http<Front, L response_length_known, ) { (true, true, true) => { - debug!("{} keep alive front/back", self.log_context()); + debug!("{} keep alive front/back", self.context.log_context()); self.reset(); StateResult::Continue } (true, false, true) => { - debug!("{} keep alive front", self.log_context()); + debug!("{} keep alive front", self.context.log_context()); self.reset(); StateResult::CloseBackend } _ => { - debug!("{} no keep alive", self.log_context()); + debug!("{} no keep alive", self.context.log_context()); StateResult::CloseSession } }; @@ -567,7 +574,7 @@ impl<Front: SocketHandler, L: ListenerHandler + L7ListenerHandler> Http<Front, L } if response_stream.is_completed() { - save_http_status_metric(self.context.status, self.log_context()); + save_http_status_metric(self.context.status, self.context.log_context()); self.log_default_answer_success(metrics); self.frontend_readiness.reset(); self.backend_readiness.reset(); @@ -591,7 +598,7 @@ impl<Front: SocketHandler, L: ListenerHandler + L7ListenerHandler> Http<Front, L if let ResponseStream::DefaultAnswer(..) = self.response_stream { error!( "{}\tsending default answer, should not write to back", - self.log_context() + self.context.log_context() ); self.backend_readiness.interest.remove(Ready::WRITABLE); self.frontend_readiness.interest.insert(Ready::WRITABLE); @@ -616,7 +623,7 @@ impl<Front: SocketHandler, L: ListenerHandler + L7ListenerHandler> Http<Front, L let (size, socket_state) = backend_socket.socket_write_vectored(&bufs); debug!( "{}\tBACK [{}->{:?}]: wrote {} bytes", - self.log_context(), + self.context.log_context(), self.frontend_token.0, self.backend_token, size @@ -679,7 +686,7 @@ impl<Front: SocketHandler, L: ListenerHandler + L7ListenerHandler> Http<Front, L _ => { error!( "{}\tsending default answer, should not read from back socket", - self.log_context() + self.context.log_context() ); self.backend_readiness.interest.remove(Ready::READABLE); self.frontend_readiness.interest.insert(Ready::WRITABLE); @@ -712,7 +719,7 @@ impl<Front: SocketHandler, L: ListenerHandler + L7ListenerHandler> Http<Front, L let (size, socket_state) = backend_socket.socket_read(response_stream.storage.space()); debug!( "{}\tBACK [{}<-{:?}]: read {} bytes", - "ctx", // FIXME: self.log_context(), + "ctx", // FIXME: self.context.log_context(), self.frontend_token.0, self.backend_token, size @@ -765,7 +772,7 @@ impl<Front: SocketHandler, L: ListenerHandler + L7ListenerHandler> Http<Front, L incr!("http.backend_parse_errors"); warn!( "{} Parsing response error in {:?}: {}", - "self.log_context()", + self.context.log_context(), marker, match kind { kawa::ParsingErrorKind::Consuming { index } => { @@ -806,14 +813,6 @@ impl<Front: SocketHandler, L: ListenerHandler + L7ListenerHandler> Http<Front, L } impl<Front: SocketHandler, L: ListenerHandler + L7ListenerHandler> Http<Front, L> { - fn log_context(&self) -> LogContext { - LogContext { - request_id: self.context.id, - cluster_id: self.cluster_id.as_deref(), - backend_id: self.backend_id.as_deref(), - } - } - fn log_endpoint(&self) -> EndpointRecord { EndpointRecord::Http { method: self.context.method.as_deref(), @@ -879,7 +878,7 @@ impl<Front: SocketHandler, L: ListenerHandler + L7ListenerHandler> Http<Front, L listener.get_tags(hostname) }); - let context = self.log_context(); + let context = self.context.log_context(); metrics.register_end_of_session(&context); log_access! { @@ -902,7 +901,7 @@ impl<Front: SocketHandler, L: ListenerHandler + L7ListenerHandler> Http<Front, L } pub fn log_request_success(&self, metrics: &SessionMetrics) { - save_http_status_metric(self.context.status, self.log_context()); + save_http_status_metric(self.context.status, self.context.log_context()); self.log_request(metrics, false, None); } @@ -913,7 +912,7 @@ impl<Front: SocketHandler, L: ListenerHandler + L7ListenerHandler> Http<Front, L incr!("http.errors"); error!( "{} Could not process request properly got: {}", - self.log_context(), + self.context.log_context(), message ); self.print_state(self.protocol_string()); @@ -931,52 +930,52 @@ impl<Front: SocketHandler, L: ListenerHandler + L7ListenerHandler> Http<Front, L match answer { DefaultAnswer::Answer301 { .. } => incr!( "http.301.redirection", - self.cluster_id.as_deref(), - self.backend_id.as_deref() + self.context.cluster_id.as_deref(), + self.context.backend_id.as_deref() ), DefaultAnswer::Answer400 { .. } => incr!("http.400.errors"), DefaultAnswer::Answer401 { .. } => incr!( "http.401.errors", - self.cluster_id.as_deref(), - self.backend_id.as_deref() + self.context.cluster_id.as_deref(), + self.context.backend_id.as_deref() ), DefaultAnswer::Answer404 { .. } => incr!("http.404.errors"), DefaultAnswer::Answer408 { .. } => incr!( "http.408.errors", - self.cluster_id.as_deref(), - self.backend_id.as_deref() + self.context.cluster_id.as_deref(), + self.context.backend_id.as_deref() ), DefaultAnswer::Answer413 { .. } => incr!( "http.413.errors", - self.cluster_id.as_deref(), - self.backend_id.as_deref() + self.context.cluster_id.as_deref(), + self.context.backend_id.as_deref() ), DefaultAnswer::Answer502 { .. } => incr!( "http.502.errors", - self.cluster_id.as_deref(), - self.backend_id.as_deref() + self.context.cluster_id.as_deref(), + self.context.backend_id.as_deref() ), DefaultAnswer::Answer503 { .. } => incr!( "http.503.errors", - self.cluster_id.as_deref(), - self.backend_id.as_deref() + self.context.cluster_id.as_deref(), + self.context.backend_id.as_deref() ), DefaultAnswer::Answer504 { .. } => incr!( "http.504.errors", - self.cluster_id.as_deref(), - self.backend_id.as_deref() + self.context.cluster_id.as_deref(), + self.context.backend_id.as_deref() ), DefaultAnswer::Answer507 { .. } => incr!( "http.507.errors", - self.cluster_id.as_deref(), - self.backend_id.as_deref() + self.context.cluster_id.as_deref(), + self.context.backend_id.as_deref() ), }; } let mut kawa = self.answers.borrow().get( answer, - self.cluster_id.as_deref(), + self.context.cluster_id.as_deref(), self.context.id.to_string(), ); kawa.prepare(&mut kawa::h1::BlockConverter); @@ -1027,11 +1026,11 @@ impl<Front: SocketHandler, L: ListenerHandler + L7ListenerHandler> Http<Front, L } pub fn set_cluster_id(&mut self, cluster_id: String) { - self.cluster_id = Some(cluster_id); + self.context.cluster_id = Some(cluster_id); } pub fn set_backend_id(&mut self, backend_id: String) { - self.backend_id = Some(backend_id); + self.context.backend_id = Some(backend_id); } pub fn set_backend_token(&mut self, token: Token) { @@ -1060,7 +1059,7 @@ impl<Front: SocketHandler, L: ListenerHandler + L7ListenerHandler> Http<Front, L self.container_backend_timeout.cancel(); debug!( "{}\tPROXY [{}->{}] CLOSED BACKEND", - self.log_context(), + self.context.log_context(), self.frontend_token.0, self.backend_token .map(|t| format!("{}", t.0)) @@ -1091,7 +1090,7 @@ impl<Front: SocketHandler, L: ListenerHandler + L7ListenerHandler> Http<Front, L gauge_add!( "connections_per_backend", -1, - self.cluster_id.as_deref(), + self.context.cluster_id.as_deref(), metrics.backend_id.as_deref() ); } @@ -1107,7 +1106,10 @@ impl<Front: SocketHandler, L: ListenerHandler + L7ListenerHandler> Http<Front, L /// Check the number of connection attempts against authorized connection retries fn check_circuit_breaker(&mut self) -> Result<(), BackendConnectionError> { if self.connection_attempts >= CONN_RETRIES { - error!("{} max connection attempt reached", self.log_context()); + error!( + "{} max connection attempt reached", + self.context.log_context() + ); self.set_answer(DefaultAnswer::Answer503 { details: format!( "Max connection attempt reached: {}", @@ -1281,7 +1283,7 @@ impl<Front: SocketHandler, L: ListenerHandler + L7ListenerHandler> Http<Front, L proxy: Rc<RefCell<dyn L7Proxy>>, metrics: &mut SessionMetrics, ) -> Result<BackendConnectAction, BackendConnectionError> { - let old_cluster_id = self.cluster_id.clone(); + let old_cluster_id = self.context.cluster_id.clone(); let old_backend_token = self.backend_token; self.check_circuit_breaker()?; @@ -1292,12 +1294,12 @@ impl<Front: SocketHandler, L: ListenerHandler + L7ListenerHandler> Http<Front, L trace!( "connect_to_backend: {:?} {:?} {:?}", - self.cluster_id, + self.context.cluster_id, cluster_id, self.backend_connection_status ); // check if we can reuse the backend connection - if (self.cluster_id.as_ref()) == Some(&cluster_id) + if (self.context.cluster_id.as_ref()) == Some(&cluster_id) && self.backend_connection_status == BackendConnectionStatus::Connected { let has_backend = self @@ -1328,7 +1330,7 @@ impl<Front: SocketHandler, L: ListenerHandler + L7ListenerHandler> Http<Front, L self.close_backend(proxy.clone(), metrics); } - self.cluster_id = Some(cluster_id.clone()); + self.context.cluster_id = Some(cluster_id.clone()); let frontend_should_stick = proxy .borrow() @@ -1398,7 +1400,7 @@ impl<Front: SocketHandler, L: ListenerHandler + L7ListenerHandler> Http<Front, L gauge_add!( "connections_per_backend", 1, - self.cluster_id.as_deref(), + self.context.cluster_id.as_deref(), metrics.backend_id.as_deref() ); @@ -1417,7 +1419,7 @@ impl<Front: SocketHandler, L: ListenerHandler + L7ListenerHandler> Http<Front, L if backend.retry_policy.is_down() { incr!( "backend.up", - self.cluster_id.as_deref(), + self.context.cluster_id.as_deref(), metrics.backend_id.as_deref() ); @@ -1455,7 +1457,7 @@ impl<Front: SocketHandler, L: ListenerHandler + L7ListenerHandler> Http<Front, L backend.retry_policy.fail(); incr!( "backend.connections.error", - self.cluster_id.as_deref(), + self.context.cluster_id.as_deref(), metrics.backend_id.as_deref() ); if !already_unavailable && backend.retry_policy.is_down() { @@ -1465,7 +1467,7 @@ impl<Front: SocketHandler, L: ListenerHandler + L7ListenerHandler> Http<Front, L ); incr!( "backend.down", - self.cluster_id.as_deref(), + self.context.cluster_id.as_deref(), metrics.backend_id.as_deref() ); @@ -1570,7 +1572,7 @@ impl<Front: SocketHandler, L: ListenerHandler + L7ListenerHandler> Http<Front, L //retry connecting the backend error!( "{} error connecting to backend, trying again, attempt {}", - self.log_context(), + self.context.log_context(), self.connection_attempts ); @@ -1610,7 +1612,7 @@ impl<Front: SocketHandler, L: ListenerHandler + L7ListenerHandler> Http<Front, L trace!( "PROXY\t{} {:?} {:?} -> {:?}", - self.log_context(), + self.context.log_context(), self.frontend_token, self.frontend_readiness, self.backend_readiness @@ -1820,7 +1822,7 @@ impl<Front: SocketHandler, L: ListenerHandler + L7ListenerHandler> SessionState TimeoutStatus::Response => { error!( "backend {:?} timeout while receiving response (cluster {:?})", - self.backend_id, self.cluster_id + self.context.backend_id, self.context.cluster_id ); StateResult::CloseSession }