Skip to content

Commit

Permalink
feat: switch to fetching state step on flat storage creation (#8019)
Browse files Browse the repository at this point in the history
Next step in background flat storage creation: once we saved enough deltas, we can start fetching state for current final head. We update the flat storage test accordingly, fetching state step itself will be implemented in the next PR.

## Testing

* `test_flat_storage_creation`
* https://buildkite.com/nearprotocol/nearcore-flat-state/builds?branch=fs-deltas-switch
  • Loading branch information
Longarithm authored Nov 11, 2022
1 parent 4aacfe1 commit 7db3eac
Show file tree
Hide file tree
Showing 5 changed files with 172 additions and 55 deletions.
1 change: 1 addition & 0 deletions chain/chain/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ edition.workspace = true
[dependencies]
actix.workspace = true
ansi_term.workspace = true
assert_matches.workspace = true
borsh.workspace = true
chrono.workspace = true
crossbeam-channel.workspace = true
Expand Down
2 changes: 1 addition & 1 deletion chain/chain/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2167,7 +2167,7 @@ impl Chain {
}
});
} else {
match &self.flat_storage_creator {
match &mut self.flat_storage_creator {
Some(flat_storage_creator) => {
flat_storage_creator.update_status(shard_id, &self.store)?;
}
Expand Down
126 changes: 93 additions & 33 deletions chain/chain/src/flat_storage_creator.rs
Original file line number Diff line number Diff line change
@@ -1,47 +1,109 @@
use crate::{ChainStore, ChainStoreAccess, RuntimeAdapter};
#[cfg(feature = "protocol_feature_flat_state")]
use assert_matches::assert_matches;
use crossbeam_channel::{unbounded, Receiver, Sender};
use near_chain_primitives::Error;
use near_primitives::types::{BlockHeight, ShardId};
#[cfg(feature = "protocol_feature_flat_state")]
use near_store::flat_state::store_helper;
use near_store::flat_state::{FlatStorageStateStatus, NUM_PARTS_IN_ONE_STEP};
use std::sync::{Arc, Mutex};
use std::sync::Arc;
#[cfg(feature = "protocol_feature_flat_state")]
use tracing::debug;
use tracing::info;

/// If we launched a node with enabled flat storage but it doesn't have flat storage data on disk, we have to create it.
/// This struct is responsible for this process for the given shard.
/// See doc comment on [`FlatStorageStateStatus`] for the details of the process.
pub struct FlatStorageShardCreator {
pub status: FlatStorageStateStatus,
pub shard_id: ShardId,
#[allow(unused)]
shard_id: ShardId,
/// Height on top of which this struct was created.
#[allow(unused)]
start_height: BlockHeight,
#[allow(unused)]
runtime_adapter: Arc<dyn RuntimeAdapter>,
/// Tracks number of traversed state parts during a single step.
#[allow(unused)]
pub traversed_state_parts: Option<u64>,
fetched_state_parts: Option<u64>,
/// Used by threads which traverse state parts to tell that traversal is finished.
#[allow(unused)]
pub traversed_parts_sender: Sender<u64>,
fetched_parts_sender: Sender<u64>,
/// Used by main thread to update the number of traversed state parts.
#[allow(unused)]
pub traversed_parts_receiver: Receiver<u64>,
fetched_parts_receiver: Receiver<u64>,
}

impl FlatStorageShardCreator {
pub fn new(status: FlatStorageStateStatus, shard_id: ShardId) -> Self {
let (traversed_parts_sender, traversed_parts_receiver) = unbounded();
pub fn new(
shard_id: ShardId,
start_height: BlockHeight,
runtime_adapter: Arc<dyn RuntimeAdapter>,
) -> Self {
let (fetched_parts_sender, fetched_parts_receiver) = unbounded();
Self {
status,
shard_id,
traversed_state_parts: None,
traversed_parts_sender,
traversed_parts_receiver,
start_height,
runtime_adapter,
fetched_state_parts: None,
fetched_parts_sender,
fetched_parts_receiver,
}
}

#[cfg(feature = "protocol_feature_flat_state")]
pub(crate) fn update_status(&mut self, chain_store: &ChainStore) -> Result<(), Error> {
let current_status =
store_helper::get_flat_storage_state_status(chain_store.store(), self.shard_id);
match current_status {
FlatStorageStateStatus::SavingDeltas => {
let final_head = chain_store.final_head()?;
let shard_id = self.shard_id;

if final_head.height > self.start_height {
// If it holds, deltas for all blocks after final head are saved to disk, because they have bigger
// heights than one on which we launched a node. Check that it is true:
for height in final_head.height + 1..=chain_store.head()?.height {
for (_, hashes) in
chain_store.get_all_block_hashes_by_height(height)?.iter()
{
for hash in hashes {
debug!(target: "chain", %shard_id, %height, %hash, "Checking delta existence");
assert_matches!(
store_helper::get_delta(
chain_store.store(),
shard_id,
hash.clone(),
),
Ok(Some(_))
);
}
}
}

// We continue saving deltas, and also start fetching state.
let block_hash = final_head.last_block_hash;
let mut store_update = chain_store.store().store_update();
store_helper::set_flat_head(&mut store_update, shard_id, &block_hash);
store_helper::set_fetching_state_step(&mut store_update, shard_id, 0u64);
store_update.commit()?;
}
Ok(())
}
FlatStorageStateStatus::FetchingState((_block_hash, _fetching_state_step)) => {
// TODO: spawn threads and collect results
Ok(())
}
_ => {
panic!("Status {:?} is not supported yet", current_status);
}
}
}
}

/// Creates flat storages for all shards.
pub struct FlatStorageCreator {
/// Height on top of which this struct was created.
pub start_height: BlockHeight,
pub shard_creators: Vec<Arc<Mutex<FlatStorageShardCreator>>>,
pub runtime_adapter: Arc<dyn RuntimeAdapter>,
pub shard_creators: Vec<FlatStorageShardCreator>,
/// Used to spawn threads for traversing state parts.
pub pool: rayon::ThreadPool,
}
Expand All @@ -51,7 +113,7 @@ impl FlatStorageCreator {
let chain_head = chain_store.head().unwrap();
let num_shards = runtime_adapter.num_shards(&chain_head.epoch_id).unwrap();
let start_height = chain_head.height;
let mut shard_creators: Vec<Arc<Mutex<FlatStorageShardCreator>>> = vec![];
let mut shard_creators: Vec<FlatStorageShardCreator> = vec![];
let mut creation_needed = false;
for shard_id in 0..num_shards {
let status = runtime_adapter.try_create_flat_storage_state_for_shard(
Expand All @@ -66,15 +128,16 @@ impl FlatStorageCreator {
creation_needed = true;
}
}
shard_creators
.push(Arc::new(Mutex::new(FlatStorageShardCreator::new(status, shard_id))));
shard_creators.push(FlatStorageShardCreator::new(
shard_id,
start_height,
runtime_adapter.clone(),
));
}

if creation_needed {
Some(Self {
start_height,
shard_creators,
runtime_adapter: runtime_adapter.clone(),
pool: rayon::ThreadPoolBuilder::new()
.num_threads(NUM_PARTS_IN_ONE_STEP as usize)
.build()
Expand All @@ -85,23 +148,20 @@ impl FlatStorageCreator {
}
}

pub fn update_status(&self, shard_id: ShardId, _chain_store: &ChainStore) -> Result<(), Error> {
pub fn update_status(
&mut self,
shard_id: ShardId,
#[allow(unused_variables)] chain_store: &ChainStore,
) -> Result<(), Error> {
if shard_id as usize >= self.shard_creators.len() {
// We can request update for not supported shard if resharding happens. We don't support it yet, so we just
// return Ok.
return Ok(());
}

let guard = self.shard_creators[shard_id as usize].lock().unwrap();
match guard.status.clone() {
FlatStorageStateStatus::SavingDeltas => {
// Once final head height > start height, we can switch to next step.
// Then, ChainStore is used to get state roots, block infos and flat storage creation in the end.
Ok(())
}
_ => {
panic!("Status {:?} is not supported yet", guard.status);
}
}
#[cfg(feature = "protocol_feature_flat_state")]
self.shard_creators[shard_id as usize].update_status(chain_store)?;

Ok(())
}
}
61 changes: 53 additions & 8 deletions core/store/src/flat_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -490,6 +490,8 @@ pub mod store_helper {
use near_primitives::types::ShardId;
use std::sync::Arc;

pub const FETCHING_STATE_STEP_KEY_PREFIX: &[u8; 4] = b"STEP";

pub fn get_delta(
store: &Store,
shard_id: ShardId,
Expand Down Expand Up @@ -525,11 +527,7 @@ pub mod store_helper {
.expect("Error reading flat head from storage")
}

pub(crate) fn set_flat_head(
store_update: &mut StoreUpdate,
shard_id: ShardId,
val: &CryptoHash,
) {
pub fn set_flat_head(store_update: &mut StoreUpdate, shard_id: ShardId, val: &CryptoHash) {
store_update
.set_ser(crate::DBCol::FlatStateMisc, &shard_id.try_to_vec().unwrap(), val)
.expect("Error writing flat head from storage")
Expand Down Expand Up @@ -564,15 +562,62 @@ pub mod store_helper {
}
}

fn fetching_state_step_key(shard_id: ShardId) -> Vec<u8> {
let mut fetching_state_step_key = FETCHING_STATE_STEP_KEY_PREFIX.to_vec();
fetching_state_step_key.extend_from_slice(&shard_id.try_to_vec().unwrap());
fetching_state_step_key
}

fn get_fetching_state_step(store: &Store, shard_id: ShardId) -> Option<u64> {
store.get_ser(crate::DBCol::FlatStateMisc, &fetching_state_step_key(shard_id)).expect(
format!("Error reading fetching step for flat state for shard {shard_id}").as_str(),
)
}

pub fn set_fetching_state_step(store_update: &mut StoreUpdate, shard_id: ShardId, value: u64) {
store_update
.set_ser(crate::DBCol::FlatStateMisc, &fetching_state_step_key(shard_id), &value)
.expect(
format!("Error setting fetching step for shard {shard_id} to {value}").as_str(),
);
}

fn get_catchup_status(store: &Store, shard_id: ShardId) -> bool {
let mut catchup_status_key = FETCHING_STATE_STEP_KEY_PREFIX.to_vec();
catchup_status_key.extend_from_slice(&shard_id.try_to_vec().unwrap());
let status: Option<bool> =
store.get_ser(crate::DBCol::FlatStateMisc, &catchup_status_key).expect(
format!("Error reading catchup status for flat state for shard {shard_id}")
.as_str(),
);
match status {
None => false,
Some(status) => {
assert!(
status,
"Catchup status for flat state for shard {} must be true if stored",
shard_id
);
true
}
}
}

pub fn get_flat_storage_state_status(
store: &Store,
shard_id: ShardId,
) -> FlatStorageStateStatus {
// TODO: replace this placeholder with reading flat storage data and setting correct status. ChainStore can be
// used to get flat storage heads and block heights.
match get_flat_head(store, shard_id) {
None => FlatStorageStateStatus::SavingDeltas,
Some(_) => FlatStorageStateStatus::Ready,
Some(block_hash) => {
if let Some(fetching_state_step) = get_fetching_state_step(store, shard_id) {
FlatStorageStateStatus::FetchingState((block_hash, fetching_state_step))
} else if get_catchup_status(store, shard_id) {
FlatStorageStateStatus::CatchingUp
} else {
FlatStorageStateStatus::Ready
}
}
}
}
}
Expand Down
37 changes: 24 additions & 13 deletions integration-tests/src/tests/client/flat_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,28 +75,39 @@ fn test_flat_storage_creation() {
&genesis,
))];
let mut env = TestEnv::builder(chain_genesis).runtime_adapters(runtimes.clone()).build();
for i in 4..7 {
for i in 4..6 {
env.produce_block(0, i);
}

if cfg!(feature = "protocol_feature_flat_state") {
// At first, flat storage state should start saving deltas. Deltas for all newly processed blocks should be saved to
// disk.
assert_eq!(
store_helper::get_flat_storage_state_status(&store, 0),
FlatStorageStateStatus::SavingDeltas
);
for i in 4..7 {
let block_hash = env.clients[0].chain.get_block_hash_by_height(i).unwrap();
assert_matches!(store_helper::get_delta(&store, 0, block_hash), Ok(Some(_)));
}
} else {
if !cfg!(feature = "protocol_feature_flat_state") {
assert_eq!(
store_helper::get_flat_storage_state_status(&store, 0),
FlatStorageStateStatus::DontCreate
);
assert_eq!(store_helper::get_flat_head(&store, 0), None);
// Stop the test here.
return;
}

// At first, flat storage state should start saving deltas. Deltas for all newly processed blocks should be saved to
// disk.
assert_eq!(
store_helper::get_flat_storage_state_status(&store, 0),
FlatStorageStateStatus::SavingDeltas
);
for i in 4..6 {
let block_hash = env.clients[0].chain.get_block_hash_by_height(i).unwrap();
assert_matches!(store_helper::get_delta(&store, 0, block_hash), Ok(Some(_)));
}

// When final head height becomes greater than height on which node started, we must start fetching the state.
// We started the node from height 3, and now final head should move to height 4.
env.produce_block(0, 6);
let final_block_hash = env.clients[0].chain.get_block_hash_by_height(4).unwrap();
assert_eq!(
store_helper::get_flat_storage_state_status(&store, 0),
FlatStorageStateStatus::FetchingState((final_block_hash, 0))
);

// TODO: support next statuses once their logic is implemented.
}

0 comments on commit 7db3eac

Please sign in to comment.