Skip to content

Commit

Permalink
test new layer with --no-fail-fast flag
Browse files Browse the repository at this point in the history
Signed-off-by: ozkanonur <work@onurozkan.dev>
  • Loading branch information
onur-ozkan committed Jul 10, 2023
1 parent 96335e0 commit 8be0055
Show file tree
Hide file tree
Showing 12 changed files with 133 additions and 141 deletions.
12 changes: 6 additions & 6 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ jobs:
- name: Test
run: |
# wget -O - https://raw.githubusercontent.com/KomodoPlatform/komodo/master/zcutil/fetch-params-alt.sh | bash
cargo test --bins --lib
cargo test --bins --lib -- --no-fail-fast
mac-x86-64-unit:
timeout-minutes: 90
Expand All @@ -50,7 +50,7 @@ jobs:
- name: Test
run: |
# wget -O - https://raw.githubusercontent.com/KomodoPlatform/komodo/master/zcutil/fetch-params-alt.sh | bash
cargo test --bins --lib --target x86_64-apple-darwin
cargo test --bins --lib --target x86_64-apple-darwin -- --no-fail-fast
win-x86-64-unit:
timeout-minutes: 90
Expand Down Expand Up @@ -86,7 +86,7 @@ jobs:
# Restart-Service docker
# Get-Service docker
cargo test --bins --lib
cargo test --bins --lib -- --no-fail-fast
linux-x86-64-mm2-integration:
timeout-minutes: 90
Expand All @@ -105,7 +105,7 @@ jobs:
rustup default nightly-2022-10-29
- name: Test
run: cargo test --test 'mm2_tests_main'
run: cargo test --test 'mm2_tests_main' -- --no-fail-fast

# https://docs.github.com/en/actions/learn-github-actions/usage-limits-billing-and-administration#usage-limits
# https://github.com/KomodoPlatform/atomicDEX-API/actions/runs/4419618128/jobs/7748266141#step:4:1790
Expand Down Expand Up @@ -146,7 +146,7 @@ jobs:
rustup default nightly-2022-10-29
- name: Test
run: cargo test --test 'mm2_tests_main'
run: cargo test --test 'mm2_tests_main' -- --no-fail-fast

docker-tests:
timeout-minutes: 90
Expand All @@ -167,7 +167,7 @@ jobs:
- name: Test
run: |
wget -O - https://raw.githubusercontent.com/KomodoPlatform/komodo/master/zcutil/fetch-params-alt.sh | bash
cargo test --test 'docker_tests_main' --features run-docker-tests
cargo test --test 'docker_tests_main' --features run-docker-tests -- --no-fail-fast
wasm:
timeout-minutes: 90
Expand Down
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion mm2src/mm2_main/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ mm2_core = { path = "../mm2_core" }
mm2_err_handle = { path = "../mm2_err_handle" }
mm2_gui_storage = { path = "../mm2_gui_storage" }
mm2_io = { path = "../mm2_io" }
mm2-libp2p = { path = "../mm2_libp2p" }
mm2-libp2p = { path = "../mm2_p2p", package = "mm2_p2p" }
mm2_metrics = { path = "../mm2_metrics" }
mm2_net = { path = "../mm2_net" }
mm2_number = { path = "../mm2_number" }
Expand Down
155 changes: 63 additions & 92 deletions mm2src/mm2_main/src/lp_network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,7 @@ use instant::Instant;
use keys::KeyPair;
use mm2_core::mm_ctx::{MmArc, MmWeak};
use mm2_err_handle::prelude::*;
use mm2_libp2p::atomicdex_behaviour::{AdexBehaviourCmd, AdexBehaviourEvent, AdexCmdTx, AdexEventRx, AdexResponse,
AdexResponseChannel};
use mm2_libp2p::behaviour::{AdexBehaviourCmd, AdexBehaviourEvent, AdexCmdTx, AdexEventRx, AdexResponse};
use mm2_libp2p::peers_exchange::PeerAddresses;
use mm2_libp2p::{decode_message, encode_message, DecodingError, GossipsubMessage, Libp2pPublic, Libp2pSecpPublic,
MessageId, NetworkPorts, PeerId, TopicHash, TOPIC_SEPARATOR};
Expand Down Expand Up @@ -136,106 +135,80 @@ async fn process_p2p_message(
ctx: MmArc,
peer_id: PeerId,
message_id: MessageId,
mut message: GossipsubMessage,
message: GossipsubMessage,
i_am_relay: bool,
) {
fn is_valid(topics: &[TopicHash]) -> Result<(), String> {
if topics.is_empty() {
return Err("At least one topic must be provided.".to_string());
}

let first_topic_prefix = topics[0].as_str().split(TOPIC_SEPARATOR).next().unwrap_or_default();
for item in topics.iter().skip(1) {
if !item.as_str().starts_with(first_topic_prefix) {
return Err(format!(
"Topics are invalid, received more than one topic kind. Topics '{:?}",
topics
));
}
}

Ok(())
}

let mut to_propagate = false;

message.topics.dedup();
// message.topics.dedup();
drop_mutability!(message);

if let Err(err) = is_valid(&message.topics) {
log::error!("{}", err);
return;
}

let inform_about_break = |used: &str, all: &[TopicHash]| {
let inform_about_break = |used: &str, all: &TopicHash| {
log::debug!(
"Topic '{}' proceed and loop is killed. Whole topic list was '{:?}'",
used,
all
);
};

for topic in message.topics.iter() {
let mut split = topic.as_str().split(TOPIC_SEPARATOR);

match split.next() {
Some(lp_ordermatch::ORDERBOOK_PREFIX) => {
let fut = lp_ordermatch::handle_orderbook_msg(
ctx.clone(),
&message.topics,
peer_id.to_string(),
&message.data,
i_am_relay,
);

if let Err(e) = fut.await {
if e.get_inner().is_warning() {
log::warn!("{}", e);
} else {
log::error!("{}", e);
}
return;
let topic = message.topic;
let mut split = topic.as_str().split(TOPIC_SEPARATOR);

match split.next() {
Some(lp_ordermatch::ORDERBOOK_PREFIX) => {
let fut = lp_ordermatch::handle_orderbook_msg(
ctx.clone(),
&topic,
peer_id.to_string(),
&message.data,
i_am_relay,
);

if let Err(e) = fut.await {
if e.get_inner().is_warning() {
log::warn!("{}", e);
} else {
log::error!("{}", e);
}
return;
}

to_propagate = true;
break;
},
Some(lp_swap::SWAP_PREFIX) => {
if let Err(e) =
lp_swap::process_swap_msg(ctx.clone(), split.next().unwrap_or_default(), &message.data).await
{
to_propagate = true;
},
Some(lp_swap::SWAP_PREFIX) => {
if let Err(e) =
lp_swap::process_swap_msg(ctx.clone(), split.next().unwrap_or_default(), &message.data).await
{
log::error!("{}", e);
return;
}

to_propagate = true;

inform_about_break(topic.as_str(), &topic);
},
Some(lp_swap::WATCHER_PREFIX) => {
if ctx.is_watcher() {
if let Err(e) = lp_swap::process_watcher_msg(ctx.clone(), &message.data) {
log::error!("{}", e);
return;
}
}

to_propagate = true;
to_propagate = true;

inform_about_break(topic.as_str(), &message.topics);
break;
},
Some(lp_swap::WATCHER_PREFIX) => {
if ctx.is_watcher() {
if let Err(e) = lp_swap::process_watcher_msg(ctx.clone(), &message.data) {
log::error!("{}", e);
inform_about_break(topic.as_str(), &topic);
},
Some(lp_swap::TX_HELPER_PREFIX) => {
if let Some(pair) = split.next() {
if let Ok(Some(coin)) = lp_coinfind(&ctx, pair).await {
if let Err(e) = coin.tx_enum_from_bytes(&message.data) {
log::error!("Message cannot continue the process due to: {:?}", e);
return;
}
}

to_propagate = true;
};

inform_about_break(topic.as_str(), &message.topics);
break;
},
Some(lp_swap::TX_HELPER_PREFIX) => {
if let Some(pair) = split.next() {
if let Ok(Some(coin)) = lp_coinfind(&ctx, pair).await {
if let Err(e) = coin.tx_enum_from_bytes(&message.data) {
log::error!("Message cannot continue the process due to: {:?}", e);
return;
};

let fut = coin.send_raw_tx_bytes(&message.data);
ctx.spawner().spawn(async {
let fut = coin.send_raw_tx_bytes(&message.data);
ctx.spawner().spawn(async {
match fut.compat().await {
Ok(id) => log::debug!("Transaction broadcasted successfully: {:?} ", id),
// TODO (After https://github.com/KomodoPlatform/atomicDEX-API/pull/1433)
Expand All @@ -244,14 +217,12 @@ async fn process_p2p_message(
Err(e) => log::error!("Broadcast transaction failed (ignore this error if the transaction already sent by another seednode). {}", e),
};
})
}
}
}

inform_about_break(topic.as_str(), &message.topics);
break;
},
None | Some(_) => (),
}
inform_about_break(topic.as_str(), &topic);
},
None | Some(_) => (),
}

if to_propagate && i_am_relay {
Expand All @@ -263,7 +234,7 @@ fn process_p2p_request(
ctx: MmArc,
_peer_id: PeerId,
request: Vec<u8>,
response_channel: AdexResponseChannel,
response_channel: mm2_libp2p::behaviour::AdexResponseChannel,
) -> P2PRequestResult<()> {
let request = decode_message::<P2PRequest>(&request)?;
let result = match request {
Expand All @@ -287,11 +258,11 @@ fn process_p2p_request(
Ok(())
}

pub fn broadcast_p2p_msg(ctx: &MmArc, topics: Vec<String>, msg: Vec<u8>, from: Option<PeerId>) {
pub fn broadcast_p2p_msg(ctx: &MmArc, topic: String, msg: Vec<u8>, from: Option<PeerId>) {
let ctx = ctx.clone();
let cmd = match from {
Some(from) => AdexBehaviourCmd::PublishMsgFrom { topics, msg, from },
None => AdexBehaviourCmd::PublishMsg { topics, msg },
Some(from) => AdexBehaviourCmd::PublishMsgFrom { topic, msg, from },
None => AdexBehaviourCmd::PublishMsg { topic, msg },
};
let p2p_ctx = P2PContext::fetch_from_mm_arc(&ctx);
if let Err(e) = p2p_ctx.cmd_tx.lock().try_send(cmd) {
Expand Down Expand Up @@ -533,6 +504,6 @@ pub fn lp_network_ports(netid: u16) -> Result<NetworkPorts, MmError<NetIdError>>
}

pub fn peer_id_from_secp_public(secp_public: &[u8]) -> Result<PeerId, MmError<DecodingError>> {
let public_key = Libp2pSecpPublic::decode(secp_public)?;
Ok(PeerId::from_public_key(&Libp2pPublic::Secp256k1(public_key)))
let public_key = Libp2pSecpPublic::try_from_bytes(secp_public).unwrap();
Ok(PeerId::from_public_key(&Libp2pPublic::from(public_key)))
}
26 changes: 12 additions & 14 deletions mm2src/mm2_main/src/lp_ordermatch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -519,7 +519,7 @@ fn remove_pubkey_pair_orders(orderbook: &mut Orderbook, pubkey: &str, alb_pair:

pub async fn handle_orderbook_msg(
ctx: MmArc,
topics: &[TopicHash],
topic: &TopicHash,
from_peer: String,
msg: &[u8],
i_am_relay: bool,
Expand All @@ -530,17 +530,15 @@ pub async fn handle_orderbook_msg(

let mut orderbook_pairs = vec![];

for topic in topics {
let mut split = topic.as_str().split(TOPIC_SEPARATOR);
match (split.next(), split.next()) {
(Some(ORDERBOOK_PREFIX), Some(pair)) => {
orderbook_pairs.push(pair.to_string());
},
_ => {
return MmError::err(OrderbookP2PHandlerError::InvalidTopic(topic.as_str().to_owned()));
},
};
}
let mut split = topic.as_str().split(TOPIC_SEPARATOR);
match (split.next(), split.next()) {
(Some(ORDERBOOK_PREFIX), Some(pair)) => {
orderbook_pairs.push(pair.to_string());
},
_ => {
return MmError::err(OrderbookP2PHandlerError::InvalidTopic(topic.as_str().to_owned()));
},
};

drop_mutability!(orderbook_pairs);

Expand Down Expand Up @@ -1056,7 +1054,7 @@ fn maker_order_created_p2p_notify(
let encoded_msg = encode_and_sign(&to_broadcast, key_pair.private_ref()).unwrap();
let item: OrderbookItem = (message, hex::encode(key_pair.public_slice())).into();
insert_or_update_my_order(&ctx, item, order);
broadcast_p2p_msg(&ctx, vec![topic], encoded_msg, peer_id);
broadcast_p2p_msg(&ctx, topic, encoded_msg, peer_id);
}

fn process_my_maker_order_updated(ctx: &MmArc, message: &new_protocol::MakerOrderUpdated) {
Expand All @@ -1080,7 +1078,7 @@ fn maker_order_updated_p2p_notify(
let (secret, peer_id) = p2p_private_and_peer_id_to_broadcast(&ctx, p2p_privkey);
let encoded_msg = encode_and_sign(&msg, &secret).unwrap();
process_my_maker_order_updated(&ctx, &message);
broadcast_p2p_msg(&ctx, vec![topic], encoded_msg, peer_id);
broadcast_p2p_msg(&ctx, topic, encoded_msg, peer_id);
}

fn maker_order_cancelled_p2p_notify(ctx: MmArc, order: &MakerOrder) {
Expand Down
6 changes: 3 additions & 3 deletions mm2src/mm2_main/src/lp_swap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ pub fn broadcast_swap_msg_every_delayed<T: 'static + Serialize + Clone + Send>(
pub fn broadcast_swap_message<T: Serialize>(ctx: &MmArc, topic: String, msg: T, p2p_privkey: &Option<KeyPair>) {
let (p2p_private, from) = p2p_private_and_peer_id_to_broadcast(ctx, p2p_privkey.as_ref());
let encoded_msg = encode_and_sign(&msg, &p2p_private).unwrap();
broadcast_p2p_msg(ctx, vec![topic], encoded_msg, from);
broadcast_p2p_msg(ctx, topic, encoded_msg, from);
}

/// Broadcast the tx message once
Expand All @@ -251,7 +251,7 @@ pub fn broadcast_p2p_tx_msg(ctx: &MmArc, topic: String, msg: &TransactionEnum, p

let (p2p_private, from) = p2p_private_and_peer_id_to_broadcast(ctx, p2p_privkey.as_ref());
let encoded_msg = encode_and_sign(&msg.tx_hex(), &p2p_private).unwrap();
broadcast_p2p_msg(ctx, vec![topic], encoded_msg, from);
broadcast_p2p_msg(ctx, topic, encoded_msg, from);
}

pub async fn process_swap_msg(ctx: MmArc, topic: &str, msg: &[u8]) -> P2PRequestResult<()> {
Expand Down Expand Up @@ -1026,7 +1026,7 @@ async fn broadcast_my_swap_status(ctx: &MmArc, uuid: Uuid) -> Result<(), String>
data: status,
};
let msg = json::to_vec(&status).expect("Swap status ser should never fail");
broadcast_p2p_msg(ctx, vec![swap_topic(&uuid)], msg, None);
broadcast_p2p_msg(ctx, swap_topic(&uuid), msg, None);
Ok(())
}

Expand Down
2 changes: 1 addition & 1 deletion mm2src/mm2_main/src/ordermatch_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use crypto::privkey::key_pair_from_seed;
use db_common::sqlite::rusqlite::Connection;
use futures::{channel::mpsc, StreamExt};
use mm2_core::mm_ctx::{MmArc, MmCtx};
use mm2_libp2p::atomicdex_behaviour::AdexBehaviourCmd;
use mm2_libp2p::behaviour::AdexBehaviourCmd;
use mm2_libp2p::{decode_message, PeerId};
use mm2_test_helpers::for_tests::mm_ctx_with_iguana;
use mocktopus::mocking::*;
Expand Down
Loading

0 comments on commit 8be0055

Please sign in to comment.