Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

validate p2p messages and topics #1755

Merged
merged 10 commits into from
Apr 22, 2023
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@
- p2p stack is improved [#1755](https://github.com/KomodoPlatform/atomicDEX-API/pull/1755)
- - Validate topics if they are mixed or not.
- - Do early return if the message data is not valid (since no point to iterate over and over on the invalid message)
- - Avoid decoding messages that have more than 25 topics
- - Break the loop right after processing any of `SWAP_PREFIX`, `WATCHER_PREFIX`, `TX_HELPER_PREFIX` topic.
- An issue was fixed where we don't have to wait for all EVM nodes to sync the latest account nonce [#1757](https://github.com/KomodoPlatform/atomicDEX-API/pull/1757)


## v1.0.2-beta - 2023-04-11
Expand Down
2 changes: 1 addition & 1 deletion mm2src/coins/tendermint/tendermint_coin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ const ABCI_REQUEST_PROVE: bool = false;
/// 0.25 is good average gas price on atom and iris
const DEFAULT_GAS_PRICE: f64 = 0.25;
pub(super) const TIMEOUT_HEIGHT_DELTA: u64 = 100;
pub const GAS_LIMIT_DEFAULT: u64 = 100_000;
pub const GAS_LIMIT_DEFAULT: u64 = 125_000;
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you document(either here in the PR comments or in the changelog) why you are changing this value?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because HTLCs requiring more gas than our limit lately. Since gas prices can be updated from the coins configuration, this should never be problem even if we make it 200_000.

pub(crate) const TX_DEFAULT_MEMO: &str = "";

// https://github.com/irisnet/irismod/blob/5016c1be6fdbcffc319943f33713f4a057622f0a/modules/htlc/types/validation.go#L19-L22
Expand Down
17 changes: 1 addition & 16 deletions mm2src/gossipsub/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,8 +212,6 @@ impl Decoder for GossipsubCodec {
type Error = io::Error;

fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
const MAX_TOPICS: usize = 25;

let packet = some_or_return_ok_none!(self.length_codec.decode(src)?);

let rpc = rpc_proto::Rpc::decode(&packet[..])?;
Expand All @@ -230,25 +228,12 @@ impl Decoder for GossipsubCodec {
"sequence number has an incorrect size",
));
}

let topics = publish.topic_ids.into_iter().map(TopicHash::from_raw);
if topics.len() > MAX_TOPICS {
return Err(io::Error::new(
io::ErrorKind::InvalidInput,
format!(
"Too many topics provided. Allowed topic count: {}, Received topic count: {}",
MAX_TOPICS,
topics.len()
),
));
}

messages.push(GossipsubMessage {
source: PeerId::from_bytes(&publish.from.unwrap_or_default())
.map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "Invalid Peer Id"))?,
data: publish.data.unwrap_or_default(),
sequence_number: BigEndian::read_u64(&seq_no),
topics: topics.collect(),
topics: publish.topic_ids.into_iter().map(TopicHash::from_raw).collect(),
});
}

Expand Down
6 changes: 5 additions & 1 deletion mm2src/mm2_main/src/lp_network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,11 @@ async fn process_p2p_message(
);

if let Err(e) = fut.await {
log::error!("{}", e);
if e.get_inner().is_warning() {
log::warn!("{}", e);
} else {
log::error!("{}", e);
}
return;
}

Expand Down
101 changes: 54 additions & 47 deletions mm2src/mm2_main/src/lp_ordermatch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,8 @@ use std::time::Duration;
use trie_db::NodeCodec as NodeCodecT;
use uuid::Uuid;

use crate::mm2::lp_network::{broadcast_p2p_msg, request_any_relay, request_one_peer, subscribe_to_topic, P2PRequest};
use crate::mm2::lp_network::{broadcast_p2p_msg, request_any_relay, request_one_peer, subscribe_to_topic, P2PRequest,
P2PRequestError};
use crate::mm2::lp_swap::{calc_max_maker_vol, check_balance_for_maker_swap, check_balance_for_taker_swap,
check_other_coin_balance_for_swap, get_max_maker_vol, insert_new_swap_to_db,
is_pubkey_banned, lp_atomic_locktime, p2p_keypair_and_peer_id_to_broadcast,
Expand Down Expand Up @@ -138,9 +139,26 @@ pub enum OrderbookP2PHandlerError {
#[display(fmt = "Pubkey '{}' is not allowed.", _0)]
PubkeyNotAllowed(String),

#[display(fmt = "P2P request error: {}", _0)]
P2PRequestError(String),

#[display(
fmt = "Couldn't find an order {}, ignoring, it will be synced upon pubkey keep alive",
_0
)]
OrderNotFound(Uuid),

Internal(String),
}

impl OrderbookP2PHandlerError {
pub(crate) fn is_warning(&self) -> bool { matches!(self, OrderbookP2PHandlerError::OrderNotFound(_)) }
}

impl From<P2PRequestError> for OrderbookP2PHandlerError {
fn from(e: P2PRequestError) -> Self { OrderbookP2PHandlerError::P2PRequestError(e.to_string()) }
}

/// Alphabetically ordered orderbook pair
type AlbOrderedOrderbookPair = String;
type PubkeyOrders = Vec<(Uuid, OrderbookP2PItem)>;
Expand Down Expand Up @@ -265,7 +283,7 @@ async fn process_orders_keep_alive(
from_pubkey: String,
keep_alive: new_protocol::PubkeyKeepAlive,
i_am_relay: bool,
) -> bool {
) -> OrderbookP2PHandlerResult {
let ordermatch_ctx = OrdermatchContext::from_ctx(&ctx).expect("from_ctx failed");
let to_request = ordermatch_ctx
.orderbook
Expand All @@ -275,17 +293,21 @@ async fn process_orders_keep_alive(
let req = match to_request {
Some(req) => req,
// The message was processed, simply forward it
None => return true,
None => return Ok(()),
};

let resp =
request_one_peer::<SyncPubkeyOrderbookStateRes>(ctx.clone(), P2PRequest::Ordermatch(req), propagated_from_peer)
.await;

let response = match resp {
Ok(Some(resp)) => resp,
_ => return false,
};
let response = request_one_peer::<SyncPubkeyOrderbookStateRes>(
ctx.clone(),
P2PRequest::Ordermatch(req),
propagated_from_peer.clone(),
)
.await?
.ok_or_else(|| {
MmError::new(OrderbookP2PHandlerError::P2PRequestError(format!(
"No response was received from peer {} for SyncPubkeyOrderbookState request!",
propagated_from_peer
)))
})?;

let mut orderbook = ordermatch_ctx.orderbook.lock();
for (pair, diff) in response.pair_orders_diff {
Expand All @@ -300,27 +322,27 @@ async fn process_orders_keep_alive(
DeltaOrFullTrie::FullTrie(values) => process_pubkey_full_trie(&mut orderbook, values, params),
};
}
true

Ok(())
}

fn process_maker_order_updated(ctx: MmArc, from_pubkey: String, updated_msg: new_protocol::MakerOrderUpdated) -> bool {
fn process_maker_order_updated(
ctx: MmArc,
from_pubkey: String,
updated_msg: new_protocol::MakerOrderUpdated,
) -> OrderbookP2PHandlerResult {
let ordermatch_ctx = OrdermatchContext::from_ctx(&ctx).expect("from_ctx failed");
let uuid = updated_msg.uuid();
let mut orderbook = ordermatch_ctx.orderbook.lock();
match orderbook.find_order_by_uuid_and_pubkey(&uuid, &from_pubkey) {
Some(mut order) => {
order.apply_updated(&updated_msg);
orderbook.insert_or_update_order_update_trie(order);
true
},
None => {
log::warn!(
"Couldn't find an order {}, ignoring, it will be synced upon pubkey keep alive",
uuid
);
false
},
}

let mut order = orderbook
.find_order_by_uuid_and_pubkey(&uuid, &from_pubkey)
.ok_or_else(|| MmError::new(OrderbookP2PHandlerError::OrderNotFound(uuid)))?;
order.apply_updated(&updated_msg);
drop_mutability!(order);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does it require to drop mutability explicitly?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

drop_mutability! is done to ensure that the order cannot be accidentally modified later in this part of the code.

orderbook.insert_or_update_order_update_trie(order);

Ok(())
}

// fn verify_pubkey_orderbook(orderbook: &GetOrderbookPubkeyItem) -> Result<(), String> {
Expand Down Expand Up @@ -511,21 +533,17 @@ pub async fn handle_orderbook_msg(
};
}

drop_mutability!(orderbook_pairs);

if !orderbook_pairs.is_empty() {
process_msg(ctx, orderbook_pairs, from_peer, msg, i_am_relay).await?;
process_msg(ctx, from_peer, msg, i_am_relay).await?;
}

Ok(())
}

/// Attempts to decode a message and process it returning whether the message is valid and worth rebroadcasting
pub async fn process_msg(
ctx: MmArc,
_topics: Vec<String>,
from_peer: String,
msg: &[u8],
i_am_relay: bool,
) -> OrderbookP2PHandlerResult {
pub async fn process_msg(ctx: MmArc, from_peer: String, msg: &[u8], i_am_relay: bool) -> OrderbookP2PHandlerResult {
match decode_signed::<new_protocol::OrdermatchMessage>(msg) {
Ok((message, _sig, pubkey)) => {
if is_pubkey_banned(&ctx, &pubkey.unprefixed().into()) {
Expand All @@ -538,13 +556,7 @@ pub async fn process_msg(
Ok(())
},
new_protocol::OrdermatchMessage::PubkeyKeepAlive(keep_alive) => {
process_orders_keep_alive(ctx, from_peer, pubkey.to_hex(), keep_alive, i_am_relay)
.await
.then_some(())
.ok_or_else(|| {
OrderbookP2PHandlerError::Internal("`process_orders_keep_alive` was failed.".to_string())
.into()
})
process_orders_keep_alive(ctx, from_peer, pubkey.to_hex(), keep_alive, i_am_relay).await
},
new_protocol::OrdermatchMessage::TakerRequest(taker_request) => {
let msg = TakerRequest::from_new_proto_and_pubkey(taker_request, pubkey.unprefixed().into());
Expand Down Expand Up @@ -572,11 +584,6 @@ pub async fn process_msg(
},
new_protocol::OrdermatchMessage::MakerOrderUpdated(updated_msg) => {
process_maker_order_updated(ctx, pubkey.to_hex(), updated_msg)
.then_some(())
.ok_or_else(|| {
OrderbookP2PHandlerError::Internal("`process_maker_order_updated` was failed.".to_string())
.into()
})
},
}
},
Expand Down
20 changes: 10 additions & 10 deletions mm2src/mm2_test_helpers/dummy_files/iris_nimda_history.json
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
"type": "Tendermint",
"coin": "IRIS-TEST",
"amount": "0.027385",
"gas_limit": 100000
"gas_limit": 125000
},
"coin": "IRIS-NIMDA",
"internal_id": "3930374535314439384537304239343431413230443630420000000000000000",
Expand Down Expand Up @@ -45,7 +45,7 @@
"type": "Tendermint",
"coin": "IRIS-TEST",
"amount": "0.028433",
"gas_limit": 100000
"gas_limit": 125000
},
"coin": "IRIS-NIMDA",
"internal_id": "4138433845353931363133303743353941344143374536320000000000000000",
Expand Down Expand Up @@ -74,7 +74,7 @@
"type": "Tendermint",
"coin": "IRIS-TEST",
"amount": "0.026403",
"gas_limit": 100000
"gas_limit": 125000
},
"coin": "IRIS-NIMDA",
"internal_id": "3436313245464444363537373544434341433838463935340000000000000000",
Expand Down Expand Up @@ -103,7 +103,7 @@
"type": "Tendermint",
"coin": "IRIS-TEST",
"amount": "0.028433",
"gas_limit": 100000
"gas_limit": 125000
},
"coin": "IRIS-NIMDA",
"internal_id": "3931453345363544374341333833394331463141394435320000000000000000",
Expand Down Expand Up @@ -134,7 +134,7 @@
"type": "Tendermint",
"coin": "IRIS-TEST",
"amount": "0.025763",
"gas_limit": 100000
"gas_limit": 125000
},
"coin": "IRIS-NIMDA",
"internal_id": "3643343243423346353232433844414333343138443338440000000000000000",
Expand Down Expand Up @@ -162,7 +162,7 @@
"type": "Tendermint",
"coin": "IRIS-TEST",
"amount": "0.022114",
"gas_limit": 100000
"gas_limit": 125000
},
"coin": "IRIS-NIMDA",
"internal_id": "4631313139343742323142303344373639303343333441370000000000000000",
Expand Down Expand Up @@ -190,7 +190,7 @@
"type": "Tendermint",
"coin": "IRIS-TEST",
"amount": "0.026861",
"gas_limit": 100000
"gas_limit": 125000
},
"coin": "IRIS-NIMDA",
"internal_id": "3236413546344639343144434531363135373046373530330000000000000000",
Expand Down Expand Up @@ -218,7 +218,7 @@
"type": "Tendermint",
"coin": "IRIS-TEST",
"amount": "0.026861",
"gas_limit": 100000
"gas_limit": 125000
},
"coin": "IRIS-NIMDA",
"internal_id": "4344413036444433353735413332323330363430414634310000000000000000",
Expand Down Expand Up @@ -246,7 +246,7 @@
"type": "Tendermint",
"coin": "IRIS-TEST",
"amount": "0.022114",
"gas_limit": 100000
"gas_limit": 125000
},
"coin": "IRIS-NIMDA",
"internal_id": "4339384239303045393643333738453237333945314344330000000000000000",
Expand Down Expand Up @@ -274,7 +274,7 @@
"type": "Tendermint",
"coin": "IRIS-TEST",
"amount": "0.02316",
"gas_limit": 100000
"gas_limit": 125000
},
"coin": "IRIS-NIMDA",
"internal_id": "4136303630343838314237323638453734433246453537450000000000000000",
Expand Down
Loading