Merge pull request #235 from rustaceanrob/block-request-12-5
Request `IndexedBlock` directly from `Client`
rustaceanrob authored Dec 6, 2024
2 parents a3b9b36 + d1259eb commit 183d397
Showing 9 changed files with 229 additions and 58 deletions.
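
In short: `Client::request_block` now resolves to the requested `IndexedBlock` itself instead of only queueing the hash and emitting the block later as a `NodeMessage::Block` event. A minimal before/after sketch of the caller side, with node and client setup elided and `receiver` assumed to be the node's event receiver as in the examples below:

```rust
// Before this commit: fire-and-forget, then wait for the block as an event.
// client.request_block(hash).await.unwrap();
// while let Ok(message) = receiver.recv().await {
//     if let NodeMessage::Block(indexed_block) = message {
//         // handle indexed_block, then stop waiting
//         break;
//     }
// }

// After this commit: the block comes back directly on a oneshot channel.
let indexed_block = client.request_block(hash).await.unwrap();
let coinbase = indexed_block.block.txdata.first().unwrap().compute_txid();
tracing::info!("Coinbase transaction ID: {}", coinbase);
```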
15 changes: 9 additions & 6 deletions example/managed.rs
@@ -4,12 +4,13 @@
use kyoto::core::messages::NodeMessage;
use kyoto::{chain::checkpoints::HeaderCheckpoint, core::builder::NodeBuilder};
use kyoto::{AddrV2, Address, Network, ServiceFlags, TrustedPeer};
use kyoto::{AddrV2, Address, BlockHash, Network, ServiceFlags, TrustedPeer};
use std::collections::HashSet;
use std::{net::Ipv4Addr, str::FromStr};

const NETWORK: Network = Network::Signet;
const RECOVERY_HEIGHT: u32 = 170_000;
const RECOVERY_HEIGHT: u32 = 201_000;
const RECOVERY_HASH: &str = "0000002238d05b522875f9edc4c9f418dd89ccfde7e4c305e8448a87a5dc71b7";
const ADDR: &str = "tb1q9pvjqz5u5sdgpatg3wn0ce438u5cyv85lly0pc";

#[tokio::main]
@@ -18,7 +19,8 @@ async fn main() {
let subscriber = tracing_subscriber::FmtSubscriber::new();
tracing::subscriber::set_global_default(subscriber).unwrap();
// Use a predefined checkpoint
let checkpoint = HeaderCheckpoint::closest_checkpoint_below_height(RECOVERY_HEIGHT, NETWORK);
let checkpoint =
HeaderCheckpoint::new(RECOVERY_HEIGHT, BlockHash::from_str(RECOVERY_HASH).unwrap());
// Add Bitcoin scripts to scan the blockchain for
let address = Address::from_str(ADDR)
.unwrap()
@@ -61,11 +63,12 @@ async fn main() {
tracing::info!("Connected to all required peers");
}
NodeMessage::IndexedFilter(mut filter) => {
let height = filter.height();
tracing::info!("Checking filter {}", height);
if filter.contains_any(&addresses).await {
let hash = filter.block_hash();
let hash = *filter.block_hash();
tracing::info!("Found script at {}!", hash);
let indexed_block = client.request_block(hash).await.unwrap();
let coinbase = indexed_block.block.txdata.first().unwrap().compute_txid();
tracing::info!("Coinbase transaction ID: {}", coinbase);
break;
}
}
5 changes: 4 additions & 1 deletion example/signet.rs
@@ -65,7 +65,10 @@ async fn main() {
NodeMessage::Dialog(d) => tracing::info!("{d}"),
NodeMessage::Warning(e) => tracing::warn!("{e}"),
NodeMessage::StateChange(s) => tracing::info!("State update: {s}"),
NodeMessage::Block(b) => drop(b),
NodeMessage::Block(b) => {
let block = b.block.block_hash();
tracing::info!("Received block: {block}");
}
NodeMessage::BlocksDisconnected(r) => {
let _ = r;
}
119 changes: 92 additions & 27 deletions src/chain/block_queue.rs
@@ -3,12 +3,16 @@ use std::collections::VecDeque;
use bitcoin::BlockHash;
use tokio::time::Instant;

#[cfg(feature = "filter-control")]
use crate::core::messages::BlockRequest;
use crate::core::messages::BlockSender;

const SPAM_LIMIT: u64 = 5;

#[derive(Debug)]
pub(crate) struct BlockQueue {
queue: VecDeque<BlockHash>,
want: Option<BlockHash>,
queue: VecDeque<Request>,
want: Option<Request>,
last_req: Instant,
}

@@ -21,61 +25,107 @@ impl BlockQueue {
}
}

pub(crate) fn add(&mut self, block: BlockHash) {
if !self.contains(&block) {
self.queue.push_front(block)
pub(crate) fn add(&mut self, request: impl Into<Request>) {
let request: Request = request.into();
if !self.contains(&request.hash) {
self.queue.push_front(request)
}
}

pub(crate) fn contains(&mut self, block: &BlockHash) -> bool {
self.queue.contains(block) || self.want.map_or(false, |hash| hash.eq(block))
self.queue.iter().any(|request| request.hash.eq(block))
|| self
.want
.as_ref()
.map_or(false, |request| request.hash.eq(block))
}

pub(crate) fn pop(&mut self) -> Option<BlockHash> {
match self.want {
Some(block) => {
match self.want.as_mut() {
Some(request) => {
if Instant::now().duration_since(self.last_req).as_secs() < SPAM_LIMIT {
None
} else {
self.last_req = Instant::now();
Some(block)
Some(request.hash)
}
}
None => {
self.last_req = Instant::now();
let block = self.queue.pop_back();
self.want = block;
block
let request = self.queue.pop_back();
let hash = request.as_ref().map(|request| request.hash);
self.want = request;
hash
}
}
}

pub(crate) fn need(&self, block: &BlockHash) -> bool {
self.want.map_or(false, |hash| hash.eq(block))
self.want
.as_ref()
.map_or(false, |request| request.hash.eq(block))
}

pub(crate) fn receive(&mut self, hash: &BlockHash) {
if let Some(want) = self.want {
if want.eq(hash) {
pub(crate) fn receive(&mut self, hash: &BlockHash) -> Option<BlockSender> {
if let Some(request) = self.want.take() {
if request.hash.eq(hash) {
self.want = None;
return request.sender;
} else {
self.want = Some(request);
return None;
}
}
None
}

pub(crate) fn complete(&self) -> bool {
self.want.is_none() && self.queue.is_empty()
}

pub(crate) fn remove(&mut self, hashes: &[BlockHash]) {
self.queue.retain(|hash| !hashes.contains(hash));
if let Some(want) = self.want {
if hashes.contains(&want) {
self.queue.retain(|request| !hashes.contains(&request.hash));
if let Some(want) = self.want.as_ref() {
if hashes.contains(&want.hash) {
self.want = None;
}
}
}
}

#[derive(Debug)]
pub(crate) struct Request {
hash: BlockHash,
sender: Option<BlockSender>,
}

impl Request {
fn new(hash: BlockHash) -> Self {
Self { hash, sender: None }
}

#[cfg(feature = "filter-control")]
fn from_block_request(block_request: BlockRequest) -> Self {
Self {
hash: block_request.hash,
sender: Some(block_request.oneshot),
}
}
}

impl From<BlockHash> for Request {
fn from(value: BlockHash) -> Self {
Request::new(value)
}
}

#[cfg(feature = "filter-control")]
impl From<BlockRequest> for Request {
fn from(value: BlockRequest) -> Self {
Request::from_block_request(value)
}
}

#[cfg(test)]
mod test {
use std::str::FromStr;
@@ -101,12 +151,18 @@ mod test {
assert_eq!(queue.queue.len(), 3);
assert_eq!(queue.pop(), Some(hash_1));
assert_eq!(queue.pop(), None);
assert_eq!(queue.want, Some(hash_1));
assert_eq!(
queue.want.as_ref().map(|request| request.hash),
Some(hash_1)
);
assert!(queue.need(&hash_1));
queue.receive(&hash_1);
assert_eq!(queue.want, None);
assert_eq!(queue.want.as_ref().map(|request| request.hash), None);
assert_eq!(queue.pop(), Some(hash_2));
assert_eq!(queue.want, Some(hash_2));
assert_eq!(
queue.want.as_ref().map(|request| request.hash),
Some(hash_2)
);
assert!(queue.need(&hash_2));
queue.receive(&hash_2);
assert_eq!(queue.pop(), Some(hash_3));
@@ -145,13 +201,19 @@ mod test {
assert_eq!(queue.pop(), Some(hash_1));
tokio::time::sleep(Duration::from_secs(6)).await;
assert_eq!(queue.pop(), Some(hash_1));
assert_eq!(queue.want, Some(hash_1));
assert_eq!(
queue.want.as_ref().map(|request| request.hash),
Some(hash_1)
);
assert!(queue.need(&hash_1));
queue.receive(&hash_1);
assert!(!queue.need(&hash_1));
assert_eq!(queue.want, None);
assert_eq!(queue.want.as_ref().map(|request| request.hash), None);
assert_eq!(queue.pop(), Some(hash_2));
assert_eq!(queue.want, Some(hash_2));
assert_eq!(
queue.want.as_ref().map(|request| request.hash),
Some(hash_2)
);
assert!(queue.need(&hash_2));
queue.receive(&hash_2);
assert_eq!(queue.pop(), Some(hash_3));
@@ -187,10 +249,13 @@
queue.add(hash_1);
assert_eq!(queue.queue.len(), 3);
assert_eq!(queue.pop(), Some(hash_1));
assert_eq!(queue.want, Some(hash_1));
assert_eq!(
queue.want.as_ref().map(|request| request.hash),
Some(hash_1)
);
queue.remove(&[hash_1]);
assert!(!queue.need(&hash_1));
assert_eq!(queue.want, None);
assert_eq!(queue.want.as_ref().map(|request| request.hash), None);
queue.remove(&[hash_2]);
assert_eq!(queue.queue.len(), 1);
assert_eq!(queue.pop(), Some(hash_3));
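
To summarize the queue change above: the queue now tracks `Request` values rather than bare hashes, where a `Request` optionally carries the oneshot sender of the caller that asked for the block, and `receive` hands that sender back so the node can answer the requester directly. A simplified, self-contained sketch of the pattern using stand-in types (not the crate's actual `BlockHash` or `BlockSender`):

```rust
use std::collections::VecDeque;
use tokio::sync::oneshot;

// Stand-ins for kyoto's crate-private types; the names here are illustrative only.
type Hash = [u8; 32];
struct Block;

// A queued request: the hash to fetch plus an optional channel back to the
// caller that asked for it. Event-only fetches carry no sender.
struct Request {
    hash: Hash,
    sender: Option<oneshot::Sender<Block>>,
}

struct BlockQueue {
    queue: VecDeque<Request>,
    want: Option<Request>,
}

impl BlockQueue {
    fn new() -> Self {
        Self { queue: VecDeque::new(), want: None }
    }

    fn add(&mut self, request: Request) {
        self.queue.push_front(request);
    }

    // Take the next hash to request from a peer, remembering the full request.
    fn pop(&mut self) -> Option<Hash> {
        let request = self.queue.pop_back()?;
        let hash = request.hash;
        self.want = Some(request);
        Some(hash)
    }

    // On receipt of the wanted block, surrender the caller's channel (if any) so
    // the node can choose between answering the oneshot and broadcasting an event.
    fn receive(&mut self, hash: &Hash) -> Option<oneshot::Sender<Block>> {
        match self.want.take() {
            Some(request) if request.hash == *hash => request.sender,
            other => {
                self.want = other;
                None
            }
        }
    }
}

fn main() {
    let (tx, mut rx) = oneshot::channel();
    let mut queue = BlockQueue::new();
    queue.add(Request { hash: [0u8; 32], sender: Some(tx) });
    let next = queue.pop().expect("one request was queued");
    if let Some(sender) = queue.receive(&next) {
        let _ = sender.send(Block); // the requester gets the block directly
    }
    assert!(rx.try_recv().is_ok());
}
```

Broadcast-driven fetches still enter the queue with no sender, which is why the node falls back to the `NodeMessage::Block` event when `receive` returns nothing.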
37 changes: 31 additions & 6 deletions src/chain/chain.rs
@@ -18,6 +18,10 @@ use super::{
header_chain::HeaderChain,
};
#[cfg(feature = "filter-control")]
use crate::core::error::FetchBlockError;
#[cfg(feature = "filter-control")]
use crate::core::messages::BlockRequest;
#[cfg(feature = "filter-control")]
use crate::IndexedFilter;
use crate::{
chain::header_batch::HeadersBatch,
@@ -844,10 +848,20 @@ impl<H: HeaderStore> Chain<H> {
if !block.check_merkle_root() {
return Err(BlockScanError::InvalidMerkleRoot);
}
self.block_queue.receive(&block_hash);
self.dialog
.send_data(NodeMessage::Block(IndexedBlock::new(height, block)))
.await;
let sender = self.block_queue.receive(&block_hash);
match sender {
Some(sender) => {
let send_result = sender.send(Ok(IndexedBlock::new(height, block)));
if send_result.is_err() {
self.dialog.send_warning(Warning::ChannelDropped).await
};
}
None => {
self.dialog
.send_data(NodeMessage::Block(IndexedBlock::new(height, block)))
.await;
}
}
Ok(())
}

@@ -866,8 +880,19 @@

// Explicitly request a block
#[cfg(feature = "filter-control")]
pub(crate) fn get_block(&mut self, hash: BlockHash) {
self.block_queue.add(hash)
pub(crate) async fn get_block(&mut self, request: BlockRequest) {
let height_opt = self.height_of_hash(request.hash).await;
if height_opt.is_none() {
let err_reponse = request.oneshot.send(Err(FetchBlockError::UnknownHash));
if err_reponse.is_err() {
self.dialog.send_warning(Warning::ChannelDropped).await;
}
} else {
self.dialog
.send_dialog(format!("Adding block {} to queue", request.hash))
.await;
self.block_queue.add(request)
}
}

// Reset the compact filter queue because we received a new block
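
One caller-visible consequence of the new `get_block` handler above: the chain checks `height_of_hash` before queueing, so a hash outside the known header chain fails fast with `FetchBlockError::UnknownHash` rather than lingering in the block queue. A hedged fragment showing how a caller might observe this (a running `client` inside an async context is assumed, the import path is inferred from this diff, and the all-zeros hash is only a placeholder):

```rust
use bitcoin::hashes::Hash;
use bitcoin::BlockHash;
// Module path assumed from the imports shown in this diff.
use kyoto::core::error::FetchBlockError;

// Fragment only: `client` is a running kyoto client built with the
// `filter-control` feature enabled.
let unknown = BlockHash::all_zeros(); // placeholder for a hash the node has never seen
match client.request_block(unknown).await {
    Err(FetchBlockError::UnknownHash) => tracing::warn!("hash is not in the known header chain"),
    Err(_) => tracing::warn!("node stopped or the response channel was dropped"),
    Ok(indexed) => tracing::info!("unexpectedly found block {}", indexed.block.block_hash()),
}
```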
8 changes: 6 additions & 2 deletions src/core/builder.rs
@@ -1,6 +1,10 @@
use std::{collections::HashSet, path::PathBuf, time::Duration};
#[cfg(not(feature = "filter-control"))]
use std::collections::HashSet;
use std::{path::PathBuf, time::Duration};

use bitcoin::{Network, ScriptBuf};
use bitcoin::Network;
#[cfg(not(feature = "filter-control"))]
use bitcoin::ScriptBuf;

use super::{client::Client, config::NodeConfig, node::Node, FilterSyncPolicy};
#[cfg(feature = "database")]
36 changes: 23 additions & 13 deletions src/core/client.rs
@@ -1,6 +1,7 @@
use bitcoin::block::Header;
#[cfg(feature = "filter-control")]
use bitcoin::BlockHash;
#[cfg(not(feature = "filter-control"))]
use bitcoin::ScriptBuf;
use std::time::Duration;
use tokio::sync::broadcast;
@@ -9,6 +10,8 @@ use tokio::sync::mpsc::Sender;

use crate::{IndexedBlock, TrustedPeer, TxBroadcast};

#[cfg(feature = "filter-control")]
use super::{error::FetchBlockError, messages::BlockRequest};
use super::{
error::{ClientError, FetchHeaderError},
messages::{ClientMessage, HeaderRequest, NodeMessage, SyncUpdate},
@@ -238,6 +241,26 @@ macro_rules! impl_core_client {
.map_err(|_| FetchHeaderError::RecvError)?
}

/// Request a block be fetched.
///
/// # Errors
///
/// If the node has stopped running.
#[cfg(feature = "filter-control")]
pub async fn request_block(
&self,
block_hash: BlockHash,
) -> Result<IndexedBlock, FetchBlockError> {
let (tx, rx) =
tokio::sync::oneshot::channel::<Result<IndexedBlock, FetchBlockError>>();
let message = BlockRequest::new(tx, block_hash);
self.ntx
.send(ClientMessage::GetBlock(message))
.await
.map_err(|_| FetchBlockError::SendError)?;
rx.await.map_err(|_| FetchBlockError::RecvError)?
}

/// Starting at the configured anchor checkpoint, look for block inclusions with newly added scripts.
///
/// # Errors
@@ -289,19 +312,6 @@ macro_rules! impl_core_client {
.await
.map_err(|_| ClientError::SendError)
}

/// Request a block be fetched. The block will be emitted as a [`NodeMessage::Block`](crate::core::messages::NodeMessage) event.
///
/// # Errors
///
/// If the node has stopped running.
#[cfg(feature = "filter-control")]
pub async fn request_block(&self, block_hash: BlockHash) -> Result<(), ClientError> {
self.ntx
.send(ClientMessage::GetBlock(block_hash))
.await
.map_err(|_| ClientError::SendError)
}
}
};
}
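
The plumbing added above follows the usual oneshot-inside-a-message pattern: the client creates a `oneshot` pair, ships the sender to the node inside `ClientMessage::GetBlock`, and awaits the receiver for the `IndexedBlock` or a `FetchBlockError`. A minimal, self-contained illustration of that pattern with generic stand-in types rather than kyoto's actual enums (assumes tokio with the `macros`, `rt-multi-thread`, and `sync` features):

```rust
use tokio::sync::{mpsc, oneshot};

// The request carries its own reply channel, just as BlockRequest carries a BlockSender.
struct Request {
    payload: u32,
    reply: oneshot::Sender<Result<u32, &'static str>>,
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel::<Request>(8);

    // The "node" side: answer each request through its embedded oneshot sender.
    tokio::spawn(async move {
        while let Some(request) = rx.recv().await {
            let _ = request.reply.send(Ok(request.payload * 2));
        }
    });

    // The "client" side: build the oneshot, send the request, await the reply.
    let (reply_tx, reply_rx) = oneshot::channel();
    tx.send(Request { payload: 21, reply: reply_tx })
        .await
        .expect("the node task is running");
    let answer = reply_rx.await.expect("the reply channel was not dropped");
    assert_eq!(answer, Ok(42));
}
```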