Skip to content
This repository has been archived by the owner on Jan 14, 2022. It is now read-only.

Commit

Permalink
Add tests for polling for multiple messages (#149)
Browse files Browse the repository at this point in the history
* Add tests for polling for multiple messages

This commit adds a mock Redis interface and adds tests that poll the
mock interface for multiple messages at a time.  These tests test that
Flodgatt is robust against receiving incomplete messages, including if
the message break results in receiving invalid UTF8.

* Remove temporary files
  • Loading branch information
codesections authored May 7, 2020
1 parent 4de9a94 commit daf7d1a
Show file tree
Hide file tree
Showing 28 changed files with 787 additions and 372 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[package]
name = "flodgatt"
description = "A blazingly fast drop-in replacement for the Mastodon streaming api server"
version = "0.9.8"
version = "0.9.9"
authors = ["Daniel Long Sockwell <daniel@codesections.com", "Julian Laubstein <contact@julianlaubstein.de>"]
edition = "2018"

Expand Down
117 changes: 79 additions & 38 deletions benches/parse_redis.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
use criterion::{black_box, criterion_group, criterion_main, Criterion};
use flodgatt::response::{RedisMsg, RedisParseOutput};
use flodgatt::config;
use flodgatt::request::{Content::*, Reach::*, Stream::*, Timeline};
use flodgatt::response::{Event, Manager, RedisMsg, RedisParseOutput};
use flodgatt::Id;
use futures::{Async, Stream};
use lru::LruCache;
use std::convert::TryFrom;
use std::fs;

fn parse_long_redis_input<'a>(input: &'a str) -> RedisMsg<'a> {
if let RedisParseOutput::Msg(msg) = RedisParseOutput::try_from(input).unwrap() {
Expand All @@ -12,27 +17,34 @@ fn parse_long_redis_input<'a>(input: &'a str) -> RedisMsg<'a> {
}
}

// fn parse_to_timeline(msg: RedisMsg) -> Timeline {
// let trimmed_tl_txt = &msg.timeline_txt["timeline:".len()..];
// let tl = Timeline::from_redis_text(trimmed_tl_txt, &mut LruCache::new(1000)).unwrap();
// assert_eq!(tl, Timeline(User(Id(1)), Federated, All));
// tl
// }
// fn parse_to_checked_event(msg: RedisMsg) -> EventKind {
// EventKind::TypeSafe(serde_json::from_str(msg.event_txt).unwrap())
// }
fn parse_to_timeline(msg: RedisMsg) -> Timeline {
let trimmed_tl_txt = &msg.timeline_txt["timeline:".len()..];
let tl = Timeline::from_redis_text(trimmed_tl_txt, &mut LruCache::new(1000)).unwrap();
assert_eq!(tl, Timeline(User(Id(1)), Federated, All));
tl
}
fn parse_to_checked_event(msg: RedisMsg) -> Event {
Event::TypeSafe(serde_json::from_str(msg.event_txt).unwrap())
}

// fn parse_to_dyn_event(msg: RedisMsg) -> EventKind {
// EventKind::Dynamic(serde_json::from_str(msg.event_txt).unwrap())
// }
fn parse_to_dyn_event(msg: RedisMsg) -> Event {
Event::Dynamic(serde_json::from_str(msg.event_txt).unwrap())
}

// fn redis_msg_to_event_string(msg: RedisMsg) -> String {
// msg.event_txt.to_string()
// }
fn redis_msg_to_event_string(msg: RedisMsg) -> String {
msg.event_txt.to_string()
}

// fn string_to_checked_event(event_txt: &String) -> EventKind {
// EventKind::TypeSafe(serde_json::from_str(event_txt).unwrap())
// }
fn string_to_checked_event(event_txt: &String) -> Event {
Event::TypeSafe(serde_json::from_str(event_txt).unwrap())
}

fn input_msg(i: usize) -> Vec<u8> {
fs::read_to_string(format!("test_data/redis_input_{:03}.resp", i))
.expect("test input not found")
.as_bytes()
.to_vec()
}

fn criterion_benchmark(c: &mut Criterion) {
let input = ONE_MESSAGE_FOR_THE_USER_TIMLINE_FROM_REDIS;
Expand All @@ -42,25 +54,54 @@ fn criterion_benchmark(c: &mut Criterion) {
b.iter(|| black_box(parse_long_redis_input(input)))
});

// let msg = parse_long_redis_input(input);
// group.bench_function("parse RedisMsg to Timeline", |b| {
// b.iter(|| black_box(parse_to_timeline(msg.clone())))
// });

// group.bench_function("parse RedisMsg -> DynamicEvent", |b| {
// b.iter(|| black_box(parse_to_dyn_event(msg.clone())))
// });

// group.bench_function("parse RedisMsg -> CheckedEvent", |b| {
// b.iter(|| black_box(parse_to_checked_event(msg.clone())))
// });

// group.bench_function("parse RedisMsg -> String -> CheckedEvent", |b| {
// b.iter(|| {
// let txt = black_box(redis_msg_to_event_string(msg.clone()));
// black_box(string_to_checked_event(&txt));
// })
// });
let msg = parse_long_redis_input(input);
group.bench_function("parse RedisMsg to Timeline", |b| {
b.iter(|| black_box(parse_to_timeline(msg.clone())))
});

group.bench_function("parse RedisMsg -> CheckedEvent", |b| {
b.iter(|| black_box(parse_to_checked_event(msg.clone())))
});

group.bench_function("parse RedisMsg -> DynamicEvent", |b| {
b.iter(|| black_box(parse_to_dyn_event(msg.clone())))
});

group.bench_function("parse RedisMsg -> String -> CheckedEvent", |b| {
b.iter(|| {
let txt = black_box(redis_msg_to_event_string(msg.clone()));
black_box(string_to_checked_event(&txt));
})
});

group.bench_function("parse six messages from Redis", |b| {
b.iter_batched(
|| {
let mut manager = Manager::try_from(&config::Redis::default()).expect("bench");
for i in 1..=6 {
manager.redis_conn.add(&input_msg(i));
}
manager
},
|mut m| {
black_box({
let mut i = 1;
while let Ok(Async::Ready(Some(len))) = m.redis_conn.poll_redis(m.unread_idx.1)
{
m.unread_idx = (0, m.unread_idx.1 + len);
while let Ok(Async::Ready(Some((_tl, event)))) = m.poll() {
// println!("Parsing Event #{:03}", i + 1);
// assert_eq!(event, output(i));
i += 1;
}
}

assert_eq!(i, 7)
})
},
criterion::BatchSize::SmallInput,
)
});
}

criterion_group!(benches, criterion_benchmark);
Expand Down
12 changes: 8 additions & 4 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,12 +83,16 @@ fn main() -> Result<(), Error> {
let manager = shared_manager.clone();
let stream = Interval::new(Instant::now(), poll_freq)
.map_err(|e| log::error!("{}", e))
.for_each(
move |_| match manager.lock().unwrap_or_else(RedisManager::recover).poll() {
.for_each(move |_| {
match manager
.lock()
.unwrap_or_else(RedisManager::recover)
.send_msgs()
{
Err(e) => Ok(log::error!("{}", e)),
Ok(_) => Ok(()),
},
);
}
});

warp::spawn(lazy(move || stream));
warp::serve(ws.or(sse).with(cors).or(status).recover(Handler::err))
Expand Down
5 changes: 5 additions & 0 deletions src/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,11 @@ mod subscription;
pub use err::{Error, Timeline as TimelineErr};
pub use subscription::{Blocks, Subscription};
pub use timeline::Timeline;

#[cfg(feature = "bench")]
pub use timeline::{Content, Reach, Stream};

#[cfg(not(feature = "bench"))]
use timeline::{Content, Reach, Stream};

pub use self::postgres::PgPool;
Expand Down
10 changes: 4 additions & 6 deletions src/request/timeline.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
pub(crate) use self::inner::{Content, Reach, Scope, Stream, UserData};
pub use self::inner::{Content, Reach, Scope, Stream};
use super::err::Timeline as Error;
use super::query::Query;
pub(crate) use inner::UserData;

use lru::LruCache;
use warp::reject::Rejection;
Expand All @@ -11,7 +12,7 @@ mod inner;
type Result<T> = std::result::Result<T, Error>;

#[derive(Clone, Debug, Copy, Eq, Hash, PartialEq)]
pub struct Timeline(pub(crate) Stream, pub(crate) Reach, pub(crate) Content);
pub struct Timeline(pub Stream, pub Reach, pub Content);

impl Timeline {
pub fn empty() -> Self {
Expand Down Expand Up @@ -61,10 +62,7 @@ impl Timeline {
})
}

pub(crate) fn from_redis_text(
timeline: &str,
cache: &mut LruCache<String, i64>,
) -> Result<Self> {
pub fn from_redis_text(timeline: &str, cache: &mut LruCache<String, i64>) -> Result<Self> {
use {Content::*, Error::*, Reach::*, Stream::*};
let mut tag_id = |t: &str| cache.get(&t.to_string()).map_or(Err(BadTag), |id| Ok(*id));

Expand Down
8 changes: 4 additions & 4 deletions src/request/timeline/inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use hashbrown::HashSet;
use std::convert::TryFrom;

#[derive(Clone, Debug, Copy, Eq, Hash, PartialEq)]
pub(crate) enum Stream {
pub enum Stream {
User(Id),
List(i64),
Direct(i64),
Expand All @@ -15,20 +15,20 @@ pub(crate) enum Stream {
}

#[derive(Clone, Debug, Copy, Eq, Hash, PartialEq)]
pub(crate) enum Reach {
pub enum Reach {
Local,
Federated,
}

#[derive(Clone, Debug, Copy, Eq, Hash, PartialEq)]
pub(crate) enum Content {
pub enum Content {
All,
Media,
Notification,
}

#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub(crate) enum Scope {
pub enum Scope {
Read,
Statuses,
Notifications,
Expand Down
4 changes: 3 additions & 1 deletion src/response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,6 @@ mod stream;
pub use redis::Error;

#[cfg(feature = "bench")]
pub use redis::{RedisMsg, RedisParseOutput};
pub use event::EventKind;
#[cfg(feature = "bench")]
pub use redis::{Manager, RedisMsg, RedisParseOutput};
7 changes: 5 additions & 2 deletions src/response/event.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
#[cfg(not(test))]
mod checked_event;
#[cfg(test)]
pub mod checked_event;
mod dynamic_event;
pub mod err;

use self::checked_event::CheckedEvent;
use self::dynamic_event::{DynEvent, EventKind};
pub use self::checked_event::CheckedEvent;
pub use self::dynamic_event::{DynEvent, EventKind};
use crate::Id;

use hashbrown::HashSet;
Expand Down
9 changes: 4 additions & 5 deletions src/response/event/checked_event.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,14 @@
mod account;

pub(crate) mod account;
mod announcement;
mod announcement_reaction;
mod conversation;
mod emoji;
mod id;
mod mention;
mod notification;
mod status;
mod tag;
mod visibility;
pub(crate) mod status;
pub(crate) mod tag;
pub(crate) mod visibility;

pub(self) use super::Payload;
pub(super) use announcement_reaction::AnnouncementReaction;
Expand Down
66 changes: 33 additions & 33 deletions src/response/event/checked_event/account.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,47 +4,47 @@ use serde::{Deserialize, Serialize};

#[serde(deny_unknown_fields)]
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
pub(super) struct Account {
pub(crate) struct Account {
pub id: Id,
pub(super) username: String,
pub(crate) username: String,
pub acct: String,
pub(super) url: String,
pub(super) display_name: String,
pub(super) note: String,
pub(super) avatar: String,
pub(super) avatar_static: String,
pub(super) header: String,
pub(super) header_static: String,
pub(super) locked: bool,
pub(super) emojis: Vec<Emoji>,
pub(super) discoverable: Option<bool>, // Shouldn't be option?
pub(super) created_at: String,
pub(super) statuses_count: i64,
pub(super) followers_count: i64,
pub(super) following_count: i64,
pub(super) moved: Option<String>,
pub(super) fields: Option<Vec<Field>>,
pub(super) bot: Option<bool>,
pub(super) source: Option<Source>,
pub(super) group: Option<bool>, // undocumented
pub(super) last_status_at: Option<String>, // undocumented
pub(crate) url: String,
pub(crate) display_name: String,
pub(crate) note: String,
pub(crate) avatar: String,
pub(crate) avatar_static: String,
pub(crate) header: String,
pub(crate) header_static: String,
pub(crate) locked: bool,
pub(crate) emojis: Vec<Emoji>,
pub(crate) discoverable: Option<bool>, // Shouldn't be option?
pub(crate) created_at: String,
pub(crate) statuses_count: i64,
pub(crate) followers_count: i64,
pub(crate) following_count: i64,
pub(crate) moved: Option<String>,
pub(crate) fields: Option<Vec<Field>>,
pub(crate) bot: Option<bool>,
pub(crate) source: Option<Source>,
pub(crate) group: Option<bool>, // undocumented
pub(crate) last_status_at: Option<String>, // undocumented
}

#[serde(deny_unknown_fields)]
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
pub(super) struct Field {
pub(super) name: String,
pub(super) value: String,
pub(super) verified_at: Option<String>,
pub(crate) struct Field {
pub(crate) name: String,
pub(crate) value: String,
pub(crate) verified_at: Option<String>,
}

#[serde(deny_unknown_fields)]
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
pub(super) struct Source {
pub(super) note: String,
pub(super) fields: Vec<Field>,
pub(super) privacy: Option<Visibility>,
pub(super) sensitive: bool,
pub(super) language: String,
pub(super) follow_requests_count: i64,
pub(crate) struct Source {
pub(crate) note: String,
pub(crate) fields: Vec<Field>,
pub(crate) privacy: Option<Visibility>,
pub(crate) sensitive: bool,
pub(crate) language: String,
pub(crate) follow_requests_count: i64,
}
2 changes: 1 addition & 1 deletion src/response/event/checked_event/emoji.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use serde::{Deserialize, Serialize};

#[serde(deny_unknown_fields)]
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
pub(super) struct Emoji {
pub(crate) struct Emoji {
shortcode: String,
url: String,
static_url: String,
Expand Down
2 changes: 1 addition & 1 deletion src/response/event/checked_event/mention.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use serde::{Deserialize, Serialize};

#[serde(deny_unknown_fields)]
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
pub(super) struct Mention {
pub(crate) struct Mention {
pub id: Id,
username: String,
acct: String,
Expand Down
Loading

0 comments on commit daf7d1a

Please sign in to comment.