feat: Add rendezvous rediscovery and registration renewal
Add cache with eviction listener to trigger discovery and renewal events
on an interval and at TTL expiry, respectively.
bgins committed Oct 27, 2023
1 parent 6dfed94 commit 15f4d32
Showing 8 changed files with 326 additions and 19 deletions.
.envrc (2 changes: 1 addition & 1 deletion)
@@ -1,5 +1,5 @@
use_flake

- export RUST_LOG=homestar=debug,homestar_runtime=debug,libp2p=info,libp2p_gossipsub::behaviour=debug,tarpc=info,tower_http=debug
+ export RUST_LOG=homestar=debug,homestar_runtime=debug,libp2p=info,libp2p_gossipsub::behaviour=debug,tarpc=info,tower_http=debug,moka=debug
export RUST_BACKTRACE=full
export RUSTFLAGS="--cfg tokio_unstable"
Cargo.lock (100 changes: 100 additions & 0 deletions)

Some generated files are not rendered by default.

homestar-runtime/Cargo.toml (4 changes: 4 additions & 0 deletions)
@@ -111,6 +111,10 @@ metrics-exporter-prometheus = { version = "0.12.1", default-features = false, fe
"http-listener",
], optional = true }
miette = { version = "5.10", default-features = false, features = ["fancy"] }
moka = { version = "0.12.1", default-features = false, features = [
"future",
"sync",
] }
names = { version = "0.14", default-features = false, optional = true }
proptest = { version = "1.2", optional = true }
puffin = { version = "0.17", default-features = false, optional = true }
homestar-runtime/src/event_handler.rs (20 changes: 17 additions & 3 deletions)
@@ -16,14 +16,17 @@ use libp2p::{
core::ConnectedPoint, futures::StreamExt, kad::QueryId, rendezvous::Cookie,
request_response::RequestId, swarm::Swarm, PeerId,
};
use moka::future::Cache;
use std::{sync::Arc, time::Duration};
use swarm_event::ResponseEvent;
use tokio::select;

pub(crate) mod cache;
pub mod channel;
pub(crate) mod error;
pub(crate) mod event;
pub(crate) mod swarm_event;
pub(crate) use cache::{setup_cache, CacheValue};
pub(crate) use error::RequestResponseError;
pub(crate) use event::Event;

@@ -54,6 +57,7 @@ pub(crate) struct EventHandler<DB: Database> {
p2p_provider_timeout: Duration,
db: DB,
swarm: Swarm<ComposedBehaviour>,
cache: Cache<String, CacheValue>,
sender: Arc<channel::AsyncBoundedChannelSender<Event>>,
receiver: channel::AsyncBoundedChannelReceiver<Event>,
query_senders: FnvHashMap<QueryId, (RequestResponseKey, Option<P2PSender>)>,
@@ -80,6 +84,7 @@ pub(crate) struct EventHandler<DB: Database> {
p2p_provider_timeout: Duration,
db: DB,
swarm: Swarm<ComposedBehaviour>,
cache: Cache<String, CacheValue>,
sender: Arc<channel::AsyncBoundedChannelSender<Event>>,
receiver: channel::AsyncBoundedChannelReceiver<Event>,
query_senders: FnvHashMap<QueryId, (RequestResponseKey, Option<P2PSender>)>,
@@ -118,13 +123,15 @@ where
ws_msg_sender: ws::Notifier,
) -> Self {
let (sender, receiver) = Self::setup_channel(settings);
+ let sender = Arc::new(sender);
Self {
receipt_quorum: settings.network.receipt_quorum,
workflow_quorum: settings.network.workflow_quorum,
p2p_provider_timeout: settings.network.p2p_provider_timeout,
db,
swarm,
- sender: Arc::new(sender),
+ cache: setup_cache(sender.clone()),
+ sender: sender.clone(),
receiver,
query_senders: FnvHashMap::default(),
request_response_senders: FnvHashMap::default(),
@@ -146,13 +153,15 @@
#[cfg(not(feature = "websocket-server"))]
pub(crate) fn new(swarm: Swarm<ComposedBehaviour>, db: DB, settings: &settings::Node) -> Self {
let (sender, receiver) = Self::setup_channel(settings);
+ let sender = Arc::new(sender);
Self {
receipt_quorum: settings.network.receipt_quorum,
workflow_quorum: settings.network.workflow_quorum,
p2p_provider_timeout: settings.network.p2p_provider_timeout,
db,
swarm,
- sender: Arc::new(sender),
+ cache: setup_cache(sender.clone()),
+ sender: sender.clone(),
receiver,
query_senders: FnvHashMap::default(),
connected_peers: FnvHashMap::default(),
@@ -205,8 +214,10 @@ where
swarm_event.handle_event(&mut self).await;
}
}

// Poll cache for expired entries
self.cache.run_pending_tasks().await;
}
}
/// Start [EventHandler] that matches on swarm and pubsub [events].
@@ -226,6 +237,9 @@ where
swarm_event.handle_event(&mut self, ipfs_clone).await;
}
}

// Poll cache for expired entries
self.cache.run_pending_tasks().await;
}
}
}
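
A note on the run_pending_tasks() calls added to both event loops above: as of moka 0.12, the cache no longer does housekeeping on background threads, so expired entries are only evicted, and the eviction listener only invoked, when maintenance work runs. Polling once per pass of the loop keeps the renewal and rediscovery events timely even when the cache sees no other traffic. A minimal standalone sketch of the pattern (illustrative only, not the runtime's actual loop):

use moka::future::Cache;
use std::time::Duration;

#[tokio::main]
async fn main() {
    let cache: Cache<String, u32> = Cache::builder()
        .time_to_live(Duration::from_secs(2))
        .eviction_listener(|key, _value, cause| {
            // Fires only when maintenance runs and finds the entry expired.
            println!("evicted {key}: {cause:?}");
        })
        .build();

    cache.insert("registration:some-peer".to_string(), 1).await;

    for _ in 0..6 {
        // Stand-in for the swarm/channel select; any periodic wakeup works.
        tokio::time::sleep(Duration::from_millis(500)).await;

        // Drive expiration so the eviction listener can fire.
        cache.run_pending_tasks().await;
    }
}
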
homestar-runtime/src/event_handler/cache.rs (91 changes: 91 additions & 0 deletions)
@@ -0,0 +1,91 @@
use crate::{channel, event_handler::Event};
use libp2p::PeerId;
use moka::{future::Cache, Expiry as ExpiryBase};
use std::{
    collections::HashMap,
    sync::Arc,
    time::{Duration, Instant},
};

struct Expiry;

impl ExpiryBase<String, CacheValue> for Expiry {
    fn expire_after_create(
        &self,
        _key: &String,
        value: &CacheValue,
        _current_time: Instant,
    ) -> Option<Duration> {
        value.expiration.as_duration()
    }
}

#[derive(Clone, Debug)]
pub(crate) struct CacheValue {
    expiration: Expiration,
    data: HashMap<String, CacheData>,
}

impl CacheValue {
    pub(crate) fn new(expiration: Expiration, data: HashMap<String, CacheData>) -> Self {
        Self { expiration, data }
    }
}

#[derive(Clone, Debug)]
pub(crate) enum CacheData {
    Peer(PeerId),
    OnEviction(DispatchEvent),
}

#[derive(Clone, Debug)]
pub(crate) enum DispatchEvent {
    RegisterPeer,
    DiscoverPeers,
}

#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(crate) enum Expiration {
    Registration(Duration),
    Discovery(Duration),
}

impl Expiration {
    pub(crate) fn as_duration(&self) -> Option<Duration> {
        match self {
            Expiration::Registration(ttl) => Some(*ttl),
            Expiration::Discovery(interval) => Some(*interval),
        }
    }
}

pub(crate) fn setup_cache(
    sender: Arc<channel::AsyncBoundedChannelSender<Event>>,
) -> Cache<String, CacheValue> {
    let eviction_listener = move |_key: Arc<String>, val: CacheValue, _cause| {
        let tx = Arc::clone(&sender);

        if let Some(CacheData::OnEviction(event)) = val.data.get("on_eviction") {
            match event {
                DispatchEvent::RegisterPeer => {
                    if let Some(CacheData::Peer(rendezvous_node)) = val.data.get("rendezvous_node")
                    {
                        let _ = tx.send(Event::RegisterPeer(rendezvous_node.to_owned()));
                    };
                }
                DispatchEvent::DiscoverPeers => {
                    if let Some(CacheData::Peer(rendezvous_node)) = val.data.get("rendezvous_node")
                    {
                        let _ = tx.send(Event::DiscoverPeers(rendezvous_node.to_owned()));
                    };
                }
            }
        }
    };

    Cache::builder()
        .expire_after(Expiry)
        .time_to_live(Duration::from_secs(5))
        .eviction_listener(eviction_listener)
        .build()
}
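
The file above builds the cache and its eviction listener, but the call sites that populate it live in the remaining changed files (likely the swarm event handlers), which are not rendered in this view. A hypothetical, crate-internal sketch of how a registration might be recorded so that Event::RegisterPeer fires when the rendezvous TTL lapses; the helper name and key format are illustrative, not taken from the commit:

use crate::event_handler::cache::{CacheData, CacheValue, DispatchEvent, Expiration};
use libp2p::PeerId;
use moka::future::Cache;
use std::{collections::HashMap, time::Duration};

// Hypothetical helper (would live inside homestar-runtime, since the cache
// types are pub(crate)); the real insertion sites are not part of this view.
async fn remember_registration(
    cache: &Cache<String, CacheValue>,
    rendezvous_node: PeerId,
    ttl: Duration,
) {
    let mut data = HashMap::new();
    data.insert(
        "on_eviction".to_string(),
        CacheData::OnEviction(DispatchEvent::RegisterPeer),
    );
    data.insert(
        "rendezvous_node".to_string(),
        CacheData::Peer(rendezvous_node),
    );

    // Illustrative key format. When this entry expires, the eviction listener
    // above sends Event::RegisterPeer(rendezvous_node) so the node re-registers.
    cache
        .insert(
            format!("registration:{rendezvous_node}"),
            CacheValue::new(Expiration::Registration(ttl), data),
        )
        .await;
}

A discovery entry would be recorded the same way, using Expiration::Discovery and DispatchEvent::DiscoverPeers so that the listener re-sends Event::DiscoverPeers on the chosen polling interval.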