Move some code to near#8794
nikurt committed Mar 24, 2023
1 parent 27e64f7 commit 3ae1be3
Showing 4 changed files with 146 additions and 194 deletions.
27 changes: 9 additions & 18 deletions nearcore/src/metrics.rs
@@ -50,6 +50,15 @@ pub(crate) static STATE_SYNC_DUMP_PUT_OBJECT_ELAPSED: Lazy<HistogramVec> = Lazy:
     )
     .unwrap()
 });
+pub(crate) static STATE_SYNC_DUMP_OBTAIN_PART_ELAPSED: Lazy<HistogramVec> = Lazy::new(|| {
+    try_create_histogram_vec(
+        "near_state_sync_dump_obtain_part_elapsed_sec",
+        "Time needed to obtain a part",
+        &["shard_id"],
+        Some(exponential_buckets(0.001, 1.6, 25).unwrap()),
+    )
+    .unwrap()
+});
 pub(crate) static STATE_SYNC_DUMP_NUM_PARTS_TOTAL: Lazy<IntGaugeVec> = Lazy::new(|| {
     try_create_int_gauge_vec(
         "near_state_sync_dump_num_parts_total",
@@ -82,21 +91,3 @@ pub(crate) static STATE_SYNC_DUMP_EPOCH_HEIGHT: Lazy<IntGaugeVec> = Lazy::new(||
     )
     .unwrap()
 });
-pub static STATE_SYNC_APPLY_PART_DELAY: Lazy<near_o11y::metrics::HistogramVec> = Lazy::new(|| {
-    try_create_histogram_vec(
-        "near_state_sync_apply_part_delay_sec",
-        "Latency of applying a state part",
-        &["shard_id"],
-        Some(exponential_buckets(0.001, 2.0, 20).unwrap()),
-    )
-    .unwrap()
-});
-pub static STATE_SYNC_OBTAIN_PART_DELAY: Lazy<near_o11y::metrics::HistogramVec> = Lazy::new(|| {
-    try_create_histogram_vec(
-        "near_state_sync_obtain_part_delay_sec",
-        "Latency of applying a state part",
-        &["shard_id"],
-        Some(exponential_buckets(0.001, 2.0, 20).unwrap()),
-    )
-    .unwrap()
-});
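For context, the new dump histogram is used with the usual Prometheus timer-guard pattern (the same pattern appears at the call site added in state_sync.rs below). A minimal self-contained sketch, using the prometheus and once_cell crates directly instead of nearcore's try_create_histogram_vec wrapper; the standalone metric, the obtain_part_timed function, and its shard_id parameter are illustrative only:

use once_cell::sync::Lazy;
use prometheus::{exponential_buckets, HistogramOpts, HistogramVec};

// Standalone stand-in for the metric defined above (illustrative only).
static OBTAIN_PART_ELAPSED: Lazy<HistogramVec> = Lazy::new(|| {
    HistogramVec::new(
        HistogramOpts::new(
            "near_state_sync_dump_obtain_part_elapsed_sec",
            "Time needed to obtain a part",
        )
        .buckets(exponential_buckets(0.001, 1.6, 25).unwrap()),
        &["shard_id"],
    )
    .unwrap()
});

fn obtain_part_timed(shard_id: u64) {
    // The guard observes the elapsed time into the histogram when it is
    // dropped at the end of the scope.
    let _timer = OBTAIN_PART_ELAPSED.with_label_values(&[&shard_id.to_string()]).start_timer();
    // ... do the work being measured, e.g. obtaining one state part ...
}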
18 changes: 2 additions & 16 deletions nearcore/src/runtime/mod.rs
@@ -1246,10 +1246,6 @@ impl RuntimeAdapter for NightshadeRuntime {
             %block_hash,
             num_parts = part_id.total)
         .entered();
-        let _timer = metrics::STATE_SYNC_OBTAIN_PART_DELAY
-            .with_label_values(&[&shard_id.to_string()])
-            .start_timer();
-
         let epoch_id = self.get_epoch_id(block_hash)?;
         let shard_uid = self.get_shard_uid_from_epoch_id(shard_id, &epoch_id)?;
         let trie = self.tries.get_view_trie_for_shard(shard_uid, *state_root);
@@ -1274,17 +1270,11 @@ impl RuntimeAdapter for NightshadeRuntime {
                 match Trie::validate_trie_nodes_for_part(state_root, part_id, trie_nodes) {
                     Ok(_) => true,
                     // Storage error should not happen
-                    Err(err) => {
-                        tracing::error!(target: "state-parts", ?err, "State part storage error");
-                        false
-                    }
+                    Err(_) => false,
                 }
             }
             // Deserialization error means we've got the data from malicious peer
-            Err(err) => {
-                tracing::error!(target: "state-parts", ?err, "State part deserialization error");
-                false
-            }
+            Err(_) => false,
         }
     }
 
@@ -1381,10 +1371,6 @@ impl RuntimeAdapter for NightshadeRuntime {
         data: &[u8],
         epoch_id: &EpochId,
     ) -> Result<(), Error> {
-        let _timer = metrics::STATE_SYNC_APPLY_PART_DELAY
-            .with_label_values(&[&shard_id.to_string()])
-            .start_timer();
-
         let part = BorshDeserialize::try_from_slice(data)
             .expect("Part was already validated earlier, so could never fail here");
         let ApplyStatePartResult { trie_changes, flat_state_delta, contract_codes } =
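As a side note on the (de)serialization shown above: state parts and their store keys are Borsh-encoded, written with try_to_vec and read back with try_from_slice. A minimal sketch of that round trip, using a hypothetical ExamplePart type (the real part type in nearcore is different):

use borsh::{BorshDeserialize, BorshSerialize};

#[derive(BorshSerialize, BorshDeserialize, Debug, PartialEq)]
struct ExamplePart {
    nodes: Vec<Vec<u8>>,
}

fn main() {
    let part = ExamplePart { nodes: vec![vec![1, 2, 3]] };
    // Serialize, as StatePartKey is serialized with try_to_vec in state_sync.rs.
    let bytes = part.try_to_vec().unwrap();
    // Deserialize, as apply_state_part above does with try_from_slice.
    let decoded = ExamplePart::try_from_slice(&bytes).expect("valid bytes");
    assert_eq!(part, decoded);
}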
56 changes: 35 additions & 21 deletions nearcore/src/state_sync.rs
@@ -3,7 +3,7 @@ use borsh::BorshSerialize;
 use near_chain::types::RuntimeAdapter;
 use near_chain::{Chain, ChainGenesis, ChainStoreAccess, DoomslugThresholdMode, Error};
 use near_chain_configs::ClientConfig;
-use near_client::sync::state::{s3_location, StateSync};
+use near_client::sync::state::StateSync;
 use near_crypto::PublicKey;
 use near_epoch_manager::EpochManagerAdapter;
 use near_primitives::hash::CryptoHash;
@@ -175,7 +175,7 @@ async fn state_sync_dump(
                         .with_label_values(&[&shard_id.to_string()])
                         .start_timer();
 
-                    let state_part = match obtain_and_store_state_part(
+                    let state_part = match get_state_part(
                         &runtime,
                         &shard_id,
                         &sync_hash,
@@ -328,8 +328,7 @@ fn set_metrics(
     }
 }
 
-/// Obtains and then saves the part data.
-fn obtain_and_store_state_part(
+fn get_state_part(
     runtime: &Arc<NightshadeRuntime>,
     shard_id: &ShardId,
     sync_hash: &CryptoHash,
@@ -338,13 +337,19 @@ fn obtain_and_store_state_part(
     num_parts: u64,
     chain: &Chain,
 ) -> Result<Vec<u8>, Error> {
-    let state_part = runtime.obtain_state_part(
-        *shard_id,
-        &sync_hash,
-        &state_root,
-        PartId::new(part_id, num_parts),
-    )?;
+    let state_part = {
+        let _timer = metrics::STATE_SYNC_DUMP_OBTAIN_PART_ELAPSED
+            .with_label_values(&[&shard_id.to_string()])
+            .start_timer();
+        runtime.obtain_state_part(
+            *shard_id,
+            &sync_hash,
+            &state_root,
+            PartId::new(part_id, num_parts),
+        )?
+    };
 
+    // Save the part data.
     let key = StatePartKey(*sync_hash, *shard_id, part_id).try_to_vec()?;
     let mut store_update = chain.store().store().store_update();
     store_update.set(DBCol::StateParts, &key, &state_part);
@@ -363,17 +368,13 @@ fn start_dumping(
     let epoch_info = runtime.get_epoch_info(&epoch_id)?;
     let epoch_height = epoch_info.epoch_height();
     let num_shards = runtime.num_shards(&epoch_id)?;
-    let sync_prev_header = chain.get_block_header(&sync_hash)?;
-    let sync_prev_hash = sync_prev_header.prev_hash();
-    let prev_sync_block = chain.get_block(&sync_prev_hash)?;
-    if runtime.cares_about_shard(None, prev_sync_block.header().prev_hash(), shard_id, false) {
-        assert_eq!(num_shards, prev_sync_block.chunks().len() as u64);
-        let state_root = prev_sync_block.chunks()[shard_id as usize].prev_state_root();
-        // See `get_state_response_header()` for reference.
-        let state_root_node =
-            runtime.get_state_root_node(shard_id, &sync_prev_hash, &state_root)?;
+    let sync_hash_block = chain.get_block(&sync_hash)?;
+    if runtime.cares_about_shard(None, sync_hash_block.header().prev_hash(), shard_id, false) {
+        assert_eq!(num_shards, sync_hash_block.chunks().len() as u64);
+        let state_root = sync_hash_block.chunks()[shard_id as usize].prev_state_root();
+        let state_root_node = runtime.get_state_root_node(shard_id, &sync_hash, &state_root)?;
         let num_parts = get_num_state_parts(state_root_node.memory_usage);
-        tracing::debug!(target: "state_sync_dump", shard_id, ?epoch_id, %sync_prev_hash, %sync_hash, %state_root, num_parts, "Initialize dumping state of Epoch");
+        tracing::debug!(target: "state_sync_dump", shard_id, ?epoch_id, %sync_hash, %state_root, num_parts, "Initialize dumping state of Epoch");
         // Note that first the state of the state machines gets changes to
         // `InProgress` and it starts dumping state after a short interval.
         set_metrics(&shard_id, Some(0), Some(num_parts), Some(epoch_height));
@@ -386,7 +387,7 @@
             num_parts,
         }))
     } else {
-        tracing::debug!(target: "state_sync_dump", shard_id, ?epoch_id, %sync_prev_hash, %sync_hash, "Shard is not tracked, skip the epoch");
+        tracing::debug!(target: "state_sync_dump", shard_id, ?epoch_id, %sync_hash, "Shard is not tracked, skip the epoch");
         Ok(Some(StateSyncDumpProgress::AllDumped { epoch_id, epoch_height, num_parts: Some(0) }))
     }
 }
@@ -421,3 +422,16 @@ fn check_new_epoch(
         }
     }
 }
+
+fn s3_location(
+    chain_id: &str,
+    epoch_height: u64,
+    shard_id: u64,
+    part_id: u64,
+    num_parts: u64,
+) -> String {
+    format!(
+        "chain_id={}/epoch_height={}/shard_id={}/state_part_{:06}_of_{:06}",
+        chain_id, epoch_height, shard_id, part_id, num_parts
+    )
+}
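For reference, a quick check of the key layout this relocated helper produces; the values below are arbitrary examples, computed from the format string shown above:

fn main() {
    assert_eq!(
        s3_location("mainnet", 123, 2, 5, 100),
        "chain_id=mainnet/epoch_height=123/shard_id=2/state_part_000005_of_000100"
    );
}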