Cleanup PVF artifact by cache limit and stale time #4662

Status: Merged (20 commits, Jun 6, 2024)
Changes from 1 commit
61 changes: 48 additions & 13 deletions polkadot/node/core/pvf/src/artifacts.rs
@@ -169,6 +169,15 @@ pub struct Artifacts {
inner: HashMap<ArtifactId, ArtifactState>,
}

/// A condition which we use to cleanup artifacts
#[derive(Debug)]
pub enum CleanupBy {
// Inactive time after which artefact is deleted
Time(Duration),
// Max size in bytes. Reaching it older artefacts are deleted
Contributor: The comment seems wrong; we delete the least used ones. At least for me, "older" usually means in relation to creation time.

Contributor (author): Fixed

Size(u64),
}

impl Artifacts {
#[cfg(test)]
pub(crate) fn empty() -> Self {
@@ -251,21 +260,47 @@ impl Artifacts {
})
}

/// Remove artifacts older than the given TTL and return id and path of the removed ones.
pub fn prune(&mut self, artifact_ttl: Duration) -> Vec<(ArtifactId, PathBuf)> {
let now = SystemTime::now();

/// Remove artifacts older than the given TTL or the total artifacts size limit and return id
/// and path of the removed ones.
Contributor: This doc comment needs fixing: we evict LRU artifacts only if we go over cache limit.

pub fn prune(&mut self, cleanup_by: &CleanupBy) -> Vec<(ArtifactId, PathBuf)> {
let mut to_remove = vec![];
for (k, v) in self.inner.iter() {
if let ArtifactState::Prepared { last_time_needed, ref path, .. } = *v {
if now
.duration_since(last_time_needed)
.map(|age| age > artifact_ttl)
.unwrap_or(false)
{
to_remove.push((k.clone(), path.clone()));

match cleanup_by {
CleanupBy::Time(artifact_ttl) => {
let now = SystemTime::now();
for (k, v) in self.inner.iter() {
if let ArtifactState::Prepared { last_time_needed, ref path, .. } = *v {
if now
.duration_since(last_time_needed)
.map(|age| age > *artifact_ttl)
.unwrap_or(false)
{
to_remove.push((k.clone(), path.clone()));
}
}
}
}
},
CleanupBy::Size(size_limit) => {
let mut total_size = 0;
let mut artifact_sizes = vec![];

for (k, v) in self.inner.iter() {
if let ArtifactState::Prepared { ref path, last_time_needed, .. } = *v {
if let Ok(metadata) = fs::metadata(path) {
Contributor: What bothers me here is that we're running a (possibly large) number of synchronous blocking filesystem operations in Artifacts::prune(), which is called from the PVF host's main loop running on a non-blocking threadpool. In case of I/O problems the whole PVF host will stall. I think we should either make use of tokio::fs::metadata(), or, even better, store the artifact's size as a property of the artifact itself along with the other data, and then no filesystem access is required inside prune().

Contributor (author): Yeah, added the size to artifact's state
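A minimal sketch of the approach the author describes, assuming a `size` field is added to the `Prepared` variant (the field and type names here are simplified stand-ins, not the merged code): the on-disk size is captured once at preparation time, so computing the total cache size needs no blocking filesystem calls inside `prune()`.

```rust
use std::{collections::HashMap, path::PathBuf, time::SystemTime};

/// Simplified stand-in for the PVF artifact state; the real enum has more variants.
enum ArtifactState {
    Prepared {
        path: PathBuf,
        last_time_needed: SystemTime,
        /// On-disk size in bytes, recorded when the artifact was prepared.
        size: u64,
    },
}

struct Artifacts {
    inner: HashMap<String, ArtifactState>,
}

impl Artifacts {
    /// Total cache size computed purely from in-memory state:
    /// no fs::metadata() calls on the host's main loop.
    fn total_size(&self) -> u64 {
        self.inner
            .values()
            .map(|s| match s {
                ArtifactState::Prepared { size, .. } => *size,
            })
            .sum()
    }
}
```

With the size stored alongside `last_time_needed`, the size-based branch of `prune()` can sort and evict from in-memory data alone.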

let size = metadata.len();
total_size += size;
artifact_sizes.push((k.clone(), path.clone(), size, last_time_needed));
}
}
}
artifact_sizes.sort_by_key(|&(_, _, _, last_time_needed)| last_time_needed);

while total_size > *size_limit {
let Some((artifact_id, path, size, _)) = artifact_sizes.pop() else { break };
to_remove.push((artifact_id, path));
total_size -= size;
}
Contributor: A unit test to check the correctness of this behavior would be definitely appreciated :)

Contributor (author): Added (and it saved me because the implementation was not correct)
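The bug the test caught appears visible in this commit: the eviction loop sorts ascending by `last_time_needed` and then `pop()`s from the back, which removes the most recently used artifact first. A hedged sketch of the corrected logic with simplified types (names assumed, not the PR's actual test): sort most-recent-first so `pop()` takes the least recently used.

```rust
use std::time::{Duration, SystemTime};

/// Evict least-recently-used entries until the total size fits under `limit`.
/// Entries are (id, size_in_bytes, last_time_needed).
fn evict_lru(mut entries: Vec<(String, u64, SystemTime)>, limit: u64) -> Vec<String> {
    let mut total: u64 = entries.iter().map(|e| e.1).sum();
    // Most recently used first, so pop() removes the least recently used.
    entries.sort_by_key(|e| std::cmp::Reverse(e.2));
    let mut removed = vec![];
    while total > limit {
        let Some((id, size, _)) = entries.pop() else { break };
        total -= size;
        removed.push(id);
    }
    removed
}
```

A unit test over three same-sized artifacts with distinct last-use times then asserts that eviction proceeds from the stalest entry upward, which fails against the ascending-sort-plus-pop version above.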

},
}

for artifact in &to_remove {
16 changes: 8 additions & 8 deletions polkadot/node/core/pvf/src/host.rs
@@ -21,7 +21,7 @@
//! [`ValidationHost`], that allows communication with that event-loop.

use crate::{
artifacts::{ArtifactId, ArtifactPathId, ArtifactState, Artifacts},
artifacts::{ArtifactId, ArtifactPathId, ArtifactState, Artifacts, CleanupBy},
execute::{self, PendingExecutionRequest},
metrics::Metrics,
prepare, Priority, SecurityStatus, ValidationError, LOG_TARGET,
@@ -293,7 +293,7 @@ pub async fn start(
let run_host = async move {
run(Inner {
cleanup_pulse_interval: Duration::from_secs(3600),
artifact_ttl: Duration::from_secs(3600 * 24),
cleanup_by: CleanupBy::Time(Duration::from_secs(3600 * 24)),
artifacts,
to_host_rx,
to_prepare_queue_tx,
@@ -337,7 +337,7 @@ impl AwaitingPrepare {

struct Inner {
cleanup_pulse_interval: Duration,
artifact_ttl: Duration,
cleanup_by: CleanupBy,
artifacts: Artifacts,

to_host_rx: mpsc::Receiver<ToHost>,
@@ -359,7 +359,7 @@ struct Fatal;
async fn run(
Inner {
cleanup_pulse_interval,
artifact_ttl,
cleanup_by,
mut artifacts,
to_host_rx,
from_prepare_queue_rx,
@@ -415,7 +415,7 @@ async fn run(
break_if_fatal!(handle_cleanup_pulse(
&mut to_sweeper_tx,
&mut artifacts,
artifact_ttl,
&cleanup_by,
).await);
},
to_host = to_host_rx.next() => {
@@ -859,9 +859,9 @@ async fn enqueue_prepare_for_execute(
async fn handle_cleanup_pulse(
sweeper_tx: &mut mpsc::Sender<PathBuf>,
artifacts: &mut Artifacts,
artifact_ttl: Duration,
cleanup_by: &CleanupBy,
) -> Result<(), Fatal> {
let to_remove = artifacts.prune(artifact_ttl);
let to_remove = artifacts.prune(cleanup_by);
gum::debug!(
target: LOG_TARGET,
"PVF pruning: {} artifacts reached their end of life",
@@ -1032,7 +1032,7 @@ pub(crate) mod tests {

let run = run(Inner {
cleanup_pulse_interval,
artifact_ttl,
cleanup_by: CleanupBy::Time(artifact_ttl),
artifacts,
to_host_rx,
to_prepare_queue_tx,