Skip to content

Commit

Permalink
send responses through notifier channel
Browse files Browse the repository at this point in the history
Signed-off-by: onur-ozkan <work@onurozkan.dev>
  • Loading branch information
onur-ozkan committed Feb 26, 2024
1 parent 665dc2f commit 9de616c
Showing 1 changed file with 16 additions and 48 deletions.
64 changes: 16 additions & 48 deletions mm2src/coins/eth/web3_transport/websocket_transport.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
//! This module offers a transport layer for managing request-response style communication
//! with Ethereum nodes using websockets in a wait and lock-free manner (with unsafe raw-pointers).
//! This module offers a transport layer for managing request-response style communication with Ethereum
//! nodes using websockets that can work concurrently.
//!
//! In comparison to HTTP transport, this approach proves to be much quicker (low-latency) and consumes
//! less bandwidth. This efficiency is achieved by avoiding the handling of TCP handshakes (connection reusability)
//! for each request.
use super::handle_gui_auth_payload;
use super::http_transport::de_rpc_response;
use crate::eth::eth_rpc::ETH_RPC_REQUEST_TIMEOUT;
use crate::eth::web3_transport::Web3SendOut;
use crate::eth::{EthCoin, RpcTransportEventHandlerShared};
use crate::{MmCoin, RpcTransportEventHandler};
Expand All @@ -20,7 +22,6 @@ use futures_util::{FutureExt, SinkExt, StreamExt};
use instant::{Duration, Instant};
use jsonrpc_core::Call;
use mm2_net::transport::GuiAuthValidationGenerator;
use std::collections::HashMap;
use std::sync::atomic::AtomicBool;
use std::sync::{atomic::{AtomicUsize, Ordering},
Arc};
Expand All @@ -29,7 +30,6 @@ use web3::error::{Error, TransportError};
use web3::helpers::to_string;
use web3::{helpers::build_request, RequestId, Transport};

const REQUEST_TIMEOUT_AS_SEC: u64 = 10;
const MAX_ATTEMPTS: u32 = 3;
const SLEEP_DURATION: f64 = 1.;
const KEEPALIVE_DURATION: Duration = Duration::from_secs(10);
Expand All @@ -47,7 +47,6 @@ pub struct WebsocketTransport {
node: WebsocketTransportNode,
event_handlers: Vec<RpcTransportEventHandlerShared>,
pub(crate) gui_auth_validation_generator: Option<GuiAuthValidationGenerator>,
responses: Arc<SafeMapPtr>,
controller_channel: Arc<ControllerChannel>,
connection_guard: Arc<AsyncMutex<()>>,
}
Expand All @@ -67,30 +66,9 @@ enum ControllerMessage {
struct WsRequest {
serialized_request: String,
request_id: RequestId,
response_notifier: oneshot::Sender<()>,
response_notifier: oneshot::Sender<Vec<u8>>,
}

/// A wrapper type for raw pointers used as a mutable Send & Sync HashMap reference.
///
/// Safety notes:
///
/// The implemented algorithm for socket request-response is already thread-safe,
/// so we don't care about race conditions.
///
/// As for deallocations, see the `Drop` implementation below.
#[derive(Debug)]
struct SafeMapPtr(*mut HashMap<RequestId, Vec<u8>>);

impl Drop for SafeMapPtr {
fn drop(&mut self) {
// Let the compiler do the job.
let _ = unsafe { Box::from_raw(self.0) };
}
}

unsafe impl Send for SafeMapPtr {}
unsafe impl Sync for SafeMapPtr {}

enum OuterAction {
None,
Continue,
Expand All @@ -108,7 +86,6 @@ impl WebsocketTransport {
WebsocketTransport {
node,
event_handlers,
responses: Arc::new(SafeMapPtr(Box::into_raw(Default::default()))),
request_id: Arc::new(AtomicUsize::new(1)),
controller_channel: ControllerChannel {
tx: Arc::new(AsyncMutex::new(req_tx)),
Expand All @@ -124,7 +101,7 @@ impl WebsocketTransport {
async fn handle_keepalive(
&self,
wsocket: &mut WebSocketStream,
response_notifiers: &mut ExpirableMap<usize, oneshot::Sender<()>>,
response_notifiers: &mut ExpirableMap<usize, oneshot::Sender<Vec<u8>>>,
expires_at: Option<Instant>,
) -> OuterAction {
const SIMPLE_REQUEST: &str = r#"{"jsonrpc":"2.0","method":"net_version","params":[],"id": 0 }"#;
Expand Down Expand Up @@ -169,7 +146,7 @@ impl WebsocketTransport {
&self,
request: Option<ControllerMessage>,
wsocket: &mut WebSocketStream,
response_notifiers: &mut ExpirableMap<usize, oneshot::Sender<()>>,
response_notifiers: &mut ExpirableMap<usize, oneshot::Sender<Vec<u8>>>,
) -> OuterAction {
match request {
Some(ControllerMessage::Request(WsRequest {
Expand All @@ -180,7 +157,8 @@ impl WebsocketTransport {
response_notifiers.insert(
request_id,
response_notifier,
Duration::from_secs(REQUEST_TIMEOUT_AS_SEC),
// Since request will be cancelled when timeout occurs, we are free to drop its state.
ETH_RPC_REQUEST_TIMEOUT + Duration::from_secs(3),
);

let mut should_continue = Default::default();
Expand Down Expand Up @@ -219,7 +197,7 @@ impl WebsocketTransport {
async fn handle_response(
&self,
message: Option<Result<tokio_tungstenite_wasm::Message, tokio_tungstenite_wasm::Error>>,
response_notifiers: &mut ExpirableMap<usize, oneshot::Sender<()>>,
response_notifiers: &mut ExpirableMap<usize, oneshot::Sender<Vec<u8>>>,
) -> OuterAction {
match message {
Some(Ok(tokio_tungstenite_wasm::Message::Text(inc_event))) => {
Expand All @@ -234,10 +212,7 @@ impl WebsocketTransport {
if let Some(notifier) = response_notifiers.remove(&request_id) {
let mut res_bytes: Vec<u8> = Vec::new();
if serde_json::to_writer(&mut res_bytes, &inc_event).is_ok() {
let response_map = unsafe { &mut *self.responses.0 };
let _ = response_map.insert(request_id, res_bytes);

notifier.send(()).expect("receiver channel must be alive");
notifier.send(res_bytes).expect("receiver channel must be alive");
}
}
}
Expand Down Expand Up @@ -283,7 +258,7 @@ impl WebsocketTransport {
let _guard = self.connection_guard.lock().await;

// List of awaiting requests
let mut response_notifiers: ExpirableMap<RequestId, oneshot::Sender<()>> = ExpirableMap::default();
let mut response_notifiers: ExpirableMap<RequestId, oneshot::Sender<Vec<u8>>> = ExpirableMap::default();

let mut wsocket = match self
.attempt_to_establish_socket_connection(MAX_ATTEMPTS, SLEEP_DURATION)
Expand Down Expand Up @@ -336,9 +311,6 @@ impl WebsocketTransport {
tx.send(ControllerMessage::Close)
.await
.expect("receiver channel must be alive");

let response_map = unsafe { &mut *self.responses.0 };
response_map.clear();
}

pub(crate) fn maybe_spawn_connection_loop(&self, coin: EthCoin) {
Expand Down Expand Up @@ -381,7 +353,7 @@ async fn send_request(

let mut tx = transport.controller_channel.tx.lock().await;

let (notification_sender, notification_receiver) = futures::channel::oneshot::channel::<()>();
let (notification_sender, notification_receiver) = futures::channel::oneshot::channel::<Vec<u8>>();

event_handlers.on_outgoing_request(serialized_request.as_bytes());

Expand All @@ -393,13 +365,9 @@ async fn send_request(
.await
.map_err(|e| Error::Transport(TransportError::Message(e.to_string())))?;

if let Ok(_ping) = notification_receiver.await {
let response_map = unsafe { &mut *transport.responses.0 };
if let Some(response) = response_map.remove(&request_id) {
event_handlers.on_incoming_response(&response);

return de_rpc_response(response, &transport.node.uri.to_string());
}
if let Ok(response) = notification_receiver.await {
event_handlers.on_incoming_response(&response);
return de_rpc_response(response, &transport.node.uri.to_string());
};

Err(Error::Transport(TransportError::Message(format!(
Expand Down

0 comments on commit 9de616c

Please sign in to comment.