Skip to content

Commit

Permalink
feat: Add pubsub receipt sharing notifications (#418)
Browse files Browse the repository at this point in the history
# Description

This PR implements the following changes:

- [x] Add receipt published and received notifications
- [x] Update receipt sharing log messages
- [x] Add receipt sharing integration test
- [x] Add `remove_db` and `wait_for_socket_connection` test utilities
- [x] Update JSON `peer_id` key to `peerId`

## Link to issue

Implements #131 

## Type of change

- [x] New feature (non-breaking change that adds functionality)
- [x] Refactor (non-breaking change that updates existing functionality)

## Test plan (required)

This PR includes an integration test that checks for gossiped receipts
in websocket notifications and logs.

---------

Co-authored-by: Zeeshan Lakhani <zeeshan.lakhani@gmail.com>
  • Loading branch information
bgins and zeeshanlakhani committed Nov 29, 2023
1 parent 522b42a commit 476c47f
Show file tree
Hide file tree
Showing 35 changed files with 747 additions and 422 deletions.
4 changes: 3 additions & 1 deletion .config/nextest.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@ test-threads = "num-cpus"
threads-required = 1

[profile.ci]
retries = { backoff = "exponential", count = 3, delay = "5s", jitter = true, max-delay = "30s" }
retries = { backoff = "exponential", count = 3, delay = "30s", jitter = true, max-delay = "300s" }
failure-output = "immediate-final"
leak-timeout = "800ms"
slow-timeout = { period = "120s", terminate-after = 2 }
fail-fast = false

[test-groups]
Expand Down
1 change: 0 additions & 1 deletion .github/workflows/tests_and_checks.yml
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,6 @@ jobs:
- name: Run Doc Tests
if: ${{ matrix.default-features == 'all' }}
run: cargo test --doc --workspace
continue-on-error: ${{ matrix.rust-toolchain == 'nightly' }}

run-docs:
needs: changes
Expand Down
2 changes: 2 additions & 0 deletions .ignore
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ LICENSE
.pre-commit-config.yaml

**/fixtures
*.ipfs*
*.db

## examples
examples/websocket-relay/relay-app
2 changes: 1 addition & 1 deletion homestar-core/src/test_utils/workflow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use std::collections::BTreeMap;
use url::Url;

const RAW: u64 = 0x55;
const WASM_CID: &str = "bafkreihxcyjgyrz437ewzi7md55uqt2zf6yr3zn7xrfi4orc34xdc5jgrm";
const WASM_CID: &str = "bafybeiczefaiu7464ehupezpzulnti5jvcwnvdalqrdliugnnwcdz6ljia";

type NonceBytes = Vec<u8>;

Expand Down
2 changes: 1 addition & 1 deletion homestar-core/src/workflow/instruction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,7 @@ mod test {
(
RESOURCE_KEY.into(),
Ipld::String(
"ipfs://bafkreihxcyjgyrz437ewzi7md55uqt2zf6yr3zn7xrfi4orc34xdc5jgrm".into()
"ipfs://bafybeiczefaiu7464ehupezpzulnti5jvcwnvdalqrdliugnnwcdz6ljia".into()
)
),
(OP_KEY.into(), Ipld::String("ipld/fun".to_string())),
Expand Down
12 changes: 2 additions & 10 deletions homestar-core/src/workflow/pointer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,7 @@ use diesel::{
AsExpression, FromSqlRow,
};
use enum_assoc::Assoc;
use libipld::{
cid::{multibase::Base, Cid},
serde::from_ipld,
Ipld, Link,
};
use libipld::{cid::Cid, serde::from_ipld, Ipld, Link};
use serde::{Deserialize, Serialize};
use std::{borrow::Cow, collections::btree_map::BTreeMap, fmt, str::FromStr};

Expand Down Expand Up @@ -183,11 +179,7 @@ pub struct Pointer(Cid);

impl fmt::Display for Pointer {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let cid_as_string = self
.0
.to_string_of_base(Base::Base32Lower)
.map_err(|_| fmt::Error)?;

let cid_as_string = self.0.to_string();
write!(f, "{cid_as_string}")
}
}
Expand Down
2 changes: 1 addition & 1 deletion homestar-core/src/workflow/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ mod test {
(
"rsc".into(),
Ipld::String(
"ipfs://bafkreihxcyjgyrz437ewzi7md55uqt2zf6yr3zn7xrfi4orc34xdc5jgrm".into(),
"ipfs://bafybeiczefaiu7464ehupezpzulnti5jvcwnvdalqrdliugnnwcdz6ljia".into(),
),
),
("op".into(), Ipld::String("ipld/fun".to_string())),
Expand Down
1 change: 1 addition & 0 deletions homestar-runtime/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ bench = false
[[test]]
name = "integration"
path = "tests/main.rs"
required-features = ["test-utils"]

[dependencies]
# return to version.workspace = true after the following issue is fixed:
Expand Down
59 changes: 46 additions & 13 deletions homestar-runtime/src/event_handler/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@

use super::EventHandler;
#[cfg(feature = "websocket-notify")]
use crate::event_handler::notification::emit_receipt;
use crate::event_handler::notification::{
self, emit_receipt, EventNotificationTyp, SwarmNotification,
};
#[cfg(feature = "ipfs")]
use crate::network::IpfsCli;
use crate::{
db::Database,
event_handler::{Handler, P2PSender, ResponseEvent},
event_handler::{channel::AsyncBoundedChannelSender, Handler, P2PSender, ResponseEvent},
network::{
pubsub,
swarm::{CapsuleTag, RequestResponseKey, TopicMessage},
Expand All @@ -25,10 +27,11 @@ use libp2p::{
rendezvous::Namespace,
PeerId,
};
#[cfg(feature = "websocket-notify")]
use maplit::btreemap;
use std::{collections::HashSet, num::NonZeroUsize, sync::Arc};
#[cfg(feature = "ipfs")]
use tokio::runtime::Handle;
use tokio::sync::oneshot;
use tracing::{error, info, warn};

/// A [Receipt] captured (inner) event.
Expand Down Expand Up @@ -92,7 +95,7 @@ pub(crate) enum Event {
#[cfg(feature = "websocket-notify")]
ReplayReceipts(Replay),
/// General shutdown event.
Shutdown(oneshot::Sender<()>),
Shutdown(AsyncBoundedChannelSender<()>),
/// Find a [Record] in the DHT, e.g. a [Receipt].
///
/// [Record]: libp2p::kad::Record
Expand Down Expand Up @@ -255,13 +258,28 @@ impl Captured {
if event_handler.pubsub_enabled {
match event_handler.swarm.behaviour_mut().gossip_publish(
pubsub::RECEIPTS_TOPIC,
TopicMessage::CapturedReceipt(receipt),
TopicMessage::CapturedReceipt(receipt.clone()),
) {
Ok(msg_id) => info!(
cid = receipt_cid.to_string(),
"message {msg_id} published on {} topic for receipt",
pubsub::RECEIPTS_TOPIC
),
Ok(msg_id) => {
info!(
cid = receipt_cid.to_string(),
message_id = msg_id.to_string(),
"message published on {} topic for receipt with cid: {receipt_cid}",
pubsub::RECEIPTS_TOPIC
);

#[cfg(feature = "websocket-notify")]
notification::emit_event(
event_handler.ws_evt_sender(),
EventNotificationTyp::SwarmNotification(
SwarmNotification::PublishedReceiptPubsub,
),
btreemap! {
"cid" => receipt.cid().to_string(),
"ran" => receipt.ran().to_string()
},
);
}
Err(err) => {
warn!(
err=?err,
Expand Down Expand Up @@ -370,11 +388,26 @@ impl Replay {
.behaviour_mut()
.gossip_publish(
pubsub::RECEIPTS_TOPIC,
TopicMessage::CapturedReceipt(receipt),
TopicMessage::CapturedReceipt(receipt.clone()),
)
.map(|msg_id|
.map(|msg_id| {
info!(cid=receipt_cid,
"message {msg_id} published on {} topic for receipt", pubsub::RECEIPTS_TOPIC))
message_id = msg_id.to_string(),
"message published on {} topic for receipt with cid: {receipt_cid}",
pubsub::RECEIPTS_TOPIC);

#[cfg(feature = "websocket-notify")]
notification::emit_event(
event_handler.ws_evt_sender(),
EventNotificationTyp::SwarmNotification(
SwarmNotification::PublishedReceiptPubsub,
),
btreemap! {
"cid" => receipt.cid().to_string(),
"ran" => receipt.ran().to_string()
},
);
})
.map_err(
|err|
warn!(err=?err, cid=receipt_cid,
Expand Down
10 changes: 10 additions & 0 deletions homestar-runtime/src/event_handler/notification/swarm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ pub(crate) enum SwarmNotification {
ListeningOn,
OutgoingConnectionError,
IncomingConnectionError,
PublishedReceiptPubsub,
ReceivedReceiptPubsub,
}

impl fmt::Display for SwarmNotification {
Expand All @@ -24,6 +26,12 @@ impl fmt::Display for SwarmNotification {
SwarmNotification::IncomingConnectionError => {
write!(f, "incomingConnectionError")
}
SwarmNotification::ReceivedReceiptPubsub => {
write!(f, "receivedReceiptPubsub")
}
SwarmNotification::PublishedReceiptPubsub => {
write!(f, "publishedReceiptPubsub")
}
}
}
}
Expand All @@ -38,6 +46,8 @@ impl FromStr for SwarmNotification {
"listeningOn" => Ok(Self::ListeningOn),
"outgoingConnectionError" => Ok(Self::OutgoingConnectionError),
"incomingConnectionError" => Ok(Self::IncomingConnectionError),
"receivedReceiptPubsub" => Ok(Self::ReceivedReceiptPubsub),
"publishedReceiptPubsub" => Ok(Self::PublishedReceiptPubsub),
_ => Err(anyhow!("Missing swarm notification type: {}", ty)),
}
}
Expand Down
31 changes: 24 additions & 7 deletions homestar-runtime/src/event_handler/swarm_event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -406,16 +406,33 @@ async fn handle_swarm_event<THandlerErr: fmt::Debug + Send, DB: Database>(
} => match Receipt::try_from(message.data) {
// TODO: dont fail blindly if we get a non receipt message
Ok(receipt) => {
info!("got message: {receipt} from {propagation_source} with message id: {message_id}");
info!(
peer_id = propagation_source.to_string(),
message_id = message_id.to_string(),
"message received on receipts topic: {receipt}"
);

// Store gossiped receipt.
let _ = event_handler
.db
.conn()
.as_mut()
.map(|conn| Db::store_receipt(receipt, conn));
.map(|conn| Db::store_receipt(receipt.clone(), conn));

#[cfg(feature = "websocket-notify")]
notification::emit_event(
event_handler.ws_evt_sender(),
EventNotificationTyp::SwarmNotification(
SwarmNotification::ReceivedReceiptPubsub,
),
btreemap! {
"peerId" => propagation_source.to_string(),
"cid" => receipt.cid().to_string(),
"ran" => receipt.ran().to_string()
},
);
}
Err(err) => info!(err=?err, "cannot handle incoming event message"),
Err(err) => info!(err=?err, "cannot handle incoming gossipsub message"),
},
gossipsub::Event::Subscribed { peer_id, topic } => {
debug!(
Expand Down Expand Up @@ -728,7 +745,7 @@ async fn handle_swarm_event<THandlerErr: fmt::Debug + Send, DB: Database>(
event_handler.ws_evt_sender(),
EventNotificationTyp::SwarmNotification(SwarmNotification::ListeningOn),
btreemap! {
"peer_id" => local_peer.to_string(),
"peerId" => local_peer.to_string(),
"address" => address.to_string()
},
);
Expand All @@ -749,7 +766,7 @@ async fn handle_swarm_event<THandlerErr: fmt::Debug + Send, DB: Database>(
event_handler.ws_evt_sender(),
EventNotificationTyp::SwarmNotification(SwarmNotification::ConnnectionEstablished),
btreemap! {
"peer_id" => peer_id.to_string(),
"peerId" => peer_id.to_string(),
"address" => endpoint.get_remote_address().to_string()
},
);
Expand Down Expand Up @@ -794,7 +811,7 @@ async fn handle_swarm_event<THandlerErr: fmt::Debug + Send, DB: Database>(
event_handler.ws_evt_sender(),
EventNotificationTyp::SwarmNotification(SwarmNotification::ConnnectionClosed),
btreemap! {
"peer_id" => peer_id.to_string(),
"peerId" => peer_id.to_string(),
"address" => endpoint.get_remote_address().to_string()
},
);
Expand All @@ -816,7 +833,7 @@ async fn handle_swarm_event<THandlerErr: fmt::Debug + Send, DB: Database>(
event_handler.ws_evt_sender(),
EventNotificationTyp::SwarmNotification(SwarmNotification::OutgoingConnectionError),
btreemap! {
"peer_id" => peer_id.map_or("Unknown peer".into(), |p| p.to_string()),
"peerId" => peer_id.map_or("Unknown peer".into(), |p| p.to_string()),
"error" => error.to_string()
},
);
Expand Down
3 changes: 1 addition & 2 deletions homestar-runtime/src/network/ipfs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,7 @@ impl IpfsCli {
let DagPutResponse { cid } = self
.0
.dag_put_with_options(Cursor::new(receipt_bytes), dag_builder)
.await
.expect("a CID");
.await?;

Ok(cid.cid_string)
}
Expand Down
12 changes: 6 additions & 6 deletions homestar-runtime/src/network/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use tarpc::{
context,
server::{self, incoming::Incoming, Channel},
};
use tokio::{runtime::Handle, select, sync::oneshot, time};
use tokio::{runtime::Handle, select, time};
use tokio_serde::formats::MessagePack;
use tracing::{info, warn};

Expand All @@ -34,7 +34,7 @@ pub(crate) enum ServerMessage {
/// Message sent by the [Runner] to start a graceful shutdown.
///
/// [Runner]: crate::Runner
GracefulShutdown(oneshot::Sender<()>),
GracefulShutdown(AsyncBoundedChannelSender<()>),
/// Message sent to start a [Workflow] run by reading a [Workflow] file.
///
/// [Workflow]: homestar_core::Workflow
Expand Down Expand Up @@ -119,15 +119,15 @@ impl Interface for ServerHandler {
name: Option<FastStr>,
workflow_file: ReadWorkflow,
) -> Result<Box<response::AckWorkflow>, Error> {
let (tx, rx) = oneshot::channel();
let (tx, rx) = AsyncBoundedChannel::oneshot();
self.runner_sender
.send((ServerMessage::Run((name, workflow_file)), Some(tx)))
.send_async((ServerMessage::Run((name, workflow_file)), Some(tx)))
.await
.map_err(|e| Error::FailureToSendOnChannel(e.to_string()))?;

let now = time::Instant::now();
select! {
Ok(msg) = rx => {
Ok(msg) = rx.recv_async() => {
match msg {
ServerMessage::RunAck(response) => {
Ok(response)
Expand All @@ -149,7 +149,7 @@ impl Interface for ServerHandler {
}
async fn stop(self, _: context::Context) -> Result<(), Error> {
self.runner_sender
.send((ServerMessage::ShutdownCmd, None))
.send_async((ServerMessage::ShutdownCmd, None))
.await
.map_err(|e| Error::FailureToSendOnChannel(e.to_string()))
}
Expand Down
Loading

0 comments on commit 476c47f

Please sign in to comment.