Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(zcoin): syncing and activation improvements #2089

Merged
merged 5 commits into from
Apr 9, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions mm2src/coins/z_coin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,11 @@ impl ZCoin {
.await
}

#[inline]
pub async fn first_sync_block(&self) -> Result<FirstSyncBlock, MmError<BlockchainScanStopped>> {
self.z_fields.sync_state_connector.lock().await.first_sync_block().await
}

#[inline]
fn secp_keypair(&self) -> &KeyPair {
self.utxo_arc
Expand Down
62 changes: 43 additions & 19 deletions mm2src/coins/z_coin/z_rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -552,29 +552,31 @@ pub(super) async fn init_light_client<'a>(
blocks_db.rewind_to_height(u32::MIN.into()).await?;
};

let first_sync_block = FirstSyncBlock {
requested: sync_height,
is_pre_sapling: sync_height < sapling_activation_height,
actual: sync_height.max(sapling_activation_height),
};
let sync_handle = SaplingSyncLoopHandle {
coin,
current_block: BlockHeight::from_u32(0),
blocks_db,
wallet_db: wallet_db.clone(),
consensus_params: builder.protocol_info.consensus_params.clone(),
sync_status_notifier,
main_sync_state_finished: false,
on_tx_gen_watcher,
watch_for_tx: None,
scan_blocks_per_iteration: builder.z_coin_params.scan_blocks_per_iteration,
scan_interval_ms: builder.z_coin_params.scan_interval_ms,
first_sync_block: FirstSyncBlock {
requested: sync_height,
is_pre_sapling: sync_height < sapling_activation_height,
actual: sync_height.max(sapling_activation_height),
},
first_sync_block: first_sync_block.clone(),
z_balance_event_sender,
};

let abort_handle = spawn_abortable(light_wallet_db_sync_loop(sync_handle, Box::new(light_rpc_clients)));

Ok((
SaplingSyncConnector::new_mutex_wrapped(sync_watcher, on_tx_gen_notifier, abort_handle),
SaplingSyncConnector::new_mutex_wrapped(sync_watcher, on_tx_gen_notifier, abort_handle, first_sync_block),
wallet_db,
))
}
Expand All @@ -590,6 +592,7 @@ pub(super) async fn init_native_client<'a>(
let coin = builder.ticker.to_string();
let (sync_status_notifier, sync_watcher) = channel(1);
let (on_tx_gen_notifier, on_tx_gen_watcher) = channel(1);

let checkpoint_block = builder.protocol_info.check_point_block.clone();
let sapling_height = builder.protocol_info.consensus_params.sapling_activation_height;
let checkpoint_height = checkpoint_block.clone().map(|b| b.height).unwrap_or(sapling_height) as u64;
Expand All @@ -609,17 +612,18 @@ pub(super) async fn init_native_client<'a>(
wallet_db: wallet_db.clone(),
consensus_params: builder.protocol_info.consensus_params.clone(),
sync_status_notifier,
main_sync_state_finished: false,
on_tx_gen_watcher,
watch_for_tx: None,
scan_blocks_per_iteration: builder.z_coin_params.scan_blocks_per_iteration,
scan_interval_ms: builder.z_coin_params.scan_interval_ms,
first_sync_block,
first_sync_block: first_sync_block.clone(),
z_balance_event_sender,
};
let abort_handle = spawn_abortable(light_wallet_db_sync_loop(sync_handle, Box::new(native_client)));

Ok((
SaplingSyncConnector::new_mutex_wrapped(sync_watcher, on_tx_gen_notifier, abort_handle),
SaplingSyncConnector::new_mutex_wrapped(sync_watcher, on_tx_gen_notifier, abort_handle, first_sync_block),
wallet_db,
))
}
Expand Down Expand Up @@ -661,20 +665,18 @@ impl SaplingSyncRespawnGuard {
/// - `TemporaryError(String)`: Represents a temporary error state, with an associated error message
/// providing details about the error.
/// - `Finishing`: Represents the finishing state of an operation.
#[derive(Debug)]
pub enum SyncStatus {
UpdatingBlocksCache {
first_sync_block: FirstSyncBlock,
current_scanned_block: u64,
latest_block: u64,
},
BuildingWalletDb {
first_sync_block: FirstSyncBlock,
current_scanned_block: u64,
latest_block: u64,
},
TemporaryError(String),
Finished {
first_sync_block: FirstSyncBlock,
block_number: u64,
},
}
Expand Down Expand Up @@ -706,6 +708,8 @@ pub struct SaplingSyncLoopHandle {
consensus_params: ZcoinConsensusParams,
/// Notifies about sync status without stopping the loop, e.g. on coin activation
sync_status_notifier: AsyncSender<SyncStatus>,
/// Signal to determine if main sync state is finished.
main_sync_state_finished: bool,
/// If new tx is required to be generated, we stop the sync and respawn it after tx is sent
/// This watcher waits for such notification
on_tx_gen_watcher: AsyncReceiver<OneshotSender<(Self, Box<dyn ZRpcOps>)>>,
Expand All @@ -717,39 +721,47 @@ pub struct SaplingSyncLoopHandle {
}

impl SaplingSyncLoopHandle {
fn first_sync_block(&self) -> FirstSyncBlock { self.first_sync_block.clone() }

#[inline]
fn notify_blocks_cache_status(&mut self, current_scanned_block: u64, latest_block: u64) {
if self.main_sync_state_finished {
return;
}
self.sync_status_notifier
.try_send(SyncStatus::UpdatingBlocksCache {
current_scanned_block,
latest_block,
first_sync_block: self.first_sync_block(),
})
.debug_log_with_msg("No one seems interested in SyncStatus");
}

fn notify_building_wallet_db(&mut self, current_scanned_block: u64, latest_block: u64) {
if self.main_sync_state_finished {
return;
}
self.sync_status_notifier
.try_send(SyncStatus::BuildingWalletDb {
current_scanned_block,
latest_block,
first_sync_block: self.first_sync_block(),
})
.debug_log_with_msg("No one seems interested in SyncStatus");
}

fn notify_on_error(&mut self, error: String) {
if self.main_sync_state_finished {
return;
}
self.sync_status_notifier
.try_send(SyncStatus::TemporaryError(error))
.debug_log_with_msg("No one seems interested in SyncStatus");
}

fn notify_sync_finished(&mut self) {
if self.main_sync_state_finished {
return;
}
self.sync_status_notifier
.try_send(SyncStatus::Finished {
block_number: self.current_block.into(),
first_sync_block: self.first_sync_block(),
})
.debug_log_with_msg("No one seems interested in SyncStatus");
}
Expand Down Expand Up @@ -781,7 +793,7 @@ impl SaplingSyncLoopHandle {
/// For more notes on the process, check https://github.com/zcash/librustzcash/blob/master/zcash_client_backend/src/data_api/chain.rs#L2
async fn scan_validate_and_update_blocks(&mut self) -> Result<(), MmError<ZcoinStorageError>> {
let blocks_db = self.blocks_db.clone();
let wallet_db = self.wallet_db.clone().db;
let wallet_db = self.wallet_db.db.clone();
let mut wallet_ops = wallet_db.get_update_ops().expect("get_update_ops always returns Ok");

if let Err(e) = blocks_db
Expand Down Expand Up @@ -918,6 +930,10 @@ async fn light_wallet_db_sync_loop(mut sync_handle: SaplingSyncLoopHandle, mut c
}
}

if !sync_handle.main_sync_state_finished {
sync_handle.main_sync_state_finished = true
}
shamardy marked this conversation as resolved.
Show resolved Hide resolved

Timer::sleep(10.).await;
}
}
Expand All @@ -929,23 +945,31 @@ pub(super) struct SaplingSyncConnector {
sync_watcher: SyncWatcher,
on_tx_gen_notifier: NewTxNotifier,
abort_handle: Arc<Mutex<AbortOnDropHandle>>,
first_sync_block: FirstSyncBlock,
}

impl SaplingSyncConnector {
#[allow(unused)]
#[inline]
pub(super) fn new_mutex_wrapped(
simple_sync_watcher: SyncWatcher,
sync_watcher: SyncWatcher,
on_tx_gen_notifier: NewTxNotifier,
abort_handle: AbortOnDropHandle,
first_sync_block: FirstSyncBlock,
) -> AsyncMutex<Self> {
AsyncMutex::new(SaplingSyncConnector {
sync_watcher: simple_sync_watcher,
sync_watcher,
on_tx_gen_notifier,
abort_handle: Arc::new(Mutex::new(abort_handle)),
first_sync_block,
})
}

#[inline]
pub(super) async fn first_sync_block(&self) -> Result<FirstSyncBlock, MmError<BlockchainScanStopped>> {
Ok(self.first_sync_block.clone())
}

#[inline]
pub(super) async fn current_sync_status(&mut self) -> Result<SyncStatus, MmError<BlockchainScanStopped>> {
self.sync_watcher.next().await.or_mm_err(|| BlockchainScanStopped {})
Expand Down
15 changes: 2 additions & 13 deletions mm2src/coins_activation/src/z_coin_activation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ pub struct ZcoinActivationResult {
pub ticker: String,
pub current_block: u64,
pub wallet_balance: CoinBalanceReport,
pub first_sync_block: Option<FirstSyncBlock>,
pub first_sync_block: FirstSyncBlock,
}

impl CurrentBlock for ZcoinActivationResult {
Expand Down Expand Up @@ -77,12 +77,10 @@ impl GetAddressesBalances for ZcoinActivationResult {
pub enum ZcoinInProgressStatus {
ActivatingCoin,
UpdatingBlocksCache {
first_sync_block: FirstSyncBlock,
current_scanned_block: u64,
latest_block: u64,
},
BuildingWalletDb {
first_sync_block: FirstSyncBlock,
current_scanned_block: u64,
latest_block: u64,
},
Expand Down Expand Up @@ -247,20 +245,16 @@ impl InitStandaloneCoinActivationOps for ZCoin {
loop {
let in_progress_status = match coin.sync_status().await? {
SyncStatus::UpdatingBlocksCache {
first_sync_block,
current_scanned_block,
latest_block,
} => ZcoinInProgressStatus::UpdatingBlocksCache {
first_sync_block,
current_scanned_block,
latest_block,
},
SyncStatus::BuildingWalletDb {
first_sync_block,
current_scanned_block,
latest_block,
} => ZcoinInProgressStatus::BuildingWalletDb {
first_sync_block,
current_scanned_block,
latest_block,
},
Expand All @@ -287,12 +281,7 @@ impl InitStandaloneCoinActivationOps for ZCoin {
.map_to_mm(ZcoinInitError::CouldNotGetBlockCount)?;

let balance = self.my_balance().compat().await?;
let first_sync_block = match self.sync_status().await? {
SyncStatus::Finished { first_sync_block, .. }
| SyncStatus::BuildingWalletDb { first_sync_block, .. }
| SyncStatus::UpdatingBlocksCache { first_sync_block, .. } => Some(first_sync_block),
_ => None,
};
let first_sync_block = self.first_sync_block().await?;

Ok(ZcoinActivationResult {
ticker: self.ticker().into(),
Expand Down
Loading