Skip to content

Commit

Permalink
feat: Add event notifications (#410)
Browse files Browse the repository at this point in the history
# Description

This pull request implements the following changes:

- [x] Add event notifications
- [x] Add `emit_event` notification utility function
- [x] Add `emit_receipt` notification utility function
- [x] Move receipt notifications to event handler notifications
- [x] Add `ConnnectionEstablished`, `ConnnectionClosed`, `ListeningOn`,
`OutgoingConnectionError`, and `IncomingConnectionError` network
notifications
- [x] Test JSON event notification bytes roundtrip
- [x] Test JSON event notification string roundtrip
- [x] Integration test connection notifications with two Homestar nodes

## Link to issue

Closes #407

## Type of change

- [x] New feature (non-breaking change that adds functionality)

## Test plan (required)

We have included unit tests to check roundtrip conversions between JSON
bytes and strings. In addition, we have included an integration test
that subscribes and listens for connection messages between Homestar
nodes.

---------

Co-authored-by: Zeeshan Lakhani <zeeshan.lakhani@gmail.com>
  • Loading branch information
2 people authored and hugomrdias committed Nov 14, 2023
1 parent 8679f19 commit b1a0bbf
Show file tree
Hide file tree
Showing 13 changed files with 591 additions and 113 deletions.
2 changes: 1 addition & 1 deletion homestar-runtime/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ libp2p = { version = "0.52", default-features = false, features = [
libsqlite3-sys = { version = "0.26", default-features = false, features = [
"bundled",
] }
maplit = "1.0"
metrics = { version = "0.21", default-features = false }
metrics-exporter-prometheus = { version = "0.12.1", default-features = false, features = [
"http-listener",
Expand Down Expand Up @@ -194,7 +195,6 @@ homestar_runtime_proc_macro = { path = "src/test_utils/proc_macro", package = "h
jsonrpsee = { version = "0.20", default-features = false, features = [
"client",
] }
maplit = "1.0"
nix = { version = "0.27", features = ["signal"] }
predicates = { version = "3.0", default-features = false }
prometheus-parse = "0.2.4"
Expand Down
1 change: 1 addition & 0 deletions homestar-runtime/src/event_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ pub(crate) mod cache;
pub mod channel;
pub(crate) mod error;
pub(crate) mod event;
pub(crate) mod notification;
pub(crate) mod swarm_event;
pub(crate) use cache::{setup_cache, CacheValue};
pub(crate) use error::RequestResponseError;
Expand Down
70 changes: 14 additions & 56 deletions homestar-runtime/src/event_handler/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

use super::EventHandler;
#[cfg(feature = "websocket-notify")]
use crate::network::webserver::notifier::{Header, Message, NotifyReceipt, SubscriptionTyp};
use crate::event_handler::notification::emit_receipt;
#[cfg(feature = "ipfs")]
use crate::network::IpfsCli;
use crate::{
Expand All @@ -16,15 +16,9 @@ use crate::{
};
use anyhow::Result;
use async_trait::async_trait;
use homestar_core::workflow::Receipt as InvocationReceipt;
#[cfg(feature = "websocket-notify")]
use homestar_core::{
ipld::DagJson,
workflow::{
receipt::metadata::{WORKFLOW_KEY, WORKFLOW_NAME_KEY},
Pointer,
},
};
use homestar_core::workflow::Pointer;
use homestar_core::workflow::Receipt as InvocationReceipt;
use libipld::{Cid, Ipld};
use libp2p::{
kad::{record::Key, Quorum, Record},
Expand Down Expand Up @@ -251,23 +245,11 @@ impl Captured {

#[cfg(feature = "websocket-notify")]
{
let invocation_notification = invocation_receipt.clone();
let ws_tx = event_handler.ws_workflow_sender();
let metadata = self.metadata.to_owned();
let receipt = NotifyReceipt::with(invocation_notification, receipt_cid, metadata);
if let Ok(json) = receipt.to_json() {
info!(
cid = receipt_cid.to_string(),
"Sending receipt to websocket"
);
let _ = ws_tx.notify(Message::new(
Header::new(
SubscriptionTyp::Cid(self.workflow.cid),
self.workflow.name.clone(),
),
json,
));
}
emit_receipt(
event_handler.ws_workflow_sender(),
receipt.clone(),
self.metadata.to_owned(),
)
}

if event_handler.pubsub_enabled {
Expand Down Expand Up @@ -367,37 +349,13 @@ impl Replay {
self.pointers.iter().collect::<Vec<_>>()
);

#[cfg(feature = "websocket-notify")]
receipts.into_iter().for_each(|receipt| {
let invocation_receipt = InvocationReceipt::from(&receipt);
let invocation_notification = invocation_receipt;
let receipt_cid = receipt.cid();

let ws_tx = event_handler.ws_workflow_sender();
let metadata = self.metadata.to_owned();
let receipt = NotifyReceipt::with(invocation_notification, receipt_cid, metadata);
if let Ok(json) = receipt.to_json() {
info!(
cid = receipt_cid.to_string(),
"Sending receipt to websocket"
);

if let Some(ipld) = &self.metadata {
match (ipld.get(WORKFLOW_KEY), ipld.get(WORKFLOW_NAME_KEY)) {
(Ok(Ipld::Link(cid)), Ok(Ipld::String(name))) => {
let header = Header::new(
SubscriptionTyp::Cid(*cid),
Some((name.to_string()).into()),
);
let _ = ws_tx.notify(Message::new(header, json));
}
(Ok(Ipld::Link(cid)), Err(_err)) => {
let header = Header::new(SubscriptionTyp::Cid(*cid), None);
let _ = ws_tx.notify(Message::new(header, json));
}
_ => (),
}
}
}
emit_receipt(
event_handler.ws_workflow_sender(),
receipt,
self.metadata.to_owned(),
);
});

Ok(())
Expand Down
242 changes: 242 additions & 0 deletions homestar-runtime/src/event_handler/notification.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,242 @@
use crate::{
network::webserver::{
notifier::{self, Header, Message, Notifier, SubscriptionTyp},
SUBSCRIBE_NETWORK_EVENTS_ENDPOINT,
},
Receipt,
};
use anyhow::anyhow;
use chrono::prelude::Utc;
use homestar_core::{
ipld::DagJson,
workflow::{
receipt::metadata::{WORKFLOW_KEY, WORKFLOW_NAME_KEY},
Receipt as InvocationReceipt,
},
};
use libipld::{serde::from_ipld, Ipld};
use serde::{Deserialize, Serialize};
use std::{collections::BTreeMap, str::FromStr};
use tracing::{info, warn};

pub(crate) mod receipt;
pub(crate) mod swarm;
pub(crate) use receipt::ReceiptNotification;
pub(crate) use swarm::SwarmNotification;

const TYPE_KEY: &str = "type";
const DATA_KEY: &str = "data";
const TIMESTAMP_KEY: &str = "timestamp";

/// Send receipt notification as bytes.
pub(crate) fn emit_receipt(
notifier: Notifier<notifier::Message>,
receipt: Receipt,
metadata: Option<Ipld>,
) {
let invocation_receipt = InvocationReceipt::from(&receipt);
let receipt_cid = receipt.cid();
let notification = ReceiptNotification::with(invocation_receipt, receipt_cid, metadata.clone());

if let Ok(json) = notification.to_json() {
info!(
cid = receipt_cid.to_string(),
"Sending receipt to websocket"
);
if let Some(ipld) = metadata {
match (ipld.get(WORKFLOW_KEY), ipld.get(WORKFLOW_NAME_KEY)) {
(Ok(Ipld::Link(cid)), Ok(Ipld::String(name))) => {
let header =
Header::new(SubscriptionTyp::Cid(*cid), Some((name.to_string()).into()));
let _ = notifier.notify(Message::new(header, json));
}
(Ok(Ipld::Link(cid)), Err(_err)) => {
let header = Header::new(SubscriptionTyp::Cid(*cid), None);
let _ = notifier.notify(Message::new(header, json));
}
_ => (),
}
}
} else {
warn!("Unable to serialize receipt as bytes: {receipt:?}");
}
}

/// Send event notification as bytes.
pub(crate) fn emit_event(
notifier: Notifier<notifier::Message>,
ty: EventNotificationTyp,
data: BTreeMap<&str, String>,
) {
let header = Header::new(
SubscriptionTyp::EventSub(SUBSCRIBE_NETWORK_EVENTS_ENDPOINT.to_string()),
None,
);
let notification = EventNotification::new(ty, data);

if let Ok(json) = notification.to_json() {
let _ = notifier.notify(Message::new(header, json));
} else {
warn!("Unable to serialize notification as bytes: {notification:?}");
}
}

/// Notification sent to clients.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub(crate) struct EventNotification {
typ: EventNotificationTyp,
data: Ipld,
timestamp: i64,
}

impl EventNotification {
pub(crate) fn new(typ: EventNotificationTyp, data: BTreeMap<&str, String>) -> Self {
let ipld_data = data
.iter()
.map(|(key, val)| (key.to_string(), Ipld::String(val.to_owned())))
.collect();

Self {
typ,
data: Ipld::Map(ipld_data),
timestamp: Utc::now().timestamp_millis(),
}
}
}

impl DagJson for EventNotification where Ipld: From<EventNotification> {}

impl From<EventNotification> for Ipld {
fn from(notification: EventNotification) -> Self {
Ipld::Map(BTreeMap::from([
("type".into(), notification.typ.into()),
("data".into(), notification.data),
("timestamp".into(), notification.timestamp.into()),
]))
}
}

impl TryFrom<Ipld> for EventNotification {
type Error = anyhow::Error;

fn try_from(ipld: Ipld) -> Result<Self, Self::Error> {
let map = from_ipld::<BTreeMap<String, Ipld>>(ipld)?;

let typ: EventNotificationTyp = map
.get(TYPE_KEY)
.ok_or_else(|| anyhow!("missing {TYPE_KEY}"))?
.to_owned()
.try_into()?;

let data = map
.get(DATA_KEY)
.ok_or_else(|| anyhow!("missing {DATA_KEY}"))?
.to_owned();

let timestamp = from_ipld(
map.get(TIMESTAMP_KEY)
.ok_or_else(|| anyhow!("missing {TIMESTAMP_KEY}"))?
.to_owned(),
)?;

Ok(EventNotification {
typ,
data,
timestamp,
})
}
}

/// Types of notification sent to clients.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub(crate) enum EventNotificationTyp {
SwarmNotification(SwarmNotification),
}

impl DagJson for EventNotificationTyp where Ipld: From<EventNotificationTyp> {}

impl From<EventNotificationTyp> for Ipld {
fn from(typ: EventNotificationTyp) -> Self {
match typ {
EventNotificationTyp::SwarmNotification(subtype) => {
Ipld::String(format!("network:{}", subtype))
}
}
}
}

impl TryFrom<Ipld> for EventNotificationTyp {
type Error = anyhow::Error;

fn try_from(ipld: Ipld) -> Result<Self, Self::Error> {
if let Some((ty, subtype)) = from_ipld::<String>(ipld)?.split_once(':') {
match ty {
"network" => Ok(EventNotificationTyp::SwarmNotification(
SwarmNotification::from_str(subtype)?,
)),
_ => Err(anyhow!("Missing event notification type: {}", ty)),
}
} else {
Err(anyhow!(
"Event notification type missing colon delimiter between type and subtype."
))
}
}
}

#[cfg(test)]
mod test {
use super::*;
use libp2p::PeerId;
use maplit::btreemap;

#[test]
fn notification_bytes_rountrip() {
let peer_id = PeerId::random().to_string();
let address: String = "/ip4/127.0.0.1/tcp/7000".to_string();

let notification = EventNotification::new(
EventNotificationTyp::SwarmNotification(SwarmNotification::ConnnectionEstablished),
btreemap! {
"peer_id" => peer_id.clone(),
"address" => address.clone()
},
);
let bytes = notification.to_json().unwrap();

let parsed = EventNotification::from_json(bytes.as_ref()).unwrap();
let data: BTreeMap<String, String> = from_ipld(parsed.data).unwrap();

assert_eq!(
parsed.typ,
EventNotificationTyp::SwarmNotification(SwarmNotification::ConnnectionEstablished)
);
assert_eq!(data.get("peer_id").unwrap(), &peer_id);
assert_eq!(data.get("address").unwrap(), &address);
}

#[test]
fn notification_json_string_rountrip() {
let peer_id = PeerId::random().to_string();
let address: String = "/ip4/127.0.0.1/tcp/7000".to_string();

let notification = EventNotification::new(
EventNotificationTyp::SwarmNotification(SwarmNotification::ConnnectionEstablished),
btreemap! {
"peer_id" => peer_id.clone(),
"address" => address.clone()
},
);
let json_string = notification.to_json_string().unwrap();

let parsed = EventNotification::from_json_string(json_string).unwrap();
let data: BTreeMap<String, String> = from_ipld(parsed.data).unwrap();

assert_eq!(
parsed.typ,
EventNotificationTyp::SwarmNotification(SwarmNotification::ConnnectionEstablished)
);
assert_eq!(data.get("peer_id").unwrap(), &peer_id);
assert_eq!(data.get("address").unwrap(), &address);
}
}
Loading

0 comments on commit b1a0bbf

Please sign in to comment.