Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: use new aquamarine macro #5785

Merged
merged 1 commit into from
Dec 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ boa_engine = "0.17"
boa_gc = "0.17"

# misc
aquamarine = "0.3"
aquamarine = "0.4"
bytes = "1.5"
bitflags = "2.4"
clap = "4"
Expand Down
51 changes: 18 additions & 33 deletions crates/blockchain-tree/src/blockchain_tree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,40 +37,25 @@ use tracing::{debug, error, info, instrument, trace, warn};
#[cfg_attr(doc, aquamarine::aquamarine)]
/// A Tree of chains.
///
/// Mermaid flowchart represents all the states a block can have inside the blockchaintree.
/// Green blocks belong to canonical chain and are saved inside then database, they are our main
/// chain. Pending blocks and sidechains are found in memory inside [`BlockchainTree`].
/// Both pending and sidechains have same mechanisms only difference is when they get committed to
/// the database. For pending it is an append operation but for sidechains they need to move current
/// canonical blocks to BlockchainTree and commit the sidechain to the database to become canonical
/// chain (reorg). ```mermaid
/// flowchart BT
/// subgraph canonical chain
/// CanonState:::state
/// block0canon:::canon -->block1canon:::canon -->block2canon:::canon -->block3canon:::canon -->
/// block4canon:::canon --> block5canon:::canon end
/// block5canon --> block6pending1:::pending
/// block5canon --> block6pending2:::pending
/// subgraph sidechain2
/// S2State:::state
/// block3canon --> block4s2:::sidechain --> block5s2:::sidechain
/// end
/// subgraph sidechain1
/// S1State:::state
/// block2canon --> block3s1:::sidechain --> block4s1:::sidechain --> block5s1:::sidechain -->
/// block6s1:::sidechain end
/// classDef state fill:#1882C4
/// classDef canon fill:#8AC926
/// classDef pending fill:#FFCA3A
/// classDef sidechain fill:#FF595E
/// ```
///
/// The flowchart represents all the states a block can have inside the tree.
///
/// main functions:
/// * [BlockchainTree::insert_block]: Connect block to chain, execute it and if valid insert block
/// into the tree.
/// * [BlockchainTree::finalize_block]: Remove chains that are branch off the now finalized block.
/// * [BlockchainTree::make_canonical]: Check if we have the hash of block that is the current
/// - Green blocks belong to the canonical chain and are saved inside the database.
/// - Pending blocks and sidechains are found in-memory inside [`BlockchainTree`].
///
/// Both pending chains and sidechains have the same mechanisms, the only difference is when they
/// get committed to the database.
///
/// For pending, it is an append operation, but for sidechains they need to move the current
/// canonical blocks to the tree (by removing them from the database), and commit the sidechain
/// blocks to the database to become the canonical chain (reorg).
///
/// include_mmd!("docs/mermaid/tree.mmd")
///
/// # Main functions
/// * [BlockchainTree::insert_block]: Connect a block to a chain, execute it, and if valid, insert
/// the block into the tree.
/// * [BlockchainTree::finalize_block]: Remove chains that branch off of the now finalized block.
/// * [BlockchainTree::make_canonical]: Check if we have the hash of a block that is the current
/// canonical head and commit it to db.
#[derive(Debug)]
pub struct BlockchainTree<DB: Database, EF: ExecutorFactory> {
Expand Down
37 changes: 3 additions & 34 deletions crates/net/network/src/fetch/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,44 +18,13 @@ use std::sync::{
};
use tokio::sync::{mpsc::UnboundedSender, oneshot};

#[cfg_attr(doc, aquamarine::aquamarine)]
/// Front-end API for fetching data from the network.
///
/// Following diagram illustrates how a request, See [`HeadersClient::get_headers`] and
/// [`BodiesClient::get_block_bodies`] is handled internally.
#[cfg_attr(doc, aquamarine::aquamarine)]
/// ```mermaid
/// sequenceDiagram
// participant Client as FetchClient
// participant Fetcher as StateFetcher
// participant State as NetworkState
// participant Session as Active Peer Session
// participant Peers as PeerManager
// loop Send Request, retry if retriable and remaining retries
// Client->>Fetcher: DownloadRequest{GetHeaders, GetBodies}
// Note over Client,Fetcher: Request and oneshot Sender sent via `request_tx` channel
// loop Process buffered requests
// State->>Fetcher: poll action
// Fetcher->>Fetcher: Select Available Peer
// Note over Fetcher: Peer is available if it's currently idle, no inflight requests
// Fetcher->>State: FetchAction::BlockDownloadRequest
// State->>Session: Delegate Request
// Note over State,Session: Request and oneshot Sender sent via `to_session_tx` channel
// end
// Session->>Session: Send Request to remote
// Session->>Session: Enforce Request timeout
// Session-->>State: Send Response Result via channel
// State->>Fetcher: Delegate Response
// Fetcher-->>Client: Send Response via channel
// opt Bad Response
// Client->>Peers: Penalize Peer
// end
// Peers->>Peers: Apply Reputation Change
// opt reputation dropped below threshold
// Peers->>State: Disconnect Session
// State->>Session: Delegate Disconnect
// end
// end
/// ```
///
/// include_mmd!("docs/mermaid/fetch-client.mmd")
#[derive(Debug, Clone)]
pub struct FetchClient {
/// Sender half of the request channel.
Expand Down
32 changes: 6 additions & 26 deletions crates/net/network/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,34 +60,14 @@ use tokio::sync::mpsc::{self, error::TrySendError};
use tokio_stream::wrappers::UnboundedReceiverStream;
use tracing::{debug, error, trace, warn};

#[cfg_attr(doc, aquamarine::aquamarine)]
/// Manages the _entire_ state of the network.
///
/// This is an endless [`Future`] that consistently drives the state of the entire network forward.
///
/// The [`NetworkManager`] is the container type for all parts involved with advancing the network.
#[cfg_attr(doc, aquamarine::aquamarine)]
/// ```mermaid
/// graph TB
/// handle(NetworkHandle)
/// events(NetworkEvents)
/// transactions(Transactions Task)
/// ethrequest(ETH Request Task)
/// discovery(Discovery Task)
/// subgraph NetworkManager
/// direction LR
/// subgraph Swarm
/// direction TB
/// B1[(Session Manager)]
/// B2[(Connection Lister)]
/// B3[(Network State)]
/// end
/// end
/// handle <--> |request response channel| NetworkManager
/// NetworkManager --> |Network events| events
/// transactions <--> |transactions| NetworkManager
/// ethrequest <--> |ETH request handing| NetworkManager
/// discovery --> |Discovered peers| NetworkManager
/// ```
///
/// include_mmd!("docs/mermaid/network-manager.mmd")
#[derive(Debug)]
#[must_use = "The NetworkManager does nothing unless polled"]
pub struct NetworkManager<C> {
Expand Down Expand Up @@ -543,7 +523,7 @@ where
if self.handle.mode().is_stake() {
// See [EIP-3675](https://eips.ethereum.org/EIPS/eip-3675#devp2p)
warn!(target: "net", "Peer performed block propagation, but it is not supported in proof of stake (EIP-3675)");
return
return;
}
let msg = NewBlockMessage { hash, block: Arc::new(block) };
self.swarm.state_mut().announce_new_block(msg);
Expand Down Expand Up @@ -638,7 +618,7 @@ where
// This is only possible if the channel was deliberately closed since we always
// have an instance of `NetworkHandle`
error!("Network message channel closed.");
return Poll::Ready(())
return Poll::Ready(());
}
Poll::Ready(Some(msg)) => this.on_handle_message(msg),
};
Expand Down Expand Up @@ -909,7 +889,7 @@ where
if budget == 0 {
// make sure we're woken up again
cx.waker().wake_by_ref();
break
break;
}
}

Expand Down
43 changes: 14 additions & 29 deletions crates/net/network/src/swarm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use std::{
};
use tracing::trace;

#[cfg_attr(doc, aquamarine::aquamarine)]
/// Contains the connectivity related state of the network.
///
/// A swarm emits [`SwarmEvent`]s when polled.
Expand All @@ -44,24 +45,8 @@ use tracing::trace;
/// request channel for the created session and sends requests it receives from the
/// [`StateFetcher`], which receives request objects from the client interfaces responsible for
/// downloading headers and bodies.
#[cfg_attr(doc, aquamarine::aquamarine)]
/// ```mermaid
/// graph TB
/// connections(TCP Listener)
/// Discovery[(Discovery)]
/// fetchRequest(Client Interfaces)
/// Sessions[(SessionManager)]
/// SessionTask[(Peer Session)]
/// State[(State)]
/// StateFetch[(State Fetcher)]
/// connections --> |incoming| Sessions
/// State --> |initiate outgoing| Sessions
/// Discovery --> |update peers| State
/// Sessions --> |spawns| SessionTask
/// SessionTask <--> |handle state requests| State
/// fetchRequest --> |request Headers, Bodies| StateFetch
/// State --> |poll pending requests| StateFetch
/// ```
///
/// include_mmd!("docs/mermaid/swarm.mmd")
#[derive(Debug)]
#[must_use = "Swarm does nothing unless polled"]
pub(crate) struct Swarm<C> {
Expand Down Expand Up @@ -207,7 +192,7 @@ where
ListenerEvent::Incoming { stream, remote_addr } => {
// Reject incoming connection if node is shutting down.
if self.is_shutting_down() {
return None
return None;
}
// ensure we can handle an incoming connection from this address
if let Err(err) =
Expand All @@ -225,13 +210,13 @@ where
);
}
}
return None
return None;
}

match self.sessions.on_incoming(stream, remote_addr) {
Ok(session_id) => {
trace!(target: "net", ?remote_addr, "Incoming connection");
return Some(SwarmEvent::IncomingTcpConnection { session_id, remote_addr })
return Some(SwarmEvent::IncomingTcpConnection { session_id, remote_addr });
}
Err(err) => {
trace!(target: "net", ?err, "Incoming connection rejected, capacity already reached.");
Expand All @@ -250,7 +235,7 @@ where
match event {
StateAction::Connect { remote_addr, peer_id } => {
self.dial_outbound(remote_addr, peer_id);
return Some(SwarmEvent::OutgoingTcpConnection { remote_addr, peer_id })
return Some(SwarmEvent::OutgoingTcpConnection { remote_addr, peer_id });
}
StateAction::Disconnect { peer_id, reason } => {
self.sessions.disconnect(peer_id, reason);
Expand All @@ -268,7 +253,7 @@ where
StateAction::DiscoveredNode { peer_id, socket_addr, fork_id } => {
// Don't try to connect to peer if node is shutting down
if self.is_shutting_down() {
return None
return None;
}
// Insert peer only if no fork id or a valid fork id
if fork_id.map_or_else(|| true, |f| self.sessions.is_valid_fork_id(f)) {
Expand Down Expand Up @@ -317,7 +302,7 @@ where
loop {
while let Poll::Ready(action) = this.state.poll(cx) {
if let Some(event) = this.on_state_action(action) {
return Poll::Ready(Some(event))
return Poll::Ready(Some(event));
}
}

Expand All @@ -326,9 +311,9 @@ where
Poll::Pending => {}
Poll::Ready(event) => {
if let Some(event) = this.on_session_event(event) {
return Poll::Ready(Some(event))
return Poll::Ready(Some(event));
}
continue
continue;
}
}

Expand All @@ -337,13 +322,13 @@ where
Poll::Pending => {}
Poll::Ready(event) => {
if let Some(event) = this.on_connection(event) {
return Poll::Ready(Some(event))
return Poll::Ready(Some(event));
}
continue
continue;
}
}

return Poll::Pending
return Poll::Pending;
}
}
}
Expand Down
Loading
Loading