Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: static/dynamic nodeinfo and recv deadlines #435

Merged
merged 1 commit into from
Nov 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions homestar-runtime/src/event_handler/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,8 @@ pub(crate) enum Event {
RegisterPeer(PeerId),
/// Discover peers from a rendezvous node.
DiscoverPeers(PeerId),
/// TODO
GetListeners(AsyncBoundedChannelSender<Vec<libp2p::core::Multiaddr>>),
}

const RENDEZVOUS_NAMESPACE: &str = "homestar";
Expand All @@ -134,6 +136,9 @@ impl Event {
event_handler.shutdown().await;
let _ = tx.send(());
}
Event::GetListeners(tx) => {
let _ = tx.send(event_handler.swarm.listeners().cloned().collect());
}
Event::FindRecord(record) => record.find(event_handler).await,
Event::RemoveRecord(record) => record.remove(event_handler).await,
Event::OutboundRequest(PeerRequest {
Expand Down
3 changes: 2 additions & 1 deletion homestar-runtime/src/event_handler/swarm_event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -409,7 +409,8 @@ async fn handle_swarm_event<THandlerErr: fmt::Debug + Send, DB: Database>(
info!(
peer_id = propagation_source.to_string(),
message_id = message_id.to_string(),
"message received on receipts topic: {receipt}"
"message received on receipts topic: {}",
receipt.cid()
);

// Store gossiped receipt.
Expand Down
11 changes: 6 additions & 5 deletions homestar-runtime/src/network/webserver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

use crate::{
runner,
runner::{NodeInfo, WsSender},
runner::{DynamicNodeInfo, StaticNodeInfo, WsSender},
settings,
};
use anyhow::{anyhow, Result};
Expand Down Expand Up @@ -63,7 +63,7 @@ pub(crate) enum Message {
/// TODO
GetNodeInfo,
/// TODO
AckNodeInfo(NodeInfo),
AckNodeInfo((StaticNodeInfo, DynamicNodeInfo)),
}

/// WebSocket server fields.
Expand Down Expand Up @@ -266,6 +266,7 @@ mod test {
#[cfg(feature = "websocket-notify")]
use jsonrpsee::types::error::ErrorCode;
use jsonrpsee::{core::client::ClientT, rpc_params, ws_client::WsClientBuilder};
use libp2p::Multiaddr;
#[cfg(feature = "websocket-notify")]
use notifier::{self, Header};

Expand Down Expand Up @@ -307,17 +308,17 @@ mod test {
let peer_id =
libp2p::PeerId::from_str("12D3KooWRNw2pJC9748Fmq4WNV27HoSTcX3r37132FLkQMrbKAiC")
.unwrap();
let nodeinfo = NodeInfo::new(peer_id);
let static_info = StaticNodeInfo::new(peer_id);
assert_eq!(
ws_resp,
serde_json::json!({"healthy": true, "nodeInfo": nodeinfo})
serde_json::json!({"healthy": true, "nodeInfo": {"static": static_info, "dynamic": {"listeners": Vec::<Multiaddr>::new()}}})
);
let http_resp = reqwest::get(format!("{}/health", http_url)).await.unwrap();
assert_eq!(http_resp.status(), 200);
let http_resp = http_resp.json::<serde_json::Value>().await.unwrap();
assert_eq!(
http_resp,
serde_json::json!({"healthy": true, "nodeInfo": nodeinfo})
serde_json::json!({"healthy": true, "nodeInfo": {"static": static_info, "dynamic": {"listeners": Vec::<Multiaddr>::new()}}})
);
});

Expand Down
23 changes: 10 additions & 13 deletions homestar-runtime/src/network/webserver/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,10 @@ use metrics_exporter_prometheus::PrometheusHandle;
#[cfg(feature = "websocket-notify")]
use std::sync::Arc;
use std::time::Duration;
#[allow(unused_imports)]
use tokio::sync::oneshot;
#[cfg(feature = "websocket-notify")]
use tokio::{runtime::Handle, select};
#[allow(unused_imports)]
use tokio::{
sync::oneshot,
time::{self, Instant},
};
#[cfg(feature = "websocket-notify")]
use tokio_stream::wrappers::BroadcastStream;
#[allow(unused_imports)]
Expand Down Expand Up @@ -147,10 +144,11 @@ impl JsonRpc {
.await
.map_err(|err| internal_err(err.to_string()))?;

if let Ok(Ok(Message::AckNodeInfo(info))) =
time::timeout_at(Instant::now() + ctx.receiver_timeout, rx.recv_async()).await
if let Ok(Message::AckNodeInfo((static_info, dyn_info))) =
rx.recv_deadline(std::time::Instant::now() + ctx.receiver_timeout)
{
Ok(serde_json::json!({ "healthy": true, "nodeInfo": info}))
Ok(serde_json::json!({ "healthy": true, "nodeInfo": {
"static": static_info, "dynamic": dyn_info}}))
} else {
warn!(sub = HEALTH_ENDPOINT, "did not acknowledge message in time");
Err(internal_err("failed to get node information".to_string()))
Expand All @@ -159,13 +157,13 @@ impl JsonRpc {

#[cfg(test)]
module.register_async_method(HEALTH_ENDPOINT, |_, _| async move {
use crate::runner::NodeInfo;
use crate::runner::{DynamicNodeInfo, StaticNodeInfo};
use std::str::FromStr;
let peer_id =
libp2p::PeerId::from_str("12D3KooWRNw2pJC9748Fmq4WNV27HoSTcX3r37132FLkQMrbKAiC")
.unwrap();
Ok::<serde_json::Value, ErrorObject<'_>>(serde_json::json!({
"healthy": true, "nodeInfo": NodeInfo::new(peer_id)
"healthy": true, "nodeInfo": {"static": StaticNodeInfo::new(peer_id), "dynamic": DynamicNodeInfo::new(vec![])},
}))
})?;

Expand Down Expand Up @@ -221,9 +219,8 @@ impl JsonRpc {
))
.await?;

if let Ok(Ok(Message::AckWorkflow((cid, name)))) =
time::timeout_at(Instant::now() + ctx.receiver_timeout, rx.recv_async())
.await
if let Ok(Message::AckWorkflow((cid, name))) =
rx.recv_deadline(std::time::Instant::now() + ctx.receiver_timeout)
{
let sink = pending.accept().await?;
ctx.workflow_listeners
Expand Down
17 changes: 12 additions & 5 deletions homestar-runtime/src/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use libipld::Cid;
use metrics_exporter_prometheus::PrometheusHandle;
#[cfg(not(test))]
use std::sync::atomic::{AtomicUsize, Ordering};
use std::{ops::ControlFlow, rc::Rc, sync::Arc, task::Poll};
use std::{ops::ControlFlow, rc::Rc, sync::Arc, task::Poll, time::Instant};
#[cfg(not(windows))]
use tokio::signal::unix::{signal, SignalKind};
#[cfg(windows)]
Expand All @@ -45,7 +45,7 @@ pub(crate) mod file;
mod nodeinfo;
pub(crate) mod response;
pub(crate) use error::Error;
pub(crate) use nodeinfo::NodeInfo;
pub(crate) use nodeinfo::{DynamicNodeInfo, StaticNodeInfo};

#[cfg(not(test))]
const HOMESTAR_THREAD: &str = "homestar-runtime";
Expand Down Expand Up @@ -104,7 +104,7 @@ impl ModifiedSet for RunningTaskSet {
pub struct Runner {
event_sender: Arc<AsyncBoundedChannelSender<Event>>,
expiration_queue: Rc<AtomicRefCell<DelayQueue<Cid>>>,
node_info: NodeInfo,
node_info: StaticNodeInfo,
running_tasks: Arc<RunningTaskSet>,
running_workers: RunningWorkerSet,
pub(crate) runtime: tokio::runtime::Runtime,
Expand Down Expand Up @@ -197,7 +197,7 @@ impl Runner {
Ok(Self {
event_sender,
expiration_queue: Rc::new(AtomicRefCell::new(DelayQueue::new())),
node_info: NodeInfo::new(peer_id),
node_info: StaticNodeInfo::new(peer_id),
running_tasks: DashMap::new().into(),
running_workers: DashMap::new(),
runtime,
Expand Down Expand Up @@ -298,7 +298,14 @@ impl Runner {
}
(webserver::Message::GetNodeInfo, Some(oneshot_tx)) => {
info!("getting node info");
let _ = oneshot_tx.send(webserver::Message::AckNodeInfo(self.node_info.clone()));
let (tx, rx) = AsyncBoundedChannel::oneshot();
let _ = self.event_sender.send_async(Event::GetListeners(tx)).await;
let dyn_node_info = if let Ok(listeners) = rx.recv_deadline(Instant::now() + self.settings.node.network.webserver_timeout) {
DynamicNodeInfo::new(listeners)
} else {
DynamicNodeInfo::new(vec![])
};
let _ = oneshot_tx.send(webserver::Message::AckNodeInfo((self.node_info.clone(), dyn_node_info)));
}
_ => ()
}
Expand Down
19 changes: 16 additions & 3 deletions homestar-runtime/src/runner/nodeinfo.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
use libp2p::PeerId;
use libp2p::{Multiaddr, PeerId};
use serde::{Deserialize, Serialize};

/// TODO
#[derive(Debug, Clone, Serialize, Deserialize)]
pub(crate) struct NodeInfo {
pub(crate) struct StaticNodeInfo {
pub(crate) peer_id: PeerId,
}

impl NodeInfo {
impl StaticNodeInfo {
/// TODO
pub(crate) fn new(peer_id: PeerId) -> Self {
Self { peer_id }
Expand All @@ -19,3 +19,16 @@ impl NodeInfo {
&self.peer_id
}
}

/// TODO
#[derive(Debug, Clone, Serialize, Deserialize)]
pub(crate) struct DynamicNodeInfo {
pub(crate) listeners: Vec<Multiaddr>,
}

impl DynamicNodeInfo {
/// TODO
pub(crate) fn new(listeners: Vec<Multiaddr>) -> Self {
Self { listeners }
}
}
26 changes: 8 additions & 18 deletions homestar-runtime/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,8 @@ use homestar_wasm::{
};
use indexmap::IndexMap;
use libipld::{Cid, Ipld};
use std::{collections::BTreeMap, sync::Arc};
use tokio::{
sync::RwLock,
task::JoinSet,
time::{self, Instant},
};
use std::{collections::BTreeMap, sync::Arc, time::Instant};
use tokio::{sync::RwLock, task::JoinSet};
use tracing::{debug, error, info};

/// [JoinSet] of tasks run by a [Worker].
Expand Down Expand Up @@ -215,29 +211,23 @@ where
)))
.await;

let found = match time::timeout_at(
Instant::now() + workflow_settings.p2p_timeout,
rx.recv_async(),
)
.await
let found = match rx
.recv_deadline(Instant::now() + workflow_settings.p2p_timeout)
{
Ok(Ok(ResponseEvent::Found(Ok(FoundEvent::Receipt(found))))) => found,
Ok(Ok(ResponseEvent::Found(Err(err)))) => {
Ok(ResponseEvent::Found(Ok(FoundEvent::Receipt(found)))) => found,
Ok(ResponseEvent::Found(Err(err))) => {
bail!(ResolveError::UnresolvedCid(format!(
"failure in attempting to find event: {err}"
)))
}
Ok(Ok(ResponseEvent::NoPeersAvailable)) => {
Ok(ResponseEvent::NoPeersAvailable) => {
bail!(ResolveError::UnresolvedCid(
"no peers available to communicate with".to_string()
))
}
Ok(Ok(_)) => bail!(ResolveError::UnresolvedCid(
Ok(_) => bail!(ResolveError::UnresolvedCid(
"wrong or unexpected event message received".to_string(),
)),
Ok(Err(err)) => bail!(ResolveError::UnresolvedCid(format!(
"failure in attempting to find receipt: {err}"
))),
Err(err) => bail!(ResolveError::UnresolvedCid(format!(
"timeout deadline reached for invocation receipt @ {cid}: {err}",
))),
Expand Down
19 changes: 10 additions & 9 deletions homestar-runtime/src/workflow/info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,13 @@ use faststr::FastStr;
use homestar_core::{ipld::DagJson, workflow::Pointer};
use libipld::{cbor::DagCborCodec, prelude::Codec, serde::from_ipld, Cid, Ipld};
use serde::{Deserialize, Serialize};
use std::{collections::BTreeMap, fmt, sync::Arc, time::Duration};
use tokio::{
runtime::Handle,
time::{self, Instant},
use std::{
collections::BTreeMap,
fmt,
sync::Arc,
time::{Duration, Instant},
};
use tokio::runtime::Handle;
use tracing::info;

/// [Workflow] header tag, for sharing workflow information over libp2p.
Expand Down Expand Up @@ -356,8 +358,8 @@ impl Info {
)))
.await?;

match time::timeout_at(Instant::now() + p2p_timeout, rx.recv_async()).await {
Ok(Ok(ResponseEvent::Found(Ok(FoundEvent::Workflow(workflow_info))))) => {
match rx.recv_deadline(Instant::now() + p2p_timeout) {
Ok(ResponseEvent::Found(Ok(FoundEvent::Workflow(workflow_info)))) => {
// store workflow receipts from info, as we've already stored
// the static information.
if let Some(mut conn) = conn {
Expand All @@ -366,13 +368,12 @@ impl Info {

Ok(workflow_info)
}
Ok(Ok(ResponseEvent::Found(Err(err)))) => {
Ok(ResponseEvent::Found(Err(err))) => {
bail!("failure in attempting to find event: {err}")
}
Ok(Ok(event)) => {
Ok(event) => {
bail!("received unexpected event {event:?} for workflow {workflow_cid}")
}
Ok(Err(err)) => bail!("failure in attempting to find workflow: {err}"),
Err(err) => handle_timeout_fn
.map(|f| f(workflow_cid, conn).context(err))
.unwrap_or(Err(anyhow!(
Expand Down
Loading