Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(node)!: Hide internal components #342

Merged
merged 9 commits into from
Jul 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions node-wasm/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,8 @@ use wasm_bindgen::prelude::*;
use wasm_bindgen_futures::spawn_local;
use web_sys::{BroadcastChannel, MessageEvent, SharedWorker};

use lumina_node::node::Node;
use lumina_node::node::{Node, SyncingInfo};
use lumina_node::store::{IndexedDbStore, SamplingMetadata, Store};
use lumina_node::syncer::SyncingInfo;

use crate::error::{Context, Error, Result};
use crate::node::WasmNodeConfig;
Expand Down
3 changes: 1 addition & 2 deletions node-wasm/src/worker/commands.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,8 @@ use tracing::error;
use wasm_bindgen::{JsError, JsValue};

use celestia_types::hash::Hash;
use lumina_node::peer_tracker::PeerTrackerInfo;
use lumina_node::node::{PeerTrackerInfo, SyncingInfo};
use lumina_node::store::SamplingMetadata;
use lumina_node::syncer::SyncingInfo;

use crate::error::Error;
use crate::error::Result;
Expand Down
6 changes: 4 additions & 2 deletions node/src/block_ranges.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@ use std::ops::{Add, RangeInclusive, Sub};
use serde::Serialize;
use smallvec::SmallVec;

/// Type alias to `RangeInclusive<u64>`.
/// Type alias of [`RangeInclusive<u64>`].
///
/// [`RangeInclusive<u64>`]: std::ops::RangeInclusive
pub type BlockRange = RangeInclusive<u64>;

/// Errors that can be produced by `BlockRanges`.
/// Errors that can be produced by [`BlockRanges`].
#[derive(Debug, thiserror::Error, PartialEq, Eq)]
pub enum BlockRangesError {
/// Block ranges must be sorted.
Expand Down
22 changes: 11 additions & 11 deletions node/src/daser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,39 +56,39 @@ const SAMPLING_WINDOW: Duration = Duration::from_secs(30 * DAY);

type Result<T, E = DaserError> = std::result::Result<T, E>;

/// Representation of all the errors that can occur when interacting with the [`Daser`].
/// Representation of all the errors that can occur in `Daser` component.
#[derive(Debug, thiserror::Error)]
pub enum DaserError {
/// An error propagated from the [`P2p`] module.
/// An error propagated from the `P2p` component.
#[error("P2p: {0}")]
P2p(#[from] P2pError),

/// An error propagated from the [`Store`] module.
/// An error propagated from the [`Store`] component.
#[error("Store: {0}")]
Store(#[from] StoreError),
}

/// Component responsible for data availability sampling of blocks from the network.
pub struct Daser {
pub(crate) struct Daser {
cancellation_token: CancellationToken,
}

/// Arguments used to configure the [`Daser`].
pub struct DaserArgs<S>
pub(crate) struct DaserArgs<S>
where
S: Store,
{
/// Handler for the peer to peer messaging.
pub p2p: Arc<P2p>,
pub(crate) p2p: Arc<P2p>,
/// Headers storage.
pub store: Arc<S>,
pub(crate) store: Arc<S>,
/// Event publisher.
pub event_pub: EventPublisher,
pub(crate) event_pub: EventPublisher,
}

impl Daser {
/// Create and start the [`Daser`].
pub fn start<S>(args: DaserArgs<S>) -> Result<Self>
pub(crate) fn start<S>(args: DaserArgs<S>) -> Result<Self>
where
S: Store + 'static,
{
Expand All @@ -109,8 +109,8 @@ impl Daser {
Ok(Daser { cancellation_token })
}

/// Stop the [`Daser`].
pub fn stop(&self) {
/// Stop the worker.
pub(crate) fn stop(&self) {
// Singal the Worker to stop.
// TODO: Should we wait for the Worker to stop?
self.cancellation_token.cancel();
Expand Down
20 changes: 8 additions & 12 deletions node/src/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,15 @@ use tokio::sync::broadcast;

const EVENT_CHANNEL_CAPACITY: usize = 1024;

/// An error returned from the `EventReceiver::recv`.
/// An error returned from the [`EventSubscriber::recv`].
#[derive(Debug, thiserror::Error)]
pub enum RecvError {
/// Node and all its event senders are closed.
#[error("Event channel closed")]
Closed,
}

/// An error returned from the `EventReceiver::try_recv`.
/// An error returned from the [`EventSubscriber::try_recv`].
#[derive(Debug, thiserror::Error)]
pub enum TryRecvError {
/// The event channel is currently empty.
Expand All @@ -32,15 +32,15 @@ pub enum TryRecvError {

/// A channel which users can subscribe for events.
#[derive(Debug)]
pub struct EventChannel {
pub(crate) struct EventChannel {
tx: broadcast::Sender<NodeEventInfo>,
}

/// `EventPublisher` is used to broadcast events generated by [`Node`] to [`EventSubscriber`]s.
///
/// [`Node`]: crate::node::Node
#[derive(Debug, Clone)]
pub struct EventPublisher {
pub(crate) struct EventPublisher {
tx: broadcast::Sender<NodeEventInfo>,
}

Expand All @@ -54,29 +54,24 @@ pub struct EventSubscriber {

impl EventChannel {
/// Create a new `EventChannel`.
pub fn new() -> EventChannel {
pub(crate) fn new() -> EventChannel {
let (tx, _) = broadcast::channel(EVENT_CHANNEL_CAPACITY);
EventChannel { tx }
}

/// Creates a new [`EventPublisher`].
pub fn publisher(&self) -> EventPublisher {
pub(crate) fn publisher(&self) -> EventPublisher {
EventPublisher {
tx: self.tx.clone(),
}
}

/// Creates a new [`EventSubscriber`].
pub fn subscribe(&self) -> EventSubscriber {
pub(crate) fn subscribe(&self) -> EventSubscriber {
EventSubscriber {
rx: self.tx.subscribe(),
}
}

/// Returns if there are any active subscribers or not.
pub fn has_subscribers(&self) -> bool {
self.tx.receiver_count() > 0
}
}

impl Default for EventChannel {
Expand All @@ -100,6 +95,7 @@ impl EventPublisher {
});
}

/// Returns if there are any active subscribers or not.
pub(crate) fn has_subscribers(&self) -> bool {
self.tx.receiver_count() > 0
}
Expand Down
11 changes: 7 additions & 4 deletions node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,22 @@

pub mod block_ranges;
pub mod blockstore;
pub mod daser;
mod daser;
pub mod events;
mod executor;
pub mod network;
pub mod node;
pub mod p2p;
pub mod peer_tracker;
mod p2p;
mod peer_tracker;
pub mod store;
pub mod syncer;
mod syncer;
#[cfg(any(test, feature = "test-utils"))]
#[cfg_attr(docsrs, doc(cfg(feature = "test-utils")))]
pub mod test_utils;
mod utils;

#[cfg(all(target_arch = "wasm32", test))]
wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);

#[doc(inline)]
pub use crate::node::{Node, NodeConfig, NodeError, Result};
50 changes: 30 additions & 20 deletions node/src/node.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
//! High-level integration of [`P2p`], [`Store`], [`Syncer`].
//! Node that connects to Celestia's P2P network.
//!
//! [`P2p`]: crate::p2p::P2p
//! [`Store`]: crate::store::Store
//! [`Syncer`]: crate::syncer::Syncer
//! Upon creation, `Node` will try to connect to Celestia's P2P network
//! and then proceed with synchronization and data sampling of the blocks.

use std::ops::RangeBounds;
use std::sync::Arc;
Expand All @@ -18,36 +17,44 @@ use libp2p::identity::Keypair;
use libp2p::swarm::NetworkInfo;
use libp2p::{Multiaddr, PeerId};
use tokio::select;
use tokio::sync::watch;
use tokio_util::sync::CancellationToken;
use tracing::warn;

use crate::daser::{Daser, DaserArgs, DaserError};
use crate::daser::{Daser, DaserArgs};
use crate::events::{EventChannel, EventSubscriber, NodeEvent};
use crate::executor::spawn;
use crate::p2p::{P2p, P2pArgs, P2pError};
use crate::peer_tracker::PeerTrackerInfo;
use crate::p2p::{P2p, P2pArgs};
use crate::store::{SamplingMetadata, Store, StoreError};
use crate::syncer::{Syncer, SyncerArgs, SyncerError, SyncingInfo};
use crate::syncer::{Syncer, SyncerArgs};

type Result<T, E = NodeError> = std::result::Result<T, E>;
pub use crate::daser::DaserError;
pub use crate::p2p::{HeaderExError, P2pError};
pub use crate::peer_tracker::PeerTrackerInfo;
pub use crate::syncer::{SyncerError, SyncingInfo};

/// Alias of [`Result`] with [`NodeError`] error type
///
/// [`Result`]: std::result::Result
pub type Result<T, E = NodeError> = std::result::Result<T, E>;

/// Representation of all the errors that can occur when interacting with the [`Node`].
#[derive(Debug, thiserror::Error)]
pub enum NodeError {
/// An error propagated from the [`P2p`] module.
#[error(transparent)]
/// An error propagated from the `P2p` component.
#[error("P2p: {0}")]
P2p(#[from] P2pError),

/// An error propagated from the [`Syncer`] module.
#[error(transparent)]
/// An error propagated from the `Syncer` component.
#[error("Syncer: {0}")]
Syncer(#[from] SyncerError),

/// An error propagated from the [`Store`] module.
#[error(transparent)]
/// An error propagated from the [`Store`] component.
#[error("Store: {0}")]
Store(#[from] StoreError),

/// An error propagated from the [`Daser`] module.
#[error(transparent)]
/// An error propagated from the `Daser` component.
#[error("Daser: {0}")]
Daser(#[from] DaserError),
}

Expand Down Expand Up @@ -185,13 +192,16 @@ where
self.p2p.local_peer_id()
}

/// Get current [`PeerTracker`] info.
///
/// [`PeerTracker`]: crate::peer_tracker::PeerTracker
/// Get current [`PeerTrackerInfo`].
pub fn peer_tracker_info(&self) -> PeerTrackerInfo {
self.p2p.peer_tracker_info().clone()
}

/// Get [`PeerTrackerInfo`] watcher.
pub fn peer_tracker_info_watcher(&self) -> watch::Receiver<PeerTrackerInfo> {
self.p2p.peer_tracker_info_watcher()
}

/// Wait until the node is connected to at least 1 peer.
pub async fn wait_connected(&self) -> Result<()> {
Ok(self.p2p.wait_connected().await?)
Expand Down
12 changes: 3 additions & 9 deletions node/src/p2p.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ const FRAUD_PROOF_HEAD_HEIGHT_THRESHOLD: u64 = 20;

type Result<T, E = P2pError> = std::result::Result<T, E>;

/// Representation of all the errors that can occur when interacting with [`P2p`].
/// Representation of all the errors that can occur in `P2p` component.
#[derive(Debug, thiserror::Error)]
pub enum P2pError {
/// Failed to initialize gossipsub behaviour.
Expand Down Expand Up @@ -163,7 +163,7 @@ impl From<oneshot::error::RecvError> for P2pError {

/// Component responsible for the peer to peer networking handling.
#[derive(Debug)]
pub struct P2p {
pub(crate) struct P2p {
cmd_tx: mpsc::Sender<P2pCmd>,
header_sub_watcher: watch::Receiver<Option<ExtendedHeader>>,
peer_tracker_info_watcher: watch::Receiver<PeerTrackerInfo>,
Expand Down Expand Up @@ -255,7 +255,7 @@ impl P2p {
}

/// Creates and starts a new mocked p2p handler.
#[cfg(any(test, feature = "test-utils"))]
#[cfg(test)]
pub fn mocked() -> (Self, crate::test_utils::MockP2pHandle) {
let (cmd_tx, cmd_rx) = mpsc::channel(16);
let (header_sub_tx, header_sub_rx) = watch::channel(None);
Expand All @@ -278,12 +278,6 @@ impl P2p {
(p2p, handle)
}

/// Stop the [`P2p`].
pub async fn stop(&self) -> Result<()> {
// TODO
Ok(())
}

/// Local peer ID on the p2p network.
pub fn local_peer_id(&self) -> &PeerId {
&self.local_peer_id
Expand Down
2 changes: 1 addition & 1 deletion node/src/p2p/header_ex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ pub(crate) struct HeaderExConfig<'a, S> {
pub header_store: Arc<S>,
}

/// Representation of all the errors that can occur when interacting with the header-ex.
/// Representation of all the errors that can occur in `HeaderEx` component.
#[derive(Debug, thiserror::Error)]
pub enum HeaderExError {
/// Header not found.
Expand Down
Loading
Loading