Skip to content

Commit

Permalink
Multiple actions returned from process_event if needed (#43)
Browse files Browse the repository at this point in the history
  • Loading branch information
laudiacay authored Aug 7, 2023
1 parent 925f285 commit 88fb493
Show file tree
Hide file tree
Showing 7 changed files with 26 additions and 31 deletions.
2 changes: 1 addition & 1 deletion crates/artemis-core/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ where
loop {
match event_receiver.recv().await {
Ok(event) => {
if let Some(action) = strategy.process_event(event).await {
for action in strategy.process_event(event).await {
match action_sender.send(action) {
Ok(_) => {}
Err(e) => error!("error sending action: {}", e),
Expand Down
25 changes: 7 additions & 18 deletions crates/artemis-core/src/executors/mev_share_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ use crate::types::Executor;
use anyhow::Result;
use async_trait::async_trait;
use ethers::signers::Signer;
use futures::{stream, StreamExt};
use jsonrpsee::http_client::{
transport::{self},
HttpClientBuilder,
Expand Down Expand Up @@ -34,24 +33,14 @@ impl MevshareExecutor {
}

#[async_trait]
impl Executor<Vec<SendBundleRequest>> for MevshareExecutor {
impl Executor<SendBundleRequest> for MevshareExecutor {
/// Send bundles to the matchmaker.
async fn execute(&self, action: Vec<SendBundleRequest>) -> Result<()> {
let bodies = stream::iter(action)
.map(|bundle| {
let client = &self.mev_share_client;
async move { client.send_bundle(bundle).await }
})
.buffer_unordered(5);

bodies
.for_each(|b| async {
match b {
Ok(b) => info!("Bundle response: {:?}", b),
Err(e) => error!("Bundle error: {}", e),
}
})
.await;
async fn execute(&self, action: SendBundleRequest) -> Result<()> {
let body = self.mev_share_client.send_bundle(action).await;
match body {
Ok(body) => info!("Bundle response: {:?}", body),
Err(e) => error!("Bundle error: {}", e),
};
Ok(())
}
}
2 changes: 1 addition & 1 deletion crates/artemis-core/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ pub trait Strategy<E, A>: Send + Sync {
async fn sync_state(&mut self) -> Result<()>;

/// Process an event, and return an action if needed.
async fn process_event(&mut self, event: E) -> Option<A>;
async fn process_event(&mut self, event: E) -> Vec<A>;
}

/// Executor trait, responsible for executing actions returned by strategies.
Expand Down
13 changes: 8 additions & 5 deletions crates/strategies/mev-share-uni-arb/src/strategy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,26 +83,29 @@ impl<M: Middleware + 'static, S: Signer + 'static> Strategy<Event, Action>
}

// Process incoming events, seeing if we can arb new orders.
async fn process_event(&mut self, event: Event) -> Option<Action> {
async fn process_event(&mut self, event: Event) -> Vec<Action> {
match event {
Event::MEVShareEvent(event) => {
info!("Received mev share event: {:?}", event);
// skip if event has no logs
if event.logs.is_empty() {
return None;
return vec![];
}
let address = event.logs[0].address;
// skip if address is not a v3 pool
if !self.pool_map.contains_key(&address) {
return None;
return vec![];
}
// if it's a v3 pool we care about, submit bundles
info!(
"Found a v3 pool match at address {:?}, submitting bundles",
address
);
let bundles = self.generate_bundles(address, event.hash).await;
return Some(Action::SubmitBundles(bundles));
self.generate_bundles(address, event.hash)
.await
.into_iter()
.map(Action::SubmitBundle)
.collect()
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion crates/strategies/mev-share-uni-arb/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ pub enum Event {
/// Core Action enum for the current strategy.
#[derive(Debug, Clone)]
pub enum Action {
SubmitBundles(Vec<SendBundleRequest>),
SubmitBundle(SendBundleRequest),
}

#[derive(Debug, serde::Deserialize)]
Expand Down
11 changes: 7 additions & 4 deletions crates/strategies/opensea-sudo-arb/src/strategy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,11 +105,14 @@ impl<M: Middleware + 'static> Strategy<Event, Action> for OpenseaSudoArb<M> {
}

// Process incoming events, seeing if we can arb new orders, and updating the internal state on new blocks.
async fn process_event(&mut self, event: Event) -> Option<Action> {
async fn process_event(&mut self, event: Event) -> Vec<Action> {
match event {
Event::OpenseaOrder(order) => self.process_order_event(*order).await,
Event::OpenseaOrder(order) => self
.process_order_event(*order)
.await
.map_or(vec![], |a| vec![a]),
Event::NewBlock(block) => match self.process_new_block_event(block).await {
Ok(_) => None,
Ok(_) => vec![],
Err(e) => {
panic!("Strategy is out of sync {}", e);
}
Expand Down Expand Up @@ -217,7 +220,7 @@ impl<M: Middleware + 'static> OpenseaSudoArb<M> {
let quotes = self.quoter.get_multiple_sell_quotes(pools.clone()).await?;
let res = pools
.into_iter()
.zip(quotes.into_iter())
.zip(quotes)
.collect::<Vec<(H160, SellQuote)>>();
Ok(res)
}
Expand Down
2 changes: 1 addition & 1 deletion examples/mev-share-arb/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ async fn main() -> Result<()> {
// Set up executor.
let mev_share_executor = Box::new(MevshareExecutor::new(fb_signer));
let mev_share_executor = ExecutorMap::new(mev_share_executor, |action| match action {
Action::SubmitBundles(bundles) => Some(bundles),
Action::SubmitBundle(bundle) => Some(bundle),
});
engine.add_executor(Box::new(mev_share_executor));

Expand Down

0 comments on commit 88fb493

Please sign in to comment.