Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Rework block notification #92

Merged
merged 1 commit into from
Nov 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

49 changes: 35 additions & 14 deletions crates/floresta-chain/src/pruned_utreexo/chain_state.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,18 @@
extern crate alloc;
#[cfg(not(feature = "no-std"))]
extern crate std;

use super::{
chain_state_builder::ChainStateBuilder,
chainparams::ChainParams,
chainstore::{DiskBlockHeader, KvChainStore},
consensus::Consensus,
error::{BlockValidationErrors, BlockchainError},
BlockchainInterface, ChainStore, Notification, UpdatableChainstate,
BlockchainInterface, ChainStore, UpdatableChainstate,
};
use crate::prelude::*;
use crate::{read_lock, write_lock, Network};
use alloc::{borrow::ToOwned, fmt::format, string::ToString, vec::Vec};
use async_std::channel::Sender;
use alloc::{borrow::ToOwned, fmt::format, string::ToString, sync::Arc, vec::Vec};
#[cfg(feature = "bitcoinconsensus")]
use bitcoin::bitcoinconsensus;

Expand All @@ -23,11 +25,28 @@ use bitcoin::{
};
#[cfg(feature = "bitcoinconsensus")]
use core::ffi::c_uint;
use futures::executor::block_on;
use floresta_common::Channel;
use log::{info, trace};
use rustreexo::accumulator::{node_hash::NodeHash, proof::Proof, stump::Stump};
use spin::RwLock;

pub trait BlockConsumer: Sync + Send + 'static {
fn consume_block(&self, block: &Block, height: u32);
}

impl BlockConsumer for Channel<(Block, u32)> {
fn consume_block(&self, block: &Block, height: u32) {
self.send((block.to_owned(), height));
}
}

#[cfg(not(feature = "no-std"))]
impl BlockConsumer for std::sync::mpsc::Sender<(Block, u32)> {
fn consume_block(&self, block: &Block, height: u32) {
let _ = self.send((block.to_owned(), height));
}
}

pub struct ChainStateInner<PersistedState: ChainStore> {
/// The acc we use for validation.
acc: Stump,
Expand All @@ -39,10 +58,13 @@ pub struct ChainStateInner<PersistedState: ChainStore> {
/// writen to broadcast_queue, and the ChainStateBackend can use it's own logic to actually
/// broadcast the tx.
broadcast_queue: Vec<Transaction>,
/// We may have more than one consumer, that access our data through [BlockchainInterface],
/// they might need to be notified about new data coming in, like blocks. They do so by calling
/// `subscribe` and passing a [async_std::channel::Sender]. We save all Senders here.
subscribers: Vec<Sender<Notification>>,
/// We may have multiple mudules that needs to receive and process blocks as they come, to
/// be notified of new blocks, a module should implement the [BlockConsumer] trait, and
/// subscribe by passing an [Arc] of itself to chainstate.
/// When a new block is accepted (as valid) we call `consume_block` from [BlockConsumer].
/// If a mudule just wants pass in a channel, [Sender] implements [BlockConsumer], and can
/// be used during subscription (just keep the [Receiver] side.
subscribers: Vec<Arc<dyn BlockConsumer>>,
/// Fee estimation for 1, 10 and 20 blocks
fee_estimation: (f64, f64, f64),
/// Are we in Initial Block Download?
Expand Down Expand Up @@ -418,12 +440,11 @@ impl<PersistedState: ChainStore> ChainState<PersistedState> {
Ok(())
}
#[allow(clippy::await_holding_lock)]
async fn notify(&self, what: Notification) {
//TODO: Use async-std::RwLock not std::RwLock
fn notify(&self, block: &Block, height: u32) {
let inner = self.inner.read();
let subs = inner.subscribers.iter();
for client in subs {
let _ = client.send(what.clone()).await;
client.consume_block(block, height);
}
}

Expand Down Expand Up @@ -763,7 +784,7 @@ impl<PersistedState: ChainStore> BlockchainInterface for ChainState<PersistedSta
inner.best_block.rescan_index = Some(start_height);
Ok(())
}
fn subscribe(&self, tx: Sender<Notification>) {
fn subscribe(&self, tx: Arc<dyn BlockConsumer>) {
let mut inner = self.inner.write();
inner.subscribers.push(tx);
}
Expand Down Expand Up @@ -846,7 +867,7 @@ impl<PersistedState: ChainStore> UpdatableChainstate for ChainState<PersistedSta
fn process_rescan_block(&self, block: &Block) -> Result<(), BlockchainError> {
let header = self.get_disk_block_header(&block.block_hash())?;
let height = header.height().expect("Recaning in an invalid tip");
block_on(self.notify(Notification::NewBlock((block.to_owned(), height))));
self.notify(block, height);
if self.get_height().unwrap() == height {
info!("Rescan completed");
write_lock!(self).best_block.rescan_index = None;
Expand Down Expand Up @@ -901,7 +922,7 @@ impl<PersistedState: ChainStore> UpdatableChainstate for ChainState<PersistedSta
self.save_acc()?;

// Notify others we have a new block
block_on(self.notify(Notification::NewBlock((block.to_owned(), height))));
self.notify(block, height);
Ok(height)
}

Expand Down
12 changes: 9 additions & 3 deletions crates/floresta-chain/src/pruned_utreexo/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
extern crate alloc;

pub mod chain_state;
pub mod chain_state_builder;
pub mod chainparams;
Expand All @@ -8,8 +9,10 @@ pub mod error;
pub mod partial_chain;
pub mod udata;

use crate::{prelude::*, BestChain, BlockchainError, DatabaseError, DiskBlockHeader};
use async_std::channel::Sender;
use crate::{
prelude::*, BestChain, BlockConsumer, BlockchainError, DatabaseError, DiskBlockHeader,
};
use alloc::sync::Arc;
use bitcoin::{hashes::sha256, Block, BlockHash, BlockHeader, OutPoint, Transaction, TxOut};
use rustreexo::accumulator::{node_hash::NodeHash, proof::Proof};

Expand All @@ -35,7 +38,10 @@ pub trait BlockchainInterface {
fn get_block_header(&self, hash: &BlockHash) -> Result<BlockHeader, Self::Error>;
/// Register for receiving notifications for some event. Right now it only works for
/// new blocks, but may work with transactions in the future too.
fn subscribe(&self, tx: Sender<Notification>);
/// if a module performs some heavy-lifting on the block's data, it should pass in a
/// vector or a channel where data can be transfered to the atual worker, otherwise
/// chainstate will be stuck for as long as you have work to do.
fn subscribe(&self, tx: Arc<dyn BlockConsumer>);
/// Tells whether or not we are on ibd
fn is_in_idb(&self) -> bool;
/// Returns the list of unbroadcasted transactions.
Expand Down
1 change: 1 addition & 0 deletions crates/floresta-common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ readme = "README.md"
sha2 = "^0.10.6"
bitcoin = "0.29.2"
miniscript = { git = "https://github.com/douglaz/rust-miniscript.git", optional = true, branch = "master-2023-03-30" }
spin = "0.9.8"

[features]
default = ["descriptors"]
Expand Down
4 changes: 4 additions & 0 deletions crates/floresta-common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ use prelude::*;

use sha2::Digest;
pub mod constants;
pub mod spsc;

pub use spsc::Channel;

pub fn get_hash_from_u8(data: &[u8]) -> sha256::Hash {
let hash = sha2::Sha256::new().chain_update(data).finalize();
Expand All @@ -19,6 +22,7 @@ pub fn get_spk_hash(spk: &Script) -> sha256::Hash {
hash.reverse();
sha256::Hash::from_slice(hash.as_slice()).expect("Engines shouldn't be Err")
}

#[cfg(feature = "descriptors")]
pub fn parse_descriptors(
descriptors: &[String],
Expand Down
104 changes: 104 additions & 0 deletions crates/floresta-common/src/spsc.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
//! A no-std Single Producer, Single Consumer channel for unidirectional message exchange between
//! modules. This module don't use anything from the standard lib and can be easily used in no-std
//! enviroments. We only use mem::take from [core].

use crate::prelude::Vec;
use core::mem::take;

/// A (Send + Sync) single producer, single consumer channel to notify modules about things.
/// The api is super minimalistic to reduce external dependecies, including from the std-lib
///
/// One notable difference from the standard mspc channel is that this channel's ends are't
/// two different types, while this is possible, there's no reason to do that. Specially
/// considering that to get a good compile-time asurance that both ends will not be shared, the
/// channel must not be [Send], but this is one of the main requirements to use this channel in
/// async code. Moreover, if two worker threads are meant to be identical threads balancing their
/// work, it might be beneficial to use this same channel as a de-facto single producer, multiple
/// consumer channel for work distribution.
/// # Example
/// ```
/// use floresta_common::spsc;
/// let channel = spsc::Channel::new();
///
/// // Send something
/// channel.send(1);
/// // Read the same thing back
/// assert_eq!(channel.recv().next(), Some(1));
/// ```
#[derive(Debug, Default)]
pub struct Channel<T> {
/// The data pending for read
content: spin::Mutex<Vec<T>>,
}

impl<T> Channel<T> {
/// Creates a new channel
///
/// # Example
/// ```
/// use floresta_common::spsc;
/// let channel = spsc::Channel::new();
///
/// channel.send(1);
/// assert_eq!(channel.recv().next(), Some(1));
/// ```
pub fn new() -> Self {
Channel {
content: spin::Mutex::new(Vec::new()),
}
}
/// Sends some data through a channel
///
/// # Example
/// ```
/// use floresta_common::spsc;
/// let channel = spsc::Channel::new();
///
/// channel.send(1);
/// assert_eq!(channel.recv().next(), Some(1));
/// ```
pub fn send(&self, data: T) {
self.content.lock().push(data);
}
/// Reads from a channel
///
/// This method returns an iterator over all alements inside a [Channel]
pub fn recv(&self) -> RecvIter<T> {
let inner = take(&mut *self.content.lock());
RecvIter { inner }
}
}

/// An iterator issued every time someone calls `recv`.
///
/// This iterator takes all itens available for reading in a channel
/// and lets the consumer iterate over them, without acquiring the lock
/// every time (the mutex is only locked when `recv` is called).
///
/// # Example
/// ```
/// use floresta_common::spsc;
/// let channel = spsc::Channel::new();
///
/// channel.send(0);
/// channel.send(1);
///
/// for (i, el) in channel.recv().enumerate() {
/// assert_eq!(i, el);
/// }
/// // A second read should create an empty iterator
/// assert_eq!(channel.recv().next(), None);
/// ```
pub struct RecvIter<T> {
inner: Vec<T>,
}

impl<T> Iterator for RecvIter<T> {
type Item = T;
fn next(&mut self) -> Option<Self::Item> {
if self.inner.is_empty() {
return None;
}
Some(self.inner.remove(0))
}
}
93 changes: 46 additions & 47 deletions crates/floresta-electrum/src/electrum_protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,10 @@ use crate::request::Request;
use crate::{get_arg, json_rpc_res};
use bitcoin::hashes::hex::FromHex;
use floresta_chain::pruned_utreexo::BlockchainInterface;
use floresta_chain::Notification;
use floresta_common::spsc::Channel;
use floresta_common::{get_hash_from_u8, get_spk_hash};
use floresta_watch_only::kv_database::KvDatabase;
use floresta_watch_only::{AddressCache, CachedTransaction};
use futures::{select, FutureExt};

use async_std::sync::RwLock;
use async_std::{
Expand Down Expand Up @@ -349,57 +348,57 @@ impl<Blockchain: BlockchainInterface> ElectrumServer<Blockchain> {
}

pub async fn main_loop(mut self) -> Result<(), crate::error::Error> {
let (tx, mut rx) = unbounded::<Notification>();
self.chain.subscribe(tx);
let blocks = Channel::new();
let blocks = Arc::new(blocks);

self.chain.subscribe(blocks.clone());

loop {
select! {
notification = rx.next().fuse() => {
if let Some(notification) = notification {
self.handle_notification(notification).await;
}
},
message = self.peer_accept.next().fuse() => {
if let Some(message) = message {
self.handle_message(message).await?;
}
}
};
for (block, height) in blocks.recv() {
self.handle_block(block, height).await;
}
let request = async_std::future::timeout(
std::time::Duration::from_secs(1),
self.peer_accept.recv(),
)
.await;

if let Ok(Ok(message)) = request {
self.handle_message(message).await?;
}
}
}
async fn handle_notification(&mut self, notification: Notification) {
match notification {
Notification::NewBlock((block, height)) => {
let result = json!({
"jsonrpc": "2.0",
"method": "blockchain.headers.subscribe",
"params": [{
"height": height,
"hex": serialize(&block.header).to_hex()
}]
});
if !self.chain.is_in_idb() || height % 1000 == 0 {
let lock = self.address_cache.write().await;
lock.bump_height(height);
}
if self.chain.get_height().unwrap() == height {
for peer in &mut self.peers.values() {
let res = peer
.write(serde_json::to_string(&result).unwrap().as_bytes())
.await;
if res.is_err() {
info!("Could not write to peer {:?}", peer);
}
}
}
let transactions = self
.address_cache
.write()
.await
.block_process(&block, height);

self.wallet_notify(&transactions).await;
async fn handle_block(&mut self, block: bitcoin::Block, height: u32) {
let result = json!({
"jsonrpc": "2.0",
"method": "blockchain.headers.subscribe",
"params": [{
"height": height,
"hex": serialize(&block.header).to_hex()
}]
});
if !self.chain.is_in_idb() || height % 1000 == 0 {
let lock = self.address_cache.write().await;
lock.bump_height(height);
}
if self.chain.get_height().unwrap() == height {
for peer in &mut self.peers.values() {
let res = peer
.write(serde_json::to_string(&result).unwrap().as_bytes())
.await;
if res.is_err() {
info!("Could not write to peer {:?}", peer);
}
}
}
let transactions = self
.address_cache
.write()
.await
.block_process(&block, height);

self.wallet_notify(&transactions).await;
}
async fn handle_message(&mut self, message: Message) -> Result<(), crate::error::Error> {
match message {
Expand Down
Loading