Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(node): Generate syncing related events #312

Merged
merged 12 commits into from
Jul 1, 2024
7 changes: 2 additions & 5 deletions cli/src/native.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ pub(crate) async fn run(args: Params) -> Result<()> {
info!("Initialised store, present headers: {stored_ranges}");
}

let node = Node::new(NodeConfig {
let (_node, mut events) = Node::new_subscribed(NodeConfig {
network_id,
genesis_hash,
p2p_local_keypair,
Expand All @@ -80,14 +80,11 @@ pub(crate) async fn run(args: Params) -> Result<()> {
.await
.context("Failed to start node")?;

node.wait_connected_trusted().await?;
let mut events = node.event_subscriber();

// We have nothing else to do, but we want to keep main alive
while let Ok(ev) = events.recv().await {
match ev.event {
// Skip noisy events
NodeEvent::ShareSamplingResult { .. } => continue,
event if event.is_error() => warn!("{event}"),
event => info!("{event}"),
}
}
Expand Down
10 changes: 10 additions & 0 deletions cli/static/index.html
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,13 @@
color: var(--fg2);
}

.event-logs {
background-color: var(--bg1);
color: var(--fg1);
border: 1px solid var(--border);
font-family: var(--fonts-mono);
}

h2.status {
margin: 2rem 0 1rem 0;
color: var(--fg1);
Expand Down Expand Up @@ -96,6 +103,9 @@ <h3>Bootnodes</h3>
<button id="start" class="config"><b>Start!</b></button>
</div>

<h2 class="event_logs">Event Logs</h2>
<textarea readonly id="event-logs" class="event-logs" cols=120 rows=8></textarea>

<h2 class="status">Status</h2>

<div class="status">
Expand Down
24 changes: 24 additions & 0 deletions cli/static/run_node.js
Original file line number Diff line number Diff line change
Expand Up @@ -95,11 +95,35 @@ function bind_config(data) {
});
}

function log_event(event) {
// Skip noisy events
if (event.data.get("event").type == "share_sampling_result") {
return;
}

const time = new Date(event.data.get("time"));

const log = time.getHours().toString().padStart(2, '0')
+ ":" + time.getMinutes().toString().padStart(2, '0')
+ ":" + time.getSeconds().toString().padStart(2, '0')
+ "." + time.getMilliseconds().toString().padStart(3, '0')
+ ": " + event.data.get("message");

var textarea = document.getElementById("event-logs");
textarea.value += log + "\n";
textarea.scrollTop = textarea.scrollHeight;
}

async function main(document, window) {
await init();

window.node = await new NodeClient("/js/worker.js");

window.events = await window.node.events_channel();
window.events.onmessage = (event) => {
log_event(event);
};

bind_config(await fetch_config());

if (await window.node.is_running() === true) {
Expand Down
25 changes: 17 additions & 8 deletions node-wasm/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ struct NodeWorker {
}

impl NodeWorker {
async fn new(config: WasmNodeConfig) -> Result<Self> {
async fn new(events_channel_name: &str, config: WasmNodeConfig) -> Result<Self> {
let config = config.into_node_config().await?;

if let Ok(store_height) = config.store.head_height().await {
Expand All @@ -64,18 +64,16 @@ impl NodeWorker {
info!("Initialised new empty store");
}

let node = Node::new(config).await?;
let (node, events_sub) = Node::new_subscribed(config).await?;

let events_channel_name = format!("NodeEventChannel-{}", get_crypto()?.random_uuid());
let events_channel = BroadcastChannel::new(&events_channel_name)
let events_channel = BroadcastChannel::new(events_channel_name)
.context("Failed to allocate BroadcastChannel")?;

let events_sub = node.event_subscriber();
spawn_local(event_forwarder_task(events_sub, events_channel));

Ok(Self {
node,
events_channel_name,
events_channel_name: events_channel_name.to_owned(),
})
}

Expand Down Expand Up @@ -238,9 +236,10 @@ impl NodeWorker {
}

#[wasm_bindgen]
pub async fn run_worker(queued_events: Vec<MessageEvent>) {
pub async fn run_worker(queued_events: Vec<MessageEvent>) -> Result<()> {
info!("Entered run_worker");
let (tx, mut rx) = mpsc::channel(WORKER_MESSAGE_SERVER_INCOMING_QUEUE_LENGTH);
let events_channel_name = format!("NodeEventChannel-{}", get_crypto()?.random_uuid());

let mut message_server: Box<dyn MessageServer> = if SharedWorker::is_worker_type() {
Box::new(SharedWorkerMessageServer::new(tx.clone(), queued_events))
Expand All @@ -265,8 +264,14 @@ pub async fn run_worker(queued_events: Vec<MessageEvent>) {
NodeCommand::IsRunning => {
message_server.respond_to(client_id, WorkerResponse::IsRunning(false));
}
NodeCommand::GetEventsChannelName => {
message_server.respond_to(
client_id,
WorkerResponse::EventsChannelName(events_channel_name.clone()),
);
}
NodeCommand::StartNode(config) => {
match NodeWorker::new(config).await {
match NodeWorker::new(&events_channel_name, config).await {
Ok(node) => {
worker = Some(node);
message_server
Expand All @@ -293,19 +298,23 @@ pub async fn run_worker(queued_events: Vec<MessageEvent>) {
}

info!("Channel to WorkerMessageServer closed, exiting the SharedWorker");

Ok(())
}

async fn event_forwarder_task(mut events_sub: EventSubscriber, events_channel: BroadcastChannel) {
#[derive(Serialize)]
struct Event {
message: String,
is_error: bool,
#[serde(flatten)]
info: NodeEventInfo,
}

while let Ok(ev) = events_sub.recv().await {
let ev = Event {
message: ev.event.to_string(),
is_error: ev.event.is_error(),
info: ev,
};

Expand Down
146 changes: 143 additions & 3 deletions node/src/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,11 @@ impl EventChannel {
rx: self.tx.subscribe(),
}
}

/// Returns if there are any active subscribers or not.
pub fn has_subscribers(&self) -> bool {
self.tx.receiver_count() > 0
}
}

impl Default for EventChannel {
Expand All @@ -94,6 +99,10 @@ impl EventPublisher {
file_line: location.line(),
});
}

pub(crate) fn has_subscribers(&self) -> bool {
self.tx.receiver_count() > 0
}
}

impl EventSubscriber {
Expand Down Expand Up @@ -219,6 +228,82 @@ pub enum NodeEvent {
/// A human readable error.
error: String,
},

/// A new header was added from HeaderSub.
AddedHeaderFromHeaderSub {
/// The height of the header.
height: u64,
},

/// Fetching header of network head just started.
FetchingHeadHeaderStarted,

/// Fetching header of network head just finished.
FetchingHeadHeaderFinished {
/// The height of the network head.
height: u64,
/// How much time fetching took.
took: Duration,
},

/// Fetching headers of a specific block range just started.
FetchingHeadersStarted {
/// Start of the range.
from_height: u64,
/// End of the range (included).
to_height: u64,
},

/// Fetching headers of a specific block range just finished.
FetchingHeadersFinished {
/// Start of the range.
from_height: u64,
/// End of the range (included).
to_height: u64,
/// How much time fetching took.
took: Duration,
},

/// Fetching headers of a specific block range just failed.
FetchingHeadersFailed {
/// Start of the range.
from_height: u64,
/// End of the range (included).
to_height: u64,
/// A human readable error.
error: String,
/// How much time fetching took.
took: Duration,
},

/// Network was compromised.
///
/// This happens when a valid bad encoding fraud proof is received.
/// Ideally it would never happen, but protection needs to exist.
/// In case of compromised network, syncing and data sampling will
/// stop immediately.
NetworkCompromised,
}

impl NodeEvent {
/// Returns `true` if the event indicates an error.
pub fn is_error(&self) -> bool {
match self {
NodeEvent::FatalDaserError { .. }
| NodeEvent::FetchingHeadersFailed { .. }
| NodeEvent::NetworkCompromised => true,
NodeEvent::PeerConnected { .. }
| NodeEvent::PeerDisconnected { .. }
| NodeEvent::SamplingStarted { .. }
| NodeEvent::ShareSamplingResult { .. }
| NodeEvent::SamplingFinished { .. }
| NodeEvent::AddedHeaderFromHeaderSub { .. }
| NodeEvent::FetchingHeadHeaderStarted
| NodeEvent::FetchingHeadHeaderFinished { .. }
| NodeEvent::FetchingHeadersStarted { .. }
| NodeEvent::FetchingHeadersFinished { .. } => false,
}
}
}

impl fmt::Display for NodeEvent {
Expand All @@ -243,7 +328,7 @@ impl fmt::Display for NodeEvent {
square_width,
shares,
} => {
write!(f, "Sampling for {height} block started. Square: {square_width}x{square_width}, Shares: {shares:?}.")
write!(f, "Sampling for {height} block started. Square: {square_width}x{square_width}, Shares: {shares:?}")
}
NodeEvent::ShareSamplingResult {
height,
Expand All @@ -255,7 +340,7 @@ impl fmt::Display for NodeEvent {
let acc = if *accepted { "accepted" } else { "rejected" };
write!(
f,
"Sampling for share [{row}, {column}] of {height} block was {acc}."
"Sampling for share [{row}, {column}] of {height} block was {acc}"
)
}
NodeEvent::SamplingFinished {
Expand All @@ -266,12 +351,67 @@ impl fmt::Display for NodeEvent {
let acc = if *accepted { "accepted" } else { "rejected" };
write!(
f,
"Sampling for {height} block finished and {acc}. Took {took:?}."
"Sampling for {height} block finished and {acc}. Took: {took:?}"
)
}
NodeEvent::FatalDaserError { error } => {
write!(f, "Daser stopped because of a fatal error: {error}")
}
NodeEvent::AddedHeaderFromHeaderSub { height } => {
write!(f, "Added header {height} from header-sub")
}
NodeEvent::FetchingHeadHeaderStarted => {
write!(f, "Fetching header of network head block started")
}
NodeEvent::FetchingHeadHeaderFinished { height, took } => {
write!(f, "Fetching header of network head block finished. Height: {height}, Took: {took:?}")
}
NodeEvent::FetchingHeadersStarted {
from_height,
to_height,
} => {
if from_height == to_height {
write!(f, "Fetching header of {from_height} block started")
} else {
write!(
f,
"Fetching headers of {from_height}-{to_height} blocks started"
)
}
}
NodeEvent::FetchingHeadersFinished {
from_height,
to_height,
took,
} => {
if from_height == to_height {
write!(
f,
"Fetching header of {from_height} block finished. Took: {took:?}"
)
} else {
write!(f, "Fetching headers of {from_height}-{to_height} blocks finished. Took: {took:?}")
}
}
NodeEvent::FetchingHeadersFailed {
from_height,
to_height,
error,
took,
} => {
if from_height == to_height {
write!(
f,
"Fetching header of {from_height} block failed. Took: {took:?}, Error: {error}"
)
} else {
write!(f, "Fetching headers of {from_height}-{to_height} blocks failed. Took: {took:?}, Error: {error}")
}
}
NodeEvent::NetworkCompromised => {
write!(f, "The network is compromised and should not be trusted. ")?;
write!(f, "Node stopped synchronizing and sampling, but you can still make some queries to the network.")
}
}
}
}
Expand Down
Loading
Loading