diff --git a/crates/floresta-chain/src/pruned_utreexo/chain_state.rs b/crates/floresta-chain/src/pruned_utreexo/chain_state.rs index 481fb993..61bac329 100644 --- a/crates/floresta-chain/src/pruned_utreexo/chain_state.rs +++ b/crates/floresta-chain/src/pruned_utreexo/chain_state.rs @@ -1,16 +1,18 @@ extern crate alloc; +#[cfg(not(feature = "no-std"))] +extern crate std; + use super::{ chain_state_builder::ChainStateBuilder, chainparams::ChainParams, chainstore::{DiskBlockHeader, KvChainStore}, consensus::Consensus, error::{BlockValidationErrors, BlockchainError}, - BlockchainInterface, ChainStore, Notification, UpdatableChainstate, + BlockchainInterface, ChainStore, UpdatableChainstate, }; use crate::prelude::*; use crate::{read_lock, write_lock, Network}; -use alloc::{borrow::ToOwned, fmt::format, string::ToString, vec::Vec}; -use async_std::channel::Sender; +use alloc::{borrow::ToOwned, fmt::format, string::ToString, sync::Arc, vec::Vec}; #[cfg(feature = "bitcoinconsensus")] use bitcoin::bitcoinconsensus; @@ -21,13 +23,30 @@ use bitcoin::{ util::uint::Uint256, Block, BlockHash, BlockHeader, OutPoint, Transaction, TxOut, }; +use floresta_common::Channel; #[cfg(feature = "bitcoinconsensus")] use core::ffi::c_uint; -use futures::executor::block_on; use log::{info, trace}; use rustreexo::accumulator::{node_hash::NodeHash, proof::Proof, stump::Stump}; use spin::RwLock; +pub trait BlockConsumer: Sync + Send + 'static { + fn consume_block(&self, block: &Block, height: u32); +} + +impl BlockConsumer for Channel<(Block, u32)> { + fn consume_block(&self, block: &Block, height: u32) { + self.send((block.to_owned(), height)); + } +} + +#[cfg(not(feature = "no-std"))] +impl BlockConsumer for std::sync::mpsc::Sender<(Block, u32)> { + fn consume_block(&self, block: &Block, height: u32) { + let _ = self.send((block.to_owned(), height)); + } +} + pub struct ChainStateInner { /// The acc we use for validation. acc: Stump, @@ -39,10 +58,13 @@ pub struct ChainStateInner { /// writen to broadcast_queue, and the ChainStateBackend can use it's own logic to actually /// broadcast the tx. broadcast_queue: Vec, - /// We may have more than one consumer, that access our data through [BlockchainInterface], - /// they might need to be notified about new data coming in, like blocks. They do so by calling - /// `subscribe` and passing a [async_std::channel::Sender]. We save all Senders here. - subscribers: Vec>, + /// We may have multiple mudules that needs to receive and process blocks as they come, to + /// be notified of new blocks, a module should implement the [BlockConsumer] trait, and + /// subscribe by passing an [Arc] of itself to chainstate. + /// When a new block is accepted (as valid) we call `consume_block` from [BlockConsumer]. + /// If a mudule just wants pass in a channel, [Sender] implements [BlockConsumer], and can + /// be used during subscription (just keep the [Receiver] side. + subscribers: Vec>, /// Fee estimation for 1, 10 and 20 blocks fee_estimation: (f64, f64, f64), /// Are we in Initial Block Download? @@ -418,12 +440,11 @@ impl ChainState { Ok(()) } #[allow(clippy::await_holding_lock)] - async fn notify(&self, what: Notification) { - //TODO: Use async-std::RwLock not std::RwLock + fn notify(&self, block: &Block, height: u32) { let inner = self.inner.read(); let subs = inner.subscribers.iter(); for client in subs { - let _ = client.send(what.clone()).await; + client.consume_block(block, height); } } @@ -763,7 +784,7 @@ impl BlockchainInterface for ChainState) { + fn subscribe(&self, tx: Arc) { let mut inner = self.inner.write(); inner.subscribers.push(tx); } @@ -846,7 +867,7 @@ impl UpdatableChainstate for ChainState Result<(), BlockchainError> { let header = self.get_disk_block_header(&block.block_hash())?; let height = header.height().expect("Recaning in an invalid tip"); - block_on(self.notify(Notification::NewBlock((block.to_owned(), height)))); + self.notify(block, height); if self.get_height().unwrap() == height { info!("Rescan completed"); write_lock!(self).best_block.rescan_index = None; @@ -901,7 +922,7 @@ impl UpdatableChainstate for ChainState Result; /// Register for receiving notifications for some event. Right now it only works for /// new blocks, but may work with transactions in the future too. - fn subscribe(&self, tx: Sender); + /// if a module performs some heavy-lifting on the block's data, it should pass in a + /// vector or a channel where data can be transfered to the atual worker, otherwise + /// chainstate will be stuck for as long as you have work to do. + fn subscribe(&self, tx: Arc); /// Tells whether or not we are on ibd fn is_in_idb(&self) -> bool; /// Returns the list of unbroadcasted transactions. diff --git a/crates/floresta-common/Cargo.toml b/crates/floresta-common/Cargo.toml index 9c6531bf..5d8c5bb5 100644 --- a/crates/floresta-common/Cargo.toml +++ b/crates/floresta-common/Cargo.toml @@ -13,6 +13,7 @@ readme = "README.md" sha2 = "^0.10.6" bitcoin = "0.29.2" miniscript = { git = "https://github.com/douglaz/rust-miniscript.git", optional = true, branch = "master-2023-03-30" } +spin = "0.9.8" [features] default = ["descriptors"] diff --git a/crates/floresta-common/src/lib.rs b/crates/floresta-common/src/lib.rs index 7ffa11f0..0ea4ce27 100644 --- a/crates/floresta-common/src/lib.rs +++ b/crates/floresta-common/src/lib.rs @@ -7,6 +7,9 @@ use prelude::*; use sha2::Digest; pub mod constants; +pub mod spsc; + +pub use spsc::Channel; pub fn get_hash_from_u8(data: &[u8]) -> sha256::Hash { let hash = sha2::Sha256::new().chain_update(data).finalize(); @@ -19,6 +22,7 @@ pub fn get_spk_hash(spk: &Script) -> sha256::Hash { hash.reverse(); sha256::Hash::from_slice(hash.as_slice()).expect("Engines shouldn't be Err") } + #[cfg(feature = "descriptors")] pub fn parse_descriptors( descriptors: &[String], diff --git a/crates/floresta-common/src/spsc.rs b/crates/floresta-common/src/spsc.rs new file mode 100644 index 00000000..650de03f --- /dev/null +++ b/crates/floresta-common/src/spsc.rs @@ -0,0 +1,101 @@ +//! A no-std Single Producer, Single Consumer channel for unidirectional message exchange between +//! modules. This module don't use anything from the standard lib and can be easily used in no-std +//! enviroments. We only use mem::take from [core]. + +use core::mem::take; +use crate::prelude::Vec; + +/// A (Send + Sync) single producer, single consumer channel to notify modules about things. +/// The api is super minimalistic to reduce external dependecies, including from the std-lib +/// +/// One notable difference from the standard mspc channel is that this channel's ends are't +/// two different types, while this is possible, there's no reason to do that. Specially +/// considering that to get a good compile-time asurance that both ends will not be shared, the +/// channel must not be [Send], but this is one of the main requirements to use this channel in +/// async code. Moreover, if two worker threads are meant to be identical threads balancing their +/// work, it might be beneficial to use this same channel as a de-facto single producer, multiple +/// consumer channel for work distribution. +/// # Example +/// ``` +/// use floresta_common::spsc; +/// let channel = spsc::Channel::new(); +/// +/// // Send something +/// channel.send(1); +/// // Read the same thing back +/// assert_eq!(channel.recv().next(), Some(1)); +/// ``` + +pub struct Channel { + /// The data pending for read + content: spin::Mutex>, +} + +impl Channel { + /// Creates a new channel + /// + /// # Example + /// ``` + /// use floresta_common::spsc; + /// let channel = block_stream::Channel::new(); + /// + /// channel.send(1); + /// assert_eq!(channel.recv().next(), Some(1)); + /// ``` + pub fn new() -> Self { + Channel { + content: spin::Mutex::new(Vec::new()), + } + } + /// Sends some data through a channel + /// + /// # Example + /// ``` + /// use floresta_common::spsc; + /// let channel = block_stream::Channel::new(); + /// + /// channel.send(1); + /// assert_eq!(channel.recv().next(), Some(1)); + /// ``` + pub fn send(&self, data: T) { + self.content.lock().push(data); + } + /// Reads from a channel + /// + /// This method returns an iterator over all alements inside a [Channel] + pub fn recv(&self) -> RecvIter { + let inner = take(&mut *self.content.lock()); + RecvIter { inner } + } +} + +/// An iterator issued every time someone calls `recv`. +/// +/// This iterator takes all itens available for reading in a channel +/// and lets the consumer iterate over them, without acquiring the lock +/// every time (the mutex is only locked when `recv` is called). +/// +/// # Example +/// ``` +/// use floresta_common::spsc; +/// let channel = spsc::Channel::new(); +/// +/// channel.send(0); +/// channel.send(1); +/// +/// for (i, el) in channel.recv().enumerate() { +/// assert_eq!(i, el); +/// } +/// // A second read should create an empty iterator +/// assert_eq!(channel.recv().next(), None); +/// ``` +pub struct RecvIter { + inner: Vec, +} + +impl Iterator for RecvIter { + type Item = T; + fn next(&mut self) -> Option { + self.inner.pop() + } +} diff --git a/crates/floresta-electrum/src/electrum_protocol.rs b/crates/floresta-electrum/src/electrum_protocol.rs index dea8e6b6..101368d5 100644 --- a/crates/floresta-electrum/src/electrum_protocol.rs +++ b/crates/floresta-electrum/src/electrum_protocol.rs @@ -2,11 +2,10 @@ use crate::request::Request; use crate::{get_arg, json_rpc_res}; use bitcoin::hashes::hex::FromHex; use floresta_chain::pruned_utreexo::BlockchainInterface; -use floresta_chain::Notification; +use floresta_common::spsc::Channel; use floresta_common::{get_hash_from_u8, get_spk_hash}; use floresta_watch_only::kv_database::KvDatabase; use floresta_watch_only::{AddressCache, CachedTransaction}; -use futures::{select, FutureExt}; use async_std::sync::RwLock; use async_std::{ @@ -349,57 +348,57 @@ impl ElectrumServer { } pub async fn main_loop(mut self) -> Result<(), crate::error::Error> { - let (tx, mut rx) = unbounded::(); - self.chain.subscribe(tx); + let blocks = Channel::new(); + let blocks = Arc::new(blocks); + + self.chain.subscribe(blocks.clone()); + loop { - select! { - notification = rx.next().fuse() => { - if let Some(notification) = notification { - self.handle_notification(notification).await; - } - }, - message = self.peer_accept.next().fuse() => { - if let Some(message) = message { - self.handle_message(message).await?; - } - } - }; + for (block, height) in blocks.recv() { + self.handle_block(block, height).await; + } + let request = async_std::future::timeout( + std::time::Duration::from_secs(1), + self.peer_accept.recv(), + ) + .await; + + if let Ok(Ok(message)) = request { + self.handle_message(message).await?; + } } } - async fn handle_notification(&mut self, notification: Notification) { - match notification { - Notification::NewBlock((block, height)) => { - let result = json!({ - "jsonrpc": "2.0", - "method": "blockchain.headers.subscribe", - "params": [{ - "height": height, - "hex": serialize(&block.header).to_hex() - }] - }); - if !self.chain.is_in_idb() || height % 1000 == 0 { - let lock = self.address_cache.write().await; - lock.bump_height(height); - } - if self.chain.get_height().unwrap() == height { - for peer in &mut self.peers.values() { - let res = peer - .write(serde_json::to_string(&result).unwrap().as_bytes()) - .await; - if res.is_err() { - info!("Could not write to peer {:?}", peer); - } - } - } - let transactions = self - .address_cache - .write() - .await - .block_process(&block, height); - self.wallet_notify(&transactions).await; + async fn handle_block(&mut self, block: bitcoin::Block, height: u32) { + let result = json!({ + "jsonrpc": "2.0", + "method": "blockchain.headers.subscribe", + "params": [{ + "height": height, + "hex": serialize(&block.header).to_hex() + }] + }); + if !self.chain.is_in_idb() || height % 1000 == 0 { + let lock = self.address_cache.write().await; + lock.bump_height(height); + } + if self.chain.get_height().unwrap() == height { + for peer in &mut self.peers.values() { + let res = peer + .write(serde_json::to_string(&result).unwrap().as_bytes()) + .await; + if res.is_err() { + info!("Could not write to peer {:?}", peer); + } } } + let transactions = self + .address_cache + .write() + .await + .block_process(&block, height); + + self.wallet_notify(&transactions).await; } async fn handle_message(&mut self, message: Message) -> Result<(), crate::error::Error> { match message {