Skip to content

Commit

Permalink
feat: Add support for job priority
Browse files Browse the repository at this point in the history
Allows specifying priority when job is enqueued.

Closes #7

BREAKING CHANGE:

 - `SCHEMA_SQL` is gone. `MIGRATOR` should be used instead. This means
   ADC must have exclusive rights to SQLx migrations table. Since it's
   rarely the case - use different schema/database for ADC.
 - Interface to Queue for pushing jobs includes `priority` argument
  • Loading branch information
andoriyu committed Oct 15, 2022
1 parent bac983d commit 4f0f970
Show file tree
Hide file tree
Showing 10 changed files with 116 additions and 167 deletions.
13 changes: 6 additions & 7 deletions aide-de-camp-sqlite-bench/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use aide_de_camp::core::{Duration, Utc};
use aide_de_camp::runner::job_router::RunnerRouter;
use aide_de_camp::runner::job_runner::JobRunner;
use aide_de_camp_sqlite::queue::SqliteQueue;
use aide_de_camp_sqlite::SCHEMA_SQL;
use aide_de_camp_sqlite::MIGRATOR;
use anyhow::anyhow;
use async_trait::async_trait;
use bincode::{Decode, Encode};
Expand Down Expand Up @@ -68,18 +68,17 @@ impl JobProcessor for BenchJob {

async fn make_pool() -> SqlitePool {
let pool = SqlitePool::connect(":memory:").await.unwrap();
{
let mut tx = pool.begin().await.unwrap();
sqlx::query(SCHEMA_SQL).execute(&mut tx).await.unwrap();
tx.commit().await.unwrap();
}
MIGRATOR.run(&pool).await.unwrap();
pool
}
async fn schedule_tasks(count: usize, interval: std::time::Duration, queue: Arc<SqliteQueue>) {
let mut delay = tokio::time::interval(interval);
for _ in 0..count {
delay.tick().await;
if let Err(e) = queue.schedule::<BenchJob>(BenchJobPayload::default()).await {
if let Err(e) = queue
.schedule::<BenchJob>(BenchJobPayload::default(), 0)
.await
{
eprintln!("Failed to schedule job: {}", e);
}
}
Expand Down
2 changes: 1 addition & 1 deletion aide-de-camp-sqlite/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ authors = ["Andrey Snow <andoriyu@gmail.com>"]

[dependencies]
aide-de-camp = { path = "../aide-de-camp", version = "0.1.1", features = ["runner"] }
sqlx = {version = "0.6.2", default-features = false , features = ["sqlite", "macros", "chrono", "offline"]}
sqlx = {version = "0.6.2", default-features = false , features = ["sqlite", "macros", "chrono", "offline", "migrate"]}
bincode = "2.0.0-rc.1"
tracing = "0.1.30"
async-trait = "0.1.52"
Expand Down
18 changes: 11 additions & 7 deletions aide-de-camp-sqlite/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@

A SQLite backed implementation of the job Queue.

NOTE: It is possible that a single job gets sent to two runners. This is due to SQLite lacking
**NOTE**: It is possible that a single job gets sent to two runners. This is due to SQLite lacking
row locking and `BEGIN EXCLUSIVE TRANSACTION` not working well (it was very slow) for this use
case. This is only an issue at high concurrency, in which case you probably don't want to use
SQLite in the first place. In other words, this isn't "Exactly Once" kind of queue.
SQLite in the first place. In other words, this isn't Exactly Once kind of queue.

## Schema

Expand All @@ -18,7 +18,8 @@ CREATE TABLE IF NOT EXISTS adc_queue (
retries int not null default 0,
scheduled_at INTEGER not null,
started_at INTEGER,
enqueued_at INTEGER not null default (strftime('%s', 'now'))
enqueued_at INTEGER not null default (strftime('%s', 'now')),
priority TINYINT not null default 0,
);

CREATE TABLE IF NOT EXISTS adc_dead_queue (
Expand All @@ -30,7 +31,8 @@ CREATE TABLE IF NOT EXISTS adc_dead_queue (
scheduled_at INTEGER not null,
started_at INTEGER not null,
enqueued_at INTEGER not null,
died_at INTEGER not null default (strftime('%s', 'now'))
died_at INTEGER not null default (strftime('%s', 'now')),
priority TINYINT not null default 0,
);

CREATE INDEX IF NOT EXISTS adc_queue_jobs ON adc_queue (
Expand All @@ -41,13 +43,15 @@ CREATE INDEX IF NOT EXISTS adc_queue_jobs ON adc_queue (
);
```

Schema also included into this crate as `SCHEMA_SQL` constant in this crate.
Crate includes [SQLx `MIGRATOR`](https://docs.rs/sqlx/0.4.0-beta.1/sqlx/macro.migrate.html) that could be used to manage schema.

**NOTE:** [SQLx doesn't support](https://github.com/launchbadge/sqlx/issues/1698) multiple migrators in the same database. That means ADC should either have dedicated schema/database or it's your responsibility to apply migrations.

## Example

```rust

use aide_de_camp_sqlite::{SqliteQueue, SCHEMA_SQL};
use aide_de_camp_sqlite::{SqliteQueue, MIGRATOR};
use aide_de_camp::prelude::{Queue, JobProcessor, JobRunner, RunnerRouter, Duration, Xid};
use async_trait::async_trait;
use sqlx::SqlitePool;
Expand All @@ -74,7 +78,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {

let pool = SqlitePool::connect(":memory:").await?;
// Setup schema, alternatively you can add schema to your migrations.
sqlx::query(SCHEMA_SQL).execute(&pool).await?;
MIGRATOR.run(&pool).await?;
let queue = SqliteQueue::with_pool(pool);

// Add job the queue to run next
Expand Down
Original file line number Diff line number Diff line change
@@ -1,29 +1,29 @@
CREATE TABLE IF NOT EXISTS adc_queue (
jid TEXT PRIMARY KEY,
queue TEXT NOT NULL default 'default',
job_type TEXT not null,
payload blob not null,
retries int not null default 0,
scheduled_at INTEGER not null,
started_at INTEGER,
enqueued_at INTEGER not null default (strftime('%s', 'now'))
);

CREATE TABLE IF NOT EXISTS adc_dead_queue (
jid TEXT PRIMARY KEY,
queue TEXT NOT NULL,
job_type TEXT not null,
payload blob not null,
retries int not null,
scheduled_at INTEGER not null,
started_at INTEGER not null,
enqueued_at INTEGER not null,
died_at INTEGER not null default (strftime('%s', 'now'))
);

CREATE INDEX IF NOT EXISTS adc_queue_jobs ON adc_queue (
scheduled_at asc,
started_at asc,
queue,
job_type
);
CREATE TABLE IF NOT EXISTS adc_queue (
jid TEXT PRIMARY KEY,
queue TEXT NOT NULL default 'default',
job_type TEXT not null,
payload blob not null,
retries int not null default 0,
scheduled_at INTEGER not null,
started_at INTEGER,
enqueued_at INTEGER not null default (strftime('%s', 'now'))
);

CREATE TABLE IF NOT EXISTS adc_dead_queue (
jid TEXT PRIMARY KEY,
queue TEXT NOT NULL,
job_type TEXT not null,
payload blob not null,
retries int not null,
scheduled_at INTEGER not null,
started_at INTEGER not null,
enqueued_at INTEGER not null,
died_at INTEGER not null default (strftime('%s', 'now'))
);

CREATE INDEX IF NOT EXISTS adc_queue_jobs ON adc_queue (
scheduled_at asc,
started_at asc,
queue,
job_type
);
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
ALTER table adc_queue ADD COLUMN priority tinyint default 0;
ALTER table adc_dead_queue ADD COLUMN priority tinyint default 0;
92 changes: 0 additions & 92 deletions aide-de-camp-sqlite/schema.hcl

This file was deleted.

22 changes: 14 additions & 8 deletions aide-de-camp-sqlite/sqlx-data.json
Original file line number Diff line number Diff line change
@@ -1,34 +1,34 @@
{
"db": "SQLite",
"2da89a793016c67949a2a228ffebd70a6dd315b20a35787598ac2c10ba78181e": {
"45bd3fdf8cae3adecfb6b2851010f7f9b6e67bb6005598b480b7a1ed59b212db": {
"describe": {
"columns": [],
"nullable": [],
"parameters": {
"Right": 4
"Right": 1
}
},
"query": "INSERT INTO adc_queue (jid,job_type,payload,scheduled_at) VALUES (?1,?2,?3,?4)"
"query": "DELETE FROM adc_queue WHERE jid = ?1"
},
"45bd3fdf8cae3adecfb6b2851010f7f9b6e67bb6005598b480b7a1ed59b212db": {
"52a42447be39002c9ea32ac14a8dff9a9a459c9b16c7c76f6d380c5492b07843": {
"describe": {
"columns": [],
"nullable": [],
"parameters": {
"Right": 1
}
},
"query": "DELETE FROM adc_queue WHERE jid = ?1"
"query": "DELETE FROM adc_queue where jid = ?1"
},
"52a42447be39002c9ea32ac14a8dff9a9a459c9b16c7c76f6d380c5492b07843": {
"57789da912edf4d4b8c8fda95fb5ccbb139459bd6fa16ec421bf8f7d9ec209a0": {
"describe": {
"columns": [],
"nullable": [],
"parameters": {
"Right": 1
"Right": 5
}
},
"query": "DELETE FROM adc_queue where jid = ?1"
"query": "INSERT INTO adc_queue (jid,job_type,payload,scheduled_at,priority) VALUES (?1,?2,?3,?4,?5)"
},
"7621f919fc2f38bb65606230b43156f609d9a6b625d97ad32c2e0bc6aba7005b": {
"describe": {
Expand Down Expand Up @@ -82,6 +82,11 @@
"name": "enqueued_at",
"ordinal": 7,
"type_info": "Int64"
},
{
"name": "priority",
"ordinal": 8,
"type_info": "Int64"
}
],
"nullable": [
Expand All @@ -92,6 +97,7 @@
true,
true,
true,
true,
true
],
"parameters": {
Expand Down
Loading

0 comments on commit 4f0f970

Please sign in to comment.