Skip to content
This repository has been archived by the owner on Oct 19, 2024. It is now read-only.

Commit

Permalink
Fix: handle panic on Ws error (#1915)
Browse files Browse the repository at this point in the history
* On Ws error close all active subscriptions and force clients to reconnect. the Websocket.

* Comment typos

* Unit tests
Lint
cargo +nightly fmt

* - Added CHANGELOG entry
- Added `#` prefix to issue IDs where missing

* ownership typo

Co-authored-by: Andrea Simeoni <>
  • Loading branch information
0xMelkor authored Dec 1, 2022
1 parent 0ce6b10 commit c4e09f2
Show file tree
Hide file tree
Showing 3 changed files with 109 additions and 24 deletions.
31 changes: 16 additions & 15 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@
## ethers-core

### Unreleased
- Graceful handling of WebSocket transport errors [#1889](https://github.com/gakonst/ethers-rs/issues/1889) [#1815](https://github.com/gakonst/ethers-rs/issues/1815)
- `MiddlewareBuilder` trait to instantiate a `Provider` as `Middleware` layers.
- An `Event` builder can be instantiated specifying the event filter type, without the need to instantiate a contract.
- Add 'ethers_core::types::OpCode' and use in 'ethers_core::types::VMOperation' [1857](https://github.com/gakonst/ethers-rs/issues/1857)
- Add 'ethers_core::types::OpCode' and use in 'ethers_core::types::VMOperation' [#1857](https://github.com/gakonst/ethers-rs/issues/1857)
- Remove rust_decimals dependency for ethers-core
- Add support for numbers greater than 2^96 for `ethers_core::utils::parse_units` [#1822](https://github.com/gakonst/ethers-rs/issues/1822)
- Add comment about safety of u8 -> u64 cast in `ethers_core::types::Signature`
Expand All @@ -26,38 +27,38 @@
- Add `as_*_mut` methods on `TypedTransaction`
[#1310](https://github.com/gakonst/ethers-rs/pull/1310)
- AWS EIP712 data signing no longer signs with EIP155
- Added Cronos testnet to etherscan options [1276](https://github.com/gakonst/ethers-rs/pull/1276)
- Added Cronos testnet to etherscan options [#1276](https://github.com/gakonst/ethers-rs/pull/1276)
- Fix parsing of a pending block
[1272](https://github.com/gakonst/ethers-rs/pull/1272)
[#1272](https://github.com/gakonst/ethers-rs/pull/1272)
- Removed Cronos mainnet beta from `is_legacy` [1246](https://github.com/gakonst/ethers-rs/pull/1246)
- Fix RLP decoding of `from` field for `Eip1559TransactionRequest` and
`Eip2930TransactionRequest`, remove `Eip1559TransactionRequest` `sighash`
method [1180](https://github.com/gakonst/ethers-rs/pull/1180)
method [#1180](https://github.com/gakonst/ethers-rs/pull/1180)
- Fix RLP encoding of absent access list in `Transaction` [1137](https://github.com/gakonst/ethers-rs/pull/1137)
- Pass compilation time as additional argument to `Reporter::on_solc_success` [1098](https://github.com/gakonst/ethers-rs/pull/1098)
- Pass compilation time as additional argument to `Reporter::on_solc_success` [#1098](https://github.com/gakonst/ethers-rs/pull/1098)
- Fix aws signer bug which maps un-normalized signature to error if no normalization occurs (in `aws::utils::decode_signature`)
- Implement signed transaction RLP decoding [#1096](https://github.com/gakonst/ethers-rs/pull/1096)
- `Transaction::from` will default to `Address::zero()`. Add `recover_from` and
`recover_from_mut` methods for recovering the sender from signature, and also
setting the same on tx [1075](https://github.com/gakonst/ethers-rs/pull/1075).
- Add Etherscan account API endpoints [939](https://github.com/gakonst/ethers-rs/pull/939)
setting the same on tx [#1075](https://github.com/gakonst/ethers-rs/pull/1075).
- Add Etherscan account API endpoints [#939](https://github.com/gakonst/ethers-rs/pull/939)
- Add FTM Mainet and testnet to parse method "try_from" from Chain.rs and add cronos mainet and testnet to "from_str"
- Add FTM mainnet and testnet Multicall addresses [927](https://github.com/gakonst/ethers-rs/pull/927)
- Add FTM mainnet and testnet Multicall addresses [#927](https://github.com/gakonst/ethers-rs/pull/927)
- Add Cronos mainnet beta and testnet to the list of known chains
[926](https://github.com/gakonst/ethers-rs/pull/926)
[#926](https://github.com/gakonst/ethers-rs/pull/926)
- `Chain::to_string` will return the same chain name as `Chain::from_str`
- Add `eth_syncing` [848](https://github.com/gakonst/ethers-rs/pull/848)
- Add `eth_syncing` [#848](https://github.com/gakonst/ethers-rs/pull/848)
- Fix overflow and possible divide-by-zero in `estimate_priority_fee`
- Add BSC mainnet and testnet to the list of known chains
[831](https://github.com/gakonst/ethers-rs/pull/831)
[#831](https://github.com/gakonst/ethers-rs/pull/831)
- Returns error on invalid type conversion instead of panicking
[691](https://github.com/gakonst/ethers-rs/pull/691/files)
[#691](https://github.com/gakonst/ethers-rs/pull/691/files)
- Change types mapping for solidity `bytes` to rust `ethers::core::Bytes` and
solidity `uint8[]` to rust `Vec<u8>`.
[613](https://github.com/gakonst/ethers-rs/pull/613)
[#613](https://github.com/gakonst/ethers-rs/pull/613)
- Fix `format_units` to return a `String` of representing a decimal point float
such that the decimal places don't get truncated.
[597](https://github.com/gakonst/ethers-rs/pull/597)
[#597](https://github.com/gakonst/ethers-rs/pull/597)
- Implement hex display format for `ethers::core::Bytes`
[#624](https://github.com/gakonst/ethers-rs/pull/624).
- Fix `fee_history` to first try with `block_count` encoded as a hex `QUANTITY`.
Expand All @@ -83,7 +84,7 @@
- Add a getter to `ProjectCompileOutput` that returns a mapping of compiler
versions to a vector of name + contract struct tuples
[#908](https://github.com/gakonst/ethers-rs/pull/908)
- Add Yul compilation [994](https://github.com/gakonst/ethers-rs/pull/994)
- Add Yul compilation [#994](https://github.com/gakonst/ethers-rs/pull/994)
- Enforce commutativity of ENS reverse resolution
[#996](https://github.com/gakonst/ethers-rs/pull/996)
- Add `TransactionReceipt::to` and `TransactionReceipt::from`
Expand Down
23 changes: 14 additions & 9 deletions ethers-providers/src/transports/ws.rs
Original file line number Diff line number Diff line change
Expand Up @@ -246,15 +246,11 @@ where
debug!("work complete");
break
}
match self.tick().await {
Err(ClientError::UnexpectedClose) => {
error!("{}", ClientError::UnexpectedClose);
break
}
Err(e) => {
panic!("WS Server panic: {}", e);
}
_ => {}

if let Err(e) = self.tick().await {
error!("Received a WebSocket error: {:?}", e);
self.close_all_subscriptions();
break
}
}
};
Expand All @@ -266,6 +262,15 @@ where
tokio::spawn(f);
}

// This will close all active subscriptions. Each process listening for
// updates will observe the end of their subscription streams.
fn close_all_subscriptions(&self) {
error!("Tearing down subscriptions");
for (_, sub) in self.subscriptions.iter() {
sub.close_channel();
}
}

// dispatch an RPC request
async fn service_request(
&mut self,
Expand Down
79 changes: 79 additions & 0 deletions ethers-providers/tests/ws_errors.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
#![cfg(not(target_arch = "wasm32"))]
use ethers_providers::{Middleware, Provider, StreamExt};
use futures_util::SinkExt;
use std::time::Duration;
use tokio::net::{TcpListener, TcpStream};
use tokio_tungstenite::{
accept_async,
tungstenite::{
self,
protocol::{frame::coding::CloseCode, CloseFrame},
Error,
},
};
use tungstenite::protocol::Message;

const WS_ENDPOINT: &str = "127.0.0.1:9002";

#[cfg(not(feature = "celo"))]
mod eth_tests {
use ethers_core::types::Filter;
use ethers_providers::{StreamExt, Ws};
use tokio_tungstenite::connect_async;

use super::*;

#[tokio::test]
async fn graceful_disconnect_on_ws_errors() {
// Spawn a fake Ws server that will drop our connection after a while
spawn_ws_server().await;

// Connect to the fake server
let (ws, _) = connect_async(format!("ws://{}", WS_ENDPOINT)).await.unwrap();
let provider = Provider::new(Ws::new(ws));
let filter = Filter::new().event("Transfer(address,address,uint256)");
let mut stream = provider.subscribe_logs(&filter).await.unwrap();

while let Some(_) = stream.next().await {
assert!(false); // force test to fail
}

assert!(true);
}
}

async fn spawn_ws_server() {
let listener = TcpListener::bind(&WS_ENDPOINT).await.expect("Can't listen");
tokio::spawn(async move {
while let Ok((stream, _)) = listener.accept().await {
tokio::spawn(handle_conn(stream));
}
});
}

async fn handle_conn(stream: TcpStream) -> Result<(), Error> {
let mut ws_stream = accept_async(stream).await?;

while let Some(_) = ws_stream.next().await {
let res: String =
"{\"jsonrpc\":\"2.0\",\"id\":1,\"result\":\"0xcd0c3e8af590364c09d0fa6a1210faf5\"}"
.into();

// Answer with a valid RPC response to keep the connection alive
ws_stream.send(Message::Text(res)).await?;

// Wait for a while
let timeout = Duration::from_secs(2);
tokio::time::sleep(timeout).await;

// Drop the connection
ws_stream
.send(Message::Close(Some(CloseFrame {
code: CloseCode::Error,
reason: "Upstream went away".into(),
})))
.await?;
}

Ok(())
}

0 comments on commit c4e09f2

Please sign in to comment.