Skip to content

Commit

Permalink
improve p2p stack (#1755)
Browse files Browse the repository at this point in the history
* optimize p2p network layer

Signed-off-by: ozkanonur <work@onurozkan.dev>

* update default gas limit for tendermint

Signed-off-by: ozkanonur <work@onurozkan.dev>

* rollback topic limit

Signed-off-by: ozkanonur <work@onurozkan.dev>

* remove unused arg from `lp_ordermatch::process_msg`

Signed-off-by: ozkanonur <work@onurozkan.dev>

* use OrderbookP2PHandlerResult for process_orders_keep_alive and process_maker_order_updated

* return one error for failed SwapMsg, SwapStatus deserialization

* fix process_swap_msg in wasm to return error if SwapMsg can't be deserialized

* remove additional space in SwapMsg, SwapStatus decode error message

* roll back crate visibility for new_protocol mod

---------

Signed-off-by: ozkanonur <work@onurozkan.dev>
Co-authored-by: shamardy <shamardy@yahoo.com>
  • Loading branch information
onur-ozkan and shamardy authored Apr 22, 2023
1 parent 52eb282 commit 547a30a
Show file tree
Hide file tree
Showing 10 changed files with 255 additions and 118 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,14 @@
**Features:**

**Enhancements/Fixes:**
- p2p stack is improved [#1755](https://github.com/KomodoPlatform/atomicDEX-API/pull/1755)
- - Validate topics if they are mixed or not.
- - Do early return if the message data is not valid (since no point to iterate over and over on the invalid message)
- - Break the loop right after processing any of `SWAP_PREFIX`, `WATCHER_PREFIX`, `TX_HELPER_PREFIX` topic.
- An issue was fixed where we don't have to wait for all EVM nodes to sync the latest account nonce [#1757](https://github.com/KomodoPlatform/atomicDEX-API/pull/1757)
- optimized dev and release compilation profiles and removed ci [#1759](https://github.com/KomodoPlatform/atomicDEX-API/pull/1759)


## v1.0.2-beta - 2023-04-11

**Features:**
Expand Down
2 changes: 1 addition & 1 deletion mm2src/coins/tendermint/tendermint_coin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ const ABCI_REQUEST_PROVE: bool = false;
/// 0.25 is good average gas price on atom and iris
const DEFAULT_GAS_PRICE: f64 = 0.25;
pub(super) const TIMEOUT_HEIGHT_DELTA: u64 = 100;
pub const GAS_LIMIT_DEFAULT: u64 = 100_000;
pub const GAS_LIMIT_DEFAULT: u64 = 125_000;
pub(crate) const TX_DEFAULT_MEMO: &str = "";

// https://github.com/irisnet/irismod/blob/5016c1be6fdbcffc319943f33713f4a057622f0a/modules/htlc/types/validation.go#L19-L22
Expand Down
2 changes: 1 addition & 1 deletion mm2src/mm2_libp2p/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use secp256k1::{Message as SecpMessage, PublicKey as Secp256k1Pubkey, Secp256k1,
use sha2::{Digest, Sha256};

pub use atomicdex_behaviour::{spawn_gossipsub, AdexBehaviourError, NodeType, WssCerts};
pub use atomicdex_gossipsub::{GossipsubEvent, GossipsubMessage, MessageId};
pub use atomicdex_gossipsub::{GossipsubEvent, GossipsubMessage, MessageId, TopicHash};
pub use libp2p::identity::error::DecodingError;
pub use libp2p::identity::secp256k1::PublicKey as Libp2pSecpPublic;
pub use libp2p::identity::PublicKey as Libp2pPublic;
Expand Down
99 changes: 76 additions & 23 deletions mm2src/mm2_main/src/lp_network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use mm2_libp2p::atomicdex_behaviour::{AdexBehaviourCmd, AdexBehaviourEvent, Adex
AdexResponseChannel};
use mm2_libp2p::peers_exchange::PeerAddresses;
use mm2_libp2p::{decode_message, encode_message, DecodingError, GossipsubMessage, Libp2pPublic, Libp2pSecpPublic,
MessageId, NetworkPorts, PeerId, TOPIC_SEPARATOR};
MessageId, NetworkPorts, PeerId, TopicHash, TOPIC_SEPARATOR};
use mm2_metrics::{mm_label, mm_timing};
#[cfg(test)] use mocktopus::macros::*;
use parking_lot::Mutex as PaMutex;
Expand All @@ -39,7 +39,8 @@ use std::net::ToSocketAddrs;
use std::sync::Arc;
use wasm_timer::Instant;

use crate::mm2::{lp_ordermatch, lp_stats, lp_swap};
use crate::mm2::lp_ordermatch;
use crate::mm2::{lp_stats, lp_swap};

pub type P2PRequestResult<T> = Result<T, MmError<P2PRequestError>>;

Expand Down Expand Up @@ -138,36 +139,99 @@ async fn process_p2p_message(
mut message: GossipsubMessage,
i_am_relay: bool,
) {
fn is_valid(topics: &[TopicHash]) -> Result<(), String> {
if topics.is_empty() {
return Err("At least one topic must be provided.".to_string());
}

let first_topic_prefix = topics[0].as_str().split(TOPIC_SEPARATOR).next().unwrap_or_default();
for item in topics.iter().skip(1) {
if !item.as_str().starts_with(first_topic_prefix) {
return Err(format!(
"Topics are invalid, received more than one topic kind. Topics '{:?}",
topics
));
}
}

Ok(())
}

let mut to_propagate = false;
let mut orderbook_pairs = vec![];

message.topics.dedup();
drop_mutability!(message);

for topic in message.topics {
if let Err(err) = is_valid(&message.topics) {
log::error!("{}", err);
return;
}

let inform_about_break = |used: &str, all: &[TopicHash]| {
log::debug!(
"Topic '{}' proceed and loop is killed. Whole topic list was '{:?}'",
used,
all
);
};

for topic in message.topics.iter() {
let mut split = topic.as_str().split(TOPIC_SEPARATOR);

match split.next() {
Some(lp_ordermatch::ORDERBOOK_PREFIX) => {
if let Some(pair) = split.next() {
orderbook_pairs.push(pair.to_string());
let fut = lp_ordermatch::handle_orderbook_msg(
ctx.clone(),
&message.topics,
peer_id.to_string(),
&message.data,
i_am_relay,
);

if let Err(e) = fut.await {
if e.get_inner().is_warning() {
log::warn!("{}", e);
} else {
log::error!("{}", e);
}
return;
}

to_propagate = true;
break;
},
Some(lp_swap::SWAP_PREFIX) => {
lp_swap::process_msg(ctx.clone(), split.next().unwrap_or_default(), &message.data).await;
if let Err(e) =
lp_swap::process_swap_msg(ctx.clone(), split.next().unwrap_or_default(), &message.data).await
{
log::error!("{}", e);
return;
}

to_propagate = true;

inform_about_break(topic.as_str(), &message.topics);
break;
},
Some(lp_swap::WATCHER_PREFIX) => {
if ctx.is_watcher() {
lp_swap::process_watcher_msg(ctx.clone(), &message.data).await;
if let Err(e) = lp_swap::process_watcher_msg(ctx.clone(), &message.data) {
log::error!("{}", e);
return;
}
}

to_propagate = true;

inform_about_break(topic.as_str(), &message.topics);
break;
},
Some(lp_swap::TX_HELPER_PREFIX) => {
if let Some(pair) = split.next() {
if let Ok(Some(coin)) = lp_coinfind(&ctx, pair).await {
if let Err(e) = coin.tx_enum_from_bytes(&message.data) {
log::error!("Message cannot continue the process due to: {:?}", e);
continue;
return;
};

let fut = coin.send_raw_tx_bytes(&message.data);
Expand All @@ -182,25 +246,14 @@ async fn process_p2p_message(
})
}
}

inform_about_break(topic.as_str(), &message.topics);
break;
},
None | Some(_) => (),
}
}

if !orderbook_pairs.is_empty() {
let process_fut = lp_ordermatch::process_msg(
ctx.clone(),
orderbook_pairs,
peer_id.to_string(),
&message.data,
i_am_relay,
);

if process_fut.await {
to_propagate = true;
}
}

if to_propagate && i_am_relay {
propagate_message(&ctx, message_id, peer_id);
}
Expand Down
Loading

0 comments on commit 547a30a

Please sign in to comment.