From c23d51d85174ceee1e162eaab6ea07a22558cdb1 Mon Sep 17 00:00:00 2001
From: apiraino
Date: Tue, 20 Feb 2024 13:08:36 +0100
Subject: [PATCH] Import pull request assignments into triagebot

General overview at: rust-lang#1753

- Added a new DB table with the fields to track how many PRs are assigned
  to a contributor
- Initial DB table population with a one-off job, manually run.
---
 github-graphql/PullRequestsOpen.gql     | 26 ++++++
 github-graphql/README.md                | 31 +++++++
 github-graphql/src/lib.rs               | 42 ++++++++++
 src/db.rs                               | 11 ++-
 src/db/notifications.rs                 |  3 +-
 src/github.rs                           | 84 +++++++++++++++++++
 src/handlers.rs                         |  1 +
 src/handlers/notification.rs            |  2 +-
 .../pull_requests_assignment_update.rs  | 72 ++++++++++++++++
 src/jobs.rs                             | 10 +--
 src/main.rs                             | 40 ++++++++-
 11 files changed, 313 insertions(+), 9 deletions(-)
 create mode 100644 github-graphql/PullRequestsOpen.gql
 create mode 100644 github-graphql/README.md
 create mode 100644 src/handlers/pull_requests_assignment_update.rs

diff --git a/github-graphql/PullRequestsOpen.gql b/github-graphql/PullRequestsOpen.gql
new file mode 100644
index 00000000..803773ed
--- /dev/null
+++ b/github-graphql/PullRequestsOpen.gql
@@ -0,0 +1,26 @@
+query PullRequestsOpen ($repo_owner: String!, $repo_name: String!, $after: String) {
+  repository(owner: $repo_owner, name: $repo_name) {
+    pullRequests(first: 100, after: $after, states:OPEN) {
+      pageInfo {
+        hasNextPage
+        endCursor
+      }
+      nodes {
+        number
+        updatedAt
+        createdAt
+        assignees(first: 10) {
+          nodes {
+            login
+            databaseId
+          }
+        }
+        labels(first:5, orderBy:{field:NAME, direction:DESC}) {
+          nodes {
+            name
+          }
+        }
+      }
+    }
+  }
+}
diff --git a/github-graphql/README.md b/github-graphql/README.md
new file mode 100644
index 00000000..ba42b79a
--- /dev/null
+++ b/github-graphql/README.md
@@ -0,0 +1,31 @@
+# How to use GraphQL with Rust
+
+## GUI Clients (Electron apps)
+
+Use a client to experiment with and build your GraphQL query/mutation.
+
+https://insomnia.rest/download
+
+https://docs.usebruno.com
+
+Once you're happy with the result, save your query in a `.gql` file in this directory. It will serve as
+documentation on how to reproduce the Rust boilerplate.
+
+## Cynic CLI
+
+Introspect a schema and save it locally:
+
+```sh
+cynic introspect \
+  -H "User-Agent: cynic/3.4.3" \
+  -H "Authorization: Bearer [GITHUB_TOKEN]" \
+  "https://api.github.com/graphql" \
+  -o schemas/github.graphql
+```
+
+Execute a GraphQL query/mutation and save the generated Rust boilerplate locally:
+
+```sh
+cynic querygen --schema schemas/github.graphql --query query.gql
+```
+
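Editor's note, not part of the patch: the boilerplate printed by `cynic querygen` is a set of Rust types wired together with cynic's derive macros, which is then adapted by hand. A trimmed sketch of that output for the query above (the full, hand-adapted version is what lands in `github-graphql/src/lib.rs` later in this patch):

```rust
// Sketch of typical `cynic querygen` output (abridged); compare with the real
// fragments added to github-graphql/src/lib.rs below.
#[derive(cynic::QueryVariables, Debug)]
pub struct PullRequestsOpenVariables<'a> {
    pub repo_owner: &'a str,
    pub repo_name: &'a str,
    pub after: Option<String>,
}

#[derive(cynic::QueryFragment, Debug)]
#[cynic(graphql_type = "Query", variables = "PullRequestsOpenVariables")]
pub struct PullRequestsOpen {
    #[arguments(owner: $repo_owner, name: $repo_name)]
    pub repository: Option<Repository>,
}
```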
diff --git a/github-graphql/src/lib.rs b/github-graphql/src/lib.rs
index f18c77ef..48b06264 100644
--- a/github-graphql/src/lib.rs
+++ b/github-graphql/src/lib.rs
@@ -89,6 +89,7 @@ pub mod queries {
     #[derive(cynic::QueryFragment, Debug)]
     pub struct User {
         pub login: String,
+        pub database_id: Option<i32>,
     }
 
     #[derive(cynic::QueryFragment, Debug)]
@@ -385,3 +386,44 @@ pub mod project_items {
         pub date: Option,
     }
 }
+
+/// Retrieve all pull requests waiting on review from T-compiler
+/// GraphQL query: see file github-graphql/PullRequestsOpen.gql
+pub mod pull_requests_open {
+    use crate::queries::{LabelConnection, PullRequestConnection, UserConnection};
+
+    use super::queries::DateTime;
+    use super::schema;
+
+    #[derive(cynic::QueryVariables, Clone, Debug)]
+    pub struct PullRequestsOpenVariables<'a> {
+        pub repo_owner: &'a str,
+        pub repo_name: &'a str,
+        pub after: Option<String>,
+    }
+
+    #[derive(cynic::QueryFragment, Debug)]
+    #[cynic(graphql_type = "Query", variables = "PullRequestsOpenVariables")]
+    pub struct PullRequestsOpen {
+        #[arguments(owner: $repo_owner, name: $repo_name)]
+        pub repository: Option<Repository>,
+    }
+
+    #[derive(cynic::QueryFragment, Debug)]
+    #[cynic(variables = "PullRequestsOpenVariables")]
+    pub struct Repository {
+        #[arguments(first: 100, after: $after, states: "OPEN")]
+        pub pull_requests: PullRequestConnection,
+    }
+
+    #[derive(cynic::QueryFragment, Debug)]
+    pub struct PullRequest {
+        pub number: i32,
+        pub updated_at: DateTime,
+        pub created_at: DateTime,
+        #[arguments(first: 10)]
+        pub assignees: UserConnection,
+        #[arguments(first: 5, orderBy: { direction: "DESC", field: "NAME" })]
+        pub labels: Option<LabelConnection>,
+    }
+}
diff --git a/src/db.rs b/src/db.rs
index 272601b0..dc3f2086 100644
--- a/src/db.rs
+++ b/src/db.rs
@@ -303,9 +303,18 @@ CREATE TABLE jobs (
 );
 ",
     "
-CREATE UNIQUE INDEX jobs_name_scheduled_at_unique_index 
+CREATE UNIQUE INDEX jobs_name_scheduled_at_unique_index
 ON jobs (
     name,
     scheduled_at
 );
 ",
+    "
+CREATE table review_prefs (
+    id UUID DEFAULT gen_random_uuid() PRIMARY KEY,
+    user_id BIGINT REFERENCES users(user_id),
+    assigned_prs INT[] NOT NULL DEFAULT array[]::INT[]
+);",
+    "
+CREATE UNIQUE INDEX review_prefs_user_id ON review_prefs(user_id);
+    ",
 ];
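Editor's note, not part of the patch: the new `review_prefs` table keys a contributor (`users.user_id`) to an `INT[]` of currently assigned PR numbers. For illustration only, a consumer could read a user's workqueue back roughly like this, since `tokio_postgres` maps `INT[]` to `Vec<i32>` (a sketch, assuming the table above):

```rust
use tokio_postgres::Client as DbClient;

/// Sketch: fetch the PR numbers currently assigned to a user, if any.
async fn assigned_prs(db: &DbClient, user_id: i64) -> anyhow::Result<Vec<i32>> {
    let row = db
        .query_opt(
            "SELECT assigned_prs FROM review_prefs WHERE user_id = $1",
            &[&user_id],
        )
        .await?;
    // A missing row simply means no assignments have been recorded yet.
    Ok(row.map(|r| r.get::<_, Vec<i32>>(0)).unwrap_or_default())
}
```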
diff --git a/src/db/notifications.rs b/src/db/notifications.rs
index 5b185793..4a7859bd 100644
--- a/src/db/notifications.rs
+++ b/src/db/notifications.rs
@@ -15,7 +15,8 @@ pub struct Notification {
     pub team_name: Option<String>,
 }
 
-pub async fn record_username(db: &DbClient, user_id: i64, username: String) -> anyhow::Result<()> {
+/// Add a new user (if not existing)
+pub async fn record_username(db: &DbClient, user_id: i64, username: &str) -> anyhow::Result<()> {
     db.execute(
         "INSERT INTO users (user_id, username) VALUES ($1, $2) ON CONFLICT DO NOTHING",
         &[&user_id, &username],
diff --git a/src/github.rs b/src/github.rs
index b987599b..f7d9e845 100644
--- a/src/github.rs
+++ b/src/github.rs
@@ -2592,6 +2592,90 @@ async fn project_items_by_status(
     Ok(all_items)
 }
 
+/// Retrieve all pull requests in status OPEN that are not drafts
+pub async fn retrieve_pull_requests(
+    repo: &Repository,
+    client: &GithubClient,
+) -> anyhow::Result<Vec<(User, i32)>> {
+    use cynic::QueryBuilder;
+    use github_graphql::pull_requests_open::{PullRequestsOpen, PullRequestsOpenVariables};
+
+    let repo_owner = repo.owner();
+    let repo_name = repo.name();
+
+    let mut prs = vec![];
+
+    let mut vars = PullRequestsOpenVariables {
+        repo_owner,
+        repo_name,
+        after: None,
+    };
+    loop {
+        let query = PullRequestsOpen::build(vars.clone());
+        let req = client.post(&client.graphql_url);
+        let req = req.json(&query);
+
+        let data: cynic::GraphQlResponse<PullRequestsOpen> = client.json(req).await?;
+        if let Some(errors) = data.errors {
+            anyhow::bail!("There were graphql errors. {:?}", errors);
+        }
+        let repository = data
+            .data
+            .ok_or_else(|| anyhow::anyhow!("No data returned."))?
+            .repository
+            .ok_or_else(|| anyhow::anyhow!("No repository."))?;
+        prs.extend(repository.pull_requests.nodes);
+
+        let page_info = repository.pull_requests.page_info;
+        if !page_info.has_next_page || page_info.end_cursor.is_none() {
+            break;
+        }
+        vars.after = page_info.end_cursor;
+    }
+
+    let mut prs_processed: Vec<_> = vec![];
+    let _: Vec<_> = prs
+        .into_iter()
+        .filter_map(|pr| {
+            if pr.is_draft {
+                return None;
+            }
+
+            // exclude rollup PRs
+            let labels = pr
+                .labels
+                .map(|l| l.nodes)
+                .unwrap_or_default()
+                .into_iter()
+                .map(|node| node.name)
+                .collect::<Vec<String>>();
+            if labels.iter().any(|label| label == "rollup") {
+                return None;
+            }
+
+            let _: Vec<_> = pr
+                .assignees
+                .nodes
+                .iter()
+                .map(|user| {
+                    let user_id = user.database_id.expect("checked") as i64;
+                    prs_processed.push((
+                        User {
+                            login: user.login.clone(),
+                            id: Some(user_id),
+                        },
+                        pr.number,
+                    ));
+                })
+                .collect();
+            Some(true)
+        })
+        .collect();
+    prs_processed.sort_by(|a, b| a.0.id.cmp(&b.0.id));
+
+    Ok(prs_processed)
+}
+
 pub enum DesignMeetingStatus {
     Proposed,
     Scheduled,
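Editor's note, not part of the patch: `retrieve_pull_requests` yields one `(User, pr_number)` pair per assignee, sorted by user id. As a purely illustrative sketch of how a caller sitting next to it in `src/github.rs` might consume that shape, e.g. to count open, non-draft, non-rollup PRs per reviewer:

```rust
use std::collections::HashMap;

/// Sketch: number of tracked PRs per assignee login.
pub async fn assignments_per_reviewer(
    repo: &Repository,
    client: &GithubClient,
) -> anyhow::Result<HashMap<String, usize>> {
    let mut counts: HashMap<String, usize> = HashMap::new();
    for (user, _pr_number) in retrieve_pull_requests(repo, client).await? {
        *counts.entry(user.login).or_default() += 1;
    }
    Ok(counts)
}
```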
diff --git a/src/handlers.rs b/src/handlers.rs
index 4838760e..d266c181 100644
--- a/src/handlers.rs
+++ b/src/handlers.rs
@@ -39,6 +39,7 @@ mod notification;
 mod notify_zulip;
 mod ping;
 mod prioritize;
+pub mod pull_requests_assignment_update;
 mod relabel;
 mod review_requested;
 mod review_submitted;
diff --git a/src/handlers/notification.rs b/src/handlers/notification.rs
index 718b2b24..09a775d1 100644
--- a/src/handlers/notification.rs
+++ b/src/handlers/notification.rs
@@ -100,7 +100,7 @@ pub async fn handle(ctx: &Context, event: &Event) -> anyhow::Result<()> {
             continue;
         }
 
-        if let Err(err) = notifications::record_username(&client, user.id.unwrap(), user.login)
+        if let Err(err) = notifications::record_username(&client, user.id.unwrap(), &user.login)
             .await
             .context("failed to record username")
         {
diff --git a/src/handlers/pull_requests_assignment_update.rs b/src/handlers/pull_requests_assignment_update.rs
new file mode 100644
index 00000000..82580d03
--- /dev/null
+++ b/src/handlers/pull_requests_assignment_update.rs
@@ -0,0 +1,72 @@
+use std::collections::HashMap;
+
+use crate::db::notifications::record_username;
+use crate::github::retrieve_pull_requests;
+use crate::jobs::Job;
+use anyhow::Context as _;
+use async_trait::async_trait;
+use tokio_postgres::Client as DbClient;
+
+pub struct PullRequestAssignmentUpdate;
+
+#[async_trait]
+impl Job for PullRequestAssignmentUpdate {
+    fn name(&self) -> &'static str {
+        "pull_request_assignment_update"
+    }
+
+    async fn run(&self, ctx: &super::Context, _metadata: &serde_json::Value) -> anyhow::Result<()> {
+        let db = ctx.db.get().await;
+        let gh = &ctx.github;
+
+        tracing::trace!("starting pull_request_assignment_update");
+
+        let rust_repo = gh.repository("rust-lang/rust").await?;
+        let prs = retrieve_pull_requests(&rust_repo, &gh).await?;
+
+        // delete all PR assignments before populating
+        init_table(&db).await?;
+
+        // aggregate by user first
+        let aggregated = prs.into_iter().fold(HashMap::new(), |mut acc, (user, pr)| {
+            let (_, prs) = acc
+                .entry(user.id.unwrap())
+                .or_insert_with(|| (user, Vec::new()));
+            prs.push(pr);
+            acc
+        });
+
+        // populate the table
+        for (_user_id, (assignee, prs)) in &aggregated {
+            let assignee_id = assignee.id.expect("checked");
+            let _ = record_username(&db, assignee_id, &assignee.login).await;
+            create_team_member_workqueue(&db, assignee_id, &prs).await?;
+        }
+
+        Ok(())
+    }
+}
+
+/// Truncate the review prefs table
+async fn init_table(db: &DbClient) -> anyhow::Result<u64> {
+    let res = db
+        .execute("UPDATE review_prefs SET assigned_prs='{}';", &[])
+        .await?;
+    Ok(res)
+}
+
+/// Create a team member work queue
+async fn create_team_member_workqueue(
+    db: &DbClient,
+    user_id: i64,
+    prs: &Vec<i32>,
+) -> anyhow::Result<u64> {
+    let q = "
+INSERT INTO review_prefs (user_id, assigned_prs) VALUES ($1, $2)
+ON CONFLICT (user_id)
+DO UPDATE SET assigned_prs = $2
+WHERE review_prefs.user_id=$1";
+    db.execute(q, &[&user_id, prs])
+        .await
+        .context("Insert DB error")
+}
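Editor's note, not part of the patch: `create_team_member_workqueue` is a straight upsert — on conflict the whole `assigned_prs` array is replaced, so every run of the job mirrors the current GitHub state. (The `WHERE review_prefs.user_id=$1` guard on the `DO UPDATE` arm looks redundant, since the conflict target already pins that row, but it is harmless.) A hypothetical call, with invented values, would look like:

```rust
// Hypothetical example: afterwards user 4_000_000 has exactly these two PRs in
// their workqueue, regardless of what the row contained before (if it existed).
create_team_member_workqueue(&db, 4_000_000, &vec![120_000, 120_001]).await?;
```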
diff --git a/src/jobs.rs b/src/jobs.rs
index 15d440d1..e86d68b9 100644
--- a/src/jobs.rs
+++ b/src/jobs.rs
@@ -58,12 +58,12 @@ use crate::{
     },
 };
 
-// How often new cron-based jobs will be placed in the queue.
-// This is the minimum period *between* a single cron task's executions.
+/// How often new cron-based jobs will be placed in the queue.
+/// This is the minimum period *between* a single cron task's executions.
 pub const JOB_SCHEDULING_CADENCE_IN_SECS: u64 = 1800;
 
-// How often the database is inspected for jobs which need to execute.
-// This is the granularity at which events will occur.
+/// How often the database is inspected for jobs which need to execute.
+/// This is the granularity at which events will occur.
 pub const JOB_PROCESSING_CADENCE_IN_SECS: u64 = 60;
 
 // The default jobs to schedule, repeatedly.
@@ -119,7 +119,7 @@ fn jobs_defined() {
     unique_all_job_names.dedup();
     assert_eq!(all_job_names, unique_all_job_names);
 
-    // Also ensure that our defalt jobs are release jobs
+    // Also ensure that our default jobs are release jobs
     let default_jobs = default_jobs();
     default_jobs
         .iter()
diff --git a/src/main.rs b/src/main.rs
index 444ada99..c1767857 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -10,8 +10,9 @@ use tokio::{task, time};
 use tower::{Service, ServiceExt};
 use tracing as log;
 use tracing::Instrument;
+use triagebot::handlers::pull_requests_assignment_update::PullRequestAssignmentUpdate;
 use triagebot::jobs::{
-    default_jobs, JOB_PROCESSING_CADENCE_IN_SECS, JOB_SCHEDULING_CADENCE_IN_SECS,
+    default_jobs, Job, JOB_PROCESSING_CADENCE_IN_SECS, JOB_SCHEDULING_CADENCE_IN_SECS,
 };
 use triagebot::{db, github, handlers::Context, notification_listing, payload, EventName};
 
@@ -261,6 +262,14 @@ async fn run_server(addr: SocketAddr) -> anyhow::Result<()> {
         octocrab: oc,
     });
 
+    // Run all jobs that don't have a schedule (one-off jobs)
+    // TODO: Ideally JobSchedule.schedule should become an `Option`
+    // and here we run all those with schedule=None
+    if !is_scheduled_jobs_disabled() {
+        spawn_job_oneoffs(ctx.clone()).await;
+    }
+
+    // Run all jobs that have a schedule (recurring jobs)
    if !is_scheduled_jobs_disabled() {
         spawn_job_scheduler();
         spawn_job_runner(ctx.clone());
@@ -310,6 +319,35 @@ async fn run_server(addr: SocketAddr) -> anyhow::Result<()> {
     Ok(())
 }
 
+/// Spawns a background tokio task which runs all jobs that have no schedule,
+/// i.e. the one-off jobs executed once at the end of the triagebot startup.
+// - jobs are not guaranteed to start in sequence (care must be taken to ensure they are completely independent from one another)
+// - the delay between job starts is not guaranteed to be precise
+async fn spawn_job_oneoffs(ctx: Arc<Context>) {
+    let jobs: Vec<Box<dyn Job + Send + Sync>> = vec![Box::new(PullRequestAssignmentUpdate)];
+
+    for (idx, job) in jobs.into_iter().enumerate() {
+        let ctx = ctx.clone();
+        task::spawn(async move {
+            // Allow some spacing between starting jobs
+            let delay = idx as u64 * 2;
+            time::sleep(time::Duration::from_secs(delay)).await;
+            match job.run(&ctx, &serde_json::Value::Null).await {
+                Ok(_) => {
+                    log::trace!("job successfully executed (name={})", &job.name());
+                }
+                Err(e) => {
+                    log::error!(
+                        "job failed on execution (name={:?}, error={:?})",
+                        job.name(),
+                        e
+                    );
+                }
+            }
+        });
+    }
+}
+
 /// Spawns a background tokio task which runs continuously to queue up jobs
 /// to be run by the job runner.
 ///
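Editor's note, not part of the patch: the TODO in `run_server` points at folding these one-off jobs into the regular job registry instead of hard-coding them in `spawn_job_oneoffs`. Purely as a hypothetical sketch of that direction (the field names below are assumed for illustration and are not taken from the actual `src/jobs.rs`):

```rust
// Hypothetical shape only: with an optional schedule, startup could run every
// registered job whose schedule is None exactly once, and hand the rest to the
// cron-based scheduler, instead of keeping a separate hard-coded list.
pub struct JobScheduleSketch {
    pub name: &'static str,
    pub schedule: Option<cron::Schedule>, // None => run once at startup
    pub metadata: serde_json::Value,
}
```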