diff --git a/lightning-block-sync/src/convert.rs b/lightning-block-sync/src/convert.rs index e3bf9622050..06e43d61dc0 100644 --- a/lightning-block-sync/src/convert.rs +++ b/lightning-block-sync/src/convert.rs @@ -10,23 +10,26 @@ use bitcoin::Transaction; use serde_json; +use bitcoin::hashes::Hash; use std::convert::From; use std::convert::TryFrom; use std::convert::TryInto; +use std::io; use std::str::FromStr; -use bitcoin::hashes::Hash; impl TryInto for JsonResponse { - type Error = std::io::Error; - fn try_into(self) -> Result { Ok(self.0) } + type Error = io::Error; + fn try_into(self) -> Result { + Ok(self.0) + } } -/// Conversion from `std::io::Error` into `BlockSourceError`. -impl From for BlockSourceError { - fn from(e: std::io::Error) -> BlockSourceError { +/// Conversion from `io::Error` into `BlockSourceError`. +impl From for BlockSourceError { + fn from(e: io::Error) -> BlockSourceError { match e.kind() { - std::io::ErrorKind::InvalidData => BlockSourceError::persistent(e), - std::io::ErrorKind::InvalidInput => BlockSourceError::persistent(e), + io::ErrorKind::InvalidData => BlockSourceError::persistent(e), + io::ErrorKind::InvalidInput => BlockSourceError::persistent(e), _ => BlockSourceError::transient(e), } } @@ -34,11 +37,11 @@ impl From for BlockSourceError { /// Parses binary data as a block. impl TryInto for BinaryResponse { - type Error = std::io::Error; + type Error = io::Error; - fn try_into(self) -> std::io::Result { + fn try_into(self) -> io::Result { match encode::deserialize(&self.0) { - Err(_) => return Err(std::io::Error::new(std::io::ErrorKind::InvalidData, "invalid block data")), + Err(_) => return Err(io::Error::new(io::ErrorKind::InvalidData, "invalid block data")), Ok(block) => Ok(block), } } @@ -46,34 +49,35 @@ impl TryInto for BinaryResponse { /// Parses binary data as a block hash. impl TryInto for BinaryResponse { - type Error = std::io::Error; + type Error = io::Error; - fn try_into(self) -> std::io::Result { - BlockHash::from_slice(&self.0).map_err(|_| - std::io::Error::new(std::io::ErrorKind::InvalidData, "bad block hash length") - ) + fn try_into(self) -> io::Result { + BlockHash::from_slice(&self.0) + .map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "bad block hash length")) } } /// Converts a JSON value into block header data. The JSON value may be an object representing a /// block header or an array of such objects. In the latter case, the first object is converted. impl TryInto for JsonResponse { - type Error = std::io::Error; + type Error = io::Error; - fn try_into(self) -> std::io::Result { + fn try_into(self) -> io::Result { let header = match self.0 { - serde_json::Value::Array(mut array) if !array.is_empty() => array.drain(..).next().unwrap(), + serde_json::Value::Array(mut array) if !array.is_empty() => { + array.drain(..).next().unwrap() + }, serde_json::Value::Object(_) => self.0, - _ => return Err(std::io::Error::new(std::io::ErrorKind::InvalidData, "unexpected JSON type")), + _ => return Err(io::Error::new(io::ErrorKind::InvalidData, "unexpected JSON type")), }; if !header.is_object() { - return Err(std::io::Error::new(std::io::ErrorKind::InvalidData, "expected JSON object")); + return Err(io::Error::new(io::ErrorKind::InvalidData, "expected JSON object")); } // Add an empty previousblockhash for the genesis block. match header.try_into() { - Err(_) => Err(std::io::Error::new(std::io::ErrorKind::InvalidData, "invalid header data")), + Err(_) => Err(io::Error::new(io::ErrorKind::InvalidData, "invalid header data")), Ok(header) => Ok(header), } } @@ -83,23 +87,28 @@ impl TryFrom for BlockHeaderData { type Error = (); fn try_from(response: serde_json::Value) -> Result { - macro_rules! get_field { ($name: expr, $ty_access: tt) => { - response.get($name).ok_or(())?.$ty_access().ok_or(())? - } } + macro_rules! get_field { + ($name: expr, $ty_access: tt) => { + response.get($name).ok_or(())?.$ty_access().ok_or(())? + }; + } Ok(BlockHeaderData { header: Header { version: bitcoin::blockdata::block::Version::from_consensus( - get_field!("version", as_i64).try_into().map_err(|_| ())? + get_field!("version", as_i64).try_into().map_err(|_| ())?, ), prev_blockhash: if let Some(hash_str) = response.get("previousblockhash") { - BlockHash::from_str(hash_str.as_str().ok_or(())?).map_err(|_| ())? - } else { BlockHash::all_zeros() }, - merkle_root: TxMerkleNode::from_str(get_field!("merkleroot", as_str)).map_err(|_| ())?, + BlockHash::from_str(hash_str.as_str().ok_or(())?).map_err(|_| ())? + } else { + BlockHash::all_zeros() + }, + merkle_root: TxMerkleNode::from_str(get_field!("merkleroot", as_str)) + .map_err(|_| ())?, time: get_field!("time", as_u64).try_into().map_err(|_| ())?, - bits: bitcoin::CompactTarget::from_consensus( - u32::from_be_bytes(<[u8; 4]>::from_hex(get_field!("bits", as_str)).map_err(|_| ())?) - ), + bits: bitcoin::CompactTarget::from_consensus(u32::from_be_bytes( + <[u8; 4]>::from_hex(get_field!("bits", as_str)).map_err(|_| ())?, + )), nonce: get_field!("nonce", as_u64).try_into().map_err(|_| ())?, }, chainwork: hex_to_work(get_field!("chainwork", as_str)).map_err(|_| ())?, @@ -110,15 +119,15 @@ impl TryFrom for BlockHeaderData { /// Converts a JSON value into a block. Assumes the block is hex-encoded in a JSON string. impl TryInto for JsonResponse { - type Error = std::io::Error; + type Error = io::Error; - fn try_into(self) -> std::io::Result { + fn try_into(self) -> io::Result { match self.0.as_str() { - None => Err(std::io::Error::new(std::io::ErrorKind::InvalidData, "expected JSON string")), + None => Err(io::Error::new(io::ErrorKind::InvalidData, "expected JSON string")), Some(hex_data) => match Vec::::from_hex(hex_data) { - Err(_) => Err(std::io::Error::new(std::io::ErrorKind::InvalidData, "invalid hex data")), + Err(_) => Err(io::Error::new(io::ErrorKind::InvalidData, "invalid hex data")), Ok(block_data) => match encode::deserialize(&block_data) { - Err(_) => Err(std::io::Error::new(std::io::ErrorKind::InvalidData, "invalid block data")), + Err(_) => Err(io::Error::new(io::ErrorKind::InvalidData, "invalid block data")), Ok(block) => Ok(block), }, }, @@ -128,31 +137,35 @@ impl TryInto for JsonResponse { /// Converts a JSON value into the best block hash and optional height. impl TryInto<(BlockHash, Option)> for JsonResponse { - type Error = std::io::Error; + type Error = io::Error; - fn try_into(self) -> std::io::Result<(BlockHash, Option)> { + fn try_into(self) -> io::Result<(BlockHash, Option)> { if !self.0.is_object() { - return Err(std::io::Error::new(std::io::ErrorKind::InvalidData, "expected JSON object")); + return Err(io::Error::new(io::ErrorKind::InvalidData, "expected JSON object")); } let hash = match &self.0["bestblockhash"] { serde_json::Value::String(hex_data) => match BlockHash::from_str(&hex_data) { - Err(_) => return Err(std::io::Error::new(std::io::ErrorKind::InvalidData, "invalid hex data")), + Err(_) => { + return Err(io::Error::new(io::ErrorKind::InvalidData, "invalid hex data")) + }, Ok(block_hash) => block_hash, }, - _ => return Err(std::io::Error::new(std::io::ErrorKind::InvalidData, "expected JSON string")), + _ => return Err(io::Error::new(io::ErrorKind::InvalidData, "expected JSON string")), }; let height = match &self.0["blocks"] { serde_json::Value::Null => None, serde_json::Value::Number(height) => match height.as_u64() { - None => return Err(std::io::Error::new(std::io::ErrorKind::InvalidData, "invalid height")), + None => return Err(io::Error::new(io::ErrorKind::InvalidData, "invalid height")), Some(height) => match height.try_into() { - Err(_) => return Err(std::io::Error::new(std::io::ErrorKind::InvalidData, "invalid height")), + Err(_) => { + return Err(io::Error::new(io::ErrorKind::InvalidData, "invalid height")) + }, Ok(height) => Some(height), - } + }, }, - _ => return Err(std::io::Error::new(std::io::ErrorKind::InvalidData, "expected JSON number")), + _ => return Err(io::Error::new(io::ErrorKind::InvalidData, "expected JSON number")), }; Ok((hash, height)) @@ -160,18 +173,22 @@ impl TryInto<(BlockHash, Option)> for JsonResponse { } impl TryInto for JsonResponse { - type Error = std::io::Error; - fn try_into(self) -> std::io::Result { - let hex_data = self.0.as_str().ok_or(Self::Error::new(std::io::ErrorKind::InvalidData, "expected JSON string" ))?; - Txid::from_str(hex_data).map_err(|err|Self::Error::new(std::io::ErrorKind::InvalidData, err.to_string() )) + type Error = io::Error; + fn try_into(self) -> io::Result { + let hex_data = self + .0 + .as_str() + .ok_or(io::Error::new(io::ErrorKind::InvalidData, "expected JSON string"))?; + Txid::from_str(hex_data) + .map_err(|err| io::Error::new(io::ErrorKind::InvalidData, err.to_string())) } } /// Converts a JSON value into a transaction. WATCH OUT! this cannot be used for zero-input transactions /// (e.g. createrawtransaction). See impl TryInto for JsonResponse { - type Error = std::io::Error; - fn try_into(self) -> std::io::Result { + type Error = io::Error; + fn try_into(self) -> io::Result { let hex_tx = if self.0.is_object() { // result is json encoded match &self.0["hex"] { @@ -185,45 +202,35 @@ impl TryInto for JsonResponse { _ => "Unknown error", }; - return Err(std::io::Error::new( - std::io::ErrorKind::InvalidData, + return Err(io::Error::new( + io::ErrorKind::InvalidData, format!("transaction couldn't be signed. {}", reason), )); } else { hex_data } - } + }, // result is a complete transaction (e.g. getrawtranaction verbose) _ => hex_data, }, - _ => return Err(std::io::Error::new( - std::io::ErrorKind::InvalidData, - "expected JSON string", - )), + _ => { + return Err(io::Error::new(io::ErrorKind::InvalidData, "expected JSON string")); + }, } } else { // result is plain text (e.g. getrawtransaction no verbose) match self.0.as_str() { Some(hex_tx) => hex_tx, None => { - return Err(std::io::Error::new( - std::io::ErrorKind::InvalidData, - "expected JSON string", - )) - } + return Err(io::Error::new(io::ErrorKind::InvalidData, "expected JSON string")); + }, } }; match Vec::::from_hex(hex_tx) { - Err(_) => Err(std::io::Error::new( - std::io::ErrorKind::InvalidData, - "invalid hex data", - )), + Err(_) => Err(io::Error::new(io::ErrorKind::InvalidData, "invalid hex data")), Ok(tx_data) => match encode::deserialize(&tx_data) { - Err(_) => Err(std::io::Error::new( - std::io::ErrorKind::InvalidData, - "invalid transaction", - )), + Err(_) => Err(io::Error::new(io::ErrorKind::InvalidData, "invalid transaction")), Ok(tx) => Ok(tx), }, } @@ -231,15 +238,16 @@ impl TryInto for JsonResponse { } impl TryInto for JsonResponse { - type Error = std::io::Error; + type Error = io::Error; - fn try_into(self) -> std::io::Result { + fn try_into(self) -> io::Result { match self.0.as_str() { - None => Err(std::io::Error::new(std::io::ErrorKind::InvalidData, "expected JSON string")), - Some(hex_data) if hex_data.len() != 64 => - Err(std::io::Error::new(std::io::ErrorKind::InvalidData, "invalid hash length")), + None => Err(io::Error::new(io::ErrorKind::InvalidData, "expected JSON string")), + Some(hex_data) if hex_data.len() != 64 => { + Err(io::Error::new(io::ErrorKind::InvalidData, "invalid hash length")) + }, Some(hex_data) => BlockHash::from_str(hex_data) - .map_err(|_| std::io::Error::new(std::io::ErrorKind::InvalidData, "invalid hex data")), + .map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "invalid hex data")), } } } @@ -249,26 +257,35 @@ impl TryInto for JsonResponse { /// that. #[cfg(feature = "rest-client")] pub(crate) struct GetUtxosResponse { - pub(crate) hit_bitmap_nonempty: bool + pub(crate) hit_bitmap_nonempty: bool, } #[cfg(feature = "rest-client")] impl TryInto for JsonResponse { - type Error = std::io::Error; - - fn try_into(self) -> std::io::Result { - let bitmap_str = - self.0.as_object().ok_or(std::io::Error::new(std::io::ErrorKind::InvalidData, "expected an object"))? - .get("bitmap").ok_or(std::io::Error::new(std::io::ErrorKind::InvalidData, "missing bitmap field"))? - .as_str().ok_or(std::io::Error::new(std::io::ErrorKind::InvalidData, "bitmap should be an str"))?; - let mut hit_bitmap_nonempty = false; - for c in bitmap_str.chars() { - if c < '0' || c > '9' { - return Err(std::io::Error::new(std::io::ErrorKind::InvalidData, "invalid byte")); - } - if c > '0' { hit_bitmap_nonempty = true; } + type Error = io::Error; + + fn try_into(self) -> io::Result { + let obj_err = || io::Error::new(io::ErrorKind::InvalidData, "expected an object"); + let bitmap_err = || io::Error::new(io::ErrorKind::InvalidData, "missing bitmap field"); + let bitstr_err = || io::Error::new(io::ErrorKind::InvalidData, "bitmap should be an str"); + let bitmap_str = self + .0 + .as_object() + .ok_or_else(obj_err)? + .get("bitmap") + .ok_or_else(bitmap_err)? + .as_str() + .ok_or_else(bitstr_err)?; + let mut hit_bitmap_nonempty = false; + for c in bitmap_str.chars() { + if c < '0' || c > '9' { + return Err(io::Error::new(io::ErrorKind::InvalidData, "invalid byte")); } - Ok(GetUtxosResponse { hit_bitmap_nonempty }) + if c > '0' { + hit_bitmap_nonempty = true; + } + } + Ok(GetUtxosResponse { hit_bitmap_nonempty }) } } @@ -304,7 +321,7 @@ pub(crate) mod tests { let response = JsonResponse(serde_json::json!(42)); match TryInto::::try_into(response) { Err(e) => { - assert_eq!(e.kind(), std::io::ErrorKind::InvalidData); + assert_eq!(e.kind(), io::ErrorKind::InvalidData); assert_eq!(e.get_ref().unwrap().to_string(), "unexpected JSON type"); }, Ok(_) => panic!("Expected error"), @@ -316,7 +333,7 @@ pub(crate) mod tests { let response = JsonResponse(serde_json::json!([42])); match TryInto::::try_into(response) { Err(e) => { - assert_eq!(e.kind(), std::io::ErrorKind::InvalidData); + assert_eq!(e.kind(), io::ErrorKind::InvalidData); assert_eq!(e.get_ref().unwrap().to_string(), "expected JSON object"); }, Ok(_) => panic!("Expected error"), @@ -326,16 +343,15 @@ pub(crate) mod tests { #[test] fn into_block_header_from_json_response_with_invalid_header_response() { let block = genesis_block(Network::Bitcoin); - let mut response = JsonResponse(BlockHeaderData { - chainwork: block.header.work(), - height: 0, - header: block.header - }.into()); + let mut response = JsonResponse( + BlockHeaderData { chainwork: block.header.work(), height: 0, header: block.header } + .into(), + ); response.0["chainwork"].take(); match TryInto::::try_into(response) { Err(e) => { - assert_eq!(e.kind(), std::io::ErrorKind::InvalidData); + assert_eq!(e.kind(), io::ErrorKind::InvalidData); assert_eq!(e.get_ref().unwrap().to_string(), "invalid header data"); }, Ok(_) => panic!("Expected error"), @@ -345,16 +361,15 @@ pub(crate) mod tests { #[test] fn into_block_header_from_json_response_with_invalid_header_data() { let block = genesis_block(Network::Bitcoin); - let mut response = JsonResponse(BlockHeaderData { - chainwork: block.header.work(), - height: 0, - header: block.header - }.into()); + let mut response = JsonResponse( + BlockHeaderData { chainwork: block.header.work(), height: 0, header: block.header } + .into(), + ); response.0["chainwork"] = serde_json::json!("foobar"); match TryInto::::try_into(response) { Err(e) => { - assert_eq!(e.kind(), std::io::ErrorKind::InvalidData); + assert_eq!(e.kind(), io::ErrorKind::InvalidData); assert_eq!(e.get_ref().unwrap().to_string(), "invalid header data"); }, Ok(_) => panic!("Expected error"), @@ -364,11 +379,10 @@ pub(crate) mod tests { #[test] fn into_block_header_from_json_response_with_valid_header() { let block = genesis_block(Network::Bitcoin); - let response = JsonResponse(BlockHeaderData { - chainwork: block.header.work(), - height: 0, - header: block.header - }.into()); + let response = JsonResponse( + BlockHeaderData { chainwork: block.header.work(), height: 0, header: block.header } + .into(), + ); match TryInto::::try_into(response) { Err(e) => panic!("Unexpected error: {:?}", e), @@ -383,18 +397,20 @@ pub(crate) mod tests { #[test] fn into_block_header_from_json_response_with_valid_header_array() { let genesis_block = genesis_block(Network::Bitcoin); - let best_block_header = Header { - prev_blockhash: genesis_block.block_hash(), - ..genesis_block.header - }; + let best_block_header = + Header { prev_blockhash: genesis_block.block_hash(), ..genesis_block.header }; let chainwork = genesis_block.header.work() + best_block_header.work(); let response = JsonResponse(serde_json::json!([ - serde_json::Value::from(BlockHeaderData { - chainwork, height: 1, header: best_block_header, - }), - serde_json::Value::from(BlockHeaderData { - chainwork: genesis_block.header.work(), height: 0, header: genesis_block.header, - }), + serde_json::Value::from(BlockHeaderData { + chainwork, + height: 1, + header: best_block_header, + }), + serde_json::Value::from(BlockHeaderData { + chainwork: genesis_block.header.work(), + height: 0, + header: genesis_block.header, + }), ])); match TryInto::::try_into(response) { @@ -410,11 +426,10 @@ pub(crate) mod tests { #[test] fn into_block_header_from_json_response_without_previous_block_hash() { let block = genesis_block(Network::Bitcoin); - let mut response = JsonResponse(BlockHeaderData { - chainwork: block.header.work(), - height: 0, - header: block.header - }.into()); + let mut response = JsonResponse( + BlockHeaderData { chainwork: block.header.work(), height: 0, header: block.header } + .into(), + ); response.0.as_object_mut().unwrap().remove("previousblockhash"); match TryInto::::try_into(response) { @@ -449,7 +464,7 @@ pub(crate) mod tests { let response = JsonResponse(serde_json::json!({ "result": "foo" })); match TryInto::::try_into(response) { Err(e) => { - assert_eq!(e.kind(), std::io::ErrorKind::InvalidData); + assert_eq!(e.kind(), io::ErrorKind::InvalidData); assert_eq!(e.get_ref().unwrap().to_string(), "expected JSON string"); }, Ok(_) => panic!("Expected error"), @@ -461,7 +476,7 @@ pub(crate) mod tests { let response = JsonResponse(serde_json::json!("foobar")); match TryInto::::try_into(response) { Err(e) => { - assert_eq!(e.kind(), std::io::ErrorKind::InvalidData); + assert_eq!(e.kind(), io::ErrorKind::InvalidData); assert_eq!(e.get_ref().unwrap().to_string(), "invalid hex data"); }, Ok(_) => panic!("Expected error"), @@ -473,7 +488,7 @@ pub(crate) mod tests { let response = JsonResponse(serde_json::json!("abcd")); match TryInto::::try_into(response) { Err(e) => { - assert_eq!(e.kind(), std::io::ErrorKind::InvalidData); + assert_eq!(e.kind(), io::ErrorKind::InvalidData); assert_eq!(e.get_ref().unwrap().to_string(), "invalid block data"); }, Ok(_) => panic!("Expected error"), @@ -495,7 +510,7 @@ pub(crate) mod tests { let response = JsonResponse(serde_json::json!("foo")); match TryInto::<(BlockHash, Option)>::try_into(response) { Err(e) => { - assert_eq!(e.kind(), std::io::ErrorKind::InvalidData); + assert_eq!(e.kind(), io::ErrorKind::InvalidData); assert_eq!(e.get_ref().unwrap().to_string(), "expected JSON object"); }, Ok(_) => panic!("Expected error"), @@ -507,7 +522,7 @@ pub(crate) mod tests { let response = JsonResponse(serde_json::json!({ "bestblockhash": 42 })); match TryInto::<(BlockHash, Option)>::try_into(response) { Err(e) => { - assert_eq!(e.kind(), std::io::ErrorKind::InvalidData); + assert_eq!(e.kind(), io::ErrorKind::InvalidData); assert_eq!(e.get_ref().unwrap().to_string(), "expected JSON string"); }, Ok(_) => panic!("Expected error"), @@ -519,7 +534,7 @@ pub(crate) mod tests { let response = JsonResponse(serde_json::json!({ "bestblockhash": "foobar"} )); match TryInto::<(BlockHash, Option)>::try_into(response) { Err(e) => { - assert_eq!(e.kind(), std::io::ErrorKind::InvalidData); + assert_eq!(e.kind(), io::ErrorKind::InvalidData); assert_eq!(e.get_ref().unwrap().to_string(), "invalid hex data"); }, Ok(_) => panic!("Expected error"), @@ -550,7 +565,7 @@ pub(crate) mod tests { })); match TryInto::<(BlockHash, Option)>::try_into(response) { Err(e) => { - assert_eq!(e.kind(), std::io::ErrorKind::InvalidData); + assert_eq!(e.kind(), io::ErrorKind::InvalidData); assert_eq!(e.get_ref().unwrap().to_string(), "expected JSON number"); }, Ok(_) => panic!("Expected error"), @@ -566,7 +581,7 @@ pub(crate) mod tests { })); match TryInto::<(BlockHash, Option)>::try_into(response) { Err(e) => { - assert_eq!(e.kind(), std::io::ErrorKind::InvalidData); + assert_eq!(e.kind(), io::ErrorKind::InvalidData); assert_eq!(e.get_ref().unwrap().to_string(), "invalid height"); }, Ok(_) => panic!("Expected error"), @@ -594,9 +609,9 @@ pub(crate) mod tests { let response = JsonResponse(serde_json::json!({ "result": "foo" })); match TryInto::::try_into(response) { Err(e) => { - assert_eq!(e.kind(), std::io::ErrorKind::InvalidData); + assert_eq!(e.kind(), io::ErrorKind::InvalidData); assert_eq!(e.get_ref().unwrap().to_string(), "expected JSON string"); - } + }, Ok(_) => panic!("Expected error"), } } @@ -606,9 +621,12 @@ pub(crate) mod tests { let response = JsonResponse(serde_json::json!("foobar")); match TryInto::::try_into(response) { Err(e) => { - assert_eq!(e.kind(), std::io::ErrorKind::InvalidData); - assert_eq!(e.get_ref().unwrap().to_string(), "bad hex string length 6 (expected 64)"); - } + assert_eq!(e.kind(), io::ErrorKind::InvalidData); + assert_eq!( + e.get_ref().unwrap().to_string(), + "bad hex string length 6 (expected 64)" + ); + }, Ok(_) => panic!("Expected error"), } } @@ -618,9 +636,12 @@ pub(crate) mod tests { let response = JsonResponse(serde_json::json!("abcd")); match TryInto::::try_into(response) { Err(e) => { - assert_eq!(e.kind(), std::io::ErrorKind::InvalidData); - assert_eq!(e.get_ref().unwrap().to_string(), "bad hex string length 4 (expected 64)"); - } + assert_eq!(e.kind(), io::ErrorKind::InvalidData); + assert_eq!( + e.get_ref().unwrap().to_string(), + "bad hex string length 4 (expected 64)" + ); + }, Ok(_) => panic!("Expected error"), } } @@ -638,15 +659,15 @@ pub(crate) mod tests { #[test] fn into_txid_from_bitcoind_rpc_json_response() { let mut rpc_response = serde_json::json!( - {"error": "", "id": "770", "result": "7934f775149929a8b742487129a7c3a535dfb612f0b726cc67bc10bc2628f906"} + {"error": "", "id": "770", "result": "7934f775149929a8b742487129a7c3a535dfb612f0b726cc67bc10bc2628f906"} - ); - let r: std::io::Result = JsonResponse(rpc_response.get_mut("result").unwrap().take()) - .try_into(); - assert_eq!( - r.unwrap().to_string(), - "7934f775149929a8b742487129a7c3a535dfb612f0b726cc67bc10bc2628f906" - ); + ); + let r: io::Result = + JsonResponse(rpc_response.get_mut("result").unwrap().take()).try_into(); + assert_eq!( + r.unwrap().to_string(), + "7934f775149929a8b742487129a7c3a535dfb612f0b726cc67bc10bc2628f906" + ); } // TryInto can be used in two ways, first with plain hex response where data is @@ -661,9 +682,9 @@ pub(crate) mod tests { let response = JsonResponse(serde_json::json!("foobar")); match TryInto::::try_into(response) { Err(e) => { - assert_eq!(e.kind(), std::io::ErrorKind::InvalidData); + assert_eq!(e.kind(), io::ErrorKind::InvalidData); assert_eq!(e.get_ref().unwrap().to_string(), "invalid hex data"); - } + }, Ok(_) => panic!("Expected error"), } } @@ -673,9 +694,9 @@ pub(crate) mod tests { let response = JsonResponse(Value::Number(Number::from_f64(1.0).unwrap())); match TryInto::::try_into(response) { Err(e) => { - assert_eq!(e.kind(), std::io::ErrorKind::InvalidData); + assert_eq!(e.kind(), io::ErrorKind::InvalidData); assert_eq!(e.get_ref().unwrap().to_string(), "expected JSON string"); - } + }, Ok(_) => panic!("Expected error"), } } @@ -685,9 +706,9 @@ pub(crate) mod tests { let response = JsonResponse(serde_json::json!("abcd")); match TryInto::::try_into(response) { Err(e) => { - assert_eq!(e.kind(), std::io::ErrorKind::InvalidData); + assert_eq!(e.kind(), io::ErrorKind::InvalidData); assert_eq!(e.get_ref().unwrap().to_string(), "invalid transaction"); - } + }, Ok(_) => panic!("Expected error"), } } @@ -707,7 +728,8 @@ pub(crate) mod tests { fn into_tx_from_json_response_with_valid_tx_data_hex_field() { let genesis_block = genesis_block(Network::Bitcoin); let target_tx = genesis_block.txdata.get(0).unwrap(); - let response = JsonResponse(serde_json::json!({"hex": encode::serialize_hex(&target_tx)})); + let response = + JsonResponse(serde_json::json!({ "hex": encode::serialize_hex(&target_tx) })); match TryInto::::try_into(response) { Err(e) => panic!("Unexpected error: {:?}", e), Ok(tx) => assert_eq!(&tx, target_tx), @@ -721,12 +743,9 @@ pub(crate) mod tests { let response = JsonResponse(serde_json::json!({ "error": "foo" })); match TryInto::::try_into(response) { Err(e) => { - assert_eq!(e.kind(), std::io::ErrorKind::InvalidData); - assert_eq!( - e.get_ref().unwrap().to_string(), - "expected JSON string" - ); - } + assert_eq!(e.kind(), io::ErrorKind::InvalidData); + assert_eq!(e.get_ref().unwrap().to_string(), "expected JSON string"); + }, Ok(_) => panic!("Expected error"), } } @@ -736,12 +755,13 @@ pub(crate) mod tests { let response = JsonResponse(serde_json::json!({ "hex": "foo", "complete": false })); match TryInto::::try_into(response) { Err(e) => { - assert_eq!(e.kind(), std::io::ErrorKind::InvalidData); - assert!( - e.get_ref().unwrap().to_string().contains( - "transaction couldn't be signed") - ); - } + assert_eq!(e.kind(), io::ErrorKind::InvalidData); + assert!(e + .get_ref() + .unwrap() + .to_string() + .contains("transaction couldn't be signed")); + }, Ok(_) => panic!("Expected error"), } } diff --git a/lightning-block-sync/src/gossip.rs b/lightning-block-sync/src/gossip.rs index 9cd4049679c..657a413fd06 100644 --- a/lightning-block-sync/src/gossip.rs +++ b/lightning-block-sync/src/gossip.rs @@ -6,21 +6,21 @@ use crate::{AsyncBlockSourceResult, BlockData, BlockSource, BlockSourceError}; use bitcoin::blockdata::block::Block; use bitcoin::blockdata::constants::ChainHash; -use bitcoin::blockdata::transaction::{TxOut, OutPoint}; +use bitcoin::blockdata::transaction::{OutPoint, TxOut}; use bitcoin::hash_types::BlockHash; use lightning::ln::peer_handler::APeerManager; use lightning::routing::gossip::{NetworkGraph, P2PGossipSync}; -use lightning::routing::utxo::{UtxoFuture, UtxoLookup, UtxoResult, UtxoLookupError}; +use lightning::routing::utxo::{UtxoFuture, UtxoLookup, UtxoLookupError, UtxoResult}; use lightning::util::logger::Logger; -use std::sync::{Arc, Mutex}; use std::collections::VecDeque; use std::future::Future; use std::ops::Deref; use std::pin::Pin; +use std::sync::{Arc, Mutex}; use std::task::Poll; /// A trait which extends [`BlockSource`] and can be queried to fetch the block at a given height @@ -29,12 +29,14 @@ use std::task::Poll; /// Note that while this is implementable for a [`BlockSource`] which returns filtered block data /// (i.e. [`BlockData::HeaderOnly`] for [`BlockSource::get_block`] requests), such an /// implementation will reject all gossip as it is not fully able to verify the UTXOs referenced. -pub trait UtxoSource : BlockSource + 'static { +pub trait UtxoSource: BlockSource + 'static { /// Fetches the block hash of the block at the given height. /// /// This will, in turn, be passed to to [`BlockSource::get_block`] to fetch the block needed /// for gossip validation. - fn get_block_hash_by_height<'a>(&'a self, block_height: u32) -> AsyncBlockSourceResult<'a, BlockHash>; + fn get_block_hash_by_height<'a>( + &'a self, block_height: u32, + ) -> AsyncBlockSourceResult<'a, BlockHash>; /// Returns true if the given output has *not* been spent, i.e. is a member of the current UTXO /// set. @@ -45,7 +47,7 @@ pub trait UtxoSource : BlockSource + 'static { /// /// If the `tokio` feature is enabled, this is implemented on `TokioSpawner` struct which /// delegates to `tokio::spawn()`. -pub trait FutureSpawner : Send + Sync + 'static { +pub trait FutureSpawner: Send + Sync + 'static { /// Spawns the given future as a background task. /// /// This method MUST NOT block on the given future immediately. @@ -65,8 +67,8 @@ impl FutureSpawner for TokioSpawner { /// A trivial future which joins two other futures and polls them at the same time, returning only /// once both complete. pub(crate) struct Joiner< - A: Future), BlockSourceError>> + Unpin, - B: Future> + Unpin, + A: Future), BlockSourceError>> + Unpin, + B: Future> + Unpin, > { pub a: A, pub b: B, @@ -75,16 +77,20 @@ pub(crate) struct Joiner< } impl< - A: Future), BlockSourceError>> + Unpin, - B: Future> + Unpin, -> Joiner { - fn new(a: A, b: B) -> Self { Self { a, b, a_res: None, b_res: None } } + A: Future), BlockSourceError>> + Unpin, + B: Future> + Unpin, + > Joiner +{ + fn new(a: A, b: B) -> Self { + Self { a, b, a_res: None, b_res: None } + } } impl< - A: Future), BlockSourceError>> + Unpin, - B: Future> + Unpin, -> Future for Joiner { + A: Future), BlockSourceError>> + Unpin, + B: Future> + Unpin, + > Future for Joiner +{ type Output = Result<((BlockHash, Option), BlockHash), BlockSourceError>; fn poll(mut self: Pin<&mut Self>, ctx: &mut core::task::Context<'_>) -> Poll { if self.a_res.is_none() { @@ -107,14 +113,13 @@ impl< } else { return Poll::Ready(Err(res.unwrap_err())); } - }, Poll::Pending => {}, } } if let Some(b_res) = self.b_res { if let Some(a_res) = self.a_res { - return Poll::Ready(Ok((a_res, b_res))) + return Poll::Ready(Ok((a_res, b_res))); } } Poll::Pending @@ -129,7 +134,8 @@ impl< /// value of 1024 should more than suffice), and ensure you have sufficient file descriptors /// available on both Bitcoin Core and your LDK application for each request to hold its own /// connection. -pub struct GossipVerifier where @@ -145,10 +151,9 @@ pub struct GossipVerifier GossipVerifier where +impl + GossipVerifier +where Blocks::Target: UtxoSource, L::Target: Logger, { @@ -157,17 +162,24 @@ impl( - source: Blocks, spawn: S, gossiper: Arc>, Self, L>>, peer_manager: APM - ) -> Self where APM::Target: APeerManager { + source: Blocks, spawn: S, gossiper: Arc>, Self, L>>, + peer_manager: APM, + ) -> Self + where + APM::Target: APeerManager, + { let peer_manager_wake = Arc::new(move || peer_manager.as_ref().process_events()); Self { - source, spawn, gossiper, peer_manager_wake, + source, + spawn, + gossiper, + peer_manager_wake, block_cache: Arc::new(Mutex::new(VecDeque::with_capacity(BLOCK_CACHE_SIZE))), } } async fn retrieve_utxo( - source: Blocks, block_cache: Arc>>, short_channel_id: u64 + source: Blocks, block_cache: Arc>>, short_channel_id: u64, ) -> Result { let block_height = (short_channel_id >> 5 * 8) as u32; // block height is most significant three bytes let transaction_index = ((short_channel_id >> 2 * 8) & 0xffffff) as u32; @@ -175,9 +187,9 @@ impl { { + ($block: expr) => {{ if transaction_index as usize >= $block.txdata.len() { return Err(UtxoLookupError::UnknownTx); } @@ -188,7 +200,7 @@ impl return Err(UtxoLookupError::UnknownTx), BlockData::FullBlock(block) => block, @@ -237,7 +249,7 @@ impl Deref for GossipVerifier where +impl Deref + for GossipVerifier +where Blocks::Target: UtxoSource, L::Target: Logger, { type Target = Self; - fn deref(&self) -> &Self { self } + fn deref(&self) -> &Self { + self + } } - -impl UtxoLookup for GossipVerifier where +impl UtxoLookup + for GossipVerifier +where Blocks::Target: UtxoSource, L::Target: Logger, { diff --git a/lightning-block-sync/src/http.rs b/lightning-block-sync/src/http.rs index aa0d840adb0..c230b253f65 100644 --- a/lightning-block-sync/src/http.rs +++ b/lightning-block-sync/src/http.rs @@ -50,11 +50,7 @@ pub struct HttpEndpoint { impl HttpEndpoint { /// Creates an endpoint for the given host and default HTTP port. pub fn for_host(host: String) -> Self { - Self { - host, - port: None, - path: String::from("/"), - } + Self { host, port: None, path: String::from("/") } } /// Specifies a port to use with the endpoint. @@ -107,7 +103,10 @@ impl HttpClient { pub fn connect(endpoint: E) -> std::io::Result { let address = match endpoint.to_socket_addrs()?.next() { None => { - return Err(std::io::Error::new(std::io::ErrorKind::InvalidInput, "could not resolve to any addresses")); + return Err(std::io::Error::new( + std::io::ErrorKind::InvalidInput, + "could not resolve to any addresses", + )); }, Some(address) => address, }; @@ -129,12 +128,16 @@ impl HttpClient { /// Returns the response body in `F` format. #[allow(dead_code)] pub async fn get(&mut self, uri: &str, host: &str) -> std::io::Result - where F: TryFrom, Error = std::io::Error> { + where + F: TryFrom, Error = std::io::Error>, + { let request = format!( "GET {} HTTP/1.1\r\n\ Host: {}\r\n\ Connection: keep-alive\r\n\ - \r\n", uri, host); + \r\n", + uri, host + ); let response_body = self.send_request_with_retry(&request).await?; F::try_from(response_body) } @@ -145,8 +148,12 @@ impl HttpClient { /// The request body consists of the provided JSON `content`. Returns the response body in `F` /// format. #[allow(dead_code)] - pub async fn post(&mut self, uri: &str, host: &str, auth: &str, content: serde_json::Value) -> std::io::Result - where F: TryFrom, Error = std::io::Error> { + pub async fn post( + &mut self, uri: &str, host: &str, auth: &str, content: serde_json::Value, + ) -> std::io::Result + where + F: TryFrom, Error = std::io::Error>, + { let content = content.to_string(); let request = format!( "POST {} HTTP/1.1\r\n\ @@ -156,7 +163,13 @@ impl HttpClient { Content-Type: application/json\r\n\ Content-Length: {}\r\n\ \r\n\ - {}", uri, host, auth, content.len(), content); + {}", + uri, + host, + auth, + content.len(), + content + ); let response_body = self.send_request_with_retry(&request).await?; F::try_from(response_body) } @@ -218,8 +231,10 @@ impl HttpClient { let mut reader = std::io::BufReader::new(limited_stream); macro_rules! read_line { - () => { read_line!(0) }; - ($retry_count: expr) => { { + () => { + read_line!(0) + }; + ($retry_count: expr) => {{ let mut line = String::new(); let mut timeout_count: u64 = 0; let bytes_read = loop { @@ -236,7 +251,7 @@ impl HttpClient { } else { continue; } - } + }, Err(e) => return Err(e), } }; @@ -245,17 +260,23 @@ impl HttpClient { 0 => None, _ => { // Remove trailing CRLF - if line.ends_with('\n') { line.pop(); if line.ends_with('\r') { line.pop(); } } + if line.ends_with('\n') { + line.pop(); + if line.ends_with('\r') { + line.pop(); + } + } Some(line) }, } - } } + }}; } // Read and parse status line // Note that we allow retrying a few times to reach TCP_STREAM_RESPONSE_TIMEOUT. - let status_line = read_line!(TCP_STREAM_RESPONSE_TIMEOUT.as_secs() / TCP_STREAM_TIMEOUT.as_secs()) - .ok_or(std::io::Error::new(std::io::ErrorKind::UnexpectedEof, "no status line"))?; + let status_line = + read_line!(TCP_STREAM_RESPONSE_TIMEOUT.as_secs() / TCP_STREAM_TIMEOUT.as_secs()) + .ok_or(std::io::Error::new(std::io::ErrorKind::UnexpectedEof, "no status line"))?; let status = HttpStatus::parse(&status_line)?; // Read and parse relevant headers @@ -263,11 +284,15 @@ impl HttpClient { loop { let line = read_line!() .ok_or(std::io::Error::new(std::io::ErrorKind::UnexpectedEof, "no headers"))?; - if line.is_empty() { break; } + if line.is_empty() { + break; + } let header = HttpHeader::parse(&line)?; if header.has_name("Content-Length") { - let length = header.value.parse() + let length = header + .value + .parse() .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?; if let HttpMessageLength::Empty = message_length { message_length = HttpMessageLength::ContentLength(length); @@ -285,10 +310,13 @@ impl HttpClient { let read_limit = MAX_HTTP_MESSAGE_BODY_SIZE - reader.buffer().len(); reader.get_mut().set_limit(read_limit as u64); let contents = match message_length { - HttpMessageLength::Empty => { Vec::new() }, + HttpMessageLength::Empty => Vec::new(), HttpMessageLength::ContentLength(length) => { if length == 0 || length > MAX_HTTP_MESSAGE_BODY_SIZE { - return Err(std::io::Error::new(std::io::ErrorKind::InvalidData, format!("invalid response length: {} bytes", length))); + return Err(std::io::Error::new( + std::io::ErrorKind::InvalidData, + format!("invalid response length: {} bytes", length), + )); } else { let mut content = vec![0; length]; #[cfg(feature = "tokio")] @@ -301,7 +329,9 @@ impl HttpClient { HttpMessageLength::TransferEncoding(coding) => { if !coding.eq_ignore_ascii_case("chunked") { return Err(std::io::Error::new( - std::io::ErrorKind::InvalidInput, "unsupported transfer coding")) + std::io::ErrorKind::InvalidInput, + "unsupported transfer coding", + )); } else { let mut content = Vec::new(); #[cfg(feature = "tokio")] @@ -323,7 +353,8 @@ impl HttpClient { // Decode the chunk header to obtain the chunk size. let mut buffer = Vec::new(); - let mut decoder = chunked_transfer::Decoder::new(chunk_header.as_bytes()); + let mut decoder = + chunked_transfer::Decoder::new(chunk_header.as_bytes()); decoder.read_to_end(&mut buffer)?; // Read the chunk body. @@ -350,10 +381,7 @@ impl HttpClient { if !status.is_ok() { // TODO: Handle 3xx redirection responses. - let error = HttpError { - status_code: status.code.to_string(), - contents, - }; + let error = HttpError { status_code: status.code.to_string(), contents }; return Err(std::io::Error::new(std::io::ErrorKind::Other, error)); } @@ -391,20 +419,30 @@ impl<'a> HttpStatus<'a> { fn parse(line: &'a String) -> std::io::Result> { let mut tokens = line.splitn(3, ' '); - let http_version = tokens.next() + let http_version = tokens + .next() .ok_or(std::io::Error::new(std::io::ErrorKind::InvalidData, "no HTTP-Version"))?; - if !http_version.eq_ignore_ascii_case("HTTP/1.1") && - !http_version.eq_ignore_ascii_case("HTTP/1.0") { - return Err(std::io::Error::new(std::io::ErrorKind::InvalidData, "invalid HTTP-Version")); + if !http_version.eq_ignore_ascii_case("HTTP/1.1") + && !http_version.eq_ignore_ascii_case("HTTP/1.0") + { + return Err(std::io::Error::new( + std::io::ErrorKind::InvalidData, + "invalid HTTP-Version", + )); } - let code = tokens.next() + let code = tokens + .next() .ok_or(std::io::Error::new(std::io::ErrorKind::InvalidData, "no Status-Code"))?; if code.len() != 3 || !code.chars().all(|c| c.is_ascii_digit()) { - return Err(std::io::Error::new(std::io::ErrorKind::InvalidData, "invalid Status-Code")); + return Err(std::io::Error::new( + std::io::ErrorKind::InvalidData, + "invalid Status-Code", + )); } - let _reason = tokens.next() + let _reason = tokens + .next() .ok_or(std::io::Error::new(std::io::ErrorKind::InvalidData, "no Reason-Phrase"))?; Ok(Self { code }) @@ -430,9 +468,11 @@ impl<'a> HttpHeader<'a> { /// [RFC 7230]: https://tools.ietf.org/html/rfc7230#section-3.2 fn parse(line: &'a String) -> std::io::Result> { let mut tokens = line.splitn(2, ':'); - let name = tokens.next() + let name = tokens + .next() .ok_or(std::io::Error::new(std::io::ErrorKind::InvalidData, "no header name"))?; - let value = tokens.next() + let value = tokens + .next() .ok_or(std::io::Error::new(std::io::ErrorKind::InvalidData, "no header value"))? .trim_start(); Ok(Self { name, value }) @@ -524,7 +564,7 @@ mod endpoint_tests { assert_eq!(addr, std_addrs.next().unwrap()); } assert!(std_addrs.next().is_none()); - } + }, } } } @@ -559,7 +599,11 @@ pub(crate) mod client_tests { "{}\r\n\ Content-Length: {}\r\n\ \r\n\ - {}", status, body.len(), body) + {}", + status, + body.len(), + body + ) }, MessageBody::ChunkedContent(body) => { let mut chuncked_body = Vec::new(); @@ -572,7 +616,10 @@ pub(crate) mod client_tests { "{}\r\n\ Transfer-Encoding: chunked\r\n\ \r\n\ - {}", status, String::from_utf8(chuncked_body).unwrap()) + {}", + status, + String::from_utf8(chuncked_body).unwrap() + ) }, }; HttpServer::responding_with(response) @@ -606,14 +653,20 @@ pub(crate) mod client_tests { .lines() .take_while(|line| !line.as_ref().unwrap().is_empty()) .count(); - if lines_read == 0 { continue; } + if lines_read == 0 { + continue; + } for chunk in response.as_bytes().chunks(16) { if shutdown_signaled.load(std::sync::atomic::Ordering::SeqCst) { return; } else { - if let Err(_) = stream.write(chunk) { break; } - if let Err(_) = stream.flush() { break; } + if let Err(_) = stream.write(chunk) { + break; + } + if let Err(_) = stream.flush() { + break; + } } } } @@ -636,8 +689,12 @@ pub(crate) mod client_tests { fn connect_to_unresolvable_host() { match HttpClient::connect(("example.invalid", 80)) { Err(e) => { - assert!(e.to_string().contains("failed to lookup address information") || - e.to_string().contains("No such host"), "{:?}", e); + assert!( + e.to_string().contains("failed to lookup address information") + || e.to_string().contains("No such host"), + "{:?}", + e + ); }, Ok(_) => panic!("Expected error"), } @@ -705,7 +762,9 @@ pub(crate) mod client_tests { let response = format!( "HTTP/1.1 302 Found\r\n\ Location: {}\r\n\ - \r\n", "Z".repeat(MAX_HTTP_MESSAGE_HEADER_SIZE)); + \r\n", + "Z".repeat(MAX_HTTP_MESSAGE_HEADER_SIZE) + ); let server = HttpServer::responding_with(response); let mut client = HttpClient::connect(&server.endpoint()).unwrap(); @@ -727,7 +786,10 @@ pub(crate) mod client_tests { match client.get::("/foo", "foo.com").await { Err(e) => { assert_eq!(e.kind(), std::io::ErrorKind::InvalidData); - assert_eq!(e.get_ref().unwrap().to_string(), "invalid response length: 8032001 bytes"); + assert_eq!( + e.get_ref().unwrap().to_string(), + "invalid response length: 8032001 bytes" + ); }, Ok(_) => panic!("Expected error"), } @@ -740,7 +802,8 @@ pub(crate) mod client_tests { "HTTP/1.1 200 OK\r\n\ Transfer-Encoding: gzip\r\n\ \r\n\ - foobar"); + foobar", + ); let server = HttpServer::responding_with(response); let mut client = HttpClient::connect(&server.endpoint()).unwrap(); diff --git a/lightning-block-sync/src/init.rs b/lightning-block-sync/src/init.rs index 38c21f2f9cb..4e893e11cf6 100644 --- a/lightning-block-sync/src/init.rs +++ b/lightning-block-sync/src/init.rs @@ -1,8 +1,8 @@ //! Utilities to assist in the initial sync required to initialize or reload Rust-Lightning objects //! from disk. -use crate::{BlockSource, BlockSourceResult, Cache, ChainNotifier}; use crate::poll::{ChainPoller, Validate, ValidatedBlockHeader}; +use crate::{BlockSource, BlockSourceResult, Cache, ChainNotifier}; use bitcoin::blockdata::block::Header; use bitcoin::hash_types::BlockHash; @@ -18,12 +18,14 @@ use std::ops::Deref; /// start when there are no chain listeners to sync yet. /// /// [`SpvClient`]: crate::SpvClient -pub async fn validate_best_block_header(block_source: B) -> -BlockSourceResult where B::Target: BlockSource { +pub async fn validate_best_block_header( + block_source: B, +) -> BlockSourceResult +where + B::Target: BlockSource, +{ let (best_block_hash, best_block_height) = block_source.get_best_block().await?; - block_source - .get_header(&best_block_hash, best_block_height).await? - .validate(best_block_hash) + block_source.get_header(&best_block_hash, best_block_height).await?.validate(best_block_hash) } /// Performs a one-time sync of chain listeners using a single *trusted* block source, bringing each @@ -131,12 +133,17 @@ BlockSourceResult where B::Target: BlockSource { /// [`SpvClient`]: crate::SpvClient /// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager /// [`ChannelMonitor`]: lightning::chain::channelmonitor::ChannelMonitor -pub async fn synchronize_listeners( - block_source: B, - network: Network, - header_cache: &mut C, +pub async fn synchronize_listeners< + B: Deref + Sized + Send + Sync, + C: Cache, + L: chain::Listen + ?Sized, +>( + block_source: B, network: Network, header_cache: &mut C, mut chain_listeners: Vec<(BlockHash, &L)>, -) -> BlockSourceResult where B::Target: BlockSource { +) -> BlockSourceResult +where + B::Target: BlockSource, +{ let best_header = validate_best_block_header(&*block_source).await?; // Fetch the header for the block hash paired with each listener. @@ -144,9 +151,9 @@ pub async fn synchronize_listeners *header, - None => block_source - .get_header(&old_block_hash, None).await? - .validate(old_block_hash)? + None => { + block_source.get_header(&old_block_hash, None).await?.validate(old_block_hash)? + }, }; chain_listeners_with_old_headers.push((old_header, chain_listener)) } @@ -180,8 +187,10 @@ pub async fn synchronize_listeners Cache for ReadOnlyCache<'a, C> { struct DynamicChainListener<'a, L: chain::Listen + ?Sized>(&'a L); impl<'a, L: chain::Listen + ?Sized> chain::Listen for DynamicChainListener<'a, L> { - fn filtered_block_connected(&self, _header: &Header, _txdata: &chain::transaction::TransactionData, _height: u32) { + fn filtered_block_connected( + &self, _header: &Header, _txdata: &chain::transaction::TransactionData, _height: u32, + ) { unreachable!() } @@ -234,7 +245,9 @@ impl<'a, L: chain::Listen + ?Sized> chain::Listen for ChainListenerSet<'a, L> { } } - fn filtered_block_connected(&self, header: &Header, txdata: &chain::transaction::TransactionData, height: u32) { + fn filtered_block_connected( + &self, header: &Header, txdata: &chain::transaction::TransactionData, height: u32, + ) { for (starting_height, chain_listener) in self.0.iter() { if height > *starting_height { chain_listener.filtered_block_connected(header, txdata, height); @@ -249,8 +262,8 @@ impl<'a, L: chain::Listen + ?Sized> chain::Listen for ChainListenerSet<'a, L> { #[cfg(test)] mod tests { - use crate::test_utils::{Blockchain, MockChainListener}; use super::*; + use crate::test_utils::{Blockchain, MockChainListener}; #[tokio::test] async fn sync_from_same_chain() { @@ -263,8 +276,7 @@ mod tests { let listener_2 = MockChainListener::new() .expect_block_connected(*chain.at_height(3)) .expect_block_connected(*chain.at_height(4)); - let listener_3 = MockChainListener::new() - .expect_block_connected(*chain.at_height(4)); + let listener_3 = MockChainListener::new().expect_block_connected(*chain.at_height(4)); let listeners = vec![ (chain.at_height(1).block_hash, &listener_1 as &dyn chain::Listen), diff --git a/lightning-block-sync/src/lib.rs b/lightning-block-sync/src/lib.rs index c54e0544548..2e3178ffc12 100644 --- a/lightning-block-sync/src/lib.rs +++ b/lightning-block-sync/src/lib.rs @@ -15,10 +15,8 @@ #![deny(rustdoc::broken_intra_doc_links)] #![deny(rustdoc::private_intra_doc_links)] - #![deny(missing_docs)] #![deny(unsafe_code)] - #![cfg_attr(docsrs, feature(doc_auto_cfg))] #[cfg(any(feature = "rest-client", feature = "rpc-client"))] @@ -58,18 +56,21 @@ use std::ops::Deref; use std::pin::Pin; /// Abstract type for retrieving block headers and data. -pub trait BlockSource : Sync + Send { +pub trait BlockSource: Sync + Send { /// Returns the header for a given hash. A height hint may be provided in case a block source /// cannot easily find headers based on a hash. This is merely a hint and thus the returned /// header must have the same hash as was requested. Otherwise, an error must be returned. /// /// Implementations that cannot find headers based on the hash should return a `Transient` error /// when `height_hint` is `None`. - fn get_header<'a>(&'a self, header_hash: &'a BlockHash, height_hint: Option) -> AsyncBlockSourceResult<'a, BlockHeaderData>; + fn get_header<'a>( + &'a self, header_hash: &'a BlockHash, height_hint: Option, + ) -> AsyncBlockSourceResult<'a, BlockHeaderData>; /// Returns the block for a given hash. A headers-only block source should return a `Transient` /// error. - fn get_block<'a>(&'a self, header_hash: &'a BlockHash) -> AsyncBlockSourceResult<'a, BlockData>; + fn get_block<'a>(&'a self, header_hash: &'a BlockHash) + -> AsyncBlockSourceResult<'a, BlockData>; /// Returns the hash of the best block and, optionally, its height. /// @@ -86,7 +87,8 @@ pub type BlockSourceResult = Result; // TODO: Replace with BlockSourceResult once `async` trait functions are supported. For details, // see: https://areweasyncyet.rs. /// Result type for asynchronous `BlockSource` requests. -pub type AsyncBlockSourceResult<'a, T> = Pin> + 'a + Send>>; +pub type AsyncBlockSourceResult<'a, T> = + Pin> + 'a + Send>>; /// Error type for `BlockSource` requests. /// @@ -111,20 +113,18 @@ pub enum BlockSourceErrorKind { impl BlockSourceError { /// Creates a new persistent error originated from the given error. pub fn persistent(error: E) -> Self - where E: Into> { - Self { - kind: BlockSourceErrorKind::Persistent, - error: error.into(), - } + where + E: Into>, + { + Self { kind: BlockSourceErrorKind::Persistent, error: error.into() } } /// Creates a new transient error originated from the given error. pub fn transient(error: E) -> Self - where E: Into> { - Self { - kind: BlockSourceErrorKind::Transient, - error: error.into(), - } + where + E: Into>, + { + Self { kind: BlockSourceErrorKind::Transient, error: error.into() } } /// Returns the kind of error. @@ -180,7 +180,9 @@ pub enum BlockData { /// Hence, there is a trade-off between a lower memory footprint and potentially increased network /// I/O as headers are re-fetched during fork detection. pub struct SpvClient<'a, P: Poll, C: Cache, L: Deref> -where L::Target: chain::Listen { +where + L::Target: chain::Listen, +{ chain_tip: ValidatedBlockHeader, chain_poller: P, chain_notifier: ChainNotifier<'a, C, L>, @@ -226,7 +228,10 @@ impl Cache for UnboundedCache { } } -impl<'a, P: Poll, C: Cache, L: Deref> SpvClient<'a, P, C, L> where L::Target: chain::Listen { +impl<'a, P: Poll, C: Cache, L: Deref> SpvClient<'a, P, C, L> +where + L::Target: chain::Listen, +{ /// Creates a new SPV client using `chain_tip` as the best known chain tip. /// /// Subsequent calls to [`poll_best_tip`] will poll for the best chain tip using the given chain @@ -238,9 +243,7 @@ impl<'a, P: Poll, C: Cache, L: Deref> SpvClient<'a, P, C, L> where L::Target: ch /// /// [`poll_best_tip`]: SpvClient::poll_best_tip pub fn new( - chain_tip: ValidatedBlockHeader, - chain_poller: P, - header_cache: &'a mut C, + chain_tip: ValidatedBlockHeader, chain_poller: P, header_cache: &'a mut C, chain_listener: L, ) -> Self { let chain_notifier = ChainNotifier { header_cache, chain_listener }; @@ -273,8 +276,10 @@ impl<'a, P: Poll, C: Cache, L: Deref> SpvClient<'a, P, C, L> where L::Target: ch /// Updates the chain tip, syncing the chain listener with any connected or disconnected /// blocks. Returns whether there were any such blocks. async fn update_chain_tip(&mut self, best_chain_tip: ValidatedBlockHeader) -> bool { - match self.chain_notifier.synchronize_listener( - best_chain_tip, &self.chain_tip, &mut self.chain_poller).await + match self + .chain_notifier + .synchronize_listener(best_chain_tip, &self.chain_tip, &mut self.chain_poller) + .await { Ok(_) => { self.chain_tip = best_chain_tip; @@ -292,7 +297,10 @@ impl<'a, P: Poll, C: Cache, L: Deref> SpvClient<'a, P, C, L> where L::Target: ch /// Notifies [listeners] of blocks that have been connected or disconnected from the chain. /// /// [listeners]: lightning::chain::Listen -pub struct ChainNotifier<'a, C: Cache, L: Deref> where L::Target: chain::Listen { +pub struct ChainNotifier<'a, C: Cache, L: Deref> +where + L::Target: chain::Listen, +{ /// Cache for looking up headers before fetching from a block source. header_cache: &'a mut C, @@ -318,7 +326,10 @@ struct ChainDifference { connected_blocks: Vec, } -impl<'a, C: Cache, L: Deref> ChainNotifier<'a, C, L> where L::Target: chain::Listen { +impl<'a, C: Cache, L: Deref> ChainNotifier<'a, C, L> +where + L::Target: chain::Listen, +{ /// Finds the first common ancestor between `new_header` and `old_header`, disconnecting blocks /// from `old_header` to get to that point and then connecting blocks until `new_header`. /// @@ -327,19 +338,16 @@ impl<'a, C: Cache, L: Deref> ChainNotifier<'a, C, L> where L::Target: chain::Lis /// ended up which may not be `new_header`. Note that the returned `Err` contains `Some` header /// if and only if the transition from `old_header` to `new_header` is valid. async fn synchronize_listener( - &mut self, - new_header: ValidatedBlockHeader, - old_header: &ValidatedBlockHeader, + &mut self, new_header: ValidatedBlockHeader, old_header: &ValidatedBlockHeader, chain_poller: &mut P, ) -> Result<(), (BlockSourceError, Option)> { - let difference = self.find_difference(new_header, old_header, chain_poller).await + let difference = self + .find_difference(new_header, old_header, chain_poller) + .await .map_err(|e| (e, None))?; self.disconnect_blocks(difference.disconnected_blocks); - self.connect_blocks( - difference.common_ancestor, - difference.connected_blocks, - chain_poller, - ).await + self.connect_blocks(difference.common_ancestor, difference.connected_blocks, chain_poller) + .await } /// Returns the changes needed to produce the chain with `current_header` as its tip from the @@ -347,9 +355,7 @@ impl<'a, C: Cache, L: Deref> ChainNotifier<'a, C, L> where L::Target: chain::Lis /// /// Walks backwards from `current_header` and `prev_header`, finding the common ancestor. async fn find_difference( - &self, - current_header: ValidatedBlockHeader, - prev_header: &ValidatedBlockHeader, + &self, current_header: ValidatedBlockHeader, prev_header: &ValidatedBlockHeader, chain_poller: &mut P, ) -> BlockSourceResult { let mut disconnected_blocks = Vec::new(); @@ -383,9 +389,7 @@ impl<'a, C: Cache, L: Deref> ChainNotifier<'a, C, L> where L::Target: chain::Lis /// Returns the previous header for the given header, either by looking it up in the cache or /// fetching it if not found. async fn look_up_previous_header( - &self, - chain_poller: &mut P, - header: &ValidatedBlockHeader, + &self, chain_poller: &mut P, header: &ValidatedBlockHeader, ) -> BlockSourceResult { match self.header_cache.look_up(&header.header.prev_blockhash) { Some(prev_header) => Ok(*prev_header), @@ -405,16 +409,13 @@ impl<'a, C: Cache, L: Deref> ChainNotifier<'a, C, L> where L::Target: chain::Lis /// Notifies the chain listeners of connected blocks. async fn connect_blocks( - &mut self, - mut new_tip: ValidatedBlockHeader, - mut connected_blocks: Vec, - chain_poller: &mut P, + &mut self, mut new_tip: ValidatedBlockHeader, + mut connected_blocks: Vec, chain_poller: &mut P, ) -> Result<(), (BlockSourceError, Option)> { for header in connected_blocks.drain(..).rev() { let height = header.height; - let block_data = chain_poller - .fetch_block(&header).await - .map_err(|e| (e, Some(new_tip)))?; + let block_data = + chain_poller.fetch_block(&header).await.map_err(|e| (e, Some(new_tip)))?; debug_assert_eq!(block_data.block_hash, header.block_hash); match block_data.deref() { @@ -436,8 +437,8 @@ impl<'a, C: Cache, L: Deref> ChainNotifier<'a, C, L> where L::Target: chain::Lis #[cfg(test)] mod spv_client_tests { - use crate::test_utils::{Blockchain, NullChainListener}; use super::*; + use crate::test_utils::{Blockchain, NullChainListener}; use bitcoin::network::Network; @@ -563,8 +564,8 @@ mod spv_client_tests { #[cfg(test)] mod chain_notifier_tests { - use crate::test_utils::{Blockchain, MockChainListener}; use super::*; + use crate::test_utils::{Blockchain, MockChainListener}; use bitcoin::network::Network; @@ -577,10 +578,8 @@ mod chain_notifier_tests { let chain_listener = &MockChainListener::new() .expect_block_connected(*chain.at_height(2)) .expect_block_connected(*new_tip); - let mut notifier = ChainNotifier { - header_cache: &mut chain.header_cache(0..=1), - chain_listener, - }; + let mut notifier = + ChainNotifier { header_cache: &mut chain.header_cache(0..=1), chain_listener }; let mut poller = poll::ChainPoller::new(&mut chain, Network::Testnet); match notifier.synchronize_listener(new_tip, &old_tip, &mut poller).await { Err((e, _)) => panic!("Unexpected error: {:?}", e), @@ -596,10 +595,8 @@ mod chain_notifier_tests { let new_tip = test_chain.tip(); let old_tip = main_chain.tip(); let chain_listener = &MockChainListener::new(); - let mut notifier = ChainNotifier { - header_cache: &mut main_chain.header_cache(0..=1), - chain_listener, - }; + let mut notifier = + ChainNotifier { header_cache: &mut main_chain.header_cache(0..=1), chain_listener }; let mut poller = poll::ChainPoller::new(&mut test_chain, Network::Testnet); match notifier.synchronize_listener(new_tip, &old_tip, &mut poller).await { Err((e, _)) => { @@ -620,10 +617,8 @@ mod chain_notifier_tests { let chain_listener = &MockChainListener::new() .expect_block_disconnected(*old_tip) .expect_block_connected(*new_tip); - let mut notifier = ChainNotifier { - header_cache: &mut main_chain.header_cache(0..=2), - chain_listener, - }; + let mut notifier = + ChainNotifier { header_cache: &mut main_chain.header_cache(0..=2), chain_listener }; let mut poller = poll::ChainPoller::new(&mut fork_chain, Network::Testnet); match notifier.synchronize_listener(new_tip, &old_tip, &mut poller).await { Err((e, _)) => panic!("Unexpected error: {:?}", e), @@ -643,10 +638,8 @@ mod chain_notifier_tests { .expect_block_disconnected(*old_tip) .expect_block_disconnected(*main_chain.at_height(2)) .expect_block_connected(*new_tip); - let mut notifier = ChainNotifier { - header_cache: &mut main_chain.header_cache(0..=3), - chain_listener, - }; + let mut notifier = + ChainNotifier { header_cache: &mut main_chain.header_cache(0..=3), chain_listener }; let mut poller = poll::ChainPoller::new(&mut fork_chain, Network::Testnet); match notifier.synchronize_listener(new_tip, &old_tip, &mut poller).await { Err((e, _)) => panic!("Unexpected error: {:?}", e), @@ -666,10 +659,8 @@ mod chain_notifier_tests { .expect_block_disconnected(*old_tip) .expect_block_connected(*fork_chain.at_height(2)) .expect_block_connected(*new_tip); - let mut notifier = ChainNotifier { - header_cache: &mut main_chain.header_cache(0..=2), - chain_listener, - }; + let mut notifier = + ChainNotifier { header_cache: &mut main_chain.header_cache(0..=2), chain_listener }; let mut poller = poll::ChainPoller::new(&mut fork_chain, Network::Testnet); match notifier.synchronize_listener(new_tip, &old_tip, &mut poller).await { Err((e, _)) => panic!("Unexpected error: {:?}", e), @@ -684,10 +675,8 @@ mod chain_notifier_tests { let new_tip = chain.tip(); let old_tip = chain.at_height(1); let chain_listener = &MockChainListener::new(); - let mut notifier = ChainNotifier { - header_cache: &mut chain.header_cache(0..=1), - chain_listener, - }; + let mut notifier = + ChainNotifier { header_cache: &mut chain.header_cache(0..=1), chain_listener }; let mut poller = poll::ChainPoller::new(&mut chain, Network::Testnet); match notifier.synchronize_listener(new_tip, &old_tip, &mut poller).await { Err((_, tip)) => assert_eq!(tip, None), @@ -702,10 +691,8 @@ mod chain_notifier_tests { let new_tip = chain.tip(); let old_tip = chain.at_height(1); let chain_listener = &MockChainListener::new(); - let mut notifier = ChainNotifier { - header_cache: &mut chain.header_cache(0..=3), - chain_listener, - }; + let mut notifier = + ChainNotifier { header_cache: &mut chain.header_cache(0..=3), chain_listener }; let mut poller = poll::ChainPoller::new(&mut chain, Network::Testnet); match notifier.synchronize_listener(new_tip, &old_tip, &mut poller).await { Err((_, tip)) => assert_eq!(tip, Some(old_tip)), @@ -719,12 +706,9 @@ mod chain_notifier_tests { let new_tip = chain.tip(); let old_tip = chain.at_height(1); - let chain_listener = &MockChainListener::new() - .expect_block_connected(*chain.at_height(2)); - let mut notifier = ChainNotifier { - header_cache: &mut chain.header_cache(0..=3), - chain_listener, - }; + let chain_listener = &MockChainListener::new().expect_block_connected(*chain.at_height(2)); + let mut notifier = + ChainNotifier { header_cache: &mut chain.header_cache(0..=3), chain_listener }; let mut poller = poll::ChainPoller::new(&mut chain, Network::Testnet); match notifier.synchronize_listener(new_tip, &old_tip, &mut poller).await { Err((_, tip)) => assert_eq!(tip, Some(chain.at_height(2))), @@ -741,15 +725,12 @@ mod chain_notifier_tests { let chain_listener = &MockChainListener::new() .expect_filtered_block_connected(*chain.at_height(2)) .expect_filtered_block_connected(*new_tip); - let mut notifier = ChainNotifier { - header_cache: &mut chain.header_cache(0..=1), - chain_listener, - }; + let mut notifier = + ChainNotifier { header_cache: &mut chain.header_cache(0..=1), chain_listener }; let mut poller = poll::ChainPoller::new(&mut chain, Network::Testnet); match notifier.synchronize_listener(new_tip, &old_tip, &mut poller).await { Err((e, _)) => panic!("Unexpected error: {:?}", e), Ok(_) => {}, } } - } diff --git a/lightning-block-sync/src/poll.rs b/lightning-block-sync/src/poll.rs index 7f0f74ce5ce..3940b71f760 100644 --- a/lightning-block-sync/src/poll.rs +++ b/lightning-block-sync/src/poll.rs @@ -1,6 +1,9 @@ //! Adapters that make one or more [`BlockSource`]s simpler to poll for new chain tip transitions. -use crate::{AsyncBlockSourceResult, BlockData, BlockHeaderData, BlockSource, BlockSourceError, BlockSourceResult}; +use crate::{ + AsyncBlockSourceResult, BlockData, BlockHeaderData, BlockSource, BlockSourceError, + BlockSourceResult, +}; use bitcoin::hash_types::BlockHash; use bitcoin::network::Network; @@ -17,16 +20,19 @@ use std::ops::Deref; /// [`ChainPoller`]: ../struct.ChainPoller.html pub trait Poll { /// Returns a chain tip in terms of its relationship to the provided chain tip. - fn poll_chain_tip<'a>(&'a self, best_known_chain_tip: ValidatedBlockHeader) -> - AsyncBlockSourceResult<'a, ChainTip>; + fn poll_chain_tip<'a>( + &'a self, best_known_chain_tip: ValidatedBlockHeader, + ) -> AsyncBlockSourceResult<'a, ChainTip>; /// Returns the header that preceded the given header in the chain. - fn look_up_previous_header<'a>(&'a self, header: &'a ValidatedBlockHeader) -> - AsyncBlockSourceResult<'a, ValidatedBlockHeader>; + fn look_up_previous_header<'a>( + &'a self, header: &'a ValidatedBlockHeader, + ) -> AsyncBlockSourceResult<'a, ValidatedBlockHeader>; /// Returns the block associated with the given header. - fn fetch_block<'a>(&'a self, header: &'a ValidatedBlockHeader) -> - AsyncBlockSourceResult<'a, ValidatedBlock>; + fn fetch_block<'a>( + &'a self, header: &'a ValidatedBlockHeader, + ) -> AsyncBlockSourceResult<'a, ValidatedBlock>; } /// A chain tip relative to another chain tip in terms of block hash and chainwork. @@ -59,9 +65,8 @@ impl Validate for BlockHeaderData { type T = ValidatedBlockHeader; fn validate(self, block_hash: BlockHash) -> BlockSourceResult { - let pow_valid_block_hash = self.header - .validate_pow(self.header.target()) - .map_err(BlockSourceError::persistent)?; + let pow_valid_block_hash = + self.header.validate_pow(self.header.target()).map_err(BlockSourceError::persistent)?; if pow_valid_block_hash != block_hash { return Err(BlockSourceError::persistent("invalid block hash")); @@ -80,9 +85,8 @@ impl Validate for BlockData { BlockData::HeaderOnly(header) => header, }; - let pow_valid_block_hash = header - .validate_pow(header.target()) - .map_err(BlockSourceError::persistent)?; + let pow_valid_block_hash = + header.validate_pow(header.target()).map_err(BlockSourceError::persistent)?; if pow_valid_block_hash != block_hash { return Err(BlockSourceError::persistent("invalid block hash")); @@ -120,7 +124,9 @@ impl std::ops::Deref for ValidatedBlockHeader { impl ValidatedBlockHeader { /// Checks that the header correctly builds on previous_header: the claimed work differential /// matches the actual PoW and the difficulty transition is possible, i.e., within 4x. - fn check_builds_on(&self, previous_header: &ValidatedBlockHeader, network: Network) -> BlockSourceResult<()> { + fn check_builds_on( + &self, previous_header: &ValidatedBlockHeader, network: Network, + ) -> BlockSourceResult<()> { if self.header.prev_blockhash != previous_header.block_hash { return Err(BlockSourceError::persistent("invalid previous block hash")); } @@ -141,28 +147,28 @@ impl ValidatedBlockHeader { let min_target = previous_target.min_difficulty_transition_threshold(); let max_target = previous_target.max_difficulty_transition_threshold(); if target > max_target || target < min_target { - return Err(BlockSourceError::persistent("invalid difficulty transition")) + return Err(BlockSourceError::persistent("invalid difficulty transition")); } } else if self.header.bits != previous_header.header.bits { - return Err(BlockSourceError::persistent("invalid difficulty")) + return Err(BlockSourceError::persistent("invalid difficulty")); } } Ok(()) } - /// Returns the [`BestBlock`] corresponding to this validated block header, which can be passed - /// into [`ChannelManager::new`] as part of its [`ChainParameters`]. Useful for ensuring that - /// the [`SpvClient`] and [`ChannelManager`] are initialized to the same block during a fresh - /// start. - /// - /// [`SpvClient`]: crate::SpvClient - /// [`ChainParameters`]: lightning::ln::channelmanager::ChainParameters - /// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager - /// [`ChannelManager::new`]: lightning::ln::channelmanager::ChannelManager::new - pub fn to_best_block(&self) -> BestBlock { - BestBlock::new(self.block_hash, self.inner.height) - } + /// Returns the [`BestBlock`] corresponding to this validated block header, which can be passed + /// into [`ChannelManager::new`] as part of its [`ChainParameters`]. Useful for ensuring that + /// the [`SpvClient`] and [`ChannelManager`] are initialized to the same block during a fresh + /// start. + /// + /// [`SpvClient`]: crate::SpvClient + /// [`ChainParameters`]: lightning::ln::channelmanager::ChainParameters + /// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager + /// [`ChannelManager::new`]: lightning::ln::channelmanager::ChannelManager::new + pub fn to_best_block(&self) -> BestBlock { + BestBlock::new(self.block_hash, self.inner.height) + } } /// A block with validated data against its transaction list and corresponding block hash. @@ -191,12 +197,12 @@ mod sealed { /// /// Other `Poll` implementations should be built using `ChainPoller` as it provides the simplest way /// of validating chain data and checking consistency. -pub struct ChainPoller + Sized + Send + Sync, T: BlockSource + ?Sized> { +pub struct ChainPoller + Sized + Send + Sync, T: BlockSource + ?Sized> { block_source: B, network: Network, } -impl + Sized + Send + Sync, T: BlockSource + ?Sized> ChainPoller { +impl + Sized + Send + Sync, T: BlockSource + ?Sized> ChainPoller { /// Creates a new poller for the given block source. /// /// If the `network` parameter is mainnet, then the difficulty between blocks is checked for @@ -206,19 +212,20 @@ impl + Sized + Send + Sync, T: BlockSource + ?Sized> ChainPol } } -impl + Sized + Send + Sync, T: BlockSource + ?Sized> Poll for ChainPoller { - fn poll_chain_tip<'a>(&'a self, best_known_chain_tip: ValidatedBlockHeader) -> - AsyncBlockSourceResult<'a, ChainTip> - { +impl + Sized + Send + Sync, T: BlockSource + ?Sized> Poll + for ChainPoller +{ + fn poll_chain_tip<'a>( + &'a self, best_known_chain_tip: ValidatedBlockHeader, + ) -> AsyncBlockSourceResult<'a, ChainTip> { Box::pin(async move { let (block_hash, height) = self.block_source.get_best_block().await?; if block_hash == best_known_chain_tip.header.block_hash() { return Ok(ChainTip::Common); } - let chain_tip = self.block_source - .get_header(&block_hash, height).await? - .validate(block_hash)?; + let chain_tip = + self.block_source.get_header(&block_hash, height).await?.validate(block_hash)?; if chain_tip.chainwork > best_known_chain_tip.chainwork { Ok(ChainTip::Better(chain_tip)) } else { @@ -227,9 +234,9 @@ impl + Sized + Send + Sync, T: BlockSource + ?Sized> Poll for }) } - fn look_up_previous_header<'a>(&'a self, header: &'a ValidatedBlockHeader) -> - AsyncBlockSourceResult<'a, ValidatedBlockHeader> - { + fn look_up_previous_header<'a>( + &'a self, header: &'a ValidatedBlockHeader, + ) -> AsyncBlockSourceResult<'a, ValidatedBlockHeader> { Box::pin(async move { if header.height == 0 { return Err(BlockSourceError::persistent("genesis block reached")); @@ -237,8 +244,10 @@ impl + Sized + Send + Sync, T: BlockSource + ?Sized> Poll for let previous_hash = &header.header.prev_blockhash; let height = header.height - 1; - let previous_header = self.block_source - .get_header(previous_hash, Some(height)).await? + let previous_header = self + .block_source + .get_header(previous_hash, Some(height)) + .await? .validate(*previous_hash)?; header.check_builds_on(&previous_header, self.network)?; @@ -246,22 +255,20 @@ impl + Sized + Send + Sync, T: BlockSource + ?Sized> Poll for }) } - fn fetch_block<'a>(&'a self, header: &'a ValidatedBlockHeader) -> - AsyncBlockSourceResult<'a, ValidatedBlock> - { + fn fetch_block<'a>( + &'a self, header: &'a ValidatedBlockHeader, + ) -> AsyncBlockSourceResult<'a, ValidatedBlock> { Box::pin(async move { - self.block_source - .get_block(&header.block_hash).await? - .validate(header.block_hash) + self.block_source.get_block(&header.block_hash).await?.validate(header.block_hash) }) } } #[cfg(test)] mod tests { - use crate::*; - use crate::test_utils::Blockchain; use super::*; + use crate::test_utils::Blockchain; + use crate::*; #[tokio::test] async fn poll_empty_chain() { @@ -307,7 +314,10 @@ mod tests { match poller.poll_chain_tip(best_known_chain_tip).await { Err(e) => { assert_eq!(e.kind(), BlockSourceErrorKind::Persistent); - assert_eq!(e.into_inner().as_ref().to_string(), "block target correct but not attained"); + assert_eq!( + e.into_inner().as_ref().to_string(), + "block target correct but not attained" + ); }, Ok(_) => panic!("Expected error"), } diff --git a/lightning-block-sync/src/rest.rs b/lightning-block-sync/src/rest.rs index 74a460a7ab5..9473f7b1b6a 100644 --- a/lightning-block-sync/src/rest.rs +++ b/lightning-block-sync/src/rest.rs @@ -1,13 +1,13 @@ //! Simple REST client implementation which implements [`BlockSource`] against a Bitcoin Core REST //! endpoint. -use crate::{BlockData, BlockHeaderData, BlockSource, AsyncBlockSourceResult}; -use crate::http::{BinaryResponse, HttpEndpoint, HttpClient, JsonResponse}; -use crate::gossip::UtxoSource; use crate::convert::GetUtxosResponse; +use crate::gossip::UtxoSource; +use crate::http::{BinaryResponse, HttpClient, HttpEndpoint, JsonResponse}; +use crate::{AsyncBlockSourceResult, BlockData, BlockHeaderData, BlockSource}; -use bitcoin::OutPoint; use bitcoin::hash_types::BlockHash; +use bitcoin::OutPoint; use std::convert::TryFrom; use std::convert::TryInto; @@ -29,11 +29,16 @@ impl RestClient { /// Requests a resource encoded in `F` format and interpreted as type `T`. pub async fn request_resource(&self, resource_path: &str) -> std::io::Result - where F: TryFrom, Error = std::io::Error> + TryInto { + where + F: TryFrom, Error = std::io::Error> + TryInto, + { let host = format!("{}:{}", self.endpoint.host(), self.endpoint.port()); let uri = format!("{}/{}", self.endpoint.path().trim_end_matches("/"), resource_path); - let mut client = if let Some(client) = self.client.lock().unwrap().take() { client } - else { HttpClient::connect(&self.endpoint)? }; + let mut client = if let Some(client) = self.client.lock().unwrap().take() { + client + } else { + HttpClient::connect(&self.endpoint)? + }; let res = client.get::(&uri, &host).await?.try_into(); *self.client.lock().unwrap() = Some(client); res @@ -41,29 +46,37 @@ impl RestClient { } impl BlockSource for RestClient { - fn get_header<'a>(&'a self, header_hash: &'a BlockHash, _height: Option) -> AsyncBlockSourceResult<'a, BlockHeaderData> { + fn get_header<'a>( + &'a self, header_hash: &'a BlockHash, _height: Option, + ) -> AsyncBlockSourceResult<'a, BlockHeaderData> { Box::pin(async move { let resource_path = format!("headers/1/{}.json", header_hash.to_string()); Ok(self.request_resource::(&resource_path).await?) }) } - fn get_block<'a>(&'a self, header_hash: &'a BlockHash) -> AsyncBlockSourceResult<'a, BlockData> { + fn get_block<'a>( + &'a self, header_hash: &'a BlockHash, + ) -> AsyncBlockSourceResult<'a, BlockData> { Box::pin(async move { let resource_path = format!("block/{}.bin", header_hash.to_string()); - Ok(BlockData::FullBlock(self.request_resource::(&resource_path).await?)) + Ok(BlockData::FullBlock( + self.request_resource::(&resource_path).await?, + )) }) } fn get_best_block<'a>(&'a self) -> AsyncBlockSourceResult<'a, (BlockHash, Option)> { - Box::pin(async move { - Ok(self.request_resource::("chaininfo.json").await?) - }) + Box::pin( + async move { Ok(self.request_resource::("chaininfo.json").await?) }, + ) } } impl UtxoSource for RestClient { - fn get_block_hash_by_height<'a>(&'a self, block_height: u32) -> AsyncBlockSourceResult<'a, BlockHash> { + fn get_block_hash_by_height<'a>( + &'a self, block_height: u32, + ) -> AsyncBlockSourceResult<'a, BlockHash> { Box::pin(async move { let resource_path = format!("blockhashbyheight/{}.bin", block_height); Ok(self.request_resource::(&resource_path).await?) @@ -72,7 +85,8 @@ impl UtxoSource for RestClient { fn is_output_unspent<'a>(&'a self, outpoint: OutPoint) -> AsyncBlockSourceResult<'a, bool> { Box::pin(async move { - let resource_path = format!("getutxos/{}-{}.json", outpoint.txid.to_string(), outpoint.vout); + let resource_path = + format!("getutxos/{}-{}.json", outpoint.txid.to_string(), outpoint.vout); let utxo_result = self.request_resource::(&resource_path).await?; Ok(utxo_result.hit_bitmap_nonempty) @@ -83,8 +97,8 @@ impl UtxoSource for RestClient { #[cfg(test)] mod tests { use super::*; - use crate::http::BinaryResponse; use crate::http::client_tests::{HttpServer, MessageBody}; + use crate::http::BinaryResponse; use bitcoin::hashes::Hash; /// Parses binary data as a string-encoded `u32`. @@ -97,7 +111,7 @@ mod tests { Ok(s) => match u32::from_str_radix(s, 10) { Err(e) => Err(std::io::Error::new(std::io::ErrorKind::InvalidData, e)), Ok(n) => Ok(n), - } + }, } } } @@ -140,7 +154,7 @@ mod tests { let server = HttpServer::responding_with_ok(MessageBody::Content( // A real response contains a few more fields, but we actually only look at the // "bitmap" field, so this should suffice for testing - "{\"chainHeight\": 1, \"bitmap\":\"0\",\"utxos\":[]}" + "{\"chainHeight\": 1, \"bitmap\":\"0\",\"utxos\":[]}", )); let client = RestClient::new(server.endpoint()).unwrap(); @@ -154,7 +168,7 @@ mod tests { let server = HttpServer::responding_with_ok(MessageBody::Content( // A real response contains lots more data, but we actually only look at the "bitmap" // field, so this should suffice for testing - "{\"chainHeight\": 1, \"bitmap\":\"1\",\"utxos\":[]}" + "{\"chainHeight\": 1, \"bitmap\":\"1\",\"utxos\":[]}", )); let client = RestClient::new(server.endpoint()).unwrap(); diff --git a/lightning-block-sync/src/rpc.rs b/lightning-block-sync/src/rpc.rs index d296088ae7e..8032d3fccec 100644 --- a/lightning-block-sync/src/rpc.rs +++ b/lightning-block-sync/src/rpc.rs @@ -1,9 +1,9 @@ //! Simple RPC client implementation which implements [`BlockSource`] against a Bitcoin Core RPC //! endpoint. -use crate::{BlockData, BlockHeaderData, BlockSource, AsyncBlockSourceResult}; -use crate::http::{HttpClient, HttpEndpoint, HttpError, JsonResponse}; use crate::gossip::UtxoSource; +use crate::http::{HttpClient, HttpEndpoint, HttpError, JsonResponse}; +use crate::{AsyncBlockSourceResult, BlockData, BlockHeaderData, BlockSource}; use bitcoin::hash_types::BlockHash; use bitcoin::OutPoint; @@ -28,9 +28,9 @@ pub struct RpcError { } impl fmt::Display for RpcError { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "RPC error {}: {}", self.code, self.message) - } + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "RPC error {}: {}", self.code, self.message) + } } impl Error for RpcError {} @@ -63,8 +63,12 @@ impl RpcClient { /// /// When an `Err` is returned, [`std::io::Error::into_inner`] may contain an [`RpcError`] if /// [`std::io::Error::kind`] is [`std::io::ErrorKind::Other`]. - pub async fn call_method(&self, method: &str, params: &[serde_json::Value]) -> std::io::Result - where JsonResponse: TryFrom, Error = std::io::Error> + TryInto { + pub async fn call_method( + &self, method: &str, params: &[serde_json::Value], + ) -> std::io::Result + where + JsonResponse: TryFrom, Error = std::io::Error> + TryInto, + { let host = format!("{}:{}", self.endpoint.host(), self.endpoint.port()); let uri = self.endpoint.path(); let content = serde_json::json!({ @@ -73,9 +77,13 @@ impl RpcClient { "id": &self.id.fetch_add(1, Ordering::AcqRel).to_string() }); - let mut client = if let Some(client) = self.client.lock().unwrap().take() { client } - else { HttpClient::connect(&self.endpoint)? }; - let http_response = client.post::(&uri, &host, &self.basic_auth, content).await; + let mut client = if let Some(client) = self.client.lock().unwrap().take() { + client + } else { + HttpClient::connect(&self.endpoint)? + }; + let http_response = + client.post::(&uri, &host, &self.basic_auth, content).await; *self.client.lock().unwrap() = Some(client); let mut response = match http_response { @@ -93,23 +101,30 @@ impl RpcClient { }; if !response.is_object() { - return Err(std::io::Error::new(std::io::ErrorKind::InvalidData, "expected JSON object")); + return Err(std::io::Error::new( + std::io::ErrorKind::InvalidData, + "expected JSON object", + )); } let error = &response["error"]; if !error.is_null() { // TODO: Examine error code for a more precise std::io::ErrorKind. - let rpc_error = RpcError { - code: error["code"].as_i64().unwrap_or(-1), - message: error["message"].as_str().unwrap_or("unknown error").to_string() + let rpc_error = RpcError { + code: error["code"].as_i64().unwrap_or(-1), + message: error["message"].as_str().unwrap_or("unknown error").to_string(), }; return Err(std::io::Error::new(std::io::ErrorKind::Other, rpc_error)); } let result = match response.get_mut("result") { Some(result) => result.take(), - None => - return Err(std::io::Error::new(std::io::ErrorKind::InvalidData, "expected JSON result")), + None => { + return Err(std::io::Error::new( + std::io::ErrorKind::InvalidData, + "expected JSON result", + )) + }, }; JsonResponse(result).try_into() @@ -117,14 +132,18 @@ impl RpcClient { } impl BlockSource for RpcClient { - fn get_header<'a>(&'a self, header_hash: &'a BlockHash, _height: Option) -> AsyncBlockSourceResult<'a, BlockHeaderData> { + fn get_header<'a>( + &'a self, header_hash: &'a BlockHash, _height: Option, + ) -> AsyncBlockSourceResult<'a, BlockHeaderData> { Box::pin(async move { let header_hash = serde_json::json!(header_hash.to_string()); Ok(self.call_method("getblockheader", &[header_hash]).await?) }) } - fn get_block<'a>(&'a self, header_hash: &'a BlockHash) -> AsyncBlockSourceResult<'a, BlockData> { + fn get_block<'a>( + &'a self, header_hash: &'a BlockHash, + ) -> AsyncBlockSourceResult<'a, BlockData> { Box::pin(async move { let header_hash = serde_json::json!(header_hash.to_string()); let verbosity = serde_json::json!(0); @@ -133,14 +152,14 @@ impl BlockSource for RpcClient { } fn get_best_block<'a>(&'a self) -> AsyncBlockSourceResult<'a, (BlockHash, Option)> { - Box::pin(async move { - Ok(self.call_method("getblockchaininfo", &[]).await?) - }) + Box::pin(async move { Ok(self.call_method("getblockchaininfo", &[]).await?) }) } } impl UtxoSource for RpcClient { - fn get_block_hash_by_height<'a>(&'a self, block_height: u32) -> AsyncBlockSourceResult<'a, BlockHash> { + fn get_block_hash_by_height<'a>( + &'a self, block_height: u32, + ) -> AsyncBlockSourceResult<'a, BlockHash> { Box::pin(async move { let height_param = serde_json::json!(block_height); Ok(self.call_method("getblockhash", &[height_param]).await?) @@ -152,8 +171,8 @@ impl UtxoSource for RpcClient { let txid_param = serde_json::json!(outpoint.txid.to_string()); let vout_param = serde_json::json!(outpoint.vout); let include_mempool = serde_json::json!(false); - let utxo_opt: serde_json::Value = self.call_method( - "gettxout", &[txid_param, vout_param, include_mempool]).await?; + let utxo_opt: serde_json::Value = + self.call_method("gettxout", &[txid_param, vout_param, include_mempool]).await?; Ok(!utxo_opt.is_null()) }) } @@ -229,7 +248,7 @@ mod tests { #[tokio::test] async fn call_method_returning_missing_result() { - let response = serde_json::json!({ }); + let response = serde_json::json!({}); let server = HttpServer::responding_with_ok(MessageBody::Content(response)); let client = RpcClient::new(CREDENTIALS, server.endpoint()).unwrap(); diff --git a/lightning-block-sync/src/test_utils.rs b/lightning-block-sync/src/test_utils.rs index 84c65f244f0..506c2183e6b 100644 --- a/lightning-block-sync/src/test_utils.rs +++ b/lightning-block-sync/src/test_utils.rs @@ -1,13 +1,16 @@ -use crate::{AsyncBlockSourceResult, BlockData, BlockHeaderData, BlockSource, BlockSourceError, UnboundedCache}; use crate::poll::{Validate, ValidatedBlockHeader}; +use crate::{ + AsyncBlockSourceResult, BlockData, BlockHeaderData, BlockSource, BlockSourceError, + UnboundedCache, +}; use bitcoin::blockdata::block::{Block, Header, Version}; use bitcoin::blockdata::constants::genesis_block; use bitcoin::blockdata::locktime::absolute::LockTime; use bitcoin::hash_types::{BlockHash, TxMerkleNode}; use bitcoin::network::Network; -use bitcoin::Transaction; use bitcoin::transaction; +use bitcoin::Transaction; use lightning::chain; @@ -48,7 +51,7 @@ impl Blockchain { version: transaction::Version(0), lock_time: LockTime::ZERO, input: vec![], - output: vec![] + output: vec![], }; let merkle_root = TxMerkleNode::from_raw_hash(coinbase.txid().to_raw_hash()); self.blocks.push(Block { @@ -135,7 +138,9 @@ impl Blockchain { } impl BlockSource for Blockchain { - fn get_header<'a>(&'a self, header_hash: &'a BlockHash, _height_hint: Option) -> AsyncBlockSourceResult<'a, BlockHeaderData> { + fn get_header<'a>( + &'a self, header_hash: &'a BlockHash, _height_hint: Option, + ) -> AsyncBlockSourceResult<'a, BlockHeaderData> { Box::pin(async move { if self.without_headers { return Err(BlockSourceError::persistent("header not found")); @@ -155,7 +160,9 @@ impl BlockSource for Blockchain { }) } - fn get_block<'a>(&'a self, header_hash: &'a BlockHash) -> AsyncBlockSourceResult<'a, BlockData> { + fn get_block<'a>( + &'a self, header_hash: &'a BlockHash, + ) -> AsyncBlockSourceResult<'a, BlockData> { Box::pin(async move { for (height, block) in self.blocks.iter().enumerate() { if block.header.block_hash() == *header_hash { @@ -192,7 +199,10 @@ impl BlockSource for Blockchain { pub struct NullChainListener; impl chain::Listen for NullChainListener { - fn filtered_block_connected(&self, _header: &Header, _txdata: &chain::transaction::TransactionData, _height: u32) {} + fn filtered_block_connected( + &self, _header: &Header, _txdata: &chain::transaction::TransactionData, _height: u32, + ) { + } fn block_disconnected(&self, _header: &Header, _height: u32) {} } @@ -240,7 +250,9 @@ impl chain::Listen for MockChainListener { } } - fn filtered_block_connected(&self, header: &Header, _txdata: &chain::transaction::TransactionData, height: u32) { + fn filtered_block_connected( + &self, header: &Header, _txdata: &chain::transaction::TransactionData, height: u32, + ) { match self.expected_filtered_blocks_connected.borrow_mut().pop_front() { None => { panic!("Unexpected filtered block connected: {:?}", header.block_hash()); diff --git a/lightning-block-sync/src/utils.rs b/lightning-block-sync/src/utils.rs index 98a720a2eb2..746dcb23394 100644 --- a/lightning-block-sync/src/utils.rs +++ b/lightning-block-sync/src/utils.rs @@ -32,13 +32,19 @@ mod tests { #[test] fn hex_to_work_odd_length_str() { let hex = String::from_utf8(vec![b'0'; 65]).unwrap(); - assert_eq!(hex_to_work(&hex), Err(HexToArrayError::Conversion(HexToBytesError::OddLengthString(65)))); + assert_eq!( + hex_to_work(&hex), + Err(HexToArrayError::Conversion(HexToBytesError::OddLengthString(65))) + ); } #[test] fn hex_to_work_invalid_char() { let hex = String::from_utf8(vec![b'G'; 64]).unwrap(); - assert_eq!(hex_to_work(&hex), Err(HexToArrayError::Conversion(HexToBytesError::InvalidChar(b'G')))); + assert_eq!( + hex_to_work(&hex), + Err(HexToArrayError::Conversion(HexToBytesError::InvalidChar(b'G'))) + ); } #[test] diff --git a/rustfmt_excluded_files b/rustfmt_excluded_files index 500330020b3..db6c393815e 100644 --- a/rustfmt_excluded_files +++ b/rustfmt_excluded_files @@ -1,15 +1,3 @@ -./lightning-background-processor/src/lib.rs -./lightning-block-sync/src/convert.rs -./lightning-block-sync/src/gossip.rs -./lightning-block-sync/src/http.rs -./lightning-block-sync/src/init.rs -./lightning-block-sync/src/lib.rs -./lightning-block-sync/src/poll.rs -./lightning-block-sync/src/rest.rs -./lightning-block-sync/src/rpc.rs -./lightning-block-sync/src/test_utils.rs -./lightning-block-sync/src/utils.rs -./lightning-custom-message/src/lib.rs ./lightning-invoice/fuzz/fuzz_targets/serde_data_part.rs ./lightning-invoice/src/de.rs ./lightning-invoice/src/lib.rs