Skip to content

Commit

Permalink
Wrap ChainSyncer::state in an Arc<Mutex> and set it to Follow accordi…
Browse files Browse the repository at this point in the history
…ngly (#914)
  • Loading branch information
timvermeulen authored Jan 7, 2021
1 parent adfbe85 commit 05b282d
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 15 deletions.
16 changes: 8 additions & 8 deletions blockchain/chain_sync/src/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use super::sync_state::SyncState;
use super::sync_worker::SyncWorker;
use super::{Error, SyncNetworkContext};
use amt::Amt;
use async_std::sync::{channel, Receiver, RwLock, Sender};
use async_std::sync::{channel, Mutex, Receiver, RwLock, Sender};
use async_std::task::{self, JoinHandle};
use beacon::Beacon;
use blocks::{Block, FullTipset, GossipBlock, Tipset, TipsetKeys, TxMeta};
Expand Down Expand Up @@ -38,7 +38,7 @@ use std::sync::Arc;
type WorkerState = Arc<RwLock<Vec<Arc<RwLock<SyncState>>>>>;

#[derive(Debug, PartialEq)]
enum ChainSyncState {
pub enum ChainSyncState {
/// Bootstrapping peers before starting sync.
Bootstrap,
/// Syncing chain with ChainExchange protocol.
Expand Down Expand Up @@ -78,7 +78,7 @@ impl Default for SyncConfig {
/// messages to be able to do the initial sync.
pub struct ChainSyncer<DB, TBeacon, V, M> {
/// State of general `ChainSync` protocol.
state: ChainSyncState,
state: Arc<Mutex<ChainSyncState>>,

/// Syncing state of chain sync workers.
worker_state: WorkerState,
Expand Down Expand Up @@ -142,7 +142,7 @@ where
);

Ok(Self {
state: ChainSyncState::Bootstrap,
state: Arc::new(Mutex::new(ChainSyncState::Bootstrap)),
worker_state: Default::default(),
beacon,
network,
Expand Down Expand Up @@ -221,7 +221,7 @@ where
.await
}
NetworkEvent::PubsubMessage { source, message } => {
if self.state != ChainSyncState::Follow {
if *self.state.lock().await != ChainSyncState::Follow {
// Ignore gossipsub events if not in following state
return;
}
Expand Down Expand Up @@ -375,7 +375,7 @@ where
verifier: PhantomData::<V>::default(),
req_window: self.sync_config.req_window,
}
.spawn(channel)
.spawn(channel, Arc::clone(&self.state))
.await
}

Expand Down Expand Up @@ -424,10 +424,10 @@ where
.await;

// Only update target on initial sync
if self.state == ChainSyncState::Bootstrap {
if *self.state.lock().await == ChainSyncState::Bootstrap {
if let Some(best_target) = self.select_sync_target().await {
self.schedule_tipset(best_target).await;
self.state = ChainSyncState::Initial;
*self.state.lock().await = ChainSyncState::Initial;
return;
}
}
Expand Down
21 changes: 14 additions & 7 deletions blockchain/chain_sync/src/sync_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,13 @@ mod full_sync_test;
#[cfg(test)]
mod validate_block_test;

use super::bad_block_cache::BadBlockCache;
use super::sync_state::{SyncStage, SyncState};
use super::{bad_block_cache::BadBlockCache, sync::ChainSyncState};
use super::{Error, SyncNetworkContext};
use actor::{is_account_actor, power};
use address::Address;
use amt::Amt;
use async_std::sync::{Receiver, RwLock};
use async_std::sync::{Mutex, Receiver, RwLock};
use async_std::task::{self, JoinHandle};
use beacon::{Beacon, BeaconEntry, IGNORE_DRAND_VAR};
use blocks::{Block, BlockHeader, FullTipset, Tipset, TipsetKeys, TxMeta};
Expand Down Expand Up @@ -77,13 +77,20 @@ where
self.state_manager.chain_store()
}

pub async fn spawn(self, mut inbound_channel: Receiver<Arc<Tipset>>) -> JoinHandle<()> {
pub async fn spawn(
self,
mut inbound_channel: Receiver<Arc<Tipset>>,
state: Arc<Mutex<ChainSyncState>>,
) -> JoinHandle<()> {
task::spawn(async move {
while let Some(ts) = inbound_channel.next().await {
if let Err(e) = self.sync(ts).await {
let err = e.to_string();
warn!("failed to sync tipset: {}", &err);
self.state.write().await.error(err);
match self.sync(ts).await {
Ok(()) => *state.lock().await = ChainSyncState::Follow,
Err(e) => {
let err = e.to_string();
warn!("failed to sync tipset: {}", &err);
self.state.write().await.error(err);
}
}
}
})
Expand Down

0 comments on commit 05b282d

Please sign in to comment.