Skip to content

Commit

Permalink
Revert codec changes
Browse files Browse the repository at this point in the history
  • Loading branch information
rukai committed Mar 18, 2024
1 parent bd2bfdf commit b82d02f
Show file tree
Hide file tree
Showing 5 changed files with 73 additions and 58 deletions.
22 changes: 2 additions & 20 deletions shotover/src/codec/redis.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::sync::{mpsc, Arc};
use std::sync::mpsc;
use std::time::Instant;

use super::{CodecWriteError, Direction};
Expand All @@ -10,26 +10,12 @@ use bytes::BytesMut;
use metrics::Histogram;
use redis_protocol::resp2::prelude::decode_mut;
use redis_protocol::resp2::prelude::encode_bytes;
use tokio::sync::Notify;
use tokio_util::codec::{Decoder, Encoder};

#[derive(Clone)]
pub struct RedisCodecBuilder {
direction: Direction,
message_latency: Histogram,
force_run_chain: Option<Arc<Notify>>,
}

impl RedisCodecBuilder {
pub fn new_sink(destination_name: String, force_run_chain: Arc<Notify>) -> Self {
let direction = Direction::Sink;
let message_latency = super::message_latency(direction, destination_name);
Self {
direction,
message_latency,
force_run_chain: Some(force_run_chain),
}
}
}

impl CodecBuilder for RedisCodecBuilder {
Expand All @@ -41,7 +27,6 @@ impl CodecBuilder for RedisCodecBuilder {
Self {
direction,
message_latency,
force_run_chain: None,
}
}

Expand All @@ -54,7 +39,7 @@ impl CodecBuilder for RedisCodecBuilder {
}
};
(
RedisDecoder::new(rx, self.direction, self.force_run_chain.clone()),
RedisDecoder::new(rx, self.direction),
RedisEncoder::new(tx, self.direction, self.message_latency.clone()),
)
}
Expand Down Expand Up @@ -89,7 +74,6 @@ pub struct RedisEncoder {
pub struct RedisDecoder {
// Some when Sink (because it receives responses)
request_header_rx: Option<mpsc::Receiver<RequestInfo>>,
force_run_chain: Option<Arc<Notify>>,
direction: Direction,
is_subscribed: bool,
}
Expand All @@ -98,13 +82,11 @@ impl RedisDecoder {
pub fn new(
request_header_rx: Option<mpsc::Receiver<RequestInfo>>,
direction: Direction,
force_run_chain: Option<Arc<Notify>>,
) -> Self {
Self {
direction,
request_header_rx,
is_subscribed: false,
force_run_chain,
}
}
}
Expand Down
6 changes: 6 additions & 0 deletions shotover/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -455,6 +455,7 @@ pub fn spawn_read_write_tasks<
in_tx: mpsc::Sender<Messages>,
mut out_rx: UnboundedReceiver<Messages>,
out_tx: UnboundedSender<Messages>,
force_run_chain: Option<Arc<Notify>>,
) {
let (decoder, encoder) = codec.build();
let mut reader = FramedRead::new(rx, decoder);
Expand Down Expand Up @@ -487,6 +488,9 @@ pub fn spawn_read_write_tasks<
// main task has shutdown, this task is no longer needed
return;
}
if let Some(force_run_chain) = force_run_chain.as_ref() {
force_run_chain.notify_one();
}
}
Err(CodecReadError::RespondAndThenCloseConnection(messages)) => {
if let Err(err) = out_tx.send(messages) {
Expand Down Expand Up @@ -652,6 +656,7 @@ impl<C: CodecBuilder + 'static> Handler<C> {
in_tx,
out_rx,
out_tx.clone(),
None,
);
} else {
let (rx, tx) = stream.into_split();
Expand All @@ -662,6 +667,7 @@ impl<C: CodecBuilder + 'static> Handler<C> {
in_tx,
out_rx,
out_tx.clone(),
None,
);
};
}
Expand Down
2 changes: 1 addition & 1 deletion shotover/src/transforms/redis/cluster_ports_rewrite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,7 @@ mod test {
#[test]
fn test_rewrite_port_slots() {
let slots_pcap: &[u8] = b"*3\r\n*4\r\n:10923\r\n:16383\r\n*3\r\n$12\r\n192.168.80.6\r\n:6379\r\n$40\r\n3a7c357ed75d2aa01fca1e14ef3735a2b2b8ffac\r\n*3\r\n$12\r\n192.168.80.3\r\n:6379\r\n$40\r\n77c01b0ddd8668fff05e3f6a8aaf5f3ccd454a79\r\n*4\r\n:5461\r\n:10922\r\n*3\r\n$12\r\n192.168.80.5\r\n:6379\r\n$40\r\n969c6215d064e68593d384541ceeb57e9520dbed\r\n*3\r\n$12\r\n192.168.80.2\r\n:6379\r\n$40\r\n3929f69990a75be7b2d49594c57fe620862e6fd6\r\n*4\r\n:0\r\n:5460\r\n*3\r\n$12\r\n192.168.80.7\r\n:6379\r\n$40\r\n15d52a65d1fc7a53e34bf9193415aa39136882b2\r\n*3\r\n$12\r\n192.168.80.4\r\n:6379\r\n$40\r\ncd023916a3528fae7e606a10d8289a665d6c47b0\r\n";
let mut codec = RedisDecoder::new(None, Direction::Sink, None);
let mut codec = RedisDecoder::new(None, Direction::Sink);
let mut message = codec
.decode(&mut slots_pcap.into())
.unwrap()
Expand Down
2 changes: 1 addition & 1 deletion shotover/src/transforms/redis/sink_cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1151,7 +1151,7 @@ mod test {
// Wireshark capture from a Redis cluster with 3 masters and 3 replicas.
let slots_pcap: &[u8] = b"*3\r\n*4\r\n:10923\r\n:16383\r\n*3\r\n$12\r\n192.168.80.6\r\n:6379\r\n$40\r\n3a7c357ed75d2aa01fca1e14ef3735a2b2b8ffac\r\n*3\r\n$12\r\n192.168.80.3\r\n:6379\r\n$40\r\n77c01b0ddd8668fff05e3f6a8aaf5f3ccd454a79\r\n*4\r\n:5461\r\n:10922\r\n*3\r\n$12\r\n192.168.80.5\r\n:6379\r\n$40\r\n969c6215d064e68593d384541ceeb57e9520dbed\r\n*3\r\n$12\r\n192.168.80.2\r\n:6379\r\n$40\r\n3929f69990a75be7b2d49594c57fe620862e6fd6\r\n*4\r\n:0\r\n:5460\r\n*3\r\n$12\r\n192.168.80.7\r\n:6379\r\n$40\r\n15d52a65d1fc7a53e34bf9193415aa39136882b2\r\n*3\r\n$12\r\n192.168.80.4\r\n:6379\r\n$40\r\ncd023916a3528fae7e606a10d8289a665d6c47b0\r\n";

let mut codec = RedisDecoder::new(None, Direction::Sink, None);
let mut codec = RedisDecoder::new(None, Direction::Sink);

let mut message = codec
.decode(&mut slots_pcap.into())
Expand Down
99 changes: 63 additions & 36 deletions shotover/src/transforms/redis/sink_single.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::codec::{CodecBuilder, Direction};
use crate::frame::{Frame, RedisFrame};
use crate::message::{Message, Messages};
use crate::server::spawn_read_write_tasks;
Expand Down Expand Up @@ -113,58 +114,84 @@ impl Transform for RedisSinkSingle {
}

async fn transform<'a>(&'a mut self, requests_wrapper: Wrapper<'a>) -> Result<Messages> {
// Return immediately if we have no messages.
// If we tried to send no messages we would block forever waiting for a reply that will never come.
if requests_wrapper.requests.is_empty() {
return Ok(requests_wrapper.requests);
}

if self.connection.is_none() {
let (in_tx, in_rx) = mpsc::channel::<Messages>(10_000);
let (out_tx, out_rx) = mpsc::unbounded_channel::<Messages>();
let codec = RedisCodecBuilder::new_sink(
"RedisSinkSingle".to_owned(),
self.force_run_chain.clone(),
);
let codec = RedisCodecBuilder::new(Direction::Sink, "RedisSinkSingle".to_owned());
if let Some(tls) = self.tls.as_mut() {
let tls_stream = tls.connect(self.connect_timeout, &self.address).await?;
let (rx, tx) = split(tls_stream);
spawn_read_write_tasks(codec, rx, tx, in_tx, out_rx, out_tx.clone());
spawn_read_write_tasks(
codec,
rx,
tx,
in_tx,
out_rx,
out_tx.clone(),
Some(self.force_run_chain.clone()),
);
} else {
let tcp_stream = tcp::tcp_stream(self.connect_timeout, &self.address).await?;
let (rx, tx) = tcp_stream.into_split();
spawn_read_write_tasks(codec, rx, tx, in_tx, out_rx, out_tx.clone());
spawn_read_write_tasks(
codec,
rx,
tx,
in_tx,
out_rx,
out_tx.clone(),
Some(self.force_run_chain.clone()),
);
}
self.connection = Some(Connection { in_rx, out_tx });
}

let connection = self.connection.as_mut().unwrap();

let requests_count = requests_wrapper.requests.len();
connection
.out_tx
.send(requests_wrapper.requests)
.map_err(|err| anyhow!("Failed to send messages to redis destination: {err:?}"))?;

let mut result = vec![];
let mut responses_count = 0;
while responses_count < requests_count {
let mut responses = connection
.in_rx
.recv()
.await
.ok_or_else(|| anyhow!("Failed to receive message because RedisSinkSingle response processing task is dead"))?;

for response in &mut responses {
if let Some(Frame::Redis(RedisFrame::Error(_))) = response.frame() {
self.failed_requests.increment(1);
if requests_wrapper.requests.is_empty() {
// there are no requests, so no point sending any, but we should check for any responses without awaiting
if let Ok(mut responses) = self.connection.as_mut().unwrap().in_rx.try_recv() {
for response in &mut responses {
if let Some(Frame::Redis(RedisFrame::Error(_))) = response.frame() {
self.failed_requests.increment(1);
}
}
if response.request_id().is_some() {
responses_count += 1;
Ok(responses)
} else {
Ok(vec![])
}
} else {
let requests_count = requests_wrapper.requests.len();
self.connection
.as_mut()
.unwrap()
.out_tx
.send(requests_wrapper.requests)
.map_err(|err| anyhow!("Failed to send messages to redis destination: {err:?}"))?;

let mut result = vec![];
let mut responses_count = 0;
while responses_count < requests_count {
let mut responses = self
.connection
.as_mut()
.unwrap()
.in_rx
.recv()
.await
.ok_or_else(|| {
anyhow!("Failed to receive message because recv task is dead")
})?;

for response in &mut responses {
if let Some(Frame::Redis(RedisFrame::Error(_))) = response.frame() {
self.failed_requests.increment(1);
}
if response.request_id().is_some() {
responses_count += 1;
}
}
result.extend(responses);
}
result.extend(responses);
Ok(result)
}
Ok(result)
}
}

0 comments on commit b82d02f

Please sign in to comment.