From 4777bb02dc145ed66935e3db7167e650a1bd067c Mon Sep 17 00:00:00 2001 From: Eric Zhong Date: Thu, 29 Aug 2024 12:12:02 -0400 Subject: [PATCH] fix: error handling (#1) * map err for routing api calls * try to fix order duplication * remove log * cargo fmt and map err on txn submission --- src/collectors/uniswapx_route_collector.rs | 30 ++-- src/executors/protect_executor.rs | 18 ++- src/executors/public_1559_executor.rs | 3 +- src/main.rs | 20 ++- src/strategies/priority_strategy.rs | 156 +++++++++++---------- src/strategies/shared.rs | 2 +- src/strategies/uniswapx_strategy.rs | 28 ++-- 7 files changed, 151 insertions(+), 106 deletions(-) diff --git a/src/collectors/uniswapx_route_collector.rs b/src/collectors/uniswapx_route_collector.rs index 2247fa3..11469bb 100644 --- a/src/collectors/uniswapx_route_collector.rs +++ b/src/collectors/uniswapx_route_collector.rs @@ -1,5 +1,5 @@ use alloy_primitives::Uint; -use anyhow::{Context, Result}; +use anyhow::{anyhow, Result}; use reqwest::header::ORIGIN; use serde::{Deserialize, Serialize}; use tokio::sync::mpsc::{Receiver, Sender}; @@ -10,7 +10,7 @@ use artemis_core::types::{Collector, CollectorStream}; use async_trait::async_trait; use futures::lock::Mutex; use futures::stream::{FuturesUnordered, StreamExt}; -use reqwest::Client; +use reqwest::{Client, StatusCode}; const ROUTING_API: &str = "https://api.uniswap.org/v1/quote"; const SLIPPAGE_TOLERANCE: &str = "0.5"; @@ -173,7 +173,7 @@ impl Collector for UniswapXRouteCollector { let stream = async_stream::stream! { let mut receiver = self.route_request_receiver.lock().await; while let Some(route_requests) = receiver.recv().await { - let tasks: FuturesUnordered<_> = route_requests.iter() + let tasks: FuturesUnordered<_> = route_requests.into_iter() .map(|batch| { let OrderBatchData { orders, token_in, token_out, amount_in, .. } = batch.clone(); info!( @@ -232,16 +232,30 @@ pub async fn route_order(params: RouteOrderParams) -> Result { let client = reqwest::Client::new(); - Ok(client + let response = client .get(format!("{}?{}", ROUTING_API, query_string)) .header(ORIGIN, "https://app.uniswap.org") .header("x-request-source", "uniswap-web") .send() .await - .context("Quote request failed with {}")? - .json::() - .await - .context("Failed to parse response: {}")?) + .map_err(|e| anyhow!("Quote request failed with error: {}", e))?; + + match response.status() { + StatusCode::OK => Ok(response + .json::() + .await + .map_err(|e| anyhow!("Failed to parse response: {}", e))?), + StatusCode::BAD_REQUEST => Err(anyhow!("Bad request: {}", response.status())), + StatusCode::NOT_FOUND => Err(anyhow!("Not quote found: {}", response.status())), + StatusCode::TOO_MANY_REQUESTS => Err(anyhow!("Too many requests: {}", response.status())), + StatusCode::INTERNAL_SERVER_ERROR => { + Err(anyhow!("Internal server error: {}", response.status())) + } + _ => Err(anyhow!( + "Unexpected error with status code: {}", + response.status() + )), + } } // The Uniswap routing API requires that "ETH" be used instead of the zero address diff --git a/src/executors/protect_executor.rs b/src/executors/protect_executor.rs index 172c7a2..efc8f7c 100644 --- a/src/executors/protect_executor.rs +++ b/src/executors/protect_executor.rs @@ -4,11 +4,11 @@ use std::{ }; use tracing::info; -use anyhow::{Context, Result}; +use anyhow::Result; use artemis_core::executors::mempool_executor::SubmitTxToMempool; use artemis_core::types::Executor; use async_trait::async_trait; -use ethers::providers::Middleware; +use ethers::{providers::Middleware, types::U256}; /// An executor that sends transactions to the mempool. pub struct ProtectExecutor { @@ -39,9 +39,12 @@ where .client .estimate_gas(&action.tx, None) .await - .context("Error estimating gas usage: {}"); + .unwrap_or_else(|err| { + info!("Error estimating gas: {}", err); + U256::from(1_000_000) + }); info!("Gas Usage {:?}", gas_usage_result); - let gas_usage = gas_usage_result?; + let gas_usage = gas_usage_result; let bid_gas_price; if let Some(gas_bid_info) = action.gas_bid_info { @@ -56,12 +59,15 @@ where .client .get_gas_price() .await - .context("Error getting gas price: {}")?; + .map_err(|err| anyhow::anyhow!("Error getting gas price: {}", err))?; } action.tx.set_gas_price(bid_gas_price); info!("Executing tx {:?}", action.tx); - self.sender_client.send_transaction(action.tx, None).await?; + self.sender_client + .send_transaction(action.tx, None) + .await + .map_err(|err| anyhow::anyhow!("Error sending transaction: {}", err))?; Ok(()) } } diff --git a/src/executors/public_1559_executor.rs b/src/executors/public_1559_executor.rs index ac39e67..5017875 100644 --- a/src/executors/public_1559_executor.rs +++ b/src/executors/public_1559_executor.rs @@ -73,7 +73,8 @@ where info!("Executing tx {:?}", action.execution.tx); self.sender_client .send_transaction(action.execution.tx, None) - .await?; + .await + .map_err(|err| anyhow::anyhow!("Error sending transaction: {}", err))?; Ok(()) } } diff --git a/src/main.rs b/src/main.rs index b684e00..c58c553 100644 --- a/src/main.rs +++ b/src/main.rs @@ -23,7 +23,7 @@ use strategies::{ uniswapx_strategy::UniswapXUniswapFill, }; use tokio::sync::mpsc::channel; -use tracing::{info, Level}; +use tracing::{error, info, Level}; use tracing_subscriber::{filter, prelude::*}; pub mod collectors; @@ -207,9 +207,21 @@ async fn main() -> Result<()> { engine.add_executor(Box::new(protect_executor)); engine.add_executor(Box::new(public_tx_executor)); // Start engine. - if let Ok(mut set) = engine.run().await { - while let Some(res) = set.join_next().await { - info!("res: {:?}", res); + match engine.run().await { + Ok(mut set) => { + while let Some(res) = set.join_next().await { + match res { + Ok(res) => { + info!("res: {:?}", res); + } + Err(e) => { + info!("error: {:?}", e); + } + } + } + } + Err(e) => { + error!("Engine run error: {:?}", e); } } Ok(()) diff --git a/src/strategies/priority_strategy.rs b/src/strategies/priority_strategy.rs index 97065b6..55091b4 100644 --- a/src/strategies/priority_strategy.rs +++ b/src/strategies/priority_strategy.rs @@ -111,6 +111,14 @@ impl UniswapXPriorityFill { route_receiver: receiver, } } + + fn open_orders(&self) -> &HashMap { + &self.open_orders + } + + fn open_orders_mut(&mut self) -> &mut HashMap { + &mut self.open_orders + } } #[async_trait] @@ -125,9 +133,9 @@ impl Strategy for UniswapXPriorityFill Option { match event { - Event::UniswapXOrder(order) => self.process_order_event(*order).await, - Event::NewBlock(block) => self.process_new_block_event(block).await, - Event::UniswapXRoute(route) => self.process_new_route(*route).await, + Event::UniswapXOrder(order) => self.process_order_event(&order).await, + Event::NewBlock(block) => self.process_new_block_event(&block).await, + Event::UniswapXRoute(route) => self.process_new_route(&route).await, } } } @@ -146,7 +154,7 @@ impl UniswapXPriorityFill { Ok(PriorityOrder::decode_inner(&order_hex, false)?) } - async fn process_order_event(&mut self, event: UniswapXOrder) -> Option { + async fn process_order_event(&mut self, event: &UniswapXOrder) -> Option { if self.last_block_timestamp == 0 { return None; } @@ -156,23 +164,29 @@ impl UniswapXPriorityFill { .map_err(|e| error!("failed to decode: {}", e)) .ok()?; - self.update_order_state(order, event.signature, event.order_hash); - // try to send immediately - self.batch_sender - .send(self.get_order_batches()) - .await - .ok()?; + self.update_order_state(&order, &event.signature, &event.order_hash); + + // get the order data from open orders and send it to the batch sender + if let Some(order_data) = self.open_orders().get(&event.order_hash) { + let order_batch = self.get_order_batch(order_data); + self.batch_sender + .send(vec![order_batch]) + .await + .map_err(|e| error!("failed to send order batch: {}", e)) + .ok()?; + } None } - async fn process_new_route(&mut self, event: RoutedOrder) -> Option { + async fn process_new_route(&mut self, event: &RoutedOrder) -> Option { if event .request .orders .iter() - .any(|o| self.done_orders.contains_key(&o.hash)) + .any(|o: &OrderData| self.done_orders.contains_key(&o.hash)) { + info!("Skipping route with done orders"); return None; } @@ -219,7 +233,7 @@ impl UniswapXPriorityFill { } /// Process new block events, updating the internal state. - async fn process_new_block_event(&mut self, event: NewBlock) -> Option { + async fn process_new_block_event(&mut self, event: &NewBlock) -> Option { self.last_block_number = event.number.as_u64(); self.last_block_timestamp = event.timestamp.as_u64(); @@ -227,7 +241,7 @@ impl UniswapXPriorityFill { "Processing block {} at {}, Order set sizes -- open: {}, done: {}", event.number, event.timestamp, - self.open_orders.len(), + self.open_orders().len(), self.done_orders.len() ); self.handle_fills() @@ -237,11 +251,6 @@ impl UniswapXPriorityFill { self.update_open_orders(); self.prune_done_orders(); - self.batch_sender - .send(self.get_order_batches()) - .await - .ok()?; - None } @@ -264,28 +273,21 @@ impl UniswapXPriorityFill { Ok(signed_orders) } - /// We do not batch orders because priority fee is applied on the transaction level - fn get_order_batches(&self) -> Vec { - let mut order_batches: Vec = Vec::new(); - - // generate batches of size 1 - self.open_orders.iter().for_each(|(_, order_data)| { - let amount_in = order_data.resolved.input.amount; - let amount_out = order_data - .resolved - .outputs - .iter() - .fold(Uint::from(0), |sum, output| sum.wrapping_add(output.amount)); - - order_batches.push(OrderBatchData { - orders: vec![order_data.clone()], - amount_in, - amount_out_required: amount_out, - token_in: order_data.resolved.input.token.clone(), - token_out: order_data.resolved.outputs[0].token.clone(), - }); - }); - order_batches + fn get_order_batch(&self, order_data: &OrderData) -> OrderBatchData { + let amount_in: Uint<256, 4> = order_data.resolved.input.amount; + let amount_out = order_data + .resolved + .outputs + .iter() + .fold(Uint::from(0), |sum, output| sum.wrapping_add(output.amount)); + + OrderBatchData { + orders: vec![order_data.clone()], + amount_in, + amount_out_required: amount_out, + token_in: order_data.resolved.input.token.clone(), + token_out: order_data.resolved.outputs[0].token.clone(), + } } async fn handle_fills(&mut self) -> Result<()> { @@ -301,7 +303,7 @@ impl UniswapXPriorityFill { let order_hash = format!("0x{:x}", log.topics[1]); // remove from open info!("Removing filled order {}", order_hash); - self.open_orders.remove(&order_hash); + self.open_orders_mut().remove(&order_hash); // add to done self.done_orders.insert( order_hash.to_string(), @@ -336,7 +338,12 @@ impl UniswapXPriorityFill { }); } - fn update_order_state(&mut self, order: PriorityOrder, signature: String, order_hash: String) { + fn update_order_state( + &mut self, + order: &PriorityOrder, + signature: &String, + order_hash: &String, + ) { let resolved = order.resolve( self.last_block_number, self.last_block_timestamp + BLOCK_TIME, @@ -345,7 +352,7 @@ impl UniswapXPriorityFill { let order_status: OrderStatus = match resolved { OrderResolution::Expired => OrderStatus::Done, OrderResolution::Invalid => OrderStatus::Done, - OrderResolution::NotFillableYet => OrderStatus::NotFillableYet, // TODO: gracefully handle this, currently this will cause a revert if we try to fill too earlty + OrderResolution::NotFillableYet => OrderStatus::NotFillableYet, OrderResolution::Resolved(resolved_order) => OrderStatus::Open(resolved_order), }; @@ -357,22 +364,26 @@ impl UniswapXPriorityFill { info!("Order not fillable yet, skipping: {}", order_hash); } OrderStatus::Open(resolved_order) => { - if self.done_orders.contains_key(&order_hash) { + if self.done_orders.contains_key(order_hash) { info!("Order already done, skipping: {}", order_hash); return; } - if !self.open_orders.contains_key(&order_hash) { + if self.open_orders().contains_key(order_hash) { + let existing_order = self.open_orders_mut().get_mut(order_hash).unwrap(); + info!("Updating order {}", order_hash); + existing_order.resolved = resolved_order; + } else { info!("Adding new order {}", order_hash); + self.open_orders_mut().insert( + order_hash.clone(), + OrderData { + order: Order::PriorityOrder(order.clone()), + hash: order_hash.clone(), + signature: signature.clone(), + resolved: resolved_order, + }, + ); } - self.open_orders.insert( - order_hash.clone(), - OrderData { - order: Order::PriorityOrder(order), - hash: order_hash, - signature, - resolved: resolved_order, - }, - ); } } } @@ -390,28 +401,29 @@ impl UniswapXPriorityFill { } fn update_open_orders(&mut self) { - // TODO: this is nasty, plz cleanup - let binding = self.open_orders.clone(); - let order_hashes: Vec<(&String, &OrderData)> = binding.iter().collect(); - for (order_hash, order_data) in order_hashes { - match &order_data.order { - Order::PriorityOrder(order) => { - self.update_order_state( - order.clone(), - order_data.signature.clone(), - order_hash.clone().to_string(), - ); - } - _ => { - error!("Invalid order type"); + let order_hashes: Vec = self.open_orders().keys().cloned().collect(); + for order_hash in order_hashes { + let (mut order, signature) = { + let order_data = self.open_orders_mut().get_mut(&order_hash); + if let Some(order_data) = order_data { + // Clone the necessary data + let signature = order_data.clone().signature; + match &order_data.order { + Order::PriorityOrder(order) => (order.clone(), signature), + _ => continue, + } + } else { + continue; } - } + }; + + self.update_order_state(&mut order, &signature, &order_hash); } } fn mark_as_done(&mut self, order: &str) { - if self.open_orders.contains_key(order) { - self.open_orders.remove(order); + if self.open_orders().contains_key(order) { + self.open_orders_mut().remove(order); } if !self.done_orders.contains_key(order) { self.done_orders diff --git a/src/strategies/shared.rs b/src/strategies/shared.rs index 637945f..a095fd0 100644 --- a/src/strategies/shared.rs +++ b/src/strategies/shared.rs @@ -27,7 +27,7 @@ pub trait UniswapXStrategy { client: Arc, executor_address: &str, signed_orders: Vec, - RoutedOrder { request, route }: RoutedOrder, + RoutedOrder { request, route }: &RoutedOrder, ) -> Result { let chain_id: U256 = client.get_chainid().await?; let fill_contract = diff --git a/src/strategies/uniswapx_strategy.rs b/src/strategies/uniswapx_strategy.rs index 7ee53c0..8cb4222 100644 --- a/src/strategies/uniswapx_strategy.rs +++ b/src/strategies/uniswapx_strategy.rs @@ -86,9 +86,9 @@ impl Strategy for UniswapXUniswapFill // Process incoming events, seeing if we can arb new orders, and updating the internal state on new blocks. async fn process_event(&mut self, event: Event) -> Option { match event { - Event::UniswapXOrder(order) => self.process_order_event(*order).await, - Event::NewBlock(block) => self.process_new_block_event(block).await, - Event::UniswapXRoute(route) => self.process_new_route(*route).await, + Event::UniswapXOrder(order) => self.process_order_event(&order).await, + Event::NewBlock(block) => self.process_new_block_event(&block).await, + Event::UniswapXRoute(route) => self.process_new_route(&route).await, } } } @@ -108,7 +108,7 @@ impl UniswapXUniswapFill { } // Process new orders as they come in. - async fn process_order_event(&mut self, event: UniswapXOrder) -> Option { + async fn process_order_event(&mut self, event: &UniswapXOrder) -> Option { if self.last_block_timestamp == 0 { return None; } @@ -118,11 +118,11 @@ impl UniswapXUniswapFill { .map_err(|e| error!("failed to decode: {}", e)) .ok()?; - self.update_order_state(order, event.signature, event.order_hash); + self.update_order_state(order, &event.signature, &event.order_hash); None } - async fn process_new_route(&mut self, event: RoutedOrder) -> Option { + async fn process_new_route(&mut self, event: &RoutedOrder) -> Option { if event .request .orders @@ -169,7 +169,7 @@ impl UniswapXUniswapFill { } /// Process new block events, updating the internal state. - async fn process_new_block_event(&mut self, event: NewBlock) -> Option { + async fn process_new_block_event(&mut self, event: &NewBlock) -> Option { self.last_block_number = event.number.as_u64(); self.last_block_timestamp = event.timestamp.as_u64(); @@ -299,8 +299,8 @@ impl UniswapXUniswapFill { Order::V2DutchOrder(order) => { self.update_order_state( order.clone(), - order_data.signature.clone(), - order_hash.clone().to_string(), + &order_data.signature, + &order_hash.to_string(), ); } _ => { @@ -320,7 +320,7 @@ impl UniswapXUniswapFill { } } - fn update_order_state(&mut self, order: V2DutchOrder, signature: String, order_hash: String) { + fn update_order_state(&mut self, order: V2DutchOrder, signature: &String, order_hash: &String) { let resolved = order.resolve(self.last_block_timestamp + BLOCK_TIME); let order_status: OrderStatus = match resolved { OrderResolution::Expired => OrderStatus::Done, @@ -334,19 +334,19 @@ impl UniswapXUniswapFill { self.mark_as_done(&order_hash); } OrderStatus::Open(resolved_order) => { - if self.done_orders.contains_key(&order_hash) { + if self.done_orders.contains_key(order_hash) { info!("Order already done, skipping: {}", order_hash); return; } - if !self.open_orders.contains_key(&order_hash) { + if !self.open_orders.contains_key(order_hash) { info!("Adding new order {}", order_hash); } self.open_orders.insert( order_hash.clone(), OrderData { order: Order::V2DutchOrder(order), - hash: order_hash, - signature, + hash: order_hash.clone(), + signature: signature.clone(), resolved: resolved_order, }, );