Skip to content
This repository was archived by the owner on Sep 5, 2024. It is now read-only.

Commit 06e709b

Browse files
starting to work on converting integration test (Connect) with new sync paradigm
1 parent 3541b6a commit 06e709b

File tree

4 files changed

+78
-17
lines changed

4 files changed

+78
-17
lines changed

src/error.rs

+6
Original file line numberDiff line numberDiff line change
@@ -61,3 +61,9 @@ impl PartialEq for Error {
6161
/// Wraps `std::result::Result` around our `Error` enum
6262
pub type Result<T> = std::result::Result<T, Error>;
6363

64+
impl From<Error> for String {
65+
fn from(err: Error) -> Self {
66+
format!("{:?}", err)
67+
}
68+
}
69+

src/message.rs

+10
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,16 @@ pub enum MessageWrapped {
163163
Init(PeerPubkey),
164164
}
165165

166+
impl MessageWrapped {
167+
/// Grab this wrapped message's sender pubkey
168+
pub fn pubkey_sender(&self) -> &PeerPubkey {
169+
match self {
170+
Self::Sealed(sealed) => sealed.pubkey_sender(),
171+
Self::Init(pubkey) => pubkey,
172+
}
173+
}
174+
}
175+
166176
#[cfg(test)]
167177
mod tests {
168178
use super::*;

tests/common/mod.rs

+6-5
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,12 @@ use futures::{
1111
use serde::{Serialize, de::DeserializeOwned};
1212
use std::{
1313
collections::HashMap,
14+
fmt::Debug,
1415
ops::Deref,
1516
};
1617
use tp2p::{
17-
ConnectionInfo, Sync,
1818
action::Action,
19+
sync::{ConnectionInfo, Sync},
1920
};
2021

2122
pub fn serialize<T: Serialize>(obj: &T) -> Result<Vec<u8>, String> {
@@ -30,10 +31,10 @@ pub fn deserialize<T: DeserializeOwned>(bytes: &[u8]) -> Result<T, String> {
3031
Ok(obj)
3132
}
3233

33-
pub async fn action_dispatcher<U, T, S>(sync: &mut Sync, tx: &Sender<PeerEvent>, action: Action<U, T, S>) -> Result<(), String>
34-
where U: Clone + Debug + serde::Serialize + serde::de::DeserializeOwned,
35-
T: Clone + Debug + serde::Serialize + serde::de::DeserializeOwned,
36-
S: Clone + Debug + serde::Serialize + serde::de::DeserializeOwned,
34+
pub async fn action_dispatcher<U, T, S>(sync: &mut Sync<T>, tx: &Sender<PeerEvent>, action: Action<U, T, S>) -> Result<(), String>
35+
where U: Debug,
36+
T: Debug + Eq + std::hash::Hash,
37+
S: Debug,
3738
{
3839
if let Action::Sync(sync_action) = action {
3940
sync.apply_sync_actions(vec![sync_action]);

tests/connect.rs

+56-12
Original file line numberDiff line numberDiff line change
@@ -1,37 +1,83 @@
1-
/*
21
mod common;
32

43
use async_std::{
54
channel,
65
task,
76
};
7+
use chrono::{DateTime, Utc};
88
use common::{
99
action_dispatcher,
1010
deserialize,
1111
PeerEvent,
1212
PeerManager,
1313
};
1414
use tp2p::{
15-
Sync,
15+
event::Event,
1616
message::MessageWrapped,
1717
peer::{Keypair, PeeringConfig, ConfirmationMode, SubscriptionConfig},
18+
sync::Sync,
1819
};
1920

21+
fn now() -> DateTime<Utc> {
22+
Utc::now()
23+
}
24+
2025
macro_rules! main_loop {
21-
($sync:expr, $peer:expr, $tx:expr, $rx:expr, $name:expr) => {
26+
($sync:expr, $peer:expr, $tx:expr, $rx:expr, $name:expr, $U:ty, $T:ty, $S:ty) => {
2227
loop {
2328
match $rx.try_recv() {
2429
Ok(PeerEvent::Connect(from)) => {
2530
println!(concat!($name, ": new connection! {:?}"), from);
2631
}
2732
Ok(PeerEvent::Recv(from, message_bytes)) => {
2833
let msg: MessageWrapped = deserialize(message_bytes.as_ref())?;
29-
//println!(concat!($name, ": recv: {:?} -- {:?}"), from, msg); // DEBUG: remove
30-
let actions = $sync.process_incoming_message(&msg, from)
31-
.map_err(|err| format!("process: {:?}", err))?;
34+
let actions = match &msg {
35+
MessageWrapped::Init(pubkey) => {
36+
$sync.process_init_message(&msg, &from, &now())?
37+
}
38+
MessageWrapped::Sealed(sealed) => {
39+
let message_opened = $sync.unwrap_incoming_message(sealed)?;
40+
match message_opened.body() {
41+
Event::Hello => {
42+
$sync.process_event_hello(&message_opened, sealed.pubkey_sender(), &from, &now())?
43+
}
44+
Event::PeerInit { .. } => {
45+
$sync.process_event_peer_init(&message_opened, sealed.pubkey_sender(), &from, &now())?
46+
}
47+
Event::PeerConfirm { .. } => {
48+
$sync.process_event_peer_confirm(&message_opened, sealed.pubkey_sender(), &now())?
49+
}
50+
Event::Ping => {
51+
$sync.process_event_ping(&message_opened, sealed.pubkey_sender(), &now())?
52+
}
53+
Event::Pong => {
54+
$sync.process_event_pong(&message_opened, sealed.pubkey_sender(), &now())?
55+
}
56+
Event::QueryMessagesByID { ids } => {
57+
let messages = {
58+
drop(ids);
59+
vec![]
60+
};
61+
$sync.process_event_query_messages_by_id(&message_opened, sealed.pubkey_sender(), &messages)?
62+
}
63+
Event::QueryMessagesByDepth { topic, depth } => {
64+
let messages = {
65+
drop(topic);
66+
drop(depth);
67+
vec![]
68+
};
69+
$sync.process_event_query_messages_by_depth(&message_opened, sealed.pubkey_sender(), &messages)?
70+
}
71+
Event::Subscribe(..) => {
72+
}
73+
_ => panic!("oi"),
74+
}
75+
}
76+
};
77+
3278
for action in actions {
3379
println!(concat!($name, ": action -- {:?}"), action);
34-
action_dispatcher(&mut $sync, &$tx, action).await?;
80+
action_dispatcher::<$U, $T, $S>(&mut $sync, &$tx, action).await?;
3581
}
3682
}
3783
Err(channel::TryRecvError::Closed) => {
@@ -46,7 +92,6 @@ macro_rules! main_loop {
4692

4793
#[async_std::test]
4894
async fn peer_connect() -> Result<(), String> {
49-
env_logger::init();
5095
let peer1_task = task::spawn(async move {
5196
let keypair = Keypair::new_random();
5297
let peering_config = PeeringConfig::new(
@@ -61,7 +106,7 @@ async fn peer_connect() -> Result<(), String> {
61106
let peer_task = task::spawn(async move {
62107
peer.start("127.0.0.1", 50020).await.expect("error running peer");
63108
});
64-
main_loop! { sync , peer, tx, rx, "peer1" }
109+
main_loop! { sync , peer, tx, rx, "peer1", (), (), () }
65110

66111
peer_task.await;
67112
let res: Result<(), String> = Ok(());
@@ -84,12 +129,12 @@ async fn peer_connect() -> Result<(), String> {
84129
peer.start("127.0.0.1", 50021).await.expect("error running peer");
85130
});
86131

87-
let actions = sync.init_comm("127.0.0.1:50020").expect("peer_init failed");
132+
let actions = sync.init_comm::<(), ()>("127.0.0.1:50020", &now()).expect("peer_init failed");
88133
for action in actions {
89134
println!("peer2: action -- {:?}", action);
90135
action_dispatcher(&mut sync, &tx, action).await?;
91136
}
92-
main_loop! { sync , peer, tx, rx, "peer2" }
137+
main_loop! { sync , peer, tx, rx, "peer2", (), (), () }
93138

94139
peer_task.await;
95140
let res: Result<(), String> = Ok(());
@@ -101,5 +146,4 @@ async fn peer_connect() -> Result<(), String> {
101146

102147
Ok(())
103148
}
104-
*/
105149

0 commit comments

Comments
 (0)