From bd0f37f0206cd2d4f414be95a8e671607582906f Mon Sep 17 00:00:00 2001 From: Philip Jenvey Date: Mon, 7 May 2018 14:39:57 -0700 Subject: [PATCH] refactor: some cleanup for #1202 --- autopush_rs/src/util/ddb_helpers.rs | 174 +++++++++++++--------------- 1 file changed, 80 insertions(+), 94 deletions(-) diff --git a/autopush_rs/src/util/ddb_helpers.rs b/autopush_rs/src/util/ddb_helpers.rs index c0b011ce..b55cd9c1 100644 --- a/autopush_rs/src/util/ddb_helpers.rs +++ b/autopush_rs/src/util/ddb_helpers.rs @@ -142,7 +142,7 @@ pub struct NotificationHeaders { encoding: Option, } -fn insert_to_map(map: &mut HashMap, val: Option, name: &str) { +fn insert_to_map(map: &mut HashMap, name: &str, val: Option) { if let Some(val) = val { map.insert(name.to_string(), val); } @@ -151,10 +151,10 @@ fn insert_to_map(map: &mut HashMap, val: Option, name: & impl From for HashMap { fn from(val: NotificationHeaders) -> HashMap { let mut map = HashMap::new(); - insert_to_map(&mut map, val.crypto_key, "crypto_key"); - insert_to_map(&mut map, val.encryption, "encryption"); - insert_to_map(&mut map, val.encryption_key, "encryption_key"); - insert_to_map(&mut map, val.encoding, "encoding"); + insert_to_map(&mut map, "crypto_key", val.crypto_key); + insert_to_map(&mut map, "encryption", val.encryption); + insert_to_map(&mut map, "encryption_key", val.encryption_key); + insert_to_map(&mut map, "encoding", val.encoding); map } } @@ -209,45 +209,48 @@ fn parse_sort_key(key: &str) -> Result { if !RE.is_match(key) { return Err("Invalid chidmessageid".into()).into(); } - if key.starts_with("01:") { - let v: Vec<&str> = key.split(":").collect(); - if v.len() != 3 { - return Err("Invalid topic key".into()); + + let v: Vec<&str> = key.split(":").collect(); + match v[0] { + "01" => { + if v.len() != 3 { + return Err("Invalid topic key".into()); + } + let (channel_id, topic) = (v[1], v[2]); + let channel_id = Uuid::parse_str(channel_id)?; + Ok(RangeKey { + channel_id, + topic: Some(topic.to_string()), + sortkey_timestamp: None, + legacy_version: None, + }) } - let (channel_id, topic) = (v[1], v[2]); - let channel_id = Uuid::parse_str(channel_id)?; - Ok(RangeKey { - channel_id, - topic: Some(topic.to_string()), - sortkey_timestamp: None, - legacy_version: None, - }) - } else if key.starts_with("02:") { - let v: Vec<&str> = key.split(":").collect(); - if v.len() != 3 { - return Err("Invalid topic key".into()); + "02" => { + if v.len() != 3 { + return Err("Invalid topic key".into()); + } + let (sortkey, channel_id) = (v[1], v[2]); + let channel_id = Uuid::parse_str(channel_id)?; + Ok(RangeKey { + channel_id, + topic: None, + sortkey_timestamp: Some(sortkey.parse()?), + legacy_version: None, + }) } - let (sortkey, channel_id) = (v[1], v[2]); - let channel_id = Uuid::parse_str(channel_id)?; - Ok(RangeKey { - channel_id, - topic: None, - sortkey_timestamp: Some(sortkey.parse()?), - legacy_version: None, - }) - } else { - let v: Vec<&str> = key.split(":").collect(); - if v.len() != 2 { - return Err("Invalid topic key".into()); + _ => { + if v.len() != 2 { + return Err("Invalid topic key".into()); + } + let (channel_id, legacy_version) = (v[0], v[1]); + let channel_id = Uuid::parse_str(channel_id)?; + Ok(RangeKey { + channel_id, + topic: None, + sortkey_timestamp: None, + legacy_version: Some(legacy_version.to_string()), + }) } - let (channel_id, legacy_version) = (v[0], v[1]); - let channel_id = Uuid::parse_str(channel_id)?; - Ok(RangeKey { - channel_id, - topic: None, - sortkey_timestamp: None, - legacy_version: Some(legacy_version.to_string()), - }) } } @@ -290,7 +293,7 @@ pub struct CheckStorageResponse { #[derive(Default)] pub struct FetchMessageResponse { pub timestamp: Option, - pub messages: Option>, + pub messages: Vec, } pub struct DynamoStorage { @@ -400,45 +403,32 @@ impl DynamoStorage { |err: &QueryError| matches!(err, &QueryError::ProvisionedThroughputExceeded(_)), ).map_err(|_| "Error fetching messages".into()); let response = response.and_then(|data| { - let mut notifs: Option> = data.items.and_then(|items| { + let mut notifs: Vec = data.items.map_or_else(Vec::new, |items| { debug!("Got response of: {:?}", items); - // TODO: Capture translation errors and report them as we shouldn't have corrupt data - Some( - items - .into_iter() - .inspect(|i| debug!("Item: {:?}", i)) - .filter_map(|item| serde_dynamodb::from_hashmap(item).ok()) - .collect(), - ) + // TODO: Capture translation errors and report them as we shouldn't + // have corrupt data + items + .into_iter() + .inspect(|i| debug!("Item: {:?}", i)) + .filter_map(|item| serde_dynamodb::from_hashmap(item).ok()) + .collect() }); + if notifs.is_empty() { + return Ok(Default::default()); + } + // Load the current_timestamp from the subscription registry entry which is // the first DynamoDbNotification and remove it from the vec. - let mut timestamp = None; - if let Some(ref mut messages) = notifs { - if messages.len() == 0 { - return Ok(Default::default()); - } - let first = messages.remove(0); - if let Some(ts) = first.current_timestamp { - timestamp = Some(ts); - } - } + let timestamp = notifs.remove(0).current_timestamp; // Convert any remaining DynamoDbNotifications to Notification's - let notifs = notifs.and_then(|items| { - // TODO: Capture translation errors and report them as we shouldn't have corrupt data - let items: Vec = items - .into_iter() - .filter_map(|ddb_notif| ddb_notif.to_notif().ok()) - .collect(); - if items.len() > 0 { - Some(items) - } else { - None - } - }); + // TODO: Capture translation errors and report them as we shouldn't have corrupt data + let messages = notifs + .into_iter() + .filter_map(|ddb_notif| ddb_notif.to_notif().ok()) + .collect(); Ok(FetchMessageResponse { timestamp, - messages: notifs, + messages, }) }); Box::new(response) @@ -489,27 +479,23 @@ impl DynamoStorage { |err: &QueryError| matches!(err, &QueryError::ProvisionedThroughputExceeded(_)), ).map_err(|_| "Error fetching messages".into()); let response = response.and_then(|data| { - let notifs: Option> = data.items.and_then(|items| { + let messages = data.items.map_or_else(Vec::new, |items| { debug!("Got response of: {:?}", items); // TODO: Capture translation errors and report them as we shouldn't have corrupt data - Some( - items - .into_iter() - .filter_map(|item| serde_dynamodb::from_hashmap(item).ok()) - .filter_map(|ddb_notif: DynamoDbNotification| ddb_notif.to_notif().ok()) - .collect(), - ) + items + .into_iter() + .filter_map(|item| serde_dynamodb::from_hashmap(item).ok()) + .filter_map(|ddb_notif: DynamoDbNotification| ddb_notif.to_notif().ok()) + .collect() }); - let mut timestamp = None; - if let Some(ref messages) = notifs { - if messages.len() == 0 { - return Ok(Default::default()); - } - timestamp = messages.iter().filter_map(|m| m.sortkey_timestamp).max(); + if messages.is_empty() { + return Ok(Default::default()); } + + let timestamp = messages.iter().filter_map(|m| m.sortkey_timestamp).max(); Ok(FetchMessageResponse { timestamp, - messages: notifs, + messages, }) }); Box::new(response) @@ -533,11 +519,11 @@ impl DynamoStorage { let ddb2 = self.ddb.clone(); let response = response.and_then(move |resp| { // Return now from this future if we have messages - if let Some(messages) = resp.messages { - debug!("Topic message returns: {:?}", messages); + if !resp.messages.is_empty() { + debug!("Topic message returns: {:?}", resp.messages); return future::Either::A(future::ok(CheckStorageResponse { include_topic: true, - messages, + messages: resp.messages, timestamp: resp.timestamp, })); } @@ -548,7 +534,7 @@ impl DynamoStorage { timestamp }; let next_query = { - if resp.messages.is_none() || resp.timestamp.is_some() { + if resp.messages.is_empty() || resp.timestamp.is_some() { DynamoStorage::fetch_timestamp_messages( ddb2, table_name.as_ref(), @@ -566,7 +552,7 @@ impl DynamoStorage { let timestamp = resp.timestamp.or(timestamp); Ok(CheckStorageResponse { include_topic: false, - messages: resp.messages.unwrap_or(Vec::new()), + messages: resp.messages, timestamp, }) });