Commit

fixes: on internal api
geofmureithi committed Feb 8, 2024
1 parent 07da53f commit 6dbde0b
Showing 8 changed files with 101 additions and 39 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
@@ -48,6 +48,7 @@ async-std-comp = [
"apalis-sql?/async-std-comp",
"apalis-redis?/async-std-comp",
"apalis-cron?/async-std-comp",
"async-std",
]
## Compatibility with tokio and actix runtimes
tokio-comp = [
22 changes: 11 additions & 11 deletions README.md
@@ -134,7 +134,6 @@ async fn produce_route_jobs(storage: &RedisStorage<Email>) -> Result<()> {
- _limit_ — 💪 Limit the amount of jobs
- _filter_ — Support filtering jobs based on a predicate
- _extensions_ — Add global extensions to jobs
- - _time_ - Use the time crate instead of the default chrono

## Storage Comparison

@@ -149,21 +148,22 @@ Since we provide a few storage solutions, here is a table comparing them:

## How apalis works

Here is a basic example of how the core parts integrate

```mermaid
sequenceDiagram
participant App
participant Worker
- participant Producer
+ participant Backend
- App->>+Producer: Add job to queue
- Producer-->>+Worker: Job data
- Worker->>+Producer: Update job status to 'running' via Layer
- Producer-->>-Worker: Confirmation
- Worker->>+App: Notify job started via Layer
+ App->>+Backend: Add job to queue
+ Backend-->>+Worker: Job data
+ Worker->>+Backend: Update job status to 'Running'
+ Worker->>+App: Started job
loop job execution
- Worker-->>-App: Report job progress via Layer
+ Worker-->>-App: Report job progress
end
- Worker->>+Producer: Update job status to 'completed' via Layer
+ Worker->>+Backend: Update job status to 'completed'
```
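
For reference (not part of this diff), here is a minimal sketch of that flow wired up in code, modeled on the crate's Redis example. The `Email` job type, the `send_email` handler and the exact signatures below are illustrative stand-ins and may differ slightly at this commit.

```rust
use apalis::prelude::*;
use apalis::redis::RedisStorage;
use apalis::utils::TokioExecutor;
use serde::{Deserialize, Serialize};

// An illustrative job type; depending on the version, job types may also need
// `impl Job for Email { const NAME: &'static str = "sketch::Email"; }`.
#[derive(Debug, Serialize, Deserialize)]
struct Email {
    to: String,
}

// The service function the worker runs for every job it fetches.
async fn send_email(job: Email) {
    println!("sending email to {}", job.to);
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // App -> Backend: enqueue a job.
    let conn = apalis::redis::connect(std::env::var("REDIS_URL")?).await.unwrap();
    let mut storage = RedisStorage::new(conn);
    storage.push(Email { to: "test@example.com".into() }).await?;

    // Backend -> Worker: the monitor polls the backend, feeds jobs to `send_email`
    // and reports status updates back, as in the diagram above.
    Monitor::<TokioExecutor>::new()
        .register_with_count(2, {
            WorkerBuilder::new("email-worker")
                .source(storage.clone())
                .build_fn(send_email)
        })
        .run()
        .await?;
    Ok(())
}
```

Here `RedisStorage` plays the Backend role in the diagram, and the `Monitor` drives the registered worker instances.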

### Web UI
@@ -186,9 +186,9 @@ v 0.5
- [x] Mocking utilities
- [ ] Support for SurrealDB and Mongo
- [ ] Lock free for Postgres
- - [ ] Add more utility layers
+ - [x] Add more utility layers
- [x] Use extractors in job fn structure
- - [ ] Polish up documentation
+ - [x] Polish up documentation
- [ ] Improve and standardize apalis Board
- [ ] Benchmarks

2 changes: 1 addition & 1 deletion examples/redis/Cargo.toml
@@ -8,7 +8,7 @@ license = "MIT OR Apache-2.0"
[dependencies]
anyhow = "1"
tokio = { version = "1", features = ["full"] }
- apalis = { path = "../../", features = ["redis"]}
+ apalis = { path = "../../", features = ["redis", "timeout"]}
serde = "1"
env_logger = "0.10"
tracing-subscriber = "0.3.11"
22 changes: 19 additions & 3 deletions examples/redis/src/main.rs
@@ -7,9 +7,9 @@ use std::{
use anyhow::Result;
use apalis::prelude::*;
use apalis::redis::RedisStorage;
use apalis::utils::TokioExecutor;

use email_service::{send_email, Email};
- use tracing::info;
+ use tracing::{error, info};

async fn produce_jobs(mut storage: RedisStorage<Email>) -> Result<()> {
for index in 0..100 {
@@ -52,7 +52,23 @@ async fn main() -> Result<()> {
.build_fn(send_email);

Monitor::<TokioExecutor>::new()
- .register_with_count(6, worker)
+ .register_with_count(2, worker)
.on_event(|e| {
let worker_id = e.id();
match e.inner() {
Event::Start => {
info!("Worker [{worker_id}] started");
}
Event::Error(e) => {
error!("Worker [{worker_id}] encountered an error: {e}");
}

Event::Exit => {
info!("Worker [{worker_id}] exited");
}
_ => {}
}
})
.run_with_signal(async {
tokio::signal::ctrl_c().await?;
info!("Monitor starting shutdown");
12 changes: 2 additions & 10 deletions packages/apalis-core/src/poller/mod.rs
@@ -44,17 +44,9 @@ const UNPLUGGED: usize = 0;
/// Tells the poller that the worker is ready for a new request
#[derive(Debug)]
pub struct FetchNext<T> {
- instance: usize,
sender: async_oneshot::Sender<T>,
}

- impl<T> FetchNext<T> {
- /// Get the specific worker instance
- pub fn instance(&self) -> usize {
- self.instance
- }
- }

impl<T> Deref for FetchNext<T> {
type Target = async_oneshot::Sender<T>;
fn deref(&self) -> &Self::Target {
@@ -69,7 +61,7 @@ impl<T> DerefMut for FetchNext<T> {
}
impl<T> FetchNext<T> {
/// Generate a new instance of ready
- pub fn new(sender: async_oneshot::Sender<T>, instance: usize) -> Self {
- Self { instance, sender }
+ pub fn new(sender: async_oneshot::Sender<T>) -> Self {
+ Self { sender }
}
}
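
A rough sketch of the handshake this type now supports (the module path, the `String` request type and the `tokio` runtime below are assumptions for illustration): the worker side creates a oneshot channel and hands the sender to the backend wrapped in `FetchNext`, and the backend replies straight through the `Deref`/`DerefMut` impls above, with no instance index carried any more.

```rust
use apalis_core::poller::FetchNext; // assumed path to the type shown above

#[tokio::main]
async fn main() {
    // Worker side: ask for the next request and keep the receiving end.
    let (sender, receiver) = async_oneshot::oneshot::<Option<String>>();
    let mut fetch_next = FetchNext::new(sender);

    // Backend side: FetchNext derefs to the async_oneshot::Sender, so the
    // backend can answer directly; None would mean "nothing to do right now".
    fetch_next.send(Some("job-42".to_string())).ok();

    // Worker side: await the reply.
    let next = receiver.await.ok().flatten();
    assert_eq!(next.as_deref(), Some("job-42"));
}
```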
2 changes: 1 addition & 1 deletion packages/apalis-core/src/storage/mod.rs
@@ -16,7 +16,7 @@ pub trait Storage: Backend<Request<Self::Job>> {
/// The error produced by the storage
type Error;

- /// Jobs unlike messages must have Ids.
+ /// Jobs must have Ids.
type Identifier;

/// Pushes a job to a storage
75 changes: 64 additions & 11 deletions packages/apalis-core/src/worker/mod.rs
@@ -1,3 +1,4 @@
use self::stream::WorkerStream;
use crate::error::{BoxDynError, Error};
use crate::executor::Executor;
use crate::layers::extensions::Data;
@@ -14,14 +15,13 @@ use std::fmt::Debug;
use std::fmt::{self, Display};
use std::ops::{Deref, DerefMut};
use std::pin::Pin;
use std::str::FromStr;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::task::{Context as TaskCtx, Poll, Waker};
use thiserror::Error;
use tower::{Service, ServiceBuilder, ServiceExt};

- use self::stream::WorkerStream;
-
mod stream;
// By default a worker starts 3 futures, one for polling, one for worker stream and the other for consuming.
const WORKER_FUTURES: usize = 3;
@@ -32,27 +32,69 @@ type WorkerNotify<T> = Notify<Worker<FetchNext<T>>>;
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct WorkerId {
name: String,
instance: Option<usize>,
}

impl FromStr for WorkerId {
type Err = ();

fn from_str(s: &str) -> Result<Self, Self::Err> {
let parts: Vec<&str> = s.rsplitn(2, '-').collect();

match parts.len() {
1 => Ok(WorkerId {
name: parts[0].to_string(),
instance: None,
}),
2 => {
let name = parts[1];
let instance_str = parts[0];
let instance = instance_str.parse().ok();
Ok(WorkerId {
name: name.to_string(),
instance,
})
}
_ => Err(()),
}
}
}

impl FromData for WorkerId {}

impl Display for WorkerId {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
- f.write_str(self.name())
+ f.write_str(self.name())?;
+ if self.instance.is_some() {
+ f.write_str("-")?;
+ f.write_str(&self.instance.unwrap().to_string())?;
+ }
+ Ok(())
}
}

impl WorkerId {
/// Build a new worker ref
pub fn new<T: AsRef<str>>(name: T) -> Self {
Self::from_str(name.as_ref()).unwrap()
}

/// Build a new worker ref
pub fn new_with_instance<T: AsRef<str>>(name: T, instance: usize) -> Self {
Self {
name: name.as_ref().to_string(),
instance: Some(instance),
}
}
/// Get the name of the worker
pub fn name(&self) -> &str {
&self.name
}

/// Get the name of the worker
pub fn instance(&self) -> &Option<usize> {
&self.instance
}
}
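
Taken together, `Display` and `FromStr` now round-trip the optional instance suffix, which is what lets the worker loop further down report per-instance ids such as `name-0`, `name-1`. A small illustration (not part of this commit; it assumes `WorkerId` is reachable through the prelude):

```rust
use apalis::prelude::*; // assumed re-export of WorkerId

fn main() {
    // Display appends "-<instance>"; FromStr splits on the last '-'.
    let id = WorkerId::new_with_instance("email-worker", 2);
    assert_eq!(id.to_string(), "email-worker-2");

    let parsed: WorkerId = "email-worker-2".parse().unwrap();
    assert_eq!(parsed.name(), "email-worker");
    assert_eq!(parsed.instance(), &Some(2));

    // A bare name keeps no instance.
    let plain = WorkerId::new("sandbox");
    assert_eq!(plain.to_string(), "sandbox");

    // Caveat of the parsing above: a name whose final '-'-separated segment is
    // not a number keeps only the text before the last '-', so
    // WorkerId::new("email-worker") ends up with the name "email".
}
```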

/// Events emitted by a worker
Expand Down Expand Up @@ -114,6 +156,16 @@ impl<T> Worker<T> {
pub fn new(id: WorkerId, state: T) -> Self {
Self { id, state }
}

/// Get the inner state
pub fn inner(&self) -> &T {
&self.state
}

/// Get the worker id
pub fn id(&self) -> &WorkerId {
&self.id
}
}
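
These accessors are what event callbacks lean on; compare the `on_event` handler added to `examples/redis/src/main.rs` above. A tiny illustrative helper (it assumes the callback sees a `Worker<Event>`, which this diff does not spell out):

```rust
use apalis::prelude::*;

// Logs selected worker events; with instanced ids, `e.id()` displays as
// "name-instance", e.g. "email-worker-0".
fn log_event(e: &Worker<Event>) {
    match e.inner() {
        Event::Start => println!("[{}] started", e.id()),
        Event::Exit => println!("[{}] exited", e.id()),
        _ => {}
    }
}
// Hypothetical wiring on the Monitor: .on_event(|e| log_event(&e))
```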

impl<T> Deref for Worker<T> {
@@ -132,13 +184,14 @@ impl<T> DerefMut for Worker<T> {
impl<E: Executor + Clone + Send + 'static> Worker<Context<E>> {
/// Start a worker
pub async fn run(self) {
let instance = self.instance;
let monitor = self.state.context.clone();
self.state.running.store(true, Ordering::Relaxed);
self.state.await;
if let Some(ctx) = monitor.as_ref() {
ctx.notify(Worker {
state: Event::Exit,
- id: self.id.clone(),
+ id: WorkerId::new_with_instance(self.id.name, instance),
});
};
}
@@ -307,7 +360,7 @@ impl<S, P> Worker<Ready<S, P>> {
if let Some(ctx) = worker.state.context.as_ref() {
ctx.notify(Worker {
state: Event::Start,
- id: worker.id.clone(),
+ id: WorkerId::new_with_instance(&worker.id.name(), instance),
});
};
let worker_layers = ServiceBuilder::new()
Expand All @@ -321,7 +374,7 @@ impl<S, P> Worker<Ready<S, P>> {
if let Some(ctx) = worker.state.context.as_ref() {
ctx.notify(Worker {
state: Event::Stop,
- id: worker.id.clone(),
+ id: WorkerId::new_with_instance(&worker.id.name(), instance),
});
};
break;
@@ -331,8 +384,8 @@ impl<S, P> Worker<Ready<S, P>> {
let (sender, receiver) = async_oneshot::oneshot();
notifier
.notify(Worker {
- id: worker.id.clone(),
- state: FetchNext::new(sender, instance),
+ id: WorkerId::new_with_instance(&worker.id.name(), instance),
+ state: FetchNext::new(sender),
})
.unwrap();

@@ -345,15 +398,15 @@ impl<S, P> Worker<Ready<S, P>> {
if let Some(ctx) = worker.state.context.as_ref() {
ctx.notify(Worker {
state: Event::Error(Box::new(e)),
- id: worker.id.clone(),
+ id: WorkerId::new_with_instance(&worker.id.name(), instance),
});
};
}
Ok(Ok(None)) => {
if let Some(ctx) = worker.state.context.as_ref() {
ctx.notify(Worker {
state: Event::Idle,
- id: worker.id.clone(),
+ id: WorkerId::new_with_instance(&worker.id.name(), instance),
});
};
}
@@ -366,7 +419,7 @@ impl<S, P> Worker<Ready<S, P>> {
if let Some(ctx) = worker.state.context.as_ref() {
ctx.notify(Worker {
state: Event::Error(e.into()),
- id: worker.id.clone(),
+ id: WorkerId::new_with_instance(&worker.id.name(), instance),
});
};
}
4 changes: 2 additions & 2 deletions src/lib.rs
@@ -37,12 +37,12 @@
//! #[tokio::main]
//! async fn main() {
//! let redis = std::env::var("REDIS_URL").expect("Missing REDIS_URL env variable");
- //! let conn = apalis::redis::connect(redis).await?;
+ //! let conn = apalis::redis::connect(redis).await.unwrap();
//! let storage = RedisStorage::new(conn);
//! Monitor::<TokioExecutor>::new()
//! .register_with_count(2, {
//! WorkerBuilder::new(&format!("quick-sand"))
- //! .layer(Data(0usize))
+ //! .data(0usize)
//! .source(storage.clone())
//! .build_fn(send_email)
//! })
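The doc example now registers shared state with `.data(0usize)` instead of `.layer(Data(0usize))`. On the consuming side that value is pulled into the job function through the `Data` extractor; a hypothetical handler (the job type and signature below are illustrative, not from the repository) could look like this:

```rust
use apalis::prelude::*;
use serde::{Deserialize, Serialize};

// Stand-in for the Email type from the email-service example crate.
#[derive(Debug, Serialize, Deserialize)]
struct Email {
    to: String,
}

// `Data<usize>` receives the value registered with `.data(0usize)` and
// dereferences to it; this is the fn that `.build_fn(send_email)` would take.
async fn send_email(job: Email, count: Data<usize>) {
    println!("sending to {}, shared value = {}", job.to, *count);
}
```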
