Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions mgmtd/assets/beegfs-mgmtd.toml
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,9 @@
#
# interfaces = ["*"]

# Prefers an interfaces ipv6 addresses over ipv4.
# By default, ipv4 addresses are preferred. If the interface filter is given, the interface
# order has higher priority than the address family, which is sorted per interface.
# interfaces_prefer_ipv6 = false

# Force disable IPv6.
# ipv6_disable = false,

# Maximum number of outgoing connections per node.
# connection-limit = 12
Expand Down
5 changes: 5 additions & 0 deletions mgmtd/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,11 @@ generate_structs! {
#[arg(value_parser = nic::NicFilter::parse)]
interfaces: Vec<NicFilter> = vec![],

/// Force disable IPv6.
#[arg(long)]
#[arg(num_args = 0..=1, default_missing_value = "true")]
ipv6_disable: bool = false,

/// Maximum number of outgoing BeeMsg connections per node. [default: 12]
#[arg(long)]
#[arg(value_name = "LIMIT")]
Expand Down
11 changes: 9 additions & 2 deletions mgmtd/src/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use sqlite::{TransactionExt, check_affected_rows};
use sqlite_check::sql;
use std::fmt::Debug;
use std::future::Future;
use std::net::SocketAddr;
use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr};
use std::pin::Pin;
use tonic::transport::{Identity, Server, ServerTlsConfig};
use tonic::{Code, Request, Response, Status};
Expand Down Expand Up @@ -210,7 +210,14 @@ pub(crate) fn serve(ctx: Context, mut shutdown: RunStateHandle) -> Result<()> {
},
);

let serve_addr = shared::nic::select_bind_addr(ctx.info.user_config.grpc_port);
let serve_addr = SocketAddr::new(
if ctx.info.use_ipv6 {
Ipv6Addr::UNSPECIFIED.into()
} else {
Ipv4Addr::UNSPECIFIED.into()
},
ctx.info.user_config.grpc_port,
);

log::info!("Serving gRPC requests on {serve_addr}");

Expand Down
12 changes: 11 additions & 1 deletion mgmtd/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use sqlite::TransactionExt;
use sqlite_check::sql;
use std::collections::HashSet;
use std::future::Future;
use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr};
use std::sync::Arc;
use tokio::net::UdpSocket;
use tokio::sync::mpsc;
Expand All @@ -39,6 +40,7 @@ pub struct StaticInfo {
pub user_config: Config,
pub auth_secret: Option<AuthSecret>,
pub network_addrs: Vec<Nic>,
pub use_ipv6: bool,
}

/// Starts the management service.
Expand All @@ -59,7 +61,14 @@ pub async fn start(info: StaticInfo, license: LicenseVerifier) -> Result<RunCont
// Static configuration which doesn't change at runtime
let info = Box::leak(Box::new(info));

let beemsg_serve_addr = shared::nic::select_bind_addr(info.user_config.beemsg_port);
let beemsg_serve_addr = SocketAddr::new(
if info.use_ipv6 {
Ipv6Addr::UNSPECIFIED.into()
} else {
Ipv4Addr::UNSPECIFIED.into()
},
info.user_config.beemsg_port,
);

// UDP socket for in- and outgoing messages
let udp_socket = Arc::new(UdpSocket::bind(beemsg_serve_addr).await?);
Expand All @@ -69,6 +78,7 @@ pub async fn start(info: StaticInfo, license: LicenseVerifier) -> Result<RunCont
udp_socket.clone(),
info.user_config.connection_limit,
info.auth_secret,
info.use_ipv6,
);

let mut db = sqlite::Connections::new(info.user_config.db_file.as_path());
Expand Down
5 changes: 4 additions & 1 deletion mgmtd/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use mgmtd::db::{self};
use mgmtd::license::LicenseVerifier;
use mgmtd::{StaticInfo, start};
use shared::journald_logger;
use shared::nic::check_ipv6;
use shared::types::AuthSecret;
use std::backtrace::{Backtrace, BacktraceStatus};
use std::fmt::Write;
Expand Down Expand Up @@ -97,7 +98,8 @@ doc.beegfs.io.",
None
};

let network_addrs = shared::nic::query_nics(&user_config.interfaces)?;
let use_ipv6 = check_ipv6(user_config.beemsg_port, !user_config.ipv6_disable);
let network_addrs = shared::nic::query_nics(&user_config.interfaces, use_ipv6)?;

// Configure the tokio runtime
let rt = tokio::runtime::Builder::new_multi_thread()
Expand Down Expand Up @@ -136,6 +138,7 @@ doc.beegfs.io.",
// Start the actual daemon
let run = start(
StaticInfo {
use_ipv6,
user_config,
auth_secret,
network_addrs,
Expand Down
11 changes: 11 additions & 0 deletions shared/src/conn/outgoing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ pub struct Pool {
store: Store,
udp_socket: Arc<UdpSocket>,
auth_secret: Option<AuthSecret>,
use_ipv6: bool,
}

impl Pool {
Expand All @@ -34,11 +35,13 @@ impl Pool {
udp_socket: Arc<UdpSocket>,
connection_limit: usize,
auth_secret: Option<AuthSecret>,
use_ipv6: bool,
) -> Self {
Self {
store: Store::new(connection_limit),
auth_secret,
udp_socket,
use_ipv6,
}
}

Expand Down Expand Up @@ -118,6 +121,10 @@ impl Pool {
log::debug!("Connecting new stream to {node_uid:?}");

for addr in addrs.iter() {
if addr.is_ipv6() && !self.use_ipv6 {
continue;
}

match Stream::connect_tcp(addr).await {
Ok(stream) => {
let mut stream = StoredStream::from_stream(stream, permit);
Expand Down Expand Up @@ -210,6 +217,10 @@ impl Pool {

let mut errs = vec![];
for addr in addrs.iter() {
if addr.is_ipv6() && !self.use_ipv6 {
continue;
}

if let Err(err) = buf.send_to_socket(&self.udp_socket, addr).await {
log::debug!(
"Sending datagram to node with uid {node_uid} using {addr} failed: {err}"
Expand Down
27 changes: 18 additions & 9 deletions shared/src/nic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::types::NicType;
use anyhow::{Result, anyhow};
use serde::Deserializer;
use serde::de::{Unexpected, Visitor};
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};
use std::net::{IpAddr, Ipv6Addr};
use std::os::fd::{AsRawFd, FromRawFd, OwnedFd};
use std::str::FromStr;

Expand Down Expand Up @@ -187,11 +187,15 @@ impl Ord for Nic {
///
/// Only interfaces matching one of the given names in `filter` will be returned, unless the list
/// is empty.
pub fn query_nics(filter: &[NicFilter]) -> Result<Vec<Nic>> {
pub fn query_nics(filter: &[NicFilter], use_ipv6: bool) -> Result<Vec<Nic>> {
let mut filtered_nics = vec![];

for interface in pnet_datalink::interfaces() {
for ip in interface.ips {
if !use_ipv6 && ip.is_ipv6() {
continue;
}

if let Some(priority) = nic_priority(filter, &interface.name, &ip.ip()) {
filtered_nics.push(Nic {
name: interface.name.clone(),
Expand All @@ -208,17 +212,22 @@ pub fn query_nics(filter: &[NicFilter]) -> Result<Vec<Nic>> {
Ok(filtered_nics)
}

/// Selects address to bind to for listening: Checks if IPv6 sockets are available on this host
/// Checks if IPv6 sockets are available on this host
/// according to our rules: IPv6 must be enabled during boot and at runtime, and IPv6 sockets must
/// be dual stack. Then it returns `::` (IPv6), otherwise `0.0.0.0` (IPv4).
pub fn select_bind_addr(port: u16) -> SocketAddr {
/// be dual stack.
pub fn check_ipv6(port: u16, use_ipv6: bool) -> bool {
if !use_ipv6 {
log::info!("IPv6 is disabled by the configuration, falling back to IPv4 sockets");
return false;
}

// SAFETY: Any data used in the libc calls is local only
unsafe {
// Check if IPv6 socket can be created
let sock = libc::socket(libc::AF_INET6, libc::SOCK_STREAM, 0);
if sock < 0 {
log::info!("IPv6 is unavailable on this host, falling back to IPv4 sockets");
return SocketAddr::new(Ipv4Addr::UNSPECIFIED.into(), port);
return false;
}
// Make sure the socket is closed on drop
let sock = OwnedFd::from_raw_fd(sock);
Expand All @@ -244,7 +253,7 @@ pub fn select_bind_addr(port: u16) -> SocketAddr {

if res < 0 && std::io::Error::last_os_error().raw_os_error() == Some(libc::EADDRNOTAVAIL) {
log::info!("IPv6 is disabled on this host, falling back to IPv4 sockets");
return SocketAddr::new(Ipv4Addr::UNSPECIFIED.into(), port);
return false;
}

// Check if dual stack sockets are enabled by querying the socket option
Expand All @@ -263,11 +272,11 @@ pub fn select_bind_addr(port: u16) -> SocketAddr {
log::info!(
"IPv6 dual stack sockets are unavailable on this host, falling back to IPv4 sockets"
);
return SocketAddr::new(Ipv4Addr::UNSPECIFIED.into(), port);
return false;
}
}

SocketAddr::new(Ipv6Addr::UNSPECIFIED.into(), port)
true
}

#[cfg(test)]
Expand Down
Loading