feat(node): Generate syncing related events #312

Merged · 12 commits · Jul 1, 2024
9 changes: 4 additions & 5 deletions cli/src/native.rs
@@ -68,7 +68,7 @@ pub(crate) async fn run(args: Params) -> Result<()> {
info!("Initialised store, present headers: {stored_ranges}");
}

let node = Node::new(NodeConfig {
let (_node, mut events) = Node::new_subscribed(NodeConfig {
network_id,
genesis_hash,
p2p_local_keypair,
@@ -80,14 +80,13 @@ .await
.await
.context("Failed to start node")?;

node.wait_connected_trusted().await?;
let mut events = node.event_subscriber();

// We have nothing else to do, but we want to keep main alive
while let Ok(ev) = events.recv().await {
match ev.event {
// Skip noisy events
NodeEvent::ShareSamplingResult { .. } => continue,
event @ (NodeEvent::FatalDaserError { .. } | NodeEvent::NetworkCompromised) => {
warn!("{event}");
}
event => info!("{event}"),
}
}
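For reference, below is a minimal, self-contained sketch of the event-routing pattern the new CLI loop uses: skip noisy events, escalate severe ones, log the rest. The `NodeEvent` enum here is a simplified stand-in for illustration only, not the real lumina type.

// Simplified stand-in for the real NodeEvent; for illustration only.
#[derive(Debug)]
enum NodeEvent {
    ShareSamplingResult { height: u64 },
    FatalDaserError { error: String },
    NetworkCompromised,
    ConnectedTrustedPeer,
}

fn route(ev: NodeEvent) {
    match ev {
        // Skip noisy events.
        NodeEvent::ShareSamplingResult { .. } => {}
        // `event @ (A | B)` binds the whole matched value, so both severe
        // variants share one arm and can still be printed.
        event @ (NodeEvent::FatalDaserError { .. } | NodeEvent::NetworkCompromised) => {
            eprintln!("WARN: {event:?}");
        }
        event => println!("INFO: {event:?}"),
    }
}

fn main() {
    route(NodeEvent::ShareSamplingResult { height: 7 });
    route(NodeEvent::FatalDaserError { error: "example".into() });
    route(NodeEvent::NetworkCompromised);
    route(NodeEvent::ConnectedTrustedPeer);
}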
23 changes: 15 additions & 8 deletions node-wasm/src/worker.rs
@@ -55,7 +55,7 @@ struct NodeWorker {
}

impl NodeWorker {
async fn new(config: WasmNodeConfig) -> Result<Self> {
async fn new(events_channel_name: &str, config: WasmNodeConfig) -> Result<Self> {
let config = config.into_node_config().await?;

if let Ok(store_height) = config.store.head_height().await {
@@ -64,18 +64,16 @@ impl NodeWorker {
info!("Initialised new empty store");
}

let node = Node::new(config).await?;
let (node, events_sub) = Node::new_subscribed(config).await?;

let events_channel_name = format!("NodeEventChannel-{}", get_crypto()?.random_uuid());
let events_channel = BroadcastChannel::new(&events_channel_name)
let events_channel = BroadcastChannel::new(events_channel_name)
.context("Failed to allocate BroadcastChannel")?;

let events_sub = node.event_subscriber();
spawn_local(event_forwarder_task(events_sub, events_channel));

Ok(Self {
node,
events_channel_name,
events_channel_name: events_channel_name.to_owned(),
})
}

@@ -238,9 +236,10 @@ impl NodeWorker {
}

#[wasm_bindgen]
pub async fn run_worker(queued_events: Vec<MessageEvent>) {
pub async fn run_worker(queued_events: Vec<MessageEvent>) -> Result<()> {
info!("Entered run_worker");
let (tx, mut rx) = mpsc::channel(WORKER_MESSAGE_SERVER_INCOMING_QUEUE_LENGTH);
let events_channel_name = format!("NodeEventChannel-{}", get_crypto()?.random_uuid());

let mut message_server: Box<dyn MessageServer> = if SharedWorker::is_worker_type() {
Box::new(SharedWorkerMessageServer::new(tx.clone(), queued_events))
@@ -265,8 +264,14 @@ pub async fn run_worker(queued_events: Vec<MessageEvent>) {
NodeCommand::IsRunning => {
message_server.respond_to(client_id, WorkerResponse::IsRunning(false));
}
NodeCommand::GetEventsChannelName => {
message_server.respond_to(
client_id,
WorkerResponse::EventsChannelName(events_channel_name.clone()),
);
}
NodeCommand::StartNode(config) => {
match NodeWorker::new(config).await {
match NodeWorker::new(&events_channel_name, config).await {
Ok(node) => {
worker = Some(node);
message_server
@@ -293,6 +298,8 @@ pub async fn run_worker(queued_events: Vec<MessageEvent>) {
}

info!("Channel to WorkerMessageServer closed, exiting the SharedWorker");

Ok(())
}

async fn event_forwarder_task(mut events_sub: EventSubscriber, events_channel: BroadcastChannel) {
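The body of event_forwarder_task is not shown in this diff. As a rough sketch of the forwarding pattern it implements — drain an event subscriber and re-publish each event until one side closes — here is a generic version using tokio channels in place of the Web BroadcastChannel; the real task posts serialized events to the browser channel instead, so treat this as an analogy rather than the actual implementation.

use tokio::sync::{broadcast, mpsc};

// Drain a broadcast subscriber and re-publish every event on another channel.
async fn forward<T: Clone>(mut from: broadcast::Receiver<T>, to: mpsc::Sender<T>) {
    loop {
        match from.recv().await {
            Ok(ev) => {
                // Stop when the receiving side is gone.
                if to.send(ev).await.is_err() {
                    break;
                }
            }
            // The forwarder fell behind; drop the missed events and keep going.
            Err(broadcast::error::RecvError::Lagged(_)) => continue,
            // All publishers dropped; nothing more to forward.
            Err(broadcast::error::RecvError::Closed) => break,
        }
    }
}

#[tokio::main]
async fn main() {
    let (event_tx, event_rx) = broadcast::channel(16);
    let (out_tx, mut out_rx) = mpsc::channel(16);

    tokio::spawn(forward(event_rx, out_tx));

    event_tx.send("node event").unwrap();
    println!("forwarded: {}", out_rx.recv().await.unwrap());
}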
63 changes: 63 additions & 0 deletions node/src/events.rs
@@ -72,6 +72,11 @@ impl EventChannel {
rx: self.tx.subscribe(),
}
}

/// Returns whether there are any active subscribers.
pub fn has_subscribers(&self) -> bool {
self.tx.receiver_count() > 0
}
}

impl Default for EventChannel {
@@ -94,6 +99,10 @@ impl EventPublisher {
file_line: location.line(),
});
}

pub(crate) fn has_subscribers(&self) -> bool {
self.tx.receiver_count() > 0
}
}

impl EventSubscriber {
@@ -219,6 +228,37 @@ pub enum NodeEvent {
/// A human readable error.
error: String,
},

/// A new head that was observed by the node.
NewSubjectiveHead {
/// The height of the head.
height: u64,
},

/// A new header was added from HeaderSub.
AddedHeaderFromHeaderSub {
/// The height of the header.
height: u64,
},

/// Fetching header of network head in progress.
FetchingHeadHeader,

/// Fetching headers for a specific block range.
FetchingHeaders {
/// Start of the range.
from_height: u64,
/// End of the range (inclusive).
to_height: u64,
},

/// Network was compromised.
///
/// This happens when a valid bad encoding fraud proof is received.
/// Ideally this should never happen, but the protection needs to exist.
/// When the network is compromised, syncing and data sampling stop
/// immediately.
NetworkCompromised,
}

impl fmt::Display for NodeEvent {
@@ -272,6 +312,29 @@ impl fmt::Display for NodeEvent {
NodeEvent::FatalDaserError { error } => {
write!(f, "Daser stopped because of a fatal error: {error}")
}
NodeEvent::NewSubjectiveHead { height } => {
write!(f, "New subjective head: {height}")
}
NodeEvent::AddedHeaderFromHeaderSub { height } => {
write!(f, "Added header {height} from HeaderSub")
}
NodeEvent::FetchingHeadHeader => {
write!(f, "Fetching header of network head block")
}
NodeEvent::FetchingHeaders {
from_height,
to_height,
} => {
if from_height == to_height {
write!(f, "Fetching header of {from_height} block")
} else {
write!(f, "Fetching headers of {from_height}-{to_height} blocks")
}
}
NodeEvent::NetworkCompromised => {
write!(f, "The network is compromised and should not be trusted. ")?;
write!(f, "Node stopped synchronizing and sampling, but you can still make some queries to the network.")
}
}
}
}
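A hedged sketch of how a consumer might render the new syncing events, mirroring the Display strings added above (including the single-block collapse in FetchingHeaders). The enum below is a simplified stand-in, not the real lumina type.

// Simplified stand-in for the new syncing-related variants.
enum NodeEvent {
    FetchingHeadHeader,
    NewSubjectiveHead { height: u64 },
    FetchingHeaders { from_height: u64, to_height: u64 },
    AddedHeaderFromHeaderSub { height: u64 },
}

fn describe(ev: &NodeEvent) -> String {
    match ev {
        NodeEvent::FetchingHeadHeader => "Fetching header of network head block".into(),
        NodeEvent::NewSubjectiveHead { height } => format!("New subjective head: {height}"),
        NodeEvent::AddedHeaderFromHeaderSub { height } => {
            format!("Added header {height} from HeaderSub")
        }
        // A one-block range collapses to the singular form, as in the Display impl above.
        NodeEvent::FetchingHeaders { from_height, to_height } if from_height == to_height => {
            format!("Fetching header of {from_height} block")
        }
        NodeEvent::FetchingHeaders { from_height, to_height } => {
            format!("Fetching headers of {from_height}-{to_height} blocks")
        }
    }
}

fn main() {
    let events = [
        NodeEvent::FetchingHeadHeader,
        NodeEvent::NewSubjectiveHead { height: 100 },
        NodeEvent::FetchingHeaders { from_height: 1, to_height: 64 },
        NodeEvent::FetchingHeaders { from_height: 65, to_height: 65 },
        NodeEvent::AddedHeaderFromHeaderSub { height: 101 },
    ];
    for ev in &events {
        println!("{}", describe(ev));
    }
}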
36 changes: 30 additions & 6 deletions node/src/node.rs
@@ -22,7 +22,7 @@ use tokio_util::sync::CancellationToken;
use tracing::warn;

use crate::daser::{Daser, DaserArgs, DaserError};
use crate::events::{EventChannel, EventSubscriber};
use crate::events::{EventChannel, EventSubscriber, NodeEvent};
use crate::executor::spawn;
use crate::p2p::{P2p, P2pArgs, P2pError};
use crate::peer_tracker::PeerTrackerInfo;
@@ -92,10 +92,23 @@
{
/// Creates and starts a new celestia node with a given config.
pub async fn new<B>(config: NodeConfig<B, S>) -> Result<Self>
where
B: Blockstore + 'static,
{
let (node, _) = Node::new_subscribed(config).await?;
Ok(node)
}

/// Creates and starts a new celestia node with a given config.
///
/// Returns `Node` along with an `EventSubscriber`. Use this to avoid missing
/// any events generated during the construction of the node.
pub async fn new_subscribed<B>(config: NodeConfig<B, S>) -> Result<(Self, EventSubscriber)>
where
B: Blockstore + 'static,
{
let event_channel = EventChannel::new();
let event_sub = event_channel.subscribe();
let store = Arc::new(config.store);

let p2p = Arc::new(P2p::start(P2pArgs {
@@ -111,6 +124,7 @@ where
let syncer = Arc::new(Syncer::start(SyncerArgs {
store: store.clone(),
p2p: p2p.clone(),
event_pub: event_channel.publisher(),
})?);

let daser = Arc::new(Daser::start(DaserArgs {
@@ -122,32 +136,42 @@ where
// spawn the task that will stop the services when the fraud is detected
let network_compromised_token = p2p.get_network_compromised_token().await?;
let tasks_cancellation_token = CancellationToken::new();

spawn({
let syncer = syncer.clone();
let daser = daser.clone();
let tasks_cancellation_token = tasks_cancellation_token.child_token();
let event_pub = event_channel.publisher();

async move {
select! {
_ = tasks_cancellation_token.cancelled() => (),
_ = network_compromised_token.cancelled() => {
warn!("The network is compromised and should not be trusted.");
warn!("The node will stop synchronizing and sampling.");
warn!("You can still make some queries to the network.");
syncer.stop();
daser.stop();

if event_pub.has_subscribers() {
event_pub.send(NodeEvent::NetworkCompromised);
} else {
// This is a very important message and we want to log it if the user
// does not consume our events.
warn!("{}", NodeEvent::NetworkCompromised);
}
}
}
}
});

Ok(Node {
let node = Node {
event_channel,
p2p,
store,
syncer,
_daser: daser,
tasks_cancellation_token,
})
};

Ok((node, event_sub))
}

/// Returns a new `EventSubscriber`.
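A minimal sketch of the "publish if anyone listens, otherwise warn" logic used in the network-compromised task above, assuming the event channel is backed by a tokio broadcast channel (an inference from the receiver_count() call in events.rs, not something this diff confirms). The NodeEvent enum here is a trimmed-down stand-in.

use tokio::sync::broadcast;

// Trimmed-down stand-in for the real event type.
#[derive(Clone, Debug)]
enum NodeEvent {
    NetworkCompromised,
}

fn publish_or_warn(tx: &broadcast::Sender<NodeEvent>, event: NodeEvent) {
    if tx.receiver_count() > 0 {
        // Receivers may drop concurrently, so ignore a send error.
        let _ = tx.send(event);
    } else {
        // Very important message: make sure it reaches the logs even when
        // nobody consumes events.
        eprintln!("WARN: {event:?}");
    }
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = broadcast::channel(16);
    publish_or_warn(&tx, NodeEvent::NetworkCompromised);
    println!("received: {:?}", rx.recv().await.unwrap());
}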