Skip to content

Commit e54f466

Browse files
feat: connect to bootnodes and request peer data (#118)
**Motivation** Build the functionality to join the network **Description** Closes #86 --------- Co-authored-by: Tomás Grüner <47506558+MegaRedHand@users.noreply.github.com>
1 parent ff945e3 commit e54f466

File tree

5 files changed

+120
-46
lines changed

5 files changed

+120
-46
lines changed

crates/net/src/discv4.rs

Lines changed: 28 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use ethereum_rust_core::rlp::{
66
structs::{self, Decoder, Encoder},
77
};
88
use ethereum_rust_core::{H256, H512};
9-
use k256::ecdsa::{signature::Signer, SigningKey};
9+
use k256::ecdsa::SigningKey;
1010
use std::net::IpAddr;
1111

1212
#[derive(Debug, Eq, PartialEq)]
@@ -25,7 +25,7 @@ pub(crate) enum Message {
2525
}
2626

2727
impl Message {
28-
pub fn encode_with_header(&self, buf: &mut dyn BufMut, node_signer: SigningKey) {
28+
pub fn encode_with_header(&self, buf: &mut dyn BufMut, node_signer: &SigningKey) {
2929
let signature_size = 65_usize;
3030
let mut data: Vec<u8> = Vec::with_capacity(signature_size.next_power_of_two());
3131
data.resize(signature_size, 0);
@@ -34,7 +34,9 @@ impl Message {
3434

3535
let digest = keccak_hash::keccak_buffer(&mut &data[signature_size..]).unwrap();
3636

37-
let (signature, recovery_id) = node_signer.try_sign(&digest.0).expect("failed to sign");
37+
let (signature, recovery_id) = node_signer
38+
.sign_prehash_recoverable(&digest.0)
39+
.expect("failed to sign");
3840
let b = signature.to_bytes();
3941

4042
data[..signature_size - 1].copy_from_slice(&b);
@@ -91,6 +93,11 @@ impl Message {
9193
_ => Err(RLPDecodeError::MalformedData),
9294
}
9395
}
96+
// TODO: we can put the hash, signature and the message (as payload) inside a new struct
97+
pub fn get_hash(encoded_msg: &[u8]) -> &[u8] {
98+
let hash_len = 32;
99+
&encoded_msg[..hash_len]
100+
}
94101

95102
fn packet_type(&self) -> u8 {
96103
match self {
@@ -142,7 +149,7 @@ pub(crate) struct PingMessage {
142149
/// The Ping message version. Should be set to 4, but mustn't be enforced.
143150
version: u8,
144151
/// The endpoint of the sender.
145-
from: Endpoint,
152+
pub from: Endpoint,
146153
/// The endpoint of the receiver.
147154
to: Endpoint,
148155
/// The expiration time of the message. If the message is older than this time,
@@ -346,14 +353,13 @@ mod tests {
346353

347354
let mut buf = Vec::new();
348355

349-
msg.encode_with_header(&mut buf, signer);
356+
msg.encode_with_header(&mut buf, &signer);
350357
let result = to_hex(&buf);
351-
let hash = "d9b83d9701c6481a99db908b19551c6b082bcb28d5bef44cfa55256bc7977500";
352-
let signature = "f0bff907b5c432e623ba5d3803d6a405bdbaffdfc0373499ac2a243ef3ab52de3a5312c0a9a96593979b746a4cd37ebdf21cf6971cf8c10c94f4d45c1a0f90dd00";
358+
let hash = "d2821841963050aa505c00d8e4fd2d016f95eff739b784e0e26587a58226738e";
359+
let signature = "8a73f13d613c0ba5148787bb52fd04eb984c3dae486bac19433adf658d29bbb352f3acf2d55f2bdae3afff5298723114581e3f34c37815b32b9195a3326dd68700";
353360
let pkt_type = "01";
354361
let encoded_message = "dd04cb840102030482064d8218dbc984ffff0205820bf780850400e78bba";
355362
let expected = [hash, signature, pkt_type, encoded_message].concat();
356-
357363
assert_eq!(result, expected);
358364
}
359365

@@ -370,17 +376,18 @@ mod tests {
370376
.unwrap();
371377
let enr_seq = 1704896740573;
372378
let msg = Message::Pong(PongMessage::new(to, ping_hash, expiration).with_enr_seq(enr_seq));
379+
373380
let key_bytes =
374381
H256::from_str("577d8278cc7748fad214b5378669b420f8221afb45ce930b7f22da49cbc545f3")
375382
.unwrap();
376383
let signer = SigningKey::from_slice(key_bytes.as_bytes()).unwrap();
377384

378385
let mut buf = Vec::new();
379386

380-
msg.encode_with_header(&mut buf, signer);
387+
msg.encode_with_header(&mut buf, &signer);
381388
let result = to_hex(&buf);
382-
let hash = "852ef38c2087413400cb33215709a8cfa6f274929e91704ec27a1ae4d226f85d";
383-
let signature = "a7ab61ec963f779d10918c9bc3c3243c05f45eabbd078e90bf78313904e1c91201a03e78a133c2676e1c2686601e70ab1ec7aa602ad7f65bb468e52367d7123c00";
389+
let hash = "9657e4e2db33b51cbbeb503bd195efcf081d6a83befbb42b4be95d0f7bf27ffe";
390+
let signature = "b1a91caa6105b941d3ecce052dcfea5e4f4290c9e6a89ff72707a8b5116ee87a1ea3fa0086990cd862a8a2347f346f1b118122a28bf2ed2ca371d2c493a86bde01";
384391
let pkt_type = "02";
385392
let msg = "f7c984bebfbc3982765f80a03e1bf98f025f98d54ed2f61bbef63b6b46f50e12d7b937d6bdea19afd640be2384667d9af086018cf3c3bcdd";
386393
let expected = [hash, signature, pkt_type, msg].concat();
@@ -407,14 +414,13 @@ mod tests {
407414

408415
let mut buf = Vec::new();
409416

410-
msg.encode_with_header(&mut buf, signer);
417+
msg.encode_with_header(&mut buf, &signer);
411418
let result = to_hex(&buf);
412-
let hash = "cccfa9bf8e49603f8cc5381579d435bd322d386091732e3da7f6b7df13172b92";
413-
let signature = "b1caebcd4d754552be21df4a100bd4ccd85e9d95b2e29b29db2df681c17c370068e410ea31e7106081c2ed39489c1762125cbd34477b41d940d230d1d3888a4101";
419+
let hash = "58a1d0ea66afd9617c198b60a7417637ae27b847b004dbebc1e29d4067327e35";
420+
let signature = "e1988832d7d7b73925ec584ff818ff3a7bffe1a84fe3835923c3ab17af40071f7c9263176203c80c6ed77f0586479b78884e9e47fdb3287d2aafa92348e5c16700";
414421
let pkt_type = "02";
415422
let msg = "f0c984bebfbc3982765f80a03e1bf98f025f98d54ed2f61bbef63b6b46f50e12d7b937d6bdea19afd640be2384667d9af0";
416423
let expected = [hash, signature, pkt_type, msg].concat();
417-
418424
assert_eq!(result, expected);
419425
}
420426

@@ -432,13 +438,14 @@ mod tests {
432438

433439
let mut buf = Vec::new();
434440

435-
msg.encode_with_header(&mut buf, signer);
441+
msg.encode_with_header(&mut buf, &signer);
436442
let result = to_hex(&buf);
437-
let hash = "dabe1b1a4dc26324120594091bb94dbdd4aa98326cbfecb60a4a67778ebb0e67";
438-
let signature = "28cf71e9a929fa29f4e528f5fdc96e25a3086a777c8967710ddc1ef5f456bae40808baf0840d0449ea5a17ba11bbb012719e679cf3cf8d137106884c393ef15b00";
443+
let hash = "23770430fc208bdc78bc77052bf7ec2e928b38c13c085b87491c15ebebb2050f";
444+
let signature = "7c98bb4759569117031a9fbbeb00314d018eba55135c65ee98dbf6871aaebe61225f36b36e4f60da5b5d6c917e3589dd235acfacc6de4dade116c4bb851b884b01";
439445
let pkt_type = "03";
440446
let encoded_message = "f848b840d860a01f9722d78051619d1e2351aba3f43f943f6f00718d1b9baa4101932a1f5011f16bb2b1bb35db20d6fe28fa0bf09636d26a87d31de9ec6203eeedb1f666850400e78bba";
441447
let expected = [hash, signature, pkt_type, encoded_message].concat();
448+
442449
assert_eq!(result, expected);
443450
}
444451

@@ -526,7 +533,7 @@ mod tests {
526533

527534
let mut buf = Vec::new();
528535

529-
msg.encode_with_header(&mut buf, signer.clone());
536+
msg.encode_with_header(&mut buf, &signer);
530537
let result = Message::decode_with_header(&buf).expect("Failed decoding PingMessage");
531538
assert_eq!(result, msg);
532539
}
@@ -556,7 +563,7 @@ mod tests {
556563

557564
let mut buf = Vec::new();
558565

559-
msg.encode_with_header(&mut buf, signer.clone());
566+
msg.encode_with_header(&mut buf, &signer);
560567
let result = Message::decode_with_header(&buf).expect("Failed decoding PingMessage");
561568
assert_eq!(result, msg);
562569
}
@@ -574,7 +581,7 @@ mod tests {
574581

575582
let mut buf = Vec::new();
576583

577-
msg.encode_with_header(&mut buf, signer);
584+
msg.encode_with_header(&mut buf, &signer);
578585
let result = Message::decode_with_header(&buf).unwrap();
579586
assert_eq!(result, msg);
580587
}

crates/net/src/lib.rs

Lines changed: 86 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,53 +1,79 @@
11
pub(crate) mod discv4;
2-
2+
use discv4::{Endpoint, FindNodeMessage, Message, PingMessage, PongMessage};
3+
use ethereum_rust_core::H512;
4+
use k256::elliptic_curve::sec1::ToEncodedPoint;
5+
use k256::elliptic_curve::PublicKey;
6+
use k256::{ecdsa::SigningKey, elliptic_curve::rand_core::OsRng};
7+
use keccak_hash::H256;
38
use std::{
49
net::SocketAddr,
510
time::{Duration, SystemTime, UNIX_EPOCH},
611
};
7-
8-
use discv4::{Endpoint, Message, PingMessage};
9-
use k256::{ecdsa::SigningKey, elliptic_curve::rand_core::OsRng};
1012
use tokio::{
1113
net::{TcpSocket, UdpSocket},
1214
try_join,
1315
};
1416
use tracing::info;
17+
use types::BootNode;
1518
pub mod types;
1619

1720
const MAX_DISC_PACKET_SIZE: usize = 1280;
1821

19-
pub async fn start_network(udp_addr: SocketAddr, tcp_addr: SocketAddr) {
22+
pub async fn start_network(udp_addr: SocketAddr, tcp_addr: SocketAddr, bootnodes: Vec<BootNode>) {
2023
info!("Starting discovery service at {udp_addr}");
2124
info!("Listening for requests at {tcp_addr}");
2225

23-
let discovery_handle = tokio::spawn(discover_peers(udp_addr));
26+
let discovery_handle = tokio::spawn(discover_peers(udp_addr, bootnodes));
2427
let server_handle = tokio::spawn(serve_requests(tcp_addr));
2528
try_join!(discovery_handle, server_handle).unwrap();
2629
}
2730

28-
async fn discover_peers(udp_addr: SocketAddr) {
31+
async fn discover_peers(udp_addr: SocketAddr, bootnodes: Vec<BootNode>) {
2932
let udp_socket = UdpSocket::bind(udp_addr).await.unwrap();
30-
// This is just a placeholder example. The address is a known bootnode.
31-
let receiver_addr: SocketAddr = ("138.197.51.181:30303").parse().unwrap();
32-
let mut buf = vec![0; MAX_DISC_PACKET_SIZE];
33+
let signer = SigningKey::random(&mut OsRng);
34+
let bootnode = match bootnodes.first() {
35+
Some(b) => b,
36+
None => {
37+
return;
38+
}
39+
};
3340

34-
ping(&udp_socket, udp_addr, receiver_addr).await;
41+
ping(&udp_socket, udp_addr, bootnode.socket_address, &signer).await;
3542

36-
let (read, from) = udp_socket.recv_from(&mut buf).await.unwrap();
37-
info!("Received {read} bytes from {from}");
38-
let msg = Message::decode_with_header(&buf[..read]);
39-
info!("Message: {:?}", msg);
43+
let mut buf = vec![0; MAX_DISC_PACKET_SIZE];
44+
loop {
45+
let (read, from) = udp_socket.recv_from(&mut buf).await.unwrap();
46+
let packet_type = buf[32 + 65];
47+
match packet_type {
48+
0x04 => {
49+
info!("Received NEIGHBORS message from {from}");
50+
}
51+
_ => {
52+
let msg = Message::decode_with_header(&buf[..read]).unwrap();
53+
info!("Received {read} bytes from {from}");
54+
info!("Message: {:?}", msg);
55+
if let Message::Ping(_) = msg {
56+
let ping_hash = H256::from_slice(Message::get_hash(&buf[..read]));
57+
pong(&udp_socket, from, ping_hash, &signer).await;
58+
find_node(&udp_socket, from, &signer).await;
59+
}
60+
}
61+
}
62+
}
4063
}
4164

42-
async fn ping(socket: &UdpSocket, local_addr: SocketAddr, to_addr: SocketAddr) {
65+
async fn ping(
66+
socket: &UdpSocket,
67+
local_addr: SocketAddr,
68+
to_addr: SocketAddr,
69+
signer: &SigningKey,
70+
) {
4371
let mut buf = Vec::new();
4472

4573
let expiration: u64 = (SystemTime::now() + Duration::from_secs(10))
4674
.duration_since(UNIX_EPOCH)
4775
.unwrap()
48-
.as_millis()
49-
.try_into()
50-
.unwrap();
76+
.as_secs();
5177

5278
// TODO: this should send our advertised TCP port
5379
let from = Endpoint {
@@ -61,10 +87,49 @@ async fn ping(socket: &UdpSocket, local_addr: SocketAddr, to_addr: SocketAddr) {
6187
tcp_port: 0,
6288
};
6389

64-
let msg: discv4::Message = discv4::Message::Ping(PingMessage::new(from, to, expiration));
65-
let signer = SigningKey::random(&mut OsRng);
90+
let ping: discv4::Message = discv4::Message::Ping(PingMessage::new(from, to, expiration));
91+
92+
ping.encode_with_header(&mut buf, signer);
93+
socket.send_to(&buf, to_addr).await.unwrap();
94+
}
95+
96+
async fn find_node(socket: &UdpSocket, to_addr: SocketAddr, signer: &SigningKey) {
97+
let public_key = PublicKey::from(signer.verifying_key());
98+
let encoded = public_key.to_encoded_point(false);
99+
let bytes = encoded.as_bytes();
100+
debug_assert_eq!(bytes[0], 4);
101+
102+
let target = H512::from_slice(&bytes[1..]);
103+
104+
let expiration: u64 = (SystemTime::now() + Duration::from_secs(10))
105+
.duration_since(UNIX_EPOCH)
106+
.unwrap()
107+
.as_secs();
66108

109+
let msg: discv4::Message = discv4::Message::FindNode(FindNodeMessage::new(target, expiration));
110+
111+
let mut buf = Vec::new();
67112
msg.encode_with_header(&mut buf, signer);
113+
114+
socket.send_to(&buf, to_addr).await.unwrap();
115+
}
116+
117+
async fn pong(socket: &UdpSocket, to_addr: SocketAddr, ping_hash: H256, signer: &SigningKey) {
118+
let mut buf = Vec::new();
119+
120+
let expiration: u64 = (SystemTime::now() + Duration::from_secs(10))
121+
.duration_since(UNIX_EPOCH)
122+
.unwrap()
123+
.as_secs();
124+
125+
let to = Endpoint {
126+
ip: to_addr.ip(),
127+
udp_port: to_addr.port(),
128+
tcp_port: 0,
129+
};
130+
let pong: discv4::Message = discv4::Message::Pong(PongMessage::new(to, ping_hash, expiration));
131+
132+
pong.encode_with_header(&mut buf, signer);
68133
socket.send_to(&buf, to_addr).await.unwrap();
69134
}
70135

crates/net/src/types/bootnode.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use ethereum_rust_core::H512;
22
use std::{net::SocketAddr, num::ParseIntError, str::FromStr};
33

4-
#[derive(Debug, Clone, PartialEq, Eq)]
4+
#[derive(Debug, Clone, PartialEq, Eq, Copy)]
55
pub struct BootNode {
66
pub node_id: H512,
77
pub socket_address: SocketAddr,

ethereum_rust/src/cli.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use clap::{Arg, ArgAction, Command};
2+
use ethereum_rust_net::types::BootNode;
23

34
pub fn cli() -> Command {
45
Command::new("ethereum_rust")
@@ -71,6 +72,7 @@ pub fn cli() -> Command {
7172
Arg::new("bootnodes")
7273
.long("bootnodes")
7374
.value_name("BOOTNODE_LIST")
75+
.value_parser(clap::value_parser!(BootNode))
7476
.value_delimiter(',')
7577
.num_args(1..)
7678
.action(ArgAction::Set),

ethereum_rust/src/main.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ use std::{
55
net::{SocketAddr, ToSocketAddrs},
66
};
77
use tokio::try_join;
8-
98
use tracing::{warn, Level};
109
use tracing_subscriber::FmtSubscriber;
1110
mod cli;
@@ -50,8 +49,9 @@ async fn main() {
5049
.get_one::<String>("network")
5150
.expect("network is required");
5251

53-
let bootnodes: Vec<&BootNode> = matches
52+
let bootnodes: Vec<BootNode> = matches
5453
.get_many("bootnodes")
54+
.map(Iterator::copied)
5555
.map(Iterator::collect)
5656
.unwrap_or_default();
5757

@@ -72,7 +72,7 @@ async fn main() {
7272
let _genesis = read_genesis_file(genesis_file_path);
7373

7474
let rpc_api = ethereum_rust_rpc::start_api(http_socket_addr, authrpc_socket_addr);
75-
let networking = ethereum_rust_net::start_network(udp_socket_addr, tcp_socket_addr);
75+
let networking = ethereum_rust_net::start_network(udp_socket_addr, tcp_socket_addr, bootnodes);
7676

7777
try_join!(tokio::spawn(rpc_api), tokio::spawn(networking)).unwrap();
7878
}

0 commit comments

Comments
 (0)