fix: State dump creates dumps at the correct point #8794

Merged · 10 commits · Mar 27, 2023
27 changes: 18 additions & 9 deletions nearcore/src/metrics.rs
@@ -50,15 +50,6 @@ pub(crate) static STATE_SYNC_DUMP_PUT_OBJECT_ELAPSED: Lazy<HistogramVec> = Lazy:
)
.unwrap()
});
pub(crate) static STATE_SYNC_DUMP_OBTAIN_PART_ELAPSED: Lazy<HistogramVec> = Lazy::new(|| {
try_create_histogram_vec(
"near_state_sync_dump_obtain_part_elapsed_sec",
"Time needed to obtain a part",
&["shard_id"],
Some(exponential_buckets(0.001, 1.6, 25).unwrap()),
)
.unwrap()
});
pub(crate) static STATE_SYNC_DUMP_NUM_PARTS_TOTAL: Lazy<IntGaugeVec> = Lazy::new(|| {
try_create_int_gauge_vec(
"near_state_sync_dump_num_parts_total",
@@ -91,3 +82,21 @@ pub(crate) static STATE_SYNC_DUMP_EPOCH_HEIGHT: Lazy<IntGaugeVec> = Lazy::new(||
)
.unwrap()
});
pub static STATE_SYNC_APPLY_PART_DELAY: Lazy<near_o11y::metrics::HistogramVec> = Lazy::new(|| {
try_create_histogram_vec(
"near_state_sync_apply_part_delay_sec",
"Latency of applying a state part",
&["shard_id"],
Some(exponential_buckets(0.001, 2.0, 20).unwrap()),
)
.unwrap()
});
pub static STATE_SYNC_OBTAIN_PART_DELAY: Lazy<near_o11y::metrics::HistogramVec> = Lazy::new(|| {
try_create_histogram_vec(
"near_state_sync_obtain_part_delay_sec",
"Latency of applying a state part",
nikurt marked this conversation as resolved.
Show resolved Hide resolved
&["shard_id"],
Some(exponential_buckets(0.001, 2.0, 20).unwrap()),
)
.unwrap()
});
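
Both new histograms share the same exponential bucket layout. For reference, a minimal standalone sketch (assuming `exponential_buckets` behaves like the prometheus crate's function of the same name, i.e. boundary `i` is `start * factor^i`) of what `exponential_buckets(0.001, 2.0, 20)` produces:

```rust
// Illustration only, not part of the PR: reproduce the bucket boundaries
// of exponential_buckets(0.001, 2.0, 20).
fn main() {
    let (start, factor, count) = (0.001_f64, 2.0_f64, 20i32);
    let buckets: Vec<f64> = (0..count).map(|i| start * factor.powi(i)).collect();
    // 20 boundaries: 1 ms, 2 ms, 4 ms, ... up to 0.001 * 2^19 ≈ 524.288 s.
    println!("first = {} s, last = {} s", buckets[0], buckets[(count - 1) as usize]);
}
```

So both timers resolve sub-millisecond latencies while still capturing multi-minute outliers.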
18 changes: 16 additions & 2 deletions nearcore/src/runtime/mod.rs
@@ -1246,6 +1246,10 @@ impl RuntimeAdapter for NightshadeRuntime {
%block_hash,
num_parts = part_id.total)
.entered();
let _timer = metrics::STATE_SYNC_OBTAIN_PART_DELAY
.with_label_values(&[&shard_id.to_string()])
.start_timer();

let epoch_id = self.get_epoch_id(block_hash)?;
let shard_uid = self.get_shard_uid_from_epoch_id(shard_id, &epoch_id)?;
let trie = self.tries.get_view_trie_for_shard(shard_uid, *state_root);
@@ -1270,11 +1274,17 @@ impl RuntimeAdapter for NightshadeRuntime {
match Trie::validate_trie_nodes_for_part(state_root, part_id, trie_nodes) {
Ok(_) => true,
// Storage error should not happen
Err(_) => false,
Err(err) => {
tracing::error!(target: "state-parts", ?err, "State part storage error");
false
}
}
}
// Deserialization error means we've got the data from a malicious peer
Err(_) => false,
Err(err) => {
tracing::error!(target: "state-parts", ?err, "State part deserialization error");
false
}
}
}

@@ -1371,6 +1381,10 @@ impl RuntimeAdapter for NightshadeRuntime {
data: &[u8],
epoch_id: &EpochId,
) -> Result<(), Error> {
let _timer = metrics::STATE_SYNC_APPLY_PART_DELAY
.with_label_values(&[&shard_id.to_string()])
.start_timer();

let part = BorshDeserialize::try_from_slice(data)
.expect("Part was already validated earlier, so could never fail here");
let ApplyStatePartResult { trie_changes, flat_state_delta, contract_codes } =
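
Both `_timer` guards above rely on drop-based timing: `start_timer()` returns a guard that observes the elapsed seconds into the histogram when it goes out of scope, so every early return is measured too. A self-contained sketch of that pattern with the plain `prometheus` crate (near_o11y's `try_create_histogram_vec` wraps the same types; the metric name below is illustrative):

```rust
use prometheus::{exponential_buckets, HistogramOpts, HistogramVec};

fn main() {
    let hist = HistogramVec::new(
        HistogramOpts::new("demo_delay_sec", "Latency of a demo operation")
            .buckets(exponential_buckets(0.001, 2.0, 20).unwrap()),
        &["shard_id"],
    )
    .unwrap();
    {
        // The guard records the elapsed time into the histogram when it is
        // dropped at the end of this scope -- same mechanism as `_timer` above.
        let _timer = hist.with_label_values(&["0"]).start_timer();
        std::thread::sleep(std::time::Duration::from_millis(5));
    }
    assert_eq!(hist.with_label_values(&["0"]).get_sample_count(), 1);
}
```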
41 changes: 20 additions & 21 deletions nearcore/src/state_sync.rs
@@ -175,7 +175,7 @@ async fn state_sync_dump(
.with_label_values(&[&shard_id.to_string()])
.start_timer();

let state_part = match get_state_part(
let state_part = match obtain_and_store_state_part(
&runtime,
&shard_id,
&sync_hash,
@@ -328,7 +328,8 @@ fn set_metrics(
}
}

fn get_state_part(
/// Obtains and then saves the part data.
fn obtain_and_store_state_part(
runtime: &Arc<NightshadeRuntime>,
shard_id: &ShardId,
sync_hash: &CryptoHash,
@@ -337,19 +338,13 @@ fn get_state_part(
num_parts: u64,
chain: &Chain,
) -> Result<Vec<u8>, Error> {
let state_part = {
let _timer = metrics::STATE_SYNC_DUMP_OBTAIN_PART_ELAPSED
.with_label_values(&[&shard_id.to_string()])
.start_timer();
runtime.obtain_state_part(
*shard_id,
&sync_hash,
&state_root,
PartId::new(part_id, num_parts),
)?
};
let state_part = runtime.obtain_state_part(
*shard_id,
&sync_hash,
&state_root,
PartId::new(part_id, num_parts),
)?;

// Save the part data.
let key = StatePartKey(*sync_hash, *shard_id, part_id).try_to_vec()?;
let mut store_update = chain.store().store().store_update();
store_update.set(DBCol::StateParts, &key, &state_part);
@@ -368,13 +363,17 @@ fn start_dumping(
let epoch_info = runtime.get_epoch_info(&epoch_id)?;
let epoch_height = epoch_info.epoch_height();
let num_shards = runtime.num_shards(&epoch_id)?;
let sync_hash_block = chain.get_block(&sync_hash)?;
if runtime.cares_about_shard(None, sync_hash_block.header().prev_hash(), shard_id, false) {
assert_eq!(num_shards, sync_hash_block.chunks().len() as u64);
let state_root = sync_hash_block.chunks()[shard_id as usize].prev_state_root();
let state_root_node = runtime.get_state_root_node(shard_id, &sync_hash, &state_root)?;
let sync_prev_header = chain.get_block_header(&sync_hash)?;
let sync_prev_hash = sync_prev_header.prev_hash();
let prev_sync_block = chain.get_block(&sync_prev_hash)?;
if runtime.cares_about_shard(None, prev_sync_block.header().prev_hash(), shard_id, false) {
assert_eq!(num_shards, prev_sync_block.chunks().len() as u64);
[Inline review on this hunk]

Contributor: For this whole thing, did you consider just using Chain::get_state_response_part()? The logic around finding the right state roots and such is already implemented there, so you could just call that function in the main loop of state_sync_dump() if InProgress just stored the shard ID, part_id and sync_hash. Unless maybe I'm missing something.

Contributor Author: Amazing suggestion. Done.

let state_root = prev_sync_block.chunks()[shard_id as usize].prev_state_root();
// See `get_state_response_header()` for reference.
let state_root_node =
runtime.get_state_root_node(shard_id, &sync_prev_hash, &state_root)?;
let num_parts = get_num_state_parts(state_root_node.memory_usage);
tracing::debug!(target: "state_sync_dump", shard_id, ?epoch_id, %sync_hash, %state_root, num_parts, "Initialize dumping state of Epoch");
tracing::debug!(target: "state_sync_dump", shard_id, ?epoch_id, %sync_prev_hash, %sync_hash, %state_root, num_parts, "Initialize dumping state of Epoch");
// Note that the state machine's state first gets changed to
// `InProgress`, and it starts dumping state after a short interval.
set_metrics(&shard_id, Some(0), Some(num_parts), Some(epoch_height));
Expand All @@ -387,7 +386,7 @@ fn start_dumping(
num_parts,
}))
} else {
tracing::debug!(target: "state_sync_dump", shard_id, ?epoch_id, %sync_hash, "Shard is not tracked, skip the epoch");
tracing::debug!(target: "state_sync_dump", shard_id, ?epoch_id, %sync_prev_hash, %sync_hash, "Shard is not tracked, skip the epoch");
Ok(Some(StateSyncDumpProgress::AllDumped { epoch_id, epoch_height, num_parts: Some(0) }))
}
}
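
The substance of the fix is in `start_dumping()` above: the state root is now taken from the block *before* `sync_hash` (reached via `sync_prev_hash`), not from the sync block itself. A toy model of that selection, using illustrative stand-in types rather than nearcore's:

```rust
use std::collections::HashMap;

// Stand-in types for illustration; nearcore's Block/Chunk are richer.
struct Chunk { prev_state_root: &'static str }
struct Block { prev_hash: &'static str, chunks: Vec<Chunk> }

fn main() {
    let chain: HashMap<&str, Block> = HashMap::from([
        ("sync", Block { prev_hash: "sync_prev", chunks: vec![Chunk { prev_state_root: "root_B" }] }),
        ("sync_prev", Block { prev_hash: "genesis", chunks: vec![Chunk { prev_state_root: "root_A" }] }),
    ]);

    let shard_id = 0usize;
    // Old (incorrect) choice: root read from the sync block itself.
    let old_root = chain["sync"].chunks[shard_id].prev_state_root;
    // Fixed choice: step back one block first, then read the root.
    let sync_prev_hash = chain["sync"].prev_hash;
    let new_root = chain[sync_prev_hash].chunks[shard_id].prev_state_root;
    assert_ne!(old_root, new_root);
    println!("dumping state rooted at {new_root} (previously {old_root})");
}
```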