Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(l1): fix hive LargeTxRequest test #1480

Merged
merged 28 commits into from
Jan 8, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
43ca6ef
receive new pooled transaction hashes message
Arkenan Nov 28, 2024
2d300e5
Merge remote-tracking branch 'origin/main' into new-pooled-transactio…
Arkenan Nov 28, 2024
c2474e8
Add pooled transactions messages roundtrip
Arkenan Dec 2, 2024
0b1e9c8
fix linter errors
Arkenan Dec 2, 2024
d3b503b
Fix new pooled hashes decoding error, add new pooled txs hive test to ci
Arkenan Dec 4, 2024
2d25ba4
merge with main
Arkenan Dec 4, 2024
13b7ea6
bump github versions to comply with linter
Arkenan Dec 4, 2024
dc6b54f
Merge remote-tracking branch 'origin/main' into new-pooled-transactio…
Arkenan Dec 4, 2024
eaf4192
Merge remote-tracking branch 'origin/main' into new-pooled-transactio…
Arkenan Dec 5, 2024
bdffc38
merge with main, erase integration yaml
Arkenan Dec 5, 2024
4ae8b37
readd pooled tx test
Arkenan Dec 5, 2024
a3d9b38
address comments
Arkenan Dec 5, 2024
4ef8b85
Merge remote-tracking branch 'origin/main' into new-pooled-transactio…
Arkenan Dec 9, 2024
f6e4f12
Add handling of GetPooledTransaction message
Arkenan Dec 9, 2024
ee5b184
wip: move p2p transactions type to common
Arkenan Dec 10, 2024
de923b0
wip, p2p transaction to reg transaction
Arkenan Dec 11, 2024
153fbe8
compiling
Arkenan Dec 11, 2024
6b02dee
make clippy happy
Arkenan Dec 11, 2024
c46cdb2
Add new passing test
Arkenan Dec 11, 2024
2974580
Debugging large tx issue
Arkenan Dec 11, 2024
b5d1a12
improve messages
Arkenan Dec 19, 2024
d8d78e5
Remove transactions limit on decode
Arkenan Jan 3, 2025
755e644
Add LargeTxRequest passing hive test
Arkenan Jan 3, 2025
b497a87
remove debug messages
Arkenan Jan 3, 2025
2e49bc2
make clippy happy
Arkenan Jan 3, 2025
a2236ac
merge with main
Arkenan Jan 3, 2025
856de5f
merge with main
Arkenan Jan 3, 2025
675a395
Merge branch 'main' into larget_tx_request
Arkenan Jan 3, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci_l1.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ jobs:
test_pattern: /AccountRange|StorageRanges|ByteCodes|TrieNodes
- name: "Devp2p eth tests"
simulation: devp2p
test_pattern: eth/Status|GetBlockHeaders|SimultaneousRequests|SameRequestID|ZeroRequestID|GetBlockBodies|MaliciousHandshake|MaliciousStatus|Transaction|InvalidTxs|NewPooledTxs|GetBlockReceipts|BlobViolations
test_pattern: eth/Status|GetBlockHeaders|SimultaneousRequests|SameRequestID|ZeroRequestID|GetBlockBodies|MaliciousHandshake|MaliciousStatus|Transaction|InvalidTxs|NewPooledTxs|GetBlockReceipts|BlobViolations|LargeTxRequest
- name: "Engine Auth and EC tests"
simulation: ethereum/engine
test_pattern: engine-(auth|exchange-capabilities)/
Expand Down
2 changes: 2 additions & 0 deletions crates/common/types/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ impl RLPEncode for P2PTransaction {
impl RLPDecode for P2PTransaction {
fn decode_unfinished(rlp: &[u8]) -> Result<(Self, &[u8]), RLPDecodeError> {
if is_encoded_as_bytes(rlp)? {
// Adjust the encoding to get the payload
let payload = get_rlp_bytes_item_payload(rlp)?;
let tx_type = payload.first().ok_or(RLPDecodeError::InvalidLength)?;
let tx_encoding = &payload.get(1..).ok_or(RLPDecodeError::InvalidLength)?;
Expand All @@ -93,6 +94,7 @@ impl RLPDecode for P2PTransaction {
// EIP4844
0x3 => WrappedEIP4844Transaction::decode_unfinished(tx_encoding)
.map(|(tx, rem)| (P2PTransaction::EIP4844TransactionWithBlobs(tx), rem)),

// PriviligedL2
0x7e => PrivilegedL2Transaction::decode_unfinished(tx_encoding)
.map(|(tx, rem)| (P2PTransaction::PrivilegedL2Transaction(tx), rem)),
Expand Down
25 changes: 15 additions & 10 deletions crates/networking/p2p/rlpx/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,13 +185,13 @@ impl<S: AsyncWrite + AsyncRead + std::marker::Unpin> RLPxConnection<S> {
reason: self.match_disconnect_reason(&error),
}))
.await
.unwrap_or_else(|e| debug!("Could not send Disconnect message: ({e})"));
.unwrap_or_else(|e| error!("Could not send Disconnect message: ({e})."));
if let Ok(node_id) = self.get_remote_node_id() {
// Discard peer from kademlia table
debug!("{error_text}: ({error}), discarding peer {node_id}");
error!("{error_text}: ({error}), discarding peer {node_id}");
table.lock().await.replace_peer(node_id);
} else {
debug!("{error_text}: ({error}), unknown peer")
error!("{error_text}: ({error}), unknown peer")
}
}

Expand Down Expand Up @@ -345,15 +345,13 @@ impl<S: AsyncWrite + AsyncRead + std::marker::Unpin> RLPxConnection<S> {
return Err(RLPxError::Disconnect());
}
Message::Ping(_) => {
debug!("Received Ping");
self.send(Message::Pong(PongMessage {})).await?;
debug!("Pong sent");
}
Message::Pong(_) => {
// We ignore received Pong messages
}
Message::Status(msg_data) if !peer_supports_eth => {
debug!("Received Status");
backend::validate_status(msg_data, &self.storage)?
}
Message::GetAccountRange(req) => {
Expand Down Expand Up @@ -608,18 +606,25 @@ impl<S: AsyncWrite + AsyncRead + std::marker::Unpin> RLPxConnection<S> {
let mut buf = vec![0; MAX_DISC_PACKET_SIZE];

// Read the message's size
self.stream
.read_exact(&mut buf[..2])
.await
.map_err(|_| RLPxError::ConnectionError("Connection dropped".to_string()))?;
self.stream.read_exact(&mut buf[..2]).await.map_err(|e| {
RLPxError::ConnectionError(format!(
"Connection dropped. Failed to read handshake message size: {}",
e
))
})?;
let ack_data = [buf[0], buf[1]];
let msg_size = u16::from_be_bytes(ack_data) as usize;

// Read the rest of the message
self.stream
.read_exact(&mut buf[2..msg_size + 2])
.await
.map_err(|_| RLPxError::ConnectionError("Connection dropped".to_string()))?;
.map_err(|e| {
RLPxError::ConnectionError(format!(
"Connection dropped. Failed to read the rest of the handshake message: {}.",
e
))
})?;
let ack_bytes = &buf[..msg_size + 2];
Ok(ack_bytes.to_vec())
}
Expand Down
12 changes: 3 additions & 9 deletions crates/networking/p2p/rlpx/eth/transactions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,6 @@ use crate::rlpx::{
pub(crate) struct Transactions {
pub(crate) transactions: Vec<Transaction>,
}
// TODO(#1132): Also limit transactions by message byte-size.
// Limit taken from here: https://github.com/ethereum/go-ethereum/blob/df182a742cec68adcc034d4747afa5182fc75ca3/eth/fetcher/tx_fetcher.go#L49
pub const TRANSACTION_LIMIT: usize = 256;

impl Transactions {
pub fn new(transactions: Vec<Transaction>) -> Self {
Expand Down Expand Up @@ -55,12 +52,8 @@ impl RLPxMessage for Transactions {
// or so it seems.
while let Ok((tx, updated_decoder)) = decoder.decode_field::<Transaction>("p2p transaction")
{
if transactions.len() > TRANSACTION_LIMIT {
break;
} else {
decoder = updated_decoder;
transactions.push(tx);
}
decoder = updated_decoder;
transactions.push(tx);
}
Ok(Self::new(transactions))
}
Expand Down Expand Up @@ -256,6 +249,7 @@ impl PooledTransactions {
}

/// Saves every incoming pooled transaction to the mempool.

pub fn handle(self, store: &Store) -> Result<(), MempoolError> {
for tx in self.pooled_transactions {
if let P2PTransaction::EIP4844TransactionWithBlobs(itx) = tx {
Expand Down
20 changes: 12 additions & 8 deletions crates/networking/p2p/rlpx/frame.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,10 +91,12 @@ pub(crate) async fn read<S: AsyncRead + std::marker::Unpin>(

// Receive the message's frame header
let mut frame_header = [0; 32];
stream
.read_exact(&mut frame_header)
.await
.map_err(|_| RLPxError::ConnectionError("Connection dropped".to_string()))?;
stream.read_exact(&mut frame_header).await.map_err(|e| {
RLPxError::ConnectionError(format!(
"Connection dropped. Failed to read message frame header: {}",
e
))
})?;
// Both are padded to the block's size (16 bytes)
let (header_ciphertext, header_mac) = frame_header.split_at_mut(16);

Expand Down Expand Up @@ -138,10 +140,12 @@ pub(crate) async fn read<S: AsyncRead + std::marker::Unpin>(
// Receive the hello message
let padded_size = frame_size.next_multiple_of(16);
let mut frame_data = vec![0; padded_size + 16];
stream
.read_exact(&mut frame_data)
.await
.map_err(|_| RLPxError::ConnectionError("Connection dropped".to_string()))?;
stream.read_exact(&mut frame_data).await.map_err(|e| {
RLPxError::ConnectionError(format!(
"Connection dropped. Failed to read the hello message: {}",
e
))
})?;
let (frame_ciphertext, frame_mac) = frame_data.split_at_mut(padded_size);

// check MAC
Expand Down
Loading