diff --git a/src/functions.rs b/src/functions.rs index 906904e..faa7047 100644 --- a/src/functions.rs +++ b/src/functions.rs @@ -51,7 +51,7 @@ impl RsmqFunctions { number_in_range(hidden, 0, JS_COMPAT_MAX_TIME_MILLIS)?; CHANGE_MESSAGE_VISIVILITY - .key(format!("{}{}", self.ns, qname)) + .key(format!("{}:{}", self.ns, qname)) .key(message_id) .key(queue.ts + hidden) .invoke_async::<_, bool>(conn) @@ -77,7 +77,7 @@ impl RsmqFunctions { ) -> RsmqResult<()> { valid_name_format(qname)?; - let key = format!("{}{}:Q", self.ns, qname); + let key = format!("{}:{}:Q", self.ns, qname); let hidden = get_redis_duration(hidden, &Duration::from_secs(30)); let delay = get_redis_duration(delay, &Duration::ZERO); let maxsize = maxsize.unwrap_or(65536); @@ -131,7 +131,7 @@ impl RsmqFunctions { } redis::cmd("SADD") - .arg(format!("{}QUEUES", self.ns)) + .arg(format!("{}:QUEUES", self.ns)) .arg(qname) .query_async(conn) .await?; @@ -143,7 +143,7 @@ impl RsmqFunctions { /// /// Important to use when you are using receive_message. pub async fn delete_message(&self, conn: &mut T, qname: &str, id: &str) -> RsmqResult { - let key = format!("{}{}", self.ns, qname); + let key = format!("{}:{}", self.ns, qname); let results: (u16, u16) = pipe() .atomic() @@ -167,7 +167,7 @@ impl RsmqFunctions { /// Deletes the queue and all the messages on it pub async fn delete_queue(&self, conn: &mut T, qname: &str) -> RsmqResult<()> { - let key = format!("{}{}", self.ns, qname); + let key = format!("{}:{}", self.ns, qname); let results: (u16, u16) = pipe() .atomic() @@ -175,7 +175,7 @@ impl RsmqFunctions { .arg(format!("{}:Q", &key)) .arg(key) .cmd("SREM") - .arg(format!("{}QUEUES", self.ns)) + .arg(format!("{}:QUEUES", self.ns)) .arg(qname) .query_async(conn) .await?; @@ -193,7 +193,7 @@ impl RsmqFunctions { conn: &mut T, qname: &str, ) -> RsmqResult { - let key = format!("{}{}", self.ns, qname); + let key = format!("{}:{}", self.ns, qname); let time: (u64, u64) = redis::cmd("TIME").query_async(conn).await?; @@ -249,7 +249,7 @@ impl RsmqFunctions { /// Returns a list of queues in the namespace pub async fn list_queues(&self, conn: &mut T) -> RsmqResult> { let queues = redis::cmd("SMEMBERS") - .arg(format!("{}QUEUES", self.ns)) + .arg(format!("{}:QUEUES", self.ns)) .query_async(conn) .await?; @@ -265,7 +265,7 @@ impl RsmqFunctions { let queue = self.get_queue(conn, qname, false).await?; let result: (bool, String, Vec, u64, u64) = POP_MESSAGE - .key(format!("{}{}", self.ns, qname)) + .key(format!("{}:{}", self.ns, qname)) .key(queue.ts) .invoke_async(conn) .await?; @@ -300,7 +300,7 @@ impl RsmqFunctions { number_in_range(hidden, 0, JS_COMPAT_MAX_TIME_MILLIS)?; let result: (bool, String, Vec, u64, u64) = RECEIVE_MESSAGE - .key(format!("{}{}", self.ns, qname)) + .key(format!("{}:{}", self.ns, qname)) .key(queue.ts) .key(queue.ts + hidden) .invoke_async(conn) @@ -332,7 +332,7 @@ impl RsmqFunctions { let queue = self.get_queue(conn, qname, true).await?; let delay = get_redis_duration(delay, &queue.delay); - let key = format!("{}{}", self.ns, qname); + let key = format!("{}:{}", self.ns, qname); number_in_range(delay, 0, JS_COMPAT_MAX_TIME_MILLIS)?; @@ -406,7 +406,7 @@ impl RsmqFunctions { ) -> RsmqResult { self.get_queue(conn, qname, false).await?; - let queue_name = format!("{}{}:Q", self.ns, qname); + let queue_name = format!("{}:{}:Q", self.ns, qname); let time: (u64, u64) = redis::cmd("TIME").query_async(conn).await?; @@ -462,7 +462,7 @@ impl RsmqFunctions { let result: (Vec>, (u64, u64)) = pipe() .atomic() .cmd("HMGET") - .arg(format!("{}{}:Q", self.ns, qname)) + .arg(format!("{}:{}:Q", self.ns, qname)) .arg("vt") .arg("delay") .arg("maxsize")