Skip to content

Commit

Permalink
Merge 1a1f590 into 7b953fd
Browse files Browse the repository at this point in the history
  • Loading branch information
geofmureithi authored Jul 9, 2024
2 parents 7b953fd + 1a1f590 commit a3d33b2
Show file tree
Hide file tree
Showing 7 changed files with 218 additions and 16 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ members = [
"examples/tracing",
# "examples/rest-api",
"examples/async-std-runtime",
"examples/basics", "examples/redis-with-msg-pack", "examples/redis-deadpool",
"examples/basics", "examples/redis-with-msg-pack", "examples/redis-deadpool", "examples/redis-mq-example",
]


Expand Down
24 changes: 24 additions & 0 deletions examples/redis-mq-example/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
[package]
name = "redis-mq-example"
version = "0.1.0"
edition = "2021"

[dependencies]
apalis = { path = "../../", features = ["redis"]}
apalis-core = { path = "../../packages/apalis-core", features = ["json"] }
rsmq_async = "11.1.0"
anyhow = "1"
tokio = { version = "1", features = ["full"] }
serde = "1"
env_logger = "0.10"
tracing-subscriber = "0.3.11"
chrono = { version = "0.4", default-features = false, features = ["clock"] }
email-service = { path = "../email-service" }
rmp-serde = "1.3"
tower = "0.4"
futures = "0.3"


[dependencies.tracing]
default-features = false
version = "0.1"
179 changes: 179 additions & 0 deletions examples/redis-mq-example/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
use std::{marker::PhantomData, sync::Arc, time::Duration};

use anyhow::Result;
use apalis::{
layers::tracing::TraceLayer,
prelude::*,
redis::{self, Config, RedisCodec, RedisJob},
};

use apalis_core::{
codec::json::JsonCodec,
layers::{Ack, AckLayer},
};
use email_service::{send_email, Email};
use futures::{channel::mpsc, SinkExt};
use rsmq_async::{Rsmq, RsmqConnection, RsmqError};
use tokio::time::sleep;
use tower::layer::util::Identity;
use tracing::{error, info};

struct RedisMq<T> {
conn: Rsmq,
msg_type: PhantomData<T>,
config: redis::Config,
codec: RedisCodec<T>,
}

// Manually implement Clone for RedisMq
impl<T> Clone for RedisMq<T> {
fn clone(&self) -> Self {
RedisMq {
conn: self.conn.clone(),
msg_type: PhantomData,
config: self.config.clone(),
codec: self.codec.clone(),
}
}
}

impl<M: Send + 'static> Backend<Request<M>> for RedisMq<M> {
type Stream = RequestStream<Request<M>>;

type Layer = AckLayer<Self, M>;

fn poll(mut self, worker_id: WorkerId) -> Poller<Self::Stream, Self::Layer> {
let (mut tx, rx) = mpsc::channel(self.config.get_buffer_size());
let stream: RequestStream<Request<M>> = Box::pin(rx);
let layer = AckLayer::new(self.clone(), worker_id);
let heartbeat = async move {
loop {
sleep(*self.config.get_poll_interval()).await;
let msg: Option<Request<M>> = self
.conn
.receive_message("email", None)
.await
.unwrap()
.map(|r| {
let mut req: Request<_> = self.codec.decode(&r.message).unwrap().into();
req.insert(r.id);
req
});
tx.send(Ok(msg)).await.unwrap();
}
};
Poller::new_with_layer(stream, heartbeat, layer)
}
}

impl<T: Send> Ack<T> for RedisMq<T> {
type Acknowledger = String;

type Error = RsmqError;

async fn ack(
&mut self,
worker_id: &WorkerId,
data: &Self::Acknowledger,
) -> Result<(), Self::Error> {
println!("Attempting to ACK {}", data);
self.conn.delete_message("email", data).await?;
Ok(())
}
}

impl<Message: Send + 'static> MessageQueue<Message> for RedisMq<Message> {
type Error = RsmqError;

async fn enqueue(&mut self, message: Message) -> Result<(), Self::Error> {
let bytes = self
.codec
.encode(&RedisJob {
ctx: Default::default(),
job: message,
})
.unwrap();
self.conn.send_message("email", bytes, None).await?;
Ok(())
}

async fn dequeue(&mut self) -> Result<Option<Message>, Self::Error> {
let codec = self.codec.clone();
Ok(self.conn.receive_message("email", None).await?.map(|r| {
let req: Request<Message> = codec.decode(&r.message).unwrap().into();
req.take()
}))
}

async fn size(&mut self) -> Result<usize, Self::Error> {
self.conn
.get_queue_attributes("email")
.await?
.msgs
.try_into()
.map_err(|_| RsmqError::InvalidFormat("Could not convert to usize".to_owned()))
}
}

async fn produce_jobs(mq: &mut RedisMq<Email>) -> Result<()> {
for index in 0..1 {
mq.enqueue(Email {
to: index.to_string(),
text: "Test background job from apalis".to_string(),
subject: "Background email job".to_string(),
})
.await?;
}
Ok(())
}

#[tokio::main]
async fn main() -> Result<()> {
std::env::set_var("RUST_LOG", "debug");

tracing_subscriber::fmt::init();

let mut conn = rsmq_async::Rsmq::new(Default::default()).await?;
let _ = conn.create_queue("email", None, None, None).await;
let mut mq = RedisMq {
conn,
msg_type: PhantomData,
codec: RedisCodec::new(Box::new(JsonCodec)),
config: Config::default(),
};
// This can be in another part of the program
// produce_jobs(&mut mq).await?;

let worker = WorkerBuilder::new("rango-tango")
.layer(TraceLayer::new())
.with_mq(mq)
.build_fn(send_email);

Monitor::<TokioExecutor>::new()
.register_with_count(2, worker)
.on_event(|e| {
let worker_id = e.id();
match e.inner() {
Event::Start => {
info!("Worker [{worker_id}] started");
}
Event::Error(e) => {
error!("Worker [{worker_id}] encountered an error: {e}");
}

Event::Exit => {
info!("Worker [{worker_id}] exited");
}
_ => {}
}
})
.shutdown_timeout(Duration::from_millis(5000))
.run_with_signal(async {
tokio::signal::ctrl_c().await?;
info!("Monitor starting shutdown");
Ok(())
})
.await?;
info!("Monitor shutdown complete");
Ok(())
}
13 changes: 6 additions & 7 deletions packages/apalis-core/src/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,17 +113,16 @@ impl<T: Send + 'static + Sync> Backend<Request<T>> for MemoryStorage<T> {

impl<Message: Send + 'static + Sync> MessageQueue<Message> for MemoryStorage<Message> {
type Error = ();
async fn enqueue(&self, message: Message) -> Result<(), Self::Error> {
self.inner.sender.clone().try_send(message).unwrap();
async fn enqueue(&mut self, message: Message) -> Result<(), Self::Error> {
self.inner.sender.try_send(message).unwrap();
Ok(())
}

async fn dequeue(&self) -> Result<Option<Message>, ()> {
Err(())
// self.inner.receiver.lock().await.next().await
async fn dequeue(&mut self) -> Result<Option<Message>, ()> {
Ok(self.inner.receiver.lock().await.next().await)
}

async fn size(&self) -> Result<usize, ()> {
Ok(self.inner.clone().count().await)
async fn size(&mut self) -> Result<usize, ()> {
Ok(self.inner.receiver.lock().await.size_hint().0)
}
}
6 changes: 3 additions & 3 deletions packages/apalis-core/src/mq/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,12 @@ pub trait MessageQueue<Message>: Backend<Request<Message>> {
type Error;

/// Enqueues a message to the queue.
fn enqueue(&self, message: Message) -> impl Future<Output = Result<(), Self::Error>> + Send;
fn enqueue(&mut self, message: Message) -> impl Future<Output = Result<(), Self::Error>> + Send;

/// Attempts to dequeue a message from the queue.
/// Returns `None` if the queue is empty.
fn dequeue(&self) -> impl Future<Output = Result<Option<Message>, Self::Error>> + Send;
fn dequeue(&mut self) -> impl Future<Output = Result<Option<Message>, Self::Error>> + Send;

/// Returns the current size of the queue.
fn size(&self) -> impl Future<Output = Result<usize, Self::Error>> + Send;
fn size(&mut self) -> impl Future<Output = Result<usize, Self::Error>> + Send;
}
8 changes: 4 additions & 4 deletions packages/apalis-redis/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,8 @@ struct RedisScript {
/// The actual structure of a Redis job
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct RedisJob<J> {
ctx: Context,
job: J,
pub ctx: Context,
pub job: J,
}

impl<T> From<RedisJob<T>> for Request<T> {
Expand Down Expand Up @@ -126,8 +126,8 @@ impl<T> TryFrom<Request<T>> for RedisJob<T> {
}
}

#[derive(Clone, Debug, Serialize, Deserialize)]
struct Context {
#[derive(Clone, Debug, Serialize, Deserialize, Default)]
pub struct Context {
id: TaskId,
attempts: usize,
}
Expand Down
2 changes: 1 addition & 1 deletion src/layers/tracing/make_span.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ impl<B> MakeSpan<B> for DefaultMakeSpan {
tracing::span!(
parent: span,
$level,
"job",
"task",
)
};
}
Expand Down

0 comments on commit a3d33b2

Please sign in to comment.