Commit: Improve handling of large Redis input (#143)
* Implement faster buffered input

This commit implements a modified ring buffer for input from Redis.
Specifically, Flodgatt now limits the amount of data it fetches from
Redis in one syscall to 8 KiB (two pages on most systems). Flodgatt
will process all complete messages it receives from Redis and then
re-use the same buffer the next time it retrieves data.  If Flodgatt
receives a partial message, it copies the partial message to the
beginning of the buffer before its next read.
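
As a rough, hypothetical sketch of that approach (not Flodgatt's actual
code; `RedisBuffer`, `poll_once`, and `process_complete_messages` are
invented names):

```rust
use std::io::Read;
use std::net::TcpStream;

const BLOCK: usize = 8192; // 8 KiB: at most two 4 KiB pages per syscall

/// Stand-in for Flodgatt's RESP parser: handle every complete message in
/// `input` and return how many bytes were consumed.
fn process_complete_messages(input: &[u8]) -> usize {
    // ...parse and forward messages here...
    input.len() // placeholder: pretend everything parsed cleanly
}

/// Hypothetical carry-over buffer along the lines described above.
struct RedisBuffer {
    buf: [u8; 2 * BLOCK],
    /// Bytes at the start of `buf` left over from a partial message.
    partial: usize,
}

impl RedisBuffer {
    fn poll_once(&mut self, conn: &mut TcpStream) -> std::io::Result<()> {
        // Fetch at most BLOCK bytes, appended after any partial message.
        let n = conn.read(&mut self.buf[self.partial..self.partial + BLOCK])?;
        let valid = self.partial + n;

        // Process every complete message we received...
        let consumed = process_complete_messages(&self.buf[..valid]);

        // ...then copy the trailing partial message (if any) to the front
        // of the buffer so the next read appends to it.
        self.buf.copy_within(consumed..valid, 0);
        self.partial = valid - consumed;
        Ok(())
    }
}
```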

This change has little effect on Flodgatt under light load (because it
was rare for Redis to have more than 8 KiB of messages available at
any one time).  However, my hope is that this will significantly
reduce memory use on the largest instances.

* Improve handling of backpressure

This commit alters how Flodgatt behaves if it receives enough messages
for a single client to fill that client's channel. (Because clients
regularly send their queued messages, this should only occur if a
single client receives a large number of messages nearly
simultaneously; this is rare, but can occur, especially on large
instances.)

Previously, Flodgatt would drop messages in the rare case when the
client's channel was full.  Now, Flodgatt will pause the current Redis
poll and yield control back to the client streams, allowing the
clients to empty their channels; Flodgatt will then resume polling
Redis and sending the messages it previously received.  With this
approach, Flodgatt will never drop messages.
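
A minimal sketch of that send path, assuming the futures 0.1-style
bounded channel this codebase appears to use (the `Backpressure` type
and `send_to_client` are invented names, not Flodgatt's actual API):

```rust
use futures::sync::mpsc::Sender;
use std::collections::VecDeque;

type Msg = String; // placeholder for a parsed Redis event

/// Hypothetical manager-side state: messages not yet delivered.
struct Backpressure {
    unsent: VecDeque<(Sender<Msg>, Msg)>,
}

impl Backpressure {
    /// Try to deliver one message; if the client's channel is full, park
    /// the message and return `false` so the caller pauses its Redis poll.
    fn send_to_client(&mut self, mut tx: Sender<Msg>, msg: Msg) -> bool {
        match tx.try_send(msg) {
            Ok(()) => true,
            Err(e) if e.is_full() => {
                // Don't drop the message: queue it and yield control back
                // to the client streams so they can drain their channels.
                self.unsent.push_back((tx, e.into_inner()));
                false
            }
            Err(_) => true, // receiver is gone; nothing left to deliver
        }
    }
}
```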

However, the risk of this approach is that, by never dropping
messages, Flodgatt does not have any way to reduce the amount of work
it needs to do when under heavy load; it delays the work slightly,
but doesn't reduce it.  This means it is *theoretically* possible for
Flodgatt to fall increasingly far behind if it continuously receives
more messages than it can process.  Due to how quickly Flodgatt can
process messages, though, I suspect this would only come up if an
admin were running Flodgatt in a *significantly* resource-constrained
environment, but I wanted to mention it for the sake of completeness.

This commit also adds a new /status/backpressure endpoint that
displays the current length of the Redis input buffer (which should
typically be low or 0).  Like the other /status endpoints, this
endpoint is only enabled when Flodgatt is compiled with the
`stub_status` feature.
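
Tying this back to the hypothetical `RedisBuffer` sketch above, the
counter behind such an endpoint could be as simple as reporting how much
unprocessed input is being carried over (illustrative only; the exact
value Flodgatt reports comes from its own input-buffer bookkeeping):

```rust
impl RedisBuffer {
    /// Roughly what a /status/backpressure-style endpoint could report:
    /// how many bytes of Redis input are still waiting to be processed.
    fn backpressure_len(&self) -> usize {
        self.partial
    }
}
```
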
codesections authored Apr 27, 2020
1 parent d8b07b4 commit 4a13412
Showing 15 changed files with 286 additions and 257 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
@@ -1,7 +1,7 @@
[package]
name = "flodgatt"
description = "A blazingly fast drop-in replacement for the Mastodon streaming api server"
version = "0.9.4"
version = "0.9.6"
authors = ["Daniel Long Sockwell <daniel@codesections.com", "Julian Laubstein <contact@julianlaubstein.de>"]
edition = "2018"

74 changes: 35 additions & 39 deletions benches/parse_redis.rs
@@ -1,9 +1,5 @@
use criterion::{black_box, criterion_group, criterion_main, Criterion};
use flodgatt::{
event::*,
request::{Content::*, Reach::*, Stream::*, Timeline},
response::{RedisMsg, RedisParseOutput},
};
use flodgatt::response::{RedisMsg, RedisParseOutput};
use lru::LruCache;
use std::convert::TryFrom;

@@ -16,27 +12,27 @@ fn parse_long_redis_input<'a>(input: &'a str) -> RedisMsg<'a> {
}
}

fn parse_to_timeline(msg: RedisMsg) -> Timeline {
let trimmed_tl_txt = &msg.timeline_txt["timeline:".len()..];
let tl = Timeline::from_redis_text(trimmed_tl_txt, &mut LruCache::new(1000)).unwrap();
assert_eq!(tl, Timeline(User(Id(1)), Federated, All));
tl
}
fn parse_to_checked_event(msg: RedisMsg) -> EventKind {
EventKind::TypeSafe(serde_json::from_str(msg.event_txt).unwrap())
}
// fn parse_to_timeline(msg: RedisMsg) -> Timeline {
// let trimmed_tl_txt = &msg.timeline_txt["timeline:".len()..];
// let tl = Timeline::from_redis_text(trimmed_tl_txt, &mut LruCache::new(1000)).unwrap();
// assert_eq!(tl, Timeline(User(Id(1)), Federated, All));
// tl
// }
// fn parse_to_checked_event(msg: RedisMsg) -> EventKind {
// EventKind::TypeSafe(serde_json::from_str(msg.event_txt).unwrap())
// }

fn parse_to_dyn_event(msg: RedisMsg) -> EventKind {
EventKind::Dynamic(serde_json::from_str(msg.event_txt).unwrap())
}
// fn parse_to_dyn_event(msg: RedisMsg) -> EventKind {
// EventKind::Dynamic(serde_json::from_str(msg.event_txt).unwrap())
// }

fn redis_msg_to_event_string(msg: RedisMsg) -> String {
msg.event_txt.to_string()
}
// fn redis_msg_to_event_string(msg: RedisMsg) -> String {
// msg.event_txt.to_string()
// }

fn string_to_checked_event(event_txt: &String) -> EventKind {
EventKind::TypeSafe(serde_json::from_str(event_txt).unwrap())
}
// fn string_to_checked_event(event_txt: &String) -> EventKind {
// EventKind::TypeSafe(serde_json::from_str(event_txt).unwrap())
// }

fn criterion_benchmark(c: &mut Criterion) {
let input = ONE_MESSAGE_FOR_THE_USER_TIMLINE_FROM_REDIS;
@@ -46,25 +42,25 @@ fn criterion_benchmark(c: &mut Criterion) {
b.iter(|| black_box(parse_long_redis_input(input)))
});

let msg = parse_long_redis_input(input);
group.bench_function("parse RedisMsg to Timeline", |b| {
b.iter(|| black_box(parse_to_timeline(msg.clone())))
});
// let msg = parse_long_redis_input(input);
// group.bench_function("parse RedisMsg to Timeline", |b| {
// b.iter(|| black_box(parse_to_timeline(msg.clone())))
// });

group.bench_function("parse RedisMsg -> DynamicEvent", |b| {
b.iter(|| black_box(parse_to_dyn_event(msg.clone())))
});
// group.bench_function("parse RedisMsg -> DynamicEvent", |b| {
// b.iter(|| black_box(parse_to_dyn_event(msg.clone())))
// });

group.bench_function("parse RedisMsg -> CheckedEvent", |b| {
b.iter(|| black_box(parse_to_checked_event(msg.clone())))
});
// group.bench_function("parse RedisMsg -> CheckedEvent", |b| {
// b.iter(|| black_box(parse_to_checked_event(msg.clone())))
// });

group.bench_function("parse RedisMsg -> String -> CheckedEvent", |b| {
b.iter(|| {
let txt = black_box(redis_msg_to_event_string(msg.clone()));
black_box(string_to_checked_event(&txt));
})
});
// group.bench_function("parse RedisMsg -> String -> CheckedEvent", |b| {
// b.iter(|| {
// let txt = black_box(redis_msg_to_event_string(msg.clone()));
// black_box(string_to_checked_event(&txt));
// })
// });
}

criterion_group!(benches, criterion_benchmark);
6 changes: 0 additions & 6 deletions src/err.rs
@@ -13,12 +13,6 @@ pub enum Error {
Config(config::Error),
}

impl Error {
pub fn log(msg: impl fmt::Display) {
eprintln!("{}", msg);
}
}

impl std::error::Error for Error {}

impl fmt::Debug for Error {
17 changes: 11 additions & 6 deletions src/main.rs
@@ -3,7 +3,8 @@ use flodgatt::request::{Handler, Subscription};
use flodgatt::response::{RedisManager, SseStream, WsStream};
use flodgatt::Error;

use futures::{future::lazy, stream::Stream as _};
use futures::future::lazy;
use futures::stream::Stream as _;
use std::fs;
use std::net::SocketAddr;
use std::os::unix::fs::PermissionsExt;
@@ -61,10 +62,12 @@ fn main() -> Result<(), Error> {
#[cfg(feature = "stub_status")]
#[rustfmt::skip]
let status = {
let (r1, r3) = (shared_manager.clone(), shared_manager.clone());
let (r1, r2, r3) = (shared_manager.clone(), shared_manager.clone(), shared_manager.clone());
request.health().map(|| "OK")
.or(request.status()
.map(move || r1.lock().unwrap_or_else(RedisManager::recover).count()))
.or(request.status_backpresure()
.map(move || r2.lock().unwrap_or_else(RedisManager::recover).backpresure()))
.or(request.status_per_timeline()
.map(move || r3.lock().unwrap_or_else(RedisManager::recover).list()))
};
@@ -80,10 +83,12 @@ fn main() -> Result<(), Error> {
let manager = shared_manager.clone();
let stream = Interval::new(Instant::now(), poll_freq)
.map_err(|e| log::error!("{}", e))
.for_each(move |_| {
let mut manager = manager.lock().unwrap_or_else(RedisManager::recover);
manager.poll_broadcast().map_err(Error::log)
});
.for_each(
move |_| match manager.lock().unwrap_or_else(RedisManager::recover).poll() {
Err(e) => Ok(log::error!("{}", e)),
Ok(_) => Ok(()),
},
);

warp::spawn(lazy(move || stream));
warp::serve(ws.or(sse).with(cors).or(status).recover(Handler::err))
4 changes: 4 additions & 0 deletions src/request.rs
@@ -118,6 +118,10 @@ impl Handler {
warp::path!("api" / "v1" / "streaming" / "status" / "per_timeline").boxed()
}

pub fn status_backpresure(&self) -> BoxedFilter<()> {
warp::path!("api" / "v1" / "streaming" / "status" / "backpresure").boxed()
}

pub fn err(r: Rejection) -> std::result::Result<impl warp::Reply, warp::Rejection> {
use StatusCode as Code;
let (msg, code) = match &r.cause().map(|cause| cause.to_string()).as_deref() {
1 change: 0 additions & 1 deletion src/request/timeline.rs
@@ -35,7 +35,6 @@ impl Timeline {
}

pub(crate) fn to_redis_raw_timeline(&self, hashtag: Option<&String>) -> Result<String> {
// TODO -- does this need to account for namespaces?
use {Content::*, Error::*, Reach::*, Stream::*};

Ok(match self {
2 changes: 1 addition & 1 deletion src/response.rs
@@ -14,4 +14,4 @@ mod stream;
pub use redis::Error;

#[cfg(feature = "bench")]
pub use redis::msg::{RedisMsg, RedisParseOutput};
pub use redis::{RedisMsg, RedisParseOutput};
83 changes: 46 additions & 37 deletions src/response/redis.rs
@@ -7,6 +7,9 @@ pub(self) use connection::RedisConn;
pub use manager::Error;
pub use manager::Manager;

#[cfg(feature = "bench")]
pub use msg::{RedisMsg, RedisParseOutput};

use connection::RedisConnErr;
use msg::RedisParseErr;

@@ -16,44 +19,50 @@ enum RedisCmd {
}

impl RedisCmd {
fn into_sendable(self, tl: &str) -> (Vec<u8>, Vec<u8>) {
fn into_sendable(self, timelines: &[String]) -> (Vec<u8>, Vec<u8>) {
match self {
RedisCmd::Subscribe => (
[
b"*2\r\n$9\r\nsubscribe\r\n$",
tl.len().to_string().as_bytes(),
b"\r\n",
tl.as_bytes(),
b"\r\n",
]
.concat(),
[
b"*3\r\n$3\r\nSET\r\n$",
(tl.len() + "subscribed:".len()).to_string().as_bytes(),
b"\r\nsubscribed:",
tl.to_string().as_bytes(),
b"\r\n$1\r\n1\r\n",
]
.concat(),
),
RedisCmd::Unsubscribe => (
[
b"*2\r\n$11\r\nunsubscribe\r\n$",
tl.len().to_string().as_bytes(),
b"\r\n",
tl.as_bytes(),
b"\r\n",
]
.concat(),
[
b"*3\r\n$3\r\nSET\r\n$",
(tl.len() + "subscribed:".len()).to_string().as_bytes(),
b"\r\nsubscribed:",
tl.to_string().as_bytes(),
b"\r\n$1\r\n0\r\n",
]
.concat(),
),
RedisCmd::Subscribe => {
let primary = {
let mut cmd = format!("*{}\r\n$9\r\nsubscribe\r\n", 1 + timelines.len());
for tl in timelines {
cmd.push_str(&format!("${}\r\n{}\r\n", tl.len(), tl));
}
cmd
};
let secondary = {
let mut cmd = format!("*{}\r\n$4\r\nMSET\r\n", 1 + timelines.len());
for tl in timelines {
cmd.push_str(&format!(
"${}\r\nsubscribed:{}\r\n$1\r\n$1\r\n",
"subscribed:".len() + tl.len(),
tl
));
}
cmd
};
(primary.as_bytes().to_vec(), secondary.as_bytes().to_vec())
}
RedisCmd::Unsubscribe => {
let primary = {
let mut cmd = format!("*{}\r\n$11\r\nunsubscribe\r\n", 1 + timelines.len());
for tl in timelines {
cmd.push_str(&format!("${}\r\n{}\r\n", tl.len(), tl));
}
cmd
};
let secondary = {
let mut cmd = format!("*{}\r\n$4\r\nMSET\r\n", 1 + timelines.len());
for tl in timelines {
cmd.push_str(&format!(
"${}\r\nsubscribed:{}\r\n$1\r\n$0\r\n",
"subscribed:".len() + tl.len(),
tl
));
}
cmd
};
(primary.as_bytes().to_vec(), secondary.as_bytes().to_vec())
}
}
}
}
