Skip to content

Commit

Permalink
Set explicit 10 seconds timeout for P2P requests in our behavior #910 (
Browse files Browse the repository at this point in the history
…#911)

* Set explicit 10 seconds timeout for P2P requests in our behavior.
Do not hold orderbook lock during GetOrderbook request.

* Limit Node::wait_peers loop to 10 attempts.
  • Loading branch information
artemii235 authored Apr 20, 2021
1 parent 335c960 commit 2adfc36
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 8 deletions.
6 changes: 3 additions & 3 deletions mm2src/lp_ordermatch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -235,9 +235,6 @@ async fn process_maker_order_updated(
///
/// The function locks [`MmCtx::p2p_ctx`] and [`MmCtx::ordermatch_ctx`]
async fn request_and_fill_orderbook(ctx: &MmArc, base: &str, rel: &str) -> Result<(), String> {
let ordermatch_ctx = OrdermatchContext::from_ctx(&ctx).unwrap();
let mut orderbook = ordermatch_ctx.orderbook.lock().await;

let request = OrdermatchRequest::GetOrderbook {
base: base.to_string(),
rel: rel.to_string(),
Expand All @@ -249,6 +246,9 @@ async fn request_and_fill_orderbook(ctx: &MmArc, base: &str, rel: &str) -> Resul
None => return Ok(()),
};

let ordermatch_ctx = OrdermatchContext::from_ctx(&ctx).unwrap();
let mut orderbook = ordermatch_ctx.orderbook.lock().await;

let alb_pair = alb_ordered_pair(base, rel);
for (pubkey, GetOrderbookPubkeyItem { orders, .. }) in pubkey_orders {
let pubkey_bytes = match hex::decode(&pubkey) {
Expand Down
5 changes: 5 additions & 0 deletions mm2src/mm2_libp2p/src/atomicdex_behaviour/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ impl Node {
async fn send_cmd(&mut self, cmd: AdexBehaviourCmd) { self.cmd_tx.send(cmd).await.unwrap(); }

async fn wait_peers(&mut self, number: usize) {
let mut attempts = 0;
loop {
let (tx, rx) = oneshot::channel();
self.cmd_tx
Expand All @@ -70,6 +71,10 @@ impl Node {
},
Err(e) => panic!("{}", e),
}
attempts += 1;
if attempts >= 10 {
panic!("wait_peers {} attempts exceeded", attempts);
}
}
}
}
Expand Down
44 changes: 39 additions & 5 deletions mm2src/mm2_libp2p/src/request_response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,13 @@ use libp2p::request_response::{ProtocolName, ProtocolSupport, RequestId, Request
use libp2p::swarm::{NetworkBehaviourAction, NetworkBehaviourEventProcess, PollParameters};
use libp2p::NetworkBehaviour;
use libp2p::PeerId;
use log::{debug, error};
use log::{debug, error, warn};
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, VecDeque};
use std::io;
use std::time::Duration;
use wasm_timer::{Instant, Interval};

const MAX_BUFFER_SIZE: usize = 1024 * 1024 - 100;

Expand All @@ -33,12 +35,17 @@ pub fn build_request_response_behaviour() -> RequestResponseBehaviour {
let (tx, rx) = mpsc::unbounded();
let pending_requests = HashMap::new();
let events = VecDeque::new();
let timeout = Duration::from_secs(10);
let timeout_interval = Interval::new(Duration::from_secs(1));

RequestResponseBehaviour {
tx,
rx,
pending_requests,
events,
inner,
timeout,
timeout_interval,
}
}

Expand All @@ -50,6 +57,11 @@ pub enum RequestResponseBehaviourEvent {
},
}

struct PendingRequest {
tx: oneshot::Sender<PeerResponse>,
initiated_at: Instant,
}

#[derive(NetworkBehaviour)]
#[behaviour(out_event = "RequestResponseBehaviourEvent")]
#[behaviour(poll_method = "poll_event")]
Expand All @@ -59,10 +71,16 @@ pub struct RequestResponseBehaviour {
#[behaviour(ignore)]
tx: RequestResponseSender,
#[behaviour(ignore)]
pending_requests: HashMap<RequestId, oneshot::Sender<PeerResponse>>,
pending_requests: HashMap<RequestId, PendingRequest>,
/// Events that need to be yielded to the outside when polling.
#[behaviour(ignore)]
events: VecDeque<RequestResponseBehaviourEvent>,
/// Timeout for pending requests
#[behaviour(ignore)]
timeout: Duration,
/// Interval for request timeout check
#[behaviour(ignore)]
timeout_interval: Interval,
/// The inner RequestResponse network behaviour.
inner: RequestResponse<Codec<Protocol, PeerRequest, PeerResponse>>,
}
Expand All @@ -81,7 +99,11 @@ impl RequestResponseBehaviour {
response_tx: oneshot::Sender<PeerResponse>,
) -> RequestId {
let request_id = self.inner.send_request(&peer_id, request);
assert!(self.pending_requests.insert(request_id, response_tx).is_none());
let pending_request = PendingRequest {
tx: response_tx,
initiated_at: Instant::now(),
};
assert!(self.pending_requests.insert(request_id, pending_request).is_none());
request_id
}

Expand All @@ -106,6 +128,18 @@ impl RequestResponseBehaviour {
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event));
}

while let Poll::Ready(Some(())) = self.timeout_interval.poll_next_unpin(cx) {
let now = Instant::now();
let timeout = self.timeout;
self.pending_requests.retain(|request_id, pending_request| {
let retain = now.duration_since(pending_request.initiated_at) < timeout;
if !retain {
warn!("Request {} timed out", request_id);
}
retain
});
}

Poll::Pending
}

Expand All @@ -124,8 +158,8 @@ impl RequestResponseBehaviour {

fn process_response(&mut self, request_id: RequestId, response: PeerResponse) {
match self.pending_requests.remove(&request_id) {
Some(tx) => {
if let Err(e) = tx.send(response) {
Some(pending) => {
if let Err(e) = pending.tx.send(response) {
error!("{:?}. Request {:?} is not processed", e, request_id);
}
},
Expand Down

0 comments on commit 2adfc36

Please sign in to comment.