diff --git a/shotover-proxy/src/transforms/mod.rs b/shotover-proxy/src/transforms/mod.rs index e510c1cde..73e82517c 100644 --- a/shotover-proxy/src/transforms/mod.rs +++ b/shotover-proxy/src/transforms/mod.rs @@ -1,13 +1,16 @@ use core::fmt; use std::fmt::{Debug, Formatter}; +use std::pin::Pin; use anyhow::Result; use async_trait::async_trait; +use futures::Future; use serde::{Deserialize, Serialize}; +use crate::concurrency::FuturesOrdered; use crate::config::topology::TopicHolder; use crate::error::ChainResponse; -use crate::message::Messages; +use crate::message::{Message, Messages}; use crate::transforms::cassandra::cassandra_codec_destination::{ CodecConfiguration, CodecDestination, }; @@ -361,3 +364,7 @@ pub trait Transform: Send { Ok(()) } } + +pub type ResponseFuturesOrdered = FuturesOrdered< + Pin)>> + std::marker::Send>>, +>; diff --git a/shotover-proxy/src/transforms/redis_transforms/redis_cluster.rs b/shotover-proxy/src/transforms/redis_transforms/redis_cluster.rs index 563be1fa2..9fb065d44 100644 --- a/shotover-proxy/src/transforms/redis_transforms/redis_cluster.rs +++ b/shotover-proxy/src/transforms/redis_transforms/redis_cluster.rs @@ -1,12 +1,11 @@ use std::collections::{BTreeMap, HashMap, HashSet}; use std::iter::*; -use std::pin::Pin; -use anyhow::{anyhow, Result}; +use anyhow::{anyhow, bail, ensure, Context, Result}; use async_trait::async_trait; +use derivative::Derivative; use futures::stream::FuturesUnordered; -use futures::{Future, TryFutureExt}; -use futures::{SinkExt, StreamExt}; +use futures::{SinkExt, StreamExt, TryFutureExt}; use hyper::body::Bytes; use itertools::Itertools; use metrics::counter; @@ -21,7 +20,6 @@ use tokio::time::Duration; use tokio_util::codec::Framed; use tracing::{debug, error, info, trace, warn}; -use crate::concurrency::FuturesOrdered; use crate::config::topology::TopicHolder; use crate::error::ChainResponse; use crate::message::{Message, MessageDetails, Messages, QueryResponse}; @@ -29,12 +27,12 @@ use crate::protocols::redis_codec::RedisCodec; use crate::protocols::RawFrame; use crate::transforms::util::cluster_connection_pool::ConnectionPool; use crate::transforms::util::{Request, Response}; +use crate::transforms::ResponseFuturesOrdered; use crate::transforms::CONTEXT_CHAIN_NAME; use crate::transforms::{Transform, Transforms, TransformsFromConfig, Wrapper}; const SLOT_SIZE: usize = 16384; -type SlotMap = BTreeMap; type ChannelMap = HashMap>>; #[derive(Deserialize, Serialize, Debug, Clone, PartialEq)] @@ -56,9 +54,8 @@ impl TransformsFromConfig for RedisClusterConfig { ); let mut connection_map: ChannelMap = ChannelMap::new(); - debug!("Connected to cluster: {:?}", connection_pool); - for (node, _, _) in &slots.masters { + for node in slots.masters.values() { match connection_pool .get_connection(&node, self.connection_count.unwrap_or(1)) .await @@ -72,18 +69,11 @@ impl TransformsFromConfig for RedisClusterConfig { } } - let slot_map: SlotMap = slots - .masters - .iter() - .map(|(host, _start, end)| (*end, host.clone())) - .collect(); - - debug!("Mapped cluster: {:?}", slot_map); debug!("Channels: {:?}", connection_map); Ok(Transforms::RedisCluster(RedisCluster { name: "RedisCluster", - slots: slot_map, + slots, channels: connection_map, load_scores: HashMap::new(), rng: SmallRng::from_rng(rand::thread_rng()).unwrap(), @@ -109,19 +99,9 @@ pub struct RedisCluster { impl RedisCluster { async fn rebuild_slot_map(&mut self) -> Result<()> { let contact_points = self.channels.keys().cloned().collect_vec(); - if let Ok(mapping) = get_topology(&contact_points).await { - debug!("successfully updated map {:#?}", mapping); - let slot_map: SlotMap = mapping - .masters - .iter() - .map(|(host, _start, end)| (*end, host.clone())) - .collect(); - self.slots = slot_map; - Ok(()) - } else { - debug!("Couldn't update cluster slot map"); - Err(anyhow!("Couldn't update cluster slot map")) - } + self.slots = get_topology(&contact_points).await?; + debug!("successfully updated map {:#?}", self.slots); + Ok(()) } #[inline] @@ -207,7 +187,7 @@ impl RedisCluster { RawFrame::Redis(Frame::Array(ref commands)) => { match RoutingInfo::for_command_frame(&commands) { Some(RoutingInfo::Slot(slot)) => { - if let Some((_, lookup)) = self.slots.range(&slot..).next() { + if let Some((_, lookup)) = self.slots.masters.range(&slot..).next() { // let idx = self.choose(lookup); vec![lookup.clone()] } else { @@ -234,11 +214,42 @@ impl RedisCluster { } } -#[derive(Debug)] -pub struct SlotsMapping { - pub masters: Vec<(String, u16, u16)>, - pub followers: Vec<(String, u16, u16)>, - pub nodes: HashSet, +#[derive(Clone, Derivative)] +#[derivative(Debug)] +pub struct SlotMap { + masters: BTreeMap, + replicas: BTreeMap, + + // Hide redundant information. + #[derivative(Debug = "ignore")] + nodes: HashSet, +} + +impl SlotMap { + fn from_entries( + master_entries: Vec<(String, u16, u16)>, + replica_entries: Vec<(String, u16, u16)>, + ) -> Self { + fn to_interval_map(slot_entries: Vec<(String, u16, u16)>) -> BTreeMap { + slot_entries + .into_iter() + .map(|(host, _start, end)| (end, host)) + .collect() + } + + let nodes: HashSet<_> = master_entries + .iter() + .map(|e| &e.0) + .chain(replica_entries.iter().map(|e| &e.0)) + .cloned() + .collect(); + + Self { + masters: to_interval_map(master_entries), + replicas: to_interval_map(replica_entries), + nodes, + } + } } #[derive(Debug, Clone, Copy)] @@ -318,85 +329,82 @@ impl RoutingInfo { } fn build_slot_to_server( - frames: &mut Vec, - nodes: &mut HashSet, - slots: &mut Vec<(String, u16, u16)>, + frames: &[Frame], + slot_entries: &mut Vec<(String, u16, u16)>, start: u16, end: u16, -) { - if frames.len() < 2 { - return; - } +) -> Result<()> { + ensure!(start <= end, "invalid slot range: {}-{}", start, end); + ensure!(frames.len() >= 2, "expected at least two fields"); let ip = if let Frame::BulkString(ref ip) = frames[0] { String::from_utf8_lossy(ip.as_ref()).to_string() } else { - return; + bail!("unexpected type for ip"); }; + if ip.is_empty() { + warn!("Master IP unknown for slots {}-{}.", start, end); + return Ok(()); + } + let port = if let Frame::Integer(port) = frames[1] { port } else { - return; + bail!("unexpected type for port"); }; - if start == end { - return; - } - nodes.insert(format!("{}:{}", ip, port)); - slots.push((format!("{}:{}", ip, port), start, end)); + + slot_entries.push((format!("{}:{}", ip, port), start, end)); + + Ok(()) } -fn parse_slots(contacts_raw: Frame) -> Result { - let mut slots: Vec<(String, u16, u16)> = vec![]; - let mut replica_slots: Vec<(String, u16, u16)> = vec![]; - let mut nodes: HashSet = HashSet::new(); - - match contacts_raw { - Frame::Array(response) => { - let mut response_iter = response.into_iter(); - while let Some(Frame::Array(item)) = response_iter.next() { - let mut enumerator = item.into_iter().enumerate(); - - let mut start: u16 = 0; - let mut end: u16 = 0; - - while let Some((index, item)) = enumerator.next() { - match (index, item) { - (0, Frame::Integer(i)) => start = i as u16, - (1, Frame::Integer(i)) => end = i as u16, - (2, Frame::Array(mut master)) => { - build_slot_to_server(&mut master, &mut nodes, &mut slots, start, end) +fn parse_slots(response: Frame) -> Result { + let mut master_entries: Vec<(String, u16, u16)> = vec![]; + let mut replica_entries: Vec<(String, u16, u16)> = vec![]; + + match response { + Frame::Array(results) => { + for result in results.into_iter() { + match result { + Frame::Array(result) => { + let mut start: u16 = 0; + let mut end: u16 = 0; + + for (index, item) in result.into_iter().enumerate() { + match (index, item) { + (0, Frame::Integer(i)) => start = i as u16, + (1, Frame::Integer(i)) => end = i as u16, + (2, Frame::Array(master)) => { + build_slot_to_server(&master, &mut master_entries, start, end) + .context("Failed to decode master slots")? + } + (_, Frame::Array(replica)) => { + build_slot_to_server(&replica, &mut replica_entries, start, end) + .context("Failed to decode replica slots")? + } + _ => bail!("Unexpected value in slot map"), + } } - (n, Frame::Array(mut follow)) if n > 2 => build_slot_to_server( - &mut follow, - &mut nodes, - &mut replica_slots, - start, - end, - ), - _ => return Err(anyhow!("Unexpected value in slot map")), } + _ => bail!("Unexpected value in slot map"), } } } Frame::Error(err) => { - return Err(anyhow!("Frame error: {}", err)); + bail!("Frame error: {}", err); } _ => {} } - if slots.is_empty() { + if master_entries.is_empty() { Err(anyhow!("Empty slot map!")) } else { - Ok(SlotsMapping { - masters: slots, - followers: replica_slots, - nodes, - }) + Ok(SlotMap::from_entries(master_entries, replica_entries)) } } -async fn get_topology_from_node(stream: TcpStream) -> Result { +async fn get_topology_from_node(stream: TcpStream) -> Result { let mut outbound_framed_codec = Framed::new(stream, RedisCodec::new(true, 1)); if outbound_framed_codec .send(Messages::new_from_message(Message { @@ -411,8 +419,8 @@ async fn get_topology_from_node(stream: TcpStream) -> Result { .is_ok() { if let Some(Ok(mut o)) = outbound_framed_codec.next().await { - if let RawFrame::Redis(contacts_raw) = o.messages.pop().unwrap().original { - parse_slots(contacts_raw).map_err(|e| anyhow!("couldn't decode map: {}", e)) + if let RawFrame::Redis(response) = o.messages.pop().unwrap().original { + parse_slots(response).map_err(|e| anyhow!("couldn't decode map: {}", e)) } else { Err(anyhow!("couldn't decode map")) } @@ -424,7 +432,7 @@ async fn get_topology_from_node(stream: TcpStream) -> Result { } } -async fn get_topology(first_contact_points: &[String]) -> Result { +async fn get_topology(first_contact_points: &[String]) -> Result { let mut results = FuturesUnordered::new(); for contact in first_contact_points { @@ -485,15 +493,9 @@ impl Transform for RedisCluster { self.rebuild_slot_map().await?; self.rebuild_slots = false; } - let mut responses: FuturesOrdered< - Pin< - Box< - dyn Future)>> - + std::marker::Send, - >, - >, - > = FuturesOrdered::new(); - // let message = message_wrapper.message.messages.pop().unwrap(); + + let mut responses = ResponseFuturesOrdered::new(); + for message in message_wrapper.message { let sender = self.get_channels(&message.original).await; @@ -578,7 +580,10 @@ impl Transform for RedisCluster { RawFrame::Redis(Frame::Moved { slot, host, port }) => { debug!("Got MOVE frame {} {} {}", slot, host, port); - self.slots.insert(slot, format!("{}:{}", &host, &port)); + self.slots + .masters + .insert(slot, format!("{}:{}", &host, &port)); + self.rebuild_slots = true; let one_rx = self