diff --git a/lightning/src/chain/keysinterface.rs b/lightning/src/chain/keysinterface.rs index e2e4403b528..544df015a8f 100644 --- a/lightning/src/chain/keysinterface.rs +++ b/lightning/src/chain/keysinterface.rs @@ -88,6 +88,57 @@ pub enum SpendableOutputDescriptor { } } +impl Writeable for SpendableOutputDescriptor { + fn write(&self, writer: &mut W) -> Result<(), ::std::io::Error> { + match self { + &SpendableOutputDescriptor::StaticOutput { ref outpoint, ref output } => { + 0u8.write(writer)?; + outpoint.write(writer)?; + output.write(writer)?; + }, + &SpendableOutputDescriptor::DynamicOutputP2WSH { ref outpoint, ref key, ref witness_script, ref to_self_delay, ref output } => { + 1u8.write(writer)?; + outpoint.write(writer)?; + key.write(writer)?; + witness_script.write(writer)?; + to_self_delay.write(writer)?; + output.write(writer)?; + }, + &SpendableOutputDescriptor::DynamicOutputP2WPKH { ref outpoint, ref key, ref output } => { + 2u8.write(writer)?; + outpoint.write(writer)?; + key.write(writer)?; + output.write(writer)?; + }, + } + Ok(()) + } +} + +impl Readable for SpendableOutputDescriptor { + fn read(reader: &mut R) -> Result { + match Readable::read(reader)? { + 0u8 => Ok(SpendableOutputDescriptor::StaticOutput { + outpoint: Readable::read(reader)?, + output: Readable::read(reader)?, + }), + 1u8 => Ok(SpendableOutputDescriptor::DynamicOutputP2WSH { + outpoint: Readable::read(reader)?, + key: Readable::read(reader)?, + witness_script: Readable::read(reader)?, + to_self_delay: Readable::read(reader)?, + output: Readable::read(reader)?, + }), + 2u8 => Ok(SpendableOutputDescriptor::DynamicOutputP2WPKH { + outpoint: Readable::read(reader)?, + key: Readable::read(reader)?, + output: Readable::read(reader)?, + }), + _ => Err(DecodeError::InvalidValue), + } + } +} + /// A trait to describe an object which can get user secrets and key material. pub trait KeysInterface: Send + Sync { /// A type which implements ChannelKeys which will be returned by get_channel_keys. diff --git a/lightning/src/ln/channelmonitor.rs b/lightning/src/ln/channelmonitor.rs index 7e83c3f1221..1bc8c76b49e 100644 --- a/lightning/src/ln/channelmonitor.rs +++ b/lightning/src/ln/channelmonitor.rs @@ -37,7 +37,7 @@ use chain::chaininterface::{ChainListener, ChainWatchInterface, BroadcasterInter use chain::transaction::OutPoint; use chain::keysinterface::{SpendableOutputDescriptor, ChannelKeys}; use util::logger::Logger; -use util::ser::{ReadableArgs, Readable, Writer, Writeable, U48}; +use util::ser::{ReadableArgs, Readable, MaybeReadable, Writer, Writeable, U48}; use util::{byte_utils, events}; use std::collections::{HashMap, hash_map, HashSet}; @@ -222,7 +222,6 @@ pub struct SimpleManyChannelMonitor>>, chain_monitor: Arc, broadcaster: T, - pending_events: Mutex>, logger: Arc, fee_estimator: F } @@ -234,16 +233,10 @@ impl<'a, Key : Send + cmp::Eq + hash::Hash, ChanSigner: ChannelKeys, T: Deref + { fn block_connected(&self, header: &BlockHeader, height: u32, txn_matched: &[&Transaction], _indexes_of_txn_matched: &[u32]) { let block_hash = header.bitcoin_hash(); - let mut new_events: Vec = Vec::with_capacity(0); { let mut monitors = self.monitors.lock().unwrap(); for monitor in monitors.values_mut() { - let (txn_outputs, spendable_outputs) = monitor.block_connected(txn_matched, height, &block_hash, &*self.broadcaster, &*self.fee_estimator); - if spendable_outputs.len() > 0 { - new_events.push(events::Event::SpendableOutputs { - outputs: spendable_outputs, - }); - } + let txn_outputs = monitor.block_connected(txn_matched, height, &block_hash, &*self.broadcaster, &*self.fee_estimator); for (ref txid, ref outputs) in txn_outputs { for (idx, output) in outputs.iter().enumerate() { @@ -252,8 +245,6 @@ impl<'a, Key : Send + cmp::Eq + hash::Hash, ChanSigner: ChannelKeys, T: Deref + } } } - let mut pending_events = self.pending_events.lock().unwrap(); - pending_events.append(&mut new_events); } fn block_disconnected(&self, header: &BlockHeader, disconnected_height: u32) { @@ -276,7 +267,6 @@ impl Vec { - let mut pending_events = self.pending_events.lock().unwrap(); - let mut ret = Vec::new(); - mem::swap(&mut ret, &mut *pending_events); - ret + let mut pending_events = Vec::new(); + for chan in self.monitors.lock().unwrap().values_mut() { + pending_events.append(&mut chan.get_and_clear_pending_events()); + } + pending_events } } @@ -792,6 +783,11 @@ impl Readable for ChannelMonitorUpdateStep { /// /// You MUST ensure that no ChannelMonitors for a given channel anywhere contain out-of-date /// information and are actively monitoring the chain. +/// +/// Pending Events or updated HTLCs which have not yet been read out by +/// get_and_clear_pending_htlcs_updated or get_and_clear_pending_events are serialized to disk and +/// reloaded at deserialize-time. Thus, you must ensure that, when handling events, all events +/// gotten are fully handled before re-serializing the new state. pub struct ChannelMonitor { latest_update_id: u64, commitment_transaction_number_obscure_factor: u64, @@ -835,6 +831,7 @@ pub struct ChannelMonitor { payment_preimages: HashMap, pending_htlcs_updated: Vec, + pending_events: Vec, destination_script: Script, // Thanks to data loss protection, we may be able to claim our non-htlc funds @@ -948,6 +945,7 @@ impl PartialEq for ChannelMonitor { self.current_local_signed_commitment_tx != other.current_local_signed_commitment_tx || self.payment_preimages != other.payment_preimages || self.pending_htlcs_updated != other.pending_htlcs_updated || + self.pending_events.len() != other.pending_events.len() || // We trust events to round-trip properly self.destination_script != other.destination_script || self.to_remote_rescue != other.to_remote_rescue || self.pending_claim_requests != other.pending_claim_requests || @@ -1135,6 +1133,11 @@ impl ChannelMonitor { data.write(writer)?; } + writer.write_all(&byte_utils::be64_to_array(self.pending_events.len() as u64))?; + for event in self.pending_events.iter() { + event.write(writer)?; + } + self.last_block_hash.write(writer)?; self.destination_script.write(writer)?; if let Some((ref to_remote_script, ref local_key)) = self.to_remote_rescue { @@ -1267,6 +1270,7 @@ impl ChannelMonitor { payment_preimages: HashMap::new(), pending_htlcs_updated: Vec::new(), + pending_events: Vec::new(), destination_script: destination_script.clone(), to_remote_rescue: None, @@ -1560,6 +1564,18 @@ impl ChannelMonitor { ret } + /// Gets the list of pending events which were generated by previous actions, clearing the list + /// in the process. + /// + /// This is called by ManyChannelMonitor::get_and_clear_pending_events() and is equivalent to + /// EventsProvider::get_and_clear_pending_events() except that it requires &mut self as we do + /// no internal locking in ChannelMonitors. + pub fn get_and_clear_pending_events(&mut self) -> Vec { + let mut ret = Vec::new(); + mem::swap(&mut ret, &mut self.pending_events); + ret + } + /// Can only fail if idx is < get_min_seen_secret pub(super) fn get_secret(&self, idx: u64) -> Option<[u8; 32]> { self.commitment_secrets.get_secret(idx) @@ -2534,7 +2550,7 @@ impl ChannelMonitor { /// Eventually this should be pub and, roughly, implement ChainListener, however this requires /// &mut self, as well as returns new spendable outputs and outpoints to watch for spending of /// on-chain. - fn block_connected(&mut self, txn_matched: &[&Transaction], height: u32, block_hash: &Sha256dHash, broadcaster: B, fee_estimator: F)-> (Vec<(Sha256dHash, Vec)>, Vec) + fn block_connected(&mut self, txn_matched: &[&Transaction], height: u32, block_hash: &Sha256dHash, broadcaster: B, fee_estimator: F)-> Vec<(Sha256dHash, Vec)> where B::Target: BroadcasterInterface, F::Target: FeeEstimator { @@ -2767,7 +2783,14 @@ impl ChannelMonitor { for &(ref txid, ref output_scripts) in watch_outputs.iter() { self.outputs_to_watch.insert(txid.clone(), output_scripts.iter().map(|o| o.script_pubkey.clone()).collect()); } - (watch_outputs, spendable_outputs) + + if spendable_outputs.len() > 0 { + self.pending_events.push(events::Event::SpendableOutputs { + outputs: spendable_outputs, + }); + } + + watch_outputs } fn block_disconnected(&mut self, height: u32, block_hash: &Sha256dHash, broadcaster: B, fee_estimator: F) @@ -3369,6 +3392,14 @@ impl> ReadableArgs())); + for _ in 0..pending_events_len { + if let Some(event) = MaybeReadable::read(reader)? { + pending_events.push(event); + } + } + let last_block_hash: Sha256dHash = Readable::read(reader)?; let destination_script = Readable::read(reader)?; let to_remote_rescue = match >::read(reader)? { @@ -3471,6 +3502,7 @@ impl> ReadableArgs(&self, writer: &mut W) -> Result<(), ::std::io::Error> { + match self { + &Event::FundingGenerationReady { .. } => { + 0u8.write(writer)?; + // We never write out FundingGenerationReady events as, upon disconnection, peers + // drop any channels which have not yet exchanged funding_signed. + }, + &Event::FundingBroadcastSafe { ref funding_txo, ref user_channel_id } => { + 1u8.write(writer)?; + funding_txo.write(writer)?; + user_channel_id.write(writer)?; + }, + &Event::PaymentReceived { ref payment_hash, ref amt } => { + 2u8.write(writer)?; + payment_hash.write(writer)?; + amt.write(writer)?; + }, + &Event::PaymentSent { ref payment_preimage } => { + 3u8.write(writer)?; + payment_preimage.write(writer)?; + }, + &Event::PaymentFailed { ref payment_hash, ref rejected_by_dest, + #[cfg(test)] + ref error_code, + } => { + 4u8.write(writer)?; + payment_hash.write(writer)?; + rejected_by_dest.write(writer)?; + #[cfg(test)] + error_code.write(writer)?; + }, + &Event::PendingHTLCsForwardable { time_forwardable: _ } => { + 5u8.write(writer)?; + // We don't write the time_fordwardable out at all, as we presume when the user + // deserializes us at least that much time has elapsed. + }, + &Event::SpendableOutputs { ref outputs } => { + 6u8.write(writer)?; + (outputs.len() as u64).write(writer)?; + for output in outputs.iter() { + output.write(writer)?; + } + }, + } + Ok(()) + } +} +impl MaybeReadable for Event { + fn read(reader: &mut R) -> Result, msgs::DecodeError> { + match Readable::read(reader)? { + 0u8 => Ok(None), + 1u8 => Ok(Some(Event::FundingBroadcastSafe { + funding_txo: Readable::read(reader)?, + user_channel_id: Readable::read(reader)?, + })), + 2u8 => Ok(Some(Event::PaymentReceived { + payment_hash: Readable::read(reader)?, + amt: Readable::read(reader)?, + })), + 3u8 => Ok(Some(Event::PaymentSent { + payment_preimage: Readable::read(reader)?, + })), + 4u8 => Ok(Some(Event::PaymentFailed { + payment_hash: Readable::read(reader)?, + rejected_by_dest: Readable::read(reader)?, + #[cfg(test)] + error_code: Readable::read(reader)?, + })), + 5u8 => Ok(Some(Event::PendingHTLCsForwardable { + time_forwardable: Duration::from_secs(0) + })), + 6u8 => { + let outputs_len: u64 = Readable::read(reader)?; + let mut outputs = Vec::new(); + for _ in 0..outputs_len { + outputs.push(Readable::read(reader)?); + } + Ok(Some(Event::SpendableOutputs { outputs })) + }, + _ => Err(msgs::DecodeError::InvalidValue) + } + } +} + /// An event generated by ChannelManager which indicates a message should be sent to a peer (or /// broadcast to most peers). /// These events are handled by PeerManager::process_events if you are using a PeerManager. diff --git a/lightning/src/util/ser.rs b/lightning/src/util/ser.rs index 44fdd1c0cf4..96936fe9541 100644 --- a/lightning/src/util/ser.rs +++ b/lightning/src/util/ser.rs @@ -11,7 +11,7 @@ use std::cmp; use secp256k1::Signature; use secp256k1::key::{PublicKey, SecretKey}; use bitcoin::blockdata::script::Script; -use bitcoin::blockdata::transaction::{OutPoint, Transaction}; +use bitcoin::blockdata::transaction::{OutPoint, Transaction, TxOut}; use bitcoin::consensus; use bitcoin::consensus::Encodable; use bitcoin_hashes::sha256d::Hash as Sha256dHash; @@ -191,6 +191,15 @@ pub trait ReadableArgs fn read(reader: &mut R, params: P) -> Result; } +/// A trait that various rust-lightning types implement allowing them to (maybe) be read in from a Read +pub trait MaybeReadable + where Self: Sized, + R: Read +{ + /// Reads a Self in from the given Read + fn read(reader: &mut R) -> Result, DecodeError>; +} + pub(crate) struct U48(pub u64); impl Writeable for U48 { #[inline] @@ -627,26 +636,32 @@ impl Readable for OutPoint { } } -impl Writeable for Transaction { - fn write(&self, writer: &mut W) -> Result<(), ::std::io::Error> { - match self.consensus_encode(WriterWriteAdaptor(writer)) { - Ok(_) => Ok(()), - Err(consensus::encode::Error::Io(e)) => Err(e), - Err(_) => panic!("We shouldn't get a consensus::encode::Error unless our Write generated an std::io::Error"), +macro_rules! impl_consensus_ser { + ($bitcoin_type: ty) => { + impl Writeable for $bitcoin_type { + fn write(&self, writer: &mut W) -> Result<(), ::std::io::Error> { + match self.consensus_encode(WriterWriteAdaptor(writer)) { + Ok(_) => Ok(()), + Err(consensus::encode::Error::Io(e)) => Err(e), + Err(_) => panic!("We shouldn't get a consensus::encode::Error unless our Write generated an std::io::Error"), + } + } } - } -} -impl Readable for Transaction { - fn read(r: &mut R) -> Result { - match consensus::encode::Decodable::consensus_decode(r) { - Ok(t) => Ok(t), - Err(consensus::encode::Error::Io(ref e)) if e.kind() == ::std::io::ErrorKind::UnexpectedEof => Err(DecodeError::ShortRead), - Err(consensus::encode::Error::Io(e)) => Err(DecodeError::Io(e)), - Err(_) => Err(DecodeError::InvalidValue), + impl Readable for $bitcoin_type { + fn read(r: &mut R) -> Result { + match consensus::encode::Decodable::consensus_decode(r) { + Ok(t) => Ok(t), + Err(consensus::encode::Error::Io(ref e)) if e.kind() == ::std::io::ErrorKind::UnexpectedEof => Err(DecodeError::ShortRead), + Err(consensus::encode::Error::Io(e)) => Err(DecodeError::Io(e)), + Err(_) => Err(DecodeError::InvalidValue), + } + } } } } +impl_consensus_ser!(Transaction); +impl_consensus_ser!(TxOut); impl> Readable for Mutex { fn read(r: &mut R) -> Result {