Skip to content

Commit

Permalink
Fix low connection_count performance (#1459)
Browse files Browse the repository at this point in the history
  • Loading branch information
rukai authored Feb 8, 2024
1 parent 547704d commit 5bea9e9
Show file tree
Hide file tree
Showing 4 changed files with 77 additions and 102 deletions.
85 changes: 36 additions & 49 deletions shotover/src/codec/cassandra.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,6 @@ pub struct CassandraDecoder {
version: Arc<AtomicVersionState>,
compression: Arc<AtomicCompressionState>,
handshake_complete: Arc<AtomicBool>,
messages: Vec<Message>,
current_use_keyspace: Option<Identifier>,
direction: Direction,
version_counter: VersionCounter,
Expand All @@ -194,7 +193,6 @@ impl CassandraDecoder {
version,
compression,
handshake_complete,
messages: vec![],
current_use_keyspace: None,
direction,
version_counter,
Expand Down Expand Up @@ -575,58 +573,47 @@ impl Decoder for CassandraDecoder {
let handshake_complete = self.handshake_complete.load(Ordering::Relaxed);
let received_at = Instant::now();

loop {
match self.check_size(src, version, compression, handshake_complete) {
Ok(frame_len) => {
let mut messages = self
.decode_frame(
src,
frame_len,
version,
compression,
handshake_complete,
received_at,
)
.map_err(CodecReadError::Parser)?;

for message in messages.iter_mut() {
if let Ok(Metadata::Cassandra(CassandraMetadata {
opcode: Opcode::Query | Opcode::Batch,
..
})) = message.metadata()
{
if let Some(keyspace) = get_use_keyspace(message) {
self.current_use_keyspace = Some(keyspace);
}

if let Some(keyspace) = &self.current_use_keyspace {
set_default_keyspace(message, keyspace);
}
match self.check_size(src, version, compression, handshake_complete) {
Ok(frame_len) => {
let mut messages = self
.decode_frame(
src,
frame_len,
version,
compression,
handshake_complete,
received_at,
)
.map_err(CodecReadError::Parser)?;

for message in messages.iter_mut() {
if let Ok(Metadata::Cassandra(CassandraMetadata {
opcode: Opcode::Query | Opcode::Batch,
..
})) = message.metadata()
{
if let Some(keyspace) = get_use_keyspace(message) {
self.current_use_keyspace = Some(keyspace);
}
}

self.messages.append(&mut messages);
}
Err(CheckFrameSizeError::NotEnoughBytes) => {
if self.messages.is_empty() || src.remaining() != 0 {
return Ok(None);
} else {
return Ok(Some(std::mem::take(&mut self.messages)));
if let Some(keyspace) = &self.current_use_keyspace {
set_default_keyspace(message, keyspace);
}
}
}
Err(CheckFrameSizeError::UnsupportedVersion(version)) => {
return Err(reject_protocol_version(version));
}
Err(CheckFrameSizeError::UnsupportedCompression(msg)) => {
return Err(CodecReadError::Parser(anyhow!(msg)));
}
err => {
return Err(CodecReadError::Parser(anyhow!(
"Failed to parse frame {:?}",
err
)))
}
Ok(Some(messages))
}
Err(CheckFrameSizeError::NotEnoughBytes) => Ok(None),
Err(CheckFrameSizeError::UnsupportedVersion(version)) => {
Err(reject_protocol_version(version))
}
Err(CheckFrameSizeError::UnsupportedCompression(msg)) => {
Err(CodecReadError::Parser(anyhow!(msg)))
}
err => Err(CodecReadError::Parser(anyhow!(
"Failed to parse frame {:?}",
err
))),
}
}
}
Expand Down
39 changes: 17 additions & 22 deletions shotover/src/codec/kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use crate::codec::{CodecBuilder, CodecReadError};
use crate::frame::MessageType;
use crate::message::{Encodable, Message, Messages, ProtocolType};
use anyhow::{anyhow, Result};
use bytes::{Buf, BytesMut};
use bytes::BytesMut;
use kafka_protocol::messages::ApiKey;
use metrics::Histogram;
use std::sync::mpsc;
Expand Down Expand Up @@ -59,7 +59,6 @@ impl CodecBuilder for KafkaCodecBuilder {

pub struct KafkaDecoder {
request_header_rx: Option<mpsc::Receiver<RequestHeader>>,
messages: Messages,
direction: Direction,
}

Expand All @@ -70,7 +69,6 @@ impl KafkaDecoder {
) -> Self {
KafkaDecoder {
request_header_rx,
messages: vec![],
direction,
}
}
Expand All @@ -95,31 +93,28 @@ impl Decoder for KafkaDecoder {

fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
let received_at = Instant::now();
loop {
if let Some(size) = get_length_of_full_message(src) {
let bytes = src.split_to(size);
tracing::debug!(
"{}: incoming kafka message:\n{}",
self.direction,
pretty_hex::pretty_hex(&bytes)
);
let request_header = if let Some(rx) = self.request_header_rx.as_ref() {
if let Some(size) = get_length_of_full_message(src) {
let bytes = src.split_to(size);
tracing::debug!(
"{}: incoming kafka message:\n{}",
self.direction,
pretty_hex::pretty_hex(&bytes)
);
let request_header =
if let Some(rx) = self.request_header_rx.as_ref() {
Some(rx.recv().map_err(|_| {
CodecReadError::Parser(anyhow!("kafka encoder half was lost"))
})?)
} else {
None
};
self.messages.push(Message::from_bytes_at_instant(
bytes.freeze(),
ProtocolType::Kafka { request_header },
Some(received_at),
));
} else if self.messages.is_empty() || src.remaining() != 0 {
return Ok(None);
} else {
return Ok(Some(std::mem::take(&mut self.messages)));
}
Ok(Some(vec![Message::from_bytes_at_instant(
bytes.freeze(),
ProtocolType::Kafka { request_header },
Some(received_at),
)]))
} else {
Ok(None)
}
}
}
Expand Down
46 changes: 17 additions & 29 deletions shotover/src/codec/redis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use crate::codec::{CodecBuilder, CodecReadError};
use crate::frame::{Frame, MessageType};
use crate::message::{Encodable, Message, Messages};
use anyhow::{anyhow, Result};
use bytes::{Buf, BytesMut};
use bytes::BytesMut;
use metrics::Histogram;
use redis_protocol::resp2::prelude::decode_mut;
use redis_protocol::resp2::prelude::encode_bytes;
Expand Down Expand Up @@ -47,16 +47,12 @@ pub struct RedisEncoder {
}

pub struct RedisDecoder {
messages: Messages,
direction: Direction,
}

impl RedisDecoder {
pub fn new(direction: Direction) -> Self {
Self {
messages: Vec::new(),
direction,
}
Self { direction }
}
}

Expand All @@ -66,30 +62,22 @@ impl Decoder for RedisDecoder {

fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
let received_at = Instant::now();
loop {
match decode_mut(src).map_err(|e| {
CodecReadError::Parser(anyhow!(e).context("Error decoding redis frame"))
})? {
Some((frame, _size, bytes)) => {
tracing::debug!(
"{}: incoming redis message:\n{}",
self.direction,
pretty_hex::pretty_hex(&bytes)
);
self.messages.push(Message::from_bytes_and_frame_at_instant(
bytes,
Frame::Redis(frame),
Some(received_at),
));
}
None => {
if self.messages.is_empty() || src.remaining() != 0 {
return Ok(None);
} else {
return Ok(Some(std::mem::take(&mut self.messages)));
}
}
match decode_mut(src)
.map_err(|e| CodecReadError::Parser(anyhow!(e).context("Error decoding redis frame")))?
{
Some((frame, _size, bytes)) => {
tracing::debug!(
"{}: incoming redis message:\n{}",
self.direction,
pretty_hex::pretty_hex(&bytes)
);
Ok(Some(vec![Message::from_bytes_and_frame_at_instant(
bytes,
Frame::Redis(frame),
Some(received_at),
)]))
}
None => Ok(None),
}
}
}
Expand Down
9 changes: 7 additions & 2 deletions shotover/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -699,9 +699,14 @@ impl<C: CodecBuilder + 'static> Handler<C> {
debug!("Waiting for message {client_details}");
let responses = tokio::select! {
requests = Self::receive_with_timeout(self.timeout, &mut in_rx, client_details) => {
debug!("Received requests from client {:?}", requests);
match requests {
Some(requests) => self.process_forward(client_details, local_addr, &out_tx, requests).await?,
Some(mut requests) => {
while let Ok(x) = in_rx.try_recv() {
requests.extend(x);
}
debug!("Received requests from client {:?}", requests);
self.process_forward(client_details, local_addr, &out_tx, requests).await?
}
None => {
// Either we timed out the connection or the client disconnected, so terminate this connection
return Ok(())
Expand Down

0 comments on commit 5bea9e9

Please sign in to comment.