Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: Add anyhow error handling #339

Merged
merged 4 commits into from
Mar 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions catalyst-gateway/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions catalyst-gateway/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@ tokio = "1"
dotenvy = "0.15"
local-ip-address = "0.5.7"
gethostname = "0.4.3"
hex = "0.4.3"
async-recursion = "1.0.5"
pallas = "0.23.0"
anyhow = "1.0.71"
cardano-chain-follower= { git = "https://github.com/input-output-hk/hermes.git", version="0.0.1" }

[workspace.lints.rust]
warnings = "deny"
Expand Down
26 changes: 7 additions & 19 deletions catalyst-gateway/bin/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,32 +15,25 @@ repository.workspace = true
workspace = true

[dependencies]

bb8 = { workspace = true }
bb8-postgres = { workspace = true }
tokio-postgres = { workspace = true, features = [
"with-chrono-0_4",
"with-serde_json-1",
"with-time-0_3",
] }

clap = { workspace = true, features = ["derive", "env"] }
tracing = { workspace = true, features = ["log"] }
tracing-subscriber = { workspace = true, features = ["fmt", "json", "time"] }

serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true }

tokio = { workspace = true, features = ["rt", "macros", "rt-multi-thread"] }
thiserror = { workspace = true }

rust_decimal = { workspace = true, features = [
"serde-with-float",
"db-tokio-postgres",
] }

chrono = { workspace = true }

poem = { workspace = true, features = [
"embed",
"prometheus",
Expand All @@ -56,8 +49,6 @@ poem-openapi = { workspace = true, features = [
"chrono",
] }
poem-extensions = { workspace = true }

# Metrics - Poem
prometheus = { workspace = true }
cryptoxide = { workspace = true }
uuid = { workspace = true, features = ["v4", "serde"] }
Expand All @@ -68,13 +59,10 @@ panic-message = { workspace = true }
cpu-time = { workspace = true }
ulid = { workspace = true, features = ["serde", "uuid"] }
rust-embed = { workspace = true }
local-ip-address.workspace = true
gethostname.workspace = true

hex = "0.4.3"
async-recursion = "1.0.5"


cardano-chain-follower= { git = "https://github.com/input-output-hk/hermes.git", version="0.0.1"}

pallas = { version = "0.23.0" }
local-ip-address = { workspace = true }
gethostname = { workspace = true }
hex = { workspace = true }
async-recursion = { workspace = true }
pallas = { workspace = true }
cardano-chain-follower= { workspace = true }
anyhow = { workspace = true }
2 changes: 1 addition & 1 deletion catalyst-gateway/bin/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ impl Cli {
/// - Failed to initialize the logger with the specified log level.
/// - Failed to create a new `State` with the provided database URL.
/// - Failed to run the service on the specified address.
pub(crate) async fn exec(self) -> Result<(), Box<dyn std::error::Error>> {
pub(crate) async fn exec(self) -> anyhow::Result<()> {
match self {
Self::Run(settings) => {
logger::init(settings.log_level)?;
Expand Down
29 changes: 18 additions & 11 deletions catalyst-gateway/bin/src/follower.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//! Logic for orchestrating followers
use std::{error::Error, path::PathBuf, str::FromStr, sync::Arc};
use std::{path::PathBuf, str::FromStr, sync::Arc};

/// Handler for follower tasks, allows for control over spawned follower threads
pub type ManageTasks = JoinHandle<()>;
Expand Down Expand Up @@ -30,7 +30,7 @@ const DATA_NOT_STALE: i64 = 1;
pub(crate) async fn start_followers(
configs: (Vec<NetworkMeta>, FollowerMeta), db: Arc<EventDB>, data_refresh_tick: u64,
check_config_tick: u64, machine_id: String,
) -> Result<(), Box<dyn Error>> {
) -> anyhow::Result<()> {
// spawn followers and obtain thread handlers for control and future cancellation
let follower_tasks = spawn_followers(
configs.clone(),
Expand Down Expand Up @@ -76,7 +76,7 @@ pub(crate) async fn start_followers(
)
.await?;
},
None => return Err("Config has been deleted...".into()),
None => return Err(anyhow::anyhow!("Config has been deleted...")),
}

Ok(())
Expand All @@ -86,7 +86,7 @@ pub(crate) async fn start_followers(
async fn spawn_followers(
configs: (Vec<NetworkMeta>, FollowerMeta), db: Arc<EventDB>, data_refresh_tick: u64,
machine_id: String,
) -> Result<Vec<ManageTasks>, Box<dyn Error>> {
) -> anyhow::Result<Vec<ManageTasks>> {
let snapshot_path = configs.1.mithril_snapshot_path;

let mut follower_tasks = Vec::new();
Expand Down Expand Up @@ -151,7 +151,7 @@ async fn spawn_followers(
/// it left off. If there was no previous follower, start indexing from genesis point.
async fn find_last_update_point(
db: Arc<EventDB>, network: &String,
) -> Result<(Option<SlotNumber>, Option<BlockHash>, Option<BlockTime>), Box<dyn Error>> {
) -> anyhow::Result<(Option<SlotNumber>, Option<BlockHash>, Option<BlockTime>)> {
let (slot_no, block_hash, last_updated) =
match db.last_updated_metadata(network.to_string()).await {
Ok((slot_no, block_hash, last_updated)) => {
Expand All @@ -176,11 +176,11 @@ async fn find_last_update_point(
async fn init_follower(
network: Network, relay: &str, start_from: (Option<SlotNumber>, Option<BlockHash>),
db: Arc<EventDB>, machine_id: MachineId, snapshot: &str,
) -> Result<ManageTasks, Box<dyn Error>> {
) -> anyhow::Result<ManageTasks> {
let mut follower = follower_connection(start_from, snapshot, network, relay).await?;

let genesis_values =
network_genesis_values(&network).ok_or("Obtaining genesis values failed")?;
let genesis_values = network_genesis_values(&network)
.ok_or(anyhow::anyhow!("Obtaining genesis values failed"))?;

let task = tokio::spawn(async move {
loop {
Expand Down Expand Up @@ -310,7 +310,7 @@ async fn init_follower(
async fn follower_connection(
start_from: (Option<SlotNumber>, Option<BlockHash>), snapshot: &str, network: Network,
relay: &str,
) -> Result<Follower, Box<dyn Error>> {
) -> anyhow::Result<Follower> {
let mut follower_cfg = if start_from.0.is_none() || start_from.1.is_none() {
// start from genesis, no previous followers, hence no starting points.
FollowerConfigBuilder::default()
Expand All @@ -320,8 +320,15 @@ async fn follower_connection(
// start from given point
FollowerConfigBuilder::default()
.follow_from(Point::new(
start_from.0.ok_or("Slot number not present")?.try_into()?,
hex::decode(start_from.1.ok_or("Block Hash not present")?)?,
start_from
.0
.ok_or(anyhow::anyhow!("Slot number not present"))?
.try_into()?,
hex::decode(
start_from
.1
.ok_or(anyhow::anyhow!("Block Hash not present"))?,
)?,
))
.mithril_snapshot_path(PathBuf::from(snapshot))
.build()
Expand Down
7 changes: 4 additions & 3 deletions catalyst-gateway/bin/src/logger.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//! Setup for logging for the service.
use clap::ValueEnum;
use tracing::{level_filters::LevelFilter, subscriber::SetGlobalDefaultError};
use tracing::level_filters::LevelFilter;
use tracing_subscriber::{
fmt::{format::FmtSpan, time},
FmtSubscriber,
Expand Down Expand Up @@ -45,7 +45,7 @@ impl From<LogLevel> for tracing::log::LevelFilter {
}

/// Initialize the tracing subscriber
pub(crate) fn init(log_level: LogLevel) -> Result<(), SetGlobalDefaultError> {
pub(crate) fn init(log_level: LogLevel) -> anyhow::Result<()> {
let subscriber = FmtSubscriber::builder()
.json()
.with_max_level(LevelFilter::from_level(log_level.into()))
Expand All @@ -64,5 +64,6 @@ pub(crate) fn init(log_level: LogLevel) -> Result<(), SetGlobalDefaultError> {
// Logging is globally disabled by default, so globally enable it to the required level.
tracing::log::set_max_level(log_level.into());

tracing::subscriber::set_global_default(subscriber)
tracing::subscriber::set_global_default(subscriber)?;
Ok(())
}
12 changes: 5 additions & 7 deletions catalyst-gateway/bin/src/util.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
//! Block stream parsing and filtering utils

use std::error::Error;

use cryptoxide::{blake2b::Blake2b, digest::Digest};
use pallas::ledger::{
primitives::conway::{StakeCredential, VKeyWitness},
Expand Down Expand Up @@ -120,7 +118,7 @@ pub fn extract_stake_credentials_from_certs(
/// except for credentials (i.e. keys or scripts) which are 28-byte long (or 224 bits)
pub fn extract_hashed_witnesses(
witnesses: &[VKeyWitness],
) -> Result<Vec<(WitnessPubKey, WitnessHash)>, Box<dyn Error>> {
) -> anyhow::Result<Vec<(WitnessPubKey, WitnessHash)>> {
let mut hashed_witnesses = Vec::new();
for witness in witnesses {
let pub_key_bytes: [u8; 32] = witness.vkey.as_slice().try_into()?;
Expand All @@ -141,7 +139,7 @@ pub fn extract_hashed_witnesses(
/// to identify the correct stake credential key.
pub fn find_matching_stake_credential(
witnesses: &[(WitnessPubKey, WitnessHash)], stake_credentials: &[String],
) -> Result<(StakeCredentialKey, StakeCredentialHash), Box<dyn Error>> {
) -> anyhow::Result<(StakeCredentialKey, StakeCredentialHash)> {
stake_credentials
.iter()
.zip(witnesses.iter())
Expand All @@ -152,7 +150,7 @@ pub fn find_matching_stake_credential(
None
}
})
.ok_or(
"No stake credential from the certificates matches any of the witness pub keys".into(),
)
.ok_or(anyhow::anyhow!(
"No stake credential from the certificates matches any of the witness pub keys"
))
}
Loading