Skip to content

Commit

Permalink
fix(l1): fix hangs on the tx spammer (#1413)
Browse files Browse the repository at this point in the history
**Motivation**

The spammer hanged in one of 2 forms, either it wait for tx that weren't
added in the second phase of each spam and timeout after 5 minutes
resuming the execution or it hangs ad infinitum if the issue happens in
the first phase. This indicate that we were losing transactions.

**Description**

This is a mid-term solution, the explanation about what happened is done
on the issue, [in this particular
comment](#1120 (comment)),
but TLDR: we were receiving `engine_getPayload` request with old
`payload_ids` and we filled *AGAIN* the block but with new transactions.
the block was discarded as orphan by consensus and we lost those txs
from mempool on the immediate subsequent request. This was caused
because we were missing the update of the payload when it was modified
in the second step of our build process.

The current solution stores the whole payload, i.e. the block, block
value and the blobs bunde. Given our current implementation (a naive
2-step build process) we either filled the transaction and ended the
build process or not, a simple closed flag was added to the payload
store to signal this and avoid refilling transactions, this is clearer
than just check if there are any but could be changed if preferred.

| Spammer logs             |  Dora explorer |
:-------------------------:|:-------------------------:

![image](https://github.com/user-attachments/assets/f768879b-4bba-41f6-991b-e81abe0531f4)
|
![image](https://github.com/user-attachments/assets/bd642c92-4a99-42fa-b99d-bc9036b14fff)

**Next Steps**
Enhance the build process to make it async and interactive instead of
the naive 2-step all-or-nothing implementation we had right now.


<!-- Link to issues: Resolves #111, Resolves #222 -->

Resolves #1120
  • Loading branch information
rodrigo-o authored Dec 10, 2024
1 parent dd680ee commit 3c05f02
Show file tree
Hide file tree
Showing 8 changed files with 146 additions and 27 deletions.
12 changes: 7 additions & 5 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -58,18 +58,20 @@ checkout-ethereum-package: ethereum-package ## 📦 Checkout specific Ethereum p
git fetch && \
git checkout $(ETHEREUM_PACKAGE_REVISION)

ENCLAVE ?= lambdanet

localnet: stop-localnet-silent build-image checkout-ethereum-package ## 🌐 Start local network
kurtosis run --enclave lambdanet ethereum-package --args-file test_data/network_params.yaml
kurtosis run --enclave $(ENCLAVE) ethereum-package --args-file test_data/network_params.yaml
docker logs -f $$(docker ps -q --filter ancestor=ethrex)

stop-localnet: ## 🛑 Stop local network
kurtosis enclave stop lambdanet
kurtosis enclave rm lambdanet --force
kurtosis enclave stop $(ENCLAVE)
kurtosis enclave rm $(ENCLAVE) --force

stop-localnet-silent:
@echo "Double checking local net is not already started..."
@kurtosis enclave stop lambdanet >/dev/null 2>&1 || true
@kurtosis enclave rm lambdanet --force >/dev/null 2>&1 || true
@kurtosis enclave stop $(ENCLAVE) >/dev/null 2>&1 || true
@kurtosis enclave rm $(ENCLAVE) --force >/dev/null 2>&1 || true

HIVE_REVISION := f220e0c55fb222aaaffdf17d66aa0537cd16a67a
# Shallow clones can't specify a single revision, but at least we avoid working
Expand Down
4 changes: 4 additions & 0 deletions crates/common/types/blobs_bundle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,10 @@ fn verify_blob_kzg_proof(
}

impl BlobsBundle {
pub fn empty() -> Self {
Self::default()
}

// In the future we might want to provide a new method that calculates the commitments and proofs using the following.
#[cfg(feature = "c-kzg")]
pub fn create_from_blobs(blobs: &Vec<Blob>) -> Result<Self, BlobsBundleError> {
Expand Down
30 changes: 26 additions & 4 deletions crates/networking/rpc/engine/payload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -355,23 +355,45 @@ impl GetPayloadRequest {

fn handle(&self, context: RpcApiContext) -> Result<Value, RpcErr> {
info!("Requested payload with id: {:#018x}", self.payload_id);
let Some(mut payload) = context.storage.get_payload(self.payload_id)? else {

let payload = context.storage.get_payload(self.payload_id)?;

let Some((mut payload_block, block_value, blobs_bundle, completed)) = payload else {
return Err(RpcErr::UnknownPayload(format!(
"Payload with id {:#018x} not found",
self.payload_id
)));
};

// Check timestamp matches valid fork
let chain_config = &context.storage.get_chain_config()?;
let current_fork = chain_config.get_fork(payload.header.timestamp);
let current_fork = chain_config.get_fork(payload_block.header.timestamp);
info!("Current Fork: {:?}", current_fork);
if current_fork != self.valid_fork() {
return Err(RpcErr::UnsuportedFork(format!("{current_fork:?}")));
}

let (blobs_bundle, block_value) = build_payload(&mut payload, &context.storage)
if completed {
return serde_json::to_value(self.build_response(
ExecutionPayload::from_block(payload_block),
blobs_bundle,
block_value,
))
.map_err(|error| RpcErr::Internal(error.to_string()));
}

let (blobs_bundle, block_value) = build_payload(&mut payload_block, &context.storage)
.map_err(|err| RpcErr::Internal(err.to_string()))?;
let execution_payload = ExecutionPayload::from_block(payload);

context.storage.update_payload(
self.payload_id,
payload_block.clone(),
block_value,
blobs_bundle.clone(),
true,
)?;

let execution_payload = ExecutionPayload::from_block(payload_block);

serde_json::to_value(self.build_response(execution_payload, blobs_bundle, block_value))
.map_err(|error| RpcErr::Internal(error.to_string()))
Expand Down
17 changes: 15 additions & 2 deletions crates/storage/store/engines/api.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use bytes::Bytes;
use ethereum_types::{H256, U256};
use ethrex_core::types::{
Block, BlockBody, BlockHash, BlockHeader, BlockNumber, ChainConfig, Index, Receipt, Transaction,
BlobsBundle, Block, BlockBody, BlockHash, BlockHeader, BlockNumber, ChainConfig, Index,
Receipt, Transaction,
};
use std::{fmt::Debug, panic::RefUnwindSafe};

Expand Down Expand Up @@ -216,5 +217,17 @@ pub trait StoreEngine: Debug + Send + Sync + RefUnwindSafe {

fn add_payload(&self, payload_id: u64, block: Block) -> Result<(), StoreError>;

fn get_payload(&self, payload_id: u64) -> Result<Option<Block>, StoreError>;
fn get_payload(
&self,
payload_id: u64,
) -> Result<Option<(Block, U256, BlobsBundle, bool)>, StoreError>;

fn update_payload(
&self,
payload_id: u64,
block: Block,
block_value: U256,
blobs_bundle: BlobsBundle,
completed: bool,
) -> Result<(), StoreError>;
}
28 changes: 24 additions & 4 deletions crates/storage/store/engines/in_memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::error::StoreError;
use bytes::Bytes;
use ethereum_types::{H256, U256};
use ethrex_core::types::{
Block, BlockBody, BlockHash, BlockHeader, BlockNumber, ChainConfig, Index, Receipt,
BlobsBundle, Block, BlockBody, BlockHash, BlockHeader, BlockNumber, ChainConfig, Index, Receipt,
};
use ethrex_trie::{InMemoryTrieDB, Trie};
use std::{
Expand Down Expand Up @@ -36,7 +36,7 @@ struct StoreInner {
// TODO (#307): Remove TotalDifficulty.
block_total_difficulties: HashMap<BlockHash, U256>,
// Stores local blocks by payload id
payloads: HashMap<u64, Block>,
payloads: HashMap<u64, (Block, U256, BlobsBundle, bool)>,
pending_blocks: HashMap<BlockHash, Block>,
}

Expand Down Expand Up @@ -341,13 +341,33 @@ impl StoreEngine for Store {
}

fn add_payload(&self, payload_id: u64, block: Block) -> Result<(), StoreError> {
self.inner().payloads.insert(payload_id, block);
self.inner().payloads.insert(
payload_id,
(block, U256::zero(), BlobsBundle::empty(), false),
);
Ok(())
}

fn get_payload(&self, payload_id: u64) -> Result<Option<Block>, StoreError> {
fn get_payload(
&self,
payload_id: u64,
) -> Result<Option<(Block, U256, BlobsBundle, bool)>, StoreError> {
Ok(self.inner().payloads.get(&payload_id).cloned())
}

fn update_payload(
&self,
payload_id: u64,
block: Block,
block_value: U256,
blobs_bundle: BlobsBundle,
completed: bool,
) -> Result<(), StoreError> {
self.inner()
.payloads
.insert(payload_id, (block, block_value, blobs_bundle, completed));
Ok(())
}
}

impl Debug for Store {
Expand Down
31 changes: 26 additions & 5 deletions crates/storage/store/engines/libmdbx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ use anyhow::Result;
use bytes::Bytes;
use ethereum_types::{H256, U256};
use ethrex_core::types::{
Block, BlockBody, BlockHash, BlockHeader, BlockNumber, ChainConfig, Index, Receipt, Transaction,
BlobsBundle, Block, BlockBody, BlockHash, BlockHeader, BlockNumber, ChainConfig, Index,
Receipt, Transaction,
};
use ethrex_rlp::decode::RLPDecode;
use ethrex_rlp::encode::RLPEncode;
Expand Down Expand Up @@ -351,13 +352,33 @@ impl StoreEngine for Store {
}

fn add_payload(&self, payload_id: u64, block: Block) -> Result<(), StoreError> {
self.write::<Payloads>(payload_id, block.into())
self.write::<Payloads>(
payload_id,
(block, U256::zero(), BlobsBundle::empty(), false).into(),
)
}

fn get_payload(&self, payload_id: u64) -> Result<Option<Block>, StoreError> {
fn get_payload(
&self,
payload_id: u64,
) -> Result<Option<(Block, U256, BlobsBundle, bool)>, StoreError> {
Ok(self.read::<Payloads>(payload_id)?.map(|b| b.to()))
}

fn update_payload(
&self,
payload_id: u64,
block: Block,
block_value: U256,
blobs_bundle: BlobsBundle,
completed: bool,
) -> std::result::Result<(), StoreError> {
self.write::<Payloads>(
payload_id,
(block, block_value, blobs_bundle, completed).into(),
)
}

fn get_transaction_by_hash(
&self,
transaction_hash: H256,
Expand Down Expand Up @@ -492,8 +513,8 @@ table!(
// Local Blocks

table!(
/// payload id to payload block table
( Payloads ) u64 => BlockRLP
/// payload id to payload table
( Payloads ) u64 => Rlp<(Block, U256, BlobsBundle, bool)>
);

table!(
Expand Down
34 changes: 28 additions & 6 deletions crates/storage/store/engines/redb.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
use std::{borrow::Borrow, panic::RefUnwindSafe, sync::Arc};

use ethrex_core::types::BlockBody;
use ethrex_core::U256;
use ethrex_core::{
types::{Block, BlockHash, BlockHeader, BlockNumber, ChainConfig, Index, Receipt},
H256,
types::{BlobsBundle, Block, BlockHash, BlockHeader, BlockNumber, ChainConfig, Index, Receipt},
H256, U256,
};
use ethrex_rlp::decode::RLPDecode;
use ethrex_rlp::encode::RLPEncode;
Expand Down Expand Up @@ -45,7 +44,8 @@ pub const STORAGE_TRIE_NODES_TABLE: MultimapTableDefinition<([u8; 32], [u8; 33])
MultimapTableDefinition::new("StorageTrieNodes");
const CHAIN_DATA_TABLE: TableDefinition<ChainDataIndex, Vec<u8>> =
TableDefinition::new("ChainData");
const PAYLOADS_TABLE: TableDefinition<BlockNumber, BlockRLP> = TableDefinition::new("Payloads");
const PAYLOADS_TABLE: TableDefinition<BlockNumber, Rlp<(Block, U256, BlobsBundle, bool)>> =
TableDefinition::new("Payloads");
const PENDING_BLOCKS_TABLE: TableDefinition<BlockHashRLP, BlockRLP> =
TableDefinition::new("PendingBlocks");
const TRANSACTION_LOCATIONS_TABLE: MultimapTableDefinition<
Expand Down Expand Up @@ -532,15 +532,37 @@ impl StoreEngine for RedBStore {
self.write(
PAYLOADS_TABLE,
payload_id,
<Block as Into<BlockRLP>>::into(block),
<(Block, U256, BlobsBundle, bool) as Into<Rlp<(Block, U256, BlobsBundle, bool)>>>::into(
(block, U256::zero(), BlobsBundle::empty(), false),
),
)
}

fn get_payload(&self, payload_id: u64) -> Result<Option<Block>, StoreError> {
fn get_payload(
&self,
payload_id: u64,
) -> Result<Option<(Block, U256, BlobsBundle, bool)>, StoreError> {
Ok(self
.read(PAYLOADS_TABLE, payload_id)?
.map(|b| b.value().to()))
}

fn update_payload(
&self,
payload_id: u64,
block: Block,
block_value: U256,
blobs_bundle: BlobsBundle,
completed: bool,
) -> Result<(), StoreError> {
self.write(
PAYLOADS_TABLE,
payload_id,
<(Block, U256, BlobsBundle, bool) as Into<Rlp<(Block, U256, BlobsBundle, bool)>>>::into(
(block, block_value, blobs_bundle, completed),
),
)
}
}

impl redb::Value for ChainDataIndex {
Expand Down
17 changes: 16 additions & 1 deletion crates/storage/store/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -910,10 +910,25 @@ impl Store {
self.engine.add_payload(payload_id, block)
}

pub fn get_payload(&self, payload_id: u64) -> Result<Option<Block>, StoreError> {
pub fn get_payload(
&self,
payload_id: u64,
) -> Result<Option<(Block, U256, BlobsBundle, bool)>, StoreError> {
self.engine.get_payload(payload_id)
}

pub fn update_payload(
&self,
payload_id: u64,
block: Block,
block_value: U256,
blobs_bundle: BlobsBundle,
completed: bool,
) -> Result<(), StoreError> {
self.engine
.update_payload(payload_id, block, block_value, blobs_bundle, completed)
}

/// Creates a new state trie with an empty state root, for testing purposes only
pub fn new_state_trie_for_test(&self) -> Trie {
self.engine.open_state_trie(*EMPTY_TRIE_HASH)
Expand Down

0 comments on commit 3c05f02

Please sign in to comment.