Skip to content

Commit fbe581c

Browse files
committed
unify get_value_from_channel/expect_recv
1 parent 7c0cc55 commit fbe581c

File tree

7 files changed

+61
-47
lines changed

7 files changed

+61
-47
lines changed

dns_server/src/crawler_p2p/crawler/tests/mod.rs

+1
Original file line numberDiff line numberDiff line change
@@ -570,6 +570,7 @@ fn dont_connect_to_initially_banned_peer(#[case] seed: Seed) {
570570
crawler.assert_banned_addresses(&[(node1.as_bannable(), ban_end_time)]);
571571
}
572572

573+
// Check that a peer is banned on CrawlerEvent::ConnectionError.
573574
#[rstest]
574575
#[trace]
575576
#[case(Seed::from_entropy())]

dns_server/src/crawler_p2p/crawler_manager/tests/mod.rs

+19-19
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ use p2p::{
2222
config::{BanDuration, BanThreshold},
2323
types::socket_address::SocketAddress,
2424
};
25-
use p2p_test_utils::{assert_no_value_in_channel, get_value_from_channel};
25+
use p2p_test_utils::{expect_no_recv, expect_recv};
2626

2727
use crate::{
2828
crawler_p2p::crawler_manager::tests::mock_manager::{
@@ -41,15 +41,15 @@ async fn basic() {
4141
state.node_online(node1);
4242
advance_time(&mut crawler, &time_getter, Duration::from_secs(60), 60).await;
4343
assert_eq!(
44-
get_value_from_channel(&mut command_rx).await.unwrap(),
44+
expect_recv!(command_rx),
4545
DnsServerCommand::AddAddress(node1.socket_addr().ip())
4646
);
4747

4848
// Node goes offline, DNS record removed
4949
state.node_offline(node1);
5050
advance_time(&mut crawler, &time_getter, Duration::from_secs(60), 60).await;
5151
assert_eq!(
52-
get_value_from_channel(&mut command_rx).await.unwrap(),
52+
expect_recv!(command_rx),
5353
DnsServerCommand::DelAddress(node1.socket_addr().ip())
5454
);
5555
}
@@ -72,7 +72,7 @@ async fn long_offline() {
7272
state.node_online(node1);
7373
advance_time(&mut crawler, &time_getter, Duration::from_secs(60), 24 * 60).await;
7474
assert_eq!(
75-
get_value_from_channel(&mut command_rx).await.unwrap(),
75+
expect_recv!(command_rx),
7676
DnsServerCommand::AddAddress(node1.socket_addr().ip())
7777
);
7878
}
@@ -90,21 +90,21 @@ async fn announced_online() {
9090

9191
advance_time(&mut crawler, &time_getter, Duration::from_secs(60), 60).await;
9292
assert_eq!(
93-
get_value_from_channel(&mut command_rx).await.unwrap(),
93+
expect_recv!(command_rx),
9494
DnsServerCommand::AddAddress(node1.socket_addr().ip())
9595
);
9696

9797
state.announce_address(node1, node2);
9898
advance_time(&mut crawler, &time_getter, Duration::from_secs(60), 60).await;
9999
assert_eq!(
100-
get_value_from_channel(&mut command_rx).await.unwrap(),
100+
expect_recv!(command_rx),
101101
DnsServerCommand::AddAddress(node2.socket_addr().ip())
102102
);
103103

104104
state.announce_address(node2, node3);
105105
advance_time(&mut crawler, &time_getter, Duration::from_secs(60), 60).await;
106106
assert_eq!(
107-
get_value_from_channel(&mut command_rx).await.unwrap(),
107+
expect_recv!(command_rx),
108108
DnsServerCommand::AddAddress(node3.socket_addr().ip())
109109
);
110110

@@ -121,7 +121,7 @@ async fn announced_offline() {
121121

122122
advance_time(&mut crawler, &time_getter, Duration::from_secs(60), 60).await;
123123
assert_eq!(
124-
get_value_from_channel(&mut command_rx).await.unwrap(),
124+
expect_recv!(command_rx),
125125
DnsServerCommand::AddAddress(node1.socket_addr().ip())
126126
);
127127
assert_eq!(state.connection_attempts.lock().unwrap().len(), 1);
@@ -136,7 +136,7 @@ async fn announced_offline() {
136136
state.announce_address(node1, node2);
137137
advance_time(&mut crawler, &time_getter, Duration::from_secs(60), 24 * 60).await;
138138
assert_eq!(
139-
get_value_from_channel(&mut command_rx).await.unwrap(),
139+
expect_recv!(command_rx),
140140
DnsServerCommand::AddAddress(node2.socket_addr().ip())
141141
);
142142
assert_eq!(state.connection_attempts.lock().unwrap().len(), 3);
@@ -164,14 +164,14 @@ async fn private_ip() {
164164

165165
// Check that only nodes with public addresses and on the default port are added to DNS
166166
assert_eq!(
167-
get_value_from_channel(&mut command_rx).await.unwrap(),
167+
expect_recv!(command_rx),
168168
DnsServerCommand::AddAddress(node1.socket_addr().ip())
169169
);
170170
assert_eq!(
171-
get_value_from_channel(&mut command_rx).await.unwrap(),
171+
expect_recv!(command_rx),
172172
DnsServerCommand::AddAddress(node2.socket_addr().ip())
173173
);
174-
assert_no_value_in_channel(&mut command_rx).await;
174+
expect_no_recv!(command_rx);
175175

176176
// Check that all reachable nodes are stored in the DB
177177
assert_known_addresses(&crawler, &[node1, node2, node3, node4, node5, node6]);
@@ -202,14 +202,14 @@ async fn ban_unban() {
202202

203203
// Only normal nodes are added to DNS
204204
assert_eq!(
205-
get_value_from_channel(&mut command_rx).await.unwrap(),
205+
expect_recv!(command_rx),
206206
DnsServerCommand::AddAddress(node1.socket_addr().ip())
207207
);
208208
assert_eq!(
209-
get_value_from_channel(&mut command_rx).await.unwrap(),
209+
expect_recv!(command_rx),
210210
DnsServerCommand::AddAddress(node3.socket_addr().ip())
211211
);
212-
assert_no_value_in_channel(&mut command_rx).await;
212+
expect_no_recv!(command_rx);
213213

214214
// node2 is banned
215215
assert_banned_addresses(&crawler, &[(node2.as_bannable(), node2_ban_end_time)]);
@@ -226,7 +226,7 @@ async fn ban_unban() {
226226

227227
// Check that it's been removed from DNS.
228228
assert_eq!(
229-
get_value_from_channel(&mut command_rx).await.unwrap(),
229+
expect_recv!(command_rx),
230230
DnsServerCommand::DelAddress(node1.socket_addr().ip())
231231
);
232232

@@ -252,7 +252,7 @@ async fn ban_unban() {
252252
(node2.as_bannable(), node2_ban_end_time),
253253
],
254254
);
255-
assert_no_value_in_channel(&mut command_rx).await;
255+
expect_no_recv!(command_rx);
256256

257257
// Wait enough time for node2 to be unbanned.
258258
let time_until_node2_unban =
@@ -262,7 +262,7 @@ async fn ban_unban() {
262262
// node2 is no longer banned; its address has been added to DNS.
263263
assert_banned_addresses(&crawler, &[(node1.as_bannable(), node1_ban_end_time)]);
264264
assert_eq!(
265-
get_value_from_channel(&mut command_rx).await.unwrap(),
265+
expect_recv!(command_rx),
266266
DnsServerCommand::AddAddress(node2.socket_addr().ip())
267267
);
268268

@@ -274,7 +274,7 @@ async fn ban_unban() {
274274
// node1 is no longer banned; its address has been added to DNS.
275275
assert_banned_addresses(&crawler, &[]);
276276
assert_eq!(
277-
get_value_from_channel(&mut command_rx).await.unwrap(),
277+
expect_recv!(command_rx),
278278
DnsServerCommand::AddAddress(node1.socket_addr().ip())
279279
);
280280
}

p2p/p2p-test-utils/src/lib.rs

+35-8
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515

1616
#![allow(clippy::unwrap_used)]
1717

18-
use std::{fmt::Debug, sync::Arc, time::Duration};
18+
use std::{sync::Arc, time::Duration};
1919

2020
use chainstate::{
2121
chainstate_interface::ChainstateInterface, make_chainstate, ChainstateConfig,
@@ -30,7 +30,6 @@ use common::{
3030
use mempool::{MempoolHandle, MempoolSubsystemInterface};
3131
use subsystem::manager::{ManagerJoinHandle, ShutdownTrigger};
3232
use test_utils::mock_time_getter::mocked_time_getter_milliseconds;
33-
use tokio::{sync::mpsc::UnboundedReceiver, time};
3433
use utils::atomics::SeqCstAtomicU64;
3534

3635
pub fn start_subsystems(
@@ -122,12 +121,40 @@ pub const LONG_TIMEOUT: Duration = Duration::from_secs(60);
122121
/// A short timeout for events that shouldn't occur.
123122
pub const SHORT_TIMEOUT: Duration = Duration::from_millis(500);
124123

125-
pub async fn get_value_from_channel<T>(rx: &mut UnboundedReceiver<T>) -> Option<T> {
126-
time::timeout(LONG_TIMEOUT, rx.recv())
127-
.await
128-
.expect("Failed to receive value in time")
124+
/// Await for the specified future for some reasonably big amount of time; panic if the timeout
125+
/// is reached.
126+
// Note: this is implemented as a macro until #[track_caller] works correctly with async functions
127+
// (needed to print the caller location if 'unwrap' fails). Same for the other macros below.
128+
#[macro_export]
129+
macro_rules! expect_future_val {
130+
($fut:expr) => {
131+
tokio::time::timeout($crate::LONG_TIMEOUT, $fut)
132+
.await
133+
.expect("Failed to receive value in time")
134+
};
129135
}
130136

131-
pub async fn assert_no_value_in_channel<T: Debug>(rx: &mut UnboundedReceiver<T>) {
132-
time::timeout(SHORT_TIMEOUT, rx.recv()).await.unwrap_err();
137+
/// Await for the specified future for a short time, expecting a timeout.
138+
#[macro_export]
139+
macro_rules! expect_no_future_val {
140+
($fut:expr) => {
141+
tokio::time::timeout($crate::SHORT_TIMEOUT, $fut).await.unwrap_err();
142+
};
143+
}
144+
145+
/// Try receiving a message from the tokio channel; panic if the channel is closed or the timeout
146+
/// is reached.
147+
#[macro_export]
148+
macro_rules! expect_recv {
149+
($rx:expr) => {
150+
$crate::expect_future_val!($rx.recv()).unwrap()
151+
};
152+
}
153+
154+
/// Try receiving a message from the tokio channel; expect that a timeout is reached.
155+
#[macro_export]
156+
macro_rules! expect_no_recv {
157+
($rx:expr) => {
158+
$crate::expect_no_future_val!($rx.recv())
159+
};
133160
}

p2p/src/peer_manager/tests/mod.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ mod utils;
2222

2323
use std::{sync::Arc, time::Duration};
2424

25+
use p2p_test_utils::expect_recv;
2526
use tokio::sync::{mpsc, oneshot};
2627

2728
use ::utils::atomics::SeqCstAtomicBool;
@@ -35,7 +36,6 @@ use tokio::{
3536
};
3637

3738
use crate::{
38-
expect_recv,
3939
interface::types::ConnectedPeer,
4040
message::{PeerManagerMessage, PingRequest, PingResponse},
4141
net::{

p2p/src/peer_manager/tests/ping.rs

+1-2
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,11 @@
1616
use std::{sync::Arc, time::Duration};
1717

1818
use common::{chain::config, primitives::user_agent::mintlayer_core_user_agent};
19-
use p2p_test_utils::P2pBasicTestTimeGetter;
19+
use p2p_test_utils::{expect_recv, P2pBasicTestTimeGetter};
2020
use test_utils::{assert_matches, assert_matches_return_val};
2121

2222
use crate::{
2323
config::{NodeType, P2pConfig},
24-
expect_recv,
2524
message::{PeerManagerMessage, PingRequest, PingResponse},
2625
net::{
2726
default_backend::{

p2p/src/sync/tests/helpers/mod.rs

+4-6
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ use std::{collections::BTreeMap, panic, sync::Arc};
1717

1818
use async_trait::async_trait;
1919
use crypto::random::Rng;
20-
use p2p_test_utils::{assert_no_value_in_channel, get_value_from_channel, SHORT_TIMEOUT};
20+
use p2p_test_utils::{expect_future_val, expect_no_recv, expect_recv, SHORT_TIMEOUT};
2121
use p2p_types::socket_address::SocketAddress;
2222
use test_utils::random::Seed;
2323
use tokio::{
@@ -212,7 +212,7 @@ impl TestNode {
212212

213213
/// Receives a message from the sync manager.
214214
pub async fn message(&mut self) -> (PeerId, SyncMessage) {
215-
get_value_from_channel(&mut self.sync_msg_receiver).await.unwrap()
215+
expect_recv!(&mut self.sync_msg_receiver)
216216
}
217217

218218
/// Try to receive a message from the sync manager.
@@ -231,7 +231,7 @@ impl TestNode {
231231

232232
/// Panics if the sync manager returns an error.
233233
pub async fn assert_no_error(&mut self) {
234-
assert_no_value_in_channel(&mut self.error_receiver).await;
234+
expect_no_recv!(self.error_receiver);
235235
}
236236

237237
/// Receives the `AdjustPeerScore` event from the peer manager.
@@ -500,9 +500,7 @@ struct SyncingEventReceiverMock {
500500
#[async_trait]
501501
impl SyncingEventReceiver for SyncingEventReceiverMock {
502502
async fn poll_next(&mut self) -> Result<SyncingEvent> {
503-
get_value_from_channel(&mut self.events_receiver)
504-
.await
505-
.ok_or(P2pError::ChannelClosed)
503+
expect_future_val!(self.events_receiver.recv()).ok_or(P2pError::ChannelClosed)
506504
}
507505
}
508506

p2p/src/testing_utils.rs

-11
Original file line numberDiff line numberDiff line change
@@ -225,17 +225,6 @@ pub fn peerdb_inmemory_store() -> PeerDbStorageImpl<storage::inmemory::InMemory>
225225
PeerDbStorageImpl::new(storage).unwrap()
226226
}
227227

228-
/// Receive a message from the tokio channel.
229-
/// Panics if the channel is closed or no message received in 10 seconds.
230-
#[macro_export]
231-
macro_rules! expect_recv {
232-
// Implemented as a macro until #[track_caller] works correctly with async functions
233-
// (needed to print the caller location if unwraps fail)
234-
($x:expr) => {
235-
tokio::time::timeout(Duration::from_secs(10), $x.recv()).await.unwrap().unwrap()
236-
};
237-
}
238-
239228
pub fn test_p2p_config() -> P2pConfig {
240229
P2pConfig {
241230
bind_addresses: Default::default(),

0 commit comments

Comments
 (0)