Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[r2r] fixed block_header_utxo_loop #1506

Merged
merged 9 commits into from
Oct 27, 2022
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions mm2src/coins/utxo/utxo_block_header_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ impl BlockHeaderStorageOps for BlockHeaderStorage {
&self,
headers: HashMap<u64, BlockHeader>,
) -> Result<(), BlockHeaderStorageError> {
// println!("{:?}", &headers);
self.inner.add_block_headers_to_storage(headers).await
}

Expand Down
48 changes: 21 additions & 27 deletions mm2src/coins/utxo/utxo_builder/utxo_arc_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,17 +100,7 @@ where

let result_coin = (self.constructor)(utxo_arc.clone());
if let Some(sync_status_loop_handle) = sync_status_loop_handle {
let current_block_height = result_coin
.current_block()
.compat()
.await
.map_to_mm(UtxoCoinBuildError::GetCurrentBlockHeightError)?;
self.spawn_block_header_utxo_loop(
&utxo_arc,
self.constructor.clone(),
sync_status_loop_handle,
current_block_height,
);
self.spawn_block_header_utxo_loop(&utxo_arc, self.constructor.clone(), sync_status_loop_handle);
}

Ok(result_coin)
Expand Down Expand Up @@ -224,7 +214,6 @@ async fn block_header_utxo_loop<T: UtxoCommonOps>(
weak: UtxoWeak,
constructor: impl Fn(UtxoArc) -> T,
mut sync_status_loop_handle: UtxoSyncStatusLoopHandle,
mut last_block_height: u64,
) {
let mut chunk_size = ELECTRUM_MAX_CHUNK_SIZE;
while let Some(arc) = weak.upgrade() {
Expand All @@ -245,25 +234,31 @@ async fn block_header_utxo_loop<T: UtxoCommonOps>(
},
};

let total_block_height = match coin.as_ref().rpc_client.get_block_count().compat().await {
sergeyboyko0791 marked this conversation as resolved.
Show resolved Hide resolved
Ok(h) => h,
Err(e) => {
error!("Error {} on getting the height of the latest block from rpc!", e);
sync_status_loop_handle.notify_on_temp_error(e.to_string());
Timer::sleep(10.0).await;
continue;
},
};

let to_block_height = from_block_height + chunk_size;
let to_block_height = if to_block_height > total_block_height {
total_block_height
} else {
to_block_height
};

// Todo: Add code for the case if a chain reorganization happens
if from_block_height == last_block_height {
sync_status_loop_handle.notify_sync_finished(last_block_height);
last_block_height = match coin.as_ref().rpc_client.get_block_count().compat().await {
Ok(h) => h,
Err(e) => {
error!("Error {} on getting the height of the latest block from rpc!", e);
sync_status_loop_handle.notify_on_temp_error(e.to_string());
Timer::sleep(10.0).await;
continue;
},
shamardy marked this conversation as resolved.
Show resolved Hide resolved
};
if from_block_height == total_block_height {
sync_status_loop_handle.notify_sync_finished(to_block_height);
Timer::sleep(BLOCK_HEADERS_LOOP_INTERVAL).await;
continue;
}

let to_block_height = from_block_height + chunk_size;
sync_status_loop_handle.notify_blocks_headers_sync_status(from_block_height + 1, to_block_height);

let mut fetch_blocker_headers_attempts = FETCH_BLOCK_HEADERS_ATTEMPTS;
let (block_registry, block_headers) = match client
.retrieve_headers(from_block_height + 1, to_block_height)
Expand Down Expand Up @@ -328,7 +323,6 @@ pub trait BlockHeaderUtxoArcOps<T>: UtxoCoinBuilderCommonOps {
utxo_arc: &UtxoArc,
constructor: F,
sync_status_loop_handle: UtxoSyncStatusLoopHandle,
current_block_height: u64,
) where
F: Fn(UtxoArc) -> T + Send + Sync + 'static,
T: UtxoCommonOps,
Expand All @@ -337,7 +331,7 @@ pub trait BlockHeaderUtxoArcOps<T>: UtxoCoinBuilderCommonOps {
info!("Starting UTXO block header loop for coin {ticker}");

let utxo_weak = utxo_arc.downgrade();
let fut = block_header_utxo_loop(utxo_weak, constructor, sync_status_loop_handle, current_block_height);
let fut = block_header_utxo_loop(utxo_weak, constructor, sync_status_loop_handle);

let settings = AbortSettings::info_on_abort(format!("spawn_block_header_utxo_loop stopped for {ticker}"));
utxo_arc
Expand Down
1 change: 0 additions & 1 deletion mm2src/coins/utxo/utxo_builder/utxo_coin_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@ pub enum UtxoCoinBuildError {
HwContextNotInitialized,
HDWalletStorageError(HDWalletStorageError),
CoinDoesntSupportTrezor,
GetCurrentBlockHeightError(String),
BlockHeaderStorageError(BlockHeaderStorageError),
#[display(fmt = "Internal error: {}", _0)]
Internal(String),
Expand Down
175 changes: 175 additions & 0 deletions mm2src/coins/utxo/utxo_sql_block_header_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,181 @@ impl BlockHeaderStorageOps for SqliteBlockHeadersStorage {
}
}

#[cfg(test)]
#[derive(Debug, Clone)]
pub struct BlockHeaderInMemoryStorage {
pub ticker: String,
pub conn: Arc<Mutex<Connection>>,
}

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can be simplified significantly since BlockHeaderInMemoryStorage is used in tests only:

pub struct BlockHeaderInMemoryStorage {
    pub ticker: String,
    // The block headers should be ordered to be able to return the last block header.
    pub block_headers: BTreeMap<u64, BlockHeader>,
    // This can be used on [`BlockHeaderStorageOps::get_block_height_by_hash`].
    pub block_headers_by_hash: HashMap<H256, BlockHeader>,
}

Please also note that get_last_block_height should be configurable from a test. For example, at the beginning of a test you can just upload one block header with a specified height. Also there could be a field like BlockHeaderInMemoryStorage::last_block_height.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

noted for next PR. THANKS


#[cfg(test)]
#[async_trait]
impl BlockHeaderStorageOps for BlockHeaderInMemoryStorage {
async fn init(&self) -> Result<(), BlockHeaderStorageError> {
let coin = self.ticker.clone();
let selfi = self.clone();
let sql_cache = create_block_header_cache_table_sql(&coin)?;
async_blocking(move || {
let conn = selfi.conn.lock().unwrap();
conn.execute(&sql_cache, NO_PARAMS).map(|_| ()).map_err(|e| {
BlockHeaderStorageError::InitializationError {
coin,
reason: e.to_string(),
}
})?;
Ok(())
})
.await
}

async fn is_initialized_for(&self) -> Result<bool, BlockHeaderStorageError> {
let coin = self.ticker.clone();
let block_headers_cache_table = get_table_name_and_validate(&coin)?;
let selfi = self.clone();
async_blocking(move || {
let conn = selfi.conn.lock().unwrap();
let cache_initialized = query_single_row(
&conn,
CHECK_TABLE_EXISTS_SQL,
[block_headers_cache_table],
string_from_row,
)?;
Ok(cache_initialized.is_some())
})
.await
}

async fn add_block_headers_to_storage(
&self,
headers: HashMap<u64, BlockHeader>,
) -> Result<(), BlockHeaderStorageError> {
let coin = self.ticker.clone();
let selfi = self.clone();
println!("HIII {:?}", &headers);

async_blocking(move || {
let mut conn = selfi.conn.lock().unwrap();
let sql_transaction = conn
.transaction()
.map_err(|e| BlockHeaderStorageError::AddToStorageError {
coin: coin.clone(),
reason: e.to_string(),
})?;

for (height, header) in headers {
let height = height as i64;
let hash = header.hash().reversed().to_string();
let raw_header = hex::encode(header.raw());
let bits: u32 = header.bits.into();
let block_cache_params = [
&height as &dyn ToSql,
&raw_header as &dyn ToSql,
&bits as &dyn ToSql,
&hash as &dyn ToSql,
];
sql_transaction
.execute(&insert_block_header_in_cache_sql(&coin.clone())?, block_cache_params)
.map_err(|e| BlockHeaderStorageError::AddToStorageError {
coin: coin.clone(),
reason: e.to_string(),
})?;
}
sql_transaction
.commit()
.map_err(|e| BlockHeaderStorageError::AddToStorageError {
coin: coin.clone(),
reason: e.to_string(),
})?;
Ok(())
})
.await
}

async fn get_block_header(&self, height: u64) -> Result<Option<BlockHeader>, BlockHeaderStorageError> {
let coin = self.ticker.clone();
if let Some(header_raw) = self.get_block_header_raw(height).await? {
let serialized = &hex::decode(header_raw).map_err(|e| BlockHeaderStorageError::DecodeError {
coin: coin.clone(),
reason: e.to_string(),
})?;
let mut reader = Reader::new_with_coin_variant(serialized, coin.as_str().into());
let header: BlockHeader =
reader
.read()
.map_err(|e: serialization::Error| BlockHeaderStorageError::DecodeError {
coin,
reason: e.to_string(),
})?;
return Ok(Some(header));
}
Ok(None)
}

async fn get_block_header_raw(&self, height: u64) -> Result<Option<String>, BlockHeaderStorageError> {
let coin = self.ticker.clone();
let params = [height as i64];
let sql = get_block_header_by_height(&coin)?;
let selfi = self.clone();

async_blocking(move || {
let conn = selfi.conn.lock().unwrap();
query_single_row(&conn, &sql, params, string_from_row)
})
.await
.map_err(|e| BlockHeaderStorageError::GetFromStorageError {
coin,
reason: e.to_string(),
})
}

async fn get_last_block_height(&self) -> Result<u64, BlockHeaderStorageError> { Ok(10250) }

async fn get_last_block_header_with_non_max_bits(&self) -> Result<Option<BlockHeader>, BlockHeaderStorageError> {
let coin = self.ticker.clone();
let sql = get_last_block_header_with_non_max_bits_sql(&coin)?;
let selfi = self.clone();

let maybe_header_raw = async_blocking(move || {
let conn = selfi.conn.lock().unwrap();
query_single_row(&conn, &sql, NO_PARAMS, string_from_row)
})
.await
.map_err(|e| BlockHeaderStorageError::GetFromStorageError {
coin: coin.clone(),
reason: e.to_string(),
})?;

if let Some(header_raw) = maybe_header_raw {
let header: BlockHeader =
header_raw
.try_into()
.map_err(|e: serialization::Error| BlockHeaderStorageError::DecodeError {
coin,
reason: e.to_string(),
})?;
return Ok(Some(header));
}
Ok(None)
}

async fn get_block_height_by_hash(&self, hash: H256) -> Result<Option<i64>, BlockHeaderStorageError> {
let coin = self.ticker.clone();
let params = [hash.to_string()];
let sql = get_block_height_by_hash(&coin)?;
let selfi = self.clone();

async_blocking(move || {
let conn = selfi.conn.lock().unwrap();
query_single_row(&conn, &sql, params, |row| row.get(0))
})
.await
.map_err(|e| BlockHeaderStorageError::GetFromStorageError {
coin,
reason: e.to_string(),
})
}
}

#[cfg(test)]
impl SqliteBlockHeadersStorage {
pub fn in_memory(ticker: String) -> Self {
Expand Down
Loading