diff --git a/autopush_rs/src/call.rs b/autopush_rs/src/call.rs index abd69084..0e3a9f70 100644 --- a/autopush_rs/src/call.rs +++ b/autopush_rs/src/call.rs @@ -24,8 +24,8 @@ use serde::ser; use serde_json; use errors::*; -use rt::{self, AutopushError, UnwindGuard}; use protocol; +use rt::{self, AutopushError, UnwindGuard}; use server::Server; pub struct AutopushPythonCall { diff --git a/autopush_rs/src/client.rs b/autopush_rs/src/client.rs index 7f765cfa..4e65c417 100644 --- a/autopush_rs/src/client.rs +++ b/autopush_rs/src/client.rs @@ -696,7 +696,9 @@ where ); transition!(AwaitMigrateUser { response, data }); } else if all_acked && webpush.flags.reset_uaid { - let response = data.srv.ddb.drop_uaid(&data.srv.opts.router_table_name, &webpush.uaid); + let response = data.srv + .ddb + .drop_uaid(&data.srv.opts.router_table_name, &webpush.uaid); transition!(AwaitDropUser { response, data }); } transition!(AwaitInput { data }) diff --git a/autopush_rs/src/errors.rs b/autopush_rs/src/errors.rs index 9ee0b9af..81dd765e 100644 --- a/autopush_rs/src/errors.rs +++ b/autopush_rs/src/errors.rs @@ -24,15 +24,15 @@ //! online. use std::any::Any; -use std::num; use std::error; use std::io; +use std::num; use cadence; use futures::Future; use httparse; -use serde_json; use sentry; +use serde_json; use tungstenite; use uuid; diff --git a/autopush_rs/src/http.rs b/autopush_rs/src/http.rs index c8f59644..2a0880d5 100644 --- a/autopush_rs/src/http.rs +++ b/autopush_rs/src/http.rs @@ -7,13 +7,13 @@ //! PUT /push/UAID - Deliver notification to a client //! PUT /notify/UAID - Tell a client to check storage -use std::str; use std::rc::Rc; +use std::str; use futures::future::ok; use futures::{Future, Stream}; -use hyper::{Method, StatusCode}; use hyper; +use hyper::{Method, StatusCode}; use serde_json; use tokio_service::Service; use uuid::Uuid; diff --git a/autopush_rs/src/lib.rs b/autopush_rs/src/lib.rs index 8ddbe250..f400a38c 100644 --- a/autopush_rs/src/lib.rs +++ b/autopush_rs/src/lib.rs @@ -124,5 +124,5 @@ mod protocol; #[macro_use] pub mod rt; pub mod call; -pub mod server; pub mod queue; +pub mod server; diff --git a/autopush_rs/src/protocol.rs b/autopush_rs/src/protocol.rs index bc455afa..575020ab 100644 --- a/autopush_rs/src/protocol.rs +++ b/autopush_rs/src/protocol.rs @@ -139,9 +139,14 @@ impl Notification { if let Some(ref topic) = self.topic { format!("01:{}:{}", chid, topic) } else if let Some(sortkey_timestamp) = self.sortkey_timestamp { - format!("02:{}:{}", - if sortkey_timestamp == 0 { ms_since_epoch() } else { sortkey_timestamp }, - chid + format!( + "02:{}:{}", + if sortkey_timestamp == 0 { + ms_since_epoch() + } else { + sortkey_timestamp + }, + chid ) } else { // Legacy messages which we should never get anymore diff --git a/autopush_rs/src/queue.rs b/autopush_rs/src/queue.rs index 01b0cc7b..04f05a65 100644 --- a/autopush_rs/src/queue.rs +++ b/autopush_rs/src/queue.rs @@ -5,8 +5,8 @@ //! pushing requests over to Python. A `Sender` here is saved off in the //! `Server` for sending messages. -use std::sync::mpsc; use std::sync::Mutex; +use std::sync::mpsc; use call::{AutopushPythonCall, PythonCall}; use rt::{self, AutopushError}; diff --git a/autopush_rs/src/rt.rs b/autopush_rs/src/rt.rs index e97bd01a..c8610cdd 100644 --- a/autopush_rs/src/rt.rs +++ b/autopush_rs/src/rt.rs @@ -31,11 +31,11 @@ //! functions throughout this crate and copy those idioms, otherwise there's //! documentation on each specific function here. -use std::panic; -use std::ptr; -use std::mem; use std::any::Any; use std::cell::Cell; +use std::mem; +use std::panic; +use std::ptr; /// Generic error which is used on all function calls from Python into Rust. /// diff --git a/autopush_rs/src/server/dispatch.rs b/autopush_rs/src/server/dispatch.rs index 30402909..14494aa1 100644 --- a/autopush_rs/src/server/dispatch.rs +++ b/autopush_rs/src/server/dispatch.rs @@ -28,8 +28,8 @@ use tokio_core::net::TcpStream; use tokio_io::AsyncRead; use errors::*; -use server::webpush_io::WebpushIo; use server::tls::MaybeTlsStream; +use server::webpush_io::WebpushIo; pub struct Dispatch { socket: Option>, diff --git a/autopush_rs/src/server/mod.rs b/autopush_rs/src/server/mod.rs index 71de4c9f..28bc75da 100644 --- a/autopush_rs/src/server/mod.rs +++ b/autopush_rs/src/server/mod.rs @@ -33,8 +33,8 @@ use tokio_core::net::TcpListener; use tokio_core::reactor::{Core, Handle, Timeout}; use tokio_io; use tokio_tungstenite::{accept_hdr_async, WebSocketStream}; -use tungstenite::handshake::server::Request; use tungstenite::Message; +use tungstenite::handshake::server::Request; use uuid::Uuid; use client::{Client, RegisteredClient}; @@ -47,10 +47,10 @@ use rt::{self, AutopushError, UnwindGuard}; use server::dispatch::{Dispatch, RequestType}; use server::metrics::metrics_from_opts; use server::webpush_io::WebpushIo; -use util::{self, timeout, RcObject}; use util::ddb_helpers::DynamoStorage; use util::megaphone::{ClientServices, MegaphoneAPIResponse, Service, ServiceChangeTracker, ServiceClientInit}; +use util::{self, timeout, RcObject}; mod dispatch; mod metrics; @@ -189,16 +189,20 @@ pub extern "C" fn autopush_server_new( router_port: opts.router_port, statsd_host: to_s(opts.statsd_host).map(|s| s.to_string()), statsd_port: opts.statsd_port, - message_table_names: to_s(opts.message_table_names).map(|s| s.to_string()) + message_table_names: to_s(opts.message_table_names) + .map(|s| s.to_string()) .expect("message table names must be specified") .split(",") .map(|s| s.trim().to_string()) .collect(), - router_table_name: to_s(opts.router_table_name).map(|s| s.to_string()) + router_table_name: to_s(opts.router_table_name) + .map(|s| s.to_string()) .expect("router table name must be specified"), - router_url: to_s(opts.router_url).map(|s| s.to_string()) + router_url: to_s(opts.router_url) + .map(|s| s.to_string()) .expect("router url must be specified"), - endpoint_url: to_s(opts.endpoint_url).map(|s| s.to_string()) + endpoint_url: to_s(opts.endpoint_url) + .map(|s| s.to_string()) .expect("endpoint url must be specified"), ssl_key: to_s(opts.ssl_key).map(PathBuf::from), ssl_cert: to_s(opts.ssl_cert).map(PathBuf::from), @@ -518,10 +522,18 @@ impl Server { let key_digest = hash::hash(hash::MessageDigest::sha256(), &raw_key) .chain_err(|| "Error creating message digest for key")?; base.extend(key_digest.iter()); - let encrypted = self.opts.fernet.encrypt(&base).trim_matches('=').to_string(); + let encrypted = self.opts + .fernet + .encrypt(&base) + .trim_matches('=') + .to_string(); Ok(format!("{}v2/{}", root, encrypted)) } else { - let encrypted = self.opts.fernet.encrypt(&base).trim_matches('=').to_string(); + let encrypted = self.opts + .fernet + .encrypt(&base) + .trim_matches('=') + .to_string(); Ok(format!("{}v1/{}", root, encrypted)) } } diff --git a/autopush_rs/src/server/tls.rs b/autopush_rs/src/server/tls.rs index 55277038..315df39d 100644 --- a/autopush_rs/src/server/tls.rs +++ b/autopush_rs/src/server/tls.rs @@ -20,8 +20,8 @@ use tokio_core::net::TcpStream; use tokio_io::{AsyncRead, AsyncWrite}; use tokio_openssl::{SslAcceptorExt, SslStream}; -use server::{Server, ServerOptions}; use errors::*; +use server::{Server, ServerOptions}; /// Creates an `SslAcceptor`, if needed, ready to accept TLS connections. /// diff --git a/autopush_rs/src/util/autojson.rs b/autopush_rs/src/util/autojson.rs index 644e7e46..48b29733 100644 --- a/autopush_rs/src/util/autojson.rs +++ b/autopush_rs/src/util/autojson.rs @@ -19,15 +19,15 @@ // }}} use chrono; use serde; -use serde_json; use serde::ser::SerializeMap; +use serde_json; use slog; +use slog::Record; use slog::{FnValue, PushFnValue}; use slog::{OwnedKVList, SendSyncRefUnwindSafeKV, KV}; -use slog::Record; use std; -use std::{fmt, io, result}; use std::io::Cursor; +use std::{fmt, io, result}; use std::cell::RefCell; use std::fmt::Write; diff --git a/autopush_rs/src/util/ddb_helpers.rs b/autopush_rs/src/util/ddb_helpers.rs index 7f0c6d0e..a460cf66 100644 --- a/autopush_rs/src/util/ddb_helpers.rs +++ b/autopush_rs/src/util/ddb_helpers.rs @@ -293,12 +293,9 @@ struct RangeKey { impl DynamoDbNotification { fn parse_sort_key(key: &str) -> Result { lazy_static! { - static ref RE: RegexSet = RegexSet::new(&[ - r"^01:\S+:\S+$", - r"^02:\d+:\S+$", - r"^\S{3,}:\S+$", - ]).unwrap(); - } + static ref RE: RegexSet = + RegexSet::new(&[r"^01:\S+:\S+$", r"^02:\d+:\S+$", r"^\S{3,}:\S+$",]).unwrap(); + } if !RE.is_match(key) { return Err("Invalid chidmessageid".into()).into(); } @@ -431,7 +428,7 @@ impl DynamoStorage { timestamp: &str, ) -> MyFuture { let ddb = self.ddb.clone(); - let expiry = sec_since_epoch() + MAX_EXPIRY; + let expiry = sec_since_epoch() + 2 * MAX_EXPIRY; let attr_values = hashmap! { ":timestamp".to_string() => val!(N => timestamp), ":expiry".to_string() => val!(N => expiry), @@ -644,7 +641,7 @@ impl DynamoStorage { message_table_name: &str, ) -> MyFuture { let chid = channel_id.hyphenated().to_string(); - let expiry = sec_since_epoch() + MAX_EXPIRY; + let expiry = sec_since_epoch() + 2 * MAX_EXPIRY; let attr_values = hashmap! { ":channel_id".to_string() => val!(SS => vec![chid]), ":expiry".to_string() => val!(N => expiry), @@ -654,7 +651,7 @@ impl DynamoStorage { uaid: s => uaid.simple().to_string(), chidmessageid: s => " ".to_string() }, - update_expression: Some("ADD chids :channel_id, expiry :expiry".to_string()), + update_expression: Some("ADD chids :channel_id SET expiry=:expiry".to_string()), expression_attribute_values: Some(attr_values), table_name: message_table_name.to_string(), ..Default::default() @@ -874,16 +871,10 @@ impl DynamoStorage { Box::new(response) } - pub fn drop_uaid( - &self, - table_name: &str, - uaid: &Uuid, - ) -> MyFuture<()> { + pub fn drop_uaid(&self, table_name: &str, uaid: &Uuid) -> MyFuture<()> { let ddb = self.ddb.clone(); let response = DynamoStorage::drop_user(ddb, uaid, table_name) - .and_then(move |_| -> MyFuture<_> { - Box::new(future::ok(())) - }) + .and_then(move |_| -> MyFuture<_> { Box::new(future::ok(())) }) .chain_err(|| "Unable to drop user record"); Box::new(response) } @@ -914,11 +905,7 @@ impl DynamoStorage { /// sufficient properties for a delete as that is expected to have been done /// before this is called. In the event information is missing, a future::ok /// is returned. - pub fn delete_message( - &self, - table_name: &str, - notif: Notification, - ) -> MyFuture<()> { + pub fn delete_message(&self, table_name: &str, notif: Notification) -> MyFuture<()> { let ddb = self.ddb.clone(); let uaid = if let Some(ref uaid) = notif.uaid { uaid.clone() @@ -929,9 +916,9 @@ impl DynamoStorage { let delete_input = DeleteItemInput { table_name: table_name.to_string(), key: ddb_item! { - uaid: s => uaid, - chidmessageid: s => chidmessageid - }, + uaid: s => uaid, + chidmessageid: s => chidmessageid + }, ..Default::default() }; let response = retry_if( @@ -939,8 +926,7 @@ impl DynamoStorage { |err: &DeleteItemError| { matches!(err, &DeleteItemError::ProvisionedThroughputExceeded(_)) }, - ) - .and_then(|_| Box::new(future::ok(()))) + ).and_then(|_| Box::new(future::ok(()))) .chain_err(|| "Error deleting notification"); Box::new(response) } diff --git a/autopush_rs/src/util/mod.rs b/autopush_rs/src/util/mod.rs index 7f87e9c7..4656fe1d 100644 --- a/autopush_rs/src/util/mod.rs +++ b/autopush_rs/src/util/mod.rs @@ -2,14 +2,14 @@ use std::io; use std::time::Duration; -use hostname::get_hostname; use futures::future::{Either, Future, IntoFuture}; +use hostname::get_hostname; use slog; +use slog::Drain; use slog_async; -use slog_term; use slog_scope; use slog_stdlog; -use slog::Drain; +use slog_term; use tokio_core::reactor::{Handle, Timeout}; use errors::*; @@ -25,9 +25,9 @@ mod timing; mod user_agent; use self::aws::get_ec2_instance_id; -pub use self::timing::{ms_since_epoch, sec_since_epoch, us_since_epoch}; -pub use self::send_all::MySendAll; pub use self::rc::RcObject; +pub use self::send_all::MySendAll; +pub use self::timing::{ms_since_epoch, sec_since_epoch, us_since_epoch}; pub use self::user_agent::parse_user_agent; /// Convenience future to time out the resolution of `f` provided within the diff --git a/autopush_rs/src/util/rc.rs b/autopush_rs/src/util/rc.rs index 80a5d9d3..ad82524f 100644 --- a/autopush_rs/src/util/rc.rs +++ b/autopush_rs/src/util/rc.rs @@ -1,5 +1,5 @@ -use std::rc::Rc; use std::cell::{RefCell, RefMut}; +use std::rc::Rc; use futures::{Poll, Sink, StartSend, Stream}; diff --git a/autopush_rs/src/util/send_all.rs b/autopush_rs/src/util/send_all.rs index 8aac547a..bc1bdec0 100644 --- a/autopush_rs/src/util/send_all.rs +++ b/autopush_rs/src/util/send_all.rs @@ -1,5 +1,5 @@ -use futures::{Async, AsyncSink, Future, Poll, Sink, Stream}; use futures::stream::Fuse; +use futures::{Async, AsyncSink, Future, Poll, Sink, Stream}; // This is a copy of `Future::forward`, except that it doesn't close the sink // when it's finished.