-
Notifications
You must be signed in to change notification settings - Fork 16
/
Copy pathnode.rs
190 lines (174 loc) · 5.82 KB
/
node.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
use anyhow::Result;
use async_channel::Sender;
use iroh::{
endpoint::Connection,
protocol::{ProtocolHandler, Router},
Endpoint, NodeId,
};
use n0_future::{
boxed::{BoxFuture, BoxStream},
task, Stream, StreamExt,
};
use serde::{Deserialize, Serialize};
use tokio::sync::broadcast;
use tokio_stream::wrappers::BroadcastStream;
use tracing::info;
#[derive(Debug, Clone)]
pub struct EchoNode {
router: Router,
accept_events: broadcast::Sender<AcceptEvent>,
}
impl EchoNode {
pub async fn spawn() -> Result<Self> {
let endpoint = iroh::Endpoint::builder()
.discovery_n0()
.alpns(vec![Echo::ALPN.to_vec()])
.bind()
.await?;
let (event_sender, _event_receiver) = broadcast::channel(128);
let echo = Echo::new(event_sender.clone());
let router = Router::builder(endpoint)
.accept(Echo::ALPN, echo)
.spawn()
.await?;
Ok(Self {
router,
accept_events: event_sender,
})
}
pub fn endpoint(&self) -> &Endpoint {
self.router.endpoint()
}
pub fn accept_events(&self) -> BoxStream<AcceptEvent> {
let receiver = self.accept_events.subscribe();
Box::pin(BroadcastStream::new(receiver).filter_map(|event| event.ok()))
}
pub fn connect(
&self,
node_id: NodeId,
payload: String,
) -> impl Stream<Item = ConnectEvent> + Unpin {
let (event_sender, event_receiver) = async_channel::bounded(16);
let endpoint = self.router.endpoint().clone();
task::spawn(async move {
let res = connect(&endpoint, node_id, payload, event_sender.clone()).await;
let error = res.as_ref().err().map(|err| err.to_string());
event_sender.send(ConnectEvent::Closed { error }).await.ok();
});
Box::pin(event_receiver)
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "camelCase")]
pub enum ConnectEvent {
Connected,
Sent { bytes_sent: u64 },
Received { bytes_received: u64 },
Closed { error: Option<String> },
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "camelCase")]
pub enum AcceptEvent {
Accepted {
node_id: NodeId,
},
Echoed {
node_id: NodeId,
bytes_sent: u64,
},
Closed {
node_id: NodeId,
error: Option<String>,
},
}
#[derive(Debug, Clone)]
pub struct Echo {
event_sender: broadcast::Sender<AcceptEvent>,
}
impl Echo {
pub const ALPN: &[u8] = b"iroh/example-browser-echo/0";
pub fn new(event_sender: broadcast::Sender<AcceptEvent>) -> Self {
Self { event_sender }
}
}
impl Echo {
async fn handle_connection(self, connection: Connection) -> Result<()> {
// Wait for the connection to be fully established.
let node_id = connection.remote_node_id()?;
self.event_sender
.send(AcceptEvent::Accepted { node_id })
.ok();
let res = self.handle_connection_0(&connection).await;
let error = res.as_ref().err().map(|err| err.to_string());
self.event_sender
.send(AcceptEvent::Closed { node_id, error })
.ok();
res
}
async fn handle_connection_0(&self, connection: &Connection) -> Result<()> {
// We can get the remote's node id from the connection.
let node_id = connection.remote_node_id()?;
info!("Accepted connection from {node_id}");
// Our protocol is a simple request-response protocol, so we expect the
// connecting peer to open a single bi-directional stream.
let (mut send, mut recv) = connection.accept_bi().await?;
// Echo any bytes received back directly.
let bytes_sent = tokio::io::copy(&mut recv, &mut send).await?;
info!("Copied over {bytes_sent} byte(s)");
self.event_sender
.send(AcceptEvent::Echoed {
node_id,
bytes_sent,
})
.ok();
// By calling `finish` on the send stream we signal that we will not send anything
// further, which makes the receive stream on the other end terminate.
send.finish()?;
// Wait until the remote closes the connection, which it does once it
// received the response.
connection.closed().await;
Ok(())
}
}
impl ProtocolHandler for Echo {
/// The `accept` method is called for each incoming connection for our ALPN.
///
/// The returned future runs on a newly spawned tokio task, so it can run as long as
/// the connection lasts.
fn accept(&self, connection: Connection) -> BoxFuture<Result<()>> {
Box::pin(self.clone().handle_connection(connection))
}
}
async fn connect(
endpoint: &Endpoint,
node_id: NodeId,
payload: String,
event_sender: Sender<ConnectEvent>,
) -> Result<()> {
let connection = endpoint.connect(node_id, Echo::ALPN).await?;
event_sender.send(ConnectEvent::Connected).await?;
let (mut send_stream, mut recv_stream) = connection.open_bi().await?;
let send_task = task::spawn({
let event_sender = event_sender.clone();
async move {
let bytes_sent = payload.len();
send_stream.write_all(payload.as_bytes()).await?;
event_sender
.send(ConnectEvent::Sent {
bytes_sent: bytes_sent as u64,
})
.await?;
anyhow::Ok(())
}
});
let n = tokio::io::copy(&mut recv_stream, &mut tokio::io::sink()).await?;
// We know we received the last data, so we close the connection.
connection.close(1u8.into(), b"done");
event_sender
.send(ConnectEvent::Received {
bytes_received: n as u64,
})
.await?;
send_task.await??;
Ok(())
}