Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add DHT and RequestResponse notifications #461

Merged
merged 54 commits into from
Jan 11, 2024
Merged
Show file tree
Hide file tree
Changes from 45 commits
Commits
Show all changes
54 commits
Select commit Hold shift + click to select a range
6bc234b
fix: Decode IndexedResources with try_from implementation
bgins Dec 1, 2023
65e2ce7
test: Add IPLD to IndexedResources roundtrip test
bgins Dec 1, 2023
e793b11
test: Add resources to IPLD to workflow info test
bgins Dec 1, 2023
f17e292
chore: Remove err from successful kad.get_record event
bgins Dec 1, 2023
1b7ce5f
test: Add DHT insufficient quorum test
bgins Dec 2, 2023
6e2fc33
refactor: Update event notification constructor to accept Ipld data
bgins Dec 2, 2023
7a6a1f3
chore: Add provider peer ID to request_response_senders
bgins Dec 4, 2023
b89eb85
feat: Add receipt and workflow info DHT notifications
bgins Dec 4, 2023
1563164
chore: Remove peer ID from request_response_senders
bgins Dec 4, 2023
f3985b4
feat: Add workflow info req_resp notifications
bgins Dec 4, 2023
d883bb7
chore: Feature flag RequestResponse message peer ID
bgins Dec 4, 2023
e7fdc8a
chore: Log found receipt and workflow info records separately
bgins Dec 5, 2023
eddaeae
test: Add DHT receipt and workflow info test
bgins Dec 5, 2023
96916c0
chore: Increase test_utils p2p_timeout
bgins Dec 5, 2023
6796f70
chore: Allow unused peer ID in RequestResponse event
bgins Dec 5, 2023
d884b28
feat: Add DHT quorum notifications
bgins Dec 7, 2023
90cd24c
chore: Add RequestResponse debug logs
bgins Dec 7, 2023
97f8f95
chore: Update get record debug logs
bgins Dec 7, 2023
b1a0dbc
test: Fix flaky gossipsub test
bgins Dec 7, 2023
8a66822
test: Remove unneeded p2p_provider_timeout settings
bgins Dec 8, 2023
3bc8edc
refactor: Replace p2p_timeout with settings
bgins Dec 8, 2023
aba909e
chore: Add DHT debug logs
bgins Dec 11, 2023
3501a28
feat: Add non-recursive workflow info provider lookup
bgins Dec 11, 2023
71b24ed
feat: Filter out self from workflow info providers
bgins Dec 11, 2023
7445d61
test: Check for GetProviders event in initialize_worker test
bgins Dec 11, 2023
5d46bdc
test: Add workflow info providers test
bgins Dec 13, 2023
d2d0232
test: Rename check_lines_for utility to check_for_line_with
bgins Dec 13, 2023
53aa88b
chore: Port shuffling
bgins Dec 13, 2023
087af0e
refactor: Break FoundEvent into DecodedRecord and FoundEvent
bgins Dec 15, 2023
422c5a3
chore: Move defaults to config directory
bgins Dec 15, 2023
b8f8270
feat!: Update ReceivedReceiptPubsub peerId field
bgins Dec 15, 2023
c128f17
chore: Add missing serial to test names
bgins Dec 15, 2023
a6d54d0
fix: Do not persist receipts on receiving workflow info
bgins Dec 16, 2023
143f4af
chore: Remove print line
bgins Dec 18, 2023
f9fd79c
test: Add WIP workflow info provider recursive test
bgins Dec 19, 2023
5b8074e
refactor: Use non-blocking timeout instead of blocking recv_deadline
bgins Jan 4, 2024
16cb428
refactor: Separate DHT and provider workflow info lookups
bgins Jan 6, 2024
7e9bd70
chore: Bail with more specific error when decoding timed out record
bgins Jan 6, 2024
c1164fa
test: Increase threads allocated to each test
bgins Jan 6, 2024
5e90724
chore: Remove duplicate received workflow info notification
bgins Jan 6, 2024
d193fe9
chore: Update DHT behavior to only add nodes manually
bgins Jan 8, 2024
42d3ad3
chore: Add RequestResponse ResponseSent event handler
bgins Jan 8, 2024
42319c0
chore: Ignore recursive provider test and annotate it
bgins Jan 8, 2024
775c3ef
test: Handle get providers event
bgins Jan 8, 2024
20f3b56
chore: Allow unused response event providers
bgins Jan 8, 2024
9d781a9
chore: Remove retrieve_from_provider from gather call
bgins Jan 11, 2024
850832c
test: Add timeout between workflow runs in provider test
bgins Jan 11, 2024
0371851
chore: Add workflow info and receipt record docs
bgins Jan 11, 2024
ce2f78f
chore: Set max-threads to one
bgins Jan 11, 2024
79c752f
chore: Implement workflow_info_source_label on event notification type
bgins Jan 11, 2024
656c742
chore: Use all of record instead of individual parts
bgins Jan 11, 2024
536fa73
chore: Rename gather to retrieve
bgins Jan 11, 2024
c07dbc8
chore: Add comments
bgins Jan 11, 2024
fce81d0
chore: Feature flag StoredRecord event with websocket-notify
bgins Jan 11, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion .config/nextest.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,14 @@ slow-timeout = { period = "120s", terminate-after = 2 }
fail-fast = false

[test-groups]
serial = { max-threads = 1 }
serial = { max-threads = 4 }
zeeshanlakhani marked this conversation as resolved.
Show resolved Hide resolved

[[profile.default.overrides]]
filter = 'test(/_serial$/)'
test-group = 'serial'
threads-required = 4

[[profile.ci.overrides]]
filter = 'test(/_serial$/)'
test-group = 'serial'
threads-required = 4
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,9 @@ mesh_n = 2
mesh_outbound_min = 1

[node.network.libp2p.dht]
p2p_provider_timeout = 30
p2p_receipt_timeout = 500
p2p_workflow_info_timeout = 500
p2p_provider_timeout = 10000
receipt_quorum = 2
workflow_quorum = 3

Expand Down
10 changes: 8 additions & 2 deletions homestar-runtime/src/event_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,9 @@ pub(crate) struct EventHandler<DB: Database> {
receipt_quorum: usize,
/// Minimum number of peers required to receive workflow information.
workflow_quorum: usize,
/// Timeout for p2p provider requests.
/// Timeout for p2p workflow info record requests.
p2p_workflow_info_timeout: Duration,
/// Timeout for p2p workflow info record requests from a provider.
p2p_provider_timeout: Duration,
/// Accessible database instance.
db: DB,
Expand Down Expand Up @@ -101,7 +103,9 @@ pub(crate) struct EventHandler<DB: Database> {
receipt_quorum: usize,
/// Minimum number of peers required to receive workflow information.
workflow_quorum: usize,
/// Timeout for p2p provider requests.
/// Timeout for p2p workflow info record requests.
p2p_workflow_info_timeout: Duration,
/// Timeout for p2p workflow info record requests from a provider.
p2p_provider_timeout: Duration,
/// Accessible database instance.
db: DB,
Expand Down Expand Up @@ -174,6 +178,7 @@ where
Self {
receipt_quorum: settings.libp2p.dht.receipt_quorum,
workflow_quorum: settings.libp2p.dht.workflow_quorum,
p2p_workflow_info_timeout: settings.libp2p.dht.p2p_workflow_info_timeout,
p2p_provider_timeout: settings.libp2p.dht.p2p_provider_timeout,
db,
swarm,
Expand Down Expand Up @@ -214,6 +219,7 @@ where
Self {
receipt_quorum: settings.libp2p.dht.receipt_quorum,
workflow_quorum: settings.libp2p.dht.workflow_quorum,
p2p_workflow_info_timeout: settings.libp2p.dht.p2p_workflow_info_timeout,
p2p_provider_timeout: settings.libp2p.dht.p2p_provider_timeout,
db,
swarm,
Expand Down
165 changes: 123 additions & 42 deletions homestar-runtime/src/event_handler/event.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,19 @@
//! Internal [Event] type and [Handler] implementation.

use super::EventHandler;
use super::{swarm_event::FoundEvent, EventHandler};
#[cfg(feature = "websocket-notify")]
use crate::event_handler::notification::{
self, emit_receipt, EventNotificationTyp, SwarmNotification,
use crate::event_handler::{
notification::{
self, emit_receipt, swarm::workflow_info_source_label, EventNotificationTyp,
SwarmNotification,
},
swarm_event::{ReceiptEvent, WorkflowInfoEvent},
};
#[cfg(feature = "ipfs")]
use crate::network::IpfsCli;
use crate::{
db::Database,
event_handler::{channel::AsyncChannelSender, Handler, P2PSender, ResponseEvent},
event_handler::{channel::AsyncChannelSender, Handler, P2PSender},
network::{
pubsub,
swarm::{CapsuleTag, RequestResponseKey, TopicMessage},
Expand All @@ -32,7 +36,7 @@ use maplit::btreemap;
use std::{collections::HashSet, num::NonZeroUsize, sync::Arc};
#[cfg(all(feature = "ipfs", not(feature = "test-utils")))]
use tokio::runtime::Handle;
use tracing::{error, info, warn};
use tracing::{debug, error, info, warn};

const RENDEZVOUS_NAMESPACE: &str = "homestar";

Expand Down Expand Up @@ -105,6 +109,8 @@ pub(crate) enum Event {
FindRecord(QueryRecord),
/// Remove a given record from the DHT, e.g. a [Receipt].
RemoveRecord(QueryRecord),
/// [Receipt] or [workflow::Info] stored event.
StoredRecord(FoundEvent),
/// Outbound request event to pull data from peers.
OutboundRequest(PeerRequest),
/// Get providers for a record in the DHT, e.g. workflow information.
Expand Down Expand Up @@ -146,6 +152,49 @@ impl Event {
}
Event::FindRecord(record) => record.find(event_handler).await,
Event::RemoveRecord(record) => record.remove(event_handler).await,
Event::StoredRecord(event) => {
#[allow(unused_variables)]
let event = event;
zeeshanlakhani marked this conversation as resolved.
Show resolved Hide resolved

#[cfg(feature = "websocket-notify")]
match event {
FoundEvent::Receipt(ReceiptEvent {
peer_id,
receipt,
notification_type,
}) => notification::emit_event(
event_handler.ws_evt_sender(),
notification_type,
btreemap! {
"publisher" => peer_id.map_or(Ipld::Null, |peer_id| Ipld::String(peer_id.to_string())),
"cid" => Ipld::String(receipt.cid().to_string()),
"ran" => Ipld::String(receipt.ran().to_string())
},
),
FoundEvent::Workflow(WorkflowInfoEvent {
peer_id,
workflow_info,
notification_type,
}) => {
if let Some(peer_label) =
workflow_info_source_label(notification_type.clone())
{
notification::emit_event(
event_handler.ws_evt_sender(),
notification_type,
btreemap! {
peer_label => peer_id.map_or(Ipld::Null, |peer_id| Ipld::String(peer_id.to_string())),
"cid" => Ipld::String(workflow_info.cid().to_string()),
"name" => workflow_info.name.map_or(Ipld::Null, |name| Ipld::String(name.to_string())),
"numTasks" => Ipld::Integer(workflow_info.num_tasks as i128),
"progress" => Ipld::List(workflow_info.progress.iter().map(|cid| Ipld::String(cid.to_string())).collect()),
"progressCount" => Ipld::Integer(workflow_info.progress_count as i128),
},
)
}
}
}
}
Event::OutboundRequest(PeerRequest {
peer,
request,
Expand All @@ -171,7 +220,6 @@ impl Event {
.map_err(anyhow::Error::new)?;

let key = RequestResponseKey::new(cid.to_string().into(), capsule_tag);

event_handler.query_senders.insert(query_id, (key, sender));
}
Event::Providers(Ok((providers, key, sender))) => {
Expand Down Expand Up @@ -306,8 +354,8 @@ impl Captured {
SwarmNotification::PublishedReceiptPubsub,
),
btreemap! {
"cid" => receipt.cid().to_string(),
"ran" => receipt.ran().to_string()
"cid" => Ipld::String(receipt.cid().to_string()),
"ran" => Ipld::String(receipt.ran().to_string())
},
);
}
Expand Down Expand Up @@ -337,38 +385,95 @@ impl Captured {
};

if let Ok(receipt_bytes) = Receipt::invocation_capsule(&invocation_receipt) {
let _id = event_handler
event_handler
.swarm
.behaviour_mut()
.kademlia
.put_record(
Record::new(instruction_bytes, receipt_bytes.to_vec()),
receipt_quorum,
)
.map_err(|err| {
warn!(subject = "libp2p.put_record.err",
.map_or_else(
|err| {
warn!(subject = "libp2p.put_record.err",
category = "publish_event",
err=?err,
"receipt not PUT onto DHT")
});
},
|query_id| {
let key = RequestResponseKey::new(
receipt.cid().to_string().into(),
CapsuleTag::Receipt,
);
event_handler.query_senders.insert(query_id, (key, None));

debug!(
subject = "libp2p.put_record",
category = "publish_event",
"receipt PUT onto DHT"
);

#[cfg(feature = "websocket-notify")]
notification::emit_event(
event_handler.ws_evt_sender(),
EventNotificationTyp::SwarmNotification(
SwarmNotification::PutReceiptDht,
),
btreemap! {
"cid" => Ipld::String(receipt.cid().to_string()),
"ran" => Ipld::String(receipt.ran().to_string())
},
);
},
);

Arc::make_mut(&mut self.workflow).increment_progress(receipt_cid);
let workflow_cid_bytes = self.workflow.cid_as_bytes();
if let Ok(workflow_bytes) = self.workflow.capsule() {
let _id = event_handler
event_handler
.swarm
.behaviour_mut()
.kademlia
.put_record(
Record::new(workflow_cid_bytes, workflow_bytes),
workflow_quorum,
)
.map_err(|err| {
warn!(subject = "libp2p.put_record.err",
.map_or_else(
|err| {
warn!(subject = "libp2p.put_record.err",
category = "publish_event",
err=?err,
"workflow information not PUT onto DHT")
});
},
|query_id| {
let key = RequestResponseKey::new(
self.workflow.cid().to_string().into(),
CapsuleTag::Workflow,
);
event_handler.query_senders.insert(query_id, (key, None));

debug!(
subject = "libp2p.put_record",
category = "publish_event",
"workflow info PUT onto DHT"
);

#[cfg(feature = "websocket-notify")]
notification::emit_event(
event_handler.ws_evt_sender(),
EventNotificationTyp::SwarmNotification(
SwarmNotification::PutWorkflowInfoDht,
),
btreemap! {
"cid" => Ipld::String(self.workflow.cid().to_string()),
"name" => self.workflow.name.as_ref().map_or(Ipld::Null, |name| Ipld::String(name.to_string())),
"numTasks" => Ipld::Integer(self.workflow.num_tasks as i128),
"progress" => Ipld::List(self.workflow.progress.iter().map(|cid| Ipld::String(cid.to_string())).collect()),
"progressCount" => Ipld::Integer(self.workflow.progress_count as i128),
},
)
},
);
} else {
error!(
subject = "libp2p.put_record.err",
Expand Down Expand Up @@ -457,8 +562,8 @@ impl Replay {
SwarmNotification::PublishedReceiptPubsub,
),
btreemap! {
"cid" => receipt.cid().to_string(),
"ran" => receipt.ran().to_string()
"cid" => Ipld::String(receipt.cid().to_string()),
bgins marked this conversation as resolved.
Show resolved Hide resolved
"ran" => Ipld::String(receipt.ran().to_string())
},
);
})
Expand Down Expand Up @@ -490,14 +595,6 @@ impl QueryRecord {
where
DB: Database,
{
if event_handler.connections.peers.is_empty() {
if let Some(sender) = self.sender {
let _ = sender.send_async(ResponseEvent::NoPeersAvailable).await;
}

return;
}

let id = event_handler
.swarm
.behaviour_mut()
Expand All @@ -512,14 +609,6 @@ impl QueryRecord {
where
DB: Database,
{
if event_handler.connections.peers.is_empty() {
if let Some(sender) = self.sender {
let _ = sender.send_async(ResponseEvent::NoPeersAvailable).await;
}

return;
}

event_handler
.swarm
.behaviour_mut()
Expand All @@ -537,14 +626,6 @@ impl QueryRecord {
where
DB: Database,
{
if event_handler.connections.peers.is_empty() {
if let Some(sender) = self.sender {
let _ = sender.send_async(ResponseEvent::NoPeersAvailable).await;
}

return;
}

let id = event_handler
.swarm
.behaviour_mut()
Expand Down
Loading
Loading