-
I'm trying to use Rust-libp2p with webrtc-direct transport and relay behaviour as the relay server for browser clients. (As browser-to-browser is not yet supported in rust-wasm). the js relay uses ws as a transport and has only const PUBSUB_PEER_DISCOVERY = "peer-discovery/1.0.0";
const server = await createLibp2p({
addresses: {
listen: ["/ip4/127.0.0.1/tcp/0/ws"],
},
transports: [
webSockets({
filter: filters.all,
}),
],
connectionEncryption: [noise()],
streamMuxers: [yamux(), mplex()],
services: {
identify: identify(),
pubsub: gossipsub(),
relay: circuitRelayServer({
reservations: {
maxReservations: Infinity,
},
}),
},
peerDiscovery: [
pubsubPeerDiscovery({
// Every 10 seconds publish our multiaddrs
interval: 10_000,
// The topic that the relay is also subscribed to
topics: [PUBSUB_PEER_DISCOVERY],
}),
],
connectionManager: {
minConnections: 0,
},
}); When using the browser code and adding WebRTCDirect and my own rust server as bootnode. i can see in the browser that connecting to the rust peer works, but the relay protocol is not returning the relayed address, shown below by the empty listening addresses. It also does not find the other browser peer which is also connected to the rust peer. This is the screen with the rust node. This is what i would expect from using my rust relay server code is below, how is it misconfigured? mod config;
mod topic;
use anyhow::Context;
use libp2p::{
core::muxing::StreamMuxerBox,
futures::StreamExt,
gossipsub, identify,
multiaddr::{Multiaddr, Protocol},
ping, relay,
swarm::{NetworkBehaviour, SwarmEvent},
StreamProtocol, Transport,
};
use libp2p_webrtc as webrtc;
use std::time::Duration;
#[derive(NetworkBehaviour)]
struct Behaviour {
gossibsub: gossipsub::Behaviour,
relay: relay::Behaviour,
kademlia: libp2p::kad::Behaviour<libp2p::kad::store::MemoryStore>,
ping: ping::Behaviour,
identify: identify::Behaviour,
}
const IDENTIFY_PROTOCOL_VERSION: &str = "/ipfs/id/1.0.0";
const PUBSUB_PEER_DISCOVERY: &str = "peer-discovery/1.0.0";
const KADEMLIA_PROTOCOL_NAME: StreamProtocol = StreamProtocol::new("/ipfs/kad/1.0.0");
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let _ = tracing_subscriber::fmt().try_init();
let config = config::Config::parse()?;
let local_key = read_or_create_identity(std::path::Path::new(&config.local_key_path))
.await
.context("Failed to read identity")?;
let local_peer_id = local_key.public().to_peer_id();
let webrtc_cert = read_or_create_certificate(std::path::Path::new(&config.cert_path))
.await
.context("Failed to read certificate")?;
let identify_config = identify::Behaviour::new(
identify::Config::new(IDENTIFY_PROTOCOL_VERSION.to_string(), local_key.public())
.with_interval(std::time::Duration::from_secs(60)), // do this so we can get timeouts for dropped WebRTC connections
);
let relay_config = relay::Behaviour::new(
local_peer_id,
relay::Config {
max_reservations: usize::MAX,
max_reservations_per_peer: 100,
reservation_rate_limiters: Vec::default(),
circuit_src_rate_limiters: Vec::default(),
max_circuits: usize::MAX,
max_circuits_per_peer: 100,
..Default::default()
},
);
use std::hash::{Hash, Hasher};
let message_id_fn = |message: &gossipsub::Message| {
let mut s = std::hash::DefaultHasher::new();
message.data.hash(&mut s);
gossipsub::MessageId::from(s.finish().to_string())
};
let gossipsub_config = gossipsub::ConfigBuilder::default()
.validation_mode(gossipsub::ValidationMode::Permissive)
.message_id_fn(message_id_fn) // content-address messages. No two messages of the same content will be propagated.
.flood_publish(true)
.build()
.expect("Valid config");
let gossipsub = gossipsub::Behaviour::new(
gossipsub::MessageAuthenticity::Signed(local_key.clone()),
gossipsub_config,
)
.expect("Correct configuration");
let cfg = libp2p::kad::Config::new(KADEMLIA_PROTOCOL_NAME);
let store = libp2p::kad::store::MemoryStore::new(local_peer_id);
let kad_behaviour = libp2p::kad::Behaviour::with_config(local_peer_id, store, cfg);
let behaviour = Behaviour {
gossibsub: gossipsub,
kademlia: kad_behaviour,
identify: identify_config,
relay: relay_config,
ping: ping::Behaviour::default(),
};
let mut swarm = libp2p::SwarmBuilder::with_existing_identity(local_key)
.with_tokio()
.with_other_transport(|id_keys| {
Ok(webrtc::tokio::Transport::new(id_keys.clone(), webrtc_cert)
.map(|(peer_id, conn), _| (peer_id, StreamMuxerBox::new(conn))))
})?
.with_behaviour(|_| behaviour)?
.with_swarm_config(|cfg| {
cfg.with_idle_connection_timeout(
Duration::from_secs(u64::MAX), // Allows us to observe the pings.
)
})
.build();
swarm
.behaviour_mut()
.gossibsub
.subscribe(&gossipsub::IdentTopic::new(PUBSUB_PEER_DISCOVERY))
.expect("topic subscribe failed");
let address_webrtc = Multiaddr::from(config.listen_address)
.with(Protocol::Udp(config.ports.webrtc))
.with(Protocol::WebRTCDirect);
swarm.listen_on(address_webrtc.clone())?;
loop {
tokio::select! {
swarm_event = swarm.next() => {
match swarm_event.unwrap() {
SwarmEvent::NewListenAddr { address, .. } => {
if let Some(external_ip) = config.external_address {
let external_address = address
.replace(0, |_| Some(external_ip.into()))
.expect("address.len > 1 and we always return `Some`");
swarm.add_external_address(external_address);
}
let p2p_address = address.with(Protocol::P2p(*swarm.local_peer_id()));
log::info!("Listening on {p2p_address}");
}
SwarmEvent::ConnectionEstablished { peer_id, .. } => {
log::info!("Connected to {peer_id}");
}
SwarmEvent::OutgoingConnectionError { peer_id, error, .. } => {
log::warn!("Failed to dial {peer_id:?}: {error}");
}
SwarmEvent::IncomingConnectionError { error, .. } => {
log::warn!("{:#}", error)
}
SwarmEvent::ConnectionClosed { peer_id, cause, .. } => {
log::warn!("Connection to {peer_id} closed: {cause:?}")
}
SwarmEvent::Behaviour(BehaviourEvent::Relay(e)) => {
log::info!("{:?}", e);
}
SwarmEvent::Behaviour(BehaviourEvent::Identify(identify::Event::Error { peer_id, error, .. })) => {
match error {
libp2p::swarm::StreamUpgradeError::Timeout => {
// When a browser tab closes, we don't get a swarm event
// maybe there's a way to get this with TransportEvent
// but for now remove the peer from routing table if there's an Identify timeout
swarm.behaviour_mut().kademlia.remove_peer(&peer_id);
log::warn!("Removed {peer_id} from the routing table (if it was in there).");
}
_ => {
log::error!("Received identify error {error:?}");
}
}
}
SwarmEvent::Behaviour(BehaviourEvent::Identify(identify::Event::Sent { peer_id, .. })) => {
println!("Sent identify info to {peer_id:?}")
}
SwarmEvent::Behaviour(BehaviourEvent::Identify(identify::Event::Received { peer_id, info, .. })) => {
println!("Add to {info:?}");
let identify::Info {
listen_addrs,
protocols,
observed_addr,
..
} = info;
swarm.add_external_address(observed_addr.clone());
protocols.iter().for_each(|p| log::info!("protocol {}", p));
if protocols.iter().any(|p| p == &KADEMLIA_PROTOCOL_NAME) {
for addr in listen_addrs {
log::info!("identify::Event::Received listen addr: {}", addr);
// TODO (fixme): the below doesn't work because the address is still missing /webrtc/p2p even after https://github.com/libp2p/js-libp2p-webrtc/pull/121
// swarm.behaviour_mut().kademlia.add_address(&peer_id, addr);
let webrtc_address = addr
.with(Protocol::WebRTCDirect)
.with(Protocol::P2p(peer_id));
swarm
.behaviour_mut()
.kademlia
.add_address(&peer_id, webrtc_address.clone());
log::info!("Added {webrtc_address} to the routing table.");
}
}
}
SwarmEvent::Behaviour(BehaviourEvent::Identify(e)) => {
log::info!("BehaviourEvent::Identify {:?}", e)
}
SwarmEvent::Behaviour(BehaviourEvent::Gossibsub(e)) => {
log::info!("BehaviourEvent::Gossibsub {:?}", e)
}
SwarmEvent::Behaviour(BehaviourEvent::Kademlia(e)) => {
log::info!("BehaviourEvent::Kademlia {:?}", e)
}
event => {
log::info!("Other type of event: {:?}", event);
}
}
},
_ = tokio::signal::ctrl_c() => {
break;
}
}
}
Ok(())
}
async fn read_or_create_certificate(
path: &std::path::Path,
) -> anyhow::Result<libp2p_webrtc::tokio::Certificate> {
if path.exists() {
let pem = tokio::fs::read_to_string(&path).await?;
log::info!("Using existing certificate from {}", path.display());
return Ok(libp2p_webrtc::tokio::Certificate::from_pem(&pem)?);
}
let cert = libp2p_webrtc::tokio::Certificate::generate(&mut rand::thread_rng())?;
tokio::fs::write(&path, &cert.serialize_pem().as_bytes()).await?;
log::info!(
"Generated new certificate and wrote it to {}",
path.display()
);
Ok(cert)
}
async fn read_or_create_identity(
path: &std::path::Path,
) -> anyhow::Result<libp2p::identity::Keypair> {
if path.exists() {
let bytes = tokio::fs::read(&path).await?;
log::info!("Using existing identity from {}", path.display());
return Ok(libp2p::identity::Keypair::from_protobuf_encoding(&bytes)?); // This only works for ed25519 but that is what we are using.
}
let identity = libp2p::identity::Keypair::generate_ed25519();
tokio::fs::write(&path, &identity.to_protobuf_encoding()?).await?;
log::info!("Generated new identity and wrote it to {}", path.display());
Ok(identity)
} The logs:
|
Beta Was this translation helpful? Give feedback.
Replies: 2 comments
-
Your code looks fine from a glance but you would likely need to provide additional logs to the tool youre using to see what it is reporting when attempting to use the node as a relay and maybe from the local node too since Do note, during my testing with using webrtc (or specifically, webrtc-direct) as a relay, I would get errors when attempting to connect to a peer through that relay. Likely due to the design and the lack of browser-to-browser, in which case I would suggest using the websocket transport if you need the relay until such feature is implemented for the webrtc transport. See #5453. |
Beta Was this translation helpful? Give feedback.
-
you were right, webrtc was the problem here. This is my updated code using websockets as a transport: mod config;
mod topic;
use anyhow::Context;
use libp2p::{
futures::StreamExt,
gossipsub, identify,
multiaddr::{Multiaddr, Protocol},
noise, ping, relay,
swarm::{NetworkBehaviour, SwarmEvent},
tls, yamux, StreamProtocol,
};
use std::time::Duration;
#[derive(NetworkBehaviour)]
struct Behaviour {
gossibsub: gossipsub::Behaviour,
relay: relay::Behaviour,
kademlia: libp2p::kad::Behaviour<libp2p::kad::store::MemoryStore>,
ping: ping::Behaviour,
identify: identify::Behaviour,
}
const IDENTIFY_PROTOCOL_VERSION: &str = "/ipfs/id/1.0.0";
const PUBSUB_PEER_DISCOVERY: &str = "peer-discovery/1.0.0";
const KADEMLIA_PROTOCOL_NAME: StreamProtocol = StreamProtocol::new("/ipfs/kad/1.0.0");
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let _ = tracing_subscriber::fmt().try_init();
let config = config::Config::parse()?;
let local_key = read_or_create_identity(std::path::Path::new(&config.local_key_path))
.await
.context("Failed to read identity")?;
let local_peer_id = local_key.public().to_peer_id();
let identify_config = identify::Behaviour::new(
identify::Config::new(IDENTIFY_PROTOCOL_VERSION.to_string(), local_key.public())
.with_interval(std::time::Duration::from_secs(60)), // do this so we can get timeouts for dropped WebRTC connections
);
let relay_config = relay::Behaviour::new(
local_peer_id,
relay::Config {
max_reservations: usize::MAX,
max_reservations_per_peer: 100,
reservation_rate_limiters: Vec::default(),
circuit_src_rate_limiters: Vec::default(),
max_circuits: usize::MAX,
max_circuits_per_peer: 100,
..Default::default()
},
);
use std::hash::{Hash, Hasher};
let message_id_fn = |message: &gossipsub::Message| {
let mut s = std::hash::DefaultHasher::new();
message.data.hash(&mut s);
gossipsub::MessageId::from(s.finish().to_string())
};
let gossipsub_config = gossipsub::ConfigBuilder::default()
.validation_mode(gossipsub::ValidationMode::Permissive)
.message_id_fn(message_id_fn) // content-address messages. No two messages of the same content will be propagated.
.flood_publish(true)
.build()
.expect("Valid config");
let gossipsub = gossipsub::Behaviour::new(
gossipsub::MessageAuthenticity::Signed(local_key.clone()),
gossipsub_config,
)
.expect("Correct configuration");
let cfg = libp2p::kad::Config::new(KADEMLIA_PROTOCOL_NAME);
let store = libp2p::kad::store::MemoryStore::new(local_peer_id);
let kad_behaviour = libp2p::kad::Behaviour::with_config(local_peer_id, store, cfg);
let behaviour = Behaviour {
gossibsub: gossipsub,
kademlia: kad_behaviour,
identify: identify_config,
relay: relay_config,
ping: ping::Behaviour::default(),
};
let mut swarm = libp2p::SwarmBuilder::with_existing_identity(local_key)
.with_tokio()
.with_tcp(
Default::default(),
(tls::Config::new, noise::Config::new),
yamux::Config::default,
)
.unwrap()
.with_websocket(
(tls::Config::new, noise::Config::new),
yamux::Config::default,
)
.await?
.with_behaviour(|_| behaviour)?
.with_swarm_config(|cfg| {
cfg.with_idle_connection_timeout(
Duration::from_secs(u64::MAX), // Allows us to observe the pings.
)
})
.build();
swarm
.behaviour_mut()
.gossibsub
.subscribe(&gossipsub::IdentTopic::new(PUBSUB_PEER_DISCOVERY))
.expect("topic subscribe failed");
let address_websocket = Multiaddr::from(config.listen_address)
.with(Protocol::Tcp(config.ports.tcp))
.with(Protocol::Ws(std::borrow::Cow::Borrowed("/")));
log::info!("listening on {address_websocket}");
swarm.listen_on(address_websocket.clone())?;
loop {
tokio::select! {
swarm_event = swarm.next() => {
match swarm_event.unwrap() {
SwarmEvent::NewListenAddr { address, .. } => {
if let Some(external_ip) = config.external_address {
let external_address = address
.replace(0, |_| Some(external_ip.into()))
.expect("address.len > 1 and we always return `Some`");
swarm.add_external_address(external_address);
}
let p2p_address = address.with(Protocol::P2p(*swarm.local_peer_id()));
log::info!("Listening on {p2p_address}");
}
SwarmEvent::ConnectionEstablished { peer_id, .. } => {
log::info!("Connected to {peer_id}");
}
SwarmEvent::OutgoingConnectionError { peer_id, error, .. } => {
log::warn!("Failed to dial {peer_id:?}: {error}");
}
SwarmEvent::IncomingConnectionError { error, .. } => {
log::warn!("{:#}", error)
}
SwarmEvent::ConnectionClosed { peer_id, cause, .. } => {
log::warn!("Connection to {peer_id} closed: {cause:?}")
}
SwarmEvent::Behaviour(BehaviourEvent::Relay(e)) => {
log::info!("{:?}", e);
}
SwarmEvent::Behaviour(BehaviourEvent::Identify(identify::Event::Error { peer_id, error, .. })) => {
match error {
libp2p::swarm::StreamUpgradeError::Timeout => {
// When a browser tab closes, we don't get a swarm event
// maybe there's a way to get this with TransportEvent
// but for now remove the peer from routing table if there's an Identify timeout
swarm.behaviour_mut().kademlia.remove_peer(&peer_id);
log::warn!("Removed {peer_id} from the routing table (if it was in there).");
}
_ => {
log::error!("Received identify error {error:?}");
}
}
}
SwarmEvent::Behaviour(BehaviourEvent::Identify(identify::Event::Sent { peer_id, .. })) => {
println!("Sent identify info to {peer_id:?}")
}
SwarmEvent::Behaviour(BehaviourEvent::Identify(identify::Event::Received { peer_id, info, .. })) => {
println!("Add to {info:?} from {peer_id:?}");
let identify::Info {
listen_addrs,
protocols,
observed_addr,
..
} = info;
swarm.add_external_address(observed_addr.clone());
protocols.iter().for_each(|p| log::info!("protocol {}", p));
if protocols.iter().any(|p| p == &KADEMLIA_PROTOCOL_NAME) {
for addr in listen_addrs {
log::info!("identify::Event::Received listen addr: {}", addr);
// TODO (fixme): the below doesn't work because the address is still missing /webrtc/p2p even after https://github.com/libp2p/js-libp2p-webrtc/pull/121
// swarm.behaviour_mut().kademlia.add_address(&peer_id, addr);
// let webrtc_address = addr
// .with(Protocol::WebRTCDirect)
// .with(Protocol::P2p(peer_id));
// swarm
// .behaviour_mut()
// .kademlia
// .add_address(&peer_id, webrtc_address.clone());
// log::info!("Added {webrtc_address} to the routing table.");
}
}
}
SwarmEvent::Behaviour(BehaviourEvent::Identify(e)) => {
log::info!("BehaviourEvent::Identify {:?}", e)
}
SwarmEvent::Behaviour(BehaviourEvent::Gossibsub(e)) => {
log::info!("BehaviourEvent::Gossibsub {:?}", e)
}
SwarmEvent::Behaviour(BehaviourEvent::Kademlia(e)) => {
log::info!("BehaviourEvent::Kademlia {:?}", e)
}
event => {
log::info!("Other type of event: {:?}", event);
}
}
},
_ = tokio::signal::ctrl_c() => {
break;
}
}
}
Ok(())
}
async fn read_or_create_identity(
path: &std::path::Path,
) -> anyhow::Result<libp2p::identity::Keypair> {
if path.exists() {
let bytes = tokio::fs::read(&path).await?;
log::info!("Using existing identity from {}", path.display());
return Ok(libp2p::identity::Keypair::from_protobuf_encoding(&bytes)?); // This only works for ed25519 but that is what we are using.
}
let identity = libp2p::identity::Keypair::generate_ed25519();
tokio::fs::write(&path, &identity.to_protobuf_encoding()?).await?;
log::info!("Generated new identity and wrote it to {}", path.display());
Ok(identity)
} |
Beta Was this translation helpful? Give feedback.
Your code looks fine from a glance but you would likely need to provide additional logs to the tool youre using to see what it is reporting when attempting to use the node as a relay and maybe from the local node too since
NegotiationFailed
usually means the protocols could not be agreed on and might give more insight to what is going on internally as well as the other side.Do note, during my testing with using webrtc (or specifically, webrtc-direct) as a relay, I would get errors when attempting to connect to a peer through that relay. Likely due to the design and the lack of browser-to-browser, in which case I would suggest using the websocket transport if you need the relay until such…