Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

wip: feat(sync): headers stage #58

Closed
wants to merge 17 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
203 changes: 138 additions & 65 deletions Cargo.lock

Large diffs are not rendered by default.

5 changes: 5 additions & 0 deletions crates/db/src/kv/cursor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,11 @@ impl<'tx, T: Table> Cursor<'tx, RW, T> {
.put(k.encode().as_ref(), v.encode().as_ref(), f.unwrap_or_default())
.map_err(KVError::Put)
}

/// Deletes the current `(key, value)` entry on `table` that the cursor is positioned at.
pub fn delete(&mut self) -> Result<(), KVError> {
self.inner.del(WriteFlags::CURRENT).map_err(KVError::Delete)
}
}

impl<'txn, K, T> Cursor<'txn, K, T>
Expand Down
4 changes: 2 additions & 2 deletions crates/db/src/kv/models/blocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ pub type HeaderHash = H256;
/// element as BlockNumber, helps out with querying/sorting.
///
/// Since it's used as a key, the `BlockNumber` is not compressed when encoding it.
#[derive(Debug)]
#[derive(Debug, Clone, Copy)]
#[allow(non_camel_case_types)]
pub struct BlockNumHash((BlockNumber, BlockHash));
pub struct BlockNumHash(pub (BlockNumber, BlockHash));

impl BlockNumHash {
/// Consumes `Self` and returns [`BlockNumber`], [`BlockHash`]
Expand Down
1 change: 1 addition & 0 deletions crates/interfaces/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,4 @@ async-trait = "0.1.57"
thiserror = "1.0.37"
auto_impl = "1.0"
tokio = { version = "1.21.2", features = ["sync"] }
futures = "0.3"
13 changes: 9 additions & 4 deletions crates/interfaces/src/consensus.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,23 @@
use async_trait::async_trait;
use reth_primitives::Header;
use reth_primitives::{Header, H256};
use reth_rpc_types::engine::ForkchoiceState;
use std::fmt::Debug;
use thiserror::Error;
use tokio::sync::watch::Receiver;

/// Consensus is a protocol that chooses canonical chain.
/// We are checking validity of block header here.
#[async_trait]
pub trait Consensus {
#[auto_impl::auto_impl(&, Arc, Box)]
pub trait Consensus: Sync + Send + Debug {
/// Get a receiver for the fork choice state
fn fork_choice_state(&self) -> Receiver<ForkchoiceState>;
fn forkchoice_state(&self) -> Receiver<ForkchoiceState>;

/// Return the current chain tip
fn tip(&self) -> H256;

/// Validate if header is correct and follows consensus specification
fn validate_header(&self, _header: &Header) -> Result<(), Error> {
fn validate_header(&self, _header: &Header, _parent: &Header) -> Result<(), Error> {
Ok(())
}
}
Expand Down
3 changes: 3 additions & 0 deletions crates/interfaces/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ pub mod executor;
/// Consensus traits.
pub mod consensus;

/// Staged sync related traits
pub mod stages;

/// Traits that provide chain access.
pub mod provider;

Expand Down
1 change: 1 addition & 0 deletions crates/interfaces/src/provider/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ pub trait BlockProvider: Send + Sync + 'static {
BlockNumber::Earliest => 0,
BlockNumber::Pending => return Ok(None),
BlockNumber::Number(num) => num.as_u64(),
BlockNumber::Finalized | BlockNumber::Safe => todo!(),
};
Ok(Some(num))
}
Expand Down
33 changes: 33 additions & 0 deletions crates/interfaces/src/stages.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
use async_trait::async_trait;
use futures::Stream;
use reth_primitives::{rpc::BlockId, Header, H256, H512};
use std::{collections::HashSet, fmt::Debug, pin::Pin};

/// The stream of messages
pub type MessageStream<T> = Pin<Box<dyn Stream<Item = T> + Send>>;

/// The header request struct
#[derive(Debug)]
pub struct HeaderRequest {
/// The starting block
pub start: BlockId,
/// The response max size
pub limit: u64,
/// Flag indicating whether the blocks should
/// arrive in reverse
pub reverse: bool,
}

/// The block headers downloader client
#[async_trait]
#[auto_impl::auto_impl(&, Arc, Box)]
pub trait HeadersClient: Send + Sync + Debug {
/// Update the current node status
async fn update_status(&self, height: u64, hash: H256, td: H256);

/// Send the header request
async fn send_header_request(&self, id: u64, request: HeaderRequest) -> HashSet<H512>;

/// Stream the header response messages
async fn stream_headers(&self) -> MessageStream<(u64, Vec<Header>)>;
}
2 changes: 1 addition & 1 deletion crates/net/rpc-types/src/eth/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ pub struct ExecutionPayload {
}

/// This structure encapsulates the fork choice state
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[derive(Clone, Debug, PartialEq, Eq, Default, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ForkchoiceState {
pub head_block_hash: H256,
Expand Down
1 change: 1 addition & 0 deletions crates/primitives/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ ethers-core = { git = "https://github.com/gakonst/ethers-rs", default-features =
bytes = "1.2"

serde = "1.0"
ethereum-types = { version = "0.13.1", default-features = false }
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what's missing from primitives that makes this necessary?

thiserror = "1"
reth-rlp = { path = "../common/rlp", features = ["std", "derive", "ethereum-types"]}
parity-scale-codec = { version = "3.2.1", features = ["derive", "bytes"] }
Expand Down
7 changes: 7 additions & 0 deletions crates/primitives/src/header.rs
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,13 @@ impl Deref for HeaderLocked {
}

impl HeaderLocked {
/// Construct a new locked header.
/// Applicable when hash is known from
/// the database provided it's not corrupted.
pub fn new(header: Header, hash: H256) -> Self {
Self { header, hash }
}

/// Extract raw header that can be modified.
pub fn unlock(self) -> Header {
self.header
Expand Down
3 changes: 3 additions & 0 deletions crates/primitives/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,3 +49,6 @@ pub use ethers_core::{
types as rpc,
types::{Bloom, Bytes, H128, H160, H256, H512, H64, U128, U256, U64},
};

// For uint to hash conversion
pub use ethereum_types::BigEndianHash;
Comment on lines +53 to +54
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see, will add this to primitives

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks ❤️‍🔥

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@rkrasiuk this is done^

21 changes: 17 additions & 4 deletions crates/stages/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,29 @@ description = "Staged syncing primitives used in reth."

[dependencies]
reth-primitives = { path = "../primitives" }
reth-interfaces = { path = "../interfaces" }
reth-rpc-types = { path = "../net/rpc-types" }
reth-db = { path = "../db" }
async-trait = "0.1.57"
thiserror = "1.0.37"
tracing = "0.1.36"
tracing-futures = "0.2.5"
tokio = { version = "1.21.2", features = ["sync"] }
rand = "0.8" # TODO:
tokio-stream = "0.1.11"
aquamarine = "0.1.12"

# async/futures
async-trait = "0.1.57"
futures = "0.3"

# error handling
thiserror = "1.0.37"
anyhow = "1.0.65"
rkrasiuk marked this conversation as resolved.
Show resolved Hide resolved

[dev-dependencies]
tokio = { version = "*", features = ["rt", "sync", "macros"] }
tokio-stream = "0.1.10"
tokio-stream = { version = "0.1.11", features = ["sync"] }
tempfile = "3.3.0"
reth-db = { path = "../db", features = ["test-utils"] }
reth-db = { path = "../db", features = ["test-utils"] }
reth-rpc-types = { path = "../net/rpc-types" }
assert_matches = "1.5.0"
once_cell = "1.15.0"
6 changes: 3 additions & 3 deletions crates/stages/src/error.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::pipeline::PipelineEvent;
use crate::{pipeline::PipelineEvent, Stage};
use reth_db::kv::KVError;
use reth_primitives::BlockNumber;
use thiserror::Error;
Expand All @@ -20,7 +20,7 @@ pub enum StageError {
Database(#[from] KVError),
/// The stage encountered an internal error.
#[error(transparent)]
Internal(Box<dyn std::error::Error + Send + Sync>),
Internal(#[from] anyhow::Error),
}

/// A pipeline execution error.
Expand All @@ -37,5 +37,5 @@ pub enum PipelineError {
Channel(#[from] SendError<PipelineEvent>),
/// The stage encountered an internal error.
#[error(transparent)]
Internal(Box<dyn std::error::Error + Send + Sync>),
Internal(#[from] anyhow::Error),
}
2 changes: 2 additions & 0 deletions crates/stages/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,11 @@ mod error;
mod id;
mod pipeline;
mod stage;
mod stages;
mod util;

pub use error::*;
pub use id::*;
pub use pipeline::*;
pub use stage::*;
pub use stages::*;
6 changes: 3 additions & 3 deletions crates/stages/src/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -749,7 +749,7 @@ mod tests {
pub(crate) struct TestStage {
id: StageId,
exec_outputs: VecDeque<Result<ExecOutput, StageError>>,
unwind_outputs: VecDeque<Result<UnwindOutput, Box<dyn Error + Send + Sync>>>,
unwind_outputs: VecDeque<Result<UnwindOutput, anyhow::Error>>,
}

impl TestStage {
Expand All @@ -764,7 +764,7 @@ mod tests {

pub(crate) fn add_unwind(
mut self,
output: Result<UnwindOutput, Box<dyn Error + Send + Sync>>,
output: Result<UnwindOutput, anyhow::Error>,
) -> Self {
self.unwind_outputs.push_back(output);
self
Expand Down Expand Up @@ -797,7 +797,7 @@ mod tests {
&mut self,
_: &mut Tx<'tx, mdbx::RW, E>,
_: UnwindInput,
) -> Result<UnwindOutput, Box<dyn Error + Send + Sync>>
) -> Result<UnwindOutput, anyhow::Error>
where
'db: 'tx,
{
Expand Down
2 changes: 1 addition & 1 deletion crates/stages/src/stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ where
&mut self,
tx: &mut Tx<'tx, mdbx::RW, E>,
input: UnwindInput,
) -> Result<UnwindOutput, Box<dyn std::error::Error + Send + Sync>>
) -> Result<UnwindOutput, anyhow::Error>
where
'db: 'tx;
}
100 changes: 100 additions & 0 deletions crates/stages/src/stages/headers/downloader.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
use async_trait::async_trait;
use reth_interfaces::{
consensus::Consensus,
stages::{HeaderRequest, HeadersClient, MessageStream},
};
use reth_primitives::{rpc::BlockId, Header, HeaderLocked, H256};
use reth_rpc_types::engine::ForkchoiceState;
use std::{fmt::Debug, time::Duration};
use thiserror::Error;
use tokio_stream::StreamExt;

/// The header downloading strategy
#[async_trait]
pub trait Downloader: Sync + Send + Debug {
/// The Consensus used to verify block validity when
/// downloading
type Consensus: Consensus;
/// The Client used to download the headers
type Client: HeadersClient;

/// The request timeout in seconds
fn timeout(&self) -> u64;

/// The consensus engine
fn consensus(&self) -> &Self::Consensus;

/// The headers client
fn client(&self) -> &Self::Client;

/// Download the headers
async fn download(
&self,
head: &HeaderLocked,
forkchoice: &ForkchoiceState,
) -> Result<Vec<HeaderLocked>, DownloadError>;

/// Perform a header request. Return the request ID
async fn download_headers(
&self,
stream: &mut MessageStream<(u64, Vec<Header>)>,
start: BlockId,
limit: u64,
) -> Result<Vec<Header>, DownloadError> {
let request_id = rand::random();
let request = HeaderRequest { start, limit, reverse: true };
let _ = self.client().send_header_request(request_id, request).await;

// Filter stream by request id and non empty headers content
let stream = stream.filter(|(id, headers)| request_id == *id && !headers.is_empty());

// Wrap the stream with a timeout
let stream = stream.timeout(Duration::from_secs(self.timeout()));
match Box::pin(stream).try_next().await {
Ok(Some((_, h))) => Ok(h),
_ => return Err(DownloadError::NoHeaderResponse { request_id }),
}
}

/// Validate whether the header is valid in relation to it's parent
fn validate(
&self,
header: &HeaderLocked,
parent: &HeaderLocked,
) -> Result<bool, DownloadError> {
if !(parent.hash() == header.parent_hash && parent.number + 1 == header.number) {
return Ok(false)
}

self.consensus().validate_header(&header, &parent).map_err(|e| {
DownloadError::HeaderValidation { hash: parent.hash(), details: e.to_string() }
})?;
Ok(true)
}
}

/// The downloader error type
#[derive(Error, Debug, Clone)]
pub enum DownloadError {
/// Header validation failed
#[error("Failed to validate header {hash}. Details: {details}.")]
HeaderValidation {
/// Hash of header failing validation
hash: H256,
/// The details of validation failure
details: String,
},
/// No headers reponse received
#[error("Failed to get headers for request {request_id}.")]
NoHeaderResponse {
/// The last request ID
request_id: u64,
},
}

impl DownloadError {
/// Returns bool indicating whether this error is retryable or fatal
pub fn is_retryable(&self) -> bool {
matches!(self, DownloadError::NoHeaderResponse { .. })
}
}
Loading