-
Notifications
You must be signed in to change notification settings - Fork 7
/
message_reader.rs
221 lines (197 loc) · 7.41 KB
/
message_reader.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
use crate::connection::{xdr_converter::get_xdr_message_length, Connector, Error, Xdr};
use async_std::io::ReadExt;
use substrate_stellar_sdk::{types::StellarMessage, XdrCodec};
use tokio::sync::{mpsc, mpsc::error::TryRecvError};
/// Polls for messages coming from the Stellar Node and communicates it back to the user
///
/// # Arguments
/// * `connector` - contains the config and necessary info for connecting to Stellar Node
/// * `send_to_user_sender` - sends message from Stellar to the user
/// * `send_to_node_receiver` - receives message from user and writes it to the write half of the
/// stream.
pub(crate) async fn poll_messages_from_stellar(
mut connector: Connector,
send_to_user_sender: mpsc::Sender<StellarMessage>,
mut send_to_node_receiver: mpsc::Receiver<StellarMessage>,
) {
log::info!("poll_messages_from_stellar(): started.");
loop {
if send_to_user_sender.is_closed() {
log::info!("poll_messages_from_stellar(): closing receiver during disconnection");
// close this channel as communication to user was closed.
break
}
// check for messages from user.
match send_to_node_receiver.try_recv() {
Ok(msg) =>
if let Err(e) = connector.send_to_node(msg).await {
log::error!("poll_messages_from_stellar(): Error occurred during sending message to node: {e:?}");
},
Err(TryRecvError::Disconnected) => break,
Err(TryRecvError::Empty) => {},
}
// check for messages from Stellar Node.
let xdr = match read_message_from_stellar(&mut connector).await {
Err(e) => {
log::error!("poll_messages_from_stellar(): {e:?}");
break
},
Ok(xdr) => xdr,
};
match connector.process_raw_message(xdr).await {
Ok(Some(stellar_msg)) =>
// push message to user
if let Err(e) = send_to_user_sender.send(stellar_msg.clone()).await {
log::warn!("poll_messages_from_stellar(): Error occurred during sending message {} to user: {e:?}",
String::from_utf8(stellar_msg.to_base64_xdr())
.unwrap_or_else(|_| format!("{:?}", stellar_msg.to_base64_xdr()))
);
},
Ok(None) => {},
Err(e) => {
log::error!("poll_messages_from_stellar(): Error occurred during processing xdr message: {e:?}");
break
},
}
}
// make sure to shutdown the connector
connector.stop();
send_to_node_receiver.close();
drop(send_to_user_sender);
log::debug!("poll_messages_from_stellar(): stopped.");
}
/// Returns Xdr format of the `StellarMessage` sent from the Stellar Node
async fn read_message_from_stellar(connector: &mut Connector) -> Result<Xdr, Error> {
// holds the number of bytes that were missing from the previous stellar message.
let mut lack_bytes_from_prev = 0;
let mut readbuf: Vec<u8> = vec![];
let mut buff_for_reading = vec![0; 4];
loop {
// check whether or not we should read the bytes as:
// 1. the length of the next stellar message
// 2. the remaining bytes of the previous stellar message
match connector.tcp_stream.read(&mut buff_for_reading).await {
Ok(0) => continue,
Ok(_) if lack_bytes_from_prev == 0 => {
// if there are no more bytes lacking from the previous message,
// then check the size of next stellar message.
let expect_msg_len = get_xdr_message_length(&buff_for_reading);
// If it's not enough, skip it.
if expect_msg_len == 0 {
// there's nothing to read; wait for the next iteration
log::trace!("read_message_from_stellar(): expect_msg_len == 0");
continue
}
// let's start reading the actual stellar message.
readbuf = vec![0; expect_msg_len];
match read_message(
connector,
&mut lack_bytes_from_prev,
&mut readbuf,
expect_msg_len,
)
.await
{
Ok(None) => continue,
Ok(Some(xdr)) => return Ok(xdr),
Err(e) => {
log::trace!("read_message_from_stellar(): ERROR: {e:?}");
return Err(e)
},
}
},
Ok(size) => {
// The next few bytes was read. Add it to the readbuf.
lack_bytes_from_prev = lack_bytes_from_prev.saturating_sub(size);
readbuf.append(&mut buff_for_reading);
// make sure to cleanup the buffer
buff_for_reading = vec![0; 4];
// let's read the continuation number of bytes from the previous message.
match read_unfinished_message(connector, &mut lack_bytes_from_prev, &mut readbuf)
.await
{
Ok(None) => continue,
Ok(Some(xdr)) => return Ok(xdr),
Err(e) => {
log::trace!("read_message_from_stellar(): ERROR: {e:?}");
return Err(e)
},
}
},
Err(e) => {
log::trace!("read_message_from_stellar(): ERROR reading messages: {e:?}");
return Err(Error::ReadFailed(e.to_string()))
},
}
}
}
/// Returns Xdr when all bytes from the stream have successfully been converted; else None.
/// This reads a number of bytes based on the expected message length.
///
/// # Arguments
/// * `connector` - a ref struct that contains the config and necessary info for connecting to
/// Stellar Node
/// * `lack_bytes_from_prev` - the number of bytes remaining, to complete the previous message
/// * `readbuf` - the buffer that holds the bytes of the previous and incomplete message
/// * `xpect_msg_len` - the expected # of bytes of the Stellar message
async fn read_message(
connector: &mut Connector,
lack_bytes_from_prev: &mut usize,
readbuf: &mut Vec<u8>,
xpect_msg_len: usize,
) -> Result<Option<Xdr>, Error> {
let actual_msg_len = connector
.tcp_stream
.read(readbuf)
.await
.map_err(|e| Error::ReadFailed(e.to_string()))?;
// only when the message has the exact expected size bytes, should we send to user.
if actual_msg_len == xpect_msg_len {
return Ok(Some(readbuf.clone()))
}
// The next bytes are remnants from the previous stellar message.
// save it and read it on the next loop.
*lack_bytes_from_prev = xpect_msg_len - actual_msg_len;
*readbuf = readbuf[0..actual_msg_len].to_owned();
log::trace!(
"read_message(): received only partial message. Need {lack_bytes_from_prev} bytes to complete."
);
Ok(None)
}
/// Returns Xdr when all bytes from the stream have successfully been converted; else None.
/// Reads a continuation of bytes that belong to the previous message
///
/// # Arguments
/// * `connector` - a ref struct that contains the config and necessary info for connecting to
/// Stellar Node
/// * `lack_bytes_from_prev` - the number of bytes remaining, to complete the previous message
/// * `readbuf` - the buffer that holds the bytes of the previous and incomplete message
async fn read_unfinished_message(
connector: &mut Connector,
lack_bytes_from_prev: &mut usize,
readbuf: &mut Vec<u8>,
) -> Result<Option<Xdr>, Error> {
// let's read the continuation number of bytes from the previous message.
let mut cont_buf = vec![0; *lack_bytes_from_prev];
let actual_msg_len = connector
.tcp_stream
.read(&mut cont_buf)
.await
.map_err(|e| Error::ReadFailed(e.to_string()))?;
// this partial message completes the previous message.
if actual_msg_len == *lack_bytes_from_prev {
log::trace!("read_unfinished_message(): received continuation from the previous message.");
readbuf.append(&mut cont_buf);
return Ok(Some(readbuf.clone()))
}
// this partial message is not enough to complete the previous message.
if actual_msg_len > 0 {
*lack_bytes_from_prev -= actual_msg_len;
cont_buf = cont_buf[0..actual_msg_len].to_owned();
readbuf.append(&mut cont_buf);
log::trace!(
"read_unfinished_message(): not enough bytes to complete the previous message. Need {lack_bytes_from_prev} bytes to complete."
);
}
Ok(None)
}