Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: introducing WorkerBuilderExt #428

Merged
merged 10 commits into from
Nov 21, 2024
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,9 @@ async fn main() -> {
let conn = apalis_redis::connect(redis_url).await.expect("Could not connect");
let storage = RedisStorage::new(conn);
Monitor::new()
.register_with_count(2, {
.register({
WorkerBuilder::new(format!("email-worker"))
.concurrency(2)
.data(0usize)
.backend(storage)
.build_fn(send_email)
Expand Down
10 changes: 5 additions & 5 deletions examples/actix-web/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
use actix_web::rt::signal;
use actix_web::{web, App, HttpResponse, HttpServer};
use anyhow::Result;
use apalis::layers::tracing::TraceLayer;
use apalis::prelude::*;
use apalis::utils::TokioExecutor;

use apalis_redis::RedisStorage;
use futures::future;

Expand Down Expand Up @@ -41,10 +40,11 @@ async fn main() -> Result<()> {
.await?;
Ok(())
};
let worker = Monitor::<TokioExecutor>::new()
.register_with_count(2, {
let worker = Monitor::new()
.register({
WorkerBuilder::new("tasty-avocado")
.layer(TraceLayer::new())
.enable_tracing()
// .concurrency(2)
.backend(storage)
.build_fn(send_email)
})
Expand Down
28 changes: 6 additions & 22 deletions examples/async-std-runtime/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
use std::{future::Future, str::FromStr, time::Duration};
use std::{str::FromStr, time::Duration};

use anyhow::Result;
use apalis::{
layers::{retry::RetryLayer, retry::RetryPolicy, tracing::MakeSpan, tracing::TraceLayer},
layers::{retry::RetryPolicy, tracing::MakeSpan, tracing::TraceLayer},
prelude::*,
};
use apalis_cron::{CronStream, Schedule};
use chrono::{DateTime, Utc};
use tracing::{debug, info, Instrument, Level, Span};

type WorkerCtx = Data<Context<AsyncStdExecutor>>;
type WorkerCtx = Worker<Context>;

#[derive(Default, Debug, Clone)]
struct Reminder(DateTime<Utc>);
Expand All @@ -26,7 +26,7 @@ async fn send_in_background(reminder: Reminder) {
}
async fn send_reminder(reminder: Reminder, worker: WorkerCtx) -> bool {
// this will happen in the worker's background and won't block the next tasks
worker.spawn(send_in_background(reminder).in_current_span());
async_std::task::spawn(worker.track(send_in_background(reminder).in_current_span()));
false
}

Expand All @@ -42,12 +42,12 @@ async fn main() -> Result<()> {

let schedule = Schedule::from_str("1/1 * * * * *").unwrap();
let worker = WorkerBuilder::new("daily-cron-worker")
.layer(RetryLayer::new(RetryPolicy::retries(5)))
.retry(RetryPolicy::retries(5))
.layer(TraceLayer::new().make_span_with(ReminderSpan::new()))
.backend(CronStream::new(schedule))
.build_fn(send_reminder);

Monitor::<AsyncStdExecutor>::new()
Monitor::new()
.register(worker)
.on_event(|e| debug!("Worker event: {e:?}"))
.run_with_signal(async {
Expand All @@ -59,22 +59,6 @@ async fn main() -> Result<()> {
Ok(())
}

/// Adapter that lets apalis spawn background futures on the async-std runtime.
/// (This type is removed by this PR in favor of the runtime-agnostic Monitor.)
#[derive(Clone, Debug, Default)]
pub struct AsyncStdExecutor;

impl AsyncStdExecutor {
/// A new async-std executor
pub fn new() -> Self {
Self
}
}

// NOTE(review): `Executor` is an apalis trait — this impl bridges it to async-std.
impl Executor for AsyncStdExecutor {
fn spawn(&self, fut: impl Future<Output = ()> + Send + 'static) {
// Spawn and detach: the JoinHandle is dropped, so the task runs to
// completion in the background (async-std tasks are detached by default).
async_std::task::spawn(fut);
}
}

#[derive(Debug, Clone)]
pub struct ReminderSpan {
level: Level,
Expand Down
6 changes: 3 additions & 3 deletions examples/axum/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
//! cd examples && cargo run -p axum-example
//! ```
use anyhow::Result;
use apalis::layers::tracing::TraceLayer;

use apalis::prelude::*;
use apalis_redis::RedisStorage;
use axum::{
Expand Down Expand Up @@ -73,10 +73,10 @@ async fn main() -> Result<()> {
.map_err(|e| Error::new(std::io::ErrorKind::Interrupted, e))
};
let monitor = async {
Monitor::<TokioExecutor>::new()
Monitor::new()
.register({
WorkerBuilder::new("tasty-pear")
.layer(TraceLayer::new())
.enable_tracing()
.backend(storage.clone())
.build_fn(send_email)
})
Expand Down
1 change: 1 addition & 0 deletions examples/basics/src/layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use tower::{Layer, Service};
use tracing::info;

/// A layer that logs job info before the job starts
#[derive(Debug, Clone)]
pub struct LogLayer {
target: &'static str,
}
Expand Down
30 changes: 16 additions & 14 deletions examples/basics/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,15 @@ mod service;

use std::{sync::Arc, time::Duration};

use apalis::{
layers::{catch_panic::CatchPanicLayer, tracing::TraceLayer},
prelude::*,
};
use apalis::{layers::catch_panic::CatchPanicLayer, prelude::*};
use apalis_sql::sqlite::{SqlitePool, SqliteStorage};

use email_service::Email;
use layer::LogLayer;

use tracing::{log::info, Instrument, Span};

type WorkerCtx = Context<TokioExecutor>;
type WorkerCtx = Context;

use crate::{cache::ValidEmailCache, service::EmailService};

Expand Down Expand Up @@ -72,14 +69,16 @@ async fn send_email(
// This can be important for starting long-running jobs that don't block the queue
// It's also possible to acquire context types and clone them into the future's context.
// They will also be gracefully shut down if [`Monitor`] has a shutdown signal
worker_ctx.spawn(
async move {
if cache::fetch_validity(email_to, &cache_clone).await {
svc.send(email).await;
info!("Email added to cache")
tokio::spawn(
worker_ctx.track(
async move {
if cache::fetch_validity(email_to, &cache_clone).await {
svc.send(email).await;
info!("Email added to cache")
}
}
}
.instrument(Span::current()), // It's still going to use the job's current tracing span. Important e.g. when using Sentry.
.instrument(Span::current()),
), // It's still going to use the job's current tracing span. Important e.g. when using Sentry.
);
}

Expand All @@ -102,10 +101,12 @@ async fn main() -> Result<(), std::io::Error> {
let sqlite: SqliteStorage<Email> = SqliteStorage::new(pool);
produce_jobs(&sqlite).await;

Monitor::<TokioExecutor>::new()
Monitor::new()
.register({
WorkerBuilder::new("tasty-banana")
// This handles any panics that may occur in any of the layers below
// .catch_panic()
// Or just to customize
.layer(CatchPanicLayer::with_panic_handler(|e| {
let panic_info = if let Some(s) = e.downcast_ref::<&str>() {
s.to_string()
Expand All @@ -114,9 +115,10 @@ async fn main() -> Result<(), std::io::Error> {
} else {
"Unknown panic".to_string()
};
// Abort tells the backend to kill the job
Error::Abort(Arc::new(Box::new(PanicError::Panic(panic_info))))
}))
.layer(TraceLayer::new())
.enable_tracing()
.layer(LogLayer::new("some-log-example"))
// Add shared context to all jobs executed by this worker
.data(EmailService::new())
Expand Down
14 changes: 7 additions & 7 deletions examples/catch-panic/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use anyhow::Result;
use apalis::layers::catch_panic::CatchPanicLayer;
use apalis::utils::TokioExecutor;
use apalis::{layers::tracing::TraceLayer, prelude::*};
use apalis::prelude::*;

use apalis_sql::sqlite::SqliteStorage;

use email_service::Email;
Expand Down Expand Up @@ -39,11 +38,12 @@ async fn main() -> Result<()> {

produce_emails(&mut email_storage).await?;

Monitor::<TokioExecutor>::new()
.register_with_count(2, {
Monitor::new()
.register({
WorkerBuilder::new("tasty-banana")
.layer(CatchPanicLayer::new())
.layer(TraceLayer::new())
.catch_panic()
.enable_tracing()
.concurrency(2)
.backend(email_storage)
.build_fn(send_email)
})
Expand Down
15 changes: 5 additions & 10 deletions examples/cron/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
use apalis::layers::tracing::TraceLayer;
use apalis::prelude::*;
use apalis::utils::TokioExecutor;

use apalis_cron::CronStream;
use apalis_cron::Schedule;
use chrono::{DateTime, Utc};
use std::str::FromStr;
use std::time::Duration;
use tower::limit::RateLimitLayer;
// use std::time::Duration;
use tower::load_shed::LoadShedLayer;

#[derive(Clone)]
Expand All @@ -32,15 +31,11 @@ async fn send_reminder(job: Reminder, svc: Data<FakeService>) {
async fn main() {
let schedule = Schedule::from_str("1/1 * * * * *").unwrap();
let worker = WorkerBuilder::new("morning-cereal")
.layer(TraceLayer::new())
.enable_tracing()
.layer(LoadShedLayer::new()) // Important when you have layers that block the service
.layer(RateLimitLayer::new(1, Duration::from_secs(2)))
.rate_limit(1, Duration::from_secs(2))
.data(FakeService)
.backend(CronStream::new(schedule))
.build_fn(send_reminder);
Monitor::<TokioExecutor>::new()
.register_with_count(2, worker)
.run()
.await
.unwrap();
Monitor::new().register(worker).run().await.unwrap();
}
22 changes: 11 additions & 11 deletions examples/fn-args/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::{
},
};

use apalis::{prelude::*, utils::TokioExecutor};
use apalis::prelude::*;
use apalis_sql::{
context::SqlContext,
sqlite::{SqlitePool, SqliteStorage},
Expand All @@ -19,18 +19,17 @@ struct SimpleJob {}

// A task can have up to 16 arguments
async fn simple_job(
_: SimpleJob, // Required, must be of the type of the job/message
worker_id: Data<WorkerId>, // The worker running the job, added by worker
_worker_ctx: Data<Context<TokioExecutor>>, // The worker context, added by worker
_: SimpleJob, // Required, must be of the type of the job/message
worker: Worker<Context>, // The worker and its context, added by worker
_sqlite: Data<SqliteStorage<SimpleJob>>, // The source, added by storage
task_id: TaskId, // The task id, added by storage
attempt: Attempt, // The current attempt
ctx: SqlContext, // The task context provided by the backend
count: Data<Count>, // Our custom data added via layer
task_id: TaskId, // The task id, added by storage
attempt: Attempt, // The current attempt
ctx: SqlContext, // The task context provided by the backend
count: Data<Count>, // Our custom data added via layer
) {
// increment the counter
let current = count.fetch_add(1, Ordering::Relaxed);
info!("worker: {worker_id:?}; task_id: {task_id:?}, ctx: {ctx:?}, attempt:{attempt:?} count: {current:?}");
info!("worker: {worker:?}; task_id: {task_id:?}, ctx: {ctx:?}, attempt:{attempt:?} count: {current:?}");
}

async fn produce_jobs(storage: &mut SqliteStorage<SimpleJob>) {
Expand Down Expand Up @@ -59,11 +58,12 @@ async fn main() -> Result<(), std::io::Error> {
.expect("unable to run migrations for sqlite");
let mut sqlite: SqliteStorage<SimpleJob> = SqliteStorage::new(pool);
produce_jobs(&mut sqlite).await;
Monitor::<TokioExecutor>::new()
.register_with_count(2, {
Monitor::new()
.register({
WorkerBuilder::new("tasty-banana")
.data(Count::default())
.data(sqlite.clone())
.concurrency(2)
.backend(sqlite)
.build_fn(simple_job)
})
Expand Down
22 changes: 13 additions & 9 deletions examples/graceful-shutdown/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,23 @@
use std::time::Duration;

use apalis::{prelude::*, utils::TokioExecutor};
use apalis::prelude::*;
use apalis_sql::sqlite::{SqlitePool, SqliteStorage};
use serde::{Deserialize, Serialize};
use tracing::info;

#[derive(Debug, Serialize, Deserialize)]
struct LongRunningJob {}

async fn long_running_task(_task: LongRunningJob, worker_ctx: Context<TokioExecutor>) {
async fn long_running_task(_task: LongRunningJob, worker: Worker<Context>) {
loop {
tokio::time::sleep(Duration::from_secs(1)).await; // Do some hard thing
info!("is_shutting_down: {}", worker_ctx.is_shutting_down(),);
if worker_ctx.is_shutting_down() {
info!("is_shutting_down: {}", worker.is_shutting_down());
if worker.is_shutting_down() {
info!("saving the job state");
break;
}
tokio::time::sleep(Duration::from_secs(3)).await; // Do some hard thing
}
info!("Shutdown complete!");
}

async fn produce_jobs(storage: &mut SqliteStorage<LongRunningJob>) {
Expand All @@ -33,14 +34,17 @@ async fn main() -> Result<(), std::io::Error> {
.expect("unable to run migrations for sqlite");
let mut sqlite: SqliteStorage<LongRunningJob> = SqliteStorage::new(pool);
produce_jobs(&mut sqlite).await;
Monitor::<TokioExecutor>::new()
.register_with_count(2, {
Monitor::new()
.register({
WorkerBuilder::new("tasty-banana")
.concurrency(2)
.enable_tracing()
.backend(sqlite)
.build_fn(long_running_task)
})
// Wait 10 seconds after shutdown is triggered to allow any incomplete jobs to complete
.shutdown_timeout(Duration::from_secs(10))
.on_event(|e| info!("{e}"))
// Wait 5 seconds after shutdown is triggered to allow any incomplete jobs to complete
.shutdown_timeout(Duration::from_secs(5))
// Use .run() instead if you don't want to wait for a shutdown signal
.run_with_signal(tokio::signal::ctrl_c()) // This will wait for ctrl+c then gracefully shutdown
.await?;
Expand Down
8 changes: 4 additions & 4 deletions examples/mysql/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use anyhow::Result;
use apalis::layers::tracing::TraceLayer;

use apalis::prelude::*;
use apalis_sql::mysql::MySqlPool;
use apalis_sql::mysql::MysqlStorage;
Expand Down Expand Up @@ -33,10 +33,10 @@ async fn main() -> Result<()> {
let mysql: MysqlStorage<Email> = MysqlStorage::new(pool);
produce_jobs(&mysql).await?;

Monitor::new_with_executor(TokioExecutor)
.register_with_count(1, {
Monitor::new()
.register({
WorkerBuilder::new("tasty-avocado")
.layer(TraceLayer::new())
.enable_tracing()
.backend(mysql)
.build_fn(send_email)
})
Expand Down
Loading
Loading