Skip to content

Commit

Permalink
fix: error handling (#1)
Browse files Browse the repository at this point in the history
* map err for routing api calls

* try to fix order duplication

* remove log

* cargo fmt and map err on txn submission
  • Loading branch information
zhongeric authored Aug 29, 2024
1 parent cb81ace commit 4777bb0
Show file tree
Hide file tree
Showing 7 changed files with 151 additions and 106 deletions.
30 changes: 22 additions & 8 deletions src/collectors/uniswapx_route_collector.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use alloy_primitives::Uint;
use anyhow::{Context, Result};
use anyhow::{anyhow, Result};
use reqwest::header::ORIGIN;
use serde::{Deserialize, Serialize};
use tokio::sync::mpsc::{Receiver, Sender};
Expand All @@ -10,7 +10,7 @@ use artemis_core::types::{Collector, CollectorStream};
use async_trait::async_trait;
use futures::lock::Mutex;
use futures::stream::{FuturesUnordered, StreamExt};
use reqwest::Client;
use reqwest::{Client, StatusCode};

const ROUTING_API: &str = "https://api.uniswap.org/v1/quote";
const SLIPPAGE_TOLERANCE: &str = "0.5";
Expand Down Expand Up @@ -173,7 +173,7 @@ impl Collector<RoutedOrder> for UniswapXRouteCollector {
let stream = async_stream::stream! {
let mut receiver = self.route_request_receiver.lock().await;
while let Some(route_requests) = receiver.recv().await {
let tasks: FuturesUnordered<_> = route_requests.iter()
let tasks: FuturesUnordered<_> = route_requests.into_iter()
.map(|batch| {
let OrderBatchData { orders, token_in, token_out, amount_in, .. } = batch.clone();
info!(
Expand Down Expand Up @@ -232,16 +232,30 @@ pub async fn route_order(params: RouteOrderParams) -> Result<OrderRoute> {

let client = reqwest::Client::new();

Ok(client
let response = client
.get(format!("{}?{}", ROUTING_API, query_string))
.header(ORIGIN, "https://app.uniswap.org")
.header("x-request-source", "uniswap-web")
.send()
.await
.context("Quote request failed with {}")?
.json::<OrderRoute>()
.await
.context("Failed to parse response: {}")?)
.map_err(|e| anyhow!("Quote request failed with error: {}", e))?;

match response.status() {
StatusCode::OK => Ok(response
.json::<OrderRoute>()
.await
.map_err(|e| anyhow!("Failed to parse response: {}", e))?),
StatusCode::BAD_REQUEST => Err(anyhow!("Bad request: {}", response.status())),
StatusCode::NOT_FOUND => Err(anyhow!("Not quote found: {}", response.status())),
StatusCode::TOO_MANY_REQUESTS => Err(anyhow!("Too many requests: {}", response.status())),
StatusCode::INTERNAL_SERVER_ERROR => {
Err(anyhow!("Internal server error: {}", response.status()))
}
_ => Err(anyhow!(
"Unexpected error with status code: {}",
response.status()
)),
}
}

// The Uniswap routing API requires that "ETH" be used instead of the zero address
Expand Down
18 changes: 12 additions & 6 deletions src/executors/protect_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@ use std::{
};
use tracing::info;

use anyhow::{Context, Result};
use anyhow::Result;
use artemis_core::executors::mempool_executor::SubmitTxToMempool;
use artemis_core::types::Executor;
use async_trait::async_trait;
use ethers::providers::Middleware;
use ethers::{providers::Middleware, types::U256};

/// An executor that sends transactions to the mempool.
pub struct ProtectExecutor<M, N> {
Expand Down Expand Up @@ -39,9 +39,12 @@ where
.client
.estimate_gas(&action.tx, None)
.await
.context("Error estimating gas usage: {}");
.unwrap_or_else(|err| {
info!("Error estimating gas: {}", err);
U256::from(1_000_000)
});
info!("Gas Usage {:?}", gas_usage_result);
let gas_usage = gas_usage_result?;
let gas_usage = gas_usage_result;

let bid_gas_price;
if let Some(gas_bid_info) = action.gas_bid_info {
Expand All @@ -56,12 +59,15 @@ where
.client
.get_gas_price()
.await
.context("Error getting gas price: {}")?;
.map_err(|err| anyhow::anyhow!("Error getting gas price: {}", err))?;
}
action.tx.set_gas_price(bid_gas_price);

info!("Executing tx {:?}", action.tx);
self.sender_client.send_transaction(action.tx, None).await?;
self.sender_client
.send_transaction(action.tx, None)
.await
.map_err(|err| anyhow::anyhow!("Error sending transaction: {}", err))?;
Ok(())
}
}
3 changes: 2 additions & 1 deletion src/executors/public_1559_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,8 @@ where
info!("Executing tx {:?}", action.execution.tx);
self.sender_client
.send_transaction(action.execution.tx, None)
.await?;
.await
.map_err(|err| anyhow::anyhow!("Error sending transaction: {}", err))?;
Ok(())
}
}
20 changes: 16 additions & 4 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use strategies::{
uniswapx_strategy::UniswapXUniswapFill,
};
use tokio::sync::mpsc::channel;
use tracing::{info, Level};
use tracing::{error, info, Level};
use tracing_subscriber::{filter, prelude::*};

pub mod collectors;
Expand Down Expand Up @@ -207,9 +207,21 @@ async fn main() -> Result<()> {
engine.add_executor(Box::new(protect_executor));
engine.add_executor(Box::new(public_tx_executor));
// Start engine.
if let Ok(mut set) = engine.run().await {
while let Some(res) = set.join_next().await {
info!("res: {:?}", res);
match engine.run().await {
Ok(mut set) => {
while let Some(res) = set.join_next().await {
match res {
Ok(res) => {
info!("res: {:?}", res);
}
Err(e) => {
info!("error: {:?}", e);
}
}
}
}
Err(e) => {
error!("Engine run error: {:?}", e);
}
}
Ok(())
Expand Down
Loading

0 comments on commit 4777bb0

Please sign in to comment.