Skip to content

Commit

Permalink
Chore/more examples (#389)
Browse files Browse the repository at this point in the history
* add: catch-panic example

* add: graceful shutdown example

* add: unmonitored example

* add: arguments example

* fix: minor updates

* fix: sql tests

* fix: minor updates
  • Loading branch information
geofmureithi authored Aug 3, 2024
1 parent e7a751c commit 39fd007
Show file tree
Hide file tree
Showing 21 changed files with 336 additions and 46 deletions.
5 changes: 2 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ limit = ["tower/limit"]
## Support filtering jobs based on a predicate
filter = ["tower/filter"]
## Captures panics in executions and convert them to errors
catch-panic = ["dep:backtrace"]
catch-panic = []
## Compatibility with async-std and smol runtimes
async-std-comp = ["async-std"]
## Compatibility with tokio and actix runtimes
Expand Down Expand Up @@ -120,7 +120,7 @@ members = [
"examples/redis-with-msg-pack",
"examples/redis-deadpool",
"examples/redis-mq-example",
"examples/cron",
"examples/cron", "examples/catch-panic", "examples/graceful-shutdown", "examples/unmonitored-worker", "examples/fn-args",
]


Expand All @@ -141,7 +141,6 @@ pin-project-lite = "0.2.14"
uuid = { version = "1.8", optional = true }
ulid = { version = "1", optional = true }
serde = { version = "1.0", features = ["derive"] }
backtrace = { version = "0.3", optional = true }

[dependencies.tracing]
default-features = false
Expand Down
24 changes: 24 additions & 0 deletions examples/catch-panic/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
[package]
name = "catch-panic"
version = "0.1.0"
edition.workspace = true
repository.workspace = true

[dependencies]
anyhow = "1"
tokio = { version = "1", features = ["full"] }
apalis = { path = "../../", features = ["limit", "tracing", "tokio-comp", "catch-panic"] }
apalis-sql = { path = "../../packages/apalis-sql", features = ["sqlite"] }
serde = { version = "1", features = ["derive"] }
tracing-subscriber = "0.3.11"
email-service = { path = "../email-service" }


[dependencies.tracing]
default-features = false
version = "0.1"

[dependencies.sqlx]
version = "0.8"
default-features = false
features = ["sqlite", "runtime-tokio"]
54 changes: 54 additions & 0 deletions examples/catch-panic/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
use anyhow::Result;
use apalis::layers::catch_panic::CatchPanicLayer;
use apalis::utils::TokioExecutor;
use apalis::{layers::tracing::TraceLayer, prelude::*};
use apalis_sql::sqlite::SqliteStorage;

use email_service::Email;
use sqlx::SqlitePool;

async fn produce_emails(storage: &mut SqliteStorage<Email>) -> Result<()> {
for i in 0..2 {
storage
.push(Email {
to: format!("test{i}@example.com"),
text: "Test background job from apalis".to_string(),
subject: "Background email job".to_string(),
})
.await?;
}
Ok(())
}

async fn send_email(_: Email) {
unimplemented!("panic from unimplemented")
}

#[tokio::main]
async fn main() -> Result<()> {
std::env::set_var("RUST_LOG", "debug,sqlx::query=info");
tracing_subscriber::fmt::init();

let pool = SqlitePool::connect("sqlite::memory:").await?;
// Do migrations: Mainly for "sqlite::memory:"
SqliteStorage::setup(&pool)
.await
.expect("unable to run migrations for sqlite");

let mut email_storage: SqliteStorage<Email> = SqliteStorage::new(pool.clone());

produce_emails(&mut email_storage).await?;

Monitor::<TokioExecutor>::new()
.register_with_count(2, {
WorkerBuilder::new("tasty-banana")
.layer(CatchPanicLayer::new())
.layer(TraceLayer::new())
.backend(email_storage)
.build_fn(send_email)
})
.on_event(|e| tracing::info!("{e:?}"))
.run()
.await?;
Ok(())
}
4 changes: 3 additions & 1 deletion examples/email-service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ pub async fn send_email(job: Email) -> Result<(), Error> {
}
Err(email_address::Error::InvalidCharacter) => {
log::error!("Killed send email job. Invalid character {}", job.to);
Err(Error::Abort(String::from("Invalid character. Job killed")))
Err(Error::Abort(Arc::new(Box::new(
email_address::Error::InvalidCharacter,
))))
}
Err(e) => Err(Error::Failed(Arc::new(Box::new(e)))),
}
Expand Down
22 changes: 22 additions & 0 deletions examples/fn-args/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
[package]
name = "fn-args"
version = "0.1.0"
edition.workspace = true
repository.workspace = true

[dependencies]
tokio = { version = "1", features = ["full"] }
apalis = { path = "../../", features = ["limit", "tokio-comp", "catch-panic"] }
apalis-sql = { path = "../../packages/apalis-sql", features = [
"sqlite",
"tokio-comp",
] }
serde = "1"
tracing-subscriber = "0.3.11"
futures = "0.3"
tower = "0.4"


[dependencies.tracing]
default-features = false
version = "0.1"
71 changes: 71 additions & 0 deletions examples/fn-args/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
use std::{
ops::Deref,
sync::{
atomic::{AtomicUsize, Ordering},
Arc,
},
};

use apalis::{prelude::*, utils::TokioExecutor};
use apalis_sql::{
context::SqlContext,
sqlite::{SqlitePool, SqliteStorage},
};
use serde::{Deserialize, Serialize};
use tracing::info;

#[derive(Debug, Serialize, Deserialize)]
struct SimpleJob {}

// A task can have up to 16 arguments
async fn simple_job(
_: SimpleJob, // Required, must be of the type of the job/message
worker_id: WorkerId, // The worker running the job, added by worker
_worker_ctx: Context<TokioExecutor>, // The worker context, added by worker
_sqlite: Data<SqliteStorage<SimpleJob>>, // The source, added by storage
task_id: Data<TaskId>, // The task id, added by storage
ctx: Data<SqlContext>, // The task context, added by storage
count: Data<Count>, // Our custom data added via layer
) {
// increment the counter
let current = count.fetch_add(1, Ordering::Relaxed);
info!("worker: {worker_id}; task_id: {task_id:?}, ctx: {ctx:?}, count: {current:?}");
}

async fn produce_jobs(storage: &mut SqliteStorage<SimpleJob>) {
for _ in 0..10 {
storage.push(SimpleJob {}).await.unwrap();
}
}

#[derive(Clone, Debug, Default)]
struct Count(Arc<AtomicUsize>);

impl Deref for Count {
type Target = Arc<AtomicUsize>;
fn deref(&self) -> &Self::Target {
&self.0
}
}

#[tokio::main]
async fn main() -> Result<(), std::io::Error> {
std::env::set_var("RUST_LOG", "debug,sqlx::query=error");
tracing_subscriber::fmt::init();
let pool = SqlitePool::connect("sqlite::memory:").await.unwrap();
SqliteStorage::setup(&pool)
.await
.expect("unable to run migrations for sqlite");
let mut sqlite: SqliteStorage<SimpleJob> = SqliteStorage::new(pool);
produce_jobs(&mut sqlite).await;
Monitor::<TokioExecutor>::new()
.register_with_count(2, {
WorkerBuilder::new("tasty-banana")
.data(Count::default())
.backend(sqlite)
.build_fn(simple_job)
})
.run()
.await?;
Ok(())
}
20 changes: 20 additions & 0 deletions examples/graceful-shutdown/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
[package]
name = "graceful-shutdown"
version = "0.1.0"
edition.workspace = true
repository.workspace = true

[dependencies]
thiserror = "1"
tokio = { version = "1", features = ["full"] }
apalis = { path = "../../", features = ["limit", "tokio-comp", "catch-panic"] }
apalis-sql = { path = "../../packages/apalis-sql", features = ["sqlite", "tokio-comp"]}
serde = "1"
tracing-subscriber = "0.3.11"
futures = "0.3"
tower = "0.4"


[dependencies.tracing]
default-features = false
version = "0.1"
48 changes: 48 additions & 0 deletions examples/graceful-shutdown/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
use std::time::Duration;

use apalis::{prelude::*, utils::TokioExecutor};
use apalis_sql::sqlite::{SqlitePool, SqliteStorage};
use serde::{Deserialize, Serialize};
use tracing::info;

#[derive(Debug, Serialize, Deserialize)]
struct LongRunningJob {}

async fn long_running_task(_task: LongRunningJob, worker_ctx: Context<TokioExecutor>) {
loop {
tokio::time::sleep(Duration::from_secs(1)).await; // Do some hard thing
info!("is_shutting_down: {}", worker_ctx.is_shutting_down(),);
if worker_ctx.is_shutting_down() {
info!("saving the job state");
break;
}
}
}

async fn produce_jobs(storage: &mut SqliteStorage<LongRunningJob>) {
storage.push(LongRunningJob {}).await.unwrap();
}

#[tokio::main]
async fn main() -> Result<(), std::io::Error> {
std::env::set_var("RUST_LOG", "debug,sqlx::query=error");
tracing_subscriber::fmt::init();
let pool = SqlitePool::connect("sqlite::memory:").await.unwrap();
SqliteStorage::setup(&pool)
.await
.expect("unable to run migrations for sqlite");
let mut sqlite: SqliteStorage<LongRunningJob> = SqliteStorage::new(pool);
produce_jobs(&mut sqlite).await;
Monitor::<TokioExecutor>::new()
.register_with_count(2, {
WorkerBuilder::new("tasty-banana")
.backend(sqlite)
.build_fn(long_running_task)
})
// Wait 10 seconds after shutdown is triggered to allow any incomplete jobs to complete
.shutdown_timeout(Duration::from_secs(10))
// Use .run() if you don't want without signals
.run_with_signal(tokio::signal::ctrl_c()) // This will wait for ctrl+c then gracefully shutdown
.await?;
Ok(())
}
4 changes: 2 additions & 2 deletions examples/redis-mq-example/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ impl<T, C> Clone for RedisMq<T, C> {
conn: self.conn.clone(),
msg_type: PhantomData,
config: self.config.clone(),
codec: self.codec.clone(),
codec: self.codec,
}
}
}
Expand Down Expand Up @@ -86,7 +86,7 @@ where
_res: &Result<Res, apalis_core::error::Error>,
) -> Result<(), Self::AckError> {
self.conn
.delete_message(self.config.get_namespace(), &ctx)
.delete_message(self.config.get_namespace(), ctx)
.await?;
Ok(())
}
Expand Down
17 changes: 1 addition & 16 deletions examples/redis/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,4 @@
use std::{
ops::Deref,
sync::{atomic::AtomicUsize, Arc},
time::Duration,
};
use std::{sync::Arc, time::Duration};

use anyhow::Result;
use apalis::layers::limit::RateLimitLayer;
Expand All @@ -25,16 +21,6 @@ async fn produce_jobs(mut storage: RedisStorage<Email>) -> Result<()> {
Ok(())
}

#[derive(Clone, Debug, Default)]
struct Count(Arc<AtomicUsize>);

impl Deref for Count {
type Target = Arc<AtomicUsize>;
fn deref(&self) -> &Self::Target {
&self.0
}
}

#[tokio::main]
async fn main() -> Result<()> {
std::env::set_var("RUST_LOG", "debug");
Expand All @@ -50,7 +36,6 @@ async fn main() -> Result<()> {
.chain(|svc| svc.map_err(|e| Error::Failed(Arc::new(e))))
.layer(RateLimitLayer::new(5, Duration::from_secs(1)))
.layer(TimeoutLayer::new(Duration::from_millis(500)))
.data(Count::default())
.backend(storage)
.build_fn(send_email);

Expand Down
22 changes: 22 additions & 0 deletions examples/unmonitored-worker/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
[package]
name = "unmonitored-worker"
version = "0.1.0"
edition.workspace = true
repository.workspace = true

[dependencies]
tokio = { version = "1", features = ["full"] }
apalis = { path = "../../", features = ["limit", "tokio-comp", "catch-panic"] }
apalis-sql = { path = "../../packages/apalis-sql", features = [
"sqlite",
"tokio-comp",
] }
serde = "1"
tracing-subscriber = "0.3.11"
futures = "0.3"
tower = "0.4"


[dependencies.tracing]
default-features = false
version = "0.1"
41 changes: 41 additions & 0 deletions examples/unmonitored-worker/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
use std::time::Duration;

use apalis::{prelude::*, utils::TokioExecutor};
use apalis_sql::sqlite::{SqlitePool, SqliteStorage};
use serde::{Deserialize, Serialize};
use tracing::info;

#[derive(Debug, Serialize, Deserialize)]
struct SelfMonitoringJob {}

async fn self_monitoring_task(task: SelfMonitoringJob, worker_ctx: Context<TokioExecutor>) {
info!("task: {:?}, {:?}", task, worker_ctx);
tokio::time::sleep(Duration::from_secs(5)).await; // Do some hard thing
info!("done with task, stopping worker gracefully");
// use worker_ctx.force_stop() to stop immediately
worker_ctx.stop();
}

async fn produce_jobs(storage: &mut SqliteStorage<SelfMonitoringJob>) {
storage.push(SelfMonitoringJob {}).await.unwrap();
}

#[tokio::main]
async fn main() -> Result<(), std::io::Error> {
std::env::set_var("RUST_LOG", "debug,sqlx::query=error");
tracing_subscriber::fmt::init();
let pool = SqlitePool::connect("sqlite::memory:").await.unwrap();
SqliteStorage::setup(&pool)
.await
.expect("unable to run migrations for sqlite");
let mut sqlite: SqliteStorage<SelfMonitoringJob> = SqliteStorage::new(pool);
produce_jobs(&mut sqlite).await;

WorkerBuilder::new("tasty-banana")
.backend(sqlite)
.build_fn(self_monitoring_task)
.with_executor(TokioExecutor)
.run()
.await;
Ok(())
}
Loading

0 comments on commit 39fd007

Please sign in to comment.