From ba0b23768e21eed3428024d19e763a37a2e055dd Mon Sep 17 00:00:00 2001 From: Andrea Date: Mon, 11 Nov 2024 20:30:55 +0100 Subject: [PATCH] feat(resharding): delay split shard of flat store until resharding block is final (#12415) Contents of this PR: - New features - Actual splitting of flat storage is delayed until the target resharding block becomes final - A scheduled resharding event can be overridden. This makes resharding work in many chain fork scenarios (though not all of them) - Added `FlatStorageReshardingTaskSchedulingStatus` to express the current state of scheduled tasks waiting for resharding block finality - Changes - Shard catchup no longer waits for resharding block finality: that is now guaranteed by the fact that the shard split happens on a final block. - `FlatStorageReshardingTaskStatus` renamed to `FlatStorageReshardingTaskResult` for clarity - `ReshardingActor` now takes care of retrying "postponed" tasks. Part of #12174 --- chain/chain/src/flat_storage_resharder.rs | 733 ++++++++++++------ .../chain/src/resharding/resharding_actor.rs | 94 ++- core/store/src/flat/mod.rs | 2 +- core/store/src/flat/types.rs | 4 +- .../res/protocol_schema.toml | 6 +- 5 files changed, 567 insertions(+), 272 deletions(-) diff --git a/chain/chain/src/flat_storage_resharder.rs b/chain/chain/src/flat_storage_resharder.rs index 4a555699028..417af0beccb 100644 --- a/chain/chain/src/flat_storage_resharder.rs +++ b/chain/chain/src/flat_storage_resharder.rs @@ -32,7 +32,7 @@ use near_store::adapter::flat_store::{FlatStoreAdapter, FlatStoreUpdateAdapter}; use near_store::adapter::StoreAdapter; use near_store::flat::{ BlockInfo, FlatStateChanges, FlatStorageError, FlatStorageReadyStatus, - FlatStorageReshardingStatus, FlatStorageStatus, SplittingParentStatus, + FlatStorageReshardingStatus, FlatStorageStatus, ParentSplitParameters, }; use near_store::{ShardUId, StorageError}; use std::fmt::{Debug, Formatter}; @@ -131,8 +131,9 @@ impl FlatStorageResharder { FlatStorageReshardingStatus::SplittingParent(status) => { let parent_shard_uid = shard_uid; info!(target: "resharding", ?parent_shard_uid, ?status, "resuming flat storage shard split"); - self.check_no_resharding_in_progress()?; - // On resume flat storage status is already set. + self.check_new_event_is_allowed()?; + // On resume, flat storage status is already set correctly and read from DB. + // Thus, we don't need to care about cancelling other existing resharding events. // However, we don't know the current state of children shards, // so it's better to clean them. self.clean_children_shards(&status)?; @@ -167,13 +168,16 @@ impl FlatStorageResharder { .. } = split_params; info!(target: "resharding", ?split_params, "initiating flat storage shard split"); - self.check_no_resharding_in_progress()?; + + self.check_new_event_is_allowed()?; + // Cancel any scheduled, not yet started event. + self.cancel_scheduled_event(); // Change parent and children shards flat storage status.
let store = self.runtime.store().flat_store(); let mut store_update = store.store_update(); let flat_head = retrieve_shard_flat_head(parent_shard, &store)?; - let status = SplittingParentStatus { + let split_params = ParentSplitParameters { left_child_shard, right_child_shard, shard_layout: shard_layout.clone(), @@ -183,7 +187,7 @@ impl FlatStorageResharder { store_update.set_flat_storage_status( parent_shard, FlatStorageStatus::Resharding(FlatStorageReshardingStatus::SplittingParent( - status.clone(), + split_params.clone(), )), ); store_update.set_flat_storage_status( @@ -196,19 +200,25 @@ impl FlatStorageResharder { ); store_update.commit()?; - self.schedule_split_shard(parent_shard, &status); + self.schedule_split_shard(parent_shard, &split_params); Ok(()) } - /// Returns an error if a resharding event is in progress. - fn check_no_resharding_in_progress(&self) -> Result<(), StorageError> { - // Do not allow multiple resharding events in parallel. - if self.resharding_event().is_some() { + /// Returns `Ok` if: + /// - no resharding event exists. + /// - a resharding event already exists, but it's not in progress yet. + /// + /// Returns `Err` if: + /// - a resharding event is in progress. + fn check_new_event_is_allowed(&self) -> Result<(), StorageError> { + let Some(current_event) = self.resharding_event() else { + return Ok(()); + }; + if current_event.has_started() { error!(target: "resharding", "trying to start a new flat storage resharding event while one is already in progress!"); - Err(StorageError::FlatStorageReshardingAlreadyInProgress) - } else { - Ok(()) + return Err(StorageError::FlatStorageReshardingAlreadyInProgress); } + Ok(()) } fn set_resharding_event(&self, event: FlatStorageReshardingEventStatus) { @@ -220,11 +230,23 @@ impl FlatStorageResharder { self.resharding_event.lock().unwrap().clone() } + fn set_resharding_event_execution_status(&self, new_status: TaskExecutionStatus) { + self.resharding_event + .lock() + .unwrap() + .as_mut() + .map(|event| event.set_execution_status(new_status)); + } + /// Schedules a task to split a shard. - fn schedule_split_shard(&self, parent_shard: ShardUId, status: &SplittingParentStatus) { - let event = FlatStorageReshardingEventStatus::SplitShard(parent_shard, status.clone()); + fn schedule_split_shard(&self, parent_shard: ShardUId, split_params: &ParentSplitParameters) { + let event = FlatStorageReshardingEventStatus::SplitShard( + parent_shard, + split_params.clone(), + TaskExecutionStatus::NotStarted, + ); self.set_resharding_event(event); - info!(target: "resharding", ?parent_shard, ?status,"scheduling flat storage shard split"); + info!(target: "resharding", ?parent_shard, ?split_params,"scheduling flat storage shard split"); let resharder = self.clone(); // Send a request to schedule the execution of `split_shard_task`, to do the bulk of the // splitting work. @@ -241,8 +263,8 @@ impl FlatStorageResharder { skip_all, fields(left_child_shard = ?status.left_child_shard, right_child_shard = ?status.right_child_shard) )] - fn clean_children_shards(&self, status: &SplittingParentStatus) -> Result<(), Error> { - let SplittingParentStatus { left_child_shard, right_child_shard, .. } = status; + fn clean_children_shards(&self, status: &ParentSplitParameters) -> Result<(), Error> { + let ParentSplitParameters { left_child_shard, right_child_shard, .. 
} = status; info!(target: "resharding", ?left_child_shard, ?right_child_shard, "cleaning up children shards flat storage's content"); let mut store_update = self.runtime.store().flat_store().store_update(); for child in [left_child_shard, right_child_shard] { @@ -253,23 +275,52 @@ impl FlatStorageResharder { Ok(()) } - /// Retrieves parent shard UIds and current resharding event status, only if a resharding event + /// Retrieves the parent shard UId and the split shard parameters, only if a resharding event /// is in progress and of type `Split`. - fn get_parent_shard_and_status(&self) -> Option<(ShardUId, SplittingParentStatus)> { + fn get_parent_shard_and_split_params(&self) -> Option<(ShardUId, ParentSplitParameters)> { let event = self.resharding_event.lock().unwrap(); match event.as_ref() { - Some(FlatStorageReshardingEventStatus::SplitShard(parent_shard, status)) => { - Some((*parent_shard, status.clone())) + Some(FlatStorageReshardingEventStatus::SplitShard(parent_shard, split_params, ..)) => { + Some((*parent_shard, split_params.clone())) } None => None, } } - /// Task to perform the actual split of a flat storage shard. This may be a long operation time-wise. + /// Task to perform the actual split of a flat storage shard. This may be a long operation + /// time-wise. /// - /// Conceptually it simply copies each key-value pair from the parent shard to the correct child. - pub fn split_shard_task(&self) -> FlatStorageReshardingTaskStatus { - info!(target: "resharding", "flat storage shard split task started"); + /// Conceptually it simply copies each key-value pair from the parent shard to the correct + /// child. This task may get cancelled or postponed. + pub fn split_shard_task( + &self, + chain_store: &ChainStore, + ) -> FlatStorageReshardingSchedulableTaskResult { + info!(target: "resharding", "flat storage shard split task execution"); + + // Make sure that the resharding block is final. + let resharding_hash = self + .resharding_event() + .expect("flat storage resharding event must exist!") + .resharding_hash(); + match self.compute_scheduled_task_status(&resharding_hash, chain_store) { + FlatStorageReshardingTaskSchedulingStatus::CanStart => { + info!(target: "resharding", "flat storage shard split task ready to perform bulk processing"); + } + FlatStorageReshardingTaskSchedulingStatus::Failed => { + // It's important to cancel the scheduled event in case of failure. + self.cancel_scheduled_event(); + error!(target: "resharding", "flat storage shard split task failed during scheduling!"); + // TODO(resharding): return failed only if scheduling of all resharding blocks has failed. + return FlatStorageReshardingSchedulableTaskResult::Failed; + } + FlatStorageReshardingTaskSchedulingStatus::Postponed => { + info!(target: "resharding", "flat storage shard split task has been postponed"); + return FlatStorageReshardingSchedulableTaskResult::Postponed; + } + }; + + // We know that the resharding block has become final, so let's start the real work. let task_status = self.split_shard_task_impl(); self.split_shard_task_postprocessing(task_status); info!(target: "resharding", ?task_status, "flat storage shard split task finished"); @@ -279,9 +330,12 @@ impl FlatStorageResharder { /// Performs the bulk of [split_shard_task]. /// /// Returns `true` if the routine completed successfully.
- fn split_shard_task_impl(&self) -> FlatStorageReshardingTaskStatus { + fn split_shard_task_impl(&self) -> FlatStorageReshardingSchedulableTaskResult { + self.set_resharding_event_execution_status(TaskExecutionStatus::Started); + + // Exit early if the task has already been cancelled. if self.controller.is_cancelled() { - return FlatStorageReshardingTaskStatus::Cancelled; + return FlatStorageReshardingSchedulableTaskResult::Cancelled; } // Determines after how many bytes worth of key-values the process stops to commit changes @@ -290,22 +344,22 @@ impl FlatStorageResharder { // Delay between every batch. let batch_delay = self.resharding_config.get().batch_delay.unsigned_abs(); - let (parent_shard, status) = self - .get_parent_shard_and_status() + let (parent_shard, split_params) = self + .get_parent_shard_and_split_params() .expect("flat storage resharding event must be Split!"); - info!(target: "resharding", ?parent_shard, ?status, ?batch_delay, ?batch_size, "flat storage shard split task: starting key-values copy"); + info!(target: "resharding", ?parent_shard, ?split_params, ?batch_delay, ?batch_size, "flat storage shard split task: starting key-values copy"); // Prepare the store object for commits and the iterator over parent's flat storage. let flat_store = self.runtime.store().flat_store(); let mut iter = match self.flat_storage_iterator( &flat_store, &parent_shard, - &status.resharding_hash, + &split_params.resharding_hash, ) { Ok(iter) => iter, Err(err) => { - error!(target: "resharding", ?parent_shard, block_hash=?status.resharding_hash, ?err, "failed to build flat storage iterator"); - return FlatStorageReshardingTaskStatus::Failed; + error!(target: "resharding", ?parent_shard, block_hash=?split_params.resharding_hash, ?err, "failed to build flat storage iterator"); + return FlatStorageReshardingSchedulableTaskResult::Failed; } }; @@ -328,16 +382,19 @@ impl FlatStorageResharder { Some(FlatStorageAndDeltaIterItem::CommitPoint) => break, Some(FlatStorageAndDeltaIterItem::Entry(Ok((key, value)))) => { processed_size += key.len() + value.as_ref().map_or(0, |v| v.size()); - if let Err(err) = - shard_split_handle_key_value(key, value, &mut store_update, &status) - { + if let Err(err) = shard_split_handle_key_value( + key, + value, + &mut store_update, + &split_params, + ) { error!(target: "resharding", ?err, "failed to handle flat storage key"); - return FlatStorageReshardingTaskStatus::Failed; + return FlatStorageReshardingSchedulableTaskResult::Failed; } } Some(FlatStorageAndDeltaIterItem::Entry(Err(err))) => { error!(target: "resharding", ?err, "failed to read flat storage value from parent shard"); - return FlatStorageReshardingTaskStatus::Failed; + return FlatStorageReshardingSchedulableTaskResult::Failed; } None => { iter_exhausted = true; @@ -348,17 +405,17 @@ impl FlatStorageResharder { // Make a pause to commit and check if the routine should stop. if let Err(err) = store_update.commit() { error!(target: "resharding", ?err, "failed to commit store update"); - return FlatStorageReshardingTaskStatus::Failed; + return FlatStorageReshardingSchedulableTaskResult::Failed; } num_batches_done += 1; // If `iter`` is exhausted we can exit after the store commit. 
if iter_exhausted { - return FlatStorageReshardingTaskStatus::Successful { num_batches_done }; + return FlatStorageReshardingSchedulableTaskResult::Successful { num_batches_done }; } if self.controller.is_cancelled() { - return FlatStorageReshardingTaskStatus::Cancelled; + return FlatStorageReshardingSchedulableTaskResult::Cancelled; } // Sleep between batches in order to throttle resharding and leave some resource for the @@ -375,23 +432,26 @@ impl FlatStorageResharder { "FlatStorageResharder::split_shard_task_postprocessing", skip_all )] - fn split_shard_task_postprocessing(&self, task_status: FlatStorageReshardingTaskStatus) { - let (parent_shard, split_status) = self - .get_parent_shard_and_status() + fn split_shard_task_postprocessing( + &self, + task_status: FlatStorageReshardingSchedulableTaskResult, + ) { + let (parent_shard, split_params) = self + .get_parent_shard_and_split_params() .expect("flat storage resharding event must be Split!"); - let SplittingParentStatus { + let ParentSplitParameters { left_child_shard, right_child_shard, flat_head, resharding_hash, .. - } = split_status; + } = split_params; let flat_store = self.runtime.store().flat_store(); - info!(target: "resharding", ?parent_shard, ?task_status, ?split_status, "flat storage shard split task: post-processing"); + info!(target: "resharding", ?parent_shard, ?task_status, ?split_params, "flat storage shard split task: post-processing"); let mut store_update = flat_store.store_update(); match task_status { - FlatStorageReshardingTaskStatus::Successful { .. } => { + FlatStorageReshardingSchedulableTaskResult::Successful { .. } => { // Split shard completed successfully. // Parent flat storage can be deleted from the FlatStoreManager. // If FlatStoreManager has no reference to the shard, delete it manually. @@ -422,8 +482,8 @@ impl FlatStorageResharder { ); } } - FlatStorageReshardingTaskStatus::Failed - | FlatStorageReshardingTaskStatus::Cancelled => { + FlatStorageReshardingSchedulableTaskResult::Failed + | FlatStorageReshardingSchedulableTaskResult::Cancelled => { // We got an error or a cancellation request. // Reset parent. store_update.set_flat_storage_status( @@ -435,13 +495,12 @@ impl FlatStorageResharder { store_update.remove_flat_storage(child_shard); } } - FlatStorageReshardingTaskStatus::Postponed => { + FlatStorageReshardingSchedulableTaskResult::Postponed => { panic!("can't finalize processing of a postponed split task!"); } } store_update.commit().unwrap(); - // Terminate the resharding event. - *self.resharding_event.lock().unwrap() = None; + self.remove_resharding_event(); } /// Returns an iterator over a shard's flat storage at the given block hash. This @@ -471,7 +530,7 @@ impl FlatStorageResharder { // Must reverse the result because we want ascending block heights. let mut blocks_to_head = flat_storage.get_blocks_to_head(block_hash).map_err(|err| { StorageError::StorageInconsistentState(format!( - "failed to find path of blocks to flat storage head ({err})" + "failed to find path from block {block_hash} to flat storage head ({err})" )) })?; blocks_to_head.reverse(); @@ -500,57 +559,25 @@ impl FlatStorageResharder { } /// Task to perform catchup and creation of a flat storage shard spawned from a previous - /// resharding operation. This may be a long operation time-wise. + /// resharding operation. May be a long operation time-wise. This task can neither be cancelled + /// nor postponed.
pub fn shard_catchup_task( &self, shard_uid: ShardUId, flat_head_block_hash: CryptoHash, chain_store: &ChainStore, - ) -> FlatStorageReshardingTaskStatus { + ) -> FlatStorageReshardingTaskResult { info!(target: "resharding", ?shard_uid, ?flat_head_block_hash, "flat storage shard catchup task started"); - // If flat storage current status is beyond the chain final block, try again later. - let chain_final_head = chain_store.final_head().unwrap(); - let height_result = chain_store.get_block_height(&flat_head_block_hash); - let Ok(height) = height_result else { - error!(target: "resharding", ?shard_uid, ?flat_head_block_hash, err = ?height_result.unwrap_err(), "can't find flat head height!"); - return FlatStorageReshardingTaskStatus::Failed; - }; - if height > chain_final_head.height { - info!(target = "resharding", ?height, chain_final_height = ?chain_final_head.height, "flat head beyond chain final tip: postponing flat storage shard catchup task"); - // Cancel this task and send a request to re-schedule the execution of - // `shard_catchup_task` some time later. - self.sender.flat_storage_shard_catchup_sender.send(FlatStorageShardCatchupRequest { - resharder: self.clone(), - shard_uid, - flat_head_block_hash, - }); - return FlatStorageReshardingTaskStatus::Cancelled; - } - - // If the flat head is not in the canonical chain this task has failed. - match chain_store.get_block_hash_by_height(height) { - Ok(hash) => { - if hash != flat_head_block_hash { - error!(target: "resharding", ?shard_uid, ?height, ?flat_head_block_hash, ?hash, "flat head not in canonical chain!"); - return FlatStorageReshardingTaskStatus::Failed; - } - } - Err(err) => { - error!(target: "resharding", ?shard_uid, ?err, ?height, "can't find block hash by height"); - return FlatStorageReshardingTaskStatus::Failed; - } - } - // Apply deltas and then create the flat storage. let apply_result = self.shard_catchup_apply_deltas(shard_uid, flat_head_block_hash, chain_store); let Ok((num_batches_done, flat_head)) = apply_result else { error!(target: "resharding", ?shard_uid, err = ?apply_result.unwrap_err(), "flat storage shard catchup delta application failed!"); - return FlatStorageReshardingTaskStatus::Failed; + return FlatStorageReshardingTaskResult::Failed; }; match self.shard_catchup_finalize_storage(shard_uid, &flat_head) { Ok(_) => { - let task_status = FlatStorageReshardingTaskStatus::Successful { num_batches_done }; + let task_status = FlatStorageReshardingTaskResult::Successful { num_batches_done }; info!(target: "resharding", ?shard_uid, ?task_status, "flat storage shard catchup task finished"); // At this point we can trigger the reload of memtries. self.sender.memtrie_reload_sender.send(MemtrieReloadRequest { shard_uid }); @@ -558,7 +585,7 @@ impl FlatStorageResharder { } Err(err) => { error!(target: "resharding", ?shard_uid, ?err, "flat storage shard catchup finalize failed!"); - FlatStorageReshardingTaskStatus::Failed + FlatStorageReshardingTaskResult::Failed } } } @@ -674,6 +701,82 @@ impl FlatStorageResharder { info!(target: "resharding", ?shard_uid, ?flat_head, "flat storage creation done"); Ok(()) } + + /// Cancels the current event, if it exists and it hasn't started yet. + fn cancel_scheduled_event(&self) { + let Some(current_event) = self.resharding_event() else { + return; + }; + info!(target: "resharding", ?current_event, "cancelling current scheduled resharding event"); + debug_assert!(!current_event.has_started()); + // Clean up the database state. 
+ match current_event { + FlatStorageReshardingEventStatus::SplitShard(parent_shard, split_status, _) => { + let flat_store = self.runtime.store().flat_store(); + let mut store_update = flat_store.store_update(); + // Parent goes back to Ready state. + store_update.set_flat_storage_status( + parent_shard, + FlatStorageStatus::Ready(FlatStorageReadyStatus { + flat_head: split_status.flat_head, + }), + ); + // Remove children shards' status. + for child_shard in [split_status.left_child_shard, split_status.right_child_shard] { + store_update.remove_status(child_shard); + } + store_update.commit().unwrap(); + } + } + // Clean up the resharding event. + self.remove_resharding_event(); + } + + /// Computes the scheduling status of a task waiting to be started. The task will be ready to + /// start only if its resharding block has become final. Scheduling will fail if the resharding + /// block ends up in a discarded fork. + fn compute_scheduled_task_status( + &self, + resharding_hash: &CryptoHash, + chain_store: &ChainStore, + ) -> FlatStorageReshardingTaskSchedulingStatus { + // Retrieve the height of the resharding block. + let chain_final_head = chain_store.final_head().unwrap(); + let resharding_height_result = chain_store.get_block_height(resharding_hash); + let Ok(resharding_height) = resharding_height_result else { + error!(target: "resharding", ?resharding_hash, err = ?resharding_height_result.unwrap_err(), "can't get resharding block height!"); + return FlatStorageReshardingTaskSchedulingStatus::Failed; + }; + // If the resharding block is beyond the chain final block, try again later. + let chain_final_height = chain_final_head.height; + if resharding_height > chain_final_height { + info!( + target = "resharding", + ?resharding_height, + ?chain_final_height, + "resharding block height is higher than final block height: postponing task" + ); + return FlatStorageReshardingTaskSchedulingStatus::Postponed; + } + // If the resharding block is not in the canonical chain, this task has failed. + match chain_store.get_block_hash_by_height(resharding_height) { + Ok(hash) if hash == *resharding_hash => { + FlatStorageReshardingTaskSchedulingStatus::CanStart + } + Ok(hash) => { + error!(target: "resharding", ?resharding_height, ?resharding_hash, ?hash, "resharding block not in canonical chain!"); + FlatStorageReshardingTaskSchedulingStatus::Failed + } + Err(err) => { + error!(target: "resharding", ?resharding_height, ?resharding_hash, ?err, "can't find resharding block hash by height!"); + FlatStorageReshardingTaskSchedulingStatus::Failed + } + } + } + + fn remove_resharding_event(&self) { + *self.resharding_event.lock().unwrap() = None; + } } /// Enum used to wrap the `Item` of iterators over flat storage contents or flat storage deltas. Its @@ -715,7 +818,7 @@ fn shard_split_handle_key_value( key: Vec<u8>, value: Option<FlatStateValue>, store_update: &mut FlatStoreUpdateAdapter, - status: &SplittingParentStatus, + split_params: &ParentSplitParameters, ) -> Result<(), Error> { if key.is_empty() { panic!("flat storage key is empty!") } let key_column_prefix = key[0]; match key_column_prefix { - col::ACCOUNT => { - copy_kv_to_child(&status, key, value, store_update, parse_account_id_from_account_key)?
- } + col::ACCOUNT => copy_kv_to_child( + &split_params, + key, + value, + store_update, + parse_account_id_from_account_key, + )?, col::CONTRACT_DATA => copy_kv_to_child( - &status, + &split_params, key, value, store_update, parse_account_id_from_contract_data_key, )?, col::CONTRACT_CODE => copy_kv_to_child( - &status, + &split_params, key, value, store_update, parse_account_id_from_contract_code_key, )?, col::ACCESS_KEY => copy_kv_to_child( - &status, + &split_params, key, value, store_update, parse_account_id_from_access_key_key, )?, col::RECEIVED_DATA => copy_kv_to_child( - &status, + &split_params, key, value, store_update, @@ -758,7 +865,7 @@ | col::PENDING_DATA_COUNT | col::POSTPONED_RECEIPT | col::PROMISE_YIELD_RECEIPT => { - copy_kv_to_child(&status, key, value, store_update, |raw_key: &[u8]| { + copy_kv_to_child(&split_params, key, value, store_update, |raw_key: &[u8]| { parse_account_id_from_trie_key_with_separator( key_column_prefix, raw_key, @@ -768,24 +875,28 @@ } col::DELAYED_RECEIPT_OR_INDICES | col::PROMISE_YIELD_INDICES - | col::PROMISE_YIELD_TIMEOUT => copy_kv_to_all_children(&status, key, value, store_update), + | col::PROMISE_YIELD_TIMEOUT + | col::BANDWIDTH_SCHEDULER_STATE => { + copy_kv_to_all_children(&split_params, key, value, store_update) + } col::BUFFERED_RECEIPT_INDICES | col::BUFFERED_RECEIPT => { - copy_kv_to_left_child(&status, key, value, store_update) + copy_kv_to_left_child(&split_params, key, value, store_update) } - _ => unreachable!(), + _ => unreachable!("key: {:?} should not appear in flat store!", key), } Ok(()) } /// Copies a key-value pair to the correct child shard by matching the account-id to the provided shard layout. fn copy_kv_to_child( - status: &SplittingParentStatus, + split_params: &ParentSplitParameters, key: Vec<u8>, value: Option<FlatStateValue>, store_update: &mut FlatStoreUpdateAdapter, account_id_parser: impl FnOnce(&[u8]) -> Result, ) -> Result<(), Error> { - let SplittingParentStatus { left_child_shard, right_child_shard, shard_layout, .. } = &status; + let ParentSplitParameters { left_child_shard, right_child_shard, shard_layout, .. } = + &split_params; // Derive the shard uid for this account in the new shard layout. let account_id = account_id_parser(&key)?; let new_shard_id = account_id_to_shard_id(&account_id, shard_layout); @@ -804,42 +915,93 @@ fn copy_kv_to_child( /// Copies a key-value pair to both children. fn copy_kv_to_all_children( - status: &SplittingParentStatus, + split_params: &ParentSplitParameters, key: Vec<u8>, value: Option<FlatStateValue>, store_update: &mut FlatStoreUpdateAdapter, ) { - store_update.set(status.left_child_shard, key.clone(), value.clone()); - store_update.set(status.right_child_shard, key, value); + store_update.set(split_params.left_child_shard, key.clone(), value.clone()); + store_update.set(split_params.right_child_shard, key, value); } /// Copies a key-value pair to the child on the left of the account boundary (also called 'first child'). fn copy_kv_to_left_child( - status: &SplittingParentStatus, + split_params: &ParentSplitParameters, key: Vec<u8>, value: Option<FlatStateValue>, store_update: &mut FlatStoreUpdateAdapter, ) { - store_update.set(status.left_child_shard, key, value); + store_update.set(split_params.left_child_shard, key, value); } /// Struct to describe, perform and track progress of a flat storage resharding. #[derive(Clone, Debug)] pub enum FlatStorageReshardingEventStatus { - /// Split a shard. - /// Includes the parent shard uid and the operation' status.
- SplitShard(ShardUId, SplittingParentStatus), + /// Split a shard. Includes the parent shard uid, the detailed information about the split + /// operation (`ParentSplitParameters`) and the execution status of the task that is performing + /// the split. + SplitShard(ShardUId, ParentSplitParameters, TaskExecutionStatus), } -/// Status of a flat storage resharding task. +impl FlatStorageReshardingEventStatus { + /// Returns `true` if the resharding event has started processing. + fn has_started(&self) -> bool { + match self { + FlatStorageReshardingEventStatus::SplitShard(_, _, execution_status) => { + matches!(execution_status, TaskExecutionStatus::Started) + } + } + } + + fn set_execution_status(&mut self, new_status: TaskExecutionStatus) { + match self { + FlatStorageReshardingEventStatus::SplitShard(_, _, execution_status) => { + *execution_status = new_status + } + } + } + + fn resharding_hash(&self) -> CryptoHash { + match self { + FlatStorageReshardingEventStatus::SplitShard(_, split_status, _) => { + split_status.resharding_hash + } + } + } +} + +/// All different states of task execution for [FlatStorageReshardingEventStatus]. +#[derive(Clone, Debug, Copy, Eq, PartialEq)] +pub enum TaskExecutionStatus { + Started, + NotStarted, +} + +/// Result of a simple flat storage resharding task. #[derive(Clone, Debug, Copy, Eq, PartialEq)] -pub enum FlatStorageReshardingTaskStatus { +pub enum FlatStorageReshardingTaskResult { + Successful { num_batches_done: usize }, + Failed, +} + +/// Result of a schedulable flat storage resharding task. Extends [FlatStorageReshardingTaskResult] +/// with the option to cancel or postpone the task. +#[derive(Clone, Debug, Copy, Eq, PartialEq)] +pub enum FlatStorageReshardingSchedulableTaskResult { Successful { num_batches_done: usize }, Failed, Cancelled, Postponed, } +/// Status of the scheduling of a flat storage resharding task. +/// It is useful to know whether or not a task can start or has to be delayed. +enum FlatStorageReshardingTaskSchedulingStatus { + CanStart, + Failed, + Postponed, +} + /// Helps control the flat storage resharder background operations. This struct wraps /// [ReshardingHandle] and gives better meaning request to stop any processing when applied to flat /// storage. In flat storage resharding there's a slight difference between interrupt and cancel.
@@ -874,7 +1036,10 @@ mod tests { use near_async::time::Clock; use near_chain_configs::{Genesis, MutableConfigValue}; - use near_epoch_manager::{shard_tracker::ShardTracker, EpochManager}; + use near_epoch_manager::{ + shard_tracker::{ShardTracker, TrackedConfig}, + EpochManager, + }; use near_o11y::testonly::init_test_logger; use near_primitives::{ hash::CryptoHash, @@ -899,6 +1064,7 @@ mod tests { }; use super::*; + use assert_matches::assert_matches; use more_asserts::assert_gt; use near_async::messaging::{CanSend, IntoMultiSender}; use near_crypto::{KeyType, PublicKey}; @@ -931,7 +1097,7 @@ mod tests { impl CanSend for SimpleSender { fn send(&self, msg: FlatStorageSplitShardRequest) { - msg.resharder.split_shard_task(); + msg.resharder.split_shard_task(&self.chain_store.lock().unwrap()); } } @@ -970,12 +1136,12 @@ mod tests { } impl DelayedSender { - fn call_split_shard_task(&self) -> FlatStorageReshardingTaskStatus { + fn call_split_shard_task(&self) -> FlatStorageReshardingSchedulableTaskResult { let request = self.split_shard_request.lock().unwrap(); - request.as_ref().unwrap().resharder.split_shard_task() + request.as_ref().unwrap().resharder.split_shard_task(&self.chain_store.lock().unwrap()) } - fn call_shard_catchup_tasks(&self) -> Vec { + fn call_shard_catchup_tasks(&self) -> Vec { self.shard_catchup_requests .lock() .unwrap() @@ -1058,7 +1224,7 @@ mod tests { let store = create_test_store(); initialize_genesis_state(store.clone(), &genesis, Some(tempdir.path())); let epoch_manager = EpochManager::new_arc_handle(store.clone(), &genesis.config, None); - let shard_tracker = ShardTracker::new_empty(epoch_manager.clone()); + let shard_tracker = ShardTracker::new(TrackedConfig::AllShards, epoch_manager.clone()); let runtime = NightshadeRuntime::test( tempdir.path(), store.clone(), @@ -1105,7 +1271,45 @@ mod tests { .unwrap() } - /// Verify that another resharding can't be triggered if one is ongoing. + enum PreviousBlockHeight { + ChainHead, + Fixed(u64), + } + + enum NextBlockHeight { + ChainHeadPlusOne, + Fixed(u64), + } + + /// Utility to add blocks on top of a chain. + fn add_blocks_to_chain( + chain: &mut Chain, + num_blocks: u64, + on_top_of_height: PreviousBlockHeight, + next_height: NextBlockHeight, + ) { + assert_gt!(num_blocks, 0); + let signer = Arc::new(create_test_signer("aa")); + let mut prev_block_height = match on_top_of_height { + PreviousBlockHeight::ChainHead => chain.head().unwrap().height, + PreviousBlockHeight::Fixed(height) => height, + }; + let next_block_height = match next_height { + NextBlockHeight::ChainHeadPlusOne => chain.head().unwrap().height + 1, + NextBlockHeight::Fixed(height) => height, + }; + for height in next_block_height..next_block_height + num_blocks { + let prev_block = chain.get_block_by_height(prev_block_height).unwrap(); + let block = TestBlockBuilder::new(Clock::real(), &prev_block, signer.clone()) + .height(height) + .build(); + chain.process_block_test(&None, block).unwrap(); + prev_block_height = height; + } + assert_eq!(chain.head().unwrap().height, next_block_height + num_blocks - 1); + } + + /// Verify that a new resharding event can't be triggered if another one has already started. #[test] fn concurrent_reshardings_are_disallowed() { init_test_logger(); @@ -1119,8 +1323,9 @@ mod tests { .start_resharding(resharding_event_type.clone(), &new_shard_layout) .is_ok()); - // Immediately cancel the resharding. + // Immediately cancel the resharding and call the resharding task. 
controller.handle.stop(); + resharder.split_shard_task_impl(); assert!(resharder.resharding_event().is_some()); assert!(resharder.start_resharding(resharding_event_type, &new_shard_layout).is_err()); @@ -1140,7 +1345,7 @@ mod tests { let resharding_event = resharder.resharding_event(); match resharding_event.unwrap() { - FlatStorageReshardingEventStatus::SplitShard(parent, status) => { + FlatStorageReshardingEventStatus::SplitShard(parent, status, exec_status) => { assert_eq!( flat_store.get_flat_storage_status(parent), Ok(FlatStorageStatus::Resharding( @@ -1155,6 +1360,7 @@ mod tests { flat_store.get_flat_storage_status(status.right_child_shard), Ok(FlatStorageStatus::Resharding(FlatStorageReshardingStatus::CreatingChild)) ); + assert_eq!(exec_status, TaskExecutionStatus::NotStarted); } } } @@ -1186,7 +1392,7 @@ mod tests { // Set parent state to ShardSplitting, manually, to simulate a forcibly cancelled resharding attempt. let resharding_status = - FlatStorageReshardingStatus::SplittingParent(SplittingParentStatus { + FlatStorageReshardingStatus::SplittingParent(ParentSplitParameters { // Values don't matter. left_child_shard, right_child_shard, @@ -1229,18 +1435,50 @@ mod tests { #[test] fn simple_split_shard() { init_test_logger(); - let (chain, resharder, _) = - create_chain_resharder_sender::(simple_shard_layout()); + let (mut chain, resharder, sender) = + create_chain_resharder_sender::(simple_shard_layout()); let new_shard_layout = shard_layout_after_split(); let resharding_event_type = event_type_from_chain_and_layout(&chain, &new_shard_layout); + let left_child = ShardUId { version: 3, shard_id: 2 }; + let right_child = ShardUId { version: 3, shard_id: 3 }; + let flat_store = resharder.runtime.store().flat_store(); + + // Add two blocks on top of genesis. This will make the resharding block (height 0) final. + add_blocks_to_chain( + &mut chain, + 2, + PreviousBlockHeight::ChainHead, + NextBlockHeight::ChainHeadPlusOne, + ); // Perform resharding. assert!(resharder.start_resharding(resharding_event_type, &new_shard_layout).is_ok()); + sender.call_split_shard_task(); + + // Check final status of parent flat storage. + let parent = ShardUId { version: 3, shard_id: 1 }; + assert_eq!(flat_store.get_flat_storage_status(parent), Ok(FlatStorageStatus::Empty)); + assert_eq!(flat_store.iter(parent).count(), 0); + assert!(resharder + .runtime + .get_flat_storage_manager() + .get_flat_storage_for_shard(parent) + .is_none()); + + // Check intermediate status of children flat storages. + for child in [left_child, right_child] { + assert_eq!( + flat_store.get_flat_storage_status(child), + Ok(FlatStorageStatus::Resharding(FlatStorageReshardingStatus::CatchingUp( + chain.final_head().unwrap().last_block_hash + ))) + ); + } + + // Perform children catchup. + sender.call_shard_catchup_tasks(); // Check flat storages of children contain the correct accounts and access keys. - let left_child = ShardUId { version: 3, shard_id: 2 }; - let right_child = ShardUId { version: 3, shard_id: 3 }; - let flat_store = resharder.runtime.store().flat_store(); let account_mm_key = TrieKey::Account { account_id: account!("mm") }; let account_vv_key = TrieKey::Account { account_id: account!("vv") }; assert!(flat_store @@ -1264,26 +1502,19 @@ mod tests { .get(right_child, &account_vv_access_key.to_vec()) .is_ok_and(|val| val.is_some())); - // Check final status of parent flat storage. 
- let parent = ShardUId { version: 3, shard_id: 1 }; - assert_eq!(flat_store.get_flat_storage_status(parent), Ok(FlatStorageStatus::Empty)); - assert_eq!(flat_store.iter(parent).count(), 0); - assert!(resharder - .runtime - .get_flat_storage_manager() - .get_flat_storage_for_shard(parent) - .is_none()); - // Check final status of children flat storages. - let last_hash = chain.head().unwrap().last_block_hash; - assert_eq!( - flat_store.get_flat_storage_status(left_child), - Ok(FlatStorageStatus::Resharding(FlatStorageReshardingStatus::CatchingUp(last_hash))) - ); - assert_eq!( - flat_store.get_flat_storage_status(left_child), - Ok(FlatStorageStatus::Resharding(FlatStorageReshardingStatus::CatchingUp(last_hash))) - ); + for child in [left_child, right_child] { + assert_eq!( + flat_store.get_flat_storage_status(child), + Ok(FlatStorageStatus::Ready(FlatStorageReadyStatus { + flat_head: BlockInfo { + hash: chain.final_head().unwrap().last_block_hash, + height: chain.final_head().unwrap().height, + prev_hash: chain.final_head().unwrap().prev_block_hash + } + })) + ); + } } /// Split shard task should run in batches. @@ -1304,7 +1535,7 @@ mod tests { assert!(resharder.start_resharding(resharding_event_type, &new_shard_layout).is_ok()); // Check that more than one batch has been processed. - let FlatStorageReshardingTaskStatus::Successful { num_batches_done } = + let FlatStorageReshardingSchedulableTaskResult::Successful { num_batches_done } = sender.call_split_shard_task() else { assert!(false); @@ -1323,8 +1554,9 @@ mod tests { // Perform resharding. assert!(resharder.start_resharding(resharding_event_type, &new_shard_layout).is_ok()); - let (parent_shard, status) = resharder.get_parent_shard_and_status().unwrap(); - let SplittingParentStatus { left_child_shard, right_child_shard, flat_head, .. } = status; + let (parent_shard, split_params) = resharder.get_parent_shard_and_split_params().unwrap(); + let ParentSplitParameters { left_child_shard, right_child_shard, flat_head, .. } = + split_params; // Cancel the task before it starts. resharder.controller.handle.stop(); @@ -1376,8 +1608,12 @@ mod tests { let new_shard_layout = shard_layout_after_split(); // In order to have flat state deltas we must bring the chain forward by adding blocks. - add_blocks_to_chain(&mut chain, 2); - assert_eq!(chain.head().unwrap().height, 2); + add_blocks_to_chain( + &mut chain, + 2, + PreviousBlockHeight::ChainHead, + NextBlockHeight::ChainHeadPlusOne, + ); let resharding_event_type = event_type_from_chain_and_layout(&chain, &new_shard_layout); let ReshardingSplitShardParams { @@ -1385,6 +1621,15 @@ mod tests { } = match resharding_event_type.clone() { ReshardingEventType::SplitShard(params) => params, }; + + // Bring chain forward in order to make the resharding block (height 2) final. + add_blocks_to_chain( + &mut chain, + 2, + PreviousBlockHeight::ChainHead, + NextBlockHeight::ChainHeadPlusOne, + ); + let manager = chain.runtime_adapter.get_flat_storage_manager(); // Manually add deltas on top of parent's flat storage. @@ -1513,7 +1758,7 @@ mod tests { assert!(resharder.start_resharding(resharding_event_type, &new_shard_layout).is_ok()); assert_eq!( sender.call_split_shard_task(), - FlatStorageReshardingTaskStatus::Successful { num_batches_done: 3 } + FlatStorageReshardingSchedulableTaskResult::Successful { num_batches_done: 3 } ); // Validate integrity of children shards. 
@@ -1565,8 +1810,8 @@ mod tests { #[test] fn split_shard_handle_account_id_keys() { init_test_logger(); - let (chain, resharder, _) = - create_chain_resharder_sender::(simple_shard_layout()); + let (chain, resharder, sender) = + create_chain_resharder_sender::(simple_shard_layout()); let new_shard_layout = shard_layout_after_split(); let resharding_event_type = event_type_from_chain_and_layout(&chain, &new_shard_layout); let ReshardingSplitShardParams { @@ -1634,6 +1879,7 @@ mod tests { // Do resharding. assert!(resharder.start_resharding(resharding_event_type, &new_shard_layout).is_ok()); + sender.call_split_shard_task(); // Check each child has the correct keys assigned to itself. for key in &account_mm_keys { @@ -1650,8 +1896,8 @@ mod tests { #[test] fn split_shard_handle_delayed_receipts() { init_test_logger(); - let (chain, resharder, _) = - create_chain_resharder_sender::(simple_shard_layout()); + let (chain, resharder, sender) = + create_chain_resharder_sender::(simple_shard_layout()); let new_shard_layout = shard_layout_after_split(); let resharding_event_type = event_type_from_chain_and_layout(&chain, &new_shard_layout); let ReshardingSplitShardParams { @@ -1680,6 +1926,7 @@ mod tests { // Do resharding. assert!(resharder.start_resharding(resharding_event_type, &new_shard_layout).is_ok()); + sender.call_split_shard_task(); // Check that flat storages of both children contain the delayed receipt. for child_shard in [left_child_shard, right_child_shard] { @@ -1698,8 +1945,8 @@ mod tests { #[test] fn split_shard_handle_promise_yield() { init_test_logger(); - let (chain, resharder, _) = - create_chain_resharder_sender::(simple_shard_layout()); + let (chain, resharder, sender) = + create_chain_resharder_sender::(simple_shard_layout()); let new_shard_layout = shard_layout_after_split(); let resharding_event_type = event_type_from_chain_and_layout(&chain, &new_shard_layout); let ReshardingSplitShardParams { @@ -1754,6 +2001,7 @@ mod tests { // Do resharding. assert!(resharder.start_resharding(resharding_event_type, &new_shard_layout).is_ok()); + sender.call_split_shard_task(); // Check that flat storages of both children contain the promise yield timeout and indices. for child_shard in [left_child_shard, right_child_shard] { @@ -1783,8 +2031,8 @@ mod tests { #[test] fn split_shard_handle_buffered_receipts() { init_test_logger(); - let (chain, resharder, _) = - create_chain_resharder_sender::(simple_shard_layout()); + let (chain, resharder, sender) = + create_chain_resharder_sender::(simple_shard_layout()); let new_shard_layout = shard_layout_after_split(); let resharding_event_type = event_type_from_chain_and_layout(&chain, &new_shard_layout); let ReshardingSplitShardParams { @@ -1818,6 +2066,7 @@ mod tests { // Do resharding. assert!(resharder.start_resharding(resharding_event_type, &new_shard_layout).is_ok()); + sender.call_split_shard_task(); // Check that only the first child contain the buffered receipt. assert_eq!( @@ -1916,8 +2165,8 @@ mod tests { assert_eq!( sender.call_shard_catchup_tasks(), vec![ - FlatStorageReshardingTaskStatus::Successful { num_batches_done: 1 }, - FlatStorageReshardingTaskStatus::Successful { num_batches_done: 1 } + FlatStorageReshardingTaskResult::Successful { num_batches_done: 1 }, + FlatStorageReshardingTaskResult::Successful { num_batches_done: 1 } ] ); @@ -2032,8 +2281,6 @@ mod tests { /// The split of a parent shard shouldn't happen until the resharding block has become final. #[test] - // TODO(resharding: remove the ignore! 
- #[ignore] fn shard_split_should_wait_final_block() { init_test_logger(); let (mut chain, resharder, sender) = @@ -2041,10 +2288,13 @@ mod tests { let new_shard_layout = shard_layout_after_split(); let flat_store = resharder.runtime.store().flat_store(); - // Add two blocks to the chain. - add_blocks_to_chain(&mut chain, 2); - assert_eq!(chain.head().unwrap().height, 2); - assert_eq!(chain.final_head().unwrap().height, 0); + // Add two blocks on top of genesis. + add_blocks_to_chain( + &mut chain, + 2, + PreviousBlockHeight::ChainHead, + NextBlockHeight::ChainHeadPlusOne, + ); // Trigger resharding at block 2 and it shouldn't split the parent shard. let resharding_event_type = event_type_from_chain_and_layout(&chain, &new_shard_layout); @@ -2052,42 +2302,34 @@ mod tests { ReshardingEventType::SplitShard(params) => params, }; assert!(resharder.start_resharding(resharding_event_type, &new_shard_layout).is_ok()); - assert_eq!(sender.call_split_shard_task(), FlatStorageReshardingTaskStatus::Postponed); + assert_eq!( + sender.call_split_shard_task(), + FlatStorageReshardingSchedulableTaskResult::Postponed + ); assert_gt!(flat_store.iter(parent_shard).count(), 0); - // Move final head to the resharding block (2) by adding more blocks. - add_blocks_to_chain(&mut chain, 2); - assert_eq!(chain.final_head().unwrap().height, 2); + // Move the chain final head to the resharding block height (2). + add_blocks_to_chain( + &mut chain, + 2, + PreviousBlockHeight::ChainHead, + NextBlockHeight::ChainHeadPlusOne, + ); // Trigger resharding again and now it should split the parent shard. assert_eq!( sender.call_split_shard_task(), - FlatStorageReshardingTaskStatus::Successful { num_batches_done: 1 } + FlatStorageReshardingSchedulableTaskResult::Successful { num_batches_done: 3 } ); assert_eq!(flat_store.iter(parent_shard).count(), 0); } - /// Utility to add blocks on top of a chain. - fn add_blocks_to_chain(chain: &mut Chain, num_blocks: u64) { - let signer = Arc::new(create_test_signer("aa")); - let next_block_height = chain.head().unwrap().height + 1; - for height in next_block_height..next_block_height + num_blocks { - let prev_block = chain.get_block_by_height(height - 1).unwrap(); - let block = TestBlockBuilder::new(Clock::real(), &prev_block, signer.clone()) - .height(height) - .build(); - chain.process_block_test(&None, block).unwrap(); - } - } - /// Test to verify that a resharding event not yet started can be replaced by a newer resharding /// event on a different resharding hash. This property is useful to have in the presence of /// chain forks. For instance, the chain may wants to split a shard at some block B; there's a /// chance B never becomes final and instead a new split is triggered at block B'. The latter /// shouldn't be blocked by the presence of an earlier resharding event. #[test] - // TODO(resharding: remove the ignore! - #[ignore] fn resharding_event_not_started_can_be_replaced() { init_test_logger(); let (mut chain, resharder, sender) = @@ -2095,10 +2337,13 @@ mod tests { let new_shard_layout = shard_layout_after_split(); let flat_store = resharder.runtime.store().flat_store(); - // Add two blocks to the chain. - add_blocks_to_chain(&mut chain, 2); - assert_eq!(chain.head().unwrap().height, 2); - assert_eq!(chain.final_head().unwrap().height, 0); + // Add two blocks on top of genesis. + add_blocks_to_chain( + &mut chain, + 2, + PreviousBlockHeight::ChainHead, + NextBlockHeight::ChainHeadPlusOne, + ); // Trigger resharding at block 2. Parent shard shouldn't get split yet. 
let resharding_event_type = event_type_from_chain_and_layout(&chain, &new_shard_layout); @@ -2106,21 +2351,19 @@ ReshardingEventType::SplitShard(params) => params, }; assert!(resharder.start_resharding(resharding_event_type, &new_shard_layout).is_ok()); - assert_eq!(sender.call_split_shard_task(), FlatStorageReshardingTaskStatus::Postponed); + assert_eq!( + sender.call_split_shard_task(), + FlatStorageReshardingSchedulableTaskResult::Postponed + ); assert_gt!(flat_store.iter(parent_shard).count(), 0); // Add two blocks on top of the first block (simulate a fork). - let signer = Arc::new(create_test_signer("aa")); - let next_block_height = 2; - for height in next_block_height..next_block_height + 2 { - let prev_block = chain.get_block_by_height(height - 1).unwrap(); - let block = TestBlockBuilder::new(Clock::real(), &prev_block, signer.clone()) - .height(height) - .build(); - chain.process_block_test(&None, block).unwrap(); - } - assert_eq!(chain.head().unwrap().height, 3); - assert_eq!(chain.final_head().unwrap().height, 1); + add_blocks_to_chain( + &mut chain, + 2, + PreviousBlockHeight::Fixed(1), + NextBlockHeight::Fixed(3), + ); // Get the new resharding event and re-trigger the shard split. let resharding_event_type = event_type_from_chain_and_layout(&chain, &new_shard_layout); @@ -2128,19 +2371,75 @@ ReshardingEventType::SplitShard(params) => params, }; assert!(resharder.start_resharding(resharding_event_type, &new_shard_layout).is_ok()); - assert_eq!(sender.call_split_shard_task(), FlatStorageReshardingTaskStatus::Postponed); + assert_eq!( + sender.call_split_shard_task(), + FlatStorageReshardingSchedulableTaskResult::Postponed + ); assert_gt!(flat_store.iter(parent_shard).count(), 0); - // Add two additional blocks to make the resharding block final. - add_blocks_to_chain(&mut chain, 2); - assert_eq!(chain.head().unwrap().height, 5); - assert_eq!(chain.final_head().unwrap().height, 3); + // Add two additional blocks on the fork to make the resharding block (height 1) final. + add_blocks_to_chain( + &mut chain, + 2, + PreviousBlockHeight::Fixed(4), + NextBlockHeight::ChainHeadPlusOne, + ); // Now the second resharding event should take place. - assert_eq!( + assert_matches!( sender.call_split_shard_task(), - FlatStorageReshardingTaskStatus::Successful { num_batches_done: 1 } + FlatStorageReshardingSchedulableTaskResult::Successful { .. } ); + assert_eq!(flat_store.iter(parent_shard).count(), 0); } + + /// In this test we make sure that, after a task whose scheduling has failed, the cleanup logic + /// is executed correctly. + #[test] + fn scheduled_task_failure_is_handled_correctly() { + init_test_logger(); + let (mut chain, resharder, sender) = + create_chain_resharder_sender::<DelayedSender>(simple_shard_layout()); + let new_shard_layout = shard_layout_after_split(); + + // Add two blocks on top of genesis. + add_blocks_to_chain( + &mut chain, + 2, + PreviousBlockHeight::ChainHead, + NextBlockHeight::ChainHeadPlusOne, + ); + + // Trigger resharding at block 2. + let resharding_event_type = event_type_from_chain_and_layout(&chain, &new_shard_layout); + assert!(resharder.start_resharding(resharding_event_type, &new_shard_layout).is_ok()); + let (parent_shard, split_params) = resharder.get_parent_shard_and_split_params().unwrap(); + let ParentSplitParameters { flat_head, ..
} = split_params; + assert_eq!( + sender.call_split_shard_task(), + FlatStorageReshardingSchedulableTaskResult::Postponed + ); + + // Fork the chain before the resharding block and make it final, but don't update the + // resharding block hash. + add_blocks_to_chain( + &mut chain, + 3, + PreviousBlockHeight::Fixed(1), + NextBlockHeight::Fixed(3), + ); + + // Scheduling of the shard split should fail. + assert_eq!( + sender.call_split_shard_task(), + FlatStorageReshardingSchedulableTaskResult::Failed + ); + assert!(resharder.resharding_event().is_none()); + let flat_store = resharder.runtime.store().flat_store(); + assert_eq!( + flat_store.get_flat_storage_status(parent_shard), + Ok(FlatStorageStatus::Ready(FlatStorageReadyStatus { flat_head })) + ); + } } diff --git a/chain/chain/src/resharding/resharding_actor.rs b/chain/chain/src/resharding/resharding_actor.rs index 8e45bdc52f7..bc4a371dfe7 100644 --- a/chain/chain/src/resharding/resharding_actor.rs +++ b/chain/chain/src/resharding/resharding_actor.rs @@ -1,13 +1,15 @@ use super::types::{ FlatStorageShardCatchupRequest, FlatStorageSplitShardRequest, MemtrieReloadRequest, }; -use crate::flat_storage_resharder::{FlatStorageResharder, FlatStorageReshardingTaskStatus}; +use crate::flat_storage_resharder::{ + FlatStorageResharder, FlatStorageReshardingSchedulableTaskResult, + FlatStorageReshardingTaskResult, +}; use crate::ChainStore; use near_async::futures::{DelayedActionRunner, DelayedActionRunnerExt}; use near_async::messaging::{self, Handler, HandlerWithContext}; -use near_primitives::hash::CryptoHash; use near_primitives::types::BlockHeight; -use near_store::{ShardUId, Store}; +use near_store::Store; use time::Duration; /// Dedicated actor for resharding V3. @@ -17,51 +19,36 @@ pub struct ReshardingActor { impl messaging::Actor for ReshardingActor {} -impl Handler for ReshardingActor { - fn handle(&mut self, msg: FlatStorageSplitShardRequest) { - match msg.resharder.split_shard_task() { - FlatStorageReshardingTaskStatus::Successful { .. } => { - // All good. - } - FlatStorageReshardingTaskStatus::Failed => { - panic!("impossible to recover from a flat storage split shard failure!") - } - FlatStorageReshardingTaskStatus::Cancelled => { - // The task has been cancelled. Nothing else to do. - } - FlatStorageReshardingTaskStatus::Postponed => { - // The task has been postponed for later execution. Nothing to do. - } - } - } -} - -impl HandlerWithContext for ReshardingActor { +impl HandlerWithContext for ReshardingActor { fn handle( &mut self, - msg: FlatStorageShardCatchupRequest, + msg: FlatStorageSplitShardRequest, ctx: &mut dyn DelayedActionRunner, ) { - // Shard catchup task is delayed and could get postponed several times. This must be - // done to cover the scenario in which catchup is triggered so fast that the initial - // state of the new flat storage is beyond the chain final tip. - ctx.run_later( - "ReshardingActor FlatStorageShardCatchup", - Duration::milliseconds(100), - move |act, _| { - act.handle_flat_storage_shard_catchup( - msg.resharder, - msg.shard_uid, - msg.flat_head_block_hash, - ); - }, - ); + self.handle_flat_storage_split_shard(msg.resharder, ctx); + } +} + +impl Handler for ReshardingActor { + fn handle(&mut self, msg: FlatStorageShardCatchupRequest) { + match msg.resharder.shard_catchup_task( + msg.shard_uid, + msg.flat_head_block_hash, + &self.chain_store, + ) { + FlatStorageReshardingTaskResult::Successful { .. } => { + // All good. 
+ } + FlatStorageReshardingTaskResult::Failed => { + panic!("impossible to recover from a flat storage shard catchup failure!") + } + } } } impl Handler for ReshardingActor { fn handle(&mut self, _msg: MemtrieReloadRequest) { - // TODO + // TODO(resharding) } } @@ -70,24 +57,33 @@ impl ReshardingActor { Self { chain_store: ChainStore::new(store, genesis_height, false) } } - fn handle_flat_storage_shard_catchup( + fn handle_flat_storage_split_shard( &self, resharder: FlatStorageResharder, - shard_uid: ShardUId, - flat_head_block_hash: CryptoHash, + ctx: &mut dyn DelayedActionRunner, ) { - match resharder.shard_catchup_task(shard_uid, flat_head_block_hash, &self.chain_store) { - FlatStorageReshardingTaskStatus::Successful { .. } => { + // In order to run to completion, the split task must wait until the resharding block + // becomes final. If the resharding block is not yet final, the task will exit early with + // `Postponed` status and it must be rescheduled. + match resharder.split_shard_task(&self.chain_store) { + FlatStorageReshardingSchedulableTaskResult::Successful { .. } => { // All good. } - FlatStorageReshardingTaskStatus::Failed => { - panic!("impossible to recover from a flat storage shard catchup failure!") + FlatStorageReshardingSchedulableTaskResult::Failed => { + panic!("impossible to recover from a flat storage split shard failure!") } - FlatStorageReshardingTaskStatus::Cancelled => { + FlatStorageReshardingSchedulableTaskResult::Cancelled => { // The task has been cancelled. Nothing else to do. } - FlatStorageReshardingTaskStatus::Postponed => { - // The task has been postponed for later execution. Nothing to do. + FlatStorageReshardingSchedulableTaskResult::Postponed => { + // The task must be retried later. + ctx.run_later( + "ReshardingActor FlatStorageSplitShard", + Duration::milliseconds(1000), + move |act, ctx| { + act.handle_flat_storage_split_shard(resharder, ctx); + }, + ); } } } diff --git a/core/store/src/flat/mod.rs b/core/store/src/flat/mod.rs index 65c3aa3a81e..af0083db349 100644 --- a/core/store/src/flat/mod.rs +++ b/core/store/src/flat/mod.rs @@ -41,7 +41,7 @@ pub use metrics::FlatStorageCreationMetrics; pub use storage::FlatStorage; pub use types::{ BlockInfo, FetchingStateStatus, FlatStateIterator, FlatStorageCreationStatus, FlatStorageError, - FlatStorageReadyStatus, FlatStorageReshardingStatus, FlatStorageStatus, SplittingParentStatus, + FlatStorageReadyStatus, FlatStorageReshardingStatus, FlatStorageStatus, ParentSplitParameters, }; pub(crate) const POISONED_LOCK_ERR: &str = "The lock was poisoned."; diff --git a/core/store/src/flat/types.rs b/core/store/src/flat/types.rs index b327f023243..581f478a552 100644 --- a/core/store/src/flat/types.rs +++ b/core/store/src/flat/types.rs @@ -142,7 +142,7 @@ pub enum FlatStorageCreationStatus { pub enum FlatStorageReshardingStatus { /// Resharding phase entered when a shard is being split. /// Copy key-value pairs from this shard (the parent) to children shards. - SplittingParent(SplittingParentStatus), + SplittingParent(ParentSplitParameters), /// Resharding phase entered when a shard is being split. /// This shard (child) is being built from state taken from its parent. CreatingChild, @@ -179,7 +179,7 @@ pub struct FetchingStateStatus { #[derive( BorshSerialize, BorshDeserialize, Clone, Debug, PartialEq, Eq, serde::Serialize, ProtocolSchema, )] -pub struct SplittingParentStatus { +pub struct ParentSplitParameters { /// UId of the left child shard. Will contain everything lesser than boundary account. 
pub left_child_shard: ShardUId, /// UId of the right child shard. Will contain everything greater or equal than boundary account. diff --git a/tools/protocol-schema-check/res/protocol_schema.toml b/tools/protocol-schema-check/res/protocol_schema.toml index d75382fbacb..1642969870f 100644 --- a/tools/protocol-schema-check/res/protocol_schema.toml +++ b/tools/protocol-schema-check/res/protocol_schema.toml @@ -137,8 +137,8 @@ FlatStateDeltaMetadata = 3401366797 FlatStateValue = 83834662 FlatStorageCreationStatus = 3717607657 FlatStorageReadyStatus = 677315221 -FlatStorageReshardingStatus = 743000213 -FlatStorageStatus = 271501637 +FlatStorageReshardingStatus = 1231081276 +FlatStorageStatus = 4109699695 FunctionCallAction = 2405840012 FunctionCallError = 3652274053 FunctionCallPermission = 1517509673 @@ -162,6 +162,7 @@ MethodResolveError = 1206790835 MissingTrieValueContext = 2666011379 NextEpochValidatorInfo = 3660299258 NonDelegateAction = 3255205790 +ParentSplitParameters = 1570407998 PartialEdgeInfo = 1350359189 PartialEncodedChunk = 2722484497 PartialEncodedChunkForwardMsg = 68012243 @@ -242,7 +243,6 @@ SignedTransaction = 3898692301 SlashState = 3264273950 SlashedValidator = 2601657743 SnapshotHostInfo = 2890323952 -SplittingParentStatus = 1188425274 StakeAction = 2002027105 StateChangeCause = 3890585134 StateHeaderKey = 1666317019
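Reviewer note: the heart of this patch is the three-way scheduling decision made in `compute_scheduled_task_status` and the retry loop in `ReshardingActor::handle_flat_storage_split_shard`. Below is a minimal, self-contained sketch of that state machine for illustration only; `Chain` and the `u64` block hashes are toy stand-ins (not the nearcore `ChainStore`/`CryptoHash` API), and only the decision logic mirrors the patch.

```rust
use std::collections::HashMap;

/// Mirrors `FlatStorageReshardingTaskSchedulingStatus` from the patch.
#[derive(Debug, PartialEq)]
enum SchedulingStatus {
    CanStart,
    Failed,
    Postponed,
}

/// Toy chain view: block hashes are plain u64s here, purely for illustration.
struct Chain {
    final_height: u64,
    canonical: HashMap<u64, u64>, // height -> hash of the canonical block
    heights: HashMap<u64, u64>,   // hash -> height, for every known block
}

/// Same decision as `compute_scheduled_task_status`:
/// - unknown resharding block => Failed
/// - block above the final head => Postponed (the actor retries later)
/// - final height reached but block not canonical => Failed (discarded fork)
/// - otherwise => CanStart
fn scheduling_status(chain: &Chain, resharding_hash: u64) -> SchedulingStatus {
    let Some(&height) = chain.heights.get(&resharding_hash) else {
        return SchedulingStatus::Failed;
    };
    if height > chain.final_height {
        return SchedulingStatus::Postponed;
    }
    match chain.canonical.get(&height) {
        Some(&hash) if hash == resharding_hash => SchedulingStatus::CanStart,
        _ => SchedulingStatus::Failed,
    }
}

fn main() {
    let chain = Chain {
        final_height: 2,
        canonical: HashMap::from([(0, 100), (1, 101), (2, 102)]),
        heights: HashMap::from([(100, 0), (101, 1), (102, 2), (555, 2), (999, 3)]),
    };
    // Resharding block is final and canonical: the split can run.
    assert_eq!(scheduling_status(&chain, 102), SchedulingStatus::CanStart);
    // Resharding block is beyond the final head: postpone and retry.
    assert_eq!(scheduling_status(&chain, 999), SchedulingStatus::Postponed);
    // Resharding block sat on a fork that lost to block 102: fail and clean up.
    assert_eq!(scheduling_status(&chain, 555), SchedulingStatus::Failed);
}
```

A `Postponed` outcome is what drives the new retry behaviour: `ReshardingActor` re-schedules `split_shard_task` via `ctx.run_later` until the resharding block either becomes final (`CanStart`) or lands on a discarded fork (`Failed`, which triggers `cancel_scheduled_event`).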