Skip to content

Commit

Permalink
fix: apply FromRequest for items in Parts (#425)
Browse files Browse the repository at this point in the history
Problem: We are missing crucial `FromRequest` impls for:
- TaskId
- Attempt
- Namespace

Also removed `Context<E>`

Solution: Implement `FromRequest` for these Types.
  • Loading branch information
geofmureithi authored Oct 3, 2024
1 parent d62281f commit 3166d7c
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 14 deletions.
16 changes: 9 additions & 7 deletions examples/fn-args/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,18 @@ struct SimpleJob {}

// A task can have up to 16 arguments
async fn simple_job(
_: SimpleJob, // Required, must be of the type of the job/message
worker_id: Data<WorkerId>, // The worker running the job, added by worker
_worker_ctx: Context<TokioExecutor>, // The worker context, added by worker
_: SimpleJob, // Required, must be of the type of the job/message
worker_id: Data<WorkerId>, // The worker running the job, added by worker
_worker_ctx: Data<Context<TokioExecutor>>, // The worker context, added by worker
_sqlite: Data<SqliteStorage<SimpleJob>>, // The source, added by storage
task_id: Data<TaskId>, // The task id, added by storage
ctx: SqlContext, // The task context
count: Data<Count>, // Our custom data added via layer
task_id: TaskId, // The task id, added by storage
attempt: Attempt, // The current attempt
ctx: SqlContext, // The task context provided by the backend
count: Data<Count>, // Our custom data added via layer
) {
// increment the counter
let current = count.fetch_add(1, Ordering::Relaxed);
info!("worker: {worker_id:?}; task_id: {task_id:?}, ctx: {ctx:?}, count: {current:?}");
info!("worker: {worker_id:?}; task_id: {task_id:?}, ctx: {ctx:?}, attempt:{attempt:?} count: {current:?}");
}

async fn produce_jobs(storage: &mut SqliteStorage<SimpleJob>) {
Expand Down Expand Up @@ -62,6 +63,7 @@ async fn main() -> Result<(), std::io::Error> {
.register_with_count(2, {
WorkerBuilder::new("tasty-banana")
.data(Count::default())
.data(sqlite.clone())
.backend(sqlite)
.build_fn(simple_job)
})
Expand Down
11 changes: 11 additions & 0 deletions packages/apalis-core/src/task/namespace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@ use std::ops::Deref;

use serde::{Deserialize, Serialize};

use crate::error::Error;
use crate::request::Request;
use crate::service_fn::FromRequest;

/// A wrapper type that defines a task's namespace.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Namespace(pub String);
Expand Down Expand Up @@ -39,3 +43,10 @@ impl AsRef<str> for Namespace {
&self.0
}
}

impl<Req, Ctx> FromRequest<Request<Req, Ctx>> for Namespace {
fn from_request(req: &Request<Req, Ctx>) -> Result<Self, Error> {
let msg = "Missing `Namespace`. This is a bug, please file a report with the backend you are using".to_owned();
req.parts.namespace.clone().ok_or(Error::MissingData(msg))
}
}
8 changes: 8 additions & 0 deletions packages/apalis-core/src/task/task_id.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ use std::{
use serde::{de::Visitor, Deserialize, Deserializer, Serialize, Serializer};
use ulid::Ulid;

use crate::{error::Error, request::Request, service_fn::FromRequest};

/// A wrapper type that defines a task id.
#[derive(Debug, Clone, Eq, Hash, PartialEq)]
pub struct TaskId(Ulid);
Expand Down Expand Up @@ -58,6 +60,12 @@ impl<'de> Deserialize<'de> for TaskId {
}
}

impl<Req, Ctx> FromRequest<Request<Req, Ctx>> for TaskId {
fn from_request(req: &Request<Req, Ctx>) -> Result<Self, Error> {
Ok(req.parts.task_id.clone())
}
}

struct TaskIdVisitor;

impl<'de> Visitor<'de> for TaskIdVisitor {
Expand Down
7 changes: 0 additions & 7 deletions packages/apalis-core/src/worker/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ use crate::monitor::{Monitor, MonitorContext};
use crate::notify::Notify;
use crate::poller::FetchNext;
use crate::request::Request;
use crate::service_fn::FromRequest;
use crate::Backend;
use futures::future::Shared;
use futures::{Future, FutureExt};
Expand Down Expand Up @@ -534,12 +533,6 @@ impl<E> fmt::Debug for Context<E> {
}
}

impl<Req, Ctx, E: Send + Sync + Clone + 'static> FromRequest<Request<Req, Ctx>> for Context<E> {
fn from_request(req: &Request<Req, Ctx>) -> Result<Self, Error> {
req.get_checked::<Self>().cloned()
}
}

pin_project! {
struct Tracked<F, E> {
worker: Context<E>,
Expand Down

0 comments on commit 3166d7c

Please sign in to comment.