Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 29 additions & 25 deletions src/chain/bitcoind.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
// http://opensource.org/licenses/MIT>, at your option. You may not use this file except in
// accordance with one or both of these licenses.

use super::WalletSyncStatus;
use super::{periodically_archive_fully_resolved_monitors, WalletSyncStatus};

use crate::config::{
BitcoindRestClientConfig, Config, FEE_RATE_CACHE_UPDATE_TIMEOUT_SECS, TX_BROADCAST_TIMEOUT_SECS,
Expand Down Expand Up @@ -306,6 +306,19 @@ impl BitcoindChainSource {
})?;
}

let res = self
.poll_and_update_listeners_inner(channel_manager, chain_monitor, output_sweeper)
.await;

self.wallet_polling_status.lock().unwrap().propagate_result_to_subscribers(res);

res
}

async fn poll_and_update_listeners_inner(
&self, channel_manager: Arc<ChannelManager>, chain_monitor: Arc<ChainMonitor>,
output_sweeper: Arc<Sweeper>,
) -> Result<(), Error> {
let latest_chain_tip_opt = self.latest_chain_tip.read().unwrap().clone();
let chain_tip = if let Some(tip) = latest_chain_tip_opt {
tip
Expand All @@ -317,9 +330,7 @@ impl BitcoindChainSource {
},
Err(e) => {
log_error!(self.logger, "Failed to poll for chain data: {:?}", e);
let res = Err(Error::TxSyncFailed);
self.wallet_polling_status.lock().unwrap().propagate_result_to_subscribers(res);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The cleanup here, does that mean that the previous commit would send redundant updates?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not quite sure what you mean? We previously propagated manually whenever erroring out. But now we can just bubble up the error as we'll propagate to subscribers in the wrapping method.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, but before the change in this commit (this specific commit), was the propagate method being called twice? I am wondering if all commits in the stack are leaving the code in a correct state.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, yes, that might be true. Happy to squash the cleanup if you prefer.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now went ahead and did so

return res;
return Err(Error::TxSyncFailed);
},
}
};
Expand All @@ -329,7 +340,7 @@ impl BitcoindChainSource {
let chain_listener = ChainListener {
onchain_wallet: Arc::clone(&self.onchain_wallet),
channel_manager: Arc::clone(&channel_manager),
chain_monitor,
chain_monitor: Arc::clone(&chain_monitor),
output_sweeper,
};
let mut spv_client =
Expand All @@ -344,13 +355,19 @@ impl BitcoindChainSource {
now.elapsed().unwrap().as_millis()
);
*self.latest_chain_tip.write().unwrap() = Some(tip);

periodically_archive_fully_resolved_monitors(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Context question: channel manager already knows the best block and I assume it is updated somehow, but still we need an external trigger here to archive? It seems responsibility is now with two units instead of one.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So far we left when/how to archive to the users in LDK. I guess we could consider making it part of the background processor eventually, but I don't think ChannelManager could do it (in absence of KVStore access, etc)

Copy link
Contributor

@joostjager joostjager Aug 1, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok. I was confused a bit about the two sources of block information being used (here the connection event, and then the height in cm). But if this is how it is done for now, it's ok.

Arc::clone(&channel_manager),
chain_monitor,
Arc::clone(&self.kv_store),
Arc::clone(&self.logger),
Arc::clone(&self.node_metrics),
)?;
},
Ok(_) => {},
Err(e) => {
log_error!(self.logger, "Failed to poll for chain data: {:?}", e);
let res = Err(Error::TxSyncFailed);
self.wallet_polling_status.lock().unwrap().propagate_result_to_subscribers(res);
return res;
return Err(Error::TxSyncFailed);
},
}

Expand All @@ -376,9 +393,7 @@ impl BitcoindChainSource {
},
Err(e) => {
log_error!(self.logger, "Failed to poll for mempool transactions: {:?}", e);
let res = Err(Error::TxSyncFailed);
self.wallet_polling_status.lock().unwrap().propagate_result_to_subscribers(res);
return res;
return Err(Error::TxSyncFailed);
},
}

Expand All @@ -388,24 +403,13 @@ impl BitcoindChainSource {
locked_node_metrics.latest_lightning_wallet_sync_timestamp = unix_time_secs_opt;
locked_node_metrics.latest_onchain_wallet_sync_timestamp = unix_time_secs_opt;

let write_res = write_node_metrics(
write_node_metrics(
&*locked_node_metrics,
Arc::clone(&self.kv_store),
Arc::clone(&self.logger),
);
match write_res {
Ok(()) => (),
Err(e) => {
log_error!(self.logger, "Failed to persist node metrics: {}", e);
let res = Err(Error::PersistenceFailed);
self.wallet_polling_status.lock().unwrap().propagate_result_to_subscribers(res);
return res;
},
}
)?;

let res = Ok(());
self.wallet_polling_status.lock().unwrap().propagate_result_to_subscribers(res);
res
Ok(())
}

pub(super) async fn update_fee_rate_estimates(&self) -> Result<(), Error> {
Expand Down
84 changes: 50 additions & 34 deletions src/chain/electrum.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,16 +103,6 @@ impl ElectrumChainSource {
}

pub(crate) async fn sync_onchain_wallet(&self) -> Result<(), Error> {
let electrum_client: Arc<ElectrumRuntimeClient> =
if let Some(client) = self.electrum_runtime_status.read().unwrap().client().as_ref() {
Arc::clone(client)
} else {
debug_assert!(
false,
"We should have started the chain source before syncing the onchain wallet"
);
return Err(Error::FeerateEstimationUpdateFailed);
};
let receiver_res = {
let mut status_lock = self.onchain_wallet_sync_status.lock().unwrap();
status_lock.register_or_subscribe_pending_sync()
Expand All @@ -126,6 +116,24 @@ impl ElectrumChainSource {
})?;
}

let res = self.sync_onchain_wallet_inner().await;

self.onchain_wallet_sync_status.lock().unwrap().propagate_result_to_subscribers(res);

res
}

async fn sync_onchain_wallet_inner(&self) -> Result<(), Error> {
let electrum_client: Arc<ElectrumRuntimeClient> =
if let Some(client) = self.electrum_runtime_status.read().unwrap().client().as_ref() {
Arc::clone(client)
} else {
debug_assert!(
false,
"We should have started the chain source before syncing the onchain wallet"
);
return Err(Error::FeerateEstimationUpdateFailed);
};
// If this is our first sync, do a full scan with the configured gap limit.
// Otherwise just do an incremental sync.
let incremental_sync =
Expand Down Expand Up @@ -179,35 +187,13 @@ impl ElectrumChainSource {
apply_wallet_update(update_res, now)
};

self.onchain_wallet_sync_status.lock().unwrap().propagate_result_to_subscribers(res);

res
}

pub(crate) async fn sync_lightning_wallet(
&self, channel_manager: Arc<ChannelManager>, chain_monitor: Arc<ChainMonitor>,
output_sweeper: Arc<Sweeper>,
) -> Result<(), Error> {
let electrum_client: Arc<ElectrumRuntimeClient> =
if let Some(client) = self.electrum_runtime_status.read().unwrap().client().as_ref() {
Arc::clone(client)
} else {
debug_assert!(
false,
"We should have started the chain source before syncing the lightning wallet"
);
return Err(Error::TxSyncFailed);
};

let sync_cman = Arc::clone(&channel_manager);
let sync_cmon = Arc::clone(&chain_monitor);
let sync_sweeper = Arc::clone(&output_sweeper);
let confirmables = vec![
sync_cman as Arc<dyn Confirm + Sync + Send>,
sync_cmon as Arc<dyn Confirm + Sync + Send>,
sync_sweeper as Arc<dyn Confirm + Sync + Send>,
];

let receiver_res = {
let mut status_lock = self.lightning_wallet_sync_status.lock().unwrap();
status_lock.register_or_subscribe_pending_sync()
Expand All @@ -221,6 +207,38 @@ impl ElectrumChainSource {
})?;
}

let res =
self.sync_lightning_wallet_inner(channel_manager, chain_monitor, output_sweeper).await;

self.lightning_wallet_sync_status.lock().unwrap().propagate_result_to_subscribers(res);

res
}

async fn sync_lightning_wallet_inner(
&self, channel_manager: Arc<ChannelManager>, chain_monitor: Arc<ChainMonitor>,
output_sweeper: Arc<Sweeper>,
) -> Result<(), Error> {
let sync_cman = Arc::clone(&channel_manager);
let sync_cmon = Arc::clone(&chain_monitor);
let sync_sweeper = Arc::clone(&output_sweeper);
let confirmables = vec![
sync_cman as Arc<dyn Confirm + Sync + Send>,
sync_cmon as Arc<dyn Confirm + Sync + Send>,
sync_sweeper as Arc<dyn Confirm + Sync + Send>,
];

let electrum_client: Arc<ElectrumRuntimeClient> =
if let Some(client) = self.electrum_runtime_status.read().unwrap().client().as_ref() {
Arc::clone(client)
} else {
debug_assert!(
false,
"We should have started the chain source before syncing the lightning wallet"
);
return Err(Error::TxSyncFailed);
};

let res = electrum_client.sync_confirmables(confirmables).await;

if let Ok(_) = res {
Expand All @@ -245,8 +263,6 @@ impl ElectrumChainSource {
)?;
}

self.lightning_wallet_sync_status.lock().unwrap().propagate_result_to_subscribers(res);

res
}

Expand Down
Loading
Loading