Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sync internal commits 2024-09-06 #75

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .bleep
Original file line number Diff line number Diff line change
@@ -1 +1 @@
28f94f2a402bbf66341bdac8fa670caf5b7311e9
2351cdf592f9986201d754e6ee1f37f493f69abb
1 change: 1 addition & 0 deletions pingora-cache/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ lru = { workspace = true }
ahash = { workspace = true }
hex = "0.4"
httparse = { workspace = true }
strum = { version = "0.26", features = ["derive"] }

[dev-dependencies]
tokio-test = "0.4"
Expand Down
13 changes: 11 additions & 2 deletions pingora-cache/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use key::{CacheHashKey, HashBinary};
use lock::WritePermit;
use pingora_error::Result;
use pingora_http::ResponseHeader;
use rustracing::tag::Tag;
use std::time::{Duration, Instant, SystemTime};
use trace::CacheTraceCTX;

Expand Down Expand Up @@ -499,6 +500,11 @@ impl HttpCache {
// from Stale: waited for cache lock, then retried and found asset was gone
CachePhase::CacheKey | CachePhase::Bypass | CachePhase::Stale => {
self.phase = CachePhase::Miss;
// It's possible that we've set the meta on lookup and have come back around
// here after not being able to acquire the cache lock, and our item has since
// purged or expired. We should be sure that the meta is not set in this case
// as there shouldn't be a meta set for cache misses.
self.inner_mut().meta = None;
self.inner_mut().traces.start_miss_span();
}
_ => panic!("wrong phase {:?}", self.phase),
Expand Down Expand Up @@ -1042,7 +1048,7 @@ impl HttpCache {
/// Check [Self::is_cache_locked()], panic if this request doesn't have a read lock.
pub async fn cache_lock_wait(&mut self) -> LockStatus {
let inner = self.inner_mut();
let _span = inner.traces.child("cache_lock");
let mut span = inner.traces.child("cache_lock");
let lock = inner.lock.take(); // remove the lock from self
if let Some(Locked::Read(r)) = lock {
let now = Instant::now();
Expand All @@ -1054,7 +1060,10 @@ impl HttpCache {
.lock_duration
.map_or(lock_duration, |d| d + lock_duration),
);
r.lock_status() // TODO: tag the span with lock status
let status = r.lock_status();
let tag_value: &'static str = status.into();
span.set_tag(|| Tag::new("status", tag_value));
status
} else {
// should always call is_cache_locked() before this function
panic!("cache_lock_wait on wrong type of lock")
Expand Down
27 changes: 20 additions & 7 deletions pingora-cache/src/lock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,12 +100,14 @@ impl CacheLock {
}
}

use log::warn;
use std::sync::atomic::{AtomicU8, Ordering};
use std::time::{Duration, Instant};
use strum::IntoStaticStr;
use tokio::sync::Semaphore;

/// Status which the read locks could possibly see.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
#[derive(Debug, Copy, Clone, PartialEq, Eq, IntoStaticStr)]
pub enum LockStatus {
/// Waiting for the writer to populate the asset
Waiting,
Expand Down Expand Up @@ -180,7 +182,7 @@ impl LockCore {
}

/// Returns the current status of the lock.
///
/// Uses a SeqCst load to pair with the SeqCst store performed when a reader
/// times out waiting, so status transitions are observed in one global order.
/// (fix: drop the stale pre-change `Ordering::Relaxed` line that was left in
/// the text next to its replacement — the duplicate made the body invalid.)
fn lock_status(&self) -> LockStatus {
    self.lock_status.load(Ordering::SeqCst).into()
}
}

Expand All @@ -197,11 +199,22 @@ impl ReadLock {
return;
}

// TODO: should subtract now - start so that the lock don't wait beyond start + timeout
// Also need to be careful not to wake everyone up at the same time
// TODO: need to be careful not to wake everyone up at the same time
// (maybe not an issue because regular cache lock release behaves that way)
let _ = timeout(self.0.timeout, self.0.lock.acquire()).await;
// permit is returned to Semaphore right away
if let Some(duration) = self.0.timeout.checked_sub(self.0.lock_start.elapsed()) {
match timeout(duration, self.0.lock.acquire()).await {
Ok(Ok(_)) => { // permit is returned to Semaphore right away
}
Ok(Err(e)) => {
warn!("error acquiring semaphore {e:?}")
}
Err(_) => {
self.0
.lock_status
.store(LockStatus::Timeout.into(), Ordering::SeqCst);
}
}
}
}

/// Test if it is still locked
Expand All @@ -211,7 +224,7 @@ impl ReadLock {

/// Whether the lock is expired, e.g., the writer has been holding the lock for too long
pub fn expired(&self) -> bool {
    // NOTE: this is whether the lock is *currently* expired (elapsed time since
    // the writer took the lock has reached the configured timeout),
    // not whether it was timed out during wait()
    self.0.lock_start.elapsed() >= self.0.timeout
}
Expand Down
206 changes: 195 additions & 11 deletions pingora-core/src/connectors/l4.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,59 @@ pub trait Connect: std::fmt::Debug {
async fn connect(&self, addr: &SocketAddr) -> Result<Stream>;
}

/// Settings for binding on connect
#[derive(Clone, Debug, Default)]
pub struct BindTo {
// local ip address
pub addr: Option<InetSocketAddr>,
// port range
port_range: Option<(u16, u16)>,
// whether we fallback and try again on bind errors when a port range is set
fallback: bool,
}

impl BindTo {
/// Sets the port range we will bind to where the first item in the tuple is the lower bound
/// and the second item is the upper bound.
///
/// Note this bind option is only supported on Linux since 6.3, this is a no-op on other systems.
/// To reset the range, pass a `None` or `Some((0,0))`, more information can be found [here](https://man7.org/linux/man-pages/man7/ip.7.html)
pub fn set_port_range(&mut self, range: Option<(u16, u16)>) -> Result<()> {
if range.is_none() && self.port_range.is_none() {
// nothing to do
return Ok(());
}

match range {
// 0,0 is valid for resets
None | Some((0, 0)) => self.port_range = Some((0, 0)),
// set the port range if valid
Some((low, high)) if low > 0 && low < high => {
self.port_range = Some((low, high));
}
_ => return Error::e_explain(SocketError, "invalid port range: {range}"),
}
Ok(())
}

/// Set whether we fallback on no address available if a port range is set
pub fn set_fallback(&mut self, fallback: bool) {
self.fallback = fallback
}

/// Configured bind port range
pub fn port_range(&self) -> Option<(u16, u16)> {
self.port_range
}

/// Whether we attempt to fallback on no address available
pub fn will_fallback(&self) -> bool {
self.fallback && self.port_range.is_some()
}
}

/// Establish a connection (l4) to the given peer using its settings and an optional bind address.
pub(crate) async fn connect<P>(peer: &P, bind_to: Option<InetSocketAddr>) -> Result<Stream>
pub(crate) async fn connect<P>(peer: &P, bind_to: Option<BindTo>) -> Result<Stream>
where
P: Peer + Send + Sync,
{
Expand Down Expand Up @@ -142,12 +193,8 @@ pub(crate) fn bind_to_random<P: Peer>(
peer: &P,
v4_list: &[InetSocketAddr],
v6_list: &[InetSocketAddr],
) -> Option<InetSocketAddr> {
let selected = peer.get_peer_options().and_then(|o| o.bind_to);
if selected.is_some() {
return selected;
}

) -> Option<BindTo> {
// helper function for randomly picking address
fn bind_to_ips(ips: &[InetSocketAddr]) -> Option<InetSocketAddr> {
match ips.len() {
0 => None,
Expand All @@ -159,13 +206,31 @@ pub(crate) fn bind_to_random<P: Peer>(
}
}

match peer.address() {
let mut bind_to = peer.get_peer_options().and_then(|o| o.bind_to.clone());
if bind_to.as_ref().map(|b| b.addr).is_some() {
// already have a bind address selected
return bind_to;
}

let addr = match peer.address() {
SocketAddr::Inet(sockaddr) => match sockaddr {
InetSocketAddr::V4(_) => bind_to_ips(v4_list),
InetSocketAddr::V6(_) => bind_to_ips(v6_list),
},
SocketAddr::Unix(_) => None,
};

if addr.is_some() {
if let Some(bind_to) = bind_to.as_mut() {
bind_to.addr = addr;
} else {
bind_to = Some(BindTo {
addr,
..Default::default()
});
}
}
bind_to
}

use crate::protocols::raw_connect;
Expand Down Expand Up @@ -238,16 +303,25 @@ mod tests {
#[tokio::test]
async fn test_conn_error_addr_not_avail() {
    // 192.0.2.0/24 (TEST-NET-1) is reserved for documentation, so binding it as
    // the source address must fail; the bind failure surfaces as InternalError.
    // (fix: drop the stale pre-change call passing a bare socket address — it
    // was left in the text beside its `BindTo`-based replacement and does not
    // match `connect`'s `Option<BindTo>` parameter.)
    let peer = HttpPeer::new("127.0.0.1:121".to_string(), false, "".to_string());
    let addr = "192.0.2.2:0".parse().ok();
    let bind_to = BindTo {
        addr,
        ..Default::default()
    };
    let new_session = connect(&peer, Some(bind_to)).await;
    assert_eq!(new_session.unwrap_err().etype(), &InternalError)
}

#[tokio::test]
async fn test_conn_error_other() {
let peer = HttpPeer::new("240.0.0.1:80".to_string(), false, "".to_string()); // non localhost

let addr = "127.0.0.1:0".parse().ok();
// create an error: cannot send from src addr: localhost to dst addr: a public IP
let new_session = connect(&peer, Some("127.0.0.1:0".parse().unwrap())).await;
let bind_to = BindTo {
addr,
..Default::default()
};
let new_session = connect(&peer, Some(bind_to)).await;
let error = new_session.unwrap_err();
// XXX: some system will allow the socket to bind and connect without error, only to timeout
assert!(error.etype() == &ConnectError || error.etype() == &ConnectTimedout)
Expand Down Expand Up @@ -371,4 +445,114 @@ mod tests {
assert_eq!(err.etype(), &ConnectionClosed);
assert!(!err.retry());
}

#[cfg(target_os = "linux")]
#[tokio::test(flavor = "multi_thread")]
async fn test_bind_to_port_range_on_connect() {
    // Read the kernel's ephemeral port range; the IP_LOCAL_PORT_RANGE socket
    // option can only narrow within this range, so the test derives its bounds
    // from it rather than hard-coding ports.
    fn get_ip_local_port_range() -> (u16, u16) {
        let path = "/proc/sys/net/ipv4/ip_local_port_range";
        let file = std::fs::read_to_string(path).unwrap();
        let mut parts = file.split_whitespace();
        (
            parts.next().unwrap().parse().unwrap(),
            parts.next().unwrap().parse().unwrap(),
        )
    }

    // one-off mock server: accepts a single connection, replies 200, then exits
    async fn mock_inet_connect_server() {
        use tokio::net::TcpListener;
        let listener = TcpListener::bind("127.0.0.1:10020").await.unwrap();
        if let Ok((mut stream, _addr)) = listener.accept().await {
            stream.write_all(b"HTTP/1.1 200 OK\r\n\r\n").await.unwrap();
            // wait a bit so that the client can read
            tokio::time::sleep(std::time::Duration::from_millis(100)).await;
        }
    }

    // Whether the session's local (source) port falls inside [lower, upper].
    // Consumes the session, so the connection is dropped after the check.
    fn in_port_range(session: Stream, lower: u16, upper: u16) -> bool {
        let digest = session.get_socket_digest();
        let local_addr = digest
            .as_ref()
            .and_then(|s| s.local_addr())
            .unwrap()
            .as_inet()
            .unwrap();

        // assert range
        local_addr.port() >= lower && local_addr.port() <= upper
    }

    tokio::spawn(async {
        mock_inet_connect_server().await;
    });
    // wait for the server to start
    tokio::time::sleep(std::time::Duration::from_millis(100)).await;

    // need to read /proc/sys/net/ipv4/ip_local_port_range for this test to work
    // IP_LOCAL_PORT_RANGE clamp only works on ports in /proc/sys/net/ipv4/ip_local_port_range
    let (low, _) = get_ip_local_port_range();
    let high = low + 1; // two-port range: [low, low + 1]

    let peer = HttpPeer::new("127.0.0.1:10020".to_string(), false, "".to_string());
    let mut bind_to = BindTo {
        addr: "127.0.0.1:0".parse().ok(),
        ..Default::default()
    };
    bind_to.set_port_range(Some((low, high))).unwrap();

    let session1 = connect(&peer, Some(bind_to.clone())).await.unwrap();
    assert!(in_port_range(session1, low, high));

    // execute more connect()
    let session2 = connect(&peer, Some(bind_to.clone())).await.unwrap();
    assert!(in_port_range(session2, low, high));
    let session3 = connect(&peer, Some(bind_to.clone())).await.unwrap();
    assert!(in_port_range(session3, low, high));

    // disabled fallback, should be AddrNotAvailable error
    // NOTE(review): presumably the in-range local ports are all unavailable by
    // this point (closed sockets linger in TIME_WAIT) — confirm this is what
    // makes the bind fail here.
    let err = connect(&peer, Some(bind_to.clone())).await.unwrap_err();
    assert_eq!(err.etype(), &InternalError);

    // enable fallback, assert not in port range but successful
    bind_to.set_fallback(true);
    let session4 = connect(&peer, Some(bind_to.clone())).await.unwrap();
    assert!(!in_port_range(session4, low, high));

    // works without bind IP, shift up to use new ports
    let low = low + 2;
    let high = low + 1;
    let mut bind_to = BindTo::default();
    bind_to.set_port_range(Some((low, high))).unwrap();
    let session5 = connect(&peer, Some(bind_to.clone())).await.unwrap();
    assert!(in_port_range(session5, low, high));
}

#[test]
fn test_bind_to_port_ranges() {
    // Start from a BindTo with a bind address but no port range configured.
    let mut bt = BindTo {
        addr: "127.0.0.1:0".parse().ok(),
        ..Default::default()
    };

    // Resetting when nothing was ever set is a no-op: the range stays None.
    bt.set_port_range(None).unwrap();
    assert_eq!(bt.port_range(), None);

    // An explicit (0, 0) is the documented reset sentinel and is stored as-is.
    bt.set_port_range(Some((0, 0))).unwrap();
    assert_eq!(bt.port_range(), Some((0, 0)));

    // Once a range exists, passing None normalizes to the (0, 0) reset value.
    bt.set_port_range(None).unwrap();
    assert_eq!(bt.port_range(), Some((0, 0)));

    // A lower bound above the upper bound is rejected.
    assert!(bt.set_port_range(Some((2000, 1000))).is_err());

    // A well-ordered non-zero range is accepted and stored.
    bt.set_port_range(Some((1000, 2000))).unwrap();
    assert_eq!(bt.port_range(), Some((1000, 2000)));
}
}
6 changes: 3 additions & 3 deletions pingora-core/src/connectors/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ use crate::server::configuration::ServerConf;
use crate::tls::ssl::SslConnector;
use crate::upstreams::peer::{Peer, ALPN};

use l4::connect as l4_connect;
pub use l4::Connect as L4Connect;
use l4::{connect as l4_connect, BindTo};
use log::{debug, error, warn};
use offload::OffloadRuntime;
use parking_lot::RwLock;
Expand Down Expand Up @@ -273,7 +273,7 @@ impl TransportConnector {
// connection timeout if there is one
async fn do_connect<P: Peer + Send + Sync>(
peer: &P,
bind_to: Option<SocketAddr>,
bind_to: Option<BindTo>,
alpn_override: Option<ALPN>,
tls_ctx: &SslConnector,
) -> Result<Stream> {
Expand All @@ -296,7 +296,7 @@ async fn do_connect<P: Peer + Send + Sync>(
// Perform the actual L4 and tls connection steps with no timeout
async fn do_connect_inner<P: Peer + Send + Sync>(
peer: &P,
bind_to: Option<SocketAddr>,
bind_to: Option<BindTo>,
alpn_override: Option<ALPN>,
tls_ctx: &SslConnector,
) -> Result<Stream> {
Expand Down
Loading
Loading