diff --git a/Cargo.lock b/Cargo.lock index 0cf4dd3..86deab7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -416,7 +416,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" [[package]] name = "flodgatt" -version = "0.9.4" +version = "0.9.6" dependencies = [ "criterion 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", "dotenv 0.15.0 (registry+https://github.com/rust-lang/crates.io-index)", diff --git a/Cargo.toml b/Cargo.toml index d7e1d43..abda086 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "flodgatt" description = "A blazingly fast drop-in replacement for the Mastodon streaming api server" -version = "0.9.4" +version = "0.9.6" authors = ["Daniel Long Sockwell "] edition = "2018" diff --git a/benches/parse_redis.rs b/benches/parse_redis.rs index f5f1980..48c6aeb 100644 --- a/benches/parse_redis.rs +++ b/benches/parse_redis.rs @@ -1,9 +1,5 @@ use criterion::{black_box, criterion_group, criterion_main, Criterion}; -use flodgatt::{ - event::*, - request::{Content::*, Reach::*, Stream::*, Timeline}, - response::{RedisMsg, RedisParseOutput}, -}; +use flodgatt::response::{RedisMsg, RedisParseOutput}; use lru::LruCache; use std::convert::TryFrom; @@ -16,27 +12,27 @@ fn parse_long_redis_input<'a>(input: &'a str) -> RedisMsg<'a> { } } -fn parse_to_timeline(msg: RedisMsg) -> Timeline { - let trimmed_tl_txt = &msg.timeline_txt["timeline:".len()..]; - let tl = Timeline::from_redis_text(trimmed_tl_txt, &mut LruCache::new(1000)).unwrap(); - assert_eq!(tl, Timeline(User(Id(1)), Federated, All)); - tl -} -fn parse_to_checked_event(msg: RedisMsg) -> EventKind { - EventKind::TypeSafe(serde_json::from_str(msg.event_txt).unwrap()) -} +// fn parse_to_timeline(msg: RedisMsg) -> Timeline { +// let trimmed_tl_txt = &msg.timeline_txt["timeline:".len()..]; +// let tl = Timeline::from_redis_text(trimmed_tl_txt, &mut LruCache::new(1000)).unwrap(); +// assert_eq!(tl, Timeline(User(Id(1)), Federated, All)); +// tl +// } +// fn parse_to_checked_event(msg: RedisMsg) -> EventKind { +// EventKind::TypeSafe(serde_json::from_str(msg.event_txt).unwrap()) +// } -fn parse_to_dyn_event(msg: RedisMsg) -> EventKind { - EventKind::Dynamic(serde_json::from_str(msg.event_txt).unwrap()) -} +// fn parse_to_dyn_event(msg: RedisMsg) -> EventKind { +// EventKind::Dynamic(serde_json::from_str(msg.event_txt).unwrap()) +// } -fn redis_msg_to_event_string(msg: RedisMsg) -> String { - msg.event_txt.to_string() -} +// fn redis_msg_to_event_string(msg: RedisMsg) -> String { +// msg.event_txt.to_string() +// } -fn string_to_checked_event(event_txt: &String) -> EventKind { - EventKind::TypeSafe(serde_json::from_str(event_txt).unwrap()) -} +// fn string_to_checked_event(event_txt: &String) -> EventKind { +// EventKind::TypeSafe(serde_json::from_str(event_txt).unwrap()) +// } fn criterion_benchmark(c: &mut Criterion) { let input = ONE_MESSAGE_FOR_THE_USER_TIMLINE_FROM_REDIS; @@ -46,25 +42,25 @@ fn criterion_benchmark(c: &mut Criterion) { b.iter(|| black_box(parse_long_redis_input(input))) }); - let msg = parse_long_redis_input(input); - group.bench_function("parse RedisMsg to Timeline", |b| { - b.iter(|| black_box(parse_to_timeline(msg.clone()))) - }); + // let msg = parse_long_redis_input(input); + // group.bench_function("parse RedisMsg to Timeline", |b| { + // b.iter(|| black_box(parse_to_timeline(msg.clone()))) + // }); - group.bench_function("parse RedisMsg -> DynamicEvent", |b| { - b.iter(|| black_box(parse_to_dyn_event(msg.clone()))) - }); + // group.bench_function("parse RedisMsg -> DynamicEvent", |b| { + // b.iter(|| black_box(parse_to_dyn_event(msg.clone()))) + // }); - group.bench_function("parse RedisMsg -> CheckedEvent", |b| { - b.iter(|| black_box(parse_to_checked_event(msg.clone()))) - }); + // group.bench_function("parse RedisMsg -> CheckedEvent", |b| { + // b.iter(|| black_box(parse_to_checked_event(msg.clone()))) + // }); - group.bench_function("parse RedisMsg -> String -> CheckedEvent", |b| { - b.iter(|| { - let txt = black_box(redis_msg_to_event_string(msg.clone())); - black_box(string_to_checked_event(&txt)); - }) - }); + // group.bench_function("parse RedisMsg -> String -> CheckedEvent", |b| { + // b.iter(|| { + // let txt = black_box(redis_msg_to_event_string(msg.clone())); + // black_box(string_to_checked_event(&txt)); + // }) + // }); } criterion_group!(benches, criterion_benchmark); diff --git a/src/err.rs b/src/err.rs index bb4cf61..e9120d0 100644 --- a/src/err.rs +++ b/src/err.rs @@ -13,12 +13,6 @@ pub enum Error { Config(config::Error), } -impl Error { - pub fn log(msg: impl fmt::Display) { - eprintln!("{}", msg); - } -} - impl std::error::Error for Error {} impl fmt::Debug for Error { diff --git a/src/main.rs b/src/main.rs index 3d151c1..87ae0a5 100644 --- a/src/main.rs +++ b/src/main.rs @@ -3,7 +3,8 @@ use flodgatt::request::{Handler, Subscription}; use flodgatt::response::{RedisManager, SseStream, WsStream}; use flodgatt::Error; -use futures::{future::lazy, stream::Stream as _}; +use futures::future::lazy; +use futures::stream::Stream as _; use std::fs; use std::net::SocketAddr; use std::os::unix::fs::PermissionsExt; @@ -61,10 +62,12 @@ fn main() -> Result<(), Error> { #[cfg(feature = "stub_status")] #[rustfmt::skip] let status = { - let (r1, r3) = (shared_manager.clone(), shared_manager.clone()); + let (r1, r2, r3) = (shared_manager.clone(), shared_manager.clone(), shared_manager.clone()); request.health().map(|| "OK") .or(request.status() .map(move || r1.lock().unwrap_or_else(RedisManager::recover).count())) + .or(request.status_backpresure() + .map(move || r2.lock().unwrap_or_else(RedisManager::recover).backpresure())) .or(request.status_per_timeline() .map(move || r3.lock().unwrap_or_else(RedisManager::recover).list())) }; @@ -80,10 +83,12 @@ fn main() -> Result<(), Error> { let manager = shared_manager.clone(); let stream = Interval::new(Instant::now(), poll_freq) .map_err(|e| log::error!("{}", e)) - .for_each(move |_| { - let mut manager = manager.lock().unwrap_or_else(RedisManager::recover); - manager.poll_broadcast().map_err(Error::log) - }); + .for_each( + move |_| match manager.lock().unwrap_or_else(RedisManager::recover).poll() { + Err(e) => Ok(log::error!("{}", e)), + Ok(_) => Ok(()), + }, + ); warp::spawn(lazy(move || stream)); warp::serve(ws.or(sse).with(cors).or(status).recover(Handler::err)) diff --git a/src/request.rs b/src/request.rs index 2b3e58a..bd5c435 100644 --- a/src/request.rs +++ b/src/request.rs @@ -118,6 +118,10 @@ impl Handler { warp::path!("api" / "v1" / "streaming" / "status" / "per_timeline").boxed() } + pub fn status_backpresure(&self) -> BoxedFilter<()> { + warp::path!("api" / "v1" / "streaming" / "status" / "backpresure").boxed() + } + pub fn err(r: Rejection) -> std::result::Result { use StatusCode as Code; let (msg, code) = match &r.cause().map(|cause| cause.to_string()).as_deref() { diff --git a/src/request/timeline.rs b/src/request/timeline.rs index 154a839..87b96d0 100644 --- a/src/request/timeline.rs +++ b/src/request/timeline.rs @@ -35,7 +35,6 @@ impl Timeline { } pub(crate) fn to_redis_raw_timeline(&self, hashtag: Option<&String>) -> Result { - // TODO -- does this need to account for namespaces? use {Content::*, Error::*, Reach::*, Stream::*}; Ok(match self { diff --git a/src/response.rs b/src/response.rs index be209c3..85b2898 100644 --- a/src/response.rs +++ b/src/response.rs @@ -14,4 +14,4 @@ mod stream; pub use redis::Error; #[cfg(feature = "bench")] -pub use redis::msg::{RedisMsg, RedisParseOutput}; +pub use redis::{RedisMsg, RedisParseOutput}; diff --git a/src/response/redis.rs b/src/response/redis.rs index 3f44f4d..6ce2ea7 100644 --- a/src/response/redis.rs +++ b/src/response/redis.rs @@ -7,6 +7,9 @@ pub(self) use connection::RedisConn; pub use manager::Error; pub use manager::Manager; +#[cfg(feature = "bench")] +pub use msg::{RedisMsg, RedisParseOutput}; + use connection::RedisConnErr; use msg::RedisParseErr; @@ -16,44 +19,50 @@ enum RedisCmd { } impl RedisCmd { - fn into_sendable(self, tl: &str) -> (Vec, Vec) { + fn into_sendable(self, timelines: &[String]) -> (Vec, Vec) { match self { - RedisCmd::Subscribe => ( - [ - b"*2\r\n$9\r\nsubscribe\r\n$", - tl.len().to_string().as_bytes(), - b"\r\n", - tl.as_bytes(), - b"\r\n", - ] - .concat(), - [ - b"*3\r\n$3\r\nSET\r\n$", - (tl.len() + "subscribed:".len()).to_string().as_bytes(), - b"\r\nsubscribed:", - tl.to_string().as_bytes(), - b"\r\n$1\r\n1\r\n", - ] - .concat(), - ), - RedisCmd::Unsubscribe => ( - [ - b"*2\r\n$11\r\nunsubscribe\r\n$", - tl.len().to_string().as_bytes(), - b"\r\n", - tl.as_bytes(), - b"\r\n", - ] - .concat(), - [ - b"*3\r\n$3\r\nSET\r\n$", - (tl.len() + "subscribed:".len()).to_string().as_bytes(), - b"\r\nsubscribed:", - tl.to_string().as_bytes(), - b"\r\n$1\r\n0\r\n", - ] - .concat(), - ), + RedisCmd::Subscribe => { + let primary = { + let mut cmd = format!("*{}\r\n$9\r\nsubscribe\r\n", 1 + timelines.len()); + for tl in timelines { + cmd.push_str(&format!("${}\r\n{}\r\n", tl.len(), tl)); + } + cmd + }; + let secondary = { + let mut cmd = format!("*{}\r\n$4\r\nMSET\r\n", 1 + timelines.len()); + for tl in timelines { + cmd.push_str(&format!( + "${}\r\nsubscribed:{}\r\n$1\r\n$1\r\n", + "subscribed:".len() + tl.len(), + tl + )); + } + cmd + }; + (primary.as_bytes().to_vec(), secondary.as_bytes().to_vec()) + } + RedisCmd::Unsubscribe => { + let primary = { + let mut cmd = format!("*{}\r\n$11\r\nunsubscribe\r\n", 1 + timelines.len()); + for tl in timelines { + cmd.push_str(&format!("${}\r\n{}\r\n", tl.len(), tl)); + } + cmd + }; + let secondary = { + let mut cmd = format!("*{}\r\n$4\r\nMSET\r\n", 1 + timelines.len()); + for tl in timelines { + cmd.push_str(&format!( + "${}\r\nsubscribed:{}\r\n$1\r\n$0\r\n", + "subscribed:".len() + tl.len(), + tl + )); + } + cmd + }; + (primary.as_bytes().to_vec(), secondary.as_bytes().to_vec()) + } } } } diff --git a/src/response/redis/connection.rs b/src/response/redis/connection.rs index 49c8cfe..c7edbf4 100644 --- a/src/response/redis/connection.rs +++ b/src/response/redis/connection.rs @@ -1,19 +1,15 @@ mod err; pub(crate) use err::RedisConnErr; -use super::msg::{RedisParseErr, RedisParseOutput}; use super::Error as ManagerErr; -use super::Event; use super::RedisCmd; use crate::config::Redis; use crate::request::Timeline; use futures::{Async, Poll}; use lru::LruCache; -use std::convert::{TryFrom, TryInto}; use std::io::{self, Read, Write}; use std::net::TcpStream; -use std::str; use std::time::Duration; type Result = std::result::Result; @@ -22,11 +18,13 @@ type Result = std::result::Result; pub(super) struct RedisConn { primary: TcpStream, secondary: TcpStream, - redis_namespace: Option, - tag_id_cache: LruCache, + pub(super) namespace: Option, + // TODO: eventually, it might make sense to have Mastodon publish to timelines with + // the tag number instead of the tag name. This would save us from dealing + // with a cache here and would be consistent with how lists/users are handled. + pub(super) tag_id_cache: LruCache, tag_name_cache: LruCache, - redis_input: Vec, - cursor: usize, + pub(super) input: Vec, } impl RedisConn { @@ -36,79 +34,32 @@ impl RedisConn { let conn = Self::new_connection(&addr, redis_cfg.password.as_ref())?; conn.set_nonblocking(true) .map_err(|e| RedisConnErr::with_addr(&addr, e))?; - let redis_conn = Self { + Ok(Self { primary: conn, secondary: Self::new_connection(&addr, redis_cfg.password.as_ref())?, tag_id_cache: LruCache::new(1000), tag_name_cache: LruCache::new(1000), - // TODO: eventually, it might make sense to have Mastodon publish to timelines with - // the tag number instead of the tag name. This would save us from dealing - // with a cache here and would be consistent with how lists/users are handled. - redis_namespace: redis_cfg.namespace.clone().0, - redis_input: vec![0_u8; 5000], - cursor: 0, - }; - Ok(redis_conn) + namespace: redis_cfg.namespace.clone().0, + input: vec![0; 4096 * 4], + }) } - - pub(super) fn poll_redis(&mut self) -> Poll, ManagerErr> { - loop { - match self.primary.read(&mut self.redis_input[self.cursor..]) { - Ok(n) => { - self.cursor += n; - if self.redis_input.len() - 1 == self.cursor { - self.redis_input.resize(self.redis_input.len() * 2, 0); - } else { - break; - } - } - Err(e) if matches!(e.kind(), io::ErrorKind::WouldBlock) => { - break; - } - Err(e) => break log::error!("{}", e), - }; + pub(super) fn poll_redis(&mut self, start: usize) -> Poll { + const BLOCK: usize = 4096 * 2; + if self.input.len() < start + BLOCK { + self.input.resize(self.input.len() * 2, 0); + log::info!("Resizing input buffer to {} KiB.", self.input.len() / 1024); + // log::info!("Current buffer: {}", String::from_utf8_lossy(&self.input)); } - // at this point, we have the raw bytes; now, parse a msg - let input = &self.redis_input[..self.cursor]; - - let (input, invalid_bytes) = str::from_utf8(&input) - .map(|input| (input, "".as_bytes())) - .unwrap_or_else(|e| { - let (valid, invalid) = input.split_at(e.valid_up_to()); - (str::from_utf8(valid).expect("Guaranteed by ^^^^"), invalid) - }); - - use {Async::*, RedisParseOutput::*}; - let (res, leftover) = match RedisParseOutput::try_from(input) { - Ok(Msg(msg)) => match &self.redis_namespace { - Some(ns) if msg.timeline_txt.starts_with(&[ns, ":timeline:"].concat()) => { - let trimmed_tl = &msg.timeline_txt[ns.len() + ":timeline:".len()..]; - let tl = Timeline::from_redis_text(trimmed_tl, &mut self.tag_id_cache)?; - let event = msg.event_txt.try_into()?; - (Ok(Ready(Some((tl, event)))), (msg.leftover_input)) - } - None => { - let trimmed_tl = &msg.timeline_txt["timeline:".len()..]; - let tl = Timeline::from_redis_text(trimmed_tl, &mut self.tag_id_cache)?; - let event = msg.event_txt.try_into()?; - (Ok(Ready(Some((tl, event)))), (msg.leftover_input)) - } - Some(_non_matching_namespace) => (Ok(Ready(None)), msg.leftover_input), - }, - - Ok(NonMsg(leftover)) => (Ok(Ready(None)), leftover), - Err(RedisParseErr::Incomplete) => (Ok(NotReady), input), - Err(other_parse_err) => (Err(ManagerErr::RedisParseErr(other_parse_err)), input), - }; - - // Store leftover in same buffer and set cursor to start after leftover next time - self.cursor = 0; - for byte in &[leftover.as_bytes(), invalid_bytes].concat() { - self.redis_input[self.cursor] = *byte; - self.cursor += 1; + use Async::*; + match self.primary.read(&mut self.input[start..start + BLOCK]) { + Ok(n) => Ok(Ready(n)), + Err(e) if matches!(e.kind(), io::ErrorKind::WouldBlock) => Ok(NotReady), + Err(e) => { + Ready(log::error!("{}", e)); + Ok(Ready(0)) + } } - res } pub(super) fn update_cache(&mut self, hashtag: String, id: i64) { @@ -116,15 +67,20 @@ impl RedisConn { self.tag_name_cache.put(id, hashtag); } - pub(crate) fn send_cmd(&mut self, cmd: RedisCmd, timeline: &Timeline) -> Result<()> { - let namespace = self.redis_namespace.take(); - let hashtag = timeline.tag().and_then(|id| self.tag_name_cache.get(&id)); - let tl = match &namespace { - Some(ns) => format!("{}:{}", ns, timeline.to_redis_raw_timeline(hashtag)?), - None => timeline.to_redis_raw_timeline(hashtag)?, - }; + pub(crate) fn send_cmd(&mut self, cmd: RedisCmd, timelines: &[Timeline]) -> Result<()> { + let namespace = self.namespace.take(); + let timelines: Result> = timelines + .iter() + .map(|tl| { + let hashtag = tl.tag().and_then(|id| self.tag_name_cache.get(&id)); + match &namespace { + Some(ns) => Ok(format!("{}:{}", ns, tl.to_redis_raw_timeline(hashtag)?)), + None => Ok(tl.to_redis_raw_timeline(hashtag)?), + } + }) + .collect(); - let (primary_cmd, secondary_cmd) = cmd.into_sendable(&tl); + let (primary_cmd, secondary_cmd) = cmd.into_sendable(&timelines?[..]); self.primary.write_all(&primary_cmd)?; // We also need to set a key to tell the Puma server that we've subscribed or @@ -145,6 +101,7 @@ impl RedisConn { Self::validate_connection(&mut conn, &addr)?; conn.set_read_timeout(Some(Duration::from_millis(10))) .map_err(|e| RedisConnErr::with_addr(&addr, e))?; + Self::set_connection_name(&mut conn, &addr)?; Ok(conn) } @@ -172,14 +129,27 @@ impl RedisConn { fn validate_connection(conn: &mut TcpStream, addr: &str) -> Result<()> { conn.write_all(b"PING\r\n") .map_err(|e| RedisConnErr::with_addr(&addr, e))?; - let mut buffer = vec![0_u8; 7]; - conn.read_exact(&mut buffer) + let mut buffer = vec![0_u8; 100]; + conn.read(&mut buffer) + .map_err(|e| RedisConnErr::with_addr(&addr, e))?; + let reply = String::from_utf8_lossy(&buffer); + match &*reply { + r if r.starts_with("+PONG\r\n") => Ok(()), + r if r.starts_with("-NOAUTH") => Err(RedisConnErr::MissingPassword), + r if r.starts_with("HTTP/1.") => Err(RedisConnErr::NotRedis(addr.to_string())), + _ => Err(RedisConnErr::InvalidRedisReply(reply.to_string())), + } + } + + fn set_connection_name(conn: &mut TcpStream, addr: &str) -> Result<()> { + conn.write_all(b"*3\r\n$6\r\nCLIENT\r\n$7\r\nSETNAME\r\n$8\r\nflodgatt\r\n") + .map_err(|e| RedisConnErr::with_addr(&addr, e))?; + let mut buffer = vec![0_u8; 100]; + conn.read(&mut buffer) .map_err(|e| RedisConnErr::with_addr(&addr, e))?; let reply = String::from_utf8_lossy(&buffer); match &*reply { - "+PONG\r\n" => Ok(()), - "-NOAUTH" => Err(RedisConnErr::MissingPassword), - "HTTP/1." => Err(RedisConnErr::NotRedis(addr.to_string())), + r if r.starts_with("+OK\r\n") => Ok(()), _ => Err(RedisConnErr::InvalidRedisReply(reply.to_string())), } } diff --git a/src/response/redis/connection/err.rs b/src/response/redis/connection/err.rs index 893dd30..da98940 100644 --- a/src/response/redis/connection/err.rs +++ b/src/response/redis/connection/err.rs @@ -31,7 +31,7 @@ impl fmt::Display for RedisConnErr { addr, inner ), InvalidRedisReply(unexpected_reply) => format!( - "Received and unexpected reply from Redis. Expected `+PONG` reply but got `{}`", + "Received and unexpected reply from Redis: `{}`", unexpected_reply ), UnknownRedisErr(io_err) => { diff --git a/src/response/redis/manager.rs b/src/response/redis/manager.rs index 4a3d3f1..ceaf243 100644 --- a/src/response/redis/manager.rs +++ b/src/response/redis/manager.rs @@ -4,37 +4,111 @@ mod err; pub use err::Error; -use super::Event; -use super::{RedisCmd, RedisConn}; +use super::msg::{RedisParseErr, RedisParseOutput}; +use super::{Event, RedisCmd, RedisConn}; use crate::config; use crate::request::{Subscription, Timeline}; pub(self) use super::EventErr; -use futures::Async; +use futures::{Async, Poll, Stream}; use hashbrown::{HashMap, HashSet}; +use std::convert::{TryFrom, TryInto}; +use std::str; use std::sync::{Arc, Mutex, MutexGuard, PoisonError}; use std::time::{Duration, Instant}; use tokio::sync::mpsc::Sender; type Result = std::result::Result; +type EventChannel = Sender>; /// The item that streams from Redis and is polled by the `ClientAgent` pub struct Manager { - redis_connection: RedisConn, - timelines: HashMap>>>, + redis_conn: RedisConn, + timelines: HashMap>, ping_time: Instant, channel_id: u32, + unread_idx: (usize, usize), +} + +impl Stream for Manager { + type Item = (); + type Error = Error; + + fn poll(&mut self) -> Poll, Error> { + if self.ping_time.elapsed() > Duration::from_secs(30) { + self.send_pings()? + } + + while let Async::Ready(msg_len) = self.redis_conn.poll_redis(self.unread_idx.1)? { + self.unread_idx.1 += msg_len; + let input = &self.redis_conn.input[..self.unread_idx.1]; + let mut unread = str::from_utf8(input).unwrap_or_else(|e| { + str::from_utf8(input.split_at(e.valid_up_to()).0).expect("guaranteed by `split_at`") + }); + + while !unread.is_empty() { + let tag_id_cache = &mut self.redis_conn.tag_id_cache; + let redis_namespace = &self.redis_conn.namespace; + + use {Error::InvalidId, RedisParseOutput::*}; + unread = match RedisParseOutput::try_from(unread) { + Ok(Msg(msg)) => { + let trimmed_tl = match redis_namespace { + Some(ns) if msg.timeline_txt.starts_with(ns) => { + Some(&msg.timeline_txt[ns.len() + ":timeline:".len()..]) + } + None => Some(&msg.timeline_txt["timeline:".len()..]), + Some(_non_matching_ns) => None, + }; + + if let Some(trimmed_tl) = trimmed_tl { + let tl = Timeline::from_redis_text(trimmed_tl, tag_id_cache)?; + let event: Arc = Arc::new(msg.event_txt.try_into()?); + let channels = self.timelines.get_mut(&tl).ok_or(InvalidId)?; + for (_id, channel) in channels { + if let Ok(Async::NotReady) = channel.poll_ready() { + log::warn!("{:?} channel full", tl); + return Ok(Async::NotReady); + } + let _ = channel.try_send(event.clone()); // err just means channel will be closed + } + } else { + // skip messages for different Redis namespaces + } + msg.leftover_input + } + Ok(NonMsg(leftover_input)) => leftover_input, + Err(RedisParseErr::Incomplete) => { + log::info!("Copying partial message"); + let (read, unread) = self.redis_conn.input[..self.unread_idx.1] + .split_at_mut(self.unread_idx.0); + for (i, b) in unread.iter().enumerate() { + read[i] = *b; + } + self.unread_idx = (0, unread.len()); + break; + } + Err(e) => Err(e)?, + }; + self.unread_idx.0 = self.unread_idx.1 - unread.len(); + } + + self.unread_idx = (0, 0) // reaching here means last msg was complete; reuse the full buffer + } + Ok(Async::Ready(Some(()))) + } } impl Manager { /// Create a new `Manager`, with its own Redis connections (but no active subscriptions). pub fn try_from(redis_cfg: &config::Redis) -> Result { Ok(Self { - redis_connection: RedisConn::new(redis_cfg)?, + redis_conn: RedisConn::new(redis_cfg)?, timelines: HashMap::new(), ping_time: Instant::now(), channel_id: 0, + unread_idx: (0, 0), }) } @@ -42,10 +116,10 @@ impl Manager { Arc::new(Mutex::new(self)) } - pub fn subscribe(&mut self, subscription: &Subscription, channel: Sender>) { + pub fn subscribe(&mut self, subscription: &Subscription, channel: EventChannel) { let (tag, tl) = (subscription.hashtag_name.clone(), subscription.timeline); if let (Some(hashtag), Some(id)) = (tag, tl.tag()) { - self.redis_connection.update_cache(hashtag, id); + self.redis_conn.update_cache(hashtag, id); }; let channels = self.timelines.entry(tl).or_default(); @@ -53,66 +127,37 @@ impl Manager { self.channel_id += 1; if channels.len() == 1 { - self.redis_connection - .send_cmd(RedisCmd::Subscribe, &tl) + self.redis_conn + .send_cmd(RedisCmd::Subscribe, &[tl]) .unwrap_or_else(|e| log::error!("Could not subscribe to the Redis channel: {}", e)); + log::info!("Subscribed to {:?}", tl); }; } - pub(crate) fn unsubscribe(&mut self, tl: &Timeline) -> Result<()> { - self.redis_connection.send_cmd(RedisCmd::Unsubscribe, &tl)?; - self.timelines.remove(&tl); - Ok(log::info!("Ended stream for {:?}", tl)) - } - - pub fn poll_broadcast(&mut self) -> Result<()> { - let mut completed_timelines = HashSet::new(); - let log_send_err = |tl, e| Some(log::error!("cannot send to {:?}: {}", tl, e)).is_some(); - - if self.ping_time.elapsed() > Duration::from_secs(30) { - self.ping_time = Instant::now(); - for (tl, channels) in self.timelines.iter_mut() { - channels.retain(|_, chan| match chan.try_send(Arc::new(Event::Ping)) { - Ok(()) => true, - Err(e) if !e.is_closed() => log_send_err(*tl, e), - Err(_) => false, - }); - - // NOTE: this takes two cycles to close a connection after the client - // times out: on the first cycle, this fn sends the Event to the - // response::Ws thread without any error, but that thread encounters an - // error sending to the client and ends. On the *second* cycle, this fn - // gets the error it's waiting on to clean up the connection. This isn't - // ideal, but is harmless, since the only reason we haven't cleaned up the - // connection is that no messages are being sent to that client. - if channels.is_empty() { - completed_timelines.insert(*tl); - } - } - }; - - loop { - match self.redis_connection.poll_redis() { - Ok(Async::NotReady) => break, - Ok(Async::Ready(Some((tl, event)))) => { - let sendable_event = Arc::new(event); - let channels = self.timelines.get_mut(&tl).ok_or(Error::InvalidId)?; - channels.retain(|_, chan| match chan.try_send(sendable_event.clone()) { - Ok(()) => true, - Err(e) if !e.is_closed() => log_send_err(tl, e), - Err(_) => false, - }); - if channels.is_empty() { - completed_timelines.insert(tl); - } - } - Ok(Async::Ready(None)) => (), // cmd or msg for other namespace - Err(err) => log::error!("{}", err), // drop msg, log err, and proceed + fn send_pings(&mut self) -> Result<()> { + // NOTE: this takes two cycles to close a connection after the client times out: on + // the first cycle, this successfully sends the Event to the response::Ws thread but + // that thread fatally errors sending to the client. On the *second* cycle, this + // gets the error. This isn't ideal, but is harmless. + + self.ping_time = Instant::now(); + let mut subscriptions_to_close = HashSet::new(); + self.timelines.retain(|tl, channels| { + channels.retain(|_, chan| chan.try_send(Arc::new(Event::Ping)).is_ok()); + + if channels.is_empty() { + subscriptions_to_close.insert(*tl); + false + } else { + true } - } - - for tl in &mut completed_timelines.iter() { - self.unsubscribe(tl)?; + }); + if !subscriptions_to_close.is_empty() { + let timelines: Vec<_> = subscriptions_to_close.into_iter().collect(); + &self + .redis_conn + .send_cmd(RedisCmd::Unsubscribe, &timelines[..])?; + log::info!("Unsubscribed from {:?}", timelines); } Ok(()) } @@ -129,6 +174,13 @@ impl Manager { ) } + pub fn backpresure(&self) -> String { + format!( + "Input buffer size: {} KiB", + (self.unread_idx.1 - self.unread_idx.0) / 1024 + ) + } + pub fn list(&self) -> String { let max_len = self .timelines diff --git a/src/response/redis/manager/err.rs b/src/response/redis/manager/err.rs index 9f7dcad..b4dc064 100644 --- a/src/response/redis/manager/err.rs +++ b/src/response/redis/manager/err.rs @@ -1,18 +1,18 @@ use super::super::{RedisConnErr, RedisParseErr}; use super::{Event, EventErr}; -use crate::request::{Timeline, TimelineErr}; +use crate::request::TimelineErr; use std::fmt; +use std::sync::Arc; + #[derive(Debug)] pub enum Error { InvalidId, - TimelineErr(TimelineErr), EventErr(EventErr), RedisParseErr(RedisParseErr), RedisConnErr(RedisConnErr), - ChannelSendErr(tokio::sync::watch::error::SendError<(Timeline, Event)>), - ChannelSendErr2(tokio::sync::mpsc::error::UnboundedTrySendError), + ChannelSendErr(tokio::sync::mpsc::error::TrySendError>), } impl std::error::Error for Error {} @@ -30,22 +30,16 @@ impl fmt::Display for Error { RedisConnErr(inner) => write!(f, "{}", inner), TimelineErr(inner) => write!(f, "{}", inner), ChannelSendErr(inner) => write!(f, "{}", inner), - ChannelSendErr2(inner) => write!(f, "{}", inner), }?; Ok(()) } } -impl From> for Error { - fn from(error: tokio::sync::watch::error::SendError<(Timeline, Event)>) -> Self { +impl From>> for Error { + fn from(error: tokio::sync::mpsc::error::TrySendError>) -> Self { Self::ChannelSendErr(error) } } -impl From> for Error { - fn from(error: tokio::sync::mpsc::error::UnboundedTrySendError) -> Self { - Self::ChannelSendErr2(error) - } -} impl From for Error { fn from(error: EventErr) -> Self { diff --git a/src/response/redis/msg.rs b/src/response/redis/msg.rs index bfa2344..8af186b 100644 --- a/src/response/redis/msg.rs +++ b/src/response/redis/msg.rs @@ -82,8 +82,11 @@ fn utf8_to_redis_data<'a>(s: &'a str) -> Result<(RedisData, &'a str), RedisParse fn after_newline_at(s: &str, start: usize) -> RedisParser<&str> { let s = s.get(start..).ok_or(Incomplete)?; + if s.len() < 2 { + Err(Incomplete)?; + } if !s.starts_with("\r\n") { - return Err(RedisParseErr::InvalidLineEnd); + Err(InvalidLineEnd)?; } Ok(s.get("\r\n".len()..).ok_or(Incomplete)?) } diff --git a/src/response/stream/ws.rs b/src/response/stream/ws.rs index e5b1227..f350def 100644 --- a/src/response/stream/ws.rs +++ b/src/response/stream/ws.rs @@ -50,9 +50,12 @@ impl Ws { e => log::warn!("WebSocket send error: {}", e), }) } - fn filtered(&mut self, update: &impl Payload) -> bool { + fn filtered(&mut self, update: &T) -> bool { let (blocks, allowed_langs) = (&self.0.blocks, &self.0.allowed_langs); - let skip = |msg| Some(log::info!("{:?} msg skipped - {}", self.0.timeline, msg)).is_some(); + let skip = |msg| { + // Some(log::info!("{:?} msg skipped - {}\n{:?}", self.0.timeline, msg, update)).is_some() + Some(log::info!("{:?} msg skipped - {}", self.0.timeline, msg)).is_some() + }; match self.0.timeline { tl if tl.is_public()