Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[r2r] Block header UTXO Loop Test Impl #1519

Merged
merged 22 commits into from
Dec 19, 2022
Merged
Show file tree
Hide file tree
Changes from 20 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions mm2src/coins/utxo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,10 +109,6 @@ use crate::hd_wallet_storage::{HDAccountStorageItem, HDWalletCoinStorage, HDWall
use crate::utxo::tx_cache::UtxoVerboseCacheShared;

pub mod tx_cache;
#[cfg(target_arch = "wasm32")]
pub mod utxo_indexedb_block_header_storage;
#[cfg(not(target_arch = "wasm32"))]
pub mod utxo_sql_block_header_storage;

#[cfg(any(test, target_arch = "wasm32"))]
pub mod utxo_common_tests;
Expand Down
6 changes: 5 additions & 1 deletion mm2src/coins/utxo/rpc_clients.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1811,6 +1811,7 @@ impl JsonRpcMultiClient for ElectrumClient {
}
}

#[cfg_attr(test, mockable)]
impl ElectrumClient {
sergeyboyko0791 marked this conversation as resolved.
Show resolved Hide resolved
/// https://electrumx.readthedocs.io/en/latest/protocol-methods.html#server-ping
pub fn server_ping(&self) -> RpcRes<()> { rpc_func!(self, "server.ping") }
Expand Down Expand Up @@ -2010,9 +2011,12 @@ impl ElectrumClient {
// get_tx_height_from_storage is always preferred to be used instead of this, but if there is no headers in storage (storing headers is not enabled)
// this function can be used instead
async fn get_tx_height_from_rpc(&self, tx: &UtxoTx) -> Result<u64, GetTxHeightError> {
// Refactor scripthash_get_history to avoid `capturing dynamic environment in a fn item" when using mockable
// attribute
let scripthash_get_history = |s: String| async move { self.scripthash_get_history(s.as_str()).compat().await };
sergeyboyko0791 marked this conversation as resolved.
Show resolved Hide resolved
for output in tx.outputs.clone() {
let script_pubkey_str = hex::encode(electrum_script_hash(&output.script_pubkey));
if let Ok(history) = self.scripthash_get_history(script_pubkey_str.as_str()).compat().await {
if let Ok(history) = scripthash_get_history(script_pubkey_str.to_string()).await {
if let Some(item) = history
.into_iter()
.find(|item| item.tx_hash.reversed() == H256Json(*tx.hash()) && item.height > 0)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,16 @@
#[cfg(target_arch = "wasm32")] mod indexedb_block_header_storage;
#[cfg(target_arch = "wasm32")]
use crate::utxo::utxo_indexedb_block_header_storage::IndexedDBBlockHeadersStorage;
pub use indexedb_block_header_storage::IndexedDBBlockHeadersStorage;

#[cfg(not(target_arch = "wasm32"))] mod sql_block_header_storage;
#[cfg(not(target_arch = "wasm32"))]
use crate::utxo::utxo_sql_block_header_storage::SqliteBlockHeadersStorage;
pub use sql_block_header_storage::SqliteBlockHeadersStorage;

use async_trait::async_trait;
use chain::BlockHeader;
use mm2_core::mm_ctx::MmArc;
#[cfg(all(test, not(target_arch = "wasm32")))]
use mocktopus::macros::*;
use primitives::hash::H256;
use spv_validation::storage::{BlockHeaderStorageError, BlockHeaderStorageOps};
use std::collections::HashMap;
Expand Down Expand Up @@ -54,6 +60,7 @@ impl BlockHeaderStorage {
}

#[async_trait]
#[cfg_attr(all(test, not(target_arch = "wasm32")), mockable)]
impl BlockHeaderStorageOps for BlockHeaderStorage {
async fn init(&self) -> Result<(), BlockHeaderStorageError> { self.inner.init().await }

Expand Down
3 changes: 3 additions & 0 deletions mm2src/coins/utxo/utxo_builder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,6 @@ pub use utxo_coin_builder::{UtxoCoinBuildError, UtxoCoinBuildResult, UtxoCoinBui
UtxoFieldsWithGlobalHDBuilder, UtxoFieldsWithHardwareWalletBuilder,
UtxoFieldsWithIguanaSecretBuilder};
pub use utxo_conf_builder::{UtxoConfBuilder, UtxoConfError, UtxoConfResult};

#[cfg(test)]
pub(crate) use utxo_arc_builder::{block_header_utxo_loop, BlockHeaderUtxoLoopExtraArgs};
42 changes: 28 additions & 14 deletions mm2src/coins/utxo/utxo_builder/utxo_arc_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,14 @@ use common::log::{error, info, warn};
use futures::compat::Future01CompatExt;
use mm2_core::mm_ctx::MmArc;
use mm2_err_handle::prelude::*;
#[cfg(test)] use mocktopus::macros::*;
use script::Builder;
use serde_json::Value as Json;
use spv_validation::helpers_validation::validate_headers;
use spv_validation::storage::BlockHeaderStorageOps;

const BLOCK_HEADERS_LOOP_SUCCESS_SLEEP_TIMER: f64 = 60.;
const BLOCK_HEADERS_LOOP_ERORR_SLEEP_TIMER: f64 = 10.;
const FETCH_BLOCK_HEADERS_ATTEMPTS: u64 = 3;
const CHUNK_SIZE_REDUCER_VALUE: u64 = 100;
const ELECTRUM_MAX_CHUNK_SIZE: u64 = 2016;

pub struct UtxoArcBuilder<'a, F, T>
where
Expand Down Expand Up @@ -229,13 +227,31 @@ pub trait MergeUtxoArcOps<T: UtxoCommonOps + GetUtxoListOps>: UtxoCoinBuilderCom
}
}

async fn block_header_utxo_loop<T: UtxoCommonOps>(
pub(crate) struct BlockHeaderUtxoLoopExtraArgs {
pub(crate) chunk_size: u64,
pub(crate) error_sleep: f64,
pub(crate) success_sleep: f64,
}

#[cfg_attr(test, mockable)]
impl Default for BlockHeaderUtxoLoopExtraArgs {
fn default() -> Self {
Self {
chunk_size: 2016,
error_sleep: 10.,
success_sleep: 60.,
}
}
}
sergeyboyko0791 marked this conversation as resolved.
Show resolved Hide resolved

pub(crate) async fn block_header_utxo_loop<T: UtxoCommonOps>(
weak: UtxoWeak,
constructor: impl Fn(UtxoArc) -> T,
mut sync_status_loop_handle: UtxoSyncStatusLoopHandle,
mut block_count: u64,
) {
let mut chunk_size = ELECTRUM_MAX_CHUNK_SIZE;
let args = BlockHeaderUtxoLoopExtraArgs::default();
let mut chunk_size = args.chunk_size;
while let Some(arc) = weak.upgrade() {
let coin = constructor(arc);
let ticker = coin.as_ref().conf.ticker.as_str();
Expand All @@ -250,7 +266,7 @@ async fn block_header_utxo_loop<T: UtxoCommonOps>(
Err(e) => {
error!("Error {e:?} on getting the height of the last stored {ticker} header in DB!",);
sync_status_loop_handle.notify_on_temp_error(e.to_string());
Timer::sleep(BLOCK_HEADERS_LOOP_ERORR_SLEEP_TIMER).await;
Timer::sleep(args.error_sleep).await;
continue;
},
};
Expand All @@ -262,7 +278,7 @@ async fn block_header_utxo_loop<T: UtxoCommonOps>(
Err(e) => {
error!("Error {e:} on getting the height of the latest {ticker} block from rpc!");
sync_status_loop_handle.notify_on_temp_error(e.to_string());
Timer::sleep(BLOCK_HEADERS_LOOP_ERORR_SLEEP_TIMER).await;
Timer::sleep(args.error_sleep).await;
continue;
},
};
Expand All @@ -278,7 +294,7 @@ async fn block_header_utxo_loop<T: UtxoCommonOps>(
// Todo: Add code for the case if a chain reorganization happens
if from_block_height == block_count {
sync_status_loop_handle.notify_sync_finished(to_block_height);
Timer::sleep(BLOCK_HEADERS_LOOP_SUCCESS_SLEEP_TIMER).await;
Timer::sleep(args.success_sleep).await;
continue;
}

Expand All @@ -295,7 +311,7 @@ async fn block_header_utxo_loop<T: UtxoCommonOps>(
if error.get_inner().is_network_error() {
log!("Network Error: Will try fetching {ticker} block headers again after 10 secs");
sync_status_loop_handle.notify_on_temp_error(error.to_string());
Timer::sleep(BLOCK_HEADERS_LOOP_ERORR_SLEEP_TIMER).await;
Timer::sleep(args.error_sleep).await;
continue;
};

Expand All @@ -310,7 +326,7 @@ async fn block_header_utxo_loop<T: UtxoCommonOps>(
error!("Error {error:?} on retrieving latest {ticker} headers from rpc! {fetch_blocker_headers_attempts} attempts left");
// Todo: remove this electrum server and use another in this case since the headers from this server can't be retrieved
sync_status_loop_handle.notify_on_temp_error(error.to_string());
Timer::sleep(BLOCK_HEADERS_LOOP_ERORR_SLEEP_TIMER).await;
Timer::sleep(args.error_sleep).await;
continue;
};

Expand All @@ -333,10 +349,8 @@ async fn block_header_utxo_loop<T: UtxoCommonOps>(
}
};

ok_or_continue_after_sleep!(
storage.add_block_headers_to_storage(block_registry).await,
BLOCK_HEADERS_LOOP_SUCCESS_SLEEP_TIMER
);
let sleep = args.success_sleep;
ok_or_continue_after_sleep!(storage.add_block_headers_to_storage(block_registry).await, sleep);
}
}

Expand Down
105 changes: 100 additions & 5 deletions mm2src/coins/utxo/utxo_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,11 @@ use crate::utxo::rpc_clients::{BlockHashOrHeight, ElectrumBalance, ElectrumClien
GetAddressInfoRes, ListSinceBlockRes, NativeClient, NativeClientImpl, NativeUnspent,
NetworkInfo, UtxoRpcClientOps, ValidateAddressRes, VerboseBlock};
use crate::utxo::spv::SimplePaymentVerification;
use crate::utxo::utxo_block_header_storage::BlockHeaderStorage;
use crate::utxo::utxo_builder::{UtxoArcBuilder, UtxoCoinBuilderCommonOps};
use crate::utxo::utxo_block_header_storage::{BlockHeaderStorage, SqliteBlockHeadersStorage};
use crate::utxo::utxo_builder::{UtxoArcBuilder, UtxoCoinBuilder, UtxoCoinBuilderCommonOps};
use crate::utxo::utxo_common::UtxoTxBuilder;
use crate::utxo::utxo_common_tests::{self, utxo_coin_fields_for_test, utxo_coin_from_fields, TEST_COIN_DECIMALS,
TEST_COIN_NAME};
use crate::utxo::utxo_sql_block_header_storage::SqliteBlockHeadersStorage;
use crate::utxo::utxo_standard::{utxo_standard_coin_with_priv_key, UtxoStandardCoin};
use crate::utxo::utxo_tx_history_v2::{UtxoTxDetailsParams, UtxoTxHistoryOps};
#[cfg(not(target_arch = "wasm32"))] use crate::WithdrawFee;
Expand All @@ -30,13 +29,15 @@ use common::executor::Timer;
use common::{block_on, now_ms, OrdRange, PagingOptionsEnum, DEX_FEE_ADDR_RAW_PUBKEY};
use crypto::{privkey::key_pair_from_seed, Bip44Chain, RpcDerivationPath, Secp256k1Secret};
use db_common::sqlite::rusqlite::Connection;
use futures::channel::mpsc::channel;
use futures::future::join_all;
use futures::TryFutureExt;
use mm2_core::mm_ctx::MmCtxBuilder;
use mm2_number::bigdecimal::{BigDecimal, Signed};
use mm2_test_helpers::for_tests::{MORTY_ELECTRUM_ADDRS, RICK_ELECTRUM_ADDRS};
use mm2_test_helpers::for_tests::{mm_ctx_with_custom_db, MORTY_ELECTRUM_ADDRS, RICK_ELECTRUM_ADDRS};
use mocktopus::mocking::*;
use rpc::v1::types::H256 as H256Json;
use serde_json::Value;
use serialization::{deserialize, CoinVariant};
use spv_validation::storage::BlockHeaderStorageOps;
use std::convert::TryFrom;
Expand Down Expand Up @@ -934,7 +935,6 @@ fn test_spv_proof() {
.as_slice(),
)
.unwrap();

let mut headers = HashMap::new();
headers.insert(452248, header);
let storage = client.block_headers_storage();
Expand Down Expand Up @@ -4256,3 +4256,98 @@ fn test_utxo_validate_valid_and_invalid_pubkey() {
assert!(coin.validate_other_pubkey(&[1u8; 20]).is_err());
assert!(coin.validate_other_pubkey(&[1u8; 8]).is_err());
}

#[test]
fn test_block_header_utxo_loop() {
use crate::utxo::utxo_builder::{block_header_utxo_loop, BlockHeaderUtxoLoopExtraArgs};
use futures::future::{Either, FutureExt};
use keys::hash::H256 as H256Json;

static mut CURRENT_BLOCK_COUNT: u64 = 13;

ElectrumClient::get_block_count
.mock_safe(move |_| MockResult::Return(Box::new(futures01::future::ok(unsafe { CURRENT_BLOCK_COUNT }))));
let expected_steps: Arc<Mutex<Vec<(u64, u64)>>> = Arc::new(Mutex::new(vec![]));
let expected_steps_to_move = expected_steps.clone();

ElectrumClient::retrieve_headers.mock_safe(move |this, from, count| {
let (expected_from, expected_count) = expected_steps_to_move.lock().unwrap().remove(0);
assert_eq!(from, expected_from);
assert_eq!(count, expected_count);
MockResult::Continue((this, from, count))
});
sergeyboyko0791 marked this conversation as resolved.
Show resolved Hide resolved

BlockHeaderUtxoLoopExtraArgs::default.mock_safe(move || {
MockResult::Return(BlockHeaderUtxoLoopExtraArgs {
chunk_size: 4,
error_sleep: 5.,
sergeyboyko0791 marked this conversation as resolved.
Show resolved Hide resolved
success_sleep: 1.,
})
});

let ctx = mm_ctx_with_custom_db();
let priv_key_policy = PrivKeyBuildPolicy::IguanaPrivKey(IguanaPrivKey::from(H256Json::from([1u8; 32])));
let servers: Vec<_> = RICK_ELECTRUM_ADDRS
.iter()
.map(|server| json!({ "url": server }))
.collect();
let servers: Vec<Value> = servers.into_iter().map(|s| json::from_value(s).unwrap()).collect();
shamardy marked this conversation as resolved.
Show resolved Hide resolved
let req = json!({ "method": "electrum", "servers": servers });
let params = UtxoActivationParams::from_legacy_req(&req).unwrap();
let conf = json!({"coin":"RICK", "asset":"RICK", "rpcport":8923, "enable_spv_proof": false});
let builder = UtxoArcBuilder::new(&ctx, "RICK", &conf, &params, priv_key_policy, UtxoStandardCoin::from);
let arc: UtxoArc = block_on(builder.build_utxo_fields()).unwrap().into();
let client = match &arc.rpc_client {
UtxoRpcClientEnum::Electrum(electrum) => electrum.clone(),
UtxoRpcClientEnum::Native(_) => unreachable!(),
};

let (sync_status_notifier, _) = channel::<UtxoSyncStatus>(1);
let loop_handle = UtxoSyncStatusLoopHandle::new(sync_status_notifier);

let loop_fut = async move {
unsafe {
block_header_utxo_loop(
arc.downgrade(),
UtxoStandardCoin::from,
loop_handle,
CURRENT_BLOCK_COUNT,
)
.await
}
};

let test_fut = async move {
*expected_steps.lock().unwrap() = vec![(1, 4), (5, 8), (9, 12), (13, 13)];
unsafe { CURRENT_BLOCK_COUNT = 13 }
Timer::sleep(2.).await;
let get_headers_count = client.block_headers_storage().get_last_block_height().await.unwrap();
assert_eq!(get_headers_count, 13);
assert!(expected_steps.lock().unwrap().is_empty());

*expected_steps.lock().unwrap() = vec![(14, 17)];
unsafe { CURRENT_BLOCK_COUNT = 17 }
Timer::sleep(2.).await;
let get_headers_count = client.block_headers_storage().get_last_block_height().await.unwrap();
assert_eq!(17, get_headers_count);
sergeyboyko0791 marked this conversation as resolved.
Show resolved Hide resolved
assert!(expected_steps.lock().unwrap().is_empty());

unsafe { CURRENT_BLOCK_COUNT = 18 }
*expected_steps.lock().unwrap() = vec![(18, 18)];
sergeyboyko0791 marked this conversation as resolved.
Show resolved Hide resolved
Timer::sleep(2.).await;
let get_headers_count = client.block_headers_storage().get_last_block_height().await.unwrap();
assert_eq!(18, get_headers_count);
assert!(expected_steps.lock().unwrap().is_empty());

unsafe { CURRENT_BLOCK_COUNT = 25 }
*expected_steps.lock().unwrap() = vec![(19, 22), (23, 25)];
Timer::sleep(3.).await;
let get_headers_count = client.block_headers_storage().get_last_block_height().await.unwrap();
assert_eq!(25, get_headers_count);
assert!(expected_steps.lock().unwrap().is_empty());
};

if let Either::Left(_) = block_on(futures::future::select(loop_fut.boxed(), test_fut.boxed())) {
panic!("Loop shouldn't stop")
};
}
3 changes: 1 addition & 2 deletions mm2src/coins/utxo/utxo_wasm_tests.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
use super::rpc_clients::{ElectrumClient, ElectrumClientImpl, ElectrumProtocol};
use super::*;
use crate::utxo::rpc_clients::UtxoRpcClientOps;
use crate::utxo::utxo_block_header_storage::BlockHeaderStorage;
use crate::utxo::utxo_block_header_storage::{BlockHeaderStorage, IndexedDBBlockHeadersStorage};
use crate::utxo::utxo_common_tests;
use crate::utxo::utxo_indexedb_block_header_storage::IndexedDBBlockHeadersStorage;
use common::executor::Timer;
use serialization::deserialize;
use wasm_bindgen_test::*;
Expand Down