Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: batch transactions import #1349

Merged
merged 103 commits into from
Oct 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
103 commits
Select commit Hold shift + click to select a range
6ff5c66
Update dependent costs in schema
bvrooman Aug 29, 2023
fc28088
Fix whitespace
bvrooman Aug 29, 2023
1801d8a
Update CHANGELOG.md
bvrooman Aug 29, 2023
1fc2c59
Merge branch 'master' into bvrooman/fix/update-dependent-cost-schema
bvrooman Aug 31, 2023
d5908e0
Update CHANGELOG.md
bvrooman Aug 31, 2023
a3cd09c
Revert "Update CHANGELOG.md"
bvrooman Aug 31, 2023
c51fb6f
Merge branch 'master' into bvrooman/fix/update-dependent-cost-schema
Sep 5, 2023
1ce767a
Refactoring
bvrooman Sep 5, 2023
0869dec
WIP
bvrooman Sep 6, 2023
250e19c
Migrate to canonical serialization
bvrooman Sep 6, 2023
5c4b85b
Migrate to canonical serialization
bvrooman Sep 6, 2023
0f2b81f
Update snapshots
bvrooman Sep 6, 2023
d6fcfac
Update chainspecs with dummy costs
bvrooman Sep 6, 2023
052fcf2
Merge branch 'bvrooman/fix/update-dependent-cost-schema' into bvrooma…
bvrooman Sep 6, 2023
b913c45
Merge branch 'master' into bvrooman/feat/batch-transactions-import
bvrooman Sep 12, 2023
5edbf53
WIP
bvrooman Sep 13, 2023
e33c070
WIP
bvrooman Sep 13, 2023
e47349c
WIP
bvrooman Sep 14, 2023
1722699
WIP
bvrooman Sep 14, 2023
f96ea14
WIP
bvrooman Sep 15, 2023
a010f4f
Merge branch 'master' into bvrooman/feat/batch-transactions-import
Sep 15, 2023
f5e695f
WIP
bvrooman Sep 15, 2023
347b15e
Update tests.rs
bvrooman Sep 15, 2023
d103e0f
WIP Tests
bvrooman Sep 15, 2023
2e4bdeb
WIP
bvrooman Sep 15, 2023
b0e495a
WIP
bvrooman Sep 16, 2023
0fd4500
WIP
bvrooman Sep 16, 2023
6bb319e
Update import.rs
bvrooman Sep 16, 2023
95764ba
WIP
bvrooman Sep 16, 2023
7bfa573
Update tests.rs
bvrooman Sep 16, 2023
b4266bb
WIP
bvrooman Sep 16, 2023
84001cc
WIP
bvrooman Sep 16, 2023
cec91d8
Update import.rs
bvrooman Sep 16, 2023
f58defe
Update tests.rs
bvrooman Sep 16, 2023
00a682f
WIP
bvrooman Sep 18, 2023
80ab2dc
WIP
bvrooman Sep 19, 2023
90c87b0
WIP
bvrooman Sep 19, 2023
02b5a00
WIP
bvrooman Sep 19, 2023
433c033
WIP
bvrooman Sep 20, 2023
8203611
WIP
bvrooman Sep 20, 2023
6115eb2
All tests passing
bvrooman Sep 20, 2023
2865aed
Merge branch 'master' into bvrooman/feat/batch-transactions-import
bvrooman Sep 20, 2023
7723dbb
Merge branch 'master' into bvrooman/feat/batch-transactions-import
Sep 20, 2023
3ab36c5
Update CHANGELOG.md
bvrooman Sep 20, 2023
620e215
Merge branch 'master' into bvrooman/feat/batch-transactions-import
Sep 20, 2023
63abe76
Update
bvrooman Sep 20, 2023
62e4f78
Merge branch 'bvrooman/feat/batch-transactions-import' of https://git…
bvrooman Sep 20, 2023
d3b517f
Clean up
bvrooman Sep 20, 2023
be2c6e2
WIP
bvrooman Sep 21, 2023
ae2bbc7
Update import.rs
bvrooman Sep 21, 2023
a2bdcff
WIP
bvrooman Sep 23, 2023
8f17f5c
WIP
bvrooman Sep 23, 2023
117da4a
WIP
bvrooman Sep 23, 2023
32b4baf
Rename TransactionData to Transactions
bvrooman Sep 23, 2023
713ce61
Merge branch 'master' into bvrooman/feat/batch-transactions-import
Sep 23, 2023
497e48c
Update service.rs
bvrooman Sep 23, 2023
79573f3
Minor
bvrooman Sep 23, 2023
69df941
Clippy
bvrooman Sep 23, 2023
76b475e
Update sync.rs
bvrooman Sep 23, 2023
4c129d3
Fix import tests
bvrooman Sep 24, 2023
7ee4304
Tests WIP
bvrooman Sep 24, 2023
4db827d
Serialization WIP
bvrooman Sep 25, 2023
d377593
Update postcard.rs
bvrooman Sep 25, 2023
6483a32
Integration tests passing
bvrooman Sep 25, 2023
5c72d46
Use range instead of blockids
bvrooman Sep 25, 2023
cc3426d
Merge branch 'master' into bvrooman/feat/batch-transactions-import
bvrooman Sep 25, 2023
94fd510
Update sealed_block.rs
bvrooman Sep 25, 2023
1c312b2
Merge branch 'master' into bvrooman/feat/batch-transactions-import
Sep 26, 2023
428b712
Remove dbg
bvrooman Sep 26, 2023
6d65b0c
Remove dbg
bvrooman Sep 26, 2023
5dceeec
References instead of clone
bvrooman Sep 27, 2023
835f823
Something
xgreenx Sep 27, 2023
d002c53
Batch
bvrooman Sep 28, 2023
90e4ab6
Clippy says implicit lifetimes
bvrooman Sep 28, 2023
7e2eef0
Clean up
bvrooman Sep 28, 2023
8a7a65a
More clippy
bvrooman Sep 28, 2023
721a098
Minor
bvrooman Sep 28, 2023
33e76fd
Merge branch 'master' into bvrooman/feat/batch-transactions-import
Sep 28, 2023
72ef01e
Minor
bvrooman Sep 28, 2023
61bf616
Merge branch 'bvrooman/feat/batch-transactions-import' of https://git…
bvrooman Sep 28, 2023
f3d7a3b
Minor
bvrooman Sep 28, 2023
5798cbc
Instrument
bvrooman Sep 28, 2023
af2d696
Import tests passing
bvrooman Sep 29, 2023
aa63098
All tests passing
bvrooman Sep 29, 2023
3a656d3
Clean up
bvrooman Sep 29, 2023
cfd3006
Restore random peer id from header request
bvrooman Sep 29, 2023
c0b4d1b
WIP
bvrooman Sep 29, 2023
d782a8a
Clean up
bvrooman Sep 29, 2023
c49c1c9
Clean up
bvrooman Sep 29, 2023
45b8376
Clean up
bvrooman Sep 29, 2023
e60a1cd
Clean up
bvrooman Sep 29, 2023
895a7c9
Clean up
bvrooman Sep 29, 2023
bde0022
Update test
bvrooman Sep 29, 2023
b6c8241
Rename
bvrooman Sep 29, 2023
07962aa
Clean up
bvrooman Sep 29, 2023
4ed6aef
Remove redundant test
bvrooman Sep 29, 2023
0a035ed
Parameterize tests + additional test
bvrooman Sep 29, 2023
59cc41c
Fix comment
bvrooman Sep 29, 2023
d5167be
Fix
bvrooman Sep 29, 2023
09bc753
Remove `Result` from streams and use `Batch::is_err` (#1391)
xgreenx Oct 2, 2023
ad477ca
Merge branch 'master' into bvrooman/feat/batch-transactions-import
xgreenx Oct 2, 2023
be48776
Merge branch 'master' into bvrooman/feat/batch-transactions-import
xgreenx Oct 2, 2023
4e40e15
Review feedback
bvrooman Oct 2, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ Description of the upcoming release here.

### Changed

- [#1349](https://github.com/FuelLabs/fuel-core/pull/1349): Updated peer-to-peer transactions API to support multiple blocks in a single request, and updated block synchronization to request multiple blocks based on the configured range of headers.
- [#1380](https://github.com/FuelLabs/fuel-core/pull/1380): Add preliminary, hard-coded config values for heartbeat peer reputation, removing `todo`.
- [#1377](https://github.com/FuelLabs/fuel-core/pull/1377): Remove `DiscoveryEvent` and use `KademliaEvent` directly in `DiscoveryBehavior`.
- [#1366](https://github.com/FuelLabs/fuel-core/pull/1366): Improve caching during docker builds in CI by replacing gha
Expand Down
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ anyhow = "1.0"
async-trait = "0.1"
cynic = { version = "2.2.1", features = ["http-reqwest"] }
clap = "4.1"
derive_more = { version = "0.99" }
hyper = { version = "0.14.26" }
rand = "0.8"
parking_lot = "0.12"
Expand Down
2 changes: 1 addition & 1 deletion crates/client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ description = "Tx client and schema specification."
[dependencies]
anyhow = { workspace = true }
cynic = { workspace = true }
derive_more = { version = "0.99" }
derive_more = { workspace = true }
eventsource-client = { version = "0.10.2", optional = true }
fuel-core-types = { workspace = true, features = ["serde"] }
futures = { workspace = true, optional = true }
Expand Down
23 changes: 16 additions & 7 deletions crates/fuel-core/src/database/sealed_block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ use fuel_core_types::{
SealedBlock,
SealedBlockHeader,
},
fuel_tx::Transaction,
fuel_types::BlockHeight,
services::p2p::Transactions,
};
use std::ops::Range;

Expand Down Expand Up @@ -127,12 +127,21 @@ impl Database {
}
}

pub fn get_transactions_on_block(
pub fn get_transactions_on_blocks(
&self,
block_id: &BlockId,
) -> StorageResult<Option<Vec<Transaction>>> {
Ok(self
.get_sealed_block_by_id(block_id)?
.map(|Sealed { entity: block, .. }| block.into_inner().1))
block_height_range: Range<u32>,
) -> StorageResult<Option<Vec<Transactions>>> {
let transactions = block_height_range
.into_iter()
.map(BlockHeight::from)
.map(|block_height| {
let transactions = self
.get_sealed_block_by_height(&block_height)?
.map(|Sealed { entity: block, .. }| block.into_inner().1)
.map(Transactions);
Ok(transactions)
})
.collect::<StorageResult<_>>()?;
Ok(transactions)
}
}
9 changes: 4 additions & 5 deletions crates/fuel-core/src/service/adapters/p2p.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,11 @@ use fuel_core_services::stream::BoxStream;
use fuel_core_storage::Result as StorageResult;
use fuel_core_types::{
blockchain::{
primitives::BlockId,
SealedBlock,
SealedBlockHeader,
},
fuel_tx::Transaction,
fuel_types::BlockHeight,
services::p2p::Transactions,
};
use std::ops::Range;

Expand Down Expand Up @@ -41,9 +40,9 @@ impl P2pDb for Database {

fn get_transactions(
&self,
block_id: &BlockId,
) -> StorageResult<Option<Vec<Transaction>>> {
self.get_transactions_on_block(block_id)
block_height_range: Range<u32>,
) -> StorageResult<Option<Vec<Transactions>>> {
self.get_transactions_on_blocks(block_height_range)
}
}

Expand Down
43 changes: 19 additions & 24 deletions crates/fuel-core/src/service/adapters/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,10 @@ use fuel_core_sync::ports::{
};
use fuel_core_types::{
blockchain::{
primitives::{
BlockId,
DaBlockHeight,
},
primitives::DaBlockHeight,
SealedBlock,
SealedBlockHeader,
},
fuel_tx::Transaction,
fuel_types::BlockHeight,
services::p2p::{
peer_reputation::{
Expand All @@ -28,6 +24,7 @@ use fuel_core_types::{
},
PeerId,
SourcePeer,
Transactions,
},
};
use std::ops::Range;
Expand All @@ -50,43 +47,41 @@ impl PeerToPeerPort for P2PAdapter {

async fn get_sealed_block_headers(
&self,
block_range_height: Range<u32>,
block_height_range: Range<u32>,
) -> anyhow::Result<SourcePeer<Option<Vec<SealedBlockHeader>>>> {
if let Some(service) = &self.service {
let (peer_id, headers) =
service.get_sealed_block_headers(block_range_height).await?;
let sourced_headers = SourcePeer {
peer_id: peer_id.into(),
data: headers,
};
Ok(sourced_headers)
let result = if let Some(service) = &self.service {
service.get_sealed_block_headers(block_height_range).await
} else {
Err(anyhow::anyhow!("No P2P service available"))
};
match result {
Ok((peer_id, headers)) => {
let peer_id: PeerId = peer_id.into();
let headers = peer_id.bind(headers);
Ok(headers)
}
Err(err) => Err(err),
}
}

async fn get_transactions(
&self,
block: SourcePeer<BlockId>,
) -> anyhow::Result<Option<Vec<Transaction>>> {
range: SourcePeer<Range<u32>>,
) -> anyhow::Result<Option<Vec<Transactions>>> {
let SourcePeer {
peer_id,
data: block,
} = block;
data: range,
} = range;
if let Some(service) = &self.service {
service
.get_transactions_from_peer(peer_id.into(), block)
.get_transactions_from_peer(peer_id.into(), range)
.await
} else {
Err(anyhow::anyhow!("No P2P service available"))
}
}

async fn report_peer(
&self,
peer: PeerId,
report: PeerReportReason,
) -> anyhow::Result<()> {
fn report_peer(&self, peer: PeerId, report: PeerReportReason) -> anyhow::Result<()> {
if let Some(service) = &self.service {
let service_name = "Sync";
let new_report = self.process_report(report);
Expand Down
5 changes: 2 additions & 3 deletions crates/services/p2p/src/codecs/postcard.rs
Original file line number Diff line number Diff line change
Expand Up @@ -274,13 +274,12 @@ impl ProtocolName for MessageExchangePostcardProtocol {

#[cfg(test)]
mod tests {
use fuel_core_types::blockchain::primitives::BlockId;

use super::*;

#[test]
fn test_request_size_fits() {
let m = RequestMessage::Transactions(BlockId::default());
let arbitrary_range = 2..6;
let m = RequestMessage::Transactions(arbitrary_range);
assert!(postcard::to_stdvec(&m).unwrap().len() <= MAX_REQUEST_SIZE);
}
}
23 changes: 14 additions & 9 deletions crates/services/p2p/src/p2p_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -696,15 +696,17 @@ mod tests {
BlockHeader,
PartialBlockHeader,
},
primitives::BlockId,
SealedBlock,
SealedBlockHeader,
},
fuel_tx::{
Transaction,
TransactionBuilder,
},
services::p2p::GossipsubMessageAcceptance,
services::p2p::{
GossipsubMessageAcceptance,
Transactions,
},
};
use futures::{
future::join_all,
Expand Down Expand Up @@ -1557,7 +1559,7 @@ mod tests {
tokio::select! {
message_sent = rx_test_end.recv() => {
// we received a signal to end the test
assert!(message_sent.unwrap(), "Receuved incorrect or missing missing messsage");
assert!(message_sent.unwrap(), "Received incorrect or missing message");
break;
}
node_a_event = node_a.next_event() => {
Expand Down Expand Up @@ -1604,7 +1606,7 @@ mod tests {
}
});
}
RequestMessage::Transactions(_) => {
RequestMessage::Transactions(_range) => {
let (tx_orchestrator, rx_orchestrator) = oneshot::channel();
assert!(node_a.send_request_msg(None, request_msg.clone(), ResponseChannelItem::Transactions(tx_orchestrator)).is_ok());
let tx_test_end = tx_test_end.clone();
Expand All @@ -1613,7 +1615,8 @@ mod tests {
let response_message = rx_orchestrator.await;

if let Ok(Some(transactions)) = response_message {
let _ = tx_test_end.send(transactions.len() == 5).await;
let check = transactions.len() == 1 && transactions[0].0.len() == 5;
let _ = tx_test_end.send(check).await;
} else {
tracing::error!("Orchestrator failed to receive a message: {:?}", response_message);
let _ = tx_test_end.send(false).await;
Expand Down Expand Up @@ -1647,7 +1650,8 @@ mod tests {
let _ = node_b.send_response_msg(*request_id, OutboundResponse::SealedHeaders(Some(sealed_headers)));
}
RequestMessage::Transactions(_) => {
let transactions = (0..5).map(|_| Transaction::default_test_tx()).collect();
let txs = (0..5).map(|_| Transaction::default_test_tx()).collect();
let transactions = vec![Transactions(txs)];
let _ = node_b.send_response_msg(*request_id, OutboundResponse::Transactions(Some(Arc::new(transactions))));
}
}
Expand All @@ -1662,8 +1666,8 @@ mod tests {
#[tokio::test]
#[instrument]
async fn request_response_works_with_transactions() {
request_response_works_with(RequestMessage::Transactions(BlockId::default()))
.await
let arbitrary_range = 2..6;
request_response_works_with(RequestMessage::Transactions(arbitrary_range)).await
}

#[tokio::test]
Expand All @@ -1675,7 +1679,8 @@ mod tests {
#[tokio::test]
#[instrument]
async fn request_response_works_with_sealed_headers_range_inclusive() {
request_response_works_with(RequestMessage::SealedHeaders(2..6)).await
let arbitrary_range = 2..6;
request_response_works_with(RequestMessage::SealedHeaders(arbitrary_range)).await
}

#[tokio::test]
Expand Down
7 changes: 3 additions & 4 deletions crates/services/p2p/src/ports.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,11 @@ use fuel_core_services::stream::BoxStream;
use fuel_core_storage::Result as StorageResult;
use fuel_core_types::{
blockchain::{
primitives::BlockId,
SealedBlock,
SealedBlockHeader,
},
fuel_tx::Transaction,
fuel_types::BlockHeight,
services::p2p::Transactions,
};
use std::ops::Range;

Expand All @@ -29,8 +28,8 @@ pub trait P2pDb: Send + Sync {

fn get_transactions(
&self,
block_id: &BlockId,
) -> StorageResult<Option<Vec<Transaction>>>;
block_height_range: Range<u32>,
) -> StorageResult<Option<Vec<Transactions>>>;
}

pub trait BlockHeightImporter: Send + Sync {
Expand Down
18 changes: 6 additions & 12 deletions crates/services/p2p/src/request_response/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,22 +5,17 @@ use std::{

use fuel_core_types::{
blockchain::{
primitives::BlockId,
SealedBlock,
SealedBlockHeader,
},
fuel_tx::Transaction,
fuel_types::BlockHeight,
services::p2p::Transactions,
};
use libp2p::PeerId;
use serde::{
Deserialize,
Serialize,
};
use serde_with::{
serde_as,
FromInto,
};
use thiserror::Error;
use tokio::sync::oneshot;

Expand All @@ -34,33 +29,32 @@ pub(crate) const MAX_REQUEST_SIZE: usize = core::mem::size_of::<RequestMessage>(
// This `OutboundResponse` gets prepared to be sent over the wire in `NetworkResponse` format.
// The Peer that requested the message receives the response over the wire in `NetworkResponse` format.
// It then unpacks it into `ResponseMessage`.
// `ResponseChannelItem` is used to forward the data within `ResponseMessage` to the receving channel.
// `ResponseChannelItem` is used to forward the data within `ResponseMessage` to the receiving channel.
// Client Peer: `RequestMessage` (send request)
// Server Peer: `RequestMessage` (receive request) -> `OutboundResponse` -> `NetworkResponse` (send response)
// Client Peer: `NetworkResponse` (receive response) -> `ResponseMessage(data)` -> `ResponseChannelItem(channel, data)` (handle response)

#[serde_as]
#[derive(Serialize, Deserialize, Eq, PartialEq, Debug, Clone)]
pub enum RequestMessage {
Block(BlockHeight),
SealedHeaders(Range<u32>),
Transactions(#[serde_as(as = "FromInto<[u8; 32]>")] BlockId),
Transactions(Range<u32>),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Idk if this is a helpful question, but why aren't we just getting the blocks directly rather than the headers + txs separately in the import code? I can't think of what requirement we have to do them separate, but very possible I'm just forgetting something.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The block itself can be very huge while being invalid at the same time because the header is invalid(the block producer or consensus did not sign it).

Requesting headers first ensures that at least this header(and a related block to it) was created according to the rules. It helps to identify potential forks and prevent DoS vectors. Punishment for sending the wrong header should be less than sending an invalid transaction related to this header because, in the second case, it is a malicious move, while invalid headers are not so problematic or may be related to the fork or misconfiguration.

Copy link
Contributor Author

@bvrooman bvrooman Oct 2, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's a fair question. During the import, we require a few checks for data integrity. Some of those include:

  • The signature of a sealed block header must be valid
  • A Header must have the expected height (as per the requested range)
  • The Merkle root formed by the downloaded transactions must match the root in the header

If any of these checks fail for a given header, then the block is invalid, and we don't want to execute it. A failure could mean that the data is corrupt or the peer is acting maliciously.

If we didn't need any of these checks, it is conceivable that we could execute and commit a block immediately. In that case, we would just download the block for execution. But these checks mitigate the consequences of adversarial behaviour.

Let me know if that answers your question satisfactorily.

}

/// Final Response Message that p2p service sends to the Orchestrator
#[derive(Serialize, Deserialize, Debug, Clone)]
pub enum ResponseMessage {
SealedBlock(Box<Option<SealedBlock>>),
SealedHeaders(Option<Vec<SealedBlockHeader>>),
Transactions(Option<Vec<Transaction>>),
Transactions(Option<Vec<Transactions>>),
}

/// Holds oneshot channels for specific responses
#[derive(Debug)]
pub enum ResponseChannelItem {
Block(oneshot::Sender<Option<SealedBlock>>),
SealedHeaders(oneshot::Sender<(PeerId, Option<Vec<SealedBlockHeader>>)>),
Transactions(oneshot::Sender<Option<Vec<Transaction>>>),
Transactions(oneshot::Sender<Option<Vec<Transactions>>>),
}

/// Response that is sent over the wire
Expand All @@ -78,7 +72,7 @@ pub enum NetworkResponse {
pub enum OutboundResponse {
Block(Option<Arc<SealedBlock>>),
SealedHeaders(Option<Vec<SealedBlockHeader>>),
Transactions(Option<Arc<Vec<Transaction>>>),
Transactions(Option<Arc<Vec<Transactions>>>),
}

#[derive(Debug, Error)]
Expand Down
Loading
Loading