Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

convert a too full CDN invalidation queue into a full purge #2636

Merged
merged 1 commit into from
Oct 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

193 changes: 193 additions & 0 deletions src/cdn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -298,8 +298,67 @@ impl CdnBackend {
}
}

/// Fully invalidate the CDN distribution, also emptying the queue.
///
/// Everything happens inside one database transaction:
///
/// 1. Every queue row for `distribution_id` that has not been created in the
///    CDN yet is selected with `FOR UPDATE`, and its time spent in the queue
///    is recorded in the `cdn_queue_time` metric.
/// 2. A single invalidation with the catch-all path pattern `/*` is requested
///    from the CDN backend.
/// 3. All still-pending rows for the distribution are marked as created, with
///    the new invalidation id stored as their `cdn_reference`, and the
///    transaction is committed.
///
/// Note that the queue-time metrics are observed *before* the CDN call, so
/// they are also recorded when creating the invalidation fails afterwards.
///
/// `config` is not read by the function body.
///
/// # Errors
///
/// Returns an error when any database operation fails, or when the CDN
/// backend cannot create the invalidation (wrapped with the context
/// `"error creating new invalidation"`). In the latter case the function
/// returns before `transaction.commit()` is reached.
#[instrument(skip(conn))]
pub(crate) async fn full_invalidation(
    config: &Config,
    cdn: &CdnBackend,
    metrics: &InstanceMetrics,
    conn: &mut sqlx::PgConnection,
    distribution_id: &str,
) -> Result<()> {
    let mut transaction = conn.begin().await?;

    let now = Utc::now();
    for row in sqlx::query!(
        "SELECT queued
FROM cdn_invalidation_queue
WHERE cdn_distribution_id = $1 AND created_in_cdn IS NULL
FOR UPDATE",
        distribution_id,
    )
    .fetch_all(&mut *transaction)
    .await?
    {
        // `to_std` can only fail when the duration is negative (a `queued`
        // timestamp later than `now`), which is not expected to happen; such
        // a row is simply left out of the metric.
        if let Ok(duration) = (now - row.queued).to_std() {
            metrics
                .cdn_queue_time
                .with_label_values(&[distribution_id])
                .observe(duration_to_seconds(duration));
        }
    }

    // One wildcard invalidation replaces all individually queued path patterns.
    let invalidation = cdn
        .create_invalidation(distribution_id, &["/*"])
        .await
        .context("error creating new invalidation")?;

    // Mark every pending row as handled by the invalidation we just created.
    sqlx::query!(
        "UPDATE cdn_invalidation_queue
SET
created_in_cdn = CURRENT_TIMESTAMP,
cdn_reference = $1
WHERE
cdn_distribution_id = $2 AND created_in_cdn IS NULL",
        invalidation.invalidation_id,
        distribution_id,
    )
    .execute(&mut *transaction)
    .await?;

    transaction.commit().await?;

    Ok(())
}

#[instrument(skip(conn))]
pub(crate) async fn handle_queued_invalidation_requests(
config: &Config,
cdn: &CdnBackend,
metrics: &InstanceMetrics,
conn: &mut sqlx::PgConnection,
Expand Down Expand Up @@ -385,6 +444,24 @@ pub(crate) async fn handle_queued_invalidation_requests(
return Ok(());
}

if let Some(min_queued) = sqlx::query_scalar!(
"SELECT
min(queued)
FROM cdn_invalidation_queue
WHERE
cdn_distribution_id = $1 AND
created_in_cdn IS NULL",
distribution_id
)
.fetch_one(&mut *conn)
.await?
{
if (now - min_queued).to_std().unwrap_or_default() >= config.cdn_max_queued_age {
full_invalidation(config, cdn, metrics, conn, distribution_id).await?;
return Ok(());
}
}

// create a new invalidation for the queued path patterns
let mut transaction = conn.begin().await?;
let mut path_patterns: Vec<String> = Vec::new();
Expand Down Expand Up @@ -566,6 +643,8 @@ pub(crate) async fn queued_or_active_crate_invalidation_count_by_distribution(

#[cfg(test)]
mod tests {
use std::time::Duration;

use super::*;
use crate::test::async_wrapper;

Expand Down Expand Up @@ -671,6 +750,111 @@ mod tests {
})
}

/// With `cdn_max_queued_age` set to zero, every queued invalidation counts as
/// too old, so handling the queue must escalate to a single full (`/*`)
/// invalidation per distribution instead of per-crate path patterns.
#[test]
fn escalate_to_full_invalidation() {
    crate::test::async_wrapper(|env| async move {
        env.override_config(|config| {
            config.cloudfront_distribution_id_web = Some("distribution_id_web".into());
            config.cloudfront_distribution_id_static = Some("distribution_id_static".into());
            // a zero max age makes any queued entry old enough to trigger the full purge
            config.cdn_max_queued_age = Duration::from_secs(0);
        });

        let cdn = env.cdn().await;
        let config = env.config();
        let mut conn = env.async_db().await.async_conn().await;
        assert!(queued_or_active_crate_invalidations(&mut conn)
            .await?
            .is_empty());

        queue_crate_invalidation(&mut conn, &env.config(), "krate").await?;

        // invalidation paths are queued.
        assert_eq!(
            queued_or_active_crate_invalidations(&mut conn)
                .await?
                .into_iter()
                .map(|i| (
                    i.cdn_distribution_id,
                    i.krate,
                    i.path_pattern,
                    i.cdn_reference
                ))
                .collect::<Vec<_>>(),
            vec![
                (
                    "distribution_id_web".into(),
                    "krate".into(),
                    "/krate*".into(),
                    None
                ),
                (
                    "distribution_id_web".into(),
                    "krate".into(),
                    "/crate/krate*".into(),
                    None
                ),
                (
                    "distribution_id_static".into(),
                    "krate".into(),
                    "/rustdoc/krate*".into(),
                    None
                ),
            ]
        );

        let counts =
            queued_or_active_crate_invalidation_count_by_distribution(&mut conn, &config)
                .await?;
        assert_eq!(counts.len(), 2);
        assert_eq!(*counts.get("distribution_id_web").unwrap(), 2);
        assert_eq!(*counts.get("distribution_id_static").unwrap(), 1);

        // queueing the invalidation doesn't create it in the CDN
        assert!(active_invalidations(&cdn, "distribution_id_web").is_empty());
        assert!(active_invalidations(&cdn, "distribution_id_static").is_empty());

        let cdn = env.cdn().await;
        let config = env.config();

        // now handle the queued invalidations
        handle_queued_invalidation_requests(
            &config,
            &cdn,
            &env.instance_metrics(),
            &mut conn,
            "distribution_id_web",
        )
        .await?;
        handle_queued_invalidation_requests(
            &config,
            &cdn,
            &env.instance_metrics(),
            &mut conn,
            "distribution_id_static",
        )
        .await?;

        // which creates exactly one full invalidation per distribution in the CDN
        {
            let ir_web = active_invalidations(&cdn, "distribution_id_web");
            assert_eq!(ir_web.len(), 1);
            assert_eq!(ir_web[0].path_patterns, vec!["/*"]);

            let ir_static = active_invalidations(&cdn, "distribution_id_static");
            // check the static distribution's own list, not the web one again
            assert_eq!(ir_static.len(), 1);
            assert_eq!(ir_static[0].path_patterns, vec!["/*"]);
        }

        // the queued entries got a CDN reference attached
        assert!(queued_or_active_crate_invalidations(&mut conn)
            .await?
            .iter()
            .all(|i| i.cdn_reference.is_some() && i.created_in_cdn.is_some()));

        Ok(())
    });
}

#[test]
fn invalidate_a_crate() {
crate::test::async_wrapper(|env| async move {
Expand Down Expand Up @@ -734,16 +918,19 @@ mod tests {
assert!(active_invalidations(&cdn, "distribution_id_static").is_empty());

let cdn = env.cdn().await;
let config = env.config();

// now handle the queued invalidations
handle_queued_invalidation_requests(
&config,
&cdn,
&env.instance_metrics(),
&mut conn,
"distribution_id_web",
)
.await?;
handle_queued_invalidation_requests(
&config,
&cdn,
&env.instance_metrics(),
&mut conn,
Expand Down Expand Up @@ -774,13 +961,15 @@ mod tests {

// now handle again
handle_queued_invalidation_requests(
&config,
&cdn,
&env.instance_metrics(),
&mut conn,
"distribution_id_web",
)
.await?;
handle_queued_invalidation_requests(
&config,
&cdn,
&env.instance_metrics(),
&mut conn,
Expand Down Expand Up @@ -849,6 +1038,7 @@ mod tests {

// handle the queued invalidations
handle_queued_invalidation_requests(
&env.config(),
&*env.cdn().await,
&env.instance_metrics(),
&mut conn,
Expand Down Expand Up @@ -909,6 +1099,7 @@ mod tests {

// handle the queued invalidations
handle_queued_invalidation_requests(
&env.config(),
&*env.cdn().await,
&env.instance_metrics(),
&mut conn,
Expand Down Expand Up @@ -937,6 +1128,7 @@ mod tests {

// now handle again
handle_queued_invalidation_requests(
&env.config(),
&*env.cdn().await,
&env.instance_metrics(),
&mut conn,
Expand Down Expand Up @@ -976,6 +1168,7 @@ mod tests {

// run the handler
handle_queued_invalidation_requests(
&env.config(),
&*env.cdn().await,
&env.instance_metrics(),
&mut conn,
Expand Down
6 changes: 6 additions & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,11 @@ pub struct Config {

pub(crate) cdn_backend: CdnKind,

/// The maximum age of a queued invalidation request before it is
/// considered too old and we fall back to a full purge of the
/// distributions.
pub(crate) cdn_max_queued_age: Duration,

// CloudFront distribution ID for the web server.
// Will be used for invalidation-requests.
pub cloudfront_distribution_id_web: Option<String>,
Expand Down Expand Up @@ -201,6 +206,7 @@ impl Config {
cache_invalidatable_responses: env("DOCSRS_CACHE_INVALIDATEABLE_RESPONSES", true)?,

cdn_backend: env("DOCSRS_CDN_BACKEND", CdnKind::Dummy)?,
cdn_max_queued_age: Duration::from_secs(env("DOCSRS_CDN_MAX_QUEUED_AGE", 3600)?),

cloudfront_distribution_id_web: maybe_env("CLOUDFRONT_DISTRIBUTION_ID_WEB")?,
cloudfront_distribution_id_static: maybe_env("CLOUDFRONT_DISTRIBUTION_ID_STATIC")?,
Expand Down
2 changes: 2 additions & 0 deletions src/utils/daemon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ pub fn start_background_cdn_invalidator(context: &dyn Context) -> Result<(), Err
let mut conn = pool.get_async().await?;
if let Some(distribution_id) = config.cloudfront_distribution_id_web.as_ref() {
cdn::handle_queued_invalidation_requests(
&config,
&cdn,
&metrics,
&mut conn,
Expand All @@ -133,6 +134,7 @@ pub fn start_background_cdn_invalidator(context: &dyn Context) -> Result<(), Err
}
if let Some(distribution_id) = config.cloudfront_distribution_id_static.as_ref() {
cdn::handle_queued_invalidation_requests(
&config,
&cdn,
&metrics,
&mut conn,
Expand Down
Loading