Skip to content

Commit

Permalink
Merge pull request #42 from oreoslabs/developer
Browse files Browse the repository at this point in the history
Developer
  • Loading branch information
whohideonbug authored Jan 9, 2025
2 parents c06033d + 6309c66 commit 21404f8
Show file tree
Hide file tree
Showing 42 changed files with 1,350 additions and 1,306 deletions.
1,045 changes: 354 additions & 691 deletions Cargo.lock

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@
resolver = "2"
members = [
"crates/networking",
"crates/constants",
"crates/db_handler",
"crates/oreo_errors",
"crates/server",
"crates/utils",
"crates/prover",
"crates/dworker",
"crates/dservice",
"crates/chain_loader", "crates/transfer-tool",
"crates/chain_loader",
"crates/params",
]
4 changes: 2 additions & 2 deletions crates/chain_loader/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ tokio = { version = "1.35.1", features = ["full"] }
tracing = "0.1.40"
utils = { path = "../utils" }
networking = { path = "../networking" }
constants = { path = "../constants" }
oreo_errors = { path = "../oreo_errors" }
db_handler = { path = "../db_handler" }
serde_json = "1.0.117"
serde_json = "1.0.117"
params = { path = "../params" }
58 changes: 0 additions & 58 deletions crates/chain_loader/src/lib.rs

This file was deleted.

142 changes: 124 additions & 18 deletions crates/chain_loader/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,34 +1,140 @@
use std::{
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
time::Duration,
};

use anyhow::Result;
use chain_loader::load_checkpoint;
use clap::Parser;
use db_handler::{DBHandler, DbConfig, PgHandler};
use utils::{handle_signals, initialize_logger};
use db_handler::{load_db, DBTransaction, InnerBlock, Json};
use networking::{rpc_abi::RpcBlock, rpc_handler::RpcHandler};
use params::{mainnet::Mainnet, network::Network, testnet::Testnet};
use tokio::{sync::oneshot, time::sleep};
use tracing::{error, info};
use utils::{blocks_range, initialize_logger, initialize_logger_filter, EnvFilter};

#[derive(Parser, Debug, Clone)]
struct Command {
struct ChainLoader {
/// The path to db config file
#[clap(long)]
pub dbconfig: String,
dbconfig: String,
/// Set your logger level
#[clap(short, long, default_value = "0")]
pub verbosity: u8,
verbosity: u8,
/// The Ironfish rpc node to connect to
#[clap(short, long, default_value = "127.0.0.1:9092")]
pub node: String,
node: String,
/// The network id, 0 for mainnet, 1 for testnet.
#[clap(long)]
network: u8,
}

impl ChainLoader {
async fn start<N: Network>(&self, shut_down: Arc<AtomicBool>) -> Result<()> {
let rpc_handler = RpcHandler::new(self.node.clone());
let genesis_block = rpc_handler
.get_latest_block()
.unwrap()
.data
.genesis_block_identifier;
if genesis_block.hash.to_lowercase() != N::GENESIS_BLOCK_HASH.to_lowercase() {
panic!("Genesis block doesn't match");
}

let db_handler = { load_db(self.dbconfig.clone()).unwrap() };

for group in blocks_range(1..N::LOCAL_BLOCKS_CHECKPOINT + 1, N::PRIMARY_BATCH) {
if shut_down.load(Ordering::Relaxed) {
info!("Chainloader should exit now");
break;
}
if db_handler
.get_blocks(group.start as i64, group.end as i64)
.await
.is_ok()
{
continue;
}
let results = {
loop {
match rpc_handler.get_blocks(group.start, group.end) {
Ok(res) => break res,
Err(e) => {
error!("Failed to get rpc blocks {}", e);
}
}
sleep(Duration::from_secs(1)).await;
}
};
let blocks: Vec<RpcBlock> = results
.data
.blocks
.into_iter()
.map(|item| item.block)
.collect();
let inner_blocks = blocks
.into_iter()
.map(|rpc| InnerBlock {
hash: rpc.hash.clone(),
sequence: rpc.sequence as i64,
transactions: Json(
rpc.transactions
.into_iter()
.map(|tx| DBTransaction {
hash: tx.hash,
serialized_notes: tx
.notes
.into_iter()
.map(|n| n.serialized)
.collect(),
})
.collect(),
),
})
.collect();
if group.end % 1000 == 0 {
info!(
"save blocks from {} to {} in local db",
group.start, group.end
);
}
let _ = db_handler.save_blocks(inner_blocks).await;
}
Ok(())
}
}

#[tokio::main]
async fn main() -> Result<()> {
let args = Command::parse();
let Command {
dbconfig,
verbosity,
node,
} = args;
initialize_logger(verbosity);
handle_signals().await?;
let db_config = DbConfig::load(dbconfig).unwrap();
let db_handler = PgHandler::from_config(&db_config);
load_checkpoint(node, db_handler).await?;
let loader = ChainLoader::parse();
initialize_logger(loader.verbosity);
initialize_logger_filter(EnvFilter::from_default_env());
let shut_down = Arc::new(AtomicBool::new(false));
handle_signals(shut_down.clone()).await;
match loader.network {
Mainnet::ID => loader.start::<Mainnet>(shut_down).await?,
Testnet::ID => loader.start::<Testnet>(shut_down).await?,
_ => panic!("Invalid network used"),
}
Ok(())
}

async fn handle_signals(shut_down: Arc<AtomicBool>) {
let (router, handler) = oneshot::channel();
tokio::spawn(async move {
let _ = router.send(());
match tokio::signal::ctrl_c().await {
Ok(()) => {
shut_down.store(true, Ordering::SeqCst);
info!("Shutdown signal received, exit after 3 seconds");
sleep(Duration::from_secs(3)).await;
std::process::exit(0);
}
Err(error) => error!("tokio::signal::ctrl_c encountered an error: {}", error),
}
});
let _ = handler.await;
info!("Shutdown handler installed");
}
8 changes: 0 additions & 8 deletions crates/constants/Cargo.toml

This file was deleted.

15 changes: 0 additions & 15 deletions crates/constants/src/lib.rs

This file was deleted.

2 changes: 1 addition & 1 deletion crates/db_handler/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@ sqlx = { version = "0.7.0", features = ["runtime-tokio-rustls", "postgres"] }
substring = "1.4.5"
tracing = "0.1.40"
oreo_errors = { path = "../oreo_errors" }
constants = { path = "../constants" }
redis = { version = "0.25.2", features = [
"tokio-comp",
"tokio-native-tls-comp",
] }
params = { path = "../params" }

[dev-dependencies]
sqlx-db-tester = "0.4.0"
Expand Down
10 changes: 10 additions & 0 deletions crates/db_handler/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ use tracing::info;

use std::{fs, path::Path};

use crate::DBType;

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct DbConfig {
pub host: String,
Expand Down Expand Up @@ -44,6 +46,14 @@ impl DbConfig {
pub fn url(&self) -> String {
format!("{}/{}", self.server_url(), self.dbname)
}

pub fn protocol(&self) -> DBType {
match self.protocol.as_str() {
"postgres" => DBType::Postgres,
"redis" => DBType::Redis,
_ => DBType::Unknown,
}
}
}

impl DbConfig {
Expand Down
46 changes: 43 additions & 3 deletions crates/db_handler/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,25 @@ mod config;
mod pg_handler;
mod redis_handler;

use std::{path::Path, str::FromStr};

use anyhow::anyhow;
pub use config::DbConfig;
use futures::executor::block_on;
pub use pg_handler::*;
pub use redis_handler::*;

pub use sqlx::types::Json;

use oreo_errors::OreoError;
use serde::{Deserialize, Serialize};
use sqlx::FromRow;
use sqlx::{postgres::PgConnectOptions, ConnectOptions, FromRow, PgPool};
use substring::Substring;

#[async_trait::async_trait]
pub trait DBHandler {
/// Initialize a DB handler
fn from_config(config: &DbConfig) -> Self;
//// DB type: postgres and redis for now
fn db_type(&self) -> String;
/// Save account in db and return account name
async fn save_account(&self, account: Account, worker_id: u32) -> Result<String, OreoError>;
/// Get account name from db
Expand Down Expand Up @@ -75,3 +79,39 @@ pub struct BonusAddress {
pub address: String,
pub paid: bool,
}

pub enum DBType {
Postgres,
Redis,
Unknown,
}

impl DbConfig {
pub fn build(&self) -> anyhow::Result<Box<dyn DBHandler + Send + Sync>> {
match self.protocol() {
DBType::Postgres => {
let url = self.server_url();
let options = PgConnectOptions::from_str(&url)
.unwrap()
.disable_statement_logging()
.clone();
let pool = block_on(async { PgPool::connect_with(options).await });
match pool {
Ok(pool) => Ok(Box::new(PgHandler::new(pool))),
Err(e) => Err(anyhow!("Failed to connect pgsql {}", e)),
}
}
DBType::Redis => {
let client = RedisClient::connect(&self.server_url(), self.default_pool_size)?;
Ok(Box::new(client))
}
DBType::Unknown => {
panic!("Invalid database used")
}
}
}
}

pub fn load_db(filename: impl AsRef<Path>) -> anyhow::Result<Box<dyn DBHandler + Send + Sync>> {
DbConfig::load(filename)?.build()
}
Loading

0 comments on commit 21404f8

Please sign in to comment.