Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add 'set_ack_deadline' to Delivery, and implement for SQS/GCP #91

Merged
merged 1 commit into from
Jun 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions omniqueue/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,3 +50,4 @@ redis = ["dep:bb8", "dep:bb8-redis", "dep:redis", "dep:svix-ksuid"]
redis_cluster = ["redis", "redis/cluster-async"]
sqs = ["dep:aws-config", "dep:aws-sdk-sqs"]
azure_queue_storage = ["dep:azure_storage", "dep:azure_storage_queues"]
beta = []
6 changes: 6 additions & 0 deletions omniqueue/src/backends/azure_queue_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,12 @@ impl Acker for AqsAcker {
async fn nack(&mut self) -> Result<()> {
Ok(())
}

async fn set_ack_deadline(&mut self, _duration: Duration) -> Result<()> {
Err(QueueError::Unsupported(
"set_ack_deadline is not yet supported by InMemoryBackend",
))
}
}

impl AqsConsumer {
Expand Down
13 changes: 13 additions & 0 deletions omniqueue/src/backends/gcp_pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -281,4 +281,17 @@ impl Acker for GcpPubSubAcker {
async fn nack(&mut self) -> Result<()> {
self.recv_msg.nack().await.map_err(QueueError::generic)
}

async fn set_ack_deadline(&mut self, duration: Duration) -> Result<()> {
let duration_secs = duration.as_secs().try_into().map_err(|e| {
QueueError::Generic(Box::<dyn std::error::Error + Send + Sync>::from(format!(
"set_ack_deadline duration {duration:?} is too large: {e:?}"
)))
})?;

self.recv_msg
.modify_ack_deadline(duration_secs)
.await
.map_err(QueueError::generic)
}
}
6 changes: 6 additions & 0 deletions omniqueue/src/backends/in_memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,12 @@ impl Acker for InMemoryAcker {
.map_err(QueueError::generic)
}
}

async fn set_ack_deadline(&mut self, _duration: Duration) -> Result<()> {
Err(QueueError::Unsupported(
"set_ack_deadline is not yet supported by InMemoryBackend",
))
}
}

#[cfg(test)]
Expand Down
6 changes: 6 additions & 0 deletions omniqueue/src/backends/rabbitmq.rs
Original file line number Diff line number Diff line change
Expand Up @@ -301,4 +301,10 @@ impl Acker for RabbitMqAcker {
.map(|_| ())
.map_err(QueueError::generic)
}

async fn set_ack_deadline(&mut self, _duration: Duration) -> Result<()> {
Err(QueueError::Unsupported(
"set_ack_deadline is not supported by RabbitMQ",
))
}
}
6 changes: 6 additions & 0 deletions omniqueue/src/backends/redis/fallback.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,12 @@ impl<R: RedisConnection> Acker for RedisFallbackAcker<R> {

Ok(())
}

async fn set_ack_deadline(&mut self, _duration: Duration) -> Result<()> {
Err(QueueError::Unsupported(
"set_ack_deadline is not yet supported by redis fallback backend",
))
}
}

pub(super) async fn add_to_main_queue(
Expand Down
6 changes: 6 additions & 0 deletions omniqueue/src/backends/redis/streams.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,12 @@ impl<R: RedisConnection> Acker for RedisStreamsAcker<R> {

Ok(())
}

async fn set_ack_deadline(&mut self, _duration: Duration) -> Result<()> {
Err(QueueError::Unsupported(
"set_ack_deadline is not yet supported by redis streams backend",
))
}
}

pub(super) async fn add_to_main_queue(
Expand Down
28 changes: 28 additions & 0 deletions omniqueue/src/backends/sqs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,34 @@ impl Acker for SqsAcker {
async fn nack(&mut self) -> Result<()> {
Ok(())
}

async fn set_ack_deadline(&mut self, duration: Duration) -> Result<()> {
if let Some(receipt_handle) = &self.receipt_handle {
let duration_secs = duration.as_secs().try_into().map_err(|e| {
QueueError::Generic(Box::<dyn std::error::Error + Send + Sync>::from(format!(
"set_ack_deadline duration {duration:?} is too large: {e:?}"
)))
})?;
self.ack_client
.change_message_visibility()
.set_visibility_timeout(Some(duration_secs))
.queue_url(&self.queue_dsn)
.receipt_handle(receipt_handle)
.send()
.await
.map_err(aws_to_queue_error)?;

Ok(())
} else {
Err(QueueError::generic(
DeleteMessageError::ReceiptHandleIsInvalid(
ReceiptHandleIsInvalid::builder()
.message("receipt handle must be Some to set ack deadline")
.build(),
),
))
}
}
}

pub struct SqsProducer {
Expand Down
3 changes: 3 additions & 0 deletions omniqueue/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,9 @@ pub enum QueueError {

#[error("{0}")]
Generic(Box<dyn std::error::Error + Send + Sync>),

#[error("{0}")]
Unsupported(&'static str),
}

impl QueueError {
Expand Down
17 changes: 16 additions & 1 deletion omniqueue/src/queue/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{fmt, future::Future};
use std::{fmt, future::Future, time::Duration};

use async_trait::async_trait;
use serde::de::DeserializeOwned;
Expand Down Expand Up @@ -59,6 +59,19 @@ impl Delivery {
self.acker.ack().await.map_err(|e| (e, self))
}

#[cfg(feature = "beta")]
/// Sets the deadline for acknowledging this [`Delivery`] to `duration`,
/// starting from the time this method is called.
///
/// The exact nature of this will vary per backend, but usually ensures
/// that the same message will not be reprocessed if `ack()` is called
/// within an interval of `duration` from the time this method is
/// called. For example, this corresponds to the 'visibility timeout' in
/// SQS, and the 'ack deadline' in GCP
pub async fn set_ack_deadline(&mut self, duration: Duration) -> Result<(), QueueError> {
self.acker.set_ack_deadline(duration).await
}

/// Explicitly does not Acknowledge the successful processing of this
/// [`Delivery`].
///
Expand Down Expand Up @@ -102,4 +115,6 @@ impl fmt::Debug for Delivery {
pub(crate) trait Acker: Send + Sync {
async fn ack(&mut self) -> Result<()>;
async fn nack(&mut self) -> Result<()>;
#[cfg_attr(not(feature = "beta"), allow(dead_code))]
async fn set_ack_deadline(&mut self, duration: Duration) -> Result<()>;
}
Loading