Skip to content
This repository has been archived by the owner on Jul 13, 2023. It is now read-only.

Commit

Permalink
fix: re-rustfmt it all, set expiry on register instead of add with 2x
Browse files Browse the repository at this point in the history
  • Loading branch information
bbangert committed May 11, 2018
1 parent 9b37dfa commit 35b0c62
Show file tree
Hide file tree
Showing 16 changed files with 66 additions and 61 deletions.
2 changes: 1 addition & 1 deletion autopush_rs/src/call.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ use serde::ser;
use serde_json;

use errors::*;
use rt::{self, AutopushError, UnwindGuard};
use protocol;
use rt::{self, AutopushError, UnwindGuard};
use server::Server;

pub struct AutopushPythonCall {
Expand Down
4 changes: 3 additions & 1 deletion autopush_rs/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -696,7 +696,9 @@ where
);
transition!(AwaitMigrateUser { response, data });
} else if all_acked && webpush.flags.reset_uaid {
let response = data.srv.ddb.drop_uaid(&data.srv.opts.router_table_name, &webpush.uaid);
let response = data.srv
.ddb
.drop_uaid(&data.srv.opts.router_table_name, &webpush.uaid);
transition!(AwaitDropUser { response, data });
}
transition!(AwaitInput { data })
Expand Down
4 changes: 2 additions & 2 deletions autopush_rs/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,15 @@
//! online.
use std::any::Any;
use std::num;
use std::error;
use std::io;
use std::num;

use cadence;
use futures::Future;
use httparse;
use serde_json;
use sentry;
use serde_json;
use tungstenite;
use uuid;

Expand Down
4 changes: 2 additions & 2 deletions autopush_rs/src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,13 @@
//! PUT /push/UAID - Deliver notification to a client
//! PUT /notify/UAID - Tell a client to check storage
use std::str;
use std::rc::Rc;
use std::str;

use futures::future::ok;
use futures::{Future, Stream};
use hyper::{Method, StatusCode};
use hyper;
use hyper::{Method, StatusCode};
use serde_json;
use tokio_service::Service;
use uuid::Uuid;
Expand Down
2 changes: 1 addition & 1 deletion autopush_rs/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,5 +124,5 @@ mod protocol;
#[macro_use]
pub mod rt;
pub mod call;
pub mod server;
pub mod queue;
pub mod server;
11 changes: 8 additions & 3 deletions autopush_rs/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,9 +139,14 @@ impl Notification {
if let Some(ref topic) = self.topic {
format!("01:{}:{}", chid, topic)
} else if let Some(sortkey_timestamp) = self.sortkey_timestamp {
format!("02:{}:{}",
if sortkey_timestamp == 0 { ms_since_epoch() } else { sortkey_timestamp },
chid
format!(
"02:{}:{}",
if sortkey_timestamp == 0 {
ms_since_epoch()
} else {
sortkey_timestamp
},
chid
)
} else {
// Legacy messages which we should never get anymore
Expand Down
2 changes: 1 addition & 1 deletion autopush_rs/src/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@
//! pushing requests over to Python. A `Sender` here is saved off in the
//! `Server` for sending messages.
use std::sync::mpsc;
use std::sync::Mutex;
use std::sync::mpsc;

use call::{AutopushPythonCall, PythonCall};
use rt::{self, AutopushError};
Expand Down
6 changes: 3 additions & 3 deletions autopush_rs/src/rt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,11 @@
//! functions throughout this crate and copy those idioms, otherwise there's
//! documentation on each specific function here.
use std::panic;
use std::ptr;
use std::mem;
use std::any::Any;
use std::cell::Cell;
use std::mem;
use std::panic;
use std::ptr;

/// Generic error which is used on all function calls from Python into Rust.
///
Expand Down
2 changes: 1 addition & 1 deletion autopush_rs/src/server/dispatch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ use tokio_core::net::TcpStream;
use tokio_io::AsyncRead;

use errors::*;
use server::webpush_io::WebpushIo;
use server::tls::MaybeTlsStream;
use server::webpush_io::WebpushIo;

pub struct Dispatch {
socket: Option<MaybeTlsStream<TcpStream>>,
Expand Down
28 changes: 20 additions & 8 deletions autopush_rs/src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ use tokio_core::net::TcpListener;
use tokio_core::reactor::{Core, Handle, Timeout};
use tokio_io;
use tokio_tungstenite::{accept_hdr_async, WebSocketStream};
use tungstenite::handshake::server::Request;
use tungstenite::Message;
use tungstenite::handshake::server::Request;
use uuid::Uuid;

use client::{Client, RegisteredClient};
Expand All @@ -47,10 +47,10 @@ use rt::{self, AutopushError, UnwindGuard};
use server::dispatch::{Dispatch, RequestType};
use server::metrics::metrics_from_opts;
use server::webpush_io::WebpushIo;
use util::{self, timeout, RcObject};
use util::ddb_helpers::DynamoStorage;
use util::megaphone::{ClientServices, MegaphoneAPIResponse, Service, ServiceChangeTracker,
ServiceClientInit};
use util::{self, timeout, RcObject};

mod dispatch;
mod metrics;
Expand Down Expand Up @@ -189,16 +189,20 @@ pub extern "C" fn autopush_server_new(
router_port: opts.router_port,
statsd_host: to_s(opts.statsd_host).map(|s| s.to_string()),
statsd_port: opts.statsd_port,
message_table_names: to_s(opts.message_table_names).map(|s| s.to_string())
message_table_names: to_s(opts.message_table_names)
.map(|s| s.to_string())
.expect("message table names must be specified")
.split(",")
.map(|s| s.trim().to_string())
.collect(),
router_table_name: to_s(opts.router_table_name).map(|s| s.to_string())
router_table_name: to_s(opts.router_table_name)
.map(|s| s.to_string())
.expect("router table name must be specified"),
router_url: to_s(opts.router_url).map(|s| s.to_string())
router_url: to_s(opts.router_url)
.map(|s| s.to_string())
.expect("router url must be specified"),
endpoint_url: to_s(opts.endpoint_url).map(|s| s.to_string())
endpoint_url: to_s(opts.endpoint_url)
.map(|s| s.to_string())
.expect("endpoint url must be specified"),
ssl_key: to_s(opts.ssl_key).map(PathBuf::from),
ssl_cert: to_s(opts.ssl_cert).map(PathBuf::from),
Expand Down Expand Up @@ -518,10 +522,18 @@ impl Server {
let key_digest = hash::hash(hash::MessageDigest::sha256(), &raw_key)
.chain_err(|| "Error creating message digest for key")?;
base.extend(key_digest.iter());
let encrypted = self.opts.fernet.encrypt(&base).trim_matches('=').to_string();
let encrypted = self.opts
.fernet
.encrypt(&base)
.trim_matches('=')
.to_string();
Ok(format!("{}v2/{}", root, encrypted))
} else {
let encrypted = self.opts.fernet.encrypt(&base).trim_matches('=').to_string();
let encrypted = self.opts
.fernet
.encrypt(&base)
.trim_matches('=')
.to_string();
Ok(format!("{}v1/{}", root, encrypted))
}
}
Expand Down
2 changes: 1 addition & 1 deletion autopush_rs/src/server/tls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ use tokio_core::net::TcpStream;
use tokio_io::{AsyncRead, AsyncWrite};
use tokio_openssl::{SslAcceptorExt, SslStream};

use server::{Server, ServerOptions};
use errors::*;
use server::{Server, ServerOptions};

/// Creates an `SslAcceptor`, if needed, ready to accept TLS connections.
///
Expand Down
6 changes: 3 additions & 3 deletions autopush_rs/src/util/autojson.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,15 @@
// }}}
use chrono;
use serde;
use serde_json;
use serde::ser::SerializeMap;
use serde_json;
use slog;
use slog::Record;
use slog::{FnValue, PushFnValue};
use slog::{OwnedKVList, SendSyncRefUnwindSafeKV, KV};
use slog::Record;
use std;
use std::{fmt, io, result};
use std::io::Cursor;
use std::{fmt, io, result};

use std::cell::RefCell;
use std::fmt::Write;
Expand Down
40 changes: 13 additions & 27 deletions autopush_rs/src/util/ddb_helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -293,12 +293,9 @@ struct RangeKey {
impl DynamoDbNotification {
fn parse_sort_key(key: &str) -> Result<RangeKey> {
lazy_static! {
static ref RE: RegexSet = RegexSet::new(&[
r"^01:\S+:\S+$",
r"^02:\d+:\S+$",
r"^\S{3,}:\S+$",
]).unwrap();
}
static ref RE: RegexSet =
RegexSet::new(&[r"^01:\S+:\S+$", r"^02:\d+:\S+$", r"^\S{3,}:\S+$",]).unwrap();
}
if !RE.is_match(key) {
return Err("Invalid chidmessageid".into()).into();
}
Expand Down Expand Up @@ -431,7 +428,7 @@ impl DynamoStorage {
timestamp: &str,
) -> MyFuture<UpdateItemOutput> {
let ddb = self.ddb.clone();
let expiry = sec_since_epoch() + MAX_EXPIRY;
let expiry = sec_since_epoch() + 2 * MAX_EXPIRY;
let attr_values = hashmap! {
":timestamp".to_string() => val!(N => timestamp),
":expiry".to_string() => val!(N => expiry),
Expand Down Expand Up @@ -644,7 +641,7 @@ impl DynamoStorage {
message_table_name: &str,
) -> MyFuture<UpdateItemOutput> {
let chid = channel_id.hyphenated().to_string();
let expiry = sec_since_epoch() + MAX_EXPIRY;
let expiry = sec_since_epoch() + 2 * MAX_EXPIRY;
let attr_values = hashmap! {
":channel_id".to_string() => val!(SS => vec![chid]),
":expiry".to_string() => val!(N => expiry),
Expand All @@ -654,7 +651,7 @@ impl DynamoStorage {
uaid: s => uaid.simple().to_string(),
chidmessageid: s => " ".to_string()
},
update_expression: Some("ADD chids :channel_id, expiry :expiry".to_string()),
update_expression: Some("ADD chids :channel_id SET expiry=:expiry".to_string()),
expression_attribute_values: Some(attr_values),
table_name: message_table_name.to_string(),
..Default::default()
Expand Down Expand Up @@ -874,16 +871,10 @@ impl DynamoStorage {
Box::new(response)
}

pub fn drop_uaid(
&self,
table_name: &str,
uaid: &Uuid,
) -> MyFuture<()> {
pub fn drop_uaid(&self, table_name: &str, uaid: &Uuid) -> MyFuture<()> {
let ddb = self.ddb.clone();
let response = DynamoStorage::drop_user(ddb, uaid, table_name)
.and_then(move |_| -> MyFuture<_> {
Box::new(future::ok(()))
})
.and_then(move |_| -> MyFuture<_> { Box::new(future::ok(())) })
.chain_err(|| "Unable to drop user record");
Box::new(response)
}
Expand Down Expand Up @@ -914,11 +905,7 @@ impl DynamoStorage {
/// sufficient properties for a delete as that is expected to have been done
/// before this is called. In the event information is missing, a future::ok
/// is returned.
pub fn delete_message(
&self,
table_name: &str,
notif: Notification,
) -> MyFuture<()> {
pub fn delete_message(&self, table_name: &str, notif: Notification) -> MyFuture<()> {
let ddb = self.ddb.clone();
let uaid = if let Some(ref uaid) = notif.uaid {
uaid.clone()
Expand All @@ -929,18 +916,17 @@ impl DynamoStorage {
let delete_input = DeleteItemInput {
table_name: table_name.to_string(),
key: ddb_item! {
uaid: s => uaid,
chidmessageid: s => chidmessageid
},
uaid: s => uaid,
chidmessageid: s => chidmessageid
},
..Default::default()
};
let response = retry_if(
move || ddb.delete_item(&delete_input),
|err: &DeleteItemError| {
matches!(err, &DeleteItemError::ProvisionedThroughputExceeded(_))
},
)
.and_then(|_| Box::new(future::ok(())))
).and_then(|_| Box::new(future::ok(())))
.chain_err(|| "Error deleting notification");
Box::new(response)
}
Expand Down
10 changes: 5 additions & 5 deletions autopush_rs/src/util/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@
use std::io;
use std::time::Duration;

use hostname::get_hostname;
use futures::future::{Either, Future, IntoFuture};
use hostname::get_hostname;
use slog;
use slog::Drain;
use slog_async;
use slog_term;
use slog_scope;
use slog_stdlog;
use slog::Drain;
use slog_term;
use tokio_core::reactor::{Handle, Timeout};

use errors::*;
Expand All @@ -25,9 +25,9 @@ mod timing;
mod user_agent;

use self::aws::get_ec2_instance_id;
pub use self::timing::{ms_since_epoch, sec_since_epoch, us_since_epoch};
pub use self::send_all::MySendAll;
pub use self::rc::RcObject;
pub use self::send_all::MySendAll;
pub use self::timing::{ms_since_epoch, sec_since_epoch, us_since_epoch};
pub use self::user_agent::parse_user_agent;

/// Convenience future to time out the resolution of `f` provided within the
Expand Down
2 changes: 1 addition & 1 deletion autopush_rs/src/util/rc.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use std::rc::Rc;
use std::cell::{RefCell, RefMut};
use std::rc::Rc;

use futures::{Poll, Sink, StartSend, Stream};

Expand Down
2 changes: 1 addition & 1 deletion autopush_rs/src/util/send_all.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use futures::{Async, AsyncSink, Future, Poll, Sink, Stream};
use futures::stream::Fuse;
use futures::{Async, AsyncSink, Future, Poll, Sink, Stream};

// This is a copy of `Future::forward`, except that it doesn't close the sink
// when it's finished.
Expand Down

0 comments on commit 35b0c62

Please sign in to comment.