Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,168 changes: 559 additions & 609 deletions Cargo.lock

Large diffs are not rendered by default.

19 changes: 8 additions & 11 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,8 @@ name = "builder"
name = "zenith-builder-example"
path = "bin/builder.rs"

[features]
integration = []

[dependencies]
init4-bin-base = { version = "0.12.3", features = ["perms", "aws"] }
init4-bin-base = { version = "0.12.3", features = ["perms", "aws", "flashbots"] }

signet-constants = { version = "0.10.1" }
signet-sim = { version = "0.10.1" }
Expand All @@ -36,22 +33,22 @@ alloy = { version = "1.0.25", features = [
"json-rpc",
"signer-aws",
"rpc-types-mev",
"rpc-client",
"rpc-types-debug",
"rlp",
"node-bindings",
"serde",
"getrandom"
"getrandom",
] }

serde = { version = "1.0.197", features = ["derive"] }

axum = "0.7.5"
chrono = "0.4.40"
eyre = "0.6.12"
openssl = { version = "0.10", features = ["vendored"] }
reqwest = { version = "0.12.22", features = ["blocking", "json"] }
serde_json = "1.0"
serde = { version = "1.0.197", features = ["derive"] }
serde_json = "1.0.145"
tracing = "0.1.41"
tokio = { version = "1.36.0", features = ["full", "macros", "rt-multi-thread"] }
chrono = "0.4.40"

tokio-stream = "0.1.17"
url = "2.5.4"
tracing = "0.1.41"
46 changes: 13 additions & 33 deletions bin/builder.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,12 @@
use builder::{
config::BuilderConfig,
service::serve_builder,
tasks::{
block::sim::Simulator, cache::CacheTasks, env::EnvTask, metrics::MetricsTask,
submit::BuilderHelperTask,
},
tasks::{block::sim::Simulator, cache::CacheTasks, env::EnvTask, metrics::MetricsTask},
};
use init4_bin_base::{
deps::tracing::{info, info_span},
utils::from_env::FromEnv,
};
use signet_types::constants::SignetSystemConstants;
use tokio::select;

// Note: Must be set to `multi_thread` to support async tasks.
Expand All @@ -22,49 +18,33 @@ async fn main() -> eyre::Result<()> {

// Pull the configuration from the environment
let config = BuilderConfig::from_env()?.clone();
let constants = SignetSystemConstants::pecorino();

// We connect the WS greedily, so we can fail early if the connection is
// invalid.
let ru_provider = config.connect_ru_provider().await?;
// We connect the providers greedily, so we can fail early if the
// RU WS connection is invalid.
let (ru_provider, host_provider) =
tokio::try_join!(config.connect_ru_provider(), config.connect_host_provider(),)?;

// Spawn the EnvTask
let env_task = EnvTask::new(
config.clone(),
constants.clone(),
config.connect_host_provider().await?,
ru_provider.clone(),
);
let env_task = EnvTask::new(config.clone(), host_provider.clone(), ru_provider.clone());
let (block_env, env_jh) = env_task.spawn();

// Spawn the cache system
let cache_tasks = CacheTasks::new(config.clone(), block_env.clone());
let cache_system = cache_tasks.spawn();

// Prep providers and contracts
let (host_provider, quincey) =
tokio::try_join!(config.connect_host_provider(), config.connect_quincey())?;
let zenith = config.connect_zenith(host_provider.clone());

// Set up the metrics task
let metrics = MetricsTask { host_provider: host_provider.clone() };
let metrics = MetricsTask { host_provider };
let (tx_channel, metrics_jh) = metrics.spawn();

// Make a Tx submission task
let submit = BuilderHelperTask {
zenith,
quincey,
config: config.clone(),
constants: constants.clone(),
outbound_tx_channel: tx_channel,
};

// Set up tx submission
let (submit_channel, submit_jh) = submit.spawn();
// Set up the submit task. This will be either a Flashbots task or a
// BuilderHelper task depending on whether a Flashbots endpoint is
// configured.
let (submit_channel, submit_jh) = config.spawn_submit_task(tx_channel).await?;

// Set up the simulator
let sim = Simulator::new(&config, ru_provider.clone(), block_env);
let build_jh = sim.spawn_simulator_task(constants, cache_system.sim_cache, submit_channel);
let build_jh =
sim.spawn_simulator_task(config.constants.clone(), cache_system.sim_cache, submit_channel);

// Start the healthcheck server
let server = serve_builder(([0, 0, 0, 0], config.builder_port));
Expand Down
65 changes: 54 additions & 11 deletions src/config.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
use crate::{quincey::Quincey, tasks::block::cfg::SignetCfgEnv};
use crate::{
quincey::Quincey,
tasks::{
block::{cfg::SignetCfgEnv, sim::SimResult},
submit::{BuilderHelperTask, FlashbotsTask},
},
};
use alloy::{
network::{Ethereum, EthereumWallet},
primitives::Address,
primitives::{Address, TxHash},
providers::{
Identity, ProviderBuilder, RootProvider,
fillers::{
Expand All @@ -15,14 +21,16 @@ use init4_bin_base::{
perms::{Authenticator, OAuthConfig, SharedToken},
utils::{
calc::SlotCalculator,
flashbots::Flashbots,
from_env::FromEnv,
provider::{ProviderConfig, PubSubConfig},
signer::{LocalOrAws, SignerError},
},
};
use signet_constants::SignetSystemConstants;
use signet_zenith::Zenith;
use std::borrow::Cow;
use tokio::join;
use tokio::{join, sync::mpsc::UnboundedSender, task::JoinHandle};

/// Type alias for the provider used to simulate against rollup state.
pub type RuProvider = RootProvider<Ethereum>;
Expand Down Expand Up @@ -89,13 +97,8 @@ pub struct BuilderConfig {
)]
pub tx_broadcast_urls: Vec<Cow<'static, str>>,

/// Flashbots endpoint for privately submitting rollup blocks.
#[from_env(
var = "FLASHBOTS_ENDPOINT",
desc = "Flashbots endpoint for privately submitting rollup blocks",
optional
)]
pub flashbots_endpoint: Option<url::Url>,
/// Flashbots configuration for privately submitting rollup blocks.
pub flashbots: init4_bin_base::utils::flashbots::FlashbotsConfig,

/// Address of the Zenith contract on Host.
#[from_env(var = "ZENITH_ADDRESS", desc = "address of the Zenith contract on Host")]
Expand Down Expand Up @@ -163,12 +166,21 @@ pub struct BuilderConfig {

/// The slot calculator for the builder.
pub slot_calculator: SlotCalculator,

/// The signet system constants.
pub constants: SignetSystemConstants,
}

impl BuilderConfig {
/// Connect to the Builder signer.
pub async fn connect_builder_signer(&self) -> Result<LocalOrAws, SignerError> {
LocalOrAws::load(&self.builder_key, Some(self.host_chain_id)).await
static ONCE: tokio::sync::OnceCell<LocalOrAws> = tokio::sync::OnceCell::const_new();

ONCE.get_or_try_init(|| async {
LocalOrAws::load(&self.builder_key, Some(self.host_chain_id)).await
})
.await
.cloned()
}

/// Connect to the Sequencer signer.
Expand Down Expand Up @@ -275,4 +287,35 @@ impl BuilderConfig {
.unwrap_or(DEFAULT_CONCURRENCY_LIMIT)
})
}

/// Connect to a Flashbots provider.
pub async fn flashbots_provider(&self) -> eyre::Result<Flashbots> {
self.flashbots
.build(self.connect_builder_signer().await?)
.ok_or_else(|| eyre::eyre!("Flashbots is not configured"))
}

/// Spawn a submit task, either Flashbots or BuilderHelper depending on
/// configuration.
pub async fn spawn_submit_task(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should this fn log whether the task was spawned in flashbots or tx mode?

&self,
tx_channel: UnboundedSender<TxHash>,
) -> eyre::Result<(UnboundedSender<SimResult>, JoinHandle<()>)> {
// If we have a flashbots endpoint, use that
if self.flashbots.flashbots_endpoint.is_some() {
// Make a Flashbots submission task
let submit = FlashbotsTask::new(self.clone(), tx_channel).await?;

// Set up flashbots submission
let (submit_channel, submit_jh) = submit.spawn();
return Ok((submit_channel, submit_jh));
}

// Make a Tx submission task
let submit = BuilderHelperTask::new(self.clone(), tx_channel).await?;

// Set up tx submission
let (submit_channel, submit_jh) = submit.spawn();
Ok((submit_channel, submit_jh))
}
}
29 changes: 27 additions & 2 deletions src/macros.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,33 @@
macro_rules! span_scoped {
($span:expr, $level:ident!($($arg:tt)*)) => {
$span.in_scope(|| {
$level!($($arg)*);
});
::tracing::$level!($($arg)*);
})
};
}

/// Helper macro to log a debug event within a span that is not currently
/// entered.
macro_rules! span_debug {
($span:expr, $($arg:tt)*) => {
span_scoped!($span, debug!($($arg)*))
};
}

/// Helper macro to log an info event within a span that is not currently
/// entered.
macro_rules! span_info {
($span:expr, $($arg:tt)*) => {
span_scoped!($span, info!($($arg)*))
};

}

/// Helper macro to log a warning event within a span that is not currently
/// entered.
macro_rules! span_error {
($span:expr, $($arg:tt)*) => {
span_scoped!($span, error!($($arg)*))
};
}

Expand Down
7 changes: 2 additions & 5 deletions src/quincey.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,9 @@
use alloy::signers::Signer;
use eyre::bail;
use init4_bin_base::{
deps::tracing::{debug, info, instrument, trace},
perms::SharedToken,
utils::signer::LocalOrAws,
};
use init4_bin_base::{perms::SharedToken, utils::signer::LocalOrAws};
use reqwest::Client;
use signet_types::{SignRequest, SignResponse};
use tracing::{debug, info, instrument, trace};

/// A quincey client for making requests to the Quincey API.
#[derive(Debug, Clone)]
Expand Down
2 changes: 1 addition & 1 deletion src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ use axum::{
response::{IntoResponse, Response},
routing::get,
};
use init4_bin_base::deps::tracing::error;
use std::net::SocketAddr;
use tracing::error;

/// Return a 404 Not Found response
pub async fn return_404() -> Response {
Expand Down
27 changes: 10 additions & 17 deletions src/tasks/block/sim.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,7 @@ use crate::{
tasks::env::SimEnv,
};
use alloy::{eips::BlockId, network::Ethereum};
use init4_bin_base::{
deps::tracing::{debug, error},
utils::calc::SlotCalculator,
};
use init4_bin_base::utils::calc::SlotCalculator;
use signet_sim::{BlockBuild, BuiltBlock, SimCache};
use signet_types::constants::SignetSystemConstants;
use std::time::{Duration, Instant};
Expand All @@ -20,7 +17,7 @@ use tokio::{
},
task::JoinHandle,
};
use tracing::{Instrument, info, instrument};
use tracing::{Instrument, Span, instrument};
use trevm::revm::{
context::BlockEnv,
database::{AlloyDB, WrapDatabaseAsync},
Expand Down Expand Up @@ -64,12 +61,12 @@ impl SimResult {

/// Returns a reference to the tracing span associated with this simulation
/// result.
pub const fn span(&self) -> &tracing::Span {
pub const fn span(&self) -> &Span {
self.sim_env.span()
}

/// Clones the span for use in other tasks.
pub fn clone_span(&self) -> tracing::Span {
pub fn clone_span(&self) -> Span {
self.sim_env.clone_span()
}
}
Expand Down Expand Up @@ -116,7 +113,6 @@ impl Simulator {
///
/// A `Result` containing the built block or an error.
#[instrument(skip_all, fields(
block_number = block_env.number.to::<u64>(),
tx_count = sim_items.len(),
millis_to_deadline = finish_by.duration_since(Instant::now()).as_millis()
))]
Expand Down Expand Up @@ -144,7 +140,7 @@ impl Simulator {
);

let built_block = block_build.build().in_current_span().await;
debug!(
tracing::debug!(
tx_count = built_block.tx_count(),
block_number = built_block.block_number(),
"block simulation completed",
Expand All @@ -171,7 +167,7 @@ impl Simulator {
cache: SimCache,
submit_sender: mpsc::UnboundedSender<SimResult>,
) -> JoinHandle<()> {
debug!("starting simulator task");
tracing::debug!("starting simulator task");

tokio::spawn(async move { self.run_simulator(constants, cache, submit_sender).await })
}
Expand Down Expand Up @@ -201,16 +197,13 @@ impl Simulator {
loop {
// Wait for the block environment to be set
if self.sim_env.changed().await.is_err() {
error!("block_env channel closed - shutting down simulator task");
tracing::error!("block_env channel closed - shutting down simulator task");
return;
}
let Some(sim_env) = self.sim_env.borrow_and_update().clone() else { return };

let span = sim_env.span();

span.in_scope(|| {
info!("new block environment received");
});
span_info!(span, "new block environment received");

// Calculate the deadline for this block simulation.
// NB: This must happen _after_ taking a reference to the sim cache,
Expand All @@ -222,13 +215,13 @@ impl Simulator {
.handle_build(constants.clone(), sim_cache, finish_by, sim_env.block_env.clone())
.instrument(span.clone())
.await
.inspect_err(|err| span.in_scope(|| error!(%err, "error during block build")))
.inspect_err(|err| span_error!(span, %err, "error during block build"))
else {
continue;
};

let _guard = span.clone().entered();
debug!(block = ?block.block_number(), tx_count = block.transactions().len(), "built simulated block");
span_debug!(span, tx_count = block.transactions().len(), "built simulated block");
let _ = submit_sender.send(SimResult { block, sim_env });
}
}
Expand Down
Loading