Skip to content

Commit

Permalink
WIP: Rework block notification
Browse files Browse the repository at this point in the history
  • Loading branch information
Davidson-Souza committed Nov 9, 2023
1 parent 99bdc88 commit 42231ea
Show file tree
Hide file tree
Showing 6 changed files with 196 additions and 64 deletions.
49 changes: 35 additions & 14 deletions crates/floresta-chain/src/pruned_utreexo/chain_state.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,18 @@
extern crate alloc;
#[cfg(not(feature = "no-std"))]
extern crate std;

use super::{
chain_state_builder::ChainStateBuilder,
chainparams::ChainParams,
chainstore::{DiskBlockHeader, KvChainStore},
consensus::Consensus,
error::{BlockValidationErrors, BlockchainError},
BlockchainInterface, ChainStore, Notification, UpdatableChainstate,
BlockchainInterface, ChainStore, UpdatableChainstate,
};
use crate::prelude::*;
use crate::{read_lock, write_lock, Network};
use alloc::{borrow::ToOwned, fmt::format, string::ToString, vec::Vec};
use async_std::channel::Sender;
use alloc::{borrow::ToOwned, fmt::format, string::ToString, sync::Arc, vec::Vec};
#[cfg(feature = "bitcoinconsensus")]
use bitcoin::bitcoinconsensus;

Expand All @@ -21,13 +23,30 @@ use bitcoin::{
util::uint::Uint256,
Block, BlockHash, BlockHeader, OutPoint, Transaction, TxOut,
};
use floresta_common::Channel;
#[cfg(feature = "bitcoinconsensus")]
use core::ffi::c_uint;
use futures::executor::block_on;
use log::{info, trace};
use rustreexo::accumulator::{node_hash::NodeHash, proof::Proof, stump::Stump};
use spin::RwLock;

pub trait BlockConsumer: Sync + Send + 'static {
fn consume_block(&self, block: &Block, height: u32);
}

impl BlockConsumer for Channel<(Block, u32)> {
fn consume_block(&self, block: &Block, height: u32) {
self.send((block.to_owned(), height));
}
}

#[cfg(not(feature = "no-std"))]
impl BlockConsumer for std::sync::mpsc::Sender<(Block, u32)> {
fn consume_block(&self, block: &Block, height: u32) {
let _ = self.send((block.to_owned(), height));
}
}

pub struct ChainStateInner<PersistedState: ChainStore> {
/// The acc we use for validation.
acc: Stump,
Expand All @@ -39,10 +58,13 @@ pub struct ChainStateInner<PersistedState: ChainStore> {
/// writen to broadcast_queue, and the ChainStateBackend can use it's own logic to actually
/// broadcast the tx.
broadcast_queue: Vec<Transaction>,
/// We may have more than one consumer, that access our data through [BlockchainInterface],
/// they might need to be notified about new data coming in, like blocks. They do so by calling
/// `subscribe` and passing a [async_std::channel::Sender]. We save all Senders here.
subscribers: Vec<Sender<Notification>>,
/// We may have multiple mudules that needs to receive and process blocks as they come, to
/// be notified of new blocks, a module should implement the [BlockConsumer] trait, and
/// subscribe by passing an [Arc] of itself to chainstate.
/// When a new block is accepted (as valid) we call `consume_block` from [BlockConsumer].
/// If a mudule just wants pass in a channel, [Sender] implements [BlockConsumer], and can
/// be used during subscription (just keep the [Receiver] side.
subscribers: Vec<Arc<dyn BlockConsumer>>,
/// Fee estimation for 1, 10 and 20 blocks
fee_estimation: (f64, f64, f64),
/// Are we in Initial Block Download?
Expand Down Expand Up @@ -418,12 +440,11 @@ impl<PersistedState: ChainStore> ChainState<PersistedState> {
Ok(())
}
#[allow(clippy::await_holding_lock)]
async fn notify(&self, what: Notification) {
//TODO: Use async-std::RwLock not std::RwLock
fn notify(&self, block: &Block, height: u32) {
let inner = self.inner.read();
let subs = inner.subscribers.iter();
for client in subs {
let _ = client.send(what.clone()).await;
client.consume_block(block, height);
}
}

Expand Down Expand Up @@ -763,7 +784,7 @@ impl<PersistedState: ChainStore> BlockchainInterface for ChainState<PersistedSta
inner.best_block.rescan_index = Some(start_height);
Ok(())
}
fn subscribe(&self, tx: Sender<Notification>) {
fn subscribe(&self, tx: Arc<dyn BlockConsumer>) {
let mut inner = self.inner.write();
inner.subscribers.push(tx);
}
Expand Down Expand Up @@ -846,7 +867,7 @@ impl<PersistedState: ChainStore> UpdatableChainstate for ChainState<PersistedSta
fn process_rescan_block(&self, block: &Block) -> Result<(), BlockchainError> {
let header = self.get_disk_block_header(&block.block_hash())?;
let height = header.height().expect("Recaning in an invalid tip");
block_on(self.notify(Notification::NewBlock((block.to_owned(), height))));
self.notify(block, height);
if self.get_height().unwrap() == height {
info!("Rescan completed");
write_lock!(self).best_block.rescan_index = None;
Expand Down Expand Up @@ -901,7 +922,7 @@ impl<PersistedState: ChainStore> UpdatableChainstate for ChainState<PersistedSta
self.save_acc()?;

// Notify others we have a new block
block_on(self.notify(Notification::NewBlock((block.to_owned(), height))));
self.notify(block, height);
Ok(height)
}

Expand Down
12 changes: 9 additions & 3 deletions crates/floresta-chain/src/pruned_utreexo/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
extern crate alloc;

pub mod chain_state;
pub mod chain_state_builder;
pub mod chainparams;
Expand All @@ -8,8 +9,10 @@ pub mod error;
pub mod partial_chain;
pub mod udata;

use crate::{prelude::*, BestChain, BlockchainError, DatabaseError, DiskBlockHeader};
use async_std::channel::Sender;
use crate::{
prelude::*, BestChain, BlockConsumer, BlockchainError, DatabaseError, DiskBlockHeader,
};
use alloc::sync::Arc;
use bitcoin::{hashes::sha256, Block, BlockHash, BlockHeader, OutPoint, Transaction, TxOut};
use rustreexo::accumulator::{node_hash::NodeHash, proof::Proof};

Expand All @@ -35,7 +38,10 @@ pub trait BlockchainInterface {
fn get_block_header(&self, hash: &BlockHash) -> Result<BlockHeader, Self::Error>;
/// Register for receiving notifications for some event. Right now it only works for
/// new blocks, but may work with transactions in the future too.
fn subscribe(&self, tx: Sender<Notification>);
/// if a module performs some heavy-lifting on the block's data, it should pass in a
/// vector or a channel where data can be transfered to the atual worker, otherwise
/// chainstate will be stuck for as long as you have work to do.
fn subscribe(&self, tx: Arc<dyn BlockConsumer>);
/// Tells whether or not we are on ibd
fn is_in_idb(&self) -> bool;
/// Returns the list of unbroadcasted transactions.
Expand Down
1 change: 1 addition & 0 deletions crates/floresta-common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ readme = "README.md"
sha2 = "^0.10.6"
bitcoin = "0.29.2"
miniscript = { git = "https://github.com/douglaz/rust-miniscript.git", optional = true, branch = "master-2023-03-30" }
spin = "0.9.8"

[features]
default = ["descriptors"]
Expand Down
4 changes: 4 additions & 0 deletions crates/floresta-common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ use prelude::*;

use sha2::Digest;
pub mod constants;
pub mod spsc;

pub use spsc::Channel;

pub fn get_hash_from_u8(data: &[u8]) -> sha256::Hash {
let hash = sha2::Sha256::new().chain_update(data).finalize();
Expand All @@ -19,6 +22,7 @@ pub fn get_spk_hash(spk: &Script) -> sha256::Hash {
hash.reverse();
sha256::Hash::from_slice(hash.as_slice()).expect("Engines shouldn't be Err")
}

#[cfg(feature = "descriptors")]
pub fn parse_descriptors(
descriptors: &[String],
Expand Down
101 changes: 101 additions & 0 deletions crates/floresta-common/src/spsc.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
//! A no-std Single Producer, Single Consumer channel for unidirectional message exchange between
//! modules. This module don't use anything from the standard lib and can be easily used in no-std
//! enviroments. We only use mem::take from [core].
use core::mem::take;
use crate::prelude::Vec;

/// A (Send + Sync) single producer, single consumer channel to notify modules about things.
/// The api is super minimalistic to reduce external dependecies, including from the std-lib
///
/// One notable difference from the standard mspc channel is that this channel's ends are't
/// two different types, while this is possible, there's no reason to do that. Specially
/// considering that to get a good compile-time asurance that both ends will not be shared, the
/// channel must not be [Send], but this is one of the main requirements to use this channel in
/// async code. Moreover, if two worker threads are meant to be identical threads balancing their
/// work, it might be beneficial to use this same channel as a de-facto single producer, multiple
/// consumer channel for work distribution.
/// # Example
/// ```
/// use floresta_common::spsc;
/// let channel = spsc::Channel::new();
///
/// // Send something
/// channel.send(1);
/// // Read the same thing back
/// assert_eq!(channel.recv().next(), Some(1));
/// ```
pub struct Channel<T> {
/// The data pending for read
content: spin::Mutex<Vec<T>>,
}

impl<T> Channel<T> {
/// Creates a new channel
///
/// # Example
/// ```
/// use floresta_common::spsc;
/// let channel = block_stream::Channel::new();
///
/// channel.send(1);
/// assert_eq!(channel.recv().next(), Some(1));
/// ```
pub fn new() -> Self {
Channel {
content: spin::Mutex::new(Vec::new()),
}
}
/// Sends some data through a channel
///
/// # Example
/// ```
/// use floresta_common::spsc;
/// let channel = block_stream::Channel::new();
///
/// channel.send(1);
/// assert_eq!(channel.recv().next(), Some(1));
/// ```
pub fn send(&self, data: T) {
self.content.lock().push(data);
}
/// Reads from a channel
///
/// This method returns an iterator over all alements inside a [Channel]
pub fn recv(&self) -> RecvIter<T> {
let inner = take(&mut *self.content.lock());
RecvIter { inner }
}
}

/// An iterator issued every time someone calls `recv`.
///
/// This iterator takes all itens available for reading in a channel
/// and lets the consumer iterate over them, without acquiring the lock
/// every time (the mutex is only locked when `recv` is called).
///
/// # Example
/// ```
/// use floresta_common::spsc;
/// let channel = spsc::Channel::new();
///
/// channel.send(0);
/// channel.send(1);
///
/// for (i, el) in channel.recv().enumerate() {
/// assert_eq!(i, el);
/// }
/// // A second read should create an empty iterator
/// assert_eq!(channel.recv().next(), None);
/// ```
pub struct RecvIter<T> {
inner: Vec<T>,
}

impl<T> Iterator for RecvIter<T> {
type Item = T;
fn next(&mut self) -> Option<Self::Item> {
self.inner.pop()
}
}
93 changes: 46 additions & 47 deletions crates/floresta-electrum/src/electrum_protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,10 @@ use crate::request::Request;
use crate::{get_arg, json_rpc_res};
use bitcoin::hashes::hex::FromHex;
use floresta_chain::pruned_utreexo::BlockchainInterface;
use floresta_chain::Notification;
use floresta_common::spsc::Channel;
use floresta_common::{get_hash_from_u8, get_spk_hash};
use floresta_watch_only::kv_database::KvDatabase;
use floresta_watch_only::{AddressCache, CachedTransaction};
use futures::{select, FutureExt};

use async_std::sync::RwLock;
use async_std::{
Expand Down Expand Up @@ -349,57 +348,57 @@ impl<Blockchain: BlockchainInterface> ElectrumServer<Blockchain> {
}

pub async fn main_loop(mut self) -> Result<(), crate::error::Error> {
let (tx, mut rx) = unbounded::<Notification>();
self.chain.subscribe(tx);
let blocks = Channel::new();
let blocks = Arc::new(blocks);

self.chain.subscribe(blocks.clone());

loop {
select! {
notification = rx.next().fuse() => {
if let Some(notification) = notification {
self.handle_notification(notification).await;
}
},
message = self.peer_accept.next().fuse() => {
if let Some(message) = message {
self.handle_message(message).await?;
}
}
};
for (block, height) in blocks.recv() {
self.handle_block(block, height).await;
}
let request = async_std::future::timeout(
std::time::Duration::from_secs(1),
self.peer_accept.recv(),
)
.await;

if let Ok(Ok(message)) = request {
self.handle_message(message).await?;
}
}
}
async fn handle_notification(&mut self, notification: Notification) {
match notification {
Notification::NewBlock((block, height)) => {
let result = json!({
"jsonrpc": "2.0",
"method": "blockchain.headers.subscribe",
"params": [{
"height": height,
"hex": serialize(&block.header).to_hex()
}]
});
if !self.chain.is_in_idb() || height % 1000 == 0 {
let lock = self.address_cache.write().await;
lock.bump_height(height);
}
if self.chain.get_height().unwrap() == height {
for peer in &mut self.peers.values() {
let res = peer
.write(serde_json::to_string(&result).unwrap().as_bytes())
.await;
if res.is_err() {
info!("Could not write to peer {:?}", peer);
}
}
}
let transactions = self
.address_cache
.write()
.await
.block_process(&block, height);

self.wallet_notify(&transactions).await;
async fn handle_block(&mut self, block: bitcoin::Block, height: u32) {
let result = json!({
"jsonrpc": "2.0",
"method": "blockchain.headers.subscribe",
"params": [{
"height": height,
"hex": serialize(&block.header).to_hex()
}]
});
if !self.chain.is_in_idb() || height % 1000 == 0 {
let lock = self.address_cache.write().await;
lock.bump_height(height);
}
if self.chain.get_height().unwrap() == height {
for peer in &mut self.peers.values() {
let res = peer
.write(serde_json::to_string(&result).unwrap().as_bytes())
.await;
if res.is_err() {
info!("Could not write to peer {:?}", peer);
}
}
}
let transactions = self
.address_cache
.write()
.await
.block_process(&block, height);

self.wallet_notify(&transactions).await;
}
async fn handle_message(&mut self, message: Message) -> Result<(), crate::error::Error> {
match message {
Expand Down

0 comments on commit 42231ea

Please sign in to comment.