Skip to content

Commit

Permalink
feat(L2): basic sequencer (lambdaclass#586)
Browse files Browse the repository at this point in the history
> [!WARNING]
> This PR depends on lambdaclass#575 and must be merged after it.

**Motivation**

<!-- Why does this pull request exist? What are its goals? -->

**Description**

<!-- A clear and concise general description of the changes this PR
introduces -->

- Adds block producer module
- Implements `Engine` to advance the blockchain

---------

Co-authored-by: fmoletta <fedemoletta@hotmail.com>
Co-authored-by: Manuel Bilbao <manuel.bilbao@lambdaclass.com>
Co-authored-by: Manuel Iñaki Bilbao <bilbaomanuel98@gmail.com>
  • Loading branch information
4 people authored Oct 8, 2024
1 parent 8024068 commit f93dd6a
Show file tree
Hide file tree
Showing 19 changed files with 344 additions and 28 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -49,4 +49,5 @@ crc32fast = "1.4.2"
lazy_static = "1.5.0"
sha3 = "0.10.8"
tokio-util = { version = "0.7.12", features = ["rt"] }
jsonwebtoken = "9.3.0"
rand = "0.8.5"
13 changes: 8 additions & 5 deletions cmd/ethereum_rust/ethereum_rust.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,15 +116,18 @@ async fn main() {
let size = blocks.len();
for block in blocks {
let hash = block.header.compute_block_hash();
info!("Adding block {} with hash {}.", block.header.number, hash);
info!(
"Adding block {} with hash {:#x}.",
block.header.number, hash
);
match add_block(&block, &store) {
Ok(()) => store
.set_canonical_block(block.header.number, hash)
.unwrap(),
_ => {
Err(error) => {
warn!(
"Failed to add block {} with hash {}.",
block.header.number, hash
"Failed to add block {} with hash {:#x}: {}",
block.header.number, hash, error
);
}
}
Expand Down Expand Up @@ -159,7 +162,7 @@ async fn main() {
// We do not want to start the networking module if the l2 feature is enabled.
cfg_if::cfg_if! {
if #[cfg(feature = "l2")] {
let l2_operator = ethereum_rust_l2::start_operator().into_future();
let l2_operator = ethereum_rust_l2::start_operator(store.clone()).into_future();
tracker.spawn(l2_operator);
let l2_prover = ethereum_rust_l2::start_prover().into_future();
tracker.spawn(l2_prover);
Expand Down
45 changes: 45 additions & 0 deletions crates/common/serde_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,28 @@ pub mod bytes48 {
{
serialize_vec_of_hex_encodables(value, serializer)
}

pub fn deserialize<'de, D>(d: D) -> Result<Vec<[u8; 48]>, D::Error>
where
D: Deserializer<'de>,
{
let value = Vec::<String>::deserialize(d)?;
let mut output = Vec::new();
for str in value {
let bytes = hex::decode(str.trim_start_matches("0x"))
.map_err(|e| D::Error::custom(e.to_string()))?;
if bytes.len() != 48 {
return Err(D::Error::custom(format!(
"Expected 48 bytes, got {}",
bytes.len()
)));
}
let mut blob = [0u8; 48];
blob.copy_from_slice(&bytes);
output.push(blob);
}
Ok(output)
}
}
}

Expand All @@ -310,6 +332,29 @@ pub mod blob {
{
serialize_vec_of_hex_encodables(value, serializer)
}

pub fn deserialize<'de, D>(deserializer: D) -> Result<Vec<[u8; BYTES_PER_BLOB]>, D::Error>
where
D: Deserializer<'de>,
{
let value = Vec::<String>::deserialize(deserializer)?;
let mut output = Vec::new();
for str in value {
let bytes = hex::decode(str.trim_start_matches("0x"))
.map_err(|e| D::Error::custom(e.to_string()))?;
if bytes.len() != BYTES_PER_BLOB {
return Err(D::Error::custom(format!(
"Expected {} bytes, got {}",
BYTES_PER_BLOB,
bytes.len()
)));
}
let mut blob = [0u8; BYTES_PER_BLOB];
blob.copy_from_slice(&bytes);
output.push(blob);
}
Ok(output)
}
}
}

Expand Down
4 changes: 2 additions & 2 deletions crates/common/types/blobs_bundle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use ethereum_rust_rlp::{
error::RLPDecodeError,
structs::{Decoder, Encoder},
};
use serde::Serialize;
use serde::{Deserialize, Serialize};

use super::BYTES_PER_BLOB;

Expand All @@ -14,7 +14,7 @@ pub type Blob = [u8; BYTES_PER_BLOB];
pub type Commitment = Bytes48;
pub type Proof = Bytes48;

#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
/// Struct containing all the blobs for a blob transaction, along with the corresponding commitments and proofs
pub struct BlobsBundle {
Expand Down
8 changes: 5 additions & 3 deletions crates/l2/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,18 @@ tokio-util.workspace = true
tracing.workspace = true
serde.workspace = true
serde_json.workspace = true
sp1-sdk = "2.0.0"
ethereum-types.workspace = true
ethereum_rust-core.workspace = true
ethereum_rust-rlp.workspace = true
ethereum_rust-rpc.workspace = true
ethereum_rust-blockchain.workspace = true
libsecp256k1 = "0.7.1"
ethereum_rust-storage.workspace = true
hex.workspace = true
bytes.workspace = true
jsonwebtoken.workspace = true
sp1-sdk = "2.0.0"
libsecp256k1 = "0.7.1"
keccak-hash = "0.10.0"
hex.workspace = true

[lib]
path = "./l2.rs"
49 changes: 47 additions & 2 deletions crates/l2/operator/block_producer.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,48 @@
pub struct BlockProducer;
use crate::operator::engine::Engine;
use ethereum_rust_rpc::types::fork_choice::{ForkChoiceState, PayloadAttributesV3};
use ethereum_types::H256;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use tokio::time::sleep;

pub async fn start_block_producer() {}
pub async fn start_block_producer(current_block_hash: H256) {
let mut current_block_hash = current_block_hash;
loop {
let secret = std::fs::read("../../../jwt.hex").unwrap();
let engine = Engine::new("http://localhost:8551", secret.into());

let fork_choice_state = ForkChoiceState {
head_block_hash: current_block_hash,
safe_block_hash: current_block_hash,
finalized_block_hash: current_block_hash,
};
let payload_attributes = PayloadAttributesV3 {
timestamp: SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_secs(),
..Default::default()
};

let fork_choice_response = engine
.engine_forkchoice_updated_v3(fork_choice_state, payload_attributes)
.await
.unwrap();

let payload_id = fork_choice_response.payload_id.unwrap();

let execution_payload_response = engine.engine_get_payload_v3(payload_id).await.unwrap();

let payload_status = engine
.engine_new_payload_v3(
execution_payload_response.execution_payload,
Default::default(),
Default::default(),
)
.await
.unwrap();

current_block_hash = payload_status.latest_valid_hash.unwrap();

sleep(Duration::from_secs(5)).await;
}
}
145 changes: 145 additions & 0 deletions crates/l2/operator/engine.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
use bytes::Bytes;
use ethereum_rust_rpc::{
engine::{
fork_choice::ForkChoiceUpdatedV3,
payload::{GetPayloadV3Request, NewPayloadV3Request},
ExchangeCapabilitiesRequest,
},
types::{
fork_choice::{ForkChoiceResponse, ForkChoiceState, PayloadAttributesV3},
payload::{ExecutionPayloadResponse, ExecutionPayloadV3, PayloadStatus},
},
utils::{RpcErrorResponse, RpcRequest, RpcSuccessResponse},
};
use ethereum_types::H256;
use reqwest::Client;
use serde::Deserialize;
use serde_json::json;
use std::time::{SystemTime, UNIX_EPOCH};

#[derive(Deserialize)]
#[serde(untagged)]
pub enum RpcResponse {
Success(RpcSuccessResponse),
Error(RpcErrorResponse),
}

pub struct Engine {
client: Client,
secret: Bytes,
execution_client_url: String,
}

impl Engine {
pub fn new(execution_client_url: &str, secret: Bytes) -> Self {
Self {
client: Client::new(),
secret,
execution_client_url: execution_client_url.to_string(),
}
}

async fn send_request(&self, request: RpcRequest) -> Result<RpcResponse, reqwest::Error> {
self.client
.post(&self.execution_client_url)
.bearer_auth(self.auth_token())
.header("content-type", "application/json")
.body(serde_json::ser::to_string(&request).unwrap())
.send()
.await?
.json::<RpcResponse>()
.await
}

pub async fn engine_exchange_capabilities(&self) -> Result<Vec<String>, String> {
let request = ExchangeCapabilitiesRequest::from(Self::capabilities()).into();

match self.send_request(request).await {
Ok(RpcResponse::Success(result)) => Ok(serde_json::from_value(result.result).unwrap()),
Ok(RpcResponse::Error(e)) => Err(e.error.message),
Err(e) => Err(e.to_string()),
}
}

pub async fn engine_forkchoice_updated_v3(
&self,
state: ForkChoiceState,
payload_attributes: PayloadAttributesV3,
) -> Result<ForkChoiceResponse, String> {
let request = ForkChoiceUpdatedV3 {
fork_choice_state: state,
payload_attributes: Some(payload_attributes),
}
.into();

match self.send_request(request).await {
Ok(RpcResponse::Success(s)) => match serde_json::from_value(s.result.clone()) {
Ok(parsed_value) => Ok(parsed_value),
Err(error) => {
dbg!(s.result);
Err(error.to_string())
}
},
Ok(RpcResponse::Error(e)) => Err(e.error.message),
Err(e) => Err(e.to_string()),
}
}

pub async fn engine_get_payload_v3(
&self,
payload_id: u64,
) -> Result<ExecutionPayloadResponse, String> {
let request = GetPayloadV3Request { payload_id }.into();

match self.send_request(request).await {
Ok(RpcResponse::Success(s)) => Ok(serde_json::from_value(s.result).unwrap()),
Ok(RpcResponse::Error(e)) => Err(e.error.message),
Err(e) => Err(e.to_string()),
}
}

pub async fn engine_new_payload_v3(
&self,
execution_payload: ExecutionPayloadV3,
expected_blob_versioned_hashes: Vec<H256>,
parent_beacon_block_root: H256,
) -> Result<PayloadStatus, String> {
let request = NewPayloadV3Request {
payload: execution_payload,
expected_blob_versioned_hashes,
parent_beacon_block_root,
}
.into();

match self.send_request(request).await {
Ok(RpcResponse::Success(s)) => Ok(serde_json::from_value(s.result).unwrap()),
Ok(RpcResponse::Error(e)) => Err(e.error.message),
Err(e) => Err(e.to_string()),
}
}

fn auth_token(&self) -> String {
// Header
let header = jsonwebtoken::Header::default();
// Claims
let valid_iat = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_secs() as usize;
let claims = json!({"iat": valid_iat});
// Encoding Key
let decoded_secret = hex::decode(self.secret.clone()).unwrap();
let encoding_key = jsonwebtoken::EncodingKey::from_secret(decoded_secret.as_ref());
// JWT Token
jsonwebtoken::encode(&header, &claims, &encoding_key).unwrap()
}

fn capabilities() -> Vec<String> {
vec![
"engine_exchangeCapabilities".to_owned(),
"engine_forkchoiceUpdatedV3".to_owned(),
"engine_getPayloadV3".to_owned(),
"engine_newPayloadV3".to_owned(),
]
}
}
13 changes: 11 additions & 2 deletions crates/l2/operator/mod.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,23 @@
use ethereum_rust_storage::Store;
use std::net::{IpAddr, Ipv4Addr};

pub mod block_producer;
pub mod engine;
pub mod l1_tx_sender;
pub mod l1_watcher;
pub mod proof_data_provider;

pub async fn start_operator() {
pub async fn start_operator(store: Store) {
let l1_tx_sender = tokio::spawn(l1_tx_sender::start_l1_tx_sender());
let l1_watcher = tokio::spawn(l1_watcher::start_l1_watcher());
let block_producer = tokio::spawn(block_producer::start_block_producer());
let current_block_hash = {
let current_block_number = store.get_latest_block_number().unwrap().unwrap();
store
.get_canonical_block_hash(current_block_number)
.unwrap()
.unwrap()
};
let block_producer = tokio::spawn(block_producer::start_block_producer(current_block_hash));
let proof_data_provider = tokio::spawn(proof_data_provider::start_proof_data_provider(
IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
3000,
Expand Down
2 changes: 1 addition & 1 deletion crates/networking/rpc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ ethereum_rust-net.workspace = true
ethereum_rust-rlp.workspace = true
hex.workspace = true
axum-extra = {version = "0.9.3", features = ["typed-header"]}
jsonwebtoken = "9.3.0"
jsonwebtoken.workspace = true
rand.workspace = true

[dev-dependencies]
Expand Down
14 changes: 14 additions & 0 deletions crates/networking/rpc/engine/fork_choice.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use crate::{
fork_choice::{ForkChoiceResponse, ForkChoiceState, PayloadAttributesV3},
payload::PayloadStatus,
},
utils::RpcRequest,
RpcErr, RpcHandler,
};

Expand All @@ -22,6 +23,19 @@ pub struct ForkChoiceUpdatedV3 {
pub payload_attributes: Option<PayloadAttributesV3>,
}

impl From<ForkChoiceUpdatedV3> for RpcRequest {
fn from(val: ForkChoiceUpdatedV3) -> Self {
RpcRequest {
method: "engine_forkchoiceUpdatedV3".to_string(),
params: Some(vec![
serde_json::json!(val.fork_choice_state),
serde_json::json!(val.payload_attributes),
]),
..Default::default()
}
}
}

impl RpcHandler for ForkChoiceUpdatedV3 {
fn parse(params: &Option<Vec<Value>>) -> Result<Self, RpcErr> {
let params = params
Expand Down
Loading

0 comments on commit f93dd6a

Please sign in to comment.