Skip to content

Commit cf33e0e

Browse files
authored
Merge pull request #1309 from mintlayer/attempt_to_fix_spuriously_failing_test
Attempt to fix spuriously failing test "dont_make_announcements_while_blocks_are_being_sent". Also, PR #1316 was merged into this branch.
2 parents 3d3b1d9 + 73ba596 commit cf33e0e

File tree

13 files changed

+440
-116
lines changed

13 files changed

+440
-116
lines changed

p2p/backend-test-suite/src/ban.rs

+2
Original file line numberDiff line numberDiff line change
@@ -133,12 +133,14 @@ where
133133
};
134134
messaging_handle_2
135135
.send_message(peer, SyncMessage::HeaderList(HeaderList::new(Vec::new())))
136+
.await
136137
.unwrap();
137138
messaging_handle_2
138139
.send_message(
139140
peer,
140141
SyncMessage::HeaderList(HeaderList::new(vec![block.header().clone()])),
141142
)
143+
.await
142144
.unwrap();
143145
});
144146

p2p/backend-test-suite/src/block_announcement.rs

+3
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,7 @@ where
9494
peer2.peer_id,
9595
SyncMessage::HeaderList(HeaderList::new(vec![block.header().clone()])),
9696
)
97+
.await
9798
.unwrap();
9899

99100
let mut sync_msg_rx_2 = match sync2.poll_next().await.unwrap() {
@@ -130,6 +131,7 @@ where
130131
peer1.peer_id,
131132
SyncMessage::HeaderList(HeaderList::new(vec![block.header().clone()])),
132133
)
134+
.await
133135
.unwrap();
134136

135137
let mut sync_msg_rx_1 = match sync1.poll_next().await.unwrap() {
@@ -236,6 +238,7 @@ where
236238
peer2.peer_id,
237239
SyncMessage::HeaderList(HeaderList::new(vec![block.header().clone()])),
238240
)
241+
.await
239242
.unwrap();
240243

241244
shutdown.store(true);

p2p/src/error.rs

+13-1
Original file line numberDiff line numberDiff line change
@@ -49,8 +49,15 @@ pub enum ProtocolError {
4949
DuplicatedBlockRequest(Id<Block>),
5050
#[error("Headers aren't connected")]
5151
DisconnectedHeaders,
52-
#[error("Received a message ({0}) that wasn't expected")]
52+
#[error("Peer sent a message ({0}) that wasn't expected")]
5353
UnexpectedMessage(String),
54+
#[error("Peer sent a block ({0}) that wasn't requested")]
55+
UnsolicitedBlockReceived(Id<Block>),
56+
#[error("Peer sent block {expected_block_id} while it was expected to send {actual_block_id}")]
57+
BlocksReceivedInWrongOrder {
58+
expected_block_id: Id<Block>,
59+
actual_block_id: Id<Block>,
60+
},
5461
#[error("Empty block list requested")]
5562
ZeroBlocksInRequest,
5663
#[error("Handshake expected")]
@@ -227,6 +234,11 @@ impl BanScore for ProtocolError {
227234
ProtocolError::DuplicatedBlockRequest(_) => 20,
228235
ProtocolError::DisconnectedHeaders => 20,
229236
ProtocolError::UnexpectedMessage(_) => 20,
237+
ProtocolError::UnsolicitedBlockReceived(_) => 20,
238+
ProtocolError::BlocksReceivedInWrongOrder {
239+
expected_block_id: _,
240+
actual_block_id: _,
241+
} => 20,
230242
ProtocolError::ZeroBlocksInRequest => 20,
231243
ProtocolError::HandshakeExpected => 100,
232244
ProtocolError::AddressListLimitExceeded => 100,

p2p/src/net/default_backend/mod.rs

+3-1
Original file line numberDiff line numberDiff line change
@@ -142,8 +142,10 @@ where
142142
}
143143
}
144144

145+
#[async_trait]
145146
impl MessagingService for MessagingHandle {
146-
fn send_message(&mut self, peer_id: PeerId, message: SyncMessage) -> crate::Result<()> {
147+
async fn send_message(&mut self, peer_id: PeerId, message: SyncMessage) -> crate::Result<()> {
148+
// Note: send_message was made async to be able to use a bounded channel in tests.
147149
Ok(self.command_sender.send(types::Command::SendMessage {
148150
peer_id,
149151
message: message.into(),

p2p/src/net/mod.rs

+2-1
Original file line numberDiff line numberDiff line change
@@ -122,9 +122,10 @@ where
122122
}
123123

124124
/// An interface for sending messages and announcements to peers.
125+
#[async_trait]
125126
pub trait MessagingService: Clone {
126127
/// Sends a message to the peer.
127-
fn send_message(&mut self, peer: PeerId, message: SyncMessage) -> crate::Result<()>;
128+
async fn send_message(&mut self, peer: PeerId, message: SyncMessage) -> crate::Result<()>;
128129
}
129130

130131
#[async_trait]

p2p/src/sync/peer_v1.rs

+60-36
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ struct IncomingDataState {
103103
/// requested the blocks for.
104104
pending_headers: Vec<SignedBlockHeader>,
105105
/// A list of blocks that we requested from this peer.
106-
requested_blocks: BTreeSet<Id<Block>>,
106+
requested_blocks: VecDeque<Id<Block>>,
107107
/// The id of the best block header that we've received from the peer and that we also have.
108108
/// This includes headers received by any means, e.g. via HeaderList messages, as part
109109
/// of a locator during peer's header requests, via block responses.
@@ -158,7 +158,7 @@ where
158158
time_getter,
159159
incoming: IncomingDataState {
160160
pending_headers: Vec::new(),
161-
requested_blocks: BTreeSet::new(),
161+
requested_blocks: VecDeque::new(),
162162
peers_best_block_that_we_have: None,
163163
singular_unconnected_headers_count: 0,
164164
},
@@ -225,15 +225,15 @@ where
225225
}
226226
}
227227

228-
fn send_message(&mut self, message: SyncMessage) -> Result<()> {
229-
self.messaging_handle.send_message(self.id(), message)
228+
async fn send_message(&mut self, message: SyncMessage) -> Result<()> {
229+
self.messaging_handle.send_message(self.id(), message).await
230230
}
231231

232-
fn send_headers(&mut self, headers: HeaderList) -> Result<()> {
232+
async fn send_headers(&mut self, headers: HeaderList) -> Result<()> {
233233
if let Some(last_header) = headers.headers().last() {
234234
self.outgoing.best_sent_block_header = Some(last_header.block_id().into());
235235
}
236-
self.send_message(SyncMessage::HeaderList(headers))
236+
self.send_message(SyncMessage::HeaderList(headers)).await
237237
}
238238

239239
async fn handle_new_tip(&mut self, new_tip_id: &Id<Block>) -> Result<()> {
@@ -290,23 +290,17 @@ where
290290
.await?;
291291

292292
if headers.is_empty() {
293-
log::warn!(
294-
concat!(
295-
"[peer id = {}] Got new tip event with block id {}, ",
296-
"but there is nothing to send"
297-
),
293+
log::debug!(
294+
"[peer id = {}] Got new tip event with block id {}, but there is nothing to send",
298295
self.id(),
299296
new_tip_id,
300297
);
301298
} else if best_block_id != new_tip_id {
302299
// If we got here, another "new tip" event should be generated soon,
303300
// so we may ignore this one (and it makes sense to ignore it to avoid sending
304301
// the same header list multiple times).
305-
log::warn!(
306-
concat!(
307-
"[peer id = {}] Got new tip event with block id {}, ",
308-
"but the tip has changed since then to {}"
309-
),
302+
log::info!(
303+
"[peer id = {}] Got new tip event with block id {}, but the tip has changed since then to {}",
310304
self.id(),
311305
new_tip_id,
312306
best_block_id
@@ -317,7 +311,7 @@ where
317311
self.id(),
318312
headers.len()
319313
);
320-
return self.send_headers(HeaderList::new(headers));
314+
return self.send_headers(HeaderList::new(headers)).await;
321315
}
322316
} else {
323317
// Note: if we got here, then we haven't received a single header request or
@@ -346,7 +340,7 @@ where
346340
&& self.common_services.has_service(Service::Transactions)
347341
{
348342
self.add_known_transaction(txid);
349-
self.send_message(SyncMessage::NewTransaction(txid))
343+
self.send_message(SyncMessage::NewTransaction(txid)).await
350344
} else {
351345
Ok(())
352346
}
@@ -357,9 +351,6 @@ where
357351
async fn request_headers(&mut self) -> Result<()> {
358352
let locator = self.chainstate_handle.call(|this| Ok(this.get_locator()?)).await?;
359353
if locator.len() > *self.p2p_config.protocol_config.msg_max_locator_count {
360-
// Note: msg_max_locator_count is not supposed to be configurable outside of tests,
361-
// so we should never get here in production code. Moreover, currently it's not
362-
// modified even in tests. TODO: make it a constant.
363354
log::warn!(
364355
"[peer id = {}] Sending locator of the length {}, which exceeds the maximum length {:?}",
365356
self.id(),
@@ -371,7 +362,8 @@ where
371362
log::debug!("[peer id = {}] Sending header list request", self.id());
372363
self.send_message(SyncMessage::HeaderListRequest(HeaderListRequest::new(
373364
locator,
374-
)))?;
365+
)))
366+
.await?;
375367

376368
self.peer_activity
377369
.set_expecting_headers_since(Some(self.time_getter.get_time()));
@@ -445,7 +437,7 @@ where
445437
// all headers that were available at the moment.
446438
self.have_sent_all_headers = headers.len() < header_count_limit;
447439

448-
self.send_headers(HeaderList::new(headers))
440+
self.send_headers(HeaderList::new(headers)).await
449441
}
450442

451443
/// Processes the blocks request.
@@ -719,7 +711,7 @@ where
719711
.await?;
720712
}
721713

722-
self.request_blocks(new_block_headers)
714+
self.request_blocks(new_block_headers).await
723715
}
724716

725717
async fn handle_block_response(&mut self, block: Block) -> Result<()> {
@@ -734,10 +726,28 @@ where
734726
// The code below will set it again if needed.
735727
self.peer_activity.set_expecting_blocks_since(None);
736728

737-
if self.incoming.requested_blocks.take(&block.get_id()).is_none() {
738-
return Err(P2pError::ProtocolError(ProtocolError::UnexpectedMessage(
739-
"block response".to_owned(),
740-
)));
729+
if self.incoming.requested_blocks.front().is_some_and(|id| id == &block.get_id()) {
730+
self.incoming.requested_blocks.pop_front();
731+
} else {
732+
let idx = self.incoming.requested_blocks.iter().position(|id| id == &block.get_id());
733+
// Note: we treat wrongly ordered blocks in the same way as unsolicited ones, i.e.
734+
// we don't remove their ids from the list.
735+
if idx.is_some() {
736+
return Err(P2pError::ProtocolError(
737+
ProtocolError::BlocksReceivedInWrongOrder {
738+
expected_block_id: *self
739+
.incoming
740+
.requested_blocks
741+
.front()
742+
.expect("The deque is known to be non-empty"),
743+
actual_block_id: block.get_id(),
744+
},
745+
));
746+
} else {
747+
return Err(P2pError::ProtocolError(
748+
ProtocolError::UnsolicitedBlockReceived(block.get_id()),
749+
));
750+
}
741751
}
742752

743753
let block = self.chainstate_handle.call(|c| Ok(c.preliminary_block_check(block)?)).await?;
@@ -798,7 +808,7 @@ where
798808
self.request_headers().await?;
799809
} else {
800810
// Download remaining blocks.
801-
self.request_blocks(headers)?;
811+
self.request_blocks(headers).await?;
802812
}
803813
} else {
804814
// We expect additional blocks from the peer, update the timestamp.
@@ -821,7 +831,7 @@ where
821831
None => TransactionResponse::NotFound(id),
822832
};
823833

824-
self.send_message(SyncMessage::TransactionResponse(res))?;
834+
self.send_message(SyncMessage::TransactionResponse(res)).await?;
825835

826836
Ok(())
827837
}
@@ -905,7 +915,7 @@ where
905915
}
906916

907917
if !(self.mempool_handle.call(move |m| m.contains_transaction(&tx)).await?) {
908-
self.send_message(SyncMessage::TransactionRequest(tx))?;
918+
self.send_message(SyncMessage::TransactionRequest(tx)).await?;
909919
assert!(self.announced_transactions.insert(tx));
910920
}
911921

@@ -916,7 +926,7 @@ where
916926
///
917927
/// The number of blocks requested equals `P2pConfig::requested_blocks_limit`, the remaining
918928
/// headers are stored in the peer context.
919-
fn request_blocks(&mut self, mut headers: Vec<SignedBlockHeader>) -> Result<()> {
929+
async fn request_blocks(&mut self, mut headers: Vec<SignedBlockHeader>) -> Result<()> {
920930
debug_assert!(self.incoming.pending_headers.is_empty());
921931
debug_assert!(self.incoming.requested_blocks.is_empty());
922932
debug_assert!(!headers.is_empty());
@@ -936,7 +946,8 @@ where
936946
);
937947
self.send_message(SyncMessage::BlockListRequest(BlockListRequest::new(
938948
block_ids.clone(),
939-
)))?;
949+
)))
950+
.await?;
940951
self.incoming.requested_blocks.extend(block_ids);
941952

942953
self.peer_activity.set_expecting_blocks_since(Some(self.time_getter.get_time()));
@@ -945,7 +956,7 @@ where
945956
}
946957

947958
async fn send_block(&mut self, id: Id<Block>) -> Result<()> {
948-
let (block, index) = self
959+
let (block, block_index) = self
949960
.chainstate_handle
950961
.call(move |c| {
951962
let index = c.get_block_index(&id);
@@ -960,14 +971,27 @@ where
960971
// to delete block indices of missing blocks when resetting their failure flags).
961972
// P2p should handle such situations correctly (see issue #1033 for more details).
962973
let block = block?.unwrap_or_else(|| panic!("Unknown block requested: {id}"));
963-
self.outgoing.best_sent_block = index?;
974+
let block_index = block_index?.expect("Block index must exist");
975+
976+
let old_best_sent_block_id = self.outgoing.best_sent_block.as_ref().map(|idx| {
977+
let id: Id<GenBlock> = (*idx.block_id()).into();
978+
id
979+
});
980+
let new_best_sent_block_id = self
981+
.chainstate_handle
982+
.call(move |c| choose_peers_best_block(c, old_best_sent_block_id, Some(id.into())))
983+
.await?;
984+
985+
if new_best_sent_block_id == Some(id.into()) {
986+
self.outgoing.best_sent_block = Some(block_index);
987+
}
964988

965989
log::debug!(
966990
"[peer id = {}] Sending block with id = {} to the peer",
967991
self.id(),
968992
block.get_id()
969993
);
970-
self.send_message(SyncMessage::BlockResponse(BlockResponse::new(block)))
994+
self.send_message(SyncMessage::BlockResponse(BlockResponse::new(block))).await
971995
}
972996

973997
async fn disconnect_if_stalling(&mut self) -> Result<()> {

0 commit comments

Comments
 (0)