Skip to content

Commit

Permalink
adjust state sync timeouts (#12425)
Browse files Browse the repository at this point in the history
This PR improves configurability of state sync by separating the config
parameters for different kinds of timeouts:
- `state_sync_external_timeout` (renamed from `state_sync_timeout`) is used, as
`state_sync_timeout` always has been, when downloading from external storage.
- `state_sync_p2p_timeout` is used when downloading from peers in the
network.
- `state_sync_retry_timeout` controls time between download attempts.

We also lower the number of retry attempts for p2p state sync from 5 to
3. Why? In testnet we see that requests are generally handled reliably
and if a request fails it is usually because the target node is
throttling incoming requests. In the case that peers are overwhelmed by
traffic we would rather fall back to cloud storage sooner than send more
requests (which will be throttled anyway).
  • Loading branch information
saketh-are authored Nov 11, 2024
1 parent 274c118 commit f99bb96
Show file tree
Hide file tree
Showing 17 changed files with 149 additions and 44 deletions.
9 changes: 6 additions & 3 deletions chain/client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,9 @@ impl Client {
epoch_manager.clone(),
runtime_adapter.clone(),
network_adapter.clone().into_sender(),
config.state_sync_timeout,
config.state_sync_external_timeout,
config.state_sync_p2p_timeout,
config.state_sync_retry_timeout,
&config.chain_id,
&config.state_sync.sync,
chain_sender_for_state_sync.clone(),
Expand Down Expand Up @@ -2544,7 +2546,6 @@ impl Client {
{
assert_eq!(&epoch_first_block, state_sync_info.epoch_first_block());

let state_sync_timeout = self.config.state_sync_timeout;
let block_header = self.chain.get_block(&epoch_first_block)?.header().clone();
let epoch_id = block_header.epoch_id();

Expand All @@ -2563,7 +2564,9 @@ impl Client {
self.epoch_manager.clone(),
self.runtime_adapter.clone(),
self.network_adapter.clone().into_sender(),
state_sync_timeout,
self.config.state_sync_external_timeout,
self.config.state_sync_p2p_timeout,
self.config.state_sync_retry_timeout,
&self.config.chain_id,
&self.config.state_sync.sync,
self.chain_sender_for_state_sync.clone(),
Expand Down
2 changes: 1 addition & 1 deletion chain/client/src/client_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1986,7 +1986,7 @@ impl ClientActorInner {
if block_exists {
return Ok((false, true));
}
let timeout = self.client.config.state_sync_timeout;
let timeout = self.client.config.state_sync_external_timeout;
let timeout = near_async::time::Duration::try_from(timeout);
let timeout = timeout.unwrap();

Expand Down
8 changes: 8 additions & 0 deletions chain/client/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,14 @@ pub(crate) static VIEW_CLIENT_MESSAGE_TIME: LazyLock<HistogramVec> = LazyLock::n
.unwrap()
});

/// Counter of state sync requests that this node received but ignored,
/// exported under the name `near_state_sync_requests_throttled_total`.
pub(crate) static STATE_SYNC_REQUESTS_THROTTLED_TOTAL: LazyLock<IntCounter> = LazyLock::new(|| {
try_create_int_counter(
"near_state_sync_requests_throttled_total",
"Total number of state sync requests which were received and ignored",
)
.unwrap()
});

pub(crate) static PRODUCE_AND_DISTRIBUTE_CHUNK_TIME: LazyLock<HistogramVec> = LazyLock::new(|| {
try_create_histogram_vec(
"near_produce_and_distribute_chunk_time",
Expand Down
12 changes: 7 additions & 5 deletions chain/client/src/sync/state/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,9 @@ impl StateSync {
epoch_manager: Arc<dyn EpochManagerAdapter>,
runtime: Arc<dyn RuntimeAdapter>,
network_adapter: AsyncSender<PeerManagerMessageRequest, PeerManagerMessageResponse>,
timeout: Duration,
external_timeout: Duration,
p2p_timeout: Duration,
retry_timeout: Duration,
chain_id: &str,
sync_config: &SyncConfig,
chain_requests_sender: ChainSenderForStateSync,
Expand All @@ -102,7 +104,7 @@ impl StateSync {
clock: clock.clone(),
store: store.clone(),
request_sender: network_adapter,
request_timeout: timeout,
request_timeout: p2p_timeout,
state: peer_source_state.clone(),
}) as Arc<dyn StateSyncDownloadSource>;
let (fallback_source, num_attempts_before_fallback, num_concurrent_requests) =
Expand All @@ -118,7 +120,7 @@ impl StateSync {
let bucket = create_bucket_readonly(
&bucket,
&region,
timeout.max(Duration::ZERO).unsigned_abs(),
external_timeout.max(Duration::ZERO).unsigned_abs(),
);
if let Err(err) = bucket {
panic!("Failed to create an S3 bucket: {}", err);
Expand All @@ -144,7 +146,7 @@ impl StateSync {
store: store.clone(),
chain_id: chain_id.to_string(),
conn: external,
timeout,
timeout: external_timeout,
}) as Arc<dyn StateSyncDownloadSource>;
(
Some(fallback_source),
Expand All @@ -164,7 +166,7 @@ impl StateSync {
num_attempts_before_fallback,
header_validation_sender: chain_requests_sender.clone().into_sender(),
runtime: runtime.clone(),
retry_timeout: timeout, // TODO: This is not what timeout meant. Introduce a new parameter.
retry_timeout,
task_tracker: downloading_task_tracker.clone(),
});

Expand Down
4 changes: 2 additions & 2 deletions chain/client/src/view_client_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1354,7 +1354,7 @@ impl Handler<StateRequestHeader> for ViewClientActorInner {
.start_timer();
let StateRequestHeader { shard_id, sync_hash } = msg;
if self.throttle_state_sync_request() {
tracing::debug!(target: "sync", ?sync_hash, "Throttle state sync requests");
metrics::STATE_SYNC_REQUESTS_THROTTLED_TOTAL.inc();
return None;
}
let header = match self.chain.check_sync_hash_validity(&sync_hash) {
Expand Down Expand Up @@ -1424,7 +1424,7 @@ impl Handler<StateRequestPart> for ViewClientActorInner {
.start_timer();
let StateRequestPart { shard_id, sync_hash, part_id } = msg;
if self.throttle_state_sync_request() {
tracing::debug!(target: "sync", ?sync_hash, "Throttle state sync requests");
metrics::STATE_SYNC_REQUESTS_THROTTLED_TOTAL.inc();
return None;
}
if let Err(err) = self.has_state_snapshot(&sync_hash, shard_id) {
Expand Down
24 changes: 19 additions & 5 deletions core/chain-configs/src/client_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ pub const DEFAULT_STATE_SYNC_NUM_CONCURRENT_REQUESTS_ON_CATCHUP_EXTERNAL: u32 =

/// The default number of attempts to obtain a state part from peers in the network
/// before giving up and downloading it from external storage.
pub const DEFAULT_EXTERNAL_STORAGE_FALLBACK_THRESHOLD: u64 = 5;
pub const DEFAULT_EXTERNAL_STORAGE_FALLBACK_THRESHOLD: u64 = 3;

/// Configuration for garbage collection.
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize, PartialEq)]
Expand Down Expand Up @@ -308,10 +308,18 @@ pub fn default_header_sync_stall_ban_timeout() -> Duration {
Duration::seconds(120)
}

pub fn default_state_sync_timeout() -> Duration {
pub fn default_state_sync_external_timeout() -> Duration {
Duration::seconds(60)
}

/// Default value for `state_sync_p2p_timeout`: how long to wait for a response
/// from p2p state sync.
pub fn default_state_sync_p2p_timeout() -> Duration {
Duration::seconds(10)
}

/// Default value for `state_sync_retry_timeout`: how long to wait between
/// attempts to obtain a state part.
pub fn default_state_sync_retry_timeout() -> Duration {
Duration::seconds(1)
}

pub fn default_header_sync_expected_height_per_second() -> u64 {
10
}
Expand Down Expand Up @@ -441,8 +449,12 @@ pub struct ClientConfig {
pub header_sync_stall_ban_timeout: Duration,
/// Expected increase of header head height per second during header sync
pub header_sync_expected_height_per_second: u64,
/// How long to wait for a response during state sync
pub state_sync_timeout: Duration,
/// How long to wait for a response from centralized state sync
pub state_sync_external_timeout: Duration,
/// How long to wait for a response from p2p state sync
pub state_sync_p2p_timeout: Duration,
/// How long to wait between attempts to obtain a state part
pub state_sync_retry_timeout: Duration,
/// Minimum number of peers to start syncing.
pub min_num_peers: usize,
/// Period between logging summary information.
Expand Down Expand Up @@ -584,7 +596,9 @@ impl ClientConfig {
header_sync_initial_timeout: Duration::seconds(10),
header_sync_progress_timeout: Duration::seconds(2),
header_sync_stall_ban_timeout: Duration::seconds(30),
state_sync_timeout: Duration::seconds(TEST_STATE_SYNC_TIMEOUT),
state_sync_external_timeout: Duration::seconds(TEST_STATE_SYNC_TIMEOUT),
state_sync_p2p_timeout: Duration::seconds(TEST_STATE_SYNC_TIMEOUT),
state_sync_retry_timeout: Duration::seconds(TEST_STATE_SYNC_TIMEOUT),
header_sync_expected_height_per_second: 1,
min_num_peers: 1,
log_summary_period: Duration::seconds(10),
Expand Down
3 changes: 2 additions & 1 deletion core/chain-configs/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ pub use client_config::{
default_header_sync_progress_timeout, default_header_sync_stall_ban_timeout,
default_log_summary_period, default_orphan_state_witness_max_size,
default_orphan_state_witness_pool_size, default_produce_chunk_add_transactions_time_limit,
default_state_sync_enabled, default_state_sync_timeout, default_sync_check_period,
default_state_sync_enabled, default_state_sync_external_timeout,
default_state_sync_p2p_timeout, default_state_sync_retry_timeout, default_sync_check_period,
default_sync_height_threshold, default_sync_max_block_requests, default_sync_step_period,
default_transaction_pool_size_limit, default_trie_viewer_state_size_limit,
default_tx_routing_height_horizon, default_view_client_threads,
Expand Down
4 changes: 2 additions & 2 deletions docs/misc/state_sync_from_external_storage.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ your `config.json` file:
}
},
"consensus": {
"state_sync_timeout": {
"state_sync_external_timeout": {
"secs": 30,
"nanos": 0
}
Expand All @@ -72,7 +72,7 @@ shards that can be downloaded in parallel during state sync.
across all shards that can be downloaded in parallel during catchup. Generally,
this number should not be higher than `num_concurrent_requests`. Keep it
reasonably low to allow the node to process chunks of other shards.
* `consensus.state_sync_timeout` determines the max duration of an attempt to download a
* `consensus.state_sync_external_timeout` determines the max duration of an attempt to download a
state part from external storage. Setting it too low may cause too many unsuccessful attempts.

### Amazon S3
Expand Down
4 changes: 3 additions & 1 deletion integration-tests/src/test_loop/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -473,7 +473,9 @@ impl TestLoopBuilder {
let mut client_config = ClientConfig::test(true, 600, 2000, 4, is_archival, true, false);
client_config.max_block_wait_delay = Duration::seconds(6);
client_config.state_sync_enabled = true;
client_config.state_sync_timeout = Duration::milliseconds(100);
client_config.state_sync_external_timeout = Duration::milliseconds(100);
client_config.state_sync_p2p_timeout = Duration::milliseconds(100);
client_config.state_sync_retry_timeout = Duration::milliseconds(100);
if let Some(num_epochs) = self.gc_num_epochs_to_keep {
client_config.gc.gc_num_epochs_to_keep = num_epochs;
}
Expand Down
4 changes: 3 additions & 1 deletion integration-tests/src/tests/client/sync_state_nodes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -511,7 +511,9 @@ fn sync_state_dump() {
near2.client_config.block_fetch_horizon = block_fetch_horizon;
near2.client_config.tracked_shards = vec![ShardId::new(0)]; // Track all shards.
near2.client_config.state_sync_enabled = true;
near2.client_config.state_sync_timeout = Duration::seconds(2);
near2.client_config.state_sync_external_timeout =
Duration::seconds(2);
near2.client_config.state_sync_p2p_timeout = Duration::seconds(2);
near2.client_config.state_sync.sync =
SyncConfig::ExternalStorage(ExternalStorageConfig {
location: Filesystem {
Expand Down
21 changes: 16 additions & 5 deletions nearcore/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ use near_chain_configs::{
default_header_sync_progress_timeout, default_header_sync_stall_ban_timeout,
default_log_summary_period, default_orphan_state_witness_max_size,
default_orphan_state_witness_pool_size, default_produce_chunk_add_transactions_time_limit,
default_state_sync_enabled, default_state_sync_timeout, default_sync_check_period,
default_state_sync_enabled, default_state_sync_external_timeout,
default_state_sync_p2p_timeout, default_state_sync_retry_timeout, default_sync_check_period,
default_sync_height_threshold, default_sync_max_block_requests, default_sync_step_period,
default_transaction_pool_size_limit, default_trie_viewer_state_size_limit,
default_tx_routing_height_horizon, default_view_client_threads,
Expand Down Expand Up @@ -155,9 +156,15 @@ pub struct Consensus {
#[serde(with = "near_async::time::serde_duration_as_std")]
pub header_sync_stall_ban_timeout: Duration,
/// How long to wait for a state sync response from external storage before re-requesting
#[serde(default = "default_state_sync_timeout")]
#[serde(default = "default_state_sync_external_timeout")]
#[serde(with = "near_async::time::serde_duration_as_std")]
pub state_sync_timeout: Duration,
pub state_sync_external_timeout: Duration,
#[serde(default = "default_state_sync_p2p_timeout")]
#[serde(with = "near_async::time::serde_duration_as_std")]
pub state_sync_p2p_timeout: Duration,
#[serde(default = "default_state_sync_retry_timeout")]
#[serde(with = "near_async::time::serde_duration_as_std")]
pub state_sync_retry_timeout: Duration,
/// Expected increase of header head weight per second during header sync
#[serde(default = "default_header_sync_expected_height_per_second")]
pub header_sync_expected_height_per_second: u64,
Expand Down Expand Up @@ -198,7 +205,9 @@ impl Default for Consensus {
header_sync_initial_timeout: default_header_sync_initial_timeout(),
header_sync_progress_timeout: default_header_sync_progress_timeout(),
header_sync_stall_ban_timeout: default_header_sync_stall_ban_timeout(),
state_sync_timeout: default_state_sync_timeout(),
state_sync_external_timeout: default_state_sync_external_timeout(),
state_sync_p2p_timeout: default_state_sync_p2p_timeout(),
state_sync_retry_timeout: default_state_sync_retry_timeout(),
header_sync_expected_height_per_second: default_header_sync_expected_height_per_second(
),
sync_check_period: default_sync_check_period(),
Expand Down Expand Up @@ -551,7 +560,9 @@ impl NearConfig {
header_sync_expected_height_per_second: config
.consensus
.header_sync_expected_height_per_second,
state_sync_timeout: config.consensus.state_sync_timeout,
state_sync_external_timeout: config.consensus.state_sync_external_timeout,
state_sync_p2p_timeout: config.consensus.state_sync_p2p_timeout,
state_sync_retry_timeout: config.consensus.state_sync_retry_timeout,
min_num_peers: config.consensus.min_num_peers,
log_summary_period: config.log_summary_period,
produce_empty_blocks: config.consensus.produce_empty_blocks,
Expand Down
8 changes: 5 additions & 3 deletions pytest/lib/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -1007,9 +1007,11 @@ def apply_config_changes(node_dir: str,
'consensus.block_header_fetch_horizon',
'consensus.min_block_production_delay',
'consensus.max_block_production_delay',
'consensus.max_block_wait_delay', 'consensus.state_sync_timeout',
'expected_shutdown', 'log_summary_period', 'max_gas_burnt_view',
'rosetta_rpc', 'save_trie_changes', 'split_storage', 'state_sync',
'consensus.max_block_wait_delay',
'consensus.state_sync_external_timeout',
'consensus.state_sync_p2p_timeout', 'expected_shutdown',
'log_summary_period', 'max_gas_burnt_view', 'rosetta_rpc',
'save_trie_changes', 'split_storage', 'state_sync',
'state_sync_enabled', 'store.state_snapshot_enabled',
'store.state_snapshot_config.state_snapshot_type',
'tracked_shard_schedule', 'cold_store',
Expand Down
12 changes: 10 additions & 2 deletions pytest/lib/state_sync_lib.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,11 @@ def get_state_sync_configs_pair(tracked_shards=[0]):
"tracked_shards": [0], # Track all shards
}
config_sync = {
"consensus.state_sync_timeout": {
"consensus.state_sync_external_timeout": {
"secs": 0,
"nanos": 500000000
},
"consensus.state_sync_p2p_timeout": {
"secs": 0,
"nanos": 500000000
},
Expand All @@ -58,7 +62,11 @@ def get_state_sync_configs_pair(tracked_shards=[0]):
def get_state_sync_config_combined():
state_parts_dir = str(pathlib.Path(tempfile.gettempdir()) / "state_parts")
config = {
"consensus.state_sync_timeout": {
"consensus.state_sync_external_timeout": {
"secs": 0,
"nanos": 500000000
},
"consensus.state_sync_p2p_timeout": {
"secs": 0,
"nanos": 500000000
},
Expand Down
6 changes: 5 additions & 1 deletion pytest/tests/sanity/epoch_switches.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,11 @@
"nanos": 0
},
"consensus": {
"state_sync_timeout": {
"state_sync_external_timeout": {
"secs": 0,
"nanos": 500000000
},
"state_sync_p2p_timeout": {
"secs": 0,
"nanos": 500000000
}
Expand Down
24 changes: 20 additions & 4 deletions pytest/tests/sanity/rpc_tx_forwarding.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,11 @@
0: {
"tracked_shards": [],
"consensus": {
"state_sync_timeout": {
"state_sync_external_timeout": {
"secs": 2,
"nanos": 0
},
"state_sync_p2p_timeout": {
"secs": 2,
"nanos": 0
}
Expand All @@ -29,7 +33,11 @@
1: {
"tracked_shards": [],
"consensus": {
"state_sync_timeout": {
"state_sync_external_timeout": {
"secs": 2,
"nanos": 0
},
"state_sync_p2p_timeout": {
"secs": 2,
"nanos": 0
}
Expand All @@ -38,7 +46,11 @@
2: {
"tracked_shards": [],
"consensus": {
"state_sync_timeout": {
"state_sync_external_timeout": {
"secs": 2,
"nanos": 0
},
"state_sync_p2p_timeout": {
"secs": 2,
"nanos": 0
}
Expand All @@ -47,7 +59,11 @@
3: {
"tracked_shards": [0, 1, 2, 3],
"consensus": {
"state_sync_timeout": {
"state_sync_external_timeout": {
"secs": 2,
"nanos": 0
},
"state_sync_p2p_timeout": {
"secs": 2,
"nanos": 0
}
Expand Down
Loading

0 comments on commit f99bb96

Please sign in to comment.