diff --git a/.bleep b/.bleep index ac4d79095..5081a5be5 100644 --- a/.bleep +++ b/.bleep @@ -1 +1 @@ -28f94f2a402bbf66341bdac8fa670caf5b7311e9 \ No newline at end of file +2351cdf592f9986201d754e6ee1f37f493f69abb \ No newline at end of file diff --git a/pingora-cache/Cargo.toml b/pingora-cache/Cargo.toml index 2dc3f2f73..6ae6c123c 100644 --- a/pingora-cache/Cargo.toml +++ b/pingora-cache/Cargo.toml @@ -43,6 +43,7 @@ lru = { workspace = true } ahash = { workspace = true } hex = "0.4" httparse = { workspace = true } +strum = { version = "0.26", features = ["derive"] } [dev-dependencies] tokio-test = "0.4" diff --git a/pingora-cache/src/lib.rs b/pingora-cache/src/lib.rs index d1346e501..f1f8bd210 100644 --- a/pingora-cache/src/lib.rs +++ b/pingora-cache/src/lib.rs @@ -21,6 +21,7 @@ use key::{CacheHashKey, HashBinary}; use lock::WritePermit; use pingora_error::Result; use pingora_http::ResponseHeader; +use rustracing::tag::Tag; use std::time::{Duration, Instant, SystemTime}; use trace::CacheTraceCTX; @@ -499,6 +500,11 @@ impl HttpCache { // from Stale: waited for cache lock, then retried and found asset was gone CachePhase::CacheKey | CachePhase::Bypass | CachePhase::Stale => { self.phase = CachePhase::Miss; + // It's possible that we've set the meta on lookup and have come back around + // here after not being able to acquire the cache lock, and our item has since + // purged or expired. We should be sure that the meta is not set in this case + // as there shouldn't be a meta set for cache misses. + self.inner_mut().meta = None; self.inner_mut().traces.start_miss_span(); } _ => panic!("wrong phase {:?}", self.phase), @@ -1042,7 +1048,7 @@ impl HttpCache { /// Check [Self::is_cache_locked()], panic if this request doesn't have a read lock. pub async fn cache_lock_wait(&mut self) -> LockStatus { let inner = self.inner_mut(); - let _span = inner.traces.child("cache_lock"); + let mut span = inner.traces.child("cache_lock"); let lock = inner.lock.take(); // remove the lock from self if let Some(Locked::Read(r)) = lock { let now = Instant::now(); @@ -1054,7 +1060,10 @@ impl HttpCache { .lock_duration .map_or(lock_duration, |d| d + lock_duration), ); - r.lock_status() // TODO: tag the span with lock status + let status = r.lock_status(); + let tag_value: &'static str = status.into(); + span.set_tag(|| Tag::new("status", tag_value)); + status } else { // should always call is_cache_locked() before this function panic!("cache_lock_wait on wrong type of lock") diff --git a/pingora-cache/src/lock.rs b/pingora-cache/src/lock.rs index 7f50691bb..8853cb17e 100644 --- a/pingora-cache/src/lock.rs +++ b/pingora-cache/src/lock.rs @@ -100,12 +100,14 @@ impl CacheLock { } } +use log::warn; use std::sync::atomic::{AtomicU8, Ordering}; use std::time::{Duration, Instant}; +use strum::IntoStaticStr; use tokio::sync::Semaphore; /// Status which the read locks could possibly see. -#[derive(Debug, Copy, Clone, PartialEq, Eq)] +#[derive(Debug, Copy, Clone, PartialEq, Eq, IntoStaticStr)] pub enum LockStatus { /// Waiting for the writer to populate the asset Waiting, @@ -180,7 +182,7 @@ impl LockCore { } fn lock_status(&self) -> LockStatus { - self.lock_status.load(Ordering::Relaxed).into() + self.lock_status.load(Ordering::SeqCst).into() } } @@ -197,11 +199,22 @@ impl ReadLock { return; } - // TODO: should subtract now - start so that the lock don't wait beyond start + timeout - // Also need to be careful not to wake everyone up at the same time + // TODO: need to be careful not to wake everyone up at the same time // (maybe not an issue because regular cache lock release behaves that way) - let _ = timeout(self.0.timeout, self.0.lock.acquire()).await; - // permit is returned to Semaphore right away + if let Some(duration) = self.0.timeout.checked_sub(self.0.lock_start.elapsed()) { + match timeout(duration, self.0.lock.acquire()).await { + Ok(Ok(_)) => { // permit is returned to Semaphore right away + } + Ok(Err(e)) => { + warn!("error acquiring semaphore {e:?}") + } + Err(_) => { + self.0 + .lock_status + .store(LockStatus::Timeout.into(), Ordering::SeqCst); + } + } + } } /// Test if it is still locked @@ -211,7 +224,7 @@ impl ReadLock { /// Whether the lock is expired, e.g., the writer has been holding the lock for too long pub fn expired(&self) -> bool { - // NOTE: this whether the lock is currently expired + // NOTE: this is whether the lock is currently expired // not whether it was timed out during wait() self.0.lock_start.elapsed() >= self.0.timeout } diff --git a/pingora-core/src/connectors/l4.rs b/pingora-core/src/connectors/l4.rs index 016203431..58bd2099a 100644 --- a/pingora-core/src/connectors/l4.rs +++ b/pingora-core/src/connectors/l4.rs @@ -33,8 +33,59 @@ pub trait Connect: std::fmt::Debug { async fn connect(&self, addr: &SocketAddr) -> Result; } +/// Settings for binding on connect +#[derive(Clone, Debug, Default)] +pub struct BindTo { + // local ip address + pub addr: Option, + // port range + port_range: Option<(u16, u16)>, + // whether we fallback and try again on bind errors when a port range is set + fallback: bool, +} + +impl BindTo { + /// Sets the port range we will bind to where the first item in the tuple is the lower bound + /// and the second item is the upper bound. + /// + /// Note this bind option is only supported on Linux since 6.3, this is a no-op on other systems. + /// To reset the range, pass a `None` or `Some((0,0))`, more information can be found [here](https://man7.org/linux/man-pages/man7/ip.7.html) + pub fn set_port_range(&mut self, range: Option<(u16, u16)>) -> Result<()> { + if range.is_none() && self.port_range.is_none() { + // nothing to do + return Ok(()); + } + + match range { + // 0,0 is valid for resets + None | Some((0, 0)) => self.port_range = Some((0, 0)), + // set the port range if valid + Some((low, high)) if low > 0 && low < high => { + self.port_range = Some((low, high)); + } + _ => return Error::e_explain(SocketError, "invalid port range: {range}"), + } + Ok(()) + } + + /// Set whether we fallback on no address available if a port range is set + pub fn set_fallback(&mut self, fallback: bool) { + self.fallback = fallback + } + + /// Configured bind port range + pub fn port_range(&self) -> Option<(u16, u16)> { + self.port_range + } + + /// Whether we attempt to fallback on no address available + pub fn will_fallback(&self) -> bool { + self.fallback && self.port_range.is_some() + } +} + /// Establish a connection (l4) to the given peer using its settings and an optional bind address. -pub(crate) async fn connect

(peer: &P, bind_to: Option) -> Result +pub(crate) async fn connect

(peer: &P, bind_to: Option) -> Result where P: Peer + Send + Sync, { @@ -142,12 +193,8 @@ pub(crate) fn bind_to_random( peer: &P, v4_list: &[InetSocketAddr], v6_list: &[InetSocketAddr], -) -> Option { - let selected = peer.get_peer_options().and_then(|o| o.bind_to); - if selected.is_some() { - return selected; - } - +) -> Option { + // helper function for randomly picking address fn bind_to_ips(ips: &[InetSocketAddr]) -> Option { match ips.len() { 0 => None, @@ -159,13 +206,31 @@ pub(crate) fn bind_to_random( } } - match peer.address() { + let mut bind_to = peer.get_peer_options().and_then(|o| o.bind_to.clone()); + if bind_to.as_ref().map(|b| b.addr).is_some() { + // already have a bind address selected + return bind_to; + } + + let addr = match peer.address() { SocketAddr::Inet(sockaddr) => match sockaddr { InetSocketAddr::V4(_) => bind_to_ips(v4_list), InetSocketAddr::V6(_) => bind_to_ips(v6_list), }, SocketAddr::Unix(_) => None, + }; + + if addr.is_some() { + if let Some(bind_to) = bind_to.as_mut() { + bind_to.addr = addr; + } else { + bind_to = Some(BindTo { + addr, + ..Default::default() + }); + } } + bind_to } use crate::protocols::raw_connect; @@ -238,16 +303,25 @@ mod tests { #[tokio::test] async fn test_conn_error_addr_not_avail() { let peer = HttpPeer::new("127.0.0.1:121".to_string(), false, "".to_string()); - let new_session = connect(&peer, Some("192.0.2.2:0".parse().unwrap())).await; + let addr = "192.0.2.2:0".parse().ok(); + let bind_to = BindTo { + addr, + ..Default::default() + }; + let new_session = connect(&peer, Some(bind_to)).await; assert_eq!(new_session.unwrap_err().etype(), &InternalError) } #[tokio::test] async fn test_conn_error_other() { let peer = HttpPeer::new("240.0.0.1:80".to_string(), false, "".to_string()); // non localhost - + let addr = "127.0.0.1:0".parse().ok(); // create an error: cannot send from src addr: localhost to dst addr: a public IP - let new_session = connect(&peer, Some("127.0.0.1:0".parse().unwrap())).await; + let bind_to = BindTo { + addr, + ..Default::default() + }; + let new_session = connect(&peer, Some(bind_to)).await; let error = new_session.unwrap_err(); // XXX: some system will allow the socket to bind and connect without error, only to timeout assert!(error.etype() == &ConnectError || error.etype() == &ConnectTimedout) @@ -371,4 +445,114 @@ mod tests { assert_eq!(err.etype(), &ConnectionClosed); assert!(!err.retry()); } + + #[cfg(target_os = "linux")] + #[tokio::test(flavor = "multi_thread")] + async fn test_bind_to_port_range_on_connect() { + fn get_ip_local_port_range() -> (u16, u16) { + let path = "/proc/sys/net/ipv4/ip_local_port_range"; + let file = std::fs::read_to_string(path).unwrap(); + let mut parts = file.split_whitespace(); + ( + parts.next().unwrap().parse().unwrap(), + parts.next().unwrap().parse().unwrap(), + ) + } + + // one-off mock server + async fn mock_inet_connect_server() { + use tokio::net::TcpListener; + let listener = TcpListener::bind("127.0.0.1:10020").await.unwrap(); + if let Ok((mut stream, _addr)) = listener.accept().await { + stream.write_all(b"HTTP/1.1 200 OK\r\n\r\n").await.unwrap(); + // wait a bit so that the client can read + tokio::time::sleep(std::time::Duration::from_millis(100)).await; + } + } + + fn in_port_range(session: Stream, lower: u16, upper: u16) -> bool { + let digest = session.get_socket_digest(); + let local_addr = digest + .as_ref() + .and_then(|s| s.local_addr()) + .unwrap() + .as_inet() + .unwrap(); + + // assert range + local_addr.port() >= lower && local_addr.port() <= upper + } + + tokio::spawn(async { + mock_inet_connect_server().await; + }); + // wait for the server to start + tokio::time::sleep(std::time::Duration::from_millis(100)).await; + + // need to read /proc/sys/net/ipv4/ip_local_port_range for this test to work + // IP_LOCAL_PORT_RANGE clamp only works on ports in /proc/sys/net/ipv4/ip_local_port_range + let (low, _) = get_ip_local_port_range(); + let high = low + 1; + + let peer = HttpPeer::new("127.0.0.1:10020".to_string(), false, "".to_string()); + let mut bind_to = BindTo { + addr: "127.0.0.1:0".parse().ok(), + ..Default::default() + }; + bind_to.set_port_range(Some((low, high))).unwrap(); + + let session1 = connect(&peer, Some(bind_to.clone())).await.unwrap(); + assert!(in_port_range(session1, low, high)); + + // execute more connect() + let session2 = connect(&peer, Some(bind_to.clone())).await.unwrap(); + assert!(in_port_range(session2, low, high)); + let session3 = connect(&peer, Some(bind_to.clone())).await.unwrap(); + assert!(in_port_range(session3, low, high)); + + // disabled fallback, should be AddrNotAvailable error + let err = connect(&peer, Some(bind_to.clone())).await.unwrap_err(); + assert_eq!(err.etype(), &InternalError); + + // enable fallback, assert not in port range but successful + bind_to.set_fallback(true); + let session4 = connect(&peer, Some(bind_to.clone())).await.unwrap(); + assert!(!in_port_range(session4, low, high)); + + // works without bind IP, shift up to use new ports + let low = low + 2; + let high = low + 1; + let mut bind_to = BindTo::default(); + bind_to.set_port_range(Some((low, high))).unwrap(); + let session5 = connect(&peer, Some(bind_to.clone())).await.unwrap(); + assert!(in_port_range(session5, low, high)); + } + + #[test] + fn test_bind_to_port_ranges() { + let addr = "127.0.0.1:0".parse().ok(); + let mut bind_to = BindTo { + addr, + ..Default::default() + }; + + // None because the previous value was None + bind_to.set_port_range(None).unwrap(); + assert!(bind_to.port_range.is_none()); + + // zeroes are handled + bind_to.set_port_range(Some((0, 0))).unwrap(); + assert_eq!(bind_to.port_range, Some((0, 0))); + + // zeroes because the previous value was Some + bind_to.set_port_range(None).unwrap(); + assert_eq!(bind_to.port_range, Some((0, 0))); + + // low > high is error + assert!(bind_to.set_port_range(Some((2000, 1000))).is_err()); + + // low < high success + bind_to.set_port_range(Some((1000, 2000))).unwrap(); + assert_eq!(bind_to.port_range, Some((1000, 2000))); + } } diff --git a/pingora-core/src/connectors/mod.rs b/pingora-core/src/connectors/mod.rs index 2d4584c24..cbe299f5d 100644 --- a/pingora-core/src/connectors/mod.rs +++ b/pingora-core/src/connectors/mod.rs @@ -24,8 +24,8 @@ use crate::server::configuration::ServerConf; use crate::tls::ssl::SslConnector; use crate::upstreams::peer::{Peer, ALPN}; -use l4::connect as l4_connect; pub use l4::Connect as L4Connect; +use l4::{connect as l4_connect, BindTo}; use log::{debug, error, warn}; use offload::OffloadRuntime; use parking_lot::RwLock; @@ -273,7 +273,7 @@ impl TransportConnector { // connection timeout if there is one async fn do_connect( peer: &P, - bind_to: Option, + bind_to: Option, alpn_override: Option, tls_ctx: &SslConnector, ) -> Result { @@ -296,7 +296,7 @@ async fn do_connect( // Perform the actual L4 and tls connection steps with no timeout async fn do_connect_inner( peer: &P, - bind_to: Option, + bind_to: Option, alpn_override: Option, tls_ctx: &SslConnector, ) -> Result { diff --git a/pingora-core/src/protocols/http/v1/server.rs b/pingora-core/src/protocols/http/v1/server.rs index ea99d7430..8dca9d495 100644 --- a/pingora-core/src/protocols/http/v1/server.rs +++ b/pingora-core/src/protocols/http/v1/server.rs @@ -319,7 +319,7 @@ impl HttpSession { .map_or(b"", |h| h.as_bytes()) } - /// Return a string `$METHOD $PATH $HOST`. Mostly for logging and debug purpose + /// Return a string `$METHOD $PATH, Host: $HOST`. Mostly for logging and debug purpose pub fn request_summary(&self) -> String { format!( "{} {}, Host: {}", diff --git a/pingora-core/src/protocols/http/v2/server.rs b/pingora-core/src/protocols/http/v2/server.rs index 447a4e309..6d6b393bf 100644 --- a/pingora-core/src/protocols/http/v2/server.rs +++ b/pingora-core/src/protocols/http/v2/server.rs @@ -344,7 +344,7 @@ impl HttpSession { Ok(end_stream) } - /// Return a string `$METHOD $PATH $HOST`. Mostly for logging and debug purpose + /// Return a string `$METHOD $PATH, Host: $HOST`. Mostly for logging and debug purpose pub fn request_summary(&self) -> String { format!( "{} {}, Host: {}:{}", diff --git a/pingora-core/src/protocols/l4/ext.rs b/pingora-core/src/protocols/l4/ext.rs index 5123bdffa..4b65f84f3 100644 --- a/pingora-core/src/protocols/l4/ext.rs +++ b/pingora-core/src/protocols/l4/ext.rs @@ -27,6 +27,8 @@ use std::os::unix::io::{AsRawFd, RawFd}; use std::time::Duration; use tokio::net::{TcpSocket, TcpStream, UnixStream}; +use crate::connectors::l4::BindTo; + /// The (copy of) the kernel struct tcp_info returns #[repr(C)] #[derive(Copy, Clone, Debug)] @@ -160,9 +162,12 @@ fn cvt_linux_error(t: i32) -> io::Result { #[cfg(target_os = "linux")] fn ip_bind_addr_no_port(fd: RawFd, val: bool) -> io::Result<()> { - const IP_BIND_ADDRESS_NO_PORT: i32 = 24; - - set_opt(fd, libc::IPPROTO_IP, IP_BIND_ADDRESS_NO_PORT, val as c_int) + set_opt( + fd, + libc::IPPROTO_IP, + libc::IP_BIND_ADDRESS_NO_PORT, + val as c_int, + ) } #[cfg(not(target_os = "linux"))] @@ -170,6 +175,26 @@ fn ip_bind_addr_no_port(_fd: RawFd, _val: bool) -> io::Result<()> { Ok(()) } +/// IP_LOCAL_PORT_RANGE is only supported on Linux 6.3 and higher, +/// ip_local_port_range() is a no-op on unsupported versions. +/// See the [man page](https://man7.org/linux/man-pages/man7/ip.7.html) for more details. +#[cfg(target_os = "linux")] +fn ip_local_port_range(fd: RawFd, low: u16, high: u16) -> io::Result<()> { + const IP_LOCAL_PORT_RANGE: i32 = 51; + let range: u32 = (low as u32) | ((high as u32) << 16); + + let result = set_opt(fd, libc::IPPROTO_IP, IP_LOCAL_PORT_RANGE, range as c_int); + match result { + Err(e) if e.raw_os_error() != Some(libc::ENOPROTOOPT) => Err(e), + _ => Ok(()), // no error or ENOPROTOOPT + } +} + +#[cfg(not(target_os = "linux"))] +fn ip_local_port_range(_fd: RawFd, _low: u16, _high: u16) -> io::Result<()> { + Ok(()) +} + #[cfg(target_os = "linux")] fn set_so_keepalive(fd: RawFd, val: bool) -> io::Result<()> { set_opt(fd, libc::SOL_SOCKET, libc::SO_KEEPALIVE, val as c_int) @@ -310,14 +335,42 @@ pub fn get_socket_cookie(_fd: RawFd) -> io::Result { Ok(0) // SO_COOKIE is a Linux concept } -/// connect() to the given address while optionally binding to the specific source address. +/// connect() to the given address while optionally binding to the specific source address and port range. /// /// The `set_socket` callback can be used to tune the socket before `connect()` is called. /// +/// If a [`BindTo`] is set with a port range and fallback setting enabled this function will retry +/// on EADDRNOTAVAIL ignoring the port range. +/// /// `IP_BIND_ADDRESS_NO_PORT` is used. -pub(crate) async fn connect_with Result<()>>( +/// `IP_LOCAL_PORT_RANGE` is used if a port range is set on [`BindTo`]. +pub(crate) async fn connect_with Result<()> + Clone>( addr: &SocketAddr, - bind_to: Option<&SocketAddr>, + bind_to: Option<&BindTo>, + set_socket: F, +) -> Result { + if bind_to.as_ref().map_or(false, |b| b.will_fallback()) { + // if we see an EADDRNOTAVAIL error clear the port range and try again + let connect_result = inner_connect_with(addr, bind_to, set_socket.clone()).await; + if let Err(e) = connect_result.as_ref() { + if matches!(e.etype(), BindError) { + let mut new_bind_to = BindTo::default(); + new_bind_to.addr = bind_to.as_ref().and_then(|b| b.addr); + // reset the port range + new_bind_to.set_port_range(None).unwrap(); + return inner_connect_with(addr, Some(&new_bind_to), set_socket).await; + } + } + connect_result + } else { + // not retryable + inner_connect_with(addr, bind_to, set_socket).await + } +} + +async fn inner_connect_with Result<()>>( + addr: &SocketAddr, + bind_to: Option<&BindTo>, set_socket: F, ) -> Result { let socket = if addr.is_ipv4() { @@ -328,14 +381,23 @@ pub(crate) async fn connect_with Result<()>>( .or_err(SocketError, "failed to create socket")?; if cfg!(target_os = "linux") { - ip_bind_addr_no_port(socket.as_raw_fd(), true) - .or_err(SocketError, "failed to set socket opts")?; - - if let Some(baddr) = bind_to { - socket - .bind(*baddr) - .or_err_with(BindError, || format!("failed to bind to socket {}", *baddr))?; - }; + ip_bind_addr_no_port(socket.as_raw_fd(), true).or_err( + SocketError, + "failed to set socket opts IP_BIND_ADDRESS_NO_PORT", + )?; + + if let Some(bind_to) = bind_to { + if let Some((low, high)) = bind_to.port_range() { + ip_local_port_range(socket.as_raw_fd(), low, high) + .or_err(SocketError, "failed to set socket opts IP_LOCAL_PORT_RANGE")?; + } + + if let Some(baddr) = bind_to.addr { + socket + .bind(baddr) + .or_err_with(BindError, || format!("failed to bind to socket {}", baddr))?; + } + } } // TODO: add support for bind on other platforms @@ -349,8 +411,9 @@ pub(crate) async fn connect_with Result<()>>( /// connect() to the given address while optionally binding to the specific source address. /// -/// `IP_BIND_ADDRESS_NO_PORT` is used. -pub async fn connect(addr: &SocketAddr, bind_to: Option<&SocketAddr>) -> Result { +/// `IP_BIND_ADDRESS_NO_PORT` is used +/// `IP_LOCAL_PORT_RANGE` is used if a port range is set on [`BindTo`]. +pub async fn connect(addr: &SocketAddr, bind_to: Option<&BindTo>) -> Result { connect_with(addr, bind_to, |_| Ok(())).await } @@ -365,7 +428,8 @@ fn wrap_os_connect_error(e: std::io::Error, context: String) -> Box { match e.kind() { ErrorKind::ConnectionRefused => Error::because(ConnectRefused, context, e), ErrorKind::TimedOut => Error::because(ConnectTimedout, context, e), - ErrorKind::PermissionDenied | ErrorKind::AddrInUse | ErrorKind::AddrNotAvailable => { + ErrorKind::AddrNotAvailable => Error::because(BindError, context, e), + ErrorKind::PermissionDenied | ErrorKind::AddrInUse => { Error::because(InternalError, context, e) } _ => match e.raw_os_error() { diff --git a/pingora-core/src/server/configuration/mod.rs b/pingora-core/src/server/configuration/mod.rs index 5174d4072..21428aa69 100644 --- a/pingora-core/src/server/configuration/mod.rs +++ b/pingora-core/src/server/configuration/mod.rs @@ -118,7 +118,7 @@ impl Default for ServerConf { /// Command-line options /// -/// Call `Opt::from_args()` to build this object from the process's command line arguments. +/// Call `Opt::parse_args()` to build this object from the process's command line arguments. #[derive(Parser, Debug, Default)] #[clap(name = "basic", long_about = None)] pub struct Opt { diff --git a/pingora-core/src/server/mod.rs b/pingora-core/src/server/mod.rs index c3659f2ae..d9e57ddc4 100644 --- a/pingora-core/src/server/mod.rs +++ b/pingora-core/src/server/mod.rs @@ -201,7 +201,7 @@ impl Server { /// independent services. /// /// Command line options can either be passed by parsing the command line arguments via - /// `Opt::from_args()`, or be generated by other means. + /// `Opt::parse_args()`, or be generated by other means. pub fn new(opt: impl Into>) -> Result { let opt = opt.into(); let (tx, rx) = watch::channel(false); diff --git a/pingora-core/src/upstreams/peer.rs b/pingora-core/src/upstreams/peer.rs index fad847125..78eb762e2 100644 --- a/pingora-core/src/upstreams/peer.rs +++ b/pingora-core/src/upstreams/peer.rs @@ -29,7 +29,7 @@ use std::path::{Path, PathBuf}; use std::sync::Arc; use std::time::Duration; -use crate::connectors::L4Connect; +use crate::connectors::{l4::BindTo, L4Connect}; use crate::protocols::l4::socket::SocketAddr; use crate::protocols::ConnFdReusable; use crate::protocols::TcpKeepalive; @@ -67,7 +67,7 @@ pub trait Peer: Display + Clone { fn tls(&self) -> bool; /// The SNI to send, if TLS is used fn sni(&self) -> &str; - /// To decide whether a [`Peer`] can use the connection established by another [`Peer`]. + /// To decide whether a [`Peer`] can use the connection established by another [`Peer`]. /// /// The connections to two peers are considered reusable to each other if their reuse hashes are /// the same @@ -110,8 +110,8 @@ pub trait Peer: Display + Clone { None => None, } } - /// Which local source address this connection should be bind to. - fn bind_to(&self) -> Option<&InetSocketAddr> { + /// Information about the local source address this connection should be bound to. + fn bind_to(&self) -> Option<&BindTo> { match self.get_peer_options() { Some(opt) => opt.bind_to.as_ref(), None => None, @@ -243,7 +243,7 @@ impl Peer for BasicPeer { !self.sni.is_empty() } - fn bind_to(&self) -> Option<&InetSocketAddr> { + fn bind_to(&self) -> Option<&BindTo> { None } @@ -294,7 +294,7 @@ impl Scheme { /// See [`Peer`] for the meaning of the fields #[derive(Clone, Debug)] pub struct PeerOptions { - pub bind_to: Option, + pub bind_to: Option, pub connection_timeout: Option, pub total_connection_timeout: Option, pub read_timeout: Option, @@ -365,7 +365,7 @@ impl PeerOptions { impl Display for PeerOptions { fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult { - if let Some(b) = self.bind_to { + if let Some(b) = self.bind_to.as_ref() { write!(f, "bind_to: {:?},", b)?; } if let Some(t) = self.connection_timeout { diff --git a/pingora-proxy/src/lib.rs b/pingora-proxy/src/lib.rs index f872d82dc..890af982d 100644 --- a/pingora-proxy/src/lib.rs +++ b/pingora-proxy/src/lib.rs @@ -607,6 +607,8 @@ impl HttpProxy { }; if let Some(e) = final_error.as_ref() { + // If we have errored and are still holding a cache lock, release it. + session.cache.disable(NoCacheReason::InternalError); let status = self.inner.fail_to_proxy(&mut session, e, &mut ctx).await; // final error will have > 0 status unless downstream connection is dead diff --git a/pingora-proxy/tests/test_upstream.rs b/pingora-proxy/tests/test_upstream.rs index c2499f594..02174c105 100644 --- a/pingora-proxy/tests/test_upstream.rs +++ b/pingora-proxy/tests/test_upstream.rs @@ -1077,9 +1077,6 @@ mod test_cache { init(); let url = "http://127.0.0.1:6148/sleep/test_cache_lock_network_error.txt"; - // FIXME: Dangling lock happens in this test because the first request aborted without - // properly release the lock. This is a bug - let task1 = tokio::spawn(async move { let res = reqwest::Client::new() .get(url)