
refactor: Improve rendezvous protocol usage #399

Merged Nov 7, 2023 (32 commits)
Commits
7250f0a
chore: Add rendezvous config and PEM fixtures
bgins Oct 19, 2023
2e270e3
chore: Add peer registered debug log
bgins Oct 24, 2023
7fade98
test: Add rendezvous connect test
bgins Oct 24, 2023
e6d9e5e
chore: Add rendezvous server config with false default
bgins Oct 24, 2023
5145d58
chore: Add max connected peers config
bgins Oct 24, 2023
d328de7
chore: Add rendezvous server registration expired debug log
bgins Oct 24, 2023
7ea6ff5
test: Add rendezvous disconnect test
bgins Oct 24, 2023
1acc08d
chore: Check identified peer before adding to external addresses
bgins Oct 25, 2023
e79ee5f
chore: Add check to not dial self on rendezvous discovery
bgins Oct 25, 2023
24a1512
refactor: Improve dialing after rendezvous discovery
bgins Oct 25, 2023
4e9135d
chore: Add rendezvous ttl config
bgins Oct 25, 2023
0bab542
chore: Improve logs
bgins Oct 25, 2023
2fc7ee8
chore: Add expired rendezvous peer re-discovery
bgins Oct 26, 2023
6dfed94
chore: Add rendezvous discovery interval config
bgins Oct 27, 2023
15f4d32
feat: Add rendezvous rediscovery and registration renewal
bgins Oct 27, 2023
4e0f08d
chore: Add swarm dialing event debug log
bgins Oct 29, 2023
5a5266c
chore: Require expiration cause for rediscovery and registration renewal
bgins Oct 29, 2023
5b37c6c
chore: Add extract timestamps test utility
bgins Oct 29, 2023
89e114f
chore: Remove cache Expiration enum
bgins Oct 30, 2023
9f0b227
chore: Add cache polling loop
bgins Oct 30, 2023
4d7ad72
test: Add rendezvous registration renewal test
bgins Oct 30, 2023
e709f7c
test: Add rendezvous rediscovery test
bgins Oct 31, 2023
0c5308b
test: Add rediscover on expiration test
bgins Oct 31, 2023
189548e
chore: Update fixture ports and add peer ID comments
bgins Oct 31, 2023
fd04b65
chore: Add poll cache to event handler without ipfs feature
bgins Oct 31, 2023
b5f31df
chore: Rename our_registration to self_registration
bgins Oct 31, 2023
402a412
chore: Remove unneeded sender clone
bgins Oct 31, 2023
e02ce80
chore: Move event handler rendezvous fields into a substruct
bgins Oct 31, 2023
c4bbd5f
chore: Move event handler connection fields into a substruct
bgins Oct 31, 2023
0cec3b1
chore: Add poll cache interval config
bgins Oct 31, 2023
263a871
chore: Upgrade to libp2p 0.52.4
bgins Oct 31, 2023
0ea35d0
chore: updates
zeeshanlakhani Nov 7, 2023
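
Taken together, these commits implement registration renewal and cookie-based re-discovery on top of libp2p's rendezvous client. A rough sketch of that client-side flow follows (libp2p 0.52 APIs; `MyBehaviour`, the namespace string, and the TTL constant are illustrative stand-ins, not homestar's actual definitions):

```rust
use libp2p::{
    rendezvous::{self, Cookie, Namespace},
    swarm::{NetworkBehaviour, Swarm},
    PeerId,
};

// Stand-in for homestar's ComposedBehaviour; only the rendezvous client part.
#[derive(NetworkBehaviour)]
struct MyBehaviour {
    rendezvous: rendezvous::client::Behaviour,
}

// Mirrors the rendezvous_registration_ttl setting added in this PR (value is a guess).
const REGISTRATION_TTL_SECS: u64 = 7200;

fn register_and_discover(
    swarm: &mut Swarm<MyBehaviour>,
    rendezvous_node: PeerId,
    cookie: Option<Cookie>,
) {
    let namespace = Namespace::from_static("homestar");

    // (Re-)register with the rendezvous server; the server drops the
    // registration once the TTL elapses, which is what the renewal logic
    // in this PR reacts to.
    let _ = swarm.behaviour_mut().rendezvous.register(
        namespace.clone(),
        rendezvous_node,
        Some(REGISTRATION_TTL_SECS),
    );

    // Discover peers, replaying the cookie from the previous response so the
    // server only returns registrations we have not yet seen.
    swarm.behaviour_mut().rendezvous.discover(
        Some(namespace),
        cookie,
        None, // no result limit
        rendezvous_node,
    );
}
```

On a rendezvous::client::Event::Discovered response, the returned cookie is stored per rendezvous node and replayed on the next discover call; that is what the rendezvous_cookies map in the diff below holds.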
2 changes: 1 addition & 1 deletion .envrc
@@ -1,5 +1,5 @@
 use_flake

-export RUST_LOG=homestar=debug,homestar_runtime=debug,libp2p=info,libp2p_gossipsub::behaviour=debug,tarpc=info,tower_http=debug
+export RUST_LOG=homestar=debug,homestar_runtime=debug,libp2p=info,libp2p_gossipsub::behaviour=debug,tarpc=info,tower_http=debug,moka=debug
 export RUST_BACKTRACE=full
 export RUSTFLAGS="--cfg tokio_unstable"
100 changes: 100 additions & 0 deletions Cargo.lock

(Generated file; diff not rendered.)

4 changes: 4 additions & 0 deletions homestar-runtime/Cargo.toml
@@ -111,6 +111,10 @@
 metrics-exporter-prometheus = { version = "0.12.1", default-features = false, features = [
   "http-listener",
 ], optional = true }
 miette = { version = "5.10", default-features = false, features = ["fancy"] }
+moka = { version = "0.12.1", default-features = false, features = [
+  "future",
+  "sync",
+] }
 names = { version = "0.14", default-features = false, optional = true }
 proptest = { version = "1.2", optional = true }
 puffin = { version = "0.17", default-features = false, optional = true }
3 changes: 3 additions & 0 deletions homestar-runtime/fixtures/__testkey_ed25519_2.pem
@@ -0,0 +1,3 @@
+-----BEGIN PRIVATE KEY-----
+AQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQE=
+-----END PRIVATE KEY-----
3 changes: 3 additions & 0 deletions homestar-runtime/fixtures/__testkey_ed25519_3.pem
@@ -0,0 +1,3 @@
+-----BEGIN PRIVATE KEY-----
+AgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgI=
+-----END PRIVATE KEY-----
3 changes: 3 additions & 0 deletions homestar-runtime/fixtures/__testkey_ed25519_4.pem
@@ -0,0 +1,3 @@
+-----BEGIN PRIVATE KEY-----
+AwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwM=
+-----END PRIVATE KEY-----
3 changes: 3 additions & 0 deletions homestar-runtime/fixtures/__testkey_ed25519_5.pem
@@ -0,0 +1,3 @@
+-----BEGIN PRIVATE KEY-----
+BAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQ=
+-----END PRIVATE KEY-----
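
Each fixture body decodes to 32 repeated bytes, i.e. a raw ed25519 seed wrapped in PEM armor rather than a full PKCS#8 document, which keeps the derived peer IDs deterministic across test runs. A sketch of loading one under that raw-seed assumption (`load_test_keypair` is hypothetical, and homestar's actual key loading may parse these differently; base64 0.21 and libp2p identity APIs):

```rust
use base64::Engine;
use libp2p::identity::Keypair;

fn load_test_keypair(pem: &str) -> anyhow::Result<Keypair> {
    // Strip the PEM armor lines and concatenate the base64 body.
    let body: String = pem
        .lines()
        .filter(|line| !line.starts_with("-----"))
        .collect();
    // Decode into the 32-byte ed25519 seed and build a libp2p keypair.
    let mut seed = base64::engine::general_purpose::STANDARD.decode(body.trim())?;
    Ok(Keypair::ed25519_from_bytes(&mut seed)?)
}
```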
50 changes: 45 additions & 5 deletions homestar-runtime/src/event_handler.rs
@@ -6,7 +6,7 @@
 use crate::network::ws;
 use crate::network::IpfsCli;
 use crate::{
     db::Database,
-    network::swarm::{ComposedBehaviour, RequestResponseKey},
+    network::swarm::{ComposedBehaviour, PeerDiscoveryInfo, RequestResponseKey},
     settings,
 };
 use anyhow::Result;
@@ -16,14 +16,17 @@
 use libp2p::{
     core::ConnectedPoint, futures::StreamExt, kad::QueryId, rendezvous::Cookie,
     request_response::RequestId, swarm::Swarm, PeerId,
 };
+use moka::future::Cache;
 use std::{sync::Arc, time::Duration};
 use swarm_event::ResponseEvent;
-use tokio::select;
+use tokio::{runtime::Handle, select};

+pub(crate) mod cache;
 pub mod channel;
 pub(crate) mod error;
 pub(crate) mod event;
 pub(crate) mod swarm_event;
+pub(crate) use cache::{setup_cache, CacheValue};
 pub(crate) use error::RequestResponseError;
 pub(crate) use event::Event;
@@ -54,11 +57,16 @@
 pub(crate) struct EventHandler<DB: Database> {
     p2p_provider_timeout: Duration,
     db: DB,
     swarm: Swarm<ComposedBehaviour>,
+    cache: Arc<Cache<String, CacheValue>>,
     sender: Arc<channel::AsyncBoundedChannelSender<Event>>,
     receiver: channel::AsyncBoundedChannelReceiver<Event>,
     query_senders: FnvHashMap<QueryId, (RequestResponseKey, Option<P2PSender>)>,
     connected_peers: FnvHashMap<PeerId, ConnectedPoint>,
+    connected_peers_limit: u32,
+    discovered_peers: FnvHashMap<PeerId, PeerDiscoveryInfo>,
     request_response_senders: FnvHashMap<RequestId, (RequestResponseKey, P2PSender)>,
+    rendezvous_registration_ttl: Duration,
Review comment (Contributor):

@bgins thinking maybe we do have some substruct here: rendezvous, which has ttl and discovery interval and cookies? Trying to clean up this bit on length here. We may want the same for the two connected peers.

Reply (Contributor Author, bgins):
Done! Also added discovered_peers to the rendezvous one because it is not relevant for peers discovered through mDNS. We may revisit if we use the proposed ambient discovery protocol in the future.

Was slightly unsure about the naming for the connected peers one. Went with this struct:

    // Connected peers configuration and state
    struct Connections {
        peers: FnvHashMap<PeerId, ConnectedPoint>,
        max_peers: u32,
    }

Which on the event handler is:

    connections: Connections,

And when used we have event_handler.connections.peers and event_handler.connections.max_peers. Having both connections and peers in the first one seems a bit redundant? I might be overthinking it.
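
For reference, a plausible shape for the rendezvous substruct described in this thread (field names follow the discussion and the surrounding diff; the merged layout may differ):

```rust
use std::time::Duration;

use fnv::FnvHashMap;
use libp2p::{rendezvous::Cookie, PeerId};

// PeerDiscoveryInfo is homestar's own type (homestar-runtime/src/network/swarm).
use crate::network::swarm::PeerDiscoveryInfo;

/// Rendezvous protocol configuration and state.
struct Rendezvous {
    registration_ttl: Duration,
    discovery_interval: Duration,
    cookies: FnvHashMap<PeerId, Cookie>,
    discovered_peers: FnvHashMap<PeerId, PeerDiscoveryInfo>,
}
```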

+    rendezvous_discovery_interval: Duration,
     rendezvous_cookies: FnvHashMap<PeerId, Cookie>,
     pubsub_enabled: bool,
     ws_msg_sender: ws::Notifier,
@@ -76,11 +84,16 @@
 pub(crate) struct EventHandler<DB: Database> {
     p2p_provider_timeout: Duration,
     db: DB,
     swarm: Swarm<ComposedBehaviour>,
+    cache: Arc<Cache<String, CacheValue>>,
     sender: Arc<channel::AsyncBoundedChannelSender<Event>>,
     receiver: channel::AsyncBoundedChannelReceiver<Event>,
     query_senders: FnvHashMap<QueryId, (RequestResponseKey, Option<P2PSender>)>,
     connected_peers: FnvHashMap<PeerId, ConnectedPoint>,
+    connected_peers_limit: u32,
+    discovered_peers: FnvHashMap<PeerId, PeerDiscoveryInfo>,
     request_response_senders: FnvHashMap<RequestId, (RequestResponseKey, P2PSender)>,
+    rendezvous_registration_ttl: Duration,
+    rendezvous_discovery_interval: Duration,
     rendezvous_cookies: FnvHashMap<PeerId, Cookie>,
     pubsub_enabled: bool,
     node_addresses: Vec<libp2p::Multiaddr>,
@@ -110,17 +123,23 @@
 where
         ws_msg_sender: ws::Notifier,
     ) -> Self {
         let (sender, receiver) = Self::setup_channel(settings);
+        let sender = Arc::new(sender);
         Self {
             receipt_quorum: settings.network.receipt_quorum,
             workflow_quorum: settings.network.workflow_quorum,
             p2p_provider_timeout: settings.network.p2p_provider_timeout,
             db,
             swarm,
-            sender: Arc::new(sender),
+            cache: Arc::new(setup_cache(sender.clone())),
+            sender: sender.clone(),
             receiver,
             query_senders: FnvHashMap::default(),
             request_response_senders: FnvHashMap::default(),
             connected_peers: FnvHashMap::default(),
+            connected_peers_limit: settings.network.max_connected_peers,
+            discovered_peers: FnvHashMap::default(),
+            rendezvous_registration_ttl: settings.network.rendezvous_registration_ttl,
+            rendezvous_discovery_interval: settings.network.rendezvous_discovery_interval,
             rendezvous_cookies: FnvHashMap::default(),
             pubsub_enabled: settings.network.enable_pubsub,
             ws_msg_sender,
@@ -134,17 +153,23 @@
     #[cfg(not(feature = "websocket-server"))]
     pub(crate) fn new(swarm: Swarm<ComposedBehaviour>, db: DB, settings: &settings::Node) -> Self {
         let (sender, receiver) = Self::setup_channel(settings);
+        let sender = Arc::new(sender);
         Self {
             receipt_quorum: settings.network.receipt_quorum,
             workflow_quorum: settings.network.workflow_quorum,
             p2p_provider_timeout: settings.network.p2p_provider_timeout,
             db,
             swarm,
-            sender: Arc::new(sender),
+            cache: Arc::new(setup_cache(sender.clone())),
+            sender: sender.clone(),
             receiver,
             query_senders: FnvHashMap::default(),
             connected_peers: FnvHashMap::default(),
+            connected_peers_limit: settings.network.max_connected_peers,
+            discovered_peers: FnvHashMap::default(),
             request_response_senders: FnvHashMap::default(),
+            rendezvous_registration_ttl: settings.network.rendezvous_registration_ttl,
+            rendezvous_discovery_interval: settings.network.rendezvous_discovery_interval,
             rendezvous_cookies: FnvHashMap::default(),
             pubsub_enabled: settings.network.enable_pubsub,
             node_addresses: settings.network.node_addresses.clone(),
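
Both constructors read the new settings from settings.network. A hypothetical TOML excerpt with those keys (the table name, units, and values here are guesses, not homestar's shipped defaults):

```toml
[node.network]
max_connected_peers = 32
rendezvous_registration_ttl = 7200   # registration TTL in seconds (guess)
rendezvous_discovery_interval = 600  # rediscovery interval in seconds (guess)
```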
@@ -189,15 +214,20 @@
                     swarm_event.handle_event(&mut self).await;
                 }
             }
+
+            // Poll cache for expired entries
+            self.cache.run_pending_tasks().await;
         }
     }

     /// Start [EventHandler] that matches on swarm and pubsub [events].
     ///
     /// [events]: libp2p::swarm::SwarmEvent
     #[cfg(feature = "ipfs")]
     pub(crate) async fn start(mut self, ipfs: IpfsCli) -> Result<()> {
+        let handle = Handle::current();
+        handle.spawn(poll_cache(self.cache.clone()));
+
         loop {
             select! {
                 runtime_event = self.receiver.recv_async() => {
@@ -213,3 +243,13 @@
         }
     }
 }
+
+/// Poll cache for expired entries
+async fn poll_cache(cache: Arc<Cache<String, CacheValue>>) {
+    let mut interval = tokio::time::interval(Duration::from_secs(1));
+
+    loop {
+        interval.tick().await;
+        cache.run_pending_tasks().await;
+    }
+}
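
The new event_handler/cache.rs module referenced above is not part of this diff view. A minimal sketch of what a setup_cache along these lines could look like, assuming moka 0.12's synchronous eviction listener, with a tokio mpsc sender and simplified Event/CacheValue types standing in for homestar's channel and types:

```rust
use std::{sync::Arc, time::Duration};

use moka::{future::Cache, notification::RemovalCause};
use tokio::sync::mpsc::UnboundedSender;

// Stand-ins for homestar's CacheValue and Event types.
#[derive(Clone, Debug)]
pub(crate) struct CacheValue(pub String);

#[derive(Debug)]
pub(crate) enum Event {
    CacheExpired(String),
}

pub(crate) fn setup_cache(sender: Arc<UnboundedSender<Event>>) -> Cache<String, CacheValue> {
    Cache::builder()
        .time_to_live(Duration::from_secs(120)) // placeholder TTL
        .eviction_listener(move |key: Arc<String>, _value: CacheValue, cause| {
            // Only genuine expirations should trigger rediscovery or
            // registration renewal, not manual invalidations.
            if cause == RemovalCause::Expired {
                let _ = sender.send(Event::CacheExpired(key.to_string()));
            }
        })
        .build()
}
```

moka only processes expirations during cache maintenance, so an otherwise idle cache never fires its eviction listener on its own; that is what the run_pending_tasks polling above is for.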