Skip to content

Commit

Permalink
Merge pull request #27 from rustaceanrob/addr-filter
Browse files Browse the repository at this point in the history
multi: [chain] better err handle, [peer] less addr filter
  • Loading branch information
rustaceanrob authored Jun 30, 2024
2 parents 520b77d + 8a13efa commit 13e711a
Show file tree
Hide file tree
Showing 5 changed files with 39 additions and 64 deletions.
11 changes: 3 additions & 8 deletions CHECKLIST.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@
- [x] Persist to storage
- [x] Determine if the block hash or height should be the primary key
- [x] Speed up writes with pointers
- [x] Add "write volatile" to write over heights
- [x] Add method to write over heights in reorg
- [x] Move headers to DB when the `BTreeMap` is large
- [x] Exponential backoff for locators

#### Filters
Expand Down Expand Up @@ -134,10 +135,4 @@
- [x] 1.63, stable, beta, nightly
- [x] Format and clippy
- [ ] Regtest sync with Bitcoin Core
- [x] On PR

#### Bindings

- [ ] Add UniFFI to repository
- [ ] Build UDL
- [ ] Build for Python
- [x] On PR
4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@ bitcoin = { version = "0.32.0", features = [
"std",
"serde",
], default-features = false }
# Enable the tokio-console task and poll observations
# console-subscriber = "0.2.0"
rand = "0.8.0"
thiserror = { version = "1" }
tokio = { version = "1", default-features = false, features = [
Expand Down Expand Up @@ -49,6 +47,8 @@ tracing-subscriber = "0.3"
tokio = { version = "1", default-features = false, features = [
"full",
] } # add feature "tracing" to use the console
# Enable the tokio-console task and poll observations
# console-subscriber = "0.3.0"

[lib]
name = "kyoto"
Expand Down
5 changes: 4 additions & 1 deletion example/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
use kyoto::chain::checkpoints::SIGNET_HEADER_CP;
use kyoto::db::memory::peers::StatelessPeerStore;
use kyoto::db::sqlite::headers::SqliteHeaderDb;
use kyoto::node::messages::NodeMessage;
use kyoto::BlockHash;
use kyoto::{chain::checkpoints::HeaderCheckpoint, node::builder::NodeBuilder};
Expand Down Expand Up @@ -32,6 +33,8 @@ async fn main() {
let peer = IpAddr::V4(Ipv4Addr::new(23, 137, 57, 100));
// Limited devices may not save any peers to disk
let peer_store = StatelessPeerStore::new();
// To handle reorgs, it is still recommended to store block headers
let header_store = SqliteHeaderDb::new(bitcoin::Network::Signet, None).unwrap();
// Create a new node builder
let builder = NodeBuilder::new(bitcoin::Network::Signet);
// Add node preferences and build the node/client
Expand All @@ -47,7 +50,7 @@ async fn main() {
// We only maintain a list of 32 peers in memory
.peer_db_size(32)
// Build without the default databases
.build_with_databases(peer_store, ())
.build_with_databases(peer_store, header_store)
.await;
// Run the node
tokio::task::spawn(async move { node.run().await });
Expand Down
61 changes: 28 additions & 33 deletions src/chain/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -548,36 +548,32 @@ impl Chain {
}
}
// Did we request up to this stop hash? We should have caught it earlier if this was a repeated message.
if let Some(prev_stophash) = self.cf_header_chain.last_stop_hash_request() {
if prev_stophash.ne(batch.stop_hash()) {
return Err(CFHeaderSyncError::StopHashMismatch);
}
} else {
// If we never asked for a stop hash before, this message was unsolicited
return Err(CFHeaderSyncError::UnexpectedCFHeaderMessage);
let prev_stophash = self
.cf_header_chain
.last_stop_hash_request()
.ok_or(CFHeaderSyncError::UnexpectedCFHeaderMessage)?;
if prev_stophash.ne(batch.stop_hash()) {
return Err(CFHeaderSyncError::StopHashMismatch);
}
// Did they send us the correct number of headers?
let expected_stop_header =
let stop_hash =
// This call may or may not retrieve the hash from disk
self.blockhash_at_height(self.cf_header_chain.height() + batch.len() as u32).await;
if let Some(stop_header) = expected_stop_header {
if stop_header.ne(batch.stop_hash()) {
return Err(CFHeaderSyncError::StopHashMismatch);
}
} else {
return Err(CFHeaderSyncError::HeaderChainIndexOverflow);
self.blockhash_at_height(self.cf_header_chain.height() + batch.len() as u32)
.await
.ok_or(CFHeaderSyncError::HeaderChainIndexOverflow)?;
if stop_hash.ne(batch.stop_hash()) {
return Err(CFHeaderSyncError::StopHashMismatch);
}
Ok(())
}

// We need to make this public for new peers that connect to us while we are syncing the filter headers
pub(crate) async fn next_cf_header_message(&mut self) -> GetCFHeaders {
let stop_hash_index = self.cf_header_chain.height() + CF_HEADER_BATCH_SIZE + 1;
let stop_hash = if let Some(hash) = self.blockhash_at_height(stop_hash_index).await {
hash
} else {
self.tip()
};
let stop_hash = self
.blockhash_at_height(stop_hash_index)
.await
.unwrap_or(self.tip());
self.cf_header_chain.set_last_stop_hash(stop_hash);
GetCFHeaders {
filter_type: 0x00,
Expand Down Expand Up @@ -622,29 +618,28 @@ impl Chain {
.await;
}
self.filter_chain.put_hash(filter_message.block_hash).await;
if let Some(stop_hash) = self.filter_chain.last_stop_hash_request() {
if filter_message.block_hash.eq(stop_hash) {
if !self.is_filters_synced() {
Ok(Some(self.next_filter_message().await))
} else {
Ok(None)
}
let stop_hash = self
.filter_chain
.last_stop_hash_request()
.ok_or(CFilterSyncError::UnrequestedStophash)?;
if filter_message.block_hash.eq(&stop_hash) {
if !self.is_filters_synced() {
Ok(Some(self.next_filter_message().await))
} else {
Ok(None)
}
} else {
Err(CFilterSyncError::UnrequestedStophash)
Ok(None)
}
}

// Next filter message, if there is one
pub(crate) async fn next_filter_message(&mut self) -> GetCFilters {
let stop_hash_index = self.filter_chain.height() + FILTER_BATCH_SIZE + 1;
let stop_hash = if let Some(hash) = self.blockhash_at_height(stop_hash_index).await {
hash
} else {
self.tip()
};
let stop_hash = self
.blockhash_at_height(stop_hash_index)
.await
.unwrap_or(self.tip());
self.dialog
.chain_update(
self.height(),
Expand Down
22 changes: 2 additions & 20 deletions src/peers/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,19 +111,10 @@ fn parse_message(message: &NetworkMessage) -> Option<PeerMessage> {
})),
NetworkMessage::Verack => Some(PeerMessage::Verack),
NetworkMessage::Addr(addresses) => {
let last_month = SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("time went backwards")
.as_secs()
- ONE_MONTH;
let addresses: Vec<Address> = addresses
.iter()
.filter(|f| {
f.1.services.has(ServiceFlags::COMPACT_FILTERS)
&& f.1.services.has(ServiceFlags::WITNESS)
})
.filter(|f| f.1.services.has(ServiceFlags::COMPACT_FILTERS))
.filter(|f| f.1.socket_addr().is_ok())
.filter(|f| f.0 > last_month as u32)
.map(|(_, addr)| addr.clone())
.collect();
Some(PeerMessage::Addr(addresses))
Expand Down Expand Up @@ -183,19 +174,10 @@ fn parse_message(message: &NetworkMessage) -> Option<PeerMessage> {
NetworkMessage::FeeFilter(_) => None,
NetworkMessage::WtxidRelay => None,
NetworkMessage::AddrV2(addresses) => {
let last_month = SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("time went backwards")
.as_secs()
- ONE_MONTH;
let addresses: Vec<Address> = addresses
.iter()
.filter(|f| {
f.services.has(ServiceFlags::COMPACT_FILTERS)
&& f.services.has(ServiceFlags::WITNESS)
})
.filter(|f| f.services.has(ServiceFlags::COMPACT_FILTERS))
.filter(|f| f.socket_addr().is_ok())
.filter(|f| f.time > last_month as u32)
.map(|addr| match addr.socket_addr().unwrap().ip() {
std::net::IpAddr::V4(ip) => Address {
services: addr.services,
Expand Down

0 comments on commit 13e711a

Please sign in to comment.