Skip to content
This repository has been archived by the owner on Jan 14, 2022. It is now read-only.

Improve handling of large Redis input #143

Merged
merged 2 commits into from
Apr 27, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[package]
name = "flodgatt"
description = "A blazingly fast drop-in replacement for the Mastodon streaming api server"
version = "0.9.4"
version = "0.9.6"
authors = ["Daniel Long Sockwell <daniel@codesections.com", "Julian Laubstein <contact@julianlaubstein.de>"]
edition = "2018"

Expand Down
74 changes: 35 additions & 39 deletions benches/parse_redis.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,5 @@
use criterion::{black_box, criterion_group, criterion_main, Criterion};
use flodgatt::{
event::*,
request::{Content::*, Reach::*, Stream::*, Timeline},
response::{RedisMsg, RedisParseOutput},
};
use flodgatt::response::{RedisMsg, RedisParseOutput};
use lru::LruCache;
use std::convert::TryFrom;

Expand All @@ -16,27 +12,27 @@ fn parse_long_redis_input<'a>(input: &'a str) -> RedisMsg<'a> {
}
}

fn parse_to_timeline(msg: RedisMsg) -> Timeline {
let trimmed_tl_txt = &msg.timeline_txt["timeline:".len()..];
let tl = Timeline::from_redis_text(trimmed_tl_txt, &mut LruCache::new(1000)).unwrap();
assert_eq!(tl, Timeline(User(Id(1)), Federated, All));
tl
}
fn parse_to_checked_event(msg: RedisMsg) -> EventKind {
EventKind::TypeSafe(serde_json::from_str(msg.event_txt).unwrap())
}
// fn parse_to_timeline(msg: RedisMsg) -> Timeline {
// let trimmed_tl_txt = &msg.timeline_txt["timeline:".len()..];
// let tl = Timeline::from_redis_text(trimmed_tl_txt, &mut LruCache::new(1000)).unwrap();
// assert_eq!(tl, Timeline(User(Id(1)), Federated, All));
// tl
// }
// fn parse_to_checked_event(msg: RedisMsg) -> EventKind {
// EventKind::TypeSafe(serde_json::from_str(msg.event_txt).unwrap())
// }

fn parse_to_dyn_event(msg: RedisMsg) -> EventKind {
EventKind::Dynamic(serde_json::from_str(msg.event_txt).unwrap())
}
// fn parse_to_dyn_event(msg: RedisMsg) -> EventKind {
// EventKind::Dynamic(serde_json::from_str(msg.event_txt).unwrap())
// }

fn redis_msg_to_event_string(msg: RedisMsg) -> String {
msg.event_txt.to_string()
}
// fn redis_msg_to_event_string(msg: RedisMsg) -> String {
// msg.event_txt.to_string()
// }

fn string_to_checked_event(event_txt: &String) -> EventKind {
EventKind::TypeSafe(serde_json::from_str(event_txt).unwrap())
}
// fn string_to_checked_event(event_txt: &String) -> EventKind {
// EventKind::TypeSafe(serde_json::from_str(event_txt).unwrap())
// }

fn criterion_benchmark(c: &mut Criterion) {
let input = ONE_MESSAGE_FOR_THE_USER_TIMLINE_FROM_REDIS;
Expand All @@ -46,25 +42,25 @@ fn criterion_benchmark(c: &mut Criterion) {
b.iter(|| black_box(parse_long_redis_input(input)))
});

let msg = parse_long_redis_input(input);
group.bench_function("parse RedisMsg to Timeline", |b| {
b.iter(|| black_box(parse_to_timeline(msg.clone())))
});
// let msg = parse_long_redis_input(input);
// group.bench_function("parse RedisMsg to Timeline", |b| {
// b.iter(|| black_box(parse_to_timeline(msg.clone())))
// });

group.bench_function("parse RedisMsg -> DynamicEvent", |b| {
b.iter(|| black_box(parse_to_dyn_event(msg.clone())))
});
// group.bench_function("parse RedisMsg -> DynamicEvent", |b| {
// b.iter(|| black_box(parse_to_dyn_event(msg.clone())))
// });

group.bench_function("parse RedisMsg -> CheckedEvent", |b| {
b.iter(|| black_box(parse_to_checked_event(msg.clone())))
});
// group.bench_function("parse RedisMsg -> CheckedEvent", |b| {
// b.iter(|| black_box(parse_to_checked_event(msg.clone())))
// });

group.bench_function("parse RedisMsg -> String -> CheckedEvent", |b| {
b.iter(|| {
let txt = black_box(redis_msg_to_event_string(msg.clone()));
black_box(string_to_checked_event(&txt));
})
});
// group.bench_function("parse RedisMsg -> String -> CheckedEvent", |b| {
// b.iter(|| {
// let txt = black_box(redis_msg_to_event_string(msg.clone()));
// black_box(string_to_checked_event(&txt));
// })
// });
}

criterion_group!(benches, criterion_benchmark);
Expand Down
6 changes: 0 additions & 6 deletions src/err.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,6 @@ pub enum Error {
Config(config::Error),
}

impl Error {
pub fn log(msg: impl fmt::Display) {
eprintln!("{}", msg);
}
}

impl std::error::Error for Error {}

impl fmt::Debug for Error {
Expand Down
17 changes: 11 additions & 6 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@ use flodgatt::request::{Handler, Subscription};
use flodgatt::response::{RedisManager, SseStream, WsStream};
use flodgatt::Error;

use futures::{future::lazy, stream::Stream as _};
use futures::future::lazy;
use futures::stream::Stream as _;
use std::fs;
use std::net::SocketAddr;
use std::os::unix::fs::PermissionsExt;
Expand Down Expand Up @@ -61,10 +62,12 @@ fn main() -> Result<(), Error> {
#[cfg(feature = "stub_status")]
#[rustfmt::skip]
let status = {
let (r1, r3) = (shared_manager.clone(), shared_manager.clone());
let (r1, r2, r3) = (shared_manager.clone(), shared_manager.clone(), shared_manager.clone());
request.health().map(|| "OK")
.or(request.status()
.map(move || r1.lock().unwrap_or_else(RedisManager::recover).count()))
.or(request.status_backpresure()
.map(move || r2.lock().unwrap_or_else(RedisManager::recover).backpresure()))
.or(request.status_per_timeline()
.map(move || r3.lock().unwrap_or_else(RedisManager::recover).list()))
};
Expand All @@ -80,10 +83,12 @@ fn main() -> Result<(), Error> {
let manager = shared_manager.clone();
let stream = Interval::new(Instant::now(), poll_freq)
.map_err(|e| log::error!("{}", e))
.for_each(move |_| {
let mut manager = manager.lock().unwrap_or_else(RedisManager::recover);
manager.poll_broadcast().map_err(Error::log)
});
.for_each(
move |_| match manager.lock().unwrap_or_else(RedisManager::recover).poll() {
Err(e) => Ok(log::error!("{}", e)),
Ok(_) => Ok(()),
},
);

warp::spawn(lazy(move || stream));
warp::serve(ws.or(sse).with(cors).or(status).recover(Handler::err))
Expand Down
4 changes: 4 additions & 0 deletions src/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,10 @@ impl Handler {
warp::path!("api" / "v1" / "streaming" / "status" / "per_timeline").boxed()
}

pub fn status_backpresure(&self) -> BoxedFilter<()> {
warp::path!("api" / "v1" / "streaming" / "status" / "backpresure").boxed()
}

pub fn err(r: Rejection) -> std::result::Result<impl warp::Reply, warp::Rejection> {
use StatusCode as Code;
let (msg, code) = match &r.cause().map(|cause| cause.to_string()).as_deref() {
Expand Down
1 change: 0 additions & 1 deletion src/request/timeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ impl Timeline {
}

pub(crate) fn to_redis_raw_timeline(&self, hashtag: Option<&String>) -> Result<String> {
// TODO -- does this need to account for namespaces?
use {Content::*, Error::*, Reach::*, Stream::*};

Ok(match self {
Expand Down
2 changes: 1 addition & 1 deletion src/response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,4 @@ mod stream;
pub use redis::Error;

#[cfg(feature = "bench")]
pub use redis::msg::{RedisMsg, RedisParseOutput};
pub use redis::{RedisMsg, RedisParseOutput};
83 changes: 46 additions & 37 deletions src/response/redis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ pub(self) use connection::RedisConn;
pub use manager::Error;
pub use manager::Manager;

#[cfg(feature = "bench")]
pub use msg::{RedisMsg, RedisParseOutput};

use connection::RedisConnErr;
use msg::RedisParseErr;

Expand All @@ -16,44 +19,50 @@ enum RedisCmd {
}

impl RedisCmd {
fn into_sendable(self, tl: &str) -> (Vec<u8>, Vec<u8>) {
fn into_sendable(self, timelines: &[String]) -> (Vec<u8>, Vec<u8>) {
match self {
RedisCmd::Subscribe => (
[
b"*2\r\n$9\r\nsubscribe\r\n$",
tl.len().to_string().as_bytes(),
b"\r\n",
tl.as_bytes(),
b"\r\n",
]
.concat(),
[
b"*3\r\n$3\r\nSET\r\n$",
(tl.len() + "subscribed:".len()).to_string().as_bytes(),
b"\r\nsubscribed:",
tl.to_string().as_bytes(),
b"\r\n$1\r\n1\r\n",
]
.concat(),
),
RedisCmd::Unsubscribe => (
[
b"*2\r\n$11\r\nunsubscribe\r\n$",
tl.len().to_string().as_bytes(),
b"\r\n",
tl.as_bytes(),
b"\r\n",
]
.concat(),
[
b"*3\r\n$3\r\nSET\r\n$",
(tl.len() + "subscribed:".len()).to_string().as_bytes(),
b"\r\nsubscribed:",
tl.to_string().as_bytes(),
b"\r\n$1\r\n0\r\n",
]
.concat(),
),
RedisCmd::Subscribe => {
let primary = {
let mut cmd = format!("*{}\r\n$9\r\nsubscribe\r\n", 1 + timelines.len());
for tl in timelines {
cmd.push_str(&format!("${}\r\n{}\r\n", tl.len(), tl));
}
cmd
};
let secondary = {
let mut cmd = format!("*{}\r\n$4\r\nMSET\r\n", 1 + timelines.len());
for tl in timelines {
cmd.push_str(&format!(
"${}\r\nsubscribed:{}\r\n$1\r\n$1\r\n",
"subscribed:".len() + tl.len(),
tl
));
}
cmd
};
(primary.as_bytes().to_vec(), secondary.as_bytes().to_vec())
}
RedisCmd::Unsubscribe => {
let primary = {
let mut cmd = format!("*{}\r\n$11\r\nunsubscribe\r\n", 1 + timelines.len());
for tl in timelines {
cmd.push_str(&format!("${}\r\n{}\r\n", tl.len(), tl));
}
cmd
};
let secondary = {
let mut cmd = format!("*{}\r\n$4\r\nMSET\r\n", 1 + timelines.len());
for tl in timelines {
cmd.push_str(&format!(
"${}\r\nsubscribed:{}\r\n$1\r\n$0\r\n",
"subscribed:".len() + tl.len(),
tl
));
}
cmd
};
(primary.as_bytes().to_vec(), secondary.as_bytes().to_vec())
}
}
}
}
Loading