From 01ff4cef6626448998a3bcbc5be401dc15a394cf Mon Sep 17 00:00:00 2001 From: Squirrel Date: Wed, 23 Jun 2021 13:41:46 +0100 Subject: [PATCH] Result> rather than Option> (#9119) * Clearer API to code against. --- .../src/unsigned.rs | 40 +++++----- frame/example-offchain-worker/src/lib.rs | 16 ++-- frame/im-online/src/lib.rs | 12 ++- frame/session/src/historical/offchain.rs | 22 ++--- primitives/runtime/src/offchain/storage.rs | 80 +++++++++++++------ .../runtime/src/offchain/storage_lock.rs | 35 ++++---- primitives/trie/Cargo.toml | 2 +- 7 files changed, 121 insertions(+), 86 deletions(-) diff --git a/frame/election-provider-multi-phase/src/unsigned.rs b/frame/election-provider-multi-phase/src/unsigned.rs index 78726c542078c..543883fc035c5 100644 --- a/frame/election-provider-multi-phase/src/unsigned.rs +++ b/frame/election-provider-multi-phase/src/unsigned.rs @@ -29,7 +29,10 @@ use sp_npos_elections::{ CompactSolution, ElectionResult, assignment_ratio_to_staked_normalized, assignment_staked_to_ratio_normalized, is_score_better, seq_phragmen, }; -use sp_runtime::{offchain::storage::StorageValueRef, traits::TrailingZeroInput, SaturatedConversion}; +use sp_runtime::{ + offchain::storage::{MutateStorageError, StorageValueRef}, + traits::TrailingZeroInput, SaturatedConversion +}; use sp_std::{cmp::Ordering, convert::TryFrom, vec::Vec}; /// Storage key used to store the last block number at which offchain worker ran. @@ -98,9 +101,9 @@ fn save_solution(call: &Call) -> Result<(), MinerError> { log!(debug, "saving a call to the offchain storage."); let storage = StorageValueRef::persistent(&OFFCHAIN_CACHED_CALL); match storage.mutate::<_, (), _>(|_| Ok(call.clone())) { - Ok(Ok(_)) => Ok(()), - Ok(Err(_)) => Err(MinerError::FailedToStoreSolution), - Err(_) => { + Ok(_) => Ok(()), + Err(MutateStorageError::ConcurrentModification(_)) => Err(MinerError::FailedToStoreSolution), + Err(MutateStorageError::ValueFunctionFailed(_)) => { // this branch should be unreachable according to the definition of // `StorageValueRef::mutate`: that function should only ever `Err` if the closure we // pass it returns an error. however, for safety in case the definition changes, we do @@ -114,6 +117,7 @@ fn save_solution(call: &Call) -> Result<(), MinerError> { fn restore_solution() -> Result, MinerError> { StorageValueRef::persistent(&OFFCHAIN_CACHED_CALL) .get() + .ok() .flatten() .ok_or(MinerError::NoStoredSolution) } @@ -135,12 +139,9 @@ fn clear_offchain_repeat_frequency() { } /// `true` when OCW storage contains a solution -/// -/// More precise than `restore_solution::().is_ok()`; that invocation will return `false` -/// if a solution exists but cannot be decoded, whereas this just checks whether an item is present. #[cfg(test)] fn ocw_solution_exists() -> bool { - StorageValueRef::persistent(&OFFCHAIN_CACHED_CALL).get::>().is_some() + matches!(StorageValueRef::persistent(&OFFCHAIN_CACHED_CALL).get::>(), Ok(Some(_))) } impl Pallet { @@ -584,13 +585,13 @@ impl Pallet { let last_block = StorageValueRef::persistent(&OFFCHAIN_LAST_BLOCK); let mutate_stat = last_block.mutate::<_, &'static str, _>( - |maybe_head: Option>| { + |maybe_head: Result, _>| { match maybe_head { - Some(Some(head)) if now < head => Err("fork."), - Some(Some(head)) if now >= head && now <= head + threshold => { + Ok(Some(head)) if now < head => Err("fork."), + Ok(Some(head)) if now >= head && now <= head + threshold => { Err("recently executed.") } - Some(Some(head)) if now > head + threshold => { + Ok(Some(head)) if now > head + threshold => { // we can run again now. Write the new head. Ok(now) } @@ -604,11 +605,12 @@ impl Pallet { match mutate_stat { // all good - Ok(Ok(_)) => Ok(()), + Ok(_) => Ok(()), // failed to write. - Ok(Err(_)) => Err(MinerError::Lock("failed to write to offchain db.")), + Err(MutateStorageError::ConcurrentModification(_)) => + Err(MinerError::Lock("failed to write to offchain db (concurrent modification).")), // fork etc. - Err(why) => Err(MinerError::Lock(why)), + Err(MutateStorageError::ValueFunctionFailed(why)) => Err(MinerError::Lock(why)), } } @@ -1117,15 +1119,15 @@ mod tests { assert!(MultiPhase::current_phase().is_unsigned()); // initially, the lock is not set. - assert!(guard.get::().is_none()); + assert!(guard.get::().unwrap().is_none()); // a successful a-z execution. MultiPhase::offchain_worker(25); assert_eq!(pool.read().transactions.len(), 1); // afterwards, the lock is not set either.. - assert!(guard.get::().is_none()); - assert_eq!(last_block.get::().unwrap().unwrap(), 25); + assert!(guard.get::().unwrap().is_none()); + assert_eq!(last_block.get::().unwrap(), Some(25)); }); } @@ -1280,7 +1282,7 @@ mod tests { // this ensures that when the resubmit window rolls around, we're ready to regenerate // from scratch if necessary let mut call_cache = StorageValueRef::persistent(&OFFCHAIN_CACHED_CALL); - assert!(matches!(call_cache.get::>(), Some(Some(_call)))); + assert!(matches!(call_cache.get::>(), Ok(Some(_call)))); call_cache.clear(); // attempts to resubmit the tx after the threshold has expired diff --git a/frame/example-offchain-worker/src/lib.rs b/frame/example-offchain-worker/src/lib.rs index 1ec2591f5ec6b..b7a766ad847b2 100644 --- a/frame/example-offchain-worker/src/lib.rs +++ b/frame/example-offchain-worker/src/lib.rs @@ -53,7 +53,7 @@ use frame_support::traits::Get; use sp_core::crypto::KeyTypeId; use sp_runtime::{ RuntimeDebug, - offchain::{http, Duration, storage::StorageValueRef}, + offchain::{http, Duration, storage::{MutateStorageError, StorageRetrievalError, StorageValueRef}}, traits::Zero, transaction_validity::{InvalidTransaction, ValidTransaction, TransactionValidity}, }; @@ -366,15 +366,11 @@ impl Pallet { // low-level method of local storage API, which means that only one worker // will be able to "acquire a lock" and send a transaction if multiple workers // happen to be executed concurrently. - let res = val.mutate(|last_send: Option>| { - // We match on the value decoded from the storage. The first `Option` - // indicates if the value was present in the storage at all, - // the second (inner) `Option` indicates if the value was succesfuly - // decoded to expected type (`T::BlockNumber` in our case). + let res = val.mutate(|last_send: Result, StorageRetrievalError>| { match last_send { // If we already have a value in storage and the block number is recent enough // we avoid sending another transaction at this time. - Some(Some(block)) if block_number < block + T::GracePeriod::get() => { + Ok(Some(block)) if block_number < block + T::GracePeriod::get() => { Err(RECENTLY_SENT) }, // In every other case we attempt to acquire the lock and send a transaction. @@ -390,7 +386,7 @@ impl Pallet { // written to in the meantime. match res { // The value has been set correctly, which means we can safely send a transaction now. - Ok(Ok(block_number)) => { + Ok(block_number) => { // Depending if the block is even or odd we will send a `Signed` or `Unsigned` // transaction. // Note that this logic doesn't really guarantee that the transactions will be sent @@ -406,13 +402,13 @@ impl Pallet { else { TransactionType::Raw } }, // We are in the grace period, we should not send a transaction this time. - Err(RECENTLY_SENT) => TransactionType::None, + Err(MutateStorageError::ValueFunctionFailed(RECENTLY_SENT)) => TransactionType::None, // We wanted to send a transaction, but failed to write the block number (acquire a // lock). This indicates that another offchain worker that was running concurrently // most likely executed the same logic and succeeded at writing to storage. // Thus we don't really want to send the transaction, knowing that the other run // already did. - Ok(Err(_)) => TransactionType::None, + Err(MutateStorageError::ConcurrentModification(_)) => TransactionType::None, } } diff --git a/frame/im-online/src/lib.rs b/frame/im-online/src/lib.rs index 318e3d2de3ad2..3df5df7bb4d74 100644 --- a/frame/im-online/src/lib.rs +++ b/frame/im-online/src/lib.rs @@ -80,7 +80,7 @@ use sp_core::offchain::OpaqueNetworkState; use sp_std::prelude::*; use sp_std::convert::TryInto; use sp_runtime::{ - offchain::storage::StorageValueRef, + offchain::storage::{MutateStorageError, StorageRetrievalError, StorageValueRef}, traits::{AtLeast32BitUnsigned, Convert, Saturating, TrailingZeroInput}, Perbill, Permill, PerThing, RuntimeDebug, SaturatedConversion, }; @@ -719,14 +719,15 @@ impl Pallet { key }; let storage = StorageValueRef::persistent(&key); - let res = storage.mutate(|status: Option>>| { + let res = storage.mutate( + |status: Result>, StorageRetrievalError>| { // Check if there is already a lock for that particular block. // This means that the heartbeat has already been sent, and we are just waiting // for it to be included. However if it doesn't get included for INCLUDE_THRESHOLD // we will re-send it. match status { // we are still waiting for inclusion. - Some(Some(status)) if status.is_recent(session_index, now) => { + Ok(Some(status)) if status.is_recent(session_index, now) => { Err(OffchainErr::WaitingForInclusion(status.sent_at)) }, // attempt to set new status @@ -735,7 +736,10 @@ impl Pallet { sent_at: now, }), } - })?; + }); + if let Err(MutateStorageError::ValueFunctionFailed(err)) = res { + return Err(err); + } let mut new_status = res.map_err(|_| OffchainErr::FailedToAcquireLock)?; diff --git a/frame/session/src/historical/offchain.rs b/frame/session/src/historical/offchain.rs index f675d878c1e28..68cc78029f12c 100644 --- a/frame/session/src/historical/offchain.rs +++ b/frame/session/src/historical/offchain.rs @@ -25,7 +25,10 @@ //! This is used in conjunction with [`ProvingTrie`](super::ProvingTrie) and //! the off-chain indexing API. -use sp_runtime::{offchain::storage::StorageValueRef, KeyTypeId}; +use sp_runtime::{ + offchain::storage::{MutateStorageError, StorageRetrievalError, StorageValueRef}, + KeyTypeId +}; use sp_session::MembershipProof; use super::super::{Pallet as SessionModule, SessionIndex}; @@ -49,6 +52,7 @@ impl ValidatorSet { let derived_key = shared::derive_key(shared::PREFIX, session_index); StorageValueRef::persistent(derived_key.as_ref()) .get::>() + .ok() .flatten() .map(|validator_set| Self { validator_set }) } @@ -100,19 +104,19 @@ pub fn prove_session_membership>( pub fn prune_older_than(first_to_keep: SessionIndex) { let derived_key = shared::LAST_PRUNE.to_vec(); let entry = StorageValueRef::persistent(derived_key.as_ref()); - match entry.mutate(|current: Option>| -> Result<_, ()> { + match entry.mutate(|current: Result, StorageRetrievalError>| -> Result<_, ()> { match current { - Some(Some(current)) if current < first_to_keep => Ok(first_to_keep), + Ok(Some(current)) if current < first_to_keep => Ok(first_to_keep), // do not move the cursor, if the new one would be behind ours - Some(Some(current)) => Ok(current), - None => Ok(first_to_keep), + Ok(Some(current)) => Ok(current), + Ok(None) => Ok(first_to_keep), // if the storage contains undecodable data, overwrite with current anyways // which might leak some entries being never purged, but that is acceptable // in this context - Some(None) => Ok(first_to_keep), + Err(_) => Ok(first_to_keep), } }) { - Ok(Ok(new_value)) => { + Ok(new_value) => { // on a re-org this is not necessarily true, with the above they might be equal if new_value < first_to_keep { for session_index in new_value..first_to_keep { @@ -121,8 +125,8 @@ pub fn prune_older_than(first_to_keep: SessionIndex) { } } } - Ok(Err(_)) => {} // failed to store the value calculated with the given closure - Err(_) => {} // failed to calculate the value to store with the given closure + Err(MutateStorageError::ConcurrentModification(_)) => {} + Err(MutateStorageError::ValueFunctionFailed(_)) => {} } } diff --git a/primitives/runtime/src/offchain/storage.rs b/primitives/runtime/src/offchain/storage.rs index 794ae4255a330..c6ed10c5be26f 100644 --- a/primitives/runtime/src/offchain/storage.rs +++ b/primitives/runtime/src/offchain/storage.rs @@ -28,6 +28,25 @@ pub struct StorageValueRef<'a> { kind: StorageKind, } +/// Reason for not being able to provide the stored value +#[derive(Debug, PartialEq, Eq)] +pub enum StorageRetrievalError { + /// Value found but undecodable + Undecodable, +} + +/// Possible errors when mutating a storage value. +#[derive(Debug, PartialEq, Eq)] +pub enum MutateStorageError { + /// The underlying db failed to update due to a concurrent modification. + /// Contains the new value that was not stored. + ConcurrentModification(T), + /// The function given to us to create the value to be stored failed. + /// May be used to signal that having looked at the existing value, + /// they don't want to mutate it. + ValueFunctionFailed(E) +} + impl<'a> StorageValueRef<'a> { /// Create a new reference to a value in the persistent local storage. pub fn persistent(key: &'a [u8]) -> Self { @@ -58,30 +77,40 @@ impl<'a> StorageValueRef<'a> { /// Retrieve & decode the value from storage. /// /// Note that if you want to do some checks based on the value - /// and write changes after that you should rather be using `mutate`. + /// and write changes after that, you should rather be using `mutate`. /// - /// The function returns `None` if the value was not found in storage, - /// otherwise a decoding of the value to requested type. - pub fn get(&self) -> Option> { + /// Returns the value if stored. + /// Returns an error if the value could not be decoded. + pub fn get(&self) -> Result, StorageRetrievalError> { sp_io::offchain::local_storage_get(self.kind, self.key) - .map(|val| T::decode(&mut &*val).ok()) + .map(|val| T::decode(&mut &*val) + .map_err(|_| StorageRetrievalError::Undecodable)) + .transpose() } - /// Retrieve & decode the value and set it to a new one atomically. + /// Retrieve & decode the current value and set it to a new value atomically. + /// + /// Function `mutate_val` takes as input the current value and should + /// return a new value that is attempted to be written to storage. /// - /// Function `f` should return a new value that we should attempt to write to storage. /// This function returns: - /// 1. `Ok(Ok(T))` in case the value has been successfully set. - /// 2. `Ok(Err(T))` in case the value was calculated by the passed closure `f`, - /// but it could not be stored. - /// 3. `Err(_)` in case `f` returns an error. - pub fn mutate(&self, f: F) -> Result, E> where + /// 1. `Ok(T)` in case the value has been successfully set. + /// 2. `Err(MutateStorageError::ConcurrentModification(T))` in case the value was calculated + /// by the passed closure `mutate_val`, but it could not be stored. + /// 3. `Err(MutateStorageError::ValueFunctionFailed(_))` in case `mutate_val` returns an error. + pub fn mutate(&self, mutate_val: F) -> Result> where T: codec::Codec, - F: FnOnce(Option>) -> Result + F: FnOnce(Result, StorageRetrievalError>) -> Result { let value = sp_io::offchain::local_storage_get(self.kind, self.key); - let decoded = value.as_deref().map(|mut v| T::decode(&mut v).ok()); - let val = f(decoded)?; + let decoded = value.as_deref() + .map(|mut bytes| { + T::decode(&mut bytes) + .map_err(|_| StorageRetrievalError::Undecodable) + }).transpose(); + + let val = mutate_val(decoded).map_err(|err| MutateStorageError::ValueFunctionFailed(err))?; + let set = val.using_encoded(|new_val| { sp_io::offchain::local_storage_compare_and_set( self.kind, @@ -90,11 +119,10 @@ impl<'a> StorageValueRef<'a> { new_val, ) }); - if set { - Ok(Ok(val)) + Ok(val) } else { - Ok(Err(val)) + Err(MutateStorageError::ConcurrentModification(val)) } } } @@ -117,12 +145,12 @@ mod tests { t.execute_with(|| { let val = StorageValue::persistent(b"testval"); - assert_eq!(val.get::(), None); + assert_eq!(val.get::(), Ok(None)); val.set(&15_u32); - assert_eq!(val.get::(), Some(Some(15_u32))); - assert_eq!(val.get::>(), Some(None)); + assert_eq!(val.get::(), Ok(Some(15_u32))); + assert_eq!(val.get::>(), Err(StorageRetrievalError::Undecodable)); assert_eq!( state.read().persistent_storage.get(b"testval"), Some(vec![15_u8, 0, 0, 0]) @@ -140,12 +168,12 @@ mod tests { let val = StorageValue::persistent(b"testval"); let result = val.mutate::(|val| { - assert_eq!(val, None); + assert_eq!(val, Ok(None)); Ok(16_u32) }); - assert_eq!(result, Ok(Ok(16_u32))); - assert_eq!(val.get::(), Some(Some(16_u32))); + assert_eq!(result, Ok(16_u32)); + assert_eq!(val.get::(), Ok(Some(16_u32))); assert_eq!( state.read().persistent_storage.get(b"testval"), Some(vec![16_u8, 0, 0, 0]) @@ -153,10 +181,10 @@ mod tests { // mutate again, but this time early-exit. let res = val.mutate::(|val| { - assert_eq!(val, Some(Some(16_u32))); + assert_eq!(val, Ok(Some(16_u32))); Err(()) }); - assert_eq!(res, Err(())); + assert_eq!(res, Err(MutateStorageError::ValueFunctionFailed(()))); }) } } diff --git a/primitives/runtime/src/offchain/storage_lock.rs b/primitives/runtime/src/offchain/storage_lock.rs index c3e63a7924d7b..3189a814e06fd 100644 --- a/primitives/runtime/src/offchain/storage_lock.rs +++ b/primitives/runtime/src/offchain/storage_lock.rs @@ -61,7 +61,7 @@ //! } //! ``` -use crate::offchain::storage::StorageValueRef; +use crate::offchain::storage::{StorageRetrievalError, MutateStorageError, StorageValueRef}; use crate::traits::AtLeast32BitUnsigned; use codec::{Codec, Decode, Encode}; use sp_core::offchain::{Duration, Timestamp}; @@ -279,19 +279,20 @@ impl<'a, L: Lockable> StorageLock<'a, L> { /// Extend active lock's deadline fn extend_active_lock(&mut self) -> Result<::Deadline, ()> { - let res = self.value_ref.mutate(|s: Option>| -> Result<::Deadline, ()> { + let res = self.value_ref.mutate( + |s: Result, StorageRetrievalError>| -> Result<::Deadline, ()> { match s { // lock is present and is still active, extend the lock. - Some(Some(deadline)) if !::has_expired(&deadline) => + Ok(Some(deadline)) if !::has_expired(&deadline) => Ok(self.lockable.deadline()), // other cases _ => Err(()), } }); match res { - Ok(Ok(deadline)) => Ok(deadline), - Ok(Err(_)) => Err(()), - Err(e) => Err(e), + Ok(deadline) => Ok(deadline), + Err(MutateStorageError::ConcurrentModification(_)) => Err(()), + Err(MutateStorageError::ValueFunctionFailed(e)) => Err(e), } } @@ -301,25 +302,25 @@ impl<'a, L: Lockable> StorageLock<'a, L> { new_deadline: L::Deadline, ) -> Result<(), ::Deadline> { let res = self.value_ref.mutate( - |s: Option>| + |s: Result, StorageRetrievalError>| -> Result<::Deadline, ::Deadline> { match s { // no lock set, we can safely acquire it - None => Ok(new_deadline), + Ok(None) => Ok(new_deadline), // write was good, but read failed - Some(None) => Ok(new_deadline), + Err(_) => Ok(new_deadline), // lock is set, but it is expired. We can re-acquire it. - Some(Some(deadline)) if ::has_expired(&deadline) => + Ok(Some(deadline)) if ::has_expired(&deadline) => Ok(new_deadline), // lock is present and is still active - Some(Some(deadline)) => Err(deadline), + Ok(Some(deadline)) => Err(deadline), } }, ); match res { - Ok(Ok(_)) => Ok(()), - Ok(Err(deadline)) => Err(deadline), - Err(e) => Err(e), + Ok(_) => Ok(()), + Err(MutateStorageError::ConcurrentModification(deadline)) => Err(deadline), + Err(MutateStorageError::ValueFunctionFailed(e)) => Err(e), } } @@ -488,14 +489,14 @@ mod tests { val.set(&VAL_1); - assert_eq!(val.get::(), Some(Some(VAL_1))); + assert_eq!(val.get::(), Ok(Some(VAL_1))); } { let _guard = lock.lock(); val.set(&VAL_2); - assert_eq!(val.get::(), Some(Some(VAL_2))); + assert_eq!(val.get::(), Ok(Some(VAL_2))); } }); // lock must have been cleared at this point @@ -518,7 +519,7 @@ mod tests { val.set(&VAL_1); - assert_eq!(val.get::(), Some(Some(VAL_1))); + assert_eq!(val.get::(), Ok(Some(VAL_1))); guard.forget(); }); diff --git a/primitives/trie/Cargo.toml b/primitives/trie/Cargo.toml index 9584ae678d409..bf91fff31b8b6 100644 --- a/primitives/trie/Cargo.toml +++ b/primitives/trie/Cargo.toml @@ -21,7 +21,7 @@ harness = false codec = { package = "parity-scale-codec", version = "2.0.0", default-features = false } sp-std = { version = "3.0.0", default-features = false, path = "../std" } hash-db = { version = "0.15.2", default-features = false } -trie-db = { version = "0.22.3", default-features = false } +trie-db = { version = "0.22.5", default-features = false } trie-root = { version = "0.16.0", default-features = false } memory-db = { version = "0.26.0", default-features = false } sp-core = { version = "3.0.0", default-features = false, path = "../core" }