From eb37fa064274d9193fcb2b2b83f9870bd2ea809e Mon Sep 17 00:00:00 2001 From: Philip Jenvey Date: Tue, 24 Oct 2017 16:18:20 -0700 Subject: [PATCH] feat: fill in missing metrics for autopush_rs the rust side's metrics (via cadence) currently lack the ability to pass datadog tags, currently ua.{message_data,connection.lifespan} Issue: #1054 --- autopush/tests/test_webpush_server.py | 8 +++++++- autopush/webpush_server.py | 20 +++++++++++++++--- autopush_rs/Cargo.lock | 16 +++++++++++++++ autopush_rs/Cargo.toml | 1 + autopush_rs/__init__.py | 2 ++ autopush_rs/src/call.rs | 1 + autopush_rs/src/client.rs | 29 +++++++++++++++++++++++++-- autopush_rs/src/errors.rs | 2 ++ autopush_rs/src/lib.rs | 1 + autopush_rs/src/protocol.rs | 2 +- autopush_rs/src/server/metrics.rs | 23 +++++++++++++++++++++ autopush_rs/src/server/mod.rs | 15 ++++++++++++-- 12 files changed, 111 insertions(+), 9 deletions(-) create mode 100644 autopush_rs/src/server/metrics.rs diff --git a/autopush/tests/test_webpush_server.py b/autopush/tests/test_webpush_server.py index 4df8c042..2805ff37 100644 --- a/autopush/tests/test_webpush_server.py +++ b/autopush/tests/test_webpush_server.py @@ -13,8 +13,8 @@ from autopush.db import ( DatabaseManager, - make_rotating_tablename, generate_last_connect, + make_rotating_tablename, ) from autopush.metrics import SinkMetrics from autopush.config import AutopushConfig @@ -234,6 +234,9 @@ def test_nonexisting_uaid(self): assert isinstance(result, HelloResponse) assert hello.uaid != result.uaid assert result.check_storage is False + assert result.connected_at == hello.connected_at + assert self.metrics.increment.called + assert self.metrics.increment.call_args[0][0] == 'ua.command.hello' def test_existing_uaid(self): p = self._makeFUT() @@ -245,6 +248,9 @@ def test_existing_uaid(self): assert isinstance(result, HelloResponse) assert hello.uaid.hex == result.uaid assert result.check_storage is True + assert result.connected_at == hello.connected_at + assert self.metrics.increment.called + assert self.metrics.increment.call_args[0][0] == 'ua.command.hello' def test_existing_newer_uaid(self): p = self._makeFUT() diff --git a/autopush/webpush_server.py b/autopush/webpush_server.py index b7a5fda6..1c8e6d04 100644 --- a/autopush/webpush_server.py +++ b/autopush/webpush_server.py @@ -165,6 +165,7 @@ class HelloResponse(OutputCommand): message_month = attrib() # type: str check_storage = attrib() # type: bool reset_uaid = attrib() # type: bool + connected_at = attrib() # type: int rotate_message_table = attrib(default=False) # type: bool @@ -212,7 +213,6 @@ def __init__(self, conf, db, num_threads=10): self.db = db self.db.setup_tables() self.num_threads = num_threads - self.metrics = self.db.metrics self.incoming = AutopushQueue() self.workers = [] # type: List[Thread] self.command_processor = CommandProcessor(conf, self.db) @@ -357,10 +357,12 @@ def process(self, hello): # Save the UAID as register_user removes it uaid = user_item["uaid"] # type: str success, _ = self.db.router.register_user(user_item) + flags["connected_at"] = hello.connected_at if not success: # User has already connected more recently elsewhere return HelloResponse(uaid=None, **flags) + self.metrics.increment('ua.command.hello') return HelloResponse(uaid=uaid, **flags) def lookup_user(self, hello): @@ -380,13 +382,13 @@ def lookup_user(self, hello): # All records must have a router_type and connected_at, in some odd # cases a record exists for some users without it if "router_type" not in record or "connected_at" not in record: - self.db.router.drop_user(uaid) + self.drop_user(uaid, record, 104) return None, flags # Current month must exist and be a valid prior month if ("current_month" not in record) or record["current_month"] \ not in self.db.message_tables: - self.db.router.drop_user(uaid) + self.drop_user(uaid, record, 105) return None, flags # If we got here, its a valid user that needs storage checked @@ -425,6 +427,18 @@ def create_user(self, hello): current_month=self.db.current_msg_month, ) + def drop_user(self, uaid, uaid_record, code): + # type: (str, dict, int) -> None + """Drop a user record""" + log.debug( + "Dropping User", + code=code, + uaid_hash=hasher(uaid), + uaid_record=repr(uaid_record) + ) + self.metrics.increment('ua.expiration', tags=['code:{}'.format(code)]) + self.db.router.drop_user(uaid) + class CheckStorageCommand(ProcessorCommand): def process(self, command): diff --git a/autopush_rs/Cargo.lock b/autopush_rs/Cargo.lock index c73a85ef..7edf6c92 100644 --- a/autopush_rs/Cargo.lock +++ b/autopush_rs/Cargo.lock @@ -3,6 +3,7 @@ name = "autopush" version = "0.1.0" dependencies = [ "bytes 0.4.5 (registry+https://github.com/rust-lang/crates.io-index)", + "cadence 0.12.1 (registry+https://github.com/rust-lang/crates.io-index)", "env_logger 0.4.3 (registry+https://github.com/rust-lang/crates.io-index)", "error-chain 0.10.0 (registry+https://github.com/rust-lang/crates.io-index)", "futures 0.1.15 (registry+https://github.com/rust-lang/crates.io-index)", @@ -80,6 +81,14 @@ dependencies = [ "iovec 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "cadence" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "crossbeam 0.2.10 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "cfg-if" version = "0.1.2" @@ -93,6 +102,11 @@ dependencies = [ "custom_derive 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "crossbeam" +version = "0.2.10" +source = "registry+https://github.com/rust-lang/crates.io-index" + [[package]] name = "custom_derive" version = "0.1.7" @@ -658,8 +672,10 @@ dependencies = [ "checksum bitflags 0.9.1 (registry+https://github.com/rust-lang/crates.io-index)" = "4efd02e230a02e18f92fc2735f44597385ed02ad8f831e7c1c1156ee5e1ab3a5" "checksum byteorder 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "ff81738b726f5d099632ceaffe7fb65b90212e8dce59d518729e7e8634032d3d" "checksum bytes 0.4.5 (registry+https://github.com/rust-lang/crates.io-index)" = "d828f97b58cc5de3e40c421d0cf2132d6b2da4ee0e11b8632fa838f0f9333ad6" +"checksum cadence 0.12.1 (registry+https://github.com/rust-lang/crates.io-index)" = "b48d3db3b2321bc9275c142fa2ddf04d2bcaa651a3b2acfaf8f4a1919402c88e" "checksum cfg-if 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "d4c819a1287eb618df47cc647173c5c4c66ba19d888a6e50d605672aed3140de" "checksum conv 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)" = "78ff10625fd0ac447827aa30ea8b861fead473bb60aeb73af6c1c58caf0d1299" +"checksum crossbeam 0.2.10 (registry+https://github.com/rust-lang/crates.io-index)" = "0c5ea215664ca264da8a9d9c3be80d2eaf30923c259d03e870388eb927508f97" "checksum custom_derive 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)" = "ef8ae57c4978a2acd8b869ce6b9ca1dfe817bff704c220209fdef2c0b75a01b9" "checksum dbghelp-sys 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "97590ba53bcb8ac28279161ca943a924d1fd4a8fb3fa63302591647c4fc5b850" "checksum dtoa 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)" = "09c3753c3db574d215cba4ea76018483895d7bff25a31b49ba45db21c48e50ab" diff --git a/autopush_rs/Cargo.toml b/autopush_rs/Cargo.toml index 9b351140..b4ff23bf 100644 --- a/autopush_rs/Cargo.toml +++ b/autopush_rs/Cargo.toml @@ -8,6 +8,7 @@ crate-type = ["cdylib"] [dependencies] bytes = "0.4" +cadence = "0.12.1" env_logger = { version = "0.4", default-features = false } error-chain = "0.10" futures = "0.1" diff --git a/autopush_rs/__init__.py b/autopush_rs/__init__.py index 5f59fbd9..8c748519 100644 --- a/autopush_rs/__init__.py +++ b/autopush_rs/__init__.py @@ -36,6 +36,8 @@ def __init__(self, conf, queue): cfg.ssl_key = ffi_from_buffer(conf.ssl.key) cfg.url = ffi_from_buffer(conf.ws_url) cfg.json_logging = True + cfg.statsd_host = ffi_from_buffer(conf.statsd_host) + cfg.statsd_port = conf.statsd_port ptr = _call(lib.autopush_server_new, cfg) self.ffi = ffi.gc(ptr, lib.autopush_server_free) diff --git a/autopush_rs/src/call.rs b/autopush_rs/src/call.rs index 4d69ad3d..1a848d7d 100644 --- a/autopush_rs/src/call.rs +++ b/autopush_rs/src/call.rs @@ -178,6 +178,7 @@ pub struct HelloResponse { pub check_storage: bool, pub reset_uaid: bool, pub rotate_message_table: bool, + pub connected_at: u64, } #[derive(Deserialize)] diff --git a/autopush_rs/src/client.rs b/autopush_rs/src/client.rs index 6996bfce..f3ae5110 100644 --- a/autopush_rs/src/client.rs +++ b/autopush_rs/src/client.rs @@ -7,6 +7,7 @@ use std::rc::Rc; +use cadence::prelude::*; use futures::AsyncSink; use futures::future::Either; use futures::sync::mpsc; @@ -48,6 +49,7 @@ pub struct WebPushClient { // Highest version from stored, retained for use with increment // when all the unacked storeds are ack'd unacked_stored_highest: Option, + connected_at: u64, } impl WebPushClient { @@ -172,6 +174,16 @@ where if more_messages.is_some() { let mut messages = more_messages.take().unwrap(); if let Some(message) = messages.pop() { + if let Some(_) = message.topic { + self.data.srv.metrics.incr("ua.notification.topic")?; + } + // XXX: tags + self.data.srv.metrics.count( + "ua.message_data", + message.data.as_ref().map_or(0, |d| { + d.len() as i64 + }), + )?; ClientState::FinishSend( Some(ServerMessage::Notification(message)), Some(Box::new(ClientState::SendMessages(if messages.len() > 0 { @@ -220,8 +232,10 @@ where } => uaid, _ => return Err("Invalid message, must be hello".into()), }; - let ms_time = time::precise_time_ns() / 1000; - ClientState::WaitingForProcessHello(self.data.srv.hello(&ms_time, uaid.as_ref())) + let connected_at = time::precise_time_ns() / 1000; + ClientState::WaitingForProcessHello( + self.data.srv.hello(&connected_at, uaid.as_ref()), + ) } ClientState::WaitingForProcessHello(ref mut response) => { debug!("State: WaitingForProcessHello"); @@ -232,6 +246,7 @@ where check_storage, reset_uaid, rotate_message_table, + connected_at, } => { self.data.process_hello( uaid, @@ -239,6 +254,7 @@ where reset_uaid, rotate_message_table, check_storage, + connected_at, ) } call::HelloResponse { uaid: None, .. } => { @@ -466,6 +482,7 @@ where reset_uaid: bool, rotate_message_table: bool, check_storage: bool, + connected_at: u64, ) -> ClientState { let (tx, rx) = mpsc::unbounded(); let mut flags = ClientFlags::new(); @@ -480,6 +497,7 @@ where unacked_direct_notifs: Vec::new(), unacked_stored_notifs: Vec::new(), unacked_stored_highest: None, + connected_at: connected_at, }); self.srv.connect_client( RegisteredClient { uaid: uaid, tx: tx }, @@ -523,6 +541,7 @@ where } fn process_acks(&mut self, updates: Vec) -> ClientState { + self.srv.metrics.incr("ua.command.ack").ok(); let webpush = self.webpush.as_mut().unwrap(); let mut fut: Option> = None; for notif in updates.iter() { @@ -588,8 +607,14 @@ where pub fn shutdown(&mut self) { // If we made it past hello, do more cleanup + if self.webpush.is_some() { let webpush = self.webpush.take().unwrap(); + let now = time::precise_time_ns() / 1000; + let elapsed = now - webpush.connected_at; + // XXX: tags + self.srv.metrics.time("ua.connection.lifespan", elapsed).ok(); + // If there's direct unack'd messages, they need to be saved out without blocking // here self.srv.disconnet_client(&webpush.uaid); diff --git a/autopush_rs/src/errors.rs b/autopush_rs/src/errors.rs index 5781e14f..32181a75 100644 --- a/autopush_rs/src/errors.rs +++ b/autopush_rs/src/errors.rs @@ -27,6 +27,7 @@ use std::any::Any; use std::error; use std::io; +use cadence; use futures::Future; use httparse; use serde_json; @@ -38,6 +39,7 @@ error_chain! { Io(io::Error); Json(serde_json::Error); Httparse(httparse::Error); + MetricError(cadence::MetricError); } errors { diff --git a/autopush_rs/src/lib.rs b/autopush_rs/src/lib.rs index 30f8c0d2..6799bf10 100644 --- a/autopush_rs/src/lib.rs +++ b/autopush_rs/src/lib.rs @@ -61,6 +61,7 @@ //! Otherwise be sure to check out each module for more documentation! extern crate bytes; +extern crate cadence; extern crate env_logger; #[macro_use] extern crate futures; diff --git a/autopush_rs/src/protocol.rs b/autopush_rs/src/protocol.rs index 153c3be6..510758ed 100644 --- a/autopush_rs/src/protocol.rs +++ b/autopush_rs/src/protocol.rs @@ -87,7 +87,7 @@ pub struct Notification { pub topic: Option, pub timestamp: u64, #[serde(skip_serializing_if = "Option::is_none")] - data: Option, + pub data: Option, #[serde(skip_serializing_if = "Option::is_none")] headers: Option>, } diff --git a/autopush_rs/src/server/metrics.rs b/autopush_rs/src/server/metrics.rs new file mode 100644 index 00000000..3654ff32 --- /dev/null +++ b/autopush_rs/src/server/metrics.rs @@ -0,0 +1,23 @@ +//! Metrics tie-ins + +use std::net::UdpSocket; + +use cadence::{BufferedUdpMetricSink, NopMetricSink, QueuingMetricSink, StatsdClient}; + +use errors::*; +use server::ServerOptions; + +/// Create a cadence StatsdClient from the given options +pub fn metrics_from_opts(opts: &ServerOptions) -> Result { + Ok(if let Some(statsd_host) = opts.statsd_host.as_ref() { + let socket = UdpSocket::bind("0.0.0.0:0")?; + socket.set_nonblocking(true)?; + + let host = (statsd_host.as_str(), opts.statsd_port); + let udp_sink = BufferedUdpMetricSink::from(host, socket)?; + let sink = QueuingMetricSink::from(udp_sink); + StatsdClient::from_sink("autopush", sink) + } else { + StatsdClient::from_sink("autopush", NopMetricSink) + }) +} diff --git a/autopush_rs/src/server/mod.rs b/autopush_rs/src/server/mod.rs index 9882d345..66e22c0a 100644 --- a/autopush_rs/src/server/mod.rs +++ b/autopush_rs/src/server/mod.rs @@ -10,6 +10,7 @@ use std::sync::Arc; use std::thread; use std::time::{Instant, Duration}; +use cadence::StatsdClient; use futures::sync::oneshot; use futures::task; use futures::{Stream, Future, Sink, Async, Poll, AsyncSink, StartSend}; @@ -30,10 +31,12 @@ use protocol::{ClientMessage, ServerMessage, ServerNotification, Notification}; use queue::{self, AutopushQueue}; use rt::{self, AutopushError, UnwindGuard}; use server::dispatch::{Dispatch, RequestType}; +use server::metrics::metrics_from_opts; use server::webpush_io::WebpushIo; use util::{self, RcObject, timeout}; mod dispatch; +mod metrics; mod tls; mod webpush_io; @@ -66,6 +69,8 @@ pub struct AutopushServerOptions { pub max_connections: u32, pub close_handshake_timeout: u32, pub json_logging: i32, + pub statsd_host: *const c_char, + pub statsd_port: u16, } pub struct Server { @@ -75,6 +80,7 @@ pub struct Server { pub tx: queue::Sender, pub opts: Arc, pub handle: Handle, + pub metrics: StatsdClient, } pub struct ServerOptions { @@ -92,6 +98,8 @@ pub struct ServerOptions { pub auto_ping_timeout: Duration, pub max_connections: Option, pub close_handshake_timeout: Option, + pub statsd_host: Option, + pub statsd_port: u16, } fn resolve(host: &str) -> IpAddr { @@ -145,6 +153,8 @@ pub extern "C" fn autopush_server_new( .to_string(), port: opts.port, router_port: opts.router_port, + statsd_host: to_s(opts.statsd_host).map(|s| s.to_string()), + statsd_port: opts.statsd_port, url: to_s(opts.url).expect("url must be specified").to_string(), ssl_key: to_s(opts.ssl_key).map(PathBuf::from), ssl_cert: to_s(opts.ssl_cert).map(PathBuf::from), @@ -232,7 +242,7 @@ impl Server { fn start( opts: &Arc, tx: queue::Sender, - ) -> io::Result<(oneshot::Sender<()>, thread::JoinHandle<()>)> { + ) -> Result<(oneshot::Sender<()>, thread::JoinHandle<()>)> { let (donetx, donerx) = oneshot::channel(); let (inittx, initrx) = oneshot::channel(); @@ -278,7 +288,7 @@ impl Server { } } - fn new(opts: &Arc, tx: queue::Sender) -> io::Result<(Rc, Core)> { + fn new(opts: &Arc, tx: queue::Sender) -> Result<(Rc, Core)> { let core = Core::new()?; let srv = Rc::new(Server { opts: opts.clone(), @@ -287,6 +297,7 @@ impl Server { handle: core.handle(), tx: tx, tls_acceptor: tls::configure(opts), + metrics: metrics_from_opts(opts)?, }); let host_ip = resolve(&srv.opts.host_ip); let addr = format!("{}:{}", host_ip, srv.opts.port);