Preserve artifact cache unless stale #1918
@@ -16,7 +16,7 @@
//! PVF artifacts (final compiled code blobs).
//!
//! # Lifecycle of an artifact
//!
//! 1. During node start-up, the artifacts cache is cleaned up. This means that all local artifacts
//!    stored on-disk are cleared, and we start with an empty [`Artifacts`] table.
@@ -55,18 +55,56 @@
//! older by a predefined parameter. This process is run very rarely (say, once a day). Once the
//! artifact is expired it is removed from disk eagerly atomically.

use crate::host::PrepareResultSender;
use crate::{host::PrepareResultSender, LOG_TARGET};
use always_assert::always;
use polkadot_core_primitives::Hash;
use polkadot_node_core_pvf_common::{error::PrepareError, prepare::PrepareStats, pvf::PvfPrepData};
use polkadot_node_primitives::NODE_VERSION;
use polkadot_parachain_primitives::primitives::ValidationCodeHash;
use polkadot_primitives::ExecutorParamsHash;
use std::{
    collections::HashMap,
    path::{Path, PathBuf},
    str::FromStr as _,
    time::{Duration, SystemTime},
};
macro_rules! concat_const {
    ($x:expr, $y:expr) => {{
        // Ensure that the inputs are string constants.
        const _: &str = $x;
        const _: &str = $y;

        const X: &[u8] = $x.as_bytes();
        const Y: &[u8] = $y.as_bytes();
        const XL: usize = X.len();
        const YL: usize = Y.len();
        const L: usize = XL + YL;

        const fn concat() -> [u8; L] {
            let mut cat = [0u8; L];
            let mut i = 0;
            while i < XL {
                cat[i] = X[i];
                i += 1;
            }
            while i < L {
                cat[i] = Y[i - XL];
                i += 1;
            }
            cat
        }

        // SAFETY: safe because `$x` and `$y` are ensured above to be valid strings, and
        // concatenating two valid UTF-8 byte sequences yields valid UTF-8.
        unsafe { std::str::from_utf8_unchecked(&concat()) }
    }};
}

const RUNTIME_PREFIX: &str = "wasmtime_";
const NODE_PREFIX: &str = "polkadot_v";
const ARTIFACT_PREFIX: &str =
    concat_const!(RUNTIME_PREFIX, concat_const!(NODE_PREFIX, NODE_VERSION));
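For illustration only, here is a minimal, self-contained sketch of the compile-time concatenation technique that concat_const! uses; the names HELLO, WORLD and GREETING are hypothetical and not part of this diff:

// Standalone sketch: build a `&'static str` by concatenating two string constants at
// compile time, mirroring the expansion of the concat_const! macro shown above.
const HELLO: &str = "hello_";
const WORLD: &str = "world";

const GREETING: &str = {
    const X: &[u8] = HELLO.as_bytes();
    const Y: &[u8] = WORLD.as_bytes();
    const L: usize = X.len() + Y.len();

    const fn concat() -> [u8; L] {
        let mut cat = [0u8; L];
        let mut i = 0;
        while i < X.len() {
            cat[i] = X[i];
            i += 1;
        }
        while i < L {
            cat[i] = Y[i - X.len()];
            i += 1;
        }
        cat
    }

    // SAFETY: both inputs are valid UTF-8 and concatenation preserves UTF-8 validity.
    unsafe { std::str::from_utf8_unchecked(&concat()) }
};

fn main() {
    assert_eq!(GREETING, "hello_world");
}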
/// Identifier of an artifact. Encodes a code hash of the PVF and a hash of executor parameter set.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct ArtifactId {

@@ -75,9 +113,6 @@ pub struct ArtifactId {
}

impl ArtifactId {
    const PREFIX: &'static str = "wasmtime_";
    const NODE_VERSION_PREFIX: &'static str = "polkadot_v";

    /// Creates a new artifact ID with the given hash.
    pub fn new(code_hash: ValidationCodeHash, executor_params_hash: ExecutorParamsHash) -> Self {
        Self { code_hash, executor_params_hash }

@@ -88,38 +123,34 @@ impl ArtifactId {
        Self::new(pvf.code_hash(), pvf.executor_params().hash())
    }

    /// Returns the expected path to this artifact given the root of the cache.
    (Review comment: Should update the docstring.)
    pub fn path(&self, cache_path: &Path) -> PathBuf {
        let file_name = format!(
            "{}{}{}_{:#x}_{:#x}",
            RUNTIME_PREFIX, NODE_PREFIX, NODE_VERSION, self.code_hash, self.executor_params_hash
        );
        cache_path.join(file_name)
    }

    /// Tries to recover the artifact id from the given file name.
    #[cfg(test)]
    pub fn from_file_name(file_name: &str) -> Option<Self> {
        use polkadot_core_primitives::Hash;
        use std::str::FromStr as _;
    pub(crate) fn from_file_name(file_name: &str) -> Option<Self> {
        let file_name = file_name.strip_prefix(ARTIFACT_PREFIX)?.strip_prefix('_')?;

        let file_name =
            file_name.strip_prefix(Self::PREFIX)?.strip_prefix(Self::NODE_VERSION_PREFIX)?;
        // [ code hash | param hash ]
        let hashes: Vec<&str> = file_name.split('_').collect();

        // [ node version | code hash | param hash ]
        let parts: Vec<&str> = file_name.split('_').collect();
        let (_node_ver, code_hash_str, executor_params_hash_str) = (parts[0], parts[1], parts[2]);
        if hashes.len() != 2 {
            return None
        }

        let (code_hash_str, executor_params_hash_str) = (hashes[0], hashes[1]);

        let code_hash = Hash::from_str(code_hash_str).ok()?.into();
        let executor_params_hash =
            ExecutorParamsHash::from_hash(Hash::from_str(executor_params_hash_str).ok()?);

        Some(Self { code_hash, executor_params_hash })
    }

    /// Returns the expected path to this artifact given the root of the cache.
    pub fn path(&self, cache_path: &Path) -> PathBuf {
        let file_name = format!(
            "{}{}{}_{:#x}_{:#x}",
            Self::PREFIX,
            Self::NODE_VERSION_PREFIX,
            NODE_VERSION,
            self.code_hash,
            self.executor_params_hash
        );
        cache_path.join(file_name)
    }
}

/// A bundle of the artifact ID and the path.
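To make the naming scheme concrete: with the constants above, an on-disk artifact is named ARTIFACT_PREFIX, then an underscore, then the code hash and the executor params hash. A rough standalone sketch of the parsing step follows; the prefix and the shortened hash values are made up for illustration and are not real artifacts:

// Hypothetical, simplified mirror of the strip_prefix/split logic in from_file_name.
fn split_artifact_file_name<'a>(prefix: &str, file_name: &'a str) -> Option<(&'a str, &'a str)> {
    // Strip "<prefix>_", then expect exactly [ code hash | param hash ].
    let rest = file_name.strip_prefix(prefix)?.strip_prefix('_')?;
    let hashes: Vec<&str> = rest.split('_').collect();
    if hashes.len() != 2 {
        return None;
    }
    Some((hashes[0], hashes[1]))
}

fn main() {
    // Made-up prefix and (shortened) hashes, purely for illustration.
    let name = "wasmtime_polkadot_v1.2.3_0x1234_0xabcd";
    assert_eq!(
        split_artifact_file_name("wasmtime_polkadot_v1.2.3", name),
        Some(("0x1234", "0xabcd"))
    );
}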
@@ -176,32 +207,79 @@ pub enum ArtifactState {

/// A container of all known artifact ids and their states.
pub struct Artifacts {
    artifacts: HashMap<ArtifactId, ArtifactState>,
    inner: HashMap<ArtifactId, ArtifactState>,
}

impl Artifacts {
    /// Initialize a blank cache at the given path. This will clear everything present at the
    /// given path, to be populated over time.
    ///
    /// The recognized artifacts will be filled in the table and unrecognized will be removed.
    pub async fn new(cache_path: &Path) -> Self {
        // First delete the entire cache. This includes artifacts and any leftover worker dirs (see
        // [`WorkerDir`]). Nodes are long-running so this should populate shortly.
        let _ = tokio::fs::remove_dir_all(cache_path).await;
    #[cfg(test)]
    pub(crate) fn new() -> Self {
        Self { inner: HashMap::new() }
    }

    #[cfg(test)]
    pub(crate) fn len(&self) -> usize {
        self.inner.len()
    }

    /// Create a table with valid artifacts and prune the invalid ones.
    pub async fn new_and_prune(cache_path: &Path) -> Self {
        let mut artifacts = Self { inner: HashMap::new() };
        Self::prune_and_insert(cache_path, &mut artifacts).await;
        artifacts
    }

    // FIXME eagr: extremely janky, please comment on the appropriate way of setting
    // * `last_time_needed` set as roughly around startup time
    // * `prepare_stats` set as Nones, since the metadata was lost
    async fn prune_and_insert(cache_path: impl AsRef<Path>, artifacts: &mut Artifacts) {
        fn is_stale(file_name: &str) -> bool {
            !file_name.starts_with(ARTIFACT_PREFIX)
        }

        fn insert_cache(artifacts: &mut Artifacts, id: ArtifactId) {
            let last_time_needed = SystemTime::now();
            let prepare_stats = Default::default();
            always!(artifacts
                .inner
                .insert(id, ArtifactState::Prepared { last_time_needed, prepare_stats })
                .is_none());
        }

        // Make sure that the cache path directory and all its parents are created.
        let cache_path = cache_path.as_ref();
        let _ = tokio::fs::create_dir_all(cache_path).await;

        Self { artifacts: HashMap::new() }
    }
        if let Ok(mut dir) = tokio::fs::read_dir(cache_path).await {
            let mut prunes = vec![];

            loop {
                match dir.next_entry().await {
                    Ok(None) => break,
                    Ok(Some(entry)) => {
                        let file_name = entry.file_name();
                        if let Some(file_name) = file_name.to_str() {
                            if is_stale(file_name) {
                                prunes.push(tokio::fs::remove_file(cache_path.join(file_name)));
                            } else if let Some(id) = ArtifactId::from_file_name(file_name) {
                                insert_cache(artifacts, id);
                            }
                        }
                    },
                    Err(err) => gum::error!(
                        target: LOG_TARGET,
                        ?err,
                        "collecting stale artifacts",
                    ),
                }
            }

    #[cfg(test)]
    pub(crate) fn empty() -> Self {
        Self { artifacts: HashMap::new() }
            futures::future::join_all(prunes).await;
        }
    }

    /// Returns the state of the given artifact by its ID.
    pub fn artifact_state_mut(&mut self, artifact_id: &ArtifactId) -> Option<&mut ArtifactState> {
        self.artifacts.get_mut(artifact_id)
        self.inner.get_mut(artifact_id)
    }

    /// Inform the table about the artifact with the given ID. The state will be set to "preparing".
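To summarize the startup scan above: a file is kept (and re-inserted into the table) only if its name starts with the current ARTIFACT_PREFIX, which embeds the node version; everything else is treated as stale and pruned. A minimal, I/O-free sketch of that decision, with a made-up prefix and made-up file names:

// Hypothetical sketch of the keep-or-prune decision made during the startup scan.
fn is_stale(artifact_prefix: &str, file_name: &str) -> bool {
    // Anything not produced by this node version (or with an unrecognized name) is stale.
    !file_name.starts_with(artifact_prefix)
}

fn main() {
    let prefix = "wasmtime_polkadot_v1.2.3"; // made-up current prefix
    let files = [
        "wasmtime_polkadot_v1.2.3_0x1234_0xabcd", // kept: current version
        "wasmtime_polkadot_v1.0.0_0x1234_0xabcd", // pruned: built by another node version
        "wasmtime_0x1234_0xabcd",                 // pruned: old naming scheme
    ];

    let (kept, pruned): (Vec<_>, Vec<_>) =
        files.iter().partition(|name| !is_stale(prefix, name));
    assert_eq!(kept.len(), 1);
    assert_eq!(pruned.len(), 2);
}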
@@ -215,7 +293,7 @@
    ) {
        // See the precondition.
        always!(self
            .artifacts
            .inner
            .insert(artifact_id, ArtifactState::Preparing { waiting_for_response, num_failures: 0 })
            .is_none());
    }

@@ -233,7 +311,7 @@
    ) {
        // See the precondition.
        always!(self
            .artifacts
            .inner
            .insert(artifact_id, ArtifactState::Prepared { last_time_needed, prepare_stats })
            .is_none());
    }

@@ -244,7 +322,7 @@ impl Artifacts {
        let now = SystemTime::now();

        let mut to_remove = vec![];
        for (k, v) in self.artifacts.iter() {
        for (k, v) in self.inner.iter() {
            if let ArtifactState::Prepared { last_time_needed, .. } = *v {
                if now
                    .duration_since(last_time_needed)

@@ -257,7 +335,7 @@ impl Artifacts {
        }

        for artifact in &to_remove {
            self.artifacts.remove(artifact);
            self.inner.remove(artifact);
        }

        to_remove
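For intuition about the age-based pruning above (artifacts unused for longer than a threshold get removed), here is a tiny self-contained sketch; the 24-hour threshold and the in-memory map are made up for illustration and are not the crate's actual configuration:

use std::collections::HashMap;
use std::time::{Duration, SystemTime};

// Hypothetical: collect the keys of entries not used within `ttl` so they can be removed.
fn expired_keys(last_needed: &HashMap<String, SystemTime>, ttl: Duration) -> Vec<String> {
    let now = SystemTime::now();
    last_needed
        .iter()
        // `duration_since` errs if the clock went backwards; treat that as "not expired".
        .filter(|(_, t)| now.duration_since(**t).map_or(false, |age| age > ttl))
        .map(|(k, _)| k.clone())
        .collect()
}

fn main() {
    let mut last_needed = HashMap::new();
    last_needed.insert("fresh".to_string(), SystemTime::now());
    last_needed.insert(
        "old".to_string(),
        SystemTime::now() - Duration::from_secs(48 * 60 * 60),
    );

    let ttl = Duration::from_secs(24 * 60 * 60); // made-up threshold
    assert_eq!(expired_keys(&last_needed, ttl), vec!["old".to_string()]);
}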
@@ -266,15 +344,39 @@

#[cfg(test)]
mod tests {
    use super::{ArtifactId, Artifacts, NODE_VERSION};
    use super::{ArtifactId, Artifacts, ARTIFACT_PREFIX, NODE_VERSION};
    use polkadot_primitives::ExecutorParamsHash;
    use sp_core::H256;
    use std::{path::Path, str::FromStr};
    use std::{
        fs,
        path::{Path, PathBuf},
        str::FromStr,
    };

    fn file_name(code_hash: &str, param_hash: &str) -> String {
        format!("wasmtime_polkadot_v{}_0x{}_0x{}", NODE_VERSION, code_hash, param_hash)
    }

    fn fake_artifact_path<D: AsRef<Path>>(dir: D, prefix: &str) -> PathBuf {
        let code_hash = "1234567890123456789012345678901234567890123456789012345678901234";
        let params_hash = "4321098765432109876543210987654321098765432109876543210987654321";
        let file_name = format!("{}_0x{}_0x{}", prefix, code_hash, params_hash);

        let mut path = dir.as_ref().to_path_buf();
        path.push(file_name);
        path
    }

    fn create_fake_artifact<D: AsRef<Path>>(dir: D, prefix: &str) {
        let path = fake_artifact_path(dir, prefix);
        fs::File::create(path).unwrap();
    }

    #[test]
    fn artifact_prefix() {
        assert_eq!(ARTIFACT_PREFIX, format!("wasmtime_polkadot_v{}", NODE_VERSION),)
    }

    #[test]
    fn from_file_name() {
        assert!(ArtifactId::from_file_name("").is_none());

@@ -318,26 +420,24 @@
    }

    #[tokio::test]
    async fn artifacts_removes_cache_on_startup() {
        let fake_cache_path = crate::worker_intf::tmppath("test-cache").await.unwrap();
        let fake_artifact_path = {
            let mut p = fake_cache_path.clone();
            p.push("wasmtime_0x1234567890123456789012345678901234567890123456789012345678901234");
            p
        };
    async fn remove_stale_cache_on_startup() {
        let cache_dir = crate::worker_intf::tmppath("test-cache").await.unwrap();

        // create a tmp cache with 1 artifact.
        fs::create_dir_all(&cache_dir).unwrap();

        std::fs::create_dir_all(&fake_cache_path).unwrap();
        std::fs::File::create(fake_artifact_path).unwrap();
        // 3 invalid, 1 valid
        create_fake_artifact(&cache_dir, "");
        create_fake_artifact(&cache_dir, "wasmtime_polkadot_v");
        create_fake_artifact(&cache_dir, "wasmtime_polkadot_v1.0.0");
        create_fake_artifact(&cache_dir, ARTIFACT_PREFIX);

        // this should remove it and re-create.
        assert_eq!(fs::read_dir(&cache_dir).unwrap().count(), 4);

        let p = &fake_cache_path;
        Artifacts::new(p).await;
        let artifacts = Artifacts::new_and_prune(&cache_dir).await;

        assert_eq!(std::fs::read_dir(&fake_cache_path).unwrap().count(), 0);
        assert_eq!(fs::read_dir(&cache_dir).unwrap().count(), 1);
        assert_eq!(artifacts.len(), 1);

        std::fs::remove_dir_all(fake_cache_path).unwrap();
        fs::remove_dir_all(cache_dir).unwrap();
    }
}
Review comment: Random thought, could we add an "# Artifact integrity" section here? It could explain that artifacts must remain valid to satisfy the SAFETY constraints of execute_artifact. Therefore we do two things: check file integrity and version compatibility on host startup, and also before voting against a candidate we re-check the file integrity.

Reply: How about we do this when we have decided what to do if an artifact is found corrupted? I'll make a note of this.
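A very rough, purely hypothetical sketch of what such an integrity re-check could look like (recording a checksum when the artifact is written and comparing it before use); none of these names or the checksum choice come from this PR, and a real implementation would likely prefer a cryptographic hash:

use std::collections::hash_map::DefaultHasher;
use std::hash::Hasher;
use std::{fs, io, path::Path};

// Hypothetical checksum over the artifact bytes.
fn artifact_checksum(path: &Path) -> io::Result<u64> {
    let bytes = fs::read(path)?;
    let mut hasher = DefaultHasher::new();
    hasher.write(&bytes);
    Ok(hasher.finish())
}

// Returns true only if the file still matches the checksum recorded at preparation time.
fn artifact_looks_intact(path: &Path, expected: u64) -> io::Result<bool> {
    Ok(artifact_checksum(path)? == expected)
}

fn main() -> io::Result<()> {
    let path = Path::new("/tmp/fake-artifact");
    fs::write(path, b"compiled artifact bytes")?;

    let expected = artifact_checksum(path)?;
    assert!(artifact_looks_intact(path, expected)?);

    // Simulate corruption: any change to the file should be detected.
    fs::write(path, b"corrupted")?;
    assert!(!artifact_looks_intact(path, expected)?);

    fs::remove_file(path)?;
    Ok(())
}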