Skip to content

Commit

Permalink
add some doc-comments
Browse files Browse the repository at this point in the history
Signed-off-by: onur-ozkan <work@onurozkan.dev>
  • Loading branch information
onur-ozkan committed Jan 31, 2024
1 parent 059971e commit 59b78de
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 15 deletions.
2 changes: 1 addition & 1 deletion mm2src/coins/eth/v2_activation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ impl EthCoin {
let conf = coin_conf(&ctx, &ticker);

let decimals = match conf["decimals"].as_u64() {
None | Some(0) => get_token_decimals(&self.web3(), protocol.token_addr)
None | Some(0) => get_token_decimals(self.web3(), protocol.token_addr)
.await
.map_err(Erc20TokenActivationError::InternalError)?,
Some(d) => d as u8,
Expand Down
34 changes: 20 additions & 14 deletions mm2src/coins/eth/web3_transport/websocket_transport.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
#![allow(unused)] // TODO: remove this
//! This module offers a transport layer for managing request-response style communication
//! with Ethereum nodes using websockets in a wait and lock-free manner. In comparison to
//! HTTP transport, this approach proves to be much quicker (low-latency) and consumes less
//! bandwidth. This efficiency is achieved by avoiding the handling of TCP
//! handshakes (connection reusability) for each request.

use crate::eth::web3_transport::Web3SendOut;
use crate::eth::RpcTransportEventHandlerShared;
use common::executor::Timer;
use common::log;
use futures::channel::mpsc::{UnboundedReceiver, UnboundedSender};
use futures::channel::oneshot;
Expand All @@ -11,8 +14,6 @@ use futures_ticker::Ticker;
use futures_util::{FutureExt, SinkExt, StreamExt};
use instant::Duration;
use jsonrpc_core::Call;
use mm2_net::transport::GuiAuthValidationGenerator;
use parking_lot::RwLock;
use std::collections::{HashMap, HashSet};
use std::sync::{atomic::{AtomicUsize, Ordering},
Arc};
Expand All @@ -39,7 +40,6 @@ pub struct WebsocketTransport {
request_id: Arc<AtomicUsize>,
client: Arc<WebsocketTransportRpcClient>,
event_handlers: Vec<RpcTransportEventHandlerShared>,
// TODO: explain why this is used and how safe it is
responses: SafeMapPtr,
request_handler: RequestHandler,
}
Expand All @@ -57,6 +57,16 @@ struct WsRequest {
response_notifier: oneshot::Sender<()>,
}

/// A wrapper type for raw pointers used as a mutable Send & Sync HashMap reference.
///
/// Safety notes:
///
/// The implemented algorithm for socket request-response is already thread-safe,
/// so we don't care about race conditions.
///
/// With using smart pointer inside, we avoid dangling pointers as well. With this,
/// we achieve a smooth and lock and wait free runtime, which is much lighter and quicker
/// than the safe Rust implementations.
#[derive(Clone, Debug)]
struct SafeMapPtr(*mut HashMap<RequestId, serde_json::Value>);

Expand All @@ -69,10 +79,8 @@ impl WebsocketTransport {
event_handlers: Vec<RpcTransportEventHandlerShared>,
) -> Self {
let client_impl = WebsocketTransportRpcClientImpl { nodes };

let (req_tx, req_rx) = futures::channel::mpsc::unbounded();

let mut hashmap = HashMap::default();
let hashmap = HashMap::default();

WebsocketTransport {
client: Arc::new(WebsocketTransportRpcClient(AsyncMutex::new(client_impl))),
Expand All @@ -90,7 +98,7 @@ impl WebsocketTransport {
// TODO: clear disconnected channels every 30s or so.
let mut response_map: HashMap<RequestId, oneshot::Sender<()>> = HashMap::new();

for node in (*self.client.0.lock().await).nodes.clone() {
for node in self.client.0.lock().await.nodes.clone() {
let mut wsocket = match tokio_tungstenite_wasm::connect(node.uri.to_string()).await {
Ok(ws) => ws,
Err(e) => {
Expand Down Expand Up @@ -164,7 +172,7 @@ impl WebsocketTransport {
async fn stop_connection(self) { todo!() }
}

async fn rpc_send_and_receive(
async fn send_request(
transport: WebsocketTransport,
request: Call,
request_id: RequestId,
Expand All @@ -184,7 +192,7 @@ async fn rpc_send_and_receive(
if let Ok(_ping) = notification_receiver.await {
let response_map = unsafe { &mut *transport.responses.0 };
if let Some(response) = response_map.remove(&request_id) {
return Ok(response.clone());
return Ok(response);
}
};

Expand All @@ -201,7 +209,5 @@ impl Transport for WebsocketTransport {
(request_id, request)
}

fn send(&self, id: RequestId, request: Call) -> Self::Out {
Box::pin(rpc_send_and_receive(self.clone(), request, id))
}
fn send(&self, id: RequestId, request: Call) -> Self::Out { Box::pin(send_request(self.clone(), request, id)) }
}

0 comments on commit 59b78de

Please sign in to comment.