Implement chain index #855

Merged
merged 8 commits on Nov 19, 2020
Changes from 5 commits
1 change: 0 additions & 1 deletion Cargo.lock


8 changes: 5 additions & 3 deletions blockchain/beacon/src/drand.rs
@@ -143,7 +143,8 @@ impl Beacon for DrandBeacon {
let sig_match = bls_signatures::verify(&sig, &[digest], &[self.pub_key.key()]);

// Cache the result
if sig_match && !self.local_cache.read().await.contains_key(&curr.round()) {
let contains_curr = self.local_cache.read().await.contains_key(&curr.round());
if sig_match && !contains_curr {
self.local_cache
.write()
.await
@@ -153,8 +154,9 @@ impl Beacon for DrandBeacon {
}

async fn entry(&self, round: u64) -> Result<BeaconEntry, Box<dyn error::Error>> {
match self.local_cache.read().await.get(&round) {
Some(cached_entry) => Ok(cached_entry.clone()),
let cached: Option<BeaconEntry> = self.local_cache.read().await.get(&round).cloned();
match cached {
Some(cached_entry) => Ok(cached_entry),
None => {
let url = format!("{}/public/{}", self.url, round);
let resp: BeaconEntryJson = surf::get(&url).recv_json().await?;
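The drand.rs hunk above replaces the inline cache check with a `contains_curr` binding, so the read guard is released in its own statement before the write lock is requested. A minimal sketch of that pattern with async-std's RwLock and the lru crate (the `EntryCache` and `insert_if_new` names are illustrative, not from this PR):

```rust
use async_std::sync::RwLock;
use lru::LruCache;

/// Illustrative stand-in for the beacon's round cache.
struct EntryCache {
    entries: RwLock<LruCache<u64, Vec<u8>>>,
}

impl EntryCache {
    fn new() -> Self {
        EntryCache {
            entries: RwLock::new(LruCache::unbounded()),
        }
    }

    async fn insert_if_new(&self, verified: bool, round: u64, entry: Vec<u8>) {
        // Evaluate the read-locked check in its own statement so the read guard
        // is dropped before the write lock below is requested, mirroring the
        // `contains_curr` binding in the diff.
        let already_cached = self.entries.read().await.contains(&round);
        if verified && !already_cached {
            self.entries.write().await.put(round, entry);
        }
    }
}
```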
148 changes: 80 additions & 68 deletions blockchain/chain/src/store/chain_store.rs
@@ -1,7 +1,7 @@
// Copyright 2020 ChainSafe Systems
// SPDX-License-Identifier: Apache-2.0, MIT

use super::{Error, TipIndex};
use super::{ChainIndex, Error};
use actor::{power::State as PowerState, STORAGE_POWER_ACTOR_ADDR};
use address::Address;
use async_std::sync::RwLock;
@@ -95,22 +95,24 @@ pub struct ChainStore<DB> {
/// Tipset at the head of the best-known chain.
heaviest: RwLock<Option<Arc<Tipset>>>,

ts_cache: RwLock<LruCache<TipsetKeys, Arc<Tipset>>>,
/// Caches loaded tipsets for fast retrieval.
ts_cache: Arc<TipsetCache>,

/// tip_index tracks tipsets by epoch/parentset for use by expected consensus.
tip_index: TipIndex,
/// Used as a cache for tipset lookbacks.
chain_index: ChainIndex<DB>,
}

impl<DB> ChainStore<DB>
where
DB: BlockStore + Send + Sync + 'static,
{
pub fn new(db: Arc<DB>) -> Self {
let ts_cache = Arc::new(RwLock::new(LruCache::new(DEFAULT_TIPSET_CACHE_SIZE)));
let cs = Self {
db,
publisher: RwLock::new(Publisher::new(SINK_CAP)),
tip_index: TipIndex::default(),
ts_cache: RwLock::new(LruCache::new(DEFAULT_TIPSET_CACHE_SIZE)),
chain_index: ChainIndex::new(ts_cache.clone(), db.clone()),
db,
ts_cache,
heaviest: Default::default(),
};

@@ -159,10 +161,7 @@ where
/// Loads heaviest tipset from datastore and sets as heaviest in chainstore
pub async fn load_heaviest_tipset(&self) -> Result<(), Error> {
let heaviest_ts = match self.db.read(HEAD_KEY)? {
Some(bz) => {
let keys: Vec<Cid> = from_slice(&bz)?;
self.tipset_from_keys(&TipsetKeys::new(keys)).await?
}
Some(bz) => self.tipset_from_keys(&from_slice(&bz)?).await?,
None => {
warn!("No previous chain state found");
return Err(Error::Other("No chain state found".to_owned()));
@@ -190,10 +189,6 @@ where
self.heaviest.read().await.clone()
}

pub fn tip_index(&self) -> &TipIndex {
&self.tip_index
}

pub fn publisher(&self) -> &RwLock<Publisher<HeadChange>> {
&self.publisher
}
@@ -210,33 +205,22 @@ where

/// Returns Tipset from key-value store from provided cids
pub async fn tipset_from_keys(&self, tsk: &TipsetKeys) -> Result<Arc<Tipset>, Error> {
if let Some(ts) = self.ts_cache.write().await.get(tsk) {
return Ok(ts.clone());
}

let block_headers: Vec<BlockHeader> = tsk
.cids()
.par_iter()
.map(|c| {
self.db
.get(c)
.map_err(|e| Error::Other(e.to_string()))?
.ok_or_else(|| Error::NotFound("Key for header"))
})
.collect::<Result<_, Error>>()?;

// construct new Tipset to return
let ts = Arc::new(Tipset::new(block_headers)?);
self.ts_cache.write().await.put(tsk.clone(), ts.clone());
Ok(ts)
tipset_from_keys(&self.ts_cache, self.blockstore(), tsk).await
}

/// Determines if provided tipset is heavier than existing known heaviest tipset
async fn update_heaviest(&self, ts: &Tipset) -> Result<(), Error> {
match self.heaviest.read().await.as_ref() {
// Calculate heaviest weight before matching to avoid deadlock with mutex
let heaviest_weight = self
.heaviest
.read()
.await
.as_ref()
.map(|ts| weight(self.db.as_ref(), ts.as_ref()));
match heaviest_weight {
Some(heaviest) => {
let new_weight = weight(self.blockstore(), ts)?;
let curr_weight = weight(self.blockstore(), &heaviest)?;
let curr_weight = heaviest?;
if new_weight > curr_weight {
// TODO potentially need to deal with re-orgs here
info!("New heaviest tipset");
@@ -269,20 +253,19 @@ where
/// Returns `None` if the tipset is also the lookback tipset.
pub async fn get_lookback_tipset_for_round(
&self,
ts: &Tipset,
ts: Arc<Tipset>,
round: ChainEpoch,
) -> Result<Option<Arc<Tipset>>, Error> {
) -> Result<Arc<Tipset>, Error> {
let lbr = if round > WINNING_POST_SECTOR_SET_LOOKBACK {
round - WINNING_POST_SECTOR_SET_LOOKBACK
} else {
0
};

if lbr > ts.epoch() {
return Ok(None);
return Ok(ts);
}

// TODO would be better to get tipset with ChainStore cache.
self.tipset_by_height(lbr, ts, true).await
}

@@ -295,34 +278,38 @@ where
pub async fn tipset_by_height(
&self,
height: ChainEpoch,
ts: &Tipset,
ts: Arc<Tipset>,
prev: bool,
) -> Result<Option<Arc<Tipset>>, Error> {
) -> Result<Arc<Tipset>, Error> {
if height > ts.epoch() {
return Err(Error::Other(
"searching for tipset that has a height less than starting point".to_owned(),
));
}
if height == ts.epoch() {
return Ok(None);
return Ok(ts.clone());
}
let mut ts_temp: Option<Arc<Tipset>> = None;
loop {
let pts = if let Some(temp) = &ts_temp {
self.tipset_from_keys(temp.parents()).await?
} else {
self.tipset_from_keys(ts.parents()).await?
};
if height > pts.epoch() {
if prev {
return Ok(Some(pts));
}
return Ok(ts_temp);
}
if height == pts.epoch() {
return Ok(Some(pts));
}
ts_temp = Some(pts);

let mut lbts = self
.chain_index
.get_tipset_by_height(ts.clone(), height)
.await?;

if lbts.epoch() < height {
log::warn!(
"chain index returned the wrong tipset at height {}, using slow retrieval",
height
);
lbts = self
.chain_index
.get_tipset_by_height_without_cache(ts, height)
.await?;
}

if lbts.epoch() == height || !prev {
Ok(lbts)
} else {
self.tipset_from_keys(lbts.parents()).await
}
}

@@ -343,10 +330,7 @@ where

let search_height = if round < 0 { 0 } else { round };

let rand_ts = self
.tipset_by_height(search_height, &ts, true)
.await?
.unwrap_or(ts);
let rand_ts = self.tipset_by_height(search_height, ts, true).await?;

draw_randomness(
rand_ts
@@ -377,10 +361,7 @@ where

let search_height = if round < 0 { 0 } else { round };

let rand_ts = self
.tipset_by_height(search_height, &ts, true)
.await?
.unwrap_or(ts);
let rand_ts = self.tipset_by_height(search_height, ts, true).await?;

let be = self.latest_beacon_entry(&rand_ts).await?;

@@ -515,6 +496,37 @@ where
}
}

pub(crate) type TipsetCache = RwLock<LruCache<TipsetKeys, Arc<Tipset>>>;

pub(crate) async fn tipset_from_keys<BS>(
cache: &TipsetCache,
store: &BS,
tsk: &TipsetKeys,
) -> Result<Arc<Tipset>, Error>
where
BS: BlockStore + Send + Sync + 'static,
{
if let Some(ts) = cache.write().await.get(tsk) {

Review comment on the cache.write() call above: why not use cache.read() here instead of getting a write lock?

Author reply: Because it's a least recently used (LRU) cache, and it needs to update the position of the entry on read to function correctly, which requires mutable access. There is a method called peek on this cache, which only requires immutable access, but that defeats the purpose of this cache.

Yes, this limits concurrent reads, but the lock is held for a very short time, and it means that frequently accessed keys don't get evicted from the cache early.

return Ok(ts.clone());
}

let block_headers: Vec<BlockHeader> = tsk
.cids()
.par_iter()
.map(|c| {
store
.get(c)
.map_err(|e| Error::Other(e.to_string()))?
.ok_or_else(|| Error::NotFound("Key for header"))
})
.collect::<Result<_, Error>>()?;

// construct new Tipset to return
let ts = Arc::new(Tipset::new(block_headers)?);
cache.write().await.put(tsk.clone(), ts.clone());
Ok(ts)
}
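Following up on the review thread above: `lru::LruCache::get` takes `&mut self` because it refreshes the entry's recency, so even reads go through the RwLock's write half, whereas `peek` only needs `&self` and works under a read lock but leaves the eviction order untouched. A small sketch of the two options (the function names are illustrative):

```rust
use async_std::sync::RwLock;
use lru::LruCache;

type Cache<K, V> = RwLock<LruCache<K, V>>;

async fn lookup_and_refresh(cache: &Cache<u64, String>, key: u64) -> Option<String> {
    // `get` takes `&mut LruCache`, so a write guard is required; the guard is
    // dropped at the end of this statement, keeping the critical section short.
    cache.write().await.get(&key).cloned()
}

async fn lookup_without_refresh(cache: &Cache<u64, String>, key: u64) -> Option<String> {
    // `peek` only needs `&LruCache`, so concurrent readers are possible, but the
    // entry's position in the eviction order is left untouched.
    cache.read().await.peek(&key).cloned()
}
```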

/// Helper to ensure consistent Cid -> db key translation.
fn block_validation_key(cid: &Cid) -> Vec<u8> {
let mut key = Vec::new();