Skip to content

Commit

Permalink
add: arguments example
Browse files Browse the repository at this point in the history
  • Loading branch information
geofmureithi committed Aug 3, 2024
1 parent 17026ca commit cab9998
Show file tree
Hide file tree
Showing 4 changed files with 95 additions and 17 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ members = [
"examples/redis-with-msg-pack",
"examples/redis-deadpool",
"examples/redis-mq-example",
"examples/cron", "examples/catch-panic", "examples/graceful-shutdown", "examples/unmonitored-worker",
"examples/cron", "examples/catch-panic", "examples/graceful-shutdown", "examples/unmonitored-worker", "examples/fn-args",
]


Expand Down
22 changes: 22 additions & 0 deletions examples/fn-args/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
[package]
name = "fn-args"
version = "0.1.0"
edition.workspace = true
repository.workspace = true

[dependencies]
tokio = { version = "1", features = ["full"] }
apalis = { path = "../../", features = ["limit", "tokio-comp", "catch-panic"] }
apalis-sql = { path = "../../packages/apalis-sql", features = [
"sqlite",
"tokio-comp",
] }
serde = "1"
tracing-subscriber = "0.3.11"
futures = "0.3"
tower = "0.4"


[dependencies.tracing]
default-features = false
version = "0.1"
71 changes: 71 additions & 0 deletions examples/fn-args/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
use std::{
ops::Deref,
sync::{
atomic::{AtomicUsize, Ordering},
Arc,
},
};

use apalis::{prelude::*, utils::TokioExecutor};
use apalis_sql::{
context::SqlContext,
sqlite::{SqlitePool, SqliteStorage},
};
use serde::{Deserialize, Serialize};
use tracing::info;

#[derive(Debug, Serialize, Deserialize)]
struct SimpleJob {}

// A task can have up to 16 arguments
async fn simple_job(
_: SimpleJob, // Required, must be of the type of the job/message
worker_id: WorkerId, // The worker running the job, added by worker
_worker_ctx: Context<TokioExecutor>, // The worker context, added by worker
_sqlite: Data<SqliteStorage<SimpleJob>>, // The source, added by storage
task_id: Data<TaskId>, // The task id, added by storage
ctx: Data<SqlContext>, // The task context, added by storage
count: Data<Count>, // Our custom data added via layer
) {
// increment the counter
let current = count.fetch_add(1, Ordering::Relaxed);
info!("worker: {worker_id}; task_id: {task_id:?}, ctx: {ctx:?}, count: {current:?}");
}

async fn produce_jobs(storage: &mut SqliteStorage<SimpleJob>) {
for _ in 0..10 {
storage.push(SimpleJob {}).await.unwrap();
}
}

#[derive(Clone, Debug, Default)]
struct Count(Arc<AtomicUsize>);

impl Deref for Count {
type Target = Arc<AtomicUsize>;
fn deref(&self) -> &Self::Target {
&self.0
}
}

#[tokio::main]
async fn main() -> Result<(), std::io::Error> {
std::env::set_var("RUST_LOG", "debug,sqlx::query=error");
tracing_subscriber::fmt::init();
let pool = SqlitePool::connect("sqlite::memory:").await.unwrap();
SqliteStorage::setup(&pool)
.await
.expect("unable to run migrations for sqlite");
let mut sqlite: SqliteStorage<SimpleJob> = SqliteStorage::new(pool);
produce_jobs(&mut sqlite).await;
Monitor::<TokioExecutor>::new()
.register_with_count(2, {
WorkerBuilder::new("tasty-banana")
.data(Count::default())
.backend(sqlite)
.build_fn(simple_job)
})
.run()
.await?;
Ok(())
}
17 changes: 1 addition & 16 deletions examples/redis/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,4 @@
use std::{
ops::Deref,
sync::{atomic::AtomicUsize, Arc},
time::Duration,
};
use std::{sync::Arc, time::Duration};

use anyhow::Result;
use apalis::layers::limit::RateLimitLayer;
Expand All @@ -25,16 +21,6 @@ async fn produce_jobs(mut storage: RedisStorage<Email>) -> Result<()> {
Ok(())
}

#[derive(Clone, Debug, Default)]
struct Count(Arc<AtomicUsize>);

impl Deref for Count {
type Target = Arc<AtomicUsize>;
fn deref(&self) -> &Self::Target {
&self.0
}
}

#[tokio::main]
async fn main() -> Result<()> {
std::env::set_var("RUST_LOG", "debug");
Expand All @@ -50,7 +36,6 @@ async fn main() -> Result<()> {
.chain(|svc| svc.map_err(|e| Error::Failed(Arc::new(e))))
.layer(RateLimitLayer::new(5, Duration::from_secs(1)))
.layer(TimeoutLayer::new(Duration::from_millis(500)))
.data(Count::default())
.backend(storage)
.build_fn(send_email);

Expand Down

0 comments on commit cab9998

Please sign in to comment.