Skip to content

Commit

Permalink
Merge pull request #122 from p2pderivatives/feat/split-block-processi…
Browse files Browse the repository at this point in the history
…ng-from-confirming

Split block processing from confirmation reporting
  • Loading branch information
luckysori authored Sep 21, 2023
2 parents 695cec5 + 44d888e commit 724f598
Show file tree
Hide file tree
Showing 15 changed files with 675 additions and 657 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
[workspace]

members = [
"bitcoin-test-utils",
"bitcoin-rpc-provider",
Expand All @@ -14,6 +13,7 @@ members = [
"dlc-sled-storage-provider",
"electrs-blockchain-provider",
]
resolver = "2"

[patch.crates-io]
lightning = { git = "https://github.com/p2pderivatives/rust-lightning/", rev = "c4505a30" }
Expand Down
186 changes: 132 additions & 54 deletions dlc-manager/src/chain_monitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,39 +3,36 @@
use std::collections::HashMap;

use bitcoin::{Block, BlockHash, Transaction, Txid};
use bitcoin::{Block, OutPoint, Transaction, Txid};
use dlc_messages::ser_impls::{
read_ecdsa_adaptor_signature, read_hash_map, read_vec, write_ecdsa_adaptor_signature,
write_hash_map, write_vec,
read_ecdsa_adaptor_signature, read_hash_map, write_ecdsa_adaptor_signature, write_hash_map,
};
use lightning::ln::msgs::DecodeError;
use lightning::util::ser::{Readable, Writeable, Writer};
use secp256k1_zkp::EcdsaAdaptorSignature;

use crate::ChannelId;

const NB_SAVED_BLOCK_HASHES: usize = 6;

/// A `ChainMonitor` keeps a list of transaction ids to watch for in the blockchain,
/// and some associated information used to apply an action when the id is seen.
#[derive(Debug, PartialEq, Eq)]
pub struct ChainMonitor {
pub(crate) watched_tx: HashMap<Txid, ChannelInfo>,
pub(crate) watched_tx: HashMap<Txid, WatchState>,
pub(crate) watched_txo: HashMap<OutPoint, WatchState>,
pub(crate) last_height: u64,
pub(crate) last_block_hashes: Vec<BlockHash>,
}

impl_dlc_writeable!(ChainMonitor, { (watched_tx, { cb_writeable, write_hash_map, read_hash_map}), (last_height, writeable), (last_block_hashes, { cb_writeable, write_vec, read_vec}) });
impl_dlc_writeable!(ChainMonitor, { (watched_tx, { cb_writeable, write_hash_map, read_hash_map}), (watched_txo, { cb_writeable, write_hash_map, read_hash_map}), (last_height, writeable) });

#[derive(Clone, Debug, PartialEq, Eq)]
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub(crate) struct ChannelInfo {
pub channel_id: ChannelId,
pub tx_type: TxType,
}

impl_dlc_writeable!(ChannelInfo, { (channel_id, writeable), (tx_type, writeable) });

#[derive(Clone, Debug, PartialEq, Eq)]
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub(crate) enum TxType {
Revoked {
update_idx: u64,
Expand All @@ -47,6 +44,7 @@ pub(crate) enum TxType {
CollaborativeClose,
SplitTx,
SettleTx,
Cet,
}

impl_dlc_writeable_enum!(TxType,;
Expand All @@ -56,7 +54,7 @@ impl_dlc_writeable_enum!(TxType,;
(is_offer, writeable),
(revoked_tx_type, writeable)
});;
(1, BufferTx), (2, CollaborativeClose), (3, SplitTx), (4, SettleTx)
(1, BufferTx), (2, CollaborativeClose), (3, SplitTx), (4, SettleTx), (5, Cet)
);

#[derive(Clone, Debug, PartialEq, Eq, Copy)]
Expand All @@ -73,8 +71,8 @@ impl ChainMonitor {
pub fn new(init_height: u64) -> Self {
ChainMonitor {
watched_tx: HashMap::new(),
watched_txo: HashMap::new(),
last_height: init_height,
last_block_hashes: Vec::with_capacity(NB_SAVED_BLOCK_HASHES),
}
}

Expand All @@ -84,65 +82,145 @@ impl ChainMonitor {
}

pub(crate) fn add_tx(&mut self, txid: Txid, channel_info: ChannelInfo) {
self.watched_tx.insert(txid, channel_info);
log::debug!("Watching transaction {txid}: {channel_info:?}");
self.watched_tx.insert(txid, WatchState::new(channel_info));

// When we watch a buffer transaction we also want to watch
// the buffer transaction _output_ so that we can detect when
// a CET spends it without having to watch every possible CET
if channel_info.tx_type == TxType::BufferTx {
let outpoint = OutPoint {
txid,
// We can safely assume that the buffer transaction
// only has one output
vout: 0,
};
self.add_txo(
outpoint,
ChannelInfo {
channel_id: channel_info.channel_id,
tx_type: TxType::Cet,
},
);
}
}

fn add_txo(&mut self, outpoint: OutPoint, channel_info: ChannelInfo) {
log::debug!("Watching transaction output {outpoint}: {channel_info:?}");
self.watched_txo
.insert(outpoint, WatchState::new(channel_info));
}

pub(crate) fn cleanup_channel(&mut self, channel_id: ChannelId) {
log::debug!("Cleaning up data related to channel {channel_id:?}");

self.watched_tx
.retain(|_, state| state.channel_id() != channel_id);

self.watched_txo
.retain(|_, state| state.channel_id() != channel_id);
}

pub(crate) fn remove_tx(&mut self, txid: &Txid) {
log::debug!("Stopped watching transaction {txid}");
self.watched_tx.remove(txid);
}

pub(crate) fn cleanup_channel(&mut self, channel_id: ChannelId) {
let to_remove = self
.watched_tx
.iter()
.filter_map(|x| {
if x.1.channel_id == channel_id {
Some(*x.0)
} else {
None
/// Check if any watched transactions are part of the block, confirming them if so.
///
/// # Panics
///
/// Panics if the new block's height is not exactly one more than the last processed height.
pub(crate) fn process_block(&mut self, block: &Block, height: u64) {
assert_eq!(self.last_height + 1, height);

for tx in block.txdata.iter() {
if let Some(state) = self.watched_tx.get_mut(&tx.txid()) {
state.confirm(tx.clone());
}

for txin in tx.input.iter() {
if let Some(state) = self.watched_txo.get_mut(&txin.previous_output) {
state.confirm(tx.clone())
}
})
.collect::<Vec<_>>();
for txid in to_remove {
self.watched_tx.remove(&txid);
}
}

self.last_height += 1;
}

pub(crate) fn process_block(
&self,
block: &Block,
height: u64,
) -> Vec<(Transaction, ChannelInfo)> {
let mut res = Vec::new();
/// All the currently watched transactions which have been confirmed.
pub(crate) fn confirmed_txs(&self) -> Vec<(Transaction, ChannelInfo)> {
(self.watched_tx.values())
.chain(self.watched_txo.values())
.filter_map(|state| match state {
WatchState::Registered { .. } => None,
WatchState::Confirmed {
channel_info,
transaction,
} => Some((transaction.clone(), *channel_info)),
})
.collect()
}
}

assert_eq!(self.last_height + 1, height);
/// The state of a watched transaction or transaction output.
#[derive(Clone, Debug, PartialEq, Eq)]
pub(crate) enum WatchState {
/// It has been registered but we are not aware of any
/// confirmations.
Registered { channel_info: ChannelInfo },
/// It has received at least one confirmation.
Confirmed {
channel_info: ChannelInfo,
transaction: Transaction,
},
}

for tx in &block.txdata {
let channel_info = self.watched_tx.get(&tx.txid()).or_else(|| {
for txid in tx.input.iter().map(|x| &x.previous_output.txid) {
let info = self.watched_tx.get(txid);
if info.is_some() {
return info;
}
impl_dlc_writeable_enum!(
WatchState,;
(0, Registered, {(channel_info, writeable)}),
(1, Confirmed, {(channel_info, writeable), (transaction, writeable)});;
);

impl WatchState {
fn new(channel_info: ChannelInfo) -> Self {
Self::Registered { channel_info }
}

fn confirm(&mut self, transaction: Transaction) {
match self {
WatchState::Registered { ref channel_info } => {
log::info!(
"Transaction {} confirmed: {channel_info:?}",
transaction.txid()
);

*self = WatchState::Confirmed {
channel_info: *channel_info,
transaction,
}
None
});
if let Some(channel_info) = channel_info {
res.push((tx.clone(), channel_info.clone()));
}
WatchState::Confirmed {
channel_info,
transaction,
} => {
log::error!(
"Transaction {} already confirmed: {channel_info:?}",
transaction.txid()
);
}
}

res
}

/// To be safe this is a separate function from process block to make sure updates are
/// saved before we update the state. It is better to re-process a block than not
/// process it at all.
pub(crate) fn increment_height(&mut self, last_block_hash: &BlockHash) {
self.last_height += 1;
self.last_block_hashes.push(*last_block_hash);
if self.last_block_hashes.len() > NB_SAVED_BLOCK_HASHES {
self.last_block_hashes.remove(0);
fn channel_info(&self) -> ChannelInfo {
match self {
WatchState::Registered { channel_info }
| WatchState::Confirmed { channel_info, .. } => *channel_info,
}
}

fn channel_id(&self) -> ChannelId {
self.channel_info().channel_id
}
}
2 changes: 1 addition & 1 deletion dlc-manager/src/contract/offered_contract.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ impl OfferedContract {
let contract_info = contract
.contract_infos
.iter()
.zip(oracle_announcements.into_iter())
.zip(oracle_announcements)
.map(|(x, y)| ContractInfo {
contract_descriptor: x.contract_descriptor.clone(),
oracle_announcements: y,
Expand Down
2 changes: 1 addition & 1 deletion dlc-manager/src/conversion_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ impl From<&OfferedContract> for SerContractInfo {
let mut contract_infos: Vec<ContractInfoInner> = offered_contract
.contract_info
.iter()
.zip(oracle_infos.into_iter())
.zip(oracle_infos)
.map(|(c, o)| ContractInfoInner {
contract_descriptor: (&c.contract_descriptor).into(),
oracle_info: o,
Expand Down
Loading

0 comments on commit 724f598

Please sign in to comment.