From b33829a2ab507e26d8733851e679a584985b840a Mon Sep 17 00:00:00 2001 From: Zeeshan Lakhani Date: Tue, 14 Nov 2023 16:35:29 -0500 Subject: [PATCH] refactor: static/dynamic nodeinfo and recv deadlines --- homestar-runtime/src/event_handler/event.rs | 5 ++++ .../src/event_handler/swarm_event.rs | 3 ++- homestar-runtime/src/network/webserver.rs | 11 ++++---- homestar-runtime/src/network/webserver/rpc.rs | 23 +++++++--------- homestar-runtime/src/runner.rs | 17 ++++++++---- homestar-runtime/src/runner/nodeinfo.rs | 19 +++++++++++--- homestar-runtime/src/worker.rs | 26 ++++++------------- homestar-runtime/src/workflow/info.rs | 19 +++++++------- 8 files changed, 69 insertions(+), 54 deletions(-) diff --git a/homestar-runtime/src/event_handler/event.rs b/homestar-runtime/src/event_handler/event.rs index de2828f4..de46ddd1 100644 --- a/homestar-runtime/src/event_handler/event.rs +++ b/homestar-runtime/src/event_handler/event.rs @@ -115,6 +115,8 @@ pub(crate) enum Event { RegisterPeer(PeerId), /// Discover peers from a rendezvous node. DiscoverPeers(PeerId), + /// TODO + GetListeners(AsyncBoundedChannelSender>), } const RENDEZVOUS_NAMESPACE: &str = "homestar"; @@ -134,6 +136,9 @@ impl Event { event_handler.shutdown().await; let _ = tx.send(()); } + Event::GetListeners(tx) => { + let _ = tx.send(event_handler.swarm.listeners().cloned().collect()); + } Event::FindRecord(record) => record.find(event_handler).await, Event::RemoveRecord(record) => record.remove(event_handler).await, Event::OutboundRequest(PeerRequest { diff --git a/homestar-runtime/src/event_handler/swarm_event.rs b/homestar-runtime/src/event_handler/swarm_event.rs index bc861e5f..72fc13a8 100644 --- a/homestar-runtime/src/event_handler/swarm_event.rs +++ b/homestar-runtime/src/event_handler/swarm_event.rs @@ -409,7 +409,8 @@ async fn handle_swarm_event( info!( peer_id = propagation_source.to_string(), message_id = message_id.to_string(), - "message received on receipts topic: {receipt}" + "message received on receipts topic: {}", + receipt.cid() ); // Store gossiped receipt. diff --git a/homestar-runtime/src/network/webserver.rs b/homestar-runtime/src/network/webserver.rs index ab6aab4a..64bcea61 100644 --- a/homestar-runtime/src/network/webserver.rs +++ b/homestar-runtime/src/network/webserver.rs @@ -3,7 +3,7 @@ use crate::{ runner, - runner::{NodeInfo, WsSender}, + runner::{DynamicNodeInfo, StaticNodeInfo, WsSender}, settings, }; use anyhow::{anyhow, Result}; @@ -63,7 +63,7 @@ pub(crate) enum Message { /// TODO GetNodeInfo, /// TODO - AckNodeInfo(NodeInfo), + AckNodeInfo((StaticNodeInfo, DynamicNodeInfo)), } /// WebSocket server fields. @@ -266,6 +266,7 @@ mod test { #[cfg(feature = "websocket-notify")] use jsonrpsee::types::error::ErrorCode; use jsonrpsee::{core::client::ClientT, rpc_params, ws_client::WsClientBuilder}; + use libp2p::Multiaddr; #[cfg(feature = "websocket-notify")] use notifier::{self, Header}; @@ -307,17 +308,17 @@ mod test { let peer_id = libp2p::PeerId::from_str("12D3KooWRNw2pJC9748Fmq4WNV27HoSTcX3r37132FLkQMrbKAiC") .unwrap(); - let nodeinfo = NodeInfo::new(peer_id); + let static_info = StaticNodeInfo::new(peer_id); assert_eq!( ws_resp, - serde_json::json!({"healthy": true, "nodeInfo": nodeinfo}) + serde_json::json!({"healthy": true, "nodeInfo": {"static": static_info, "dynamic": {"listeners": Vec::::new()}}}) ); let http_resp = reqwest::get(format!("{}/health", http_url)).await.unwrap(); assert_eq!(http_resp.status(), 200); let http_resp = http_resp.json::().await.unwrap(); assert_eq!( http_resp, - serde_json::json!({"healthy": true, "nodeInfo": nodeinfo}) + serde_json::json!({"healthy": true, "nodeInfo": {"static": static_info, "dynamic": {"listeners": Vec::::new()}}}) ); }); diff --git a/homestar-runtime/src/network/webserver/rpc.rs b/homestar-runtime/src/network/webserver/rpc.rs index afc8c647..d3e93a7f 100644 --- a/homestar-runtime/src/network/webserver/rpc.rs +++ b/homestar-runtime/src/network/webserver/rpc.rs @@ -28,13 +28,10 @@ use metrics_exporter_prometheus::PrometheusHandle; #[cfg(feature = "websocket-notify")] use std::sync::Arc; use std::time::Duration; +#[allow(unused_imports)] +use tokio::sync::oneshot; #[cfg(feature = "websocket-notify")] use tokio::{runtime::Handle, select}; -#[allow(unused_imports)] -use tokio::{ - sync::oneshot, - time::{self, Instant}, -}; #[cfg(feature = "websocket-notify")] use tokio_stream::wrappers::BroadcastStream; #[allow(unused_imports)] @@ -147,10 +144,11 @@ impl JsonRpc { .await .map_err(|err| internal_err(err.to_string()))?; - if let Ok(Ok(Message::AckNodeInfo(info))) = - time::timeout_at(Instant::now() + ctx.receiver_timeout, rx.recv_async()).await + if let Ok(Message::AckNodeInfo((static_info, dyn_info))) = + rx.recv_deadline(std::time::Instant::now() + ctx.receiver_timeout) { - Ok(serde_json::json!({ "healthy": true, "nodeInfo": info})) + Ok(serde_json::json!({ "healthy": true, "nodeInfo": { + "static": static_info, "dynamic": dyn_info}})) } else { warn!(sub = HEALTH_ENDPOINT, "did not acknowledge message in time"); Err(internal_err("failed to get node information".to_string())) @@ -159,13 +157,13 @@ impl JsonRpc { #[cfg(test)] module.register_async_method(HEALTH_ENDPOINT, |_, _| async move { - use crate::runner::NodeInfo; + use crate::runner::{DynamicNodeInfo, StaticNodeInfo}; use std::str::FromStr; let peer_id = libp2p::PeerId::from_str("12D3KooWRNw2pJC9748Fmq4WNV27HoSTcX3r37132FLkQMrbKAiC") .unwrap(); Ok::>(serde_json::json!({ - "healthy": true, "nodeInfo": NodeInfo::new(peer_id) + "healthy": true, "nodeInfo": {"static": StaticNodeInfo::new(peer_id), "dynamic": DynamicNodeInfo::new(vec![])}, })) })?; @@ -221,9 +219,8 @@ impl JsonRpc { )) .await?; - if let Ok(Ok(Message::AckWorkflow((cid, name)))) = - time::timeout_at(Instant::now() + ctx.receiver_timeout, rx.recv_async()) - .await + if let Ok(Message::AckWorkflow((cid, name))) = + rx.recv_deadline(std::time::Instant::now() + ctx.receiver_timeout) { let sink = pending.accept().await?; ctx.workflow_listeners diff --git a/homestar-runtime/src/runner.rs b/homestar-runtime/src/runner.rs index 179bcea1..42bf19a9 100644 --- a/homestar-runtime/src/runner.rs +++ b/homestar-runtime/src/runner.rs @@ -27,7 +27,7 @@ use libipld::Cid; use metrics_exporter_prometheus::PrometheusHandle; #[cfg(not(test))] use std::sync::atomic::{AtomicUsize, Ordering}; -use std::{ops::ControlFlow, rc::Rc, sync::Arc, task::Poll}; +use std::{ops::ControlFlow, rc::Rc, sync::Arc, task::Poll, time::Instant}; #[cfg(not(windows))] use tokio::signal::unix::{signal, SignalKind}; #[cfg(windows)] @@ -45,7 +45,7 @@ pub(crate) mod file; mod nodeinfo; pub(crate) mod response; pub(crate) use error::Error; -pub(crate) use nodeinfo::NodeInfo; +pub(crate) use nodeinfo::{DynamicNodeInfo, StaticNodeInfo}; #[cfg(not(test))] const HOMESTAR_THREAD: &str = "homestar-runtime"; @@ -104,7 +104,7 @@ impl ModifiedSet for RunningTaskSet { pub struct Runner { event_sender: Arc>, expiration_queue: Rc>>, - node_info: NodeInfo, + node_info: StaticNodeInfo, running_tasks: Arc, running_workers: RunningWorkerSet, pub(crate) runtime: tokio::runtime::Runtime, @@ -197,7 +197,7 @@ impl Runner { Ok(Self { event_sender, expiration_queue: Rc::new(AtomicRefCell::new(DelayQueue::new())), - node_info: NodeInfo::new(peer_id), + node_info: StaticNodeInfo::new(peer_id), running_tasks: DashMap::new().into(), running_workers: DashMap::new(), runtime, @@ -298,7 +298,14 @@ impl Runner { } (webserver::Message::GetNodeInfo, Some(oneshot_tx)) => { info!("getting node info"); - let _ = oneshot_tx.send(webserver::Message::AckNodeInfo(self.node_info.clone())); + let (tx, rx) = AsyncBoundedChannel::oneshot(); + let _ = self.event_sender.send_async(Event::GetListeners(tx)).await; + let dyn_node_info = if let Ok(listeners) = rx.recv_deadline(Instant::now() + self.settings.node.network.webserver_timeout) { + DynamicNodeInfo::new(listeners) + } else { + DynamicNodeInfo::new(vec![]) + }; + let _ = oneshot_tx.send(webserver::Message::AckNodeInfo((self.node_info.clone(), dyn_node_info))); } _ => () } diff --git a/homestar-runtime/src/runner/nodeinfo.rs b/homestar-runtime/src/runner/nodeinfo.rs index fd916151..58e9484a 100644 --- a/homestar-runtime/src/runner/nodeinfo.rs +++ b/homestar-runtime/src/runner/nodeinfo.rs @@ -1,13 +1,13 @@ -use libp2p::PeerId; +use libp2p::{Multiaddr, PeerId}; use serde::{Deserialize, Serialize}; /// TODO #[derive(Debug, Clone, Serialize, Deserialize)] -pub(crate) struct NodeInfo { +pub(crate) struct StaticNodeInfo { pub(crate) peer_id: PeerId, } -impl NodeInfo { +impl StaticNodeInfo { /// TODO pub(crate) fn new(peer_id: PeerId) -> Self { Self { peer_id } @@ -19,3 +19,16 @@ impl NodeInfo { &self.peer_id } } + +/// TODO +#[derive(Debug, Clone, Serialize, Deserialize)] +pub(crate) struct DynamicNodeInfo { + pub(crate) listeners: Vec, +} + +impl DynamicNodeInfo { + /// TODO + pub(crate) fn new(listeners: Vec) -> Self { + Self { listeners } + } +} diff --git a/homestar-runtime/src/worker.rs b/homestar-runtime/src/worker.rs index 15a73e1a..c80a9b31 100644 --- a/homestar-runtime/src/worker.rs +++ b/homestar-runtime/src/worker.rs @@ -43,12 +43,8 @@ use homestar_wasm::{ }; use indexmap::IndexMap; use libipld::{Cid, Ipld}; -use std::{collections::BTreeMap, sync::Arc}; -use tokio::{ - sync::RwLock, - task::JoinSet, - time::{self, Instant}, -}; +use std::{collections::BTreeMap, sync::Arc, time::Instant}; +use tokio::{sync::RwLock, task::JoinSet}; use tracing::{debug, error, info}; /// [JoinSet] of tasks run by a [Worker]. @@ -215,29 +211,23 @@ where ))) .await; - let found = match time::timeout_at( - Instant::now() + workflow_settings.p2p_timeout, - rx.recv_async(), - ) - .await + let found = match rx + .recv_deadline(Instant::now() + workflow_settings.p2p_timeout) { - Ok(Ok(ResponseEvent::Found(Ok(FoundEvent::Receipt(found))))) => found, - Ok(Ok(ResponseEvent::Found(Err(err)))) => { + Ok(ResponseEvent::Found(Ok(FoundEvent::Receipt(found)))) => found, + Ok(ResponseEvent::Found(Err(err))) => { bail!(ResolveError::UnresolvedCid(format!( "failure in attempting to find event: {err}" ))) } - Ok(Ok(ResponseEvent::NoPeersAvailable)) => { + Ok(ResponseEvent::NoPeersAvailable) => { bail!(ResolveError::UnresolvedCid( "no peers available to communicate with".to_string() )) } - Ok(Ok(_)) => bail!(ResolveError::UnresolvedCid( + Ok(_) => bail!(ResolveError::UnresolvedCid( "wrong or unexpected event message received".to_string(), )), - Ok(Err(err)) => bail!(ResolveError::UnresolvedCid(format!( - "failure in attempting to find receipt: {err}" - ))), Err(err) => bail!(ResolveError::UnresolvedCid(format!( "timeout deadline reached for invocation receipt @ {cid}: {err}", ))), diff --git a/homestar-runtime/src/workflow/info.rs b/homestar-runtime/src/workflow/info.rs index 4222b9e0..19330e10 100644 --- a/homestar-runtime/src/workflow/info.rs +++ b/homestar-runtime/src/workflow/info.rs @@ -17,11 +17,13 @@ use faststr::FastStr; use homestar_core::{ipld::DagJson, workflow::Pointer}; use libipld::{cbor::DagCborCodec, prelude::Codec, serde::from_ipld, Cid, Ipld}; use serde::{Deserialize, Serialize}; -use std::{collections::BTreeMap, fmt, sync::Arc, time::Duration}; -use tokio::{ - runtime::Handle, - time::{self, Instant}, +use std::{ + collections::BTreeMap, + fmt, + sync::Arc, + time::{Duration, Instant}, }; +use tokio::runtime::Handle; use tracing::info; /// [Workflow] header tag, for sharing workflow information over libp2p. @@ -356,8 +358,8 @@ impl Info { ))) .await?; - match time::timeout_at(Instant::now() + p2p_timeout, rx.recv_async()).await { - Ok(Ok(ResponseEvent::Found(Ok(FoundEvent::Workflow(workflow_info))))) => { + match rx.recv_deadline(Instant::now() + p2p_timeout) { + Ok(ResponseEvent::Found(Ok(FoundEvent::Workflow(workflow_info)))) => { // store workflow receipts from info, as we've already stored // the static information. if let Some(mut conn) = conn { @@ -366,13 +368,12 @@ impl Info { Ok(workflow_info) } - Ok(Ok(ResponseEvent::Found(Err(err)))) => { + Ok(ResponseEvent::Found(Err(err))) => { bail!("failure in attempting to find event: {err}") } - Ok(Ok(event)) => { + Ok(event) => { bail!("received unexpected event {event:?} for workflow {workflow_cid}") } - Ok(Err(err)) => bail!("failure in attempting to find workflow: {err}"), Err(err) => handle_timeout_fn .map(|f| f(workflow_cid, conn).context(err)) .unwrap_or(Err(anyhow!(