Skip to content

Commit

Permalink
optimize p2p network layer
Browse files Browse the repository at this point in the history
Signed-off-by: ozkanonur <work@onurozkan.dev>
  • Loading branch information
onur-ozkan committed Apr 16, 2023
1 parent 8ce972d commit 911bc31
Show file tree
Hide file tree
Showing 7 changed files with 199 additions and 52 deletions.
12 changes: 12 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,15 @@
## ${next-version} - ${release-date}

**Features:**

**Enhancements/Fixes:**
- p2p stack is improved [#1755](https://github.com/KomodoPlatform/atomicDEX-API/pull/1755)
- - Validate topics if they are mixed or not.
- - Do early return if the message data is not valid (since no point to iterate over and over on the invalid message)
- - Avoid decoding messages that have more than 25 topics
- - Break the loop right after processing any of `SWAP_PREFIX`, `WATCHER_PREFIX`, `TX_HELPER_PREFIX` topic.


## v1.0.2-beta - 2023-04-11

**Features:**
Expand Down
17 changes: 16 additions & 1 deletion mm2src/gossipsub/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,8 @@ impl Decoder for GossipsubCodec {
type Error = io::Error;

fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
const MAX_TOPICS: usize = 25;

let packet = some_or_return_ok_none!(self.length_codec.decode(src)?);

let rpc = rpc_proto::Rpc::decode(&packet[..])?;
Expand All @@ -228,12 +230,25 @@ impl Decoder for GossipsubCodec {
"sequence number has an incorrect size",
));
}

let topics = publish.topic_ids.into_iter().map(TopicHash::from_raw);
if topics.len() > MAX_TOPICS {
return Err(io::Error::new(
io::ErrorKind::InvalidInput,
format!(
"Too many topics provided. Allowed topic count: {}, Received topic count: {}",
MAX_TOPICS,
topics.len()
),
));
}

messages.push(GossipsubMessage {
source: PeerId::from_bytes(&publish.from.unwrap_or_default())
.map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "Invalid Peer Id"))?,
data: publish.data.unwrap_or_default(),
sequence_number: BigEndian::read_u64(&seq_no),
topics: publish.topic_ids.into_iter().map(TopicHash::from_raw).collect(),
topics: topics.collect(),
});
}

Expand Down
2 changes: 1 addition & 1 deletion mm2src/mm2_libp2p/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use secp256k1::{Message as SecpMessage, PublicKey as Secp256k1Pubkey, Secp256k1,
use sha2::{Digest, Sha256};

pub use atomicdex_behaviour::{spawn_gossipsub, AdexBehaviourError, NodeType, WssCerts};
pub use atomicdex_gossipsub::{GossipsubEvent, GossipsubMessage, MessageId};
pub use atomicdex_gossipsub::{GossipsubEvent, GossipsubMessage, MessageId, TopicHash};
pub use libp2p::identity::error::DecodingError;
pub use libp2p::identity::secp256k1::PublicKey as Libp2pSecpPublic;
pub use libp2p::identity::PublicKey as Libp2pPublic;
Expand Down
94 changes: 71 additions & 23 deletions mm2src/mm2_main/src/lp_network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use mm2_libp2p::atomicdex_behaviour::{AdexBehaviourCmd, AdexBehaviourEvent, Adex
AdexResponseChannel};
use mm2_libp2p::peers_exchange::PeerAddresses;
use mm2_libp2p::{decode_message, encode_message, DecodingError, GossipsubMessage, Libp2pPublic, Libp2pSecpPublic,
MessageId, NetworkPorts, PeerId, TOPIC_SEPARATOR};
MessageId, NetworkPorts, PeerId, TopicHash, TOPIC_SEPARATOR};
use mm2_metrics::{mm_label, mm_timing};
#[cfg(test)] use mocktopus::macros::*;
use parking_lot::Mutex as PaMutex;
Expand All @@ -39,7 +39,8 @@ use std::net::ToSocketAddrs;
use std::sync::Arc;
use wasm_timer::Instant;

use crate::mm2::{lp_ordermatch, lp_stats, lp_swap};
use crate::mm2::lp_ordermatch;
use crate::mm2::{lp_stats, lp_swap};

pub type P2PRequestResult<T> = Result<T, MmError<P2PRequestError>>;

Expand Down Expand Up @@ -138,36 +139,94 @@ async fn process_p2p_message(
mut message: GossipsubMessage,
i_am_relay: bool,
) {
fn is_valid(topics: &[TopicHash]) -> Result<(), String> {
if topics.is_empty() {
return Err("At least one topic must be provided.".to_string());
}

let first_topic_prefix = topics[0].as_str().split(TOPIC_SEPARATOR).next().unwrap_or_default();
for item in topics.iter().skip(1) {
if !item.as_str().starts_with(first_topic_prefix) {
return Err(format!(
"Topics are invalid, received more than one topic kind. Topics '{:?}",
topics
));
}
}

Ok(())
}

let mut to_propagate = false;
let mut orderbook_pairs = vec![];

message.topics.dedup();
drop_mutability!(message);

for topic in message.topics {
if let Err(err) = is_valid(&message.topics) {
log::error!("{}", err);
return;
}

let inform_about_break = |used: &str, all: &[TopicHash]| {
log::debug!(
"Topic '{}' proceed and loop is killed. Whole topic list was '{:?}'",
used,
all
);
};

for topic in message.topics.iter() {
let mut split = topic.as_str().split(TOPIC_SEPARATOR);

match split.next() {
Some(lp_ordermatch::ORDERBOOK_PREFIX) => {
if let Some(pair) = split.next() {
orderbook_pairs.push(pair.to_string());
let fut = lp_ordermatch::handle_orderbook_msg(
ctx.clone(),
&message.topics,
peer_id.to_string(),
&message.data,
i_am_relay,
);

if let Err(e) = fut.await {
log::error!("{}", e);
return;
}

to_propagate = true;
break;
},
Some(lp_swap::SWAP_PREFIX) => {
lp_swap::process_msg(ctx.clone(), split.next().unwrap_or_default(), &message.data).await;
if let Err(e) = lp_swap::process_msg(ctx.clone(), split.next().unwrap_or_default(), &message.data).await
{
log::error!("{}", e);
return;
}

to_propagate = true;

inform_about_break(topic.as_str(), &message.topics);
break;
},
Some(lp_swap::WATCHER_PREFIX) => {
if ctx.is_watcher() {
lp_swap::process_watcher_msg(ctx.clone(), &message.data).await;
if let Err(e) = lp_swap::process_watcher_msg(ctx.clone(), &message.data) {
log::error!("{}", e);
return;
}
}

to_propagate = true;

inform_about_break(topic.as_str(), &message.topics);
break;
},
Some(lp_swap::TX_HELPER_PREFIX) => {
if let Some(pair) = split.next() {
if let Ok(Some(coin)) = lp_coinfind(&ctx, pair).await {
if let Err(e) = coin.tx_enum_from_bytes(&message.data) {
log::error!("Message cannot continue the process due to: {:?}", e);
continue;
return;
};

let fut = coin.send_raw_tx_bytes(&message.data);
Expand All @@ -182,25 +241,14 @@ async fn process_p2p_message(
})
}
}

inform_about_break(topic.as_str(), &message.topics);
break;
},
None | Some(_) => (),
}
}

if !orderbook_pairs.is_empty() {
let process_fut = lp_ordermatch::process_msg(
ctx.clone(),
orderbook_pairs,
peer_id.to_string(),
&message.data,
i_am_relay,
);

if process_fut.await {
to_propagate = true;
}
}

if to_propagate && i_am_relay {
propagate_message(&ctx, message_id, peer_id);
}
Expand Down
91 changes: 79 additions & 12 deletions mm2src/mm2_main/src/lp_ordermatch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ use http::Response;
use keys::{AddressFormat, KeyPair};
use mm2_core::mm_ctx::{from_ctx, MmArc, MmWeak};
use mm2_err_handle::prelude::*;
use mm2_libp2p::{decode_signed, encode_and_sign, encode_message, pub_sub_topic, TopicPrefix, TOPIC_SEPARATOR};
use mm2_libp2p::{decode_signed, encode_and_sign, encode_message, pub_sub_topic, TopicHash, TopicPrefix,
TOPIC_SEPARATOR};
use mm2_metrics::mm_gauge;
use mm2_number::{construct_detailed, BigDecimal, BigRational, Fraction, MmNumber, MmNumberMultiRepr};
#[cfg(test)] use mocktopus::macros::*;
Expand Down Expand Up @@ -90,7 +91,8 @@ pub use lp_bot::{start_simple_market_maker_bot, stop_simple_market_maker_bot, St

#[path = "lp_ordermatch/my_orders_storage.rs"]
mod my_orders_storage;
#[path = "lp_ordermatch/new_protocol.rs"] mod new_protocol;
#[path = "lp_ordermatch/new_protocol.rs"]
pub(crate) mod new_protocol;
#[path = "lp_ordermatch/order_requests_tracker.rs"]
mod order_requests_tracker;
#[path = "lp_ordermatch/orderbook_depth.rs"] mod orderbook_depth;
Expand Down Expand Up @@ -123,6 +125,22 @@ const TRIE_ORDER_HISTORY_TIMEOUT: u64 = 300;
#[cfg(test)]
const TRIE_ORDER_HISTORY_TIMEOUT: u64 = 3;

pub type OrderbookP2PHandlerResult = Result<(), MmError<OrderbookP2PHandlerError>>;

#[derive(Display)]
pub enum OrderbookP2PHandlerError {
#[display(fmt = "'{}' is an invalid topic for the orderbook handler.", _0)]
InvalidTopic(String),

#[display(fmt = "Message decoding was failed. Error: {}", _0)]
DecodeError(String),

#[display(fmt = "Pubkey '{}' is not allowed.", _0)]
PubkeyNotAllowed(String),

Internal(String),
}

/// Alphabetically ordered orderbook pair
type AlbOrderedOrderbookPair = String;
type PubkeyOrders = Vec<(Uuid, OrderbookP2PItem)>;
Expand Down Expand Up @@ -468,55 +486,104 @@ fn remove_pubkey_pair_orders(orderbook: &mut Orderbook, pubkey: &str, alb_pair:
pubkey_state.trie_roots.remove(alb_pair);
}

pub async fn handle_orderbook_msg(
ctx: MmArc,
topics: &[TopicHash],
from_peer: String,
msg: &[u8],
i_am_relay: bool,
) -> OrderbookP2PHandlerResult {
if let Err(e) = decode_signed::<new_protocol::OrdermatchMessage>(msg) {
return MmError::err(OrderbookP2PHandlerError::DecodeError(e.to_string()));
};

let mut orderbook_pairs = vec![];

for topic in topics {
let mut split = topic.as_str().split(TOPIC_SEPARATOR);
match (split.next(), split.next()) {
(Some(ORDERBOOK_PREFIX), Some(pair)) => {
orderbook_pairs.push(pair.to_string());
},
_ => {
return MmError::err(OrderbookP2PHandlerError::InvalidTopic(topic.as_str().to_owned()));
},
};
}

if !orderbook_pairs.is_empty() {
process_msg(ctx, orderbook_pairs, from_peer, msg, i_am_relay).await?;
}

Ok(())
}

/// Attempts to decode a message and process it returning whether the message is valid and worth rebroadcasting
pub async fn process_msg(ctx: MmArc, _topics: Vec<String>, from_peer: String, msg: &[u8], i_am_relay: bool) -> bool {
pub async fn process_msg(
ctx: MmArc,
_topics: Vec<String>,
from_peer: String,
msg: &[u8],
i_am_relay: bool,
) -> OrderbookP2PHandlerResult {
match decode_signed::<new_protocol::OrdermatchMessage>(msg) {
Ok((message, _sig, pubkey)) => {
if is_pubkey_banned(&ctx, &pubkey.unprefixed().into()) {
log::warn!("Pubkey {} is banned", pubkey.to_hex());
return false;
return MmError::err(OrderbookP2PHandlerError::PubkeyNotAllowed(pubkey.to_hex()));
}
match message {
new_protocol::OrdermatchMessage::MakerOrderCreated(created_msg) => {
let order: OrderbookItem = (created_msg, hex::encode(pubkey.to_bytes().as_slice())).into();
insert_or_update_order(&ctx, order);
true
Ok(())
},
new_protocol::OrdermatchMessage::PubkeyKeepAlive(keep_alive) => {
process_orders_keep_alive(ctx, from_peer, pubkey.to_hex(), keep_alive, i_am_relay).await
process_orders_keep_alive(ctx, from_peer, pubkey.to_hex(), keep_alive, i_am_relay)
.await
.then_some(())
.ok_or_else(|| {
OrderbookP2PHandlerError::Internal("`process_orders_keep_alive` was failed.".to_string())
.into()
})
},
new_protocol::OrdermatchMessage::TakerRequest(taker_request) => {
let msg = TakerRequest::from_new_proto_and_pubkey(taker_request, pubkey.unprefixed().into());
process_taker_request(ctx, pubkey.unprefixed().into(), msg).await;
true
Ok(())
},
new_protocol::OrdermatchMessage::MakerReserved(maker_reserved) => {
let msg = MakerReserved::from_new_proto_and_pubkey(maker_reserved, pubkey.unprefixed().into());
// spawn because process_maker_reserved may take significant time to run
let spawner = ctx.spawner();
spawner.spawn(process_maker_reserved(ctx, pubkey.unprefixed().into(), msg));
true
Ok(())
},
new_protocol::OrdermatchMessage::TakerConnect(taker_connect) => {
process_taker_connect(ctx, pubkey.unprefixed().into(), taker_connect.into()).await;
true
Ok(())
},
new_protocol::OrdermatchMessage::MakerConnected(maker_connected) => {
process_maker_connected(ctx, pubkey.unprefixed().into(), maker_connected.into()).await;
true
Ok(())
},
new_protocol::OrdermatchMessage::MakerOrderCancelled(cancelled_msg) => {
delete_order(&ctx, &pubkey.to_hex(), cancelled_msg.uuid.into());
true
Ok(())
},
new_protocol::OrdermatchMessage::MakerOrderUpdated(updated_msg) => {
process_maker_order_updated(ctx, pubkey.to_hex(), updated_msg)
.then_some(())
.ok_or_else(|| {
OrderbookP2PHandlerError::Internal("`process_maker_order_updated` was failed.".to_string())
.into()
})
},
}
},
Err(e) => {
error!("Error {} while decoding signed message", e);
false
MmError::err(OrderbookP2PHandlerError::DecodeError(e.to_string()))
},
}
}
Expand Down
Loading

0 comments on commit 911bc31

Please sign in to comment.