Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: separate ChainStore and ChainIndex #3216

Merged
merged 4 commits into from
Jul 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 4 additions & 37 deletions src/chain/store/chain_store.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,14 @@
// Copyright 2019-2023 ChainSafe Systems
// SPDX-License-Identifier: Apache-2.0, MIT

use std::{num::NonZeroUsize, ops::DerefMut, path::Path, sync::Arc, time::SystemTime};
use std::{ops::DerefMut, path::Path, sync::Arc, time::SystemTime};

use crate::beacon::{BeaconEntry, IGNORE_DRAND_VAR};
use crate::blocks::{BlockHeader, Tipset, TipsetKeys, TxMeta};
use crate::interpreter::BlockMessages;
use crate::ipld::{walk_snapshot, WALK_SNAPSHOT_PROGRESS_EXPORT};
use crate::libp2p_bitswap::{BitswapStoreRead, BitswapStoreReadWrite};
use crate::message::{ChainMessage, Message as MessageTrait, SignedMessage};
use crate::metrics;
use crate::networks::{ChainConfig, NetworkChain};
use crate::shim::clock::ChainEpoch;
use crate::shim::{
Expand All @@ -35,8 +34,6 @@ use fvm_ipld_amt::Amtv0 as Amt;
use fvm_ipld_blockstore::Blockstore;
use fvm_ipld_car::CarHeader;
use fvm_ipld_encoding::CborStore;
use lru::LruCache;
use nonzero_ext::nonzero;
use parking_lot::Mutex;
use serde::{de::DeserializeOwned, Serialize};
use tokio::sync::{
Expand All @@ -55,8 +52,6 @@ use crate::chain::Scale;
// A cap on the size of the future_sink
const SINK_CAP: usize = 200;

const DEFAULT_TIPSET_CACHE_SIZE: NonZeroUsize = nonzero!(8192usize);

/// Disambiguate the type to signify that we are expecting a delta and not an actual epoch/height
/// while maintaining the same type.
pub type ChainEpochDelta = ChainEpoch;
Expand Down Expand Up @@ -132,7 +127,7 @@ where
chain_data_root: &Path,
) -> Result<Self> {
let (publisher, _) = broadcast::channel(SINK_CAP);
let ts_cache = Arc::new(Mutex::new(LruCache::new(DEFAULT_TIPSET_CACHE_SIZE)));
let chain_index = ChainIndex::new(Arc::clone(&db));
let file_backed_genesis = Mutex::new(FileBacked::new(
*genesis_block_header.cid(),
chain_data_root.join("GENESIS"),
Expand All @@ -143,7 +138,7 @@ where
|| TipsetKeys::new(vec![*genesis_block_header.cid()]),
None,
)?;
let is_valid = tipset_from_keys(&ts_cache, &db, head_store.inner()).is_ok();
let is_valid = chain_index.load_tipset(head_store.inner()).is_ok();
if !is_valid {
// If the stored HEAD is invalid, reset it to the genesis tipset.
head_store.set_inner(TipsetKeys::new(vec![*genesis_block_header.cid()]))?;
Expand All @@ -159,7 +154,7 @@ where

let cs = Self {
publisher,
chain_index: ChainIndex::new(ts_cache, Arc::clone(&db)),
chain_index,
tipset_tracker: TipsetTracker::new(Arc::clone(&db), chain_config),
db,
file_backed_genesis,
Expand Down Expand Up @@ -670,34 +665,6 @@ where
}
}

pub(in crate::chain) type TipsetCache = Mutex<LruCache<TipsetKeys, Arc<Tipset>>>;

/// Loads a tipset from memory given the tipset keys and cache.
pub(in crate::chain) fn tipset_from_keys<BS>(
cache: &TipsetCache,
store: &BS,
tsk: &TipsetKeys,
) -> Result<Arc<Tipset>, Error>
where
BS: Blockstore,
{
if let Some(ts) = cache.lock().get(tsk) {
metrics::LRU_CACHE_HIT
.with_label_values(&[metrics::values::TIPSET])
.inc();
return Ok(ts.clone());
}

let ts = Tipset::load(store, tsk)?.ok_or(Error::NotFound(String::from("Key for header")))?;
// construct new Tipset to return
let ts = Arc::new(ts);
cache.lock().put(tsk.clone(), ts.clone());
metrics::LRU_CACHE_MISS
.with_label_values(&[metrics::values::TIPSET])
.inc();
Ok(ts)
}

/// Returns a Tuple of BLS messages of type `UnsignedMessage` and SECP messages
/// of type `SignedMessage`
pub fn block_messages<DB>(
Expand Down
32 changes: 26 additions & 6 deletions src/chain/store/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@ use nonzero_ext::nonzero;
use parking_lot::Mutex;
use tracing::info;

use crate::chain::{tipset_from_keys, Error, TipsetCache};
use crate::chain::Error;

const DEFAULT_TIPSET_CACHE_SIZE: NonZeroUsize = nonzero!(8192usize);

const DEFAULT_CHAIN_INDEX_CACHE_SIZE: NonZeroUsize = nonzero!(32usize << 10);

Expand Down Expand Up @@ -98,37 +100,55 @@ pub mod checkpoint_tipsets {
/// `Lookback` entry to cache in the `ChainIndex`. Stores all relevant info when
/// doing `lookbacks`.
#[derive(Clone, PartialEq, Debug)]
pub(in crate::chain) struct LookbackEntry {
struct LookbackEntry {
tipset: Arc<Tipset>,
parent_height: ChainEpoch,
target_height: ChainEpoch,
target: TipsetKeys,
}

type TipsetCache = Mutex<LruCache<TipsetKeys, Arc<Tipset>>>;

/// Keeps look-back tipsets in cache at a given interval `skip_length` and can
/// be used to look-back at the chain to retrieve an old tipset.
pub(in crate::chain) struct ChainIndex<DB> {
/// Cache of look-back entries to speed up lookup.
skip_cache: Mutex<LruCache<TipsetKeys, Arc<LookbackEntry>>>,

/// `Arc` reference tipset cache.
ts_cache: Arc<TipsetCache>,
ts_cache: TipsetCache,

/// `Blockstore` pointer needed to load tipsets from cold storage.
db: Arc<DB>,
}

impl<DB: Blockstore> ChainIndex<DB> {
pub(in crate::chain) fn new(ts_cache: Arc<TipsetCache>, db: Arc<DB>) -> Self {
pub(in crate::chain) fn new(db: Arc<DB>) -> Self {
let ts_cache = Mutex::new(LruCache::new(DEFAULT_TIPSET_CACHE_SIZE));
Self {
skip_cache: Mutex::new(LruCache::new(DEFAULT_CHAIN_INDEX_CACHE_SIZE)),
ts_cache,
db,
}
}

/// Loads a tipset from memory given the tipset keys and cache.
pub fn load_tipset(&self, tsk: &TipsetKeys) -> Result<Arc<Tipset>, Error> {
tipset_from_keys(self.ts_cache.as_ref(), &self.db, tsk)
if let Some(ts) = self.ts_cache.lock().get(tsk) {
metrics::LRU_CACHE_HIT
.with_label_values(&[metrics::values::TIPSET])
.inc();
return Ok(ts.clone());
}

let ts = Arc::new(
Tipset::load(&self.db, tsk)?.ok_or(Error::NotFound(String::from("Key for header")))?,
);
self.ts_cache.lock().put(tsk.clone(), ts.clone());
metrics::LRU_CACHE_MISS
.with_label_values(&[metrics::values::TIPSET])
.inc();
Ok(ts)
}

/// Loads tipset at `to` [`ChainEpoch`], loading from sparse cache and/or
Expand Down Expand Up @@ -163,7 +183,7 @@ impl<DB: Blockstore> ChainIndex<DB> {
if let Some(genesis_tipset_keys) =
checkpoint_tipsets::genesis_from_checkpoint_tipset(lbe.tipset.key())
{
let tipset = tipset_from_keys(&self.ts_cache, &self.db, &genesis_tipset_keys)?;
let tipset = self.load_tipset(&genesis_tipset_keys)?;
info!(
"Resolving genesis using checkpoint tipset at height: {}",
lbe.tipset.epoch()
Expand Down