From 2301e1529a0edc213cb8ca7d594ff57287aa8f3e Mon Sep 17 00:00:00 2001 From: Tom Date: Fri, 20 Jan 2023 10:30:49 +1100 Subject: [PATCH] fuel-core-sync (#889) - Closes #881 - Closes #882 - Closes #880 Co-authored-by: green --- .gitignore | 3 +- Cargo.lock | 11 +- crates/services/Cargo.toml | 9 +- crates/services/src/lib.rs | 14 + crates/services/src/service.rs | 23 + crates/services/src/state.rs | 4 + crates/services/sync/Cargo.toml | 17 +- crates/services/sync/src/config.rs | 2 - crates/services/sync/src/import.rs | 363 +++++++++++ .../sync/src/import/back_pressure_tests.rs | 273 ++++++++ crates/services/sync/src/import/tests.rs | 605 ++++++++++++++++++ crates/services/sync/src/lib.rs | 11 +- crates/services/sync/src/ports.rs | 59 ++ crates/services/sync/src/service.rs | 221 +++++-- crates/services/sync/src/service/tests.rs | 70 ++ crates/services/sync/src/state.rs | 212 ++++++ crates/services/sync/src/state/test.rs | 125 ++++ crates/services/sync/src/sync.rs | 62 ++ crates/services/sync/src/sync/tests.rs | 24 + crates/types/src/blockchain/block.rs | 17 + crates/types/src/blockchain/header.rs | 23 +- crates/types/src/services/p2p.rs | 25 + 22 files changed, 2108 insertions(+), 65 deletions(-) delete mode 100644 crates/services/sync/src/config.rs create mode 100644 crates/services/sync/src/import.rs create mode 100644 crates/services/sync/src/import/back_pressure_tests.rs create mode 100644 crates/services/sync/src/import/tests.rs create mode 100644 crates/services/sync/src/ports.rs create mode 100644 crates/services/sync/src/service/tests.rs create mode 100644 crates/services/sync/src/state.rs create mode 100644 crates/services/sync/src/state/test.rs create mode 100644 crates/services/sync/src/sync.rs create mode 100644 crates/services/sync/src/sync/tests.rs diff --git a/.gitignore b/.gitignore index 36d7b444ad6..1dec540d0e1 100644 --- a/.gitignore +++ b/.gitignore @@ -6,4 +6,5 @@ .vscode .cov lcov.info -version-compatibility/Cargo.lock \ No newline at end of file +version-compatibility/Cargo.lock +benches/benches-outputs/Cargo.lock \ No newline at end of file diff --git a/Cargo.lock b/Cargo.lock index 2480879f6e5..505612b0cb0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -380,9 +380,9 @@ dependencies = [ [[package]] name = "async-trait" -version = "0.1.59" +version = "0.1.60" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "31e6e93155431f3931513b243d371981bb2770112b370c82745a1d19d2f99364" +checksum = "677d1d8ab452a3936018a687b20e6f7cf5363d713b732b8884001317b0e48aa3" dependencies = [ "proc-macro2", "quote", @@ -2484,6 +2484,7 @@ dependencies = [ "async-trait", "futures", "mockall", + "parking_lot 0.12.1", "tokio", "tracing", ] @@ -2503,8 +2504,12 @@ name = "fuel-core-sync" version = "0.15.1" dependencies = [ "anyhow", + "async-trait", + "fuel-core-services", "fuel-core-types", - "parking_lot 0.12.1", + "futures", + "mockall", + "test-case", "tokio", ] diff --git a/crates/services/Cargo.toml b/crates/services/Cargo.toml index d7b8d83e147..11899b3eea5 100644 --- a/crates/services/Cargo.toml +++ b/crates/services/Cargo.toml @@ -1,18 +1,19 @@ [package] -name = "fuel-core-services" -version = "0.15.1" authors = ["Fuel Labs "] +description = "The common code for fuel core services." edition = "2021" homepage = "https://fuel.network/" -keywords = ["blockchain", "fuel", "consensus", "bft"] +keywords = ["bft", "blockchain", "consensus", "fuel"] license = "BUSL-1.1" +name = "fuel-core-services" repository = "https://github.com/FuelLabs/fuel-core" -description = "The common code for fuel core services." +version = "0.15.1" [dependencies] anyhow = "1.0" async-trait = "0.1" futures = "0.3" +parking_lot = "0.12" tokio = { version = "1.21", features = ["full"] } tracing = "0.1" diff --git a/crates/services/src/lib.rs b/crates/services/src/lib.rs index 42dbbcaf897..f5ae078d522 100644 --- a/crates/services/src/lib.rs +++ b/crates/services/src/lib.rs @@ -17,6 +17,19 @@ pub mod stream { /// A Send + Sync BoxStream pub type BoxStream = core::pin::Pin + Send + Sync + 'static>>; + + /// Helper trait to create a BoxStream from a Stream + pub trait IntoBoxStream: Stream { + /// Convert this stream into a BoxStream. + fn into_boxed(self) -> BoxStream + where + Self: Sized + Send + Sync + 'static, + { + Box::pin(self) + } + } + + impl IntoBoxStream for S where S: Stream + Send + Sync + 'static {} } pub use service::{ @@ -26,6 +39,7 @@ pub use service::{ Service, ServiceRunner, Shared, + SharedMutex, }; pub use state::{ State, diff --git a/crates/services/src/service.rs b/crates/services/src/service.rs index 40e59a449c9..5d0920070f5 100644 --- a/crates/services/src/service.rs +++ b/crates/services/src/service.rs @@ -11,6 +11,16 @@ use tokio::{ /// Alias for Arc pub type Shared = std::sync::Arc; +/// A mutex that can safely be in async contexts and avoids deadlocks. +#[derive(Debug)] +pub struct SharedMutex(Shared>); + +impl Clone for SharedMutex { + fn clone(&self) -> Self { + Self(self.0.clone()) + } +} + /// Used if services have no asynchronously shared data #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub struct EmptyShared; @@ -290,6 +300,19 @@ where }) } +impl SharedMutex { + /// Creates a new `SharedMutex` with the given value. + pub fn new(t: T) -> Self { + Self(Shared::new(parking_lot::Mutex::new(t))) + } + + /// Apply a function to the inner value and return a value. + pub fn apply(&self, f: impl FnOnce(&mut T) -> R) -> R { + let mut t = self.0.lock(); + f(&mut t) + } +} + // TODO: Add tests #[cfg(test)] mod tests { diff --git a/crates/services/src/state.rs b/crates/services/src/state.rs index cd219ec9723..404de419d9d 100644 --- a/crates/services/src/state.rs +++ b/crates/services/src/state.rs @@ -76,6 +76,10 @@ impl StateWatcher { impl StateWatcher { /// Infinity loop while the state is `State::Started`. Returns the next received state. pub async fn while_started(&mut self) -> anyhow::Result { + let state = self.borrow().clone(); + if !state.started() { + return Ok(state) + } loop { self.changed().await?; diff --git a/crates/services/sync/Cargo.toml b/crates/services/sync/Cargo.toml index cf16e199c90..294c25840dc 100644 --- a/crates/services/sync/Cargo.toml +++ b/crates/services/sync/Cargo.toml @@ -1,16 +1,25 @@ [package] -name = "fuel-core-sync" -version = "0.15.1" authors = ["Fuel Labs "] +description = "Fuel Synchronizer" edition = "2021" homepage = "https://fuel.network/" keywords = ["blockchain", "fuel", "fuel-vm"] license = "BUSL-1.1" +name = "fuel-core-sync" repository = "https://github.com/FuelLabs/fuel-core" -description = "Fuel Synchronizer" +version = "0.15.1" [dependencies] anyhow = "1.0" +async-trait = "0.1.60" +fuel-core-services = { path = "../" } fuel-core-types = { path = "../../types", version = "0.15.1" } -parking_lot = "0.12" +futures = "0.3.25" tokio = { version = "1.21", features = ["full"] } + +[dev-dependencies] +fuel-core-types = { path = "../../types", features = [ + "test-helpers", +] } +mockall = "0.11.3" +test-case = "2.2.2" diff --git a/crates/services/sync/src/config.rs b/crates/services/sync/src/config.rs deleted file mode 100644 index 15dcc589b67..00000000000 --- a/crates/services/sync/src/config.rs +++ /dev/null @@ -1,2 +0,0 @@ -#[derive(Default, Clone, Debug)] -pub struct Config {} diff --git a/crates/services/sync/src/import.rs b/crates/services/sync/src/import.rs new file mode 100644 index 00000000000..e37a071871f --- /dev/null +++ b/crates/services/sync/src/import.rs @@ -0,0 +1,363 @@ +//! # Importer Task +//! This module contains the import task which is responsible for +//! importing blocks from the network into the local blockchain. + +use std::{ + ops::RangeInclusive, + sync::Arc, +}; + +use fuel_core_services::{ + SharedMutex, + StateWatcher, +}; +use fuel_core_types::{ + blockchain::{ + block::Block, + consensus::Sealed, + primitives::BlockHeight, + SealedBlock, + SealedBlockHeader, + }, + services::p2p::SourcePeer, +}; +use futures::{ + stream::{ + self, + StreamExt, + }, + Stream, +}; +use std::future::Future; +use tokio::sync::Notify; + +use crate::{ + ports::{ + BlockImporterPort, + ConsensusPort, + PeerToPeerPort, + }, + state::State, +}; + +#[cfg(test)] +pub(crate) use tests::empty_header; + +#[cfg(test)] +mod tests; + +#[cfg(test)] +mod back_pressure_tests; + +#[derive(Clone, Copy, Debug)] +/// Parameters for the import task. +pub struct Config { + /// The maximum number of get header requests to make in a single batch. + pub max_get_header_requests: usize, + /// The maximum number of get transaction requests to make in a single batch. + pub max_get_txns_requests: usize, +} + +pub(crate) struct Import { + /// Shared state between import and sync tasks. + state: SharedMutex, + /// Notify import when sync has new work. + notify: Arc, + /// Configuration parameters. + params: Config, + /// Network port. + p2p: Arc

, + /// Executor port. + executor: Arc, + /// Consensus port. + consensus: Arc, +} + +impl Import { + pub(crate) fn new( + state: SharedMutex, + notify: Arc, + params: Config, + p2p: Arc

, + executor: Arc, + consensus: Arc, + ) -> Self { + Self { + state, + notify, + params, + p2p, + executor, + consensus, + } + } +} +impl Import +where + P: PeerToPeerPort + Send + Sync + 'static, + E: BlockImporterPort + Send + Sync + 'static, + C: ConsensusPort + Send + Sync + 'static, +{ + pub(crate) async fn import( + &self, + shutdown: &mut StateWatcher, + ) -> anyhow::Result { + self.import_inner(shutdown).await?; + + Ok(wait_for_notify_or_shutdown(&self.notify, shutdown).await) + } + + async fn import_inner(&self, shutdown: &StateWatcher) -> anyhow::Result<()> { + // If there is a range to process, launch the stream. + if let Some(range) = self.state.apply(|s| s.process_range()) { + // Launch the stream to import the range. + let (count, result) = self.launch_stream(range.clone(), shutdown).await; + + // Get the size of the range. + let range_len = range.size_hint().0 as u32; + + // If we did not process the entire range, mark the failed heights as failed. + if (count as u32) < range_len { + let range = (*range.start() + count as u32)..=*range.end(); + self.state.apply(|s| s.failed_to_process(range)); + } + result?; + } + Ok(()) + } + + /// Launches a stream to import and execute a range of blocks. + /// + /// This stream will process all blocks up to the given range or + /// an error occurs. + /// If an error occurs, the preceding blocks still be processed + /// and the error will be returned. + async fn launch_stream( + &self, + range: RangeInclusive, + shutdown: &StateWatcher, + ) -> (usize, anyhow::Result<()>) { + let Self { + state, + params, + p2p, + executor, + consensus, + .. + } = &self; + // Request up to `max_get_header_requests` headers from the network. + get_header_range_buffered(range.clone(), params, p2p.clone()) + .map({ + let p2p = p2p.clone(); + let consensus_port = consensus.clone(); + move |result| { + let p2p = p2p.clone(); + let consensus_port = consensus_port.clone(); + async move { + // Short circuit on error. + let header = match result { + Ok(h) => h, + Err(e) => return Err(e), + }; + let SourcePeer { + peer_id, + data: header, + } = header; + let id = header.entity.id(); + let block_id = SourcePeer { peer_id, data: id }; + + // Check the consensus is valid on this header. + if !consensus_port.check_sealed_header(&header).await? { + return Ok(None) + } + let Sealed { + entity: header, + consensus, + } = header; + + // Request the transactions for this block. + Ok(p2p + .get_transactions(block_id) + .await? + .and_then(|transactions| { + Some(SealedBlock { + entity: Block::try_from_executed(header, transactions)?, + consensus, + }) + })) + } + } + }) + // Request up to `max_get_txns_requests` transactions from the network. + .buffered(params.max_get_txns_requests) + // Continue the stream unless an error or none occurs. + // Note the error will be returned but the stream will close. + .into_scan_none_or_err() + .scan_none_or_err() + // Continue the stream until the shutdown signal is received. + .take_until({ + let mut s = shutdown.clone(); + async move { s.while_started().await } + }) + .then({ + let state = state.clone(); + let executor = executor.clone(); + move |block| { + let state = state.clone(); + let executor = executor.clone(); + async move { + // Short circuit on error. + let block = match block { + Ok(b) => b, + Err(e) => return Err(e), + }; + + // Execute and commit the block. + let height = *block.entity.header().height(); + let r = executor.execute_and_commit(block).await; + + // If the block executed successfully, mark it as committed. + if r.is_ok() { + state.apply(|s| s.commit(*height)) + } + r + } + } + }) + // Continue the stream unless an error occurs. + .into_scan_err() + .scan_err() + // Count the number of successfully executed blocks and + // find any errors. + // Fold the stream into a count and any errors. + .fold((0usize, Ok(())), |(count, err), result| async move { + match result { + Ok(_) => (count + 1, err), + Err(e) => (count, Err(e)), + } + }) + .await + } +} + +/// Waits for a notify or shutdown signal. +/// Returns true if the notify signal was received. +async fn wait_for_notify_or_shutdown( + notify: &Notify, + shutdown: &mut StateWatcher, +) -> bool { + let n = notify.notified(); + let s = shutdown.while_started(); + futures::pin_mut!(n); + futures::pin_mut!(s); + + // Select the first signal to be received. + let r = futures::future::select(n, s).await; + + // Check if the notify signal was received. + matches!(r, futures::future::Either::Left(_)) +} + +/// Returns a stream of headers processing concurrently up to `max_get_header_requests`. +/// The headers are returned in order. +fn get_header_range_buffered( + range: RangeInclusive, + params: &Config, + p2p: Arc, +) -> impl Stream>> { + get_header_range(range, p2p) + .buffered(params.max_get_header_requests) + // Continue the stream unless an error or none occurs. + .into_scan_none_or_err() + .scan_none_or_err() +} + +/// Returns a stream of network requests for headers. +fn get_header_range( + range: RangeInclusive, + p2p: Arc, +) -> impl Stream< + Item = impl Future>>>, +> { + stream::iter(range).map(move |height| { + let p2p = p2p.clone(); + let height: BlockHeight = height.into(); + async move { + Ok(p2p + .get_sealed_block_header(height) + .await? + .and_then(|header| { + // Check the header is the expected height. + validate_header_height(height, &header.data).then_some(header) + })) + } + }) +} + +/// Returns true if the header is the expected height. +fn validate_header_height( + expected_height: BlockHeight, + header: &SealedBlockHeader, +) -> bool { + header.entity.consensus.height == expected_height +} + +/// Extra stream utilities. +trait StreamUtil: Sized { + /// Turn a stream of `Result>` into a stream of `Result`. + /// Close the stream if an error occurs or a `None` is received. + /// Return the error if the stream closes. + fn into_scan_none_or_err(self) -> ScanNoneErr { + ScanNoneErr(self) + } + + /// Turn a stream of `Result` into a stream of `Result`. + /// Close the stream if an error occurs. + /// Return the error if the stream closes. + fn into_scan_err(self) -> ScanErr { + ScanErr(self) + } +} + +impl StreamUtil for S {} + +struct ScanNoneErr(S); +struct ScanErr(S); + +impl ScanNoneErr { + /// Scan the stream for `None` or errors. + fn scan_none_or_err(self) -> impl Stream> + where + S: Stream>> + Send + 'static, + { + let stream = self.0.boxed(); + futures::stream::unfold((false, stream), |(mut err, mut stream)| async move { + if err { + None + } else { + let result = stream.next().await?; + err = result.is_err(); + result.transpose().map(|result| (result, (err, stream))) + } + }) + } +} + +impl ScanErr { + /// Scan the stream for errors. + fn scan_err(self) -> impl Stream> + where + S: Stream> + Send + 'static, + { + let stream = self.0.boxed(); + futures::stream::unfold((false, stream), |(mut err, mut stream)| async move { + if err { + None + } else { + let result = stream.next().await?; + err = result.is_err(); + Some((result, (err, stream))) + } + }) + } +} diff --git a/crates/services/sync/src/import/back_pressure_tests.rs b/crates/services/sync/src/import/back_pressure_tests.rs new file mode 100644 index 00000000000..ad23931a6bf --- /dev/null +++ b/crates/services/sync/src/import/back_pressure_tests.rs @@ -0,0 +1,273 @@ +use std::time::Duration; + +use fuel_core_services::stream::BoxStream; +use fuel_core_types::{ + blockchain::primitives::BlockId, + fuel_tx::Transaction, +}; + +use crate::ports::{ + BlockImporterPort, + MockBlockImporterPort, + MockConsensusPort, + MockPeerToPeerPort, +}; + +use super::{ + tests::empty_header, + *, +}; +use test_case::test_case; + +#[derive(Default)] +struct Input { + headers: Duration, + transactions: Duration, + consensus: Duration, + executes: Duration, +} + +#[test_case( + Input::default(), State::new(None, None), + Config{ + max_get_header_requests: 1, + max_get_txns_requests: 1, + } + => Count::default() ; "Empty sanity test" +)] +#[test_case( + Input { + headers: Duration::from_millis(10), + ..Default::default() + }, + State::new(None, 1), + Config{ + max_get_header_requests: 1, + max_get_txns_requests: 1, + } + => is less_or_equal_than Count{ headers: 1, transactions: 1, consensus_calls: 1, executes: 1, blocks: 1 } + ; "Single with slow headers" +)] +#[test_case( + Input { + headers: Duration::from_millis(10), + ..Default::default() + }, + State::new(None, 100), + Config{ + max_get_header_requests: 10, + max_get_txns_requests: 10, + } + => is less_or_equal_than Count{ headers: 10, transactions: 10, consensus_calls: 10, executes: 1, blocks: 21 } + ; "100 headers with max 10 with slow headers" +)] +#[test_case( + Input { + transactions: Duration::from_millis(10), + ..Default::default() + }, + State::new(None, 100), + Config{ + max_get_header_requests: 10, + max_get_txns_requests: 10, + } + => is less_or_equal_than Count{ headers: 10, transactions: 10, consensus_calls: 10, executes: 1, blocks: 21 } + ; "100 headers with max 10 with slow transactions" +)] +#[test_case( + Input { + consensus: Duration::from_millis(10), + ..Default::default() + }, + State::new(None, 100), + Config{ + max_get_header_requests: 10, + max_get_txns_requests: 10, + } + => is less_or_equal_than Count{ headers: 10, transactions: 10, consensus_calls: 10, executes: 1, blocks: 21 } + ; "100 headers with max 10 with slow consensus" +)] +#[test_case( + Input { + executes: Duration::from_millis(10), + ..Default::default() + }, + State::new(None, 50), + Config{ + max_get_header_requests: 10, + max_get_txns_requests: 10, + } + => is less_or_equal_than Count{ headers: 10, transactions: 10, consensus_calls: 10, executes: 1, blocks: 21 } + ; "50 headers with max 10 with slow executes" +)] +#[tokio::test(flavor = "multi_thread")] +async fn test_back_pressure(input: Input, state: State, params: Config) -> Count { + let counts = SharedCounts::new(Default::default()); + let state = SharedMutex::new(state); + + let p2p = Arc::new(PressurePeerToPeerPort::new( + counts.clone(), + [input.headers, input.transactions], + )); + let consensus = Arc::new(PressureConsensusPort::new(counts.clone(), input.consensus)); + let executor = Arc::new(PressureBlockImporterPort::new( + counts.clone(), + input.executes, + )); + let notify = Arc::new(Notify::new()); + + let import = Import { + state, + notify, + params, + p2p, + executor, + consensus, + }; + + import.notify.notify_one(); + let (_tx, shutdown) = tokio::sync::watch::channel(fuel_core_services::State::Started); + let mut watcher = shutdown.into(); + import.import(&mut watcher).await.unwrap(); + counts.apply(|c| c.max.clone()) +} + +#[derive(Debug, Default, PartialEq, Eq, PartialOrd, Ord, Clone)] +struct Count { + headers: usize, + transactions: usize, + consensus_calls: usize, + executes: usize, + blocks: usize, +} + +#[derive(Debug, Default, PartialEq, Eq)] +struct Counts { + now: Count, + max: Count, +} + +type SharedCounts = SharedMutex; + +struct PressurePeerToPeerPort(MockPeerToPeerPort, [Duration; 2], SharedCounts); +struct PressureConsensusPort(MockConsensusPort, Duration, SharedCounts); +struct PressureBlockImporterPort(MockBlockImporterPort, Duration, SharedCounts); + +#[async_trait::async_trait] +impl PeerToPeerPort for PressurePeerToPeerPort { + fn height_stream(&self) -> BoxStream { + self.0.height_stream() + } + async fn get_sealed_block_header( + &self, + height: BlockHeight, + ) -> anyhow::Result>> { + self.2.apply(|c| c.inc_headers()); + tokio::time::sleep(self.1[0]).await; + self.2.apply(|c| { + c.dec_headers(); + c.inc_blocks(); + }); + self.0.get_sealed_block_header(height).await + } + async fn get_transactions( + &self, + block_id: SourcePeer, + ) -> anyhow::Result>> { + self.2.apply(|c| c.inc_transactions()); + tokio::time::sleep(self.1[1]).await; + self.2.apply(|c| c.dec_transactions()); + self.0.get_transactions(block_id).await + } +} + +#[async_trait::async_trait] +impl ConsensusPort for PressureConsensusPort { + async fn check_sealed_header( + &self, + header: &SealedBlockHeader, + ) -> anyhow::Result { + self.2.apply(|c| c.inc_consensus_calls()); + tokio::time::sleep(self.1).await; + self.2.apply(|c| c.dec_consensus_calls()); + self.0.check_sealed_header(header).await + } +} + +#[async_trait::async_trait] +impl BlockImporterPort for PressureBlockImporterPort { + async fn execute_and_commit(&self, block: SealedBlock) -> anyhow::Result<()> { + self.2.apply(|c| c.inc_executes()); + tokio::time::sleep(self.1).await; + self.2.apply(|c| { + c.dec_executes(); + c.dec_blocks(); + }); + self.0.execute_and_commit(block).await + } +} + +impl PressurePeerToPeerPort { + fn new(counts: SharedCounts, delays: [Duration; 2]) -> Self { + let mut mock = MockPeerToPeerPort::default(); + mock.expect_get_sealed_block_header() + .returning(|h| Ok(Some(empty_header(h)))); + mock.expect_get_transactions() + .returning(|_| Ok(Some(vec![]))); + Self(mock, delays, counts) + } +} + +impl PressureConsensusPort { + fn new(counts: SharedCounts, delays: Duration) -> Self { + let mut mock = MockConsensusPort::default(); + mock.expect_check_sealed_header().returning(|_| Ok(true)); + Self(mock, delays, counts) + } +} + +impl PressureBlockImporterPort { + fn new(counts: SharedCounts, delays: Duration) -> Self { + let mut mock = MockBlockImporterPort::default(); + mock.expect_execute_and_commit().returning(move |_| Ok(())); + Self(mock, delays, counts) + } +} + +impl Counts { + fn inc_headers(&mut self) { + self.now.headers += 1; + self.max.headers = self.max.headers.max(self.now.headers); + } + fn dec_headers(&mut self) { + self.now.headers -= 1; + } + fn inc_transactions(&mut self) { + self.now.transactions += 1; + self.max.transactions = self.max.transactions.max(self.now.transactions); + } + fn dec_transactions(&mut self) { + self.now.transactions -= 1; + } + fn inc_consensus_calls(&mut self) { + self.now.consensus_calls += 1; + self.max.consensus_calls = self.max.consensus_calls.max(self.now.consensus_calls); + } + fn dec_consensus_calls(&mut self) { + self.now.consensus_calls -= 1; + } + fn inc_executes(&mut self) { + self.now.executes += 1; + self.max.executes = self.max.executes.max(self.now.executes); + } + fn dec_executes(&mut self) { + self.now.executes -= 1; + } + fn inc_blocks(&mut self) { + self.now.blocks += 1; + self.max.blocks = self.max.blocks.max(self.now.blocks); + } + fn dec_blocks(&mut self) { + self.now.blocks -= 1; + } +} diff --git a/crates/services/sync/src/import/tests.rs b/crates/services/sync/src/import/tests.rs new file mode 100644 index 00000000000..039b05cca74 --- /dev/null +++ b/crates/services/sync/src/import/tests.rs @@ -0,0 +1,605 @@ +use fuel_core_types::blockchain::{ + consensus::Consensus, + header::BlockHeader, +}; + +use crate::ports::{ + MockBlockImporterPort, + MockConsensusPort, + MockPeerToPeerPort, +}; +use test_case::test_case; + +use super::*; + +#[test_case(State::new(None, 5), Mocks::times([6]) => (State::new(5, None), true) ; "executes 5")] +#[test_case(State::new(3, 5), Mocks::times([2]) => (State::new(5, None), true) ; "executes 3 to 5")] +#[test_case( + State::new(3, 5), + { + let mut consensus_port = MockConsensusPort::default(); + consensus_port.expect_check_sealed_header() + .times(1) + .returning(|_| Ok(false)); + Mocks{ + consensus_port, + p2p: DefaultMocks::times([2, 0]), + executor: DefaultMocks::times([0]) + } + } + => (State::new(3, None), true) ; "Signature always fails" +)] +#[test_case( + State::new(3, 5), + { + let mut consensus_port = MockConsensusPort::default(); + consensus_port.expect_check_sealed_header() + .times(2) + .returning(|h| Ok(**h.entity.height() != 5)); + Mocks{ + consensus_port, + p2p: DefaultMocks::times([2, 1]), + executor: DefaultMocks::times([1]) + } + } + => (State::new(4, None), true) ; "Signature fails on header 5 only" +)] +#[test_case( + State::new(3, 5), + { + let mut consensus_port = MockConsensusPort::default(); + consensus_port.expect_check_sealed_header() + .times(1) + .returning(|h| Ok(**h.entity.height() != 4)); + Mocks{ + consensus_port, + p2p: DefaultMocks::times([2, 0]), + executor: DefaultMocks::times([0]) + } + } + => (State::new(3, None), true) ; "Signature fails on header 4 only" +)] +#[test_case( + State::new(3, 5), + { + let mut p2p = MockPeerToPeerPort::default(); + p2p.expect_get_sealed_block_header() + .times(1) + .returning(|_| Ok(None)); + Mocks{ + p2p, + consensus_port: DefaultMocks::times([0]), + executor: DefaultMocks::times([0]) + } + } + => (State::new(3, None), true) ; "Header not found" +)] +#[test_case( + State::new(3, 5), + { + let mut p2p = MockPeerToPeerPort::default(); + p2p.expect_get_sealed_block_header() + .times(2) + .returning(|h| Ok((*h != 5).then(|| empty_header(h)))); + p2p.expect_get_transactions() + .times(1) + .returning(|_| Ok(Some(vec![]))); + Mocks{ + p2p, + consensus_port: DefaultMocks::times([1]), + executor: DefaultMocks::times([1]) + } + } + => (State::new(4, None), true) ; "Header 5 not found" +)] +#[test_case( + State::new(3, 5), + { + let mut p2p = MockPeerToPeerPort::default(); + p2p.expect_get_sealed_block_header() + .times(1) + .returning(|h| Ok((*h != 4).then(|| empty_header(h)))); + Mocks{ + p2p, + consensus_port: DefaultMocks::times([0]), + executor: DefaultMocks::times([0]) + } + } + => (State::new(3, None), true) ; "Header 4 not found" +)] +#[test_case( + State::new(3, 5), + { + let mut p2p = MockPeerToPeerPort::default(); + p2p.expect_get_sealed_block_header() + .times(2) + .returning(|h| Ok(Some(empty_header(h)))); + p2p.expect_get_transactions() + .times(1) + .returning(|_| Ok(None)); + Mocks{ + p2p, + consensus_port: DefaultMocks::times([1]), + executor: DefaultMocks::times([0]) + } + } + => (State::new(3, None), true) ; "transactions not found" +)] +#[test_case( + State::new(3, 5), + { + let mut p2p = MockPeerToPeerPort::default(); + p2p.expect_get_sealed_block_header() + .times(2) + .returning(|h| Ok(Some(empty_header(h)))); + let mut count = 0; + p2p.expect_get_transactions() + .times(1) + .returning(move|_| { + count += 1; + if count > 1 { + Ok(Some(vec![])) + } else { + Ok(None) + } + }); + Mocks{ + p2p, + consensus_port: DefaultMocks::times([1]), + executor: DefaultMocks::times([0]) + } + } + => (State::new(3, None), true) ; "transactions not found for header 4" +)] +#[test_case( + State::new(3, 5), + { + let mut p2p = MockPeerToPeerPort::default(); + p2p.expect_get_sealed_block_header() + .times(2) + .returning(|h| Ok(Some(empty_header(h)))); + let mut count = 0; + p2p.expect_get_transactions() + .times(2) + .returning(move|_| { + count += 1; + if count > 1 { + Ok(None) + } else { + Ok(Some(vec![])) + } + }); + Mocks{ + p2p, + consensus_port: DefaultMocks::times([2]), + executor: DefaultMocks::times([1]) + } + } + => (State::new(4, None), true) ; "transactions not found for header 5" +)] +#[test_case( + State::new(3, 5), + { + let mut p2p = MockPeerToPeerPort::default(); + p2p.expect_get_sealed_block_header() + .times(1) + .returning(|_| Err(anyhow::anyhow!("Some network error"))); + Mocks{ + p2p, + consensus_port: DefaultMocks::times([0]), + executor: DefaultMocks::times([0]) + } + } + => (State::new(3, None), false); "p2p error" +)] +#[test_case( + State::new(3, 5), + { + let mut p2p = MockPeerToPeerPort::default(); + p2p.expect_get_sealed_block_header() + .times(1) + .returning(|h| if *h == 4 { + Err(anyhow::anyhow!("Some network error")) + } else { + Ok(Some(empty_header(h))) + }); + Mocks{ + p2p, + consensus_port: DefaultMocks::times([0]), + executor: DefaultMocks::times([0]) + } + } + => (State::new(3, None), false); "header 4 p2p error" +)] +#[test_case( + State::new(3, 5), + { + let mut p2p = MockPeerToPeerPort::default(); + p2p.expect_get_sealed_block_header() + .times(2) + .returning(|h| if *h == 5 { + Err(anyhow::anyhow!("Some network error")) + } else { + Ok(Some(empty_header(h))) + }); + p2p.expect_get_transactions() + .times(1) + .returning(|_| Ok(Some(vec![]))); + Mocks{ + p2p, + consensus_port: DefaultMocks::times([1]), + executor: DefaultMocks::times([1]) + } + } + => (State::new(4, None), false); "header 5 p2p error" +)] +#[test_case( + State::new(3, 5), + { + let mut p2p = MockPeerToPeerPort::default(); + p2p.expect_get_sealed_block_header() + .times(2) + .returning(|h| Ok(Some(empty_header(h)))); + p2p.expect_get_transactions() + .times(1) + .returning(|_| Err(anyhow::anyhow!("Some network error"))); + Mocks{ + p2p, + consensus_port: DefaultMocks::times([1]), + executor: DefaultMocks::times([0]) + } + } + => (State::new(3, None), false); "p2p error on transactions" +)] +#[test_case( + State::new(3, 5), + { + let mut p2p = MockPeerToPeerPort::default(); + p2p.expect_get_sealed_block_header() + .times(2) + .returning(|h| Ok(Some(empty_header(h)))); + let mut count = 0; + p2p.expect_get_transactions() + .times(1) + .returning(move|_| { + count += 1; + if count > 1 { + Ok(Some(vec![])) + } else { + Err(anyhow::anyhow!("Some network error")) + } + }); + Mocks{ + p2p, + consensus_port: DefaultMocks::times([1]), + executor: DefaultMocks::times([0]) + } + } + => (State::new(3, None), false); "p2p error on 4 transactions" +)] +#[test_case( + State::new(3, 5), + { + let mut p2p = MockPeerToPeerPort::default(); + p2p.expect_get_sealed_block_header() + .times(2) + .returning(|h| Ok(Some(empty_header(h)))); + let mut count = 0; + p2p.expect_get_transactions() + .times(2) + .returning(move|_| { + count += 1; + if count > 1 { + Err(anyhow::anyhow!("Some network error")) + } else { + Ok(Some(vec![])) + } + }); + Mocks{ + p2p, + consensus_port: DefaultMocks::times([2]), + executor: DefaultMocks::times([1]) + } + } + => (State::new(4, None), false); "p2p error on 5 transactions" +)] +#[test_case( + State::new(3, 5), + { + let mut consensus_port = MockConsensusPort::default(); + consensus_port.expect_check_sealed_header() + .times(1) + .returning(|_| Err(anyhow::anyhow!("Some consensus error"))); + Mocks{ + consensus_port, + p2p: DefaultMocks::times([2, 0]), + executor: DefaultMocks::times([0]) + } + } + => (State::new(3, None), false) ; "consensus error" +)] +#[test_case( + State::new(3, 5), + { + let mut consensus_port = MockConsensusPort::default(); + consensus_port.expect_check_sealed_header() + .times(1) + .returning(|h| if **h.entity.height() == 4 { + Err(anyhow::anyhow!("Some consensus error")) + } else { + Ok(true) + }); + Mocks{ + consensus_port, + p2p: DefaultMocks::times([2, 0]), + executor: DefaultMocks::times([0]) + } + } + => (State::new(3, None), false) ; "consensus error on 4" +)] +#[test_case( + State::new(3, 5), + { + let mut consensus_port = MockConsensusPort::default(); + consensus_port.expect_check_sealed_header() + .times(2) + .returning(|h| if **h.entity.height() == 5 { + Err(anyhow::anyhow!("Some consensus error")) + } else { + Ok(true) + }); + Mocks{ + consensus_port, + p2p: DefaultMocks::times([2, 1]), + executor: DefaultMocks::times([1]) + } + } + => (State::new(4, None), false) ; "consensus error on 5" +)] +#[test_case( + State::new(3, 5), + { + let mut executor = MockBlockImporterPort::default(); + executor + .expect_execute_and_commit() + .times(1) + .returning(|_| Err(anyhow::anyhow!("Some execution error"))); + Mocks{ + consensus_port: DefaultMocks::times([1]), + p2p: DefaultMocks::times([2, 1]), + executor, + } + } + => (State::new(3, None), false) ; "execution error" +)] +#[test_case( + State::new(3, 5), + { + let mut executor = MockBlockImporterPort::default(); + executor + .expect_execute_and_commit() + .times(1) + .returning(|h| { + if **h.entity.header().height() == 4 { + Err(anyhow::anyhow!("Some execution error")) + } else { + Ok(()) + } + }); + Mocks{ + consensus_port: DefaultMocks::times([1]), + p2p: DefaultMocks::times([2, 1]), + executor, + } + } + => (State::new(3, None), false) ; "execution error on header 4" +)] +#[test_case( + State::new(3, 5), + { + let mut executor = MockBlockImporterPort::default(); + executor + .expect_execute_and_commit() + .times(2) + .returning(|h| { + if **h.entity.header().height() == 5 { + Err(anyhow::anyhow!("Some execution error")) + } else { + Ok(()) + } + }); + Mocks{ + consensus_port: DefaultMocks::times([2]), + p2p: DefaultMocks::times([2, 2]), + executor, + } + } + => (State::new(4, None), false) ; "execution error on header 5" +)] +#[tokio::test] +async fn test_import(state: State, mocks: Mocks) -> (State, bool) { + let state = SharedMutex::new(state); + test_import_inner(state, mocks, None).await +} + +#[test_case( + { + let s = SharedMutex::new(State::new(3, 5)); + let state = s.clone(); + let mut p2p = MockPeerToPeerPort::default(); + p2p.expect_get_sealed_block_header() + .times(3) + .returning(move |h| { + state.apply(|s| s.observe(6)); + Ok(Some(empty_header(h))) + }); + p2p.expect_get_transactions() + .times(3) + .returning(move|_| Ok(Some(vec![]))); + let c = DefaultMocks::times([2]); + (s, c, Mocks{ + consensus_port: DefaultMocks::times([3]), + p2p, + executor: DefaultMocks::times([3]), + }) + } + => (State::new(6, None), true) ; "Loop 1 with headers 4, 5. Loop 2 with header 6" +)] +#[tokio::test] +async fn test_import_loop( + (state, count, mocks): (SharedMutex, Count, Mocks), +) -> (State, bool) { + test_import_inner(state, mocks, Some(count)).await +} + +async fn test_import_inner( + state: SharedMutex, + mocks: Mocks, + count: Option, +) -> (State, bool) { + let notify = Arc::new(Notify::new()); + let Mocks { + consensus_port, + p2p, + executor, + } = mocks; + let params = Config { + max_get_header_requests: 10, + max_get_txns_requests: 10, + }; + let p2p = Arc::new(p2p); + + let executor = Arc::new(executor); + let consensus = Arc::new(consensus_port); + + let import = Import { + state, + notify, + params, + p2p, + executor, + consensus, + }; + let (_tx, shutdown) = tokio::sync::watch::channel(fuel_core_services::State::Started); + let mut watcher = shutdown.into(); + let r = match count { + Some(Count(count)) => { + let mut r = false; + for _ in 0..count { + import.notify.notify_one(); + r = import.import(&mut watcher).await.is_ok(); + if !r { + break + } + } + r + } + None => { + import.notify.notify_one(); + import.import(&mut watcher).await.is_ok() + } + }; + let s = import.state.apply(|s| s.clone()); + (s, r) +} + +struct Mocks { + consensus_port: MockConsensusPort, + p2p: MockPeerToPeerPort, + executor: MockBlockImporterPort, +} + +struct Count(usize); + +trait DefaultMocks { + fn times(t: T) -> Self + where + T: IntoIterator + Clone, + ::IntoIter: Clone; +} + +impl DefaultMocks for Mocks { + fn times(t: T) -> Self + where + T: IntoIterator + Clone, + ::IntoIter: Clone, + { + Self { + consensus_port: DefaultMocks::times(t.clone()), + p2p: DefaultMocks::times(t.clone()), + executor: DefaultMocks::times(t), + } + } +} + +impl DefaultMocks for Count { + fn times(t: T) -> Self + where + T: IntoIterator + Clone, + ::IntoIter: Clone, + { + Self(t.into_iter().next().unwrap()) + } +} + +impl DefaultMocks for MockConsensusPort { + fn times(t: T) -> Self + where + T: IntoIterator + Clone, + ::IntoIter: Clone, + { + let mut consensus_port = MockConsensusPort::new(); + consensus_port + .expect_check_sealed_header() + .times(t.into_iter().next().unwrap()) + .returning(|_| Ok(true)); + consensus_port + } +} + +impl DefaultMocks for MockPeerToPeerPort { + fn times(t: T) -> Self + where + T: IntoIterator + Clone, + ::IntoIter: Clone, + { + let mut p2p = MockPeerToPeerPort::default(); + let mut t = t.into_iter().cycle(); + + p2p.expect_get_sealed_block_header() + .times(t.next().unwrap()) + .returning(|h| Ok(Some(empty_header(h)))); + p2p.expect_get_transactions() + .times(t.next().unwrap()) + .returning(|_| Ok(Some(vec![]))); + p2p + } +} + +impl DefaultMocks for MockBlockImporterPort { + fn times + Clone>(t: T) -> Self { + let mut executor = MockBlockImporterPort::default(); + let t = t.into_iter().next().unwrap(); + + executor + .expect_execute_and_commit() + .times(t) + .returning(move |_| Ok(())); + executor + } +} + +pub(crate) fn empty_header(h: BlockHeight) -> SourcePeer { + let mut header = BlockHeader::default(); + header.consensus.height = h; + + let consensus = Consensus::default(); + let sealed = Sealed { + entity: header, + consensus, + }; + SourcePeer { + peer_id: vec![].into(), + data: sealed, + } +} diff --git a/crates/services/sync/src/lib.rs b/crates/services/sync/src/lib.rs index 456016fb48f..44415118a6c 100644 --- a/crates/services/sync/src/lib.rs +++ b/crates/services/sync/src/lib.rs @@ -1,7 +1,10 @@ #![deny(unused_crate_dependencies)] +#![deny(missing_docs)] +//! # Sync Service +//! Responsible for syncing the blockchain from the network. -pub mod config; +pub mod import; +pub mod ports; pub mod service; - -pub use config::Config; -pub use service::Service; +mod state; +pub mod sync; diff --git a/crates/services/sync/src/ports.rs b/crates/services/sync/src/ports.rs new file mode 100644 index 00000000000..c2a68c57ef3 --- /dev/null +++ b/crates/services/sync/src/ports.rs @@ -0,0 +1,59 @@ +//! Ports this services requires to function. + +use fuel_core_services::stream::BoxStream; +use fuel_core_types::{ + blockchain::{ + primitives::{ + BlockHeight, + BlockId, + }, + SealedBlock, + SealedBlockHeader, + }, + fuel_tx::Transaction, + services::p2p::SourcePeer, +}; + +#[cfg_attr(test, mockall::automock)] +#[async_trait::async_trait] +/// Port for communication with the network. +pub trait PeerToPeerPort { + /// Stream of newly observed block heights. + fn height_stream(&self) -> BoxStream; + + /// Request sealed block header from the network + /// at the given height. + /// + /// Returns the source peer this header was received from. + async fn get_sealed_block_header( + &self, + height: BlockHeight, + ) -> anyhow::Result>>; + + /// Request transactions from the network for the given block + /// and source peer. + async fn get_transactions( + &self, + block_id: SourcePeer, + ) -> anyhow::Result>>; +} + +#[cfg_attr(test, mockall::automock)] +#[async_trait::async_trait] +/// Port for communication with the consensus service. +pub trait ConsensusPort { + /// Check if the given sealed block header is valid. + async fn check_sealed_header( + &self, + header: &SealedBlockHeader, + ) -> anyhow::Result; +} + +#[cfg_attr(test, mockall::automock)] +#[async_trait::async_trait] +/// Port for communication with the block importer. +pub trait BlockImporterPort { + /// Execute the given sealed block + /// and commit it to the database. + async fn execute_and_commit(&self, block: SealedBlock) -> anyhow::Result<()>; +} diff --git a/crates/services/sync/src/service.rs b/crates/services/sync/src/service.rs index 4324eb33d86..8e8c60ac5b2 100644 --- a/crates/services/sync/src/service.rs +++ b/crates/services/sync/src/service.rs @@ -1,61 +1,198 @@ -use crate::Config; -use fuel_core_types::services::p2p::BlockGossipData; -use parking_lot::Mutex; -use tokio::{ - sync::{ - mpsc, - oneshot, +//! Service utilities for running fuel sync. +use std::sync::Arc; + +use crate::{ + import::{ + Config, + Import, + }, + ports::{ + self, + BlockImporterPort, + ConsensusPort, + PeerToPeerPort, }, - task::JoinHandle, + state::State, + sync::SyncHeights, }; -pub enum SyncStatus { - Stopped, - InitialSync, -} +use fuel_core_services::{ + stream::{ + BoxStream, + IntoBoxStream, + }, + RunnableService, + RunnableTask, + Service, + ServiceRunner, + SharedMutex, + StateWatcher, +}; +use fuel_core_types::blockchain::primitives::BlockHeight; +use futures::StreamExt; +use tokio::sync::Notify; + +#[cfg(test)] +mod tests; -pub enum SyncMpsc { - Status { ret: oneshot::Sender }, - Start, - Stop, +/// Creates an instance of runnable sync service. +pub fn new_service( + current_fuel_block_height: BlockHeight, + p2p: P, + executor: E, + consensus: C, + params: Config, +) -> anyhow::Result>> +where + P: ports::PeerToPeerPort + Send + Sync + 'static, + E: ports::BlockImporterPort + Send + Sync + 'static, + C: ports::ConsensusPort + Send + Sync + 'static, +{ + let height_stream = p2p.height_stream(); + let state = State::new(*current_fuel_block_height, None); + Ok(ServiceRunner::new(SyncTask::new( + height_stream, + state, + params, + p2p, + executor, + consensus, + )?)) } -pub struct Service { - join: Mutex>>, - sender: mpsc::Sender, +/// Task for syncing heights. +/// Contains import task as a child task. +pub struct SyncTask +where + P: PeerToPeerPort + Send + Sync + 'static, + E: BlockImporterPort + Send + Sync + 'static, + C: ConsensusPort + Send + Sync + 'static, +{ + sync_heights: SyncHeights, + import_task_handle: ServiceRunner>, } -impl Service { - pub async fn new(_config: &Config) -> anyhow::Result { - let (sender, _receiver) = mpsc::channel(100); +struct ImportTask(Import); + +impl SyncTask +where + P: PeerToPeerPort + Send + Sync + 'static, + E: BlockImporterPort + Send + Sync + 'static, + C: ConsensusPort + Send + Sync + 'static, +{ + fn new( + height_stream: BoxStream, + state: State, + params: Config, + p2p: P, + executor: E, + consensus: C, + ) -> anyhow::Result { + let notify = Arc::new(Notify::new()); + let state = SharedMutex::new(state); + let p2p = Arc::new(p2p); + let executor = Arc::new(executor); + let consensus = Arc::new(consensus); + let sync_heights = SyncHeights::new(height_stream, state.clone(), notify.clone()); + let import = Import::new(state, notify, params, p2p, executor, consensus); + let import_task_handle = ServiceRunner::new(ImportTask(import)); Ok(Self { - sender, - join: Mutex::new(None), + sync_heights, + import_task_handle, }) } +} - pub async fn start( - &self, - _p2p_block: mpsc::Receiver, - // TODO: re-introduce this when sync actually depends on the coordinator - // _bft: mpsc::Sender, - _block_importer: mpsc::Sender<()>, - ) { - let mut join = self.join.lock(); - if join.is_none() { - *join = Some(tokio::spawn(async {})); +#[async_trait::async_trait] +impl RunnableTask for SyncTask +where + P: PeerToPeerPort + Send + Sync + 'static, + E: BlockImporterPort + Send + Sync + 'static, + C: ConsensusPort + Send + Sync + 'static, +{ + async fn run( + &mut self, + _: &mut fuel_core_services::StateWatcher, + ) -> anyhow::Result { + if self.import_task_handle.state().stopped() { + return Ok(false) } + Ok(self.sync_heights.sync().await.is_some()) } +} - pub async fn stop(&self) -> Option> { - let join = self.join.lock().take(); - if join.is_some() { - let _ = self.sender.send(SyncMpsc::Stop); - } - join +#[async_trait::async_trait] +impl RunnableService for SyncTask +where + P: PeerToPeerPort + Send + Sync + 'static, + E: BlockImporterPort + Send + Sync + 'static, + C: ConsensusPort + Send + Sync + 'static, +{ + const NAME: &'static str = "fuel-core-sync"; + + type SharedData = (); + + type Task = SyncTask; + + fn shared_data(&self) -> Self::SharedData {} + + async fn into_task(mut self, watcher: &StateWatcher) -> anyhow::Result { + let mut watcher = watcher.clone(); + self.sync_heights.map_stream(|height_stream| { + height_stream + .take_until(async move { + let _ = watcher.while_started().await; + }) + .into_boxed() + }); + self.import_task_handle.start_and_await().await?; + + Ok(self) } +} + +#[async_trait::async_trait] +impl RunnableTask for ImportTask +where + P: PeerToPeerPort + Send + Sync + 'static, + E: BlockImporterPort + Send + Sync + 'static, + C: ConsensusPort + Send + Sync + 'static, +{ + async fn run( + &mut self, + watcher: &mut fuel_core_services::StateWatcher, + ) -> anyhow::Result { + self.0.import(watcher).await + } +} + +#[async_trait::async_trait] +impl RunnableService for ImportTask +where + P: PeerToPeerPort + Send + Sync + 'static, + E: BlockImporterPort + Send + Sync + 'static, + C: ConsensusPort + Send + Sync + 'static, +{ + const NAME: &'static str = "fuel-core-sync/import-task"; + + type SharedData = (); + + type Task = ImportTask; + + fn shared_data(&self) -> Self::SharedData {} + + async fn into_task(self, _: &StateWatcher) -> anyhow::Result { + Ok(self) + } +} - pub fn sender(&self) -> &mpsc::Sender { - &self.sender +impl Drop for SyncTask +where + P: PeerToPeerPort + Send + Sync + 'static, + E: BlockImporterPort + Send + Sync + 'static, + C: ConsensusPort + Send + Sync + 'static, +{ + fn drop(&mut self) { + self.import_task_handle.stop(); } } diff --git a/crates/services/sync/src/service/tests.rs b/crates/services/sync/src/service/tests.rs new file mode 100644 index 00000000000..db0dddf98c1 --- /dev/null +++ b/crates/services/sync/src/service/tests.rs @@ -0,0 +1,70 @@ +use fuel_core_services::{ + stream::IntoBoxStream, + Service, +}; +use futures::{ + stream, + StreamExt, +}; + +use crate::{ + import::empty_header, + ports::{ + MockBlockImporterPort, + MockConsensusPort, + MockPeerToPeerPort, + }, +}; + +use super::*; + +#[tokio::test] +async fn test_new_service() { + let mut p2p = MockPeerToPeerPort::default(); + p2p.expect_height_stream().returning(|| { + stream::iter( + std::iter::successors(Some(6u32), |n| Some(n + 1)).map(BlockHeight::from), + ) + .then(|h| async move { + if *h == 17 { + futures::future::pending::<()>().await; + } + h + }) + .into_boxed() + }); + p2p.expect_get_sealed_block_header() + .returning(|h| Ok(Some(empty_header(h)))); + p2p.expect_get_transactions() + .returning(|_| Ok(Some(vec![]))); + let mut executor = MockBlockImporterPort::default(); + let (tx, mut rx) = tokio::sync::mpsc::channel(100); + executor.expect_execute_and_commit().returning(move |h| { + tx.try_send(**h.entity.header().height()).unwrap(); + Ok(()) + }); + let mut consensus = MockConsensusPort::default(); + consensus + .expect_check_sealed_header() + .returning(|_| Ok(true)); + let params = Config { + max_get_header_requests: 10, + max_get_txns_requests: 10, + }; + let s = new_service(4u32.into(), p2p, executor, consensus, params).unwrap(); + + assert_eq!( + s.start_and_await().await.unwrap(), + fuel_core_services::State::Started + ); + while let Some(h) = rx.recv().await { + if h == 16 { + break + } + } + + assert_eq!( + s.stop_and_await().await.unwrap(), + fuel_core_services::State::Stopped + ); +} diff --git a/crates/services/sync/src/state.rs b/crates/services/sync/src/state.rs new file mode 100644 index 00000000000..b8f3ba1cc71 --- /dev/null +++ b/crates/services/sync/src/state.rs @@ -0,0 +1,212 @@ +//! State of the sync service. + +use std::{ + cmp::Ordering, + ops::RangeInclusive, +}; + +#[cfg(test)] +mod test; + +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +/// State of the sync service. +/// +/// The state takes evidence and produces a status. +pub struct State { + status: Status, +} + +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +/// Status of the sync service. +pub enum Status { + /// The service is not initialized and there is nothing to sync. + Uninitialized, + /// This range is being processed. + Processing(RangeInclusive), + /// This height is committed. + Committed(u32), +} + +impl State { + /// Create a new state from the current committed and observed heights. + pub fn new( + committed: impl Into>, + observed: impl Into>, + ) -> Self { + let status = match (committed.into(), observed.into()) { + // Both the committed and observed heights are known. + (Some(committed), Some(observed)) => { + // If there is a gap between the committed and observed heights, + // the service is processing the gap otherwise the service is + // has nothing to sync. + committed + .checked_add(1) + .map_or(Status::Committed(committed), |next| { + let range = next..=observed; + if range.is_empty() { + Status::Committed(committed) + } else { + Status::Processing(range) + } + }) + } + // Only the committed height is known, so the service has nothing to sync. + (Some(committed), None) => Status::Committed(committed), + // Only the observed height is known, so the service is processing + // up to that height. + (None, Some(observed)) => Status::Processing(0..=observed), + // No heights are known, so the service is uninitialized. + (None, None) => Status::Uninitialized, + }; + Self { status } + } + + /// Get the current range to process. + pub fn process_range(&self) -> Option> { + match &self.status { + Status::Processing(range) => Some(range.clone()), + _ => None, + } + } + + /// Record that a block has been committed. + pub fn commit(&mut self, height: u32) { + let new_status = match &self.status { + // Currently processing a range and recording a commit. + Status::Processing(range) => match height.cmp(range.end()) { + // The commit is less than the end of the range, so the range + // is still being processed. + Ordering::Less => { + Some(Status::Processing(height.saturating_add(1)..=*range.end())) + } + // The commit is equal or greater than the end of the range, + // so the range is fully committed. + Ordering::Equal | Ordering::Greater => Some(Status::Committed(height)), + }, + // Currently uninitialized so now are committed. + Status::Uninitialized => Some(Status::Committed(height)), + // Currently committed and recording a commit. + Status::Committed(existing) => { + // Check if the new commit creates a gap. If not then + // take the max of the existing and new commits. + match commit_creates_processing(existing, &height) { + Some(range) => Some(Status::Processing(range)), + None => Some(Status::Committed(*existing.max(&height))), + } + } + }; + self.apply_status(new_status); + } + + /// Record that a block has been observed. + pub fn observe(&mut self, height: u32) -> bool { + let new_status = match &self.status { + // Currently uninitialized so process from the start to the observed height. + Status::Uninitialized => Some(Status::Processing(0..=height)), + // Currently processing a range and recording an observation. + Status::Processing(range) => match range.end().cmp(&height) { + // The range end is less than the observed height, so + // extend the range to the observed height. + Ordering::Less => Some(Status::Processing(*range.start()..=height)), + // The range end is equal or greater than the observed height, + // so ignore it. + Ordering::Equal | Ordering::Greater => None, + }, + // Currently committed and recording an observation. + // If there is a gap between the committed and observed heights, + // the service is processing. + Status::Committed(committed) => committed.checked_add(1).and_then(|next| { + let r = next..=height; + (!r.is_empty()).then_some(Status::Processing(r)) + }), + }; + let status_change = new_status.is_some(); + self.apply_status(new_status); + status_change + } + + /// Record that a range of blocks have failed to process. + pub fn failed_to_process(&mut self, range: RangeInclusive) { + // Ignore empty ranges. + let status = (!range.is_empty()) + .then_some(()) + .and_then(|_| match &self.status { + // Currently uninitialized or committed. + // Failures do not override these status. + Status::Uninitialized | Status::Committed(_) => None, + // Currently processing a range and recording a failure. + Status::Processing(processing) => range + // If the failure range contains the start of the processing range, + // then there is no reason to continue trying to process this range. + // The processing range is reverted back to just before it's start. + // The revert is either to the last committed height, or to uninitialized. + .contains(processing.start()) + .then(|| { + processing + .start() + .checked_sub(1) + .map_or(Status::Uninitialized, Status::Committed) + }) + .or_else(|| { + // If the failure range contains the end of the processing range, + // or the processing range contains the start of the failure range, + // then the processing range is shortened to just before the failure range. + (range.contains(processing.end()) + || processing.contains(range.start())) + .then(|| { + range + .start() + .checked_sub(1) + .map_or(Status::Uninitialized, |prev| { + Status::Processing(*processing.start()..=prev) + }) + }) + }) + .or_else(|| { + // If the processing range contains the end of the failure range, + // then the entire processing range is failed and reverted back to + // the last committed height, or to uninitialized. + processing.contains(range.end()).then(|| { + processing + .start() + .checked_sub(1) + .map_or(Status::Uninitialized, Status::Committed) + }) + }), + }); + self.apply_status(status); + } + + fn apply_status(&mut self, status: Option) { + if let Some(s) = status { + self.status = s; + } + } + + #[cfg(test)] + /// Get the current observed height. + pub fn proposed_height(&self) -> Option<&u32> { + match &self.status { + Status::Processing(range) => Some(range.end()), + _ => None, + } + } +} + +/// If a commit is made to a height that is +/// below the existing committed height this is +/// new evidence and we check if there is a gap between +/// the existing committed height and the new commit. +/// +/// This case should not occur but because we must handle +/// it then the most resilient way is to assume that we +/// should re-process the gap. +fn commit_creates_processing( + existing: &u32, + commit: &u32, +) -> Option> { + let next = commit.checked_add(1)?; + let prev_existing = existing.checked_sub(1)?; + let r = next..=prev_existing; + (!r.is_empty()).then_some(r) +} diff --git a/crates/services/sync/src/state/test.rs b/crates/services/sync/src/state/test.rs new file mode 100644 index 00000000000..e732172e003 --- /dev/null +++ b/crates/services/sync/src/state/test.rs @@ -0,0 +1,125 @@ +use super::*; +use test_case::test_case; + +#[test_case(State::new(None, None) => Status::Uninitialized)] +#[test_case(State::new(10, None) => Status::Committed(10))] +#[test_case(State::new(None, 10) => Status::Processing(0..=10))] +#[test_case(State::new(10, 10) => Status::Committed(10))] +#[test_case(State::new(10, 11) => Status::Processing(11..=11))] +#[test_case(State::new(1, 10) => Status::Processing(2..=10))] +#[test_case(State::new(11, 10) => Status::Committed(11))] +fn test_new(state: State) -> Status { + state.status +} + +#[test_case(State::new(None, None), 0 => Status::Committed(0))] +#[test_case(State::new(0, None), 0 => Status::Committed(0))] +#[test_case(State::new(1, None), 0 => Status::Committed(1))] +#[test_case(State::new(2, None), 0 => Status::Processing(1..=1))] +#[test_case(State::new(20, None), 10 => Status::Processing(11..=19))] +#[test_case(State::new(0, None), 1 => Status::Committed(1))] +#[test_case(State::new(0, None), 2 => Status::Committed(2))] +#[test_case(State::new(None, 0), 0 => Status::Committed(0))] +#[test_case(State::new(None, 1), 0 => Status::Processing(1..=1))] +#[test_case(State::new(None, 2), 0 => Status::Processing(1..=2))] +#[test_case(State::new(None, 0), 1 => Status::Committed(1))] +#[test_case(State::new(None, 0), 2 => Status::Committed(2))] +#[test_case(State::new(0, 0), 0 => Status::Committed(0))] +#[test_case(State::new(0, 0), 1 => Status::Committed(1))] +#[test_case(State::new(0, 0), 2 => Status::Committed(2))] +#[test_case(State::new(0, 1), 0 => Status::Processing(1..=1))] +#[test_case(State::new(0, 2), 0 => Status::Processing(1..=2))] +#[test_case(State::new(0, 4), 2 => Status::Processing(3..=4))] +#[test_case(State::new(1, 0), 0 => Status::Committed(1))] +#[test_case(State::new(2, 0), 0 => Status::Processing(1..=1))] +#[test_case(State::new(2, 2), 2 => Status::Committed(2))] +fn test_commit(mut state: State, height: u32) -> Status { + state.commit(height); + state.status +} + +#[test_case(0, 0 => None)] +#[test_case(0, 1 => None)] +#[test_case(0, 2 => None)] +#[test_case(1, 0 => None)] +#[test_case(2, 0 => Some(1..=1))] +#[test_case(30, 0 => Some(1..=29))] +fn test_creates_new_existing(existing: u32, commit: u32) -> Option> { + commit_creates_processing(&existing, &commit) +} + +#[test_case(State::new(None, None), 0 => Status::Processing(0..=0))] +#[test_case(State::new(0, None), 0 => Status::Committed(0))] +#[test_case(State::new(1, None), 0 => Status::Committed(1))] +#[test_case(State::new(2, None), 0 => Status::Committed(2))] +#[test_case(State::new(20, None), 10 => Status::Committed(20))] +#[test_case(State::new(10, None), 20 => Status::Processing(11..=20))] +#[test_case(State::new(0, None), 1 => Status::Processing(1..=1))] +#[test_case(State::new(0, None), 2 => Status::Processing(1..=2))] +#[test_case(State::new(None, 0), 0 => Status::Processing(0..=0))] +#[test_case(State::new(None, 0), 1 => Status::Processing(0..=1))] +#[test_case(State::new(None, 0), 2 => Status::Processing(0..=2))] +#[test_case(State::new(None, 1), 0 => Status::Processing(0..=1))] +#[test_case(State::new(None, 2), 0 => Status::Processing(0..=2))] +#[test_case(State::new(0, 0), 0 => Status::Committed(0))] +#[test_case(State::new(0, 0), 1 => Status::Processing(1..=1))] +#[test_case(State::new(0, 0), 2 => Status::Processing(1..=2))] +#[test_case(State::new(0, 1), 0 => Status::Processing(1..=1))] +#[test_case(State::new(0, 2), 0 => Status::Processing(1..=2))] +#[test_case(State::new(0, 4), 2 => Status::Processing(1..=4))] +#[test_case(State::new(1, 0), 0 => Status::Committed(1))] +#[test_case(State::new(2, 0), 0 => Status::Committed(2))] +#[test_case(State::new(2, 2), 2 => Status::Committed(2))] +fn test_observe(mut state: State, height: u32) -> Status { + state.observe(height); + state.status +} + +#[test_case(State::new(None, None), 0..=0 => Status::Uninitialized)] +#[test_case(State::new(None, None), 0..=100 => Status::Uninitialized)] +#[test_case(State::new(0, None), 0..=0 => Status::Committed(0))] +#[test_case(State::new(0, None), 0..=100 => Status::Committed(0))] +#[test_case(State::new(0, None), 10..=100 => Status::Committed(0))] +#[test_case(State::new(1, None), 0..=100 => Status::Committed(1))] +#[test_case(State::new(2, None), 0..=100 => Status::Committed(2))] +#[test_case(State::new(10, 20), 10..=20 => Status::Committed(10))] +#[test_case(State::new(None, 20), 0..=20 => Status::Uninitialized)] +#[test_case(State::new(None, 20), 10..=20 => Status::Processing(0..=9))] +#[test_case(State::new(10, None), 20..=20 => Status::Committed(10))] +#[test_case(State::new(0, None), 0..=1 => Status::Committed(0))] +#[test_case(State::new(0, None), 0..=2 => Status::Committed(0))] +#[test_case(State::new(None, 0), 0..=1 => Status::Uninitialized)] +#[test_case(State::new(None, 0), 1..=1 => Status::Processing(0..=0))] +#[test_case(State::new(None, 1), 0..=1 => Status::Uninitialized)] +#[test_case(State::new(None, 1), 1..=1 => Status::Processing(0..=0))] +#[test_case(State::new(None, 2), 1..=1 => Status::Processing(0..=0))] +#[test_case(State::new(None, 2), 2..=2 => Status::Processing(0..=1))] +#[test_case(State::new(0, 1), 0..=0 => Status::Processing(1..=1))] +#[test_case(State::new(0, 1), 0..=1 => Status::Committed(0))] +#[test_case(State::new(0, 1), 0..=100 => Status::Committed(0))] +#[test_case(State::new(0, 2), 0..=0 => Status::Processing(1..=2))] +#[test_case(State::new(0, 2), 0..=1 => Status::Committed(0))] +#[test_case(State::new(0, 30), 0..=1 => Status::Committed(0))] +#[test_case(State::new(0, 30), 0..=3 => Status::Committed(0))] +#[test_case(State::new(10, 20), 0..=9 => Status::Processing(11..=20))] +#[test_case(State::new(10, 20), 0..=10 => Status::Processing(11..=20))] +#[test_case(State::new(10, 20), 0..=11 => Status::Committed(10))] +#[test_case(State::new(10, 20), 0..=19 => Status::Committed(10))] +#[test_case(State::new(10, 20), 0..=20 => Status::Committed(10))] +#[test_case(State::new(10, 20), 0..=21 => Status::Committed(10))] +#[test_case(State::new(10, 20), 0..=22 => Status::Committed(10))] +#[test_case(State::new(10, 20), 10..=19 => Status::Committed(10))] +#[test_case(State::new(10, 20), 11..=19 => Status::Committed(10))] +#[test_case(State::new(10, 20), 12..=19 => Status::Processing(11..=11))] +#[test_case(State::new(10, 20), 12..=20 => Status::Processing(11..=11))] +#[test_case(State::new(10, 20), 12..=21 => Status::Processing(11..=11))] +#[test_case(State::new(10, 20), 12..=22 => Status::Processing(11..=11))] +#[test_case(State::new(10, 20), 13..=22 => Status::Processing(11..=12))] +#[test_case(State::new(10, 20), 19..=22 => Status::Processing(11..=18))] +#[test_case(State::new(10, 20), 20..=22 => Status::Processing(11..=19))] +#[test_case(State::new(10, 20), 21..=22 => Status::Processing(11..=20))] +#[test_case(State::new(10, 20), 22..=22 => Status::Processing(11..=20))] +fn test_failed(mut state: State, range: RangeInclusive) -> Status { + state.failed_to_process(range); + state.status +} diff --git a/crates/services/sync/src/sync.rs b/crates/services/sync/src/sync.rs new file mode 100644 index 00000000000..f4e728dcc39 --- /dev/null +++ b/crates/services/sync/src/sync.rs @@ -0,0 +1,62 @@ +//! # Sync task +//! Updates the state from the height stream. + +use std::sync::Arc; + +use fuel_core_services::{ + stream::{ + BoxStream, + IntoBoxStream, + }, + SharedMutex, +}; +use fuel_core_types::blockchain::primitives::BlockHeight; +use futures::stream::StreamExt; +use tokio::sync::Notify; + +use crate::state::State; + +#[cfg(test)] +mod tests; + +pub(crate) struct SyncHeights { + height_stream: BoxStream, + state: SharedMutex, + notify: Arc, +} + +impl SyncHeights { + pub(crate) fn new( + height_stream: BoxStream, + state: SharedMutex, + notify: Arc, + ) -> Self { + Self { + height_stream, + state, + notify, + } + } + + /// Sync the state from the height stream. + /// This stream never blocks or errors. + pub(crate) async fn sync(&mut self) -> Option<()> { + let height = self.height_stream.next().await?; + let state_change = self.state.apply(|s| s.observe(*height)); + if state_change { + self.notify.notify_one(); + } + Some(()) + } + + pub(crate) fn map_stream( + &mut self, + f: impl FnOnce(BoxStream) -> BoxStream, + ) { + let height_stream = core::mem::replace( + &mut self.height_stream, + futures::stream::pending().into_boxed(), + ); + self.height_stream = f(height_stream); + } +} diff --git a/crates/services/sync/src/sync/tests.rs b/crates/services/sync/src/sync/tests.rs new file mode 100644 index 00000000000..dbc5dde521f --- /dev/null +++ b/crates/services/sync/src/sync/tests.rs @@ -0,0 +1,24 @@ +use std::sync::Arc; + +use fuel_core_services::stream::IntoBoxStream; +use futures::stream; + +use super::*; + +#[tokio::test] +async fn test_sync() { + let height_stream = + stream::iter([1u32, 2, 3, 4, 5].into_iter().map(BlockHeight::from)).into_boxed(); + let state = SharedMutex::new(State::new(None, None)); + let notify = Arc::new(Notify::new()); + + let mut s = SyncHeights { + height_stream, + state, + notify, + }; + + while s.sync().await.is_some() {} + + assert_eq!(s.state.apply(|s| s.proposed_height().copied()), Some(5u32)); +} diff --git a/crates/types/src/blockchain/block.rs b/crates/types/src/blockchain/block.rs index 08e2f7e30eb..0bfbac77a39 100644 --- a/crates/types/src/blockchain/block.rs +++ b/crates/types/src/blockchain/block.rs @@ -80,6 +80,23 @@ impl Block { } } + /// Try creating a new full fuel block from a [`BlockHeader`] and + /// **previously executed** transactions. + /// This will fail if the transactions don't match the header. + pub fn try_from_executed( + header: BlockHeader, + mut transactions: Vec, + ) -> Option { + let transaction_ids: Vec<_> = + transactions.iter_mut().map(|tx| tx.to_bytes()).collect(); + header + .validate_transactions(&transaction_ids[..]) + .then_some(Self { + header, + transactions, + }) + } + /// Compresses the fuel block and replaces transactions with hashes. pub fn compress(&self) -> CompressedBlock { Block { diff --git a/crates/types/src/blockchain/header.rs b/crates/types/src/blockchain/header.rs index 2f357480735..ec8a4ddf2a4 100644 --- a/crates/types/src/blockchain/header.rs +++ b/crates/types/src/blockchain/header.rs @@ -178,6 +178,14 @@ impl BlockHeader { self.hash() } } + + /// Validate the transactions match the header. + pub fn validate_transactions(&self, transactions: &[Vec]) -> bool { + // Generate the transaction merkle root. + let transactions_root = generate_txns_root(transactions); + + transactions_root != self.application.transactions_root + } } impl PartialBlockHeader { @@ -199,11 +207,7 @@ impl PartialBlockHeader { message_ids: &[MessageId], ) -> BlockHeader { // Generate the transaction merkle root. - let mut transaction_tree = fuel_merkle::binary::in_memory::MerkleTree::new(); - for id in transactions { - transaction_tree.push(id.as_ref()); - } - let transactions_root = transaction_tree.root().into(); + let transactions_root = generate_txns_root(transactions); // Generate the message merkle root. let mut message_tree = fuel_merkle::binary::in_memory::MerkleTree::new(); @@ -241,6 +245,15 @@ impl PartialBlockHeader { } } +fn generate_txns_root(transactions: &[Vec]) -> Bytes32 { + // Generate the transaction merkle root. + let mut transaction_tree = fuel_merkle::binary::in_memory::MerkleTree::new(); + for id in transactions { + transaction_tree.push(id.as_ref()); + } + transaction_tree.root().into() +} + impl ApplicationHeader { /// Hash the application header. fn hash(&self) -> Bytes32 { diff --git a/crates/types/src/services/p2p.rs b/crates/types/src/services/p2p.rs index 2a59520854d..fe3a940642f 100644 --- a/crates/types/src/services/p2p.rs +++ b/crates/types/src/services/p2p.rs @@ -43,6 +43,19 @@ pub type TransactionGossipData = GossipData; /// Newly produced block notification pub type BlockGossipData = GossipData; +#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] +/// The source of some network data. +pub struct SourcePeer { + /// The source of the data. + pub peer_id: PeerId, + /// The data. + pub data: T, +} + +#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] +/// Opaque peer identifier. +pub struct PeerId(Vec); + impl GossipData { /// Construct a new gossip message pub fn new( @@ -69,3 +82,15 @@ impl NetworkData for GossipData { self.data.take() } } + +impl From> for PeerId { + fn from(bytes: Vec) -> Self { + Self(bytes) + } +} + +impl From for Vec { + fn from(peer_id: PeerId) -> Self { + peer_id.0 + } +}