Skip to content

Commit

Permalink
concurrent state migration
Browse files Browse the repository at this point in the history
  • Loading branch information
creativcoder committed Jun 3, 2021
1 parent 378e23b commit de1c56f
Show file tree
Hide file tree
Showing 5 changed files with 94 additions and 80 deletions.
34 changes: 19 additions & 15 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions vm/state_migration/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,8 @@ futures = "0.3.14"
state_tree = { path="../state_tree" }
async-std = "1.9.0"
actor_interface = {path = "../actor_interface"}
crossbeam-utils = "0.8.4"
crossbeam-channel = "0.5.1"
rayon = "1.5"
log = "0.4.8"
num_cpus = "1.13.0"
18 changes: 10 additions & 8 deletions vm/state_migration/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,12 @@
//! The code for each network upgrade / state migration lives in its own module.
use address::Address;
use async_std::sync::Arc;
use cid::Cid;
use clock::ChainEpoch;
use ipld_blockstore::BlockStore;
use std::error::Error as StdError;
use std::rc::Rc;
use rayon::ThreadPoolBuildError;
use vm::{ActorState, TokenAmount};
use async_std::sync::Arc;

pub mod nv12;

Expand All @@ -38,6 +37,8 @@ pub enum MigrationError {
StateTreeCreation(String),
#[error("Incomplete migration specification with {0} code CIDs")]
IncompleteMigrationSpec(usize),
#[error("Thread pool creation failed: {0}")]
ThreadPoolCreation(ThreadPoolBuildError),
#[error("Migration failed")]
Other,
}
Expand All @@ -58,7 +59,7 @@ pub(crate) struct MigrationOutput {
new_head: Cid,
}

pub(crate) trait ActorMigration<BS: BlockStore> {
pub(crate) trait ActorMigration<BS: BlockStore + Send + Sync> {
fn migrate_state(
&self,
store: Arc<BS>,
Expand All @@ -72,7 +73,7 @@ struct MigrationJob<BS: BlockStore> {
actor_migration: Arc<dyn ActorMigration<BS>>,
}

impl<BS: BlockStore> MigrationJob<BS> {
impl<BS: BlockStore + Send + Sync> MigrationJob<BS> {
fn run(&self, store: Arc<BS>, prior_epoch: ChainEpoch) -> MigrationResult<MigrationJobOutput> {
let result = self
.actor_migration
Expand Down Expand Up @@ -114,14 +115,15 @@ struct MigrationJobOutput {
actor_state: ActorState,
}

fn nil_migrator_v4<BS: BlockStore>(cid: Cid) -> Arc<dyn ActorMigration<BS>> {
/// Wraps `cid` in a [`NilMigrator`] and returns it as a shared
/// [`ActorMigration`] trait object that is also `Send + Sync`.
///
/// `cid` is stored as-is inside the migrator; presumably it is the code CID
/// of the target actor version (callers pass `actorv4::*_CODE_ID` values).
fn nil_migrator_v4<BS: BlockStore + Send + Sync>(
cid: Cid,
) -> Arc<dyn ActorMigration<BS> + Send + Sync> {
Arc::new(NilMigrator(cid))
}

/// Migrator which preserves the head CID and provides a fixed result code CID.
///
/// The single field is the `Cid` handed to the constructor function
/// `nil_migrator_v4`.
pub(crate) struct NilMigrator(Cid);

impl<'db, BS: BlockStore> ActorMigration<BS> for NilMigrator {
impl<BS: BlockStore + Send + Sync> ActorMigration<BS> for NilMigrator {
fn migrate_state(
&self,
_store: Arc<BS>,
Expand Down
13 changes: 6 additions & 7 deletions vm/state_migration/src/nv12/miner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,22 +7,21 @@ use crate::{ActorMigration, ActorMigrationInput};
use crate::{MigrationError, MigrationOutput, MigrationResult};
use actor_interface::actorv3::miner::State as V3State;
use actor_interface::actorv4::miner::State as V4State;
use async_std::sync::Arc;
use cid::Cid;
use cid::Code::Blake2b256;
use ipld_blockstore::BlockStore;
use std::io::{Error, ErrorKind};
use std::rc::Rc;
use async_std::sync::Arc;

/// Actor migration for miner actors; implements `ActorMigration`.
///
/// The single field is the `Cid` handed to the constructor function
/// `miner_migrator_v4` (presumably the new miner actor code CID; verify
/// against the caller).
pub(crate) struct MinerMigrator(Cid);

pub(crate) fn miner_migrator_v4<'db, BS: BlockStore>(cid: Cid) -> Arc<dyn ActorMigration<BS>> {
/// Wraps `cid` in a [`MinerMigrator`] and returns it as a shared
/// [`ActorMigration`] trait object that is also `Send + Sync`.
///
/// NOTE(review): the `'db` lifetime parameter is declared but does not appear
/// in the parameters or the return type; it looks like a leftover. Confirm
/// no caller names it before removing.
pub(crate) fn miner_migrator_v4<'db, BS: BlockStore + Send + Sync>(
cid: Cid,
) -> Arc<dyn ActorMigration<BS> + Send + Sync> {
Arc::new(MinerMigrator(cid))
}

// each actor's state migration is read from blockstore, change state tree, and write back to the blockstore.

impl<'db, BS: BlockStore> ActorMigration<BS> for MinerMigrator {
// Each actor's state migration reads from the blockstore, changes the state tree, and writes back to the blockstore.
impl<BS: BlockStore + Send + Sync> ActorMigration<BS> for MinerMigrator {
fn migrate_state(
&self,
store: Arc<BS>,
Expand Down
104 changes: 54 additions & 50 deletions vm/state_migration/src/nv12/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,40 +12,27 @@ pub mod miner;
use crate::nil_migrator_v4;
use crate::{ActorMigration, MigrationError, MigrationJob, MigrationResult};
use actor_interface::{actorv3, actorv4};
use async_std::task;
use async_std::sync::Arc;
use cid::Cid;
use clock::ChainEpoch;
use fil_types::StateTreeVersion;
use futures::stream::FuturesOrdered;
use futures::StreamExt;
use ipld_blockstore::BlockStore;
use miner::miner_migrator_v4;
use state_tree::StateTree;
use std::collections::{HashMap, HashSet};
use std::rc::Rc;
use crate::MigrationJobOutput;

use async_std::sync::Mutex;
use async_std::sync::Arc;

type Migrator<BS> = Arc<dyn ActorMigration<BS>>;
/// Shared handle to an actor migration for blockstore type `BS`.
///
/// The `Send + Sync` bounds on the trait object let the handle be moved into
/// the closures spawned on the migration thread pool. It is the value type of
/// the code-CID -> migration map built in `migrate_state_tree`.
type Migrator<BS> = Arc<dyn ActorMigration<BS> + Send + Sync>;

/// Initial capacity of the code-CID -> migration map in `migrate_state_tree`.
///
/// NOTE(review): presumably the number of builtin actor types that need a
/// migration entry; confirm against the list of `migrations.insert` calls.
const ACTORS_COUNT: usize = 11;

pub fn migrate_state_tree<BS: BlockStore + Sync + Send>(
// Try to pass an Arc<BS> here.
pub fn migrate_state_tree<BS: BlockStore + Send + Sync>(
store: Arc<BS>,
actors_root_in: Cid,
prior_epoch: ChainEpoch,
) -> MigrationResult<Cid> {
let mut jobs = FuturesOrdered::new();
// TODO
// pass job_tx to each job instance's run method.
// iterate and collect on job_rx with block_on
let (job_tx, job_rx) = async_std::channel::unbounded::<MigrationJobOutput>();

// Maps prior version code CIDs to migration functions.
let mut migrations: HashMap<Cid, Migrator<BS>> =
HashMap::with_capacity(ACTORS_COUNT);
let mut migrations: HashMap<Cid, Migrator<BS>> = HashMap::with_capacity(ACTORS_COUNT);
migrations.insert(
*actorv3::ACCOUNT_ACTOR_CODE_ID,
nil_migrator_v4(*actorv4::ACCOUNT_ACTOR_CODE_ID),
Expand Down Expand Up @@ -98,43 +85,60 @@ pub fn migrate_state_tree<BS: BlockStore + Sync + Send>(
return Err(MigrationError::IncompleteMigrationSpec(migrations.len()));
}

// input actors state tree -
let actors_in = StateTree::new_from_root(&*store, &actors_root_in).unwrap();
let mut actors_out = StateTree::new(&*store, StateTreeVersion::V3)
.map_err(|e| MigrationError::StateTreeCreation(e.to_string()))?;

actors_in
.for_each(|addr, state| {
if deferred_code_ids.contains(&state.code) {
return Ok(());
let cpus = num_cpus::get();
let chan_size = 2;
log::info!("Using {} CPUs for migration and channel size of {}", cpus, chan_size);

let pool = rayon::ThreadPoolBuilder::new()
.thread_name(|id| format!("nv12 migration thread: {}", id))
.num_threads(cpus)
.build()
.map_err(|e| MigrationError::ThreadPoolCreation(e))?;

let (state_tx, state_rx) = crossbeam_channel::bounded(chan_size);
let (job_tx, job_rx) = crossbeam_channel::bounded(chan_size);

pool.scope(|s| {
let store_clone = store.clone();

s.spawn(move |_| {
actors_in.for_each(|addr, state| {
state_tx.send((addr, state.clone())).expect("failed sending actor state through channel");
Ok(())
}).expect("Failed iterating over actor state");
});

s.spawn(move |scope| {
while let Ok((addr, state)) = state_rx.recv() {
let job_tx = job_tx.clone();
let store_clone = store_clone.clone();
let migrator = migrations.get(&state.code).cloned().unwrap();
scope.spawn(move |_| {
let job = MigrationJob {
address: addr.clone(),
actor_state: state,
actor_migration: migrator,
};

let job_output = job.run(store_clone, prior_epoch).expect(&format!("failed executing job for address: {}", addr));

job_tx.send(job_output).expect(&format!("failed sending job output for address: {}", addr));
});
}

let next_input = MigrationJob {
address: addr,
actor_state: state.clone(),
actor_migration: migrations
.get(&state.code)
.cloned()
.ok_or(MigrationError::MigratorNotFound(state.code))?,
};

// TODO pass job_tx
let store_clone = store.clone();
jobs.push(async move { next_input.run(store_clone, prior_epoch) });

Ok(())
})
.map_err(|e| MigrationError::MigrationJobCreate(e.to_string()))?;

// task::spawn(async {
// while let Some(job_result) = jobs.next().await {
// let result = job_result?;
// actors_out
// .set_actor(&result.address, result.actor_state)
// .map_err(|e| MigrationError::SetActorState(e.to_string()))?;
// }

// Ok(())
// });
drop(job_tx);
});

while let Ok(job_output) = job_rx.recv() {
actors_out
.set_actor(&job_output.address, job_output.actor_state)
.expect(&format!("Failed setting new actor state at given address: {}", job_output.address));
}
});

let root_cid = actors_out
.flush()
Expand Down

0 comments on commit de1c56f

Please sign in to comment.