Skip to content

Commit

Permalink
fixup! Add xadd_options and xtrim_options
Browse files Browse the repository at this point in the history
  • Loading branch information
urkle authored and nihohit committed Oct 7, 2024
1 parent f644268 commit 60daaa7
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 29 deletions.
39 changes: 14 additions & 25 deletions redis/src/commands/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1423,47 +1423,36 @@ implement_commands! {

/// Add a stream message with options.
///
/// ```text
/// XADD key [NOMKSTREAM] [<MAXLEN|MINID> [~|=] threshold [LIMIT count]] <* | ID> field value [field value] ...
/// Items can be any list type, e.g.
/// ```rust
/// // static items
/// let items = &[("key", "val"), ("key2", "val2")];
/// # use std::collections::BTreeMap;
/// // A map (Can be BTreeMap, HashMap, etc)
/// let mut map: BTreeMap<&str, &str> = BTreeMap::new();
/// map.insert("ab", "cd");
/// map.insert("ef", "gh");
/// map.insert("ij", "kl");
/// ```
#[cfg(feature = "streams")]
#[cfg_attr(docsrs, doc(cfg(feature = "streams")))]
fn xadd_options<
K: ToRedisArgs, ID: ToRedisArgs, F: ToRedisArgs, V: ToRedisArgs
>(
key: K,
id: ID,
items: &'a [(F, V)],
options: &'a streams::StreamAddOptions
) {
cmd("XADD")
.arg(key)
.arg(options)
.arg(id)
.arg(items)
}


/// Add a stream message with options.
///
/// ```text
/// XADD key [NOMKSTREAM] [<MAXLEN|MINID> [~|=] threshold [LIMIT count]] <* | ID> field value [field value] ...
/// ```
#[cfg(feature = "streams")]
#[cfg_attr(docsrs, doc(cfg(feature = "streams")))]
fn xadd_map_options<
K: ToRedisArgs, ID: ToRedisArgs, BTM: ToRedisArgs
fn xadd_options<
K: ToRedisArgs, ID: ToRedisArgs, I: ToRedisArgs
>(
key: K,
id: ID,
map: BTM,
items: I,
options: &'a streams::StreamAddOptions
) {
cmd("XADD")
.arg(key)
.arg(options)
.arg(id)
.arg(map)
.arg(items)
}


Expand Down
31 changes: 27 additions & 4 deletions redis/tests/test_streams.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,11 @@ fn test_xgroup_create() {

// no key exists... this call breaks the connection pipe for some reason
let reply: RedisResult<StreamInfoStreamReply> = con.xinfo_stream("k10");
assert!(reply.is_err());
assert!(
matches!(&reply, Err(e) if e.kind() == redis::ErrorKind::ResponseError
&& e.code() == Some("ERR")
&& e.detail() == Some("no such key"))
);

// redo the connection because the above error
con = ctx.connection();
Expand Down Expand Up @@ -426,14 +430,21 @@ fn test_xadd_options() {
let ctx = TestContext::new();
let mut con = ctx.connection();

// NoMKStream will cause this command to fail
let result: RedisResult<String> = con.xadd_options(
// NoMKStream will return a nil when the stream does not exist
let result: RedisResult<redis::Value> = con.xadd_options(
"k1",
"*",
&[("h", "w")],
&StreamAddOptions::default().nomkstream(),
);
assert!(result.is_err());
assert_eq!(result, Ok(redis::Value::Nil));

let result: RedisResult<StreamInfoStreamReply> = con.xinfo_stream("k1");
assert!(
matches!(&result, Err(e) if e.kind() == redis::ErrorKind::ResponseError
&& e.code() == Some("ERR")
&& e.detail() == Some("no such key"))
);

fn setup_stream(con: &mut Connection) {
let _: RedisResult<()> = con.del("k1");
Expand Down Expand Up @@ -475,6 +486,18 @@ fn test_xadd_options() {
let info: StreamInfoStreamReply = con.xinfo_stream("k1").unwrap();
assert_eq!(info.length, 6);
assert_eq!(info.first_entry.id, "1-5");

// test adding from a map
let mut map: BTreeMap<&str, &str> = BTreeMap::new();
map.insert("ab", "cd");
map.insert("ef", "gh");
map.insert("ij", "kl");
let _: RedisResult<String> = con.xadd_options("k1", "3-1", map, &StreamAddOptions::default());

let info: StreamInfoStreamReply = con.xinfo_stream("k1").unwrap();
assert_eq!(info.length, 7);
assert_eq!(info.first_entry.id, "1-5");
assert_eq!(info.last_entry.id, "3-1");
}

#[test]
Expand Down

0 comments on commit 60daaa7

Please sign in to comment.