Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[r2r] fixed block_header_utxo_loop #1506

Merged
merged 9 commits into from
Oct 27, 2022
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 33 additions & 22 deletions mm2src/coins/utxo/utxo_builder/utxo_arc_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,11 @@ use serde_json::Value as Json;
use spv_validation::helpers_validation::validate_headers;
use spv_validation::storage::BlockHeaderStorageOps;

const BLOCK_HEADERS_LOOP_INTERVAL: f64 = 60.;
const BLOCK_HEADERS_LOOP_SUCCESS_SLEEP_TIMER: f64 = 60.;
const BLOCK_HEADERS_LOOP_ERORR_SLEEP_TIMER: f64 = 10.;
const FETCH_BLOCK_HEADERS_ATTEMPTS: u64 = 3;
const CHUNK_SIZE_REDUCER_VALUE: u64 = 100;
const ELECTRUM_MAX_CHUNK_SIZE: u64 = 2016;
const FETCH_BLOCK_HEADERS_ATTEMPTS: u64 = 3;

pub struct UtxoArcBuilder<'a, F, T>
where
Expand Down Expand Up @@ -100,16 +101,17 @@ where

let result_coin = (self.constructor)(utxo_arc.clone());
if let Some(sync_status_loop_handle) = sync_status_loop_handle {
let current_block_height = result_coin
.current_block()
.compat()
.await
.map_to_mm(UtxoCoinBuildError::GetCurrentBlockHeightError)?;
let block_count = match result_coin.as_ref().rpc_client.get_block_count().compat().await {
sergeyboyko0791 marked this conversation as resolved.
Show resolved Hide resolved
Ok(h) => h,
Err(e) => {
shamardy marked this conversation as resolved.
Show resolved Hide resolved
return MmError::err(UtxoCoinBuildError::CantGetBlockCount(e.to_string()));
},
};
self.spawn_block_header_utxo_loop(
&utxo_arc,
self.constructor.clone(),
sync_status_loop_handle,
current_block_height,
block_count,
);
}

Expand Down Expand Up @@ -224,7 +226,7 @@ async fn block_header_utxo_loop<T: UtxoCommonOps>(
weak: UtxoWeak,
constructor: impl Fn(UtxoArc) -> T,
mut sync_status_loop_handle: UtxoSyncStatusLoopHandle,
mut last_block_height: u64,
mut block_count: u64,
) {
let mut chunk_size = ELECTRUM_MAX_CHUNK_SIZE;
while let Some(arc) = weak.upgrade() {
Expand All @@ -240,28 +242,37 @@ async fn block_header_utxo_loop<T: UtxoCommonOps>(
Err(e) => {
error!("Error {} on getting the height of the last stored header in DB!", e);
sync_status_loop_handle.notify_on_temp_error(e.to_string());
Timer::sleep(10.).await;
Timer::sleep(BLOCK_HEADERS_LOOP_ERORR_SLEEP_TIMER).await;
continue;
},
};

// Todo: Add code for the case if a chain reorganization happens
if from_block_height == last_block_height {
sync_status_loop_handle.notify_sync_finished(last_block_height);
last_block_height = match coin.as_ref().rpc_client.get_block_count().compat().await {
let mut to_block_height = from_block_height + chunk_size;
if to_block_height > block_count {
sergeyboyko0791 marked this conversation as resolved.
Show resolved Hide resolved
block_count = match coin.as_ref().rpc_client.get_block_count().compat().await {
Ok(h) => h,
Err(e) => {
error!("Error {} on getting the height of the latest block from rpc!", e);
sync_status_loop_handle.notify_on_temp_error(e.to_string());
Timer::sleep(10.0).await;
Timer::sleep(BLOCK_HEADERS_LOOP_ERORR_SLEEP_TIMER).await;
continue;
},
};
Timer::sleep(BLOCK_HEADERS_LOOP_INTERVAL).await;

// More than `chunk_size` blocks could have appeared since the last `get_block_count` RPC.
// So reset `to_block_height` if only `from_block_height + chunk_size > actual_block_count`.
if to_block_height > block_count {
to_block_height = block_count;
}
}

// Todo: Add code for the case if a chain reorganization happens
if from_block_height == block_count {
sync_status_loop_handle.notify_sync_finished(to_block_height);
Timer::sleep(BLOCK_HEADERS_LOOP_SUCCESS_SLEEP_TIMER).await;
continue;
}

let to_block_height = from_block_height + chunk_size;
sync_status_loop_handle.notify_blocks_headers_sync_status(from_block_height + 1, to_block_height);

let mut fetch_blocker_headers_attempts = FETCH_BLOCK_HEADERS_ATTEMPTS;
Expand All @@ -275,7 +286,7 @@ async fn block_header_utxo_loop<T: UtxoCommonOps>(
if error.get_inner().is_network_error() {
log!("Network Error: Will try fetching block headers again after 10 secs");
sync_status_loop_handle.notify_on_temp_error(error.to_string());
Timer::sleep(10.).await;
Timer::sleep(BLOCK_HEADERS_LOOP_ERORR_SLEEP_TIMER).await;
continue;
};

Expand All @@ -290,7 +301,7 @@ async fn block_header_utxo_loop<T: UtxoCommonOps>(
error!("Error {error:?} on retrieving the latest headers from rpc! {fetch_blocker_headers_attempts} attempts left");
// Todo: remove this electrum server and use another in this case since the headers from this server can't be retrieved
sync_status_loop_handle.notify_on_temp_error(error.to_string());
Timer::sleep(10.).await;
Timer::sleep(BLOCK_HEADERS_LOOP_ERORR_SLEEP_TIMER).await;
continue;
};

Expand All @@ -317,7 +328,7 @@ async fn block_header_utxo_loop<T: UtxoCommonOps>(

ok_or_continue_after_sleep!(
storage.add_block_headers_to_storage(block_registry).await,
BLOCK_HEADERS_LOOP_INTERVAL
BLOCK_HEADERS_LOOP_SUCCESS_SLEEP_TIMER
);
}
}
Expand All @@ -328,7 +339,7 @@ pub trait BlockHeaderUtxoArcOps<T>: UtxoCoinBuilderCommonOps {
utxo_arc: &UtxoArc,
constructor: F,
sync_status_loop_handle: UtxoSyncStatusLoopHandle,
current_block_height: u64,
block_count: u64,
) where
F: Fn(UtxoArc) -> T + Send + Sync + 'static,
T: UtxoCommonOps,
Expand All @@ -337,7 +348,7 @@ pub trait BlockHeaderUtxoArcOps<T>: UtxoCoinBuilderCommonOps {
info!("Starting UTXO block header loop for coin {ticker}");

let utxo_weak = utxo_arc.downgrade();
let fut = block_header_utxo_loop(utxo_weak, constructor, sync_status_loop_handle, current_block_height);
let fut = block_header_utxo_loop(utxo_weak, constructor, sync_status_loop_handle, block_count);

let settings = AbortSettings::info_on_abort(format!("spawn_block_header_utxo_loop stopped for {ticker}"));
utxo_arc
Expand Down
3 changes: 2 additions & 1 deletion mm2src/coins/utxo/utxo_builder/utxo_coin_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,9 @@ pub enum UtxoCoinBuildError {
HwContextNotInitialized,
HDWalletStorageError(HDWalletStorageError),
CoinDoesntSupportTrezor,
GetCurrentBlockHeightError(String),
BlockHeaderStorageError(BlockHeaderStorageError),
#[display(fmt = "Error {} on getting the height of the latest block from rpc!", _0)]
CantGetBlockCount(String),
#[display(fmt = "Internal error: {}", _0)]
Internal(String),
}
Expand Down