From 04eb2c7850b2bf07afe7c44c16fdc7c7015c6ec3 Mon Sep 17 00:00:00 2001
From: Andrea
Date: Mon, 14 Oct 2024 12:38:55 +0200
Subject: [PATCH] feat(resharding): flat storage resharding mvp (#12164)

This PR adds early MVP capabilities for resharding flat storage (V3). The main
addition is `FlatStorageResharder` and all the tooling around it. You can also
see traces of an early attempt to tie the resharder into existing flat storage
code, mainly the flat storage creator.

- `FlatStorageResharder` takes care of everything related to resharding the flat storage.
- Its running tasks can be interrupted by a controller (integrated with the existing resharding handle)
- Uses the concept of a scheduler to run tasks in the background
- `ReshardingEventType` is a utility enum to represent the types of resharding. There's only one for now, but it makes adding more easier.

## Achievements
- Preparing flat storage's content for children after a shard split, for account-id based keys only.
- Deletion of parent flat storage

## Missing pieces
- Catchup phase for children and creation of proper flat storage
- Handling more complex key-values (not account-id based)
- Integration with resharding manager and flat storage creator
- Additional tests
- Metrics

Missing pieces will likely be done in another PR.

---

EDIT: integrated with ShardLayoutV2, fixed all unit tests, re-arranged the description.
---
 chain/chain-primitives/src/error.rs       |   5 +
 chain/chain/src/flat_storage_creator.rs   |  20 +-
 chain/chain/src/flat_storage_resharder.rs | 948 ++++++++++++++++++
 chain/chain/src/lib.rs                    |   1 +
 chain/chain/src/resharding/event_type.rs  | 184 ++++
 chain/chain/src/resharding/manager.rs     |  21 +-
 chain/chain/src/resharding/mod.rs         |   1 +
 chain/client/src/client.rs                |   6 +
 core/primitives/src/errors.rs             |   3 +
 core/primitives/src/shard_layout.rs       |  56 +-
 core/store/src/adapter/flat_store.rs      |  13 +-
 core/store/src/flat/mod.rs                |   2 +-
 core/store/src/flat/storage.rs            |   2 +-
 core/store/src/flat/types.rs              |  52 +
 core/store/src/trie/from_flat.rs          |   4 +-
 .../res/protocol_schema.toml              |  26 +-
 16 files changed, 1314 insertions(+), 30 deletions(-)
 create mode 100644 chain/chain/src/flat_storage_resharder.rs
 create mode 100644 chain/chain/src/resharding/event_type.rs

diff --git a/chain/chain-primitives/src/error.rs b/chain/chain-primitives/src/error.rs
index 866eb237794..a3395eb02f2 100644
--- a/chain/chain-primitives/src/error.rs
+++ b/chain/chain-primitives/src/error.rs
@@ -232,6 +232,9 @@ pub enum Error {
     /// GC error.
     #[error("GC Error: {0}")]
     GCError(String),
+    /// Resharding error.
+    #[error("Resharding Error: {0}")]
+    ReshardingError(String),
     /// Anything else
     #[error("Other Error: {0}")]
     Other(String),
@@ -269,6 +272,7 @@ impl Error {
             | Error::CannotBeFinalized
             | Error::StorageError(_)
             | Error::GCError(_)
+            | Error::ReshardingError(_)
             | Error::DBNotFoundErr(_) => false,
             Error::InvalidBlockPastTime(_, _)
             | Error::InvalidBlockFutureTime(_)
@@ -392,6 +396,7 @@ impl Error {
             Error::NotAValidator(_) => "not_a_validator",
             Error::NotAChunkValidator => "not_a_chunk_validator",
             Error::InvalidChallengeRoot => "invalid_challenge_root",
+            Error::ReshardingError(_) => "resharding_error",
         }
     }
 }
diff --git a/chain/chain/src/flat_storage_creator.rs b/chain/chain/src/flat_storage_creator.rs
index e26e721d90b..1f190f44cad 100644
--- a/chain/chain/src/flat_storage_creator.rs
+++ b/chain/chain/src/flat_storage_creator.rs
@@ -9,6 +9,7 @@
 //! `CatchingUp`: moves flat storage head forward, so it may reach chain final head.
 //! `Ready`: flat storage is created and it is up-to-date.
+use crate::flat_storage_resharder::FlatStorageResharder; use crate::types::RuntimeAdapter; use crate::{ChainStore, ChainStoreAccess}; use assert_matches::assert_matches; @@ -388,6 +389,13 @@ impl FlatStorageShardCreator { FlatStorageStatus::Disabled => { panic!("initiated flat storage creation for shard {shard_id} while it is disabled"); } + // If the flat storage is undergoing resharding it means it was previously created + // successfully, but resharding itself hasn't been finished. This case is a no-op + // because the flat storage resharder has already been created in + // `create_flat_storage_for_current_epoch`. + FlatStorageStatus::Resharding(_) => { + return Ok(true); + } }; Ok(false) } @@ -403,10 +411,13 @@ pub struct FlatStorageCreator { impl FlatStorageCreator { /// For each of tracked shards, either creates flat storage if it is already stored on DB, /// or starts migration to flat storage which updates DB in background and creates flat storage afterwards. + /// + /// Also resumes any resharding operation which was already in progress. pub fn new( epoch_manager: Arc, runtime: Arc, chain_store: &ChainStore, + flat_storage_resharder: &FlatStorageResharder, num_threads: usize, ) -> Result, Error> { let flat_storage_manager = runtime.get_flat_storage_manager(); @@ -420,6 +431,7 @@ impl FlatStorageCreator { &epoch_manager, &flat_storage_manager, &runtime, + &flat_storage_resharder, )?; // Create flat storage for the shards in the next epoch. This only @@ -447,6 +459,7 @@ impl FlatStorageCreator { epoch_manager: &Arc, flat_storage_manager: &FlatStorageManager, runtime: &Arc, + _flat_storage_resharder: &FlatStorageResharder, ) -> Result, Error> { let epoch_id = &chain_head.epoch_id; tracing::debug!(target: "store", ?epoch_id, "creating flat storage for the current epoch"); @@ -473,6 +486,10 @@ impl FlatStorageCreator { ); } FlatStorageStatus::Disabled => {} + FlatStorageStatus::Resharding(_status) => { + // TODO(Trisfald): call resume + // flat_storage_resharder.resume(shard_uid, &status, ...)?; + } } } @@ -502,7 +519,8 @@ impl FlatStorageCreator { } FlatStorageStatus::Empty | FlatStorageStatus::Creation(_) - | FlatStorageStatus::Disabled => { + | FlatStorageStatus::Disabled + | FlatStorageStatus::Resharding(_) => { // The flat storage for children shards will be created // separately in the resharding process. } diff --git a/chain/chain/src/flat_storage_resharder.rs b/chain/chain/src/flat_storage_resharder.rs new file mode 100644 index 00000000000..54374e7e95b --- /dev/null +++ b/chain/chain/src/flat_storage_resharder.rs @@ -0,0 +1,948 @@ +//! Logic for resharding flat storage in parallel to chain processing. +//! +//! See [FlatStorageResharder] for more details about how the resharding takes place. 
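+//!
+//! A rough usage sketch (the wiring into the resharding manager and the flat storage creator is
+//! still listed as a missing piece of this PR, so `runtime`, `scheduler`, the block hashes and
+//! `next_shard_layout` are assumed to come from the caller):
+//!
+//! ```ignore
+//! let resharder = FlatStorageResharder::new(runtime);
+//! let controller = FlatStorageResharderController::new();
+//! if let Some(event) = ReshardingEventType::from_shard_layout(
+//!     &next_shard_layout,
+//!     block_hash,
+//!     prev_block_hash,
+//! )? {
+//!     resharder.start_resharding(event, &next_shard_layout, &scheduler, controller)?;
+//! }
+//! ```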
+
+use std::sync::{Arc, Mutex};
+
+use crossbeam_channel::{Receiver, Sender};
+use near_chain_configs::ReshardingHandle;
+use near_chain_primitives::Error;
+
+use tracing::{debug, error, info, warn};
+
+use crate::resharding::event_type::{ReshardingEventType, ReshardingSplitShardParams};
+use crate::types::RuntimeAdapter;
+use near_primitives::shard_layout::{account_id_to_shard_id, ShardLayout};
+use near_primitives::state::FlatStateValue;
+use near_primitives::trie_key::col::{self, ALL_COLUMNS_WITH_NAMES};
+use near_primitives::trie_key::trie_key_parsers::{
+    parse_account_id_from_access_key_key, parse_account_id_from_account_key,
+    parse_account_id_from_contract_code_key, parse_account_id_from_contract_data_key,
+    parse_account_id_from_received_data_key, parse_account_id_from_trie_key_with_separator,
+};
+use near_primitives::types::AccountId;
+use near_store::adapter::flat_store::{FlatStoreAdapter, FlatStoreUpdateAdapter};
+use near_store::adapter::StoreAdapter;
+use near_store::flat::{
+    BlockInfo, FlatStorageReadyStatus, FlatStorageReshardingStatus, FlatStorageStatus,
+    SplittingParentStatus,
+};
+use near_store::{ShardUId, StorageError};
+
+/// `FlatStorageResharder` takes care of updating flat storage when a resharding event happens.
+///
+/// On a high level, the events supported are:
+/// - #### Shard splitting
+/// The parent shard must be split into two children. The entire operation freezes the flat storage
+/// for the involved shards. Children shards are created empty and the key-values of the parent
+/// will be copied into one of them, in the background.
+///
+/// After the copy is finished, the children shards will have the correct state at some past block
+/// height. It'll be necessary to perform catchup before the flat storage can be put back into
+/// Ready state. The parent shard storage is not needed anymore and can be removed.
+///
+/// The resharder also has the following properties:
+/// - Background processing: the bulk of resharding is done in a separate task, see
+/// [FlatStorageResharderScheduler]
+/// - Interruptible: a reshard operation can be cancelled through a
+/// [FlatStorageResharderController].
+/// - In the case of a `Split` event, the state of flat storage will go back to what it was
+/// previously.
+#[derive(Clone)]
+pub struct FlatStorageResharder {
+    runtime: Arc<dyn RuntimeAdapter>,
+    resharding_event: Arc<Mutex<Option<FlatStorageReshardingEventStatus>>>,
+}
+
+impl FlatStorageResharder {
+    /// Creates a new `FlatStorageResharder`.
+    pub fn new(runtime: Arc<dyn RuntimeAdapter>) -> Self {
+        let resharding_event = Arc::new(Mutex::new(None));
+        Self { runtime, resharding_event }
+    }
+
+    /// Starts a resharding event.
+    ///
+    /// For now, only splitting a shard is supported.
+    ///
+    /// # Args:
+    /// * `event_type`: the type of resharding event
+    /// * `shard_layout`: the new shard layout
+    /// * `scheduler`: component used to schedule the background tasks
+    /// * `controller`: manages the execution of the background tasks
+    pub fn start_resharding(
+        &self,
+        event_type: ReshardingEventType,
+        shard_layout: &ShardLayout,
+        scheduler: &dyn FlatStorageResharderScheduler,
+        controller: FlatStorageResharderController,
+    ) -> Result<(), Error> {
+        match event_type {
+            ReshardingEventType::SplitShard(params) => {
+                self.split_shard(params, shard_layout, scheduler, controller)
+            }
+        }
+    }
+
+    /// Resumes a resharding event that was interrupted.
+    ///
+    /// Flat storage resharding will resume automatically after a node restart (e.g. following a crash).
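+    ///
+    /// A sketch of the intended call site (the flat storage creator integration is still a TODO
+    /// in this PR, so `flat_store`, `shard_uid`, `scheduler` and `controller` are assumed here):
+    /// on startup, a `Resharding` status read from the store is fed back into this method.
+    ///
+    /// ```ignore
+    /// if let Ok(FlatStorageStatus::Resharding(status)) = flat_store.get_flat_storage_status(shard_uid) {
+    ///     resharder.resume(shard_uid, &status, &scheduler, controller)?;
+    /// }
+    /// ```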
+ /// + /// # Args: + /// * `shard_uid`: UId of the shard + /// * `status`: resharding status of the shard + /// * `scheduler`: component used to schedule the background tasks + /// * `controller`: manages the execution of the background tasks + pub fn resume( + &self, + shard_uid: ShardUId, + status: &FlatStorageReshardingStatus, + scheduler: &dyn FlatStorageResharderScheduler, + controller: FlatStorageResharderController, + ) -> Result<(), Error> { + match status { + FlatStorageReshardingStatus::CreatingChild => { + // Nothing to do here because the parent will take care of resuming work. + } + FlatStorageReshardingStatus::SplittingParent(status) => { + let parent_shard_uid = shard_uid; + info!(target: "resharding", ?parent_shard_uid, ?status, "resuming flat storage shard split"); + self.check_no_resharding_in_progress()?; + // On resume flat storage status is already set. + // However, we don't know the current state of children shards, + // so it's better to clean them. + self.clean_children_shards(&status)?; + self.schedule_split_shard(parent_shard_uid, &status, scheduler, controller); + } + FlatStorageReshardingStatus::CatchingUp(_) => { + info!(target: "resharding", ?shard_uid, ?status, "resuming flat storage shard catchup"); + // TODO(Trisfald): implement child catch up + todo!() + } + } + Ok(()) + } + + /// Starts the event of splitting a parent shard flat storage into two children. + fn split_shard( + &self, + split_params: ReshardingSplitShardParams, + shard_layout: &ShardLayout, + scheduler: &dyn FlatStorageResharderScheduler, + controller: FlatStorageResharderController, + ) -> Result<(), Error> { + let ReshardingSplitShardParams { + parent_shard, + left_child_shard, + right_child_shard, + block_hash, + prev_block_hash, + .. + } = split_params; + info!(target: "resharding", ?split_params, "initiating flat storage shard split"); + self.check_no_resharding_in_progress()?; + + // Change parent and children shards flat storage status. + let store = self.runtime.store().flat_store(); + let mut store_update = store.store_update(); + let flat_head = retrieve_shard_flat_head(parent_shard, &store)?; + let status = SplittingParentStatus { + left_child_shard, + right_child_shard, + shard_layout: shard_layout.clone(), + block_hash, + prev_block_hash, + flat_head, + }; + store_update.set_flat_storage_status( + parent_shard, + FlatStorageStatus::Resharding(FlatStorageReshardingStatus::SplittingParent( + status.clone(), + )), + ); + store_update.set_flat_storage_status( + left_child_shard, + FlatStorageStatus::Resharding(FlatStorageReshardingStatus::CreatingChild), + ); + store_update.set_flat_storage_status( + right_child_shard, + FlatStorageStatus::Resharding(FlatStorageReshardingStatus::CreatingChild), + ); + store_update.commit()?; + + self.schedule_split_shard(parent_shard, &status, scheduler, controller); + Ok(()) + } + + /// Returns an error if a resharding event is in progress. + fn check_no_resharding_in_progress(&self) -> Result<(), StorageError> { + // Do not allow multiple resharding events in parallel. + if self.resharding_event().is_some() { + error!(target: "resharding", "trying to start a new flat storage resharding event while one is already in progress!"); + Err(StorageError::FlatStorageReshardingAlreadyInProgress) + } else { + Ok(()) + } + } + + fn set_resharding_event(&self, event: FlatStorageReshardingEventStatus) { + *self.resharding_event.lock().unwrap() = Some(event); + } + + /// Returns the current in-progress resharding event, if any. 
+ pub fn resharding_event(&self) -> Option { + self.resharding_event.lock().unwrap().clone() + } + + /// Schedules a task to split a shard. + fn schedule_split_shard( + &self, + parent_shard: ShardUId, + status: &SplittingParentStatus, + scheduler: &dyn FlatStorageResharderScheduler, + controller: FlatStorageResharderController, + ) { + let event = FlatStorageReshardingEventStatus::SplitShard(parent_shard, status.clone()); + self.set_resharding_event(event); + info!(target: "resharding", ?parent_shard, ?status,"scheduling flat storage shard split"); + + let resharder = self.clone(); + let task = Box::new(move || split_shard_task(resharder, controller)); + scheduler.schedule(task); + } + + /// Cleans up children shards flat storage's content (status is excluded). + fn clean_children_shards(&self, status: &SplittingParentStatus) -> Result<(), Error> { + let SplittingParentStatus { left_child_shard, right_child_shard, .. } = status; + debug!(target: "resharding", ?left_child_shard, ?right_child_shard, "cleaning up children shards flat storage's content"); + let mut store_update = self.runtime.store().flat_store().store_update(); + for child in [left_child_shard, right_child_shard] { + store_update.remove_all_deltas(*child); + store_update.remove_all_values(*child); + } + store_update.commit()?; + Ok(()) + } + + /// Retrieves parent shard UIds and current resharding event status, only if a resharding event + /// is in progress and of type `Split`. + fn get_parent_shard_and_status(&self) -> Option<(ShardUId, SplittingParentStatus)> { + let event = self.resharding_event.lock().unwrap(); + match event.as_ref() { + Some(FlatStorageReshardingEventStatus::SplitShard(parent_shard, status)) => { + Some((*parent_shard, status.clone())) + } + None => None, + } + } +} + +/// Retrieves the flat head of the given `shard`. +/// The shard must be in [FlatStorageStatus::Ready] state otherwise this method returns an error. +fn retrieve_shard_flat_head(shard: ShardUId, store: &FlatStoreAdapter) -> Result { + let status = + store.get_flat_storage_status(shard).map_err(|err| Into::::into(err))?; + if let FlatStorageStatus::Ready(FlatStorageReadyStatus { flat_head }) = status { + Ok(flat_head) + } else { + let err_msg = "flat storage shard status is not ready!"; + error!(target: "resharding", ?shard, ?status, err_msg); + Err(Error::ReshardingError(err_msg.to_owned())) + } +} + +/// Task to perform the actual split of a flat storage shard. This may be a long operation time-wise. +/// +/// Conceptually it simply copies each key-value pair from the parent shard to the correct child. +fn split_shard_task(resharder: FlatStorageResharder, controller: FlatStorageResharderController) { + let task_status = split_shard_task_impl(resharder.clone(), controller.clone()); + split_shard_task_postprocessing(resharder, task_status); + info!(target: "resharding", ?task_status, "flat storage shard split task finished"); + if let Err(err) = controller.completion_sender.send(task_status) { + warn!(target: "resharding", ?err, "error notifying completion of flat storage shard split task") + }; +} + +/// Performs the bulk of [split_shard_task]. +/// +/// Returns `true` if the routine completed successfully. 
+fn split_shard_task_impl( + resharder: FlatStorageResharder, + controller: FlatStorageResharderController, +) -> FlatStorageReshardingTaskStatus { + if controller.is_interrupted() { + return FlatStorageReshardingTaskStatus::Cancelled; + } + + /// Determines after how many key-values the process stops to + /// commit changes and to check interruptions. + const BATCH_SIZE: usize = 10_000; + + let (parent_shard, status) = resharder + .get_parent_shard_and_status() + .expect("flat storage resharding event must be Split!"); + info!(target: "resharding", ?parent_shard, ?status, "flat storage shard split task: starting key-values copy"); + + // Parent shard flat storage head must be on block height just before the new shard layout kicks + // in. This guarantees that all deltas have been applied and thus the state of all key-values is + // up to date. + // TODO(trisfald): do this check, maybe call update_flat_storage_for_shard + let _parent_flat_head = status.flat_head; + + // Prepare the store object for commits and the iterator over parent's flat storage. + let flat_store = resharder.runtime.store().flat_store(); + let mut iter = flat_store.iter(parent_shard); + + loop { + let mut store_update = flat_store.store_update(); + + // Process a `BATCH_SIZE` worth of key value pairs. + let mut iter_exhausted = false; + for _ in 0..BATCH_SIZE { + match iter.next() { + Some(Ok((key, value))) => { + if let Err(err) = + shard_split_handle_key_value(key, value, &mut store_update, &status) + { + error!(target: "resharding", ?err, "failed to handle flat storage key"); + return FlatStorageReshardingTaskStatus::Failed; + } + } + Some(Err(err)) => { + error!(target: "resharding", ?err, "failed to read flat storage value from parent shard"); + return FlatStorageReshardingTaskStatus::Failed; + } + None => { + iter_exhausted = true; + } + } + } + + // Make a pause to commit and check if the routine should stop. + if let Err(err) = store_update.commit() { + error!(target: "resharding", ?err, "failed to commit store update"); + return FlatStorageReshardingTaskStatus::Failed; + } + + // TODO(Trisfald): metrics and logs + + // If `iter`` is exhausted we can exit after the store commit. + if iter_exhausted { + break; + } + if controller.is_interrupted() { + return FlatStorageReshardingTaskStatus::Cancelled; + } + } + FlatStorageReshardingTaskStatus::Successful +} + +/// Handles the inheritance of a key-value pair from parent shard to children shards. +fn shard_split_handle_key_value( + key: Vec, + value: FlatStateValue, + store_update: &mut FlatStoreUpdateAdapter, + status: &SplittingParentStatus, +) -> Result<(), Error> { + if key.is_empty() { + panic!("flat storage key is empty!") + } + let key_column_prefix = key[0]; + + match key_column_prefix { + col::ACCOUNT => { + copy_kv_to_child(&status, key, value, store_update, parse_account_id_from_account_key)? 
+ } + col::CONTRACT_DATA => copy_kv_to_child( + &status, + key, + value, + store_update, + parse_account_id_from_contract_data_key, + )?, + col::CONTRACT_CODE => copy_kv_to_child( + &status, + key, + value, + store_update, + parse_account_id_from_contract_code_key, + )?, + col::ACCESS_KEY => copy_kv_to_child( + &status, + key, + value, + store_update, + parse_account_id_from_access_key_key, + )?, + col::RECEIVED_DATA => copy_kv_to_child( + &status, + key, + value, + store_update, + parse_account_id_from_received_data_key, + )?, + col::POSTPONED_RECEIPT_ID | col::PENDING_DATA_COUNT | col::POSTPONED_RECEIPT => { + copy_kv_to_child(&status, key, value, store_update, |raw_key: &[u8]| { + parse_account_id_from_trie_key_with_separator( + key_column_prefix, + raw_key, + ALL_COLUMNS_WITH_NAMES[key_column_prefix as usize].1, + ) + })? + } + col::DELAYED_RECEIPT_OR_INDICES + | col::PROMISE_YIELD_INDICES + | col::PROMISE_YIELD_TIMEOUT + | col::PROMISE_YIELD_RECEIPT + | col::BUFFERED_RECEIPT_INDICES + | col::BUFFERED_RECEIPT => { + // TODO(trisfald): implement logic and remove error log + let col_name = ALL_COLUMNS_WITH_NAMES[key_column_prefix as usize].1; + error!(target: "resharding", "flat storage resharding of {col_name} is not implemented yet!"); + } + _ => unreachable!(), + } + Ok(()) +} + +/// Performs post-processing of shard splitting after all key-values have been moved from parent to +/// children. `success` indicates whether or not the previous phase was successful. +fn split_shard_task_postprocessing( + resharder: FlatStorageResharder, + task_status: FlatStorageReshardingTaskStatus, +) { + let (parent_shard, split_status) = resharder + .get_parent_shard_and_status() + .expect("flat storage resharding event must be Split!"); + let SplittingParentStatus { left_child_shard, right_child_shard, flat_head, .. } = split_status; + let flat_store = resharder.runtime.store().flat_store(); + info!(target: "resharding", ?parent_shard, ?task_status, ?split_status, "flat storage shard split task: post-processing"); + + let mut store_update = flat_store.store_update(); + match task_status { + FlatStorageReshardingTaskStatus::Successful => { + // Split shard completed successfully. + // Parent flat storage can be deleted from the FlatStoreManager. + resharder + .runtime + .get_flat_storage_manager() + .remove_flat_storage_for_shard(parent_shard, &mut store_update) + .unwrap(); + store_update.remove_flat_storage(parent_shard); + // Children must perform catchup. + for child_shard in [left_child_shard, right_child_shard] { + store_update.set_flat_storage_status( + child_shard, + FlatStorageStatus::Resharding(FlatStorageReshardingStatus::CatchingUp( + flat_head.hash, + )), + ); + } + // TODO(trisfald): trigger catchup + } + FlatStorageReshardingTaskStatus::Failed | FlatStorageReshardingTaskStatus::Cancelled => { + // We got an error or an interrupt request. + // Reset parent. + store_update.set_flat_storage_status( + parent_shard, + FlatStorageStatus::Ready(FlatStorageReadyStatus { flat_head }), + ); + // Remove children shards leftovers. + for child_shard in [left_child_shard, right_child_shard] { + store_update.remove_flat_storage(child_shard); + } + } + } + store_update.commit().unwrap(); + // Terminate the resharding event. + *resharder.resharding_event.lock().unwrap() = None; +} + +/// Copies a key-value pair to the correct child shard by matching the account-id to the provided shard layout. 
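+///
+/// For example, with the shard layout used in the tests below (parent shard 1 split at the
+/// boundary account `pp` into children 2 and 3), the key for account `mm` maps to the left
+/// child 2, while the key for account `vv` maps to the right child 3.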
+fn copy_kv_to_child( + status: &SplittingParentStatus, + key: Vec, + value: FlatStateValue, + store_update: &mut FlatStoreUpdateAdapter, + account_id_parser: impl FnOnce(&[u8]) -> Result, +) -> Result<(), Error> { + let SplittingParentStatus { left_child_shard, right_child_shard, shard_layout, .. } = &status; + // Derive the shard uid for this account in the new shard layout. + let account_id = account_id_parser(&key)?; + let new_shard_id = account_id_to_shard_id(&account_id, shard_layout); + let new_shard_uid = ShardUId::from_shard_id_and_layout(new_shard_id, &shard_layout); + + // Sanity check we are truly writing to one of the expected children shards. + if new_shard_uid != *left_child_shard && new_shard_uid != *right_child_shard { + let err_msg = "account id doesn't map to any child shard!"; + error!(target: "resharding", ?new_shard_uid, ?left_child_shard, ?right_child_shard, ?shard_layout, ?account_id, err_msg); + return Err(Error::ReshardingError(err_msg.to_string())); + } + // Add the new flat store entry. + store_update.set(new_shard_uid, key, Some(value)); + Ok(()) +} + +/// Struct to describe, perform and track progress of a flat storage resharding. +#[derive(Clone, Debug)] +pub enum FlatStorageReshardingEventStatus { + /// Split a shard. + /// Includes the parent shard uid and the operation' status. + SplitShard(ShardUId, SplittingParentStatus), +} + +/// Status of a flat storage resharding task. +#[derive(Clone, Debug, Copy, Eq, PartialEq)] +pub enum FlatStorageReshardingTaskStatus { + Successful, + Failed, + Cancelled, +} + +/// Helps control the flat storage resharder operation. More specifically, +/// it has a way to know when the background task is done or to interrupt it. +#[derive(Clone)] +pub struct FlatStorageResharderController { + /// Resharding handle to control interruption. + handle: ReshardingHandle, + /// This object will be used to signal when the background task is completed. + completion_sender: Sender, + /// Corresponding receiver for `completion_sender`. + pub completion_receiver: Receiver, +} + +impl FlatStorageResharderController { + /// Creates a new `FlatStorageResharderController` with its own handle. + pub fn new() -> Self { + let (completion_sender, completion_receiver) = crossbeam_channel::bounded(1); + let handle = ReshardingHandle::new(); + Self { handle, completion_sender, completion_receiver } + } + + pub fn from_resharding_handle(handle: ReshardingHandle) -> Self { + let (completion_sender, completion_receiver) = crossbeam_channel::bounded(1); + Self { handle, completion_sender, completion_receiver } + } + + pub fn handle(&self) -> &ReshardingHandle { + &self.handle + } + + /// Returns whether or not background task is interrupted. + pub fn is_interrupted(&self) -> bool { + !self.handle.get() + } +} + +/// Represent the capability of scheduling the background tasks spawned by flat storage resharding. 
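+///
+/// A minimal implementation only needs to run (or enqueue) the boxed task. A synchronous sketch
+/// along the lines of the `TestScheduler` used in the tests below, assuming the boxed task is a
+/// plain `FnOnce()` closure:
+///
+/// ```ignore
+/// struct InlineScheduler;
+///
+/// impl FlatStorageResharderScheduler for InlineScheduler {
+///     fn schedule(&self, f: Box<dyn FnOnce()>) {
+///         // Run the task immediately on the caller's thread.
+///         f();
+///     }
+/// }
+/// ```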
+pub trait FlatStorageResharderScheduler { + fn schedule(&self, f: Box); +} + +#[cfg(test)] +mod tests { + use std::{cell::RefCell, collections::BTreeMap, time::Duration}; + + use near_async::time::Clock; + use near_chain_configs::{Genesis, MutableConfigValue}; + use near_epoch_manager::{shard_tracker::ShardTracker, EpochManager}; + use near_o11y::testonly::init_test_logger; + use near_primitives::{ + hash::CryptoHash, shard_layout::ShardLayout, state::FlatStateValue, trie_key::TrieKey, + types::AccountId, + }; + use near_store::{ + flat::{BlockInfo, FlatStorageReadyStatus}, + genesis::initialize_genesis_state, + test_utils::create_test_store, + }; + + use crate::{ + rayon_spawner::RayonAsyncComputationSpawner, runtime::NightshadeRuntime, + types::ChainConfig, Chain, ChainGenesis, DoomslugThresholdMode, + }; + + use super::*; + + /// Shorthand to create account ID. + macro_rules! account { + ($str:expr) => { + $str.parse::().unwrap() + }; + } + + struct TestScheduler {} + + impl FlatStorageResharderScheduler for TestScheduler { + fn schedule(&self, f: Box) { + f(); + } + } + + #[derive(Default)] + struct DelayedScheduler { + callable: RefCell>>, + } + + impl DelayedScheduler { + fn call(&self) { + self.callable.take().unwrap()(); + } + } + + impl FlatStorageResharderScheduler for DelayedScheduler { + fn schedule(&self, f: Box) { + *self.callable.borrow_mut() = Some(f); + } + } + + /// Simple shard layout with two shards. + fn simple_shard_layout() -> ShardLayout { + let shards_split_map = BTreeMap::from([(0, vec![0]), (1, vec![1])]); + ShardLayout::v2(vec![account!("ff")], vec![0, 1], Some(shards_split_map)) + } + + /// Derived from [simple_shard_layout] by splitting the second shard. + fn shard_layout_after_split() -> ShardLayout { + let shards_split_map = BTreeMap::from([(0, vec![0]), (1, vec![2, 3])]); + ShardLayout::v2(vec![account!("ff"), account!("pp")], vec![0, 2, 3], Some(shards_split_map)) + } + + /// Generic test setup. + fn create_fs_resharder(shard_layout: ShardLayout) -> (Chain, FlatStorageResharder) { + let num_shards = shard_layout.shard_ids().count(); + let genesis = Genesis::test_with_seeds( + Clock::real(), + vec![account!("aa"), account!("mm"), account!("vv")], + 1, + vec![1; num_shards], + shard_layout, + ); + let tempdir = tempfile::tempdir().unwrap(); + let store = create_test_store(); + initialize_genesis_state(store.clone(), &genesis, Some(tempdir.path())); + let epoch_manager = EpochManager::new_arc_handle(store.clone(), &genesis.config); + let shard_tracker = ShardTracker::new_empty(epoch_manager.clone()); + let runtime = + NightshadeRuntime::test(tempdir.path(), store, &genesis.config, epoch_manager.clone()); + let chain_genesis = ChainGenesis::new(&genesis.config); + let chain = Chain::new( + Clock::real(), + epoch_manager, + shard_tracker, + runtime.clone(), + &chain_genesis, + DoomslugThresholdMode::NoApprovals, + ChainConfig::test(), + None, + Arc::new(RayonAsyncComputationSpawner), + MutableConfigValue::new(None, "validator_signer"), + ) + .unwrap(); + (chain, FlatStorageResharder::new(runtime)) + } + + /// Verify that another resharding can't be triggered if one is ongoing. 
+ #[test] + fn concurrent_reshardings_are_disallowed() { + init_test_logger(); + let (chain, resharder) = create_fs_resharder(simple_shard_layout()); + let new_shard_layout = shard_layout_after_split(); + let scheduler = DelayedScheduler::default(); + let controller = FlatStorageResharderController::new(); + let resharding_event_type = ReshardingEventType::from_shard_layout( + &new_shard_layout, + chain.head().unwrap().last_block_hash, + chain.head().unwrap().prev_block_hash, + ) + .unwrap() + .unwrap(); + + assert!(resharder + .start_resharding( + resharding_event_type.clone(), + &new_shard_layout, + &scheduler, + controller.clone() + ) + .is_ok()); + + // Immediately interrupt the resharding. + controller.handle().stop(); + + assert!(resharder.resharding_event().is_some()); + assert!(resharder + .start_resharding(resharding_event_type, &new_shard_layout, &scheduler, controller) + .is_err()); + } + + /// Flat storage shard status should be set correctly upon starting a shard split. + #[test] + fn flat_storage_split_status_set() { + init_test_logger(); + let (chain, resharder) = create_fs_resharder(simple_shard_layout()); + let new_shard_layout = shard_layout_after_split(); + let scheduler = DelayedScheduler::default(); + let controller = FlatStorageResharderController::new(); + let flat_store = resharder.runtime.store().flat_store(); + let resharding_event_type = ReshardingEventType::from_shard_layout( + &new_shard_layout, + chain.head().unwrap().last_block_hash, + chain.head().unwrap().prev_block_hash, + ) + .unwrap() + .unwrap(); + + assert!(resharder + .start_resharding(resharding_event_type, &new_shard_layout, &scheduler, controller) + .is_ok()); + + let resharding_event = resharder.resharding_event(); + match resharding_event.unwrap() { + FlatStorageReshardingEventStatus::SplitShard(parent, status) => { + assert_eq!( + flat_store.get_flat_storage_status(parent), + Ok(FlatStorageStatus::Resharding( + FlatStorageReshardingStatus::SplittingParent(status.clone()) + )) + ); + assert_eq!( + flat_store.get_flat_storage_status(status.left_child_shard), + Ok(FlatStorageStatus::Resharding(FlatStorageReshardingStatus::CreatingChild)) + ); + assert_eq!( + flat_store.get_flat_storage_status(status.right_child_shard), + Ok(FlatStorageStatus::Resharding(FlatStorageReshardingStatus::CreatingChild)) + ); + } + } + } + + /// In this test we write some dirty state into children shards and then try to resume a shard split. + /// Verify that the dirty writes are cleaned up correctly. + #[test] + fn resume_split_starts_from_clean_state() { + init_test_logger(); + let (chain, resharder) = create_fs_resharder(simple_shard_layout()); + let flat_store = resharder.runtime.store().flat_store(); + let new_shard_layout = shard_layout_after_split(); + let resharding_event_type = ReshardingEventType::from_shard_layout( + &new_shard_layout, + chain.head().unwrap().last_block_hash, + chain.head().unwrap().prev_block_hash, + ) + .unwrap() + .unwrap(); + let ReshardingSplitShardParams { + parent_shard, left_child_shard, right_child_shard, .. + } = match resharding_event_type { + ReshardingEventType::SplitShard(params) => params, + }; + + let mut store_update = flat_store.store_update(); + + // Write some random key-values in children shards. 
+ let dirty_key: Vec = vec![1, 2, 3, 4]; + let dirty_value = Some(FlatStateValue::Inlined(dirty_key.clone())); + for child_shard in [left_child_shard, right_child_shard] { + store_update.set(child_shard, dirty_key.clone(), dirty_value.clone()); + } + + // Set parent state to ShardSplitting, manually, to simulate a forcibly cancelled resharding attempt. + let resharding_status = + FlatStorageReshardingStatus::SplittingParent(SplittingParentStatus { + // Values don't matter. + left_child_shard, + right_child_shard, + shard_layout: new_shard_layout, + block_hash: CryptoHash::default(), + prev_block_hash: CryptoHash::default(), + flat_head: BlockInfo { + hash: CryptoHash::default(), + height: 1, + prev_hash: CryptoHash::default(), + }, + }); + store_update.set_flat_storage_status( + parent_shard, + FlatStorageStatus::Resharding(resharding_status.clone()), + ); + + store_update.commit().unwrap(); + + // Resume resharding. + let scheduler = TestScheduler {}; + let controller = FlatStorageResharderController::new(); + resharder.resume(parent_shard, &resharding_status, &scheduler, controller).unwrap(); + + // Children should not contain the random keys written before. + for child_shard in [left_child_shard, right_child_shard] { + assert_eq!(flat_store.get(child_shard, &dirty_key), Ok(None)); + } + } + + /// Tests a simple split shard scenario. + /// + /// Old layout: + /// shard 0 -> accounts [aa] + /// shard 1 -> accounts [mm, vv] + /// + /// New layout: + /// shard 0 -> accounts [aa] + /// shard 2 -> accounts [mm] + /// shard 3 -> accounts [vv] + /// + /// Shard to split is shard 1. + #[test] + fn simple_split_shard() { + init_test_logger(); + // Perform resharding. + let (chain, resharder) = create_fs_resharder(simple_shard_layout()); + let new_shard_layout = shard_layout_after_split(); + let scheduler = TestScheduler {}; + let controller = FlatStorageResharderController::new(); + let resharding_event_type = ReshardingEventType::from_shard_layout( + &new_shard_layout, + chain.head().unwrap().last_block_hash, + chain.head().unwrap().prev_block_hash, + ) + .unwrap() + .unwrap(); + + assert!(resharder + .start_resharding( + resharding_event_type, + &new_shard_layout, + &scheduler, + controller.clone() + ) + .is_ok()); + + // Check flat storages of children contain the correct accounts. + let left_child = ShardUId { version: 3, shard_id: 2 }; + let right_child = ShardUId { version: 3, shard_id: 3 }; + let flat_store = resharder.runtime.store().flat_store(); + let account_mm_key = TrieKey::Account { account_id: account!("mm") }; + let account_vv_key = TrieKey::Account { account_id: account!("vv") }; + assert!(flat_store + .get(left_child, &account_mm_key.to_vec()) + .is_ok_and(|val| val.is_some())); + assert!(flat_store + .get(right_child, &account_vv_key.to_vec()) + .is_ok_and(|val| val.is_some())); + + // Controller should signal that resharding ended. + assert_eq!( + controller.completion_receiver.recv_timeout(Duration::from_secs(1)), + Ok(FlatStorageReshardingTaskStatus::Successful) + ); + + // Check final status of parent flat storage. + let parent = ShardUId { version: 3, shard_id: 1 }; + assert_eq!(flat_store.get_flat_storage_status(parent), Ok(FlatStorageStatus::Empty)); + assert_eq!(flat_store.iter(parent).count(), 0); + assert!(resharder + .runtime + .get_flat_storage_manager() + .get_flat_storage_for_shard(parent) + .is_none()); + + // Check final status of children flat storages. 
+ let last_hash = chain.head().unwrap().last_block_hash; + assert_eq!( + flat_store.get_flat_storage_status(left_child), + Ok(FlatStorageStatus::Resharding(FlatStorageReshardingStatus::CatchingUp(last_hash))) + ); + assert_eq!( + flat_store.get_flat_storage_status(left_child), + Ok(FlatStorageStatus::Resharding(FlatStorageReshardingStatus::CatchingUp(last_hash))) + ); + } + + #[test] + fn interrupt_split_shard() { + init_test_logger(); + // Perform resharding. + let (chain, resharder) = create_fs_resharder(simple_shard_layout()); + let new_shard_layout = shard_layout_after_split(); + let scheduler = DelayedScheduler::default(); + let controller = FlatStorageResharderController::new(); + let resharding_event_type = ReshardingEventType::from_shard_layout( + &new_shard_layout, + chain.head().unwrap().last_block_hash, + chain.head().unwrap().prev_block_hash, + ) + .unwrap() + .unwrap(); + + assert!(resharder + .start_resharding( + resharding_event_type, + &new_shard_layout, + &scheduler, + controller.clone() + ) + .is_ok()); + let (parent_shard, status) = resharder.get_parent_shard_and_status().unwrap(); + let SplittingParentStatus { left_child_shard, right_child_shard, flat_head, .. } = status; + + // Interrupt the task before it starts. + controller.handle().stop(); + + // Run the task. + scheduler.call(); + + // Check that resharding was effectively cancelled. + let flat_store = resharder.runtime.store().flat_store(); + assert_eq!( + controller.completion_receiver.recv_timeout(Duration::from_secs(1)), + Ok(FlatStorageReshardingTaskStatus::Cancelled) + ); + assert_eq!( + flat_store.get_flat_storage_status(parent_shard), + Ok(FlatStorageStatus::Ready(FlatStorageReadyStatus { flat_head })) + ); + for child_shard in [left_child_shard, right_child_shard] { + assert_eq!( + flat_store.get_flat_storage_status(status.left_child_shard), + Ok(FlatStorageStatus::Empty) + ); + assert_eq!(flat_store.iter(child_shard).count(), 0); + } + } + + /// A shard can't be split if it isn't in ready state. + #[test] + fn reject_split_shard_if_parent_is_not_ready() { + let (chain, resharder) = create_fs_resharder(simple_shard_layout()); + let new_shard_layout = shard_layout_after_split(); + let scheduler = TestScheduler {}; + let controller = FlatStorageResharderController::new(); + let resharding_event_type = ReshardingEventType::from_shard_layout( + &new_shard_layout, + chain.head().unwrap().last_block_hash, + chain.head().unwrap().prev_block_hash, + ) + .unwrap() + .unwrap(); + + // Make flat storage of parent shard not ready. + let parent_shard = ShardUId { version: 3, shard_id: 1 }; + let flat_store = resharder.runtime.store().flat_store(); + let mut store_update = flat_store.store_update(); + store_update.set_flat_storage_status(parent_shard, FlatStorageStatus::Empty); + store_update.commit().unwrap(); + + // Trigger resharding and it should fail. + assert!(resharder + .start_resharding(resharding_event_type, &new_shard_layout, &scheduler, controller) + .is_err()); + } + + /// Verify that a shard can be split correctly even if its flat head is lagging behind the expected + /// block height. 
+ #[test] + fn split_shard_parent_flat_store_lagging_behind() { + // TODO(Trisfald): implement + } +} diff --git a/chain/chain/src/lib.rs b/chain/chain/src/lib.rs index ed95f14c37f..fb228431928 100644 --- a/chain/chain/src/lib.rs +++ b/chain/chain/src/lib.rs @@ -18,6 +18,7 @@ mod chain_update; pub mod crypto_hash_timer; mod doomslug; pub mod flat_storage_creator; +pub mod flat_storage_resharder; mod garbage_collection; mod lightclient; pub mod metrics; diff --git a/chain/chain/src/resharding/event_type.rs b/chain/chain/src/resharding/event_type.rs new file mode 100644 index 00000000000..68b756c1d8d --- /dev/null +++ b/chain/chain/src/resharding/event_type.rs @@ -0,0 +1,184 @@ +//! Collection of all resharding V3 event types. + +use near_chain_primitives::Error; +use near_primitives::hash::CryptoHash; +use near_primitives::shard_layout::ShardLayout; +use near_primitives::types::AccountId; +use near_store::ShardUId; +use tracing::error; + +/// Struct used to destructure a new shard layout definition into the resulting resharding event. +#[derive(Debug, Clone)] +#[cfg_attr(test, derive(PartialEq, Eq))] +pub enum ReshardingEventType { + /// Split of a shard. + SplitShard(ReshardingSplitShardParams), +} + +#[derive(Debug, Clone)] +#[cfg_attr(test, derive(PartialEq, Eq))] +pub struct ReshardingSplitShardParams { + // Shard being split. + pub parent_shard: ShardUId, + // Child to the left of the account boundary. + pub left_child_shard: ShardUId, + // Child to the right of the account boundary. + pub right_child_shard: ShardUId, + /// The account at the boundary between the two children. + pub boundary_account: AccountId, + /// Hash of the first block having the new shard layout. + pub block_hash: CryptoHash, + /// The block before `block_hash`. + pub prev_block_hash: CryptoHash, +} + +impl ReshardingEventType { + /// Takes as input a [ShardLayout] definition and deduces which kind of resharding operation must be + /// performed. + /// + /// # Args: + /// * `shard_layout`: the new shard layout + /// * `block_hash`: hash of the first block with `shard_layout` + /// * `prev_block_hash`: hash of the block preceding `block_hash` + /// + /// Returns a [ReshardingEventType] if exactly one resharding change is contained in `shard_layout`, otherwise returns `None`. + pub fn from_shard_layout( + shard_layout: &ShardLayout, + block_hash: CryptoHash, + prev_block_hash: CryptoHash, + ) -> Result, Error> { + let log_and_error = |err_msg: &str| { + error!(target: "resharding", ?shard_layout, err_msg); + Err(Error::ReshardingError(err_msg.to_owned())) + }; + + // Resharding V3 supports shard layout V2 onwards. + let (shards_split_map, boundary_accounts) = match shard_layout { + ShardLayout::V0(_) | ShardLayout::V1(_) => { + return log_and_error("unsupported shard layout!"); + } + ShardLayout::V2(layout) => { + let Some(shards_split_map) = layout.shards_split_map() else { + return log_and_error("ShardLayoutV2 must have a shards_split_map!"); + }; + (shards_split_map, layout.boundary_accounts()) + } + }; + + let mut event = None; + + // Look for a shard having exactly two children, to detect a split. + for (parent_id, children_ids) in shards_split_map { + match children_ids.len() { + 1 => {} + 2 => { + if event.is_some() { + return log_and_error("can't perform two reshardings at the same time!"); + } + // Parent shard is no longer part of this shard layout. 
+ let parent_shard = + ShardUId { version: shard_layout.version(), shard_id: *parent_id as u32 }; + let left_child_shard = + ShardUId::from_shard_id_and_layout(children_ids[0], shard_layout); + let right_child_shard = + ShardUId::from_shard_id_and_layout(children_ids[1], shard_layout); + // Find the boundary account between the two children. + let Some(boundary_account_index) = + shard_layout.shard_ids().position(|id| id == left_child_shard.shard_id()) + else { + return log_and_error(&format!( + "shard {left_child_shard} not found in shard layout" + )); + }; + let boundary_account = boundary_accounts[boundary_account_index].clone(); + event = Some(ReshardingEventType::SplitShard(ReshardingSplitShardParams { + parent_shard, + left_child_shard, + right_child_shard, + boundary_account, + block_hash, + prev_block_hash, + })); + } + _ => { + return log_and_error(&format!( + "invalid number of children for shard {parent_id}" + )); + } + } + } + + // We may have found at least one resharding event by now. + Ok(event) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use near_primitives::shard_layout::ShardLayout; + use near_primitives::types::AccountId; + use near_store::ShardUId; + use std::collections::BTreeMap; + + /// Shorthand to create account ID. + macro_rules! account { + ($str:expr) => { + $str.parse::().unwrap() + }; + } + + /// Verify that the correct type of resharding is deduced from a new shard layout. + #[test] + fn parse_event_type_from_shard_layout() { + let block = CryptoHash::hash_bytes(&[1]); + let prev_block = CryptoHash::hash_bytes(&[2]); + + // Shard layouts V0 and V1 are rejected. + assert!(ReshardingEventType::from_shard_layout( + &ShardLayout::v0_single_shard(), + block, + prev_block + ) + .is_err()); + assert!(ReshardingEventType::from_shard_layout(&ShardLayout::v1_test(), block, prev_block) + .is_err()); + + // No resharding is ok. + let shards_split_map = BTreeMap::from([(0, vec![0])]); + let layout = ShardLayout::v2(vec![], vec![0], Some(shards_split_map)); + assert!(ReshardingEventType::from_shard_layout(&layout, block, prev_block) + .is_ok_and(|event| event.is_none())); + + // Single split shard is ok. + let shards_split_map = BTreeMap::from([(0, vec![0]), (1, vec![2, 3])]); + let layout = ShardLayout::v2( + vec![account!("ff"), account!("pp")], + vec![0, 2, 3], + Some(shards_split_map), + ); + + let event_type = + ReshardingEventType::from_shard_layout(&layout, block, prev_block).unwrap(); + assert_eq!( + event_type, + Some(ReshardingEventType::SplitShard(ReshardingSplitShardParams { + parent_shard: ShardUId { version: 3, shard_id: 1 }, + left_child_shard: ShardUId { version: 3, shard_id: 2 }, + right_child_shard: ShardUId { version: 3, shard_id: 3 }, + block_hash: block, + prev_block_hash: prev_block, + boundary_account: account!("pp") + })) + ); + + // Double split shard is not ok. 
+ let shards_split_map = BTreeMap::from([(0, vec![2, 3]), (1, vec![4, 5])]); + let layout = ShardLayout::v2( + vec![account!("ff"), account!("pp"), account!("ss")], + vec![2, 3, 4, 5], + Some(shards_split_map), + ); + assert!(ReshardingEventType::from_shard_layout(&layout, block, prev_block).is_err()); + } +} diff --git a/chain/chain/src/resharding/manager.rs b/chain/chain/src/resharding/manager.rs index 43b8fc36a72..bf8e4267ae5 100644 --- a/chain/chain/src/resharding/manager.rs +++ b/chain/chain/src/resharding/manager.rs @@ -1,6 +1,6 @@ -use std::str::FromStr; use std::sync::Arc; +use super::event_type::ReshardingEventType; use near_chain_configs::{MutableConfigValue, ReshardingConfig, ReshardingHandle}; use near_chain_primitives::Error; use near_epoch_manager::EpochManagerAdapter; @@ -10,7 +10,6 @@ use near_primitives::hash::CryptoHash; use near_primitives::shard_layout::get_block_shard_uid; use near_primitives::stateless_validation::stored_chunk_state_transition_data::StoredChunkStateTransitionData; use near_primitives::types::chunk_extra::ChunkExtra; -use near_primitives::types::AccountId; use near_primitives::utils::get_block_shard_id; use near_store::adapter::StoreUpdateAdapter; use near_store::trie::mem::resharding::RetainMode; @@ -57,15 +56,18 @@ impl ReshardingManager { let next_epoch_id = self.epoch_manager.get_next_epoch_id_from_prev_block(prev_hash)?; let next_shard_layout = self.epoch_manager.get_shard_layout(&next_epoch_id)?; - let children_shard_uids = - next_shard_layout.get_children_shards_uids(shard_uid.shard_id()).unwrap(); // Hack to ensure this logic is not applied before ReshardingV3. // TODO(#12019): proper logic. - if next_shard_layout.version() < 3 || children_shard_uids.len() == 1 { + if next_shard_layout.version() < 3 { return Ok(()); } - assert_eq!(children_shard_uids.len(), 2); + + let resharding_event_type = + ReshardingEventType::from_shard_layout(&next_shard_layout, *block_hash, *prev_hash)?; + let Some(ReshardingEventType::SplitShard(split_shard_event)) = resharding_event_type else { + return Ok(()); + }; let chunk_extra = self.get_chunk_extra(block_hash, &shard_uid)?; let Some(mem_tries) = tries.get_mem_tries(shard_uid) else { @@ -80,13 +82,12 @@ impl ReshardingManager { return Err(Error::Other("Memtrie not loaded".to_string())); }; - // TODO(#12019): take proper boundary account. - let boundary_account = AccountId::from_str("boundary.near").unwrap(); + let boundary_account = split_shard_event.boundary_account; // TODO(#12019): leave only tracked shards. 
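+        // Each child shard is processed with the retain mode matching its side of the
+        // boundary account (left or right).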
for (new_shard_uid, retain_mode) in [ - (children_shard_uids[0], RetainMode::Left), - (children_shard_uids[1], RetainMode::Right), + (split_shard_event.left_child_shard, RetainMode::Left), + (split_shard_event.right_child_shard, RetainMode::Right), ] { let mut mem_tries = mem_tries.write().unwrap(); let mem_trie_update = mem_tries.update(*chunk_extra.state_root(), true)?; diff --git a/chain/chain/src/resharding/mod.rs b/chain/chain/src/resharding/mod.rs index 8a316f46fdd..f3e8410acc6 100644 --- a/chain/chain/src/resharding/mod.rs +++ b/chain/chain/src/resharding/mod.rs @@ -1,3 +1,4 @@ +pub mod event_type; pub mod manager; pub mod resharding_v2; diff --git a/chain/client/src/client.rs b/chain/client/src/client.rs index 6bbff7d0994..045820d3cf0 100644 --- a/chain/client/src/client.rs +++ b/chain/client/src/client.rs @@ -25,6 +25,7 @@ use near_chain::chain::{ BlocksCatchUpState, LoadMemtrieRequest, VerifyBlockHashAndSignatureResult, }; use near_chain::flat_storage_creator::FlatStorageCreator; +use near_chain::flat_storage_resharder::FlatStorageResharder; use near_chain::orphan::OrphanMissingChunks; use near_chain::state_snapshot_actor::SnapshotCallbacks; use near_chain::test_utils::format_hash; @@ -176,6 +177,8 @@ pub struct Client { /// Cached precomputed set of TIER1 accounts. /// See send_network_chain_info(). tier1_accounts_cache: Option<(EpochId, Arc)>, + /// Takes care of performing resharding on the flat storage. + pub flat_storage_resharder: FlatStorageResharder, /// Used when it is needed to create flat storage in background for some shards. flat_storage_creator: Option, /// A map storing the last time a block was requested for state sync. @@ -269,11 +272,13 @@ impl Client { async_computation_spawner.clone(), validator_signer.clone(), )?; + let flat_storage_resharder = FlatStorageResharder::new(runtime_adapter.clone()); // Create flat storage or initiate migration to flat storage. let flat_storage_creator = FlatStorageCreator::new( epoch_manager.clone(), runtime_adapter.clone(), chain.chain_store(), + &flat_storage_resharder, chain_config.background_migration_threads, )?; let sharded_tx_pool = @@ -397,6 +402,7 @@ impl Client { NonZeroUsize::new(PRODUCTION_TIMES_CACHE_SIZE).unwrap(), ), tier1_accounts_cache: None, + flat_storage_resharder, flat_storage_creator, last_time_sync_block_requested: HashMap::new(), chunk_validator, diff --git a/core/primitives/src/errors.rs b/core/primitives/src/errors.rs index 1f437d1f3ad..6cf280f42e6 100644 --- a/core/primitives/src/errors.rs +++ b/core/primitives/src/errors.rs @@ -134,6 +134,9 @@ pub enum StorageError { FlatStorageBlockNotSupported(String), /// In-memory trie could not be loaded for some reason. MemTrieLoadingError(String), + /// Indicates that a resharding operation on flat storage is already in progress, + /// when it wasn't expected to be so. 
+ FlatStorageReshardingAlreadyInProgress, } impl std::fmt::Display for StorageError { diff --git a/core/primitives/src/shard_layout.rs b/core/primitives/src/shard_layout.rs index a11914d9d55..022617bc35a 100644 --- a/core/primitives/src/shard_layout.rs +++ b/core/primitives/src/shard_layout.rs @@ -51,7 +51,17 @@ use std::{fmt, str}; pub type ShardVersion = u32; -#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, PartialEq, Eq)] +#[derive( + BorshSerialize, + BorshDeserialize, + serde::Serialize, + serde::Deserialize, + Clone, + Debug, + PartialEq, + Eq, + ProtocolSchema, +)] pub enum ShardLayout { V0(ShardLayoutV0), V1(ShardLayoutV1), @@ -63,7 +73,17 @@ pub enum ShardLayout { /// to keep backward compatibility for some existing tests. /// `parent_shards` for `ShardLayoutV1` is always `None`, meaning it can only be the first shard layout /// a chain uses. -#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, PartialEq, Eq)] +#[derive( + BorshSerialize, + BorshDeserialize, + serde::Serialize, + serde::Deserialize, + Clone, + Debug, + PartialEq, + Eq, + ProtocolSchema, +)] pub struct ShardLayoutV0 { /// Map accounts evenly across all shards num_shards: NumShards, @@ -102,7 +122,17 @@ fn new_shards_split_map_v2(shards_split_map: BTreeMap>) -> ShardsS shards_split_map.into_iter().map(|(k, v)| (k.into(), new_shard_ids_vec(v))).collect() } -#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, PartialEq, Eq)] +#[derive( + BorshSerialize, + BorshDeserialize, + serde::Serialize, + serde::Deserialize, + Clone, + Debug, + PartialEq, + Eq, + ProtocolSchema, +)] pub struct ShardLayoutV1 { /// The boundary accounts are the accounts on boundaries between shards. /// Each shard contains a range of accounts from one boundary account to @@ -136,7 +166,17 @@ impl ShardLayoutV1 { } /// Making the shard ids non-contiguous. -#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, PartialEq, Eq)] +#[derive( + BorshSerialize, + BorshDeserialize, + serde::Serialize, + serde::Deserialize, + Clone, + Debug, + PartialEq, + Eq, + ProtocolSchema, +)] pub struct ShardLayoutV2 { /// The boundary accounts are the accounts on boundaries between shards. 
/// Each shard contains a range of accounts from one boundary account to @@ -187,6 +227,14 @@ impl ShardLayoutV2 { } self.shard_ids[shard_id_index] } + + pub fn shards_split_map(&self) -> &Option { + &self.shards_split_map + } + + pub fn boundary_accounts(&self) -> &Vec { + &self.boundary_accounts + } } #[derive(Debug)] diff --git a/core/store/src/adapter/flat_store.rs b/core/store/src/adapter/flat_store.rs index 6cb8d2e687e..322f4dd09cc 100644 --- a/core/store/src/adapter/flat_store.rs +++ b/core/store/src/adapter/flat_store.rs @@ -234,7 +234,7 @@ impl<'a> FlatStoreUpdateAdapter<'a> { } } - pub fn remove_all(&mut self, shard_uid: ShardUId) { + pub fn remove_all_values(&mut self, shard_uid: ShardUId) { self.remove_range_by_shard_uid(shard_uid, DBCol::FlatState); } @@ -244,6 +244,10 @@ impl<'a> FlatStoreUpdateAdapter<'a> { .expect("Borsh should not have failed here") } + pub fn remove_status(&mut self, shard_uid: ShardUId) { + self.store_update.delete(DBCol::FlatStorageStatus, &shard_uid.to_bytes()); + } + pub fn set_delta(&mut self, shard_uid: ShardUId, delta: &FlatStateDelta) { let key = KeyForFlatStateDelta { shard_uid, block_hash: delta.metadata.block.hash }.to_bytes(); @@ -266,6 +270,13 @@ impl<'a> FlatStoreUpdateAdapter<'a> { self.remove_range_by_shard_uid(shard_uid, DBCol::FlatStateDeltaMetadata); } + /// Removes flat storage in its entirety for a shard: deltas, values and status. + pub fn remove_flat_storage(&mut self, shard_uid: ShardUId) { + self.remove_all_deltas(shard_uid); + self.remove_all_values(shard_uid); + self.remove_status(shard_uid); + } + // helper fn remove_range_by_shard_uid(&mut self, shard_uid: ShardUId, col: DBCol) { let key_from = shard_uid.to_bytes(); diff --git a/core/store/src/flat/mod.rs b/core/store/src/flat/mod.rs index 1e0b1d967ab..65c3aa3a81e 100644 --- a/core/store/src/flat/mod.rs +++ b/core/store/src/flat/mod.rs @@ -41,7 +41,7 @@ pub use metrics::FlatStorageCreationMetrics; pub use storage::FlatStorage; pub use types::{ BlockInfo, FetchingStateStatus, FlatStateIterator, FlatStorageCreationStatus, FlatStorageError, - FlatStorageReadyStatus, FlatStorageStatus, + FlatStorageReadyStatus, FlatStorageReshardingStatus, FlatStorageStatus, SplittingParentStatus, }; pub(crate) const POISONED_LOCK_ERR: &str = "The lock was poisoned."; diff --git a/core/store/src/flat/storage.rs b/core/store/src/flat/storage.rs index 79f7001aacf..de54a39a0b6 100644 --- a/core/store/src/flat/storage.rs +++ b/core/store/src/flat/storage.rs @@ -478,7 +478,7 @@ impl FlatStorage { ) -> Result<(), StorageError> { let guard = self.0.write().expect(super::POISONED_LOCK_ERR); let shard_uid = guard.shard_uid; - store_update.remove_all(shard_uid); + store_update.remove_all_values(shard_uid); store_update.remove_all_deltas(shard_uid); store_update.set_flat_storage_status(shard_uid, FlatStorageStatus::Empty); guard.update_delta_metrics(); diff --git a/core/store/src/flat/types.rs b/core/store/src/flat/types.rs index 744ae1b936b..0dd2a5ff933 100644 --- a/core/store/src/flat/types.rs +++ b/core/store/src/flat/types.rs @@ -1,6 +1,7 @@ use borsh::{BorshDeserialize, BorshSerialize}; use near_primitives::errors::StorageError; use near_primitives::hash::CryptoHash; +use near_primitives::shard_layout::{ShardLayout, ShardUId}; use near_primitives::state::FlatStateValue; use near_primitives::types::BlockHeight; use near_schema_checker_lib::ProtocolSchema; @@ -58,6 +59,8 @@ pub enum FlatStorageStatus { Creation(FlatStorageCreationStatus), /// Flat Storage is ready to be used. 
Ready(FlatStorageReadyStatus), + /// Flat storage is undergoing resharding. + Resharding(FlatStorageReshardingStatus), } impl Into for &FlatStorageStatus { @@ -74,6 +77,12 @@ impl Into for &FlatStorageStatus { FlatStorageCreationStatus::FetchingState(_) => 11, FlatStorageCreationStatus::CatchingUp(_) => 12, }, + // 20..30 is reserved for resharding statuses. + FlatStorageStatus::Resharding(resharding_status) => match resharding_status { + FlatStorageReshardingStatus::SplittingParent(_) => 20, + FlatStorageReshardingStatus::CreatingChild => 21, + FlatStorageReshardingStatus::CatchingUp(_) => 22, + }, } } } @@ -119,6 +128,29 @@ pub enum FlatStorageCreationStatus { CatchingUp(CryptoHash), } +/// This struct represents what is the current status of flat storage resharding. +/// During resharding flat storage must be changed to reflect the new shard layout. +/// +/// When two shards are split, the parent shard disappears and two children are created. The flat storage +/// entries that belonged to the parent must be copied in one of the two shards. This operation happens in the +/// background and could take significant time. +/// After all elements have been copied the new flat storages will be behind the chain head. To remediate this issue +/// they will enter a catching up phase. The parent shard, instead, must be removed and cleaned up. +#[derive( + BorshSerialize, BorshDeserialize, Clone, Debug, PartialEq, Eq, serde::Serialize, ProtocolSchema, +)] +pub enum FlatStorageReshardingStatus { + /// Resharding phase entered when a shard is being split. + /// Copy key-value pairs from this shard (the parent) to children shards. + SplittingParent(SplittingParentStatus), + /// Resharding phase entered when a shard is being split. + /// This shard (child) is being built from state taken from its parent. + CreatingChild, + /// We apply deltas from disk until the head reaches final head. + /// Includes block hash of flat storage head. + CatchingUp(CryptoHash), +} + /// Current step of fetching state to fill flat storage. #[derive( BorshSerialize, @@ -142,5 +174,25 @@ pub struct FetchingStateStatus { pub num_parts: u64, } +/// Holds the state associated to [FlatStorageReshardingStatus::SplittingParent]. +/// This struct stores the necessary data to execute a shard split of a parent shard into two children. +#[derive( + BorshSerialize, BorshDeserialize, Clone, Debug, PartialEq, Eq, serde::Serialize, ProtocolSchema, +)] +pub struct SplittingParentStatus { + /// UId of the left child shard. Will contain everything lesser than boundary account. + pub left_child_shard: ShardUId, + /// UId of the right child shard. Will contain everything greater or equal than boundary account. + pub right_child_shard: ShardUId, + /// The new shard layout. + pub shard_layout: ShardLayout, + /// Hash of the first block having the new shard layout. + pub block_hash: CryptoHash, + /// The block before `block_hash`. + pub prev_block_hash: CryptoHash, + /// Parent's flat head state when the split began. 
+ pub flat_head: BlockInfo, +} + pub type FlatStateIterator<'a> = Box, FlatStateValue)>> + 'a>; diff --git a/core/store/src/trie/from_flat.rs b/core/store/src/trie/from_flat.rs index 97ccb717666..eeb1e887d6e 100644 --- a/core/store/src/trie/from_flat.rs +++ b/core/store/src/trie/from_flat.rs @@ -6,11 +6,11 @@ use std::time::Instant; // This function creates a new trie from flat storage for a given shard_uid // store: location of RocksDB store from where we read flatstore -// write_store: location of RocksDB store where we write the newly constructred trie +// write_store: location of RocksDB store where we write the newly constructed trie // shard_uid: The shard which we are recreating // // Please note that the trie is created for the block state with height equal to flat_head -// flat state can comtain deltas after flat_head and can be different from tip of the blockchain. +// flat state can contain deltas after flat_head and can be different from tip of the blockchain. pub fn construct_trie_from_flat(store: Store, write_store: Store, shard_uid: ShardUId) { let trie_storage = TrieDBStorage::new(store.trie_store(), shard_uid); let flat_state_to_trie_kv = diff --git a/tools/protocol-schema-check/res/protocol_schema.toml b/tools/protocol-schema-check/res/protocol_schema.toml index 7c138ad6180..3b31a183e0a 100644 --- a/tools/protocol-schema-check/res/protocol_schema.toml +++ b/tools/protocol-schema-check/res/protocol_schema.toml @@ -103,11 +103,11 @@ EpochInfoV4 = 434230701 EpochSummary = 742414117 EpochValidatorInfo = 378323971 ExecutionMetadata = 3853243413 -ExecutionOutcome = 1325623645 -ExecutionOutcomeWithId = 36999569 -ExecutionOutcomeWithIdAndProof = 4044381219 -ExecutionOutcomeWithProof = 4104315440 -ExecutionStatus = 1810006625 +ExecutionOutcome = 2925419955 +ExecutionOutcomeWithId = 1289816961 +ExecutionOutcomeWithIdAndProof = 3179626578 +ExecutionOutcomeWithProof = 2219338929 +ExecutionStatus = 3681865123 ExtCosts = 1172935704 FetchingStateStatus = 2204896805 FlatStateChanges = 2811133731 @@ -115,7 +115,8 @@ FlatStateDeltaMetadata = 3401366797 FlatStateValue = 83834662 FlatStorageCreationStatus = 3717607657 FlatStorageReadyStatus = 677315221 -FlatStorageStatus = 1026335026 +FlatStorageReshardingStatus = 4155800626 +FlatStorageStatus = 2745297627 FunctionCallAction = 2405840012 FunctionCallError = 3652274053 FunctionCallPermission = 1517509673 @@ -126,7 +127,7 @@ HostError = 3173968216 IgnoredVecU8 = 1855789801 IntegerOverflowError = 2542362165 InvalidAccessKeyError = 2954698659 -InvalidTxError = 1219344901 +InvalidTxError = 2090866399 KeyForFlatStateDelta = 2002998927 LatestKnown = 2945167085 LatestWitnessesInfo = 2488443612 @@ -183,7 +184,7 @@ RoutedMessageBody = 4241045537 RoutingTableUpdate = 2987752645 Secp256K1PublicKey = 4117078281 Secp256K1Signature = 3687154735 -ServerError = 3794571225 +ServerError = 2338793369 ShardChunk = 834871798 ShardChunkHeader = 4215449923 ShardChunkHeaderInner = 3760333502 @@ -195,6 +196,10 @@ ShardChunkHeaderV2 = 3706194757 ShardChunkHeaderV3 = 2763275079 ShardChunkV1 = 1814805625 ShardChunkV2 = 1857597167 +ShardLayout = 2672297879 +ShardLayoutV0 = 3139625127 +ShardLayoutV1 = 198917829 +ShardLayoutV2 = 1739189967 ShardProof = 2773021473 ShardStateSyncResponse = 2185281594 ShardStateSyncResponseHeaderV1 = 2708725662 @@ -209,6 +214,7 @@ SignedTransaction = 3898692301 SlashState = 3264273950 SlashedValidator = 2601657743 SnapshotHostInfo = 278564957 +SplittingParentStatus = 3614986382 StakeAction = 2002027105 StateChangeCause = 1569242014 
StateHeaderKey = 1385533899 @@ -222,7 +228,7 @@ StateStoredReceipt = 3853311293 StateStoredReceiptMetadata = 2895538362 StateStoredReceiptV0 = 4029868827 StateSyncDumpProgress = 2225888613 -StorageError = 1838871872 +StorageError = 2572184728 StoredChunkStateTransitionData = 516372819 String = 2587724713 SyncSnapshotHosts = 4230057383 @@ -236,7 +242,7 @@ TrieKey = 768968236 TrieQueueIndices = 2601394796 TrieRefcountAddition = 2117109883 TrieRefcountSubtraction = 2150368599 -TxExecutionError = 706862037 +TxExecutionError = 214948980 VMKind = 2110212047 ValidatorKickoutReason = 2362237969 ValidatorKickoutView = 2660746751