Skip to content

Commit

Permalink
Refactor SlotMap for future replica support (#147)
Browse files Browse the repository at this point in the history
  • Loading branch information
XA21X authored Aug 26, 2021
1 parent bce6ecd commit 926b6c3
Show file tree
Hide file tree
Showing 2 changed files with 111 additions and 99 deletions.
9 changes: 8 additions & 1 deletion shotover-proxy/src/transforms/mod.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
use core::fmt;
use std::fmt::{Debug, Formatter};
use std::pin::Pin;

use anyhow::Result;
use async_trait::async_trait;
use futures::Future;
use serde::{Deserialize, Serialize};

use crate::concurrency::FuturesOrdered;
use crate::config::topology::TopicHolder;
use crate::error::ChainResponse;
use crate::message::Messages;
use crate::message::{Message, Messages};
use crate::transforms::cassandra::cassandra_codec_destination::{
CodecConfiguration, CodecDestination,
};
Expand Down Expand Up @@ -361,3 +364,7 @@ pub trait Transform: Send {
Ok(())
}
}

pub type ResponseFuturesOrdered = FuturesOrdered<
Pin<Box<dyn Future<Output = Result<(Message, Result<Messages>)>> + std::marker::Send>>,
>;
201 changes: 103 additions & 98 deletions shotover-proxy/src/transforms/redis_transforms/redis_cluster.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
use std::collections::{BTreeMap, HashMap, HashSet};
use std::iter::*;
use std::pin::Pin;

use anyhow::{anyhow, Result};
use anyhow::{anyhow, bail, ensure, Context, Result};
use async_trait::async_trait;
use derivative::Derivative;
use futures::stream::FuturesUnordered;
use futures::{Future, TryFutureExt};
use futures::{SinkExt, StreamExt};
use futures::{SinkExt, StreamExt, TryFutureExt};
use hyper::body::Bytes;
use itertools::Itertools;
use metrics::counter;
Expand All @@ -21,20 +20,19 @@ use tokio::time::Duration;
use tokio_util::codec::Framed;
use tracing::{debug, error, info, trace, warn};

use crate::concurrency::FuturesOrdered;
use crate::config::topology::TopicHolder;
use crate::error::ChainResponse;
use crate::message::{Message, MessageDetails, Messages, QueryResponse};
use crate::protocols::redis_codec::RedisCodec;
use crate::protocols::RawFrame;
use crate::transforms::util::cluster_connection_pool::ConnectionPool;
use crate::transforms::util::{Request, Response};
use crate::transforms::ResponseFuturesOrdered;
use crate::transforms::CONTEXT_CHAIN_NAME;
use crate::transforms::{Transform, Transforms, TransformsFromConfig, Wrapper};

const SLOT_SIZE: usize = 16384;

type SlotMap = BTreeMap<u16, String>;
type ChannelMap = HashMap<String, Vec<UnboundedSender<Request>>>;

#[derive(Deserialize, Serialize, Debug, Clone, PartialEq)]
Expand All @@ -56,9 +54,8 @@ impl TransformsFromConfig for RedisClusterConfig {
);

let mut connection_map: ChannelMap = ChannelMap::new();
debug!("Connected to cluster: {:?}", connection_pool);

for (node, _, _) in &slots.masters {
for node in slots.masters.values() {
match connection_pool
.get_connection(&node, self.connection_count.unwrap_or(1))
.await
Expand All @@ -72,18 +69,11 @@ impl TransformsFromConfig for RedisClusterConfig {
}
}

let slot_map: SlotMap = slots
.masters
.iter()
.map(|(host, _start, end)| (*end, host.clone()))
.collect();

debug!("Mapped cluster: {:?}", slot_map);
debug!("Channels: {:?}", connection_map);

Ok(Transforms::RedisCluster(RedisCluster {
name: "RedisCluster",
slots: slot_map,
slots,
channels: connection_map,
load_scores: HashMap::new(),
rng: SmallRng::from_rng(rand::thread_rng()).unwrap(),
Expand All @@ -109,19 +99,9 @@ pub struct RedisCluster {
impl RedisCluster {
async fn rebuild_slot_map(&mut self) -> Result<()> {
let contact_points = self.channels.keys().cloned().collect_vec();
if let Ok(mapping) = get_topology(&contact_points).await {
debug!("successfully updated map {:#?}", mapping);
let slot_map: SlotMap = mapping
.masters
.iter()
.map(|(host, _start, end)| (*end, host.clone()))
.collect();
self.slots = slot_map;
Ok(())
} else {
debug!("Couldn't update cluster slot map");
Err(anyhow!("Couldn't update cluster slot map"))
}
self.slots = get_topology(&contact_points).await?;
debug!("successfully updated map {:#?}", self.slots);
Ok(())
}

#[inline]
Expand Down Expand Up @@ -207,7 +187,7 @@ impl RedisCluster {
RawFrame::Redis(Frame::Array(ref commands)) => {
match RoutingInfo::for_command_frame(&commands) {
Some(RoutingInfo::Slot(slot)) => {
if let Some((_, lookup)) = self.slots.range(&slot..).next() {
if let Some((_, lookup)) = self.slots.masters.range(&slot..).next() {
// let idx = self.choose(lookup);
vec![lookup.clone()]
} else {
Expand All @@ -234,11 +214,42 @@ impl RedisCluster {
}
}

#[derive(Debug)]
pub struct SlotsMapping {
pub masters: Vec<(String, u16, u16)>,
pub followers: Vec<(String, u16, u16)>,
pub nodes: HashSet<String>,
#[derive(Clone, Derivative)]
#[derivative(Debug)]
pub struct SlotMap {
masters: BTreeMap<u16, String>,
replicas: BTreeMap<u16, String>,

// Hide redundant information.
#[derivative(Debug = "ignore")]
nodes: HashSet<String>,
}

impl SlotMap {
fn from_entries(
master_entries: Vec<(String, u16, u16)>,
replica_entries: Vec<(String, u16, u16)>,
) -> Self {
fn to_interval_map(slot_entries: Vec<(String, u16, u16)>) -> BTreeMap<u16, String> {
slot_entries
.into_iter()
.map(|(host, _start, end)| (end, host))
.collect()
}

let nodes: HashSet<_> = master_entries
.iter()
.map(|e| &e.0)
.chain(replica_entries.iter().map(|e| &e.0))
.cloned()
.collect();

Self {
masters: to_interval_map(master_entries),
replicas: to_interval_map(replica_entries),
nodes,
}
}
}

#[derive(Debug, Clone, Copy)]
Expand Down Expand Up @@ -318,85 +329,82 @@ impl RoutingInfo {
}

fn build_slot_to_server(
frames: &mut Vec<Frame>,
nodes: &mut HashSet<String>,
slots: &mut Vec<(String, u16, u16)>,
frames: &[Frame],
slot_entries: &mut Vec<(String, u16, u16)>,
start: u16,
end: u16,
) {
if frames.len() < 2 {
return;
}
) -> Result<()> {
ensure!(start <= end, "invalid slot range: {}-{}", start, end);
ensure!(frames.len() >= 2, "expected at least two fields");

let ip = if let Frame::BulkString(ref ip) = frames[0] {
String::from_utf8_lossy(ip.as_ref()).to_string()
} else {
return;
bail!("unexpected type for ip");
};

if ip.is_empty() {
warn!("Master IP unknown for slots {}-{}.", start, end);
return Ok(());
}

let port = if let Frame::Integer(port) = frames[1] {
port
} else {
return;
bail!("unexpected type for port");
};
if start == end {
return;
}
nodes.insert(format!("{}:{}", ip, port));
slots.push((format!("{}:{}", ip, port), start, end));

slot_entries.push((format!("{}:{}", ip, port), start, end));

Ok(())
}

fn parse_slots(contacts_raw: Frame) -> Result<SlotsMapping> {
let mut slots: Vec<(String, u16, u16)> = vec![];
let mut replica_slots: Vec<(String, u16, u16)> = vec![];
let mut nodes: HashSet<String> = HashSet::new();

match contacts_raw {
Frame::Array(response) => {
let mut response_iter = response.into_iter();
while let Some(Frame::Array(item)) = response_iter.next() {
let mut enumerator = item.into_iter().enumerate();

let mut start: u16 = 0;
let mut end: u16 = 0;

while let Some((index, item)) = enumerator.next() {
match (index, item) {
(0, Frame::Integer(i)) => start = i as u16,
(1, Frame::Integer(i)) => end = i as u16,
(2, Frame::Array(mut master)) => {
build_slot_to_server(&mut master, &mut nodes, &mut slots, start, end)
fn parse_slots(response: Frame) -> Result<SlotMap> {
let mut master_entries: Vec<(String, u16, u16)> = vec![];
let mut replica_entries: Vec<(String, u16, u16)> = vec![];

match response {
Frame::Array(results) => {
for result in results.into_iter() {
match result {
Frame::Array(result) => {
let mut start: u16 = 0;
let mut end: u16 = 0;

for (index, item) in result.into_iter().enumerate() {
match (index, item) {
(0, Frame::Integer(i)) => start = i as u16,
(1, Frame::Integer(i)) => end = i as u16,
(2, Frame::Array(master)) => {
build_slot_to_server(&master, &mut master_entries, start, end)
.context("Failed to decode master slots")?
}
(_, Frame::Array(replica)) => {
build_slot_to_server(&replica, &mut replica_entries, start, end)
.context("Failed to decode replica slots")?
}
_ => bail!("Unexpected value in slot map"),
}
}
(n, Frame::Array(mut follow)) if n > 2 => build_slot_to_server(
&mut follow,
&mut nodes,
&mut replica_slots,
start,
end,
),
_ => return Err(anyhow!("Unexpected value in slot map")),
}
_ => bail!("Unexpected value in slot map"),
}
}
}
Frame::Error(err) => {
return Err(anyhow!("Frame error: {}", err));
bail!("Frame error: {}", err);
}
_ => {}
}

if slots.is_empty() {
if master_entries.is_empty() {
Err(anyhow!("Empty slot map!"))
} else {
Ok(SlotsMapping {
masters: slots,
followers: replica_slots,
nodes,
})
Ok(SlotMap::from_entries(master_entries, replica_entries))
}
}

async fn get_topology_from_node(stream: TcpStream) -> Result<SlotsMapping> {
async fn get_topology_from_node(stream: TcpStream) -> Result<SlotMap> {
let mut outbound_framed_codec = Framed::new(stream, RedisCodec::new(true, 1));
if outbound_framed_codec
.send(Messages::new_from_message(Message {
Expand All @@ -411,8 +419,8 @@ async fn get_topology_from_node(stream: TcpStream) -> Result<SlotsMapping> {
.is_ok()
{
if let Some(Ok(mut o)) = outbound_framed_codec.next().await {
if let RawFrame::Redis(contacts_raw) = o.messages.pop().unwrap().original {
parse_slots(contacts_raw).map_err(|e| anyhow!("couldn't decode map: {}", e))
if let RawFrame::Redis(response) = o.messages.pop().unwrap().original {
parse_slots(response).map_err(|e| anyhow!("couldn't decode map: {}", e))
} else {
Err(anyhow!("couldn't decode map"))
}
Expand All @@ -424,7 +432,7 @@ async fn get_topology_from_node(stream: TcpStream) -> Result<SlotsMapping> {
}
}

async fn get_topology(first_contact_points: &[String]) -> Result<SlotsMapping> {
async fn get_topology(first_contact_points: &[String]) -> Result<SlotMap> {
let mut results = FuturesUnordered::new();

for contact in first_contact_points {
Expand Down Expand Up @@ -485,15 +493,9 @@ impl Transform for RedisCluster {
self.rebuild_slot_map().await?;
self.rebuild_slots = false;
}
let mut responses: FuturesOrdered<
Pin<
Box<
dyn Future<Output = Result<(Message, Result<Messages, anyhow::Error>)>>
+ std::marker::Send,
>,
>,
> = FuturesOrdered::new();
// let message = message_wrapper.message.messages.pop().unwrap();

let mut responses = ResponseFuturesOrdered::new();

for message in message_wrapper.message {
let sender = self.get_channels(&message.original).await;

Expand Down Expand Up @@ -578,7 +580,10 @@ impl Transform for RedisCluster {
RawFrame::Redis(Frame::Moved { slot, host, port }) => {
debug!("Got MOVE frame {} {} {}", slot, host, port);

self.slots.insert(slot, format!("{}:{}", &host, &port));
self.slots
.masters
.insert(slot, format!("{}:{}", &host, &port));

self.rebuild_slots = true;

let one_rx = self
Expand Down

0 comments on commit 926b6c3

Please sign in to comment.