remote-ext: fix state download stall on slow connections and reduce memory usage #1295

Merged Oct 10, 2023 · 16 commits

Changes from 2 commits
43 changes: 32 additions & 11 deletions substrate/utils/frame/remote-externalities/src/lib.rs
@@ -44,7 +44,7 @@ use sp_runtime::{
 use sp_state_machine::TestExternalities;
 use spinners::{Spinner, Spinners};
 use std::{
-	cmp::max,
+	cmp::{max, min},
 	fs,
 	ops::{Deref, DerefMut},
 	path::{Path, PathBuf},
@@ -353,7 +353,8 @@ where
 	const PARALLEL_REQUESTS: usize = 4;
 	const BATCH_SIZE_INCREASE_FACTOR: f32 = 1.10;
 	const BATCH_SIZE_DECREASE_FACTOR: f32 = 0.50;
-	const INITIAL_BATCH_SIZE: usize = 5000;
+	const REQUEST_DURATION_TARGET: Duration = Duration::from_secs(2);
+	const INITIAL_BATCH_SIZE: usize = 10;
 	// nodes by default will not return more than 1000 keys per request
 	const DEFAULT_KEY_DOWNLOAD_PAGE: u32 = 1000;
 	const KEYS_PAGE_MAX_RETRIES: usize = 12;
@@ -514,6 +515,7 @@ where
 				.insert(method, params.clone())
 				.map_err(|_| "Invalid batch method and/or params")?
 		}
+		let request_started = Instant::now();
 		let batch_response = match client.batch_request::<Option<StorageData>>(batch).await {
 			Ok(batch_response) => batch_response,
 			Err(e) => {
@@ -523,20 +525,39 @@

 				log::debug!(
 					target: LOG_TARGET,
-					"Batch request failed, trying again with smaller batch size. {}",
+					"Batch request failed, resetting batch size to 1. Error: {}",
 					e.to_string()
 				);

-				return Self::get_storage_data_dynamic_batch_size(
-					client,
-					payloads,
-					max(1, (batch_size as f32 * Self::BATCH_SIZE_DECREASE_FACTOR) as usize),
-					bar,
-				)
-				.await
+				// Request timed out or server errored. This is very bad. Try to get things moving
+				// again by starting again with just 1 item.
+				return Self::get_storage_data_dynamic_batch_size(client, payloads, 1, bar).await
 			},
 		};

+		// Request succeeded. Decide whether to increase or decrease the batch size for the next
+		// request, depending on if the elapsed time was greater than or less than the target.
+		let request_duration = request_started.elapsed();
+		let next_batch_size = if request_duration > Self::REQUEST_DURATION_TARGET {
+			max(1, (batch_size as f32 * Self::BATCH_SIZE_DECREASE_FACTOR) as usize)
+		} else {
+			// Increase the batch size by *at most* the number of remaining payloads
+			min(
+				payloads.len(),
+				// Increase the batch size by *at least* 1
+				max(
+					batch_size + 1,
+					(batch_size as f32 * Self::BATCH_SIZE_INCREASE_FACTOR) as usize,
+				),
+			)
+		};
+
+		log::debug!(
+			target: LOG_TARGET,
+			"Request duration: {:?} Target duration: {:?} Last batch size: {} Next batch size: {}",
+			request_duration, Self::REQUEST_DURATION_TARGET, batch_size, next_batch_size
+		);
+
 		// Collect the data from this batch
 		let mut data: Vec<Option<StorageData>> = vec![];
 		let batch_response_len = batch_response.len();
@@ -553,7 +574,7 @@ where
 		let mut rest = Self::get_storage_data_dynamic_batch_size(
 			client,
 			remaining_payloads,
-			max(batch_size + 1, (batch_size as f32 * Self::BATCH_SIZE_INCREASE_FACTOR) as usize),
+			next_batch_size,
 			bar,
 		)
 		.await?;
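For reference, below is a minimal, self-contained sketch of the adaptive batch-size policy the diff introduces. The constants mirror the values in the diff; the `next_batch_size` helper and the `main` driver are hypothetical illustrations rather than part of the crate. In the real code the adjustment happens inline in `get_storage_data_dynamic_batch_size`, and a failed batch request additionally restarts with a batch size of 1.

```rust
use std::cmp::{max, min};
use std::time::Duration;

// Constants mirroring the diff above.
const BATCH_SIZE_INCREASE_FACTOR: f32 = 1.10;
const BATCH_SIZE_DECREASE_FACTOR: f32 = 0.50;
const REQUEST_DURATION_TARGET: Duration = Duration::from_secs(2);
const INITIAL_BATCH_SIZE: usize = 10;

/// Hypothetical helper: given how long the last batch took, how large it was, and how
/// many payloads are still waiting, pick the size of the next batch request.
fn next_batch_size(last_duration: Duration, last_size: usize, remaining: usize) -> usize {
    if last_duration > REQUEST_DURATION_TARGET {
        // Too slow: shrink multiplicatively, but never below 1.
        max(1, (last_size as f32 * BATCH_SIZE_DECREASE_FACTOR) as usize)
    } else {
        // Fast enough: grow by at least 1 (so tiny batches still make progress),
        // multiplicatively otherwise, capped by the number of remaining payloads.
        min(
            remaining,
            max(
                last_size + 1,
                (last_size as f32 * BATCH_SIZE_INCREASE_FACTOR) as usize,
            ),
        )
    }
}

fn main() {
    let mut size = INITIAL_BATCH_SIZE;
    size = next_batch_size(Duration::from_millis(500), size, 100_000); // fast -> 11
    size = next_batch_size(Duration::from_millis(500), size, 100_000); // fast -> 12
    size = next_batch_size(Duration::from_secs(5), size, 100_000); // slow -> 6
    println!("next batch size: {size}");
    // A failed batch request is handled separately in the PR: it resets the size to 1.
}
```

The effect resembles congestion control: batches grow gently while requests finish under the two-second target and shrink sharply as soon as the server is slow or errors, which keeps a slow connection from stalling on oversized requests and bounds how much data any single batch holds in memory.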