Skip to content

Commit

Permalink
feat: Add event notifications (#410)
Browse files Browse the repository at this point in the history
# Description

This pull request implements the following changes:

- [x] Add event notifications
- [x] Add `emit_event` notification utility function
- [x] Add `emit_receipt` notification utility function
- [x] Move receipt notifications to event handler notifications
- [x] Add `ConnnectionEstablished`, `ConnnectionClosed`, `ListeningOn`,
`OutgoingConnectionError`, and `IncomingConnectionError` network
notifications
- [x] Test JSON event notification bytes roundtrip
- [x] Test JSON event notification string roundtrip
- [x] Integration test connection notifications with two Homestar nodes

## Link to issue

Closes #407

## Type of change

- [x] New feature (non-breaking change that adds functionality)

## Test plan (required)

We have included unit tests to check roundtrip conversions between JSON
bytes and strings. In addition, we have included an integration test
that subscribes and listens for connection messages between Homestar
nodes.

---------

Co-authored-by: Zeeshan Lakhani <zeeshan.lakhani@gmail.com>
  • Loading branch information
bgins and zeeshanlakhani authored Nov 7, 2023
1 parent 8618f1f commit ce815d7
Show file tree
Hide file tree
Showing 14 changed files with 599 additions and 121 deletions.
16 changes: 8 additions & 8 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion homestar-runtime/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ libp2p = { version = "0.52", default-features = false, features = [
libsqlite3-sys = { version = "0.26", default-features = false, features = [
"bundled",
] }
maplit = "1.0"
metrics = { version = "0.21", default-features = false }
metrics-exporter-prometheus = { version = "0.12.1", default-features = false, features = [
"http-listener",
Expand Down Expand Up @@ -194,7 +195,6 @@ homestar_runtime_proc_macro = { path = "src/test_utils/proc_macro", package = "h
jsonrpsee = { version = "0.20", default-features = false, features = [
"client",
] }
maplit = "1.0"
nix = { version = "0.27", features = ["signal"] }
predicates = { version = "3.0", default-features = false }
prometheus-parse = "0.2.4"
Expand Down
1 change: 1 addition & 0 deletions homestar-runtime/src/event_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ pub(crate) mod cache;
pub mod channel;
pub(crate) mod error;
pub(crate) mod event;
pub(crate) mod notification;
pub(crate) mod swarm_event;
pub(crate) use cache::{setup_cache, CacheValue};
pub(crate) use error::RequestResponseError;
Expand Down
70 changes: 14 additions & 56 deletions homestar-runtime/src/event_handler/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

use super::EventHandler;
#[cfg(feature = "websocket-notify")]
use crate::network::webserver::notifier::{Header, Message, NotifyReceipt, SubscriptionTyp};
use crate::event_handler::notification::emit_receipt;
#[cfg(feature = "ipfs")]
use crate::network::IpfsCli;
use crate::{
Expand All @@ -16,15 +16,9 @@ use crate::{
};
use anyhow::Result;
use async_trait::async_trait;
use homestar_core::workflow::Receipt as InvocationReceipt;
#[cfg(feature = "websocket-notify")]
use homestar_core::{
ipld::DagJson,
workflow::{
receipt::metadata::{WORKFLOW_KEY, WORKFLOW_NAME_KEY},
Pointer,
},
};
use homestar_core::workflow::Pointer;
use homestar_core::workflow::Receipt as InvocationReceipt;
use libipld::{Cid, Ipld};
use libp2p::{
kad::{record::Key, Quorum, Record},
Expand Down Expand Up @@ -251,23 +245,11 @@ impl Captured {

#[cfg(feature = "websocket-notify")]
{
let invocation_notification = invocation_receipt.clone();
let ws_tx = event_handler.ws_workflow_sender();
let metadata = self.metadata.to_owned();
let receipt = NotifyReceipt::with(invocation_notification, receipt_cid, metadata);
if let Ok(json) = receipt.to_json() {
info!(
cid = receipt_cid.to_string(),
"Sending receipt to websocket"
);
let _ = ws_tx.notify(Message::new(
Header::new(
SubscriptionTyp::Cid(self.workflow.cid),
self.workflow.name.clone(),
),
json,
));
}
emit_receipt(
event_handler.ws_workflow_sender(),
receipt.clone(),
self.metadata.to_owned(),
)
}

if event_handler.pubsub_enabled {
Expand Down Expand Up @@ -367,37 +349,13 @@ impl Replay {
self.pointers.iter().collect::<Vec<_>>()
);

#[cfg(feature = "websocket-notify")]
receipts.into_iter().for_each(|receipt| {
let invocation_receipt = InvocationReceipt::from(&receipt);
let invocation_notification = invocation_receipt;
let receipt_cid = receipt.cid();

let ws_tx = event_handler.ws_workflow_sender();
let metadata = self.metadata.to_owned();
let receipt = NotifyReceipt::with(invocation_notification, receipt_cid, metadata);
if let Ok(json) = receipt.to_json() {
info!(
cid = receipt_cid.to_string(),
"Sending receipt to websocket"
);

if let Some(ipld) = &self.metadata {
match (ipld.get(WORKFLOW_KEY), ipld.get(WORKFLOW_NAME_KEY)) {
(Ok(Ipld::Link(cid)), Ok(Ipld::String(name))) => {
let header = Header::new(
SubscriptionTyp::Cid(*cid),
Some((name.to_string()).into()),
);
let _ = ws_tx.notify(Message::new(header, json));
}
(Ok(Ipld::Link(cid)), Err(_err)) => {
let header = Header::new(SubscriptionTyp::Cid(*cid), None);
let _ = ws_tx.notify(Message::new(header, json));
}
_ => (),
}
}
}
emit_receipt(
event_handler.ws_workflow_sender(),
receipt,
self.metadata.to_owned(),
);
});

Ok(())
Expand Down
Loading

0 comments on commit ce815d7

Please sign in to comment.