Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: State Sync from External Storage #8789

Merged
merged 89 commits into from
Apr 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
89 commits
Select commit Hold shift + click to select a range
0039c20
feat: Dump state to S3
nikurt Feb 28, 2023
e9cb32c
fix
nikurt Feb 28, 2023
95ff6dc
Metrics
nikurt Mar 1, 2023
c3a0112
Metrics
nikurt Mar 1, 2023
85a8ded
Metrics
nikurt Mar 1, 2023
4f0d69c
Metrics
nikurt Mar 1, 2023
556248f
Add config options, change state part file naming - include num_parts.
nikurt Mar 1, 2023
b15dd00
Add config options, change state part file naming - include num_parts.
nikurt Mar 1, 2023
3e17aba
Add config options, change state part file naming - include num_parts.
nikurt Mar 1, 2023
8105005
Refactor state syncing to avoid very long functions.
nikurt Mar 2, 2023
cb2731e
Syncing works
nikurt Mar 8, 2023
98cd7cf
Metrics
nikurt Mar 9, 2023
712ca55
Fix
nikurt Mar 9, 2023
bc953ea
Fields are not optional
nikurt Mar 9, 2023
61671f5
.
nikurt Mar 13, 2023
0667d8b
Throttling of requests now works! :pray:
nikurt Mar 13, 2023
0063fa8
Too much output
nikurt Mar 14, 2023
f9b7f24
Timeouts
nikurt Mar 14, 2023
2195391
Use colour for state sync phase display
nikurt Mar 14, 2023
8b1f311
Fix DB corruption
nikurt Mar 15, 2023
b224412
feat: use split store in the view client (#8656)
wacban Mar 2, 2023
5846e1a
feat: Dump state of every epoch to S3 (#8661)
nikurt Mar 10, 2023
4cb5139
feat: Dump state to S3
nikurt Feb 28, 2023
afcce1b
Merge
nikurt Mar 15, 2023
8408ce7
Fix: Rework the state-parts tool API
nikurt Mar 15, 2023
c61ff2a
Merge
nikurt Mar 15, 2023
b682424
Print state_root
nikurt Mar 16, 2023
4f43f8c
Print state_root
nikurt Mar 16, 2023
161a357
Changelog
nikurt Mar 20, 2023
020d7f9
Merge
nikurt Mar 20, 2023
4910fcc
Merge
nikurt Mar 20, 2023
c2fa4a0
Merge
nikurt Mar 20, 2023
14e202f
Handle partial state parts dumps
nikurt Mar 20, 2023
0da5f3b
More output
nikurt Mar 20, 2023
610e820
More output
nikurt Mar 20, 2023
296dda3
Fix off-by-one error in the state-part tool.
nikurt Mar 20, 2023
11e965a
fmt
nikurt Mar 20, 2023
95c1ad5
fmt
nikurt Mar 20, 2023
dc89915
fmt
nikurt Mar 20, 2023
f14e8aa
read-state-header
nikurt Mar 20, 2023
15d1fbb
Dump correct state root in state dump
nikurt Mar 20, 2023
f390b19
Merge
nikurt Mar 20, 2023
89134c5
Don't retry S3 errors until the timeout
nikurt Mar 21, 2023
5a1acff
Tune verbosity
nikurt Mar 21, 2023
4a9aa86
Timing
nikurt Mar 21, 2023
56dc868
S3 downloads should now work.
nikurt Mar 21, 2023
74773e0
Metrics for part application.
nikurt Mar 21, 2023
81d511f
Metrics for part application.
nikurt Mar 21, 2023
6db2638
Metrics for part application.
nikurt Mar 21, 2023
51dbd55
More metrics for part download
nikurt Mar 22, 2023
6022323
Ready for review
nikurt Mar 23, 2023
cca8bdc
Rename config field
nikurt Mar 23, 2023
b800bdc
Fix retry metric
nikurt Mar 23, 2023
1cd3a72
Minimize
nikurt Mar 23, 2023
b319c51
fix: apply `log_summary_style` to state sync phase message (#8735)
nikurt Mar 21, 2023
6b09e0d
[Runtime Epoch Split] (3/n) Add ability to get Arc<EpochManagerAdapte…
robin-near Mar 22, 2023
fdaa79e
feat: Disable state sync by default because it's unreliable (#8730)
nikurt Mar 23, 2023
89297d4
Merge
nikurt Mar 23, 2023
699954f
Merge
nikurt Mar 23, 2023
1244ac3
Merge
nikurt Mar 23, 2023
ad7ded8
Merge
nikurt Mar 23, 2023
6691bf7
Doc
nikurt Mar 23, 2023
1cc6f05
Fix test
nikurt Mar 23, 2023
0ad29cd
Fix test
nikurt Mar 24, 2023
438e343
Move some code to #8794
nikurt Mar 24, 2023
ea0b82a
chore: update changelog with 1.32.1 and 1.32.2 (#8869)
wacban Apr 3, 2023
e3baa8e
feat: override NetworkConfig from JSON config (#8871)
VanBarbascu Apr 4, 2023
5a0a4fb
feat: Dump state to S3
nikurt Feb 28, 2023
526225d
Fix DB corruption
nikurt Mar 15, 2023
f5d9545
Merge
nikurt Mar 15, 2023
cc49285
Changelog
nikurt Mar 20, 2023
8067add
More output
nikurt Mar 20, 2023
4c53eac
Merge
nikurt Apr 5, 2023
cd34b4a
Merge
nikurt Apr 5, 2023
8f2c9b4
.
nikurt Apr 5, 2023
faa8a10
Merge
nikurt Apr 5, 2023
cfce87c
Introduce StateSyncInner instead of StateSyncMode
nikurt Apr 6, 2023
e71a0ea
merge
nikurt Apr 6, 2023
b01050f
merge
nikurt Apr 6, 2023
b61749a
Re-fixed. Persist `new_shard_sync` when S3 download completes.
nikurt Apr 11, 2023
594cafa
.
nikurt Apr 11, 2023
a9debc5
Remove unneeded enum value and simplify target sampling.
nikurt Apr 11, 2023
00f70d4
Remove `unwrap()`s when creating an S3 bucket.
nikurt Apr 11, 2023
0855002
Merge
nikurt Apr 13, 2023
b09200c
Merge
nikurt Apr 13, 2023
27423e5
Merge
Ekleog-NEAR Apr 13, 2023
2a7454f
re-push
nikurt Apr 13, 2023
b29d2b3
fix
nikurt Apr 14, 2023
6500c42
Merge branch 'master' into nikurt-state-syncing-from-s3
nikurt Apr 14, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@

### Non-protocol Changes

* Node can sync State from S3. [#8789](https://github.com/near/nearcore/pull/8789)

## 1.33.0

### Protocol Changes
Expand Down
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 4 additions & 1 deletion chain/chain/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3139,7 +3139,10 @@ impl Chain {
let state_root = *chunk.take_header().take_inner().prev_state_root();
if !self.runtime_adapter.validate_state_part(&state_root, part_id, data) {
byzantine_assert!(false);
return Err(Error::Other("set_state_part failed: validate_state_part failed".into()));
return Err(Error::Other(format!(
"set_state_part failed: validate_state_part failed. state_root={:?}",
state_root
)));
}

// Saving the part data.
Expand Down
68 changes: 53 additions & 15 deletions chain/client-primitives/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ pub use near_primitives::views::{StatusResponse, StatusSyncInfo};
use once_cell::sync::OnceCell;
use std::collections::HashMap;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::Arc;
use std::sync::{Arc, Mutex};

/// Combines errors coming from chain, tx pool and block producer.
#[derive(Debug, thiserror::Error)]
Expand Down Expand Up @@ -50,6 +50,7 @@ pub enum AccountOrPeerIdOrHash {
AccountId(AccountId),
PeerId(PeerId),
Hash(CryptoHash),
ExternalStorage,
}

#[derive(Debug, serde::Serialize)]
Expand All @@ -61,18 +62,39 @@ pub struct DownloadStatus {
pub done: bool,
pub state_requests_count: u64,
pub last_target: Option<AccountOrPeerIdOrHash>,
#[serde(skip_serializing, skip_deserializing)]
pub response: Arc<Mutex<Option<Result<(u16, Vec<u8>), String>>>>,
}

impl DownloadStatus {
pub fn new(now: DateTime<Utc>) -> Self {
Self {
start_time: now,
prev_update_time: now,
run_me: Arc::new(AtomicBool::new(true)),
error: false,
done: false,
state_requests_count: 0,
last_target: None,
response: Arc::new(Mutex::new(None)),
}
}
}

impl Clone for DownloadStatus {
/// Clones an object, but it clones the value of `run_me` instead of the
/// `Arc` that wraps that value.
fn clone(&self) -> Self {
DownloadStatus {
start_time: self.start_time,
prev_update_time: self.prev_update_time,
// Creates a new `Arc` holding the same value.
run_me: Arc::new(AtomicBool::new(self.run_me.load(Ordering::SeqCst))),
error: self.error,
done: self.done,
state_requests_count: self.state_requests_count,
last_target: self.last_target.clone(),
response: self.response.clone(),
}
}
}
Expand All @@ -90,6 +112,21 @@ pub enum ShardSyncStatus {
StateSyncDone,
}

impl ShardSyncStatus {
pub fn repr(&self) -> u8 {
match self {
ShardSyncStatus::StateDownloadHeader => 0,
ShardSyncStatus::StateDownloadParts => 1,
ShardSyncStatus::StateDownloadScheduling => 2,
ShardSyncStatus::StateDownloadApplying => 3,
ShardSyncStatus::StateDownloadComplete => 4,
ShardSyncStatus::StateSplitScheduling => 5,
ShardSyncStatus::StateSplitApplying(_) => 6,
ShardSyncStatus::StateSyncDone => 7,
}
}
}

/// Manually implement compare for ShardSyncStatus to compare only based on variant name
impl PartialEq<Self> for ShardSyncStatus {
fn eq(&self, other: &Self) -> bool {
Expand Down Expand Up @@ -164,25 +201,26 @@ pub struct ShardSyncDownload {
}

impl ShardSyncDownload {
/// Creates a instance of self which includes initial statuses for shard sync and download at the given time.
pub fn new(now: DateTime<Utc>) -> Self {
/// Creates a instance of self which includes initial statuses for shard state header download at the given time.
pub fn new_download_state_header(now: DateTime<Utc>) -> Self {
Self {
downloads: vec![
DownloadStatus {
start_time: now,
prev_update_time: now,
run_me: Arc::new(AtomicBool::new(true)),
error: false,
done: false,
state_requests_count: 0,
last_target: None,
};
1
],
downloads: vec![DownloadStatus::new(now)],
status: ShardSyncStatus::StateDownloadHeader,
}
}

/// Creates a instance of self which includes initial statuses for shard state parts download at the given time.
pub fn new_download_state_parts(now: DateTime<Utc>, num_parts: u64) -> Self {
// Avoid using `vec![x; num_parts]`, because each element needs to have
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that's why Clone was implemented by hand for this struct though right? Where the run_me field is explicitly reallocated and copied. So could just do the same in the Clone impl for the response field too. Don't have an opinion really on which way is more clear, but could consider doing that

// its own independent value of `response`.
let mut downloads = Vec::with_capacity(num_parts as usize);
for _ in 0..num_parts {
downloads.push(DownloadStatus::new(now));
}
Self { downloads, status: ShardSyncStatus::StateDownloadParts }
}
}

/// Various status sync can be in, whether it's fast sync or archival.
#[derive(Clone, Debug, strum::AsRefStr)]
pub enum SyncStatus {
Expand Down
1 change: 1 addition & 0 deletions chain/client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ num-rational.workspace = true
once_cell.workspace = true
rand.workspace = true
reed-solomon-erasure.workspace = true
rust-s3.workspace = true
serde_json.workspace = true
strum.workspace = true
sysinfo.workspace = true
Expand Down
20 changes: 18 additions & 2 deletions chain/client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,15 @@ impl Client {
config.archive,
config.state_sync_enabled,
);
let state_sync = StateSync::new(network_adapter.clone(), config.state_sync_timeout);
let state_sync = StateSync::new(
network_adapter.clone(),
config.state_sync_timeout,
&config.chain_id,
config.state_sync_from_s3_enabled,
&config.state_sync_s3_bucket,
&config.state_sync_s3_region,
config.state_sync_num_concurrent_s3_requests,
);
let num_block_producer_seats = config.num_block_producer_seats as usize;
let data_parts = runtime_adapter.num_data_parts();
let parity_parts = runtime_adapter.num_total_parts() - data_parts;
Expand Down Expand Up @@ -2121,7 +2129,15 @@ impl Client {
let (state_sync, new_shard_sync, blocks_catch_up_state) =
self.catchup_state_syncs.entry(sync_hash).or_insert_with(|| {
(
StateSync::new(network_adapter1, state_sync_timeout),
StateSync::new(
network_adapter1,
state_sync_timeout,
&self.config.chain_id,
self.config.state_sync_from_s3_enabled,
&self.config.state_sync_s3_bucket,
&self.config.state_sync_s3_region,
self.config.state_sync_num_concurrent_s3_requests,
),
new_shard_sync,
BlocksCatchUpState::new(sync_hash, epoch_id),
)
Expand Down
97 changes: 97 additions & 0 deletions chain/client/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -354,3 +354,100 @@ pub(crate) fn export_version(neard_version: &near_primitives::version::Version)
])
.inc();
}

pub static STATE_SYNC_STAGE: Lazy<near_o11y::metrics::IntGaugeVec> = Lazy::new(|| {
try_create_int_gauge_vec(
"near_state_sync_stage",
"Stage of state sync per shard",
&["shard_id"],
)
.unwrap()
});

pub static STATE_SYNC_RETRY_PART: Lazy<near_o11y::metrics::IntCounterVec> = Lazy::new(|| {
try_create_int_counter_vec(
"near_state_sync_retry_part_total",
"Number of part requests retried",
&["shard_id"],
)
.unwrap()
});

pub static STATE_SYNC_PARTS_DONE: Lazy<near_o11y::metrics::IntGaugeVec> = Lazy::new(|| {
try_create_int_gauge_vec(
"near_state_sync_parts_done",
"Number of parts downloaded",
&["shard_id"],
)
.unwrap()
});

pub static STATE_SYNC_PARTS_TOTAL: Lazy<near_o11y::metrics::IntGaugeVec> = Lazy::new(|| {
try_create_int_gauge_vec(
"near_state_sync_parts_per_shard",
"Number of parts that need to be downloaded for the shard",
&["shard_id"],
)
.unwrap()
});

pub static STATE_SYNC_DISCARD_PARTS: Lazy<near_o11y::metrics::IntCounterVec> = Lazy::new(|| {
try_create_int_counter_vec(
"near_state_sync_discard_parts_total",
"Number of times all downloaded parts were discarded to try again",
&["shard_id"],
)
.unwrap()
});

pub static STATE_SYNC_EXTERNAL_PARTS_DONE: Lazy<near_o11y::metrics::IntCounterVec> =
Lazy::new(|| {
try_create_int_counter_vec(
"near_state_sync_external_parts_done_total",
"Number of parts successfully retrieved from an external storage",
&["shard_id"],
)
.unwrap()
});

pub static STATE_SYNC_EXTERNAL_PARTS_FAILED: Lazy<near_o11y::metrics::IntCounterVec> =
Lazy::new(|| {
try_create_int_counter_vec(
"near_state_sync_external_parts_failed_total",
"Number of parts failed attempts to retrieve parts from an external storage",
&["shard_id"],
)
.unwrap()
});

pub static STATE_SYNC_EXTERNAL_PARTS_SCHEDULING_DELAY: Lazy<near_o11y::metrics::HistogramVec> =
Lazy::new(|| {
try_create_histogram_vec(
"near_state_sync_external_parts_scheduling_delay_sec",
"Delay for a request for parts from an external storage",
&["shard_id"],
Some(exponential_buckets(0.001, 2.0, 20).unwrap()),
)
.unwrap()
});

pub static STATE_SYNC_EXTERNAL_PARTS_REQUEST_DELAY: Lazy<near_o11y::metrics::HistogramVec> =
Lazy::new(|| {
try_create_histogram_vec(
"near_state_sync_external_parts_request_delay_sec",
"Latency of state part requests to external storage",
&["shard_id"],
Some(exponential_buckets(0.001, 2.0, 20).unwrap()),
)
.unwrap()
});

pub static STATE_SYNC_EXTERNAL_PARTS_SIZE_DOWNLOADED: Lazy<near_o11y::metrics::IntCounterVec> =
Lazy::new(|| {
try_create_int_counter_vec(
"near_state_sync_external_parts_size_downloaded_bytes_total",
"Amount of bytes downloaded from an external storage when requesting state parts for a shard",
&["shard_id"],
)
.unwrap()
});
Loading