Skip to content

Commit

Permalink
fix delay issue & version bump
Browse files Browse the repository at this point in the history
Before the code assumed delay was in millis. Now it assumes it is in
seconds. It was assumes as millis before because of the original port
from js, but seen now in their docs it is seconds and not millis.
  • Loading branch information
DavidBM committed Nov 7, 2023
1 parent 9399130 commit 0fb83e9
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 10 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "rsmq_async"
version = "6.0.0"
version = "7.0.0"
authors = [
"David Bonet <webbonet@gmail.com>"
]
Expand Down
15 changes: 6 additions & 9 deletions src/functions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ impl<T: ConnectionLike> RsmqFunctions<T> {
CHANGE_MESSAGE_VISIVILITY
.key(format!("{}{}", self.ns, qname))
.key(message_id)
.key(queue.ts + seconds_hidden * 1000)
.key(queue.ts + seconds_hidden)
.invoke_async::<_, bool>(conn)
.await?;

Expand Down Expand Up @@ -207,7 +207,7 @@ impl<T: ConnectionLike> RsmqFunctions<T> {
.arg(&key)
.cmd("ZCOUNT")
.arg(&key)
.arg(time.0 * 1000)
.arg(time.0)
.arg("+inf")
.query_async(conn)
.await?;
Expand Down Expand Up @@ -281,7 +281,7 @@ impl<T: ConnectionLike> RsmqFunctions<T> {
) -> RsmqResult<Option<RsmqMessage<E>>> {
let queue = self.get_queue(conn, qname, false).await?;

let seconds_hidden = seconds_hidden.unwrap_or(queue.vt) * 1000;
let seconds_hidden = seconds_hidden.unwrap_or(queue.vt);

number_in_range(seconds_hidden, 0, 9_999_999_000)?;

Expand Down Expand Up @@ -317,7 +317,7 @@ impl<T: ConnectionLike> RsmqFunctions<T> {
) -> RsmqResult<String> {
let queue = self.get_queue(conn, qname, true).await?;

let delay = delay.unwrap_or(queue.delay) * 1000;
let delay = delay.unwrap_or(queue.delay);
let key = format!("{}{}", self.ns, qname);

number_in_range(delay, 0, 9_999_999)?;
Expand Down Expand Up @@ -455,18 +455,15 @@ impl<T: ConnectionLike> RsmqFunctions<T> {
.await?;

let time_seconds = (result.1).0;
let time_microseconds = (result.1).1;

let (hmget_first, hmget_second, hmget_third) =
match (result.0.get(0), result.0.get(1), result.0.get(2)) {
(Some(Some(v0)), Some(Some(v1)), Some(Some(v2))) => (v0, v1, v2),
_ => return Err(RsmqError::QueueNotFound),
};

let ts = time_seconds * 1000 + time_microseconds / 1000;

let quid = if uid {
Some(radix_36(ts).to_string() + &RsmqFunctions::<T>::make_id(22)?)
Some(radix_36(time_seconds).to_string() + &RsmqFunctions::<T>::make_id(22)?)
} else {
None
};
Expand All @@ -479,7 +476,7 @@ impl<T: ConnectionLike> RsmqFunctions<T> {
maxsize: hmget_third
.parse()
.map_err(|_| RsmqError::CannotParseMaxsize)?,
ts,
ts: time_seconds,
uid: quid,
})
}
Expand Down
59 changes: 59 additions & 0 deletions tests/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,65 @@ fn send_receiving_deleting_message() {
})
}

#[test]
fn send_receiving_delayed_message() {
let rt = tokio::runtime::Runtime::new().unwrap();

rt.block_on(async move {
let ctx = TestContext::new();
let connection = ctx.async_connection().await.unwrap();
let mut rsmq = Rsmq::new_with_connection(connection, false, None);

rsmq.create_queue("queue1", None, None, None).await.unwrap();

rsmq.send_message("queue1", "testmessage", Some(2))
.await
.unwrap();

tokio::time::sleep(std::time::Duration::from_millis(500)).await;

let message = rsmq
.receive_message::<String>("queue1", None)
.await
.unwrap();
assert!(message.is_none());

let message = rsmq
.receive_message::<String>("queue1", None)
.await
.unwrap();
assert!(message.is_none());

let message = rsmq
.receive_message::<String>("queue1", None)
.await
.unwrap();
assert!(message.is_none());

tokio::time::sleep(std::time::Duration::from_secs(2)).await;

let message = rsmq
.receive_message::<String>("queue1", None)
.await
.unwrap();
assert!(message.is_some());

let message = message.unwrap();

rsmq.delete_message("queue1", &message.id).await.unwrap();

assert_eq!(message.message, "testmessage".to_string());

let message = rsmq
.receive_message::<String>("queue1", None)
.await
.unwrap();

assert!(message.is_none());
rsmq.delete_queue("queue1").await.unwrap();
})
}

#[test]
fn send_receiving_deleting_message_vec_u8() {
let rt = tokio::runtime::Runtime::new().unwrap();
Expand Down

0 comments on commit 0fb83e9

Please sign in to comment.