
Commit

Merge branch 'master' of github.com:aspectron/rusty-kaspa into wallet-store
aspect committed Jun 19, 2023
2 parents 0504c03 + 6a2a1dc commit 23feef7
Showing 68 changed files with 1,691 additions and 885 deletions.
3 changes: 3 additions & 0 deletions Cargo.lock

(Generated lockfile; diff not rendered.)

1 change: 1 addition & 0 deletions components/addressmanager/Cargo.toml
@@ -17,3 +17,4 @@ itertools.workspace = true
 rand.workspace = true
 parking_lot.workspace = true
 borsh.workspace = true
+log.workspace = true
7 changes: 5 additions & 2 deletions components/addressmanager/src/lib.rs
@@ -5,7 +5,7 @@ extern crate self as address_manager;
 use std::{collections::HashSet, sync::Arc};
 
 use itertools::Itertools;
-use kaspa_core::time::unix_now;
+use kaspa_core::{debug, time::unix_now};
 use kaspa_database::prelude::{StoreResultExtensions, DB};
 use kaspa_utils::networking::IpAddress;
 use parking_lot::Mutex;
@@ -31,7 +31,10 @@ impl AddressManager {
     }
 
     pub fn add_address(&mut self, address: NetAddress) {
-        // TODO: Don't add non routable addresses
+        if address.ip.is_loopback() || address.ip.is_unspecified() {
+            debug!("[Address manager] skipping local address {}", address.ip);
+            return;
+        }
 
         if self.address_store.has(address) {
            return;
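
The new check in `add_address` relies on the standard-library predicates `IpAddr::is_loopback()` and `IpAddr::is_unspecified()`. A minimal self-contained sketch of the same filter (the `is_routable_candidate` helper and the sample addresses are illustrative, not part of this commit):

```rust
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};

// Hypothetical helper mirroring the check added in add_address():
// loopback (127.0.0.0/8, ::1) and unspecified (0.0.0.0, ::) addresses are
// never reachable by other peers, so they are filtered out of the store.
fn is_routable_candidate(ip: IpAddr) -> bool {
    !(ip.is_loopback() || ip.is_unspecified())
}

fn main() {
    assert!(!is_routable_candidate(IpAddr::V4(Ipv4Addr::LOCALHOST))); // 127.0.0.1
    assert!(!is_routable_candidate(IpAddr::V6(Ipv6Addr::UNSPECIFIED))); // ::
    assert!(is_routable_candidate(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 7)))); // routable
}
```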
45 changes: 29 additions & 16 deletions components/connectionmanager/src/lib.rs
@@ -11,7 +11,8 @@ use futures_util::future::join_all;
 use itertools::Itertools;
 use kaspa_addressmanager::{AddressManager, NetAddress};
 use kaspa_core::{debug, info, warn};
-use kaspa_p2p_lib::Peer;
+use kaspa_p2p_lib::{common::ProtocolError, ConnectionError, Peer};
+use kaspa_utils::triggers::SingleTrigger;
 use parking_lot::Mutex as ParkingLotMutex;
 use rand::{seq::SliceRandom, thread_rng};
 use tokio::{
@@ -32,7 +33,7 @@ pub struct ConnectionManager {
     address_manager: Arc<ParkingLotMutex<AddressManager>>,
     connection_requests: TokioMutex<HashMap<SocketAddr, ConnectionRequest>>,
     force_next_iteration: UnboundedSender<()>,
-    shutdown_signal: UnboundedSender<()>,
+    shutdown_signal: SingleTrigger,
 }
 
 #[derive(Clone, Debug)]
@@ -58,32 +59,34 @@ impl ConnectionManager {
         address_manager: Arc<ParkingLotMutex<AddressManager>>,
     ) -> Arc<Self> {
         let (tx, rx) = unbounded_channel::<()>();
-        let (shutdown_signal_tx, shutdown_signal_rx) = unbounded_channel();
         let manager = Arc::new(Self {
             p2p_adaptor,
             outbound_target,
             inbound_limit,
             address_manager,
             connection_requests: Default::default(),
             force_next_iteration: tx,
-            shutdown_signal: shutdown_signal_tx,
+            shutdown_signal: SingleTrigger::new(),
             dns_seeders,
             default_port,
         });
-        manager.clone().start_event_loop(rx, shutdown_signal_rx);
+        manager.clone().start_event_loop(rx);
         manager.force_next_iteration.send(()).unwrap();
         manager
     }
 
-    fn start_event_loop(self: Arc<Self>, mut rx: UnboundedReceiver<()>, mut shutdown_signal_rx: UnboundedReceiver<()>) {
+    fn start_event_loop(self: Arc<Self>, mut rx: UnboundedReceiver<()>) {
         let mut ticker = interval(Duration::from_secs(30));
         ticker.set_missed_tick_behavior(MissedTickBehavior::Delay);
         tokio::spawn(async move {
             loop {
+                if self.shutdown_signal.trigger.is_triggered() {
+                    break;
+                }
                 select! {
                     _ = rx.recv() => self.clone().handle_event().await,
                     _ = ticker.tick() => self.clone().handle_event().await,
-                    _ = shutdown_signal_rx.recv() => break,
+                    _ = self.shutdown_signal.listener.clone() => break,
                 }
             }
             debug!("Connection manager event loop exiting");
@@ -107,7 +110,7 @@ impl ConnectionManager {
     }
 
     pub async fn stop(&self) {
-        self.shutdown_signal.send(()).unwrap();
+        self.shutdown_signal.trigger.trigger()
     }
 
     async fn handle_connection_requests(self: &Arc<Self>, peer_by_address: &HashMap<SocketAddr, Peer>) {
@@ -124,7 +127,7 @@
 
             if !is_connected && request.next_attempt <= SystemTime::now() {
                 debug!("Connecting to peer request {}", address);
-                if self.p2p_adaptor.connect_peer(address.to_string()).await.is_none() {
+                if self.p2p_adaptor.connect_peer(address.to_string()).await.is_err() {
                     debug!("Failed connecting to peer request {}", address);
                     if request.is_permanent {
                         const MAX_ACCOUNTABLE_ATTEMPTS: u32 = 4;
@@ -164,6 +167,9 @@
         let mut progressing = true;
         let mut connecting = true;
         while connecting && missing_connections > 0 {
+            if self.shutdown_signal.trigger.is_triggered() {
+                return;
+            }
             let mut addrs_to_connect = Vec::with_capacity(missing_connections);
             let mut jobs = Vec::with_capacity(missing_connections);
             for _ in 0..missing_connections {
@@ -197,13 +203,20 @@
             }
 
             for (res, net_addr) in (join_all(jobs).await).into_iter().zip(addrs_to_connect) {
-                if res.is_none() {
-                    debug!("Failed connecting to {:?}", net_addr);
-                    self.address_manager.lock().mark_connection_failure(net_addr);
-                } else {
-                    self.address_manager.lock().mark_connection_success(net_addr);
-                    missing_connections -= 1;
-                    progressing = true;
+                match res {
+                    Ok(_) => {
+                        self.address_manager.lock().mark_connection_success(net_addr);
+                        missing_connections -= 1;
+                        progressing = true;
+                    }
+                    Err(ConnectionError::ProtocolError(ProtocolError::PeerAlreadyExists(_))) => {
+                        // We avoid marking the existing connection as connection failure
+                        debug!("Failed connecting to {:?}, peer already exists", net_addr);
+                    }
+                    Err(err) => {
+                        debug!("Failed connecting to {:?}, err: {}", net_addr, err);
+                        self.address_manager.lock().mark_connection_failure(net_addr);
+                    }
                 }
             }
         }
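
The shutdown machinery moves from an unbounded channel to a trigger/listener pair, and failed dials are now matched on `ConnectionError` so that a `ProtocolError::PeerAlreadyExists` rejection does not penalize an address that is in fact already connected. Assuming `kaspa_utils::triggers::SingleTrigger` wraps a one-shot trigger/listener pair such as the one from the `triggered` crate (an assumption; the wrapper itself is not shown in this diff), the event-loop shutdown pattern looks roughly like this sketch:

```rust
// Sketch of the trigger/listener shutdown pattern, built directly on the
// `triggered` crate (assumed here to be what SingleTrigger wraps).
use tokio::select;
use tokio::time::{interval, Duration};

#[tokio::main]
async fn main() {
    let (trigger, listener) = triggered::trigger();

    let worker = tokio::spawn(async move {
        let mut ticker = interval(Duration::from_secs(1));
        loop {
            select! {
                _ = ticker.tick() => { /* periodic work, e.g. handle_event() */ }
                // The listener is a cloneable future that resolves once the
                // trigger fires, so each iteration can await a fresh clone.
                _ = listener.clone() => break,
            }
        }
    });

    // Firing the trigger is idempotent and wakes every listener clone.
    trigger.trigger();
    worker.await.unwrap();
}
```

Unlike a channel sender, the trigger state can also be checked synchronously, which is what the connection loop uses to bail out early at the top of each iteration (`self.shutdown_signal.trigger.is_triggered()`).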
14 changes: 13 additions & 1 deletion components/consensusmanager/src/lib.rs
@@ -1,6 +1,6 @@
 use itertools::Itertools;
 use kaspa_consensus_core::api::{ConsensusApi, DynConsensus};
-use kaspa_core::{core::Core, service::Service};
+use kaspa_core::{core::Core, debug, service::Service};
 use parking_lot::RwLock;
 use std::{collections::VecDeque, ops::Deref, sync::Arc, thread::JoinHandle};
 
@@ -32,6 +32,9 @@ pub trait ConsensusFactory: Sync + Send {
 
     /// Create a new empty staging consensus
     fn new_staging_consensus(&self) -> (ConsensusInstance, DynConsensusCtl);
+
+    /// Close the factory and cleanup any shared resources used by it
+    fn close(&self);
 }
 
 /// Test-only mock factory
@@ -45,6 +48,10 @@ impl ConsensusFactory for MockFactory {
     fn new_staging_consensus(&self) -> (ConsensusInstance, DynConsensusCtl) {
         unimplemented!()
     }
+
+    fn close(&self) {
+        unimplemented!()
+    }
 }
 
 /// Defines a trait which handles consensus resets for external parts of the system. We avoid using
@@ -131,6 +138,11 @@ impl ConsensusManager {
             handle.join().unwrap();
             g = self.inner.write();
         }
+
+        // All consensus instances have been shutdown and we are exiting, so close the factory. Internally this closes
+        // the notification root sender channel, leading to a graceful shutdown of the notification sub-system.
+        debug!("[Consensus manager] all consensus threads exited");
+        self.factory.close();
     }
 }
 
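
`ConsensusFactory::close()` gives `ConsensusManager` a hook to release whatever the factory shares across consensus instances once all consensus threads have joined. A hypothetical sketch of a factory whose `close()` drops a shared notification sender so the receiving side can wind down (the struct, field, and channel types are assumptions for illustration, not the node's actual factory):

```rust
use std::sync::Mutex;
use tokio::sync::mpsc::UnboundedSender;

// Hypothetical notification payload type.
type Notification = String;

struct ExampleFactory {
    // Cloned into every consensus instance this factory creates; wrapped in
    // Option so close() can drop it through &self (interior mutability).
    notification_sender: Mutex<Option<UnboundedSender<Notification>>>,
}

impl ExampleFactory {
    fn close(&self) {
        // Taking the sender out of the Option drops it; once all clones are
        // gone the channel closes and the notification receiver loop exits.
        drop(self.notification_sender.lock().unwrap().take());
    }
}
```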
25 changes: 25 additions & 0 deletions components/consensusmanager/src/session.rs
@@ -6,6 +6,7 @@ use kaspa_consensus_core::api::{ConsensusApi, DynConsensus};
 use kaspa_utils::sync::rwlock::*;
 use std::{ops::Deref, sync::Arc};
 
+#[derive(Clone)]
 pub struct SessionOwnedReadGuard(RfRwLockOwnedReadGuard);
 
 pub struct SessionReadGuard<'a>(RfRwLockReadGuard<'a>);
@@ -62,11 +63,21 @@ impl ConsensusInstance {
         Self { session_lock, consensus }
     }
 
+    /// Returns a consensus session for accessing consensus operations in a bulk.
+    /// The user can safely assume that consensus state is consistent between operations, that
+    /// is, no pruning was performed between the calls.
+    /// The caller is responsible to make sure that the lifetime of this session is not
+    /// too long (~2 seconds max)
     pub async fn session(&self) -> ConsensusSession {
         let g = self.session_lock.read().await;
         ConsensusSession::new(g, self.consensus.clone())
     }
 
+    /// Returns an *owned* consensus session type which can be cloned and shared across threads.
+    /// Otherwise behaves like `self.session()`. The sharing ability is also useful for spawning blocking
+    /// operations on a different thread using the same session object, see [`ConsensusSessionOwned::spawn_blocking`].
+    /// The caller is responsible to make sure that the overall lifetime of this session is not
+    /// too long (~2 seconds max)
     pub async fn session_owned(&self) -> ConsensusSessionOwned {
         let g = self.session_lock.read_owned().await;
         ConsensusSessionOwned::new(g, self.consensus.clone())
@@ -92,6 +103,9 @@ impl Deref for ConsensusSession<'_> {
     }
 }
 
+/// An *owned* consensus session type which can be cloned and shared across threads.
+/// See method `spawn_blocking` within for context on the usefulness of this type
+#[derive(Clone)]
 pub struct ConsensusSessionOwned {
     _session_guard: SessionOwnedReadGuard,
     consensus: DynConsensus,
@@ -101,6 +115,17 @@ impl ConsensusSessionOwned {
     pub fn new(session_guard: SessionOwnedReadGuard, consensus: DynConsensus) -> Self {
         Self { _session_guard: session_guard, consensus }
     }
+
+    /// Uses [`tokio::task::spawn_blocking`] to run the provided consensus closure on a thread where blocking is acceptable.
+    /// Note that this function is only available on the *owned* session, and requires cloning the session. In fact this
+    /// function is the main motivation for a separate session type.
+    pub async fn spawn_blocking<F, R>(self, f: F) -> R
+    where
+        F: FnOnce(&dyn ConsensusApi) -> R + Send + 'static,
+        R: Send + 'static,
+    {
+        tokio::task::spawn_blocking(move || f(self.deref())).await.unwrap()
+    }
 }
 
 impl Deref for ConsensusSessionOwned {
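
A hypothetical usage sketch of the owned session (the `run_heavy_query` and `expensive_query` names are illustrative; `ConsensusInstance` and `ConsensusApi` come from this crate): acquire an owned session, then hand it to `spawn_blocking` so the query runs on tokio's blocking thread pool while the captured guard keeps consensus state consistent.

```rust
// Illustrative only: expensive_query stands in for any costly read-only
// ConsensusApi computation; error handling is omitted.
async fn run_heavy_query(instance: &ConsensusInstance) -> usize {
    // Owned sessions are cloneable, so several blocking tasks could each
    // hold their own copy of the same session if needed.
    let session = instance.session_owned().await;
    session
        .spawn_blocking(|consensus| {
            // Runs off the async executor; the session guard captured by the
            // closure keeps pruning locked out until the closure returns.
            expensive_query(consensus)
        })
        .await
}

// Assumed helper representing some blocking consensus computation.
fn expensive_query(_consensus: &dyn ConsensusApi) -> usize {
    0
}
```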
(The remaining changed files in this commit are not rendered.)