Skip to content

Commit

Permalink
review feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
rukai committed Jul 15, 2022
1 parent 21b5cbd commit 64bf3bd
Showing 1 changed file with 11 additions and 4 deletions.
15 changes: 11 additions & 4 deletions shotover-proxy/src/transforms/redis/sink_single.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use crate::frame::RedisFrame;
use crate::message::{Message, Messages};
use crate::tls::{AsyncStream, TlsConfig, TlsConnector};
use crate::transforms::{Transform, Transforms, Wrapper};
use anyhow::Result;
use anyhow::{anyhow, Context, Result};
use async_trait::async_trait;
use futures::stream::{SplitSink, SplitStream};
use futures::{FutureExt, SinkExt, StreamExt};
Expand Down Expand Up @@ -142,19 +142,26 @@ impl Transform for RedisSinkSingle {
} else {
MessageType::Other
};
connection.sent_message_type_tx.send(ty).unwrap();
connection
.sent_message_type_tx
.send(ty)
.map_err(|_| anyhow!("Failed to send message type because RedisSinkSingle response processing task is dead"))?;
}

let messages_len = message_wrapper.messages.len();
connection
.outbound_tx
.send(message_wrapper.messages)
.await
.ok();
.context("Failed to send messages to redis destination")?;

let mut result = Vec::with_capacity(messages_len);
while result.len() < messages_len {
let mut message = connection.response_messages_rx.recv().await.unwrap();
let mut message = connection
.response_messages_rx
.recv()
.await
.ok_or_else(|| anyhow!("Failed to receive message because RedisSinkSingle response processing task is dead"))?;
if let Some(Frame::Redis(RedisFrame::Error(_))) = message.frame() {
self.failed_requests.increment(1);
}
Expand Down

0 comments on commit 64bf3bd

Please sign in to comment.