fix: State dump creates dumps at the correct point (near#8794)
The dump needs to be of the state at the beginning of the last block of the previous epoch.

Add latency metrics to `obtain_state_part()` and `apply_state_part()`.

Make debug logging more verbose about which state root is used for dumping and for applying state parts.

Add a command `neard view-state state-parts read-state-header` to read a state header from the DB.
nikurt committed Apr 5, 2023
1 parent 5dbd875 commit 44f8661
Showing 6 changed files with 270 additions and 219 deletions.
27 changes: 20 additions & 7 deletions chain/chain/src/chain.rs
@@ -2699,17 +2699,12 @@ impl Chain {
)
}

pub fn get_state_response_header(
/// Computes ShardStateSyncResponseHeader.
pub fn compute_state_response_header(
&self,
shard_id: ShardId,
sync_hash: CryptoHash,
) -> Result<ShardStateSyncResponseHeader, Error> {
// Check cache
let key = StateHeaderKey(shard_id, sync_hash).try_to_vec()?;
if let Ok(Some(header)) = self.store.store().get_ser(DBCol::StateHeaders, &key) {
return Ok(header);
}

// Consistency rules:
// 1. Everything prefixed with `sync_` indicates new epoch, for which we are syncing.
// 1a. `sync_prev` means the last of the prev epoch.
@@ -2875,6 +2870,24 @@ impl Chain {
})
}
};
Ok(shard_state_header)
}

/// Returns ShardStateSyncResponseHeader for the given epoch and shard.
/// If the header is already available in the DB, returns the cached version and doesn't recompute it.
/// If the header was computed then it also gets cached in the DB.
pub fn get_state_response_header(
&self,
shard_id: ShardId,
sync_hash: CryptoHash,
) -> Result<ShardStateSyncResponseHeader, Error> {
// Check cache
let key = StateHeaderKey(shard_id, sync_hash).try_to_vec()?;
if let Ok(Some(header)) = self.store.store().get_ser(DBCol::StateHeaders, &key) {
return Ok(header);
}

let shard_state_header = self.compute_state_response_header(shard_id, sync_hash)?;

// Saving the header data
let mut store_update = self.store.store().store_update();
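The refactor in chain.rs above separates the expensive header computation from a thin read-through cache: `get_state_response_header()` first looks the header up in `DBCol::StateHeaders` and only calls `compute_state_response_header()` (and then persists the result) on a miss. A minimal, self-contained sketch of that pattern, with an in-memory map and placeholder types standing in for the node's store and the real header type, might look like this:

```rust
use std::collections::HashMap;

// Placeholder types; the real code uses ShardId, CryptoHash and
// ShardStateSyncResponseHeader, keyed by StateHeaderKey in DBCol::StateHeaders.
type ShardId = u64;
type SyncHash = u64;

#[derive(Clone, Debug)]
struct Header {
    shard_id: ShardId,
    sync_hash: SyncHash,
}

#[derive(Default)]
struct Chain {
    // Stand-in for the StateHeaders column of the node's store.
    state_headers: HashMap<(ShardId, SyncHash), Header>,
}

impl Chain {
    // Expensive part: compute_state_response_header in this commit.
    fn compute_state_response_header(&self, shard_id: ShardId, sync_hash: SyncHash) -> Header {
        Header { shard_id, sync_hash }
    }

    // Thin wrapper: get_state_response_header in this commit.
    // Returns the cached header if present, otherwise computes and caches it.
    fn get_state_response_header(&mut self, shard_id: ShardId, sync_hash: SyncHash) -> Header {
        if let Some(header) = self.state_headers.get(&(shard_id, sync_hash)) {
            return header.clone();
        }
        let header = self.compute_state_response_header(shard_id, sync_hash);
        self.state_headers.insert((shard_id, sync_hash), header.clone());
        header
    }
}

fn main() {
    let mut chain = Chain::default();
    let first = chain.get_state_response_header(0, 42); // computed, then stored
    let second = chain.get_state_response_header(0, 42); // served from the cache
    println!("{:?} / {:?}", first, second);
}
```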
3 changes: 0 additions & 3 deletions core/primitives/src/syncing.rs
@@ -249,10 +249,7 @@ pub enum StateSyncDumpProgress {
/// Block hash of the first block of the epoch.
/// The dumped state corresponds to the state before applying this block.
sync_hash: CryptoHash,
/// Root of the state being dumped.
state_root: StateRoot,
/// Progress made.
parts_dumped: u64,
num_parts: u64,
},
}
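With `state_root` dropped from the progress record, `parts_dumped` plus the sync hash is enough to resume a dump; the state root (and the part count) are re-derived from the state sync header instead of being stored. A reduced model of the enum after this change, with stand-in types rather than the exact nearcore definition, could look like:

```rust
// Simplified stand-ins for the real types (EpochId, EpochHeight, CryptoHash).
type EpochHeight = u64;
type SyncHash = u64;

// Reduced model of StateSyncDumpProgress after this change: the dump is
// resumable from `parts_dumped` alone, because the state root and the number
// of parts are recomputed from the state sync header on every iteration.
#[derive(Clone, Debug)]
enum DumpProgress {
    InProgress {
        epoch_height: EpochHeight,
        sync_hash: SyncHash,
        parts_dumped: u64,
    },
    AllDumped {
        epoch_height: EpochHeight,
        num_parts: Option<u64>,
    },
}

fn main() {
    let resume_from = DumpProgress::InProgress {
        epoch_height: 7,
        sync_hash: 42,
        parts_dumped: 3, // parts 0..=2 are already uploaded; restart at part 3
    };
    println!("{:?}", resume_from);
}
```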
27 changes: 18 additions & 9 deletions nearcore/src/metrics.rs
@@ -50,15 +50,6 @@ pub(crate) static STATE_SYNC_DUMP_PUT_OBJECT_ELAPSED: Lazy<HistogramVec> = Lazy:
)
.unwrap()
});
pub(crate) static STATE_SYNC_DUMP_OBTAIN_PART_ELAPSED: Lazy<HistogramVec> = Lazy::new(|| {
try_create_histogram_vec(
"near_state_sync_dump_obtain_part_elapsed_sec",
"Time needed to obtain a part",
&["shard_id"],
Some(exponential_buckets(0.001, 1.6, 25).unwrap()),
)
.unwrap()
});
pub(crate) static STATE_SYNC_DUMP_NUM_PARTS_TOTAL: Lazy<IntGaugeVec> = Lazy::new(|| {
try_create_int_gauge_vec(
"near_state_sync_dump_num_parts_total",
@@ -91,3 +82,21 @@ pub(crate) static STATE_SYNC_DUMP_EPOCH_HEIGHT: Lazy<IntGaugeVec> = Lazy::new(||
)
.unwrap()
});
pub static STATE_SYNC_APPLY_PART_DELAY: Lazy<HistogramVec> = Lazy::new(|| {
try_create_histogram_vec(
"near_state_sync_apply_part_delay_sec",
"Time needed to apply a state part",
&["shard_id"],
Some(exponential_buckets(0.001, 2.0, 20).unwrap()),
)
.unwrap()
});
pub static STATE_SYNC_OBTAIN_PART_DELAY: Lazy<HistogramVec> = Lazy::new(|| {
try_create_histogram_vec(
"near_state_sync_obtain_part_delay_sec",
"Time needed to obtain a part",
&["shard_id"],
Some(exponential_buckets(0.001, 2.0, 20).unwrap()),
)
.unwrap()
});
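The two new histograms follow the usual Prometheus pattern: a per-shard label and an exponential bucket layout (20 buckets starting at 1 ms and doubling each step, so roughly 1 ms up to ~9 minutes). nearcore defines them through its `try_create_histogram_vec` helper, which also handles registration; a rough equivalent using the `prometheus` crate directly, for illustration only, would be:

```rust
use once_cell::sync::Lazy;
use prometheus::{exponential_buckets, HistogramOpts, HistogramVec};

// Illustration only: nearcore goes through near-o11y's try_create_histogram_vec;
// this sketch builds the same histogram with the prometheus crate directly.
static STATE_SYNC_OBTAIN_PART_DELAY: Lazy<HistogramVec> = Lazy::new(|| {
    HistogramVec::new(
        HistogramOpts::new(
            "near_state_sync_obtain_part_delay_sec",
            "Time needed to obtain a part",
        )
        .buckets(exponential_buckets(0.001, 2.0, 20).unwrap()),
        &["shard_id"],
    )
    .unwrap()
});

// Hypothetical caller, standing in for the instrumented runtime methods.
fn obtain_state_part(shard_id: u64) {
    // The timer observes the elapsed time into the histogram when it is
    // dropped, i.e. when this function returns.
    let _timer = STATE_SYNC_OBTAIN_PART_DELAY
        .with_label_values(&[&shard_id.to_string()])
        .start_timer();
    // ... build the part ...
}

fn main() {
    obtain_state_part(0);
}
```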
28 changes: 26 additions & 2 deletions nearcore/src/runtime/mod.rs
@@ -1246,6 +1246,10 @@ impl RuntimeAdapter for NightshadeRuntime {
%block_hash,
num_parts = part_id.total)
.entered();
let _timer = metrics::STATE_SYNC_OBTAIN_PART_DELAY
.with_label_values(&[&shard_id.to_string()])
.start_timer();

let epoch_id = self.get_epoch_id(block_hash)?;
let shard_uid = self.get_shard_uid_from_epoch_id(shard_id, &epoch_id)?;
let trie = self.tries.get_view_trie_for_shard(shard_uid, *state_root);
@@ -1270,11 +1274,27 @@ impl RuntimeAdapter for NightshadeRuntime {
match Trie::validate_trie_nodes_for_part(state_root, part_id, trie_nodes) {
Ok(_) => true,
// Storage error should not happen
Err(_) => false,
Err(err) => {
tracing::error!(
target: "state-parts",
?state_root,
?part_id,
?err,
"State part storage error");
false
}
}
}
// Deserialization error means we've got the data from malicious peer
Err(_) => false,
Err(err) => {
tracing::error!(
target: "state-parts",
?state_root,
?part_id,
?err,
"State part deserialization error");
false
}
}
}

@@ -1371,6 +1391,10 @@ impl RuntimeAdapter for NightshadeRuntime {
data: &[u8],
epoch_id: &EpochId,
) -> Result<(), Error> {
let _timer = metrics::STATE_SYNC_APPLY_PART_DELAY
.with_label_values(&[&shard_id.to_string()])
.start_timer();

let part = BorshDeserialize::try_from_slice(data)
.expect("Part was already validated earlier, so could never fail here");
let ApplyStatePartResult { trie_changes, flat_state_delta, contract_codes } =
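Both timers rely on the guard returned by `start_timer()` staying alive for the whole function: the duration is recorded when the guard is dropped at the end of the scope. Binding it to `_timer` rather than `_` matters in Rust, because `let _ = …` drops the value immediately. A small sketch of that drop-timing difference, with a toy guard in place of the Prometheus timer:

```rust
use std::time::Instant;

// Toy stand-in for prometheus::HistogramTimer: reports elapsed time on drop.
struct TimerGuard {
    label: &'static str,
    start: Instant,
}

impl Drop for TimerGuard {
    fn drop(&mut self) {
        println!("{}: {:?}", self.label, self.start.elapsed());
    }
}

fn start_timer(label: &'static str) -> TimerGuard {
    TimerGuard { label, start: Instant::now() }
}

fn main() {
    let _timer = start_timer("whole function"); // guard lives until the end of main
    let _ = start_timer("dropped immediately"); // `_` does not bind, so the guard drops here
    std::thread::sleep(std::time::Duration::from_millis(10));
    // Output: "dropped immediately" reports ~0 ms (printed above, before the sleep);
    // "whole function" reports ~10 ms when `_timer` goes out of scope here.
}
```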
159 changes: 80 additions & 79 deletions nearcore/src/state_sync.rs
@@ -161,63 +161,75 @@ async fn state_sync_dump(
epoch_id,
epoch_height,
sync_hash,
state_root,
parts_dumped,
num_parts,
})) => {
// The actual dumping of state to S3.
tracing::info!(target: "state_sync_dump", shard_id, ?epoch_id, epoch_height, %sync_hash, %state_root, parts_dumped, num_parts, "Creating parts and dumping them");
let mut res = None;
for part_id in parts_dumped..num_parts {
// Dump parts sequentially synchronously.
// TODO: How to make it possible to dump state more effectively using multiple nodes?
let _timer = metrics::STATE_SYNC_DUMP_ITERATION_ELAPSED
.with_label_values(&[&shard_id.to_string()])
.start_timer();
let state_header = chain.get_state_response_header(shard_id, sync_hash);
match state_header {
Ok(state_header) => {
let state_root = state_header.chunk_prev_state_root();
let num_parts =
get_num_state_parts(state_header.state_root_node().memory_usage);

let state_part = match get_state_part(
&runtime,
&shard_id,
&sync_hash,
&state_root,
part_id,
num_parts,
&chain,
) {
Ok(state_part) => state_part,
Err(err) => {
res = Some(err);
break;
let mut res = None;
// The actual dumping of state to S3.
tracing::info!(target: "state_sync_dump", shard_id, ?epoch_id, epoch_height, %sync_hash, parts_dumped, "Creating parts and dumping them");
for part_id in parts_dumped..num_parts {
// Dump parts sequentially synchronously.
// TODO: How to make it possible to dump state more effectively using multiple nodes?
let _timer = metrics::STATE_SYNC_DUMP_ITERATION_ELAPSED
.with_label_values(&[&shard_id.to_string()])
.start_timer();

let state_part = match obtain_and_store_state_part(
&runtime,
&shard_id,
&sync_hash,
&state_root,
part_id,
num_parts,
&chain,
) {
Ok(state_part) => state_part,
Err(err) => {
res = Some(err);
break;
}
};
let location = s3_location(
&config.chain_id,
epoch_height,
shard_id,
part_id,
num_parts,
);
if let Err(err) =
put_state_part(&location, &state_part, &shard_id, &bucket).await
{
res = Some(err);
break;
}
update_progress(
&shard_id,
&epoch_id,
epoch_height,
&sync_hash,
part_id,
num_parts,
state_part.len(),
&chain,
);
}
if let Some(err) = res {
Err(err)
} else {
Ok(Some(StateSyncDumpProgress::AllDumped {
epoch_id,
epoch_height,
num_parts: Some(num_parts),
}))
}
};
let location =
s3_location(&config.chain_id, epoch_height, shard_id, part_id, num_parts);
if let Err(err) =
put_state_part(&location, &state_part, &shard_id, &bucket).await
{
res = Some(err);
break;
}
update_progress(
&shard_id,
&epoch_id,
epoch_height,
&sync_hash,
&state_root,
part_id,
num_parts,
state_part.len(),
&chain,
);
}
if let Some(err) = res {
Err(err)
} else {
Ok(Some(StateSyncDumpProgress::AllDumped {
epoch_id,
epoch_height,
num_parts: Some(num_parts),
}))
Err(err) => Err(err),
}
}
};
@@ -268,7 +280,6 @@ fn update_progress(
epoch_id: &EpochId,
epoch_height: EpochHeight,
sync_hash: &CryptoHash,
state_root: &StateRoot,
part_id: u64,
num_parts: u64,
part_len: usize,
@@ -282,9 +293,7 @@
epoch_id: epoch_id.clone(),
epoch_height,
sync_hash: *sync_hash,
state_root: *state_root,
parts_dumped: part_id + 1,
num_parts,
};
match chain.store().set_state_sync_dump_progress(*shard_id, Some(next_progress.clone())) {
Ok(_) => {
@@ -328,7 +337,8 @@ fn set_metrics(
}
}

fn get_state_part(
/// Obtains and then saves the part data.
fn obtain_and_store_state_part(
runtime: &Arc<NightshadeRuntime>,
shard_id: &ShardId,
sync_hash: &CryptoHash,
@@ -337,19 +347,13 @@
num_parts: u64,
chain: &Chain,
) -> Result<Vec<u8>, Error> {
let state_part = {
let _timer = metrics::STATE_SYNC_DUMP_OBTAIN_PART_ELAPSED
.with_label_values(&[&shard_id.to_string()])
.start_timer();
runtime.obtain_state_part(
*shard_id,
&sync_hash,
&state_root,
PartId::new(part_id, num_parts),
)?
};
let state_part = runtime.obtain_state_part(
*shard_id,
&sync_hash,
&state_root,
PartId::new(part_id, num_parts),
)?;

// Save the part data.
let key = StatePartKey(*sync_hash, *shard_id, part_id).try_to_vec()?;
let mut store_update = chain.store().store().store_update();
store_update.set(DBCol::StateParts, &key, &state_part);
@@ -367,27 +371,24 @@ fn start_dumping(
) -> Result<Option<StateSyncDumpProgress>, Error> {
let epoch_info = runtime.get_epoch_info(&epoch_id)?;
let epoch_height = epoch_info.epoch_height();
let num_shards = runtime.num_shards(&epoch_id)?;
let sync_hash_block = chain.get_block(&sync_hash)?;
if runtime.cares_about_shard(None, sync_hash_block.header().prev_hash(), shard_id, false) {
assert_eq!(num_shards, sync_hash_block.chunks().len() as u64);
let state_root = sync_hash_block.chunks()[shard_id as usize].prev_state_root();
let state_root_node = runtime.get_state_root_node(shard_id, &sync_hash, &state_root)?;
let num_parts = get_num_state_parts(state_root_node.memory_usage);
tracing::debug!(target: "state_sync_dump", shard_id, ?epoch_id, %sync_hash, %state_root, num_parts, "Initialize dumping state of Epoch");
let sync_prev_header = chain.get_block_header(&sync_hash)?;
let sync_prev_hash = sync_prev_header.hash();

let state_header = chain.get_state_response_header(shard_id, sync_hash)?;
let num_parts = get_num_state_parts(state_header.state_root_node().memory_usage);
if runtime.cares_about_shard(None, sync_prev_hash, shard_id, false) {
tracing::debug!(target: "state_sync_dump", shard_id, ?epoch_id, %sync_prev_hash, %sync_hash, "Initialize dumping state of Epoch");
// Note that first the state of the state machines gets changes to
// `InProgress` and it starts dumping state after a short interval.
set_metrics(&shard_id, Some(0), Some(num_parts), Some(epoch_height));
Ok(Some(StateSyncDumpProgress::InProgress {
epoch_id,
epoch_height,
sync_hash,
state_root,
parts_dumped: 0,
num_parts,
}))
} else {
tracing::debug!(target: "state_sync_dump", shard_id, ?epoch_id, %sync_hash, "Shard is not tracked, skip the epoch");
tracing::debug!(target: "state_sync_dump", shard_id, ?epoch_id, %sync_prev_hash, %sync_hash, "Shard is not tracked, skip the epoch");
Ok(Some(StateSyncDumpProgress::AllDumped { epoch_id, epoch_height, num_parts: Some(0) }))
}
}
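Putting the restructured loop together: each iteration recomputes the state header (and from it the state root and part count), obtains and stores one part, uploads it, and then persists the new `parts_dumped`, so a node that restarts mid-epoch picks up where it left off. A stripped-down sketch of that control flow, with stub functions in place of the runtime, the S3 client, and the progress store:

```rust
// Stub part and error types; the real loop lives in nearcore/src/state_sync.rs.
type Part = Vec<u8>;

#[derive(Debug)]
struct DumpError(String);

// Stand-ins for obtain_and_store_state_part, put_state_part and the progress store.
fn obtain_and_store_state_part(part_id: u64) -> Result<Part, DumpError> {
    Ok(vec![part_id as u8])
}
fn upload_part(part_id: u64, _part: &Part) -> Result<(), DumpError> {
    println!("uploaded part {part_id}");
    Ok(())
}
fn persist_parts_dumped(parts_dumped: u64) {
    println!("progress: {parts_dumped} parts dumped");
}

// One pass of the dump loop: resume at `parts_dumped`, stop at the first error,
// report completion once every part has been uploaded.
fn dump_epoch(parts_dumped: u64, num_parts: u64) -> Result<(), DumpError> {
    for part_id in parts_dumped..num_parts {
        let part = obtain_and_store_state_part(part_id)?;
        upload_part(part_id, &part)?;
        // Persist progress only after the upload succeeded, so a restart
        // re-dumps at most the part that was in flight.
        persist_parts_dumped(part_id + 1);
    }
    println!("all {num_parts} parts dumped");
    Ok(())
}

fn main() {
    dump_epoch(3, 6).unwrap(); // resume from part 3 of 6
}
```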