Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[r2r] fixed block_header_utxo_loop #1506

Merged
merged 9 commits into from
Oct 27, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 43 additions & 31 deletions mm2src/coins/utxo/utxo_builder/utxo_arc_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use crate::utxo::utxo_builder::{UtxoCoinBuildError, UtxoCoinBuilder, UtxoCoinBui
UtxoFieldsWithHardwareWalletBuilder, UtxoFieldsWithIguanaPrivKeyBuilder};
use crate::utxo::{generate_and_send_tx, FeePolicy, GetUtxoListOps, UtxoArc, UtxoCommonOps, UtxoSyncStatusLoopHandle,
UtxoWeak};
use crate::{DerivationMethod, MarketCoinOps, PrivKeyBuildPolicy, UtxoActivationParams};
use crate::{DerivationMethod, PrivKeyBuildPolicy, UtxoActivationParams};
use async_trait::async_trait;
use chain::TransactionOutput;
use common::executor::{AbortSettings, SpawnAbortable, Timer};
Expand All @@ -16,10 +16,11 @@ use serde_json::Value as Json;
use spv_validation::helpers_validation::validate_headers;
use spv_validation::storage::BlockHeaderStorageOps;

const BLOCK_HEADERS_LOOP_INTERVAL: f64 = 60.;
const BLOCK_HEADERS_LOOP_SUCCESS_SLEEP_TIMER: f64 = 60.;
const BLOCK_HEADERS_LOOP_ERORR_SLEEP_TIMER: f64 = 10.;
const FETCH_BLOCK_HEADERS_ATTEMPTS: u64 = 3;
const CHUNK_SIZE_REDUCER_VALUE: u64 = 100;
const ELECTRUM_MAX_CHUNK_SIZE: u64 = 2016;
const FETCH_BLOCK_HEADERS_ATTEMPTS: u64 = 3;

pub struct UtxoArcBuilder<'a, F, T>
where
Expand Down Expand Up @@ -84,7 +85,7 @@ impl<'a, F, T> UtxoFieldsWithHardwareWalletBuilder for UtxoArcBuilder<'a, F, T>
impl<'a, F, T> UtxoCoinBuilder for UtxoArcBuilder<'a, F, T>
where
F: Fn(UtxoArc) -> T + Clone + Send + Sync + 'static,
T: UtxoCommonOps + GetUtxoListOps + MarketCoinOps,
T: UtxoCommonOps + GetUtxoListOps,
{
type ResultCoin = T;
type Error = UtxoCoinBuildError;
Expand All @@ -100,16 +101,18 @@ where

let result_coin = (self.constructor)(utxo_arc.clone());
if let Some(sync_status_loop_handle) = sync_status_loop_handle {
let current_block_height = result_coin
.current_block()
let block_count = result_coin
.as_ref()
.rpc_client
.get_block_count()
.compat()
.await
.map_to_mm(UtxoCoinBuildError::GetCurrentBlockHeightError)?;
.map_err(|err| UtxoCoinBuildError::CantGetBlockCount(err.to_string()))?;
self.spawn_block_header_utxo_loop(
&utxo_arc,
self.constructor.clone(),
sync_status_loop_handle,
current_block_height,
block_count,
);
}

Expand Down Expand Up @@ -224,11 +227,12 @@ async fn block_header_utxo_loop<T: UtxoCommonOps>(
weak: UtxoWeak,
constructor: impl Fn(UtxoArc) -> T,
mut sync_status_loop_handle: UtxoSyncStatusLoopHandle,
mut last_block_height: u64,
mut block_count: u64,
) {
let mut chunk_size = ELECTRUM_MAX_CHUNK_SIZE;
while let Some(arc) = weak.upgrade() {
let coin = constructor(arc);
let ticker = coin.as_ref().conf.ticker.as_str();
let client = match &coin.as_ref().rpc_client {
UtxoRpcClientEnum::Native(_) => break,
UtxoRpcClientEnum::Electrum(client) => client,
Expand All @@ -238,30 +242,40 @@ async fn block_header_utxo_loop<T: UtxoCommonOps>(
let from_block_height = match storage.get_last_block_height().await {
Ok(h) => h,
Err(e) => {
error!("Error {} on getting the height of the last stored header in DB!", e);
error!("Error {e:?} on getting the height of the last stored {ticker} header in DB!",);
sync_status_loop_handle.notify_on_temp_error(e.to_string());
Timer::sleep(10.).await;
Timer::sleep(BLOCK_HEADERS_LOOP_ERORR_SLEEP_TIMER).await;
continue;
},
};

// Todo: Add code for the case if a chain reorganization happens
if from_block_height == last_block_height {
sync_status_loop_handle.notify_sync_finished(last_block_height);
last_block_height = match coin.as_ref().rpc_client.get_block_count().compat().await {
let mut to_block_height = from_block_height + chunk_size;
if to_block_height > block_count {
sergeyboyko0791 marked this conversation as resolved.
Show resolved Hide resolved
block_count = match coin.as_ref().rpc_client.get_block_count().compat().await {
Ok(h) => h,
Err(e) => {
error!("Error {} on getting the height of the latest block from rpc!", e);
error!("Error {e:} on getting the height of the latest {ticker} block from rpc!");
sync_status_loop_handle.notify_on_temp_error(e.to_string());
Timer::sleep(10.0).await;
Timer::sleep(BLOCK_HEADERS_LOOP_ERORR_SLEEP_TIMER).await;
continue;
},
};
Timer::sleep(BLOCK_HEADERS_LOOP_INTERVAL).await;

// More than `chunk_size` blocks could have appeared since the last `get_block_count` RPC.
// So reset `to_block_height` if only `from_block_height + chunk_size > actual_block_count`.
if to_block_height > block_count {
to_block_height = block_count;
}
}
drop_mutability!(to_block_height);

// Todo: Add code for the case if a chain reorganization happens
if from_block_height == block_count {
sync_status_loop_handle.notify_sync_finished(to_block_height);
Timer::sleep(BLOCK_HEADERS_LOOP_SUCCESS_SLEEP_TIMER).await;
continue;
}

let to_block_height = from_block_height + chunk_size;
sync_status_loop_handle.notify_blocks_headers_sync_status(from_block_height + 1, to_block_height);

let mut fetch_blocker_headers_attempts = FETCH_BLOCK_HEADERS_ATTEMPTS;
Expand All @@ -273,30 +287,29 @@ async fn block_header_utxo_loop<T: UtxoCommonOps>(
Ok(res) => res,
Err(error) => {
if error.get_inner().is_network_error() {
log!("Network Error: Will try fetching block headers again after 10 secs");
log!("Network Error: Will try fetching {ticker} block headers again after 10 secs");
sync_status_loop_handle.notify_on_temp_error(error.to_string());
Timer::sleep(10.).await;
Timer::sleep(BLOCK_HEADERS_LOOP_ERORR_SLEEP_TIMER).await;
continue;
};

// If electrum returns response too large error, we will reduce the requested headers by CHUNK_SIZE_REDUCER_VALUE every loop until we arrive to a reasonable value.
// If electrum returns response too large error, we will reduce the requested headers by CHUNK_SIZE_REDUCER_VALUE in every loop until we arrive at a reasonable value.
if error.get_inner().is_response_too_large() && chunk_size > CHUNK_SIZE_REDUCER_VALUE {
chunk_size -= CHUNK_SIZE_REDUCER_VALUE;
continue;
}

if fetch_blocker_headers_attempts > 0 {
fetch_blocker_headers_attempts -= 1;
error!("Error {error:?} on retrieving the latest headers from rpc! {fetch_blocker_headers_attempts} attempts left");
error!("Error {error:?} on retrieving latest {ticker} headers from rpc! {fetch_blocker_headers_attempts} attempts left");
// Todo: remove this electrum server and use another in this case since the headers from this server can't be retrieved
sync_status_loop_handle.notify_on_temp_error(error.to_string());
Timer::sleep(10.).await;
Timer::sleep(BLOCK_HEADERS_LOOP_ERORR_SLEEP_TIMER).await;
continue;
};

error!(
"Error {} on retrieving the latest headers from rpc after {FETCH_BLOCK_HEADERS_ATTEMPTS} attempts",
error
"Error {error:?} on retrieving latest {ticker} headers from rpc after {FETCH_BLOCK_HEADERS_ATTEMPTS} attempts"
);
// Todo: remove this electrum server and use another in this case since the headers from this server can't be retrieved
sync_status_loop_handle.notify_on_permanent_error(error.to_string());
Expand All @@ -305,10 +318,9 @@ async fn block_header_utxo_loop<T: UtxoCommonOps>(
};

// Validate retrieved block headers
let ticker = coin.as_ref().conf.ticker.as_str();
if let Some(params) = &coin.as_ref().conf.block_headers_verification_params {
if let Err(e) = validate_headers(ticker, from_block_height, block_headers, storage, params).await {
error!("Error {} on validating the latest headers!", e);
error!("Error {e:?} on validating the latest headers for {ticker}!");
// Todo: remove this electrum server and use another in this case since the headers from this server are invalid
sync_status_loop_handle.notify_on_permanent_error(e.to_string());
break;
Expand All @@ -317,7 +329,7 @@ async fn block_header_utxo_loop<T: UtxoCommonOps>(

ok_or_continue_after_sleep!(
storage.add_block_headers_to_storage(block_registry).await,
BLOCK_HEADERS_LOOP_INTERVAL
BLOCK_HEADERS_LOOP_SUCCESS_SLEEP_TIMER
);
}
}
Expand All @@ -328,7 +340,7 @@ pub trait BlockHeaderUtxoArcOps<T>: UtxoCoinBuilderCommonOps {
utxo_arc: &UtxoArc,
constructor: F,
sync_status_loop_handle: UtxoSyncStatusLoopHandle,
current_block_height: u64,
block_count: u64,
) where
F: Fn(UtxoArc) -> T + Send + Sync + 'static,
T: UtxoCommonOps,
Expand All @@ -337,7 +349,7 @@ pub trait BlockHeaderUtxoArcOps<T>: UtxoCoinBuilderCommonOps {
info!("Starting UTXO block header loop for coin {ticker}");

let utxo_weak = utxo_arc.downgrade();
let fut = block_header_utxo_loop(utxo_weak, constructor, sync_status_loop_handle, current_block_height);
let fut = block_header_utxo_loop(utxo_weak, constructor, sync_status_loop_handle, block_count);

let settings = AbortSettings::info_on_abort(format!("spawn_block_header_utxo_loop stopped for {ticker}"));
utxo_arc
Expand Down
3 changes: 2 additions & 1 deletion mm2src/coins/utxo/utxo_builder/utxo_coin_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,9 @@ pub enum UtxoCoinBuildError {
HwContextNotInitialized,
HDWalletStorageError(HDWalletStorageError),
CoinDoesntSupportTrezor,
GetCurrentBlockHeightError(String),
BlockHeaderStorageError(BlockHeaderStorageError),
#[display(fmt = "Error {} on getting the height of the latest block from rpc!", _0)]
CantGetBlockCount(String),
#[display(fmt = "Internal error: {}", _0)]
Internal(String),
}
Expand Down