diff --git a/Cargo.toml b/Cargo.toml index 2399133..e129771 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -34,7 +34,7 @@ jemalloc = ["rblib/jemalloc"] debug = ["tokio/full", "tokio/tracing", "dep:console-subscriber"] [dependencies] -rblib = { git = "https://github.com/flashbots/rblib", rev = "198cbe65e59f7467c756d5ba6422155c98402291" } +rblib = { git = "https://github.com/flashbots/rblib", branch = "ak-filter-orders" } futures = "0.3" tokio = "1.46" @@ -75,7 +75,7 @@ console-subscriber = { version = "0.4", optional = true } tracing-subscriber = "0.3.20" [dev-dependencies] -rblib = { git = "https://github.com/flashbots/rblib", rev = "198cbe65e59f7467c756d5ba6422155c98402291", features = [ +rblib = { git = "https://github.com/flashbots/rblib", branch = "ak-filter-orders", features = [ "test-utils", ] } diff --git a/src/bundle.rs b/src/bundle.rs index ea65944..4d6d6c8 100644 --- a/src/bundle.rs +++ b/src/bundle.rs @@ -78,6 +78,29 @@ pub struct FlashblocksBundle { /// Note: Not recommended because this is subject to the builder node clock. #[serde(default, skip_serializing_if = "Option::is_none")] pub max_timestamp: Option, + + /// Minimum flashblock number at which this bundle can be included. + /// + /// Flashblocks are preconfirmations that are built incrementally. This + /// field along with `maxFlashblockNumber` allows bundles to be scheduled + /// for more precise execution. + #[serde( + default, + with = "alloy_serde::quantity::opt", + skip_serializing_if = "Option::is_none" + )] + pub min_flashblock_number: Option, + + /// Maximum flashblock number at which this bundle can be included. + /// + /// Similar to `minFlashblockNumber`, this sets an upper bound on which + /// flashblocks can include this bundle. + #[serde( + default, + with = "alloy_serde::quantity::opt", + skip_serializing_if = "Option::is_none" + )] + pub max_flashblock_number: Option, } impl FlashblocksBundle { @@ -93,6 +116,8 @@ impl FlashblocksBundle { max_block_number: None, min_timestamp: None, max_timestamp: None, + min_flashblock_number: None, + max_flashblock_number: None, } } } diff --git a/src/main.rs b/src/main.rs index 52083f7..b07a6e6 100644 --- a/src/main.rs +++ b/src/main.rs @@ -141,8 +141,14 @@ fn build_flashblocks_pipeline( let ws = Arc::new(WebSocketSink::new(socket_address)?); + // Multiple steps need to access flashblock number state, so we need to + // initialize it outside + let flashblock_number = Arc::new(FlashblockNumber::new()); + let flashblock_building_pipeline_steps = ( - AppendOrders::from_pool(pool).with_ok_on_limit(), + AppendOrders::from_pool(pool) + .with_ok_on_limit() + .with_filter(flashblock_number.clone().bundle_filter()), OrderByPriorityFee::default(), RemoveRevertedTransactions::default(), BreakAfterDeadline, @@ -161,10 +167,6 @@ fn build_flashblocks_pipeline( flashblock_building_pipeline_steps }; - // Multiple steps need to access flashblock number state, so we need to - // initialize it outside - let flashblock_number = Arc::new(FlashblockNumber::new()); - let pipeline = Pipeline::::named("flashblocks") .with_prologue(OptimismPrologue) .with_pipeline( diff --git a/src/platform.rs b/src/platform.rs index 2a1b07e..916ee95 100644 --- a/src/platform.rs +++ b/src/platform.rs @@ -14,7 +14,7 @@ use { /// the `Flashblocks` platform. /// /// See [`rblib::Platform`] for more details on platform definitions. -#[derive(Debug, Clone, Default, Serialize, Deserialize)] +#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)] pub struct Flashblocks; impl Platform for Flashblocks { diff --git a/src/publish.rs b/src/publish.rs index a0d80b9..5ce350b 100644 --- a/src/publish.rs +++ b/src/publish.rs @@ -210,7 +210,7 @@ impl Step for PublishFlashblock { /// Called during pipeline instantiation before any payload job is served. /// - Configure metrics scope. - fn setup( + async fn setup( &mut self, ctx: InitContext, ) -> Result<(), PayloadBuilderError> { diff --git a/src/state.rs b/src/state.rs index 8e4c2b5..b62c9bc 100644 --- a/src/state.rs +++ b/src/state.rs @@ -1,6 +1,13 @@ -use std::{ - fmt::Display, - sync::atomic::{AtomicU64, Ordering}, +use { + crate::Flashblocks, + rblib::{pool::Order, prelude::*}, + std::{ + fmt::Display, + sync::{ + Arc, + atomic::{AtomicU64, Ordering}, + }, + }, }; #[derive(Debug)] @@ -53,6 +60,33 @@ impl FlashblockNumber { pub fn reset_current_flashblock(&self) -> u64 { self.current_flashblock.swap(1, Ordering::Relaxed) } + + pub fn bundle_filter( + self: Arc, + ) -> impl Fn(&Checkpoint, &Order) -> bool + + Send + + Sync + + 'static { + move |_: &Checkpoint, order: &Order| -> bool { + let current_flashblock_number = self.current(); + + if let Order::Bundle(bundle) = order { + if let Some(min_flashblock_number) = bundle.min_flashblock_number { + if current_flashblock_number < min_flashblock_number { + return false; + } + } + + if let Some(max_flashblock_number) = bundle.max_flashblock_number { + if max_flashblock_number < current_flashblock_number { + return false; + } + } + } + + true + } + } } impl Default for FlashblockNumber {