Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RedisCache: port to MessageId invariants #1516

Merged
merged 5 commits into from
Mar 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions shotover/src/message/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,19 @@ impl Message {
}
}

/// This method should be called when generating a new request travelling down a seperate chain to an original request.
/// The generated request will share the same MessageId as the message it is diverged from.
pub fn from_frame_diverged(frame: Frame, diverged_from: &Message) -> Self {
Message {
codec_state: frame.as_codec_state(),
inner: Some(MessageInner::Modified { frame }),
meta_timestamp: None,
received_from_source_or_sink_at: diverged_from.received_from_source_or_sink_at,
id: diverged_from.id(),
request_id: None,
}
}

/// Same as [`Message::from_bytes`] but `received_from_source_or_sink_at` is set to None.
pub fn from_bytes(bytes: Bytes, codec_state: CodecState) -> Self {
Self::from_bytes_at_instant(bytes, codec_state, None)
Expand Down
4 changes: 0 additions & 4 deletions shotover/src/transforms/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -323,10 +323,6 @@ pub trait Transform: Send {
/// - When writing protocol generic transforms: always ensure this is upheld.
/// - When writing a transform specific to a protocol that is out of order: you can disregard this requirement
/// * This is currently only cassandra
/// * Deprecated invariants:
/// + Many transforms rely on the number of responses equalling the number of requests and that requests will be in the same order as the responses.
/// Currently shotover maintains this gaurantee for backwards compatibility
/// but the gaurantee will be removed as soon as the transforms have been altered to no longer rely on it.
///
/// # Naming
/// Transform also have different naming conventions.
Expand Down
218 changes: 120 additions & 98 deletions shotover/src/transforms/redis/cache.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::config::chain::TransformChainConfig;
use crate::frame::{CassandraFrame, CassandraOperation, Frame, MessageType, RedisFrame};
use crate::message::{Message, Messages};
use crate::message::{Message, MessageIdMap, Messages, Metadata};
use crate::transforms::chain::{TransformChain, TransformChainBuilder};
use crate::transforms::{
Transform, TransformBuilder, TransformConfig, TransformContextConfig, Wrapper,
Expand Down Expand Up @@ -124,6 +124,9 @@ impl TransformBuilder for SimpleRedisCacheBuilder {
cache_chain: self.cache_chain.build(),
caching_schema: self.caching_schema.clone(),
missed_requests: self.missed_requests.clone(),
pending_cache_requests: Default::default(),
cache_hit_cassandra_responses: vec![],
cache_miss_cassandra_requests: vec![],
})
}

Expand Down Expand Up @@ -151,122 +154,144 @@ pub struct SimpleRedisCache {
cache_chain: TransformChain,
caching_schema: HashMap<FQName, TableCacheSchema>,
missed_requests: Counter,
pending_cache_requests: MessageIdMap<Message>,

/// cleared by the end of every `Transform::transform` call, stored here to avoid reallocation
cache_hit_cassandra_responses: Vec<Message>,
/// cleared by the end of every `Transform::transform` call, stored here to avoid reallocation
cache_miss_cassandra_requests: Vec<Message>,
}

impl SimpleRedisCache {
fn build_cache_query(&mut self, cassandra_messages: &mut Messages) -> (Messages, Vec<usize>) {
let mut indices = Vec::with_capacity(cassandra_messages.len());
let redis_requests = cassandra_messages
.iter_mut()
.enumerate()
.filter_map(|(i, message)| {
if let Some(Frame::Cassandra(CassandraFrame {
operation: CassandraOperation::Query { query, .. },
..
})) = message.frame()
{
if let CacheableState::CacheRow = is_cacheable(query) {
if let Some(table_name) = query.get_table_name() {
if let Some(table_cache_schema) = self.caching_schema.get(table_name) {
match build_redis_key_from_cql3(query, table_cache_schema) {
Ok(address) => {
indices.push(i);
return Some(Message::from_frame_at_instant(
Frame::Redis(RedisFrame::Array(vec![
RedisFrame::BulkString("HGET".into()),
RedisFrame::BulkString(address.key),
RedisFrame::BulkString(address.field),
])),
message.received_from_source_or_sink_at,
));
}
Err(_e) => {} // TODO match Err(()) here or just have build_redis_key_from_cql3 return Option
}
fn build_cache_query(&mut self, request: &mut Message) -> Option<Message> {
if let Some(Frame::Cassandra(CassandraFrame {
operation: CassandraOperation::Query { query, .. },
..
})) = request.frame()
{
if let CacheableState::CacheRow = is_cacheable(query) {
if let Some(table_name) = query.get_table_name() {
if let Some(table_cache_schema) = self.caching_schema.get(table_name) {
match build_redis_key_from_cql3(query, table_cache_schema) {
Ok(address) => {
return Some(Message::from_frame_diverged(
Frame::Redis(RedisFrame::Array(vec![
RedisFrame::BulkString("HGET".into()),
RedisFrame::BulkString(address.key),
RedisFrame::BulkString(address.field),
])),
request,
));
}
Err(_e) => {} // TODO match Err(()) here or just have build_redis_key_from_cql3 return Option
}
}
}
None
})
.collect();
(redis_requests, indices)
}
}

None
}

fn unwrap_cache_response(
&self,
mut redis_responses: Messages,
redis_indices: Vec<usize>,
cassandra_requests: &mut Messages,
) -> Vec<(Message, usize)> {
redis_responses
.iter_mut()
.zip(redis_indices)
.filter_map(|(redis_response, redis_index)| {
match redis_response.frame() {
Some(Frame::Redis(redis_frame)) => {
match redis_frame {
RedisFrame::Error(err) => {
error!("Redis cache server returned error: {err:?}");
None
}
RedisFrame::BulkString(redis_bytes) => {
match CassandraFrame::from_bytes(redis_bytes.clone(), Compression::None) {
Ok(mut response_frame) => {
if let Some(Frame::Cassandra(request_frame)) =
cassandra_requests[redis_index].frame()
{
if response_frame.version == request_frame.version {
response_frame.stream_id = request_frame.stream_id;
Some((
Message::from_frame_at_instant(
Frame::Cassandra(response_frame),
redis_response.received_from_source_or_sink_at
),
redis_index,
))
fn unwrap_cache_response(&mut self, redis_responses: Messages) {
for mut redis_response in redis_responses {
let original_request = self
.pending_cache_requests
.remove(
&redis_response
.request_id()
.expect("This must have a request, since we dont use redis pubsub"),
)
.expect("There must be a pending request, since we store a pending request for all redis requests");
let cassandra_frame = match redis_response.frame() {
Some(Frame::Redis(redis_frame)) => {
match redis_frame {
RedisFrame::Error(err) => {
error!("Redis cache server returned error: {err:?}");
None
}
RedisFrame::BulkString(redis_bytes) => {
match CassandraFrame::from_bytes(redis_bytes.clone(), Compression::None)
{
Ok(mut response_frame) => {
match original_request.metadata() {
Ok(Metadata::Cassandra(meta)) => {
if response_frame.version == meta.version {
response_frame.stream_id = meta.stream_id;
Some(response_frame)
} else {
// TODO: we should have some logic to convert to the
// expected version instead of just failing here
error!("Failed to use cache as mismatch between request version and cached response version");
None
}
} else {
error!("Failed to use cache as not cassandra request");
}
Ok(_) => {
error!("Not a cassandra request");
None
}
Err(err) => {
error!("invalid request {err:?}");
None
}
}
Err(err) => {
error!("Failed to decode cached cassandra message {err:?}");
None
}
}
Err(err) => {
error!("Failed to decode cached cassandra message {err:?}");
None
}
}
RedisFrame::Null => {
self.missed_requests.increment(1);
None
}
_ => None,
}
RedisFrame::Null => {
self.missed_requests.increment(1);
None
}
_ => None,
}
_ => None,
}
})
.collect()
_ => None,
};
match cassandra_frame {
Some(cassandra_frame) => {
self.cache_hit_cassandra_responses
.push(Message::from_frame_diverged(
Frame::Cassandra(cassandra_frame),
&redis_response,
));
}
None => self.cache_miss_cassandra_requests.push(original_request),
}
}
}

async fn read_from_cache(
&mut self,
cassandra_requests: &mut Messages,
local_addr: SocketAddr,
) -> Result<Vec<(Message, usize)>> {
let (redis_requests, redis_indices) = self.build_cache_query(cassandra_requests);
) -> Result<()> {
let mut redis_requests = Vec::with_capacity(cassandra_requests.len());

for mut cassandra_request in cassandra_requests.drain(..) {
match self.build_cache_query(&mut cassandra_request) {
// The request is cacheable, store the cassandra request for later and send the redis request
Some(redis_request) => {
self.pending_cache_requests
.insert(cassandra_request.id(), cassandra_request);
redis_requests.push(redis_request);
}
// The request is not cacheable, add it directly to the cache miss list
None => self.cache_miss_cassandra_requests.push(cassandra_request),
}
}

let redis_responses = self
.cache_chain
.process_request(Wrapper::new_with_addr(redis_requests, local_addr))
.await?;

Ok(self.unwrap_cache_response(redis_responses, redis_indices, cassandra_requests))
self.unwrap_cache_response(redis_responses);

Ok(())
}

/// Clears the cache for the entire table
Expand Down Expand Up @@ -584,31 +609,28 @@ impl Transform for SimpleRedisCache {
}

async fn transform<'a>(&'a mut self, mut requests_wrapper: Wrapper<'a>) -> Result<Messages> {
let cache_responses = self
.read_from_cache(&mut requests_wrapper.requests, requests_wrapper.local_addr)
self.read_from_cache(&mut requests_wrapper.requests, requests_wrapper.local_addr)
.await
.unwrap_or_else(|err| {
error!("Failed to fetch from cache: {err:?}");
vec![]
});

// remove requests we succesfully got back a cached response for
for (_, cache_index) in cache_responses.iter().rev() {
requests_wrapper.requests.remove(*cache_index);
}

.unwrap_or_else(|err| error!("Failed to fetch from cache: {err:?}"));

// send the cache misses to cassandra
// since requests_wrapper.requests is now empty we can just swap the two vectors to avoid reallocations
assert!(requests_wrapper.requests.is_empty());
std::mem::swap(
&mut requests_wrapper.requests,
&mut self.cache_miss_cassandra_requests,
);
let mut responses = self
.execute_upstream_and_write_to_cache(requests_wrapper)
.await?;

// mix cached response in with our non cached responses
for (cache_response, cache_index) in cache_responses.into_iter() {
responses.insert(cache_index, cache_response);
}
// add the cache hits to the final response
responses.append(&mut self.cache_hit_cassandra_responses);

Ok(responses)
}
}

#[cfg(test)]
mod test {
use crate::frame::cassandra::parse_statement_single;
Expand Down
Loading