Merge pull request #31 from rustaceanrob/load-headers
chain: load headers outside of constructors
rustaceanrob authored Jul 7, 2024
2 parents 040405c + 371e8c2 commit 872012e
Showing 11 changed files with 147 additions and 148 deletions.
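In short, node and Chain construction are now synchronous, and reading persisted headers becomes an explicit, fallible async step. A minimal sketch of the resulting flow, assembled from the diffs below (the `builder` value and the point at which the node invokes header loading are assumptions for illustration, not shown in this commit):

    // Sketch only: `builder` is assumed to be a configured node builder,
    // as in example/signet.rs below.
    let (node, client) = builder
        .num_required_peers(2)
        // No .await and no Result here anymore: construction performs
        // no database I/O and cannot fail on header persistence.
        .build_node();
    // Persisted headers are read later by the async Chain::load_headers,
    // assumed to be driven by the node once it runs.
    tokio::task::spawn(async move { node.run().await });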
3 changes: 1 addition & 2 deletions example/memory.rs
@@ -50,8 +50,7 @@ async fn main() {
        // We only maintain a list of 32 peers in memory
        .peer_db_size(32)
        // Build without the default databases
-       .build_with_databases(peer_store, header_store)
-       .await;
+       .build_with_databases(peer_store, header_store);
    // Run the node
    tokio::task::spawn(async move { node.run().await });
    // Split the client into components that send messages and listen to messages.
3 changes: 1 addition & 2 deletions example/rescan.rs
@@ -36,8 +36,7 @@ async fn main() {
        // The number of connections we would like to maintain
        .num_required_peers(1)
        // Create the node and client, choosing not to store headers
-       .build_node()
-       .await;
+       .build_node();
    // Run the node and wait for the sync message;
    tokio::task::spawn(async move { node.run().await });
    tracing::info!("Running the node and waiting for a sync message. Please wait a minute!");
7 changes: 3 additions & 4 deletions example/signet.rs
@@ -44,8 +44,7 @@ async fn main() {
        // The number of connections we would like to maintain
        .num_required_peers(2)
        // Create the node and client
-       .build_node()
-       .await;
+       .build_node();
    // Check if the node is running. Another part of the program may be giving us the node.
    if !node.is_running() {
        tokio::task::spawn(async move { node.run().await });
@@ -77,8 +76,8 @@ async fn main() {
                let recent = update.recent_history;
                tracing::info!("Recent history:");
                for (height, hash) in recent {
-                   tracing::info!("Synced chain up to block {}", height);
-                   tracing::info!("Chain tip: {}", hash.block_hash());
+                   tracing::info!("Height: {}", height);
+                   tracing::info!("Hash: {}", hash.block_hash());
                }
                break;
            }
138 changes: 73 additions & 65 deletions src/chain/chain.rs
@@ -53,58 +53,20 @@ pub(crate) struct Chain {
}

impl Chain {
-   pub(crate) async fn new(
+   pub(crate) fn new(
        network: &Network,
        scripts: HashSet<ScriptBuf>,
        anchor: HeaderCheckpoint,
-       mut checkpoints: HeaderCheckpoints,
-       mut dialog: Dialog,
-       mut db: impl HeaderStore + Send + Sync + 'static,
+       checkpoints: HeaderCheckpoints,
+       dialog: Dialog,
+       db: impl HeaderStore + Send + Sync + 'static,
        quorum_required: usize,
-   ) -> Result<Self, HeaderPersistenceError> {
+   ) -> Self {
        let params = params_from_network(network);
-       let mut loaded_headers = db
-           .load(anchor.height)
-           .await
-           .map_err(HeaderPersistenceError::Database)?;
-       if loaded_headers.len().gt(&0) {
-           if loaded_headers
-               .values()
-               .take(1)
-               .copied()
-               .collect::<Vec<Header>>()
-               .first()
-               .unwrap()
-               .prev_blockhash
-               .ne(&anchor.hash)
-           {
-               dialog
-                   .send_warning("Checkpoint anchor mismatch".into())
-                   .await;
-               // The header chain did not align, so just start from the anchor
-               loaded_headers = BTreeMap::new();
-           } else if loaded_headers
-               .iter()
-               .zip(loaded_headers.iter().skip(1))
-               .any(|(first, second)| first.1.block_hash().ne(&second.1.prev_blockhash))
-           {
-               dialog
-                   .send_warning("Blockhash pointer mismatch".into())
-                   .await;
-               return Err(HeaderPersistenceError::HeadersDoNotLink);
-           }
-           loaded_headers.iter().for_each(|header| {
-               if let Some(checkpoint) = checkpoints.next() {
-                   if header.1.block_hash().eq(&checkpoint.hash) {
-                       checkpoints.advance()
-                   }
-               }
-           })
-       };
-       let header_chain = HeaderChain::new(anchor, loaded_headers);
+       let header_chain = HeaderChain::new(anchor);
        let cf_header_chain = CFHeaderChain::new(anchor, quorum_required);
        let filter_chain = FilterChain::new(anchor);
-       Ok(Chain {
+       Chain {
            header_chain,
            checkpoints,
            params,
@@ -115,7 +77,7 @@ impl Chain {
            scripts,
            block_queue: BlockQueue::new(),
            dialog,
-       })
+       }
    }
}

    // Top of the chain
@@ -274,6 +236,53 @@ impl Chain {
        }
    }

+   // Load in the headers
+   pub(crate) async fn load_headers(&mut self) -> Result<(), HeaderPersistenceError> {
+       let loaded_headers = self
+           .db
+           .lock()
+           .await
+           .load_after(self.height())
+           .await
+           .map_err(HeaderPersistenceError::Database)?;
+       if loaded_headers.len().gt(&0) {
+           if loaded_headers
+               .values()
+               .take(1)
+               .copied()
+               .collect::<Vec<Header>>()
+               .first()
+               .unwrap()
+               .prev_blockhash
+               .ne(&self.tip())
+           {
+               self.dialog
+                   .send_warning("Unlinkable anchor. The headers stored in the database have no connection to this configured anchor.".into())
+                   .await;
+               // The stored headers did not connect to the configured anchor, so the chain cannot be rebuilt from the database
+               return Err(HeaderPersistenceError::CannotLocateHistory);
+           } else if loaded_headers
+               .iter()
+               .zip(loaded_headers.iter().skip(1))
+               .any(|(first, second)| first.1.block_hash().ne(&second.1.prev_blockhash))
+           {
+               self.dialog
+                   .send_warning("Blockhash pointer mismatch".into())
+                   .await;
+               return Err(HeaderPersistenceError::HeadersDoNotLink);
+           }
+           loaded_headers.iter().for_each(|header| {
+               if let Some(checkpoint) = self.checkpoints.next() {
+                   if header.1.block_hash().eq(&checkpoint.hash) {
+                       self.checkpoints.advance()
+                   }
+               }
+           })
+       };
+       self.header_chain.set_headers(loaded_headers);
+       Ok(())
+   }
+
    // If the number of headers in memory gets too large, move some of them to the disk
    pub(crate) async fn manage_memory(&mut self) {
        if self.header_chain.inner_len() > MAX_HEADER_SIZE {
@@ -460,35 +469,36 @@ impl Chain {
    // This call occurs if we sync to a block that is later reorganized out of the chain,
    // but we have restarted our node in between these events.
    async fn load_fork(&mut self, header_batch: &HeadersBatch) -> Result<(), HeaderSyncError> {
-       let mut db_lock = self.db.lock().await;
        let prev_hash = header_batch.first().prev_blockhash;
-       let maybe_height = db_lock
-           .height_of(&prev_hash)
-           .await
-           .map_err(|_| HeaderSyncError::DbError)?;
+       let maybe_height = {
+           let mut db_lock = self.db.lock().await;
+           db_lock
+               .height_of(&prev_hash)
+               .await
+               .map_err(|_| HeaderSyncError::DbError)?
+       };
        match maybe_height {
            Some(height) => {
                // This is a very generous check to ensure a peer cannot get us to load an
                // absurd amount of headers into RAM. Because headers come in batches of 2,000,
                // we wouldn't accept a fork of a depth more than around 2,000 anyway.
                // The only reorgs that have ever been recorded are of depth 1.
                if self.height() - height > MAX_REORG_DEPTH {
-                   Err(HeaderSyncError::FloatingHeaders)
+                   return Err(HeaderSyncError::FloatingHeaders);
                } else {
                    let older_anchor = HeaderCheckpoint::new(height, prev_hash);
-                   let loaded_headers = db_lock
-                       .load(older_anchor.height)
-                       .await
-                       .map_err(|_| HeaderSyncError::DbError)?;
-                   self.header_chain = HeaderChain::new(older_anchor, loaded_headers);
+                   self.header_chain = HeaderChain::new(older_anchor);
                    self.cf_header_chain =
                        CFHeaderChain::new(older_anchor, self.cf_header_chain.quorum_required());
                    self.filter_chain = FilterChain::new(older_anchor);
-                   Ok(())
                }
            }
-           None => Err(HeaderSyncError::FloatingHeaders),
+           None => return Err(HeaderSyncError::FloatingHeaders),
        }
+       self.load_headers()
+           .await
+           .map_err(|_| HeaderSyncError::DbError)?;
+       Ok(())
    }

    // Sync the compact filter headers, possibly encountering conflicts
@@ -750,7 +760,7 @@ mod tests {

    use super::Chain;

-   async fn new_regtest(anchor: HeaderCheckpoint) -> Chain {
+   fn new_regtest(anchor: HeaderCheckpoint) -> Chain {
        let (sender, _) = tokio::sync::broadcast::channel::<NodeMessage>(1);
        let mut checkpoints = HeaderCheckpoints::new(&bitcoin::Network::Regtest);
        checkpoints.prune_up_to(anchor);
@@ -763,8 +773,6 @@
            (),
            1,
        )
-       .await
-       .unwrap()
    }

    #[tokio::test]
@@ -774,7 +782,7 @@
            BlockHash::from_str("62c28f380692524a3a8f1fc66252bc0eb31d6b6a127d2263bdcbee172529fe16")
                .unwrap(),
        );
-       let mut chain = new_regtest(gen).await;
+       let mut chain = new_regtest(gen);
        let block_8: Header = deserialize(&hex::decode("0000002016fe292517eecbbd63227d126a6b1db30ebc5262c61f8f3a4a529206388fc262dfd043cef8454f71f30b5bbb9eb1a4c9aea87390f429721e435cf3f8aa6e2a9171375166ffff7f2000000000").unwrap()).unwrap();
        let block_9: Header = deserialize(&hex::decode("000000205708a90197d93475975545816b2229401ccff7567cb23900f14f2bd46732c605fd8de19615a1d687e89db365503cdf58cb649b8e935a1d3518fa79b0d408704e71375166ffff7f2000000000").unwrap()).unwrap();
        let block_10: Header = deserialize(&hex::decode("000000201d062f2162835787db536c55317e08df17c58078c7610328bdced198574093790c9f554a7780a6043a19619d2a4697364bb62abf6336c0568c31f1eedca3c3e171375166ffff7f2000000000").unwrap()).unwrap();
@@ -822,7 +830,7 @@ mod tests {
            BlockHash::from_str("0f9188f13cb7b2c71f2a335e3a4fc328bf5beb436012afca590b1a11466e2206")
                .unwrap(),
        );
-       let mut chain = new_regtest(gen).await;
+       let mut chain = new_regtest(gen);
        let block_1: Header = deserialize(&hex::decode("0000002006226e46111a0b59caaf126043eb5bbf28c34f3a5e332a1fc7b2b73cf188910f047eb4d0fe76345e307d0e020a079cedfa37101ee7ac84575cf829a611b0f84bc4805e66ffff7f2001000000").unwrap()).unwrap();
        let block_2: Header = deserialize(&hex::decode("00000020299e41732deb76d869fcdb5f72518d3784e99482f572afb73068d52134f1f75e1f20f5da8d18661d0f13aa3db8fff0f53598f7d61f56988a6d66573394b2c6ffc5805e66ffff7f2001000000").unwrap()).unwrap();
        let block_3: Header = deserialize(&hex::decode("00000020b96feaa82716f11befeb608724acee4743e0920639a70f35f1637a88b8b6ea3471f1dbedc283ce6a43a87ed3c8e6326dae8d3dbacce1b2daba08e508054ffdb697815e66ffff7f2001000000").unwrap()).unwrap();
@@ -850,7 +858,7 @@ mod tests {
            BlockHash::from_str("0f9188f13cb7b2c71f2a335e3a4fc328bf5beb436012afca590b1a11466e2206")
                .unwrap(),
        );
-       let mut chain = new_regtest(gen).await;
+       let mut chain = new_regtest(gen);
        let block_1: Header = deserialize(&hex::decode("0000002006226e46111a0b59caaf126043eb5bbf28c34f3a5e332a1fc7b2b73cf188910f575b313ad3ef825cfc204c34da8f3c1fd1784e2553accfa38001010587cb57241f855e66ffff7f2000000000").unwrap()).unwrap();
        let block_2: Header = deserialize(&hex::decode("00000020c81cedd6a989939936f31448e49d010a13c2e750acf02d3fa73c9c7ecfb9476e798da2e5565335929ad303fc746acabc812ee8b06139bcf2a4c0eb533c21b8c420855e66ffff7f2000000000").unwrap()).unwrap();
        let batch_1 = vec![block_1, block_2];
3 changes: 3 additions & 0 deletions src/chain/error.rs
@@ -35,6 +35,9 @@ pub enum HeaderPersistenceError {
    /// Some predefined checkpoint does not match.
    #[error("The headers loaded do not match a known checkpoint.")]
    MismatchedCheckpoints,
+   /// A user tried to retrieve headers too far in the past for what is in their database.
+   #[error("The configured anchor checkpoint is too far in the past compared to previous syncs. The database cannot reconstruct the chain.")]
+   CannotLocateHistory,
    /// A database error.
    #[error("The headers could not be loaded from sqlite.")]
    Database(DatabaseError),
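The new variant lets header loading distinguish an anchor that the stored history cannot reach from stored headers that fail to link to one another. A hypothetical call site inside the crate (the surrounding code is an assumption; only `load_headers` and the error variants appear in this commit):

    // Hypothetical handling of the new error variant.
    match chain.load_headers().await {
        Ok(()) => tracing::info!("Stored headers loaded."),
        Err(HeaderPersistenceError::CannotLocateHistory) => {
            // The configured anchor predates what the database can connect to;
            // a caller might fall back to syncing forward from the anchor alone.
            tracing::warn!("Anchor checkpoint is older than the stored history.");
        }
        Err(e) => tracing::warn!("Failed to load headers: {}", e),
    }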