Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

initial static rate limit #208

Merged
merged 5 commits into from
Sep 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions compose.brick.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,5 @@ services:
scylla-server:
environment:
- SCYLLA_SIREN_HOST_URL=192.168.100.1:1883
- SCYLLA_RATE_LIMIT_MODE=none

2 changes: 2 additions & 0 deletions compose.router.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ services:
- siren
environment:
- SCYLLA_SIREN_HOST_URL=siren:1883
- SCYLLA_RATE_LIMIT_MODE=static
- SCYLLA_STATIC_RATE_LIMIT_VALUE=100

client:
extends:
Expand Down
2 changes: 2 additions & 0 deletions compose.tpu.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ services:
scylla-server:
environment:
- SCYLLA_SIREN_HOST_URL=host.docker.internal:1883
- SCYLLA_RATE_LIMIT_MODE=static
- SCYLLA_STATIC_RATE_LIMIT_VALUE=100
extra_hosts:
- "host.docker.internal:host-gateway" # for external siren
init: false # not supported on buildroot for some reason, further investigation needed
7 changes: 7 additions & 0 deletions compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,13 @@ services:
- SOURCE_DATABASE_URL=postgresql://postgres:password@odyssey-timescale:5432/postgres
# - PROD_SIREN_HOST_URL=siren:1883
- SCYLLA_PROD=true
#- SCYLLA_SATURATE_BATCH=false
#-SCYLLA_DATA_UPLOAD_DISABLE=false
#-SCYLLA_SIREN_HOST_URL=localhost:1883
#-SCYLLA_BATCH_UPSERT_TIME=10
- SCYLLA_RATE_LIMIT_MODE=static
- SCYLLA_STATIC_RATE_LIMIT_VALUE=50
#-SCYLLA_SOCKET_DISCARD_PERCENT=0
RChandler234 marked this conversation as resolved.
Show resolved Hide resolved
- RUST_LOG=warn,scylla_server=debug
cpu_shares: 1024
stop_grace_period: 2m
Expand Down
10 changes: 10 additions & 0 deletions scylla-server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,13 @@ pub mod serverdata;

/// The type descriptor of the database passed to the middlelayer through axum state
pub type Database = std::sync::Arc<prisma::PrismaClient>;

#[derive(clap::ValueEnum, Debug, PartialEq, Copy, Clone, Default)]
#[clap(rename_all = "kebab_case")]
pub enum RateLimitMode {
/// static rate limiting based on a set value
Static,
/// no rate limiting
#[default]
None,
}
36 changes: 31 additions & 5 deletions scylla-server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,13 @@ use scylla_server::{
},
prisma::PrismaClient,
processors::{
db_handler, mock_processor::MockProcessor, mqtt_processor::MqttProcessor, ClientData,
db_handler,
mock_processor::MockProcessor,
mqtt_processor::{MqttProcessor, MqttProcessorOptions},
ClientData,
},
services::run_service::{self, public_run},
Database,
Database, RateLimitMode,
};
use socketioxide::{extract::SocketRef, SocketIo};
use tokio::{signal, sync::mpsc};
Expand Down Expand Up @@ -70,6 +73,25 @@ struct ScyllaArgs {
)]
batch_upsert_time: u64,

/// The rate limit mode to use
#[arg(
short = 'm',
long,
env = "SCYLLA_RATE_LIMIT_MODE",
default_value_t = RateLimitMode::None,
value_enum,
)]
rate_limit_mode: RateLimitMode,

/// The static rate limit number to use in ms
#[arg(
short = 'v',
long,
env = "SCYLLA_STATIC_RATE_LIMIT_VALUE",
default_value = "100"
)]
static_rate_limit_value: u64,

/// The percent of messages discarded when sent from the socket
#[arg(
short = 'd',
Expand Down Expand Up @@ -185,11 +207,15 @@ async fn main() {
let (recv, opts) = MqttProcessor::new(
mqtt_send,
new_run_receive,
cli.siren_host_url,
curr_run.id,
io,
token.clone(),
((cli.socketio_discard_percent as f32 / 100.0) * 255.0) as u8,
MqttProcessorOptions {
mqtt_path: cli.siren_host_url,
initial_run: curr_run.id,
static_rate_limit_time: cli.static_rate_limit_value,
rate_limit_mode: cli.rate_limit_mode,
upload_ratio: cli.socketio_discard_percent,
},
);
let (client, eventloop) = AsyncClient::new(opts, 600);
let client_sharable: Arc<AsyncClient> = Arc::new(client);
Expand Down
131 changes: 95 additions & 36 deletions scylla-server/src/processors/mqtt_processor.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
use core::fmt;
use std::{sync::Arc, time::Duration};
use std::{
collections::HashMap,
sync::Arc,
time::{Duration, SystemTime},
};

use prisma_client_rust::{bigdecimal::ToPrimitive, chrono, serde_json};
use protobuf::Message;
Expand All @@ -9,50 +12,85 @@ use rumqttc::v5::{
AsyncClient, Event, EventLoop, MqttOptions,
};
use socketioxide::SocketIo;
use tokio::sync::mpsc::{Receiver, Sender};
use tokio::{
sync::mpsc::{Receiver, Sender},
time::Instant,
};
use tokio_util::sync::CancellationToken;
use tracing::{debug, instrument, trace, warn, Level};

use crate::{
controllers::car_command_controller::CALYPSO_BIDIR_CMD_PREFIX, serverdata,
services::run_service,
services::run_service, RateLimitMode,
};

use super::ClientData;
use std::borrow::Cow;

/// The chief processor of incoming mqtt data, this handles
/// - mqtt state
/// - reception via mqtt and subsequent parsing
/// - labeling of data with runs
/// - sending data over socket
/// - sending data over the channel to a db handler
///
/// It also is the main form of rate limiting
pub struct MqttProcessor {
channel: Sender<ClientData>,
new_run_channel: Receiver<run_service::public_run::Data>,
curr_run: i32,
io: SocketIo,
cancel_token: CancellationToken,
/// Upload ratio, below is not uploaded above is uploaded
/// Upload ratio, below is not socket sent above is socket sent
upload_ratio: u8,
/// static rate limiter
rate_limiter: HashMap<String, Instant>,
/// time to rate limit in ms
rate_limit_time: u64,
/// rate limit mode
rate_limit_mode: RateLimitMode,
}

/// processor options, these are static immutable settings
pub struct MqttProcessorOptions {
/// URI of the mqtt server
pub mqtt_path: String,
/// the initial run id
pub initial_run: i32,
/// the static rate limit time interval in ms
pub static_rate_limit_time: u64,
/// the rate limit mode
pub rate_limit_mode: RateLimitMode,
/// the upload ratio for the socketio
pub upload_ratio: u8,
}

impl MqttProcessor {
/// Creates a new mqtt receiver and socketio and db sender
/// * `channel` - The mpsc channel to send the database data to
/// * `mqtt_path` - The mqtt URI, including port, (without the mqtt://) to subscribe to
/// * `db` - The database to store the data in
/// * `new_run_channel` - The channel for new run notifications
/// * `io` - The socketio layer to send the data to
/// * `cancel_token` - The token which indicates cancellation of the task
/// * `opts` - The mqtt processor options to use
/// Returns the instance and options to create a client, which is then used in the process_mqtt loop
pub fn new(
channel: Sender<ClientData>,
new_run_channel: Receiver<run_service::public_run::Data>,
mqtt_path: String,
initial_run: i32,
io: SocketIo,
cancel_token: CancellationToken,
upload_ratio: u8,
opts: MqttProcessorOptions,
) -> (MqttProcessor, MqttOptions) {
// create the mqtt client and configure it
let mut mqtt_opts = MqttOptions::new(
"ScyllaServer",
mqtt_path.split_once(':').expect("Invalid Siren URL").0,
mqtt_path
format!(
"ScyllaServer-{:?}",
SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.expect("Time went backwards")
.as_millis()
),
opts.mqtt_path.split_once(':').expect("Invalid Siren URL").0,
opts.mqtt_path
.split_once(':')
.unwrap()
.1
Expand All @@ -66,16 +104,19 @@ impl MqttProcessor {
.set_session_expiry_interval(Some(u32::MAX))
.set_topic_alias_max(Some(600));

// TODO mess with incoming message cap if db, etc. cannot keep up
let rate_map: HashMap<String, Instant> = HashMap::new();

(
MqttProcessor {
channel,
new_run_channel,
curr_run: initial_run,
curr_run: opts.initial_run,
io,
cancel_token,
upload_ratio,
upload_ratio: opts.upload_ratio,
rate_limiter: rate_map,
rate_limit_time: opts.static_rate_limit_time,
rate_limit_mode: opts.rate_limit_mode,
},
mqtt_opts,
)
Expand Down Expand Up @@ -110,11 +151,8 @@ impl MqttProcessor {
trace!("Received mqtt message: {:?}", msg);
// parse the message into the data and the node name it falls under
let msg = match self.parse_msg(msg) {
Ok(msg) => msg,
Err(err) => {
warn!("Message parse error: {:?}", err);
continue;
}
Some(msg) => msg,
None => continue
};
latency_ringbuffer.push(chrono::offset::Utc::now().timestamp_millis() - msg.timestamp);
self.send_db_msg(msg.clone()).await;
Expand Down Expand Up @@ -170,26 +208,46 @@ impl MqttProcessor {
/// * `msg` - The mqtt message to parse
/// returns the ClientData, or the Err of something that can be debug printed
#[instrument(skip(self), level = Level::TRACE)]
fn parse_msg(&self, msg: Publish) -> Result<ClientData, impl fmt::Debug> {
let topic = std::str::from_utf8(&msg.topic)
.map_err(|f| format!("Could not parse topic: {}, topic: {:?}", f, msg.topic))?;
fn parse_msg(&mut self, msg: Publish) -> Option<ClientData> {
let Ok(topic) = std::str::from_utf8(&msg.topic) else {
warn!("Could not parse topic, topic: {:?}", msg.topic);
return None;
};

// ignore command messages, less confusing in logs than just failing to decode protobuf
if topic.starts_with(CALYPSO_BIDIR_CMD_PREFIX) {
return Err(format!("Skipping command message: {}", topic));
debug!("Skipping command message: {}", topic);
return None;
}

let split = topic
.split_once('/')
.ok_or(&format!("Could not parse nesting: {:?}", msg.topic))?;
// handle static rate limiting mode
if self.rate_limit_mode == RateLimitMode::Static {
// check if we have a previous time for a message based on its topic
jr1221 marked this conversation as resolved.
Show resolved Hide resolved
if let Some(old) = self.rate_limiter.get(topic) {
// if the message is less than the rate limit, skip it and do not update the map
if old.elapsed() < Duration::from_millis(self.rate_limit_time) {
trace!("Static rate limit skipping message with topic {}", topic);
return None;
} else {
// if the message is past the rate limit, continue with the parsing of it and mark the new time last received
self.rate_limiter.insert(topic.to_string(), Instant::now());
}
} else {
// here is the first insertion of the topic (the first time we receive the topic in scylla's lifetime)
self.rate_limiter.insert(topic.to_string(), Instant::now());
}
}

let Some(split) = topic.split_once('/') else {
warn!("Could not parse nesting: {:?}", msg.topic);
return None;
};

// look at data after topic as if we dont have a topic the protobuf is useless anyways
let data = serverdata::ServerData::parse_from_bytes(&msg.payload).map_err(|f| {
format!(
"Could not parse message payload:{:?} error: {}",
msg.topic, f
)
})?;
let Ok(data) = serverdata::ServerData::parse_from_bytes(&msg.payload) else {
warn!("Could not parse message payload:{:?}", msg.topic);
return None;
};

// get the node and datatype from the topic extracted at the beginning
let node = split.0;
Expand Down Expand Up @@ -225,14 +283,15 @@ impl MqttProcessor {
debug!("Timestamp before year 2000: {}", unix_time);
let sys_time = chrono::offset::Utc::now().timestamp_millis();
if sys_time < 963014966000 {
return Err("System has no good time, discarding message!".to_string());
warn!("System has no good time, discarding message!");
return None;
}
sys_time
} else {
unix_time
};

Ok(ClientData {
Some(ClientData {
run_id: self.curr_run,
name: data_type,
unit: data.unit,
Expand Down
Loading