Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

remote-externalities: batch insert key/values #14004

Merged
merged 5 commits into from
Apr 27, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions primitives/state-machine/src/testing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,14 @@ where
self.offchain_db.clone()
}

/// Batch insert key/values into backend
pub fn batch_insert(&mut self, kvs: Vec<(StorageKey, StorageValue)>) {
liamaharon marked this conversation as resolved.
Show resolved Hide resolved
self.backend.insert(
vec![(None, kvs.into_iter().map(|(k, v)| (k, Some(v))).collect())],
self.state_version,
);
}

/// Insert key/value into backend
pub fn insert(&mut self, k: StorageKey, v: StorageValue) {
self.backend.insert(vec![(None, vec![(k, Some(v))])], self.state_version);
Expand Down
51 changes: 22 additions & 29 deletions utils/frame/remote-externalities/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -638,24 +638,20 @@ where
let mut processed = 0usize;
loop {
match rx.next().await.unwrap() {
Message::Batch(kv) => {
for (k, v) in kv {
processed += 1;
if processed % 50_000 == 0 || processed == keys.len() || processed == 1 {
log::info!(
target: LOG_TARGET,
"inserting keys progress = {:.0}% [{} / {}]",
(processed as f32 / keys.len() as f32) * 100f32,
processed,
keys.len(),
);
}
// skip writing the child root data.
if is_default_child_storage_key(k.as_ref()) {
continue
}
pending_ext.insert(k, v);
}
Message::Batch(kvs) => {
let kvs = kvs
.into_iter()
.filter(|(k, _)| !is_default_child_storage_key(k))
.collect::<Vec<_>>();
processed += kvs.len();
pending_ext.batch_insert(kvs);
log::info!(
target: LOG_TARGET,
"inserting keys progress = {:.0}% [{} / {}]",
(processed as f32 / keys.len() as f32) * 100f32,
processed,
keys.len(),
);
},
Message::BatchFailed(error) => {
log::error!(target: LOG_TARGET, "Batch processing failed: {:?}", error);
Expand Down Expand Up @@ -876,7 +872,7 @@ where
);
match self.rpc_get_storage(key.clone(), Some(at)).await? {
Some(value) => {
pending_ext.insert(key.clone().0, value.clone().0);
pending_ext.batch_insert(vec![(key.clone().0, value.clone().0)]);
liamaharon marked this conversation as resolved.
Show resolved Hide resolved
keys_and_values.push((key, value));
},
None => {
Expand Down Expand Up @@ -1010,13 +1006,12 @@ where
);

info!(target: LOG_TARGET, "injecting a total of {} top keys", top.len());
for (k, v) in top {
// skip writing the child root data.
if is_default_child_storage_key(k.as_ref()) {
continue
}
inner_ext.insert(k.0, v.0);
}
let top = top
.into_iter()
.filter(|(k, _)| !is_default_child_storage_key(k.as_ref()))
.map(|(k, v)| (k.0, v.0))
.collect::<Vec<_>>();
inner_ext.batch_insert(top);

info!(
target: LOG_TARGET,
Expand Down Expand Up @@ -1052,9 +1047,7 @@ where
"extending externalities with {} manually injected key-values",
self.hashed_key_values.len()
);
for (k, v) in self.hashed_key_values {
ext.insert(k.0, v.0);
}
ext.batch_insert(self.hashed_key_values.into_iter().map(|(k, v)| (k.0, v.0)).collect());
}

// exclude manual key values.
Expand Down