Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

node: add rescan #4

Merged
merged 1 commit into from
Jun 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion example/rescan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ async fn main() {
.unwrap(),
))
// The number of connections we would like to maintain
.num_required_peers(2)
.num_required_peers(3)
// Create the node and client
.build_node()
.await;
Expand Down Expand Up @@ -75,6 +75,7 @@ async fn main() {
sender.add_scripts(new_scripts).await.unwrap();
// // Tell the node to look for these new scripts
sender.rescan().await.unwrap();
tracing::info!("Starting rescan");
loop {
if let Ok(message) = receiver.recv().await {
match message {
Expand Down
5 changes: 5 additions & 0 deletions src/chain/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -626,6 +626,11 @@ impl Chain {
self.scripts.insert(script);
}
}

//
pub(crate) async fn clear_filters(&mut self) {
self.filter_chain.clear_cache().await;
}
}

#[cfg(test)]
Expand Down
14 changes: 0 additions & 14 deletions src/filters/filter_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,18 +41,4 @@ impl FilterChain {
pub(crate) fn last_stop_hash_request(&mut self) -> &Option<BlockHash> {
&self.prev_stophash_request
}

// pub(crate) fn filter_at_height(&self, height: usize) -> Option<Filter> {
// let adjusted_height = self.adjusted_height(height);
// match adjusted_height {
// Some(height) => {
// if let Some(filter) = self.chain.get(height) {
// Some(filter.clone())
// } else {
// None
// }
// }
// None => None,
// }
// }
}
42 changes: 32 additions & 10 deletions src/node/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ impl Node {
node_map.send_random(block_request).await;
}
// If we have a transaction to broadcast and we are connected to peers, we should broadcast it
if node_map.live().ge(&1) && !tx_broadcaster.is_empty() {
if node_map.live().ge(&self.required_peers) && !tx_broadcaster.is_empty() {
let transaction = tx_broadcaster.next().unwrap();
match transaction.broadcast_policy {
TxBroadcastPolicy::AllPeers => {
Expand Down Expand Up @@ -260,7 +260,7 @@ impl Node {
PeerMessage::Filter(filter) => {
match self.handle_filter(peer_thread.nonce, filter).await {
Some(response) => {
node_map.broadcast(response).await;
node_map.send_message(peer_thread.nonce, response).await;
}
None => continue,
}
Expand Down Expand Up @@ -302,7 +302,11 @@ impl Node {
ClientMessage::Shutdown => return Ok(()),
ClientMessage::Broadcast(transaction) => tx_broadcaster.add(transaction),
ClientMessage::AddScripts(scripts) => self.add_scripts(scripts).await,
ClientMessage::Rescan => return Ok(()),
ClientMessage::Rescan => {
if let Some(response) = self.rescan().await {
node_map.broadcast(response).await;
}
},
}
}
}
Expand Down Expand Up @@ -483,13 +487,13 @@ impl Node {
}
}
CFHeaderSyncResult::Dispute(_) => {
// Request the block from the peer
// TODO: Request the filter and block from the peer
self.dialog
.send_warning(
"Found a conflict while peers are sending filter headers".into(),
)
.await;
None
Some(MainThreadMessage::Disconnect)
}
},
Err(e) => {
Expand Down Expand Up @@ -587,6 +591,29 @@ impl Node {
}
}

// Add more scripts to the chain to look for. Does not imply a rescan.
async fn add_scripts(&mut self, scripts: HashSet<ScriptBuf>) {
let mut chain = self.chain.lock().await;
chain.put_scripts(scripts);
}

// Clear the filter hash cache and redownload the filters.
async fn rescan(&mut self) -> Option<MainThreadMessage> {
let mut state = self.state.write().await;
let mut chain = self.chain.lock().await;
match *state {
NodeState::Behind => None,
NodeState::HeadersSynced => None,
_ => {
chain.clear_filters().await;
*state = NodeState::FilterHeadersSynced;
Some(MainThreadMessage::GetFilters(
chain.next_filter_message().await,
))
}
}
}

// First we seach the whitelist for peers that we trust. Then, depending on the state
// we either need to catch up on block headers or we may start requesting filters and blocks.
// When requesting filters, we try to select peers that have signaled for CF support.
Expand Down Expand Up @@ -666,9 +693,4 @@ impl Node {
}
}
}

async fn add_scripts(&mut self, scripts: HashSet<ScriptBuf>) {
let mut chain = self.chain.lock().await;
chain.put_scripts(scripts);
}
}
Loading