Skip to content

Commit

Permalink
Fix timeout in RedisSinkSingle
Browse files Browse the repository at this point in the history
  • Loading branch information
rukai committed Aug 2, 2022
1 parent 7394ad1 commit c5a1f46
Showing 1 changed file with 12 additions and 6 deletions.
18 changes: 12 additions & 6 deletions shotover-proxy/src/transforms/redis/sink_single.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,20 +93,26 @@ impl Transform for RedisSinkSingle {

// self.outbound is gauranteed to be Some by the previous block
let outbound_framed_codec = self.outbound.as_mut().unwrap();
let requests_len = message_wrapper.messages.len();
outbound_framed_codec.send(message_wrapper.messages).await?;

match outbound_framed_codec.next().fuse().await {
Some(mut a) => {
if let Ok(messages) = &mut a {
for message in messages {
let mut result = vec![];
while result.len() < requests_len {
match outbound_framed_codec.next().fuse().await {
Some(Ok(mut messages)) => {
for message in &mut messages {
if let Some(Frame::Redis(RedisFrame::Error(_))) = message.frame() {
self.failed_requests.increment(1);
}
}
result.extend(messages.into_iter());
}
a
Some(Err(err)) => {
return Err(anyhow!("encountered error in redis stream: {err:?}"))
}
None => return Err(anyhow!("couldnt get frame")),
}
None => Err(anyhow!("couldnt get frame")),
}
Ok(result)
}
}

0 comments on commit c5a1f46

Please sign in to comment.