feat: State Sync from External Storage #8789
Conversation
fc83762 to 262763f
Part of this PR was moved to #8794
I was assigned as code owner, but I see other engineers already assigned to review as well. Please ping me if you want me to do the actual review (but be warned that I'm not familiar with this code). Otherwise, I'll assume there are already more capable minds looking at it.
Before we move forward with this, I would just like to understand the motivations a bit better. What benefit do we get from implementing a periodic dump of state parts to a centralized place (AWS S3) over the existing periodic dump of the whole database? I understand that the size of just the state is going to be smaller than the size of the whole database, but is that the main pain point today that we want to get rid of? Another difference, of course, is that here there will be some actual verification of the state parts stored on S3, but it seems to me that if that's the main benefit we're going for, we could always do that on a snapshot of the whole database too.

I just don't think I understand why we would upload/download state parts, which are meant to be used in a decentralized syncing algorithm, from/to a centralized store (S3). If we're already assuming a centralized place with paid-for ingress/egress as a temporary state sync mechanism, why bother with splitting the state into parts rather than just spitting out the whole thing?

I get that we're currently in the spot where state sync doesn't quite work, so many people rely on public snapshots on S3 anyway. But now the NEAR node itself is going to be aware of S3 buckets and regions and will explicitly call out to S3 as part of its normal activities. This feels like an escalation in dependence on this centralized/closed technology compared to the situation where people choose to download a snapshot with a different tool (curl, wget, etc.) from a URL that happens to point to S3. If we implement state-sync-via-S3 in the node itself, S3 becomes somewhat embedded into the protocol/normal operation of nodes in a way that we need to explicitly maintain in the code.

Correct me if I'm wrong, but this seems to be intended as a sort of temporary solution before we implement another version of state sync that works better than the current one, is that right? With this kind of thing, I'd like to make sure we really benefit from it before we merge it, because the long-term costs of adding this as a temporary solution/workaround are not zero.
@marcelo-gonzalez I counted several key points in your comment. Feel free to also raise these questions in the design doc.
Partially yes. The whole DB dump is ~500GB, and a state dump is much smaller.
Not really. Even if we don't use the mechanism of state parts and somehow ingest the state whole, it's still possible to validate the state: its root must match the known state root.
I'd say this is an open question. I was discussing the same topic with @Longarithm recently. If we use Flat Storage to create state parts, do we even need to reformat them as sub-tries? I suggest keeping the concept of state parts for now, and if we find that state sync / catchup can be done more efficiently without them (without compromising trust), then we rework them.
Yes. This is a short-term solution, as mentioned here: https://near.zulipchat.com/#narrow/stream/297873-pagoda.2Fnode/topic/state.20sync.20network.20improvements/near/342065624. Long-term, bearing all the costs of ingress/egress is not optimal. We'll work on decentralizing state sync; the network Tier3 design is already moving in that direction.
It's the least bad option available short-term. I acknowledge that removing the S3 dependency from the code later will be difficult, and that maintaining more than one way to get state parts increases maintenance costs.
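For reference, a minimal sketch of a part-location helper in the spirit of the `s3_location(chain_id, epoch_height, shard_id, part_id, num_parts)` calls that appear in the patches below. The exact key layout is an assumption for illustration, not necessarily the format this PR writes to S3:

```rust
/// Builds the object key under which a single state part is stored externally.
/// NOTE: the layout below is an illustrative assumption, not the PR's actual format.
fn s3_location(
    chain_id: &str,
    epoch_height: u64,
    shard_id: u64,
    part_id: u64,
    num_parts: u64,
) -> String {
    format!(
        "chain_id={}/epoch_height={}/shard_id={}/state_part_{:06}_of_{:06}",
        chain_id, epoch_height, shard_id, part_id, num_parts
    )
}
```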
After discussions on the design doc, I gave this another review. Sorry for the delays. I haven't had the chance to really debug this, but when I tested it by reading/writing to tmp files instead of S3 (patch below), it seems to work fine on the first run; when I run it again, though, it does something strange. The setup is: I have a localnet running with nearup, where one of the nodes has state sync from external storage enabled. I run it once (after letting the localnet run long enough for state sync to be triggered), and then after it's synced I press ^C. Then I run it again after waiting long enough for state sync to be needed again, and I see that the log lines that report fetching a part from the external storage show up multiple times for the same part.
So each part and shard pair is getting fetched and downloaded multiple times, which seems like a bug. And once I got:
Let me know if you aren't able to reproduce and I can try doing it again and debugging further. The patch I used to test:

diff --git a/chain/client/src/sync/state.rs b/chain/client/src/sync/state.rs
index 49dd1ba29..45ce038c8 100644
--- a/chain/client/src/sync/state.rs
+++ b/chain/client/src/sync/state.rs
@@ -710,6 +710,7 @@ impl StateSync {
Some(make_account_or_peer_id_or_hash(AccountOrPeerIdOrHash::ExternalStorage));
let location = s3_location(chain_id, epoch_height, shard_id, part_id, num_parts);
+ let path = std::path::Path::new("/tmp/state-dump").join(location.clone());
let download_response = download.response.clone();
let scheduled = StaticClock::utc();
near_performance_metrics::actix::spawn(std::any::type_name::<Self>(), {
@@ -723,7 +724,8 @@ impl StateSync {
as f64
/ 1e9,
);
- let result = bucket.get_object(location.clone()).await;
+ // let result = bucket.get_object(location.clone()).await;
+ let result = std::fs::read(&path);
let completed = StaticClock::utc();
finished_request(&requests_remaining);
metrics::STATE_SYNC_EXTERNAL_PARTS_REQUEST_DELAY
@@ -735,9 +737,9 @@ impl StateSync {
);
match result {
Ok(response) => {
- tracing::info!(target: "sync", %shard_id, part_id, location, response_code = response.status_code(), num_bytes = response.bytes().len(), "S3 request finished");
+ tracing::info!(target: "sync", %shard_id, part_id, location, response_code = 200, num_bytes = response.len(), "S3 request finished");
let mut lock = download_response.lock().unwrap();
- *lock = Some(Ok((response.status_code(), response.bytes().to_vec())));
+ *lock = Some(Ok((200, response)));
}
Err(err) => {
tracing::info!(target: "sync", %shard_id, part_id, location, ?err, "S3 request failed");
@@ -974,7 +976,7 @@ impl StateSync {
let num_parts = shard_sync_download.downloads.len();
let mut num_parts_done = 0;
for (part_id, part_download) in shard_sync_download.downloads.iter_mut().enumerate() {
- tracing::debug!(target: "sync", %shard_id, %sync_hash, part_id, part_download.done, part_download.error, ?part_download);
+ tracing::debug!(target: "sync", %shard_id, %sync_hash, part_id, part_download.done, part_download.error);
if !part_download.done {
// Check if a download from an external storage is finished.
check_external_storage_part_response(
diff --git a/nearcore/src/state_sync.rs b/nearcore/src/state_sync.rs
index cf680c9d9..fbd4a73b7 100644
--- a/nearcore/src/state_sync.rs
+++ b/nearcore/src/state_sync.rs
@@ -12,6 +12,7 @@ use near_primitives::syncing::{get_num_state_parts, StatePartKey, StateSyncDumpP
use near_primitives::types::{EpochHeight, EpochId, ShardId, StateRoot};
use near_store::DBCol;
use std::sync::Arc;
+use std::path::Path;
/// Starts one a thread per tracked shard.
/// Each started thread will be dumping state parts of a single epoch to external storage.
@@ -36,16 +37,16 @@ pub fn spawn_state_sync_dump(
let s3_region = config.client_config.state_sync_s3_region.clone();
// Credentials to establish a connection are taken from environment variables: AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY.
- let bucket = s3::Bucket::new(
- &s3_bucket,
- s3_region
- .parse::<s3::Region>()
- .map_err(|err| <std::str::Utf8Error as Into<anyhow::Error>>::into(err))?,
- s3::creds::Credentials::default().map_err(|err| {
- tracing::error!(target: "state_sync_dump", "Failed to create a connection to S3. Did you provide environment variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY?");
- <s3::creds::error::CredentialsError as Into<anyhow::Error>>::into(err)
- })?,
- ).map_err(|err| <s3::error::S3Error as Into<anyhow::Error>>::into(err))?;
+ // let bucket = s3::Bucket::new(
+ // &s3_bucket,
+ // s3_region
+ // .parse::<s3::Region>()
+ // .map_err(|err| <std::str::Utf8Error as Into<anyhow::Error>>::into(err))?,
+ // s3::creds::Credentials::default().map_err(|err| {
+ // tracing::error!(target: "state_sync_dump", "Failed to create a connection to S3. Did you provide environment variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY?");
+ // <s3::creds::error::CredentialsError as Into<anyhow::Error>>::into(err)
+ // })?,
+ // ).map_err(|err| <s3::error::S3Error as Into<anyhow::Error>>::into(err))?;
// Determine how many threads to start.
// TODO: Handle the case of changing the shard layout.
@@ -80,7 +81,7 @@ pub fn spawn_state_sync_dump(
chain,
runtime,
client_config,
- bucket.clone(),
+ Path::new("/tmp/state-dump/"),
node_key.clone(),
)));
arbiter_handle
@@ -115,7 +116,7 @@ async fn state_sync_dump(
chain: Chain,
runtime: Arc<NightshadeRuntime>,
config: ClientConfig,
- bucket: s3::Bucket,
+ path: &Path,
_node_key: PublicKey,
) {
tracing::info!(target: "state_sync_dump", shard_id, "Running StateSyncDump loop");
@@ -202,7 +203,7 @@ async fn state_sync_dump(
num_parts,
);
if let Err(err) =
- put_state_part(&location, &state_part, &shard_id, &bucket).await
+ put_state_part(&location, &state_part, &shard_id, path).await
{
res = Some(err);
break;
@@ -261,17 +262,19 @@ async fn put_state_part(
location: &str,
state_part: &[u8],
shard_id: &ShardId,
- bucket: &s3::Bucket,
-) -> Result<s3::request_trait::ResponseData, Error> {
+ path: &Path,
+) -> Result<(), Error> {
let _timer = metrics::STATE_SYNC_DUMP_PUT_OBJECT_ELAPSED
.with_label_values(&[&shard_id.to_string()])
.start_timer();
- let put = bucket
- .put_object(&location, &state_part)
- .await
- .map_err(|err| Error::Other(err.to_string()));
+ let path = path.join(location);
+ let dir = path.parent().unwrap();
+ std::fs::create_dir_all(dir).unwrap();
+ use std::io::Write;
+ let mut file = std::fs::OpenOptions::new().write(true).create(true).open(&path).unwrap();
+ file.write_all(state_part).unwrap();
tracing::debug!(target: "state_sync_dump", shard_id, part_length = state_part.len(), ?location, "Wrote a state part to S3");
- put
+ Ok(())
}
fn update_progress(
4117d87 to 16d2087
A couple more things: do we want to make the downloading code aware of S3? If we made it a generic HTTP GET instead, it would allow any backing centralized store (a sketch of what that could look like follows the diff below). I think as it is right now, this is going to be relatively difficult to test (which is why I had to patch this change to test it now). If we want to merge this so that work on sharding can continue, then even testing that functionality will be difficult when S3 is a hard requirement. Just the developer experience of trying to use this in a test chain could be kind of a pain.

Also, I know I already mentioned this and you responded to it, but I still really don't understand why we need state parts if we're storing on S3. The whole purpose of splitting the state is to spread load in a decentralized protocol, right? When we're downloading/uploading to S3, Amazon takes care of that. The diff below (on top of the test patch to dump to tmp files instead of S3) works just fine (maybe apart from the bug I mentioned above). Plus the total size of the dump is smaller than the sum of the sizes of the state parts, which would save on storage and egress costs. Of course this diff isn't a suggestion for how exactly to implement it, since to do it right we'd want to refactor more extensively and maybe rework the S3 object names.

diff --git a/chain/client-primitives/src/types.rs b/chain/client-primitives/src/types.rs
index ac9a77d2c..9a097eacc 100644
--- a/chain/client-primitives/src/types.rs
+++ b/chain/client-primitives/src/types.rs
@@ -219,6 +219,13 @@ impl ShardSyncDownload {
}
Self { downloads, status: ShardSyncStatus::StateDownloadParts }
}
+
+ pub fn new_download_state_parts_external(now: DateTime<Utc>) -> Self {
+ Self {
+ downloads: vec![DownloadStatus::new(now)],
+ status: ShardSyncStatus::StateDownloadParts,
+ }
+ }
}
/// Various status sync can be in, whether it's fast sync or archival.
diff --git a/chain/client/src/sync/state.rs b/chain/client/src/sync/state.rs
index 9a7485094..3b68ede3c 100644
--- a/chain/client/src/sync/state.rs
+++ b/chain/client/src/sync/state.rs
@@ -647,17 +647,11 @@ impl StateSync {
let epoch_info = chain.runtime_adapter.get_epoch_info(epoch_id).unwrap();
let epoch_height = epoch_info.epoch_height();
- let shard_state_header = chain.get_state_header(shard_id, sync_hash).unwrap();
- let state_num_parts =
- get_num_state_parts(shard_state_header.state_root_node().memory_usage);
-
for (part_id, download) in parts_to_fetch(new_shard_sync_download) {
request_part_from_external_storage(
- part_id,
download,
shard_id,
epoch_height,
- state_num_parts,
&chain_id.clone(),
bucket.clone(),
requests_remaining.clone(),
@@ -812,12 +806,17 @@ impl StateSync {
// StateDownloadHeader is the first step. We want to fetch the basic information about the state (its size, hash etc).
if shard_sync_download.downloads[0].done {
let shard_state_header = chain.get_state_header(shard_id.clone(), sync_hash)?;
- let state_num_parts =
- get_num_state_parts(shard_state_header.state_root_node().memory_usage);
// If the header was downloaded successfully - move to phase 2 (downloading parts).
// Create the vector with entry for each part.
- *shard_sync_download =
- ShardSyncDownload::new_download_state_parts(now, state_num_parts);
+ *shard_sync_download = match &self.inner {
+ StateSyncInner::Peers { .. } => ShardSyncDownload::new_download_state_parts(
+ now,
+ get_num_state_parts(shard_state_header.state_root_node().memory_usage),
+ ),
+ StateSyncInner::PartsFromExternal { .. } => {
+ ShardSyncDownload::new_download_state_parts_external(now)
+ }
+ };
need_shard = true;
} else {
let prev = shard_sync_download.downloads[0].prev_update_time;
@@ -917,8 +916,12 @@ impl StateSync {
state_parts_task_scheduler: &dyn Fn(ApplyStatePartsRequest),
) -> Result<(), near_chain::Error> {
let shard_state_header = chain.get_state_header(shard_id, sync_hash)?;
- let state_num_parts =
- get_num_state_parts(shard_state_header.state_root_node().memory_usage);
+ let state_num_parts = match &self.inner {
+ StateSyncInner::Peers { .. } => {
+ get_num_state_parts(shard_state_header.state_root_node().memory_usage)
+ }
+ StateSyncInner::PartsFromExternal { .. } => 1,
+ };
// Now apply all the parts to the chain / runtime.
// TODO: not sure why this has to happen only after all the parts were downloaded -
// as we could have done this in parallel after getting each part.
@@ -974,8 +977,12 @@ impl StateSync {
tracing::error!(target: "sync", %shard_id, %sync_hash, ?err, "State sync finalizing error");
*shard_sync_download = ShardSyncDownload::new_download_state_header(now);
let shard_state_header = chain.get_state_header(shard_id, sync_hash)?;
- let state_num_parts =
- get_num_state_parts(shard_state_header.state_root_node().memory_usage);
+ let state_num_parts = match &self.inner {
+ StateSyncInner::Peers { .. } => {
+ get_num_state_parts(shard_state_header.state_root_node().memory_usage)
+ }
+ StateSyncInner::PartsFromExternal { .. } => 1,
+ };
chain.clear_downloaded_parts(shard_id, sync_hash, state_num_parts)?;
}
}
@@ -992,8 +999,12 @@ impl StateSync {
chain: &mut Chain,
) -> Result<bool, near_chain::Error> {
let shard_state_header = chain.get_state_header(shard_id, sync_hash)?;
- let state_num_parts =
- get_num_state_parts(shard_state_header.state_root_node().memory_usage);
+ let state_num_parts = match &self.inner {
+ StateSyncInner::Peers { .. } => {
+ get_num_state_parts(shard_state_header.state_root_node().memory_usage)
+ }
+ StateSyncInner::PartsFromExternal { .. } => 1,
+ };
chain.clear_downloaded_parts(shard_id, sync_hash, state_num_parts)?;
let mut this_done = false;
@@ -1070,11 +1081,9 @@ fn parts_to_fetch(
/// Starts an asynchronous network request to external storage to fetch the given state part.
fn request_part_from_external_storage(
- part_id: u64,
download: &mut DownloadStatus,
shard_id: ShardId,
epoch_height: EpochHeight,
- num_parts: u64,
chain_id: &str,
bucket: Arc<s3::Bucket>,
requests_remaining: Arc<AtomicI64>,
@@ -1083,7 +1092,7 @@ fn request_part_from_external_storage(
return;
} else {
if !download.run_me.swap(false, Ordering::SeqCst) {
- tracing::info!(target: "sync", %shard_id, part_id, "run_me is already false");
+ tracing::info!(target: "sync", %shard_id, "run_me is already false");
return;
}
}
@@ -1091,13 +1100,13 @@ fn request_part_from_external_storage(
download.last_target =
Some(make_account_or_peer_id_or_hash(AccountOrPeerIdOrHash::ExternalStorage));
- let location = s3_location(chain_id, epoch_height, shard_id, part_id, num_parts);
+ let location = s3_location(chain_id, epoch_height, shard_id, 0, 1);
let path = std::path::Path::new("/tmp/state-dump").join(location.clone());
let download_response = download.response.clone();
let scheduled = StaticClock::utc();
near_performance_metrics::actix::spawn("StateSync", {
async move {
- tracing::info!(target: "sync", %shard_id, part_id, location, "Getting an object from the external storage");
+ tracing::info!(target: "sync", %shard_id, location, "Getting an object from the external storage");
let started = StaticClock::utc();
metrics::STATE_SYNC_EXTERNAL_PARTS_SCHEDULING_DELAY
.with_label_values(&[&shard_id.to_string()])
@@ -1117,12 +1126,12 @@ fn request_part_from_external_storage(
);
match result {
Ok(response) => {
- tracing::info!(target: "sync", %shard_id, part_id, location, response_code = 200, num_bytes = response.len(), "S3 request finished");
+ tracing::info!(target: "sync", %shard_id, location, response_code = 200, num_bytes = response.len(), "S3 request finished");
let mut lock = download_response.lock().unwrap();
*lock = Some(Ok((200, response)));
}
Err(err) => {
- tracing::info!(target: "sync", %shard_id, part_id, location, ?err, "S3 request failed");
+ tracing::info!(target: "sync", %shard_id, location, ?err, "S3 request failed");
let mut lock = download_response.lock().unwrap();
*lock = Some(Err(err.to_string()));
}
diff --git a/nearcore/src/state_sync.rs b/nearcore/src/state_sync.rs
index 8b82929fc..5e782f0e1 100644
--- a/nearcore/src/state_sync.rs
+++ b/nearcore/src/state_sync.rs
@@ -163,8 +163,7 @@ async fn state_sync_dump(
match state_header {
Ok(state_header) => {
let state_root = state_header.chunk_prev_state_root();
- let num_parts =
- get_num_state_parts(state_header.state_root_node().memory_usage);
+ let num_parts = 1;
let mut res = None;
// The actual dumping of state to S3.
@@ -373,7 +372,7 @@ fn start_dumping(
let sync_prev_hash = sync_prev_header.hash();
let state_header = chain.get_state_response_header(shard_id, sync_hash)?;
- let num_parts = get_num_state_parts(state_header.state_root_node().memory_usage);
+ let num_parts = 1;
if runtime.cares_about_shard(None, sync_prev_hash, shard_id, false) {
tracing::debug!(target: "state_sync_dump", shard_id, ?epoch_id, %sync_prev_hash, %sync_hash, "Initialize dumping state of Epoch");
// Note that first the state of the state machines gets changes to
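For reference (see the first point above), a minimal sketch of what a storage-agnostic fetch over plain HTTP GET could look like. The `reqwest` crate, the `base_url` parameter, and the URL layout are assumptions for illustration; the PR itself uses the `rust-s3` client:

```rust
/// Illustrative only: fetch a state part from any HTTP-accessible store.
async fn get_state_part_http(base_url: &str, location: &str) -> Result<Vec<u8>, reqwest::Error> {
    let url = format!("{}/{}", base_url.trim_end_matches('/'), location);
    // Any centralized store that serves objects over HTTP would work here, S3 or otherwise.
    let response = reqwest::get(url).await?.error_for_status()?;
    Ok(response.bytes().await?.to_vec())
}
```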
I was thinking of adding a local-filesystem option for external storage. That would be easy to test, similar to your previous patch suggestion. The S3-specific part of the code will remain untested, but that is supposedly a small enough piece that it can be covered by an integration test reading from a real S3 bucket. HTTP GET is easier to test than S3, but still not easy enough. I wish
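One way such an abstraction could look, sketched here as an enum with an S3 variant (matching the `rust-s3` calls already in the PR) and a filesystem variant. The names and structure are hypothetical; the follow-up PR may do this differently:

```rust
use std::path::PathBuf;

/// Hypothetical abstraction over the external storage backend.
enum ExternalStorage {
    S3 { bucket: s3::Bucket },
    Filesystem { root: PathBuf },
}

impl ExternalStorage {
    /// Fetches one object (e.g. a state part) from the configured backend.
    async fn get(&self, location: &str) -> anyhow::Result<Vec<u8>> {
        match self {
            ExternalStorage::S3 { bucket } => {
                let response = bucket.get_object(location).await?;
                Ok(response.bytes().to_vec())
            }
            ExternalStorage::Filesystem { root } => Ok(std::fs::read(root.join(location))?),
        }
    }
}
```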
I see your point, but I'd prefer this PR to solve one problem. Let's introduce the S3 mechanism now and rework state parts separately. Restricting the dump to a single state part also needs to be carefully considered: one of the drawbacks is that it prevents the dump process from running in parallel on multiple machines (see the sketch below).
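For illustration, a minimal sketch of how multiple dump processes could split the parts of one shard between them, which is what a single-part dump would rule out. The modulo assignment and function name are hypothetical, not something this PR implements:

```rust
/// Hypothetical: machine `machine_idx` (0-based) of `num_machines` dumps only
/// its assigned part ids, so several machines can dump one shard in parallel.
fn parts_for_machine(num_parts: u64, num_machines: u64, machine_idx: u64) -> Vec<u64> {
    assert!(machine_idx < num_machines);
    (0..num_parts).filter(|part_id| part_id % num_machines == machine_idx).collect()
}
```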
:( The issue was fixed and then un-fixed during one of the rebases: nikurt@9660b2f#diff-5992f2a2442bd26f4474740f563ec5bb1ca3d2912c02799a8352d731b33e84d9L984. The issue was that

You are right that this new functionality needs to be tested. I will follow up with a PR that adds the functionality to read/write to the local filesystem and will use that to add tests.
Yeah, that makes sense.
Is that actually a problem, though? We have something like 12 hours in each epoch to dump the state, so is one machine per shard not enough?
Yes, that's a problem:
67c6c0b to 0855002
* Adds functionality to get state parts as files from S3
* Fixes an off-by-one-block error in state dumping to S3
  * In State Dump
  * In state-viewer
* Latency metric for functions `fn apply_state_part()` and `fn obtain_state_part()`
* New sub-sub-command `neard view-state state-parts read-state-header` to read the state header stored in the DB
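For illustration, the latency metric mentioned above presumably follows the same `.with_label_values(...).start_timer()` pattern used elsewhere in this PR's metrics code. A minimal sketch of such a histogram timer with the `prometheus` crate, where the metric name, help text, and label set are assumptions, not the PR's actual definitions:

```rust
use once_cell::sync::Lazy;
use prometheus::{register_histogram_vec, HistogramVec};

// Hypothetical metric; the actual name and labels in the PR may differ.
static STATE_SYNC_APPLY_PART_ELAPSED: Lazy<HistogramVec> = Lazy::new(|| {
    register_histogram_vec!(
        "near_state_sync_apply_part_elapsed_sec",
        "Latency of applying a single state part",
        &["shard_id"]
    )
    .unwrap()
});

fn apply_state_part_with_timer(shard_id: u64 /*, ... */) {
    // The timer observes the elapsed time when it is dropped at the end of the scope.
    let _timer = STATE_SYNC_APPLY_PART_ELAPSED
        .with_label_values(&[&shard_id.to_string()])
        .start_timer();
    // ... the actual apply_state_part work would go here ...
}
```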