From db635a90986063405a0178093797cdf106f91540 Mon Sep 17 00:00:00 2001
From: Hiram Chirino
Date: Thu, 23 May 2024 14:23:53 -0400
Subject: [PATCH] [distributed storage] support re-syncing counters after a peer is partitioned.

Signed-off-by: Hiram Chirino
---
 limitador/proto/distributed.proto             |  20 ++--
 limitador/src/storage/distributed/grpc/mod.rs | 103 ++++++++++++++++--
 limitador/src/storage/distributed/mod.rs      |  55 ++++++++++
 3 files changed, 162 insertions(+), 16 deletions(-)

diff --git a/limitador/proto/distributed.proto b/limitador/proto/distributed.proto
index ff470931..889701aa 100644
--- a/limitador/proto/distributed.proto
+++ b/limitador/proto/distributed.proto
@@ -5,18 +5,22 @@ package limitador.service.distributed.v1;
 // A packet defines all the types of messages that can be sent between replication peers.
 message Packet {
   oneof message {
-    // the Hello message is used to introduce a peer to another peer. It is the first message sent by a peer.
+    // the hello message is used to introduce a peer to another peer. It is the first message sent by a peer.
     Hello hello = 1;
-    // the MembershipUpdate message is used to gossip about the other peers in the cluster:
+    // the membership_update message is used to gossip about the other peers in the cluster:
     // 1) sent after the first Hello message
     // 2) sent when the membership state changes
     MembershipUpdate membership_update = 2;
-    // the Ping message is used to request a pong from the other peer.
-    Ping ping = 3;
-    // the Pong message is used to respond to a ping.
+    // the ping message is used to request a pong from the other peer.
+    Empty ping = 3;
+    // the pong message is used to respond to a ping.
     Pong pong = 4;
-    // the CounterUpdate message is used to send counter updates.
+    // the counter_update message is used to send counter updates.
     CounterUpdate counter_update = 5;
+    // the re_sync_request message is used to request that the peer send a counter_update for each known counter.
+    Empty re_sync_request = 6;
+    // the re_sync_end message is used to signal the end of a re_sync_request.
+    Empty re_sync_end = 7;
   }
 }
@@ -30,8 +34,8 @@ message Hello {
   optional string receiver_url = 3;
 }

-// A request to a peer to respond with a Pong message.
-message Ping {}
+// A packet message that does not have any additional data.
+message Empty {}

 // Pong is the response to a Ping and Hello message.
 message Pong {
diff --git a/limitador/src/storage/distributed/grpc/mod.rs b/limitador/src/storage/distributed/grpc/mod.rs
index 79d3d985..c73ed641 100644
--- a/limitador/src/storage/distributed/grpc/mod.rs
+++ b/limitador/src/storage/distributed/grpc/mod.rs
@@ -5,10 +5,10 @@ use std::sync::Arc;
 use std::time::{Duration, SystemTime, UNIX_EPOCH};
 use std::{error::Error, io::ErrorKind, pin::Pin};

+use tokio::sync::mpsc::error::TrySendError;
 use tokio::sync::mpsc::Sender;
 use tokio::sync::{broadcast, mpsc, RwLock};
 use tokio::time::sleep;
-
 use tokio_stream::{wrappers::ReceiverStream, Stream, StreamExt};
 use tonic::{Code, Request, Response, Status, Streaming};
 use tracing::debug;
@@ -17,7 +17,7 @@ use crate::storage::distributed::grpc::v1::packet::Message;
 use crate::storage::distributed::grpc::v1::replication_client::ReplicationClient;
 use crate::storage::distributed::grpc::v1::replication_server::{Replication, ReplicationServer};
 use crate::storage::distributed::grpc::v1::{
-    CounterUpdate, Hello, MembershipUpdate, Packet, Peer, Pong,
+    CounterUpdate, Empty, Hello, MembershipUpdate, Packet, Peer, Pong,
 };

 // clippy will barf on protobuff generated code for enum variants in
@@ -84,16 +84,30 @@ struct Session {

 impl Session {
     async fn close(&mut self) {
-        let mut state = self.replication_state.write().await;
-        if let Some(peer) = state.peer_trackers.get_mut(&self.peer_id) {
+        let mut replication_state = self.replication_state.write().await;
+        if let Some(peer) = replication_state.peer_trackers.get_mut(&self.peer_id) {
             peer.session = None;
         }
+
+        // mark that a re-sync is needed once we are no longer connected to any peers.
+        if replication_state.active_session_count() == 0 {
+            replication_state.re_sync_needed = true;
+        }
     }

     async fn send(&self, message: Message) -> Result<(), Status> {
         self.out_stream.clone().send(Ok(message)).await
     }

+    async fn re_sync_check(&self) -> Result<(), Status> {
+        let mut replication_state = self.replication_state.write().await;
+        if replication_state.re_sync_needed && replication_state.re_sync_peer.is_none() {
+            replication_state.re_sync_peer = Some(self.peer_id.clone());
+            self.send(Message::ReSyncRequest(Empty::default())).await?;
+        }
+        Ok(())
+    }
+
     async fn process(&mut self, in_stream: &mut Streaming<Packet>) -> Result<(), Status> {
         // Send a MembershipUpdate to inform the peer about all the members
         // We should resend it again if we learn of new members.
@@ -105,11 +119,13 @@ impl Session {
             }))
             .await?;

-        let mut udpates_to_send = self.broker_state.publisher.subscribe();
+        // We may need to initiate a re-sync if we have been partitioned from the cluster.
+        self.re_sync_check().await?;

+        let mut updates = self.broker_state.publisher.subscribe();
         loop {
             tokio::select! {
-                update = udpates_to_send.recv() => {
+                update = updates.recv() => {
                     let update = update.map_err(|_| Status::unknown("broadcast error"))?;
                     self.send(Message::CounterUpdate(update)).await?;
                 }
@@ -150,6 +166,59 @@ impl Session {
                            })))
                            .await?;
                     }
+                    Some(Message::ReSyncRequest(_)) => {
+                        debug!("peer: '{}': ReSyncRequest", self.peer_id);
+                        let (tx, mut rx) = mpsc::channel::<Option<CounterUpdate>>(1);
+                        let peer_id = self.peer_id.clone();
+                        let out_stream = self.out_stream.clone();
+                        tokio::spawn(async move {
+                            let mut counter = 0u64;
+                            while let Some(rsync_message) = rx.recv().await {
+                                match rsync_message {
+                                    Some(update) => {
+                                        counter += 1;
+                                        if let Err(err) = out_stream
+                                            .clone()
+                                            .send(Ok(Message::CounterUpdate(update)))
+                                            .await
+                                        {
+                                            debug!(
+                                                "peer: '{}': ReSyncRequest: send error: {:?}",
+                                                peer_id, err
+                                            );
+                                            return;
+                                        }
+                                    }
+                                    None => {
+                                        debug!(
+                                            "peer: '{}': re-sync completed, sent {} updates",
+                                            peer_id, counter
+                                        );
+                                        _ = out_stream
+                                            .clone()
+                                            .send(Ok(Message::ReSyncEnd(Empty::default())))
+                                            .await;
+                                    }
+                                }
+                            }
+                        });
+                        self.broker_state
+                            .on_re_sync
+                            .try_send(tx)
+                            .map_err(|err| match err {
+                                TrySendError::Full(_) => Status::resource_exhausted("re-sync channel full"),
+                                TrySendError::Closed(_) => Status::unavailable("re-sync channel closed"),
+                            })?;
+                    }
+                    Some(Message::ReSyncEnd(_)) => {
+                        debug!("peer: '{}': ReSyncEnd", self.peer_id);
+                        // the peer has finished re-syncing us
+                        {
+                            let mut replication_state = self.replication_state.write().await;
+                            replication_state.re_sync_needed = false;
+                            replication_state.re_sync_peer = None;
+                        }
+                    }
                     Some(Message::MembershipUpdate(update)) => {
                         debug!("peer: '{}': MembershipUpdate", self.peer_id);
                         // add any new peers to peer_trackers
@@ -214,6 +283,12 @@ struct ReplicationState {
     // URLs our peers have used to connect to us.
     discovered_urls: HashSet<String>,
     peer_trackers: HashMap<String, PeerTracker>,
+
+    // set to true when this peer has been partitioned from the cluster, to signal
+    // that a re-sync should be started with one of the peers.
+    re_sync_needed: bool,
+    // the peer that a re-sync is currently in progress with, if any.
+    re_sync_peer: Option<String>,
 }

 impl ReplicationState {
@@ -233,6 +308,13 @@ impl ReplicationState {
         peers.sort_by(|a, b| a.peer_id.cmp(&b.peer_id));
         peers
     }
+
+    fn active_session_count(&self) -> usize {
+        self.peer_trackers
+            .iter()
+            .filter(|(_, peer_tracker)| peer_tracker.session.is_some())
+            .count()
+    }
 }

 fn match_for_io_error(err_status: &Status) -> Option<&std::io::Error> {
@@ -289,13 +371,13 @@ fn is_disconnect(err: &Status) -> bool {

 // MessageSender is used to abstract the difference between the server and client sender streams...
 #[derive(Clone)]
-enum MessageSender {
+pub enum MessageSender {
     Server(Sender<Result<Packet, Status>>),
     Client(Sender<Packet>),
 }

 impl MessageSender {
-    async fn send(self, message: Result<Message, Status>) -> Result<(), Status> {
+    pub async fn send(self, message: Result<Message, Status>) -> Result<(), Status> {
         match self {
             MessageSender::Server(sender) => {
                 let value = message.map(|x| Packet { message: Some(x) });
@@ -324,6 +406,7 @@ struct BrokerState {
     id: String,
     publisher: broadcast::Sender<CounterUpdate>,
     on_counter_update: Arc<CounterUpdateFn>,
+    on_re_sync: Arc<Sender<Sender<Option<CounterUpdate>>>>,
 }

 #[derive(Clone)]
@@ -340,6 +423,7 @@ impl Broker {
         listen_address: SocketAddr,
         peer_urls: Vec<String>,
         on_counter_update: CounterUpdateFn,
+        on_re_sync: Sender<Sender<Option<CounterUpdate>>>,
     ) -> Broker {
         let (tx, _) = broadcast::channel(16);
         let publisher: broadcast::Sender<CounterUpdate> = tx;
@@ -351,10 +435,13 @@ impl Broker {
                 id,
                 publisher,
                 on_counter_update: Arc::new(on_counter_update),
+                on_re_sync: Arc::new(on_re_sync),
             },
             replication_state: Arc::new(RwLock::new(ReplicationState {
                 discovered_urls: HashSet::new(),
                 peer_trackers: HashMap::new(),
+                re_sync_needed: true,
+                re_sync_peer: None,
             })),
         }
     }
diff --git a/limitador/src/storage/distributed/mod.rs b/limitador/src/storage/distributed/mod.rs
index 452b3aa2..fe35b35c 100644
--- a/limitador/src/storage/distributed/mod.rs
+++ b/limitador/src/storage/distributed/mod.rs
@@ -5,6 +5,9 @@ use std::sync::{Arc, RwLock};
 use std::time::{Duration, SystemTime, UNIX_EPOCH};

 use serde::{Deserialize, Serialize};
+use tokio::sync::mpsc;
+use tokio::sync::mpsc::Sender;
+use tracing::debug;

 use crate::counter::Counter;
 use crate::limit::{Limit, Namespace};
@@ -217,6 +220,8 @@ impl CrInMemoryStorage {
         let limits = Arc::new(RwLock::new(LimitsMap::new()));

         let limits_clone = limits.clone();
+
+        let (re_sync_queue_tx, mut re_sync_queue_rx) = mpsc::channel(100);
         let broker = grpc::Broker::new(
             identifier.clone(),
             listen_address,
@@ -232,6 +237,7 @@ impl CrInMemoryStorage {
                 let value = limits.get(&update.key).unwrap();
                 value.merge((UNIX_EPOCH + Duration::from_secs(update.expires_at), values).into());
             }),
+            re_sync_queue_tx,
         );

         {
@@ -241,6 +247,17 @@ impl CrInMemoryStorage {
             });
         }

+        // process the re-sync requests...
+        {
+            let limits = limits.clone();
+            tokio::spawn(async move {
+                let limits = limits.clone();
+                while let Some(sender) = re_sync_queue_rx.recv().await {
+                    process_re_sync(&limits, sender).await;
+                }
+            });
+        }
+
         Self {
             identifier,
             limits,
@@ -279,6 +296,44 @@ impl CrInMemoryStorage {
     }
 }

+async fn process_re_sync(
+    limits: &Arc<RwLock<HashMap<Vec<u8>, CrCounterValue<String>>>>,
+    sender: Sender<Option<CounterUpdate>>,
+) {
+    // Sending all the counters to the peer might take a while, and we don't want
+    // to hold the limits map lock for that long, so first collect the list of
+    // keys that need to be sent.
+    let keys: Vec<_> = {
+        let limits = limits.read().unwrap();
+        limits.keys().cloned().collect()
+    };
+
+    for key in keys {
+        let update = {
+            let limits = limits.read().unwrap();
+            limits.get(&key).map(|store_value| {
+                let (expiry, values) = store_value.clone().into_inner();
+                CounterUpdate {
+                    key: key.clone(),
+                    values: values.into_iter().collect(),
+                    expires_at: expiry.duration_since(UNIX_EPOCH).unwrap().as_secs(),
+                }
+            })
+        };
+        // skip None, it means the counter was deleted.
+        if let Some(update) = update {
+            match sender.send(Some(update)).await {
+                Ok(_) => {}
+                Err(err) => {
+                    debug!("Failed to send re-sync counter update to peer: {:?}", err);
+                    break;
+                }
+            }
+        }
+    }
+
+    // signal the end of the re-sync
+    _ = sender.send(None).await;
+}
+
 #[derive(Clone, Debug, Serialize, Deserialize)]
 struct CounterKey {
     namespace: Namespace,
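
Note for reviewers (not part of the patch): the hand-off between a gRPC session and the storage task is a channel-of-channels. `on_re_sync` carries, for each incoming `ReSyncRequest`, the per-session `Sender` that `process_re_sync` drains; each live counter is sent as `Some(update)` and a final `None` is translated into the `ReSyncEnd` packet. Below is a minimal, self-contained sketch of that pattern; the `Update` type is a stand-in for the generated `CounterUpdate`, and the buffer sizes are illustrative, not the patch's values.

    use tokio::sync::mpsc;

    // Stand-in for the generated CounterUpdate proto type.
    #[derive(Debug)]
    struct Update(u64);

    #[tokio::main]
    async fn main() {
        // Broker side: a queue of re-sync requests. Each request carries the
        // per-session Sender that the storage task should stream updates into.
        let (on_re_sync_tx, mut on_re_sync_rx) =
            mpsc::channel::<mpsc::Sender<Option<Update>>>(100);

        // Storage side (process_re_sync in the patch): drain the queue, send
        // every live counter, then a final None that maps to a ReSyncEnd packet.
        tokio::spawn(async move {
            while let Some(session_tx) = on_re_sync_rx.recv().await {
                for i in 0..3 {
                    if session_tx.send(Some(Update(i))).await.is_err() {
                        return; // the session went away mid re-sync
                    }
                }
                let _ = session_tx.send(None).await;
            }
        });

        // Session side (the ReSyncRequest handler): hand the broker a Sender
        // and forward whatever comes back to the peer.
        let (tx, mut rx) = mpsc::channel::<Option<Update>>(1);
        on_re_sync_tx.send(tx).await.unwrap();
        while let Some(msg) = rx.recv().await {
            match msg {
                Some(update) => println!("CounterUpdate: {:?}", update),
                None => {
                    println!("ReSyncEnd");
                    break;
                }
            }
        }
    }

Using try_send to enqueue the request, as the session does, keeps a slow storage task from blocking the gRPC receive loop: a full queue surfaces to the requesting peer as RESOURCE_EXHAUSTED rather than stalling replication.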