Skip to content

Commit

Permalink
feat: Make add_job util return the DbJob
Browse files Browse the repository at this point in the history
  • Loading branch information
leo91000 committed Feb 6, 2024
1 parent 7ef0bfe commit c216350
Show file tree
Hide file tree
Showing 12 changed files with 425 additions and 61 deletions.
4 changes: 2 additions & 2 deletions crates/migrations/src/sql/m000018.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,8 @@ pub const M000018_MIGRATION: GraphileWorkerMigration = GraphileWorkerMigration {
)
from :GRAPHILE_WORKER_SCHEMA._private_tasks as tasks
left join :GRAPHILE_WORKER_SCHEMA._private_job_queues as job_queues
on job_queues.queue_name = queue_name
where tasks.identifier = identifier
on job_queues.queue_name = add_job.queue_name
where tasks.identifier = add_job.identifier
on conflict (key)
do update set
revision = jobs.revision + 1,
Expand Down
6 changes: 3 additions & 3 deletions src/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use graphile_worker_task_handler::TaskDefinition;
use serde::Serialize;
use sqlx::PgPool;

use crate::{errors::GraphileWorkerError, sql::add_job::add_job, JobSpec, WorkerContext};
use crate::{errors::GraphileWorkerError, sql::add_job::add_job, DbJob, JobSpec, WorkerContext};

/// The WorkerHelpers struct provides a set of methods to add jobs to the queue
pub struct WorkerUtils {
Expand All @@ -25,7 +25,7 @@ impl WorkerUtils {
&self,
payload: T::Payload,
spec: Option<JobSpec>,
) -> Result<(), GraphileWorkerError> {
) -> Result<DbJob, GraphileWorkerError> {
let identifier = T::identifier();
let payload = serde_json::to_value(payload)?;
add_job(
Expand All @@ -45,7 +45,7 @@ impl WorkerUtils {
identifier: &str,
payload: P,
spec: Option<JobSpec>,
) -> Result<(), GraphileWorkerError>
) -> Result<DbJob, GraphileWorkerError>
where
P: Serialize,
{
Expand Down
118 changes: 118 additions & 0 deletions src/job.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
use chrono::{DateTime, Utc};
use getset::Getters;
use serde_json::Value;
use sqlx::FromRow;

/// A job row exactly as stored in the database `jobs` table.
///
/// Unlike [`Job`], the task is referenced only by its numeric `task_id`
/// foreign key; the human-readable task identifier is not resolved here.
/// Field access is through the generated getters (`getset`, `get = "pub"`).
#[derive(FromRow, Getters, Debug)]
#[getset(get = "pub")]
#[allow(dead_code)]
pub struct DbJob {
    id: i64,
    /// FK to job_queues; None when the job is not bound to a named queue
    job_queue_id: Option<i32>,
    /// The JSON payload of the job
    payload: serde_json::Value,
    /// Lower number means it should run sooner
    priority: i16,
    /// When it was due to run
    run_at: DateTime<Utc>,
    /// How many times it has been attempted
    attempts: i16,
    /// The limit for the number of times it should be attempted
    max_attempts: i16,
    /// If attempts > 0, why did it fail last?
    last_error: Option<String>,
    created_at: DateTime<Utc>,
    updated_at: DateTime<Utc>,
    /// "job_key" - unique identifier for easy update from user code
    key: Option<String>,
    /// A count of the revision numbers
    revision: i32,
    locked_at: Option<DateTime<Utc>>,
    locked_by: Option<String>,
    flags: Option<Value>,
    /// FK to tasks: the task ID of the job
    task_id: i32,
}

/// A job with its task identifier resolved, as handed to task handlers.
///
/// Same shape as [`DbJob`] plus `task_identifier`, the human-readable
/// name looked up from the task details (see `Job::from_db_job`).
/// Field access is through the generated getters (`getset`, `get = "pub"`).
#[derive(FromRow, Getters, Debug)]
#[getset(get = "pub")]
#[allow(dead_code)]
pub struct Job {
    id: i64,
    /// FK to job_queues; None when the job is not bound to a named queue
    job_queue_id: Option<i32>,
    /// The JSON payload of the job
    payload: serde_json::Value,
    /// Lower number means it should run sooner
    priority: i16,
    /// When it was due to run
    run_at: DateTime<Utc>,
    /// How many times it has been attempted
    attempts: i16,
    /// The limit for the number of times it should be attempted
    max_attempts: i16,
    /// If attempts > 0, why did it fail last?
    last_error: Option<String>,
    created_at: DateTime<Utc>,
    updated_at: DateTime<Utc>,
    /// "job_key" - unique identifier for easy update from user code
    key: Option<String>,
    /// A count of the revision numbers
    revision: i32,
    locked_at: Option<DateTime<Utc>>,
    locked_by: Option<String>,
    flags: Option<Value>,
    /// FK to tasks: the task ID of the job
    task_id: i32,
    /// The task identifier of the job, shortcut to tasks.identifier
    task_identifier: String,
}

impl From<DbJob> for Job {
fn from(db_job: DbJob) -> Job {
Job {
id: db_job.id,
job_queue_id: db_job.job_queue_id,
payload: db_job.payload,
priority: db_job.priority,
run_at: db_job.run_at,
attempts: db_job.attempts,
max_attempts: db_job.max_attempts,
last_error: db_job.last_error,
created_at: db_job.created_at,
updated_at: db_job.updated_at,
key: db_job.key,
revision: db_job.revision,
locked_at: db_job.locked_at,
locked_by: db_job.locked_by,
flags: db_job.flags,
task_id: db_job.task_id,
task_identifier: "".to_string(),
}
}
}

impl Job {
pub fn from_db_job(db_job: DbJob, task_identifier: String) -> Job {
Job {
id: db_job.id,
job_queue_id: db_job.job_queue_id,
payload: db_job.payload,
priority: db_job.priority,
run_at: db_job.run_at,
attempts: db_job.attempts,
max_attempts: db_job.max_attempts,
last_error: db_job.last_error,
created_at: db_job.created_at,
updated_at: db_job.updated_at,
key: db_job.key,
revision: db_job.revision,
locked_at: db_job.locked_at,
locked_by: db_job.locked_by,
flags: db_job.flags,
task_id: db_job.task_id,
task_identifier,
}
}
}
4 changes: 3 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
pub mod builder;
pub mod errors;
pub mod helpers;
pub mod job;
pub mod runner;
pub mod sql;
pub mod streams;
pub mod utils;

pub use crate::sql::add_job::JobSpec;
pub use crate::job::*;
pub use crate::sql::add_job::{JobKeyMode, JobSpec};
pub use graphile_worker_crontab_parser::parse_crontab;
pub use graphile_worker_macros::task;
pub use graphile_worker_task_handler::*;
Expand Down
2 changes: 1 addition & 1 deletion src/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::{collections::HashMap, time::Instant};

use crate::errors::GraphileWorkerError;
use crate::helpers::WorkerUtils;
use crate::sql::get_job::Job;
use crate::job::Job;
use crate::sql::{get_job::get_job, task_identifiers::TaskDetails};
use crate::streams::{job_signal_stream, job_stream};
use futures::{try_join, StreamExt, TryStreamExt};
Expand Down
12 changes: 6 additions & 6 deletions src/sql/add_job.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::errors::GraphileWorkerError;
use crate::{errors::GraphileWorkerError, DbJob};
use chrono::Utc;
use getset::Getters;
use sqlx::{query, PgExecutor};
use sqlx::{query_as, PgExecutor};
use tracing::info;

/// Behavior when an existing job with the same job key is found is controlled by this setting
Expand Down Expand Up @@ -59,7 +59,7 @@ pub async fn add_job(
identifier: &str,
payload: serde_json::Value,
spec: JobSpec,
) -> Result<(), GraphileWorkerError> {
) -> Result<DbJob, GraphileWorkerError> {
let sql = format!(
r#"
select * from {escaped_schema}.add_job(
Expand All @@ -78,7 +78,7 @@ pub async fn add_job(

let job_key_mode = spec.job_key_mode.map(|jkm| jkm.format().to_string());

query(&sql)
let job = query_as(&sql)
.bind(identifier)
.bind(&payload)
.bind(spec.queue_name)
Expand All @@ -88,7 +88,7 @@ pub async fn add_job(
.bind(spec.priority)
.bind(spec.flags)
.bind(job_key_mode)
.execute(executor)
.fetch_one(executor)
.await?;

info!(
Expand All @@ -97,5 +97,5 @@ pub async fn add_job(
"Job added to queue"
);

Ok(())
Ok(job)
}
2 changes: 1 addition & 1 deletion src/sql/complete_job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use sqlx::{query, PgExecutor};

use crate::errors::GraphileWorkerError;

use super::get_job::Job;
use crate::Job;

pub async fn complete_job(
executor: impl for<'e> PgExecutor<'e>,
Expand Down
2 changes: 1 addition & 1 deletion src/sql/fail_job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use sqlx::{query, PgExecutor};

use crate::errors::GraphileWorkerError;

use super::get_job::Job;
use crate::Job;

pub async fn fail_job(
executor: impl for<'e> PgExecutor<'e>,
Expand Down
50 changes: 11 additions & 39 deletions src/sql/get_job.rs
Original file line number Diff line number Diff line change
@@ -1,44 +1,10 @@
use chrono::prelude::*;
use getset::Getters;
use serde_json::Value;
use sqlx::{query_as, FromRow, PgExecutor};
use sqlx::{query_as, PgExecutor};

use crate::errors::Result;
use crate::DbJob;
use crate::{errors::Result, Job};

use super::task_identifiers::TaskDetails;

#[derive(FromRow, Getters, Debug)]
#[getset(get = "pub")]
#[allow(dead_code)]
pub struct Job {
id: i64,
/// FK to tasks
job_queue_id: Option<i32>,
/// The JSON payload of the job
payload: serde_json::Value,
/// Lower number means it should run sooner
priority: i16,
/// When it was due to run
run_at: DateTime<Utc>,
/// How many times it has been attempted
attempts: i16,
/// The limit for the number of times it should be attempted
max_attempts: i16,
/// If attempts > 0, why did it fail last ?
last_error: Option<String>,
created_at: DateTime<Utc>,
updated_at: DateTime<Utc>,
/// "job_key" - unique identifier for easy update from user code
key: Option<String>,
/// A count of the revision numbers
revision: i32,
locked_at: Option<DateTime<Utc>>,
locked_by: Option<String>,
flags: Option<Value>,
/// Shorcut to tasks.identifer
task_id: i32,
}

pub async fn get_job<'e>(
executor: impl PgExecutor<'e>,
task_details: &TaskDetails,
Expand Down Expand Up @@ -81,8 +47,14 @@ pub async fn get_job<'e>(
q = q.bind(flags_to_skip);
}

let job = q.fetch_optional(executor).await?;
Ok(job)
let job: Option<DbJob> = q.fetch_optional(executor).await?;
Ok(job.map(|job| {
let task_identifier = task_details
.get(job.task_id())
.map(ToOwned::to_owned)
.unwrap_or_default();
Job::from_db_job(job, task_identifier)
}))
}

fn get_flag_clause(flags_to_skip: &Vec<String>, param_ord: u8) -> String {
Expand Down
6 changes: 2 additions & 4 deletions src/streams.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,8 @@ use tracing::error;

use crate::{
errors::Result,
sql::{
get_job::{get_job, Job},
task_identifiers::TaskDetails,
},
sql::{get_job::get_job, task_identifiers::TaskDetails},
Job,
};

#[derive(Debug)]
Expand Down
12 changes: 9 additions & 3 deletions tests/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

use chrono::{DateTime, Utc};
use graphile_worker::sql::add_job::add_job;
use graphile_worker::DbJob;
use graphile_worker::JobSpec;
use graphile_worker::WorkerOptions;
use serde_json::Value;
Expand All @@ -16,7 +17,7 @@ use tracing_subscriber::util::SubscriberInitExt;
use tracing_subscriber::EnvFilter;

#[derive(FromRow, Debug)]
pub struct Job {
pub struct JobWithQueueName {
pub id: i64,
pub job_queue_id: Option<i32>,
pub task_identifier: String,
Expand Down Expand Up @@ -76,7 +77,7 @@ impl TestDatabase {
.concurrency(4)
}

pub async fn get_jobs(&self) -> Vec<Job> {
pub async fn get_jobs(&self) -> Vec<JobWithQueueName> {
sqlx::query_as(
r#"
select jobs.*, identifier as task_identifier, job_queues.queue_name as queue_name
Expand Down Expand Up @@ -133,7 +134,12 @@ impl TestDatabase {
.expect("Failed to get migrations")
}

pub async fn add_job(&self, identifier: &str, payload: impl serde::Serialize, spec: JobSpec) {
pub async fn add_job(
&self,
identifier: &str,
payload: impl serde::Serialize,
spec: JobSpec,
) -> DbJob {
add_job(
&self.test_pool,
"graphile_worker",
Expand Down
Loading

0 comments on commit c216350

Please sign in to comment.