diff --git a/aide-de-camp-sqlite-bench/src/main.rs b/aide-de-camp-sqlite-bench/src/main.rs index ddf1130..3e1bb01 100644 --- a/aide-de-camp-sqlite-bench/src/main.rs +++ b/aide-de-camp-sqlite-bench/src/main.rs @@ -5,7 +5,7 @@ use aide_de_camp::core::{Duration, Utc}; use aide_de_camp::runner::job_router::RunnerRouter; use aide_de_camp::runner::job_runner::JobRunner; use aide_de_camp_sqlite::queue::SqliteQueue; -use aide_de_camp_sqlite::SCHEMA_SQL; +use aide_de_camp_sqlite::MIGRATOR; use anyhow::anyhow; use async_trait::async_trait; use bincode::{Decode, Encode}; @@ -68,18 +68,17 @@ impl JobProcessor for BenchJob { async fn make_pool() -> SqlitePool { let pool = SqlitePool::connect(":memory:").await.unwrap(); - { - let mut tx = pool.begin().await.unwrap(); - sqlx::query(SCHEMA_SQL).execute(&mut tx).await.unwrap(); - tx.commit().await.unwrap(); - } + MIGRATOR.run(&pool).await.unwrap(); pool } async fn schedule_tasks(count: usize, interval: std::time::Duration, queue: Arc) { let mut delay = tokio::time::interval(interval); for _ in 0..count { delay.tick().await; - if let Err(e) = queue.schedule::(BenchJobPayload::default()).await { + if let Err(e) = queue + .schedule::(BenchJobPayload::default(), 0) + .await + { eprintln!("Failed to schedule job: {}", e); } } diff --git a/aide-de-camp-sqlite/Cargo.toml b/aide-de-camp-sqlite/Cargo.toml index 05b728b..2aaa4c6 100644 --- a/aide-de-camp-sqlite/Cargo.toml +++ b/aide-de-camp-sqlite/Cargo.toml @@ -14,7 +14,7 @@ authors = ["Andrey Snow "] [dependencies] aide-de-camp = { path = "../aide-de-camp", version = "0.1.1", features = ["runner"] } -sqlx = {version = "0.6.2", default-features = false , features = ["sqlite", "macros", "chrono", "offline"]} +sqlx = {version = "0.6.2", default-features = false , features = ["sqlite", "macros", "chrono", "offline", "migrate"]} bincode = "2.0.0-rc.1" tracing = "0.1.30" async-trait = "0.1.52" diff --git a/aide-de-camp-sqlite/README.md b/aide-de-camp-sqlite/README.md index 4d56bf8..f810bdd 100644 --- a/aide-de-camp-sqlite/README.md +++ b/aide-de-camp-sqlite/README.md @@ -2,10 +2,10 @@ A SQLite backed implementation of the job Queue. -NOTE: It is possible that a single job gets sent to two runners. This is due to SQLite lacking +**NOTE**: It is possible that a single job gets sent to two runners. This is due to SQLite lacking row locking and `BEGIN EXCLUSIVE TRANSACTION` not working well (it was very slow) for this use case. This is only an issue at high concurrency, in which case you probably don't want to use -SQLite in the first place. In other words, this isn't "Exactly Once" kind of queue. +SQLite in the first place. In other words, this isn't “Exactly Once” kind of queue. 
## Schema @@ -18,7 +18,8 @@ CREATE TABLE IF NOT EXISTS adc_queue ( retries int not null default 0, scheduled_at INTEGER not null, started_at INTEGER, - enqueued_at INTEGER not null default (strftime('%s', 'now')) + enqueued_at INTEGER not null default (strftime('%s', 'now')), + priority TINYINT not null default 0 ); CREATE TABLE IF NOT EXISTS adc_dead_queue ( @@ -30,7 +31,8 @@ CREATE TABLE IF NOT EXISTS adc_dead_queue ( jid TEXT PRIMARY KEY, queue TEXT NOT NULL, job_type TEXT not null, payload blob not null, retries int not null, scheduled_at INTEGER not null, started_at INTEGER not null, enqueued_at INTEGER not null, - died_at INTEGER not null default (strftime('%s', 'now')) + died_at INTEGER not null default (strftime('%s', 'now')), + priority TINYINT not null default 0 ); CREATE INDEX IF NOT EXISTS adc_queue_jobs ON adc_queue ( @@ -41,13 +43,15 @@ CREATE INDEX IF NOT EXISTS adc_queue_jobs ON adc_queue ( ); ``` -Schema also included into this crate as `SCHEMA_SQL` constant in this crate. +The crate includes a [SQLx `MIGRATOR`](https://docs.rs/sqlx/0.4.0-beta.1/sqlx/macro.migrate.html) that can be used to manage the schema. + +**NOTE:** [SQLx doesn't support](https://github.com/launchbadge/sqlx/issues/1698) multiple migrators in the same database. That means ADC should either have a dedicated schema/database, or it is your responsibility to apply the migrations yourself. ## Example ```rust -use aide_de_camp_sqlite::{SqliteQueue, SCHEMA_SQL}; +use aide_de_camp_sqlite::{SqliteQueue, MIGRATOR}; use aide_de_camp::prelude::{Queue, JobProcessor, JobRunner, RunnerRouter, Duration, Xid}; use async_trait::async_trait; use sqlx::SqlitePool; @@ -74,7 +78,7 @@ async fn main() -> Result<(), Box> { let pool = SqlitePool::connect(":memory:").await?; // Setup schema, alternatively you can add schema to your migrations. - sqlx::query(SCHEMA_SQL).execute(&pool).await?; + MIGRATOR.run(&pool).await?; let queue = SqliteQueue::with_pool(pool); // Add job the queue to run next diff --git a/aide-de-camp-sqlite/sql/schema.sql b/aide-de-camp-sqlite/migrations/20221015223453_initial_schema.sql similarity index 96% rename from aide-de-camp-sqlite/sql/schema.sql rename to aide-de-camp-sqlite/migrations/20221015223453_initial_schema.sql index 6163eaf..d4fe6ad 100644 --- a/aide-de-camp-sqlite/sql/schema.sql +++ b/aide-de-camp-sqlite/migrations/20221015223453_initial_schema.sql @@ -1,29 +1,29 @@ -CREATE TABLE IF NOT EXISTS adc_queue ( - jid TEXT PRIMARY KEY, - queue TEXT NOT NULL default 'default', - job_type TEXT not null, - payload blob not null, - retries int not null default 0, - scheduled_at INTEGER not null, - started_at INTEGER, - enqueued_at INTEGER not null default (strftime('%s', 'now')) -); - -CREATE TABLE IF NOT EXISTS adc_dead_queue ( - jid TEXT PRIMARY KEY, - queue TEXT NOT NULL, - job_type TEXT not null, - payload blob not null, - retries int not null, - scheduled_at INTEGER not null, - started_at INTEGER not null, - enqueued_at INTEGER not null, - died_at INTEGER not null default (strftime('%s', 'now')) -); - -CREATE INDEX IF NOT EXISTS adc_queue_jobs ON adc_queue ( - scheduled_at asc, - started_at asc, - queue, - job_type -); \ No newline at end of file +CREATE TABLE IF NOT EXISTS adc_queue ( + jid TEXT PRIMARY KEY, + queue TEXT NOT NULL default 'default', + job_type TEXT not null, + payload blob not null, + retries int not null default 0, + scheduled_at INTEGER not null, + started_at INTEGER, + enqueued_at INTEGER not null default (strftime('%s', 'now')) +); + +CREATE TABLE IF NOT EXISTS adc_dead_queue ( + jid TEXT PRIMARY KEY, + queue TEXT NOT NULL, + job_type TEXT not null, + payload blob not null, + retries int not null, + 
scheduled_at INTEGER not null, + started_at INTEGER not null, + enqueued_at INTEGER not null, + died_at INTEGER not null default (strftime('%s', 'now')) +); + +CREATE INDEX IF NOT EXISTS adc_queue_jobs ON adc_queue ( + scheduled_at asc, + started_at asc, + queue, + job_type +); diff --git a/aide-de-camp-sqlite/migrations/20221015224053_add_priority.sql b/aide-de-camp-sqlite/migrations/20221015224053_add_priority.sql new file mode 100644 index 0000000..d4f1079 --- /dev/null +++ b/aide-de-camp-sqlite/migrations/20221015224053_add_priority.sql @@ -0,0 +1,2 @@ +ALTER table adc_queue ADD COLUMN priority tinyint default 0; +ALTER table adc_dead_queue ADD COLUMN priority tinyint default 0; \ No newline at end of file diff --git a/aide-de-camp-sqlite/schema.hcl b/aide-de-camp-sqlite/schema.hcl deleted file mode 100644 index 335cc9b..0000000 --- a/aide-de-camp-sqlite/schema.hcl +++ /dev/null @@ -1,92 +0,0 @@ -table "queue" { - schema = schema.main - column "queue" { - null = false - type = text - default = "default" - } - column "job_type" { - null = false - type = text - } - column "payload" { - null = false - type = blob - } - column "retries" { - null = false - type = int - default = 0 - } - column "scheduled_at" { - null = false - type = integer - } - column "started_at" { - null = true - type = integer - } - column "enqueued_at" { - null = false - type = integer - default = sql("strftime('%s', 'now')") - } - column "jid" { - null = true - type = text - } - primary_key { - columns = [column.jid] - } -} -table "dead_queue" { - schema = schema.main - column "queue" { - null = false - type = text - default = "default" - } - column "job_type" { - null = false - type = text - } - column "payload" { - null = false - type = blob - } - column "retries" { - null = false - type = int - default = 0 - } - column "error" { - null = true - type = text - } - column "scheduled_at" { - null = false - type = integer - } - column "started_at" { - null = true - type = integer - } - column "enqueued_at" { - null = false - type = integer - } - column "died_at" { - null = false - type = integer - default = sql("strftime('%s', 'now')") - } - column "jid" { - null = true - type = text - } - primary_key { - columns = [column.jid] - } -} -schema "main" { -} diff --git a/aide-de-camp-sqlite/sqlx-data.json b/aide-de-camp-sqlite/sqlx-data.json index 8500673..81d86db 100644 --- a/aide-de-camp-sqlite/sqlx-data.json +++ b/aide-de-camp-sqlite/sqlx-data.json @@ -1,16 +1,16 @@ { "db": "SQLite", - "2da89a793016c67949a2a228ffebd70a6dd315b20a35787598ac2c10ba78181e": { + "45bd3fdf8cae3adecfb6b2851010f7f9b6e67bb6005598b480b7a1ed59b212db": { "describe": { "columns": [], "nullable": [], "parameters": { - "Right": 4 + "Right": 1 } }, - "query": "INSERT INTO adc_queue (jid,job_type,payload,scheduled_at) VALUES (?1,?2,?3,?4)" + "query": "DELETE FROM adc_queue WHERE jid = ?1" }, - "45bd3fdf8cae3adecfb6b2851010f7f9b6e67bb6005598b480b7a1ed59b212db": { + "52a42447be39002c9ea32ac14a8dff9a9a459c9b16c7c76f6d380c5492b07843": { "describe": { "columns": [], "nullable": [], @@ -18,17 +18,17 @@ "Right": 1 } }, - "query": "DELETE FROM adc_queue WHERE jid = ?1" + "query": "DELETE FROM adc_queue where jid = ?1" }, - "52a42447be39002c9ea32ac14a8dff9a9a459c9b16c7c76f6d380c5492b07843": { + "57789da912edf4d4b8c8fda95fb5ccbb139459bd6fa16ec421bf8f7d9ec209a0": { "describe": { "columns": [], "nullable": [], "parameters": { - "Right": 1 + "Right": 5 } }, - "query": "DELETE FROM adc_queue where jid = ?1" + "query": "INSERT INTO adc_queue 
(jid,job_type,payload,scheduled_at,priority) VALUES (?1,?2,?3,?4,?5)" }, "7621f919fc2f38bb65606230b43156f609d9a6b625d97ad32c2e0bc6aba7005b": { "describe": { @@ -82,6 +82,11 @@ "name": "enqueued_at", "ordinal": 7, "type_info": "Int64" + }, + { + "name": "priority", + "ordinal": 8, + "type_info": "Int64" } ], "nullable": [ @@ -92,6 +97,7 @@ true, true, true, + true, true ], "parameters": { diff --git a/aide-de-camp-sqlite/src/lib.rs b/aide-de-camp-sqlite/src/lib.rs index 979f572..7ab8ab8 100644 --- a/aide-de-camp-sqlite/src/lib.rs +++ b/aide-de-camp-sqlite/src/lib.rs @@ -5,12 +5,13 @@ pub mod queue; pub mod types; pub use queue::SqliteQueue; -pub const SCHEMA_SQL: &str = include_str!("../sql/schema.sql"); +use sqlx::migrate::Migrator; +pub static MIGRATOR: Migrator = sqlx::migrate!(); #[cfg(test)] mod test { use crate::queue::SqliteQueue; - use crate::SCHEMA_SQL; + use crate::MIGRATOR; use aide_de_camp::core::bincode::{Decode, Encode}; use aide_de_camp::core::job_handle::JobHandle; use aide_de_camp::core::job_processor::JobProcessor; @@ -32,11 +33,7 @@ mod test { async fn make_pool(uri: &str) -> SqlitePool { let pool = SqlitePool::connect(uri).await.unwrap(); - { - let mut tx = pool.begin().await.unwrap(); - sqlx::query(SCHEMA_SQL).execute(&mut tx).await.unwrap(); - tx.commit().await.unwrap(); - } + MIGRATOR.run(&pool).await.unwrap(); pool } @@ -142,7 +139,7 @@ mod test { } // Schedule a job to run now let jid1 = queue - .schedule::(TestPayload1::default()) + .schedule::(TestPayload1::default(), 0) .await .unwrap(); @@ -170,7 +167,7 @@ mod test { // Schedule a job to run now let _jid1 = queue - .schedule::(TestPayload1::default()) + .schedule::(TestPayload1::default(), 0) .await .unwrap(); @@ -195,7 +192,7 @@ mod test { // schedule to run job tomorrow // schedule a job to run now let tomorrow_jid = queue - .schedule_in::(TestPayload1::default(), Duration::days(1)) + .schedule_in::(TestPayload1::default(), Duration::days(1), 0) .await .unwrap(); @@ -207,7 +204,7 @@ mod test { let hour_ago = { Utc::now() - Duration::hours(1) }; let hour_ago_jid = queue - .schedule_at::(TestPayload1::default(), hour_ago) + .schedule_at::(TestPayload1::default(), hour_ago, 0) .await .unwrap(); @@ -241,7 +238,7 @@ mod test { let pool = make_pool(":memory:").await; let queue = SqliteQueue::with_pool(pool); let jid = queue - .schedule::(TestPayload1::default()) + .schedule::(TestPayload1::default(), 0) .await .unwrap(); queue.cancel_job(jid).await.unwrap(); @@ -262,7 +259,10 @@ mod test { let pool = make_pool(":memory:").await; let queue = SqliteQueue::with_pool(pool); let payload = TestPayload1::default(); - let jid = queue.schedule::(payload.clone()).await.unwrap(); + let jid = queue + .schedule::(payload.clone(), 0) + .await + .unwrap(); let deleted_payload = queue.unschedule_job::(jid).await.unwrap(); assert_eq!(payload, deleted_payload); @@ -276,7 +276,7 @@ mod test { let pool = make_pool(":memory:").await; let queue = SqliteQueue::with_pool(pool); let jid = queue - .schedule::(TestPayload1::default()) + .schedule::(TestPayload1::default(), 0) .await .unwrap(); @@ -293,7 +293,10 @@ mod test { let pool = make_pool(":memory:").await; let queue = SqliteQueue::with_pool(pool); let payload = TestPayload1::default(); - let jid = queue.schedule::(payload.clone()).await.unwrap(); + let jid = queue + .schedule::(payload.clone(), 0) + .await + .unwrap(); let _job = queue.poll_next(&[TestJob1::name()]).await.unwrap().unwrap(); @@ -303,4 +306,22 @@ mod test { let ret = queue.unschedule_job::(jid).await; 
assert!(matches!(ret, Err(QueueError::JobNotFound(_)))); } + #[tokio::test] + async fn priority_polling() { + let pool = make_pool(":memory:").await; + let queue = SqliteQueue::with_pool(pool); + let hour_ago = { Utc::now() - Duration::hours(1) }; + let _hour_ago_jid = queue + .schedule_at::(TestPayload1::default(), hour_ago, 0) + .await + .unwrap(); + + let higher_priority_jid = queue + .schedule_at::(TestPayload1::default(), hour_ago, 3) + .await + .unwrap(); + + let job = queue.poll_next(&[TestJob1::name()]).await.unwrap().unwrap(); + assert_eq!(higher_priority_jid, job.id()); + } } diff --git a/aide-de-camp-sqlite/src/queue.rs b/aide-de-camp-sqlite/src/queue.rs index d0a6d47..2346731 100644 --- a/aide-de-camp-sqlite/src/queue.rs +++ b/aide-de-camp-sqlite/src/queue.rs @@ -35,6 +35,7 @@ impl Queue for SqliteQueue { &self, payload: J::Payload, scheduled_at: DateTime, + priority: i8, ) -> Result where J: JobProcessor + 'static, @@ -48,11 +49,12 @@ impl Queue for SqliteQueue { tracing::Span::current().record("payload_size", payload.len()); sqlx::query!( - "INSERT INTO adc_queue (jid,job_type,payload,scheduled_at) VALUES (?1,?2,?3,?4)", + "INSERT INTO adc_queue (jid,job_type,payload,scheduled_at,priority) VALUES (?1,?2,?3,?4,?5)", jid_string, job_type, payload, - scheduled_at + scheduled_at, + priority ) .execute(&self.pool) .await @@ -81,7 +83,7 @@ impl Queue for SqliteQueue { separated.push_bind(job_type); } } - builder.push(") limit 1) RETURNING *"); + builder.push(") ORDER BY priority DESC LIMIT 1) RETURNING *"); builder.build().bind(now) }; let row = query @@ -114,6 +116,7 @@ impl Queue for SqliteQueue { } } + #[allow(clippy::or_fun_call)] #[instrument(skip_all, err)] async fn unschedule_job(&self, job_id: Xid) -> Result where @@ -130,7 +133,7 @@ impl Queue for SqliteQueue { .await .context("Failed to remove job from the queue")? .map(|row| row.payload.unwrap_or_default()) - .ok_or_else(||QueueError::JobNotFound(job_id))?; + .ok_or(QueueError::JobNotFound(job_id))?; let (decoded, _) = bincode::decode_from_slice(&payload, self.bincode_config)?; Ok(decoded) } diff --git a/aide-de-camp/src/core/queue.rs b/aide-de-camp/src/core/queue.rs index e578e5b..6a1b76f 100644 --- a/aide-de-camp/src/core/queue.rs +++ b/aide-de-camp/src/core/queue.rs @@ -7,8 +7,12 @@ use crate::core::job_handle::JobHandle; use crate::core::job_processor::JobProcessor; use crate::core::{DateTime, Duration, Xid}; -/// An interface to queue implementation. Reponsible for pushing jobs into the queue and pulling +/// An interface to queue implementation. Responsible for pushing jobs into the queue and pulling /// jobs out of the queue. +/// +/// ### Priority +/// +/// When a job is enqueued, one can specify its priority. Jobs with higher priority will get polled first, even if they were submitted after lower-priority jobs. #[async_trait] pub trait Queue: Send + Sync { type JobHandle: JobHandle; @@ -17,17 +21,18 @@ pub trait Queue: Send + Sync { &self, payload: J::Payload, scheduled_at: DateTime, + priority: i8, ) -> Result where J: JobProcessor + 'static, J::Payload: Encode; /// Schedule a job to run next. Depending on queue backlog this may start running later than you expect. - async fn schedule(&self, payload: J::Payload) -> Result + async fn schedule(&self, payload: J::Payload, priority: i8) -> Result where J: JobProcessor + 'static, J::Payload: Encode, { - self.schedule_at::(payload, Utc::now()).await + self.schedule_at::(payload, Utc::now(), priority).await } /// Schedule a job to run at the future time relative to now. 
@@ -35,13 +40,14 @@ pub trait Queue: Send + Sync { &self, payload: J::Payload, scheduled_in: Duration, + priority: i8, ) -> Result where J: JobProcessor + 'static, J::Payload: Encode, { let when = Utc::now() + scheduled_in; - self.schedule_at::(payload, when).await + self.schedule_at::(payload, when, priority).await } /// Pool queue, implementation should not wait for next job, if there nothing return `Ok(None)`.
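To show the migrated setup and the new `priority` argument end to end, here is a minimal sketch (not part of the diff) that mirrors the calls exercised by the new `priority_polling` test. The generic bounds are the ones used in `queue.rs`; the `Clone` bound on the payload, the boxed error return type, and calling `name()` through the `JobProcessor` trait are assumptions made for illustration only.

```rust
use aide_de_camp::core::bincode::Encode;
use aide_de_camp::prelude::{Duration, JobProcessor, Queue};
use aide_de_camp_sqlite::{SqliteQueue, MIGRATOR};
use sqlx::SqlitePool;

/// Enqueue one payload at several priorities and poll once.
async fn enqueue_with_priorities<J>(payload: J::Payload) -> Result<(), Box<dyn std::error::Error>>
where
    J: JobProcessor + 'static,
    J::Payload: Encode + Clone,
{
    let pool = SqlitePool::connect(":memory:").await?;
    // Apply the bundled migrations (replaces the removed `SCHEMA_SQL` constant).
    MIGRATOR.run(&pool).await?;
    let queue = SqliteQueue::with_pool(pool);

    // Due now, default priority.
    queue.schedule::<J>(payload.clone(), 0).await?;
    // Due now, higher priority: polled before the priority-0 job.
    queue.schedule::<J>(payload.clone(), 3).await?;
    // Due in five minutes, so `poll_next` will not return it yet.
    queue
        .schedule_in::<J>(payload, Duration::minutes(5), 1)
        .await?;

    // Returns the highest-priority job that is already due.
    let next = queue.poll_next(&[J::name()]).await?;
    assert!(next.is_some());
    Ok(())
}
```

With the `ORDER BY priority DESC` change in `poll_next`, the priority-3 job is handed out before the priority-0 job even though both became due at the same time, while the delayed job stays queued until its `scheduled_at` passes.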