Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce status method allowing to query the Node's status #272

Merged
merged 3 commits into from
Mar 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 18 additions & 1 deletion bindings/ldk_node.udl
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ interface LDKNode {
void start();
[Throws=NodeError]
void stop();
NodeStatus status();
Config config();
Event? next_event();
Event wait_next_event();
[Async]
Expand Down Expand Up @@ -97,7 +99,6 @@ interface LDKNode {
[Throws=NodeError]
string sign_message([ByRef]sequence<u8> msg);
boolean verify_signature([ByRef]sequence<u8> msg, [ByRef]string sig, [ByRef]PublicKey pkey);
boolean is_running();
};

[Error]
Expand Down Expand Up @@ -137,6 +138,22 @@ enum NodeError {
"LiquidityFeeTooHigh",
};

dictionary NodeStatus {
boolean is_running;
boolean is_listening;
BestBlock current_best_block;
u64? latest_wallet_sync_timestamp;
u64? latest_onchain_wallet_sync_timestamp;
u64? latest_fee_rate_cache_update_timestamp;
u64? latest_rgs_snapshot_timestamp;
u64? latest_node_announcement_broadcast_timestamp;
};

dictionary BestBlock {
BlockHash block_hash;
u32 height;
};

[Error]
enum BuildError {
"InvalidSeedBytes",
Expand Down
14 changes: 14 additions & 0 deletions src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ use std::fmt;
use std::fs;
use std::io::Cursor;
use std::path::PathBuf;
use std::sync::atomic::AtomicBool;
use std::sync::{Arc, Mutex, RwLock};
use std::time::SystemTime;

Expand Down Expand Up @@ -945,6 +946,13 @@ fn build_with_store_internal<K: KVStore + Sync + Send + 'static>(

let (stop_sender, _) = tokio::sync::watch::channel(());

let is_listening = Arc::new(AtomicBool::new(false));
let latest_wallet_sync_timestamp = Arc::new(RwLock::new(None));
let latest_onchain_wallet_sync_timestamp = Arc::new(RwLock::new(None));
let latest_fee_rate_cache_update_timestamp = Arc::new(RwLock::new(None));
let latest_rgs_snapshot_timestamp = Arc::new(RwLock::new(None));
let latest_node_announcement_broadcast_timestamp = Arc::new(RwLock::new(None));

Ok(Node {
runtime,
stop_sender,
Expand All @@ -968,6 +976,12 @@ fn build_with_store_internal<K: KVStore + Sync + Send + 'static>(
scorer,
peer_store,
payment_store,
is_listening,
latest_wallet_sync_timestamp,
latest_onchain_wallet_sync_timestamp,
latest_fee_rate_cache_update_timestamp,
latest_rgs_snapshot_timestamp,
latest_node_announcement_broadcast_timestamp,
})
}

Expand Down
148 changes: 128 additions & 20 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ pub use error::Error as NodeError;
use error::Error;

pub use event::Event;
pub use types::ChannelConfig;
pub use types::{BestBlock, ChannelConfig};

pub use io::utils::generate_entropy_mnemonic;

Expand Down Expand Up @@ -167,8 +167,9 @@ use rand::Rng;

use std::default::Default;
use std::net::ToSocketAddrs;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex, RwLock};
use std::time::{Duration, Instant, SystemTime};
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};

#[cfg(feature = "uniffi")]
uniffi::include_scaffolding!("ldk_node");
Expand Down Expand Up @@ -199,6 +200,12 @@ pub struct Node<K: KVStore + Sync + Send + 'static> {
scorer: Arc<Mutex<Scorer>>,
peer_store: Arc<PeerStore<K, Arc<FilesystemLogger>>>,
payment_store: Arc<PaymentStore<K, Arc<FilesystemLogger>>>,
is_listening: Arc<AtomicBool>,
latest_wallet_sync_timestamp: Arc<RwLock<Option<u64>>>,
latest_onchain_wallet_sync_timestamp: Arc<RwLock<Option<u64>>>,
latest_fee_rate_cache_update_timestamp: Arc<RwLock<Option<u64>>>,
latest_rgs_snapshot_timestamp: Arc<RwLock<Option<u64>>>,
latest_node_announcement_broadcast_timestamp: Arc<RwLock<Option<u64>>>,
tnull marked this conversation as resolved.
Show resolved Hide resolved
}

impl<K: KVStore + Sync + Send + 'static> Node<K> {
Expand All @@ -222,6 +229,8 @@ impl<K: KVStore + Sync + Send + 'static> Node<K> {
// Block to ensure we update our fee rate cache once on startup
let fee_estimator = Arc::clone(&self.fee_estimator);
let sync_logger = Arc::clone(&self.logger);
let sync_fee_rate_update_timestamp =
Arc::clone(&self.latest_fee_rate_cache_update_timestamp);
let runtime_ref = &runtime;
tokio::task::block_in_place(move || {
runtime_ref.block_on(async move {
Expand All @@ -233,6 +242,9 @@ impl<K: KVStore + Sync + Send + 'static> Node<K> {
"Initial fee rate cache update finished in {}ms.",
now.elapsed().as_millis()
);
let unix_time_secs_opt =
SystemTime::now().duration_since(UNIX_EPOCH).ok().map(|d| d.as_secs());
*sync_fee_rate_update_timestamp.write().unwrap() = unix_time_secs_opt;
Ok(())
},
Err(e) => {
Expand All @@ -246,6 +258,7 @@ impl<K: KVStore + Sync + Send + 'static> Node<K> {
// Setup wallet sync
let wallet = Arc::clone(&self.wallet);
let sync_logger = Arc::clone(&self.logger);
let sync_onchain_wallet_timestamp = Arc::clone(&self.latest_onchain_wallet_sync_timestamp);
let mut stop_sync = self.stop_sender.subscribe();
let onchain_wallet_sync_interval_secs = self
.config
Expand All @@ -267,11 +280,16 @@ impl<K: KVStore + Sync + Send + 'static> Node<K> {
_ = onchain_wallet_sync_interval.tick() => {
let now = Instant::now();
match wallet.sync().await {
Ok(()) => log_trace!(
Ok(()) => {
log_trace!(
sync_logger,
"Background sync of on-chain wallet finished in {}ms.",
now.elapsed().as_millis()
),
);
let unix_time_secs_opt =
SystemTime::now().duration_since(UNIX_EPOCH).ok().map(|d| d.as_secs());
*sync_onchain_wallet_timestamp.write().unwrap() = unix_time_secs_opt;
}
Err(err) => {
log_error!(
sync_logger,
Expand All @@ -289,6 +307,7 @@ impl<K: KVStore + Sync + Send + 'static> Node<K> {

let mut stop_fee_updates = self.stop_sender.subscribe();
let fee_update_logger = Arc::clone(&self.logger);
let fee_update_timestamp = Arc::clone(&self.latest_fee_rate_cache_update_timestamp);
let fee_estimator = Arc::clone(&self.fee_estimator);
let fee_rate_cache_update_interval_secs =
self.config.fee_rate_cache_update_interval_secs.max(WALLET_SYNC_INTERVAL_MINIMUM_SECS);
Expand All @@ -307,11 +326,16 @@ impl<K: KVStore + Sync + Send + 'static> Node<K> {
_ = fee_rate_update_interval.tick() => {
let now = Instant::now();
match fee_estimator.update_fee_estimates().await {
Ok(()) => log_trace!(
Ok(()) => {
log_trace!(
fee_update_logger,
"Background update of fee rate cache finished in {}ms.",
now.elapsed().as_millis()
),
);
let unix_time_secs_opt =
SystemTime::now().duration_since(UNIX_EPOCH).ok().map(|d| d.as_secs());
*fee_update_timestamp.write().unwrap() = unix_time_secs_opt;
}
Err(err) => {
log_error!(
fee_update_logger,
Expand All @@ -330,6 +354,7 @@ impl<K: KVStore + Sync + Send + 'static> Node<K> {
let sync_cmon = Arc::clone(&self.chain_monitor);
let sync_sweeper = Arc::clone(&self.output_sweeper);
let sync_logger = Arc::clone(&self.logger);
let sync_wallet_timestamp = Arc::clone(&self.latest_wallet_sync_timestamp);
let mut stop_sync = self.stop_sender.subscribe();
let wallet_sync_interval_secs =
self.config.wallet_sync_interval_secs.max(WALLET_SYNC_INTERVAL_MINIMUM_SECS);
Expand All @@ -350,11 +375,16 @@ impl<K: KVStore + Sync + Send + 'static> Node<K> {
];
let now = Instant::now();
match tx_sync.sync(confirmables).await {
Ok(()) => log_trace!(
Ok(()) => {
log_trace!(
sync_logger,
"Background sync of Lightning wallet finished in {}ms.",
now.elapsed().as_millis()
),
);
let unix_time_secs_opt =
SystemTime::now().duration_since(UNIX_EPOCH).ok().map(|d| d.as_secs());
*sync_wallet_timestamp.write().unwrap() = unix_time_secs_opt;
}
Err(e) => {
log_error!(sync_logger, "Background sync of Lightning wallet failed: {}", e)
}
Expand All @@ -368,6 +398,7 @@ impl<K: KVStore + Sync + Send + 'static> Node<K> {
let gossip_source = Arc::clone(&self.gossip_source);
let gossip_sync_store = Arc::clone(&self.kv_store);
let gossip_sync_logger = Arc::clone(&self.logger);
let gossip_rgs_sync_timestamp = Arc::clone(&self.latest_rgs_snapshot_timestamp);
let mut stop_gossip_sync = self.stop_sender.subscribe();
runtime.spawn(async move {
let mut interval = tokio::time::interval(RGS_SYNC_INTERVAL);
Expand Down Expand Up @@ -395,6 +426,7 @@ impl<K: KVStore + Sync + Send + 'static> Node<K> {
log_error!(gossip_sync_logger, "Persistence failed: {}", e);
panic!("Persistence failed");
});
*gossip_rgs_sync_timestamp.write().unwrap() = Some(updated_timestamp as u64);
}
Err(e) => log_error!(
gossip_sync_logger,
Expand All @@ -413,6 +445,7 @@ impl<K: KVStore + Sync + Send + 'static> Node<K> {
let peer_manager_connection_handler = Arc::clone(&self.peer_manager);
let mut stop_listen = self.stop_sender.subscribe();
let listening_logger = Arc::clone(&self.logger);
let listening_indicator = Arc::clone(&self.is_listening);

let mut bind_addrs = Vec::with_capacity(listening_addresses.len());

Expand All @@ -431,6 +464,7 @@ impl<K: KVStore + Sync + Send + 'static> Node<K> {
}

runtime.spawn(async move {
{
let listener =
tokio::net::TcpListener::bind(&*bind_addrs).await
.unwrap_or_else(|e| {
Expand All @@ -440,11 +474,13 @@ impl<K: KVStore + Sync + Send + 'static> Node<K> {
);
});

listening_indicator.store(true, Ordering::Release);

loop {
let peer_mgr = Arc::clone(&peer_manager_connection_handler);
tokio::select! {
_ = stop_listen.changed() => {
return;
break;
}
res = listener.accept() => {
let tcp_stream = res.unwrap().0;
Expand All @@ -458,6 +494,9 @@ impl<K: KVStore + Sync + Send + 'static> Node<K> {
}
}
}
}

listening_indicator.store(false, Ordering::Release);
});
}

Expand Down Expand Up @@ -508,6 +547,7 @@ impl<K: KVStore + Sync + Send + 'static> Node<K> {
let bcast_config = Arc::clone(&self.config);
let bcast_store = Arc::clone(&self.kv_store);
let bcast_logger = Arc::clone(&self.logger);
let bcast_ann_timestamp = Arc::clone(&self.latest_node_announcement_broadcast_timestamp);
let mut stop_bcast = self.stop_sender.subscribe();
runtime.spawn(async move {
// We check every 30 secs whether our last broadcast is NODE_ANN_BCAST_INTERVAL away.
Expand Down Expand Up @@ -553,12 +593,17 @@ impl<K: KVStore + Sync + Send + 'static> Node<K> {

bcast_pm.broadcast_node_announcement([0; 3], [0; 32], addresses);

let unix_time_secs = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs();
io::utils::write_latest_node_ann_bcast_timestamp(unix_time_secs, Arc::clone(&bcast_store), Arc::clone(&bcast_logger))
.unwrap_or_else(|e| {
log_error!(bcast_logger, "Persistence failed: {}", e);
panic!("Persistence failed");
});
let unix_time_secs_opt =
SystemTime::now().duration_since(UNIX_EPOCH).ok().map(|d| d.as_secs());
*bcast_ann_timestamp.write().unwrap() = unix_time_secs_opt;

if let Some(unix_time_secs) = unix_time_secs_opt {
io::utils::write_latest_node_ann_bcast_timestamp(unix_time_secs, Arc::clone(&bcast_store), Arc::clone(&bcast_logger))
.unwrap_or_else(|e| {
log_error!(bcast_logger, "Persistence failed: {}", e);
panic!("Persistence failed");
});
}
}
}
}
Expand Down Expand Up @@ -662,11 +707,6 @@ impl<K: KVStore + Sync + Send + 'static> Node<K> {
Ok(())
}

/// Returns whether the [`Node`] is running.
pub fn is_running(&self) -> bool {
self.runtime.read().unwrap().is_some()
}

/// Disconnects all peers, stops all running background tasks, and shuts down [`Node`].
///
/// After this returns most API methods will return [`Error::NotRunning`].
Expand Down Expand Up @@ -697,6 +737,37 @@ impl<K: KVStore + Sync + Send + 'static> Node<K> {
Ok(())
}

/// Returns the status of the [`Node`].
pub fn status(&self) -> NodeStatus {
let is_running = self.runtime.read().unwrap().is_some();
let is_listening = self.is_listening.load(Ordering::Acquire);
let current_best_block = self.channel_manager.current_best_block().into();
let latest_wallet_sync_timestamp = *self.latest_wallet_sync_timestamp.read().unwrap();
let latest_onchain_wallet_sync_timestamp =
*self.latest_onchain_wallet_sync_timestamp.read().unwrap();
let latest_fee_rate_cache_update_timestamp =
*self.latest_fee_rate_cache_update_timestamp.read().unwrap();
let latest_rgs_snapshot_timestamp = *self.latest_rgs_snapshot_timestamp.read().unwrap();
let latest_node_announcement_broadcast_timestamp =
*self.latest_node_announcement_broadcast_timestamp.read().unwrap();

NodeStatus {
is_running,
is_listening,
current_best_block,
latest_wallet_sync_timestamp,
latest_onchain_wallet_sync_timestamp,
latest_fee_rate_cache_update_timestamp,
latest_rgs_snapshot_timestamp,
latest_node_announcement_broadcast_timestamp,
}
}

/// Returns the config with which the [`Node`] was initialized.
pub fn config(&self) -> Config {
self.config.as_ref().clone()
}

/// Returns the next event in the event queue, if currently available.
///
/// Will return `Some(..)` if an event is available and `None` otherwise.
Expand Down Expand Up @@ -1746,6 +1817,43 @@ impl<K: KVStore + Sync + Send + 'static> Drop for Node<K> {
}
}

/// Represents the status of the [`Node`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct NodeStatus {
/// Indicates whether the [`Node`] is running.
pub is_running: bool,
/// Indicates whether the [`Node`] is listening for incoming connections on the addresses
/// configured via [`Config::listening_addresses`].
pub is_listening: bool,
/// The best block to which our Lightning wallet is currently synced.
pub current_best_block: BestBlock,
tnull marked this conversation as resolved.
Show resolved Hide resolved
/// The timestamp, in seconds since start of the UNIX epoch, when we last successfully synced
/// our Lightning wallet to the chain tip.
///
/// Will be `None` if the wallet hasn't been synced since the [`Node`] was initialized.
pub latest_wallet_sync_timestamp: Option<u64>,
/// The timestamp, in seconds since start of the UNIX epoch, when we last successfully synced
/// our on-chain wallet to the chain tip.
///
/// Will be `None` if the wallet hasn't been synced since the [`Node`] was initialized.
pub latest_onchain_wallet_sync_timestamp: Option<u64>,
/// The timestamp, in seconds since start of the UNIX epoch, when we last successfully update
/// our fee rate cache.
///
/// Will be `None` if the cache hasn't been updated since the [`Node`] was initialized.
pub latest_fee_rate_cache_update_timestamp: Option<u64>,
/// The timestamp, in seconds since start of the UNIX epoch, when the last rapid gossip sync
/// (RGS) snapshot we successfully applied was generated.
///
/// Will be `None` if RGS isn't configured or the snapshot hasn't been updated since the [`Node`] was initialized.
pub latest_rgs_snapshot_timestamp: Option<u64>,
/// The timestamp, in seconds since start of the UNIX epoch, when we last broadcasted a node
/// announcement.
///
/// Will be `None` if we have no public channels or we haven't broadcasted since the [`Node`] was initialized.
pub latest_node_announcement_broadcast_timestamp: Option<u64>,
Comment on lines +1834 to +1854
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it make sense to use Duration for these?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, if anything we'd want to make them SystemTime (or chrono::DateTime for that matter), but we'll need to convert to timestamps for the bindings anyways. Same goes for the upcoming inclusion of a last_changed timestamp in the PaymentDetails, where an additional blocker is that we don't implement Readable/Writeable for SystemTime upstream.
So I currently lean towards using UNIX timestamps in the API as everything else seems to complicate things further?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm... Duration is a timestamp when interpreted as relative to the Unix epoch. It just gives you more precision than u64 seconds and may be more useful interface to work with. But fair enough regarding bindings.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, yeah, I agree in Rust a Duration would be preferable and I see that we even implement Writeable/Readable for it. However, in the bindings it would be weird because I can either translate it to a u64 where it becomes unclear what unit to use: in this context seconds would make sense, but otherwise we might want to translate it to a u64 representing nanos or so. The other way would to expose it as an interface Duration that mirrors the Rust type. However, in Kotlin/Swift/Python introducing a general Duration type wouldn't make a lot of sense. Technically Uniffi would have the capability to translate a value on the bindings side once more, which would allow us to use the idiomatic Swift/Kotlin/Python types. I'll think about it some more. Good thing is that these fields aren't persisted, i.e., until we do we can easily reconsider/improve them.

}

async fn connect_peer_if_necessary<K: KVStore + Sync + Send + 'static>(
node_id: PublicKey, addr: SocketAddress, peer_manager: Arc<PeerManager<K>>,
logger: Arc<FilesystemLogger>,
Expand Down
Loading
Loading