Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

crate: put sql db behind feature #8

Merged
merged 1 commit into from
Jun 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 12 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ dns-lookup = "2.0.0"
# Enable the tokio-console task and poll observations
# console-subscriber = "0.2.0"
rand = "0.8.0"
rusqlite = { version = "0.31.0", features = ["bundled"] }
rusqlite = { version = "0.31.0", features = ["bundled"], optional = true }
thiserror = { version = "1" }
tokio = { version = "1", default-features = false, features = [
"rt-multi-thread",
Expand All @@ -33,6 +33,10 @@ tokio = { version = "1", default-features = false, features = [
"macros",
] }

[features]
default = ["database"]
database = ["rusqlite"]

[dev-dependencies]
hex = { version = "0.4.0" }
tracing = "0.1"
Expand All @@ -41,6 +45,10 @@ tokio = { version = "1", default-features = false, features = [
"full",
] } # add feature "tracing" to use the console

[lib]
name = "kyoto"
path = "src/lib.rs"

[[example]]
name = "signet"
path = "example/signet.rs"
Expand All @@ -49,6 +57,6 @@ path = "example/signet.rs"
name = "rescan"
path = "example/rescan.rs"

[lib]
name = "kyoto"
path = "src/lib.rs"
[[example]]
name = "memory"
path = "example/memory.rs"
74 changes: 74 additions & 0 deletions example/memory.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
use bitcoin::BlockHash;
use kyoto::node::messages::NodeMessage;
use kyoto::{chain::checkpoints::HeaderCheckpoint, node::builder::NodeBuilder};
use std::collections::HashSet;
use std::{
net::{IpAddr, Ipv4Addr},
str::FromStr,
};

#[tokio::main]
async fn main() {
// Add third-party logging
let subscriber = tracing_subscriber::FmtSubscriber::new();
tracing::subscriber::set_global_default(subscriber).unwrap();
// Add Bitcoin scripts to scan the blockchain for
let address = bitcoin::Address::from_str("tb1q9pvjqz5u5sdgpatg3wn0ce438u5cyv85lly0pc")
.unwrap()
.require_network(bitcoin::Network::Signet)
.unwrap()
.into();
let mut addresses = HashSet::new();
addresses.insert(address);
// Define a peer to connect to
let peer = IpAddr::V4(Ipv4Addr::new(95, 217, 198, 121));
// Create a new node builder
let builder = NodeBuilder::new(bitcoin::Network::Signet);
// Add node preferences and build the node/client
let (mut node, mut client) = builder
// Add the peer
.add_peers(vec![(peer, 38333)])
// The Bitcoin scripts to monitor
.add_scripts(addresses)
// Only scan blocks strictly after an anchor checkpoint
.anchor_checkpoint(HeaderCheckpoint::new(
190_000,
BlockHash::from_str("0000013a6143b7360b7ba3834316b3265ee9072dde440bd45f99c01c42abaef2")
.unwrap(),
))
// The number of connections we would like to maintain
.num_required_peers(1)
// Create the node and client without the usual SQL databases
.build_node_with_custom_databases((), ())
.await;
// Run the node
tokio::task::spawn(async move { node.run().await });
// Split the client into components that send messages and listen to messages.
// With this construction, different parts of the program can take ownership of
// specific tasks.
let (mut sender, mut receiver) = client.split();
// Continually listen for events until the node is synced to its peers.
loop {
if let Ok(message) = receiver.recv().await {
match message {
NodeMessage::Dialog(d) => tracing::info!("{}", d),
NodeMessage::Warning(e) => tracing::warn!("{}", e),
NodeMessage::Transaction(t) => drop(t),
NodeMessage::Block(b) => drop(b),
NodeMessage::BlocksDisconnected(r) => {
let _ = r;
}
NodeMessage::TxBroadcastFailure => {
tracing::error!("The transaction could not be broadcast.")
}
NodeMessage::Synced(tip) => {
tracing::info!("Synced chain up to block {}", tip.height,);
tracing::info!("Chain tip: {}", tip.hash.to_string(),);
break;
}
}
}
}
let _ = sender.shutdown().await;
tracing::info!("Shutting down");
}
7 changes: 3 additions & 4 deletions example/signet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,7 @@ async fn main() {
let mut addresses = HashSet::new();
addresses.insert(address);
// Add preferred peers to connect to
let peer = IpAddr::V4(Ipv4Addr::new(95, 217, 198, 121));
// let peer_2 = IpAddr::V4(Ipv4Addr::new(23, 137, 57, 100));
let peer = IpAddr::V4(Ipv4Addr::new(23, 137, 57, 100));
// Create a new node builder
let builder = NodeBuilder::new(bitcoin::Network::Signet);
// Add node preferences and build the node/client
Expand All @@ -33,8 +32,8 @@ async fn main() {
.add_scripts(addresses)
// Only scan blocks strictly after an anchor checkpoint
.anchor_checkpoint(HeaderCheckpoint::new(
180_000,
BlockHash::from_str("0000000870f15246ba23c16e370a7ffb1fc8a3dcf8cb4492882ed4b0e3d4cd26")
190_000,
BlockHash::from_str("0000013a6143b7360b7ba3834316b3265ee9072dde440bd45f99c01c42abaef2")
.unwrap(),
))
// The number of connections we would like to maintain
Expand Down
13 changes: 11 additions & 2 deletions src/db/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,26 @@ use std::net::IpAddr;

use bitcoin::p2p::ServiceFlags;

pub(crate) mod error;
/// Errors a database backend may produce.
pub mod error;
pub(crate) mod peer_man;
#[cfg(feature = "database")]
pub(crate) mod sqlite;
pub(crate) mod traits;
/// Traits that define the header and peer databases.
pub mod traits;

/// A peer that will be saved to the [`traits::PeerStore`].
#[derive(Debug, Clone)]
pub struct PersistedPeer {
/// Canonical IP address of this peer.
pub addr: IpAddr,
/// The port believed to be listening for connections.
pub port: u16,
/// The services this peer may offer.
pub services: ServiceFlags,
/// Have we tried this peer before.
pub tried: bool,
/// Did we ban this peer for faulty behavior.
pub banned: bool,
}

Expand Down
26 changes: 13 additions & 13 deletions src/db/peer_man.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,10 @@ impl PeerManager {
// DNS fails if there is an insufficient number of peers
for peer in new_peers {
db_lock
.update(PersistedPeer::new(
peer,
self.default_port,
ServiceFlags::NONE,
false,
false,
))
.update(
PersistedPeer::new(peer, self.default_port, ServiceFlags::NONE, false, false),
true,
)
.await
.map_err(PeerManagerError::Database)?;
}
Expand Down Expand Up @@ -117,13 +114,16 @@ impl PeerManager {
) -> Result<(), PeerManagerError> {
let mut db_lock = self.db.lock().await;
db_lock
.update(PersistedPeer::new(
addr,
port.unwrap_or(self.default_port),
services.unwrap_or(ServiceFlags::NONE),
.update(
PersistedPeer::new(
addr,
port.unwrap_or(self.default_port),
services.unwrap_or(ServiceFlags::NONE),
tried,
ban,
),
tried,
ban,
))
)
.await
.map_err(PeerManagerError::Database)?;
Ok(())
Expand Down
4 changes: 2 additions & 2 deletions src/db/sqlite/peer_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,9 @@ impl SqlitePeerDb {

#[async_trait]
impl PeerStore for SqlitePeerDb {
async fn update(&mut self, peer: PersistedPeer) -> Result<(), DatabaseError> {
async fn update(&mut self, peer: PersistedPeer, replace: bool) -> Result<(), DatabaseError> {
let lock = self.conn.lock().await;
let stmt = if !peer.tried {
let stmt = if !replace {
"INSERT OR IGNORE INTO peers (ip_addr, port, service_flags, tried, banned) VALUES (?1, ?2, ?3, ?4, ?5)"
} else {
"INSERT OR REPLACE INTO peers (ip_addr, port, service_flags, tried, banned) VALUES (?1, ?2, ?3, ?4, ?5)"
Expand Down
17 changes: 13 additions & 4 deletions src/db/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,21 +10,26 @@ use crate::prelude::default_port_from_network;

use super::{error::DatabaseError, PersistedPeer};

/// Methods required to persist the chain of block headers.
#[async_trait]
pub(crate) trait HeaderStore {
pub trait HeaderStore {
/// Load all headers with heights *strictly after* the specified anchor height.
async fn load(&mut self, anchor_height: u32) -> Result<BTreeMap<u32, Header>, DatabaseError>;

/// Write an indexed map of block headers to the database, ignoring if they already exist.
async fn write<'a>(
&mut self,
header_chain: &'a BTreeMap<u32, Header>,
) -> Result<(), DatabaseError>;

/// Write the headers to the database, replacing headers over the specified height.
async fn write_over<'a>(
&mut self,
header_chain: &'a BTreeMap<u32, Header>,
height: u32,
) -> Result<(), DatabaseError>;

/// Return the height of a block hash in the database, if it exists.
async fn height_of<'a>(&mut self, hash: &'a BlockHash) -> Result<Option<u32>, DatabaseError>;
}

Expand Down Expand Up @@ -58,18 +63,22 @@ impl HeaderStore for () {
}
}

/// Methods that define a list of peers on the Bitcoin P2P network.
#[async_trait]
pub(crate) trait PeerStore {
async fn update(&mut self, peer: PersistedPeer) -> Result<(), DatabaseError>;
pub trait PeerStore {
/// Add a peer to the database, defining if it should be replaced or not.
async fn update(&mut self, peer: PersistedPeer, replace: bool) -> Result<(), DatabaseError>;

/// Get any peer from the database, selected at random.
async fn random(&mut self) -> Result<PersistedPeer, DatabaseError>;

/// The number of peers in the database that are not marked as banned.
async fn num_unbanned(&mut self) -> Result<u32, DatabaseError>;
}

#[async_trait]
impl PeerStore for () {
async fn update(&mut self, _peer: PersistedPeer) -> Result<(), DatabaseError> {
async fn update(&mut self, _peer: PersistedPeer, _replace: bool) -> Result<(), DatabaseError> {
Ok(())
}

Expand Down
5 changes: 3 additions & 2 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@
//! and vetted.

#![allow(dead_code)]
/// Strucutres related to the blockchain.
/// Strucutres and checkpoints related to the blockchain.
pub mod chain;
mod db;
/// Traits and structures that define the data persistence required for a node.
pub mod db;
mod filters;
/// Tools to build and run a compact block filters node.
pub mod node;
Expand Down
22 changes: 20 additions & 2 deletions src/node/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@ use std::{collections::HashSet, net::IpAddr, path::PathBuf};

use bitcoin::{Network, ScriptBuf};

use crate::chain::checkpoints::HeaderCheckpoint;
use crate::{
chain::checkpoints::HeaderCheckpoint,
db::traits::{HeaderStore, PeerStore},
};

use super::{client::Client, config::NodeConfig, node::Node};

Expand Down Expand Up @@ -55,8 +58,23 @@ impl NodeBuilder {
}

/// Consume the node builder and receive a [`Node`] and [`Client`].
#[cfg(feature = "database")]
pub async fn build_node(&self) -> (Node, Client) {
Node::new_from_config(&self.config, self.network)
use crate::db::sqlite::{header_db::SqliteHeaderDb, peer_db::SqlitePeerDb};
let peer_store = SqlitePeerDb::new(self.network, self.config.data_path.clone()).unwrap();
let header_store =
SqliteHeaderDb::new(self.network, self.config.data_path.clone()).unwrap();
Node::new_from_config(&self.config, self.network, peer_store, header_store)
.await
.unwrap()
}

pub async fn build_node_with_custom_databases(
&self,
peer_store: impl PeerStore + Send + Sync + 'static,
header_store: impl HeaderStore + Send + Sync + 'static,
) -> (Node, Client) {
Node::new_from_config(&self.config, self.network, peer_store, header_store)
.await
.unwrap()
}
Expand Down
Loading
Loading