Skip to content

Commit 1d7af88

Browse files
committed
fix: Fall back to IPv4 sockets if IPv6 is unavailable
Also fix broadcasting of UDP messages which before would abort on a send failure instead of just dumping the message to all known addresses. Now only debug logs and only errors if all sends failed.
1 parent d1710f5 commit 1d7af88

File tree

4 files changed

+44
-25
lines changed

4 files changed

+44
-25
lines changed

mgmtd/src/bee_msg.rs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -196,8 +196,8 @@ pub async fn notify_nodes<M: Msg + Serializable>(
196196
) {
197197
log::trace!("NOTIFICATION to {node_types:?}: {msg:?}");
198198

199-
if let Err(err) = async {
200-
for t in node_types {
199+
for t in node_types {
200+
if let Err(err) = async {
201201
let nodes = ctx
202202
.db
203203
.read_tx(move |tx| db::node::get_with_type(tx, *t))
@@ -206,12 +206,12 @@ pub async fn notify_nodes<M: Msg + Serializable>(
206206
ctx.conn
207207
.broadcast_datagram(nodes.into_iter().map(|e| e.uid), msg)
208208
.await?;
209-
}
210209

211-
Ok(()) as Result<_>
212-
}
213-
.await
214-
{
215-
log::error!("Notification could not be sent to all nodes: {err:#}");
210+
Ok(()) as Result<_>
211+
}
212+
.await
213+
{
214+
log::error!("Notification could not be sent to all {t} nodes: {err:#}");
215+
}
216216
}
217217
}

mgmtd/src/grpc.rs

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ use sqlite::{TransactionExt, check_affected_rows};
1616
use sqlite_check::sql;
1717
use std::fmt::Debug;
1818
use std::future::Future;
19-
use std::net::SocketAddr;
19+
use std::net::{SocketAddr, TcpListener};
2020
use std::pin::Pin;
2121
use tonic::transport::{Identity, Server, ServerTlsConfig};
2222
use tonic::{Code, Request, Response, Status};
@@ -181,13 +181,12 @@ pub(crate) fn serve(ctx: Context, mut shutdown: RunStateHandle) -> Result<()> {
181181
builder
182182
};
183183

184-
let serve_addr = SocketAddr::new("::".parse()?, ctx.info.user_config.grpc_port);
185-
184+
let ctx2 = ctx.clone();
186185
let service = pm::management_server::ManagementServer::with_interceptor(
187186
ManagementService { ctx: ctx.clone() },
188187
move |req: Request<()>| {
189188
// If authentication is enabled, require the secret passed with every request
190-
if let Some(required_secret) = ctx.info.auth_secret {
189+
if let Some(required_secret) = ctx2.info.auth_secret {
191190
let check = || -> Result<()> {
192191
let Some(request_secret) = req.metadata().get("auth-secret") else {
193192
bail!("Request requires authentication but no secret was provided")
@@ -211,6 +210,14 @@ pub(crate) fn serve(ctx: Context, mut shutdown: RunStateHandle) -> Result<()> {
211210
},
212211
);
213212

213+
let mut serve_addr = SocketAddr::new("::".parse()?, ctx.info.user_config.grpc_port);
214+
215+
// Test for IPv6 available
216+
if TcpListener::bind(serve_addr).is_err() {
217+
log::debug!("gRPC: IPv6 not available, falling back to IPv4 sockets");
218+
serve_addr = SocketAddr::new("0.0.0.0".parse()?, ctx.info.user_config.grpc_port);
219+
}
220+
214221
log::info!("Serving gRPC requests on {serve_addr}");
215222

216223
tokio::spawn(async move {

mgmtd/src/lib.rs

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ use sqlite::TransactionExt;
2727
use sqlite_check::sql;
2828
use std::collections::HashSet;
2929
use std::future::Future;
30-
use std::net::SocketAddr;
30+
use std::net::{SocketAddr, TcpListener};
3131
use std::sync::Arc;
3232
use tokio::net::UdpSocket;
3333
use tokio::sync::mpsc;
@@ -60,14 +60,16 @@ pub async fn start(info: StaticInfo, license: LicenseVerifier) -> Result<RunCont
6060
// Static configuration which doesn't change at runtime
6161
let info = Box::leak(Box::new(info));
6262

63+
let mut beemsg_serve_addr = SocketAddr::new("::".parse()?, info.user_config.beemsg_port);
64+
65+
// Test for IPv6 available, fall back to IPv4 sockets if not
66+
if TcpListener::bind(beemsg_serve_addr).is_err() {
67+
log::debug!("BeeMsg: IPv6 not available, falling back to IPv4 sockets");
68+
beemsg_serve_addr = SocketAddr::new("0.0.0.0".parse()?, info.user_config.beemsg_port);
69+
}
70+
6371
// UDP socket for in- and outgoing messages
64-
let udp_socket = Arc::new(
65-
UdpSocket::bind(SocketAddr::new(
66-
"::0".parse()?,
67-
info.user_config.beemsg_port,
68-
))
69-
.await?,
70-
);
72+
let udp_socket = Arc::new(UdpSocket::bind(beemsg_serve_addr).await?);
7173

7274
// Node address store and connection pool
7375
let conn_pool = Pool::new(
@@ -121,8 +123,9 @@ pub async fn start(info: StaticInfo, license: LicenseVerifier) -> Result<RunCont
121123
);
122124

123125
// Listen for incoming TCP connections
126+
// Fall back to ipv4 socket if ipv6 is not available
124127
incoming::listen_tcp(
125-
SocketAddr::new("::0".parse()?, ctx.info.user_config.beemsg_port),
128+
beemsg_serve_addr,
126129
ctx.clone(),
127130
info.auth_secret.is_some(),
128131
run_state.clone(),

shared/src/conn/outgoing.rs

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -194,12 +194,21 @@ impl Pool {
194194
buf.serialize_msg(msg)?;
195195

196196
for node_uid in peers {
197-
let Some(addrs) = self.store.get_node_addrs(node_uid) else {
198-
bail!("No network address found for node with uid {node_uid:?}");
199-
};
197+
let addrs = self.store.get_node_addrs(node_uid).unwrap_or_default();
200198

199+
let mut sent = false;
201200
for addr in addrs.iter() {
202-
buf.send_to_socket(&self.udp_socket, addr).await?;
201+
if let Err(err) = buf.send_to_socket(&self.udp_socket, addr).await {
202+
log::debug!("Sending datagram to {addr} failed: {err}");
203+
continue;
204+
}
205+
sent = true;
206+
}
207+
208+
if !sent {
209+
log::error!(
210+
"Failed to send datagram to node with uid {node_uid}: Sending failed for all known addresses"
211+
);
203212
}
204213
}
205214

0 commit comments

Comments
 (0)