Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

validate p2p messages and topics #1755

Merged
merged 10 commits into from
Apr 22, 2023
11 changes: 11 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,14 @@
## ${next-version} - ${release-date}

**Features:**

**Enhancements/Fixes:**
- p2p stack is improved [#1755](https://github.com/KomodoPlatform/atomicDEX-API/pull/1755)
- - Validate topics if they are mixed or not.
- - Do early return if the message data is not valid (since no point to iterate over and over on the invalid message)
- - Break the loop right after processing any of `SWAP_PREFIX`, `WATCHER_PREFIX`, `TX_HELPER_PREFIX` topic.


## v1.0.2-beta - 2023-04-11

**Features:**
Expand Down
2 changes: 1 addition & 1 deletion mm2src/coins/tendermint/tendermint_coin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ const ABCI_REQUEST_PROVE: bool = false;
/// 0.25 is good average gas price on atom and iris
const DEFAULT_GAS_PRICE: f64 = 0.25;
pub(super) const TIMEOUT_HEIGHT_DELTA: u64 = 100;
pub const GAS_LIMIT_DEFAULT: u64 = 100_000;
pub const GAS_LIMIT_DEFAULT: u64 = 125_000;
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you document(either here in the PR comments or in the changelog) why you are changing this value?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because HTLCs requiring more gas than our limit lately. Since gas prices can be updated from the coins configuration, this should never be problem even if we make it 200_000.

pub(crate) const TX_DEFAULT_MEMO: &str = "";

// https://github.com/irisnet/irismod/blob/5016c1be6fdbcffc319943f33713f4a057622f0a/modules/htlc/types/validation.go#L19-L22
Expand Down
2 changes: 1 addition & 1 deletion mm2src/mm2_libp2p/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use secp256k1::{Message as SecpMessage, PublicKey as Secp256k1Pubkey, Secp256k1,
use sha2::{Digest, Sha256};

pub use atomicdex_behaviour::{spawn_gossipsub, AdexBehaviourError, NodeType, WssCerts};
pub use atomicdex_gossipsub::{GossipsubEvent, GossipsubMessage, MessageId};
pub use atomicdex_gossipsub::{GossipsubEvent, GossipsubMessage, MessageId, TopicHash};
pub use libp2p::identity::error::DecodingError;
pub use libp2p::identity::secp256k1::PublicKey as Libp2pSecpPublic;
pub use libp2p::identity::PublicKey as Libp2pPublic;
Expand Down
94 changes: 71 additions & 23 deletions mm2src/mm2_main/src/lp_network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use mm2_libp2p::atomicdex_behaviour::{AdexBehaviourCmd, AdexBehaviourEvent, Adex
AdexResponseChannel};
use mm2_libp2p::peers_exchange::PeerAddresses;
use mm2_libp2p::{decode_message, encode_message, DecodingError, GossipsubMessage, Libp2pPublic, Libp2pSecpPublic,
MessageId, NetworkPorts, PeerId, TOPIC_SEPARATOR};
MessageId, NetworkPorts, PeerId, TopicHash, TOPIC_SEPARATOR};
use mm2_metrics::{mm_label, mm_timing};
#[cfg(test)] use mocktopus::macros::*;
use parking_lot::Mutex as PaMutex;
Expand All @@ -39,7 +39,8 @@ use std::net::ToSocketAddrs;
use std::sync::Arc;
use wasm_timer::Instant;

use crate::mm2::{lp_ordermatch, lp_stats, lp_swap};
use crate::mm2::lp_ordermatch;
use crate::mm2::{lp_stats, lp_swap};

pub type P2PRequestResult<T> = Result<T, MmError<P2PRequestError>>;

Expand Down Expand Up @@ -138,36 +139,94 @@ async fn process_p2p_message(
mut message: GossipsubMessage,
i_am_relay: bool,
) {
fn is_valid(topics: &[TopicHash]) -> Result<(), String> {
if topics.is_empty() {
return Err("At least one topic must be provided.".to_string());
}

let first_topic_prefix = topics[0].as_str().split(TOPIC_SEPARATOR).next().unwrap_or_default();
for item in topics.iter().skip(1) {
if !item.as_str().starts_with(first_topic_prefix) {
return Err(format!(
"Topics are invalid, received more than one topic kind. Topics '{:?}",
shamardy marked this conversation as resolved.
Show resolved Hide resolved
topics
));
}
}

Ok(())
}

let mut to_propagate = false;
let mut orderbook_pairs = vec![];

message.topics.dedup();
drop_mutability!(message);

for topic in message.topics {
if let Err(err) = is_valid(&message.topics) {
log::error!("{}", err);
return;
}

let inform_about_break = |used: &str, all: &[TopicHash]| {
log::debug!(
"Topic '{}' proceed and loop is killed. Whole topic list was '{:?}'",
used,
all
);
};
shamardy marked this conversation as resolved.
Show resolved Hide resolved

for topic in message.topics.iter() {
let mut split = topic.as_str().split(TOPIC_SEPARATOR);

match split.next() {
Some(lp_ordermatch::ORDERBOOK_PREFIX) => {
if let Some(pair) = split.next() {
orderbook_pairs.push(pair.to_string());
let fut = lp_ordermatch::handle_orderbook_msg(
ctx.clone(),
&message.topics,
peer_id.to_string(),
&message.data,
i_am_relay,
);

if let Err(e) = fut.await {
log::error!("{}", e);
return;
}

to_propagate = true;
break;
},
Some(lp_swap::SWAP_PREFIX) => {
lp_swap::process_msg(ctx.clone(), split.next().unwrap_or_default(), &message.data).await;
if let Err(e) = lp_swap::process_msg(ctx.clone(), split.next().unwrap_or_default(), &message.data).await
{
log::error!("{}", e);
return;
}

to_propagate = true;

inform_about_break(topic.as_str(), &message.topics);
break;
rozhkovdmitrii marked this conversation as resolved.
Show resolved Hide resolved
},
Some(lp_swap::WATCHER_PREFIX) => {
if ctx.is_watcher() {
lp_swap::process_watcher_msg(ctx.clone(), &message.data).await;
if let Err(e) = lp_swap::process_watcher_msg(ctx.clone(), &message.data) {
log::error!("{}", e);
return;
}
}

to_propagate = true;

inform_about_break(topic.as_str(), &message.topics);
break;
},
Some(lp_swap::TX_HELPER_PREFIX) => {
if let Some(pair) = split.next() {
if let Ok(Some(coin)) = lp_coinfind(&ctx, pair).await {
if let Err(e) = coin.tx_enum_from_bytes(&message.data) {
log::error!("Message cannot continue the process due to: {:?}", e);
continue;
return;
};

let fut = coin.send_raw_tx_bytes(&message.data);
Expand All @@ -182,25 +241,14 @@ async fn process_p2p_message(
})
}
}

inform_about_break(topic.as_str(), &message.topics);
break;
},
None | Some(_) => (),
}
}

if !orderbook_pairs.is_empty() {
let process_fut = lp_ordermatch::process_msg(
ctx.clone(),
orderbook_pairs,
peer_id.to_string(),
&message.data,
i_am_relay,
);

if process_fut.await {
to_propagate = true;
}
}

if to_propagate && i_am_relay {
propagate_message(&ctx, message_id, peer_id);
}
Expand Down
95 changes: 79 additions & 16 deletions mm2src/mm2_main/src/lp_ordermatch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ use http::Response;
use keys::{AddressFormat, KeyPair};
use mm2_core::mm_ctx::{from_ctx, MmArc, MmWeak};
use mm2_err_handle::prelude::*;
use mm2_libp2p::{decode_signed, encode_and_sign, encode_message, pub_sub_topic, TopicPrefix, TOPIC_SEPARATOR};
use mm2_libp2p::{decode_signed, encode_and_sign, encode_message, pub_sub_topic, TopicHash, TopicPrefix,
TOPIC_SEPARATOR};
use mm2_metrics::mm_gauge;
use mm2_number::{construct_detailed, BigDecimal, BigRational, Fraction, MmNumber, MmNumberMultiRepr};
#[cfg(test)] use mocktopus::macros::*;
Expand Down Expand Up @@ -90,7 +91,8 @@ pub use lp_bot::{start_simple_market_maker_bot, stop_simple_market_maker_bot, St

#[path = "lp_ordermatch/my_orders_storage.rs"]
mod my_orders_storage;
#[path = "lp_ordermatch/new_protocol.rs"] mod new_protocol;
#[path = "lp_ordermatch/new_protocol.rs"]
pub(crate) mod new_protocol;

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be more clear it did not stick up, explained before

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you clarify this, didn't understand it

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now it can be rolled back because stick up logic has been hidden

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

#[path = "lp_ordermatch/order_requests_tracker.rs"]
mod order_requests_tracker;
#[path = "lp_ordermatch/orderbook_depth.rs"] mod orderbook_depth;
Expand Down Expand Up @@ -123,6 +125,22 @@ const TRIE_ORDER_HISTORY_TIMEOUT: u64 = 300;
#[cfg(test)]
const TRIE_ORDER_HISTORY_TIMEOUT: u64 = 3;

pub type OrderbookP2PHandlerResult = Result<(), MmError<OrderbookP2PHandlerError>>;

#[derive(Display)]
pub enum OrderbookP2PHandlerError {
#[display(fmt = "'{}' is an invalid topic for the orderbook handler.", _0)]
InvalidTopic(String),

#[display(fmt = "Message decoding was failed. Error: {}", _0)]
DecodeError(String),

#[display(fmt = "Pubkey '{}' is not allowed.", _0)]
PubkeyNotAllowed(String),

Internal(String),
}

/// Alphabetically ordered orderbook pair
type AlbOrderedOrderbookPair = String;
type PubkeyOrders = Vec<(Uuid, OrderbookP2PItem)>;
Expand Down Expand Up @@ -468,56 +486,101 @@ fn remove_pubkey_pair_orders(orderbook: &mut Orderbook, pubkey: &str, alb_pair:
pubkey_state.trie_roots.remove(alb_pair);
}

pub async fn handle_orderbook_msg(
ctx: MmArc,
topics: &[TopicHash],
from_peer: String,
msg: &[u8],
i_am_relay: bool,
) -> OrderbookP2PHandlerResult {
if let Err(e) = decode_signed::<new_protocol::OrdermatchMessage>(msg) {
return MmError::err(OrderbookP2PHandlerError::DecodeError(e.to_string()));
};

let mut orderbook_pairs = vec![];

for topic in topics {
let mut split = topic.as_str().split(TOPIC_SEPARATOR);
match (split.next(), split.next()) {
(Some(ORDERBOOK_PREFIX), Some(pair)) => {
orderbook_pairs.push(pair.to_string());
},
_ => {
return MmError::err(OrderbookP2PHandlerError::InvalidTopic(topic.as_str().to_owned()));
},
};
}

if !orderbook_pairs.is_empty() {
process_msg(ctx, orderbook_pairs, from_peer, msg, i_am_relay).await?;
}

Ok(())
}

/// Attempts to decode a message and process it returning whether the message is valid and worth rebroadcasting
pub async fn process_msg(ctx: MmArc, _topics: Vec<String>, from_peer: String, msg: &[u8], i_am_relay: bool) -> bool {
pub async fn process_msg(
ctx: MmArc,
_topics: Vec<String>,
onur-ozkan marked this conversation as resolved.
Show resolved Hide resolved
from_peer: String,
msg: &[u8],
i_am_relay: bool,
) -> OrderbookP2PHandlerResult {
match decode_signed::<new_protocol::OrdermatchMessage>(msg) {
Ok((message, _sig, pubkey)) => {
if is_pubkey_banned(&ctx, &pubkey.unprefixed().into()) {
log::warn!("Pubkey {} is banned", pubkey.to_hex());
return false;
return MmError::err(OrderbookP2PHandlerError::PubkeyNotAllowed(pubkey.to_hex()));
}
match message {
new_protocol::OrdermatchMessage::MakerOrderCreated(created_msg) => {
let order: OrderbookItem = (created_msg, hex::encode(pubkey.to_bytes().as_slice())).into();
insert_or_update_order(&ctx, order);
true
Ok(())
},
new_protocol::OrdermatchMessage::PubkeyKeepAlive(keep_alive) => {
process_orders_keep_alive(ctx, from_peer, pubkey.to_hex(), keep_alive, i_am_relay).await
process_orders_keep_alive(ctx, from_peer, pubkey.to_hex(), keep_alive, i_am_relay)
.await
.then_some(())
.ok_or_else(|| {
OrderbookP2PHandlerError::Internal("`process_orders_keep_alive` was failed.".to_string())
.into()
})
},
new_protocol::OrdermatchMessage::TakerRequest(taker_request) => {
let msg = TakerRequest::from_new_proto_and_pubkey(taker_request, pubkey.unprefixed().into());
process_taker_request(ctx, pubkey.unprefixed().into(), msg).await;
true
Ok(())
},
new_protocol::OrdermatchMessage::MakerReserved(maker_reserved) => {
let msg = MakerReserved::from_new_proto_and_pubkey(maker_reserved, pubkey.unprefixed().into());
// spawn because process_maker_reserved may take significant time to run
let spawner = ctx.spawner();
spawner.spawn(process_maker_reserved(ctx, pubkey.unprefixed().into(), msg));
true
Ok(())
},
new_protocol::OrdermatchMessage::TakerConnect(taker_connect) => {
process_taker_connect(ctx, pubkey.unprefixed().into(), taker_connect.into()).await;
true
Ok(())
},
new_protocol::OrdermatchMessage::MakerConnected(maker_connected) => {
process_maker_connected(ctx, pubkey.unprefixed().into(), maker_connected.into()).await;
true
Ok(())
},
new_protocol::OrdermatchMessage::MakerOrderCancelled(cancelled_msg) => {
delete_order(&ctx, &pubkey.to_hex(), cancelled_msg.uuid.into());
true
Ok(())
},
new_protocol::OrdermatchMessage::MakerOrderUpdated(updated_msg) => {
process_maker_order_updated(ctx, pubkey.to_hex(), updated_msg)
.then_some(())
.ok_or_else(|| {
OrderbookP2PHandlerError::Internal("`process_maker_order_updated` was failed.".to_string())
.into()
})
},
}
},
Err(e) => {
error!("Error {} while decoding signed message", e);
false
},
Err(e) => MmError::err(OrderbookP2PHandlerError::DecodeError(e.to_string())),
}
}

Expand Down
20 changes: 9 additions & 11 deletions mm2src/mm2_main/src/lp_swap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@
// marketmaker
//

use crate::mm2::lp_network::{broadcast_p2p_msg, Libp2pPeerId};
use super::lp_network::P2PRequestResult;
use crate::mm2::lp_network::{broadcast_p2p_msg, Libp2pPeerId, P2PRequestError};
use bitcrypto::{dhash160, sha256};
use coins::eth::Web3RpcError;
use coins::{lp_coinfind, lp_coinfind_or_err, CoinFindError, MmCoinEnum, TradeFee, TransactionEnum};
Expand Down Expand Up @@ -97,7 +98,7 @@ use std::sync::atomic::{AtomicU64, Ordering};
#[path = "lp_swap/recreate_swap_data.rs"] mod recreate_swap_data;
#[path = "lp_swap/saved_swap.rs"] mod saved_swap;
#[path = "lp_swap/swap_lock.rs"] mod swap_lock;
#[path = "lp_swap/swap_watcher.rs"] mod swap_watcher;
#[path = "lp_swap/swap_watcher.rs"] pub(crate) mod swap_watcher;
#[path = "lp_swap/taker_swap.rs"] mod taker_swap;
#[path = "lp_swap/trade_preimage.rs"] mod trade_preimage;

Expand Down Expand Up @@ -236,11 +237,8 @@ pub fn broadcast_p2p_tx_msg(ctx: &MmArc, topic: String, msg: &TransactionEnum, p
broadcast_p2p_msg(ctx, vec![topic], encoded_msg, from);
}

pub async fn process_msg(ctx: MmArc, topic: &str, msg: &[u8]) {
let uuid = match Uuid::from_str(topic) {
Ok(u) => u,
Err(_) => return,
};
pub async fn process_msg(ctx: MmArc, topic: &str, msg: &[u8]) -> P2PRequestResult<()> {
rozhkovdmitrii marked this conversation as resolved.
Show resolved Hide resolved
let uuid = Uuid::from_str(topic).map_to_mm(|e| P2PRequestError::DecodeError(e.to_string()))?;

let msg = match decode_signed::<SwapMsg>(msg) {
Ok(m) => m,
Expand All @@ -258,9 +256,7 @@ pub async fn process_msg(ctx: MmArc, topic: &str, msg: &[u8]) {
error!("Couldn't deserialize 'SwapStatus': {:?}", swap_status_err);
},
};
// Drop it to avoid dead_code warning
drop(swap_msg_err);
return;
return MmError::err(P2PRequestError::DecodeError(swap_msg_err.to_string()));
rozhkovdmitrii marked this conversation as resolved.
Show resolved Hide resolved
},
};

Expand All @@ -280,7 +276,9 @@ pub async fn process_msg(ctx: MmArc, topic: &str, msg: &[u8]) {
} else {
warn!("Received message from unexpected sender for swap {}", uuid);
}
}
};

Ok(())
}

pub fn swap_topic(uuid: &Uuid) -> String { pub_sub_topic(SWAP_PREFIX, &uuid.to_string()) }
Expand Down
Loading