Skip to content

Commit

Permalink
Fix test to run all failovers. (#937)
Browse files Browse the repository at this point in the history
  • Loading branch information
nihohit authored Aug 28, 2023
1 parent 163f5c2 commit 220c6a9
Show file tree
Hide file tree
Showing 4 changed files with 64 additions and 87 deletions.
37 changes: 8 additions & 29 deletions redis/src/aio/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,10 @@ use ::tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt};
#[cfg(feature = "tokio-comp")]
use ::tokio::net::lookup_host;
use combine::{parser::combinator::AnySendSyncPartialState, stream::PointerOffset};
use futures_util::future::select_ok;
use futures_util::{
future::FutureExt,
stream::{self, Stream, StreamExt},
stream::{Stream, StreamExt},
};
use std::net::SocketAddr;
use std::pin::Pin;
Expand Down Expand Up @@ -386,20 +387,7 @@ pub(crate) async fn connect_simple<T: RedisRuntime>(
Ok(match connection_info.addr {
ConnectionAddr::Tcp(ref host, port) => {
let socket_addrs = get_socket_addrs(host, port).await?;
stream::iter(socket_addrs)
.fold(
Err(RedisError::from((
ErrorKind::InvalidClientConfig,
"No address found for host",
))),
|acc, socket_addr| async move {
match acc {
ok @ Ok(_) => ok,
Err(_) => <T>::connect_tcp(socket_addr).await,
}
},
)
.await?
select_ok(socket_addrs.map(<T>::connect_tcp)).await?.0
}

#[cfg(any(feature = "tls-native-tls", feature = "tls-rustls"))]
Expand All @@ -409,20 +397,11 @@ pub(crate) async fn connect_simple<T: RedisRuntime>(
insecure,
} => {
let socket_addrs = get_socket_addrs(host, port).await?;
stream::iter(socket_addrs)
.fold(
Err(RedisError::from((
ErrorKind::InvalidClientConfig,
"No address found for host",
))),
|acc, socket_addr| async move {
match acc {
ok @ Ok(_) => ok,
Err(_) => <T>::connect_tcp_tls(host, socket_addr, insecure).await,
}
},
)
.await?
select_ok(
socket_addrs.map(|socket_addr| <T>::connect_tcp_tls(host, socket_addr, insecure)),
)
.await?
.0
}

#[cfg(not(any(feature = "tls-native-tls", feature = "tls-rustls")))]
Expand Down
90 changes: 44 additions & 46 deletions redis/src/cluster_routing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,39 +59,53 @@ pub(crate) fn aggregate(values: Vec<Value>, op: AggregateOp) -> RedisResult<Valu
AggregateOp::Min => i64::MAX,
AggregateOp::Sum => 0,
};
let result = values
.into_iter()
.fold(RedisResult::Ok(initial_value), |acc, curr| {
let mut acc = acc?;
let int = match curr {
Value::Int(int) => int,
_ => {
return Err((
let result = values.into_iter().try_fold(initial_value, |acc, curr| {
let int = match curr {
Value::Int(int) => int,
_ => {
return RedisResult::Err(
(
ErrorKind::TypeError,
"expected array of integers as response",
)
.into());
}
};
acc = match op {
AggregateOp::Min => min(acc, int),
AggregateOp::Sum => acc + int,
};
Ok(acc)
})?;
.into(),
);
}
};
let acc = match op {
AggregateOp::Min => min(acc, int),
AggregateOp::Sum => acc + int,
};
Ok(acc)
})?;
Ok(Value::Int(result))
}

pub(crate) fn logical_aggregate(values: Vec<Value>, op: LogicalAggregateOp) -> RedisResult<Value> {
let initial_value = match op {
LogicalAggregateOp::And => true,
};
let results = values
.into_iter()
.fold(RedisResult::Ok(Vec::new()), |acc, curr| {
let acc = acc?;
let values = match curr {
Value::Bulk(values) => values,
let results = values.into_iter().try_fold(Vec::new(), |acc, curr| {
let values = match curr {
Value::Bulk(values) => values,
_ => {
return RedisResult::Err(
(
ErrorKind::TypeError,
"expected array of integers as response",
)
.into(),
);
}
};
let mut acc = if acc.is_empty() {
vec![initial_value; values.len()]
} else {
acc
};
for (index, value) in values.into_iter().enumerate() {
let int = match value {
Value::Int(int) => int,
_ => {
return Err((
ErrorKind::TypeError,
Expand All @@ -100,28 +114,12 @@ pub(crate) fn logical_aggregate(values: Vec<Value>, op: LogicalAggregateOp) -> R
.into());
}
};
let mut acc = if acc.is_empty() {
vec![initial_value; values.len()]
} else {
acc
acc[index] = match op {
LogicalAggregateOp::And => acc[index] && (int > 0),
};
for (index, value) in values.into_iter().enumerate() {
let int = match value {
Value::Int(int) => int,
_ => {
return Err((
ErrorKind::TypeError,
"expected array of integers as response",
)
.into());
}
};
acc[index] = match op {
LogicalAggregateOp::And => acc[index] && (int > 0),
};
}
Ok(acc)
})?;
}
Ok(acc)
})?;
Ok(Value::Bulk(
results
.into_iter()
Expand Down Expand Up @@ -636,14 +634,14 @@ mod tests {
);
}

for cmd in vec![
for cmd in [
cmd("EVAL").arg(r#"redis.call("PING");"#).arg(0),
cmd("EVALSHA").arg(r#"redis.call("PING");"#).arg(0),
] {
assert_eq!(RoutingInfo::for_routable(cmd), Some(RoutingInfo::Random));
}

for (cmd, expected) in vec![
for (cmd, expected) in [
(
cmd("EVAL")
.arg(r#"redis.call("GET, KEYS[1]");"#)
Expand Down
2 changes: 1 addition & 1 deletion redis/tests/test_acl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ fn test_acl_cat() {
assert!(res.contains(*cat), "Category `{cat}` does not exist");
}

let expects = vec!["pfmerge", "pfcount", "pfselftest", "pfadd"];
let expects = ["pfmerge", "pfcount", "pfselftest", "pfadd"];
let res: HashSet<String> = con
.acl_cat_categoryname("hyperloglog")
.expect("Got commands of a category");
Expand Down
22 changes: 11 additions & 11 deletions redis/tests/test_cluster_async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,17 +171,17 @@ async fn test_failover(env: &TestClusterContext, requests: i32, value: i32) {
async move {
if i == requests / 2 {
// Failover all the nodes, error only if all the failover requests error
node_conns
.iter_mut()
.map(do_failover)
.collect::<stream::FuturesUnordered<_>>()
.fold(
Err(anyhow::anyhow!("None")),
|acc: Result<(), _>, result: Result<(), _>| async move {
acc.or(result)
},
)
.await
let mut results = future::join_all(
node_conns
.iter_mut()
.map(|conn| Box::pin(do_failover(conn))),
)
.await;
if results.iter().all(|res| res.is_err()) {
results.pop().unwrap()
} else {
Ok::<_, anyhow::Error>(())
}
} else {
let key = format!("test-{value}-{i}");
cmd("SET")
Expand Down

0 comments on commit 220c6a9

Please sign in to comment.