Skip to content

Commit

Permalink
self review fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
borngraced committed Dec 24, 2024
1 parent 9db99ac commit 4739ec3
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 37 deletions.
4 changes: 2 additions & 2 deletions mm2src/coins/eth/wallet_connect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ pub(crate) async fn send_transaction_with_walletconnect(
data: &[u8],
gas: U256,
) -> Result<SignedTransaction, TransactionErr> {
info!(target: "sign-and-send", "get_gas_price…");
info!(target: "WalletConnect: sign-and-send", "get_gas_price…");
let pay_for_gas_option = try_tx_s!(
coin.get_swap_pay_for_gas_option(coin.get_swap_transaction_fee_policy())
.await
Expand All @@ -249,7 +249,7 @@ pub(crate) async fn send_transaction_with_walletconnect(
};
// Please note that this method may take a long time
// due to `eth_sendTransaction` requests.
info!(target: "sign-and-send", "WalletConnect signing and sending tx…");
info!(target: "WalletConnect: sign-and-send", "signing and sending tx…");
let (signed_tx, _) = try_tx_s!(coin.wc_send_tx(wc, params).await);

Ok(signed_tx)
Expand Down
9 changes: 4 additions & 5 deletions mm2src/kdf_walletconnect/src/inbound_message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use crate::{error::WalletConnectError,
update::reply_session_update_request},
WalletConnectCtxImpl};

use common::log::info;
use common::log::{info, LogOnError};
use mm2_err_handle::prelude::*;
use relay_rpc::domain::{MessageId, Topic};
use relay_rpc::rpc::{params::ResponseParamsSuccess, Params, Request, Response};
Expand Down Expand Up @@ -72,10 +72,9 @@ pub(crate) async fn process_inbound_response(ctx: &WalletConnectCtxImpl, respons
.lock()
.expect("pending request lock shouldn't fail!")
.remove(&message_id);
if let Err(err) = process_session_propose_response(ctx, topic, &propose).await {
common::log::error!("Failed to process session propose response: {err:?}");
}
return;
return process_session_propose_response(ctx, topic, &propose)
.await
.error_log_with_msg("Failed to process session propose response");
},
Ok(data) => Ok(SessionMessage {
message_id,
Expand Down
63 changes: 33 additions & 30 deletions mm2src/kdf_walletconnect/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ impl WalletConnectCtx {
protocol: SUPPORTED_PROTOCOL.to_string(),
data: None,
};
let (inbound_message_tx, mut inbound_message_rx) = unbounded();
let (inbound_message_tx, inbound_message_rx) = unbounded();
let (conn_live_sender, conn_live_receiver) = unbounded();
let (client, _) = Client::new_with_callback(
Handler::new("Komodefi", inbound_message_tx, conn_live_sender),
Expand All @@ -126,18 +126,13 @@ impl WalletConnectCtx {
context
.abortable_system
.weak_spawner()
.spawn(context.clone().spawn_connection_initialization(conn_live_receiver));
.spawn(context.clone().spawn_connection_initialization_fut(conn_live_receiver));

// spawn message handler event loop
context.abortable_system.weak_spawner().spawn({
let context = context.clone();
async move {
while let Some(msg) = inbound_message_rx.next().await {
if let Err(e) = context.handle_published_message(msg).await {
error!("Error processing message: {:?}", e);
}
}
}
});
context
.abortable_system
.weak_spawner()
.spawn(context.clone().spawn_published_message_fut(inbound_message_rx));

Ok(Self(context))
}
Expand All @@ -154,7 +149,7 @@ impl WalletConnectCtxImpl {
/// Establishes initial connection to WalletConnect relay server with linear retry mechanism.
/// Uses increasing delay between retry attempts starting from 1sec and increase exponentially.
/// After successful connection, attempts to restore previous session state from storage.
pub(crate) async fn spawn_connection_initialization(
pub(crate) async fn spawn_connection_initialization_fut(
self: Arc<Self>,
connection_live_rx: UnboundedReceiver<Option<String>>,
) {
Expand Down Expand Up @@ -243,12 +238,12 @@ impl WalletConnectCtxImpl {
.timeout_secs(PUBLISH_TIMEOUT_SECS)
.await
{
Ok(Ok(_)) => {
Ok(res) => {
res.map_to_mm(|err| err.into())?;
info!("[{topic}] Subscribed to topic");
send_proposal_request(self, &topic, required_namespaces, optional_namespaces).await?;
return Ok(url);
},
Ok(Err(err)) => return MmError::err(err.into()),
Err(_) => self.wait_until_client_is_online_loop(attempt).await,
}
}
Expand All @@ -260,17 +255,15 @@ impl WalletConnectCtxImpl {

/// Retrieves the symmetric key associated with a given `topic`.
fn sym_key(&self, topic: &Topic) -> MmResult<SymKey, WalletConnectError> {
if let Some(key) = self.session_manager.sym_key(topic) {
return Ok(key);
}

if let Ok(key) = self.pairing.sym_key(topic) {
return Ok(key);
}

MmError::err(WalletConnectError::InternalError(format!(
"topic sym_key not found:{topic}"
)))
self.session_manager
.sym_key(topic)
.or_else(|| self.pairing.sym_key(topic).ok())
.ok_or_else(|| {
error!("Failed to find sym_key for topic: {topic}");
MmError::new(WalletConnectError::InternalError(format!(
"topic sym_key not found: {topic}"
)))
})
}

/// Handles an inbound published message by decrypting, decoding, and processing it.
Expand All @@ -292,6 +285,15 @@ impl WalletConnectCtxImpl {
Ok(())
}

// Spawns a task that continuously processes published messages from inbound message channel.
async fn spawn_published_message_fut(self: Arc<Self>, mut recv: UnboundedReceiver<PublishedMessage>) {
while let Some(msg) = recv.next().await {
self.handle_published_message(msg)
.await
.error_log_with_msg("Error processing message");
}
}

/// Loads sessions from storage, activates valid ones, and deletes expired ones.
async fn load_session_from_storage(&self) -> MmResult<(), WalletConnectError> {
info!("Loading WalletConnect session from storage");
Expand All @@ -310,9 +312,11 @@ impl WalletConnectCtxImpl {
// delete expired session
if now > session.expiry {
debug!("Session {} expired, trying to delete from storage", session.topic);
if let Err(err) = self.session_manager.storage().delete_session(&session.topic).await {
error!("[{}] Unable to delete session from storage: {err:?}", session.topic);
}
self.session_manager
.storage()
.delete_session(&session.topic)
.await
.error_log_with_msg(&format!("[{}] Unable to delete session from storage", session.topic));
continue;
};

Expand Down Expand Up @@ -471,7 +475,6 @@ impl WalletConnectCtxImpl {
session: &Session,
chain_id: &WcChainId,
) -> MmResult<(), WalletConnectError> {
println!("{:?}", session.namespaces.get(chain_id.chain.as_ref()));
if let Some(Namespace {
chains: Some(chains), ..
}) = session.namespaces.get(chain_id.chain.as_ref())
Expand Down

0 comments on commit 4739ec3

Please sign in to comment.