Skip to content
This repository has been archived by the owner on Oct 19, 2024. It is now read-only.

Commit

Permalink
feat(ws): add methods taking in a tungstenite config (#2373)
Browse files Browse the repository at this point in the history
* feat(ws): add methods taking in a tungstenite config

Signed-off-by: Francesco <francesco@fulminlabs.org>

* fix(ws): change args order in connect_with_config

Signed-off-by: Francesco <francesco@fulminlabs.org>

* fix(ws): use config in reconnect if present

Signed-off-by: Francesco <francesco@fulminlabs.org>

* fix(ws): fix wasm build

Signed-off-by: Francesco <francesco@fulminlabs.org>

* fix(ws): make code more dry

Signed-off-by: Francesco <francesco@fulminlabs.org>

---------

Signed-off-by: Francesco <francesco@fulminlabs.org>
  • Loading branch information
fulminmaxi authored May 15, 2023
1 parent 4b6fc29 commit 3e7606d
Show file tree
Hide file tree
Showing 4 changed files with 134 additions and 6 deletions.
9 changes: 9 additions & 0 deletions ethers-providers/src/rpc/transports/ws/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,15 @@ impl WsBackend {
Ok(Self::new(ws))
}

#[cfg(not(target_arch = "wasm32"))]
pub async fn connect_with_config(
details: ConnectionDetails,
config: WebSocketConfig,
) -> Result<(Self, BackendDriver), WsClientError> {
let ws = connect_async_with_config(details, Some(config)).await?.0.fuse();
Ok(Self::new(ws))
}

pub fn new(server: InternalStream) -> (Self, BackendDriver) {
let (handler, to_handle) = mpsc::unbounded();
let (dispatcher, to_dispatch) = mpsc::unbounded();
Expand Down
98 changes: 93 additions & 5 deletions ethers-providers/src/rpc/transports/ws/manager.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
#[cfg(not(target_arch = "wasm32"))]
use super::WebSocketConfig;
use super::{
backend::{BackendDriver, WsBackend},
ActiveSub, ConnectionDetails, InFlight, Instruction, Notification, PubSubItem, Response, SubId,
Expand Down Expand Up @@ -196,6 +198,9 @@ pub struct RequestManager {
backend: BackendDriver,
// The URL and optional auth info for the connection
conn: ConnectionDetails,
#[cfg(not(target_arch = "wasm32"))]
// An Option wrapping a tungstenite WebsocketConfig. If None, the default config is used.
config: Option<WebSocketConfig>,
// Instructions from the user-facing providers
instructions: mpsc::UnboundedReceiver<Instruction>,
}
Expand All @@ -209,16 +214,84 @@ impl RequestManager {
Self::connect_with_reconnects(conn, DEFAULT_RECONNECTS).await
}

async fn connect_internal(
conn: ConnectionDetails,
) -> Result<
(
BackendDriver,
(mpsc::UnboundedSender<Instruction>, mpsc::UnboundedReceiver<Instruction>),
SharedChannelMap,
),
WsClientError,
> {
let (ws, backend) = WsBackend::connect(conn).await?;

ws.spawn();

Ok((backend, mpsc::unbounded(), Default::default()))
}

#[cfg(target_arch = "wasm32")]
pub async fn connect_with_reconnects(
conn: ConnectionDetails,
reconnects: usize,
) -> Result<(Self, WsClient), WsClientError> {
let (ws, backend) = WsBackend::connect(conn.clone()).await?;
let (backend, (instructions_tx, instructions_rx), channel_map) =
Self::connect_internal(conn.clone()).await?;

Ok((
Self {
id: Default::default(),
reconnects,
subs: SubscriptionManager::new(channel_map.clone()),
reqs: Default::default(),
backend,
conn,
instructions: instructions_rx,
},
WsClient { instructions: instructions_tx, channel_map },
))
}

let (instructions_tx, instructions_rx) = mpsc::unbounded();
let channel_map: SharedChannelMap = Default::default();
#[cfg(not(target_arch = "wasm32"))]
pub async fn connect_with_reconnects(
conn: ConnectionDetails,
reconnects: usize,
) -> Result<(Self, WsClient), WsClientError> {
let (backend, (instructions_tx, instructions_rx), channel_map) =
Self::connect_internal(conn.clone()).await?;

ws.spawn();
Ok((
Self {
id: Default::default(),
reconnects,
subs: SubscriptionManager::new(channel_map.clone()),
reqs: Default::default(),
backend,
conn,
config: None,
instructions: instructions_rx,
},
WsClient { instructions: instructions_tx, channel_map },
))
}

#[cfg(not(target_arch = "wasm32"))]
pub async fn connect_with_config(
conn: ConnectionDetails,
config: WebSocketConfig,
) -> Result<(Self, WsClient), WsClientError> {
Self::connect_with_config_and_reconnects(conn, config, DEFAULT_RECONNECTS).await
}

#[cfg(not(target_arch = "wasm32"))]
pub async fn connect_with_config_and_reconnects(
conn: ConnectionDetails,
config: WebSocketConfig,
reconnects: usize,
) -> Result<(Self, WsClient), WsClientError> {
let (backend, (instructions_tx, instructions_rx), channel_map) =
Self::connect_internal(conn.clone()).await?;

Ok((
Self {
Expand All @@ -228,12 +301,27 @@ impl RequestManager {
reqs: Default::default(),
backend,
conn,
config: Some(config),
instructions: instructions_rx,
},
WsClient { instructions: instructions_tx, channel_map },
))
}

#[cfg(target_arch = "wasm32")]
async fn reconnect_backend(&mut self) -> Result<(WsBackend, BackendDriver), WsClientError> {
WsBackend::connect(self.conn.clone()).await
}

#[cfg(not(target_arch = "wasm32"))]
async fn reconnect_backend(&mut self) -> Result<(WsBackend, BackendDriver), WsClientError> {
if let Some(config) = self.config {
WsBackend::connect_with_config(self.conn.clone(), config).await
} else {
WsBackend::connect(self.conn.clone()).await
}
}

async fn reconnect(&mut self) -> Result<(), WsClientError> {
if self.reconnects == 0 {
return Err(WsClientError::TooManyReconnects)
Expand All @@ -242,7 +330,7 @@ impl RequestManager {

tracing::info!(remaining = self.reconnects, url = self.conn.url, "Reconnecting to backend");
// create the new backend
let (s, mut backend) = WsBackend::connect(self.conn.clone()).await?;
let (s, mut backend) = self.reconnect_backend().await?;

// spawn the new backend
s.spawn();
Expand Down
30 changes: 30 additions & 0 deletions ethers-providers/src/rpc/transports/ws/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,36 @@ impl WsClient {
Ok(this)
}

#[cfg(not(target_arch = "wasm32"))]
/// Establishes a new websocket connection. This method allows specifying a custom websocket
/// configuration, see the [tungstenite docs](https://docs.rs/tungstenite/latest/tungstenite/protocol/struct.WebSocketConfig.html) for all avaible options.
pub async fn connect_with_config(
conn: impl Into<ConnectionDetails>,
config: impl Into<WebSocketConfig>,
) -> Result<Self, WsClientError> {
let (man, this) = RequestManager::connect_with_config(conn.into(), config.into()).await?;
man.spawn();
Ok(this)
}

#[cfg(not(target_arch = "wasm32"))]
/// Establishes a new websocket connection with auto-reconnects. This method allows specifying a
/// custom websocket configuration, see the [tungstenite docs](https://docs.rs/tungstenite/latest/tungstenite/protocol/struct.WebSocketConfig.html) for all avaible options.
pub async fn connect_with_config_and_reconnects(
conn: impl Into<ConnectionDetails>,
config: impl Into<WebSocketConfig>,
reconnects: usize,
) -> Result<Self, WsClientError> {
let (man, this) = RequestManager::connect_with_config_and_reconnects(
conn.into(),
config.into(),
reconnects,
)
.await?;
man.spawn();
Ok(this)
}

#[tracing::instrument(skip(self, params), err)]
async fn make_request<R>(&self, method: &str, params: Box<RawValue>) -> Result<R, WsClientError>
where
Expand Down
3 changes: 2 additions & 1 deletion ethers-providers/src/rpc/transports/ws/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -257,10 +257,11 @@ mod aliases {
#[cfg(not(target_arch = "wasm32"))]
mod aliases {
pub use tokio_tungstenite::{
connect_async,
connect_async, connect_async_with_config,
tungstenite::{self, protocol::CloseFrame},
};
use tokio_tungstenite::{MaybeTlsStream, WebSocketStream};
pub type WebSocketConfig = tungstenite::protocol::WebSocketConfig;
pub type Message = tungstenite::protocol::Message;
pub type WsError = tungstenite::Error;
pub type WsStreamItem = Result<Message, WsError>;
Expand Down

0 comments on commit 3e7606d

Please sign in to comment.