Skip to content

Commit

Permalink
[r2r] fixed block_header_utxo_loop (#1506)
Browse files Browse the repository at this point in the history
* fixed block_header_utxo_loop

* minor fix

* first attempt for spawn_block_header_utxo_loop unit test

* temp remove unit test and minor fixes

* CantGetBlockCount error

* minor fix for block_count

* minor fix

* pr review fixes

* log coin ticker alongside error messages
  • Loading branch information
borngraced authored Oct 27, 2022
1 parent 6fc74b9 commit 20462d9
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 32 deletions.
74 changes: 43 additions & 31 deletions mm2src/coins/utxo/utxo_builder/utxo_arc_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use crate::utxo::utxo_builder::{UtxoCoinBuildError, UtxoCoinBuilder, UtxoCoinBui
UtxoFieldsWithHardwareWalletBuilder, UtxoFieldsWithIguanaPrivKeyBuilder};
use crate::utxo::{generate_and_send_tx, FeePolicy, GetUtxoListOps, UtxoArc, UtxoCommonOps, UtxoSyncStatusLoopHandle,
UtxoWeak};
use crate::{DerivationMethod, MarketCoinOps, PrivKeyBuildPolicy, UtxoActivationParams};
use crate::{DerivationMethod, PrivKeyBuildPolicy, UtxoActivationParams};
use async_trait::async_trait;
use chain::TransactionOutput;
use common::executor::{AbortSettings, SpawnAbortable, Timer};
Expand All @@ -16,10 +16,11 @@ use serde_json::Value as Json;
use spv_validation::helpers_validation::validate_headers;
use spv_validation::storage::BlockHeaderStorageOps;

const BLOCK_HEADERS_LOOP_INTERVAL: f64 = 60.;
const BLOCK_HEADERS_LOOP_SUCCESS_SLEEP_TIMER: f64 = 60.;
const BLOCK_HEADERS_LOOP_ERORR_SLEEP_TIMER: f64 = 10.;
const FETCH_BLOCK_HEADERS_ATTEMPTS: u64 = 3;
const CHUNK_SIZE_REDUCER_VALUE: u64 = 100;
const ELECTRUM_MAX_CHUNK_SIZE: u64 = 2016;
const FETCH_BLOCK_HEADERS_ATTEMPTS: u64 = 3;

pub struct UtxoArcBuilder<'a, F, T>
where
Expand Down Expand Up @@ -84,7 +85,7 @@ impl<'a, F, T> UtxoFieldsWithHardwareWalletBuilder for UtxoArcBuilder<'a, F, T>
impl<'a, F, T> UtxoCoinBuilder for UtxoArcBuilder<'a, F, T>
where
F: Fn(UtxoArc) -> T + Clone + Send + Sync + 'static,
T: UtxoCommonOps + GetUtxoListOps + MarketCoinOps,
T: UtxoCommonOps + GetUtxoListOps,
{
type ResultCoin = T;
type Error = UtxoCoinBuildError;
Expand All @@ -100,16 +101,18 @@ where

let result_coin = (self.constructor)(utxo_arc.clone());
if let Some(sync_status_loop_handle) = sync_status_loop_handle {
let current_block_height = result_coin
.current_block()
let block_count = result_coin
.as_ref()
.rpc_client
.get_block_count()
.compat()
.await
.map_to_mm(UtxoCoinBuildError::GetCurrentBlockHeightError)?;
.map_err(|err| UtxoCoinBuildError::CantGetBlockCount(err.to_string()))?;
self.spawn_block_header_utxo_loop(
&utxo_arc,
self.constructor.clone(),
sync_status_loop_handle,
current_block_height,
block_count,
);
}

Expand Down Expand Up @@ -224,11 +227,12 @@ async fn block_header_utxo_loop<T: UtxoCommonOps>(
weak: UtxoWeak,
constructor: impl Fn(UtxoArc) -> T,
mut sync_status_loop_handle: UtxoSyncStatusLoopHandle,
mut last_block_height: u64,
mut block_count: u64,
) {
let mut chunk_size = ELECTRUM_MAX_CHUNK_SIZE;
while let Some(arc) = weak.upgrade() {
let coin = constructor(arc);
let ticker = coin.as_ref().conf.ticker.as_str();
let client = match &coin.as_ref().rpc_client {
UtxoRpcClientEnum::Native(_) => break,
UtxoRpcClientEnum::Electrum(client) => client,
Expand All @@ -238,30 +242,40 @@ async fn block_header_utxo_loop<T: UtxoCommonOps>(
let from_block_height = match storage.get_last_block_height().await {
Ok(h) => h,
Err(e) => {
error!("Error {} on getting the height of the last stored header in DB!", e);
error!("Error {e:?} on getting the height of the last stored {ticker} header in DB!",);
sync_status_loop_handle.notify_on_temp_error(e.to_string());
Timer::sleep(10.).await;
Timer::sleep(BLOCK_HEADERS_LOOP_ERORR_SLEEP_TIMER).await;
continue;
},
};

// Todo: Add code for the case if a chain reorganization happens
if from_block_height == last_block_height {
sync_status_loop_handle.notify_sync_finished(last_block_height);
last_block_height = match coin.as_ref().rpc_client.get_block_count().compat().await {
let mut to_block_height = from_block_height + chunk_size;
if to_block_height > block_count {
block_count = match coin.as_ref().rpc_client.get_block_count().compat().await {
Ok(h) => h,
Err(e) => {
error!("Error {} on getting the height of the latest block from rpc!", e);
error!("Error {e:} on getting the height of the latest {ticker} block from rpc!");
sync_status_loop_handle.notify_on_temp_error(e.to_string());
Timer::sleep(10.0).await;
Timer::sleep(BLOCK_HEADERS_LOOP_ERORR_SLEEP_TIMER).await;
continue;
},
};
Timer::sleep(BLOCK_HEADERS_LOOP_INTERVAL).await;

// More than `chunk_size` blocks could have appeared since the last `get_block_count` RPC.
// So reset `to_block_height` if only `from_block_height + chunk_size > actual_block_count`.
if to_block_height > block_count {
to_block_height = block_count;
}
}
drop_mutability!(to_block_height);

// Todo: Add code for the case if a chain reorganization happens
if from_block_height == block_count {
sync_status_loop_handle.notify_sync_finished(to_block_height);
Timer::sleep(BLOCK_HEADERS_LOOP_SUCCESS_SLEEP_TIMER).await;
continue;
}

let to_block_height = from_block_height + chunk_size;
sync_status_loop_handle.notify_blocks_headers_sync_status(from_block_height + 1, to_block_height);

let mut fetch_blocker_headers_attempts = FETCH_BLOCK_HEADERS_ATTEMPTS;
Expand All @@ -273,30 +287,29 @@ async fn block_header_utxo_loop<T: UtxoCommonOps>(
Ok(res) => res,
Err(error) => {
if error.get_inner().is_network_error() {
log!("Network Error: Will try fetching block headers again after 10 secs");
log!("Network Error: Will try fetching {ticker} block headers again after 10 secs");
sync_status_loop_handle.notify_on_temp_error(error.to_string());
Timer::sleep(10.).await;
Timer::sleep(BLOCK_HEADERS_LOOP_ERORR_SLEEP_TIMER).await;
continue;
};

// If electrum returns response too large error, we will reduce the requested headers by CHUNK_SIZE_REDUCER_VALUE every loop until we arrive to a reasonable value.
// If electrum returns response too large error, we will reduce the requested headers by CHUNK_SIZE_REDUCER_VALUE in every loop until we arrive at a reasonable value.
if error.get_inner().is_response_too_large() && chunk_size > CHUNK_SIZE_REDUCER_VALUE {
chunk_size -= CHUNK_SIZE_REDUCER_VALUE;
continue;
}

if fetch_blocker_headers_attempts > 0 {
fetch_blocker_headers_attempts -= 1;
error!("Error {error:?} on retrieving the latest headers from rpc! {fetch_blocker_headers_attempts} attempts left");
error!("Error {error:?} on retrieving latest {ticker} headers from rpc! {fetch_blocker_headers_attempts} attempts left");
// Todo: remove this electrum server and use another in this case since the headers from this server can't be retrieved
sync_status_loop_handle.notify_on_temp_error(error.to_string());
Timer::sleep(10.).await;
Timer::sleep(BLOCK_HEADERS_LOOP_ERORR_SLEEP_TIMER).await;
continue;
};

error!(
"Error {} on retrieving the latest headers from rpc after {FETCH_BLOCK_HEADERS_ATTEMPTS} attempts",
error
"Error {error:?} on retrieving latest {ticker} headers from rpc after {FETCH_BLOCK_HEADERS_ATTEMPTS} attempts"
);
// Todo: remove this electrum server and use another in this case since the headers from this server can't be retrieved
sync_status_loop_handle.notify_on_permanent_error(error.to_string());
Expand All @@ -305,10 +318,9 @@ async fn block_header_utxo_loop<T: UtxoCommonOps>(
};

// Validate retrieved block headers
let ticker = coin.as_ref().conf.ticker.as_str();
if let Some(params) = &coin.as_ref().conf.block_headers_verification_params {
if let Err(e) = validate_headers(ticker, from_block_height, block_headers, storage, params).await {
error!("Error {} on validating the latest headers!", e);
error!("Error {e:?} on validating the latest headers for {ticker}!");
// Todo: remove this electrum server and use another in this case since the headers from this server are invalid
sync_status_loop_handle.notify_on_permanent_error(e.to_string());
break;
Expand All @@ -317,7 +329,7 @@ async fn block_header_utxo_loop<T: UtxoCommonOps>(

ok_or_continue_after_sleep!(
storage.add_block_headers_to_storage(block_registry).await,
BLOCK_HEADERS_LOOP_INTERVAL
BLOCK_HEADERS_LOOP_SUCCESS_SLEEP_TIMER
);
}
}
Expand All @@ -328,7 +340,7 @@ pub trait BlockHeaderUtxoArcOps<T>: UtxoCoinBuilderCommonOps {
utxo_arc: &UtxoArc,
constructor: F,
sync_status_loop_handle: UtxoSyncStatusLoopHandle,
current_block_height: u64,
block_count: u64,
) where
F: Fn(UtxoArc) -> T + Send + Sync + 'static,
T: UtxoCommonOps,
Expand All @@ -337,7 +349,7 @@ pub trait BlockHeaderUtxoArcOps<T>: UtxoCoinBuilderCommonOps {
info!("Starting UTXO block header loop for coin {ticker}");

let utxo_weak = utxo_arc.downgrade();
let fut = block_header_utxo_loop(utxo_weak, constructor, sync_status_loop_handle, current_block_height);
let fut = block_header_utxo_loop(utxo_weak, constructor, sync_status_loop_handle, block_count);

let settings = AbortSettings::info_on_abort(format!("spawn_block_header_utxo_loop stopped for {ticker}"));
utxo_arc
Expand Down
3 changes: 2 additions & 1 deletion mm2src/coins/utxo/utxo_builder/utxo_coin_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,9 @@ pub enum UtxoCoinBuildError {
HwContextNotInitialized,
HDWalletStorageError(HDWalletStorageError),
CoinDoesntSupportTrezor,
GetCurrentBlockHeightError(String),
BlockHeaderStorageError(BlockHeaderStorageError),
#[display(fmt = "Error {} on getting the height of the latest block from rpc!", _0)]
CantGetBlockCount(String),
#[display(fmt = "Internal error: {}", _0)]
Internal(String),
}
Expand Down

0 comments on commit 20462d9

Please sign in to comment.