Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions mgmtd/src/bee_msg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,8 +196,8 @@ pub async fn notify_nodes<M: Msg + Serializable>(
) {
log::trace!("NOTIFICATION to {node_types:?}: {msg:?}");

if let Err(err) = async {
for t in node_types {
for t in node_types {
if let Err(err) = async {
let nodes = ctx
.db
.read_tx(move |tx| db::node::get_with_type(tx, *t))
Expand All @@ -206,12 +206,12 @@ pub async fn notify_nodes<M: Msg + Serializable>(
ctx.conn
.broadcast_datagram(nodes.into_iter().map(|e| e.uid), msg)
.await?;
}

Ok(()) as Result<_>
}
.await
{
log::error!("Notification could not be sent to all nodes: {err:#}");
Ok(()) as Result<_>
}
.await
{
log::error!("Notification could not be sent to all {t} nodes: {err:#}");
}
}
}
21 changes: 17 additions & 4 deletions mgmtd/src/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use sqlite::{TransactionExt, check_affected_rows};
use sqlite_check::sql;
use std::fmt::Debug;
use std::future::Future;
use std::net::SocketAddr;
use std::net::{SocketAddr, TcpListener};
use std::pin::Pin;
use tonic::transport::{Identity, Server, ServerTlsConfig};
use tonic::{Code, Request, Response, Status};
Expand Down Expand Up @@ -181,13 +181,12 @@ pub(crate) fn serve(ctx: Context, mut shutdown: RunStateHandle) -> Result<()> {
builder
};

let serve_addr = SocketAddr::new("::".parse()?, ctx.info.user_config.grpc_port);

let ctx2 = ctx.clone();
let service = pm::management_server::ManagementServer::with_interceptor(
ManagementService { ctx: ctx.clone() },
move |req: Request<()>| {
// If authentication is enabled, require the secret passed with every request
if let Some(required_secret) = ctx.info.auth_secret {
if let Some(required_secret) = ctx2.info.auth_secret {
let check = || -> Result<()> {
let Some(request_secret) = req.metadata().get("auth-secret") else {
bail!("Request requires authentication but no secret was provided")
Expand All @@ -211,6 +210,20 @@ pub(crate) fn serve(ctx: Context, mut shutdown: RunStateHandle) -> Result<()> {
},
);

let mut serve_addr = SocketAddr::new("::".parse()?, ctx.info.user_config.grpc_port);

// Test for IPv6 available, fall back to IPv4 sockets if not
match TcpListener::bind(serve_addr) {
Ok(_) => {}
Err(err) if err.raw_os_error() == Some(libc::EAFNOSUPPORT) => {
log::debug!("gRPC: IPv6 not available, falling back to IPv4 sockets");
serve_addr = SocketAddr::new("0.0.0.0".parse()?, ctx.info.user_config.grpc_port);
}
Err(err) => {
anyhow::bail!(err);
}
}

log::info!("Serving gRPC requests on {serve_addr}");

tokio::spawn(async move {
Expand Down
27 changes: 18 additions & 9 deletions mgmtd/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use sqlite::TransactionExt;
use sqlite_check::sql;
use std::collections::HashSet;
use std::future::Future;
use std::net::SocketAddr;
use std::net::{SocketAddr, TcpListener};
use std::sync::Arc;
use tokio::net::UdpSocket;
use tokio::sync::mpsc;
Expand Down Expand Up @@ -60,14 +60,22 @@ pub async fn start(info: StaticInfo, license: LicenseVerifier) -> Result<RunCont
// Static configuration which doesn't change at runtime
let info = Box::leak(Box::new(info));

let mut beemsg_serve_addr = SocketAddr::new("::".parse()?, info.user_config.beemsg_port);

// Test for IPv6 available, fall back to IPv4 sockets if not
match TcpListener::bind(beemsg_serve_addr) {
Ok(_) => {}
Err(err) if err.raw_os_error() == Some(libc::EAFNOSUPPORT) => {
log::debug!("BeeMsg: IPv6 not available, falling back to IPv4 sockets");
beemsg_serve_addr = SocketAddr::new("0.0.0.0".parse()?, info.user_config.beemsg_port);
}
Err(err) => {
anyhow::bail!(err);
}
}

// UDP socket for in- and outgoing messages
let udp_socket = Arc::new(
UdpSocket::bind(SocketAddr::new(
"::0".parse()?,
info.user_config.beemsg_port,
))
.await?,
);
let udp_socket = Arc::new(UdpSocket::bind(beemsg_serve_addr).await?);

// Node address store and connection pool
let conn_pool = Pool::new(
Expand Down Expand Up @@ -121,8 +129,9 @@ pub async fn start(info: StaticInfo, license: LicenseVerifier) -> Result<RunCont
);

// Listen for incoming TCP connections
// Fall back to ipv4 socket if ipv6 is not available
incoming::listen_tcp(
SocketAddr::new("::0".parse()?, ctx.info.user_config.beemsg_port),
beemsg_serve_addr,
ctx.clone(),
info.auth_secret.is_some(),
run_state.clone(),
Expand Down
30 changes: 26 additions & 4 deletions shared/src/conn/outgoing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,11 @@ impl Pool {
Ok(())
}

/// Broadcasts a BeeMsg datagram to all given nodes using all their known addresses
///
/// Logs errors if sending failed completely for a node, only fails if serialization fails.
/// Remember that this is UDP and thus no errors only means that the sending was successful,
/// not that the messages reached their destinations.
pub async fn broadcast_datagram<M: Msg + Serializable>(
&self,
peers: impl IntoIterator<Item = Uid>,
Expand All @@ -194,12 +199,29 @@ impl Pool {
buf.serialize_msg(msg)?;

for node_uid in peers {
let Some(addrs) = self.store.get_node_addrs(node_uid) else {
bail!("No network address found for node with uid {node_uid:?}");
};
let addrs = self.store.get_node_addrs(node_uid).unwrap_or_default();

if addrs.is_empty() {
log::error!(
"Failed to send datagram to node with uid {node_uid}: No known addresses"
);
continue;
}

let mut errs = vec![];
for addr in addrs.iter() {
buf.send_to_socket(&self.udp_socket, addr).await?;
if let Err(err) = buf.send_to_socket(&self.udp_socket, addr).await {
log::debug!(
"Sending datagram to node with uid {node_uid} using {addr} failed: {err}"
);
errs.push((addr, err));
}
}

if errs.len() == addrs.len() {
log::error!(
"Failed to send datagram to node with uid {node_uid} on all known addresses: {errs:?}"
);
}
}

Expand Down
Loading