diff --git a/components/chainhook-sdk/src/indexer/stacks/blocks_pool.rs b/components/chainhook-sdk/src/indexer/stacks/blocks_pool.rs
index 56dac755c..5897144c2 100644
--- a/components/chainhook-sdk/src/indexer/stacks/blocks_pool.rs
+++ b/components/chainhook-sdk/src/indexer/stacks/blocks_pool.rs
@@ -110,7 +110,7 @@ impl StacksBlockPool {
                 ctx.try_log(|logger| {
                     slog::error!(logger, "unable to retrieve previous stacks fork")
                 });
-                return Ok(None);
+                return Err("unable to retrieve previous stacks fork".to_string());
             }
         };

diff --git a/components/chainhook-sdk/src/observer/http.rs b/components/chainhook-sdk/src/observer/http.rs
index 0c91acb46..024874142 100644
--- a/components/chainhook-sdk/src/observer/http.rs
+++ b/components/chainhook-sdk/src/observer/http.rs
@@ -4,7 +4,10 @@ use crate::indexer::bitcoin::{
 use crate::indexer::{self, Indexer};
 use crate::monitoring::PrometheusMonitoring;
 use crate::utils::Context;
+use crate::{try_error, try_info};
 use hiro_system_kit::slog;
+use rocket::http::Status;
+use rocket::response::status::Custom;
 use rocket::serde::json::{json, Json, Value as JsonValue};
 use rocket::State;
 use std::sync::mpsc::Sender;
@@ -15,6 +18,27 @@ use super::{
     StacksChainMempoolEvent,
 };

+fn success_response() -> Result<Json<JsonValue>, Custom<Json<JsonValue>>> {
+    Ok(Json(json!({
+        "status": 200,
+        "result": "Ok",
+    })))
+}
+
+fn error_response(
+    message: String,
+    ctx: &State<Context>,
+) -> Result<Json<JsonValue>, Custom<Json<JsonValue>>> {
+    try_error!(ctx, "{message}");
+    Err(Custom(
+        Status::InternalServerError,
+        Json(json!({
+            "status": 500,
+            "result": message,
+        })),
+    ))
+}
+
 #[rocket::get("/ping", format = "application/json")]
 pub fn handle_ping(
     ctx: &State<Context>,
@@ -36,18 +60,15 @@ pub async fn handle_new_bitcoin_block(
     background_job_tx: &State<Arc<Mutex<Sender<ObserverCommand>>>>,
     prometheus_monitoring: &State<PrometheusMonitoring>,
     ctx: &State<Context>,
-) -> Json<JsonValue> {
+) -> Result<Json<JsonValue>, Custom<Json<JsonValue>>> {
     if bitcoin_config
         .bitcoin_block_signaling
         .should_ignore_bitcoin_block_signaling_through_stacks()
     {
-        return Json(json!({
-            "status": 200,
-            "result": "Ok",
-        }));
+        return success_response();
     }

-    ctx.try_log(|logger| slog::info!(logger, "POST /new_burn_block"));
+    try_info!(ctx, "POST /new_burn_block");
     // Standardize the structure of the block, and identify the
     // kind of update that this new block would imply, taking
     // into account the last 7 blocks.
@@ -58,116 +79,48 @@
         match download_and_parse_block_with_retry(&http_client, block_hash, bitcoin_config, ctx)
             .await
         {
-            Ok(block) => Some(block),
+            Ok(block) => block,
             Err(e) => {
-                ctx.try_log(|logger| {
-                    slog::warn!(
-                        logger,
-                        "unable to download_and_parse_block: {}",
-                        e.to_string()
-                    )
-                });
-                None
+                return error_response(format!("unable to download_and_parse_block: {e}"), ctx)
             }
         };
-    let Some(block) = block else {
-        ctx.try_log(|logger| {
-            slog::crit!(
-                logger,
-                "Could not download bitcoin block after receiving new_burn_block. Exiting Chainhook observer."
-            )
-        });
-        match background_job_tx.lock() {
-            Ok(tx) => {
-                let _ = tx.send(ObserverCommand::Terminate);
-            }
-            Err(e) => {
-                ctx.try_log(|logger| {
-                    slog::crit!(logger, "Could not shut down event observer: {e}")
-                });
-                std::process::exit(1)
-            }
-        }
-
-        return Json(json!({
-            "status": 500,
-            "result": "unable to retrieve_full_block",
-        }));
-    };
     let header = block.get_block_header();
     let block_height = header.block_identifier.index;
     prometheus_monitoring.btc_metrics_block_received(block_height);

-    match background_job_tx.lock() {
-        Ok(tx) => {
-            let _ = tx.send(ObserverCommand::ProcessBitcoinBlock(block));
-        }
-        Err(e) => {
-            ctx.try_log(|logger| {
-                slog::warn!(
-                    logger,
-                    "unable to acquire background_job_tx: {}",
-                    e.to_string()
-                )
-            });
-            return Json(json!({
-                "status": 500,
-                "result": "Unable to acquire lock",
-            }));
-        }
-    };
+    if let Err(e) = background_job_tx.lock().map(|tx| {
+        tx.send(ObserverCommand::ProcessBitcoinBlock(block))
+            .map_err(|e| format!("Unable to send stacks chain event: {}", e))
+    }) {
+        return error_response(format!("unable to acquire background_job_tx: {e}"), ctx);
+    }

     let chain_update = match indexer_rw_lock.inner().write() {
         Ok(mut indexer) => indexer.handle_bitcoin_header(header, ctx),
         Err(e) => {
-            ctx.try_log(|logger| {
-                slog::warn!(
-                    logger,
-                    "unable to acquire indexer_rw_lock: {}",
-                    e.to_string()
-                )
-            });
-            return Json(json!({
-                "status": 500,
-                "result": "Unable to acquire lock",
-            }));
+            return error_response(format!("Unable to acquire indexer_rw_lock: {e}"), ctx);
         }
     };

     match chain_update {
         Ok(Some(chain_event)) => {
             prometheus_monitoring.btc_metrics_block_appended(block_height);
-            match background_job_tx.lock() {
-                Ok(tx) => {
-                    let _ = tx.send(ObserverCommand::PropagateBitcoinChainEvent(chain_event));
-                }
-                Err(e) => {
-                    ctx.try_log(|logger| {
-                        slog::warn!(
-                            logger,
-                            "unable to acquire background_job_tx: {}",
-                            e.to_string()
-                        )
-                    });
-                    return Json(json!({
-                        "status": 500,
-                        "result": "Unable to acquire lock",
-                    }));
-                }
-            };
+            if let Err(e) = background_job_tx.lock().map(|tx| {
+                tx.send(ObserverCommand::PropagateBitcoinChainEvent(chain_event))
+                    .map_err(|e| format!("Unable to send stacks chain event: {}", e))
+            }) {
+                return error_response(format!("unable to acquire background_job_tx: {e}"), ctx);
+            }
         }
         Ok(None) => {
-            ctx.try_log(|logger| slog::warn!(logger, "unable to infer chain progress"));
+            try_info!(ctx, "No chain event was generated");
         }
         Err(e) => {
-            ctx.try_log(|logger| slog::error!(logger, "unable to handle bitcoin block: {}", e))
+            return error_response(format!("Unable to handle bitcoin block: {e}"), ctx);
         }
     }

-    Json(json!({
-        "status": 200,
-        "result": "Ok",
-    }))
+    success_response()
 }

 #[post("/new_block", format = "application/json", data = "<marshalled_block>")]
@@ -177,8 +130,8 @@ pub fn handle_new_stacks_block(
     background_job_tx: &State<Arc<Mutex<Sender<ObserverCommand>>>>,
     prometheus_monitoring: &State<PrometheusMonitoring>,
     ctx: &State<Context>,
-) -> Json<JsonValue> {
-    ctx.try_log(|logger| slog::info!(logger, "POST /new_block"));
+) -> Result<Json<JsonValue>, Custom<Json<JsonValue>>> {
+    try_info!(ctx, "POST /new_block");
     // Standardize the structure of the block, and identify the
     // kind of update that this new block would imply, taking
     // into account the last 7 blocks.
@@ -191,10 +144,7 @@ pub fn handle_new_stacks_block(
     {
         Ok(block) => block,
         Err(e) => {
-            return Json(json!({
-                "status": 500,
-                "result": format!("Unable to standardize stacks block {}", e),
-            }));
+            return error_response(format!("Unable to standardize stacks block {e}"), ctx);
         }
     };
     let new_tip = block.block_identifier.index;
@@ -203,53 +153,29 @@
             (pox_config, chain_event, new_tip)
         }
         Err(e) => {
-            ctx.try_log(|logger| {
-                slog::warn!(
-                    logger,
-                    "unable to acquire indexer_rw_lock: {}",
-                    e.to_string()
-                )
-            });
-            return Json(json!({
-                "status": 500,
-                "result": "Unable to acquire lock",
-            }));
+            return error_response(format!("Unable to acquire indexer_rw_lock: {e}"), ctx);
         }
     };

     match chain_event {
         Ok(Some(chain_event)) => {
             prometheus_monitoring.stx_metrics_block_appeneded(new_tip);
-            let background_job_tx = background_job_tx.inner();
-            match background_job_tx.lock() {
-                Ok(tx) => {
-                    let _ = tx.send(ObserverCommand::PropagateStacksChainEvent(chain_event));
-                }
-                Err(e) => {
-                    ctx.try_log(|logger| {
-                        slog::warn!(
-                            logger,
-                            "unable to acquire background_job_tx: {}",
-                            e.to_string()
-                        )
-                    });
-                    return Json(json!({
-                        "status": 500,
-                        "result": "Unable to acquire lock",
-                    }));
-                }
-            };
+            if let Err(e) = background_job_tx.lock().map(|tx| {
+                tx.send(ObserverCommand::PropagateStacksChainEvent(chain_event))
+                    .map_err(|e| format!("Unable to send stacks chain event: {}", e))
+            }) {
+                return error_response(format!("unable to acquire background_job_tx: {e}"), ctx);
+            }
         }
         Ok(None) => {
-            ctx.try_log(|logger| slog::warn!(logger, "unable to infer chain progress"));
+            try_info!(ctx, "No chain event was generated");
+        }
+        Err(e) => {
+            return error_response(format!("Chain event error: {e}"), ctx);
+        }
-        Err(e) => ctx.try_log(|logger| slog::error!(logger, "{}", e)),
     }

-    Json(json!({
-        "status": 200,
-        "result": "Ok",
-    }))
+    success_response()
 }

 #[post(
@@ -262,67 +188,36 @@ pub fn handle_new_microblocks(
     marshalled_microblock: Json<JsonValue>,
     background_job_tx: &State<Arc<Mutex<Sender<ObserverCommand>>>>,
     ctx: &State<Context>,
-) -> Json<JsonValue> {
-    ctx.try_log(|logger| slog::debug!(logger, "POST /new_microblocks"));
+) -> Result<Json<JsonValue>, Custom<Json<JsonValue>>> {
+    try_info!(ctx, "POST /new_microblocks");
     // Standardize the structure of the microblock, and identify the
     // kind of update that this new microblock would imply
     let chain_event = match indexer_rw_lock.inner().write() {
-        Ok(mut indexer) => {
-
-            indexer.handle_stacks_marshalled_microblock_trail(
-                marshalled_microblock.into_inner(),
-                ctx,
-            )
-        }
+        Ok(mut indexer) => indexer
+            .handle_stacks_marshalled_microblock_trail(marshalled_microblock.into_inner(), ctx),
         Err(e) => {
-            ctx.try_log(|logger| {
-                slog::warn!(
-                    logger,
-                    "unable to acquire background_job_tx: {}",
-                    e.to_string()
-                )
-            });
-            return Json(json!({
-                "status": 500,
-                "result": "Unable to acquire lock",
-            }));
+            return error_response(format!("Unable to acquire background_job_tx: {e}"), ctx);
         }
     };

     match chain_event {
         Ok(Some(chain_event)) => {
-            let background_job_tx = background_job_tx.inner();
-            match background_job_tx.lock() {
-                Ok(tx) => {
-                    let _ = tx.send(ObserverCommand::PropagateStacksChainEvent(chain_event));
-                }
-                Err(e) => {
-                    ctx.try_log(|logger| {
-                        slog::warn!(
-                            logger,
-                            "unable to acquire background_job_tx: {}",
-                            e.to_string()
-                        )
-                    });
-                    return Json(json!({
-                        "status": 500,
-                        "result": "Unable to acquire lock",
-                    }));
-                }
-            };
+            if let Err(e) = background_job_tx.lock().map(|tx| {
+                tx.send(ObserverCommand::PropagateStacksChainEvent(chain_event))
+                    .map_err(|e| format!("Unable to send stacks chain event: {}", e))
+            }) {
+                return error_response(format!("unable to acquire background_job_tx: {e}"), ctx);
+            }
         }
         Ok(None) => {
-            ctx.try_log(|logger| slog::warn!(logger, "unable to infer chain progress"));
+            try_info!(ctx, "No chain event was generated");
         }
         Err(e) => {
-            ctx.try_log(|logger| slog::error!(logger, "unable to handle stacks microblock: {}", e));
+            return error_response(format!("Chain event error: {e}"), ctx);
         }
     }

-    Json(json!({
-        "status": 200,
-        "result": "Ok",
-    }))
+    success_response()
 }

 #[post("/new_mempool_tx", format = "application/json", data = "<raw_txs>")]
@@ -330,31 +225,36 @@ pub fn handle_new_mempool_tx(
     raw_txs: Json<Vec<String>>,
     background_job_tx: &State<Arc<Mutex<Sender<ObserverCommand>>>>,
     ctx: &State<Context>,
-) -> Json<JsonValue> {
-    ctx.try_log(|logger| slog::debug!(logger, "POST /new_mempool_tx"));
-    let transactions = raw_txs
+) -> Result<Json<JsonValue>, Custom<Json<JsonValue>>> {
+    try_info!(ctx, "POST /new_mempool_tx");
+    let transactions = match raw_txs
         .iter()
         .map(|tx_data| {
-            let (tx_description, ..) = indexer::stacks::get_tx_description(tx_data, &vec![])
-                .expect("unable to parse transaction");
-            MempoolAdmissionData {
-                tx_data: tx_data.clone(),
-                tx_description,
-            }
+            indexer::stacks::get_tx_description(tx_data, &vec![])
+                .map(|(tx_description, ..)| MempoolAdmissionData {
+                    tx_data: tx_data.clone(),
+                    tx_description,
+                })
+                .map_err(|e| e)
         })
-        .collect::<Vec<_>>();
+        .collect::<Result<Vec<_>, _>>()
+    {
+        Ok(transactions) => transactions,
+        Err(e) => {
+            return error_response(format!("Failed to parse mempool transactions: {e}"), ctx);
+        }
+    };

-    let background_job_tx = background_job_tx.inner();
-    if let Ok(tx) = background_job_tx.lock() {
-        let _ = tx.send(ObserverCommand::PropagateStacksMempoolEvent(
+    if let Err(e) = background_job_tx.lock().map(|tx| {
+        tx.send(ObserverCommand::PropagateStacksMempoolEvent(
             StacksChainMempoolEvent::TransactionsAdmitted(transactions),
-        ));
-    };
+        ))
+        .map_err(|e| format!("Unable to send stacks chain event: {}", e))
+    }) {
+        return error_response(format!("unable to acquire background_job_tx: {e}"), ctx);
+    }

-    Json(json!({
-        "status": 200,
-        "result": "Ok",
-    }))
+    success_response()
 }

 #[post("/drop_mempool_tx", format = "application/json")]
diff --git a/components/chainhook-sdk/src/utils/mod.rs b/components/chainhook-sdk/src/utils/mod.rs
index 550eb56a5..4de1df404 100644
--- a/components/chainhook-sdk/src/utils/mod.rs
+++ b/components/chainhook-sdk/src/utils/mod.rs
@@ -403,3 +403,44 @@ pub fn write_file_content_at_path(file_path: &PathBuf, content: &[u8]) -> Result
         .map_err(|e| format!("unable to write file {}\n{}", file_path.display(), e))?;
     Ok(())
 }
+
+// TODO: Fold these macros into one generic macro with configurable log levels.
+#[macro_export]
+macro_rules! try_info {
+    ($a:expr, $tag:expr, $($args:tt)*) => {
+        $a.try_log(|l| slog::info!(l, $tag, $($args)*));
+    };
+    ($a:expr, $tag:expr) => {
+        $a.try_log(|l| slog::info!(l, $tag));
+    };
+}
+
+#[macro_export]
+macro_rules! try_debug {
+    ($a:expr, $tag:expr, $($args:tt)*) => {
+        $a.try_log(|l| slog::debug!(l, $tag, $($args)*));
+    };
+    ($a:expr, $tag:expr) => {
+        $a.try_log(|l| slog::debug!(l, $tag));
+    };
+}
+
+#[macro_export]
+macro_rules! try_warn {
+    ($a:expr, $tag:expr, $($args:tt)*) => {
+        $a.try_log(|l| slog::warn!(l, $tag, $($args)*));
+    };
+    ($a:expr, $tag:expr) => {
+        $a.try_log(|l| slog::warn!(l, $tag));
+    };
+}
+
+#[macro_export]
+macro_rules! try_error {
+    ($a:expr, $tag:expr, $($args:tt)*) => {
+        $a.try_log(|l| slog::error!(l, $tag, $($args)*));
+    };
+    ($a:expr, $tag:expr) => {
+        $a.try_log(|l| slog::error!(l, $tag));
+    };
+}