Skip to content
This repository has been archived by the owner on Apr 28, 2022. It is now read-only.

Add trait for configuring GasNow WebSocket error reporting #10

Merged
merged 1 commit into from
Sep 8, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ edition = "2018"
anyhow = "1.0"
async-trait = "0.1"
futures = "0.3"
log = "0.4"
primitive-types = { version = "0.9", features = ["fp-conversion"], optional = true }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
Expand Down
81 changes: 72 additions & 9 deletions src/gasnow_websocket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,12 @@ use crate::{
use anyhow::{bail, ensure, Result};
use futures::StreamExt;
use serde_json::Value;
use std::time::{Duration, Instant};
use std::{
sync::Arc,
time::{Duration, Instant},
};
use tokio::sync::watch;
use tokio_tungstenite::tungstenite::Message;
use tokio_tungstenite::tungstenite::{error::Error as TungsteniteError, Message};
use url::Url;

pub const DEFAULT_URL: &str = "wss://www.gasnow.org/ws/gasprice";
Expand All @@ -28,12 +31,20 @@ pub struct GasNowWebSocketGasStation {

impl GasNowWebSocketGasStation {
pub fn new(max_update_age: Duration) -> Self {
Self::with_error_reporter(max_update_age, LogErrorReporter)
}

pub fn with_error_reporter(
max_update_age: Duration,
error_reporter: impl ErrorReporting,
) -> Self {
let (sender, receiver) = watch::channel(None);
tokio::spawn(receive_forever(
DEFAULT_URL.parse().unwrap(),
RECONNECT_INTERVAL,
sender,
max_update_age,
Arc::new(error_reporter),
));
Self {
max_update_age,
Expand Down Expand Up @@ -84,10 +95,17 @@ async fn receive_forever(
reconnect_interval: Duration,
sender: watch::Sender<Option<(Instant, ResponseData)>>,
max_update_interval: Duration,
error_reporter: Arc<dyn ErrorReporting>,
) {
let work = async {
loop {
connect_and_receive_until_error(&api, &sender, max_update_interval).await;
connect_and_receive_until_error(
&api,
&sender,
max_update_interval,
error_reporter.clone(),
)
.await;
tokio::time::sleep(reconnect_interval).await;
}
};
Expand All @@ -103,6 +121,7 @@ async fn connect_and_receive_until_error(
api: &Url,
sender: &watch::Sender<Option<(Instant, ResponseData)>>,
max_update_interval: Duration,
error_reporter: Arc<dyn ErrorReporting>,
) {
let (mut stream, _) = match tokio::time::timeout(
max_update_interval,
Expand All @@ -111,29 +130,29 @@ async fn connect_and_receive_until_error(
.await
{
Err(_) => {
tracing::error!("websocket connect timed out");
error_reporter.report_error(Error::ConnectionTimeOut);
return;
}
Ok(Err(err)) => {
tracing::error!(?err, "websocket connect failed");
error_reporter.report_error(Error::ConnectionFailure(err));
return;
}
Ok(Ok(result)) => result,
};
loop {
let message = match tokio::time::timeout(max_update_interval, stream.next()).await {
Err(_) => {
tracing::error!("websocket stream timed out");
error_reporter.report_error(Error::StreamTimeOut);
return;
}
Ok(None) => {
tracing::warn!("websocket stream closed");
tracing::info!("websocket stream closed");
return;
}
// It is unclear which errors exactly cause the websocket to become unusable so we stop
// on any.
Ok(Some(Err(err))) => {
tracing::error!(?err, "websocket stream failed");
error_reporter.report_error(Error::StreamFailure(err));
return;
}
Ok(Some(Ok(message))) => message,
Expand All @@ -151,7 +170,7 @@ async fn connect_and_receive_until_error(
Message::Binary(binary) => String::from_utf8_lossy(&binary).into_owned(),
_ => unreachable!(),
};
tracing::error!(?err, ?msg, "decode failed");
error_reporter.report_error(Error::JsonDecodeFailed(msg, err));
continue;
}
};
Expand All @@ -167,6 +186,50 @@ async fn connect_and_receive_until_error(
}
}

/// A trait for configuring error reporting for the WebSocket gas estimator.
pub trait ErrorReporting: Send + Sync + 'static {
fn report_error(&self, err: Error);
}

/// A possible error to be reported.
pub enum Error {
/// The WebSocket timed out establishing a connection to the remote service.
ConnectionTimeOut,
/// An unexpected error occured connecting to the remote service.
ConnectionFailure(TungsteniteError),
/// The WebSocket message stream timed out.
StreamTimeOut,
/// An unexpected error occured reading the WebSocket message stream.
StreamFailure(TungsteniteError),
/// An error occured decoding the JSON gas update data.
JsonDecodeFailed(String, serde_json::Error),
}

/// The default error reporter that just logs the errors.
pub struct LogErrorReporter;

impl ErrorReporting for LogErrorReporter {
fn report_error(&self, err: Error) {
match err {
Error::ConnectionTimeOut => {
tracing::warn!("websocket connect timed out");
}
Error::ConnectionFailure(err) => {
tracing::warn!(?err, "websocket connect failed");
}
Error::StreamTimeOut => {
tracing::warn!("websocket stream timed out");
}
Error::StreamFailure(err) => {
tracing::warn!(?err, "websocket stream failed");
}
Error::JsonDecodeFailed(msg, err) => {
tracing::warn!(?err, ?msg, "decode failed");
}
}
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
4 changes: 2 additions & 2 deletions src/priority.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,9 @@ impl PriorityGasPriceEstimating {
Err(err) => {
let num_errors = estimator.errors_in_a_row.fetch_add(1, Ordering::SeqCst) + 1;
if num_errors < LOG_ERROR_AFTER_N_ERRORS {
log::warn!("gas estimator {} failed: {:?}", i, err);
tracing::warn!("gas estimator {} failed: {:?}", i, err);
} else {
log::error!("gas estimator {} failed: {:?}", i, err);
tracing::error!("gas estimator {} failed: {:?}", i, err);
}
}
}
Expand Down