Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Wrap cached state in async mutex to avoid duplicate state calculation #785

Merged
merged 8 commits into from
Oct 26, 2020
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 55 additions & 32 deletions blockchain/state_manager/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use futures::channel::oneshot;
use futures::stream::{FuturesUnordered, StreamExt};
use interpreter::{resolve_to_key_addr, ApplyRet, BlockMessages, ChainRand, Rand, VM};
use ipld_amt::Amt;
use log::{trace, warn};
use log::{debug, trace, warn};
use message::{message_receipt, unsigned_message};
use message::{ChainMessage, Message, MessageReceipt, UnsignedMessage};
use num_bigint::{bigint_ser, BigInt};
Expand Down Expand Up @@ -60,7 +60,11 @@ pub struct MarketBalance {

pub struct StateManager<DB> {
bs: Arc<DB>,
cache: RwLock<HashMap<TipsetKeys, CidPair>>,

/// This is a cache which indexes tipsets to their calculated state.
/// The calculated state is wrapped in a mutex to avoid duplicate computation
/// of the state/receipt root.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove space after "/"

cache: RwLock<HashMap<TipsetKeys, Arc<RwLock<Option<CidPair>>>>>,
subscriber: Option<Subscriber<HeadChange>>,
}

Expand Down Expand Up @@ -186,11 +190,11 @@ where
rand: &R,
base_fee: BigInt,
callback: Option<CB>,
) -> Result<(Cid, Cid), Box<dyn StdError>>
) -> Result<CidPair, Box<dyn StdError>>
where
R: Rand,
V: ProofVerifier,
CB: FnMut(Cid, &ChainMessage, ApplyRet) -> Result<(), String>,
CB: FnMut(&Cid, &ChainMessage, &ApplyRet) -> Result<(), String>,
{
let mut buf_store = BufferedBlockStore::new(self.bs.as_ref());
// TODO change from statically using devnet params when needed
Expand Down Expand Up @@ -218,18 +222,42 @@ where
Ok((state_root, rect_root))
}

pub async fn tipset_state<V>(&self, tipset: &Tipset) -> Result<(Cid, Cid), Box<dyn StdError>>
pub async fn tipset_state<V>(&self, tipset: &Tipset) -> Result<CidPair, Box<dyn StdError>>
where
V: ProofVerifier,
{
span!("tipset_state", {
trace!("tipset {:?}", tipset.cids());
// if exists in cache return
if let Some(cid_pair) = self.cache.read().await.get(&tipset.key()) {
return Ok(cid_pair.clone());
// Get entry in cache, if it exists.
// Arc is cloned here to avoid holding the entire cache lock until function ends.
// (tasks should be able to compute different tipset states in parallel)
//
// In the case of task `A` computing the same tipset as task `B`, `A` will hold the
// mutex until the value is updated, which task `B` will await.
//
// If two tasks are computing different tipset states, they will only block computation
// when accessing/initializing the entry in cache, not during the whole tipset calc.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove space after "/"

let cache_entry: Arc<_> = self
.cache
.write()
.await
.entry(tipset.key().clone())
.or_default()
// Clone Arc to drop lock of cache
.clone();

// Try to lock cache entry to ensure task is first to compute state.
// If another task has the lock, it will overwrite the state before releasing lock.
let mut entry_lock = cache_entry.write().await;
if let Some(ref entry) = *entry_lock {
// Entry had successfully populated state, return Cid and drop lock
trace!("hit cache for tipset {:?}", tipset.cids());
return Ok(entry.clone());
}

if tipset.epoch() == 0 {
// Entry does not have state computed yet, this task will fill entry if successful.
debug!("calculating tipset state {:?}", tipset.cids());

let cid_pair = if tipset.epoch() == 0 {
// NB: This is here because the process that executes blocks requires that the
// block miner reference a valid miner in the state tree. Unless we create some
// magical genesis miner, this won't work properly, so we short circuit here
Expand All @@ -238,25 +266,20 @@ where
.blocks()
.first()
.ok_or_else(|| Error::Other("Could not get message receipts".to_string()))?;
let cid_pair = (

(
tipset.parent_state().clone(),
message_receipts.message_receipts().clone(),
);
self.cache
.write()
.await
.insert(tipset.key().clone(), cid_pair.clone());
return Ok(cid_pair);
}
)
} else {
let block_headers = tipset.blocks();
// generic constants are not implemented yet; this is a low-cost method for now
let no_func = None::<fn(&Cid, &ChainMessage, &ApplyRet) -> Result<(), String>>;
self.compute_tipset_state::<V, _>(&block_headers, no_func)?
};

let block_headers = tipset.blocks();
// generic constants are not implemented yet this is a lowcost method for now
let no_func = None::<fn(Cid, &ChainMessage, ApplyRet) -> Result<(), String>>;
let cid_pair = self.compute_tipset_state::<V, _>(&block_headers, no_func)?;
self.cache
.write()
.await
.insert(tipset.key().clone(), cid_pair.clone());
// Fill entry with calculated cid pair
*entry_lock = Some(cid_pair.clone());
Ok(cid_pair)
})
}
Expand Down Expand Up @@ -390,16 +413,16 @@ where
{
let mut outm: Option<UnsignedMessage> = None;
let mut outr: Option<ApplyRet> = None;
let callback = |cid: Cid, unsigned: &ChainMessage, apply_ret: ApplyRet| {
if cid == mcid.clone() {
let callback = |cid: &Cid, unsigned: &ChainMessage, apply_ret: &ApplyRet| {
if cid == mcid {
outm = Some(unsigned.message().clone());
outr = Some(apply_ret);
outr = Some(apply_ret.clone());
return Err("halt".to_string());
}

Ok(())
};
let result: Result<(Cid, Cid), Box<dyn StdError>> =
let result: Result<CidPair, Box<dyn StdError>> =
self.compute_tipset_state::<V, _>(ts.blocks(), Some(callback));

if let Err(error_message) = result {
Expand All @@ -420,10 +443,10 @@ where
&self,
block_headers: &[BlockHeader],
callback: Option<CB>,
) -> Result<(Cid, Cid), Box<dyn StdError>>
) -> Result<CidPair, Box<dyn StdError>>
where
V: ProofVerifier,
CB: FnMut(Cid, &ChainMessage, ApplyRet) -> Result<(), String>,
CB: FnMut(&Cid, &ChainMessage, &ApplyRet) -> Result<(), String>,
{
span!("compute_tipset_state", {
let first_block = block_headers
Expand Down
4 changes: 2 additions & 2 deletions tests/conformance_tests/src/tipset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,9 @@ pub fn execute_tipset(
exec_epoch,
&TestRand,
tipset.basefee.to_bigint().unwrap_or_default(),
Some(|_, msg: &ChainMessage, ret| {
Some(|_: &Cid, msg: &ChainMessage, ret: &ApplyRet| {
_applied_messages.push(msg.clone());
applied_results.push(ret);
applied_results.push(ret.clone());
Ok(())
}),
)?;
Expand Down
10 changes: 5 additions & 5 deletions vm/interpreter/src/vm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ where
fn run_cron(
&mut self,
epoch: ChainEpoch,
callback: Option<&mut impl FnMut(Cid, &ChainMessage, ApplyRet) -> Result<(), String>>,
callback: Option<&mut impl FnMut(&Cid, &ChainMessage, &ApplyRet) -> Result<(), String>>,
) -> Result<(), Box<dyn StdError>> {
let cron_msg = UnsignedMessage {
from: *SYSTEM_ACTOR_ADDR,
Expand All @@ -141,7 +141,7 @@ where
}

if let Some(callback) = callback {
callback(cron_msg.cid()?, &ChainMessage::Unsigned(cron_msg), ret)?;
callback(&cron_msg.cid()?, &ChainMessage::Unsigned(cron_msg), &ret)?;
}
Ok(())
}
Expand All @@ -153,7 +153,7 @@ where
messages: &[BlockMessages],
parent_epoch: ChainEpoch,
epoch: ChainEpoch,
mut callback: Option<impl FnMut(Cid, &ChainMessage, ApplyRet) -> Result<(), String>>,
mut callback: Option<impl FnMut(&Cid, &ChainMessage, &ApplyRet) -> Result<(), String>>,
) -> Result<Vec<MessageReceipt>, Box<dyn StdError>> {
let mut receipts = Vec::new();
let mut processed = HashSet::<Cid>::default();
Expand All @@ -177,7 +177,7 @@ where
}
let ret = self.apply_message(msg)?;
if let Some(cb) = &mut callback {
cb(msg.cid()?, msg, ret.clone())?;
cb(&cid, msg, &ret)?;
}

// Update totals
Expand Down Expand Up @@ -235,7 +235,7 @@ where
}

if let Some(callback) = &mut callback {
callback(rew_msg.cid()?, &ChainMessage::Unsigned(rew_msg), ret)?;
callback(&rew_msg.cid()?, &ChainMessage::Unsigned(rew_msg), &ret)?;
}
}

Expand Down