Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[r2r] Block header UTXO Loop Test Impl #1519

Merged
merged 22 commits into from
Dec 19, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions mm2src/coins/utxo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,10 +109,6 @@ use crate::hd_wallet_storage::{HDAccountStorageItem, HDWalletCoinStorage, HDWall
use crate::utxo::tx_cache::UtxoVerboseCacheShared;

pub mod tx_cache;
#[cfg(target_arch = "wasm32")]
pub mod utxo_indexedb_block_header_storage;
#[cfg(not(target_arch = "wasm32"))]
pub mod utxo_sql_block_header_storage;

#[cfg(any(test, target_arch = "wasm32"))]
pub mod utxo_common_tests;
Expand Down
89 changes: 46 additions & 43 deletions mm2src/coins/utxo/rpc_clients.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1946,49 +1946,6 @@ impl ElectrumClient {
rpc_func!(self, "blockchain.block.headers", start_height, count)
}

pub fn retrieve_headers(&self, from: u64, to: u64) -> UtxoRpcFut<(HashMap<u64, BlockHeader>, Vec<BlockHeader>)> {
let coin_name = self.coin_ticker.clone();
if from == 0 || to < from {
return Box::new(futures01::future::err(
UtxoRpcError::Internal("Invalid values for from/to parameters".to_string()).into(),
));
}
let count: NonZeroU64 = match (to - from + 1).try_into() {
Ok(c) => c,
Err(e) => return Box::new(futures01::future::err(UtxoRpcError::Internal(e.to_string()).into())),
};
Box::new(
self.blockchain_block_headers(from, count)
.map_to_mm_fut(UtxoRpcError::from)
.and_then(move |headers| {
let (block_registry, block_headers) = {
if headers.count == 0 {
return MmError::err(UtxoRpcError::Internal("No headers available".to_string()));
}
let len = CompactInteger::from(headers.count);
let mut serialized = serialize(&len).take();
serialized.extend(headers.hex.0.into_iter());
drop_mutability!(serialized);
let mut reader =
Reader::new_with_coin_variant(serialized.as_slice(), coin_name.as_str().into());
let maybe_block_headers = reader.read_list::<BlockHeader>();
let block_headers = match maybe_block_headers {
Ok(headers) => headers,
Err(e) => return MmError::err(UtxoRpcError::InvalidResponse(format!("{:?}", e))),
};
let mut block_registry: HashMap<u64, BlockHeader> = HashMap::new();
let mut starting_height = from;
for block_header in &block_headers {
block_registry.insert(starting_height, block_header.clone());
starting_height += 1;
}
(block_registry, block_headers)
};
Ok((block_registry, block_headers))
}),
)
}

/// https://electrumx.readthedocs.io/en/latest/protocol-methods.html#blockchain-transaction-get-merkle
pub fn blockchain_transaction_get_merkle(&self, txid: H256Json, height: u64) -> RpcRes<TxMerkleBranch> {
rpc_func!(self, "blockchain.transaction.get_merkle", txid, height)
Expand Down Expand Up @@ -2081,6 +2038,52 @@ impl ElectrumClient {
}
}

#[cfg_attr(test, mockable)]
impl ElectrumClient {
pub fn retrieve_headers(&self, from: u64, to: u64) -> UtxoRpcFut<(HashMap<u64, BlockHeader>, Vec<BlockHeader>)> {
let coin_name = self.coin_ticker.clone();
if from == 0 || to < from {
return Box::new(futures01::future::err(
UtxoRpcError::Internal("Invalid values for from/to parameters".to_string()).into(),
));
}
let count: NonZeroU64 = match (to - from + 1).try_into() {
Ok(c) => c,
Err(e) => return Box::new(futures01::future::err(UtxoRpcError::Internal(e.to_string()).into())),
};
Box::new(
self.blockchain_block_headers(from, count)
.map_to_mm_fut(UtxoRpcError::from)
.and_then(move |headers| {
let (block_registry, block_headers) = {
if headers.count == 0 {
return MmError::err(UtxoRpcError::Internal("No headers available".to_string()));
}
let len = CompactInteger::from(headers.count);
let mut serialized = serialize(&len).take();
serialized.extend(headers.hex.0.into_iter());
drop_mutability!(serialized);
let mut reader =
Reader::new_with_coin_variant(serialized.as_slice(), coin_name.as_str().into());
let maybe_block_headers = reader.read_list::<BlockHeader>();
let block_headers = match maybe_block_headers {
Ok(headers) => headers,
Err(e) => return MmError::err(UtxoRpcError::InvalidResponse(format!("{:?}", e))),
};
let mut block_registry: HashMap<u64, BlockHeader> = HashMap::new();
let mut starting_height = from;
for block_header in &block_headers {
block_registry.insert(starting_height, block_header.clone());
starting_height += 1;
}
(block_registry, block_headers)
};
Ok((block_registry, block_headers))
}),
)
}
}

// if mockable is placed before async_trait there is `munmap_chunk(): invalid pointer` error on async fn mocking attempt
#[async_trait]
#[cfg_attr(test, mockable)]
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,16 @@
#[cfg(target_arch = "wasm32")] mod indexedb_block_header_storage;
#[cfg(target_arch = "wasm32")]
use crate::utxo::utxo_indexedb_block_header_storage::IndexedDBBlockHeadersStorage;
pub use indexedb_block_header_storage::IndexedDBBlockHeadersStorage;

#[cfg(not(target_arch = "wasm32"))] mod sql_block_header_storage;
#[cfg(not(target_arch = "wasm32"))]
use crate::utxo::utxo_sql_block_header_storage::SqliteBlockHeadersStorage;
pub use sql_block_header_storage::SqliteBlockHeadersStorage;

use async_trait::async_trait;
use chain::BlockHeader;
use mm2_core::mm_ctx::MmArc;
#[cfg(all(test, not(target_arch = "wasm32")))]
use mocktopus::macros::*;
use primitives::hash::H256;
use spv_validation::storage::{BlockHeaderStorageError, BlockHeaderStorageOps};
use std::collections::HashMap;
Expand Down Expand Up @@ -54,6 +60,7 @@ impl BlockHeaderStorage {
}

#[async_trait]
#[cfg_attr(all(test, not(target_arch = "wasm32")), mockable)]
impl BlockHeaderStorageOps for BlockHeaderStorage {
async fn init(&self) -> Result<(), BlockHeaderStorageError> { self.inner.init().await }

Expand Down
3 changes: 3 additions & 0 deletions mm2src/coins/utxo/utxo_builder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,6 @@ pub use utxo_coin_builder::{UtxoCoinBuildError, UtxoCoinBuildResult, UtxoCoinBui
UtxoFieldsWithGlobalHDBuilder, UtxoFieldsWithHardwareWalletBuilder,
UtxoFieldsWithIguanaSecretBuilder};
pub use utxo_conf_builder::{UtxoConfBuilder, UtxoConfError, UtxoConfResult};

#[cfg(test)]
pub(crate) use utxo_arc_builder::{block_header_utxo_loop, BlockHeaderUtxoLoopExtraArgs};
42 changes: 28 additions & 14 deletions mm2src/coins/utxo/utxo_builder/utxo_arc_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,14 @@ use common::log::{error, info, warn};
use futures::compat::Future01CompatExt;
use mm2_core::mm_ctx::MmArc;
use mm2_err_handle::prelude::*;
#[cfg(test)] use mocktopus::macros::*;
use script::Builder;
use serde_json::Value as Json;
use spv_validation::helpers_validation::validate_headers;
use spv_validation::storage::BlockHeaderStorageOps;

const BLOCK_HEADERS_LOOP_SUCCESS_SLEEP_TIMER: f64 = 60.;
const BLOCK_HEADERS_LOOP_ERORR_SLEEP_TIMER: f64 = 10.;
const FETCH_BLOCK_HEADERS_ATTEMPTS: u64 = 3;
const CHUNK_SIZE_REDUCER_VALUE: u64 = 100;
const ELECTRUM_MAX_CHUNK_SIZE: u64 = 2016;

pub struct UtxoArcBuilder<'a, F, T>
where
Expand Down Expand Up @@ -229,13 +227,31 @@ pub trait MergeUtxoArcOps<T: UtxoCommonOps + GetUtxoListOps>: UtxoCoinBuilderCom
}
}

async fn block_header_utxo_loop<T: UtxoCommonOps>(
pub(crate) struct BlockHeaderUtxoLoopExtraArgs {
pub(crate) chunk_size: u64,
pub(crate) error_sleep: f64,
pub(crate) success_sleep: f64,
}

#[cfg_attr(test, mockable)]
impl Default for BlockHeaderUtxoLoopExtraArgs {
fn default() -> Self {
Self {
chunk_size: 2016,
error_sleep: 10.,
success_sleep: 60.,
}
}
}
sergeyboyko0791 marked this conversation as resolved.
Show resolved Hide resolved

pub(crate) async fn block_header_utxo_loop<T: UtxoCommonOps>(
weak: UtxoWeak,
constructor: impl Fn(UtxoArc) -> T,
mut sync_status_loop_handle: UtxoSyncStatusLoopHandle,
mut block_count: u64,
) {
let mut chunk_size = ELECTRUM_MAX_CHUNK_SIZE;
let args = BlockHeaderUtxoLoopExtraArgs::default();
let mut chunk_size = args.chunk_size;
while let Some(arc) = weak.upgrade() {
let coin = constructor(arc);
let ticker = coin.as_ref().conf.ticker.as_str();
Expand All @@ -250,7 +266,7 @@ async fn block_header_utxo_loop<T: UtxoCommonOps>(
Err(e) => {
error!("Error {e:?} on getting the height of the last stored {ticker} header in DB!",);
sync_status_loop_handle.notify_on_temp_error(e.to_string());
Timer::sleep(BLOCK_HEADERS_LOOP_ERORR_SLEEP_TIMER).await;
Timer::sleep(args.error_sleep).await;
continue;
},
};
Expand All @@ -262,7 +278,7 @@ async fn block_header_utxo_loop<T: UtxoCommonOps>(
Err(e) => {
error!("Error {e:} on getting the height of the latest {ticker} block from rpc!");
sync_status_loop_handle.notify_on_temp_error(e.to_string());
Timer::sleep(BLOCK_HEADERS_LOOP_ERORR_SLEEP_TIMER).await;
Timer::sleep(args.error_sleep).await;
continue;
},
};
Expand All @@ -278,7 +294,7 @@ async fn block_header_utxo_loop<T: UtxoCommonOps>(
// Todo: Add code for the case if a chain reorganization happens
if from_block_height == block_count {
sync_status_loop_handle.notify_sync_finished(to_block_height);
Timer::sleep(BLOCK_HEADERS_LOOP_SUCCESS_SLEEP_TIMER).await;
Timer::sleep(args.success_sleep).await;
continue;
}

Expand All @@ -295,7 +311,7 @@ async fn block_header_utxo_loop<T: UtxoCommonOps>(
if error.get_inner().is_network_error() {
log!("Network Error: Will try fetching {ticker} block headers again after 10 secs");
sync_status_loop_handle.notify_on_temp_error(error.to_string());
Timer::sleep(BLOCK_HEADERS_LOOP_ERORR_SLEEP_TIMER).await;
Timer::sleep(args.error_sleep).await;
continue;
};

Expand All @@ -310,7 +326,7 @@ async fn block_header_utxo_loop<T: UtxoCommonOps>(
error!("Error {error:?} on retrieving latest {ticker} headers from rpc! {fetch_blocker_headers_attempts} attempts left");
// Todo: remove this electrum server and use another in this case since the headers from this server can't be retrieved
sync_status_loop_handle.notify_on_temp_error(error.to_string());
Timer::sleep(BLOCK_HEADERS_LOOP_ERORR_SLEEP_TIMER).await;
Timer::sleep(args.error_sleep).await;
continue;
};

Expand All @@ -333,10 +349,8 @@ async fn block_header_utxo_loop<T: UtxoCommonOps>(
}
};

ok_or_continue_after_sleep!(
storage.add_block_headers_to_storage(block_registry).await,
BLOCK_HEADERS_LOOP_SUCCESS_SLEEP_TIMER
);
let sleep = args.success_sleep;
ok_or_continue_after_sleep!(storage.add_block_headers_to_storage(block_registry).await, sleep);
}
}

Expand Down
103 changes: 98 additions & 5 deletions mm2src/coins/utxo/utxo_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,11 @@ use crate::utxo::rpc_clients::{BlockHashOrHeight, ElectrumBalance, ElectrumClien
GetAddressInfoRes, ListSinceBlockRes, NativeClient, NativeClientImpl, NativeUnspent,
NetworkInfo, UtxoRpcClientOps, ValidateAddressRes, VerboseBlock};
use crate::utxo::spv::SimplePaymentVerification;
use crate::utxo::utxo_block_header_storage::BlockHeaderStorage;
use crate::utxo::utxo_builder::{UtxoArcBuilder, UtxoCoinBuilderCommonOps};
use crate::utxo::utxo_block_header_storage::{BlockHeaderStorage, SqliteBlockHeadersStorage};
use crate::utxo::utxo_builder::{UtxoArcBuilder, UtxoCoinBuilder, UtxoCoinBuilderCommonOps};
use crate::utxo::utxo_common::UtxoTxBuilder;
use crate::utxo::utxo_common_tests::{self, utxo_coin_fields_for_test, utxo_coin_from_fields, TEST_COIN_DECIMALS,
TEST_COIN_NAME};
use crate::utxo::utxo_sql_block_header_storage::SqliteBlockHeadersStorage;
use crate::utxo::utxo_standard::{utxo_standard_coin_with_priv_key, UtxoStandardCoin};
use crate::utxo::utxo_tx_history_v2::{UtxoTxDetailsParams, UtxoTxHistoryOps};
#[cfg(not(target_arch = "wasm32"))] use crate::WithdrawFee;
Expand All @@ -30,11 +29,12 @@ use common::executor::Timer;
use common::{block_on, now_ms, OrdRange, PagingOptionsEnum, DEX_FEE_ADDR_RAW_PUBKEY};
use crypto::{privkey::key_pair_from_seed, Bip44Chain, RpcDerivationPath, Secp256k1Secret};
use db_common::sqlite::rusqlite::Connection;
use futures::channel::mpsc::channel;
use futures::future::join_all;
use futures::TryFutureExt;
use mm2_core::mm_ctx::MmCtxBuilder;
use mm2_number::bigdecimal::{BigDecimal, Signed};
use mm2_test_helpers::for_tests::{MORTY_ELECTRUM_ADDRS, RICK_ELECTRUM_ADDRS};
use mm2_test_helpers::for_tests::{mm_ctx_with_custom_db, MORTY_ELECTRUM_ADDRS, RICK_ELECTRUM_ADDRS};
use mocktopus::mocking::*;
use rpc::v1::types::H256 as H256Json;
use serialization::{deserialize, CoinVariant};
Expand Down Expand Up @@ -934,7 +934,6 @@ fn test_spv_proof() {
.as_slice(),
)
.unwrap();

let mut headers = HashMap::new();
headers.insert(452248, header);
let storage = client.block_headers_storage();
Expand Down Expand Up @@ -4256,3 +4255,97 @@ fn test_utxo_validate_valid_and_invalid_pubkey() {
assert!(coin.validate_other_pubkey(&[1u8; 20]).is_err());
assert!(coin.validate_other_pubkey(&[1u8; 8]).is_err());
}

#[test]
fn test_block_header_utxo_loop() {
use crate::utxo::utxo_builder::{block_header_utxo_loop, BlockHeaderUtxoLoopExtraArgs};
use futures::future::{Either, FutureExt};
use keys::hash::H256 as H256Json;

static mut CURRENT_BLOCK_COUNT: u64 = 13;

ElectrumClient::get_block_count
.mock_safe(move |_| MockResult::Return(Box::new(futures01::future::ok(unsafe { CURRENT_BLOCK_COUNT }))));
let expected_steps: Arc<Mutex<Vec<(u64, u64)>>> = Arc::new(Mutex::new(vec![]));
let expected_steps_to_move = expected_steps.clone();

ElectrumClient::retrieve_headers.mock_safe(move |this, from, to| {
let (expected_from, expected_to) = expected_steps_to_move.lock().unwrap().remove(0);
assert_eq!(from, expected_from);
assert_eq!(to, expected_to);
MockResult::Continue((this, from, to))
});

BlockHeaderUtxoLoopExtraArgs::default.mock_safe(move || {
MockResult::Return(BlockHeaderUtxoLoopExtraArgs {
chunk_size: 4,
error_sleep: 1.,
success_sleep: 1.,
})
});

let ctx = mm_ctx_with_custom_db();
let priv_key_policy = PrivKeyBuildPolicy::IguanaPrivKey(IguanaPrivKey::from(H256Json::from([1u8; 32])));
let servers: Vec<_> = RICK_ELECTRUM_ADDRS
.iter()
.map(|server| json!({ "url": server }))
.collect();
let req = json!({ "method": "electrum", "servers": servers });
let params = UtxoActivationParams::from_legacy_req(&req).unwrap();
let conf = json!({"coin":"RICK", "asset":"RICK", "rpcport":8923, "enable_spv_proof": false});
let builder = UtxoArcBuilder::new(&ctx, "RICK", &conf, &params, priv_key_policy, UtxoStandardCoin::from);
let arc: UtxoArc = block_on(builder.build_utxo_fields()).unwrap().into();
let client = match &arc.rpc_client {
UtxoRpcClientEnum::Electrum(electrum) => electrum.clone(),
UtxoRpcClientEnum::Native(_) => unreachable!(),
};

let (sync_status_notifier, _) = channel::<UtxoSyncStatus>(1);
let loop_handle = UtxoSyncStatusLoopHandle::new(sync_status_notifier);

let loop_fut = async move {
unsafe {
block_header_utxo_loop(
arc.downgrade(),
UtxoStandardCoin::from,
loop_handle,
CURRENT_BLOCK_COUNT,
)
.await
}
};

let test_fut = async move {
*expected_steps.lock().unwrap() = vec![(1, 4), (5, 8), (9, 12), (13, 13)];
unsafe { CURRENT_BLOCK_COUNT = 13 }
Timer::sleep(2.).await;
let get_headers_count = client.block_headers_storage().get_last_block_height().await.unwrap();
assert_eq!(get_headers_count, 13);
assert!(expected_steps.lock().unwrap().is_empty());

*expected_steps.lock().unwrap() = vec![(14, 17)];
unsafe { CURRENT_BLOCK_COUNT = 17 }
Timer::sleep(2.).await;
let get_headers_count = client.block_headers_storage().get_last_block_height().await.unwrap();
assert_eq!(get_headers_count, 17);
assert!(expected_steps.lock().unwrap().is_empty());

*expected_steps.lock().unwrap() = vec![(18, 18)];
unsafe { CURRENT_BLOCK_COUNT = 18 }
Timer::sleep(2.).await;
let get_headers_count = client.block_headers_storage().get_last_block_height().await.unwrap();
assert_eq!(get_headers_count, 18);
assert!(expected_steps.lock().unwrap().is_empty());

*expected_steps.lock().unwrap() = vec![(19, 22), (23, 25)];
unsafe { CURRENT_BLOCK_COUNT = 25 }
Timer::sleep(3.).await;
let get_headers_count = client.block_headers_storage().get_last_block_height().await.unwrap();
assert_eq!(get_headers_count, 25);
assert!(expected_steps.lock().unwrap().is_empty());
};

if let Either::Left(_) = block_on(futures::future::select(loop_fut.boxed(), test_fut.boxed())) {
panic!("Loop shouldn't stop")
};
}
Loading