Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
Mmr persist state (#12822)
Browse files Browse the repository at this point in the history
client/mmr: persisting gadget state across runs

Fixes #12780

* client/mmr: on init do canonicalization catch-up

* client/mmr: add more tests

* client/mmr: persist gadget progress in aux db

* client/mmr: add more tests

* client/mmr: replace async_std with tokio

* remove leftover comment

* address review comments

Signed-off-by: acatangiu <adrian@parity.io>
  • Loading branch information
acatangiu authored Dec 7, 2022
1 parent 11c5057 commit 0d153c9
Show file tree
Hide file tree
Showing 6 changed files with 477 additions and 82 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion client/merkle-mountain-range/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ sc-offchain = { version = "4.0.0-dev", path = "../offchain" }
sp-runtime = { version = "7.0.0", path = "../../primitives/runtime" }

[dev-dependencies]
tokio = "1.17.0"
parking_lot = "0.12.1"
sc-block-builder = { version = "0.10.0-dev", path = "../block-builder" }
sp-tracing = { version = "6.0.0", path = "../../primitives/tracing" }
substrate-test-runtime-client = { version = "2.0.0", path = "../../test-utils/runtime/client" }
tokio = "1.17.0"
228 changes: 228 additions & 0 deletions client/merkle-mountain-range/src/aux_schema.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,228 @@
// This file is part of Substrate.

// Copyright (C) 2019-2022 Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0

// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.

//! Schema for MMR-gadget state persisted in the aux-db.
use crate::LOG_TARGET;
use codec::{Decode, Encode};
use log::{info, trace};
use sc_client_api::backend::AuxStore;
use sp_blockchain::{Error as ClientError, Result as ClientResult};
use sp_runtime::traits::{Block, NumberFor};

const VERSION_KEY: &[u8] = b"mmr_auxschema_version";
const GADGET_STATE: &[u8] = b"mmr_gadget_state";

const CURRENT_VERSION: u32 = 1;
pub(crate) type PersistedState<B> = NumberFor<B>;

pub(crate) fn write_current_version<B: AuxStore>(backend: &B) -> ClientResult<()> {
info!(target: LOG_TARGET, "write aux schema version {:?}", CURRENT_VERSION);
AuxStore::insert_aux(backend, &[(VERSION_KEY, CURRENT_VERSION.encode().as_slice())], &[])
}

/// Write gadget state.
pub(crate) fn write_gadget_state<B: Block, BE: AuxStore>(
backend: &BE,
state: &PersistedState<B>,
) -> ClientResult<()> {
trace!(target: LOG_TARGET, "persisting {:?}", state);
backend.insert_aux(&[(GADGET_STATE, state.encode().as_slice())], &[])
}

fn load_decode<B: AuxStore, T: Decode>(backend: &B, key: &[u8]) -> ClientResult<Option<T>> {
match backend.get_aux(key)? {
None => Ok(None),
Some(t) => T::decode(&mut &t[..])
.map_err(|e| ClientError::Backend(format!("MMR aux DB is corrupted: {}", e)))
.map(Some),
}
}

/// Load or initialize persistent data from backend.
pub(crate) fn load_persistent<B, BE>(backend: &BE) -> ClientResult<Option<PersistedState<B>>>
where
B: Block,
BE: AuxStore,
{
let version: Option<u32> = load_decode(backend, VERSION_KEY)?;

match version {
None => (),
Some(1) => return load_decode::<_, PersistedState<B>>(backend, GADGET_STATE),
other =>
return Err(ClientError::Backend(format!("Unsupported MMR aux DB version: {:?}", other))),
}

// No persistent state found in DB.
Ok(None)
}

#[cfg(test)]
pub(crate) mod tests {
use super::*;
use crate::test_utils::{
run_test_with_mmr_gadget_pre_post_using_client, MmrBlock, MockClient, OffchainKeyType,
};
use parking_lot::Mutex;
use sp_core::offchain::{DbExternalities, StorageKind};
use sp_mmr_primitives::utils::NodesUtils;
use sp_runtime::generic::BlockId;
use std::{sync::Arc, time::Duration};
use substrate_test_runtime_client::{runtime::Block, Backend};

#[test]
fn should_load_persistent_sanity_checks() {
let client = MockClient::new();
let backend = &*client.backend;

// version not available in db -> None
assert_eq!(load_persistent::<Block, Backend>(backend).unwrap(), None);

// populate version in db
write_current_version(backend).unwrap();
// verify correct version is retrieved
assert_eq!(load_decode(backend, VERSION_KEY).unwrap(), Some(CURRENT_VERSION));

// version is available in db but state isn't -> None
assert_eq!(load_persistent::<Block, Backend>(backend).unwrap(), None);
}

#[test]
fn should_persist_progress_across_runs() {
sp_tracing::try_init_simple();

let client = Arc::new(MockClient::new());
let backend = client.backend.clone();

// version not available in db -> None
assert_eq!(load_decode::<Backend, Option<u32>>(&*backend, VERSION_KEY).unwrap(), None);
// state not available in db -> None
assert_eq!(load_persistent::<Block, Backend>(&*backend).unwrap(), None);
// run the gadget while importing and finalizing 3 blocks
run_test_with_mmr_gadget_pre_post_using_client(
client.clone(),
|_| async {},
|client| async move {
let a1 = client.import_block(&BlockId::Number(0), b"a1", Some(0)).await;
let a2 = client.import_block(&BlockId::Number(1), b"a2", Some(1)).await;
let a3 = client.import_block(&BlockId::Number(2), b"a3", Some(2)).await;
client.finalize_block(a3.hash(), Some(3));
tokio::time::sleep(Duration::from_millis(200)).await;
// a1, a2, a3 were canonicalized
client.assert_canonicalized(&[&a1, &a2, &a3]);
},
);

// verify previous progress was persisted and run the gadget again
run_test_with_mmr_gadget_pre_post_using_client(
client.clone(),
|client| async move {
let backend = &*client.backend;
// check there is both version and best canon available in db before running gadget
assert_eq!(load_decode(backend, VERSION_KEY).unwrap(), Some(CURRENT_VERSION));
assert_eq!(load_persistent::<Block, Backend>(backend).unwrap(), Some(3));
},
|client| async move {
let a4 = client.import_block(&BlockId::Number(3), b"a4", Some(3)).await;
let a5 = client.import_block(&BlockId::Number(4), b"a5", Some(4)).await;
let a6 = client.import_block(&BlockId::Number(5), b"a6", Some(5)).await;
client.finalize_block(a6.hash(), Some(6));
tokio::time::sleep(Duration::from_millis(200)).await;

// a4, a5, a6 were canonicalized
client.assert_canonicalized(&[&a4, &a5, &a6]);
// check persisted best canon was updated
assert_eq!(load_persistent::<Block, Backend>(&*client.backend).unwrap(), Some(6));
},
);
}

#[test]
fn should_resume_from_persisted_state() {
sp_tracing::try_init_simple();

let client = Arc::new(MockClient::new());
let blocks = Arc::new(Mutex::new(Vec::<MmrBlock>::new()));
let blocks_clone = blocks.clone();

// run the gadget while importing and finalizing 3 blocks
run_test_with_mmr_gadget_pre_post_using_client(
client.clone(),
|_| async {},
|client| async move {
let mut blocks = blocks_clone.lock();
blocks.push(client.import_block(&BlockId::Number(0), b"a1", Some(0)).await);
blocks.push(client.import_block(&BlockId::Number(1), b"a2", Some(1)).await);
blocks.push(client.import_block(&BlockId::Number(2), b"a3", Some(2)).await);
client.finalize_block(blocks.last().unwrap().hash(), Some(3));
tokio::time::sleep(Duration::from_millis(200)).await;
// a1, a2, a3 were canonicalized
let slice: Vec<&MmrBlock> = blocks.iter().collect();
client.assert_canonicalized(&slice);

// now manually move them back to non-canon/temp location
let mut offchain_db = client.offchain_db();
for mmr_block in slice {
for node in NodesUtils::right_branch_ending_in_leaf(mmr_block.leaf_idx.unwrap())
{
let canon_key = mmr_block.get_offchain_key(node, OffchainKeyType::Canon);
let val = offchain_db
.local_storage_get(StorageKind::PERSISTENT, &canon_key)
.unwrap();
offchain_db.local_storage_clear(StorageKind::PERSISTENT, &canon_key);

let temp_key = mmr_block.get_offchain_key(node, OffchainKeyType::Temp);
offchain_db.local_storage_set(StorageKind::PERSISTENT, &temp_key, &val);
}
}
},
);

let blocks_clone = blocks.clone();
// verify new gadget continues from block 4 and ignores 1, 2, 3 based on persisted state
run_test_with_mmr_gadget_pre_post_using_client(
client.clone(),
|client| async move {
let blocks = blocks_clone.lock();
let slice: Vec<&MmrBlock> = blocks.iter().collect();

// verify persisted state says a1, a2, a3 were canonicalized,
assert_eq!(load_persistent::<Block, Backend>(&*client.backend).unwrap(), Some(3));
// but actually they are NOT canon (we manually reverted them earlier).
client.assert_not_canonicalized(&slice);
},
|client| async move {
let a4 = client.import_block(&BlockId::Number(3), b"a4", Some(3)).await;
let a5 = client.import_block(&BlockId::Number(4), b"a5", Some(4)).await;
let a6 = client.import_block(&BlockId::Number(5), b"a6", Some(5)).await;
client.finalize_block(a6.hash(), Some(6));
tokio::time::sleep(Duration::from_millis(200)).await;

let block_1_to_3 = blocks.lock();
let slice: Vec<&MmrBlock> = block_1_to_3.iter().collect();
// verify a1, a2, a3 are still NOT canon (skipped by gadget based on data in aux db)
client.assert_not_canonicalized(&slice);
// but a4, a5, a6 were canonicalized
client.assert_canonicalized(&[&a4, &a5, &a6]);
// check persisted best canon was updated
assert_eq!(load_persistent::<Block, Backend>(&*client.backend).unwrap(), Some(6));
},
);
}
}
64 changes: 45 additions & 19 deletions client/merkle-mountain-range/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,15 @@
#![warn(missing_docs)]

mod aux_schema;
mod offchain_mmr;
#[cfg(test)]
pub mod test_utils;

use std::{marker::PhantomData, sync::Arc};

use crate::offchain_mmr::OffchainMmr;
use beefy_primitives::MmrRootHash;
use futures::StreamExt;
use log::{error, trace, warn};

use log::{debug, error, trace, warn};
use sc_client_api::{Backend, BlockchainEvents, FinalityNotifications};
use sc_offchain::OffchainDb;
use sp_api::ProvideRuntimeApi;
Expand All @@ -55,50 +55,75 @@ use sp_runtime::{
generic::BlockId,
traits::{Block, Header, NumberFor},
};

use crate::offchain_mmr::OffchainMMR;
use beefy_primitives::MmrRootHash;
use sp_core::offchain::OffchainStorage;
use std::{marker::PhantomData, sync::Arc};

/// Logging target for the mmr gadget.
pub const LOG_TARGET: &str = "mmr";

struct OffchainMmrBuilder<B: Block, C, S> {
struct OffchainMmrBuilder<B: Block, BE: Backend<B>, C> {
backend: Arc<BE>,
client: Arc<C>,
offchain_db: OffchainDb<S>,
offchain_db: OffchainDb<BE::OffchainStorage>,
indexing_prefix: Vec<u8>,

_phantom: PhantomData<B>,
}

impl<B, C, S> OffchainMmrBuilder<B, C, S>
impl<B, BE, C> OffchainMmrBuilder<B, BE, C>
where
B: Block,
BE: Backend<B>,
C: ProvideRuntimeApi<B> + HeaderBackend<B> + HeaderMetadata<B>,
C::Api: MmrApi<B, MmrRootHash, NumberFor<B>>,
S: OffchainStorage,
{
async fn try_build(
self,
finality_notifications: &mut FinalityNotifications<B>,
) -> Option<OffchainMMR<C, B, S>> {
) -> Option<OffchainMmr<B, BE, C>> {
while let Some(notification) = finality_notifications.next().await {
let best_block = *notification.header.number();
match self.client.runtime_api().mmr_leaf_count(&BlockId::number(best_block)) {
Ok(Ok(mmr_leaf_count)) => {
debug!(
target: LOG_TARGET,
"pallet-mmr detected at block {:?} with mmr size {:?}",
best_block,
mmr_leaf_count
);
match utils::first_mmr_block_num::<B::Header>(best_block, mmr_leaf_count) {
Ok(first_mmr_block) => {
let mut offchain_mmr = OffchainMMR {
debug!(
target: LOG_TARGET,
"pallet-mmr genesis computed at block {:?}", first_mmr_block,
);
let best_canonicalized =
match offchain_mmr::load_or_init_best_canonicalized::<B, BE>(
&*self.backend,
first_mmr_block,
) {
Ok(best) => best,
Err(e) => {
error!(
target: LOG_TARGET,
"Error loading state from aux db: {:?}", e
);
return None
},
};
let mut offchain_mmr = OffchainMmr {
backend: self.backend,
client: self.client,
offchain_db: self.offchain_db,
indexing_prefix: self.indexing_prefix,
first_mmr_block,

_phantom: Default::default(),
best_canonicalized,
};
// We need to make sure all blocks leading up to current notification
// have also been canonicalized.
offchain_mmr.canonicalize_catch_up(&notification);
// We have to canonicalize and prune the blocks in the finality
// notification that lead to building the offchain-mmr as well.
offchain_mmr.canonicalize_and_prune(&notification);
offchain_mmr.canonicalize_and_prune(notification);
return Some(offchain_mmr)
},
Err(e) => {
Expand Down Expand Up @@ -143,14 +168,14 @@ where
C: BlockchainEvents<B> + HeaderBackend<B> + HeaderMetadata<B> + ProvideRuntimeApi<B>,
C::Api: MmrApi<B, MmrRootHash, NumberFor<B>>,
{
async fn run(mut self, builder: OffchainMmrBuilder<B, C, BE::OffchainStorage>) {
async fn run(mut self, builder: OffchainMmrBuilder<B, BE, C>) {
let mut offchain_mmr = match builder.try_build(&mut self.finality_notifications).await {
Some(offchain_mmr) => offchain_mmr,
None => return,
};

while let Some(notification) = self.finality_notifications.next().await {
offchain_mmr.canonicalize_and_prune(&notification);
offchain_mmr.canonicalize_and_prune(notification);
}
}

Expand All @@ -174,6 +199,7 @@ where
};
mmr_gadget
.run(OffchainMmrBuilder {
backend,
client,
offchain_db,
indexing_prefix,
Expand Down
Loading

0 comments on commit 0d153c9

Please sign in to comment.