diff --git a/src/controllers/krate/publish.rs b/src/controllers/krate/publish.rs index a79e8801c2a..8715b952353 100644 --- a/src/controllers/krate/publish.rs +++ b/src/controllers/krate/publish.rs @@ -2,7 +2,9 @@ use crate::app::AppState; use crate::auth::AuthCheck; -use crate::worker::jobs::{self, CheckTyposquat, UpdateDefaultVersion}; +use crate::worker::jobs::{ + self, CheckTyposquat, SendPublishNotificationsJob, UpdateDefaultVersion, +}; use axum::body::Bytes; use axum::Json; use cargo_manifest::{Dependency, DepsSet, TargetDepsSet}; @@ -442,6 +444,8 @@ pub async fn publish(app: AppState, req: BytesRequest) -> AppResult = LazyLock::new(|| Regex::new(r"(Message-ID|Date): [^\r\n]+\r\n").unwrap()); + static DATE_TIME_REGEX: LazyLock = + LazyLock::new(|| Regex::new(r"\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}Z").unwrap()); + static INVITE_TOKEN_REGEX: LazyLock = LazyLock::new(|| Regex::new(r"/accept-invite/\w+").unwrap()); @@ -186,6 +189,7 @@ impl TestApp { .into_iter() .map(|email| { let email = EMAIL_HEADER_REGEX.replace_all(&email, ""); + let email = DATE_TIME_REGEX.replace_all(&email, "[0000-00-00T00:00:00Z]"); let email = INVITE_TOKEN_REGEX.replace_all(&email, "/accept-invite/[invite-token]"); email.to_string() }) diff --git a/src/worker/jobs/mod.rs b/src/worker/jobs/mod.rs index 58d1b504f2b..5d20fd86e7f 100644 --- a/src/worker/jobs/mod.rs +++ b/src/worker/jobs/mod.rs @@ -15,6 +15,7 @@ mod git; mod index_version_downloads_archive; mod readmes; pub mod rss; +mod send_publish_notifications; mod sync_admins; mod typosquat; mod update_default_version; @@ -29,6 +30,7 @@ pub use self::expiry_notification::SendTokenExpiryNotifications; pub use self::git::{NormalizeIndex, SquashIndex, SyncToGitIndex, SyncToSparseIndex}; pub use self::index_version_downloads_archive::IndexVersionDownloadsArchive; pub use self::readmes::RenderAndUploadReadme; +pub use self::send_publish_notifications::SendPublishNotificationsJob; pub use self::sync_admins::SyncAdmins; pub use self::typosquat::CheckTyposquat; pub use self::update_default_version::UpdateDefaultVersion; diff --git a/src/worker/jobs/send_publish_notifications.rs b/src/worker/jobs/send_publish_notifications.rs new file mode 100644 index 00000000000..d605161b305 --- /dev/null +++ b/src/worker/jobs/send_publish_notifications.rs @@ -0,0 +1,162 @@ +use crate::email::Email; +use crate::models::OwnerKind; +use crate::schema::{crate_owners, crates, emails, users, versions}; +use crate::tasks::spawn_blocking; +use crate::worker::Environment; +use anyhow::anyhow; +use chrono::{NaiveDateTime, SecondsFormat}; +use crates_io_worker::BackgroundJob; +use diesel::prelude::*; +use diesel_async::{AsyncPgConnection, RunQueryDsl}; +use std::sync::Arc; + +/// Background job that sends email notifications to all crate owners when a +/// new crate version is published. +#[derive(Serialize, Deserialize)] +pub struct SendPublishNotificationsJob { + version_id: i32, +} + +impl SendPublishNotificationsJob { + pub fn new(version_id: i32) -> Self { + Self { version_id } + } +} + +impl BackgroundJob for SendPublishNotificationsJob { + const JOB_NAME: &'static str = "send_publish_notifications"; + + type Context = Arc; + + async fn run(&self, ctx: Self::Context) -> anyhow::Result<()> { + let mut conn = ctx.deadpool.get().await?; + + // Get crate name, version and other publish details + let publish_details = PublishDetails::for_version(self.version_id, &mut conn).await?; + + let publish_time = publish_details + .publish_time + .and_utc() + .to_rfc3339_opts(SecondsFormat::Secs, true); + + // Find names and email addresses of all crate owners + let recipients = crate_owners::table + .filter(crate_owners::deleted.eq(false)) + .filter(crate_owners::owner_kind.eq(OwnerKind::User)) + .filter(crate_owners::crate_id.eq(publish_details.crate_id)) + .inner_join(users::table) + .inner_join(emails::table.on(users::id.eq(emails::user_id))) + .filter(emails::verified.eq(true)) + .select((users::gh_login, emails::email)) + .load::<(String, String)>(&mut conn) + .await?; + + // Sending emails is currently a blocking operation, so we have to use + // `spawn_blocking()` to run it in a separate thread. + spawn_blocking(move || { + let results = recipients + .into_iter() + .map(|(ref recipient, email_address)| { + let krate = &publish_details.krate; + let version = &publish_details.version; + + let publisher_info = match &publish_details.publisher { + Some(publisher) if publisher == recipient => &format!( + " by your account (https://{domain}/users/{publisher})", + domain = ctx.config.domain_name + ), + Some(publisher) => &format!( + " by {publisher} (https://{domain}/users/{publisher})", + domain = ctx.config.domain_name + ), + None => "", + }; + + let email = PublishNotificationEmail { + recipient, + krate, + version, + publish_time: &publish_time, + publisher_info, + }; + + ctx.emails.send(&email_address, email).inspect_err(|err| { + warn!("Failed to send publish notification for {krate}@{version} to {email_address}: {err}") + }) + }) + .collect::>(); + + // Check if any of the emails succeeded to send, in which case we + // consider the job successful enough and not worth retrying. + match results.iter().any(|result| result.is_ok()) { + true => Ok(()), + false => Err(anyhow!("Failed to send publish notifications")), + } + }) + .await?; + + Ok(()) + } +} + +#[derive(Debug, Queryable, Selectable)] +struct PublishDetails { + #[diesel(select_expression = crates::columns::id)] + crate_id: i32, + #[diesel(select_expression = crates::columns::name)] + krate: String, + #[diesel(select_expression = versions::columns::num)] + version: String, + #[diesel(select_expression = versions::columns::created_at)] + publish_time: NaiveDateTime, + #[diesel(select_expression = users::columns::gh_login.nullable())] + publisher: Option, +} + +impl PublishDetails { + async fn for_version(version_id: i32, conn: &mut AsyncPgConnection) -> QueryResult { + versions::table + .find(version_id) + .inner_join(crates::table) + .left_join(users::table) + .select(PublishDetails::as_select()) + .first(conn) + .await + } +} + +/// Email template for notifying crate owners about a new crate version +/// being published. +#[derive(Debug, Clone)] +struct PublishNotificationEmail<'a> { + recipient: &'a str, + krate: &'a str, + version: &'a str, + publish_time: &'a str, + publisher_info: &'a str, +} + +impl Email for PublishNotificationEmail<'_> { + fn subject(&self) -> String { + let Self { krate, version, .. } = self; + format!("crates.io: Successfully published {krate}@{version}") + } + + fn body(&self) -> String { + let Self { + recipient, + krate, + version, + publish_time, + publisher_info, + } = self; + + format!( + "Hello {recipient}! + +A new version of the package {krate} ({version}) was published{publisher_info} at {publish_time}. + +If you have questions or security concerns, you can contact us at help@crates.io." + ) + } +} diff --git a/src/worker/mod.rs b/src/worker/mod.rs index 04906a13c22..70bb93871d1 100644 --- a/src/worker/mod.rs +++ b/src/worker/mod.rs @@ -36,6 +36,7 @@ impl RunnerExt for Runner> { .register_job_type::() .register_job_type::() .register_job_type::() + .register_job_type::() .register_job_type::() .register_job_type::() .register_job_type::()